Ignore:
Timestamp:
03/02/11 17:42:40 (9 years ago)
Author:
mmamonski
Message:

lost updates...

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/pbs_drmaa/session.c

    r3 r7  
    4141 
    4242#include <pbs_drmaa/job.h> 
     43#include <pbs_drmaa/log_reader.h> 
    4344#include <pbs_drmaa/session.h> 
    4445#include <pbs_drmaa/submit.h> 
     
    218219                        struct stat statbuf; 
    219220                        char * volatile log_path; 
    220                         time_t t; 
    221  
     221                        struct tm tm; 
     222                         
    222223                        pbsself->pbs_home = pbs_home->val.string; 
    223224                        fsd_log_debug(("pbs_home: %s",pbsself->pbs_home)); 
     
    226227                        pbsself->wait_thread_log = true; 
    227228         
    228                         time(&t);        
    229                         localtime_r(&t,&pbsself->log_file_initial_time); 
     229                        time(&pbsself->log_file_initial_time);   
     230                        localtime_r(&pbsself->log_file_initial_time,&tm); 
    230231 
    231232                        if((log_path = fsd_asprintf("%s/server_logs/%04d%02d%02d", 
    232233                                pbsself->pbs_home,        
    233                                 pbsself->log_file_initial_time.tm_year + 1900, 
    234                                 pbsself->log_file_initial_time.tm_mon + 1, 
    235                                 pbsself->log_file_initial_time.tm_mday)) == NULL) { 
     234                                tm.tm_year + 1900, 
     235                                tm.tm_mon + 1, 
     236                                        tm.tm_mday)) == NULL) { 
    236237                                fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"WT - Memory allocation wasn't possible"); 
    237238                        } 
     
    516517} 
    517518 
    518  
    519 enum field 
    520   { 
    521   FLD_DATE = 0, 
    522   FLD_EVENT = 1, 
    523   FLD_OBJ = 2, 
    524   FLD_TYPE = 3, 
    525   FLD_ID = 4, 
    526   FLD_MSG = 5 
    527   }; 
    528  
    529 enum field_msg 
    530   { 
    531   FLD_MSG_EXIT_STATUS = 0, 
    532   FLD_MSG_CPUT = 1, 
    533   FLD_MSG_MEM = 2, 
    534   FLD_MSG_VMEM = 3, 
    535   FLD_MSG_WALLTIME = 4 
    536   }; 
    537  
    538 #define FLD_MSG_STATUS "0010" 
    539 #define FLD_MSG_STATE "0008" 
    540 #define FLD_MSG_LOG "0002" 
    541  
    542 ssize_t fsd_getline(char * line,ssize_t size, int fd) 
    543 { 
    544         char buf; 
    545         char * ptr = NULL; 
    546         ssize_t n = 0, rc; 
    547         ptr = line; 
    548         for(n = 1; n< size; n++) 
    549         {                
    550                 if( (rc = read(fd,&buf,1 )) == 1) { 
    551                         *ptr++ = buf; 
    552                         if(buf == '\n') 
    553                         { 
    554                                 break; 
    555                         } 
    556                 } 
    557                 else if (rc == 0) { 
    558                         if (n == 1) 
    559                                 return 0; 
    560                         else 
    561                                 break; 
    562                 }                
    563                 else 
    564                         return -1;  
    565         } 
    566  
    567         return n; 
    568 }  
    569  
    570519void * 
    571520pbsdrmaa_session_wait_thread( fsd_drmaa_session_t *self ) 
    572521{ 
    573         pbsdrmaa_session_t *pbsself = (pbsdrmaa_session_t*) self; 
    574         fsd_job_t *volatile job = NULL; 
    575         pbsdrmaa_job_t *volatile pbsjob = NULL; 
    576         char job_id[256] = ""; 
    577         char event[256] = ""; 
    578         time_t t; 
    579         struct tm tm; 
    580  
    581         tm = pbsself->log_file_initial_time; 
    582  
     522        pbsdrmaa_log_reader_t *log_reader = NULL; 
     523         
    583524        fsd_log_enter(( "" )); 
    584         fsd_mutex_lock( &self->mutex ); 
     525         
    585526        TRY 
    586          {       
    587                 char * volatile log_path = NULL; 
    588                 char buffer[4096] = ""; 
    589                 bool volatile date_changed = true; 
    590                 int  volatile fd = -1; 
    591                 bool first_open = true; 
    592  
    593                 fsd_log_debug(("WT - reading log files")); 
    594  
    595                 while( self->wait_thread_run_flag ) 
    596                 TRY 
    597                  {                       
    598                         if(date_changed) 
    599                         { 
    600                                 int num_tries = 0; 
    601                                  
    602                                 time(&t);        
    603                                 localtime_r(&t,&tm); 
    604                          
    605                                 #define DRMAA_WAIT_THREAD_MAX_TRIES (12) 
    606                                 /* generate new date, close file and open new */ 
    607                                 if((log_path = fsd_asprintf("%s/server_logs/%04d%02d%02d", 
    608                                         pbsself->pbs_home,        
    609                                         tm.tm_year + 1900, 
    610                                         tm.tm_mon + 1, 
    611                                         tm.tm_mday)) == NULL) { 
    612                                         fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"WT - Memory allocation wasn't possible"); 
    613                                 } 
    614  
    615                                 if(fd != -1) 
    616                                         close(fd); 
    617  
    618                                 fsd_log_debug(("Log file: %s",log_path)); 
    619                                  
    620                 retry: 
    621                                 if((fd = open(log_path,O_RDONLY) ) == -1 && num_tries > DRMAA_WAIT_THREAD_MAX_TRIES ) 
    622                                 { 
    623                                         fsd_log_error(("Can't open log file. Verify pbs_home. Running standard wait_thread.")); 
    624                                         fsd_log_error(("Remember that without keep_completed set standard wait_thread won't run correctly")); 
    625                                         /*pbsself->super.enable_wait_thread = false;*/ /* run not wait_thread */ 
    626                                         pbsself->wait_thread_log = false; 
    627                                         pbsself->super.wait_thread = pbsself->super_wait_thread; 
    628                                         pbsself->super.wait_thread(self); 
    629                                 } else if ( fd == -1 ) { 
    630                                         fsd_log_warning(("Can't open log file: %s. Retries count: %d", log_path, num_tries)); 
    631                                         num_tries++; 
    632                                         sleep(5); 
    633                                         goto retry; 
    634                                 } 
    635  
    636                                 fsd_free(log_path); 
    637  
    638                                 fsd_log_debug(("Log file opened")); 
    639  
    640                                 if(first_open) { 
    641                                         fsd_log_debug(("Log file lseek")); 
    642                                         if(lseek(fd,pbsself->log_file_initial_size,SEEK_SET) == (off_t) -1) { 
    643                                                 char errbuf[256] = "InternalError"; 
    644                                                 (void)strerror_r(errno, errbuf, 256); 
    645                                                 fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"lseek error: %s",errbuf); 
    646                                         } 
    647                                         first_open = false; 
    648                                 } 
    649  
    650                                 date_changed = false; 
    651                         }                                
    652                          
    653                         while ((fsd_getline(buffer,sizeof(buffer),fd)) > 0)                      
    654                         { 
    655                                 const char *volatile ptr = buffer; 
    656                                 char field[256] = ""; 
    657                                 int volatile field_n = 0; 
    658                                 int n; 
    659  
    660                                 bool volatile job_id_match = false; 
    661                                 bool volatile event_match = false; 
    662                                 bool volatile log_event = false; 
    663                                 bool volatile log_match = false; 
    664                                 char *  temp_date = NULL; 
    665                                  
    666  
    667                                 struct batch_status status; 
    668                                 status.next = NULL; 
    669  
    670                                 if( strlcpy(job_id,"",sizeof(job_id)) > sizeof(job_id) ) { 
    671                                         fsd_log_error(("WT - strlcpy error")); 
    672                                 } 
    673                                 if( strlcpy(event,"",sizeof(event)) > sizeof(event) ) { 
    674                                         fsd_log_error(("WT - strlcpy error")); 
    675                                 } 
    676                                 while ( sscanf(ptr, "%255[^;]%n", field, &n) == 1 ) /* divide current line into fields */ 
    677                                 { 
    678                                         if(field_n == FLD_DATE) 
    679                                         { 
    680                                                 temp_date = fsd_strdup(field); 
    681                                         } 
    682                                         else if(field_n == FLD_EVENT && (strcmp(field,FLD_MSG_STATUS) == 0 ||  
    683                                                                     strcmp(field,FLD_MSG_STATE) == 0 )) 
    684                                         { 
    685                                                 /* event described by log line*/ 
    686                                                 if(strlcpy(event, field,sizeof(event)) > sizeof(event)) { 
    687                                                         fsd_log_error(("WT - strlcpy error")); 
    688                                                 } 
    689                                                 event_match = true;                                                                      
    690                                         } 
    691                                         else if(event_match && field_n == FLD_ID) 
    692                                         {        
    693                                                 TRY 
    694                                                 {        
    695                                                         job = self->get_job( self, field ); 
    696                                                         pbsjob = (pbsdrmaa_job_t*) job; 
    697  
    698                                                         if( job ) 
    699                                                         { 
    700                                                                 if(strlcpy(job_id, field,sizeof(job_id)) > sizeof(job_id)) { 
    701                                                                         fsd_log_error(("WT - strlcpy error")); 
    702                                                                 } 
    703                                                                 fsd_log_debug(("WT - job_id: %s",job_id)); 
    704                                                                 status.name = fsd_strdup(job_id); 
    705                                                                 job_id_match = true; /* job_id is in drmaa */ 
    706                                                         } 
    707                                                         else  
    708                                                         { 
    709                                                                 fsd_log_debug(("WT - Unknown job: %s", field)); 
    710                                                         } 
    711                                                 } 
    712                                                 END_TRY  
    713                                         } 
    714                                         else if(job_id_match && field_n == FLD_MSG) 
    715                                         {                                                
    716                                                 /* parse msg - depends on FLD_EVENT*/ 
    717                                                 struct attrl struct_resource_cput,struct_resource_mem,struct_resource_vmem, 
    718                                                         struct_resource_walltime, struct_status, struct_state, struct_start_time,struct_mtime, struct_queue, struct_account_name;        
    719                                                  
    720                                                 bool state_running = false; 
    721  
    722                                                 struct_status.name = NULL; 
    723                                                 struct_status.value = NULL; 
    724                                                 struct_status.next = NULL; 
    725                                                 struct_status.resource = NULL; 
    726  
    727                                                 struct_state.name = NULL; 
    728                                                 struct_state.value = NULL; 
    729                                                 struct_state.next = NULL; 
    730                                                 struct_state.resource = NULL; 
    731  
    732                                                 struct_resource_cput.name = NULL; 
    733                                                 struct_resource_cput.value = NULL; 
    734                                                 struct_resource_cput.next = NULL; 
    735                                                 struct_resource_cput.resource = NULL; 
    736  
    737                                                 struct_resource_mem.name = NULL; 
    738                                                 struct_resource_mem.value = NULL; 
    739                                                 struct_resource_mem.next = NULL; 
    740                                                 struct_resource_mem.resource = NULL; 
    741  
    742                                                 struct_resource_vmem.name = NULL; 
    743                                                 struct_resource_vmem.value = NULL; 
    744                                                 struct_resource_vmem.next = NULL; 
    745                                                 struct_resource_vmem.resource = NULL; 
    746  
    747                                                 struct_resource_walltime.name = NULL; 
    748                                                 struct_resource_walltime.value = NULL; 
    749                                                 struct_resource_walltime.next = NULL; 
    750                                                 struct_resource_walltime.resource = NULL; 
    751  
    752                                                 struct_start_time.name = NULL; 
    753                                                 struct_start_time.value = NULL; 
    754                                                 struct_start_time.next = NULL; 
    755                                                 struct_start_time.resource = NULL; 
    756  
    757                                                 struct_mtime.name = NULL; 
    758                                                 struct_mtime.value = NULL; 
    759                                                 struct_mtime.next = NULL; 
    760                                                 struct_mtime.resource = NULL; 
    761  
    762                                                 struct_queue.name = NULL; 
    763                                                 struct_queue.value = NULL; 
    764                                                 struct_queue.next = NULL; 
    765                                                 struct_queue.resource = NULL; 
    766  
    767                                                 struct_account_name.name = NULL; 
    768                                                 struct_account_name.value = NULL; 
    769                                                 struct_account_name.next = NULL; 
    770                                                 struct_account_name.resource = NULL; 
    771  
    772                                                                  
    773                                                 if (strcmp(event,FLD_MSG_STATE) == 0)  
    774                                                 { 
    775                                                         /* job run, modified, queued etc */ 
    776                                                         int n = 0; 
    777                                                         status.attribs = &struct_state; 
    778                                                         struct_state.next = NULL; 
    779                                                         struct_state.name = "job_state"; 
    780                                                         if(field[0] == 'J') /* Job Queued, Job Modified, Job Run*/ 
    781                                                         { 
    782                                                                 n = 4;                                                           
    783                                                         }                
    784                                                         if(field[4] == 'M') { 
    785                                                                 struct tm temp_time_tm; 
    786                                                                 memset(&temp_time_tm, 0, sizeof(temp_time_tm)); 
    787                                                                 temp_time_tm.tm_isdst = -1; 
    788  
    789                                                                 if (strptime(temp_date, "%m/%d/%Y %H:%M:%S", &temp_time_tm) == NULL)  
    790                                                                  { 
    791                                                                         fsd_log_error(("failed to parse mtime: %s", temp_date)); 
    792                                                                  } 
    793                                                                 else 
    794                                                                  { 
    795                                                                         time_t temp_time = mktime(&temp_time_tm); 
    796                                                                         status.attribs = &struct_mtime;  
    797                                                                         struct_mtime.name = "mtime"; 
    798                                                                         struct_mtime.next = NULL; 
    799                                                                         struct_mtime.value = fsd_asprintf("%lu",temp_time); 
    800                                                                  } 
    801                                                         }                
    802                                                         /* != Job deleted and Job to be deleted*/ 
    803                                                         #ifdef PBS_PROFESSIONAL 
    804                                                         else if (field[4] != 't' && field[10] != 'd') { 
    805                                                         #else            
    806                                                         else if(field[4] != 'd') { 
    807                                                         #endif  
    808  
    809                                                                 if ((struct_state.value = fsd_asprintf("%c",field[n]) ) == NULL ) { /* 4 first letter of state */ 
    810                                                                         fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"WT - Memory allocation wasn't possible"); 
    811                                                                 } 
    812                                                                 if(struct_state.value[0] == 'R'){ 
    813                                                                         state_running = true; 
    814                                                                 } 
    815                                                         } 
    816                                                         else { /* job terminated - pbs drmaa detects failed as completed with exit_status !=0, aborted with status -1*/ 
    817                                                                 struct_status.name = "exit_status"; 
    818                                                                 struct_status.value = fsd_strdup("-1"); 
    819                                                                 struct_status.next = NULL; 
    820                                                                 struct_state.next = &struct_status; 
    821                                                                 struct_state.value = fsd_strdup("C");                                                            
    822                                                         } 
    823                                                 }                                                     
    824                                                 else /*if (strcmp(event,FLD_MSG_STATUS) == 0 )*/ 
    825                                                 { 
    826                                                         /* exit status and rusage */ 
    827                                                         const char *ptr2 = field; 
    828                                                         char  msg[ 256 ] = ""; 
    829                                                         int n2; 
    830                                                         int msg_field_n = 0; 
    831                                                          
    832                                                         struct_resource_cput.name = "resources_used"; 
    833                                                         struct_resource_mem.name = "resources_used"; 
    834                                                         struct_resource_vmem.name = "resources_used"; 
    835                                                         struct_resource_walltime.name = "resources_used"; 
    836                                                         struct_status.name = "exit_status"; 
    837                                                         struct_state.name = "job_state"; 
    838                                  
    839                                                         status.attribs = &struct_resource_cput; 
    840                                                         struct_resource_cput.next = &struct_resource_mem; 
    841                                                         struct_resource_mem.next = &struct_resource_vmem; 
    842                                                         struct_resource_vmem.next = &struct_resource_walltime; 
    843                                                         struct_resource_walltime.next =  &struct_status; 
    844                                                         struct_status.next = &struct_state; 
    845                                                         struct_state.next = NULL; 
    846  
    847                                                         while ( sscanf(ptr2, "%255[^ ]%n", msg, &n2) == 1 ) 
    848                                                          {                                               
    849                                                                 switch(msg_field_n)  
    850                                                                 { 
    851                                                                         case FLD_MSG_EXIT_STATUS: 
    852                                                                                 struct_status.value = fsd_strdup(strchr(msg,'=')+1); 
    853                                                                                 break; 
    854  
    855                                                                         case FLD_MSG_CPUT: 
    856                                                                                 struct_resource_cput.resource = "cput"; 
    857                                                                                 struct_resource_cput.value = fsd_strdup(strchr(msg,'=')+1); 
    858                                                                                 break; 
    859  
    860                                                                         case FLD_MSG_MEM: 
    861                                                                                 struct_resource_mem.resource = "mem"; 
    862                                                                                 struct_resource_mem.value  = fsd_strdup(strchr(msg,'=')+1); 
    863                                                                                 break; 
    864  
    865                                                                         case FLD_MSG_VMEM: 
    866                                                                                 struct_resource_vmem.resource = "vmem"; 
    867                                                                                 struct_resource_vmem.value  = fsd_strdup(strchr(msg,'=')+1); 
    868                                                                                 break;  
    869  
    870                                                                         case FLD_MSG_WALLTIME: 
    871                                                                                 struct_resource_walltime.resource = "walltime"; 
    872                                                                                 struct_resource_walltime.value  = fsd_strdup(strchr(msg,'=')+1); 
    873                                                                                 break;  
    874                                                                 } 
    875                                                                
    876                                                                 ptr2 += n2;  
    877                                                                 msg_field_n++; 
    878                                                                 if ( *ptr2 != ' ' ) 
    879                                                                  { 
    880                                                                          break;  
    881                                                                  } 
    882                                                                 ++ptr2;                                          
    883                                                          } 
    884                                                         struct_state.value = fsd_strdup("C");   /* we got exit_status so we say that it has completed */ 
    885                                                 }                                                
    886  
    887                                                 if ( state_running ) 
    888                                                  { 
    889                                                         fsd_log_debug(("WT - forcing update of job: %s", job->job_id )); 
    890                                                         job->update_status( job ); 
    891                                                  } 
    892                                                 else 
    893                                                  { 
    894                                                         fsd_log_debug(("WT - updating job: %s", job->job_id )); 
    895                                                         pbsjob->update( job, &status ); 
    896                                                  } 
    897  
    898                                  
    899                                                 fsd_cond_broadcast( &job->status_cond); 
    900                                                 fsd_cond_broadcast( &self->wait_condition ); 
    901  
    902                                                 if ( job ) 
    903                                                         job->release( job ); 
     527        {        
     528                log_reader = pbsdrmaa_log_reader_new( self, NULL); 
     529                log_reader->read_log( log_reader ); 
     530        } 
     531        FINALLY 
     532        { 
     533                pbsdrmaa_log_reader_destroy( log_reader ); 
     534        } 
     535        END_TRY 
    904536         
    905                                                 fsd_free(struct_resource_cput.value); 
    906                                                 fsd_free(struct_resource_mem.value); 
    907                                                 fsd_free(struct_resource_vmem.value); 
    908                                                 fsd_free(struct_resource_walltime.value); 
    909                                                 fsd_free(struct_status.value); 
    910                                                 fsd_free(struct_state.value); 
    911                                                 fsd_free(struct_start_time.value); 
    912                                                 fsd_free(struct_mtime.value); 
    913                                                 fsd_free(struct_queue.value); 
    914                                                 fsd_free(struct_account_name.value); 
    915  
    916                                                 if ( status.name!=NULL )  
    917                                                         fsd_free(status.name); 
    918                                         } 
    919                                         else if(field_n == FLD_EVENT && strcmp(field,FLD_MSG_LOG) == 0) 
    920                                         { 
    921                                                 log_event = true;                                        
    922                                         } 
    923                                         else if (log_event && field_n == FLD_ID && strcmp(field,"Log") == 0 ) 
    924                                         { 
    925                                                 log_match = true; 
    926                                                 log_event = false; 
    927                                         } 
    928                                         else if( log_match && field_n == FLD_MSG &&  
    929                                                 field[0] == 'L' &&  
    930                                                 field[1] == 'o' &&  
    931                                                 field[2] == 'g' &&  
    932                                                 field[3] == ' ' &&  
    933                                                 field[4] == 'c' &&  
    934                                                 field[5] == 'l' &&  
    935                                                 field[6] == 'o' &&  
    936                                                 field[7] == 's' &&  
    937                                                 field[8] == 'e' &&  
    938                                                 field[9] == 'd' )  /* last field in the file - strange bahaviour*/ 
    939                                         { 
    940                                                 fsd_log_debug(("WT - Date changed. Closing log file")); 
    941                                                 date_changed = true; 
    942                                                 log_match = false; 
    943                                         } 
    944                                          
    945                                         ptr += n;  
    946                                         if ( *ptr != ';' ) 
    947                                         { 
    948                                                 break; /* end of line */ 
    949                                         } 
    950                                         field_n++; 
    951                                         ++ptr; 
    952                                 }                
    953  
    954                                 if( strlcpy(buffer,"",sizeof(buffer)) > sizeof(buffer) ) { 
    955                                         fsd_log_error(("WT - strlcpy error")); 
    956                                 } 
    957  
    958                                 fsd_free(temp_date);                     
    959                         } 
    960  
    961                         fsd_mutex_unlock( &self->mutex );        
    962                         usleep(1000000); 
    963                         fsd_mutex_lock( &self->mutex ); 
    964                  } 
    965                 EXCEPT_DEFAULT 
    966                  { 
    967                         const fsd_exc_t *e = fsd_exc_get(); 
    968  
    969                         fsd_log_error(( "wait thread: <%d:%s>", e->code(e), e->message(e) )); 
    970                         fsd_exc_reraise(); 
    971                  } 
    972                 END_TRY 
    973  
    974                 if(fd != -1) 
    975                         close(fd); 
    976                 fsd_log_debug(("Log file closed")); 
    977          } 
    978         FINALLY 
    979          { 
    980                 fsd_log_debug(("WT - Terminated."));     
    981                 fsd_mutex_unlock( &self->mutex ); 
    982          } 
    983         END_TRY 
    984  
    985537        fsd_log_return(( " =NULL" )); 
    986538        return NULL; 
Note: See TracChangeset for help on using the changeset viewer.