Changeset 45 for trunk/pbs_drmaa


Ignore:
Timestamp:
11/28/11 15:02:58 (12 years ago)
Author:
mmamonski
Message:

version 1.0.11 - ready to try

Location:
trunk/pbs_drmaa
Files:
5 edited

Legend:

Unmodified
Added
Removed
  • trunk/pbs_drmaa/job.c

    r40 r45  
    9090        TRY 
    9191         { 
    92                 int try_count; 
    93                 const int max_tries = 3; 
     92                int tries_left = session->max_retries_count; 
     93                int sleep_time = 1; 
    9494 
    9595                conn_lock = fsd_mutex_lock( &self->session->drm_connection_mutex ); 
    9696 
    9797                /*TODO reconnect */ 
    98                 for( try_count=0;  try_count < max_tries;  try_count++ ) 
     98                while ( true ) 
    9999                 { 
    100100                        switch( action ) 
     
    151151                         } 
    152152 
    153                         if( rc == PBSE_NONE ) 
    154                                 break; 
    155                         else if( rc == PBSE_INTERNAL ) 
     153retry_connect: 
     154                        if ( rc == PBSE_NONE ) 
     155                                break; 
     156                        else if (( rc == PBSE_INTERNAL || rc == PBSE_PROTOCOL || rc == PBSE_EXPIRED) && (tries_left--)) 
    156157                         { 
    157                                 /* 
    158                                  * In PBS Pro pbs_sigjob raises internal server error (PBSE_INTERNAL) 
    159                                  * when job just changed its state to running. 
    160                                  */ 
    161                                 fsd_log_debug(( "repeating request (%d of %d)", 
    162                                                         try_count+2, max_tries )); 
    163                                 sleep( 1 ); 
     158                                if (rc == PBSE_PROTOCOL || rc == PBSE_EXPIRED) 
     159                                 { 
     160                                        if ( session->pbs_conn >= 0) 
     161                                                pbs_disconnect( session->pbs_conn ); 
     162 
     163                                        sleep( sleep_time++ ); 
     164 
     165                                        session->pbs_conn = pbs_connect( session->super.contact ); 
     166 
     167                                        if (session->pbs_conn < 0) 
     168                                                goto retry_connect; 
     169 
     170                                        fsd_log_info(( "pbs_connect(%s) =%d", session->super.contact, session->pbs_conn )); 
     171                                 } 
     172                                else /* PBSE_INTERNAL */ 
     173                                 { 
     174                                        /* 
     175                                         * In PBS Pro pbs_sigjob raises internal server error (PBSE_INTERNAL) 
     176                                         * when job just changed its state to running. 
     177                                         */ 
     178                                        sleep( sleep_time++ ); 
     179                                 } 
     180                                fsd_log_debug(( "repeating request (%d of %d)", tries_left, session->max_retries_count)); 
    164181                         } 
    165182                        else 
    166183                                pbsdrmaa_exc_raise_pbs( apicall ); 
    167                  } /* end for */ 
     184                 } /* end while */ 
    168185         } 
    169186        FINALLY 
     
    184201        struct batch_status *volatile status = NULL; 
    185202        pbsdrmaa_session_t *session = (pbsdrmaa_session_t*)self->session; 
     203        int tries_left = session->max_retries_count; 
     204        int sleep_time = 1; 
    186205 
    187206        fsd_log_enter(( "({job_id=%s})", self->job_id )); 
     
    229248                                        if ( session->pbs_conn >= 0 ) 
    230249                                                pbs_disconnect( session->pbs_conn ); 
    231                                         sleep(1); 
     250retry_connect: 
     251                                        sleep(sleep_time++); 
    232252                                        session->pbs_conn = pbs_connect( session->super.contact ); 
    233253                                        if( session->pbs_conn < 0 ) 
    234                                                 pbsdrmaa_exc_raise_pbs( "pbs_connect" ); 
     254                                         { 
     255                                                if (tries_left--) 
     256                                                        goto retry_connect; 
     257                                                else 
     258                                                        pbsdrmaa_exc_raise_pbs( "pbs_connect" ); 
     259                                         } 
    235260                                        else 
    236261                                         { 
    237                                                 fsd_log_error(("retry:")); 
    238262                                                goto retry; 
    239263                                         } 
  • trunk/pbs_drmaa/log_reader.c

    r38 r45  
    457457 
    458458                                fsd_mutex_unlock( &self->session->mutex ); 
    459                                  
    460                                 usleep(300000); /* 300 ms - consider using inotify - but this would not work with NFS */                                 
     459 
     460                                sleep(((pbsdrmaa_session_t *)self->session)->wait_thread_sleep_time); 
    461461 
    462462                                fsd_mutex_lock( &self->session->mutex ); 
  • trunk/pbs_drmaa/session.c

    r41 r45  
    6565pbsdrmaa_session_new_job( fsd_drmaa_session_t *self, const char *job_id ); 
    6666 
    67 static bool 
    68 pbsdrmaa_session_do_drm_keeps_completed_jobs( pbsdrmaa_session_t *self ); 
    69  
    7067static void 
    7168pbsdrmaa_session_update_all_jobs_status( fsd_drmaa_session_t *self ); 
     
    115112                self->super.apply_configuration = pbsdrmaa_session_apply_configuration; 
    116113 
    117                 self->do_drm_keeps_completed_jobs = 
    118                         pbsdrmaa_session_do_drm_keeps_completed_jobs; 
    119  
    120114                self->status_attrl = pbsdrmaa_create_status_attrl(); 
    121  
    122          {      /* ugly. But this is life... ;( */ 
    123                 #define MAX_PBS_CONNECT_RETRIES (3) 
    124                 int tries_counter = MAX_PBS_CONNECT_RETRIES; 
    125              retry: 
    126                 self->pbs_conn = pbs_connect( self->super.contact ); 
    127                 fsd_log_info(( "pbs_connect(%s) =%d", self->super.contact, 
    128                                         self->pbs_conn )); 
    129                 if( self->pbs_conn < 0 && tries_counter-- ) 
    130                  { 
    131                         sleep(1); 
    132                         goto retry; 
    133                  } 
    134          } 
    135                 if( self->pbs_conn < 0 ) 
    136                         pbsdrmaa_exc_raise_pbs( "pbs_connect" ); 
     115                self->max_retries_count = 3; 
     116                self->wait_thread_sleep_time = 1; 
    137117 
    138118                self->super.load_configuration( &self->super, "pbs_drmaa" ); 
    139119 
    140120                self->super.missing_jobs = FSD_IGNORE_MISSING_JOBS; 
    141                 if( self->do_drm_keeps_completed_jobs( self ) ) 
    142                         self->super.missing_jobs = FSD_IGNORE_QUEUED_MISSING_JOBS; 
     121 
     122                 { 
     123                        int tries_left = self->max_retries_count; 
     124                        int sleep_time = 1; 
     125retry_connect: /* Life... */ 
     126                        self->pbs_conn = pbs_connect( self->super.contact ); 
     127                        fsd_log_info(( "pbs_connect(%s) =%d", self->super.contact, self->pbs_conn )); 
     128                        if( self->pbs_conn < 0 && tries_left-- ) 
     129                         { 
     130                                sleep(sleep_time++); 
     131                                goto retry_connect; 
     132                         } 
     133 
     134                        if( self->pbs_conn < 0 ) 
     135                                pbsdrmaa_exc_raise_pbs( "pbs_connect" ); 
     136                 } 
    143137         } 
    144138        EXCEPT_DEFAULT 
     
    223217{ 
    224218        pbsdrmaa_session_t *pbsself = (pbsdrmaa_session_t*)self; 
    225         fsd_conf_option_t *pbs_home; 
     219        fsd_conf_option_t *pbs_home = NULL; 
     220        fsd_conf_option_t *wait_thread_sleep_time = NULL; 
     221        fsd_conf_option_t *max_retries_count = NULL; 
     222 
    226223        pbs_home = fsd_conf_dict_get(self->configuration, "pbs_home" ); 
     224        wait_thread_sleep_time = fsd_conf_dict_get(self->configuration, "wait_thread_sleep_time" ); 
     225        max_retries_count = fsd_conf_dict_get(self->configuration, "max_retries_count" ); 
    227226 
    228227        if( pbs_home && pbs_home->type == FSD_CONF_STRING ) 
     
    259258          } 
    260259 
     260        if ( max_retries_count && max_retries_count->type == FSD_CONF_INTEGER) 
     261          { 
     262                pbsself->max_retries_count = max_retries_count->val.integer; 
     263                fsd_log_info(("Max retries count: %d", pbsself->max_retries_count)); 
     264          } 
     265 
     266        if ( wait_thread_sleep_time && wait_thread_sleep_time->type == FSD_CONF_INTEGER) 
     267          { 
     268                pbsself->wait_thread_sleep_time = wait_thread_sleep_time->val.integer; 
     269                fsd_log_info(("Wait thread sleep time: %d", pbsself->wait_thread_sleep_time)); 
     270          } 
     271 
    261272        pbsself->super_apply_configuration(self); /* call method from the superclass */ 
    262273} 
     
    271282        fsd_job_set_t *jobs = self->jobs; 
    272283        struct batch_status *volatile status = NULL; 
     284        volatile int tries_left = pbsself->max_retries_count; 
     285        volatile int sleep_time = 1; 
    273286 
    274287        fsd_log_enter(("")); 
     
    284297                status = pbs_statjob( pbsself->pbs_conn, NULL, pbsself->status_attrl, NULL ); 
    285298#endif 
    286                 fsd_log_info(( "pbs_statjob( fd=%d, job_id=NULL, attribs={...} ) =%p", 
    287                                  pbsself->pbs_conn, (void*)status )); 
     299                fsd_log_info(( "pbs_statjob( fd=%d, job_id=NULL, attribs={...} ) =%p", pbsself->pbs_conn, (void*)status )); 
    288300                if( status == NULL  &&  pbs_errno != 0 ) 
    289301                 { 
     
    292304                                if ( pbsself->pbs_conn >= 0) 
    293305                                        pbs_disconnect( pbsself->pbs_conn ); 
    294                                 sleep(1); 
     306retry_connect: 
     307                                sleep(sleep_time++); 
    295308                                pbsself->pbs_conn = pbs_connect( pbsself->super.contact ); 
    296                                 if( pbsself->pbs_conn < 0 ) 
    297                                         pbsdrmaa_exc_raise_pbs( "pbs_connect" ); 
     309                                if( pbsself->pbs_conn < 0) 
     310                                 { 
     311                                        if (tries_left--) 
     312                                                goto retry_connect; 
     313                                        else 
     314                                                pbsdrmaa_exc_raise_pbs( "pbs_connect" ); 
     315                                 } 
    298316                                else 
    299317                                        goto retry; 
     
    429447} 
    430448 
    431  
    432 bool 
    433 pbsdrmaa_session_do_drm_keeps_completed_jobs( pbsdrmaa_session_t *self ) 
    434 { 
    435  
    436 #ifndef PBS_PROFESSIONAL 
    437         struct attrl default_queue_query; 
    438         struct attrl keep_completed_query; 
    439         struct batch_status *default_queue_result = NULL; 
    440         struct batch_status *keep_completed_result = NULL; 
    441         const char *default_queue = NULL; 
    442         const char *keep_completed = NULL; 
    443         volatile bool result = false; 
    444         volatile bool conn_lock = false; 
    445  
    446         TRY 
    447          { 
    448                 default_queue_query.next = NULL; 
    449                 default_queue_query.name = "default_queue"; 
    450                 default_queue_query.resource = NULL; 
    451                 default_queue_query.value = NULL; 
    452                 keep_completed_query.next = NULL; 
    453                 keep_completed_query.name = "keep_completed"; 
    454                 keep_completed_query.resource = NULL; 
    455                 keep_completed_query.value = NULL; 
    456  
    457                 conn_lock = fsd_mutex_lock( &self->super.drm_connection_mutex ); 
    458  
    459                 default_queue_result = 
    460                                 pbs_statserver( self->pbs_conn, &default_queue_query, NULL ); 
    461                 if( default_queue_result == NULL ) 
    462                         pbsdrmaa_exc_raise_pbs( "pbs_statserver" ); 
    463                 if( default_queue_result->attribs 
    464                                 &&  !strcmp( default_queue_result->attribs->name, 
    465                                         "default_queue" ) ) 
    466                         default_queue = default_queue_result->attribs->value; 
    467  
    468                 fsd_log_debug(( "default_queue: %s", default_queue )); 
    469  
    470                 if( default_queue ) 
    471                  { 
    472                         keep_completed_result = pbs_statque( self->pbs_conn, 
    473                                         (char*)default_queue, &keep_completed_query, NULL ); 
    474                         if( keep_completed_result == NULL ) 
    475                                 pbsdrmaa_exc_raise_pbs( "pbs_statque" ); 
    476                         if( keep_completed_result->attribs 
    477                                         &&  !strcmp( keep_completed_result->attribs->name, 
    478                                                 "keep_completed" ) ) 
    479                                 keep_completed = keep_completed_result->attribs->value; 
    480                  } 
    481  
    482                 fsd_log_debug(( "keep_completed: %s", keep_completed )); 
    483          } 
    484         EXCEPT_DEFAULT 
    485          { 
    486                 const fsd_exc_t *e = fsd_exc_get(); 
    487                 fsd_log_warning(( "PBS server seems not to keep completed jobs\n" 
    488                                 "detail: %s", e->message(e) )); 
    489                 result = false; 
    490          } 
    491         ELSE 
    492          { 
    493                 result = false; 
    494                 if( default_queue == NULL ) 
    495                         fsd_log_warning(( "no default queue set on PBS server" )); 
    496                 else if( keep_completed == NULL && self->pbs_home == NULL ) 
    497                         fsd_log_warning(( "Torque server is not configured to keep completed jobs\n" 
    498                                                 "in Torque: set keep_completed parameter of default queue\n" 
    499                                                 "  $ qmgr -c 'set queue batch keep_completed = 60'\n" 
    500                                                 " or configure DRMAA to utilize log files" 
    501                                                 )); 
    502                 else 
    503                         result = true; 
    504          } 
    505         FINALLY 
    506          { 
    507                 if( default_queue_result ) 
    508                         pbs_statfree( default_queue_result ); 
    509                 if( keep_completed_result ) 
    510                         pbs_statfree( keep_completed_result ); 
    511                 if( conn_lock ) 
    512                         conn_lock = fsd_mutex_unlock( &self->super.drm_connection_mutex ); 
    513  
    514          } 
    515         END_TRY 
    516  
    517         return result; 
    518 #endif 
    519         fsd_log_warning(( "PBS Professional does not keep information about the completed jobs\n" 
    520                                 " You must configure DRMAA to utilize log files in order to always get valid job exit status" 
    521                                 )); 
    522         return false; 
    523 } 
    524  
    525449void * 
    526450pbsdrmaa_session_wait_thread( fsd_drmaa_session_t *self ) 
  • trunk/pbs_drmaa/session.h

    r29 r45  
    3434struct pbsdrmaa_session_s { 
    3535        fsd_drmaa_session_t super; 
    36  
    37         bool (*do_drm_keeps_completed_jobs)( pbsdrmaa_session_t *self ); 
    3836 
    3937        void (*super_destroy)( fsd_drmaa_session_t *self ); 
     
    7674         */ 
    7775        time_t log_file_initial_time; 
     76 
     77        /* 
     78         * Maximal number of retires in pbs_connect. Default 3 
     79         */ 
     80        int max_retries_count; 
     81 
     82        /* 
     83         * Wait thread sleep time (in seconds). Default 1s. 
     84         */ 
     85        int wait_thread_sleep_time; 
    7886}; 
    7987 
  • trunk/pbs_drmaa/submit.c

    r37 r45  
    145145                const fsd_template_t *pbs_tmpl = self->pbs_job_attributes; 
    146146                unsigned i; 
     147                int tries_left = ((pbsdrmaa_session_t *)self->session)->max_retries_count; 
     148                int sleep_time = 1; 
    147149 
    148150                for( i = 0;  i < PBSDRMAA_N_PBS_ATTRIBUTES;  i++ ) 
     
    173175                                if (pbsself->pbs_conn >= 0 ) 
    174176                                        pbs_disconnect( pbsself->pbs_conn ); 
    175                                 sleep(1); 
     177retry_connect: 
     178                                sleep(sleep_time++); 
    176179                                pbsself->pbs_conn = pbs_connect( pbsself->super.contact ); 
    177                                 if( pbsself->pbs_conn < 0 ) 
    178                                         pbsdrmaa_exc_raise_pbs( "pbs_connect" ); 
     180                                if( pbsself->pbs_conn < 0) 
     181                                        if (tries_left--) 
     182                                                goto retry_connect; 
     183                                        else 
     184                                                pbsdrmaa_exc_raise_pbs( "pbs_connect" ); 
    179185                                else 
    180186                                        goto retry; 
     
    612618                TRY 
    613619                  { 
    614                         for (arg = strtok_r(native_spec_copy, " \t", &ctxt); arg; arg = strtok_r(NULL, " \t",&ctxt) ) { 
     620                        for (arg = strtok_r((char *)native_spec_copy, " \t", &ctxt); arg; arg = strtok_r(NULL, " \t",&ctxt) ) { 
    615621                                if (!opt) 
    616622                                  { 
     
    718724#endif 
    719725                        args_list->destroy(args_list); 
    720                         fsd_free(native_spec_copy); 
     726                        fsd_free((char *)native_spec_copy); 
    721727                 } 
    722728                END_TRY 
Note: See TracChangeset for help on using the changeset viewer.