Changeset 48 for trunk/pbs_drmaa
- Timestamp:
- 12/07/11 18:00:32 (13 years ago)
- Location:
- trunk/pbs_drmaa
- Files:
-
- 5 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/pbs_drmaa/log_reader.c
r45 r48 72 72 static void pbsdrmaa_read_log(); 73 73 74 static void pbsdrmaa_select_file_wait_thread( pbsdrmaa_log_reader_t * self); 75 76 char *pbsdrmaa_read_line_wait_thread( pbsdrmaa_log_reader_t * self); 74 static void pbsdrmaa_select_file( pbsdrmaa_log_reader_t * self); 75 76 static void pbsdrmaa_close_log( pbsdrmaa_log_reader_t * self); 77 78 static void pbsdrmaa_reopen_log( pbsdrmaa_log_reader_t * self); 77 79 78 80 static time_t pbsdrmaa_parse_log_timestamp(const char *timestamp, char *unixtime_str, size_t size); … … 141 143 self->session = session; 142 144 143 self->select_file = pbsdrmaa_select_file _wait_thread;145 self->select_file = pbsdrmaa_select_file; 144 146 self->read_log = pbsdrmaa_read_log; 147 self->close = pbsdrmaa_close_log; 148 self->reopen = pbsdrmaa_reopen_log; 145 149 146 150 self->run_flag = true; … … 148 152 self->date_changed = true; 149 153 self->first_open = true; 154 self->log_path = NULL; 155 self->current_offset = 0; 150 156 151 157 } … … 458 464 fsd_mutex_unlock( &self->session->mutex ); 459 465 466 /* close */ 467 self->close(self); 468 460 469 sleep(((pbsdrmaa_session_t *)self->session)->wait_thread_sleep_time); 470 471 /* and reopen log file */ 472 self->reopen(self); 461 473 462 474 fsd_mutex_lock( &self->session->mutex ); … … 490 502 491 503 void 492 pbsdrmaa_select_file _wait_thread( pbsdrmaa_log_reader_t * self )504 pbsdrmaa_select_file( pbsdrmaa_log_reader_t * self ) 493 505 { 494 506 pbsdrmaa_session_t *pbssession = (pbsdrmaa_session_t*) self->session; … … 496 508 if (self->date_changed) 497 509 { 498 char * log_path = NULL;499 510 int num_tries = 0; 500 511 struct tm tm; … … 511 522 #define DRMAA_WAIT_THREAD_MAX_TRIES (12) 512 523 /* generate new date, close file and open new */ 513 log_path = fsd_asprintf("%s/server_logs/%04d%02d%02d", pbssession->pbs_home, tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday); 524 if (self->log_path) 525 fsd_free(self->log_path); 526 527 self->log_path = fsd_asprintf("%s/server_logs/%04d%02d%02d", pbssession->pbs_home, tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday); 514 528 515 529 if(self->fhandle) 516 530 fclose(self->fhandle); 517 531 518 fsd_log_info(("Opening log file: %s", log_path));532 fsd_log_info(("Opening log file: %s",self->log_path)); 519 533 520 534 retry: 521 if ((self->fhandle = fopen( log_path,"r")) == NULL && (num_tries > DRMAA_WAIT_THREAD_MAX_TRIES || self->first_open))535 if ((self->fhandle = fopen(self->log_path,"r")) == NULL && (num_tries > DRMAA_WAIT_THREAD_MAX_TRIES || self->first_open)) 522 536 { 523 fsd_log_error(("Can't open log file . Verify pbs_home. Running standard wait_thread."));537 fsd_log_error(("Can't open log file: %s. Verify pbs_home. Running standard wait_thread.", self->log_path)); 524 538 fsd_log_error(("Remember that without keep_completed set the standard wait_thread won't provide information about job exit status")); 525 539 /*pbssession->super.enable_wait_thread = false;*/ /* run not wait_thread */ … … 530 544 else if ( self->fhandle == NULL ) 531 545 { /* Torque seems not to create a new file immediately after the old one is closed */ 532 fsd_log_warning(("Can't open log file: %s. Retries count: %d", log_path, num_tries));546 fsd_log_warning(("Can't open log file: %s. Retries count: %d", self->log_path, num_tries)); 533 547 num_tries++; 534 548 sleep(2 * num_tries); 535 549 goto retry; 536 550 } 537 538 fsd_free(log_path);539 551 540 552 fsd_log_debug(("Log file opened")); … … 653 665 } 654 666 655 667 void 668 pbsdrmaa_close_log( pbsdrmaa_log_reader_t * self ) 669 { 670 671 self->current_offset = ftello(self->fhandle); 672 673 fclose(self->fhandle); 674 675 self->fhandle = NULL; 676 } 677 678 void 679 pbsdrmaa_reopen_log( pbsdrmaa_log_reader_t * self ) 680 { 681 if ((self->fhandle = fopen(self->log_path,"r")) == NULL) 682 { 683 fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"Failed to reopen log file"); 684 } 685 686 if(fseek(self->fhandle, self->current_offset, SEEK_SET) == (off_t) -1) 687 { 688 fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"fseek error"); 689 } 690 } 691 -
trunk/pbs_drmaa/log_reader.h
r29 r48 45 45 void (*select_file) ( pbsdrmaa_log_reader_t * self ); 46 46 47 void (*close) ( pbsdrmaa_log_reader_t * self ); 48 49 void (*reopen) ( pbsdrmaa_log_reader_t * self ); 50 51 47 52 /* determines if function should run */ 48 53 bool run_flag; … … 59 64 /* for wait_thread - log file first open */ 60 65 bool volatile first_open; 66 67 char *volatile log_path; 68 69 off_t volatile current_offset; 61 70 }; 62 71 -
trunk/pbs_drmaa/pbs_attrib.gperf
r47 r48 68 68 Resource_List.host, t(PBSDRMAA_ATTR_HOST) 69 69 Resource_List.nodes, t(PBSDRMAA_ATTR_NODES) 70 Resource_List.software, t(PBSDRMAA_ATTR_SOFTWARE)71 70 Resource_List.procs, t(PBSDRMAA_ATTR_PROCS) 72 71 Resource_List.ncpus, t(PBSDRMAA_ATTR_NCPUS) … … 103 102 submit_args, t(PBSDRMAA_ATTR_SUBMIT_ARGS) 104 103 mtime, t(PBSDRMAA_ATTR_MTIME) 105 # reservation attributes: 106 # reserve_start, t(PBSDRMAA_ATTR_RESERVATION_START) 107 # reserve_end, t(PBSDRMAA_ATTR_RESERVATION_END) 108 # reserve_duration, t(PBSDRMAA_ATTR_RESERVATION_DURATION) 109 # reserve_state, t(PBSDRMAA_ATTR_RESERVATION_STATE) 110 # reserve_substate, t(PBSDRMAA_ATTR_RESERVATION_SUBSTATE) 104 pbsdrmaa.node_properties, t(PBSDRMAA_ATTR_NODE_PROPERTIES) 105 pbsdrmaa.custom_resources, t(PBSDRMAA_ATTR_CUSTOM_RESOURCES) 111 106 %% 112 107 #undef t … … 187 182 { "Resource_List.host", PBSDRMAA_ATTR_HOST, false }, 188 183 { "Resource_List.nodes", PBSDRMAA_ATTR_NODES, false }, 189 { "Resource_List.software", PBSDRMAA_ATTR_SOFTWARE, false },190 184 { "Resource_List.procs", PBSDRMAA_ATTR_PROCS, false }, 191 185 { "Resource_List.ncpus", PBSDRMAA_ATTR_NCPUS, false }, … … 221 215 { "submit_args", PBSDRMAA_ATTR_SUBMIT_ARGS, false }, 222 216 { "mtime", PBSDRMAA_ATTR_MTIME, false }, 217 { "pbsdrmaa.node_properties", PBSDRMAA_ATTR_NODE_PROPERTIES, false }, 218 { "pbsdrmaa.custom_resources", PBSDRMAA_ATTR_CUSTOM_RESOURCES, false }, 219 223 220 }; 224 221 -
trunk/pbs_drmaa/pbs_attrib.h
r47 r48 53 53 #define PBSDRMAA_PROCS "Resource_List.procs" 54 54 #define PBSDRMAA_NCPUS "Resource_List.ncpus" 55 #define PBSDRMAA_SOFTWARE "Resource_List.software"56 55 #define PBSDRMAA_MAIL_POINTS "Mail_Points" 57 56 #define PBSDRMAA_OUTPUT_PATH "Output_Path" … … 85 84 #define PBSDRMAA_SUBMIT_ARGS "submit_args" 86 85 #define PBSDRMAA_MTIME "mtime" 87 86 #define PBSDRMAA_NODE_PROPERTIES "pbsdrmaa.node_properties" 87 #define PBSDRMAA_CUSTOM_RESOURCES "pbsdrmaa.custom_resources" 88 88 89 89 typedef enum { … … 142 142 PBSDRMAA_ATTR_SUBMIT_ARGS, 143 143 PBSDRMAA_ATTR_MTIME, 144 PBSDRMAA_ATTR_NODE_PROPERTIES, 145 PBSDRMAA_ATTR_CUSTOM_RESOURCES, 144 146 145 147 PBSDRMAA_N_PBS_ATTRIBUTES -
trunk/pbs_drmaa/submit.c
r45 r48 49 49 #endif 50 50 51 static void 52 pbsdrmaa_submit_destroy( pbsdrmaa_submit_t *self ); 53 54 static char * 55 pbsdrmaa_submit_submit( pbsdrmaa_submit_t *self ); 56 57 static void 58 pbsdrmaa_submit_eval( pbsdrmaa_submit_t *self ); 59 60 61 static void 62 pbsdrmaa_submit_set( pbsdrmaa_submit_t *self, const char *pbs_attr, 63 char *value, unsigned placeholders ); 51 static void pbsdrmaa_submit_destroy( pbsdrmaa_submit_t *self ); 52 53 static char *pbsdrmaa_submit_submit( pbsdrmaa_submit_t *self ); 54 55 static void pbsdrmaa_submit_eval( pbsdrmaa_submit_t *self ); 56 57 static void pbsdrmaa_submit_set( pbsdrmaa_submit_t *self, const char *pbs_attr, char *value, unsigned placeholders ); 64 58 65 59 static void pbsdrmaa_submit_apply_defaults( pbsdrmaa_submit_t *self ); … … 101 95 self->apply_job_environment = pbsdrmaa_submit_apply_job_environment; 102 96 self->apply_email_notification = pbsdrmaa_submit_apply_email_notification; 103 self->apply_native_specification = 104 pbsdrmaa_submit_apply_native_specification; 97 self->apply_native_specification = pbsdrmaa_submit_apply_native_specification; 105 98 106 99 self->pbs_job_attributes = pbsdrmaa_pbs_template_new(); … … 143 136 TRY 144 137 { 145 constfsd_template_t *pbs_tmpl = self->pbs_job_attributes;146 unsignedi;138 fsd_template_t *pbs_tmpl = self->pbs_job_attributes; 139 int i; 147 140 int tries_left = ((pbsdrmaa_session_t *)self->session)->max_retries_count; 148 141 int sleep_time = 1; 149 142 150 for( i = 0; i < PBSDRMAA_N_PBS_ATTRIBUTES; i++ )143 for( i = PBSDRMAA_N_PBS_ATTRIBUTES - 1; i >= 0; i-- ) /* down loop -> start with custom resources */ 151 144 { 152 145 const char *name = pbs_tmpl->by_code( pbs_tmpl, i )->name; 153 if( name && name[0] != '!' && pbs_tmpl->get_attr( pbs_tmpl, name ) ) 146 const char *value = pbs_tmpl->get_attr( pbs_tmpl, name ); 147 148 if (!value) 149 continue; 150 151 if ( i == PBSDRMAA_ATTR_CUSTOM_RESOURCES) 154 152 { 155 const char *value; 156 157 value = pbs_tmpl->get_attr( pbs_tmpl, name ); 153 char *value_copy = fsd_strdup(value); 154 char *tok_comma_ctx = NULL; 155 char *res_token = NULL; 156 /* matlab:2,simulink:1 */ 157 158 for (res_token = strtok_r(value_copy, ",", &tok_comma_ctx); res_token; res_token = strtok_r(NULL, ",", &tok_comma_ctx)) 159 { 160 char *value_p = strstr(res_token, ":"); 161 162 if (value_p) 163 { 164 char *name_p = NULL; 165 *value_p = '\0'; 166 value_p++; 167 name_p = fsd_asprintf("Resource_List.%s",res_token); 168 pbs_attr = pbsdrmaa_add_attr( pbs_attr, name_p, value_p ); 169 fsd_free(name_p); 170 } 171 else 172 { 173 fsd_exc_raise_code( FSD_DRMAA_ERRNO_INVALID_ATTRIBUTE_VALUE ); 174 } 175 } 176 177 fsd_free(value_copy); 178 } 179 else if (i == PBSDRMAA_ATTR_NODE_PROPERTIES) 180 { 181 const char *nodes_value = pbs_tmpl->get_attr( pbs_tmpl, PBSDRMAA_NODES ); 182 char *final_value = NULL; 183 184 if (!nodes_value) 185 { 186 final_value = fsd_asprintf("%s:%s",nodes_value, value); 187 } 188 else 189 { 190 final_value = fsd_asprintf("1:%s", value); 191 } 192 193 pbs_tmpl->set_attr( pbs_tmpl, PBSDRMAA_NODES, final_value); 194 fsd_free(final_value); 195 } 196 else 197 { 158 198 pbs_attr = pbsdrmaa_add_attr( pbs_attr, name, value ); 159 199 } … … 448 488 } 449 489 490 491 450 492 void 451 493 pbsdrmaa_submit_apply_job_environment( pbsdrmaa_submit_t *self ) … … 471 513 472 514 if (env_v) 473 {515 { 474 516 ii = 0; 475 while (env_v[ii]) { 517 while (env_v[ii]) 518 { 476 519 len += strlen(env_v[ii]) + 1; 477 520 ii++; 478 }479 }521 } 522 } 480 523 481 524 len+= strlen("PBS_O_WORKDIR=") + strlen(wd); … … 531 574 } 532 575 533 static void parse_resources(fsd_template_t *pbs_attr,const char *resources) 576 static const char *get_job_env(pbsdrmaa_submit_t *self, const char *env_name) 577 { 578 const fsd_template_t *jt = self->job_template; 579 const char *const *env_v = jt->get_v_attr( jt, DRMAA_V_ENV); 580 int ii = 0; 581 582 while (env_v[ii]) 583 { 584 char *eq_p = strstr(env_v[ii], "="); 585 586 if ((eq_p) && (strncmp(env_v[ii], env_name, eq_p - env_v[ii]) == 0)) 587 return ++eq_p; 588 589 ii++; 590 } 591 592 return NULL; 593 } 594 595 static void parse_resources(pbsdrmaa_submit_t *self, fsd_template_t *pbs_attr,const char *resources) 534 596 { 535 597 char * volatile name = NULL; … … 550 612 name = fsd_asprintf("Resource_List.%s", arg); 551 613 value = ++psep; 552 pbs_attr->set_attr( pbs_attr, name , value ); 614 if (value[0] == '$' && get_job_env(self, value + 1)) 615 pbs_attr->set_attr( pbs_attr, name , get_job_env(self, value + 1) ); /*get value from job env variable */ 616 else 617 pbs_attr->set_attr( pbs_attr, name , value ); 553 618 fsd_free(name); 554 619 name = NULL; … … 600 665 const char *native_specification ) 601 666 { 602 667 fsd_log_enter(( "({native_specification=%s})", native_specification )); 603 668 604 669 if( native_specification == NULL ) … … 700 765 break; 701 766 case 'l' : 702 parse_resources( pbs_attr, arg);767 parse_resources( self, pbs_attr, arg); 703 768 break; 704 769 default :
Note: See TracChangeset
for help on using the changeset viewer.