Changeset 48


Ignore:
Timestamp:
12/07/11 18:00:32 (12 years ago)
Author:
mmamonski
Message:

node_properties and custom_resources support. Now one can request resources via env variable

Location:
trunk/pbs_drmaa
Files:
5 edited

Legend:

Unmodified
Added
Removed
  • trunk/pbs_drmaa/log_reader.c

    r45 r48  
    7272static void pbsdrmaa_read_log(); 
    7373 
    74 static void pbsdrmaa_select_file_wait_thread( pbsdrmaa_log_reader_t * self); 
    75  
    76 char *pbsdrmaa_read_line_wait_thread( pbsdrmaa_log_reader_t * self); 
     74static void pbsdrmaa_select_file( pbsdrmaa_log_reader_t * self); 
     75 
     76static void pbsdrmaa_close_log( pbsdrmaa_log_reader_t * self); 
     77 
     78static void pbsdrmaa_reopen_log( pbsdrmaa_log_reader_t * self); 
    7779 
    7880static time_t pbsdrmaa_parse_log_timestamp(const char *timestamp, char *unixtime_str, size_t size); 
     
    141143                self->session = session; 
    142144 
    143                 self->select_file = pbsdrmaa_select_file_wait_thread; 
     145                self->select_file = pbsdrmaa_select_file; 
    144146                self->read_log = pbsdrmaa_read_log;      
     147                self->close = pbsdrmaa_close_log; 
     148                self->reopen = pbsdrmaa_reopen_log; 
    145149                 
    146150                self->run_flag = true; 
     
    148152                self->date_changed = true; 
    149153                self->first_open = true; 
     154                self->log_path = NULL; 
     155                self->current_offset = 0; 
    150156                 
    151157        } 
     
    458464                                fsd_mutex_unlock( &self->session->mutex ); 
    459465 
     466                                /* close */ 
     467                                self->close(self); 
     468 
    460469                                sleep(((pbsdrmaa_session_t *)self->session)->wait_thread_sleep_time); 
     470 
     471                                /* and reopen log file */ 
     472                                self->reopen(self); 
    461473 
    462474                                fsd_mutex_lock( &self->session->mutex ); 
     
    490502 
    491503void 
    492 pbsdrmaa_select_file_wait_thread ( pbsdrmaa_log_reader_t * self ) 
     504pbsdrmaa_select_file( pbsdrmaa_log_reader_t * self ) 
    493505{ 
    494506        pbsdrmaa_session_t *pbssession = (pbsdrmaa_session_t*) self->session; 
     
    496508        if (self->date_changed) 
    497509         { 
    498                 char * log_path = NULL; 
    499510                int num_tries = 0; 
    500511                struct tm tm;  
     
    511522                #define DRMAA_WAIT_THREAD_MAX_TRIES (12) 
    512523                /* generate new date, close file and open new */ 
    513                 log_path = fsd_asprintf("%s/server_logs/%04d%02d%02d", pbssession->pbs_home, tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday); 
     524                if (self->log_path) 
     525                        fsd_free(self->log_path); 
     526 
     527                self->log_path = fsd_asprintf("%s/server_logs/%04d%02d%02d", pbssession->pbs_home, tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday); 
    514528 
    515529                if(self->fhandle) 
    516530                        fclose(self->fhandle); 
    517531 
    518                 fsd_log_info(("Opening log file: %s",log_path)); 
     532                fsd_log_info(("Opening log file: %s",self->log_path)); 
    519533                                 
    520534        retry: 
    521                 if ((self->fhandle = fopen(log_path,"r")) == NULL && (num_tries > DRMAA_WAIT_THREAD_MAX_TRIES || self->first_open)) 
     535                if ((self->fhandle = fopen(self->log_path,"r")) == NULL && (num_tries > DRMAA_WAIT_THREAD_MAX_TRIES || self->first_open)) 
    522536                 { 
    523                         fsd_log_error(("Can't open log file. Verify pbs_home. Running standard wait_thread.")); 
     537                        fsd_log_error(("Can't open log file: %s. Verify pbs_home. Running standard wait_thread.", self->log_path)); 
    524538                        fsd_log_error(("Remember that without keep_completed set the standard wait_thread won't provide information about job exit status")); 
    525539                        /*pbssession->super.enable_wait_thread = false;*/ /* run not wait_thread */ 
     
    530544                else if ( self->fhandle == NULL ) 
    531545                 { /* Torque seems not to create a new file immediately after the old one is closed */ 
    532                         fsd_log_warning(("Can't open log file: %s. Retries count: %d", log_path, num_tries)); 
     546                        fsd_log_warning(("Can't open log file: %s. Retries count: %d", self->log_path, num_tries)); 
    533547                        num_tries++; 
    534548                        sleep(2 * num_tries); 
    535549                        goto retry; 
    536550                 } 
    537  
    538                 fsd_free(log_path); 
    539551 
    540552                fsd_log_debug(("Log file opened")); 
     
    653665} 
    654666 
    655  
     667void 
     668pbsdrmaa_close_log( pbsdrmaa_log_reader_t * self ) 
     669{ 
     670 
     671        self->current_offset = ftello(self->fhandle); 
     672 
     673        fclose(self->fhandle); 
     674 
     675        self->fhandle = NULL; 
     676} 
     677 
     678void 
     679pbsdrmaa_reopen_log( pbsdrmaa_log_reader_t * self ) 
     680{ 
     681        if ((self->fhandle = fopen(self->log_path,"r")) == NULL) 
     682         { 
     683                fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"Failed to reopen log file"); 
     684         } 
     685 
     686        if(fseek(self->fhandle, self->current_offset, SEEK_SET) == (off_t) -1) 
     687         { 
     688                fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"fseek error"); 
     689         } 
     690} 
     691 
  • trunk/pbs_drmaa/log_reader.h

    r29 r48  
    4545        void (*select_file) ( pbsdrmaa_log_reader_t * self ); 
    4646         
     47        void (*close) ( pbsdrmaa_log_reader_t * self ); 
     48 
     49        void (*reopen) ( pbsdrmaa_log_reader_t * self ); 
     50 
     51 
    4752        /* determines if function should run */ 
    4853        bool run_flag; 
     
    5964        /* for wait_thread - log file first open */ 
    6065        bool volatile first_open;        
     66 
     67        char *volatile log_path; 
     68 
     69        off_t volatile current_offset; 
    6170}; 
    6271 
  • trunk/pbs_drmaa/pbs_attrib.gperf

    r47 r48  
    6868Resource_List.host, t(PBSDRMAA_ATTR_HOST) 
    6969Resource_List.nodes, t(PBSDRMAA_ATTR_NODES) 
    70 Resource_List.software, t(PBSDRMAA_ATTR_SOFTWARE) 
    7170Resource_List.procs, t(PBSDRMAA_ATTR_PROCS) 
    7271Resource_List.ncpus, t(PBSDRMAA_ATTR_NCPUS) 
     
    103102submit_args, t(PBSDRMAA_ATTR_SUBMIT_ARGS) 
    104103mtime, t(PBSDRMAA_ATTR_MTIME) 
    105 # reservation attributes: 
    106 # reserve_start,       t(PBSDRMAA_ATTR_RESERVATION_START) 
    107 # reserve_end,         t(PBSDRMAA_ATTR_RESERVATION_END) 
    108 # reserve_duration,    t(PBSDRMAA_ATTR_RESERVATION_DURATION) 
    109 # reserve_state,       t(PBSDRMAA_ATTR_RESERVATION_STATE) 
    110 # reserve_substate,    t(PBSDRMAA_ATTR_RESERVATION_SUBSTATE) 
     104pbsdrmaa.node_properties, t(PBSDRMAA_ATTR_NODE_PROPERTIES) 
     105pbsdrmaa.custom_resources, t(PBSDRMAA_ATTR_CUSTOM_RESOURCES) 
    111106%% 
    112107#undef t 
     
    187182        { "Resource_List.host", PBSDRMAA_ATTR_HOST, false }, 
    188183        { "Resource_List.nodes", PBSDRMAA_ATTR_NODES, false }, 
    189         { "Resource_List.software", PBSDRMAA_ATTR_SOFTWARE, false }, 
    190184        { "Resource_List.procs", PBSDRMAA_ATTR_PROCS, false }, 
    191185        { "Resource_List.ncpus", PBSDRMAA_ATTR_NCPUS, false }, 
     
    221215        { "submit_args", PBSDRMAA_ATTR_SUBMIT_ARGS, false }, 
    222216        { "mtime", PBSDRMAA_ATTR_MTIME, false }, 
     217        { "pbsdrmaa.node_properties", PBSDRMAA_ATTR_NODE_PROPERTIES, false }, 
     218        { "pbsdrmaa.custom_resources", PBSDRMAA_ATTR_CUSTOM_RESOURCES, false }, 
     219         
    223220}; 
    224221 
  • trunk/pbs_drmaa/pbs_attrib.h

    r47 r48  
    5353#define PBSDRMAA_PROCS                  "Resource_List.procs" 
    5454#define PBSDRMAA_NCPUS                  "Resource_List.ncpus" 
    55 #define PBSDRMAA_SOFTWARE               "Resource_List.software" 
    5655#define PBSDRMAA_MAIL_POINTS            "Mail_Points" 
    5756#define PBSDRMAA_OUTPUT_PATH            "Output_Path" 
     
    8584#define PBSDRMAA_SUBMIT_ARGS            "submit_args" 
    8685#define PBSDRMAA_MTIME                  "mtime" 
    87  
     86#define PBSDRMAA_NODE_PROPERTIES        "pbsdrmaa.node_properties" 
     87#define PBSDRMAA_CUSTOM_RESOURCES       "pbsdrmaa.custom_resources" 
    8888 
    8989typedef enum { 
     
    142142        PBSDRMAA_ATTR_SUBMIT_ARGS, 
    143143        PBSDRMAA_ATTR_MTIME, 
     144        PBSDRMAA_ATTR_NODE_PROPERTIES, 
     145        PBSDRMAA_ATTR_CUSTOM_RESOURCES, 
    144146 
    145147        PBSDRMAA_N_PBS_ATTRIBUTES 
  • trunk/pbs_drmaa/submit.c

    r45 r48  
    4949#endif 
    5050 
    51 static void 
    52 pbsdrmaa_submit_destroy( pbsdrmaa_submit_t *self ); 
    53  
    54 static char * 
    55 pbsdrmaa_submit_submit( pbsdrmaa_submit_t *self ); 
    56  
    57 static void 
    58 pbsdrmaa_submit_eval( pbsdrmaa_submit_t *self ); 
    59  
    60  
    61 static void 
    62 pbsdrmaa_submit_set( pbsdrmaa_submit_t *self, const char *pbs_attr, 
    63                 char *value, unsigned placeholders ); 
     51static void pbsdrmaa_submit_destroy( pbsdrmaa_submit_t *self ); 
     52 
     53static char *pbsdrmaa_submit_submit( pbsdrmaa_submit_t *self ); 
     54 
     55static void pbsdrmaa_submit_eval( pbsdrmaa_submit_t *self ); 
     56 
     57static void pbsdrmaa_submit_set( pbsdrmaa_submit_t *self, const char *pbs_attr, char *value, unsigned placeholders ); 
    6458 
    6559static void pbsdrmaa_submit_apply_defaults( pbsdrmaa_submit_t *self ); 
     
    10195                self->apply_job_environment = pbsdrmaa_submit_apply_job_environment; 
    10296                self->apply_email_notification = pbsdrmaa_submit_apply_email_notification; 
    103                 self->apply_native_specification = 
    104                         pbsdrmaa_submit_apply_native_specification; 
     97                self->apply_native_specification = pbsdrmaa_submit_apply_native_specification; 
    10598 
    10699                self->pbs_job_attributes = pbsdrmaa_pbs_template_new(); 
     
    143136        TRY 
    144137         { 
    145                 const fsd_template_t *pbs_tmpl = self->pbs_job_attributes; 
    146                 unsigned i; 
     138                fsd_template_t *pbs_tmpl = self->pbs_job_attributes; 
     139                int i; 
    147140                int tries_left = ((pbsdrmaa_session_t *)self->session)->max_retries_count; 
    148141                int sleep_time = 1; 
    149142 
    150                 for( i = 0;  i < PBSDRMAA_N_PBS_ATTRIBUTES;  i++ ) 
     143                for( i = PBSDRMAA_N_PBS_ATTRIBUTES - 1; i >= 0; i-- ) /* down loop -> start with custom resources */ 
    151144                 { 
    152145                        const char *name = pbs_tmpl->by_code( pbs_tmpl, i )->name; 
    153                         if( name  &&  name[0] != '!' && pbs_tmpl->get_attr( pbs_tmpl, name ) ) 
     146                        const char *value = pbs_tmpl->get_attr( pbs_tmpl, name ); 
     147 
     148                        if (!value) 
     149                                continue; 
     150 
     151                        if ( i == PBSDRMAA_ATTR_CUSTOM_RESOURCES) 
    154152                         { 
    155                                 const char *value; 
    156  
    157                                 value = pbs_tmpl->get_attr( pbs_tmpl, name ); 
     153                                char *value_copy = fsd_strdup(value); 
     154                                char *tok_comma_ctx = NULL; 
     155                                char *res_token = NULL; 
     156                                /* matlab:2,simulink:1 */ 
     157 
     158                                for (res_token = strtok_r(value_copy, ",", &tok_comma_ctx); res_token; res_token = strtok_r(NULL, ",", &tok_comma_ctx)) 
     159                                 { 
     160                                        char *value_p = strstr(res_token, ":"); 
     161 
     162                                        if (value_p) 
     163                                         { 
     164                                                char *name_p = NULL; 
     165                                                *value_p = '\0'; 
     166                                                value_p++; 
     167                                                name_p = fsd_asprintf("Resource_List.%s",res_token); 
     168                                                pbs_attr = pbsdrmaa_add_attr( pbs_attr, name_p, value_p ); 
     169                                                fsd_free(name_p); 
     170                                         } 
     171                                        else 
     172                                         { 
     173                                                fsd_exc_raise_code( FSD_DRMAA_ERRNO_INVALID_ATTRIBUTE_VALUE ); 
     174                                         } 
     175                                 } 
     176 
     177                                fsd_free(value_copy); 
     178                         } 
     179                        else if (i == PBSDRMAA_ATTR_NODE_PROPERTIES) 
     180                         { 
     181                                const char *nodes_value = pbs_tmpl->get_attr( pbs_tmpl, PBSDRMAA_NODES ); 
     182                                char *final_value = NULL; 
     183 
     184                                if (!nodes_value) 
     185                                 { 
     186                                        final_value = fsd_asprintf("%s:%s",nodes_value, value); 
     187                                 } 
     188                                else 
     189                                 { 
     190                                        final_value = fsd_asprintf("1:%s", value); 
     191                                 } 
     192 
     193                                pbs_tmpl->set_attr( pbs_tmpl, PBSDRMAA_NODES, final_value); 
     194                                fsd_free(final_value); 
     195                         } 
     196                        else 
     197                         { 
    158198                                pbs_attr = pbsdrmaa_add_attr( pbs_attr, name, value ); 
    159199                         } 
     
    448488} 
    449489 
     490 
     491 
    450492void 
    451493pbsdrmaa_submit_apply_job_environment( pbsdrmaa_submit_t *self ) 
     
    471513 
    472514        if (env_v) 
    473         { 
     515         { 
    474516                ii = 0; 
    475                 while (env_v[ii]) { 
     517                while (env_v[ii]) 
     518                 { 
    476519                        len += strlen(env_v[ii]) + 1; 
    477520                        ii++; 
    478                 } 
    479         } 
     521                 } 
     522         } 
    480523         
    481524        len+= strlen("PBS_O_WORKDIR=") + strlen(wd); 
     
    531574} 
    532575 
    533 static void parse_resources(fsd_template_t *pbs_attr,const char *resources) 
     576static const char *get_job_env(pbsdrmaa_submit_t *self, const char *env_name) 
     577{ 
     578        const fsd_template_t *jt = self->job_template; 
     579        const char *const *env_v = jt->get_v_attr( jt, DRMAA_V_ENV); 
     580        int ii = 0; 
     581 
     582        while (env_v[ii]) 
     583         { 
     584                char *eq_p = strstr(env_v[ii], "="); 
     585 
     586                if ((eq_p) && (strncmp(env_v[ii], env_name, eq_p - env_v[ii]) == 0)) 
     587                                return ++eq_p; 
     588 
     589                ii++; 
     590         } 
     591 
     592        return NULL; 
     593} 
     594 
     595static void parse_resources(pbsdrmaa_submit_t *self, fsd_template_t *pbs_attr,const char *resources) 
    534596{ 
    535597        char * volatile name = NULL; 
     
    550612                                name = fsd_asprintf("Resource_List.%s", arg); 
    551613                                value = ++psep; 
    552                                 pbs_attr->set_attr( pbs_attr, name , value ); 
     614                                if (value[0] == '$' && get_job_env(self, value + 1)) 
     615                                        pbs_attr->set_attr( pbs_attr, name , get_job_env(self, value + 1) ); /*get value from job env variable */ 
     616                                else 
     617                                        pbs_attr->set_attr( pbs_attr, name , value ); 
    553618                                fsd_free(name); 
    554619                                name = NULL; 
     
    600665                const char *native_specification ) 
    601666{ 
    602         fsd_log_enter(( "({native_specification=%s})", native_specification )); 
     667        fsd_log_enter(( "({native_specification=%s})", native_specification )); 
    603668 
    604669        if( native_specification == NULL ) 
     
    700765                                                        break; 
    701766                                                case 'l' : 
    702                                                         parse_resources(pbs_attr, arg); 
     767                                                        parse_resources( self, pbs_attr, arg); 
    703768                                                        break;                                                   
    704769                                                default : 
Note: See TracChangeset for help on using the changeset viewer.