Changeset 85


Ignore:
Timestamp:
01/17/13 18:44:15 (7 years ago)
Author:
mmamonski
Message:

PBS DRMAA autoclose connection

Location:
trunk/pbs_drmaa
Files:
9 edited

Legend:

Unmodified
Added
Removed
  • trunk/pbs_drmaa/job.c

    r83 r85  
    7777pbsdrmaa_job_control( fsd_job_t *self, int action ) 
    7878{ 
    79         volatile bool conn_lock = false; 
    8079        pbsdrmaa_session_t *session = (pbsdrmaa_session_t*)self->session; 
    8180        const char *job_id = self->job_id; 
    82         const char *apicall = NULL; 
    83         int rc = PBSE_NONE; 
    84  
    85         fsd_log_enter(( "({job_id=%s}, action=%d)", 
    86                         self->job_id, action )); 
    87  
    88         TRY 
    89          { 
    90                 int tries_left = session->max_retries_count; 
    91                 int sleep_time = 1; 
    92  
    93                 conn_lock = fsd_mutex_lock( &self->session->drm_connection_mutex ); 
    94  
    95                 /*TODO reconnect */ 
    96                 while ( true ) 
    97                  { 
    98                         switch( action ) 
    99                          { 
    100                                 /* 
    101                                  * We cannot know whether we did suspend job 
    102                                  * in other way than remembering this inside DRMAA session. 
    103                                  */ 
    104                                 case DRMAA_CONTROL_SUSPEND: 
    105                                         apicall = "pbs_sigjob"; 
    106                                         rc = pbs_sigjob( session->pbs_conn, (char*)job_id, 
    107                                                         "SIGSTOP", NULL ); 
    108                                         fsd_log_info(("pbs_sigjob(%s, SIGSTOP) =%d", job_id, rc)); 
    109                                         if( rc == PBSE_NONE ) 
    110                                                 self->flags |= FSD_JOB_SUSPENDED; 
    111                                         break; 
    112                                 case DRMAA_CONTROL_RESUME: 
    113                                         apicall = "pbs_sigjob"; 
    114                                         rc = pbs_sigjob( session->pbs_conn, (char*)job_id, 
    115                                                         "SIGCONT", NULL ); 
    116                                         fsd_log_info(("pbs_sigjob(%s, SIGCONT) =%d", job_id, rc)); 
    117                                         if( rc == PBSE_NONE ) 
    118                                                 self->flags &= ~FSD_JOB_SUSPENDED; 
    119                                         break; 
    120                                 case DRMAA_CONTROL_HOLD: 
    121                                         apicall = "pbs_holdjob"; 
    122                                         rc = pbs_holdjob( session->pbs_conn, (char*)job_id, 
    123                                                         USER_HOLD, NULL ); 
    124                                         fsd_log_info(("pbs_sigjob(%s, SIGHOLD) =%d", job_id, rc)); 
    125                                         if( rc == PBSE_NONE ) 
    126                                                 self->flags |= FSD_JOB_HOLD; 
    127                                         break; 
    128                                 case DRMAA_CONTROL_RELEASE: 
    129                                         apicall = "pbs_rlsjob"; 
    130                                         rc = pbs_rlsjob( session->pbs_conn, (char*)job_id, 
    131                                                         USER_HOLD, NULL ); 
    132                                         fsd_log_info(("pbs_rlsjob(%s) =%d", job_id, rc)); 
    133                                         if( rc == PBSE_NONE ) 
    134                                                 self->flags &= FSD_JOB_HOLD; 
    135                                         break; 
    136                                 case DRMAA_CONTROL_TERMINATE: 
    137                                         apicall = "pbs_deljob"; 
    138                                         rc = pbs_deljob( session->pbs_conn, (char*)job_id, NULL ); 
    139                                         fsd_log_info(("pbs_deljob(%s) =%d", job_id, rc)); 
    140                                         /* Torque: 
    141                                          * deldelay=N -- delay between SIGTERM and SIGKILL (default 0) */ 
    142                                         if( rc == PBSE_NONE ) 
    143                                          { 
    144                                                 self->flags &= FSD_JOB_TERMINATED_MASK; 
    145                                                 if( (self->flags & FSD_JOB_TERMINATED) == 0 ) 
    146                                                         self->flags |= FSD_JOB_TERMINATED | FSD_JOB_ABORTED; 
    147                                          } 
    148                                         break; 
    149                          } 
    150  
    151 retry_connect: 
    152                         if ( rc == PBSE_NONE ) 
    153                                 break; 
    154                         else if (( rc == PBSE_INTERNAL || rc == PBSE_PROTOCOL || rc == PBSOLDE_PROTOCOL || rc == PBSE_EXPIRED || rc == PBSOLDE_EXPIRED) && (tries_left--)) 
    155                          { 
    156                                 if (rc == PBSE_PROTOCOL || rc == PBSE_EXPIRED || rc == PBSOLDE_PROTOCOL || rc == PBSOLDE_EXPIRED) 
    157                                  { 
    158                                         if ( session->pbs_conn >= 0) 
    159                                                 pbs_disconnect( session->pbs_conn ); 
    160  
    161                                         sleep( sleep_time++ ); 
    162  
    163                                         session->pbs_conn = pbs_connect( session->super.contact ); 
    164  
    165                                         if (session->pbs_conn < 0) 
    166                                                 goto retry_connect; 
    167  
    168                                         fsd_log_info(( "pbs_connect(%s) =%d", session->super.contact, session->pbs_conn )); 
    169                                  } 
    170                                 else /* PBSE_INTERNAL */ 
    171                                  { 
    172                                         /* 
    173                                          * In PBS Pro pbs_sigjob raises internal server error (PBSE_INTERNAL) 
    174                                          * when job just changed its state to running. 
    175                                          */ 
    176                                         sleep( sleep_time++ ); 
    177                                  } 
    178                                 fsd_log_debug(( "repeating request (%d of %d)", tries_left, session->max_retries_count)); 
    179                          } 
    180                         else 
    181                                 pbsdrmaa_exc_raise_pbs( apicall ); 
    182                  } /* end while */ 
    183          } 
    184         FINALLY 
    185          { 
    186                 if( conn_lock ) 
    187                         conn_lock = fsd_mutex_unlock( &self->session->drm_connection_mutex ); 
    188          } 
    189         END_TRY 
     81 
     82        fsd_log_enter(( "({job_id=%s}, action=%d)", self->job_id, action )); 
     83 
     84        while ( true ) 
     85         { 
     86                switch( action ) 
     87                 { 
     88                        /* 
     89                         * We cannot know whether we did suspend job 
     90                         * in other way than remembering this inside DRMAA session. 
     91                         */ 
     92                        case DRMAA_CONTROL_SUSPEND: 
     93                                session->pbs_connection->sigjob( session->pbs_connection, (char*)job_id, "SIGSTOP"); 
     94                                self->flags |= FSD_JOB_SUSPENDED; 
     95                                break; 
     96                        case DRMAA_CONTROL_RESUME: 
     97                                session->pbs_connection->sigjob( session->pbs_connection, (char*)job_id, "SIGCONT"); 
     98                                self->flags &= ~FSD_JOB_SUSPENDED; 
     99                                break; 
     100                        case DRMAA_CONTROL_HOLD: 
     101                                session->pbs_connection->holdjob( session->pbs_connection, (char*)job_id ); 
     102                                self->flags |= FSD_JOB_HOLD; 
     103                                break; 
     104                        case DRMAA_CONTROL_RELEASE: 
     105                                session->pbs_connection->rlsjob( session->pbs_connection, (char*)job_id ); 
     106                                self->flags &= ~FSD_JOB_HOLD; 
     107                                break; 
     108                        case DRMAA_CONTROL_TERMINATE: 
     109                                session->pbs_connection->deljob( session->pbs_connection, (char*)job_id ); 
     110                                /* TODO: make deldelay configurable ???: 
     111                                 * deldelay=N -- delay between SIGTERM and SIGKILL (default 0) */ 
     112                                self->flags &= FSD_JOB_TERMINATED_MASK; 
     113                                if( (self->flags & FSD_JOB_TERMINATED) == 0 ) 
     114                                        self->flags |= FSD_JOB_TERMINATED | FSD_JOB_ABORTED; 
     115                                break; 
     116                 } 
     117         } 
    190118 
    191119        fsd_log_return(("")); 
     
    196124pbsdrmaa_job_update_status( fsd_job_t *self ) 
    197125{ 
    198         volatile bool conn_lock = false; 
    199126        struct batch_status *volatile status = NULL; 
    200127        pbsdrmaa_session_t *session = (pbsdrmaa_session_t*)self->session; 
    201         int tries_left = session->max_retries_count; 
    202         int sleep_time = 1; 
    203128 
    204129        fsd_log_enter(( "({job_id=%s})", self->job_id )); 
     
    206131        TRY 
    207132         { 
    208                 conn_lock = fsd_mutex_lock( &self->session->drm_connection_mutex ); 
    209 retry: 
    210                 if (session->pbs_conn < 0) { 
    211                         fsd_log_info(("No connection with pbs. Reconnecting")); 
    212                         goto retry_connect; 
    213                 } 
    214  
    215133 
    216134#ifdef PBS_PROFESSIONAL 
    217                 status = pbs_statjob( session->pbs_conn, self->job_id, NULL, NULL ); 
     135                status = session->pbs_connection->statjob( session->pbs_connection, self->job_id, NULL); 
    218136#else 
    219                 status = pbs_statjob( session->pbs_conn, self->job_id, session->status_attrl, NULL ); 
     137                status = session->pbs_connection->statjob( session->pbs_connection, self->job_id, session->status_attrl); 
    220138#endif 
    221                 fsd_log_info(( "pbs_statjob(fd=%d, job_id=%s, attribs={...}) =%p", 
    222                                  session->pbs_conn, self->job_id, (void*)status )); 
    223                 if( status == NULL ) 
    224                  { 
    225  
    226 #ifndef PBS_PROFESSIONAL 
    227                         if ( pbs_errno != PBSE_UNKJOBID ) 
    228                                 fsd_log_error(("pbs_statjob error: %d, %s, %s", pbs_errno, pbse_to_txt(pbs_errno), pbs_strerror(pbs_errno))); 
    229                         else 
    230                                 fsd_log_debug(("pbs_statjob error: %d, %s, %s", pbs_errno, pbse_to_txt(pbs_errno), pbs_strerror(pbs_errno))); 
    231 #else 
    232 #  ifndef PBS_PROFESSIONAL_NO_LOG 
    233                         if ( pbs_errno != PBSE_UNKJOBID && pbs_errno != PBSE_HISTJOBID ) 
    234                                 fsd_log_error(("pbs_statjob error: %d, %s", pbs_errno, pbse_to_txt(pbs_errno))); 
    235                         else 
    236                                 fsd_log_debug(("pbs_statjob error: %d, %s", pbs_errno, pbse_to_txt(pbs_errno))); 
    237 #  else 
    238                         if ( pbs_errno != PBSE_UNKJOBID && pbs_errno != PBSE_HISTJOBID ) 
    239                                 fsd_log_error(("pbs_statjob error: %d", pbs_errno)); 
    240                         else 
    241                                 fsd_log_debug(("pbs_statjob error: %d", pbs_errno)); 
    242 #  endif 
    243 #endif 
    244  
    245                         switch( pbs_errno ) 
    246                          { 
    247                                 case PBSE_UNKJOBID: 
    248 #ifdef PBS_PROFESSIONAL 
    249                                 case PBSE_HISTJOBID: 
    250 #endif 
    251                                         break; 
    252                                 case PBSE_PROTOCOL: 
    253 #if PBSOLDE_PROTOCOL != PBSE_PROTOCOL 
    254                                 case PBSOLDE_PROTOCOL: 
    255 #endif 
    256                                 case PBSE_EXPIRED: 
    257 #if PBSOLDE_EXPIRED != PBSE_EXPIRED 
    258                                 case PBSOLDE_EXPIRED: 
    259 #endif 
    260                                         if ( session->pbs_conn >= 0 ) 
    261                                                 pbs_disconnect( session->pbs_conn ); 
    262                                         fsd_log_info(("Protocol error. Reconnecting...")); 
    263 retry_connect: 
    264                                         sleep(sleep_time++); 
    265                                         session->pbs_conn = pbs_connect( session->super.contact ); 
    266                                         if( session->pbs_conn < 0 ) 
    267                                          { 
    268                                                 if (tries_left--) { 
    269                                                         fsd_log_info(("Retrying... (%d tries left)", tries_left)); 
    270                                                         goto retry_connect; 
    271                                                 } else { 
    272                                                         fsd_log_error(("No more tries left... Throwing exception")); 
    273                                                         pbsdrmaa_exc_raise_pbs( "pbs_connect" ); 
    274                                                 } 
    275                                          } 
    276                                         else 
    277                                          { 
    278                                                 goto retry; 
    279                                          } 
    280                                 default: 
    281                                         pbsdrmaa_exc_raise_pbs( "pbs_statjob" ); 
    282                                         break; 
    283                                 case 0:  /* ? */ 
    284                                         fsd_exc_raise_code( FSD_ERRNO_INTERNAL_ERROR ); 
    285                                         break; 
    286                          } 
    287  
    288                  } 
    289  
    290                 conn_lock = fsd_mutex_unlock( &self->session->drm_connection_mutex ); 
    291  
    292139 
    293140                if( status != NULL ) 
     
    302149        FINALLY 
    303150         { 
    304                 if( conn_lock ) 
    305                         conn_lock = fsd_mutex_unlock( &self->session->drm_connection_mutex ); 
    306151                if( status != NULL ) 
    307                         pbs_statfree( status ); 
     152                        session->pbs_connection->statjob_free( session->pbs_connection, status ); 
    308153         } 
    309154        END_TRY 
  • trunk/pbs_drmaa/log_reader.c

    r60 r85  
    693693        self->current_offset = ftello(self->fhandle); 
    694694         
    695         fsd_log_debug(("Closing log  file (offset=%d)", self->current_offset));   
     695        fsd_log_debug(("Closing log  file (offset=%d)", (int)self->current_offset)); 
    696696 
    697697        fclose(self->fhandle); 
     
    703703pbsdrmaa_reopen_log( pbsdrmaa_log_reader_t * self ) 
    704704{ 
    705         fsd_log_debug(("Reopening log file: %s (offset=%d)", self->log_path, self->current_offset));   
     705        fsd_log_debug(("Reopening log file: %s (offset=%d)", self->log_path, (int)self->current_offset)); 
    706706 
    707707        if ((self->fhandle = fopen(self->log_path,"r")) == NULL) 
  • trunk/pbs_drmaa/pbs_conn.c

    r84 r85  
    2828#include <drmaa_utils/iter.h> 
    2929#include <drmaa_utils/conf.h> 
    30 #include <drmaa_utils/session.h> 
    3130#include <drmaa_utils/datetime.h> 
    3231 
     32#include <pbs_drmaa/session.h> 
    3333#include <pbs_drmaa/pbs_conn.h> 
    3434#include <pbs_drmaa/util.h> 
     
    5353static void pbsdrmaa_pbs_holdjob( pbsdrmaa_pbs_conn_t *self,  char *job_id ); 
    5454 
    55 static void pbsdrmaa_pbs_reconnect_internal( pbsdrmaa_pbs_conn_t *self, bool reconnect); 
    56  
    57 static void pbsdrmaa_pbs_check_connect_internal( pbsdrmaa_pbs_conn_t *self, bool reconnect); 
    58  
     55static void pbsdrmaa_pbs_connection_autoclose_thread_loop( pbsdrmaa_pbs_conn_t *self, bool reconnect); 
     56 
     57 
     58static void check_reconnect( pbsdrmaa_pbs_conn_t *self, bool reconnect); 
     59 
     60static void start_autoclose_thread( pbsdrmaa_pbs_conn_t *self ); 
     61 
     62static void stop_autoclose_thread( pbsdrmaa_pbs_conn_t *self ); 
     63 
     64 
     65#if defined PBS_PROFESSIONAL && defined PBSE_HISTJOBID 
     66        #define IS_MISSING_JOB (pbs_errno == PBSE_UNKJOBID || pbs_errno == PBSE_HISTJOBID) 
     67#else 
     68        #define IS_MISSING_JOB (pbs_errno == PBSE_UNKJOBID) 
     69#endif 
    5970#define IS_TRANSIENT_ERROR (pbs_errno == PBSE_PROTOCOL || pbs_errno == PBSE_EXPIRED || pbs_errno == PBSOLDE_PROTOCOL || pbs_errno == PBSOLDE_EXPIRED) 
    6071 
    61          
    6272pbsdrmaa_pbs_conn_t *  
    63 pbsdrmaa_pbs_conn_new( pbsdrmaa_session_t *session, char *server ) 
     73pbsdrmaa_pbs_conn_new( fsd_drmaa_session_t *session, const char *server ) 
    6474{ 
    6575        pbsdrmaa_pbs_conn_t *volatile self = NULL; 
     
    8494 
    8595                self->connection_fd = -1; 
    86                 self->last_usage = time(NULL); 
    8796 
    8897                /*ignore SIGPIPE - otherwise pbs_disconnect cause the program to exit */ 
    8998                signal(SIGPIPE, SIG_IGN);        
    9099 
    91                 pbsdrmaa_pbs_reconnect_internal(self, false); 
     100                check_reconnect(self, false); 
    92101          } 
    93102        EXCEPT_DEFAULT 
     
    100109                        if (self->connection_fd != -1) 
    101110                                pbs_disconnect(self->connection_fd); 
     111                        stop_autoclose_thread(self); 
    102112                  } 
    103113                         
     
    110120        return self; 
    111121} 
    112  
    113122 
    114123void 
     
    148157        TRY 
    149158         { 
    150                 conn_lock = fsd_mutex_lock(&self->session->super.drm_connection_mutex); 
    151  
    152                 pbsdrmaa_pbs_reconnect_internal(self, false); 
     159                conn_lock = fsd_mutex_lock(&self->session->drm_connection_mutex); 
     160 
     161                check_reconnect(self, false); 
    153162 
    154163retry: 
     
    162171                        if (IS_TRANSIENT_ERROR && first_try) 
    163172                         { 
    164                                 pbsdrmaa_pbs_reconnect_internal(self, true); 
     173                                check_reconnect(self, true); 
    165174                                first_try = false; 
    166175                                goto retry; 
     
    168177                        else 
    169178                         { 
    170                                 pbsdrmaa_exc_raise_pbs( "pbs_submit"); 
     179                                pbsdrmaa_exc_raise_pbs( "pbs_submit", self->connection_fd); 
    171180                         } 
    172181                 } 
     
    180189         { 
    181190                if(conn_lock) 
    182                         conn_lock = fsd_mutex_unlock(&self->session->super.drm_connection_mutex); 
     191                        conn_lock = fsd_mutex_unlock(&self->session->drm_connection_mutex); 
    183192         } 
    184193        END_TRY 
     
    202211        TRY 
    203212         { 
    204                 conn_lock = fsd_mutex_lock(&self->session->super.drm_connection_mutex); 
    205  
    206                 pbsdrmaa_pbs_reconnect_internal(self, false); 
     213                conn_lock = fsd_mutex_lock(&self->session->drm_connection_mutex); 
     214 
     215                check_reconnect(self, false); 
    207216 
    208217retry: 
     
    213222                if(status == NULL) 
    214223                 { 
    215                         fsd_log_error(( "pbs_statjob failed, pbs_errno = %d", pbs_errno )); 
    216                         if (IS_TRANSIENT_ERROR && first_try) 
    217                          { 
    218                                 pbsdrmaa_pbs_reconnect_internal(self, true); 
     224                        if (IS_MISSING_JOB) 
     225                         { 
     226                                fsd_log_info(( "missing job = %s (code=%d)", job_id, pbs_errno )); 
     227                         } 
     228                        else if (IS_TRANSIENT_ERROR && first_try) 
     229                         { 
     230                                fsd_log_error(( "pbs_statjob failed, pbs_errno = %d", pbs_errno )); 
     231                                check_reconnect(self, true); 
    219232                                first_try = false; 
    220233                                goto retry; 
     
    222235                        else 
    223236                         { 
    224                                 pbsdrmaa_exc_raise_pbs( "pbs_statjob"); 
     237                                pbsdrmaa_exc_raise_pbs( "pbs_statjob", self->connection_fd); 
    225238                         } 
    226239                 } 
     
    236249         { 
    237250                if(conn_lock) 
    238                         conn_lock = fsd_mutex_unlock(&self->session->super.drm_connection_mutex); 
     251                        conn_lock = fsd_mutex_unlock(&self->session->drm_connection_mutex); 
    239252         } 
    240253        END_TRY 
     
    266279        TRY 
    267280         { 
    268                 conn_lock = fsd_mutex_lock(&self->session->super.drm_connection_mutex); 
    269  
    270                 pbsdrmaa_pbs_reconnect_internal(self, false); 
     281                conn_lock = fsd_mutex_lock(&self->session->drm_connection_mutex); 
     282 
     283                check_reconnect(self, false); 
    271284 
    272285retry: 
     
    280293                        if (IS_TRANSIENT_ERROR && first_try) 
    281294                         { 
    282                                 pbsdrmaa_pbs_reconnect_internal(self, true); 
     295                                check_reconnect(self, true); 
    283296                                first_try = false; 
    284297                                goto retry; 
     
    286299                        else 
    287300                         { 
    288                                 pbsdrmaa_exc_raise_pbs( "pbs_sigjob"); 
     301                                pbsdrmaa_exc_raise_pbs( "pbs_sigjob", self->connection_fd); 
    289302                         } 
    290303                 } 
     
    297310         { 
    298311                if(conn_lock) 
    299                         conn_lock = fsd_mutex_unlock(&self->session->super.drm_connection_mutex); 
     312                        conn_lock = fsd_mutex_unlock(&self->session->drm_connection_mutex); 
    300313         } 
    301314        END_TRY 
     
    318331        TRY 
    319332         { 
    320                 conn_lock = fsd_mutex_lock(&self->session->super.drm_connection_mutex); 
    321  
    322                 pbsdrmaa_pbs_reconnect_internal(self, false); 
     333                conn_lock = fsd_mutex_lock(&self->session->drm_connection_mutex); 
     334 
     335                check_reconnect(self, false); 
    323336 
    324337retry: 
     
    332345                        if (IS_TRANSIENT_ERROR && first_try) 
    333346                         { 
    334                                 pbsdrmaa_pbs_reconnect_internal(self, true); 
     347                                check_reconnect(self, true); 
    335348                                first_try = false; 
    336349                                goto retry; 
     
    338351                        else 
    339352                         { 
    340                                 pbsdrmaa_exc_raise_pbs( "pbs_deljob"); 
     353                                pbsdrmaa_exc_raise_pbs( "pbs_deljob", self->connection_fd); 
    341354                         } 
    342355                 } 
     
    349362         { 
    350363                if(conn_lock) 
    351                         conn_lock = fsd_mutex_unlock(&self->session->super.drm_connection_mutex); 
     364                        conn_lock = fsd_mutex_unlock(&self->session->drm_connection_mutex); 
    352365         } 
    353366        END_TRY 
     
    369382        TRY 
    370383         { 
    371                 conn_lock = fsd_mutex_lock(&self->session->super.drm_connection_mutex); 
    372  
    373                 pbsdrmaa_pbs_reconnect_internal(self, false); 
     384                conn_lock = fsd_mutex_lock(&self->session->drm_connection_mutex); 
     385 
     386                check_reconnect(self, false); 
    374387 
    375388retry: 
     
    383396                        if (IS_TRANSIENT_ERROR && first_try) 
    384397                         { 
    385                                 pbsdrmaa_pbs_reconnect_internal(self, true); 
     398                                check_reconnect(self, true); 
    386399                                first_try = false; 
    387400                                goto retry; 
     
    389402                        else 
    390403                         { 
    391                                 pbsdrmaa_exc_raise_pbs( "pbs_rlsjob"); 
     404                                pbsdrmaa_exc_raise_pbs( "pbs_rlsjob", self->connection_fd); 
    392405                         } 
    393406                 } 
     
    400413         { 
    401414                if(conn_lock) 
    402                         conn_lock = fsd_mutex_unlock(&self->session->super.drm_connection_mutex); 
     415                        conn_lock = fsd_mutex_unlock(&self->session->drm_connection_mutex); 
    403416         } 
    404417        END_TRY 
     
    420433        TRY 
    421434         { 
    422                 conn_lock = fsd_mutex_lock(&self->session->super.drm_connection_mutex); 
    423  
    424                 pbsdrmaa_pbs_reconnect_internal(self, false); 
     435                conn_lock = fsd_mutex_lock(&self->session->drm_connection_mutex); 
     436 
     437                check_reconnect(self, false); 
    425438 
    426439retry: 
     
    434447                        if (IS_TRANSIENT_ERROR && first_try) 
    435448                         { 
    436                                 pbsdrmaa_pbs_reconnect_internal(self, true); 
     449                                check_reconnect(self, true); 
    437450                                first_try = false; 
    438451                                goto retry; 
     
    440453                        else 
    441454                         { 
    442                                 pbsdrmaa_exc_raise_pbs( "pbs_holdjob"); 
     455                                pbsdrmaa_exc_raise_pbs( "pbs_holdjob", self->connection_fd); 
    443456                         } 
    444457                 } 
     
    451464         { 
    452465                if(conn_lock) 
    453                         conn_lock = fsd_mutex_unlock(&self->session->super.drm_connection_mutex); 
     466                        conn_lock = fsd_mutex_unlock(&self->session->drm_connection_mutex); 
    454467         } 
    455468        END_TRY 
     
    460473 
    461474void  
    462 pbsdrmaa_pbs_reconnect_internal( pbsdrmaa_pbs_conn_t *self, bool force_reconnect) 
    463 { 
    464         int tries_left = self->session->max_retries_count; 
     475check_reconnect( pbsdrmaa_pbs_conn_t *self, bool force_reconnect) 
     476{ 
     477        int tries_left = ((pbsdrmaa_session_t *)self->session)->max_retries_count; 
    465478        int sleep_time = 1; 
    466479 
     
    476489                else 
    477490                 { 
     491                        stop_autoclose_thread(self); 
    478492                        pbs_disconnect(self->connection_fd); 
    479493                        self->connection_fd = -1; 
    480494                 } 
    481495          } 
     496 
     497 
    482498 
    483499retry_connect: /* Life... */ 
     
    492508         
    493509        if( self->connection_fd < 0 ) 
    494                 pbsdrmaa_exc_raise_pbs( "pbs_connect" ); 
     510                pbsdrmaa_exc_raise_pbs( "pbs_connect", self->connection_fd ); 
    495511         
    496512        fsd_log_return(("(%d)", self->connection_fd)); 
    497513} 
    498514 
     515 
     516static void start_autoclose_thread( pbsdrmaa_pbs_conn_t *self ) 
     517{ 
     518 
     519 
     520} 
     521 
     522static void stop_autoclose_thread( pbsdrmaa_pbs_conn_t *self ) 
     523{ 
     524 
     525 
     526} 
     527 
  • trunk/pbs_drmaa/pbs_conn.h

    r76 r85  
    2929#include <drmaa_utils/job.h> 
    3030#include <drmaa_utils/session.h> 
     31#include <drmaa_utils/thread.h> 
    3132 
    32 #include <session.h> 
    3333 
    3434#include <pbs_ifl.h> 
     
    3636typedef struct pbsdrmaa_pbs_conn_s pbsdrmaa_pbs_conn_t; 
    3737 
    38 pbsdrmaa_pbs_conn_t * pbsdrmaa_pbs_conn_new ( pbsdrmaa_session_t * session, char *server); 
     38pbsdrmaa_pbs_conn_t * 
     39pbsdrmaa_pbs_conn_new( 
     40                fsd_drmaa_session_t * session, 
     41                const char *server); 
    3942 
    40 void 
    41 pbsdrmaa_pbs_conn_destroy ( pbsdrmaa_pbs_conn_t * self ); 
     43void pbsdrmaa_pbs_conn_destroy ( pbsdrmaa_pbs_conn_t * self ); 
    4244 
    4345struct pbsdrmaa_pbs_conn_s { 
    44         pbsdrmaa_session_t *volatile session; 
     46        fsd_drmaa_session_t *volatile session; 
    4547 
    4648        char* (*submit) ( pbsdrmaa_pbs_conn_t *self, struct attropl *attrib, char *script, char *destination ); 
     
    6365        int connection_fd; 
    6466         
    65         /* timestamp of last usage */ 
    66         time_t last_usage;       
     67        /* timestamp of last connect time */ 
     68        time_t last_connect_time; 
     69 
     70        fsd_cond_t autoclose_cond; 
     71        fsd_mutex_t autoclose_mutex; 
     72        bool close_connection; 
    6773}; 
    6874 
  • trunk/pbs_drmaa/session.c

    r65 r85  
    9898 
    9999                self->log_file_initial_size = 0; 
    100                 self->pbs_conn = -1; 
    101100                self->pbs_home = NULL; 
    102101 
     
    123122                self->super.missing_jobs = FSD_IGNORE_MISSING_JOBS; 
    124123 
    125                  { 
    126                         int tries_left = self->max_retries_count; 
    127                         int sleep_time = 1; 
    128                         /*ignore SIGPIPE - otheriwse pbs_disconnect cause the program to exit */ 
    129                         signal(SIGPIPE, SIG_IGN); 
    130 retry_connect: /* Life... */ 
    131                         self->pbs_conn = pbs_connect( self->super.contact ); 
    132                         fsd_log_info(( "pbs_connect(%s) =%d", self->super.contact, self->pbs_conn )); 
    133                         if( self->pbs_conn < 0 && tries_left-- ) 
    134                          { 
    135                                 sleep(sleep_time++); 
    136                                 goto retry_connect; 
    137                          } 
    138  
    139                         if( self->pbs_conn < 0 ) 
    140                                 pbsdrmaa_exc_raise_pbs( "pbs_connect" ); 
    141                  } 
     124                self->pbs_connection = pbsdrmaa_pbs_conn_new( (fsd_drmaa_session_t *)self, contact ); 
     125                self->connection_max_lifetime =  30; /* 30 seconds */ 
    142126 
    143127         } 
     
    162146        pbsdrmaa_session_t *pbsself = (pbsdrmaa_session_t*)self; 
    163147        self->stop_wait_thread( self ); 
    164         if( pbsself->pbs_conn >= 0 ) 
    165                 pbs_disconnect( pbsself->pbs_conn ); 
     148        pbsdrmaa_pbs_conn_destroy(pbsself->pbs_connection); 
    166149        fsd_free( pbsself->status_attrl ); 
    167150        fsd_free( pbsself->job_exit_status_file_prefix ); 
     
    229212        fsd_conf_option_t *max_retries_count = NULL; 
    230213        fsd_conf_option_t *user_state_dir = NULL; 
     214        fsd_conf_option_t *connection_max_lifetime = NULL; 
     215 
    231216 
    232217        pbs_home = fsd_conf_dict_get(self->configuration, "pbs_home" ); 
     
    234219        max_retries_count = fsd_conf_dict_get(self->configuration, "max_retries_count" ); 
    235220        user_state_dir = fsd_conf_dict_get(self->configuration, "user_state_dir" ); 
     221        connection_max_lifetime = fsd_conf_dict_get(self->configuration, "connection_max_lifetime"); 
    236222 
    237223        if( pbs_home && pbs_home->type == FSD_CONF_STRING ) 
     
    274260          } 
    275261 
     262        if ( connection_max_lifetime && connection_max_lifetime->type == FSD_CONF_INTEGER) 
     263          { 
     264                pbsself->connection_max_lifetime = connection_max_lifetime->val.integer; 
     265                fsd_log_info(("Max connection lifetime: %d", pbsself->connection_max_lifetime)); 
     266          } 
     267 
    276268        if ( wait_thread_sleep_time && wait_thread_sleep_time->type == FSD_CONF_INTEGER) 
    277269          { 
     
    315307pbsdrmaa_session_update_all_jobs_status( fsd_drmaa_session_t *self ) 
    316308{ 
    317         volatile bool conn_lock = false; 
    318309        volatile bool jobs_lock = false; 
    319310        pbsdrmaa_session_t *pbsself = (pbsdrmaa_session_t*)self; 
    320311        fsd_job_set_t *jobs = self->jobs; 
    321312        struct batch_status *volatile status = NULL; 
    322         volatile int tries_left = pbsself->max_retries_count; 
    323         volatile int sleep_time = 1; 
    324313 
    325314        fsd_log_enter(("")); 
     
    327316        TRY 
    328317         { 
    329                 conn_lock = fsd_mutex_lock( &self->drm_connection_mutex ); 
    330 retry: 
     318 
    331319/* TODO: query only for user's jobs pbs_selstat + ATTR_u */ 
    332320#ifdef PBS_PROFESSIONAL 
    333                 status = pbs_statjob( pbsself->pbs_conn, NULL, NULL, NULL ); 
     321                status = pbsself->pbs_connection->statjob(pbsself->pbs_connection, NULL, NULL); 
    334322#else 
    335                 status = pbs_statjob( pbsself->pbs_conn, NULL, pbsself->status_attrl, NULL ); 
     323                status = pbsself->pbs_connection->statjob(pbsself->pbs_connection, NULL, pbsself->status_attrl); 
    336324#endif 
    337                 fsd_log_info(( "pbs_statjob( fd=%d, job_id=NULL, attribs={...} ) =%p", pbsself->pbs_conn, (void*)status )); 
    338                 if( status == NULL  &&  pbs_errno != 0 ) 
    339                  { 
    340                         if (pbs_errno == PBSE_PROTOCOL || pbs_errno == PBSE_EXPIRED || pbs_errno == PBSOLDE_PROTOCOL || pbs_errno == PBSOLDE_EXPIRED) 
    341                          { 
    342                                 if ( pbsself->pbs_conn >= 0) 
    343                                         pbs_disconnect( pbsself->pbs_conn ); 
    344 retry_connect: 
    345                                 sleep(sleep_time++); 
    346                                 pbsself->pbs_conn = pbs_connect( pbsself->super.contact ); 
    347                                 if( pbsself->pbs_conn < 0) 
    348                                  { 
    349                                         if (tries_left--) 
    350                                                 goto retry_connect; 
    351                                         else 
    352                                                 pbsdrmaa_exc_raise_pbs( "pbs_connect" ); 
    353                                  } 
    354                                 else 
    355                                         goto retry; 
    356                          } 
    357                         else 
    358                          { 
    359                                 pbsdrmaa_exc_raise_pbs( "pbs_statjob" ); 
    360                          } 
    361                  } 
    362                 conn_lock = fsd_mutex_unlock( &self->drm_connection_mutex ); 
    363325 
    364326                 { 
     
    421383         { 
    422384                if( status != NULL ) 
    423                         pbs_statfree( status ); 
    424                 if( conn_lock ) 
    425                         conn_lock = fsd_mutex_unlock( &self->drm_connection_mutex ); 
     385                         pbsself->pbs_connection->statjob_free(pbsself->pbs_connection, status ); 
    426386                if( jobs_lock ) 
    427387                        jobs_lock = fsd_mutex_unlock( &jobs->mutex ); 
  • trunk/pbs_drmaa/session.h

    r72 r85  
    2727#include <drmaa_utils/session.h> 
    2828 
     29#include <pbs_drmaa/pbs_conn.h> 
     30 
    2931typedef struct pbsdrmaa_session_s pbsdrmaa_session_t; 
    3032 
    31 fsd_drmaa_session_t * 
    32 pbsdrmaa_session_new( const char *contact ); 
     33fsd_drmaa_session_t *pbsdrmaa_session_new( const char *contact ); 
    3334 
    3435struct pbsdrmaa_session_s { 
     
    4546 
    4647        /* 
    47          * PBS connection (or -1 if not connected). 
    48          * A descriptor of socket conencted to PBS server. 
     48         * PBS connection handle 
    4949         */ 
    50         int pbs_conn; 
     50        pbsdrmaa_pbs_conn_t *pbs_connection; 
    5151 
    5252        /* 
     
    9191 
    9292        /* 
    93          * Whether to cache PBS Connection 
     93         * PBS Max connection time 
    9494         */ 
    95         bool cache_connection; 
     95        int connection_max_lifetime; 
    9696}; 
    9797 
  • trunk/pbs_drmaa/submit.c

    r75 r85  
    136136pbsdrmaa_submit_submit( pbsdrmaa_submit_t *self ) 
    137137{ 
    138         volatile bool conn_lock = false; 
    139138        struct attrl *volatile pbs_attr = NULL; 
    140139        char *volatile job_id = NULL; 
     
    143142                fsd_template_t *pbs_tmpl = self->pbs_job_attributes; 
    144143                int i; 
    145                 int tries_left = ((pbsdrmaa_session_t *)self->session)->max_retries_count; 
    146                 int sleep_time = 1; 
    147144 
    148145                for( i = PBSDRMAA_N_PBS_ATTRIBUTES - 1; i >= 0; i-- ) /* down loop -> start with custom resources */ 
     
    208205                pbs_attr = pbsdrmaa_submit_filter(pbs_attr); 
    209206 
    210                 conn_lock = fsd_mutex_lock( &self->session->drm_connection_mutex ); 
    211 retry: 
    212                 job_id = pbs_submit( ((pbsdrmaa_session_t*)self->session)->pbs_conn, 
    213                                 (struct attropl*)pbs_attr, self->script_filename, 
    214                                 self->destination_queue, NULL ); 
     207                job_id = ((pbsdrmaa_session_t *)self->session)->pbs_connection->submit( ((pbsdrmaa_session_t *)self->session)->pbs_connection, (struct attropl*)pbs_attr, self->script_filename, self->destination_queue); 
    215208 
    216209                fsd_log_info(("pbs_submit(%s, %s) =%s", self->script_filename, self->destination_queue, job_id)); 
    217210 
    218                 if( job_id == NULL ) 
    219                 { 
    220                         fsd_log_error(( "pbs_submit failed, pbs_errno = %d", pbs_errno )); 
    221                         if (pbs_errno == PBSE_PROTOCOL || pbs_errno == PBSE_EXPIRED || pbs_errno == PBSOLDE_PROTOCOL || pbs_errno == PBSOLDE_EXPIRED) 
    222                          { 
    223                                 pbsdrmaa_session_t *pbsself = (pbsdrmaa_session_t*)self->session; 
    224  
    225                                 fsd_log_error(( "Protocol error. Retrying..." )); 
    226  
    227                                 if (pbsself->pbs_conn >= 0 ) 
    228                                         pbs_disconnect( pbsself->pbs_conn ); 
    229 retry_connect: 
    230                                 sleep(sleep_time++); 
    231                                 pbsself->pbs_conn = pbs_connect( pbsself->super.contact ); 
    232                                 if( pbsself->pbs_conn < 0)  
    233                                  { 
    234                                         if (tries_left--) 
    235                                                 goto retry_connect; 
    236                                         else 
    237                                                 pbsdrmaa_exc_raise_pbs( "pbs_connect" ); 
    238                                  } 
    239                                 else 
    240                                  { 
    241                                         if (tries_left--) 
    242                                                 goto retry; 
    243                                         else 
    244                                                 pbsdrmaa_exc_raise_pbs( "pbs_submit" ); 
    245                                  } 
    246                          } 
    247                         else 
    248                          { 
    249                                 pbsdrmaa_exc_raise_pbs( "pbs_submit" ); 
    250                          } 
    251                 } 
    252                 conn_lock = fsd_mutex_unlock( &self->session->drm_connection_mutex ); 
    253211         } 
    254212        EXCEPT_DEFAULT 
     
    259217        FINALLY 
    260218         { 
    261                 if( conn_lock ) 
    262                         conn_lock = fsd_mutex_unlock( &self->session->drm_connection_mutex ); 
    263219                if( pbs_attr ) 
    264220                        pbsdrmaa_free_attrl( pbs_attr ); 
     
    894850                        if (!attr_value) 
    895851                          { 
    896                                 fsd_exc_raise_fmt(FSD_DRMAA_ERRNO_INVALID_ATTRIBUTE_FORMAT, "Invalid output line of submit filter:", output_line); 
     852                                fsd_exc_raise_fmt(FSD_DRMAA_ERRNO_INVALID_ATTRIBUTE_FORMAT, "Invalid output line of submit filter: %s", output_line); 
    897853                          } 
    898854                        else 
  • trunk/pbs_drmaa/util.c

    r70 r85  
    114114 
    115115void 
    116 pbsdrmaa_exc_raise_pbs( const char *function ) 
     116pbsdrmaa_exc_raise_pbs( const char *function, int connection ) 
    117117{ 
    118118        int _pbs_errno; 
    119119        int fsd_errno; 
    120120        const char *message = NULL; 
     121        const char *extended_message = NULL; 
    121122 
    122123        _pbs_errno = pbs_errno; 
     
    128129#endif 
    129130 
     131        if ( connection != -1 ) 
     132         { 
     133                extended_message = pbs_geterrmsg(connection); 
     134         } 
     135 
    130136        fsd_errno = pbsdrmaa_map_pbs_errno( _pbs_errno ); 
    131         fsd_log_error(( 
    132                                 "call to %s returned with error %d:%s mapped to %d:%s", 
    133                                 function, 
    134                                 _pbs_errno, message, 
    135                                 fsd_errno, fsd_strerror(fsd_errno) 
    136                                 )); 
    137         fsd_exc_raise_fmt( fsd_errno, " %s", function, message ); 
    138 } 
    139  
     137 
     138        fsd_log_error(( "call to %s returned with error %d:%s(%s) mapped to %d:%s", 
     139                                        function, 
     140                                        _pbs_errno, message, extended_message, 
     141                                        fsd_errno, fsd_strerror(fsd_errno) 
     142                        )); 
     143 
     144        if (extended_message) 
     145                fsd_exc_raise_fmt(fsd_errno, "%s: %s ", message, extended_message); 
     146        else 
     147                fsd_exc_raise_fmt(fsd_errno, "%s", message); 
     148} 
    140149 
    141150/** Maps PBS error code into DMRAA code. */ 
  • trunk/pbs_drmaa/util.h

    r65 r85  
    3030 
    3131 
    32 void pbsdrmaa_exc_raise_pbs( const char *function ); 
     32void pbsdrmaa_exc_raise_pbs( const char *function, int connection_fd ); 
    3333int pbsdrmaa_map_pbs_errno( int _pbs_errno ); 
    3434 
Note: See TracChangeset for help on using the changeset viewer.