Changeset 8 for trunk/pbs_drmaa


Ignore:
Timestamp:
03/02/11 22:08:19 (13 years ago)
Author:
mmatloka
Message:

log reader improvements

Location:
trunk/pbs_drmaa
Files:
5 edited

Legend:

Unmodified
Added
Removed
  • trunk/pbs_drmaa/job.h

    r1 r8  
    4242}; 
    4343 
     44void 
     45pbsdrmaa_job_on_missing_standard( fsd_job_t *self ); 
     46 
    4447#endif /* __PBS_DRMAA__JOB_H */ 
    4548 
  • trunk/pbs_drmaa/log_reader.c

    r7 r8  
    5353 
    5454static void 
    55 pbsdrmaa_chose_file_wait_thread ( pbsdrmaa_log_reader_t * self); 
     55pbsdrmaa_select_file_wait_thread ( pbsdrmaa_log_reader_t * self); 
    5656 
    5757static ssize_t 
    58 pbsdrmaa_read_line_wait_thread ( pbsdrmaa_log_reader_t * self, char * buffer, ssize_t size ); 
     58pbsdrmaa_read_line_wait_thread ( pbsdrmaa_log_reader_t * self, char * line, char * buffer, ssize_t size, int * idx, int * end_idx, int * line_idx ); 
    5959 
    6060static void 
    61 pbsdrmaa_chose_file_job_on_missing ( pbsdrmaa_log_reader_t * self ); 
     61pbsdrmaa_select_file_job_on_missing ( pbsdrmaa_log_reader_t * self ); 
    6262 
    6363static ssize_t 
    64 pbsdrmaa_read_line_job_on_missing ( pbsdrmaa_log_reader_t * self, char * buffer, ssize_t size ); 
     64pbsdrmaa_read_line_job_on_missing ( pbsdrmaa_log_reader_t * self, char * line, char * buffer, ssize_t size, int * idx, int * end_idx, int * line_idx ); 
    6565 
    6666int  
     
    8787                        self->job = job; 
    8888                        self->name = "Job_on_missing"; 
    89                         self->chose_file = pbsdrmaa_chose_file_job_on_missing; 
     89                        self->select_file = pbsdrmaa_select_file_job_on_missing; 
    9090                        self->read_line = pbsdrmaa_read_line_job_on_missing; 
    9191                } 
     
    9494                        self->job = NULL; 
    9595                        self->name = "WT"; 
    96                         self->chose_file = pbsdrmaa_chose_file_wait_thread; 
     96                        self->select_file = pbsdrmaa_select_file_wait_thread; 
    9797                        self->read_line = pbsdrmaa_read_line_wait_thread; 
    9898                }                
     
    179179        if(self->job == NULL) 
    180180                fsd_mutex_lock( &self->session->mutex ); 
    181         /*else 
    182                 fsd_mutex_lock( &self->job->mutex );*/ 
     181 
    183182        TRY 
    184183        {                
     
    186185                TRY 
    187186                { 
    188                         char buffer[4096] = "";          
     187                        char line[4096] = ""; 
     188                        char buffer[4096] = ""; 
     189                        int idx = 0, end_idx = 0, line_idx = 0; 
    189190                         
    190                         self->chose_file(self); 
    191  
    192                         while ((self->read_line(self, buffer, sizeof(buffer))) > 0)                      
     191                        self->select_file(self); 
     192 
     193                        while ((self->read_line(self, line,buffer, sizeof(line), &idx,&end_idx,&line_idx)) > 0)                          
    193194                        { 
    194                                 const char *volatile ptr = buffer; 
     195                                const char *volatile ptr = line; 
    195196                                char field[256] = ""; 
    196197                                char job_id[256] = ""; 
     
    198199                                int volatile field_n = 0; 
    199200                                int n; 
    200  
     201                                 
    201202                                bool volatile job_id_match = false; 
    202203                                bool volatile event_match = false; 
     
    286287                                                bool state_running = false; 
    287288 
    288                                                 struct_status.name = NULL; 
    289                                                 struct_status.value = NULL; 
    290                                                 struct_status.next = NULL; 
    291                                                 struct_status.resource = NULL; 
    292  
    293                                                 struct_state.name = NULL; 
    294                                                 struct_state.value = NULL; 
    295                                                 struct_state.next = NULL; 
    296                                                 struct_state.resource = NULL; 
    297  
    298                                                 struct_resource_cput.name = NULL; 
    299                                                 struct_resource_cput.value = NULL; 
    300                                                 struct_resource_cput.next = NULL; 
    301                                                 struct_resource_cput.resource = NULL; 
    302  
    303                                                 struct_resource_mem.name = NULL; 
    304                                                 struct_resource_mem.value = NULL; 
    305                                                 struct_resource_mem.next = NULL; 
    306                                                 struct_resource_mem.resource = NULL; 
    307  
    308                                                 struct_resource_vmem.name = NULL; 
    309                                                 struct_resource_vmem.value = NULL; 
    310                                                 struct_resource_vmem.next = NULL; 
    311                                                 struct_resource_vmem.resource = NULL; 
    312  
    313                                                 struct_resource_walltime.name = NULL; 
    314                                                 struct_resource_walltime.value = NULL; 
    315                                                 struct_resource_walltime.next = NULL; 
    316                                                 struct_resource_walltime.resource = NULL; 
    317  
    318                                                 struct_start_time.name = NULL; 
    319                                                 struct_start_time.value = NULL; 
    320                                                 struct_start_time.next = NULL; 
    321                                                 struct_start_time.resource = NULL; 
    322  
    323                                                 struct_mtime.name = NULL; 
    324                                                 struct_mtime.value = NULL; 
    325                                                 struct_mtime.next = NULL; 
    326                                                 struct_mtime.resource = NULL; 
    327  
    328                                                 struct_queue.name = NULL; 
    329                                                 struct_queue.value = NULL; 
    330                                                 struct_queue.next = NULL; 
    331                                                 struct_queue.resource = NULL; 
    332  
    333                                                 struct_account_name.name = NULL; 
    334                                                 struct_account_name.value = NULL; 
    335                                                 struct_account_name.next = NULL; 
    336                                                 struct_account_name.resource = NULL; 
     289                                                memset(&struct_status,0,sizeof(struct attrl)); /**/ 
     290                                                memset(&struct_state,0,sizeof(struct attrl)); 
     291                                                memset(&struct_resource_cput,0,sizeof(struct attrl)); 
     292                                                memset(&struct_resource_mem,0,sizeof(struct attrl)); 
     293                                                memset(&struct_resource_vmem,0,sizeof(struct attrl)); 
     294                                                memset(&struct_resource_walltime,0,sizeof(struct attrl)); 
     295                                                memset(&struct_start_time,0,sizeof(struct attrl)); 
     296                                                memset(&struct_mtime,0,sizeof(struct attrl)); 
     297                                                memset(&struct_queue,0,sizeof(struct attrl)); 
     298                                                memset(&struct_account_name,0,sizeof(struct attrl)); 
    337299                                                                 
    338300                                                if (strcmp(event,FLD_MSG_STATE) == 0)  
     
    507469                                                log_event = false; 
    508470                                        } 
    509                                         else if( self->job == NULL && log_match && field_n == FLD_MSG &&  
    510                                                 field[0] == 'L' &&  
    511                                                 field[1] == 'o' &&  
    512                                                 field[2] == 'g' &&  
    513                                                 field[3] == ' ' &&  
    514                                                 field[4] == 'c' &&  
    515                                                 field[5] == 'l' &&  
    516                                                 field[6] == 'o' &&  
    517                                                 field[7] == 's' &&  
    518                                                 field[8] == 'e' &&  
    519                                                 field[9] == 'd' )  /* last field in the file - strange bahaviour*/ 
     471                                        else if( self->job == NULL && log_match && field_n == FLD_MSG && strncmp(field,"Log closed",10) == 0)  
    520472                                        { 
    521473                                                fsd_log_debug(("%s - Date changed. Closing log file",self->name)); 
     
    532484                                        ++ptr; 
    533485                                }                
    534  
    535                                 if( strlcpy(buffer,"",sizeof(buffer)) > sizeof(buffer) ) { 
    536                                         fsd_log_error(("%s - strlcpy error",self->name)); 
    537                                 } 
    538486 
    539487                                fsd_free(temp_date);                     
     
    577525 
    578526void 
    579 pbsdrmaa_chose_file_wait_thread ( pbsdrmaa_log_reader_t * self ) 
     527pbsdrmaa_select_file_wait_thread ( pbsdrmaa_log_reader_t * self ) 
    580528{ 
    581529        pbsdrmaa_session_t *pbssession = (pbsdrmaa_session_t*) self->session; 
     
    648596 
    649597ssize_t 
    650 pbsdrmaa_read_line_wait_thread ( pbsdrmaa_log_reader_t * self, char * buffer, ssize_t size ) 
    651 { 
    652         return fsd_getline(buffer,size,self->fd); 
     598pbsdrmaa_read_line_wait_thread ( pbsdrmaa_log_reader_t * self, char * line, char * buffer, ssize_t size, int * idx, int * end_idx, int * line_idx ) 
     599{ 
     600        return fsd_getline_buffered(line,buffer,size,self->fd,idx,end_idx,line_idx); 
    653601} 
    654602 
     
    663611 
    664612void 
    665 pbsdrmaa_chose_file_job_on_missing( pbsdrmaa_log_reader_t * self ) 
     613pbsdrmaa_select_file_job_on_missing( pbsdrmaa_log_reader_t * self ) 
    666614{ 
    667615        pbsdrmaa_session_t *pbssession = (pbsdrmaa_session_t*) self->session;    
     
    769717 
    770718ssize_t 
    771 pbsdrmaa_read_line_job_on_missing ( pbsdrmaa_log_reader_t * self, char * buffer, ssize_t size ) 
    772 { 
    773         int n = fsd_getline(buffer,size,self->fd); 
     719pbsdrmaa_read_line_job_on_missing ( pbsdrmaa_log_reader_t * self, char * line, char * buffer, ssize_t size, int * idx, int * end_idx, int * line_idx ) 
     720{ 
     721        int n = fsd_getline_buffered(line,buffer,size,self->fd, idx, end_idx, line_idx); 
    774722         
    775723        if(n >= 0) 
     
    789737        char *rest = NULL; 
    790738        char *token = NULL; 
    791         char *ptr = strdup(s1); 
     739        char *ptr = fsd_strdup(s1); 
    792740        token = strtok_r(ptr, ".", &rest); 
    793741        job1 = atoi(token); 
     
    795743        fsd_free(token); 
    796744         
    797         ptr = strdup(s2); 
     745        ptr = fsd_strdup(s2); 
    798746        token = strtok_r(ptr,".",&rest); 
    799747        job2 = atoi(token); 
  • trunk/pbs_drmaa/log_reader.h

    r7 r8  
    4444         
    4545        void (* 
    46         chose_file) ( pbsdrmaa_log_reader_t * self ); 
     46        select_file) ( pbsdrmaa_log_reader_t * self ); 
    4747         
     48        /* line - read line, buffer - keeps read but not returned lines, idx, end_idx and line_idx values needed to be kept outside the function */ 
    4849        ssize_t (* 
    49         read_line) ( pbsdrmaa_log_reader_t * self , char * buffer , ssize_t size ); 
     50        read_line) ( pbsdrmaa_log_reader_t * self, char * line, char * buffer, ssize_t size, int * idx, int * end_idx, int * line_idx ); 
    5051         
    5152        /* specifies if function should run */ 
  • trunk/pbs_drmaa/util.c

    r7 r8  
    286286}  
    287287 
     288ssize_t fsd_getline_buffered(char * line,char * buf, ssize_t size, int fd, int * idx, int * end_idx, int * line_idx) 
     289{ 
     290        int i = -1; 
     291        int rc = -1; 
     292 
     293        memset(line,0,size); 
     294         
     295start: 
     296        /* idx - start of data to parse (in buffer) 
     297           end_idx - end of data read from log (in buffer) 
     298           line_idx - place to write data in output line */ 
     299        if(*idx < *end_idx) 
     300        { 
     301                /* take line from buffer */ 
     302                for(i = *idx; i<= *end_idx;i++) 
     303                {                
     304                        if(buf[i] == '\n') 
     305                        { 
     306                                int tmp = i - *idx; 
     307                                strncpy(line + *line_idx,buf + *idx,tmp);                                
     308                                *idx = i + 1; 
     309                         
     310                                tmp+= *line_idx; 
     311                                *line_idx = 0; 
     312                                 
     313                                return tmp; 
     314                        } 
     315                } 
     316                 
     317                /* there was no '\n' so next part of log needs to be read. save lines beginning */ 
     318                if(*line_idx + i - *idx > size ) 
     319                        fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"Line longer than %d unsupported",size); 
     320                 
     321                strncpy(line + *line_idx,buf + *idx,i - *idx); 
     322                *line_idx += i - *idx; 
     323                *idx = 0; 
     324                *end_idx = 0; 
     325                goto start; 
     326        } 
     327        else 
     328        {                
     329                /* read log */ 
     330                if((rc = read(fd,buf,size)) > 0) 
     331                {                
     332                        *end_idx = rc - 1; 
     333                        *idx = 0; 
     334                        goto start; 
     335                } 
     336                else if (rc == 0)  
     337                        return 0; 
     338                else 
     339                        return -1; 
     340        } 
     341}  
     342 
  • trunk/pbs_drmaa/util.h

    r7 r8  
    4646fsd_getline(char * line,ssize_t size, int fd); 
    4747 
     48ssize_t  
     49fsd_getline_buffered(char * line,char * buf, ssize_t size, int fd, int * idx, int * end_idx, int * line_idx); 
     50 
    4851#endif /* __PBS_DRMAA__UTIL_H */ 
    4952 
Note: See TracChangeset for help on using the changeset viewer.