Changeset 85 for trunk/pbs_drmaa/job.c


Ignore:
Timestamp:
01/17/13 18:44:15 (10 years ago)
Author:
mmamonski
Message:

PBS DRMAA autoclose connection

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/pbs_drmaa/job.c

    r83 r85  
    7777pbsdrmaa_job_control( fsd_job_t *self, int action ) 
    7878{ 
    79         volatile bool conn_lock = false; 
    8079        pbsdrmaa_session_t *session = (pbsdrmaa_session_t*)self->session; 
    8180        const char *job_id = self->job_id; 
    82         const char *apicall = NULL; 
    83         int rc = PBSE_NONE; 
    84  
    85         fsd_log_enter(( "({job_id=%s}, action=%d)", 
    86                         self->job_id, action )); 
    87  
    88         TRY 
    89          { 
    90                 int tries_left = session->max_retries_count; 
    91                 int sleep_time = 1; 
    92  
    93                 conn_lock = fsd_mutex_lock( &self->session->drm_connection_mutex ); 
    94  
    95                 /*TODO reconnect */ 
    96                 while ( true ) 
    97                  { 
    98                         switch( action ) 
    99                          { 
    100                                 /* 
    101                                  * We cannot know whether we did suspend job 
    102                                  * in other way than remembering this inside DRMAA session. 
    103                                  */ 
    104                                 case DRMAA_CONTROL_SUSPEND: 
    105                                         apicall = "pbs_sigjob"; 
    106                                         rc = pbs_sigjob( session->pbs_conn, (char*)job_id, 
    107                                                         "SIGSTOP", NULL ); 
    108                                         fsd_log_info(("pbs_sigjob(%s, SIGSTOP) =%d", job_id, rc)); 
    109                                         if( rc == PBSE_NONE ) 
    110                                                 self->flags |= FSD_JOB_SUSPENDED; 
    111                                         break; 
    112                                 case DRMAA_CONTROL_RESUME: 
    113                                         apicall = "pbs_sigjob"; 
    114                                         rc = pbs_sigjob( session->pbs_conn, (char*)job_id, 
    115                                                         "SIGCONT", NULL ); 
    116                                         fsd_log_info(("pbs_sigjob(%s, SIGCONT) =%d", job_id, rc)); 
    117                                         if( rc == PBSE_NONE ) 
    118                                                 self->flags &= ~FSD_JOB_SUSPENDED; 
    119                                         break; 
    120                                 case DRMAA_CONTROL_HOLD: 
    121                                         apicall = "pbs_holdjob"; 
    122                                         rc = pbs_holdjob( session->pbs_conn, (char*)job_id, 
    123                                                         USER_HOLD, NULL ); 
    124                                         fsd_log_info(("pbs_sigjob(%s, SIGHOLD) =%d", job_id, rc)); 
    125                                         if( rc == PBSE_NONE ) 
    126                                                 self->flags |= FSD_JOB_HOLD; 
    127                                         break; 
    128                                 case DRMAA_CONTROL_RELEASE: 
    129                                         apicall = "pbs_rlsjob"; 
    130                                         rc = pbs_rlsjob( session->pbs_conn, (char*)job_id, 
    131                                                         USER_HOLD, NULL ); 
    132                                         fsd_log_info(("pbs_rlsjob(%s) =%d", job_id, rc)); 
    133                                         if( rc == PBSE_NONE ) 
    134                                                 self->flags &= FSD_JOB_HOLD; 
    135                                         break; 
    136                                 case DRMAA_CONTROL_TERMINATE: 
    137                                         apicall = "pbs_deljob"; 
    138                                         rc = pbs_deljob( session->pbs_conn, (char*)job_id, NULL ); 
    139                                         fsd_log_info(("pbs_deljob(%s) =%d", job_id, rc)); 
    140                                         /* Torque: 
    141                                          * deldelay=N -- delay between SIGTERM and SIGKILL (default 0) */ 
    142                                         if( rc == PBSE_NONE ) 
    143                                          { 
    144                                                 self->flags &= FSD_JOB_TERMINATED_MASK; 
    145                                                 if( (self->flags & FSD_JOB_TERMINATED) == 0 ) 
    146                                                         self->flags |= FSD_JOB_TERMINATED | FSD_JOB_ABORTED; 
    147                                          } 
    148                                         break; 
    149                          } 
    150  
    151 retry_connect: 
    152                         if ( rc == PBSE_NONE ) 
    153                                 break; 
    154                         else if (( rc == PBSE_INTERNAL || rc == PBSE_PROTOCOL || rc == PBSOLDE_PROTOCOL || rc == PBSE_EXPIRED || rc == PBSOLDE_EXPIRED) && (tries_left--)) 
    155                          { 
    156                                 if (rc == PBSE_PROTOCOL || rc == PBSE_EXPIRED || rc == PBSOLDE_PROTOCOL || rc == PBSOLDE_EXPIRED) 
    157                                  { 
    158                                         if ( session->pbs_conn >= 0) 
    159                                                 pbs_disconnect( session->pbs_conn ); 
    160  
    161                                         sleep( sleep_time++ ); 
    162  
    163                                         session->pbs_conn = pbs_connect( session->super.contact ); 
    164  
    165                                         if (session->pbs_conn < 0) 
    166                                                 goto retry_connect; 
    167  
    168                                         fsd_log_info(( "pbs_connect(%s) =%d", session->super.contact, session->pbs_conn )); 
    169                                  } 
    170                                 else /* PBSE_INTERNAL */ 
    171                                  { 
    172                                         /* 
    173                                          * In PBS Pro pbs_sigjob raises internal server error (PBSE_INTERNAL) 
    174                                          * when job just changed its state to running. 
    175                                          */ 
    176                                         sleep( sleep_time++ ); 
    177                                  } 
    178                                 fsd_log_debug(( "repeating request (%d of %d)", tries_left, session->max_retries_count)); 
    179                          } 
    180                         else 
    181                                 pbsdrmaa_exc_raise_pbs( apicall ); 
    182                  } /* end while */ 
    183          } 
    184         FINALLY 
    185          { 
    186                 if( conn_lock ) 
    187                         conn_lock = fsd_mutex_unlock( &self->session->drm_connection_mutex ); 
    188          } 
    189         END_TRY 
     81 
     82        fsd_log_enter(( "({job_id=%s}, action=%d)", self->job_id, action )); 
     83 
     84        while ( true ) 
     85         { 
     86                switch( action ) 
     87                 { 
     88                        /* 
     89                         * We cannot know whether we did suspend job 
     90                         * in other way than remembering this inside DRMAA session. 
     91                         */ 
     92                        case DRMAA_CONTROL_SUSPEND: 
     93                                session->pbs_connection->sigjob( session->pbs_connection, (char*)job_id, "SIGSTOP"); 
     94                                self->flags |= FSD_JOB_SUSPENDED; 
     95                                break; 
     96                        case DRMAA_CONTROL_RESUME: 
     97                                session->pbs_connection->sigjob( session->pbs_connection, (char*)job_id, "SIGCONT"); 
     98                                self->flags &= ~FSD_JOB_SUSPENDED; 
     99                                break; 
     100                        case DRMAA_CONTROL_HOLD: 
     101                                session->pbs_connection->holdjob( session->pbs_connection, (char*)job_id ); 
     102                                self->flags |= FSD_JOB_HOLD; 
     103                                break; 
     104                        case DRMAA_CONTROL_RELEASE: 
     105                                session->pbs_connection->rlsjob( session->pbs_connection, (char*)job_id ); 
     106                                self->flags &= ~FSD_JOB_HOLD; 
     107                                break; 
     108                        case DRMAA_CONTROL_TERMINATE: 
     109                                session->pbs_connection->deljob( session->pbs_connection, (char*)job_id ); 
     110                                /* TODO: make deldelay configurable ???: 
     111                                 * deldelay=N -- delay between SIGTERM and SIGKILL (default 0) */ 
     112                                self->flags &= FSD_JOB_TERMINATED_MASK; 
     113                                if( (self->flags & FSD_JOB_TERMINATED) == 0 ) 
     114                                        self->flags |= FSD_JOB_TERMINATED | FSD_JOB_ABORTED; 
     115                                break; 
     116                 } 
     117         } 
    190118 
    191119        fsd_log_return(("")); 
     
    196124pbsdrmaa_job_update_status( fsd_job_t *self ) 
    197125{ 
    198         volatile bool conn_lock = false; 
    199126        struct batch_status *volatile status = NULL; 
    200127        pbsdrmaa_session_t *session = (pbsdrmaa_session_t*)self->session; 
    201         int tries_left = session->max_retries_count; 
    202         int sleep_time = 1; 
    203128 
    204129        fsd_log_enter(( "({job_id=%s})", self->job_id )); 
     
    206131        TRY 
    207132         { 
    208                 conn_lock = fsd_mutex_lock( &self->session->drm_connection_mutex ); 
    209 retry: 
    210                 if (session->pbs_conn < 0) { 
    211                         fsd_log_info(("No connection with pbs. Reconnecting")); 
    212                         goto retry_connect; 
    213                 } 
    214  
    215133 
    216134#ifdef PBS_PROFESSIONAL 
    217                 status = pbs_statjob( session->pbs_conn, self->job_id, NULL, NULL ); 
     135                status = session->pbs_connection->statjob( session->pbs_connection, self->job_id, NULL); 
    218136#else 
    219                 status = pbs_statjob( session->pbs_conn, self->job_id, session->status_attrl, NULL ); 
     137                status = session->pbs_connection->statjob( session->pbs_connection, self->job_id, session->status_attrl); 
    220138#endif 
    221                 fsd_log_info(( "pbs_statjob(fd=%d, job_id=%s, attribs={...}) =%p", 
    222                                  session->pbs_conn, self->job_id, (void*)status )); 
    223                 if( status == NULL ) 
    224                  { 
    225  
    226 #ifndef PBS_PROFESSIONAL 
    227                         if ( pbs_errno != PBSE_UNKJOBID ) 
    228                                 fsd_log_error(("pbs_statjob error: %d, %s, %s", pbs_errno, pbse_to_txt(pbs_errno), pbs_strerror(pbs_errno))); 
    229                         else 
    230                                 fsd_log_debug(("pbs_statjob error: %d, %s, %s", pbs_errno, pbse_to_txt(pbs_errno), pbs_strerror(pbs_errno))); 
    231 #else 
    232 #  ifndef PBS_PROFESSIONAL_NO_LOG 
    233                         if ( pbs_errno != PBSE_UNKJOBID && pbs_errno != PBSE_HISTJOBID ) 
    234                                 fsd_log_error(("pbs_statjob error: %d, %s", pbs_errno, pbse_to_txt(pbs_errno))); 
    235                         else 
    236                                 fsd_log_debug(("pbs_statjob error: %d, %s", pbs_errno, pbse_to_txt(pbs_errno))); 
    237 #  else 
    238                         if ( pbs_errno != PBSE_UNKJOBID && pbs_errno != PBSE_HISTJOBID ) 
    239                                 fsd_log_error(("pbs_statjob error: %d", pbs_errno)); 
    240                         else 
    241                                 fsd_log_debug(("pbs_statjob error: %d", pbs_errno)); 
    242 #  endif 
    243 #endif 
    244  
    245                         switch( pbs_errno ) 
    246                          { 
    247                                 case PBSE_UNKJOBID: 
    248 #ifdef PBS_PROFESSIONAL 
    249                                 case PBSE_HISTJOBID: 
    250 #endif 
    251                                         break; 
    252                                 case PBSE_PROTOCOL: 
    253 #if PBSOLDE_PROTOCOL != PBSE_PROTOCOL 
    254                                 case PBSOLDE_PROTOCOL: 
    255 #endif 
    256                                 case PBSE_EXPIRED: 
    257 #if PBSOLDE_EXPIRED != PBSE_EXPIRED 
    258                                 case PBSOLDE_EXPIRED: 
    259 #endif 
    260                                         if ( session->pbs_conn >= 0 ) 
    261                                                 pbs_disconnect( session->pbs_conn ); 
    262                                         fsd_log_info(("Protocol error. Reconnecting...")); 
    263 retry_connect: 
    264                                         sleep(sleep_time++); 
    265                                         session->pbs_conn = pbs_connect( session->super.contact ); 
    266                                         if( session->pbs_conn < 0 ) 
    267                                          { 
    268                                                 if (tries_left--) { 
    269                                                         fsd_log_info(("Retrying... (%d tries left)", tries_left)); 
    270                                                         goto retry_connect; 
    271                                                 } else { 
    272                                                         fsd_log_error(("No more tries left... Throwing exception")); 
    273                                                         pbsdrmaa_exc_raise_pbs( "pbs_connect" ); 
    274                                                 } 
    275                                          } 
    276                                         else 
    277                                          { 
    278                                                 goto retry; 
    279                                          } 
    280                                 default: 
    281                                         pbsdrmaa_exc_raise_pbs( "pbs_statjob" ); 
    282                                         break; 
    283                                 case 0:  /* ? */ 
    284                                         fsd_exc_raise_code( FSD_ERRNO_INTERNAL_ERROR ); 
    285                                         break; 
    286                          } 
    287  
    288                  } 
    289  
    290                 conn_lock = fsd_mutex_unlock( &self->session->drm_connection_mutex ); 
    291  
    292139 
    293140                if( status != NULL ) 
     
    302149        FINALLY 
    303150         { 
    304                 if( conn_lock ) 
    305                         conn_lock = fsd_mutex_unlock( &self->session->drm_connection_mutex ); 
    306151                if( status != NULL ) 
    307                         pbs_statfree( status ); 
     152                        session->pbs_connection->statjob_free( session->pbs_connection, status ); 
    308153         } 
    309154        END_TRY 
Note: See TracChangeset for help on using the changeset viewer.