source: trunk/pbs_drmaa/pbs_conn.c @ 87

Revision 87, 11.8 KB checked in by mmamonski, 11 years ago (diff)

BUMP version 1.0.13

  • Property svn:keywords set to Id
RevLine 
[12]1/* $Id$ */
[7]2/*
3 *  FedStage DRMAA for PBS Pro
4 *  Copyright (C) 2006-2007  FedStage Systems
5 *
6 *  This program is free software: you can redistribute it and/or modify
7 *  it under the terms of the GNU General Public License as published by
8 *  the Free Software Foundation, either version 3 of the License, or
9 *  (at your option) any later version.
10 *
11 *  This program is distributed in the hope that it will be useful,
12 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 *  GNU General Public License for more details.
15 *
16 *  You should have received a copy of the GNU General Public License
17 *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #ifdef HAVE_CONFIG_H
21#       include <config.h>
22#endif
23
24#include <pbs_error.h>
25
26#include <drmaa_utils/datetime.h>
27#include <drmaa_utils/drmaa.h>
28#include <drmaa_utils/iter.h>
29#include <drmaa_utils/conf.h>
30#include <drmaa_utils/datetime.h>
31
[85]32#include <pbs_drmaa/session.h>
[76]33#include <pbs_drmaa/pbs_conn.h>
[7]34#include <pbs_drmaa/util.h>
35
36#include <errno.h>
[76]37#include <signal.h>
38#include <unistd.h>
[7]39
40
[76]41static char* pbsdrmaa_pbs_submit( pbsdrmaa_pbs_conn_t *self, struct attropl *attrib, char *script, char *destination );
[7]42
[76]43static struct batch_status* pbsdrmaa_pbs_statjob( pbsdrmaa_pbs_conn_t *self,  char *job_id, struct attrl *attrib );
[7]44
[76]45static void pbsdrmaa_pbs_statjob_free( pbsdrmaa_pbs_conn_t *self, struct batch_status* job_status );
[7]46
[76]47static void pbsdrmaa_pbs_sigjob( pbsdrmaa_pbs_conn_t *self, char *job_id, char *signal );
[7]48
[76]49static void pbsdrmaa_pbs_deljob( pbsdrmaa_pbs_conn_t *self,  char *job_id );
[21]50
[76]51static void pbsdrmaa_pbs_rlsjob( pbsdrmaa_pbs_conn_t *self, char *job_id );
[21]52
[76]53static void pbsdrmaa_pbs_holdjob( pbsdrmaa_pbs_conn_t *self,  char *job_id );
[48]54
[87]55/* static void pbsdrmaa_pbs_connection_autoclose_thread_loop( pbsdrmaa_pbs_conn_t *self, bool reconnect); */
[83]56
57
[85]58static void check_reconnect( pbsdrmaa_pbs_conn_t *self, bool reconnect);
59
[86]60/*
[85]61static void start_autoclose_thread( pbsdrmaa_pbs_conn_t *self );
62
63static void stop_autoclose_thread( pbsdrmaa_pbs_conn_t *self );
64
[86]65static void autoclose_thread_loop( void *data ); */
[85]66
[86]67
[85]68#if defined PBS_PROFESSIONAL && defined PBSE_HISTJOBID
69        #define IS_MISSING_JOB (pbs_errno == PBSE_UNKJOBID || pbs_errno == PBSE_HISTJOBID)
70#else
71        #define IS_MISSING_JOB (pbs_errno == PBSE_UNKJOBID)
72#endif
[83]73#define IS_TRANSIENT_ERROR (pbs_errno == PBSE_PROTOCOL || pbs_errno == PBSE_EXPIRED || pbs_errno == PBSOLDE_PROTOCOL || pbs_errno == PBSOLDE_EXPIRED)
74
[76]75pbsdrmaa_pbs_conn_t *
[85]76pbsdrmaa_pbs_conn_new( fsd_drmaa_session_t *session, const char *server )
[7]77{
[76]78        pbsdrmaa_pbs_conn_t *volatile self = NULL;
[7]79
80        fsd_log_enter((""));
[29]81
[7]82        TRY
[76]83          {
84                fsd_malloc(self, pbsdrmaa_pbs_conn_t );
[7]85               
86                self->session = session;
[76]87               
88                self->submit = pbsdrmaa_pbs_submit;
89                self->statjob = pbsdrmaa_pbs_statjob;
90                self->statjob_free = pbsdrmaa_pbs_statjob_free;
91                self->sigjob = pbsdrmaa_pbs_sigjob;
92                self->deljob = pbsdrmaa_pbs_deljob;
93                self->rlsjob = pbsdrmaa_pbs_rlsjob;
94                self->holdjob = pbsdrmaa_pbs_holdjob;
[29]95
[76]96                self->server = fsd_strdup(server);
97
98                self->connection_fd = -1;
[83]99
100                /*ignore SIGPIPE - otherwise pbs_disconnect cause the program to exit */
[76]101                signal(SIGPIPE, SIG_IGN);       
102
[85]103                check_reconnect(self, false);
[76]104          }
[7]105        EXCEPT_DEFAULT
[76]106          {
[7]107                if( self != NULL)
[76]108                  {
109                        fsd_free(self->server);
[7]110                        fsd_free(self);
[76]111
112                        if (self->connection_fd != -1)
[87]113                          {
114                                fsd_log_info(( "pbs_disconnect(%d)", self->connection_fd ));
[76]115                                pbs_disconnect(self->connection_fd);
[87]116                          }
[76]117                  }
[7]118                       
119                fsd_exc_reraise();
[76]120          }
[7]121        END_TRY
[29]122
[7]123        fsd_log_return((""));
[29]124
[7]125        return self;
126}
127
128void
[76]129pbsdrmaa_pbs_conn_destroy ( pbsdrmaa_pbs_conn_t * self )
[7]130{
131        fsd_log_enter((""));
[83]132
[7]133        TRY
134        {
135                if(self != NULL)
136                {
[87]137                        if (self->connection_fd != -1)
138                          {
139                                fsd_log_info(( "pbs_disconnect(%d)", self->connection_fd ));
140                                pbs_disconnect(self->connection_fd);
141
142                          }
[76]143                        fsd_free(self->server);
[7]144                        fsd_free(self);
[29]145                }
[7]146        }
147        EXCEPT_DEFAULT
148        {
149                fsd_exc_reraise();
150        }
151        END_TRY
152       
153        fsd_log_return((""));
154}
155
[76]156char*
157pbsdrmaa_pbs_submit( pbsdrmaa_pbs_conn_t *self, struct attropl *attrib, char *script, char *destination )
[7]158{
[83]159        char *volatile job_id = NULL;
160        volatile bool first_try = true;
161        volatile bool conn_lock = false;
[8]162
[83]163        fsd_log_enter((""));
[7]164
[83]165        TRY
166         {
[85]167                conn_lock = fsd_mutex_lock(&self->session->drm_connection_mutex);
[83]168
[85]169                check_reconnect(self, false);
[83]170
171retry:
172                job_id = pbs_submit(self->connection_fd, attrib, script, destination, NULL);
173
174                fsd_log_info(("pbs_submit(%s, %s) = %s", script, destination, job_id));
175
176                if(job_id == NULL)
177                 {
178                        fsd_log_error(( "pbs_submit failed, pbs_errno = %d", pbs_errno ));
179                        if (IS_TRANSIENT_ERROR && first_try)
180                         {
[85]181                                check_reconnect(self, true);
[83]182                                first_try = false;
183                                goto retry;
184                         }
185                        else
186                         {
[85]187                                pbsdrmaa_exc_raise_pbs( "pbs_submit", self->connection_fd);
[83]188                         }
189                 }
190         }
191        EXCEPT_DEFAULT
192         {
193                fsd_free(job_id);
194                fsd_exc_reraise();
195         }
196        FINALLY
197         {
198                if(conn_lock)
[85]199                        conn_lock = fsd_mutex_unlock(&self->session->drm_connection_mutex);
[83]200         }
201        END_TRY
202
203
204        fsd_log_return(("%s", job_id));
205
206        return job_id;
[76]207}
[7]208
[76]209struct batch_status*
210pbsdrmaa_pbs_statjob( pbsdrmaa_pbs_conn_t *self,  char *job_id, struct attrl *attrib )
211{
[84]212        struct batch_status *volatile status = NULL;
213        volatile bool first_try = true;
214        volatile bool conn_lock = false;
[7]215
[84]216
217        fsd_log_enter((""));
218
219        TRY
220         {
[85]221                conn_lock = fsd_mutex_lock(&self->session->drm_connection_mutex);
[84]222
[85]223                check_reconnect(self, false);
[84]224
225retry:
226                status = pbs_statjob(self->connection_fd, job_id, attrib, NULL);
227
[87]228                fsd_log_info(( "pbs_statjob( fd=%d, job_id=%s, attribs={...} ) = %p", self->connection_fd, job_id, (void*)status));
[84]229
230                if(status == NULL)
231                 {
[85]232                        if (IS_MISSING_JOB)
[84]233                         {
[85]234                                fsd_log_info(( "missing job = %s (code=%d)", job_id, pbs_errno ));
235                         }
236                        else if (IS_TRANSIENT_ERROR && first_try)
237                         {
[87]238                                fsd_log_info(( "pbs_statjob failed, pbs_errno = %d, retrying", pbs_errno ));
[85]239                                check_reconnect(self, true);
[84]240                                first_try = false;
241                                goto retry;
242                         }
243                        else
244                         {
[85]245                                pbsdrmaa_exc_raise_pbs( "pbs_statjob", self->connection_fd);
[84]246                         }
247                 }
248         }
249        EXCEPT_DEFAULT
250         {
251                if( status != NULL )
252                        pbs_statfree( status );
253
254                fsd_exc_reraise();
255         }
256        FINALLY
257         {
258                if(conn_lock)
[85]259                        conn_lock = fsd_mutex_unlock(&self->session->drm_connection_mutex);
[84]260         }
261        END_TRY
262
263
264        fsd_log_return((""));
265
266        return status;
[76]267}
[26]268
[76]269void
270pbsdrmaa_pbs_statjob_free( pbsdrmaa_pbs_conn_t *self, struct batch_status* job_status )
271{
[84]272        fsd_log_enter((""));
[31]273
[84]274        pbs_statfree( job_status );
[76]275}
[25]276
[76]277void
[84]278pbsdrmaa_pbs_sigjob( pbsdrmaa_pbs_conn_t *self, char *job_id, char *signal_name )
[76]279{
[84]280        int rc = PBSE_NONE;
281        volatile bool first_try = true;
282        volatile bool conn_lock = false;
[30]283
[25]284
[84]285        fsd_log_enter((""));
286
287        TRY
288         {
[85]289                conn_lock = fsd_mutex_lock(&self->session->drm_connection_mutex);
[84]290
[85]291                check_reconnect(self, false);
[84]292
293retry:
294                rc = pbs_sigjob(self->connection_fd, job_id, signal_name, NULL);
295
296                fsd_log_info(( "pbs_sigjob( fd=%d, job_id=%s, signal_name=%s) = %d", self->connection_fd, job_id, signal_name, rc));
297
298                if(rc != PBSE_NONE)
299                 {
300                        fsd_log_error(( "pbs_sigjob failed, pbs_errno = %d", pbs_errno ));
301                        if (IS_TRANSIENT_ERROR && first_try)
302                         {
[85]303                                check_reconnect(self, true);
[84]304                                first_try = false;
305                                goto retry;
306                         }
307                        else
308                         {
[85]309                                pbsdrmaa_exc_raise_pbs( "pbs_sigjob", self->connection_fd);
[84]310                         }
311                 }
312         }
313        EXCEPT_DEFAULT
314         {
315                fsd_exc_reraise();
316         }
317        FINALLY
318         {
319                if(conn_lock)
[85]320                        conn_lock = fsd_mutex_unlock(&self->session->drm_connection_mutex);
[84]321         }
322        END_TRY
323
324
325        fsd_log_return((""));
326
[76]327}
[30]328
[76]329void
330pbsdrmaa_pbs_deljob( pbsdrmaa_pbs_conn_t *self, char *job_id )
331{
[84]332        int rc = PBSE_NONE;
333        volatile bool first_try = true;
334        volatile bool conn_lock = false;
[30]335
[84]336
337        fsd_log_enter((""));
338
339        TRY
340         {
[85]341                conn_lock = fsd_mutex_lock(&self->session->drm_connection_mutex);
[84]342
[85]343                check_reconnect(self, false);
[84]344
345retry:
346                rc = pbs_deljob(self->connection_fd, job_id, NULL);
347
348                fsd_log_info(( "pbs_deljob( fd=%d, job_id=%s) = %d", self->connection_fd, job_id, rc));
349
350                if(rc != PBSE_NONE)
351                 {
352                        if (IS_TRANSIENT_ERROR && first_try)
353                         {
[87]354                                fsd_log_info(( "pbs_deljob failed, rc = %d, pbs_errno = %d. Retrying...", rc, pbs_errno ));
[85]355                                check_reconnect(self, true);
[84]356                                first_try = false;
357                                goto retry;
358                         }
359                        else
360                         {
[85]361                                pbsdrmaa_exc_raise_pbs( "pbs_deljob", self->connection_fd);
[84]362                         }
363                 }
364         }
365        EXCEPT_DEFAULT
366         {
367                fsd_exc_reraise();
368         }
369        FINALLY
370         {
371                if(conn_lock)
[85]372                        conn_lock = fsd_mutex_unlock(&self->session->drm_connection_mutex);
[84]373         }
374        END_TRY
375
376
377        fsd_log_return((""));
[7]378}
379
[76]380void
381pbsdrmaa_pbs_rlsjob( pbsdrmaa_pbs_conn_t *self, char *job_id )
[7]382{
[84]383        int rc = PBSE_NONE;
384        volatile bool first_try = true;
385        volatile bool conn_lock = false;
[7]386
[48]387
[84]388        fsd_log_enter((""));
389
390        TRY
391         {
[85]392                conn_lock = fsd_mutex_lock(&self->session->drm_connection_mutex);
[84]393
[85]394                check_reconnect(self, false);
[84]395
396retry:
397                rc = pbs_rlsjob(self->connection_fd, job_id, USER_HOLD, NULL);
398
399                fsd_log_info(( "pbs_rlsjob( fd=%d, job_id=%s) = %d", self->connection_fd, job_id, rc));
400
401                if(rc != PBSE_NONE)
402                 {
403                        fsd_log_error(( "pbs_rlsjob failed, rc = %d, pbs_errno = %d", rc,  pbs_errno ));
404                        if (IS_TRANSIENT_ERROR && first_try)
405                         {
[85]406                                check_reconnect(self, true);
[84]407                                first_try = false;
408                                goto retry;
409                         }
410                        else
411                         {
[85]412                                pbsdrmaa_exc_raise_pbs( "pbs_rlsjob", self->connection_fd);
[84]413                         }
414                 }
415         }
416        EXCEPT_DEFAULT
417         {
418                fsd_exc_reraise();
419         }
420        FINALLY
421         {
422                if(conn_lock)
[85]423                        conn_lock = fsd_mutex_unlock(&self->session->drm_connection_mutex);
[84]424         }
425        END_TRY
426
427
428        fsd_log_return((""));
[7]429}
430
[76]431void
432pbsdrmaa_pbs_holdjob( pbsdrmaa_pbs_conn_t *self,  char *job_id )
[7]433{
[84]434        int rc = PBSE_NONE;
435        volatile bool first_try = true;
436        volatile bool conn_lock = false;
[7]437
[84]438
439        fsd_log_enter((""));
440
441        TRY
442         {
[85]443                conn_lock = fsd_mutex_lock(&self->session->drm_connection_mutex);
[84]444
[85]445                check_reconnect(self, false);
[84]446
447retry:
448                rc = pbs_holdjob(self->connection_fd, job_id, USER_HOLD, NULL);
449
450                fsd_log_info(( "pbs_holdjob( fd=%d, job_id=%s) = %d", self->connection_fd, job_id, rc));
451
452                if(rc != PBSE_NONE)
453                 {
454                        fsd_log_error(( "pbs_holdjob failed, rc = %d, pbs_errno = %d", rc, pbs_errno ));
455                        if (IS_TRANSIENT_ERROR && first_try)
456                         {
[85]457                                check_reconnect(self, true);
[84]458                                first_try = false;
459                                goto retry;
460                         }
461                        else
462                         {
[85]463                                pbsdrmaa_exc_raise_pbs( "pbs_holdjob", self->connection_fd);
[84]464                         }
465                 }
466         }
467        EXCEPT_DEFAULT
468         {
469                fsd_exc_reraise();
470         }
471        FINALLY
472         {
473                if(conn_lock)
[85]474                        conn_lock = fsd_mutex_unlock(&self->session->drm_connection_mutex);
[84]475         }
476        END_TRY
477
478
479        fsd_log_return((""));
[7]480}
481
[76]482void
[85]483check_reconnect( pbsdrmaa_pbs_conn_t *self, bool force_reconnect)
[7]484{
[85]485        int tries_left = ((pbsdrmaa_session_t *)self->session)->max_retries_count;
[76]486        int sleep_time = 1;
[34]487
[76]488        fsd_log_enter(("(%d)", self->connection_fd));
[34]489
[76]490        if ( self->connection_fd != -1 )
491          {
492                if (!force_reconnect)
493                  {
494                        fsd_log_return(("(%d)", self->connection_fd));
495                        return;
496                  }
[34]497                else
498                 {
[87]499                        fsd_log_info(( "pbs_disconnect(%d)", self->connection_fd ));
[76]500                        pbs_disconnect(self->connection_fd);
501                        self->connection_fd = -1;
[34]502                 }
[76]503          }
[34]504
[85]505
506
[76]507retry_connect: /* Life... */
508        self->connection_fd = pbs_connect( self->server );
[87]509        fsd_log_info(( "pbs_connect(%s) = %d", self->server, self->connection_fd ));
[76]510        if( self->connection_fd < 0 && tries_left-- )
511          {
512                sleep(sleep_time);
513                sleep_time *=2;
514                goto retry_connect;
515          }
[49]516       
[76]517        if( self->connection_fd < 0 )
[85]518                pbsdrmaa_exc_raise_pbs( "pbs_connect", self->connection_fd );
[76]519       
520        fsd_log_return(("(%d)", self->connection_fd));
[48]521}
522
[85]523
[86]524/*
525void start_autoclose_thread( pbsdrmaa_pbs_conn_t *self )
[85]526{
527
528
529}
530
[86]531void stop_autoclose_thread( pbsdrmaa_pbs_conn_t *self )
[85]532{
533
534
535}
536
[86]537void autoclose_thread_loop( void *data )
538{
539        pbsdrmaa_pbs_conn_t *self = (pbsdrmaa_pbs_conn_t *)data;
540        struct timespec wait_time;
541
542        fsd_mutex_lock(&self->session->drm_connection_mutex);
543
544        if (fsd_cond_timedwait(&self->autoclose_cond, &self->session->drm_connection_mutex, wait_time);
545         {
546                fsd_log_debug("autoclose thread signaled, waiting again");
547         }
548        else
549         {
550                fsd_log_info("autoclosing PBS connection: fd=%d, time_diff=%d", self->connection_fd, (int)(time(NULL) - self->last_connect_time));
551                pbs_disconnect(self->connection_fd);
552                self->connection_fd = -1;
553         }
554
555        fsd_mutex_unlock(&self->session->drm_connection_mutex);
556}
557*/
Note: See TracBrowser for help on using the repository browser.