source: trunk/pbs_drmaa/pbs_conn.c @ 84

Revision 84, 10.5 KB checked in by mmamonski, 11 years ago (diff)

PBS API

  • Property svn:keywords set to Id
Line 
1/* $Id$ */
2/*
3 *  FedStage DRMAA for PBS Pro
4 *  Copyright (C) 2006-2007  FedStage Systems
5 *
6 *  This program is free software: you can redistribute it and/or modify
7 *  it under the terms of the GNU General Public License as published by
8 *  the Free Software Foundation, either version 3 of the License, or
9 *  (at your option) any later version.
10 *
11 *  This program is distributed in the hope that it will be useful,
12 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 *  GNU General Public License for more details.
15 *
16 *  You should have received a copy of the GNU General Public License
17 *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #ifdef HAVE_CONFIG_H
21#       include <config.h>
22#endif
23
24#include <pbs_error.h>
25
26#include <drmaa_utils/datetime.h>
27#include <drmaa_utils/drmaa.h>
28#include <drmaa_utils/iter.h>
29#include <drmaa_utils/conf.h>
30#include <drmaa_utils/session.h>
31#include <drmaa_utils/datetime.h>
32
33#include <pbs_drmaa/pbs_conn.h>
34#include <pbs_drmaa/util.h>
35
36#include <errno.h>
37#include <signal.h>
38#include <unistd.h>
39
40
41static char* pbsdrmaa_pbs_submit( pbsdrmaa_pbs_conn_t *self, struct attropl *attrib, char *script, char *destination );
42
43static struct batch_status* pbsdrmaa_pbs_statjob( pbsdrmaa_pbs_conn_t *self,  char *job_id, struct attrl *attrib );
44
45static void pbsdrmaa_pbs_statjob_free( pbsdrmaa_pbs_conn_t *self, struct batch_status* job_status );
46
47static void pbsdrmaa_pbs_sigjob( pbsdrmaa_pbs_conn_t *self, char *job_id, char *signal );
48
49static void pbsdrmaa_pbs_deljob( pbsdrmaa_pbs_conn_t *self,  char *job_id );
50
51static void pbsdrmaa_pbs_rlsjob( pbsdrmaa_pbs_conn_t *self, char *job_id );
52
53static void pbsdrmaa_pbs_holdjob( pbsdrmaa_pbs_conn_t *self,  char *job_id );
54
55static void pbsdrmaa_pbs_reconnect_internal( pbsdrmaa_pbs_conn_t *self, bool reconnect);
56
57static void pbsdrmaa_pbs_check_connect_internal( pbsdrmaa_pbs_conn_t *self, bool reconnect);
58
59#define IS_TRANSIENT_ERROR (pbs_errno == PBSE_PROTOCOL || pbs_errno == PBSE_EXPIRED || pbs_errno == PBSOLDE_PROTOCOL || pbs_errno == PBSOLDE_EXPIRED)
60
61       
62pbsdrmaa_pbs_conn_t *
63pbsdrmaa_pbs_conn_new( pbsdrmaa_session_t *session, char *server )
64{
65        pbsdrmaa_pbs_conn_t *volatile self = NULL;
66
67        fsd_log_enter((""));
68
69        TRY
70          {
71                fsd_malloc(self, pbsdrmaa_pbs_conn_t );
72               
73                self->session = session;
74               
75                self->submit = pbsdrmaa_pbs_submit;
76                self->statjob = pbsdrmaa_pbs_statjob;
77                self->statjob_free = pbsdrmaa_pbs_statjob_free;
78                self->sigjob = pbsdrmaa_pbs_sigjob;
79                self->deljob = pbsdrmaa_pbs_deljob;
80                self->rlsjob = pbsdrmaa_pbs_rlsjob;
81                self->holdjob = pbsdrmaa_pbs_holdjob;
82
83                self->server = fsd_strdup(server);
84
85                self->connection_fd = -1;
86                self->last_usage = time(NULL);
87
88                /*ignore SIGPIPE - otherwise pbs_disconnect cause the program to exit */
89                signal(SIGPIPE, SIG_IGN);       
90
91                pbsdrmaa_pbs_reconnect_internal(self, false);
92          }
93        EXCEPT_DEFAULT
94          {
95                if( self != NULL)
96                  {
97                        fsd_free(self->server);
98                        fsd_free(self);
99
100                        if (self->connection_fd != -1)
101                                pbs_disconnect(self->connection_fd);
102                  }
103                       
104                fsd_exc_reraise();
105          }
106        END_TRY
107
108        fsd_log_return((""));
109
110        return self;
111}
112
113
114void
115pbsdrmaa_pbs_conn_destroy ( pbsdrmaa_pbs_conn_t * self )
116{
117        fsd_log_enter((""));
118
119        TRY
120        {
121                if(self != NULL)
122                {
123                        fsd_free(self->server);
124                        fsd_free(self);
125
126                        if (self->connection_fd != -1)
127                                pbs_disconnect(self->connection_fd);
128                }
129        }
130        EXCEPT_DEFAULT
131        {
132                fsd_exc_reraise();
133        }
134        END_TRY
135       
136        fsd_log_return((""));
137}
138
139char*
140pbsdrmaa_pbs_submit( pbsdrmaa_pbs_conn_t *self, struct attropl *attrib, char *script, char *destination )
141{
142        char *volatile job_id = NULL;
143        volatile bool first_try = true;
144        volatile bool conn_lock = false;
145
146        fsd_log_enter((""));
147
148        TRY
149         {
150                conn_lock = fsd_mutex_lock(&self->session->super.drm_connection_mutex);
151
152                pbsdrmaa_pbs_reconnect_internal(self, false);
153
154retry:
155                job_id = pbs_submit(self->connection_fd, attrib, script, destination, NULL);
156
157                fsd_log_info(("pbs_submit(%s, %s) = %s", script, destination, job_id));
158
159                if(job_id == NULL)
160                 {
161                        fsd_log_error(( "pbs_submit failed, pbs_errno = %d", pbs_errno ));
162                        if (IS_TRANSIENT_ERROR && first_try)
163                         {
164                                pbsdrmaa_pbs_reconnect_internal(self, true);
165                                first_try = false;
166                                goto retry;
167                         }
168                        else
169                         {
170                                pbsdrmaa_exc_raise_pbs( "pbs_submit");
171                         }
172                 }
173         }
174        EXCEPT_DEFAULT
175         {
176                fsd_free(job_id);
177                fsd_exc_reraise();
178         }
179        FINALLY
180         {
181                if(conn_lock)
182                        conn_lock = fsd_mutex_unlock(&self->session->super.drm_connection_mutex);
183         }
184        END_TRY
185
186
187        fsd_log_return(("%s", job_id));
188
189        return job_id;
190}
191
192struct batch_status*
193pbsdrmaa_pbs_statjob( pbsdrmaa_pbs_conn_t *self,  char *job_id, struct attrl *attrib )
194{
195        struct batch_status *volatile status = NULL;
196        volatile bool first_try = true;
197        volatile bool conn_lock = false;
198
199
200        fsd_log_enter((""));
201
202        TRY
203         {
204                conn_lock = fsd_mutex_lock(&self->session->super.drm_connection_mutex);
205
206                pbsdrmaa_pbs_reconnect_internal(self, false);
207
208retry:
209                status = pbs_statjob(self->connection_fd, job_id, attrib, NULL);
210
211                fsd_log_info(( "pbs_statjob( fd=%d, job_id=%s, attribs={...} ) =%p", self->connection_fd, job_id, (void*)status));
212
213                if(status == NULL)
214                 {
215                        fsd_log_error(( "pbs_statjob failed, pbs_errno = %d", pbs_errno ));
216                        if (IS_TRANSIENT_ERROR && first_try)
217                         {
218                                pbsdrmaa_pbs_reconnect_internal(self, true);
219                                first_try = false;
220                                goto retry;
221                         }
222                        else
223                         {
224                                pbsdrmaa_exc_raise_pbs( "pbs_statjob");
225                         }
226                 }
227         }
228        EXCEPT_DEFAULT
229         {
230                if( status != NULL )
231                        pbs_statfree( status );
232
233                fsd_exc_reraise();
234         }
235        FINALLY
236         {
237                if(conn_lock)
238                        conn_lock = fsd_mutex_unlock(&self->session->super.drm_connection_mutex);
239         }
240        END_TRY
241
242
243        fsd_log_return((""));
244
245        return status;
246}
247
248void
249pbsdrmaa_pbs_statjob_free( pbsdrmaa_pbs_conn_t *self, struct batch_status* job_status )
250{
251        fsd_log_enter((""));
252
253        pbs_statfree( job_status );
254}
255
256void
257pbsdrmaa_pbs_sigjob( pbsdrmaa_pbs_conn_t *self, char *job_id, char *signal_name )
258{
259        int rc = PBSE_NONE;
260        volatile bool first_try = true;
261        volatile bool conn_lock = false;
262
263
264        fsd_log_enter((""));
265
266        TRY
267         {
268                conn_lock = fsd_mutex_lock(&self->session->super.drm_connection_mutex);
269
270                pbsdrmaa_pbs_reconnect_internal(self, false);
271
272retry:
273                rc = pbs_sigjob(self->connection_fd, job_id, signal_name, NULL);
274
275                fsd_log_info(( "pbs_sigjob( fd=%d, job_id=%s, signal_name=%s) = %d", self->connection_fd, job_id, signal_name, rc));
276
277                if(rc != PBSE_NONE)
278                 {
279                        fsd_log_error(( "pbs_sigjob failed, pbs_errno = %d", pbs_errno ));
280                        if (IS_TRANSIENT_ERROR && first_try)
281                         {
282                                pbsdrmaa_pbs_reconnect_internal(self, true);
283                                first_try = false;
284                                goto retry;
285                         }
286                        else
287                         {
288                                pbsdrmaa_exc_raise_pbs( "pbs_sigjob");
289                         }
290                 }
291         }
292        EXCEPT_DEFAULT
293         {
294                fsd_exc_reraise();
295         }
296        FINALLY
297         {
298                if(conn_lock)
299                        conn_lock = fsd_mutex_unlock(&self->session->super.drm_connection_mutex);
300         }
301        END_TRY
302
303
304        fsd_log_return((""));
305
306}
307
308void
309pbsdrmaa_pbs_deljob( pbsdrmaa_pbs_conn_t *self, char *job_id )
310{
311        int rc = PBSE_NONE;
312        volatile bool first_try = true;
313        volatile bool conn_lock = false;
314
315
316        fsd_log_enter((""));
317
318        TRY
319         {
320                conn_lock = fsd_mutex_lock(&self->session->super.drm_connection_mutex);
321
322                pbsdrmaa_pbs_reconnect_internal(self, false);
323
324retry:
325                rc = pbs_deljob(self->connection_fd, job_id, NULL);
326
327                fsd_log_info(( "pbs_deljob( fd=%d, job_id=%s) = %d", self->connection_fd, job_id, rc));
328
329                if(rc != PBSE_NONE)
330                 {
331                        fsd_log_error(( "pbs_deljob failed, rc = %d, pbs_errno = %d", rc, pbs_errno ));
332                        if (IS_TRANSIENT_ERROR && first_try)
333                         {
334                                pbsdrmaa_pbs_reconnect_internal(self, true);
335                                first_try = false;
336                                goto retry;
337                         }
338                        else
339                         {
340                                pbsdrmaa_exc_raise_pbs( "pbs_deljob");
341                         }
342                 }
343         }
344        EXCEPT_DEFAULT
345         {
346                fsd_exc_reraise();
347         }
348        FINALLY
349         {
350                if(conn_lock)
351                        conn_lock = fsd_mutex_unlock(&self->session->super.drm_connection_mutex);
352         }
353        END_TRY
354
355
356        fsd_log_return((""));
357}
358
359void
360pbsdrmaa_pbs_rlsjob( pbsdrmaa_pbs_conn_t *self, char *job_id )
361{
362        int rc = PBSE_NONE;
363        volatile bool first_try = true;
364        volatile bool conn_lock = false;
365
366
367        fsd_log_enter((""));
368
369        TRY
370         {
371                conn_lock = fsd_mutex_lock(&self->session->super.drm_connection_mutex);
372
373                pbsdrmaa_pbs_reconnect_internal(self, false);
374
375retry:
376                rc = pbs_rlsjob(self->connection_fd, job_id, USER_HOLD, NULL);
377
378                fsd_log_info(( "pbs_rlsjob( fd=%d, job_id=%s) = %d", self->connection_fd, job_id, rc));
379
380                if(rc != PBSE_NONE)
381                 {
382                        fsd_log_error(( "pbs_rlsjob failed, rc = %d, pbs_errno = %d", rc,  pbs_errno ));
383                        if (IS_TRANSIENT_ERROR && first_try)
384                         {
385                                pbsdrmaa_pbs_reconnect_internal(self, true);
386                                first_try = false;
387                                goto retry;
388                         }
389                        else
390                         {
391                                pbsdrmaa_exc_raise_pbs( "pbs_rlsjob");
392                         }
393                 }
394         }
395        EXCEPT_DEFAULT
396         {
397                fsd_exc_reraise();
398         }
399        FINALLY
400         {
401                if(conn_lock)
402                        conn_lock = fsd_mutex_unlock(&self->session->super.drm_connection_mutex);
403         }
404        END_TRY
405
406
407        fsd_log_return((""));
408}
409
410void
411pbsdrmaa_pbs_holdjob( pbsdrmaa_pbs_conn_t *self,  char *job_id )
412{
413        int rc = PBSE_NONE;
414        volatile bool first_try = true;
415        volatile bool conn_lock = false;
416
417
418        fsd_log_enter((""));
419
420        TRY
421         {
422                conn_lock = fsd_mutex_lock(&self->session->super.drm_connection_mutex);
423
424                pbsdrmaa_pbs_reconnect_internal(self, false);
425
426retry:
427                rc = pbs_holdjob(self->connection_fd, job_id, USER_HOLD, NULL);
428
429                fsd_log_info(( "pbs_holdjob( fd=%d, job_id=%s) = %d", self->connection_fd, job_id, rc));
430
431                if(rc != PBSE_NONE)
432                 {
433                        fsd_log_error(( "pbs_holdjob failed, rc = %d, pbs_errno = %d", rc, pbs_errno ));
434                        if (IS_TRANSIENT_ERROR && first_try)
435                         {
436                                pbsdrmaa_pbs_reconnect_internal(self, true);
437                                first_try = false;
438                                goto retry;
439                         }
440                        else
441                         {
442                                pbsdrmaa_exc_raise_pbs( "pbs_holdjob");
443                         }
444                 }
445         }
446        EXCEPT_DEFAULT
447         {
448                fsd_exc_reraise();
449         }
450        FINALLY
451         {
452                if(conn_lock)
453                        conn_lock = fsd_mutex_unlock(&self->session->super.drm_connection_mutex);
454         }
455        END_TRY
456
457
458        fsd_log_return((""));
459}
460
461void
462pbsdrmaa_pbs_reconnect_internal( pbsdrmaa_pbs_conn_t *self, bool force_reconnect)
463{
464        int tries_left = self->session->max_retries_count;
465        int sleep_time = 1;
466
467        fsd_log_enter(("(%d)", self->connection_fd));
468
469        if ( self->connection_fd != -1 )
470          {
471                if (!force_reconnect)
472                  {
473                        fsd_log_return(("(%d)", self->connection_fd));
474                        return;
475                  }
476                else
477                 {
478                        pbs_disconnect(self->connection_fd);
479                        self->connection_fd = -1;
480                 }
481          }
482
483retry_connect: /* Life... */
484        self->connection_fd = pbs_connect( self->server );
485        fsd_log_info(( "pbs_connect(%s) =%d", self->server, self->connection_fd ));
486        if( self->connection_fd < 0 && tries_left-- )
487          {
488                sleep(sleep_time);
489                sleep_time *=2;
490                goto retry_connect;
491          }
492       
493        if( self->connection_fd < 0 )
494                pbsdrmaa_exc_raise_pbs( "pbs_connect" );
495       
496        fsd_log_return(("(%d)", self->connection_fd));
497}
498
Note: See TracBrowser for help on using the repository browser.