- Timestamp:
- 10/17/11 01:49:55 (13 years ago)
- Location:
- trunk/pbs_drmaa
- Files:
-
- 1 deleted
- 9 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/pbs_drmaa/job.c
r26 r29 59 59 pbsdrmaa_job_on_missing_standard( fsd_job_t *self ); 60 60 61 void62 pbsdrmaa_job_on_missing_log_based( fsd_job_t *self );63 64 61 static void 65 62 pbsdrmaa_job_update( fsd_job_t *self, struct batch_status* ); 66 67 bool68 pbsdrmaa_job_update_status_accounting( fsd_job_t *self );69 63 70 64 … … 254 248 else if( self->state < DRMAA_PS_DONE ) 255 249 { 256 #ifndef PBS_PROFESSIONAL257 /*best effort call*/258 if (pbsdrmaa_job_update_status_accounting(self) == false)259 self->on_missing( self );260 #else261 250 self->on_missing( self ); 262 #endif263 251 } 264 252 } … … 305 293 break; 306 294 case PBSDRMAA_ATTR_EXIT_STATUS: 307 exit_status = atoi( i->value );295 exit_status = fsd_atoi( i->value ); 308 296 break; 309 297 case PBSDRMAA_ATTR_RESOURCES_USED: … … 453 441 pbsdrmaa_job_on_missing_standard( self ); 454 442 else 455 pbsdrmaa_job_on_missing_ log_based( self );443 pbsdrmaa_job_on_missing_standard( self ); /* TODO: try to provide implementation that uses accounting/server log files */ 456 444 } 457 445 … … 506 494 } 507 495 508 void509 pbsdrmaa_job_on_missing_log_based( fsd_job_t *self )510 {511 fsd_drmaa_session_t *session = self->session;512 pbsdrmaa_log_reader_t *log_reader = NULL;513 514 fsd_log_enter(( "({job_id=%s})", self->job_id ));515 fsd_log_info(( "Job %s missing from DRM queue", self->job_id ));516 517 TRY518 {519 log_reader = pbsdrmaa_log_reader_new( session, self);520 log_reader->read_log( log_reader );521 }522 FINALLY523 {524 pbsdrmaa_log_reader_destroy( log_reader );525 }526 END_TRY527 528 fsd_log_return(( "; job_ps=%s, exit_status=%d",529 drmaa_job_ps_to_str(self->state), self->exit_status ));530 }531 532 bool533 pbsdrmaa_job_update_status_accounting( fsd_job_t *self )534 {535 fsd_drmaa_session_t *session = self->session;536 pbsdrmaa_log_reader_t *log_reader = NULL;537 bool res = false;538 539 fsd_log_enter(( "({job_id=%s})", self->job_id ));540 fsd_log_info(( "Reading job %s info from accounting file", self->job_id ));541 542 TRY543 {544 log_reader = pbsdrmaa_log_reader_accounting_new( session, self);545 bool res = log_reader->read_log( log_reader );546 }547 FINALLY548 {549 pbsdrmaa_log_reader_destroy( log_reader );550 }551 END_TRY552 553 fsd_log_return((""));554 return res;555 } -
trunk/pbs_drmaa/log_reader.c
r28 r29 46 46 #include <pbs_drmaa/submit.h> 47 47 #include <pbs_drmaa/util.h> 48 #include <pbs_drmaa/pbs_attrib.h> 48 49 49 50 #include <errno.h> 50 51 51 static bool 52 pbsdrmaa_read_log(); 53 54 static void 55 pbsdrmaa_select_file_wait_thread ( pbsdrmaa_log_reader_t * self); 56 57 static ssize_t 58 pbsdrmaa_read_line_wait_thread ( pbsdrmaa_log_reader_t * self, char * line, char * buffer, ssize_t size, int * idx, int * end_idx, int * line_idx ); 59 60 static void 61 pbsdrmaa_select_file_job_on_missing ( pbsdrmaa_log_reader_t * self ); 62 63 static ssize_t 64 pbsdrmaa_read_line_job_on_missing ( pbsdrmaa_log_reader_t * self, char * line, char * buffer, ssize_t size, int * idx, int * end_idx, int * line_idx ); 65 66 static void 67 pbsdrmaa_select_file_accounting ( pbsdrmaa_log_reader_t * self ); 68 69 static ssize_t 70 pbsdrmaa_read_line_accounting ( pbsdrmaa_log_reader_t * self, char * line, char * buffer, ssize_t size, int * idx, int * end_idx, int * line_idx ); 71 72 static bool 73 pbsdrmaa_read_log_accounting( pbsdrmaa_log_reader_t * self);74 75 int 76 fsd_job_id_cmp(const char *s1, const char *s2); 77 78 int 79 pbsdrmaa_date_compare(const void *a, const void *b);52 enum pbsdrmaa_field_id 53 { 54 PBSDRMAA_FLD_ID_DATE = 0, 55 PBSDRMAA_FLD_ID_EVENT = 1, 56 PBSDRMAA_FLD_ID_SRC = 2, 57 PBSDRMAA_FLD_ID_OBJ_TYPE = 3, 58 PBSDRMAA_FLD_ID_OBJ_ID = 4, 59 PBSDRMAA_FLD_ID_MSG = 5 60 }; 61 62 63 #define PBSDRMAA_FLD_MSG_0008 "0008" 64 #define PBSDRMAA_FLD_MSG_0010 "0010" 65 66 enum pbsdrmaa_event_type 67 { 68 pbsdrmaa_event_0008 = 8, 69 pbsdrmaa_event_0010 = 10 70 }; 71 72 static void pbsdrmaa_read_log(); 73 74 static void pbsdrmaa_select_file_wait_thread( pbsdrmaa_log_reader_t * self); 75 76 char *pbsdrmaa_read_line_wait_thread( pbsdrmaa_log_reader_t * self); 77 78 static time_t pbsdrmaa_parse_log_timestamp(const char *timestamp, char *unixtime_str, size_t size); 79 80 static char *pbsdrmaa_get_exec_host_from_accountig(pbsdrmaa_log_reader_t * log_reader, const char *job_id); 80 81 81 82 /* … … 97 98 10/11/2011 14:48:24;0010;PBS_Server;Job;15545337.batch.grid.cyf-kr.edu.pl;Exit_status=0 resources_used.cput=00:00:00 resources_used.mem=720kb resources_used.vmem=13308kb resources_used.walltime=00:00:00 98 99 100 deleting job: 101 I . PBS Pro 102 a) in Q state 103 10/16/2011 09:49:25;0008;Server@grass1;Job;2178.grass1.man.poznan.pl;Job Queued at request of mmamonski@grass1.man.poznan.pl, owner = mmamonski@grass1.man.poznan.pl, job name = STDIN, queue = workq 104 10/16/2011 09:49:25;0008;Server@grass1;Job;2178.grass1.man.poznan.pl;Job Modified at request of Scheduler@grass1.man.poznan.pl 105 10/16/2011 09:49:37;0008;Server@grass1;Job;2178.grass1.man.poznan.pl;Job to be deleted at request of mmamonski@grass1.man.poznan.pl 106 10/16/2011 09:49:37;0100;Server@grass1;Job;2178.grass1.man.poznan.pl;dequeuing from workq, state 5 107 108 109 b) in R state 110 10/16/2011 09:45:12;0080;Server@grass1;Job;2177.grass1.man.poznan.pl;delete job request received 111 10/16/2011 09:45:12;0008;Server@grass1;Job;2177.grass1.man.poznan.pl;Job sent signal TermJob on delete 112 10/16/2011 09:45:12;0008;Server@grass1;Job;2177.grass1.man.poznan.pl;Job to be deleted at request of mmamonski@grass1.man.poznan.pl 113 10/16/2011 09:45:12;0010;Server@grass1;Job;2177.grass1.man.poznan.pl;Exit_status=271 resources_used.cpupercent=0 resources_used.cput=00:00:00 resources_used.mem=2772kb resources_used.ncpus=1 resources_used.vmem=199288kb resources_used.walltime=00:00:26 114 10/16/2011 09:45:12;0100;Server@grass1;Job;2177.grass1.man.poznan.pl;dequeuing from workq, state 5 115 116 II. Torque 117 a) in Q state 118 10/15/2011 21:19:25;0008;PBS_Server;Job;113045.grass1.man.poznan.pl;Job deleted at request of mmamonski@grass1.man.poznan.pl 119 10/15/2011 21:19:25;0100;PBS_Server;Job;113045.grass1.man.poznan.pl;dequeuing from batch, state EXITING 120 121 b) in R state 122 10/15/2011 21:19:47;0008;PBS_Server;Job;113046.grass1.man.poznan.pl;Job deleted at request of mmamonski@grass1.man.poznan.pl 123 10/15/2011 21:19:47;0008;PBS_Server;Job;113046.grass1.man.poznan.pl;Job sent signal SIGTERM on delete 124 10/15/2011 21:19:47;0010;PBS_Server;Job;113046.grass1.man.poznan.pl;Exit_status=271 resources_used.cput=00:00:00 resources_used.mem=0kb resources_used.vmem=0kb resources_used.walltime=00:00:10 125 126 Log closed: 127 10/16/2011 00:00:17;0002;PBS_Server;Svr;Log;Log closed 128 99 129 */ 100 130 pbsdrmaa_log_reader_t * 101 pbsdrmaa_log_reader_new ( fsd_drmaa_session_t *session, fsd_job_t *job)131 pbsdrmaa_log_reader_new( fsd_drmaa_session_t *session ) 102 132 { 103 133 pbsdrmaa_log_reader_t *volatile self = NULL; 104 134 105 135 fsd_log_enter(("")); 136 106 137 TRY 107 138 { … … 109 140 110 141 self->session = session; 111 112 /* ~templete method pattern */ 113 if(job != NULL) /* job on missing */ 114 { 115 self->job = job; 116 self->name = "Job_on_missing"; 117 self->select_file = pbsdrmaa_select_file_job_on_missing; 118 self->read_line = pbsdrmaa_read_line_job_on_missing; 119 } 120 else /* wait thread */ 121 { 122 self->job = NULL; 123 self->name = "WT"; 124 self->select_file = pbsdrmaa_select_file_wait_thread; 125 self->read_line = pbsdrmaa_read_line_wait_thread; 126 } 142 143 self->select_file = pbsdrmaa_select_file_wait_thread; 127 144 self->read_log = pbsdrmaa_read_log; 128 145 129 self->log_files = NULL;130 self->log_files_number = 0;131 132 146 self->run_flag = true; 133 self->f d = -1;147 self->fhandle = NULL; 134 148 self->date_changed = true; 135 149 self->first_open = true; 136 150 137 self->log_file_initial_size = 0;138 self->log_file_read_size = 0;139 151 } 140 152 EXCEPT_DEFAULT … … 146 158 } 147 159 END_TRY 160 148 161 fsd_log_return(("")); 162 149 163 return self; 150 164 } 151 165 152 pbsdrmaa_log_reader_t *153 pbsdrmaa_log_reader_accounting_new ( fsd_drmaa_session_t *session, fsd_job_t *job )154 {155 pbsdrmaa_log_reader_t *volatile self = NULL;156 157 fsd_log_enter((""));158 TRY159 {160 fsd_malloc(self, pbsdrmaa_log_reader_t );161 162 self->session = session;163 164 self->job = job;165 self->name = "Accounting";166 self->select_file = pbsdrmaa_select_file_accounting;167 self->read_line = pbsdrmaa_read_line_accounting;168 169 self->read_log = pbsdrmaa_read_log_accounting;170 171 self->log_files = NULL;172 self->log_files_number = 0;173 174 self->run_flag = true;175 self->fd = -1;176 self->date_changed = true;177 self->first_open = true;178 179 self->log_file_initial_size = 0;180 self->log_file_read_size = 0;181 }182 EXCEPT_DEFAULT183 {184 if( self != NULL)185 fsd_free(self);186 187 fsd_exc_reraise();188 }189 END_TRY190 fsd_log_return((""));191 return self;192 }193 166 194 167 void … … 200 173 if(self != NULL) 201 174 { 202 int i = -1;203 for(i = 0; i < self->log_files_number ; i++)204 fsd_free(self->log_files[i]);205 fsd_free(self->log_files);206 175 fsd_free(self); 207 } 176 } 208 177 } 209 178 EXCEPT_DEFAULT … … 216 185 } 217 186 218 enum field 219 { 220 FLD_DATE = 0, 221 FLD_EVENT = 1, 222 FLD_OBJ = 2, 223 FLD_TYPE = 3, 224 FLD_ID = 4, 225 FLD_MSG = 5 226 }; 227 228 enum field_msg 229 { 230 FLD_MSG_EXIT_STATUS = 0, 231 FLD_MSG_CPUT = 1, 232 FLD_MSG_MEM = 2, 233 FLD_MSG_VMEM = 3, 234 FLD_MSG_WALLTIME = 4 235 }; 236 237 enum field_msg_accounting 238 { 239 FLD_MSG_ACC_USER = 0, 240 FLD_MSG_ACC_GROUP = 1, 241 FLD_MSG_ACC_JOBNAME = 2, 242 FLD_MSG_ACC_QUEUE = 3, 243 FLD_MSG_ACC_CTIME = 4, 244 FLD_MSG_ACC_QTIME = 5, 245 FLD_MSG_ACC_ETIME = 6, 246 FLD_MSG_ACC_START = 7, 247 FLD_MSG_ACC_OWNER = 8, 248 FLD_MSG_ACC_EXEC_HOST = 9, 249 FLD_MSG_ACC_RES_NEEDNODES = 10, 250 FLD_MSG_ACC_RES_NODECT = 11, 251 FLD_MSG_ACC_RES_NODES = 12, 252 FLD_MSG_ACC_RES_WALLTIME = 13 253 }; 254 255 #define FLD_MSG_STATUS "0010" 256 #define FLD_MSG_STATE "0008" 257 #define FLD_MSG_LOG "0002" 258 259 bool 187 188 void 260 189 pbsdrmaa_read_log( pbsdrmaa_log_reader_t * self ) 261 190 { 262 pbsdrmaa_job_t *pbsjob = (pbsdrmaa_job_t*) self->job;263 fsd_job_t *volatile temp_job = NULL;264 265 191 fsd_log_enter(("")); 266 192 267 if(self->job == NULL) 268 fsd_mutex_lock( &self->session->mutex ); 193 fsd_mutex_lock( &self->session->mutex ); 269 194 270 195 TRY 271 {196 { 272 197 while( self->run_flag ) 273 TRY 274 { 275 char line[4096] = ""; 276 char buffer[4096] = ""; 277 int idx = 0, end_idx = 0, line_idx = 0; 278 279 self->select_file(self); 280 281 while ((self->read_line(self, line,buffer, sizeof(line), &idx,&end_idx,&line_idx)) > 0) 198 { 199 TRY 282 200 { 283 const char *volatile ptr = line; 284 char field[256] = ""; 285 char job_id[256] = ""; 286 char event[256] = ""; 287 int volatile field_n = 0; 288 int n; 201 char *line = NULL; 289 202 290 bool volatile job_id_match = false; 291 bool volatile event_match = false; 292 bool volatile log_event = false; 293 bool volatile log_match = false; 294 bool volatile older_job_found = false; 295 bool volatile job_found = false; 296 char * temp_date = NULL; 297 298 struct batch_status status; 299 status.next = NULL; 300 301 while ( sscanf(ptr, "%255[^;]%n", field, &n) == 1 ) /* split current line into fields */ 302 { 303 if(field_n == FLD_DATE) 304 { 305 temp_date = fsd_strdup(field); 306 } 307 else if(field_n == FLD_EVENT && (strcmp(field,FLD_MSG_STATUS) == 0 || strcmp(field,FLD_MSG_STATE) == 0 )) 308 { 309 /* event described by log line*/ 310 if(strlcpy(event, field,sizeof(event)) > sizeof(event)) 311 { 312 fsd_log_error(("%s - strlcpy error",self->name)); 313 } 314 event_match = true; 315 } 316 else if(event_match && field_n == FLD_ID) 317 { 318 TRY 319 { 320 if(self->job == NULL) /* wait_thread */ 321 { 322 temp_job = self->session->get_job( self->session, field ); 323 pbsjob = (pbsdrmaa_job_t*) temp_job; 324 325 if( temp_job ) 326 { 327 if(strlcpy(job_id, field,sizeof(job_id)) > sizeof(job_id)) { 328 fsd_log_error(("%s - strlcpy error",self->name)); 329 } 330 fsd_log_debug(("%s - job_id: %s",self->name,job_id)); 331 status.name = fsd_strdup(job_id); 332 job_id_match = true; /* job_id is in drmaa */ 333 } 334 else 335 { 336 fsd_log_debug(("%s - Unknown job: %s", self->name,field)); 337 } 338 } 339 else /* job_on_missing */ 340 { 341 int diff = -1; 342 diff = fsd_job_id_cmp(self->job->job_id,field); 343 if( diff == 0) 344 { 345 /* read this file to the place we started and exit*/ 346 fsd_log_debug(("Job_on_missing found job: %s",self->job->job_id)); 347 job_found = true; 348 older_job_found = false; 349 self->run_flag = false; 350 job_id_match = true; 351 status.name = fsd_strdup(self->job->job_id); 352 } 353 else if ( !job_found && diff >= 1) 354 { 355 /* older job, find its beginning */ 356 fsd_log_debug(("Job_on_missing found older job than %s : %s",self->job->job_id,field)); 357 older_job_found = true; 358 job_id_match = true; 359 status.name = fsd_strdup(self->job->job_id); 360 } 361 else if( !job_found ) 362 { 363 fsd_log_debug(("Job_on_missing found newer job than %s : %s",self->job->job_id,field)); 364 } 365 } 366 } 367 END_TRY 368 } 369 else if(job_id_match && field_n == FLD_MSG) 370 { 371 /* parse msg - depends on FLD_EVENT */ 372 struct attrl struct_resource_cput, 373 struct_resource_mem, 374 struct_resource_vmem, 375 struct_resource_walltime, 376 struct_status, 377 struct_state, 378 struct_start_time, 379 struct_mtime, 380 struct_queue, 381 struct_account_name, 382 struct_exec_vnode; 383 struct attrl *last_attr = NULL; 384 385 bool state_running = false; 386 387 memset(&struct_status,0,sizeof(struct attrl)); 388 memset(&struct_state,0,sizeof(struct attrl)); 389 memset(&struct_resource_cput,0,sizeof(struct attrl)); 390 memset(&struct_resource_mem,0,sizeof(struct attrl)); 391 memset(&struct_resource_vmem,0,sizeof(struct attrl)); 392 memset(&struct_resource_walltime,0,sizeof(struct attrl)); 393 memset(&struct_start_time,0,sizeof(struct attrl)); 394 memset(&struct_mtime,0,sizeof(struct attrl)); 395 memset(&struct_queue,0,sizeof(struct attrl)); 396 memset(&struct_account_name,0,sizeof(struct attrl)); 397 memset(&struct_exec_vnode,0,sizeof(struct attrl)); 398 399 if (strcmp(event,FLD_MSG_STATE) == 0) 400 { 401 /* job run, modified, queued etc */ 402 int n = 0; 403 status.attribs = &struct_state; 404 struct_state.next = NULL; 405 struct_state.name = "job_state"; 406 last_attr = &struct_state; 407 408 if(field[0] == 'J') /* Job Queued, Job Modified, Job Run*/ 409 { 410 n = 4; 411 if(older_job_found) /* job_on_missing - older job beginning - read this file and end */ 203 self->select_file(self); 204 205 while ((line = fsd_readline(self->fhandle)) != NULL) 206 { 207 int field_id = PBSDRMAA_FLD_ID_DATE; 208 char *tok_ctx = NULL; 209 char *field_token = NULL; 210 char *event_timestamp = NULL; 211 int event_type = -1; 212 fsd_job_t *job = NULL; 213 214 /* at first detect if this not the end of log file */ 215 if (strstr(line, "Log;Log closed")) /*TODO try to be more effective and safe */ 216 { 217 fsd_log_debug(("WT - Date changed. Closing log file")); 218 self->date_changed = true; 219 } 220 221 for (field_token = strtok_r(line, ";", &tok_ctx); field_token; field_token = strtok_r(NULL, ";", &tok_ctx), field_id++) 222 { 223 if ( field_id == PBSDRMAA_FLD_ID_DATE) 224 { 225 event_timestamp = field_token; 226 } 227 else if ( field_id == PBSDRMAA_FLD_ID_EVENT) 228 { 229 if (strncmp(field_token, PBSDRMAA_FLD_MSG_0008, 4) == 0) 230 event_type = pbsdrmaa_event_0008; 231 else if (strncmp(field_token, PBSDRMAA_FLD_MSG_0010, 4) == 0) 232 event_type = pbsdrmaa_event_0010; 233 else 234 break; /*we are interested only in the above log messages */ 235 } 236 else if ( field_id == PBSDRMAA_FLD_ID_SRC) 237 { 238 /* not used ignore */ 239 } 240 else if (field_id == PBSDRMAA_FLD_ID_OBJ_TYPE) 241 { 242 if (strncmp(field_token, "Job", 3) != 0) 243 break; /* we are interested only in job events */ 244 } 245 else if (field_id == PBSDRMAA_FLD_ID_OBJ_ID) 246 { 247 const char *event_jobid = field_token; 248 249 TRY 250 { 251 job = self->session->get_job( self->session, event_jobid ); 252 253 if( job ) 412 254 { 413 self->run_flag = false; 414 fsd_log_debug(("Job_on_missing found older job beginning")); 415 fsd_free(status.name); 255 fsd_log_debug(("WT - Found job event: %s", event_jobid)); 256 } 257 else 258 { 259 fsd_log_debug(("WT - Unknown job: %s", event_jobid)); /* Not a DRMAA job */ 416 260 break; 417 261 } 418 419 { /* modified */420 struct tm temp_time_tm;421 memset(&temp_time_tm, 0, sizeof(temp_time_tm));422 temp_time_tm.tm_isdst = -1;423 424 if (strptime(temp_date, "%m/%d/%Y %H:%M:%S", &temp_time_tm) == NULL)425 {426 fsd_log_error(("failed to parse mtime: %s (line = %s)", temp_date, line));427 }428 else429 {430 time_t temp_time = mktime(&temp_time_tm);431 last_attr->next = &struct_mtime;432 last_attr = &struct_mtime;433 struct_mtime.name = "mtime";434 struct_mtime.next = NULL;435 struct_mtime.value = fsd_asprintf("%lu",temp_time);436 }437 }438 262 } 263 END_TRY 264 } 265 else if (field_id == PBSDRMAA_FLD_ID_MSG) 266 { 267 char *msg = field_token; 268 struct batch_status status; 269 struct attrl *attribs = NULL; 270 bool in_running_state = false; 271 272 if (event_type == pbsdrmaa_event_0008 && strncmp(msg, "Job Queued", 10) == 0) 273 { 274 /* Queued 275 * PBS Pro: 10/11/2011 14:43:29;0008;Server@nova;Job;2127218.nova;Job Queued at request of mamonski@endor.wcss.wroc.pl, owner = mamonski@endor.wcss.wroc.pl, job name = STDIN, queue = normal 276 * Torque: 10/11/2011 14:47:59;0008;PBS_Server;Job;15545337.batch.grid.cyf-kr.edu.pl;Job Queued at request of plgmamonski@ui.cyf-kr.edu.pl, owner = plgmamonski@ui.cyf-kr.edu.pl, job name = STDIN, queue = l_short 277 */ 278 char *p_queue = NULL; 279 280 if ((p_queue = strstr(msg,"queue =")) == NULL) 281 fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"No queue attribute found in log line = %s", line); 439 282 440 283 /* != Job deleted and Job to be deleted*/ … … 447 290 #endif 448 291 struct_state.value = fsd_asprintf("%c",field[n]); 449 if(struct_state.value[0] == 'R') 450 { 451 state_running = true; 292 (void)pbsdrmaa_parse_log_timestamp(event_timestamp, timestamp_unix, sizeof(timestamp_unix)); 293 294 in_running_state = true; 295 296 attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_JOB_STATE, "R"); 297 attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_START_TIME, timestamp_unix); 452 298 #ifdef PBS_PROFESSIONAL 453 299 { … … 463 309 } 464 310 #endif 311 } 312 #ifndef PBS_PBS_PROFESSIONAL 313 else if (event_type == pbsdrmaa_event_0008 && strncmp(msg, "Job deleted", 11)) 314 #else 315 else if (event_type == pbsdrmaa_event_0008 && strncmp(msg, "Job to be deleted", 17)) 316 #endif 317 { 318 /* Deleted 319 * PBS Pro: 10/16/2011 09:45:12;0008;Server@grass1;Job;2177.grass1.man.poznan.pl;Job to be deleted at request of mmamonski@grass1.man.poznan.pl 320 * Torque: 10/15/2011 21:19:25;0008;PBS_Server;Job;113045.grass1.man.poznan.pl;Job deleted at request of mmamonski@grass1.man.poznan.pl 321 */ 322 char timestamp_unix[64]; 323 324 (void)pbsdrmaa_parse_log_timestamp(event_timestamp, timestamp_unix, sizeof(timestamp_unix)); 325 326 if (job->state < DRMAA_PS_RUNNING) 327 { 328 fsd_log_info(("Job %s killed before entering running state (%d).", job->job_id, job->state)); 329 attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_JOB_STATE, "C"); 330 attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_MTIME, timestamp_unix); 331 attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_EXIT_STATUS, "-2"); 332 } 333 else 334 { 335 break; /* job was started, ignore, wait for Exit_status message */ 336 } 337 } 338 else if (event_type == pbsdrmaa_event_0010 && strncmp(msg, "Exit_status=", 12)) 339 { 340 /* Completed: 341 * PBS Pro: 10/11/2011 14:43:32;0010;Server@nova;Job;2127218.nova;Exit_status=0 resources_used.cpupercent=0 resources_used.cput=00:00:00 resources_used.mem=1768kb resources_used.ncpus=6 resources_used.vmem=19228kb resources_used.walltime=00:00:01 342 * Torque: 10/11/2011 14:48:24;0010;PBS_Server;Job;15545337.batch.grid.cyf-kr.edu.pl;Exit_status=0 resources_used.cput=00:00:00 resources_used.mem=720kb resources_used.vmem=13308kb resources_used.walltime=00:00:00 343 */ 344 char timestamp_unix[64]; 345 time_t timestamp_time_t = pbsdrmaa_parse_log_timestamp(event_timestamp, timestamp_unix, sizeof(timestamp_unix)); 346 char *tok_ctx2 = NULL; 347 char *token = NULL; 348 349 attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_JOB_STATE, "C"); 350 attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_MTIME, timestamp_unix); 351 352 /* tokenize !!! */ 353 for (token = strtok_r(msg, " ", &tok_ctx2); token; token = strtok_r(NULL, " ", &tok_ctx2)) 354 { 355 if (strncmp(token, "Exit_status=", 12) == 0) 356 { 357 token[12] = '\0'; 358 attribs = pbsdrmaa_add_attr(attribs, token, token + 12); 359 fsd_log_info(("WT - Completion of job %s (Exit_status=%s) detected after %d seconds", job->job_id, token+12, (int)(time(NULL) - timestamp_time_t) )); 360 } 361 else if (strncmp(token, "resources_used.cput=", 20) == 0) 362 { 363 token[20] = '\0'; 364 attribs = pbsdrmaa_add_attr(attribs, token, token + 20); 365 } 366 else if (strncmp(token, "resources_used.mem=", 19) == 0) 367 { 368 token[19] = '\0'; 369 attribs = pbsdrmaa_add_attr(attribs, token, token + 19); 370 } 371 else if (strncmp(token, "resources_used.vmem=", 20) == 0) 372 { 373 token[20] = '\0'; 374 attribs = pbsdrmaa_add_attr(attribs, token, token + 20); 375 } 376 else if (strncmp(token, "resources_used.walltime=", 24) == 0) 377 { 378 token[24] = '\0'; 379 attribs = pbsdrmaa_add_attr(attribs, token, token + 24); 380 } 381 } 382 383 if (!job->execution_hosts) 384 { 385 char *exec_host = NULL; 386 fsd_log_info(("WT - No execution host information for job %s. Reading accounting logs...", job->job_id)); 387 exec_host = pbsdrmaa_get_exec_host_from_accountig(self, job->job_id); 388 attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_EXECUTION_HOST, exec_host); 389 fsd_free(exec_host); 465 390 } 466 391 } 467 392 else 468 { /* job terminated - pbs drmaa detects failed as completed with exit_status !=0, aborted with status -1*/ 469 struct_status.name = "exit_status"; 470 struct_status.value = fsd_strdup("-1"); 471 struct_status.next = NULL; 472 struct_state.next = &struct_status; 473 struct_state.value = fsd_strdup("C"); 474 } 475 } 476 else /*if (strcmp(event,FLD_MSG_STATUS) == 0 )*/ 477 { 478 /* exit status and rusage */ 479 const char *ptr2 = field; 480 char msg[ 256 ] = ""; 481 int n2; 482 int msg_field_n = 0; 483 484 struct_resource_cput.name = "resources_used"; 485 struct_resource_mem.name = "resources_used"; 486 struct_resource_vmem.name = "resources_used"; 487 struct_resource_walltime.name = "resources_used"; 488 struct_status.name = "exit_status"; 489 struct_state.name = "job_state"; 490 491 status.attribs = &struct_resource_cput; 492 struct_resource_cput.next = &struct_resource_mem; 493 struct_resource_mem.next = &struct_resource_vmem; 494 struct_resource_vmem.next = &struct_resource_walltime; 495 struct_resource_walltime.next = &struct_status; 496 struct_status.next = &struct_state; 497 struct_state.next = NULL; 498 499 while ( sscanf(ptr2, "%255[^ ]%n", msg, &n2) == 1 ) 500 { 501 switch(msg_field_n) 502 { 503 case FLD_MSG_EXIT_STATUS: 504 struct_status.value = fsd_strdup(strchr(msg,'=')+1); 505 break; 506 507 case FLD_MSG_CPUT: 508 struct_resource_cput.resource = "cput"; 509 struct_resource_cput.value = fsd_strdup(strchr(msg,'=')+1); 510 break; 511 512 case FLD_MSG_MEM: 513 struct_resource_mem.resource = "mem"; 514 struct_resource_mem.value = fsd_strdup(strchr(msg,'=')+1); 515 break; 516 517 case FLD_MSG_VMEM: 518 struct_resource_vmem.resource = "vmem"; 519 struct_resource_vmem.value = fsd_strdup(strchr(msg,'=')+1); 520 break; 521 522 case FLD_MSG_WALLTIME: 523 struct_resource_walltime.resource = "walltime"; 524 struct_resource_walltime.value = fsd_strdup(strchr(msg,'=')+1); 525 break; 526 } 527 528 ptr2 += n2; 529 msg_field_n++; 530 if ( *ptr2 != ' ' ) 531 break; 532 ++ptr2; 393 { 394 break; /* ignore other job events*/ 533 395 } 534 struct_state.value = fsd_strdup("C"); /* we got exit_status so we say that it has completed */ 535 fsd_log_info(("WT - job %s found as finished on %u", temp_job->job_id, (unsigned int)time(NULL))); 536 } 537 538 if(self->job == NULL) /* wait_thread */ 539 { 540 if ( state_running ) 541 { 542 fsd_log_debug(("WT - forcing update of job: %s", temp_job->job_id )); 396 397 if ( in_running_state ) 398 { 399 fsd_log_debug(("WT - forcing update of job: %s", job->job_id )); 543 400 TRY 544 401 { 545 temp_job->update_status( temp_job );402 job->update_status( job ); 546 403 } 547 404 EXCEPT_DEFAULT 548 405 { 549 406 /*TODO: distinguish between invalid job and internal errors */ 550 fsd_log_debug(("Job finished just after entering running state: %s", temp_job->job_id));407 fsd_log_debug(("Job finished just after entering running state: %s", job->job_id)); 551 408 } 552 409 END_TRY 553 }410 } 554 411 else 555 { 556 fsd_log_debug(("%s - updating job: %s",self->name, temp_job->job_id )); 557 pbsjob->update( temp_job, &status ); 558 } 559 } 560 else if( job_found ) /* job_on_missing */ 561 { 562 fsd_log_debug(("Job_on_missing - updating job: %s", self->job->job_id )); 563 pbsjob->update( self->job, &status ); 564 } 565 566 if(self->job == NULL) 567 { 568 fsd_cond_broadcast( &temp_job->status_cond); 412 { 413 fsd_log_debug(("WT - updating job: %s", job->job_id )); 414 status.name = job->job_id; 415 status.attribs = attribs; 416 417 ((pbsdrmaa_job_t *)job)->update( job, &status ); 418 419 pbsdrmaa_free_attrl(attribs); /* TODO free on exception */ 420 } 421 422 fsd_cond_broadcast( &job->status_cond); 569 423 fsd_cond_broadcast( &self->session->wait_condition ); 570 } 571 if ( temp_job ) 572 temp_job->release( temp_job ); 573 574 fsd_free(struct_resource_cput.value); 575 fsd_free(struct_resource_mem.value); 576 fsd_free(struct_resource_vmem.value); 577 fsd_free(struct_resource_walltime.value); 578 fsd_free(struct_status.value); 579 fsd_free(struct_state.value); 580 fsd_free(struct_start_time.value); 581 fsd_free(struct_mtime.value); 582 fsd_free(struct_queue.value); 583 fsd_free(struct_account_name.value); 584 585 if ( status.name!=NULL ) 586 fsd_free(status.name); 587 } 588 else if(field_n == FLD_EVENT && strcmp(field,FLD_MSG_LOG) == 0) 589 { 590 log_event = true; 591 } 592 else if (log_event && field_n == FLD_ID && strcmp(field,"Log") == 0 ) 593 { 594 log_match = true; 595 log_event = false; 596 } 597 else if( self->job == NULL && log_match && field_n == FLD_MSG && strncmp(field,"Log closed",10) == 0) 598 { 599 fsd_log_debug(("%s - Date changed. Closing log file",self->name)); 600 self->date_changed = true; 601 log_match = false; 602 } 424 425 if ( job ) 426 job->release( job ); 427 428 fsd_free(line); /* TODO free on exception */ 429 } 430 else 431 { 432 fsd_assert(0); /*not reached */ 433 } 434 } 435 436 } /* end of while getline loop */ 437 438 { /* poll on log file */ 439 struct timeval timeout_tv; 440 fd_set log_fds; 441 442 fsd_mutex_unlock( &self->session->mutex ); 603 443 604 ptr += n; 605 if ( *ptr != ';' ) 606 { 607 break; /* end of line */ 608 } 609 field_n++; 610 ++ptr; 611 } 612 613 fsd_free(temp_date); 614 } /* end of while getline loop */ 615 616 if(self->job == NULL) 444 FD_ZERO(&log_fds); 445 FD_SET(fileno(self->fhandle), &log_fds); 446 447 timeout_tv.tv_sec = 1; 448 timeout_tv.tv_usec = 0; 449 450 /* ignore return value - the next get line call will handle IO errors */ 451 (void)select(1, &log_fds, NULL, NULL, &timeout_tv); 452 453 fsd_mutex_lock( &self->session->mutex ); 454 455 self->run_flag = self->session->wait_thread_run_flag; 456 } 457 } 458 EXCEPT_DEFAULT 617 459 { 618 struct timeval timeout_tv; 619 fd_set log_fds; 620 621 fsd_mutex_unlock( &self->session->mutex ); 622 623 FD_ZERO(&log_fds); 624 FD_SET(self->fd, &log_fds); 625 626 timeout_tv.tv_sec = 1; 627 timeout_tv.tv_usec = 0; 628 629 /* ignore return value - the next get line call will handle IO errors */ 630 (void)select(1, &log_fds, NULL, NULL, &timeout_tv); 631 632 fsd_mutex_lock( &self->session->mutex ); 633 634 self->run_flag = self->session->wait_thread_run_flag; 460 const fsd_exc_t *e = fsd_exc_get(); 461 /* Its better to exit and communicate error rather then let the application to hang */ 462 fsd_log_fatal(( "Exception in wait thread: <%d:%s>. Exiting !!!", e->code(e), e->message(e) )); 463 exit(1); 635 464 } 636 } 637 EXCEPT_DEFAULT 638 { 639 const fsd_exc_t *e = fsd_exc_get(); 640 /* Its better to exit and communicate error rather then let the application to hang */ 641 fsd_log_fatal(( "Exception in wait thread %s: <%d:%s>. Exiting !!!", self->name, e->code(e), e->message(e) )); 642 exit(1); 643 } 644 END_TRY 645 646 if(self->fd != -1) 647 close(self->fd); 648 fsd_log_debug(("%s - Log file closed",self->name)); 465 END_TRY 466 } 467 468 if(self->fhandle) 469 fclose(self->fhandle); 470 471 fsd_log_debug(("WT - Log file closed")); 649 472 } 650 473 FINALLY 651 474 { 652 fsd_log_debug(("%s - Terminated.",self->name)); 653 if(self->job == NULL) 654 fsd_mutex_unlock( &self->session->mutex ); /**/ 475 fsd_log_debug(("WT - Terminated.")); 476 fsd_mutex_unlock( &self->session->mutex ); /**/ 655 477 } 656 478 END_TRY 657 479 658 480 fsd_log_return(("")); 659 return true;660 481 } 661 482 … … 665 486 pbsdrmaa_session_t *pbssession = (pbsdrmaa_session_t*) self->session; 666 487 667 if (self->date_changed)668 {488 if (self->date_changed) 489 { 669 490 char * log_path = NULL; 670 491 int num_tries = 0; … … 682 503 #define DRMAA_WAIT_THREAD_MAX_TRIES (12) 683 504 /* generate new date, close file and open new */ 684 if((log_path = fsd_asprintf("%s/server_logs/%04d%02d%02d", 685 pbssession->pbs_home, 686 tm.tm_year + 1900, 687 tm.tm_mon + 1, 688 tm.tm_mday)) == NULL) { 689 fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"WT - Memory allocation wasn't possible"); 690 } 691 692 if(self->fd != -1) 693 close(self->fd); 694 695 fsd_log_debug(("Log file: %s",log_path)); 505 log_path = fsd_asprintf("%s/server_logs/%04d%02d%02d", pbssession->pbs_home, tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday); 506 507 if(self->fhandle) 508 fclose(self->fhandle); 509 510 fsd_log_info(("Opening log file: %s",log_path)); 696 511 697 512 retry: 698 if ((self->fd = open(log_path,O_RDONLY) ) == -1 && num_tries > DRMAA_WAIT_THREAD_MAX_TRIES)699 {513 if ((self->fhandle = fopen(log_path,"")) == NULL && (num_tries > DRMAA_WAIT_THREAD_MAX_TRIES || self->first_open)) 514 { 700 515 fsd_log_error(("Can't open log file. Verify pbs_home. Running standard wait_thread.")); 701 fsd_log_error(("Remember that without keep_completed set standard wait_thread won't run correctly"));516 fsd_log_error(("Remember that without keep_completed set the standard wait_thread won't provide information about job exit status")); 702 517 /*pbssession->super.enable_wait_thread = false;*/ /* run not wait_thread */ 703 518 pbssession->wait_thread_log = false; 704 519 pbssession->super.wait_thread = pbssession->super_wait_thread; 705 520 pbssession->super.wait_thread(self->session); 706 } else if ( self->fd == -1 ) { 521 } 522 else if ( self->fhandle == NULL ) 523 { /* Torque seems not to create a new file immediately after the old one is closed */ 707 524 fsd_log_warning(("Can't open log file: %s. Retries count: %d", log_path, num_tries)); 708 525 num_tries++; 709 sleep( 5);526 sleep(2 * num_tries); 710 527 goto retry; 711 }528 } 712 529 713 530 fsd_free(log_path); … … 715 532 fsd_log_debug(("Log file opened")); 716 533 717 if(self->first_open) { 534 if(self->first_open) 535 { 718 536 fsd_log_debug(("Log file lseek")); 719 if(lseek(self->fd,pbssession->log_file_initial_size,SEEK_SET) == (off_t) -1) { 720 char errbuf[256] = "InternalError";721 (void)strerror_r(errno, errbuf, 256);722 fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR," lseek error: %s",errbuf);723 }537 538 if(fseek(self->fhandle, pbssession->log_file_initial_size, SEEK_SET) == (off_t) -1) 539 { 540 fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"fseek error"); 541 } 724 542 self->first_open = false; 725 }543 } 726 544 727 545 self->date_changed = false; … … 731 549 } 732 550 733 ssize_t 734 pbsdrmaa_read_line_wait_thread ( pbsdrmaa_log_reader_t * self, char * line, char * buffer, ssize_t size, int * idx, int * end_idx, int * line_idx ) 735 { 736 return fsd_getline_buffered(line,buffer,size,self->fd,idx,end_idx,line_idx); 551 time_t 552 pbsdrmaa_parse_log_timestamp(const char *timestamp, char *unixtime_str, size_t size) 553 { 554 struct tm temp_time_tm; 555 memset(&temp_time_tm, 0, sizeof(temp_time_tm)); 556 temp_time_tm.tm_isdst = -1; 557 558 if (strptime(timestamp, "%m/%d/%Y %H:%M:%S", &temp_time_tm) == NULL) 559 { 560 fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"WT - failed to parse log timestamp: %s", timestamp); 561 } 562 else 563 { 564 time_t temp_time = mktime(&temp_time_tm); 565 snprintf(unixtime_str, size, "%lu", temp_time); 566 return temp_time; 567 } 737 568 } 738 569 739 /* reverse date compare*/ 740 int 741 pbsdrmaa_date_compare(const void *a, const void *b) 742 { 743 const char *ia = *(const char **) a; 744 const char *ib = *(const char **) b; 745 return strcmp(ib, ia); 570 char * 571 pbsdrmaa_get_exec_host_from_accountig(pbsdrmaa_log_reader_t * log_reader, const char *job_id) 572 { 573 /* TODO: implement */ 574 return NULL; 746 575 } 747 576 748 void 749 pbsdrmaa_select_file_job_on_missing( pbsdrmaa_log_reader_t * self ) 750 { 751 pbsdrmaa_session_t *pbssession = (pbsdrmaa_session_t*) self->session; 752 753 char * log_path = NULL; 754 int num_tries = 0; 755 static int file_number = 0; 756 fsd_log_enter(("")); 757 758 if(self->first_open) 759 { 760 DIR *dp = NULL; 761 char * path = NULL; 762 struct dirent *ep = NULL; 763 764 if((path = fsd_asprintf("%s/server_logs/",pbssession->pbs_home)) == NULL) 765 fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"Job_on_missing - Memory allocation wasn't possible"); 766 767 self->log_files_number = 0; 768 dp = opendir (path); 769 770 fsd_calloc(self->log_files,2,char*); 771 772 if (dp != NULL) 773 { 774 while ((ep = readdir (dp))) 775 { 776 self->log_files_number++; 777 if(self->log_files_number > 2) 778 fsd_realloc(self->log_files,self->log_files_number,char *); 779 780 self->log_files[self->log_files_number-1] = fsd_strdup(ep->d_name); 781 } 782 (void) closedir (dp); 783 } 784 else 785 fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"Job_on_missing - Couldn't open the directory"); 786 787 qsort(self->log_files,self->log_files_number,sizeof(char *),pbsdrmaa_date_compare); 788 789 if(self->log_files_number <= 2) 790 { 791 self->run_flag = false; 792 fsd_log_error(("Job_on_missing - No log files available")); 793 } 794 795 self->first_open = false; 796 fsd_free(path); 797 } 798 else /* check previous day*/ 799 { 800 if(++file_number > self->log_files_number - 2) 801 fsd_log_error(("Job_on_missing - All available log files checked")); 802 else 803 fsd_log_debug(("Job_on_missing checking previous day")); 804 805 self->run_flag = false; 806 pbsdrmaa_job_on_missing_standard( self->job ); 807 } 808 809 #define DRMAA_WAIT_THREAD_MAX_TRIES (12) 810 if((log_path = fsd_asprintf("%s/server_logs/%s", 811 pbssession->pbs_home, 812 self->log_files[file_number])) == NULL) { 813 fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"Job_on_missing - Memory allocation wasn't possible"); 814 } 815 816 if(self->fd != -1) 817 close(self->fd); 818 819 fsd_log_debug(("Log file: %s",log_path)); 820 821 retry: 822 if((self->fd = open(log_path,O_RDONLY) ) == -1 && num_tries > DRMAA_WAIT_THREAD_MAX_TRIES ) 823 { 824 fsd_log_error(("Can't open log file. Verify pbs_home. Running standard job_on_missing")); 825 fsd_log_error(("Remember that without keep_completed set standard job_on_missing won't run correctly")); 826 self->run_flag = false; 827 pbsdrmaa_job_on_missing_standard( self->job ); 828 } else if ( self->fd == -1 ) { 829 fsd_log_warning(("Can't open log file: %s. Retries count: %d", log_path, num_tries)); 830 num_tries++; 831 sleep(5); 832 goto retry; 833 } 834 else 835 { 836 struct stat statbuf; 837 if(stat(log_path,&statbuf) == -1) { 838 char errbuf[256] = "InternalError"; 839 (void)strerror_r(errno, errbuf, 256); 840 fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"stat error: %s",errbuf); 841 } 842 self->log_file_read_size = 0; 843 self->log_file_initial_size = statbuf.st_size; 844 fsd_log_debug(("Set log_file_initial_size %ld",self->log_file_initial_size)); 845 } 846 847 fsd_free(log_path); 848 849 fsd_log_debug(("Log file opened")); 850 851 fsd_log_return(("")); 852 } 853 854 ssize_t 855 pbsdrmaa_read_line_job_on_missing ( pbsdrmaa_log_reader_t * self, char * line, char * buffer, ssize_t size, int * idx, int * end_idx, int * line_idx ) 856 { 857 int n = fsd_getline_buffered(line,buffer,size,self->fd, idx, end_idx, line_idx); 858 859 if(n >= 0) 860 self->log_file_read_size += n; 861 862 if(self->log_file_read_size >= self->log_file_initial_size) 863 return -1; 864 865 return n; 866 } 867 868 void 869 pbsdrmaa_select_file_accounting ( pbsdrmaa_log_reader_t * self ) 870 { 871 pbsdrmaa_session_t *pbssession = (pbsdrmaa_session_t*) self->session; 872 873 char * log_path = NULL; 874 875 struct tm tm; 876 877 fsd_log_enter(("")); 878 879 time(&self->t); 880 881 localtime_r(&self->t,&tm); 882 883 #define DRMAA_ACCOUNTING_MAX_TRIES (12) 884 /* generate new date, close file and open new */ 885 if((log_path = fsd_asprintf("%s/server_priv/accounting/%04d%02d%02d", 886 pbssession->pbs_home, 887 tm.tm_year + 1900, 888 tm.tm_mon + 1, 889 tm.tm_mday)) == NULL) { 890 fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"Read accounting file - Memory allocation wasn't possible"); 891 } 892 893 if(self->fd != -1) 894 close(self->fd); 895 896 fsd_log_debug(("Accounting Log file: %s",log_path)); 897 898 if((self->fd = open(log_path,O_RDONLY) ) == -1 ) 899 { 900 fsd_log_error(("Can't open accounting log file. Change directory chmod and verify pbs_home.")); 901 } 902 903 fsd_free(log_path); 904 905 fsd_log_debug(("Accounting Log file opened")); 906 907 fsd_log_return(("")); 908 } 909 910 ssize_t 911 pbsdrmaa_read_line_accounting ( pbsdrmaa_log_reader_t * self, char * line, char * buffer, ssize_t size, int * idx, int * end_idx, int * line_idx ) 912 { 913 return fsd_getline_buffered(line,buffer,size,self->fd,idx,end_idx,line_idx); 914 } 915 916 enum field_acc 917 { 918 FLD_ACC_DATE = 0, 919 FLD_ACC_EVENT = 1, 920 FLD_ACC_ID = 2, 921 FLD_ACC_MSG = 3 922 }; 923 924 bool 925 pbsdrmaa_read_log_accounting( pbsdrmaa_log_reader_t * self ) 926 { 927 pbsdrmaa_job_t *pbsjob = (pbsdrmaa_job_t*) self->job; 928 bool res = false; 929 930 fsd_job_t *volatile temp_job = NULL; 931 932 fsd_log_enter(("")); 933 fsd_log_debug(("Accounting Log file opened")); 934 if(self->job == NULL) 935 fsd_mutex_lock( &self->session->mutex ); 936 937 TRY 938 { 939 TRY 940 { 941 char line[4096] = ""; 942 char buffer[4096] = ""; 943 int idx = 0, end_idx = 0, line_idx = 0; 944 945 self->select_file(self); 946 947 if(self->fd != -1) 948 while ((self->read_line(self, line,buffer, sizeof(line), &idx,&end_idx,&line_idx)) > 0) 949 { 950 const char *volatile ptr = line; 951 char field[256] = ""; 952 int volatile field_n = 0; 953 int n; 954 955 bool volatile job_id_match = false; 956 957 bool volatile job_found = false; 958 char * temp_date = NULL; 959 960 struct batch_status status; 961 962 while ( sscanf(ptr, "%255[^;]%n", field, &n) == 1 ) /* split current line into fields */ 963 { 964 status.next = NULL; 965 status.attribs = NULL; 966 967 if(field_n == FLD_ACC_DATE) 968 { 969 temp_date = fsd_strdup(field); 970 } 971 else if(field_n == FLD_ACC_EVENT) 972 { 973 974 } 975 else if(field_n == FLD_ACC_ID) 976 { 977 TRY 978 { 979 int diff = -1; 980 diff = fsd_job_id_cmp(self->job->job_id,field); 981 if( diff == 0) 982 { 983 /* read this file to the place we started and exit*/ 984 fsd_log_debug(("Accounting found job: %s",self->job->job_id)); 985 job_found = true; 986 job_id_match = true; 987 status.name = fsd_strdup(self->job->job_id); 988 } 989 } 990 END_TRY 991 } 992 else if(job_id_match && field_n == FLD_ACC_MSG) 993 { 994 struct attrl * struct_attrl = calloc(10,sizeof(struct attrl)); 995 int i; 996 997 if(field[0] == 'q') 998 { 999 status.attribs = &struct_attrl[0]; 1000 struct_attrl[0].name = ATTR_queue; 1001 struct_attrl[0].value = fsd_strdup(strchr(field,'=')+1); 1002 struct_attrl[0].next = NULL; 1003 } 1004 else if(field[0] == 'u') 1005 { 1006 /* rusage */ 1007 const char *ptr2 = field; 1008 char msg[ 256 ] = ""; 1009 int n2 = 0; 1010 int msg_field_n = 0; 1011 1012 status.attribs = &struct_attrl[0]; 1013 1014 while ( sscanf(ptr2, "%255[^ ]%n", msg, &n2) == 1 ) 1015 { 1016 switch(msg_field_n) 1017 { 1018 case FLD_MSG_ACC_USER: 1019 struct_attrl[msg_field_n].name = ATTR_euser; 1020 break; 1021 1022 case FLD_MSG_ACC_GROUP: 1023 struct_attrl[msg_field_n].name = ATTR_egroup; 1024 break; 1025 1026 case FLD_MSG_ACC_JOBNAME: 1027 struct_attrl[msg_field_n].name = ATTR_name; 1028 break; 1029 1030 case FLD_MSG_ACC_QUEUE: 1031 struct_attrl[msg_field_n].name = ATTR_queue; 1032 break; 1033 1034 case FLD_MSG_ACC_CTIME: 1035 struct_attrl[msg_field_n].name = ATTR_ctime; 1036 break; 1037 1038 case FLD_MSG_ACC_QTIME: 1039 struct_attrl[msg_field_n].name = ATTR_qtime; 1040 break; 1041 1042 case FLD_MSG_ACC_ETIME: 1043 struct_attrl[msg_field_n].name = ATTR_etime; 1044 break; 1045 #ifndef PBS_PROFESSIONAL 1046 case FLD_MSG_ACC_START: 1047 struct_attrl[msg_field_n].name = ATTR_start_time; 1048 #else 1049 case FLD_MSG_ACC_START: 1050 struct_attrl[msg_field_n].name = ATTR_stime; 1051 #endif 1052 1053 case FLD_MSG_ACC_OWNER: 1054 struct_attrl[msg_field_n].name = ATTR_owner; 1055 break; 1056 1057 case FLD_MSG_ACC_EXEC_HOST: 1058 struct_attrl[msg_field_n].name = ATTR_exechost; 1059 break; 1060 } 1061 1062 struct_attrl[msg_field_n].value = fsd_strdup(strchr(msg,'=')+1); 1063 if(msg_field_n!=9) 1064 { 1065 struct_attrl[msg_field_n].next = &struct_attrl[msg_field_n+1]; 1066 } 1067 else 1068 { 1069 struct_attrl[msg_field_n].next = NULL; 1070 break; 1071 } 1072 1073 ptr2 += n2; 1074 msg_field_n++; 1075 if ( *ptr2 != ' ' ) 1076 break; 1077 1078 ++ptr2; 1079 } 1080 } 1081 1082 if( job_found && status.attribs != NULL) 1083 { 1084 fsd_log_debug(("Accounting file - updating job: %s", self->job->job_id )); 1085 pbsjob->update( self->job, &status ); 1086 res = true; 1087 } 1088 1089 if(self->job == NULL) 1090 { 1091 fsd_cond_broadcast( &temp_job->status_cond); 1092 fsd_cond_broadcast( &self->session->wait_condition ); 1093 } 1094 if ( temp_job ) 1095 temp_job->release( temp_job ); 1096 1097 for(i = 0; i < 10; i++) 1098 { 1099 fsd_free(struct_attrl[i].value); 1100 } 1101 fsd_free(struct_attrl); 1102 fsd_free(status.name); 1103 } 1104 1105 1106 ptr += n; 1107 if ( *ptr != ';' ) 1108 { 1109 break; /* end of line */ 1110 } 1111 field_n++; 1112 ++ptr; 1113 } 1114 1115 fsd_free(temp_date); 1116 } /* end of while getline loop */ 1117 1118 } 1119 EXCEPT_DEFAULT 1120 { 1121 const fsd_exc_t *e = fsd_exc_get(); 1122 /* Its better to exit and communicate error rather then let the application to hang */ 1123 fsd_log_fatal(( "Exception in reading accounting file %s: <%d:%s>. Exiting !!!", self->name, e->code(e), e->message(e) )); 1124 exit(1); 1125 } 1126 END_TRY 1127 1128 if(self->fd != -1) 1129 close(self->fd); 1130 fsd_log_debug(("%s - Accounting log file closed",self->name)); 1131 } 1132 FINALLY 1133 { 1134 fsd_log_debug(("%s - Terminated.",self->name)); 1135 if(self->job == NULL) 1136 fsd_mutex_unlock( &self->session->mutex ); /**/ 1137 } 1138 END_TRY 1139 1140 fsd_log_return(("")); 1141 return res; 1142 } 1143 1144 int 1145 fsd_job_id_cmp(const char *s1, const char *s2) /* maybe move to drmaa_utils? */ 1146 { 1147 int job1; 1148 int job2; 1149 char *rest = NULL; 1150 char *token = NULL; 1151 char *ptr = fsd_strdup(s1); 1152 token = strtok_r(ptr, ".", &rest); 1153 job1 = atoi(token); 1154 1155 fsd_free(token); 1156 1157 ptr = fsd_strdup(s2); 1158 token = strtok_r(ptr,".",&rest); 1159 job2 = atoi(token); 1160 1161 fsd_free(token); 1162 return job1 - job2; 1163 } 1164 577 -
trunk/pbs_drmaa/log_reader.h
r21 r29 25 25 #endif 26 26 27 #include <stdio.h> 28 27 29 #include <drmaa_utils/job.h> 28 30 #include <drmaa_utils/session.h> … … 31 33 32 34 pbsdrmaa_log_reader_t * 33 pbsdrmaa_log_reader_new ( fsd_drmaa_session_t * session, fsd_job_t * job ); 34 35 pbsdrmaa_log_reader_t * 36 pbsdrmaa_log_reader_accounting_new ( fsd_drmaa_session_t * session, fsd_job_t * job ); 35 pbsdrmaa_log_reader_new ( fsd_drmaa_session_t * session); 37 36 38 37 void … … 41 40 struct pbsdrmaa_log_reader_s { 42 41 fsd_drmaa_session_t *volatile session ; 43 fsd_job_t *volatile job;44 42 45 bool (* 46 read_log) ( pbsdrmaa_log_reader_t * self ); 43 void (*read_log) ( pbsdrmaa_log_reader_t * self ); 47 44 48 void (* 49 select_file) ( pbsdrmaa_log_reader_t * self ); 45 void (*select_file) ( pbsdrmaa_log_reader_t * self ); 50 46 51 /* line - read line, buffer - keeps read but not returned lines, idx, end_idx and line_idx values needed to be kept outside the function */ 52 ssize_t (* 53 read_line) ( pbsdrmaa_log_reader_t * self, char * line, char * buffer, ssize_t size, int * idx, int * end_idx, int * line_idx ); 54 55 /* specifies if function should run */ 47 /* determines if function should run */ 56 48 bool run_flag; 57 49 … … 59 51 time_t t; 60 52 61 /* for job_on_missing - available log files */ 62 char ** log_files; 63 64 /* for job_on_missing - number of log files */ 65 int log_files_number; 66 67 /* log file descriptor */ 68 int volatile fd; 69 70 /* for job_on_missing - log file size when function was ran */ 71 off_t log_file_initial_size; 72 73 /* for job_on_missing - read lines size */ 74 off_t log_file_read_size; 53 /* log file handle */ 54 FILE *fhandle; 75 55 76 56 /* for wait_thread - day changed */ … … 79 59 /* for wait_thread - log file first open */ 80 60 bool volatile first_open; 81 82 char * name;83 61 }; 84 62 -
trunk/pbs_drmaa/pbs_attrib.h
r26 r29 81 81 #define PBSDRMAA_EXTENSION "extension" 82 82 #define PBSDRMAA_SUBMIT_ARGS "submit_args" 83 #define PBSDRMAA_MTIME "mtime" 83 84 84 85 -
trunk/pbs_drmaa/session.c
r12 r29 520 520 TRY 521 521 { 522 log_reader = pbsdrmaa_log_reader_new( self , NULL);522 log_reader = pbsdrmaa_log_reader_new( self ); 523 523 log_reader->read_log( log_reader ); 524 524 } -
trunk/pbs_drmaa/session.h
r12 r29 35 35 fsd_drmaa_session_t super; 36 36 37 bool (* 38 do_drm_keeps_completed_jobs)( pbsdrmaa_session_t *self ); 37 bool (*do_drm_keeps_completed_jobs)( pbsdrmaa_session_t *self ); 39 38 40 void (* 41 super_destroy)( fsd_drmaa_session_t *self ); 39 void (*super_destroy)( fsd_drmaa_session_t *self ); 42 40 43 void (* 44 super_apply_configuration)(fsd_drmaa_session_t *self); 41 void (*super_apply_configuration)(fsd_drmaa_session_t *self); 45 42 46 43 /* 47 44 * Pointer to standard wait_thread drmaa_utils function 48 45 */ 49 void* (* 50 super_wait_thread)( fsd_drmaa_session_t *self ); 46 void* (*super_wait_thread)( fsd_drmaa_session_t *self ); 51 47 52 48 /* … … 72 68 73 69 /* 74 * Log file initial size - used by wait_thread which reads log files 70 * Log file initial size - used by wait_thread which reads log files TODO: check if it can be safely moved to log_reader 75 71 */ 76 72 off_t log_file_initial_size; 77 73 78 74 /* 79 * Time we checked log file initial size - used by wait_thread which reads log files 75 * Time we checked log file initial size - used by wait_thread which reads log files TODO: check if it can be safely moved to log_reader 80 76 */ 81 77 time_t log_file_initial_time; -
trunk/pbs_drmaa/submit.c
r24 r29 151 151 if( name && name[0] != '!' && pbs_tmpl->get_attr( pbs_tmpl, name ) ) 152 152 { 153 struct attrl *p;154 const char *resource;155 153 const char *value; 154 156 155 value = pbs_tmpl->get_attr( pbs_tmpl, name ); 157 fsd_malloc( p, struct attrl ); 158 memset( p, 0, sizeof(struct attrl) ); 159 p->next = pbs_attr; 160 pbs_attr = p; 161 resource = strchr( name, '.' ); 162 if( resource ) 163 { 164 p->name = fsd_strndup( name, resource-name ); 165 p->resource = fsd_strdup( resource+1 ); 166 } 167 else 168 p->name = fsd_strdup( name ); 169 fsd_log_debug(("set attr: %s = %s", name, value)); 170 p->value = fsd_strdup( value ); 171 p->op = SET; 156 pbs_attr = pbsdrmaa_add_attr( pbs_attr, name, value ); 172 157 } 173 158 } -
trunk/pbs_drmaa/util.c
r16 r29 73 73 fsd_free( p ); 74 74 } 75 } 76 77 struct attrl * 78 pbsdrmaa_add_attr( struct attrl *head, const char *name, const char *value) 79 { 80 struct attrl *p = NULL; 81 char *resource = NULL; 82 83 fsd_malloc( p, struct attrl ); 84 memset( p, 0, sizeof(struct attrl) ); 85 86 resource = strchr( name, '.' ); 87 88 if( resource ) 89 { 90 p->name = fsd_strndup( name, resource - name ); 91 p->resource = fsd_strdup( resource+1 ); 92 } 93 else 94 { 95 p->name = fsd_strdup( name ); 96 } 97 98 p->value = fsd_strdup(value); 99 p->op = SET; 100 101 fsd_log_debug(("set attr: %s = %s", name, value)); 102 103 if (head) 104 p->next = head; 105 else 106 p->next = NULL; 107 108 return p; 75 109 } 76 110 … … 247 281 } 248 282 249 ssize_t fsd_getline(char * line,ssize_t size, int fd) 250 { 251 char buf; 252 char * ptr = NULL; 253 ssize_t n = 0, rc; 254 ptr = line; 255 for(n = 1; n< size; n++) 256 { 257 if( (rc = read(fd,&buf,1 )) == 1) { 258 *ptr++ = buf; 259 if(buf == '\n') 260 { 261 break; 262 } 263 } 264 else if (rc == 0) { 265 if (n == 1) 266 return 0; 267 else 268 break; 269 } 270 else 271 return -1; 272 } 273 274 return n; 275 } 276 277 ssize_t fsd_getline_buffered(char * line,char * buf, ssize_t size, int fd, int * idx, int * end_idx, int * line_idx) 278 { 279 int i = -1; 280 int rc = -1; 281 282 memset(line,0,size); 283 284 start: 285 /* idx - start of data to parse (in buffer) 286 end_idx - end of data read from log (in buffer) 287 line_idx - place to write data in output line */ 288 if(*idx < *end_idx) 289 { 290 /* take line from buffer */ 291 for(i = *idx; i<= *end_idx;i++) 292 { 293 if(buf[i] == '\n') 294 { 295 int tmp = i - *idx; 296 strncpy(line + *line_idx,buf + *idx,tmp); 297 *idx = i + 1; 298 299 tmp+= *line_idx; 300 *line_idx = 0; 301 302 return tmp; 303 } 304 } 305 306 /* there was no '\n' so next part of log needs to be read. save lines beginning */ 307 if(*line_idx + i - *idx > size ) 308 fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"Line longer than %d unsupported",size); 309 310 strncpy(line + *line_idx,buf + *idx,i - *idx); 311 *line_idx += i - *idx; 312 *idx = 0; 313 *end_idx = 0; 314 goto start; 315 } 316 else 317 { 318 /* read log */ 319 if((rc = read(fd,buf,size)) > 0) 320 { 321 *end_idx = rc - 1; 322 *idx = 0; 323 goto start; 324 } 325 else if (rc == 0) 326 return 0; 327 else 328 return -1; 329 } 330 } 331 283 -
trunk/pbs_drmaa/util.h
r12 r29 31 31 32 32 void pbsdrmaa_free_attrl( struct attrl *list ); 33 void pbsdrmaa_dump_attrl( 34 const struct attrl *attribute_list, const char *prefix ); 33 void pbsdrmaa_dump_attrl( const struct attrl *attribute_list, const char *prefix ); 34 35 struct attrl *pbsdrmaa_add_attr( struct attrl *head, const char *name, const char *value); 35 36 36 37 /** … … 43 44 pbsdrmaa_write_tmpfile( const char *content, size_t len ); 44 45 45 ssize_t46 fsd_getline(char * line,ssize_t size, int fd);47 48 ssize_t49 fsd_getline_buffered(char * line,char * buf, ssize_t size, int fd, int * idx, int * end_idx, int * line_idx);50 46 51 47 #endif /* __PBS_DRMAA__UTIL_H */
Note: See TracChangeset
for help on using the changeset viewer.