source: trunk/pbs_drmaa/pbs_conn.c @ 87

Revision 87, 11.8 KB checked in by mmamonski, 11 years ago (diff)

BUMP version 1.0.13

  • Property svn:keywords set to Id
Line 
1/* $Id$ */
2/*
3 *  FedStage DRMAA for PBS Pro
4 *  Copyright (C) 2006-2007  FedStage Systems
5 *
6 *  This program is free software: you can redistribute it and/or modify
7 *  it under the terms of the GNU General Public License as published by
8 *  the Free Software Foundation, either version 3 of the License, or
9 *  (at your option) any later version.
10 *
11 *  This program is distributed in the hope that it will be useful,
12 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 *  GNU General Public License for more details.
15 *
16 *  You should have received a copy of the GNU General Public License
17 *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #ifdef HAVE_CONFIG_H
21#       include <config.h>
22#endif
23
24#include <pbs_error.h>
25
26#include <drmaa_utils/datetime.h>
27#include <drmaa_utils/drmaa.h>
28#include <drmaa_utils/iter.h>
29#include <drmaa_utils/conf.h>
30#include <drmaa_utils/datetime.h>
31
32#include <pbs_drmaa/session.h>
33#include <pbs_drmaa/pbs_conn.h>
34#include <pbs_drmaa/util.h>
35
36#include <errno.h>
37#include <signal.h>
38#include <unistd.h>
39
40
41static char* pbsdrmaa_pbs_submit( pbsdrmaa_pbs_conn_t *self, struct attropl *attrib, char *script, char *destination );
42
43static struct batch_status* pbsdrmaa_pbs_statjob( pbsdrmaa_pbs_conn_t *self,  char *job_id, struct attrl *attrib );
44
45static void pbsdrmaa_pbs_statjob_free( pbsdrmaa_pbs_conn_t *self, struct batch_status* job_status );
46
47static void pbsdrmaa_pbs_sigjob( pbsdrmaa_pbs_conn_t *self, char *job_id, char *signal );
48
49static void pbsdrmaa_pbs_deljob( pbsdrmaa_pbs_conn_t *self,  char *job_id );
50
51static void pbsdrmaa_pbs_rlsjob( pbsdrmaa_pbs_conn_t *self, char *job_id );
52
53static void pbsdrmaa_pbs_holdjob( pbsdrmaa_pbs_conn_t *self,  char *job_id );
54
55/* static void pbsdrmaa_pbs_connection_autoclose_thread_loop( pbsdrmaa_pbs_conn_t *self, bool reconnect); */
56
57
58static void check_reconnect( pbsdrmaa_pbs_conn_t *self, bool reconnect);
59
60/*
61static void start_autoclose_thread( pbsdrmaa_pbs_conn_t *self );
62
63static void stop_autoclose_thread( pbsdrmaa_pbs_conn_t *self );
64
65static void autoclose_thread_loop( void *data ); */
66
67
68#if defined PBS_PROFESSIONAL && defined PBSE_HISTJOBID
69        #define IS_MISSING_JOB (pbs_errno == PBSE_UNKJOBID || pbs_errno == PBSE_HISTJOBID)
70#else
71        #define IS_MISSING_JOB (pbs_errno == PBSE_UNKJOBID)
72#endif
73#define IS_TRANSIENT_ERROR (pbs_errno == PBSE_PROTOCOL || pbs_errno == PBSE_EXPIRED || pbs_errno == PBSOLDE_PROTOCOL || pbs_errno == PBSOLDE_EXPIRED)
74
75pbsdrmaa_pbs_conn_t *
76pbsdrmaa_pbs_conn_new( fsd_drmaa_session_t *session, const char *server )
77{
78        pbsdrmaa_pbs_conn_t *volatile self = NULL;
79
80        fsd_log_enter((""));
81
82        TRY
83          {
84                fsd_malloc(self, pbsdrmaa_pbs_conn_t );
85               
86                self->session = session;
87               
88                self->submit = pbsdrmaa_pbs_submit;
89                self->statjob = pbsdrmaa_pbs_statjob;
90                self->statjob_free = pbsdrmaa_pbs_statjob_free;
91                self->sigjob = pbsdrmaa_pbs_sigjob;
92                self->deljob = pbsdrmaa_pbs_deljob;
93                self->rlsjob = pbsdrmaa_pbs_rlsjob;
94                self->holdjob = pbsdrmaa_pbs_holdjob;
95
96                self->server = fsd_strdup(server);
97
98                self->connection_fd = -1;
99
100                /*ignore SIGPIPE - otherwise pbs_disconnect cause the program to exit */
101                signal(SIGPIPE, SIG_IGN);       
102
103                check_reconnect(self, false);
104          }
105        EXCEPT_DEFAULT
106          {
107                if( self != NULL)
108                  {
109                        fsd_free(self->server);
110                        fsd_free(self);
111
112                        if (self->connection_fd != -1)
113                          {
114                                fsd_log_info(( "pbs_disconnect(%d)", self->connection_fd ));
115                                pbs_disconnect(self->connection_fd);
116                          }
117                  }
118                       
119                fsd_exc_reraise();
120          }
121        END_TRY
122
123        fsd_log_return((""));
124
125        return self;
126}
127
128void
129pbsdrmaa_pbs_conn_destroy ( pbsdrmaa_pbs_conn_t * self )
130{
131        fsd_log_enter((""));
132
133        TRY
134        {
135                if(self != NULL)
136                {
137                        if (self->connection_fd != -1)
138                          {
139                                fsd_log_info(( "pbs_disconnect(%d)", self->connection_fd ));
140                                pbs_disconnect(self->connection_fd);
141
142                          }
143                        fsd_free(self->server);
144                        fsd_free(self);
145                }
146        }
147        EXCEPT_DEFAULT
148        {
149                fsd_exc_reraise();
150        }
151        END_TRY
152       
153        fsd_log_return((""));
154}
155
156char*
157pbsdrmaa_pbs_submit( pbsdrmaa_pbs_conn_t *self, struct attropl *attrib, char *script, char *destination )
158{
159        char *volatile job_id = NULL;
160        volatile bool first_try = true;
161        volatile bool conn_lock = false;
162
163        fsd_log_enter((""));
164
165        TRY
166         {
167                conn_lock = fsd_mutex_lock(&self->session->drm_connection_mutex);
168
169                check_reconnect(self, false);
170
171retry:
172                job_id = pbs_submit(self->connection_fd, attrib, script, destination, NULL);
173
174                fsd_log_info(("pbs_submit(%s, %s) = %s", script, destination, job_id));
175
176                if(job_id == NULL)
177                 {
178                        fsd_log_error(( "pbs_submit failed, pbs_errno = %d", pbs_errno ));
179                        if (IS_TRANSIENT_ERROR && first_try)
180                         {
181                                check_reconnect(self, true);
182                                first_try = false;
183                                goto retry;
184                         }
185                        else
186                         {
187                                pbsdrmaa_exc_raise_pbs( "pbs_submit", self->connection_fd);
188                         }
189                 }
190         }
191        EXCEPT_DEFAULT
192         {
193                fsd_free(job_id);
194                fsd_exc_reraise();
195         }
196        FINALLY
197         {
198                if(conn_lock)
199                        conn_lock = fsd_mutex_unlock(&self->session->drm_connection_mutex);
200         }
201        END_TRY
202
203
204        fsd_log_return(("%s", job_id));
205
206        return job_id;
207}
208
209struct batch_status*
210pbsdrmaa_pbs_statjob( pbsdrmaa_pbs_conn_t *self,  char *job_id, struct attrl *attrib )
211{
212        struct batch_status *volatile status = NULL;
213        volatile bool first_try = true;
214        volatile bool conn_lock = false;
215
216
217        fsd_log_enter((""));
218
219        TRY
220         {
221                conn_lock = fsd_mutex_lock(&self->session->drm_connection_mutex);
222
223                check_reconnect(self, false);
224
225retry:
226                status = pbs_statjob(self->connection_fd, job_id, attrib, NULL);
227
228                fsd_log_info(( "pbs_statjob( fd=%d, job_id=%s, attribs={...} ) = %p", self->connection_fd, job_id, (void*)status));
229
230                if(status == NULL)
231                 {
232                        if (IS_MISSING_JOB)
233                         {
234                                fsd_log_info(( "missing job = %s (code=%d)", job_id, pbs_errno ));
235                         }
236                        else if (IS_TRANSIENT_ERROR && first_try)
237                         {
238                                fsd_log_info(( "pbs_statjob failed, pbs_errno = %d, retrying", pbs_errno ));
239                                check_reconnect(self, true);
240                                first_try = false;
241                                goto retry;
242                         }
243                        else
244                         {
245                                pbsdrmaa_exc_raise_pbs( "pbs_statjob", self->connection_fd);
246                         }
247                 }
248         }
249        EXCEPT_DEFAULT
250         {
251                if( status != NULL )
252                        pbs_statfree( status );
253
254                fsd_exc_reraise();
255         }
256        FINALLY
257         {
258                if(conn_lock)
259                        conn_lock = fsd_mutex_unlock(&self->session->drm_connection_mutex);
260         }
261        END_TRY
262
263
264        fsd_log_return((""));
265
266        return status;
267}
268
269void
270pbsdrmaa_pbs_statjob_free( pbsdrmaa_pbs_conn_t *self, struct batch_status* job_status )
271{
272        fsd_log_enter((""));
273
274        pbs_statfree( job_status );
275}
276
277void
278pbsdrmaa_pbs_sigjob( pbsdrmaa_pbs_conn_t *self, char *job_id, char *signal_name )
279{
280        int rc = PBSE_NONE;
281        volatile bool first_try = true;
282        volatile bool conn_lock = false;
283
284
285        fsd_log_enter((""));
286
287        TRY
288         {
289                conn_lock = fsd_mutex_lock(&self->session->drm_connection_mutex);
290
291                check_reconnect(self, false);
292
293retry:
294                rc = pbs_sigjob(self->connection_fd, job_id, signal_name, NULL);
295
296                fsd_log_info(( "pbs_sigjob( fd=%d, job_id=%s, signal_name=%s) = %d", self->connection_fd, job_id, signal_name, rc));
297
298                if(rc != PBSE_NONE)
299                 {
300                        fsd_log_error(( "pbs_sigjob failed, pbs_errno = %d", pbs_errno ));
301                        if (IS_TRANSIENT_ERROR && first_try)
302                         {
303                                check_reconnect(self, true);
304                                first_try = false;
305                                goto retry;
306                         }
307                        else
308                         {
309                                pbsdrmaa_exc_raise_pbs( "pbs_sigjob", self->connection_fd);
310                         }
311                 }
312         }
313        EXCEPT_DEFAULT
314         {
315                fsd_exc_reraise();
316         }
317        FINALLY
318         {
319                if(conn_lock)
320                        conn_lock = fsd_mutex_unlock(&self->session->drm_connection_mutex);
321         }
322        END_TRY
323
324
325        fsd_log_return((""));
326
327}
328
329void
330pbsdrmaa_pbs_deljob( pbsdrmaa_pbs_conn_t *self, char *job_id )
331{
332        int rc = PBSE_NONE;
333        volatile bool first_try = true;
334        volatile bool conn_lock = false;
335
336
337        fsd_log_enter((""));
338
339        TRY
340         {
341                conn_lock = fsd_mutex_lock(&self->session->drm_connection_mutex);
342
343                check_reconnect(self, false);
344
345retry:
346                rc = pbs_deljob(self->connection_fd, job_id, NULL);
347
348                fsd_log_info(( "pbs_deljob( fd=%d, job_id=%s) = %d", self->connection_fd, job_id, rc));
349
350                if(rc != PBSE_NONE)
351                 {
352                        if (IS_TRANSIENT_ERROR && first_try)
353                         {
354                                fsd_log_info(( "pbs_deljob failed, rc = %d, pbs_errno = %d. Retrying...", rc, pbs_errno ));
355                                check_reconnect(self, true);
356                                first_try = false;
357                                goto retry;
358                         }
359                        else
360                         {
361                                pbsdrmaa_exc_raise_pbs( "pbs_deljob", self->connection_fd);
362                         }
363                 }
364         }
365        EXCEPT_DEFAULT
366         {
367                fsd_exc_reraise();
368         }
369        FINALLY
370         {
371                if(conn_lock)
372                        conn_lock = fsd_mutex_unlock(&self->session->drm_connection_mutex);
373         }
374        END_TRY
375
376
377        fsd_log_return((""));
378}
379
380void
381pbsdrmaa_pbs_rlsjob( pbsdrmaa_pbs_conn_t *self, char *job_id )
382{
383        int rc = PBSE_NONE;
384        volatile bool first_try = true;
385        volatile bool conn_lock = false;
386
387
388        fsd_log_enter((""));
389
390        TRY
391         {
392                conn_lock = fsd_mutex_lock(&self->session->drm_connection_mutex);
393
394                check_reconnect(self, false);
395
396retry:
397                rc = pbs_rlsjob(self->connection_fd, job_id, USER_HOLD, NULL);
398
399                fsd_log_info(( "pbs_rlsjob( fd=%d, job_id=%s) = %d", self->connection_fd, job_id, rc));
400
401                if(rc != PBSE_NONE)
402                 {
403                        fsd_log_error(( "pbs_rlsjob failed, rc = %d, pbs_errno = %d", rc,  pbs_errno ));
404                        if (IS_TRANSIENT_ERROR && first_try)
405                         {
406                                check_reconnect(self, true);
407                                first_try = false;
408                                goto retry;
409                         }
410                        else
411                         {
412                                pbsdrmaa_exc_raise_pbs( "pbs_rlsjob", self->connection_fd);
413                         }
414                 }
415         }
416        EXCEPT_DEFAULT
417         {
418                fsd_exc_reraise();
419         }
420        FINALLY
421         {
422                if(conn_lock)
423                        conn_lock = fsd_mutex_unlock(&self->session->drm_connection_mutex);
424         }
425        END_TRY
426
427
428        fsd_log_return((""));
429}
430
431void
432pbsdrmaa_pbs_holdjob( pbsdrmaa_pbs_conn_t *self,  char *job_id )
433{
434        int rc = PBSE_NONE;
435        volatile bool first_try = true;
436        volatile bool conn_lock = false;
437
438
439        fsd_log_enter((""));
440
441        TRY
442         {
443                conn_lock = fsd_mutex_lock(&self->session->drm_connection_mutex);
444
445                check_reconnect(self, false);
446
447retry:
448                rc = pbs_holdjob(self->connection_fd, job_id, USER_HOLD, NULL);
449
450                fsd_log_info(( "pbs_holdjob( fd=%d, job_id=%s) = %d", self->connection_fd, job_id, rc));
451
452                if(rc != PBSE_NONE)
453                 {
454                        fsd_log_error(( "pbs_holdjob failed, rc = %d, pbs_errno = %d", rc, pbs_errno ));
455                        if (IS_TRANSIENT_ERROR && first_try)
456                         {
457                                check_reconnect(self, true);
458                                first_try = false;
459                                goto retry;
460                         }
461                        else
462                         {
463                                pbsdrmaa_exc_raise_pbs( "pbs_holdjob", self->connection_fd);
464                         }
465                 }
466         }
467        EXCEPT_DEFAULT
468         {
469                fsd_exc_reraise();
470         }
471        FINALLY
472         {
473                if(conn_lock)
474                        conn_lock = fsd_mutex_unlock(&self->session->drm_connection_mutex);
475         }
476        END_TRY
477
478
479        fsd_log_return((""));
480}
481
482void
483check_reconnect( pbsdrmaa_pbs_conn_t *self, bool force_reconnect)
484{
485        int tries_left = ((pbsdrmaa_session_t *)self->session)->max_retries_count;
486        int sleep_time = 1;
487
488        fsd_log_enter(("(%d)", self->connection_fd));
489
490        if ( self->connection_fd != -1 )
491          {
492                if (!force_reconnect)
493                  {
494                        fsd_log_return(("(%d)", self->connection_fd));
495                        return;
496                  }
497                else
498                 {
499                        fsd_log_info(( "pbs_disconnect(%d)", self->connection_fd ));
500                        pbs_disconnect(self->connection_fd);
501                        self->connection_fd = -1;
502                 }
503          }
504
505
506
507retry_connect: /* Life... */
508        self->connection_fd = pbs_connect( self->server );
509        fsd_log_info(( "pbs_connect(%s) = %d", self->server, self->connection_fd ));
510        if( self->connection_fd < 0 && tries_left-- )
511          {
512                sleep(sleep_time);
513                sleep_time *=2;
514                goto retry_connect;
515          }
516       
517        if( self->connection_fd < 0 )
518                pbsdrmaa_exc_raise_pbs( "pbs_connect", self->connection_fd );
519       
520        fsd_log_return(("(%d)", self->connection_fd));
521}
522
523
524/*
525void start_autoclose_thread( pbsdrmaa_pbs_conn_t *self )
526{
527
528
529}
530
531void stop_autoclose_thread( pbsdrmaa_pbs_conn_t *self )
532{
533
534
535}
536
537void autoclose_thread_loop( void *data )
538{
539        pbsdrmaa_pbs_conn_t *self = (pbsdrmaa_pbs_conn_t *)data;
540        struct timespec wait_time;
541
542        fsd_mutex_lock(&self->session->drm_connection_mutex);
543
544        if (fsd_cond_timedwait(&self->autoclose_cond, &self->session->drm_connection_mutex, wait_time);
545         {
546                fsd_log_debug("autoclose thread signaled, waiting again");
547         }
548        else
549         {
550                fsd_log_info("autoclosing PBS connection: fd=%d, time_diff=%d", self->connection_fd, (int)(time(NULL) - self->last_connect_time));
551                pbs_disconnect(self->connection_fd);
552                self->connection_fd = -1;
553         }
554
555        fsd_mutex_unlock(&self->session->drm_connection_mutex);
556}
557*/
Note: See TracBrowser for help on using the repository browser.