Changeset 85 for trunk/pbs_drmaa/job.c
- Timestamp:
- 01/17/13 18:44:15 (11 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/pbs_drmaa/job.c
r83 r85 77 77 pbsdrmaa_job_control( fsd_job_t *self, int action ) 78 78 { 79 volatile bool conn_lock = false;80 79 pbsdrmaa_session_t *session = (pbsdrmaa_session_t*)self->session; 81 80 const char *job_id = self->job_id; 82 const char *apicall = NULL; 83 int rc = PBSE_NONE; 84 85 fsd_log_enter(( "({job_id=%s}, action=%d)", 86 self->job_id, action )); 87 88 TRY 89 { 90 int tries_left = session->max_retries_count; 91 int sleep_time = 1; 92 93 conn_lock = fsd_mutex_lock( &self->session->drm_connection_mutex ); 94 95 /*TODO reconnect */ 96 while ( true ) 97 { 98 switch( action ) 99 { 100 /* 101 * We cannot know whether we did suspend job 102 * in other way than remembering this inside DRMAA session. 103 */ 104 case DRMAA_CONTROL_SUSPEND: 105 apicall = "pbs_sigjob"; 106 rc = pbs_sigjob( session->pbs_conn, (char*)job_id, 107 "SIGSTOP", NULL ); 108 fsd_log_info(("pbs_sigjob(%s, SIGSTOP) =%d", job_id, rc)); 109 if( rc == PBSE_NONE ) 110 self->flags |= FSD_JOB_SUSPENDED; 111 break; 112 case DRMAA_CONTROL_RESUME: 113 apicall = "pbs_sigjob"; 114 rc = pbs_sigjob( session->pbs_conn, (char*)job_id, 115 "SIGCONT", NULL ); 116 fsd_log_info(("pbs_sigjob(%s, SIGCONT) =%d", job_id, rc)); 117 if( rc == PBSE_NONE ) 118 self->flags &= ~FSD_JOB_SUSPENDED; 119 break; 120 case DRMAA_CONTROL_HOLD: 121 apicall = "pbs_holdjob"; 122 rc = pbs_holdjob( session->pbs_conn, (char*)job_id, 123 USER_HOLD, NULL ); 124 fsd_log_info(("pbs_sigjob(%s, SIGHOLD) =%d", job_id, rc)); 125 if( rc == PBSE_NONE ) 126 self->flags |= FSD_JOB_HOLD; 127 break; 128 case DRMAA_CONTROL_RELEASE: 129 apicall = "pbs_rlsjob"; 130 rc = pbs_rlsjob( session->pbs_conn, (char*)job_id, 131 USER_HOLD, NULL ); 132 fsd_log_info(("pbs_rlsjob(%s) =%d", job_id, rc)); 133 if( rc == PBSE_NONE ) 134 self->flags &= FSD_JOB_HOLD; 135 break; 136 case DRMAA_CONTROL_TERMINATE: 137 apicall = "pbs_deljob"; 138 rc = pbs_deljob( session->pbs_conn, (char*)job_id, NULL ); 139 fsd_log_info(("pbs_deljob(%s) =%d", job_id, rc)); 140 /* Torque: 141 * deldelay=N -- delay between SIGTERM and SIGKILL (default 0) */ 142 if( rc == PBSE_NONE ) 143 { 144 self->flags &= FSD_JOB_TERMINATED_MASK; 145 if( (self->flags & FSD_JOB_TERMINATED) == 0 ) 146 self->flags |= FSD_JOB_TERMINATED | FSD_JOB_ABORTED; 147 } 148 break; 149 } 150 151 retry_connect: 152 if ( rc == PBSE_NONE ) 153 break; 154 else if (( rc == PBSE_INTERNAL || rc == PBSE_PROTOCOL || rc == PBSOLDE_PROTOCOL || rc == PBSE_EXPIRED || rc == PBSOLDE_EXPIRED) && (tries_left--)) 155 { 156 if (rc == PBSE_PROTOCOL || rc == PBSE_EXPIRED || rc == PBSOLDE_PROTOCOL || rc == PBSOLDE_EXPIRED) 157 { 158 if ( session->pbs_conn >= 0) 159 pbs_disconnect( session->pbs_conn ); 160 161 sleep( sleep_time++ ); 162 163 session->pbs_conn = pbs_connect( session->super.contact ); 164 165 if (session->pbs_conn < 0) 166 goto retry_connect; 167 168 fsd_log_info(( "pbs_connect(%s) =%d", session->super.contact, session->pbs_conn )); 169 } 170 else /* PBSE_INTERNAL */ 171 { 172 /* 173 * In PBS Pro pbs_sigjob raises internal server error (PBSE_INTERNAL) 174 * when job just changed its state to running. 175 */ 176 sleep( sleep_time++ ); 177 } 178 fsd_log_debug(( "repeating request (%d of %d)", tries_left, session->max_retries_count)); 179 } 180 else 181 pbsdrmaa_exc_raise_pbs( apicall ); 182 } /* end while */ 183 } 184 FINALLY 185 { 186 if( conn_lock ) 187 conn_lock = fsd_mutex_unlock( &self->session->drm_connection_mutex ); 188 } 189 END_TRY 81 82 fsd_log_enter(( "({job_id=%s}, action=%d)", self->job_id, action )); 83 84 while ( true ) 85 { 86 switch( action ) 87 { 88 /* 89 * We cannot know whether we did suspend job 90 * in other way than remembering this inside DRMAA session. 91 */ 92 case DRMAA_CONTROL_SUSPEND: 93 session->pbs_connection->sigjob( session->pbs_connection, (char*)job_id, "SIGSTOP"); 94 self->flags |= FSD_JOB_SUSPENDED; 95 break; 96 case DRMAA_CONTROL_RESUME: 97 session->pbs_connection->sigjob( session->pbs_connection, (char*)job_id, "SIGCONT"); 98 self->flags &= ~FSD_JOB_SUSPENDED; 99 break; 100 case DRMAA_CONTROL_HOLD: 101 session->pbs_connection->holdjob( session->pbs_connection, (char*)job_id ); 102 self->flags |= FSD_JOB_HOLD; 103 break; 104 case DRMAA_CONTROL_RELEASE: 105 session->pbs_connection->rlsjob( session->pbs_connection, (char*)job_id ); 106 self->flags &= ~FSD_JOB_HOLD; 107 break; 108 case DRMAA_CONTROL_TERMINATE: 109 session->pbs_connection->deljob( session->pbs_connection, (char*)job_id ); 110 /* TODO: make deldelay configurable ???: 111 * deldelay=N -- delay between SIGTERM and SIGKILL (default 0) */ 112 self->flags &= FSD_JOB_TERMINATED_MASK; 113 if( (self->flags & FSD_JOB_TERMINATED) == 0 ) 114 self->flags |= FSD_JOB_TERMINATED | FSD_JOB_ABORTED; 115 break; 116 } 117 } 190 118 191 119 fsd_log_return(("")); … … 196 124 pbsdrmaa_job_update_status( fsd_job_t *self ) 197 125 { 198 volatile bool conn_lock = false;199 126 struct batch_status *volatile status = NULL; 200 127 pbsdrmaa_session_t *session = (pbsdrmaa_session_t*)self->session; 201 int tries_left = session->max_retries_count;202 int sleep_time = 1;203 128 204 129 fsd_log_enter(( "({job_id=%s})", self->job_id )); … … 206 131 TRY 207 132 { 208 conn_lock = fsd_mutex_lock( &self->session->drm_connection_mutex );209 retry:210 if (session->pbs_conn < 0) {211 fsd_log_info(("No connection with pbs. Reconnecting"));212 goto retry_connect;213 }214 215 133 216 134 #ifdef PBS_PROFESSIONAL 217 status = pbs_statjob( session->pbs_conn, self->job_id, NULL, NULL);135 status = session->pbs_connection->statjob( session->pbs_connection, self->job_id, NULL); 218 136 #else 219 status = pbs_statjob( session->pbs_conn, self->job_id, session->status_attrl, NULL);137 status = session->pbs_connection->statjob( session->pbs_connection, self->job_id, session->status_attrl); 220 138 #endif 221 fsd_log_info(( "pbs_statjob(fd=%d, job_id=%s, attribs={...}) =%p",222 session->pbs_conn, self->job_id, (void*)status ));223 if( status == NULL )224 {225 226 #ifndef PBS_PROFESSIONAL227 if ( pbs_errno != PBSE_UNKJOBID )228 fsd_log_error(("pbs_statjob error: %d, %s, %s", pbs_errno, pbse_to_txt(pbs_errno), pbs_strerror(pbs_errno)));229 else230 fsd_log_debug(("pbs_statjob error: %d, %s, %s", pbs_errno, pbse_to_txt(pbs_errno), pbs_strerror(pbs_errno)));231 #else232 # ifndef PBS_PROFESSIONAL_NO_LOG233 if ( pbs_errno != PBSE_UNKJOBID && pbs_errno != PBSE_HISTJOBID )234 fsd_log_error(("pbs_statjob error: %d, %s", pbs_errno, pbse_to_txt(pbs_errno)));235 else236 fsd_log_debug(("pbs_statjob error: %d, %s", pbs_errno, pbse_to_txt(pbs_errno)));237 # else238 if ( pbs_errno != PBSE_UNKJOBID && pbs_errno != PBSE_HISTJOBID )239 fsd_log_error(("pbs_statjob error: %d", pbs_errno));240 else241 fsd_log_debug(("pbs_statjob error: %d", pbs_errno));242 # endif243 #endif244 245 switch( pbs_errno )246 {247 case PBSE_UNKJOBID:248 #ifdef PBS_PROFESSIONAL249 case PBSE_HISTJOBID:250 #endif251 break;252 case PBSE_PROTOCOL:253 #if PBSOLDE_PROTOCOL != PBSE_PROTOCOL254 case PBSOLDE_PROTOCOL:255 #endif256 case PBSE_EXPIRED:257 #if PBSOLDE_EXPIRED != PBSE_EXPIRED258 case PBSOLDE_EXPIRED:259 #endif260 if ( session->pbs_conn >= 0 )261 pbs_disconnect( session->pbs_conn );262 fsd_log_info(("Protocol error. Reconnecting..."));263 retry_connect:264 sleep(sleep_time++);265 session->pbs_conn = pbs_connect( session->super.contact );266 if( session->pbs_conn < 0 )267 {268 if (tries_left--) {269 fsd_log_info(("Retrying... (%d tries left)", tries_left));270 goto retry_connect;271 } else {272 fsd_log_error(("No more tries left... Throwing exception"));273 pbsdrmaa_exc_raise_pbs( "pbs_connect" );274 }275 }276 else277 {278 goto retry;279 }280 default:281 pbsdrmaa_exc_raise_pbs( "pbs_statjob" );282 break;283 case 0: /* ? */284 fsd_exc_raise_code( FSD_ERRNO_INTERNAL_ERROR );285 break;286 }287 288 }289 290 conn_lock = fsd_mutex_unlock( &self->session->drm_connection_mutex );291 292 139 293 140 if( status != NULL ) … … 302 149 FINALLY 303 150 { 304 if( conn_lock )305 conn_lock = fsd_mutex_unlock( &self->session->drm_connection_mutex );306 151 if( status != NULL ) 307 pbs_statfree(status );152 session->pbs_connection->statjob_free( session->pbs_connection, status ); 308 153 } 309 154 END_TRY
Note: See TracChangeset
for help on using the changeset viewer.