- Timestamp:
- 09/17/12 23:25:29 (13 years ago)
- Location:
- trunk/pbs_drmaa
- Files:
-
- 1 edited
- 2 copied
Legend:
- Unmodified
- Added
- Removed
-
trunk/pbs_drmaa/Makefile.am
r12 r76 31 31 submit.c submit.h \ 32 32 util.c util.h \ 33 log_reader.c log_reader.h 33 log_reader.c log_reader.h \ 34 pbs_conn.c pbs_conn.h 34 35 BUILT_SOURCES = pbs_attrib.c 35 36 EXTRA_DIST = pbs_attrib.c -
trunk/pbs_drmaa/pbs_conn.c
r60 r76 22 22 #endif 23 23 24 #include <stdlib.h>25 #include <ctype.h>26 #include <string.h>27 #include <unistd.h>28 #include <sys/stat.h>29 #include <sys/types.h>30 #include <dirent.h>31 #include <fcntl.h>32 33 #include <pbs_ifl.h>34 24 #include <pbs_error.h> 35 25 … … 41 31 #include <drmaa_utils/datetime.h> 42 32 43 #include <pbs_drmaa/job.h> 44 #include <pbs_drmaa/log_reader.h> 45 #include <pbs_drmaa/session.h> 46 #include <pbs_drmaa/submit.h> 33 #include <pbs_drmaa/pbs_conn.h> 47 34 #include <pbs_drmaa/util.h> 48 #include <pbs_drmaa/pbs_attrib.h>49 35 50 36 #include <errno.h> 51 52 enum pbsdrmaa_field_id 53 { 54 PBSDRMAA_FLD_ID_DATE = 0, 55 PBSDRMAA_FLD_ID_EVENT = 1, 56 PBSDRMAA_FLD_ID_SRC = 2, 57 PBSDRMAA_FLD_ID_OBJ_TYPE = 3, 58 PBSDRMAA_FLD_ID_OBJ_ID = 4, 59 PBSDRMAA_FLD_ID_MSG = 5 60 }; 61 62 63 #define PBSDRMAA_FLD_MSG_0008 "0008" 64 #define PBSDRMAA_FLD_MSG_0010 "0010" 65 66 enum pbsdrmaa_event_type 67 { 68 pbsdrmaa_event_0008 = 8, 69 pbsdrmaa_event_0010 = 10 70 }; 71 72 static void pbsdrmaa_read_log(); 73 74 static void pbsdrmaa_select_file( pbsdrmaa_log_reader_t * self); 75 76 static void pbsdrmaa_close_log( pbsdrmaa_log_reader_t * self); 77 78 static void pbsdrmaa_reopen_log( pbsdrmaa_log_reader_t * self); 79 80 static time_t pbsdrmaa_parse_log_timestamp(const char *timestamp, char *unixtime_str, size_t size); 81 82 static char *pbsdrmaa_get_exec_host_from_accountig(pbsdrmaa_log_reader_t * log_reader, const char *job_id); 83 84 /* 85 * Snippets from log files 86 * 87 * PBS Pro 88 * 89 10/11/2011 14:43:29;0008;Server@nova;Job;2127218.nova;Job Queued at request of mamonski@endor.wcss.wroc.pl, owner = mamonski@endor.wcss.wroc.pl, job name = STDIN, queue = normal 90 10/11/2011 14:43:31;0008;Server@nova;Job;2127218.nova;Job Modified at request of Scheduler@nova.wcss.wroc.pl 91 10/11/2011 14:43:31;0008;Server@nova;Job;2127218.nova;Job Run at request of Scheduler@nova.wcss.wroc.pl on exec_vnode (wn698:ncpus=3:mem=2048000kb)+(wn700:ncpus=3:mem=2048000kb) 92 10/11/2011 14:43:31;0008;Server@nova;Job;2127218.nova;Job Modified at request of Scheduler@nova.wcss.wroc.pl 93 10/11/2011 14:43:32;0010;Server@nova;Job;2127218.nova;Exit_status=0 resources_used.cpupercent=0 resources_used.cput=00:00:00 resources_used.mem=1768kb resources_used.ncpus=6 resources_used.vmem=19228kb resources_used.walltime=00:00:01 94 95 * 96 * Torque 97 * 98 10/11/2011 14:47:59;0008;PBS_Server;Job;15545337.batch.grid.cyf-kr.edu.pl;Job Queued at request of plgmamonski@ui.cyf-kr.edu.pl, owner = plgmamonski@ui.cyf-kr.edu.pl, job name = STDIN, queue = l_short 99 10/11/2011 14:48:23;0008;PBS_Server;Job;15545337.batch.grid.cyf-kr.edu.pl;Job Run at request of root@batch.grid.cyf-kr.edu.pl 100 10/11/2011 14:48:24;0010;PBS_Server;Job;15545337.batch.grid.cyf-kr.edu.pl;Exit_status=0 resources_used.cput=00:00:00 resources_used.mem=720kb resources_used.vmem=13308kb resources_used.walltime=00:00:00 101 102 deleting job: 103 I . PBS Pro 104 a) in Q state 105 10/16/2011 09:49:25;0008;Server@grass1;Job;2178.grass1.man.poznan.pl;Job Queued at request of mmamonski@grass1.man.poznan.pl, owner = mmamonski@grass1.man.poznan.pl, job name = STDIN, queue = workq 106 10/16/2011 09:49:25;0008;Server@grass1;Job;2178.grass1.man.poznan.pl;Job Modified at request of Scheduler@grass1.man.poznan.pl 107 10/16/2011 09:49:37;0008;Server@grass1;Job;2178.grass1.man.poznan.pl;Job to be deleted at request of mmamonski@grass1.man.poznan.pl 108 10/16/2011 09:49:37;0100;Server@grass1;Job;2178.grass1.man.poznan.pl;dequeuing from workq, state 5 109 110 111 b) in R state 112 10/16/2011 09:45:12;0080;Server@grass1;Job;2177.grass1.man.poznan.pl;delete job request received 113 10/16/2011 09:45:12;0008;Server@grass1;Job;2177.grass1.man.poznan.pl;Job sent signal TermJob on delete 114 10/16/2011 09:45:12;0008;Server@grass1;Job;2177.grass1.man.poznan.pl;Job to be deleted at request of mmamonski@grass1.man.poznan.pl 115 10/16/2011 09:45:12;0010;Server@grass1;Job;2177.grass1.man.poznan.pl;Exit_status=271 resources_used.cpupercent=0 resources_used.cput=00:00:00 resources_used.mem=2772kb resources_used.ncpus=1 resources_used.vmem=199288kb resources_used.walltime=00:00:26 116 10/16/2011 09:45:12;0100;Server@grass1;Job;2177.grass1.man.poznan.pl;dequeuing from workq, state 5 117 118 II. Torque 119 a) in Q state 120 10/15/2011 21:19:25;0008;PBS_Server;Job;113045.grass1.man.poznan.pl;Job deleted at request of mmamonski@grass1.man.poznan.pl 121 10/15/2011 21:19:25;0100;PBS_Server;Job;113045.grass1.man.poznan.pl;dequeuing from batch, state EXITING 122 123 b) in R state 124 10/15/2011 21:19:47;0008;PBS_Server;Job;113046.grass1.man.poznan.pl;Job deleted at request of mmamonski@grass1.man.poznan.pl 125 10/15/2011 21:19:47;0008;PBS_Server;Job;113046.grass1.man.poznan.pl;Job sent signal SIGTERM on delete 126 10/15/2011 21:19:47;0010;PBS_Server;Job;113046.grass1.man.poznan.pl;Exit_status=271 resources_used.cput=00:00:00 resources_used.mem=0kb resources_used.vmem=0kb resources_used.walltime=00:00:10 127 128 Log closed: 129 10/16/2011 00:00:17;0002;PBS_Server;Svr;Log;Log closed 130 131 */ 132 pbsdrmaa_log_reader_t * 133 pbsdrmaa_log_reader_new( fsd_drmaa_session_t *session ) 134 { 135 pbsdrmaa_log_reader_t *volatile self = NULL; 37 #include <signal.h> 38 #include <unistd.h> 39 40 41 static char* pbsdrmaa_pbs_submit( pbsdrmaa_pbs_conn_t *self, struct attropl *attrib, char *script, char *destination ); 42 43 static struct batch_status* pbsdrmaa_pbs_statjob( pbsdrmaa_pbs_conn_t *self, char *job_id, struct attrl *attrib ); 44 45 static void pbsdrmaa_pbs_statjob_free( pbsdrmaa_pbs_conn_t *self, struct batch_status* job_status ); 46 47 static void pbsdrmaa_pbs_sigjob( pbsdrmaa_pbs_conn_t *self, char *job_id, char *signal ); 48 49 static void pbsdrmaa_pbs_deljob( pbsdrmaa_pbs_conn_t *self, char *job_id ); 50 51 static void pbsdrmaa_pbs_rlsjob( pbsdrmaa_pbs_conn_t *self, char *job_id ); 52 53 static void pbsdrmaa_pbs_holdjob( pbsdrmaa_pbs_conn_t *self, char *job_id ); 54 55 static void pbsdrmaa_pbs_reconnect_internal( pbsdrmaa_pbs_conn_t *self, bool reconnect); 56 57 pbsdrmaa_pbs_conn_t * 58 pbsdrmaa_pbs_conn_new( pbsdrmaa_session_t *session, char *server ) 59 { 60 pbsdrmaa_pbs_conn_t *volatile self = NULL; 136 61 137 62 fsd_log_enter(("")); 138 63 139 64 TRY 140 {141 fsd_malloc(self, pbsdrmaa_ log_reader_t );65 { 66 fsd_malloc(self, pbsdrmaa_pbs_conn_t ); 142 67 143 68 self->session = session; 144 145 self->select_file = pbsdrmaa_select_file;146 self->read_log = pbsdrmaa_read_log;147 self->close = pbsdrmaa_close_log;148 self->reopen = pbsdrmaa_reopen_log;149 69 150 self->run_flag = true; 151 self->fhandle = NULL; 152 self->date_changed = true; 153 self->first_open = true; 154 self->log_path = NULL; 155 self->current_offset = 0; 156 157 } 70 self->submit = pbsdrmaa_pbs_submit; 71 self->statjob = pbsdrmaa_pbs_statjob; 72 self->statjob_free = pbsdrmaa_pbs_statjob_free; 73 self->sigjob = pbsdrmaa_pbs_sigjob; 74 self->deljob = pbsdrmaa_pbs_deljob; 75 self->rlsjob = pbsdrmaa_pbs_rlsjob; 76 self->holdjob = pbsdrmaa_pbs_holdjob; 77 78 self->server = fsd_strdup(server); 79 80 self->connection_fd = -1; 81 self->last_usage = time(NULL); 82 83 /*ignore SIGPIPE - otheriwse pbs_disconnect cause the program to exit */ 84 signal(SIGPIPE, SIG_IGN); 85 86 pbsdrmaa_pbs_reconnect_internal(self, false); 87 } 158 88 EXCEPT_DEFAULT 159 {89 { 160 90 if( self != NULL) 91 { 92 fsd_free(self->server); 161 93 fsd_free(self); 94 95 if (self->connection_fd != -1) 96 pbs_disconnect(self->connection_fd); 97 } 162 98 163 99 fsd_exc_reraise(); 164 }100 } 165 101 END_TRY 166 102 … … 172 108 173 109 void 174 pbsdrmaa_ log_reader_destroy ( pbsdrmaa_log_reader_t * self )110 pbsdrmaa_pbs_conn_destroy ( pbsdrmaa_pbs_conn_t * self ) 175 111 { 176 112 fsd_log_enter(("")); … … 179 115 if(self != NULL) 180 116 { 117 fsd_free(self->server); 181 118 fsd_free(self); 119 120 if (self->connection_fd != -1) 121 pbs_disconnect(self->connection_fd); 182 122 } 183 123 } … … 191 131 } 192 132 193 194 void 195 pbsdrmaa_read_log( pbsdrmaa_log_reader_t * self ) 196 { 197 fsd_log_enter(("")); 198 199 fsd_mutex_lock( &self->session->mutex ); 200 201 TRY 202 { 203 while( self->run_flag ) 204 { 205 TRY 206 { 207 char *line = NULL; 208 209 self->select_file(self); 210 211 while ((line = fsd_readline(self->fhandle)) != NULL) 212 { 213 int field_id = PBSDRMAA_FLD_ID_DATE; 214 char *tok_ctx = NULL; 215 char *field_token = NULL; 216 char *event_timestamp = NULL; 217 int event_type = -1; 218 fsd_job_t *job = NULL; 219 220 /* at first detect if this not the end of log file */ 221 if (strstr(line, "Log;Log closed")) /*TODO try to be more effective and safe */ 222 { 223 fsd_log_debug(("WT - Date changed. Closing log file")); 224 self->date_changed = true; 225 goto cleanup; 226 } 227 228 for (field_token = strtok_r(line, ";", &tok_ctx); field_token; field_token = strtok_r(NULL, ";", &tok_ctx), field_id++) 229 { 230 if ( field_id == PBSDRMAA_FLD_ID_DATE) 231 { 232 event_timestamp = field_token; 233 #ifdef PBS_PBS_PROFESSIONAL 234 /*additional check */ 235 TRY 236 { 237 (void)pbsdrmaa_parse_log_timestamp(event_timestamp, timestamp_unix, sizeof(timestamp_unix)); 238 } 239 EXCEPT_DEFAULT 240 { 241 fsd_log_error(("Failed to parse timestamp: %s. Log corrupted?", event_timestamp)); 242 } 243 END_TRY 244 #endif 245 } 246 else if ( field_id == PBSDRMAA_FLD_ID_EVENT) 247 { 248 if (strncmp(field_token, PBSDRMAA_FLD_MSG_0008, 4) == 0) 249 event_type = pbsdrmaa_event_0008; 250 else if (strncmp(field_token, PBSDRMAA_FLD_MSG_0010, 4) == 0) 251 event_type = pbsdrmaa_event_0010; 252 else 253 { 254 goto cleanup; /*we are interested only in the above log messages */ 255 } 256 } 257 else if ( field_id == PBSDRMAA_FLD_ID_SRC) 258 { 259 /* not used ignore */ 260 } 261 else if (field_id == PBSDRMAA_FLD_ID_OBJ_TYPE) 262 { 263 if (strncmp(field_token, "Job", 3) != 0) 264 { 265 goto cleanup; /* we are interested only in job events */ 266 } 267 } 268 else if (field_id == PBSDRMAA_FLD_ID_OBJ_ID) 269 { 270 const char *event_jobid = field_token; 271 272 if (!isdigit(event_jobid[0])) 273 { 274 fsd_log_debug(("WT - Invalid job: %s", event_jobid)); 275 goto cleanup; 276 } 277 278 job = self->session->get_job( self->session, event_jobid ); 279 280 if( job ) 281 { 282 fsd_log_debug(("WT - Found job event: %s", event_jobid)); 283 } 284 else 285 { 286 fsd_log_debug(("WT - Unknown job: %s", event_jobid)); /* Not a DRMAA job */ 287 goto cleanup; 288 } 289 } 290 else if (field_id == PBSDRMAA_FLD_ID_MSG) 291 { 292 char *msg = field_token; 293 struct batch_status status; 294 struct attrl *attribs = NULL; 295 bool in_running_state = false; 296 297 if (event_type == pbsdrmaa_event_0008 && strncmp(msg, "Job Queued", 10) == 0) 298 { 299 /* Queued 300 * PBS Pro: 10/11/2011 14:43:29;0008;Server@nova;Job;2127218.nova;Job Queued at request of mamonski@endor.wcss.wroc.pl, owner = mamonski@endor.wcss.wroc.pl, job name = STDIN, queue = normal 301 * Torque: 10/11/2011 14:47:59;0008;PBS_Server;Job;15545337.batch.grid.cyf-kr.edu.pl;Job Queued at request of plgmamonski@ui.cyf-kr.edu.pl, owner = plgmamonski@ui.cyf-kr.edu.pl, job name = STDIN, queue = l_short 302 */ 303 char *p_queue = NULL; 304 305 fsd_log_info(("WT - Detected queuing of job %s", job->job_id)); 306 307 if ((p_queue = strstr(msg,"queue =")) == NULL) 308 fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"No queue attribute found in log line = %s", line); 309 310 attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_JOB_STATE, "Q"); 311 attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_QUEUE, p_queue + 7); 312 } 313 else if (event_type == pbsdrmaa_event_0008 && strncmp(msg, "Job Run", 7) == 0) 314 { 315 /* 316 * Running 317 * Torque: 10/11/2011 14:48:23;0008;PBS_Server;Job;15545337.batch.grid.cyf-kr.edu.pl;Job Run at request of root@batch.grid.cyf-kr.edu.pl 318 * PBS Pro: 10/11/2011 14:43:31;0008;Server@nova;Job;2127218.nova;Job Run at request of Scheduler@nova.wcss.wroc.pl on exec_vnode (wn698:ncpus=3:mem=2048000kb)+(wn700:ncpus=3:mem=2048000kb) 319 */ 320 char timestamp_unix[64]; 321 322 fsd_log_info(("WT - Detected start of job %s", job->job_id)); 323 324 (void)pbsdrmaa_parse_log_timestamp(event_timestamp, timestamp_unix, sizeof(timestamp_unix)); 325 326 in_running_state = true; 327 328 attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_JOB_STATE, "R"); 329 attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_START_TIME, timestamp_unix); 330 #ifdef PBS_PROFESSIONAL 331 { 332 char *p_vnode = NULL; 333 if ((p_vnode = strstr(msg, "exec_vnode"))) 334 { 335 attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_EXECUTION_VNODE, p_vnode + 11); 336 } 337 } 338 #endif 339 } 340 #ifndef PBS_PROFESSIONAL 341 else if (event_type == pbsdrmaa_event_0008 && strncmp(msg, "Job deleted", 11) == 0) 342 #else 343 else if (event_type == pbsdrmaa_event_0008 && strncmp(msg, "Job to be deleted", 17) == 0) 344 #endif 345 { 346 /* Deleted 347 * PBS Pro: 10/16/2011 09:45:12;0008;Server@grass1;Job;2177.grass1.man.poznan.pl;Job to be deleted at request of mmamonski@grass1.man.poznan.pl 348 * Torque: 10/15/2011 21:19:25;0008;PBS_Server;Job;113045.grass1.man.poznan.pl;Job deleted at request of mmamonski@grass1.man.poznan.pl 349 */ 350 char timestamp_unix[64]; 351 352 fsd_log_info(("WT - Detected deletion of job %s", job->job_id)); 353 354 (void)pbsdrmaa_parse_log_timestamp(event_timestamp, timestamp_unix, sizeof(timestamp_unix)); 355 356 if (job->state < DRMAA_PS_RUNNING) 357 { 358 fsd_log_info(("WT - Job %s killed before entering running state (%d).", job->job_id, job->state)); 359 attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_JOB_STATE, "C"); 360 attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_MTIME, timestamp_unix); 361 attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_EXIT_STATUS, "-2"); 362 } 363 else 364 { 365 fsd_log_info(("WT - Job %s killed after entering running state (%d). Waiting for Completed event...", job->job_id, job->state)); 366 goto cleanup; /* job was started, ignore, wait for Exit_status message */ 367 } 368 } 369 else if (event_type == pbsdrmaa_event_0010 && (strncmp(msg, "Exit_status=", 12) == 0)) 370 { 371 /* Completed: 372 * PBS Pro: 10/11/2011 14:43:32;0010;Server@nova;Job;2127218.nova;Exit_status=0 resources_used.cpupercent=0 resources_used.cput=00:00:00 resources_used.mem=1768kb resources_used.ncpus=6 resources_used.vmem=19228kb resources_used.walltime=00:00:01 373 * Torque: 10/11/2011 14:48:24;0010;PBS_Server;Job;15545337.batch.grid.cyf-kr.edu.pl;Exit_status=0 resources_used.cput=00:00:00 resources_used.mem=720kb resources_used.vmem=13308kb resources_used.walltime=00:00:00 374 */ 375 char timestamp_unix[64]; 376 time_t timestamp_time_t = pbsdrmaa_parse_log_timestamp(event_timestamp, timestamp_unix, sizeof(timestamp_unix)); 377 char *tok_ctx2 = NULL; 378 char *token = NULL; 379 380 attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_JOB_STATE, "C"); 381 attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_MTIME, timestamp_unix); 382 383 /* tokenize !!! */ 384 for (token = strtok_r(msg, " ", &tok_ctx2); token; token = strtok_r(NULL, " ", &tok_ctx2)) 385 { 386 if (strncmp(token, "Exit_status=", 12) == 0) 387 { 388 token[11] = '\0'; 389 attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_EXIT_STATUS, token + 12); 390 fsd_log_info(("WT - Completion of job %s (Exit_status=%s) detected after %d seconds", job->job_id, token+12, (int)(time(NULL) - timestamp_time_t) )); 391 } 392 else if (strncmp(token, "resources_used.cput=", 20) == 0) 393 { 394 token[19] = '\0'; 395 attribs = pbsdrmaa_add_attr(attribs, token, token + 20); 396 } 397 else if (strncmp(token, "resources_used.mem=", 19) == 0) 398 { 399 token[18] = '\0'; 400 attribs = pbsdrmaa_add_attr(attribs, token, token + 19); 401 } 402 else if (strncmp(token, "resources_used.vmem=", 20) == 0) 403 { 404 token[19] = '\0'; 405 attribs = pbsdrmaa_add_attr(attribs, token, token + 20); 406 } 407 else if (strncmp(token, "resources_used.walltime=", 24) == 0) 408 { 409 token[23] = '\0'; 410 attribs = pbsdrmaa_add_attr(attribs, token, token + 24); 411 } 412 } 413 414 if (!job->execution_hosts) 415 { 416 char *exec_host = NULL; 417 fsd_log_info(("WT - No execution host information for job %s. Reading accounting logs...", job->job_id)); 418 exec_host = pbsdrmaa_get_exec_host_from_accountig(self, job->job_id); 419 if (exec_host) 420 { 421 attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_EXECUTION_HOST, exec_host); 422 fsd_free(exec_host); 423 } 424 } 425 } 426 else 427 { 428 fsd_log_debug(("Ignoring msg(type=%d) = %s", event_type, msg)); 429 goto cleanup; /* ignore other job events*/ 430 } 431 432 fsd_log_debug(("WT - updating job: %s", job->job_id )); 433 status.name = job->job_id; 434 status.attribs = attribs; 435 436 ((pbsdrmaa_job_t *)job)->update( job, &status ); 437 438 if ( in_running_state ) 439 { 440 fsd_log_debug(("WT - forcing update of job: %s", job->job_id )); 441 TRY 442 { 443 job->update_status( job ); 444 } 445 EXCEPT_DEFAULT 446 { 447 /*TODO: distinguish between invalid job and internal errors */ 448 fsd_log_debug(("Job finished just after entering running state: %s", job->job_id)); 449 } 450 END_TRY 451 } 452 453 454 pbsdrmaa_free_attrl(attribs); /* TODO free on exception */ 455 456 fsd_cond_broadcast( &job->status_cond); 457 fsd_cond_broadcast( &self->session->wait_condition ); 458 459 } 460 else 461 { 462 fsd_assert(0); /*not reached */ 463 } 464 } 465 cleanup: 466 fsd_free(line); /* TODO what about exceptions */ 467 if ( job ) 468 job->release( job ); 469 470 471 472 } /* end of while getline loop */ 473 474 475 476 fsd_mutex_unlock( &self->session->mutex ); 477 478 /* close */ 479 self->close(self); 480 481 sleep(((pbsdrmaa_session_t *)self->session)->wait_thread_sleep_time); 482 483 /* and reopen log file */ 484 self->reopen(self); 485 486 fsd_mutex_lock( &self->session->mutex ); 487 488 self->run_flag = self->session->wait_thread_run_flag; 489 } 490 EXCEPT_DEFAULT 491 { 492 const fsd_exc_t *e = fsd_exc_get(); 493 /* Its better to exit and communicate error rather then let the application to hang */ 494 fsd_log_fatal(( "Exception in wait thread: <%d:%s>. Exiting !!!", e->code(e), e->message(e) )); 495 exit(1); 496 } 497 END_TRY 498 } 499 500 if(self->fhandle) 501 fclose(self->fhandle); 502 503 fsd_log_debug(("WT - Log file closed")); 504 } 505 FINALLY 506 { 507 fsd_log_debug(("WT - Terminated.")); 508 fsd_mutex_unlock( &self->session->mutex ); /**/ 509 } 510 END_TRY 511 512 fsd_log_return(("")); 513 } 514 515 void 516 pbsdrmaa_select_file( pbsdrmaa_log_reader_t * self ) 517 { 518 pbsdrmaa_session_t *pbssession = (pbsdrmaa_session_t*) self->session; 519 520 if (self->date_changed) 521 { 522 int num_tries = 0; 523 struct tm tm; 524 char *old_log_path = NULL; 525 526 fsd_log_enter(("")); 527 528 if(!self->first_open) 529 time(&self->t); 530 else 531 self->t = pbssession->log_file_initial_time; 532 533 localtime_r(&self->t,&tm); 534 535 #define DRMAA_WAIT_THREAD_MAX_TRIES (12) 536 /* generate new date, close file and open new */ 537 old_log_path = self->log_path; 538 539 self->log_path = fsd_asprintf("%s/server_logs/%04d%02d%02d", pbssession->pbs_home, tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday); 540 541 if(self->fhandle) 542 fclose(self->fhandle); 543 544 fsd_log_info(("Opening log file: %s",self->log_path)); 545 546 retry: 547 if ((self->fhandle = fopen(self->log_path,"r")) == NULL && (num_tries > DRMAA_WAIT_THREAD_MAX_TRIES || self->first_open)) 548 { 549 fsd_log_error(("Can't open log file: %s. Verify pbs_home. Running standard wait_thread.", self->log_path)); 550 fsd_log_error(("Remember that without keep_completed set the standard wait_thread won't provide information about job exit status")); 551 /*pbssession->super.enable_wait_thread = false;*/ /* run not wait_thread */ 552 pbssession->wait_thread_log = false; 553 pbssession->super.wait_thread = pbssession->super_wait_thread; 554 pbssession->super.wait_thread(self->session); 555 } 556 else if ( self->fhandle == NULL ) 557 { /* Torque seems not to create a new file immediately after the old one is closed */ 558 fsd_log_warning(("Can't open log file: %s. Retries count: %d", self->log_path, num_tries)); 559 num_tries++; 560 sleep(2 * num_tries); 561 goto retry; 562 } 563 564 fsd_log_debug(("Log file opened")); 565 566 if(self->first_open) 567 { 568 fsd_log_debug(("Log file lseek")); 569 570 if(fseek(self->fhandle, pbssession->log_file_initial_size, SEEK_SET) == (off_t) -1) 571 { 572 fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"fseek error"); 573 } 574 self->first_open = false; 575 } 576 else if (old_log_path && strcmp(old_log_path, self->log_path) == 0) 577 { 578 fsd_log_info(("PBS restarted. Seeking log file %u", (unsigned int)self->current_offset)); 579 if(fseek(self->fhandle, self->current_offset, SEEK_SET) == (off_t) -1) 580 { 581 fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"fseek error"); 582 } 583 } 584 585 self->date_changed = false; 586 587 fsd_free(old_log_path); 588 589 fsd_log_return(("")); 590 } 591 } 592 593 time_t 594 pbsdrmaa_parse_log_timestamp(const char *timestamp, char *unixtime_str, size_t size) 595 { 596 struct tm temp_time_tm; 597 memset(&temp_time_tm, 0, sizeof(temp_time_tm)); 598 temp_time_tm.tm_isdst = -1; 599 600 if (strptime(timestamp, "%m/%d/%Y %H:%M:%S", &temp_time_tm) == NULL) 601 { 602 fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"WT - failed to parse log timestamp: %s", timestamp); 603 } 604 else 605 { 606 time_t temp_time = mktime(&temp_time_tm); 607 snprintf(unixtime_str, size, "%lu", temp_time); 608 return temp_time; 609 } 610 } 611 612 char * 613 pbsdrmaa_get_exec_host_from_accountig(pbsdrmaa_log_reader_t * log_reader, const char *job_id) 614 { 615 pbsdrmaa_session_t *pbssession = (pbsdrmaa_session_t*) log_reader->session; 616 struct tm tm; 617 time_t tm_t; 618 char *line = NULL; 619 char *exec_host = NULL; 620 char *log_path = NULL; 621 FILE *fhandle = NULL; 622 623 fsd_log_enter(("(job_id=%s)", job_id)); 624 625 tm_t = time(NULL); 626 localtime_r(&tm_t, &tm); 627 628 log_path = fsd_asprintf("%s/server_priv/accounting/%04d%02d%02d", pbssession->pbs_home, tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday); 629 630 fsd_log_info(("Opening accounting log file: %s", log_path)); 631 632 if ((fhandle = fopen(log_path, "r")) == NULL) 633 { 634 fsd_log_error(("Failed to open accounting log file: %s", log_path)); 635 fsd_free(log_path); 636 return NULL; 637 } 638 639 fsd_free(log_path); 640 /* 641 10/27/2011 14:09:32;E;114249.grass1.man.poznan.pl;user=drmaa group=drmaa jobname=none queue=shortq ctime=1319717371 qtime=1319717371 etime=1319717371 start=1319717372 owner=drmaa@grass1.man.poznan.pl exec_host=grass4.man.poznan.pl/0 Resource_List.neednodes=1 Resource_List.nodect=1 Resource_List.nodes=1 Resource_List.walltime=02:00:00 session=28561 end=1319717372 Exit_status=0 resources_used.cput=00:00:00 resources_used.mem=0kb resources_used.vmem=0kb resources_used.walltime=00:00:00 642 */ 643 while ((line = fsd_readline(fhandle)) != NULL) 644 { 645 646 if (line[20] == 'E' && strncmp(line + 22, job_id, strlen(job_id)) == 0 ) 647 { 648 char *p = NULL; 649 650 fsd_log_debug(("Matched accounting log record = %s", line)); 651 652 if (!(exec_host = strstr(line, "exec_host"))) 653 { 654 fsd_log_error(("Invalid accounting record: %s", exec_host)); 655 break; 656 } 657 658 exec_host += 10; 659 660 p = exec_host; 661 while (*p != ' ' && *p != '\0') 662 p++; 663 *p = '\0'; 664 665 break; 666 } 667 668 fsd_free(line); 669 } 670 671 if (exec_host) 672 { 673 fsd_log_info(("Job %s was executing on hosts %s.", job_id, exec_host)); 674 exec_host = fsd_strdup(exec_host); 675 } 133 char* 134 pbsdrmaa_pbs_submit( pbsdrmaa_pbs_conn_t *self, struct attropl *attrib, char *script, char *destination ) 135 { 136 137 138 } 139 140 struct batch_status* 141 pbsdrmaa_pbs_statjob( pbsdrmaa_pbs_conn_t *self, char *job_id, struct attrl *attrib ) 142 { 143 144 } 145 146 void 147 pbsdrmaa_pbs_statjob_free( pbsdrmaa_pbs_conn_t *self, struct batch_status* job_status ) 148 { 149 150 151 } 152 153 void 154 pbsdrmaa_pbs_sigjob( pbsdrmaa_pbs_conn_t *self, char *job_id, char *signal ) 155 { 156 157 158 } 159 160 void 161 pbsdrmaa_pbs_deljob( pbsdrmaa_pbs_conn_t *self, char *job_id ) 162 { 163 164 } 165 166 void 167 pbsdrmaa_pbs_rlsjob( pbsdrmaa_pbs_conn_t *self, char *job_id ) 168 { 169 170 171 } 172 173 void 174 pbsdrmaa_pbs_holdjob( pbsdrmaa_pbs_conn_t *self, char *job_id ) 175 { 176 177 } 178 179 void 180 pbsdrmaa_pbs_reconnect_internal( pbsdrmaa_pbs_conn_t *self, bool force_reconnect) 181 { 182 int tries_left = self->session->max_retries_count; 183 int sleep_time = 1; 184 185 186 fsd_log_enter(("(%d)", self->connection_fd)); 187 188 if ( self->connection_fd != -1 ) 189 { 190 if (!force_reconnect) 191 { 192 fsd_log_return(("(%d)", self->connection_fd)); 193 return; 194 } 676 195 else 677 196 { 678 fsd_log_error(("Could not find executions hosts for %s.", job_id)); 197 pbs_disconnect(self->connection_fd); 198 self->connection_fd = -1; 679 199 } 680 681 if (line) 682 fsd_free(line); 683 684 fclose(fhandle); 685 686 return exec_host; 687 } 688 689 void 690 pbsdrmaa_close_log( pbsdrmaa_log_reader_t * self ) 691 { 692 693 self->current_offset = ftello(self->fhandle); 694 695 fsd_log_debug(("Closing log file (offset=%d)", self->current_offset)); 696 697 fclose(self->fhandle); 698 699 self->fhandle = NULL; 700 } 701 702 void 703 pbsdrmaa_reopen_log( pbsdrmaa_log_reader_t * self ) 704 { 705 fsd_log_debug(("Reopening log file: %s (offset=%d)", self->log_path, self->current_offset)); 706 707 if ((self->fhandle = fopen(self->log_path,"r")) == NULL) 708 { 709 fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"Failed to reopen log file"); 710 } 711 712 if(fseek(self->fhandle, self->current_offset, SEEK_SET) == (off_t) -1) 713 { 714 fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"fseek error"); 715 } 716 } 717 200 } 201 202 retry_connect: /* Life... */ 203 self->connection_fd = pbs_connect( self->server ); 204 fsd_log_info(( "pbs_connect(%s) =%d", self->server, self->connection_fd )); 205 if( self->connection_fd < 0 && tries_left-- ) 206 { 207 sleep(sleep_time); 208 sleep_time *=2; 209 goto retry_connect; 210 } 211 212 if( self->connection_fd < 0 ) 213 pbsdrmaa_exc_raise_pbs( "pbs_connect" ); 214 215 fsd_log_return(("(%d)", self->connection_fd)); 216 } 217 -
trunk/pbs_drmaa/pbs_conn.h
r48 r76 1 1 /* $Id$ */ 2 2 /* 3 * FedStage DRMAA forPBS Pro4 * Copyright (C) 20 06-2009 FedStage Systems3 * PSNC DRMAA for Torque/PBS Pro 4 * Copyright (C) 2012 Poznan Supercomputing and Networking Center 5 5 * 6 6 * This program is free software: you can redistribute it and/or modify … … 18 18 */ 19 19 20 #ifndef __PBS_DRMAA__ LOG_READER_H21 #define __PBS_DRMAA__ LOG_READER_H20 #ifndef __PBS_DRMAA__PBS_CONN_H 21 #define __PBS_DRMAA__PBS_CONN_H 22 22 23 23 #ifdef HAVE_CONFIG_H … … 30 30 #include <drmaa_utils/session.h> 31 31 32 typedef struct pbsdrmaa_log_reader_s pbsdrmaa_log_reader_t; 32 #include <session.h> 33 33 34 pbsdrmaa_log_reader_t * 35 pbsdrmaa_log_reader_new ( fsd_drmaa_session_t * session); 34 #include <pbs_ifl.h> 35 36 typedef struct pbsdrmaa_pbs_conn_s pbsdrmaa_pbs_conn_t; 37 38 pbsdrmaa_pbs_conn_t * pbsdrmaa_pbs_conn_new ( pbsdrmaa_session_t * session, char *server); 36 39 37 40 void 38 pbsdrmaa_ log_reader_destroy ( pbsdrmaa_log_reader_t * self );41 pbsdrmaa_pbs_conn_destroy ( pbsdrmaa_pbs_conn_t * self ); 39 42 40 struct pbsdrmaa_log_reader_s { 41 fsd_drmaa_session_t *volatile session ; 43 struct pbsdrmaa_pbs_conn_s { 44 pbsdrmaa_session_t *volatile session; 45 46 char* (*submit) ( pbsdrmaa_pbs_conn_t *self, struct attropl *attrib, char *script, char *destination ); 47 48 struct batch_status* (*statjob) ( pbsdrmaa_pbs_conn_t *self, char *job_id, struct attrl *attrib ); 49 50 void (*statjob_free) ( pbsdrmaa_pbs_conn_t *self, struct batch_status* job_status ); 51 52 void (*sigjob) ( pbsdrmaa_pbs_conn_t *self, char *job_id, char *signal ); 53 54 void (*deljob) ( pbsdrmaa_pbs_conn_t *self, char *job_id ); 55 56 void (*rlsjob) ( pbsdrmaa_pbs_conn_t *self, char *job_id ); 57 58 void (*holdjob) ( pbsdrmaa_pbs_conn_t *self, char *job_id ); 42 59 43 void (*read_log) ( pbsdrmaa_log_reader_t * self ); 60 /* contact string */ 61 char *server; 62 /* connection descriptor */ 63 int connection_fd; 44 64 45 void (*select_file) ( pbsdrmaa_log_reader_t * self ); 46 47 void (*close) ( pbsdrmaa_log_reader_t * self ); 48 49 void (*reopen) ( pbsdrmaa_log_reader_t * self ); 50 51 52 /* determines if function should run */ 53 bool run_flag; 54 55 /* date of current file */ 56 time_t t; 57 58 /* log file handle */ 59 FILE *fhandle; 60 61 /* for wait_thread - day changed */ 62 bool volatile date_changed; 63 64 /* for wait_thread - log file first open */ 65 bool volatile first_open; 66 67 char *volatile log_path; 68 69 off_t volatile current_offset; 65 /* timestamp of last usage */ 66 time_t last_usage; 70 67 }; 71 68 72 #endif /* __PBS_DRMAA__ LOG_READER_H */69 #endif /* __PBS_DRMAA__PBS_CONN_H */
Note: See TracChangeset
for help on using the changeset viewer.