Changeset 85
- Timestamp:
- 01/17/13 18:44:15 (12 years ago)
- Location:
- trunk/pbs_drmaa
- Files:
-
- 9 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/pbs_drmaa/job.c
r83 r85 77 77 pbsdrmaa_job_control( fsd_job_t *self, int action ) 78 78 { 79 volatile bool conn_lock = false;80 79 pbsdrmaa_session_t *session = (pbsdrmaa_session_t*)self->session; 81 80 const char *job_id = self->job_id; 82 const char *apicall = NULL; 83 int rc = PBSE_NONE; 84 85 fsd_log_enter(( "({job_id=%s}, action=%d)", 86 self->job_id, action )); 87 88 TRY 89 { 90 int tries_left = session->max_retries_count; 91 int sleep_time = 1; 92 93 conn_lock = fsd_mutex_lock( &self->session->drm_connection_mutex ); 94 95 /*TODO reconnect */ 96 while ( true ) 97 { 98 switch( action ) 99 { 100 /* 101 * We cannot know whether we did suspend job 102 * in other way than remembering this inside DRMAA session. 103 */ 104 case DRMAA_CONTROL_SUSPEND: 105 apicall = "pbs_sigjob"; 106 rc = pbs_sigjob( session->pbs_conn, (char*)job_id, 107 "SIGSTOP", NULL ); 108 fsd_log_info(("pbs_sigjob(%s, SIGSTOP) =%d", job_id, rc)); 109 if( rc == PBSE_NONE ) 110 self->flags |= FSD_JOB_SUSPENDED; 111 break; 112 case DRMAA_CONTROL_RESUME: 113 apicall = "pbs_sigjob"; 114 rc = pbs_sigjob( session->pbs_conn, (char*)job_id, 115 "SIGCONT", NULL ); 116 fsd_log_info(("pbs_sigjob(%s, SIGCONT) =%d", job_id, rc)); 117 if( rc == PBSE_NONE ) 118 self->flags &= ~FSD_JOB_SUSPENDED; 119 break; 120 case DRMAA_CONTROL_HOLD: 121 apicall = "pbs_holdjob"; 122 rc = pbs_holdjob( session->pbs_conn, (char*)job_id, 123 USER_HOLD, NULL ); 124 fsd_log_info(("pbs_sigjob(%s, SIGHOLD) =%d", job_id, rc)); 125 if( rc == PBSE_NONE ) 126 self->flags |= FSD_JOB_HOLD; 127 break; 128 case DRMAA_CONTROL_RELEASE: 129 apicall = "pbs_rlsjob"; 130 rc = pbs_rlsjob( session->pbs_conn, (char*)job_id, 131 USER_HOLD, NULL ); 132 fsd_log_info(("pbs_rlsjob(%s) =%d", job_id, rc)); 133 if( rc == PBSE_NONE ) 134 self->flags &= FSD_JOB_HOLD; 135 break; 136 case DRMAA_CONTROL_TERMINATE: 137 apicall = "pbs_deljob"; 138 rc = pbs_deljob( session->pbs_conn, (char*)job_id, NULL ); 139 fsd_log_info(("pbs_deljob(%s) =%d", job_id, rc)); 140 /* Torque: 141 * deldelay=N -- delay between SIGTERM and SIGKILL (default 0) */ 142 if( rc == PBSE_NONE ) 143 { 144 self->flags &= FSD_JOB_TERMINATED_MASK; 145 if( (self->flags & FSD_JOB_TERMINATED) == 0 ) 146 self->flags |= FSD_JOB_TERMINATED | FSD_JOB_ABORTED; 147 } 148 break; 149 } 150 151 retry_connect: 152 if ( rc == PBSE_NONE ) 153 break; 154 else if (( rc == PBSE_INTERNAL || rc == PBSE_PROTOCOL || rc == PBSOLDE_PROTOCOL || rc == PBSE_EXPIRED || rc == PBSOLDE_EXPIRED) && (tries_left--)) 155 { 156 if (rc == PBSE_PROTOCOL || rc == PBSE_EXPIRED || rc == PBSOLDE_PROTOCOL || rc == PBSOLDE_EXPIRED) 157 { 158 if ( session->pbs_conn >= 0) 159 pbs_disconnect( session->pbs_conn ); 160 161 sleep( sleep_time++ ); 162 163 session->pbs_conn = pbs_connect( session->super.contact ); 164 165 if (session->pbs_conn < 0) 166 goto retry_connect; 167 168 fsd_log_info(( "pbs_connect(%s) =%d", session->super.contact, session->pbs_conn )); 169 } 170 else /* PBSE_INTERNAL */ 171 { 172 /* 173 * In PBS Pro pbs_sigjob raises internal server error (PBSE_INTERNAL) 174 * when job just changed its state to running. 175 */ 176 sleep( sleep_time++ ); 177 } 178 fsd_log_debug(( "repeating request (%d of %d)", tries_left, session->max_retries_count)); 179 } 180 else 181 pbsdrmaa_exc_raise_pbs( apicall ); 182 } /* end while */ 183 } 184 FINALLY 185 { 186 if( conn_lock ) 187 conn_lock = fsd_mutex_unlock( &self->session->drm_connection_mutex ); 188 } 189 END_TRY 81 82 fsd_log_enter(( "({job_id=%s}, action=%d)", self->job_id, action )); 83 84 while ( true ) 85 { 86 switch( action ) 87 { 88 /* 89 * We cannot know whether we did suspend job 90 * in other way than remembering this inside DRMAA session. 91 */ 92 case DRMAA_CONTROL_SUSPEND: 93 session->pbs_connection->sigjob( session->pbs_connection, (char*)job_id, "SIGSTOP"); 94 self->flags |= FSD_JOB_SUSPENDED; 95 break; 96 case DRMAA_CONTROL_RESUME: 97 session->pbs_connection->sigjob( session->pbs_connection, (char*)job_id, "SIGCONT"); 98 self->flags &= ~FSD_JOB_SUSPENDED; 99 break; 100 case DRMAA_CONTROL_HOLD: 101 session->pbs_connection->holdjob( session->pbs_connection, (char*)job_id ); 102 self->flags |= FSD_JOB_HOLD; 103 break; 104 case DRMAA_CONTROL_RELEASE: 105 session->pbs_connection->rlsjob( session->pbs_connection, (char*)job_id ); 106 self->flags &= ~FSD_JOB_HOLD; 107 break; 108 case DRMAA_CONTROL_TERMINATE: 109 session->pbs_connection->deljob( session->pbs_connection, (char*)job_id ); 110 /* TODO: make deldelay configurable ???: 111 * deldelay=N -- delay between SIGTERM and SIGKILL (default 0) */ 112 self->flags &= FSD_JOB_TERMINATED_MASK; 113 if( (self->flags & FSD_JOB_TERMINATED) == 0 ) 114 self->flags |= FSD_JOB_TERMINATED | FSD_JOB_ABORTED; 115 break; 116 } 117 } 190 118 191 119 fsd_log_return(("")); … … 196 124 pbsdrmaa_job_update_status( fsd_job_t *self ) 197 125 { 198 volatile bool conn_lock = false;199 126 struct batch_status *volatile status = NULL; 200 127 pbsdrmaa_session_t *session = (pbsdrmaa_session_t*)self->session; 201 int tries_left = session->max_retries_count;202 int sleep_time = 1;203 128 204 129 fsd_log_enter(( "({job_id=%s})", self->job_id )); … … 206 131 TRY 207 132 { 208 conn_lock = fsd_mutex_lock( &self->session->drm_connection_mutex );209 retry:210 if (session->pbs_conn < 0) {211 fsd_log_info(("No connection with pbs. Reconnecting"));212 goto retry_connect;213 }214 215 133 216 134 #ifdef PBS_PROFESSIONAL 217 status = pbs_statjob( session->pbs_conn, self->job_id, NULL, NULL);135 status = session->pbs_connection->statjob( session->pbs_connection, self->job_id, NULL); 218 136 #else 219 status = pbs_statjob( session->pbs_conn, self->job_id, session->status_attrl, NULL);137 status = session->pbs_connection->statjob( session->pbs_connection, self->job_id, session->status_attrl); 220 138 #endif 221 fsd_log_info(( "pbs_statjob(fd=%d, job_id=%s, attribs={...}) =%p",222 session->pbs_conn, self->job_id, (void*)status ));223 if( status == NULL )224 {225 226 #ifndef PBS_PROFESSIONAL227 if ( pbs_errno != PBSE_UNKJOBID )228 fsd_log_error(("pbs_statjob error: %d, %s, %s", pbs_errno, pbse_to_txt(pbs_errno), pbs_strerror(pbs_errno)));229 else230 fsd_log_debug(("pbs_statjob error: %d, %s, %s", pbs_errno, pbse_to_txt(pbs_errno), pbs_strerror(pbs_errno)));231 #else232 # ifndef PBS_PROFESSIONAL_NO_LOG233 if ( pbs_errno != PBSE_UNKJOBID && pbs_errno != PBSE_HISTJOBID )234 fsd_log_error(("pbs_statjob error: %d, %s", pbs_errno, pbse_to_txt(pbs_errno)));235 else236 fsd_log_debug(("pbs_statjob error: %d, %s", pbs_errno, pbse_to_txt(pbs_errno)));237 # else238 if ( pbs_errno != PBSE_UNKJOBID && pbs_errno != PBSE_HISTJOBID )239 fsd_log_error(("pbs_statjob error: %d", pbs_errno));240 else241 fsd_log_debug(("pbs_statjob error: %d", pbs_errno));242 # endif243 #endif244 245 switch( pbs_errno )246 {247 case PBSE_UNKJOBID:248 #ifdef PBS_PROFESSIONAL249 case PBSE_HISTJOBID:250 #endif251 break;252 case PBSE_PROTOCOL:253 #if PBSOLDE_PROTOCOL != PBSE_PROTOCOL254 case PBSOLDE_PROTOCOL:255 #endif256 case PBSE_EXPIRED:257 #if PBSOLDE_EXPIRED != PBSE_EXPIRED258 case PBSOLDE_EXPIRED:259 #endif260 if ( session->pbs_conn >= 0 )261 pbs_disconnect( session->pbs_conn );262 fsd_log_info(("Protocol error. Reconnecting..."));263 retry_connect:264 sleep(sleep_time++);265 session->pbs_conn = pbs_connect( session->super.contact );266 if( session->pbs_conn < 0 )267 {268 if (tries_left--) {269 fsd_log_info(("Retrying... (%d tries left)", tries_left));270 goto retry_connect;271 } else {272 fsd_log_error(("No more tries left... Throwing exception"));273 pbsdrmaa_exc_raise_pbs( "pbs_connect" );274 }275 }276 else277 {278 goto retry;279 }280 default:281 pbsdrmaa_exc_raise_pbs( "pbs_statjob" );282 break;283 case 0: /* ? */284 fsd_exc_raise_code( FSD_ERRNO_INTERNAL_ERROR );285 break;286 }287 288 }289 290 conn_lock = fsd_mutex_unlock( &self->session->drm_connection_mutex );291 292 139 293 140 if( status != NULL ) … … 302 149 FINALLY 303 150 { 304 if( conn_lock )305 conn_lock = fsd_mutex_unlock( &self->session->drm_connection_mutex );306 151 if( status != NULL ) 307 pbs_statfree(status );152 session->pbs_connection->statjob_free( session->pbs_connection, status ); 308 153 } 309 154 END_TRY -
trunk/pbs_drmaa/log_reader.c
r60 r85 693 693 self->current_offset = ftello(self->fhandle); 694 694 695 fsd_log_debug(("Closing log file (offset=%d)", self->current_offset));695 fsd_log_debug(("Closing log file (offset=%d)", (int)self->current_offset)); 696 696 697 697 fclose(self->fhandle); … … 703 703 pbsdrmaa_reopen_log( pbsdrmaa_log_reader_t * self ) 704 704 { 705 fsd_log_debug(("Reopening log file: %s (offset=%d)", self->log_path, self->current_offset));705 fsd_log_debug(("Reopening log file: %s (offset=%d)", self->log_path, (int)self->current_offset)); 706 706 707 707 if ((self->fhandle = fopen(self->log_path,"r")) == NULL) -
trunk/pbs_drmaa/pbs_conn.c
r84 r85 28 28 #include <drmaa_utils/iter.h> 29 29 #include <drmaa_utils/conf.h> 30 #include <drmaa_utils/session.h>31 30 #include <drmaa_utils/datetime.h> 32 31 32 #include <pbs_drmaa/session.h> 33 33 #include <pbs_drmaa/pbs_conn.h> 34 34 #include <pbs_drmaa/util.h> … … 53 53 static void pbsdrmaa_pbs_holdjob( pbsdrmaa_pbs_conn_t *self, char *job_id ); 54 54 55 static void pbsdrmaa_pbs_reconnect_internal( pbsdrmaa_pbs_conn_t *self, bool reconnect); 56 57 static void pbsdrmaa_pbs_check_connect_internal( pbsdrmaa_pbs_conn_t *self, bool reconnect); 58 55 static void pbsdrmaa_pbs_connection_autoclose_thread_loop( pbsdrmaa_pbs_conn_t *self, bool reconnect); 56 57 58 static void check_reconnect( pbsdrmaa_pbs_conn_t *self, bool reconnect); 59 60 static void start_autoclose_thread( pbsdrmaa_pbs_conn_t *self ); 61 62 static void stop_autoclose_thread( pbsdrmaa_pbs_conn_t *self ); 63 64 65 #if defined PBS_PROFESSIONAL && defined PBSE_HISTJOBID 66 #define IS_MISSING_JOB (pbs_errno == PBSE_UNKJOBID || pbs_errno == PBSE_HISTJOBID) 67 #else 68 #define IS_MISSING_JOB (pbs_errno == PBSE_UNKJOBID) 69 #endif 59 70 #define IS_TRANSIENT_ERROR (pbs_errno == PBSE_PROTOCOL || pbs_errno == PBSE_EXPIRED || pbs_errno == PBSOLDE_PROTOCOL || pbs_errno == PBSOLDE_EXPIRED) 60 71 61 62 72 pbsdrmaa_pbs_conn_t * 63 pbsdrmaa_pbs_conn_new( pbsdrmaa_session_t *session,char *server )73 pbsdrmaa_pbs_conn_new( fsd_drmaa_session_t *session, const char *server ) 64 74 { 65 75 pbsdrmaa_pbs_conn_t *volatile self = NULL; … … 84 94 85 95 self->connection_fd = -1; 86 self->last_usage = time(NULL);87 96 88 97 /*ignore SIGPIPE - otherwise pbs_disconnect cause the program to exit */ 89 98 signal(SIGPIPE, SIG_IGN); 90 99 91 pbsdrmaa_pbs_reconnect_internal(self, false);100 check_reconnect(self, false); 92 101 } 93 102 EXCEPT_DEFAULT … … 100 109 if (self->connection_fd != -1) 101 110 pbs_disconnect(self->connection_fd); 111 stop_autoclose_thread(self); 102 112 } 103 113 … … 110 120 return self; 111 121 } 112 113 122 114 123 void … … 148 157 TRY 149 158 { 150 conn_lock = fsd_mutex_lock(&self->session-> super.drm_connection_mutex);151 152 pbsdrmaa_pbs_reconnect_internal(self, false);159 conn_lock = fsd_mutex_lock(&self->session->drm_connection_mutex); 160 161 check_reconnect(self, false); 153 162 154 163 retry: … … 162 171 if (IS_TRANSIENT_ERROR && first_try) 163 172 { 164 pbsdrmaa_pbs_reconnect_internal(self, true);173 check_reconnect(self, true); 165 174 first_try = false; 166 175 goto retry; … … 168 177 else 169 178 { 170 pbsdrmaa_exc_raise_pbs( "pbs_submit" );179 pbsdrmaa_exc_raise_pbs( "pbs_submit", self->connection_fd); 171 180 } 172 181 } … … 180 189 { 181 190 if(conn_lock) 182 conn_lock = fsd_mutex_unlock(&self->session-> super.drm_connection_mutex);191 conn_lock = fsd_mutex_unlock(&self->session->drm_connection_mutex); 183 192 } 184 193 END_TRY … … 202 211 TRY 203 212 { 204 conn_lock = fsd_mutex_lock(&self->session-> super.drm_connection_mutex);205 206 pbsdrmaa_pbs_reconnect_internal(self, false);213 conn_lock = fsd_mutex_lock(&self->session->drm_connection_mutex); 214 215 check_reconnect(self, false); 207 216 208 217 retry: … … 213 222 if(status == NULL) 214 223 { 215 fsd_log_error(( "pbs_statjob failed, pbs_errno = %d", pbs_errno )); 216 if (IS_TRANSIENT_ERROR && first_try) 217 { 218 pbsdrmaa_pbs_reconnect_internal(self, true); 224 if (IS_MISSING_JOB) 225 { 226 fsd_log_info(( "missing job = %s (code=%d)", job_id, pbs_errno )); 227 } 228 else if (IS_TRANSIENT_ERROR && first_try) 229 { 230 fsd_log_error(( "pbs_statjob failed, pbs_errno = %d", pbs_errno )); 231 check_reconnect(self, true); 219 232 first_try = false; 220 233 goto retry; … … 222 235 else 223 236 { 224 pbsdrmaa_exc_raise_pbs( "pbs_statjob" );237 pbsdrmaa_exc_raise_pbs( "pbs_statjob", self->connection_fd); 225 238 } 226 239 } … … 236 249 { 237 250 if(conn_lock) 238 conn_lock = fsd_mutex_unlock(&self->session-> super.drm_connection_mutex);251 conn_lock = fsd_mutex_unlock(&self->session->drm_connection_mutex); 239 252 } 240 253 END_TRY … … 266 279 TRY 267 280 { 268 conn_lock = fsd_mutex_lock(&self->session-> super.drm_connection_mutex);269 270 pbsdrmaa_pbs_reconnect_internal(self, false);281 conn_lock = fsd_mutex_lock(&self->session->drm_connection_mutex); 282 283 check_reconnect(self, false); 271 284 272 285 retry: … … 280 293 if (IS_TRANSIENT_ERROR && first_try) 281 294 { 282 pbsdrmaa_pbs_reconnect_internal(self, true);295 check_reconnect(self, true); 283 296 first_try = false; 284 297 goto retry; … … 286 299 else 287 300 { 288 pbsdrmaa_exc_raise_pbs( "pbs_sigjob" );301 pbsdrmaa_exc_raise_pbs( "pbs_sigjob", self->connection_fd); 289 302 } 290 303 } … … 297 310 { 298 311 if(conn_lock) 299 conn_lock = fsd_mutex_unlock(&self->session-> super.drm_connection_mutex);312 conn_lock = fsd_mutex_unlock(&self->session->drm_connection_mutex); 300 313 } 301 314 END_TRY … … 318 331 TRY 319 332 { 320 conn_lock = fsd_mutex_lock(&self->session-> super.drm_connection_mutex);321 322 pbsdrmaa_pbs_reconnect_internal(self, false);333 conn_lock = fsd_mutex_lock(&self->session->drm_connection_mutex); 334 335 check_reconnect(self, false); 323 336 324 337 retry: … … 332 345 if (IS_TRANSIENT_ERROR && first_try) 333 346 { 334 pbsdrmaa_pbs_reconnect_internal(self, true);347 check_reconnect(self, true); 335 348 first_try = false; 336 349 goto retry; … … 338 351 else 339 352 { 340 pbsdrmaa_exc_raise_pbs( "pbs_deljob" );353 pbsdrmaa_exc_raise_pbs( "pbs_deljob", self->connection_fd); 341 354 } 342 355 } … … 349 362 { 350 363 if(conn_lock) 351 conn_lock = fsd_mutex_unlock(&self->session-> super.drm_connection_mutex);364 conn_lock = fsd_mutex_unlock(&self->session->drm_connection_mutex); 352 365 } 353 366 END_TRY … … 369 382 TRY 370 383 { 371 conn_lock = fsd_mutex_lock(&self->session-> super.drm_connection_mutex);372 373 pbsdrmaa_pbs_reconnect_internal(self, false);384 conn_lock = fsd_mutex_lock(&self->session->drm_connection_mutex); 385 386 check_reconnect(self, false); 374 387 375 388 retry: … … 383 396 if (IS_TRANSIENT_ERROR && first_try) 384 397 { 385 pbsdrmaa_pbs_reconnect_internal(self, true);398 check_reconnect(self, true); 386 399 first_try = false; 387 400 goto retry; … … 389 402 else 390 403 { 391 pbsdrmaa_exc_raise_pbs( "pbs_rlsjob" );404 pbsdrmaa_exc_raise_pbs( "pbs_rlsjob", self->connection_fd); 392 405 } 393 406 } … … 400 413 { 401 414 if(conn_lock) 402 conn_lock = fsd_mutex_unlock(&self->session-> super.drm_connection_mutex);415 conn_lock = fsd_mutex_unlock(&self->session->drm_connection_mutex); 403 416 } 404 417 END_TRY … … 420 433 TRY 421 434 { 422 conn_lock = fsd_mutex_lock(&self->session-> super.drm_connection_mutex);423 424 pbsdrmaa_pbs_reconnect_internal(self, false);435 conn_lock = fsd_mutex_lock(&self->session->drm_connection_mutex); 436 437 check_reconnect(self, false); 425 438 426 439 retry: … … 434 447 if (IS_TRANSIENT_ERROR && first_try) 435 448 { 436 pbsdrmaa_pbs_reconnect_internal(self, true);449 check_reconnect(self, true); 437 450 first_try = false; 438 451 goto retry; … … 440 453 else 441 454 { 442 pbsdrmaa_exc_raise_pbs( "pbs_holdjob" );455 pbsdrmaa_exc_raise_pbs( "pbs_holdjob", self->connection_fd); 443 456 } 444 457 } … … 451 464 { 452 465 if(conn_lock) 453 conn_lock = fsd_mutex_unlock(&self->session-> super.drm_connection_mutex);466 conn_lock = fsd_mutex_unlock(&self->session->drm_connection_mutex); 454 467 } 455 468 END_TRY … … 460 473 461 474 void 462 pbsdrmaa_pbs_reconnect_internal( pbsdrmaa_pbs_conn_t *self, bool force_reconnect)463 { 464 int tries_left = self->session->max_retries_count;475 check_reconnect( pbsdrmaa_pbs_conn_t *self, bool force_reconnect) 476 { 477 int tries_left = ((pbsdrmaa_session_t *)self->session)->max_retries_count; 465 478 int sleep_time = 1; 466 479 … … 476 489 else 477 490 { 491 stop_autoclose_thread(self); 478 492 pbs_disconnect(self->connection_fd); 479 493 self->connection_fd = -1; 480 494 } 481 495 } 496 497 482 498 483 499 retry_connect: /* Life... */ … … 492 508 493 509 if( self->connection_fd < 0 ) 494 pbsdrmaa_exc_raise_pbs( "pbs_connect" );510 pbsdrmaa_exc_raise_pbs( "pbs_connect", self->connection_fd ); 495 511 496 512 fsd_log_return(("(%d)", self->connection_fd)); 497 513 } 498 514 515 516 static void start_autoclose_thread( pbsdrmaa_pbs_conn_t *self ) 517 { 518 519 520 } 521 522 static void stop_autoclose_thread( pbsdrmaa_pbs_conn_t *self ) 523 { 524 525 526 } 527 -
trunk/pbs_drmaa/pbs_conn.h
r76 r85 29 29 #include <drmaa_utils/job.h> 30 30 #include <drmaa_utils/session.h> 31 #include <drmaa_utils/thread.h> 31 32 32 #include <session.h>33 33 34 34 #include <pbs_ifl.h> … … 36 36 typedef struct pbsdrmaa_pbs_conn_s pbsdrmaa_pbs_conn_t; 37 37 38 pbsdrmaa_pbs_conn_t * pbsdrmaa_pbs_conn_new ( pbsdrmaa_session_t * session, char *server); 38 pbsdrmaa_pbs_conn_t * 39 pbsdrmaa_pbs_conn_new( 40 fsd_drmaa_session_t * session, 41 const char *server); 39 42 40 void 41 pbsdrmaa_pbs_conn_destroy ( pbsdrmaa_pbs_conn_t * self ); 43 void pbsdrmaa_pbs_conn_destroy ( pbsdrmaa_pbs_conn_t * self ); 42 44 43 45 struct pbsdrmaa_pbs_conn_s { 44 pbsdrmaa_session_t *volatile session;46 fsd_drmaa_session_t *volatile session; 45 47 46 48 char* (*submit) ( pbsdrmaa_pbs_conn_t *self, struct attropl *attrib, char *script, char *destination ); … … 63 65 int connection_fd; 64 66 65 /* timestamp of last usage */ 66 time_t last_usage; 67 /* timestamp of last connect time */ 68 time_t last_connect_time; 69 70 fsd_cond_t autoclose_cond; 71 fsd_mutex_t autoclose_mutex; 72 bool close_connection; 67 73 }; 68 74 -
trunk/pbs_drmaa/session.c
r65 r85 98 98 99 99 self->log_file_initial_size = 0; 100 self->pbs_conn = -1;101 100 self->pbs_home = NULL; 102 101 … … 123 122 self->super.missing_jobs = FSD_IGNORE_MISSING_JOBS; 124 123 125 { 126 int tries_left = self->max_retries_count; 127 int sleep_time = 1; 128 /*ignore SIGPIPE - otheriwse pbs_disconnect cause the program to exit */ 129 signal(SIGPIPE, SIG_IGN); 130 retry_connect: /* Life... */ 131 self->pbs_conn = pbs_connect( self->super.contact ); 132 fsd_log_info(( "pbs_connect(%s) =%d", self->super.contact, self->pbs_conn )); 133 if( self->pbs_conn < 0 && tries_left-- ) 134 { 135 sleep(sleep_time++); 136 goto retry_connect; 137 } 138 139 if( self->pbs_conn < 0 ) 140 pbsdrmaa_exc_raise_pbs( "pbs_connect" ); 141 } 124 self->pbs_connection = pbsdrmaa_pbs_conn_new( (fsd_drmaa_session_t *)self, contact ); 125 self->connection_max_lifetime = 30; /* 30 seconds */ 142 126 143 127 } … … 162 146 pbsdrmaa_session_t *pbsself = (pbsdrmaa_session_t*)self; 163 147 self->stop_wait_thread( self ); 164 if( pbsself->pbs_conn >= 0 ) 165 pbs_disconnect( pbsself->pbs_conn ); 148 pbsdrmaa_pbs_conn_destroy(pbsself->pbs_connection); 166 149 fsd_free( pbsself->status_attrl ); 167 150 fsd_free( pbsself->job_exit_status_file_prefix ); … … 229 212 fsd_conf_option_t *max_retries_count = NULL; 230 213 fsd_conf_option_t *user_state_dir = NULL; 214 fsd_conf_option_t *connection_max_lifetime = NULL; 215 231 216 232 217 pbs_home = fsd_conf_dict_get(self->configuration, "pbs_home" ); … … 234 219 max_retries_count = fsd_conf_dict_get(self->configuration, "max_retries_count" ); 235 220 user_state_dir = fsd_conf_dict_get(self->configuration, "user_state_dir" ); 221 connection_max_lifetime = fsd_conf_dict_get(self->configuration, "connection_max_lifetime"); 236 222 237 223 if( pbs_home && pbs_home->type == FSD_CONF_STRING ) … … 274 260 } 275 261 262 if ( connection_max_lifetime && connection_max_lifetime->type == FSD_CONF_INTEGER) 263 { 264 pbsself->connection_max_lifetime = connection_max_lifetime->val.integer; 265 fsd_log_info(("Max connection lifetime: %d", pbsself->connection_max_lifetime)); 266 } 267 276 268 if ( wait_thread_sleep_time && wait_thread_sleep_time->type == FSD_CONF_INTEGER) 277 269 { … … 315 307 pbsdrmaa_session_update_all_jobs_status( fsd_drmaa_session_t *self ) 316 308 { 317 volatile bool conn_lock = false;318 309 volatile bool jobs_lock = false; 319 310 pbsdrmaa_session_t *pbsself = (pbsdrmaa_session_t*)self; 320 311 fsd_job_set_t *jobs = self->jobs; 321 312 struct batch_status *volatile status = NULL; 322 volatile int tries_left = pbsself->max_retries_count;323 volatile int sleep_time = 1;324 313 325 314 fsd_log_enter(("")); … … 327 316 TRY 328 317 { 329 conn_lock = fsd_mutex_lock( &self->drm_connection_mutex ); 330 retry: 318 331 319 /* TODO: query only for user's jobs pbs_selstat + ATTR_u */ 332 320 #ifdef PBS_PROFESSIONAL 333 status = pbs _statjob( pbsself->pbs_conn, NULL, NULL, NULL);321 status = pbsself->pbs_connection->statjob(pbsself->pbs_connection, NULL, NULL); 334 322 #else 335 status = pbs _statjob( pbsself->pbs_conn, NULL, pbsself->status_attrl, NULL);323 status = pbsself->pbs_connection->statjob(pbsself->pbs_connection, NULL, pbsself->status_attrl); 336 324 #endif 337 fsd_log_info(( "pbs_statjob( fd=%d, job_id=NULL, attribs={...} ) =%p", pbsself->pbs_conn, (void*)status ));338 if( status == NULL && pbs_errno != 0 )339 {340 if (pbs_errno == PBSE_PROTOCOL || pbs_errno == PBSE_EXPIRED || pbs_errno == PBSOLDE_PROTOCOL || pbs_errno == PBSOLDE_EXPIRED)341 {342 if ( pbsself->pbs_conn >= 0)343 pbs_disconnect( pbsself->pbs_conn );344 retry_connect:345 sleep(sleep_time++);346 pbsself->pbs_conn = pbs_connect( pbsself->super.contact );347 if( pbsself->pbs_conn < 0)348 {349 if (tries_left--)350 goto retry_connect;351 else352 pbsdrmaa_exc_raise_pbs( "pbs_connect" );353 }354 else355 goto retry;356 }357 else358 {359 pbsdrmaa_exc_raise_pbs( "pbs_statjob" );360 }361 }362 conn_lock = fsd_mutex_unlock( &self->drm_connection_mutex );363 325 364 326 { … … 421 383 { 422 384 if( status != NULL ) 423 pbs_statfree( status ); 424 if( conn_lock ) 425 conn_lock = fsd_mutex_unlock( &self->drm_connection_mutex ); 385 pbsself->pbs_connection->statjob_free(pbsself->pbs_connection, status ); 426 386 if( jobs_lock ) 427 387 jobs_lock = fsd_mutex_unlock( &jobs->mutex ); -
trunk/pbs_drmaa/session.h
r72 r85 27 27 #include <drmaa_utils/session.h> 28 28 29 #include <pbs_drmaa/pbs_conn.h> 30 29 31 typedef struct pbsdrmaa_session_s pbsdrmaa_session_t; 30 32 31 fsd_drmaa_session_t * 32 pbsdrmaa_session_new( const char *contact ); 33 fsd_drmaa_session_t *pbsdrmaa_session_new( const char *contact ); 33 34 34 35 struct pbsdrmaa_session_s { … … 45 46 46 47 /* 47 * PBS connection (or -1 if not connected). 48 * A descriptor of socket conencted to PBS server. 48 * PBS connection handle 49 49 */ 50 int pbs_conn;50 pbsdrmaa_pbs_conn_t *pbs_connection; 51 51 52 52 /* … … 91 91 92 92 /* 93 * Whether to cache PBS Connection93 * PBS Max connection time 94 94 */ 95 bool cache_connection;95 int connection_max_lifetime; 96 96 }; 97 97 -
trunk/pbs_drmaa/submit.c
r75 r85 136 136 pbsdrmaa_submit_submit( pbsdrmaa_submit_t *self ) 137 137 { 138 volatile bool conn_lock = false;139 138 struct attrl *volatile pbs_attr = NULL; 140 139 char *volatile job_id = NULL; … … 143 142 fsd_template_t *pbs_tmpl = self->pbs_job_attributes; 144 143 int i; 145 int tries_left = ((pbsdrmaa_session_t *)self->session)->max_retries_count;146 int sleep_time = 1;147 144 148 145 for( i = PBSDRMAA_N_PBS_ATTRIBUTES - 1; i >= 0; i-- ) /* down loop -> start with custom resources */ … … 208 205 pbs_attr = pbsdrmaa_submit_filter(pbs_attr); 209 206 210 conn_lock = fsd_mutex_lock( &self->session->drm_connection_mutex ); 211 retry: 212 job_id = pbs_submit( ((pbsdrmaa_session_t*)self->session)->pbs_conn, 213 (struct attropl*)pbs_attr, self->script_filename, 214 self->destination_queue, NULL ); 207 job_id = ((pbsdrmaa_session_t *)self->session)->pbs_connection->submit( ((pbsdrmaa_session_t *)self->session)->pbs_connection, (struct attropl*)pbs_attr, self->script_filename, self->destination_queue); 215 208 216 209 fsd_log_info(("pbs_submit(%s, %s) =%s", self->script_filename, self->destination_queue, job_id)); 217 210 218 if( job_id == NULL )219 {220 fsd_log_error(( "pbs_submit failed, pbs_errno = %d", pbs_errno ));221 if (pbs_errno == PBSE_PROTOCOL || pbs_errno == PBSE_EXPIRED || pbs_errno == PBSOLDE_PROTOCOL || pbs_errno == PBSOLDE_EXPIRED)222 {223 pbsdrmaa_session_t *pbsself = (pbsdrmaa_session_t*)self->session;224 225 fsd_log_error(( "Protocol error. Retrying..." ));226 227 if (pbsself->pbs_conn >= 0 )228 pbs_disconnect( pbsself->pbs_conn );229 retry_connect:230 sleep(sleep_time++);231 pbsself->pbs_conn = pbs_connect( pbsself->super.contact );232 if( pbsself->pbs_conn < 0)233 {234 if (tries_left--)235 goto retry_connect;236 else237 pbsdrmaa_exc_raise_pbs( "pbs_connect" );238 }239 else240 {241 if (tries_left--)242 goto retry;243 else244 pbsdrmaa_exc_raise_pbs( "pbs_submit" );245 }246 }247 else248 {249 pbsdrmaa_exc_raise_pbs( "pbs_submit" );250 }251 }252 conn_lock = fsd_mutex_unlock( &self->session->drm_connection_mutex );253 211 } 254 212 EXCEPT_DEFAULT … … 259 217 FINALLY 260 218 { 261 if( conn_lock )262 conn_lock = fsd_mutex_unlock( &self->session->drm_connection_mutex );263 219 if( pbs_attr ) 264 220 pbsdrmaa_free_attrl( pbs_attr ); … … 894 850 if (!attr_value) 895 851 { 896 fsd_exc_raise_fmt(FSD_DRMAA_ERRNO_INVALID_ATTRIBUTE_FORMAT, "Invalid output line of submit filter: ", output_line);852 fsd_exc_raise_fmt(FSD_DRMAA_ERRNO_INVALID_ATTRIBUTE_FORMAT, "Invalid output line of submit filter: %s", output_line); 897 853 } 898 854 else -
trunk/pbs_drmaa/util.c
r70 r85 114 114 115 115 void 116 pbsdrmaa_exc_raise_pbs( const char *function )116 pbsdrmaa_exc_raise_pbs( const char *function, int connection ) 117 117 { 118 118 int _pbs_errno; 119 119 int fsd_errno; 120 120 const char *message = NULL; 121 const char *extended_message = NULL; 121 122 122 123 _pbs_errno = pbs_errno; … … 128 129 #endif 129 130 131 if ( connection != -1 ) 132 { 133 extended_message = pbs_geterrmsg(connection); 134 } 135 130 136 fsd_errno = pbsdrmaa_map_pbs_errno( _pbs_errno ); 131 fsd_log_error(( 132 "call to %s returned with error %d:%s mapped to %d:%s", 133 function, 134 _pbs_errno, message, 135 fsd_errno, fsd_strerror(fsd_errno) 136 )); 137 fsd_exc_raise_fmt( fsd_errno, " %s", function, message ); 138 } 139 137 138 fsd_log_error(( "call to %s returned with error %d:%s(%s) mapped to %d:%s", 139 function, 140 _pbs_errno, message, extended_message, 141 fsd_errno, fsd_strerror(fsd_errno) 142 )); 143 144 if (extended_message) 145 fsd_exc_raise_fmt(fsd_errno, "%s: %s ", message, extended_message); 146 else 147 fsd_exc_raise_fmt(fsd_errno, "%s", message); 148 } 140 149 141 150 /** Maps PBS error code into DMRAA code. */ -
trunk/pbs_drmaa/util.h
r65 r85 30 30 31 31 32 void pbsdrmaa_exc_raise_pbs( const char *function );32 void pbsdrmaa_exc_raise_pbs( const char *function, int connection_fd ); 33 33 int pbsdrmaa_map_pbs_errno( int _pbs_errno ); 34 34
Note: See TracChangeset
for help on using the changeset viewer.