Changeset 29 for trunk


Ignore:
Timestamp:
10/17/11 01:49:55 (12 years ago)
Author:
mmamonski
Message:

log reder reStructured

Location:
trunk/pbs_drmaa
Files:
1 deleted
9 edited

Legend:

Unmodified
Added
Removed
  • trunk/pbs_drmaa/job.c

    r26 r29  
    5959pbsdrmaa_job_on_missing_standard( fsd_job_t *self ); 
    6060 
    61 void 
    62 pbsdrmaa_job_on_missing_log_based( fsd_job_t *self ); 
    63  
    6461static void 
    6562pbsdrmaa_job_update( fsd_job_t *self, struct batch_status* ); 
    66  
    67 bool 
    68 pbsdrmaa_job_update_status_accounting( fsd_job_t *self ); 
    6963 
    7064 
     
    254248                else if( self->state < DRMAA_PS_DONE ) 
    255249                 { 
    256 #ifndef PBS_PROFESSIONAL 
    257                         /*best effort call*/ 
    258                         if (pbsdrmaa_job_update_status_accounting(self) == false) 
    259                                 self->on_missing( self ); 
    260 #else 
    261250                        self->on_missing( self ); 
    262 #endif 
    263251                 } 
    264252         } 
     
    305293                                break; 
    306294                        case PBSDRMAA_ATTR_EXIT_STATUS: 
    307                                 exit_status = atoi( i->value ); 
     295                                exit_status = fsd_atoi( i->value ); 
    308296                                break; 
    309297                        case PBSDRMAA_ATTR_RESOURCES_USED: 
     
    453441                pbsdrmaa_job_on_missing_standard( self );        
    454442        else 
    455                 pbsdrmaa_job_on_missing_log_based( self );       
     443                pbsdrmaa_job_on_missing_standard( self ); /* TODO: try to provide implementation that uses accounting/server log files */ 
    456444} 
    457445 
     
    506494} 
    507495 
    508 void 
    509 pbsdrmaa_job_on_missing_log_based( fsd_job_t *self ) 
    510 { 
    511         fsd_drmaa_session_t *session = self->session; 
    512         pbsdrmaa_log_reader_t *log_reader = NULL; 
    513          
    514         fsd_log_enter(( "({job_id=%s})", self->job_id )); 
    515         fsd_log_info(( "Job %s missing from DRM queue", self->job_id )); 
    516          
    517         TRY 
    518         {        
    519                 log_reader = pbsdrmaa_log_reader_new( session, self); 
    520                 log_reader->read_log( log_reader );  
    521         } 
    522         FINALLY 
    523         { 
    524                 pbsdrmaa_log_reader_destroy( log_reader ); 
    525         } 
    526         END_TRY 
    527  
    528         fsd_log_return(( "; job_ps=%s, exit_status=%d", 
    529                                 drmaa_job_ps_to_str(self->state), self->exit_status ));  
    530 } 
    531  
    532 bool 
    533 pbsdrmaa_job_update_status_accounting( fsd_job_t *self ) 
    534 { 
    535         fsd_drmaa_session_t *session = self->session; 
    536         pbsdrmaa_log_reader_t *log_reader = NULL; 
    537         bool res = false; 
    538          
    539         fsd_log_enter(( "({job_id=%s})", self->job_id )); 
    540         fsd_log_info(( "Reading job %s info from accounting file", self->job_id )); 
    541          
    542         TRY 
    543         {        
    544                 log_reader = pbsdrmaa_log_reader_accounting_new( session, self); 
    545                 bool res = log_reader->read_log( log_reader );  
    546         } 
    547         FINALLY 
    548         { 
    549                 pbsdrmaa_log_reader_destroy( log_reader ); 
    550         } 
    551         END_TRY 
    552  
    553         fsd_log_return(("")); 
    554         return res; 
    555 } 
  • trunk/pbs_drmaa/log_reader.c

    r28 r29  
    4646#include <pbs_drmaa/submit.h> 
    4747#include <pbs_drmaa/util.h> 
     48#include <pbs_drmaa/pbs_attrib.h> 
    4849 
    4950#include <errno.h> 
    5051 
    51 static bool 
    52 pbsdrmaa_read_log(); 
    53  
    54 static void 
    55 pbsdrmaa_select_file_wait_thread ( pbsdrmaa_log_reader_t * self); 
    56  
    57 static ssize_t 
    58 pbsdrmaa_read_line_wait_thread ( pbsdrmaa_log_reader_t * self, char * line, char * buffer, ssize_t size, int * idx, int * end_idx, int * line_idx ); 
    59  
    60 static void 
    61 pbsdrmaa_select_file_job_on_missing ( pbsdrmaa_log_reader_t * self ); 
    62  
    63 static ssize_t 
    64 pbsdrmaa_read_line_job_on_missing ( pbsdrmaa_log_reader_t * self, char * line, char * buffer, ssize_t size, int * idx, int * end_idx, int * line_idx ); 
    65  
    66 static void 
    67 pbsdrmaa_select_file_accounting ( pbsdrmaa_log_reader_t * self ); 
    68  
    69 static ssize_t 
    70 pbsdrmaa_read_line_accounting ( pbsdrmaa_log_reader_t * self, char * line, char * buffer, ssize_t size, int * idx, int * end_idx, int * line_idx ); 
    71  
    72 static bool  
    73 pbsdrmaa_read_log_accounting( pbsdrmaa_log_reader_t * self ); 
    74  
    75 int  
    76 fsd_job_id_cmp(const char *s1, const char *s2); 
    77  
    78 int  
    79 pbsdrmaa_date_compare(const void *a, const void *b) ; 
     52enum pbsdrmaa_field_id 
     53{ 
     54        PBSDRMAA_FLD_ID_DATE = 0, 
     55        PBSDRMAA_FLD_ID_EVENT = 1, 
     56        PBSDRMAA_FLD_ID_SRC = 2, 
     57        PBSDRMAA_FLD_ID_OBJ_TYPE = 3, 
     58        PBSDRMAA_FLD_ID_OBJ_ID = 4, 
     59        PBSDRMAA_FLD_ID_MSG = 5 
     60}; 
     61 
     62 
     63#define PBSDRMAA_FLD_MSG_0008 "0008" 
     64#define PBSDRMAA_FLD_MSG_0010 "0010" 
     65 
     66enum pbsdrmaa_event_type 
     67{ 
     68        pbsdrmaa_event_0008 = 8, 
     69        pbsdrmaa_event_0010 = 10 
     70}; 
     71 
     72static void pbsdrmaa_read_log(); 
     73 
     74static void pbsdrmaa_select_file_wait_thread( pbsdrmaa_log_reader_t * self); 
     75 
     76char *pbsdrmaa_read_line_wait_thread( pbsdrmaa_log_reader_t * self); 
     77 
     78static time_t pbsdrmaa_parse_log_timestamp(const char *timestamp, char *unixtime_str, size_t size); 
     79 
     80static char *pbsdrmaa_get_exec_host_from_accountig(pbsdrmaa_log_reader_t * log_reader, const char *job_id); 
    8081 
    8182/* 
     
    979810/11/2011 14:48:24;0010;PBS_Server;Job;15545337.batch.grid.cyf-kr.edu.pl;Exit_status=0 resources_used.cput=00:00:00 resources_used.mem=720kb resources_used.vmem=13308kb resources_used.walltime=00:00:00 
    9899 
     100deleting job: 
     101I . PBS Pro 
     102a) in Q state 
     10310/16/2011 09:49:25;0008;Server@grass1;Job;2178.grass1.man.poznan.pl;Job Queued at request of mmamonski@grass1.man.poznan.pl, owner = mmamonski@grass1.man.poznan.pl, job name = STDIN, queue = workq 
     10410/16/2011 09:49:25;0008;Server@grass1;Job;2178.grass1.man.poznan.pl;Job Modified at request of Scheduler@grass1.man.poznan.pl 
     10510/16/2011 09:49:37;0008;Server@grass1;Job;2178.grass1.man.poznan.pl;Job to be deleted at request of mmamonski@grass1.man.poznan.pl 
     10610/16/2011 09:49:37;0100;Server@grass1;Job;2178.grass1.man.poznan.pl;dequeuing from workq, state 5 
     107 
     108 
     109b) in R state 
     11010/16/2011 09:45:12;0080;Server@grass1;Job;2177.grass1.man.poznan.pl;delete job request received 
     11110/16/2011 09:45:12;0008;Server@grass1;Job;2177.grass1.man.poznan.pl;Job sent signal TermJob on delete 
     11210/16/2011 09:45:12;0008;Server@grass1;Job;2177.grass1.man.poznan.pl;Job to be deleted at request of mmamonski@grass1.man.poznan.pl 
     11310/16/2011 09:45:12;0010;Server@grass1;Job;2177.grass1.man.poznan.pl;Exit_status=271 resources_used.cpupercent=0 resources_used.cput=00:00:00 resources_used.mem=2772kb resources_used.ncpus=1 resources_used.vmem=199288kb resources_used.walltime=00:00:26 
     11410/16/2011 09:45:12;0100;Server@grass1;Job;2177.grass1.man.poznan.pl;dequeuing from workq, state 5 
     115 
     116II. Torque 
     117a) in Q state 
     11810/15/2011 21:19:25;0008;PBS_Server;Job;113045.grass1.man.poznan.pl;Job deleted at request of mmamonski@grass1.man.poznan.pl 
     11910/15/2011 21:19:25;0100;PBS_Server;Job;113045.grass1.man.poznan.pl;dequeuing from batch, state EXITING 
     120 
     121b) in R state 
     12210/15/2011 21:19:47;0008;PBS_Server;Job;113046.grass1.man.poznan.pl;Job deleted at request of mmamonski@grass1.man.poznan.pl 
     12310/15/2011 21:19:47;0008;PBS_Server;Job;113046.grass1.man.poznan.pl;Job sent signal SIGTERM on delete 
     12410/15/2011 21:19:47;0010;PBS_Server;Job;113046.grass1.man.poznan.pl;Exit_status=271 resources_used.cput=00:00:00 resources_used.mem=0kb resources_used.vmem=0kb resources_used.walltime=00:00:10 
     125 
     126Log closed: 
     12710/16/2011 00:00:17;0002;PBS_Server;Svr;Log;Log closed 
     128 
    99129 */ 
    100130pbsdrmaa_log_reader_t *  
    101 pbsdrmaa_log_reader_new ( fsd_drmaa_session_t *session, fsd_job_t *job ) 
     131pbsdrmaa_log_reader_new( fsd_drmaa_session_t *session ) 
    102132{ 
    103133        pbsdrmaa_log_reader_t *volatile self = NULL; 
    104134 
    105135        fsd_log_enter(("")); 
     136 
    106137        TRY 
    107138        { 
     
    109140                 
    110141                self->session = session; 
    111                  
    112                 /* ~templete method pattern */ 
    113                 if(job != NULL) /* job on missing */ 
    114                 { 
    115                         self->job = job; 
    116                         self->name = "Job_on_missing"; 
    117                         self->select_file = pbsdrmaa_select_file_job_on_missing; 
    118                         self->read_line = pbsdrmaa_read_line_job_on_missing; 
    119                 } 
    120                 else /* wait thread */ 
    121                 { 
    122                         self->job = NULL; 
    123                         self->name = "WT"; 
    124                         self->select_file = pbsdrmaa_select_file_wait_thread; 
    125                         self->read_line = pbsdrmaa_read_line_wait_thread; 
    126                 }                
     142 
     143                self->select_file = pbsdrmaa_select_file_wait_thread; 
    127144                self->read_log = pbsdrmaa_read_log;      
    128145                 
    129                 self->log_files = NULL; 
    130                 self->log_files_number = 0; 
    131                  
    132146                self->run_flag = true; 
    133                 self->fd = -1; 
     147                self->fhandle = NULL; 
    134148                self->date_changed = true; 
    135149                self->first_open = true; 
    136150                 
    137                 self->log_file_initial_size = 0; 
    138                 self->log_file_read_size = 0; 
    139151        } 
    140152        EXCEPT_DEFAULT 
     
    146158        } 
    147159        END_TRY 
     160 
    148161        fsd_log_return(("")); 
     162 
    149163        return self; 
    150164} 
    151165 
    152 pbsdrmaa_log_reader_t *  
    153 pbsdrmaa_log_reader_accounting_new ( fsd_drmaa_session_t *session, fsd_job_t *job ) 
    154 { 
    155         pbsdrmaa_log_reader_t *volatile self = NULL; 
    156  
    157         fsd_log_enter(("")); 
    158         TRY 
    159         { 
    160                 fsd_malloc(self, pbsdrmaa_log_reader_t ); 
    161                  
    162                 self->session = session; 
    163                  
    164                 self->job = job; 
    165                 self->name = "Accounting"; 
    166                 self->select_file = pbsdrmaa_select_file_accounting; 
    167                 self->read_line = pbsdrmaa_read_line_accounting; 
    168                                  
    169                 self->read_log = pbsdrmaa_read_log_accounting;   
    170                  
    171                 self->log_files = NULL; 
    172                 self->log_files_number = 0; 
    173                  
    174                 self->run_flag = true; 
    175                 self->fd = -1; 
    176                 self->date_changed = true; 
    177                 self->first_open = true; 
    178                  
    179                 self->log_file_initial_size = 0; 
    180                 self->log_file_read_size = 0; 
    181         } 
    182         EXCEPT_DEFAULT 
    183         { 
    184                 if( self != NULL) 
    185                         fsd_free(self); 
    186                          
    187                 fsd_exc_reraise(); 
    188         } 
    189         END_TRY 
    190         fsd_log_return(("")); 
    191         return self; 
    192 } 
    193166 
    194167void 
     
    200173                if(self != NULL) 
    201174                { 
    202                         int i = -1; 
    203                         for(i = 0; i < self->log_files_number ; i++) 
    204                                 fsd_free(self->log_files[i]); 
    205                         fsd_free(self->log_files); 
    206175                        fsd_free(self);  
    207                 }                        
     176                } 
    208177        } 
    209178        EXCEPT_DEFAULT 
     
    216185} 
    217186 
    218 enum field 
    219 {  
    220         FLD_DATE = 0, 
    221         FLD_EVENT = 1, 
    222         FLD_OBJ = 2, 
    223         FLD_TYPE = 3, 
    224         FLD_ID = 4, 
    225         FLD_MSG = 5 
    226 }; 
    227  
    228 enum field_msg 
    229 { 
    230         FLD_MSG_EXIT_STATUS = 0, 
    231         FLD_MSG_CPUT = 1, 
    232         FLD_MSG_MEM = 2, 
    233         FLD_MSG_VMEM = 3, 
    234         FLD_MSG_WALLTIME = 4 
    235 }; 
    236  
    237 enum field_msg_accounting 
    238 { 
    239         FLD_MSG_ACC_USER = 0, 
    240         FLD_MSG_ACC_GROUP = 1, 
    241         FLD_MSG_ACC_JOBNAME = 2, 
    242         FLD_MSG_ACC_QUEUE = 3, 
    243         FLD_MSG_ACC_CTIME = 4, 
    244         FLD_MSG_ACC_QTIME = 5, 
    245         FLD_MSG_ACC_ETIME = 6, 
    246         FLD_MSG_ACC_START = 7, 
    247         FLD_MSG_ACC_OWNER = 8, 
    248         FLD_MSG_ACC_EXEC_HOST = 9, 
    249         FLD_MSG_ACC_RES_NEEDNODES = 10, 
    250         FLD_MSG_ACC_RES_NODECT = 11, 
    251         FLD_MSG_ACC_RES_NODES = 12, 
    252         FLD_MSG_ACC_RES_WALLTIME = 13 
    253 }; 
    254  
    255 #define FLD_MSG_STATUS "0010" 
    256 #define FLD_MSG_STATE "0008" 
    257 #define FLD_MSG_LOG "0002" 
    258  
    259 bool  
     187 
     188void 
    260189pbsdrmaa_read_log( pbsdrmaa_log_reader_t * self ) 
    261190{ 
    262         pbsdrmaa_job_t *pbsjob = (pbsdrmaa_job_t*) self->job; 
    263         fsd_job_t *volatile temp_job = NULL; 
    264                  
    265191        fsd_log_enter(("")); 
    266192         
    267         if(self->job == NULL) 
    268                 fsd_mutex_lock( &self->session->mutex ); 
     193        fsd_mutex_lock( &self->session->mutex ); 
    269194 
    270195        TRY 
    271         {                
     196         { 
    272197                while( self->run_flag ) 
    273                 TRY 
    274                 { 
    275                         char line[4096] = ""; 
    276                         char buffer[4096] = ""; 
    277                         int idx = 0, end_idx = 0, line_idx = 0; 
    278                          
    279                         self->select_file(self); 
    280  
    281                         while ((self->read_line(self, line,buffer, sizeof(line), &idx,&end_idx,&line_idx)) > 0) 
     198                 { 
     199                        TRY 
    282200                        { 
    283                                 const char *volatile ptr = line; 
    284                                 char field[256] = ""; 
    285                                 char job_id[256] = ""; 
    286                                 char event[256] = ""; 
    287                                 int volatile field_n = 0; 
    288                                 int n; 
     201                                char *line = NULL; 
    289202                                 
    290                                 bool volatile job_id_match = false; 
    291                                 bool volatile event_match = false; 
    292                                 bool volatile log_event = false; 
    293                                 bool volatile log_match = false; 
    294                                 bool volatile older_job_found = false; 
    295                                 bool volatile job_found = false; 
    296                                 char * temp_date = NULL; 
    297                                  
    298                                 struct batch_status status; 
    299                                 status.next = NULL; 
    300  
    301                                 while ( sscanf(ptr, "%255[^;]%n", field, &n) == 1 ) /* split current line into fields */ 
    302                                 { 
    303                                         if(field_n == FLD_DATE) 
    304                                         { 
    305                                                 temp_date = fsd_strdup(field); 
    306                                         } 
    307                                         else if(field_n == FLD_EVENT && (strcmp(field,FLD_MSG_STATUS) == 0 || strcmp(field,FLD_MSG_STATE) == 0 )) 
    308                                         { 
    309                                                 /* event described by log line*/ 
    310                                                 if(strlcpy(event, field,sizeof(event)) > sizeof(event)) 
    311                                                 { 
    312                                                         fsd_log_error(("%s - strlcpy error",self->name)); 
    313                                                 } 
    314                                                 event_match = true; 
    315                                         } 
    316                                         else if(event_match && field_n == FLD_ID) 
    317                                         {        
    318                                                 TRY 
    319                                                 {        
    320                                                         if(self->job == NULL) /* wait_thread */ 
    321                                                         { 
    322                                                                 temp_job = self->session->get_job( self->session, field ); 
    323                                                                 pbsjob = (pbsdrmaa_job_t*) temp_job; 
    324  
    325                                                                 if( temp_job ) 
    326                                                                 { 
    327                                                                         if(strlcpy(job_id, field,sizeof(job_id)) > sizeof(job_id)) { 
    328                                                                                 fsd_log_error(("%s - strlcpy error",self->name)); 
    329                                                                         } 
    330                                                                         fsd_log_debug(("%s - job_id: %s",self->name,job_id)); 
    331                                                                         status.name = fsd_strdup(job_id); 
    332                                                                         job_id_match = true; /* job_id is in drmaa */    
    333                                                                 } 
    334                                                                 else  
    335                                                                 { 
    336                                                                         fsd_log_debug(("%s - Unknown job: %s", self->name,field)); 
    337                                                                 } 
    338                                                         } 
    339                                                         else /* job_on_missing */ 
    340                                                         { 
    341                                                                 int diff = -1; 
    342                                                                 diff = fsd_job_id_cmp(self->job->job_id,field); 
    343                                                                 if( diff == 0) 
    344                                                                 { 
    345                                                                         /* read this file to the place we started and exit*/ 
    346                                                                         fsd_log_debug(("Job_on_missing found job: %s",self->job->job_id)); 
    347                                                                         job_found = true; 
    348                                                                         older_job_found = false; 
    349                                                                         self->run_flag = false; 
    350                                                                         job_id_match = true;  
    351                                                                         status.name = fsd_strdup(self->job->job_id); 
    352                                                                 } 
    353                                                                 else if ( !job_found && diff >= 1) 
    354                                                                 { 
    355                                                                         /* older job, find its beginning */ 
    356                                                                         fsd_log_debug(("Job_on_missing found older job than %s : %s",self->job->job_id,field)); 
    357                                                                         older_job_found = true; 
    358                                                                         job_id_match = true;  
    359                                                                         status.name = fsd_strdup(self->job->job_id); 
    360                                                                 } 
    361                                                                 else  if( !job_found ) 
    362                                                                 { 
    363                                                                         fsd_log_debug(("Job_on_missing found newer job than %s : %s",self->job->job_id,field)); 
    364                                                                 }                                                                
    365                                                         } 
    366                                                 } 
    367                                                 END_TRY  
    368                                         } 
    369                                         else if(job_id_match && field_n == FLD_MSG) 
    370                                         {                                                
    371                                                 /* parse msg - depends on FLD_EVENT */ 
    372                                                 struct attrl struct_resource_cput, 
    373                                                         struct_resource_mem, 
    374                                                         struct_resource_vmem, 
    375                                                         struct_resource_walltime, 
    376                                                         struct_status, 
    377                                                         struct_state, 
    378                                                         struct_start_time, 
    379                                                         struct_mtime, 
    380                                                         struct_queue, 
    381                                                         struct_account_name, 
    382                                                         struct_exec_vnode; 
    383                                                 struct attrl *last_attr = NULL; 
    384                                                  
    385                                                 bool state_running = false; 
    386  
    387                                                 memset(&struct_status,0,sizeof(struct attrl)); 
    388                                                 memset(&struct_state,0,sizeof(struct attrl)); 
    389                                                 memset(&struct_resource_cput,0,sizeof(struct attrl)); 
    390                                                 memset(&struct_resource_mem,0,sizeof(struct attrl)); 
    391                                                 memset(&struct_resource_vmem,0,sizeof(struct attrl)); 
    392                                                 memset(&struct_resource_walltime,0,sizeof(struct attrl)); 
    393                                                 memset(&struct_start_time,0,sizeof(struct attrl)); 
    394                                                 memset(&struct_mtime,0,sizeof(struct attrl)); 
    395                                                 memset(&struct_queue,0,sizeof(struct attrl)); 
    396                                                 memset(&struct_account_name,0,sizeof(struct attrl)); 
    397                                                 memset(&struct_exec_vnode,0,sizeof(struct attrl)); 
    398                                                                  
    399                                                 if (strcmp(event,FLD_MSG_STATE) == 0)  
    400                                                 { 
    401                                                         /* job run, modified, queued etc */ 
    402                                                         int n = 0; 
    403                                                         status.attribs = &struct_state; 
    404                                                         struct_state.next = NULL; 
    405                                                         struct_state.name = "job_state"; 
    406                                                         last_attr = &struct_state; 
    407  
    408                                                         if(field[0] == 'J') /* Job Queued, Job Modified, Job Run*/ 
    409                                                          { 
    410                                                                 n = 4; 
    411                                                                 if(older_job_found) /* job_on_missing - older job beginning - read this file and end */ 
     203                                self->select_file(self); 
     204 
     205                                while ((line = fsd_readline(self->fhandle)) != NULL) 
     206                                 { 
     207                                        int field_id = PBSDRMAA_FLD_ID_DATE; 
     208                                        char *tok_ctx = NULL; 
     209                                        char *field_token = NULL; 
     210                                        char *event_timestamp = NULL; 
     211                                        int event_type = -1; 
     212                                        fsd_job_t *job = NULL; 
     213 
     214                                        /* at first detect if this not the end of log file */ 
     215                                        if (strstr(line, "Log;Log closed")) /*TODO try to be more effective and safe */ 
     216                                         { 
     217                                                fsd_log_debug(("WT - Date changed. Closing log file")); 
     218                                                self->date_changed = true; 
     219                                         } 
     220 
     221                                        for (field_token = strtok_r(line, ";", &tok_ctx); field_token; field_token = strtok_r(NULL, ";", &tok_ctx), field_id++) 
     222                                         { 
     223                                                if ( field_id == PBSDRMAA_FLD_ID_DATE) 
     224                                                 { 
     225                                                        event_timestamp = field_token; 
     226                                                 } 
     227                                                else if ( field_id == PBSDRMAA_FLD_ID_EVENT) 
     228                                                 { 
     229                                                        if (strncmp(field_token, PBSDRMAA_FLD_MSG_0008, 4) == 0) 
     230                                                                event_type = pbsdrmaa_event_0008; 
     231                                                        else if (strncmp(field_token, PBSDRMAA_FLD_MSG_0010, 4) == 0) 
     232                                                                event_type = pbsdrmaa_event_0010; 
     233                                                        else 
     234                                                                break; /*we are interested only in the above log messages */ 
     235                                                 } 
     236                                                else if ( field_id == PBSDRMAA_FLD_ID_SRC) 
     237                                                 { 
     238                                                        /* not used ignore */ 
     239                                                 } 
     240                                                else if (field_id  == PBSDRMAA_FLD_ID_OBJ_TYPE) 
     241                                                 { 
     242                                                        if (strncmp(field_token, "Job", 3) != 0) 
     243                                                                break; /* we are interested only in job events */ 
     244                                                 } 
     245                                                else if (field_id == PBSDRMAA_FLD_ID_OBJ_ID) 
     246                                                 { 
     247                                                        const char *event_jobid = field_token; 
     248 
     249                                                        TRY 
     250                                                         { 
     251                                                                job = self->session->get_job( self->session, event_jobid ); 
     252 
     253                                                                if( job ) 
    412254                                                                 { 
    413                                                                         self->run_flag = false; 
    414                                                                         fsd_log_debug(("Job_on_missing found older job beginning")); 
    415                                                                         fsd_free(status.name); 
     255                                                                        fsd_log_debug(("WT - Found job event: %s", event_jobid)); 
     256                                                                 } 
     257                                                                else 
     258                                                                 { 
     259                                                                        fsd_log_debug(("WT - Unknown job: %s", event_jobid)); /* Not a DRMAA job */ 
    416260                                                                        break; 
    417261                                                                 } 
    418  
    419                                                                  { /* modified */ 
    420                                                                         struct tm temp_time_tm; 
    421                                                                         memset(&temp_time_tm, 0, sizeof(temp_time_tm)); 
    422                                                                         temp_time_tm.tm_isdst = -1; 
    423  
    424                                                                         if (strptime(temp_date, "%m/%d/%Y %H:%M:%S", &temp_time_tm) == NULL) 
    425                                                                          { 
    426                                                                                 fsd_log_error(("failed to parse mtime: %s (line = %s)", temp_date, line)); 
    427                                                                          } 
    428                                                                         else 
    429                                                                          { 
    430                                                                                 time_t temp_time = mktime(&temp_time_tm); 
    431                                                                                 last_attr->next = &struct_mtime; 
    432                                                                                 last_attr = &struct_mtime; 
    433                                                                                 struct_mtime.name = "mtime"; 
    434                                                                                 struct_mtime.next = NULL; 
    435                                                                                 struct_mtime.value = fsd_asprintf("%lu",temp_time); 
    436                                                                          } 
    437                                                                  } 
    438262                                                         } 
     263                                                        END_TRY 
     264                                                 } 
     265                                                else if (field_id == PBSDRMAA_FLD_ID_MSG) 
     266                                                 { 
     267                                                        char *msg = field_token; 
     268                                                        struct batch_status status; 
     269                                                        struct attrl *attribs = NULL; 
     270                                                        bool in_running_state = false; 
     271 
     272                                                        if (event_type == pbsdrmaa_event_0008 && strncmp(msg, "Job Queued", 10) == 0) 
     273                                                         { 
     274                                                                /* Queued 
     275                                                                 * PBS Pro: 10/11/2011 14:43:29;0008;Server@nova;Job;2127218.nova;Job Queued at request of mamonski@endor.wcss.wroc.pl, owner = mamonski@endor.wcss.wroc.pl, job name = STDIN, queue = normal 
     276                                                                 * Torque:  10/11/2011 14:47:59;0008;PBS_Server;Job;15545337.batch.grid.cyf-kr.edu.pl;Job Queued at request of plgmamonski@ui.cyf-kr.edu.pl, owner = plgmamonski@ui.cyf-kr.edu.pl, job name = STDIN, queue = l_short 
     277                                                                 */ 
     278                                                                char *p_queue = NULL; 
     279 
     280                                                                if ((p_queue = strstr(msg,"queue =")) == NULL) 
     281                                                                        fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"No queue attribute found in log line = %s", line); 
    439282 
    440283                                                        /* != Job deleted and Job to be deleted*/ 
     
    447290#endif 
    448291                                                                struct_state.value = fsd_asprintf("%c",field[n]); 
    449                                                                 if(struct_state.value[0] == 'R') 
    450                                                                  { 
    451                                                                         state_running = true; 
     292                                                                (void)pbsdrmaa_parse_log_timestamp(event_timestamp, timestamp_unix, sizeof(timestamp_unix)); 
     293 
     294                                                                in_running_state = true; 
     295 
     296                                                                attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_JOB_STATE, "R"); 
     297                                                                attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_START_TIME, timestamp_unix); 
    452298#ifdef PBS_PROFESSIONAL 
    453299                                                                        { 
     
    463309                                                                        } 
    464310#endif 
     311                                                         } 
     312#ifndef PBS_PBS_PROFESSIONAL 
     313                                                        else if (event_type == pbsdrmaa_event_0008 && strncmp(msg, "Job deleted", 11)) 
     314#else 
     315                                                        else if (event_type == pbsdrmaa_event_0008 && strncmp(msg, "Job to be deleted", 17)) 
     316#endif 
     317                                                         { 
     318                                                        /* Deleted 
     319                                                         * PBS Pro: 10/16/2011 09:45:12;0008;Server@grass1;Job;2177.grass1.man.poznan.pl;Job to be deleted at request of mmamonski@grass1.man.poznan.pl 
     320                                                         * Torque: 10/15/2011 21:19:25;0008;PBS_Server;Job;113045.grass1.man.poznan.pl;Job deleted at request of mmamonski@grass1.man.poznan.pl 
     321                                                         */ 
     322                                                                char timestamp_unix[64]; 
     323 
     324                                                                (void)pbsdrmaa_parse_log_timestamp(event_timestamp, timestamp_unix, sizeof(timestamp_unix)); 
     325 
     326                                                                if (job->state < DRMAA_PS_RUNNING) 
     327                                                                 { 
     328                                                                        fsd_log_info(("Job %s killed before entering running state (%d).", job->job_id, job->state)); 
     329                                                                        attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_JOB_STATE, "C"); 
     330                                                                        attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_MTIME, timestamp_unix); 
     331                                                                        attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_EXIT_STATUS, "-2"); 
     332                                                                 } 
     333                                                                else 
     334                                                                 { 
     335                                                                        break; /* job was started, ignore, wait for Exit_status message */ 
     336                                                                 } 
     337                                                         } 
     338                                                        else if (event_type == pbsdrmaa_event_0010 && strncmp(msg, "Exit_status=", 12)) 
     339                                                         { 
     340                                                        /* Completed: 
     341                                                         * PBS Pro: 10/11/2011 14:43:32;0010;Server@nova;Job;2127218.nova;Exit_status=0 resources_used.cpupercent=0 resources_used.cput=00:00:00 resources_used.mem=1768kb resources_used.ncpus=6 resources_used.vmem=19228kb resources_used.walltime=00:00:01 
     342                                                         * Torque: 10/11/2011 14:48:24;0010;PBS_Server;Job;15545337.batch.grid.cyf-kr.edu.pl;Exit_status=0 resources_used.cput=00:00:00 resources_used.mem=720kb resources_used.vmem=13308kb resources_used.walltime=00:00:00 
     343                                                         */ 
     344                                                                char timestamp_unix[64]; 
     345                                                                time_t timestamp_time_t = pbsdrmaa_parse_log_timestamp(event_timestamp, timestamp_unix, sizeof(timestamp_unix)); 
     346                                                                char *tok_ctx2 = NULL; 
     347                                                                char *token = NULL; 
     348 
     349                                                                attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_JOB_STATE, "C"); 
     350                                                                attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_MTIME, timestamp_unix); 
     351 
     352                                                                /* tokenize !!! */ 
     353                                                                for (token = strtok_r(msg, " ", &tok_ctx2); token; token = strtok_r(NULL, " ", &tok_ctx2)) 
     354                                                                 { 
     355                                                                        if (strncmp(token, "Exit_status=", 12) == 0) 
     356                                                                         { 
     357                                                                                token[12] = '\0'; 
     358                                                                                attribs = pbsdrmaa_add_attr(attribs, token, token + 12); 
     359                                                                                fsd_log_info(("WT - Completion of job %s (Exit_status=%s) detected after %d seconds", job->job_id, token+12, (int)(time(NULL) - timestamp_time_t) )); 
     360                                                                         } 
     361                                                                        else if (strncmp(token, "resources_used.cput=", 20) == 0) 
     362                                                                         { 
     363                                                                                token[20] = '\0'; 
     364                                                                                attribs = pbsdrmaa_add_attr(attribs, token, token + 20); 
     365                                                                         } 
     366                                                                        else if (strncmp(token, "resources_used.mem=", 19) == 0) 
     367                                                                         { 
     368                                                                                token[19] = '\0'; 
     369                                                                                attribs = pbsdrmaa_add_attr(attribs, token, token + 19); 
     370                                                                         } 
     371                                                                        else if (strncmp(token, "resources_used.vmem=", 20) == 0) 
     372                                                                         { 
     373                                                                                token[20] = '\0'; 
     374                                                                                attribs = pbsdrmaa_add_attr(attribs, token, token + 20); 
     375                                                                         } 
     376                                                                        else if (strncmp(token, "resources_used.walltime=", 24) == 0) 
     377                                                                         { 
     378                                                                                token[24] = '\0'; 
     379                                                                                attribs = pbsdrmaa_add_attr(attribs, token, token + 24); 
     380                                                                         } 
     381                                                                 } 
     382 
     383                                                                if (!job->execution_hosts) 
     384                                                                 { 
     385                                                                        char *exec_host = NULL; 
     386                                                                        fsd_log_info(("WT - No execution host information for job %s. Reading accounting logs...", job->job_id)); 
     387                                                                        exec_host = pbsdrmaa_get_exec_host_from_accountig(self, job->job_id); 
     388                                                                        attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_EXECUTION_HOST, exec_host); 
     389                                                                        fsd_free(exec_host); 
    465390                                                                 } 
    466391                                                         } 
    467392                                                        else 
    468                                                          { /* job terminated - pbs drmaa detects failed as completed with exit_status !=0, aborted with status -1*/ 
    469                                                                 struct_status.name = "exit_status"; 
    470                                                                 struct_status.value = fsd_strdup("-1"); 
    471                                                                 struct_status.next = NULL; 
    472                                                                 struct_state.next = &struct_status; 
    473                                                                 struct_state.value = fsd_strdup("C"); 
    474                                                          } 
    475                                                 } 
    476                                                 else /*if (strcmp(event,FLD_MSG_STATUS) == 0 )*/ 
    477                                                 { 
    478                                                         /* exit status and rusage */ 
    479                                                         const char *ptr2 = field; 
    480                                                         char  msg[ 256 ] = ""; 
    481                                                         int n2; 
    482                                                         int msg_field_n = 0; 
    483                                                          
    484                                                         struct_resource_cput.name = "resources_used"; 
    485                                                         struct_resource_mem.name = "resources_used"; 
    486                                                         struct_resource_vmem.name = "resources_used"; 
    487                                                         struct_resource_walltime.name = "resources_used"; 
    488                                                         struct_status.name = "exit_status"; 
    489                                                         struct_state.name = "job_state"; 
    490                                  
    491                                                         status.attribs = &struct_resource_cput; 
    492                                                         struct_resource_cput.next = &struct_resource_mem; 
    493                                                         struct_resource_mem.next = &struct_resource_vmem; 
    494                                                         struct_resource_vmem.next = &struct_resource_walltime; 
    495                                                         struct_resource_walltime.next =  &struct_status; 
    496                                                         struct_status.next = &struct_state; 
    497                                                         struct_state.next = NULL; 
    498  
    499                                                         while ( sscanf(ptr2, "%255[^ ]%n", msg, &n2) == 1 ) 
    500                                                          {                                               
    501                                                                 switch(msg_field_n)  
    502                                                                 { 
    503                                                                         case FLD_MSG_EXIT_STATUS: 
    504                                                                                 struct_status.value = fsd_strdup(strchr(msg,'=')+1); 
    505                                                                                 break; 
    506  
    507                                                                         case FLD_MSG_CPUT: 
    508                                                                                 struct_resource_cput.resource = "cput"; 
    509                                                                                 struct_resource_cput.value = fsd_strdup(strchr(msg,'=')+1); 
    510                                                                                 break; 
    511  
    512                                                                         case FLD_MSG_MEM: 
    513                                                                                 struct_resource_mem.resource = "mem"; 
    514                                                                                 struct_resource_mem.value  = fsd_strdup(strchr(msg,'=')+1); 
    515                                                                                 break; 
    516  
    517                                                                         case FLD_MSG_VMEM: 
    518                                                                                 struct_resource_vmem.resource = "vmem"; 
    519                                                                                 struct_resource_vmem.value  = fsd_strdup(strchr(msg,'=')+1); 
    520                                                                                 break;  
    521  
    522                                                                         case FLD_MSG_WALLTIME: 
    523                                                                                 struct_resource_walltime.resource = "walltime"; 
    524                                                                                 struct_resource_walltime.value  = fsd_strdup(strchr(msg,'=')+1); 
    525                                                                                 break;  
    526                                                                 } 
    527  
    528                                                                 ptr2 += n2;  
    529                                                                 msg_field_n++; 
    530                                                                 if ( *ptr2 != ' ' ) 
    531                                                                          break;  
    532                                                                 ++ptr2; 
     393                                                        { 
     394                                                                break; /* ignore other job events*/ 
    533395                                                        } 
    534                                                         struct_state.value = fsd_strdup("C");   /* we got exit_status so we say that it has completed */ 
    535                                                         fsd_log_info(("WT - job %s found as finished on %u", temp_job->job_id, (unsigned int)time(NULL))); 
    536                                                 }                                                
    537                                                   
    538                                                 if(self->job == NULL) /* wait_thread */ 
    539                                                 { 
    540                                                         if ( state_running ) 
    541                                                         { 
    542                                                                 fsd_log_debug(("WT - forcing update of job: %s", temp_job->job_id )); 
     396 
     397                                                        if ( in_running_state ) 
     398                                                         { 
     399                                                                fsd_log_debug(("WT - forcing update of job: %s", job->job_id )); 
    543400                                                                TRY 
    544401                                                                { 
    545                                                                         temp_job->update_status( temp_job ); 
     402                                                                        job->update_status( job ); 
    546403                                                                } 
    547404                                                                EXCEPT_DEFAULT 
    548405                                                                { 
    549406                                                                        /*TODO: distinguish between invalid job and internal errors */ 
    550                                                                         fsd_log_debug(("Job finished just after entering running state: %s", temp_job->job_id)); 
     407                                                                        fsd_log_debug(("Job finished just after entering running state: %s", job->job_id)); 
    551408                                                                } 
    552409                                                                END_TRY 
    553                                                         } 
     410                                                         } 
    554411                                                        else 
    555                                                         { 
    556                                                                 fsd_log_debug(("%s - updating job: %s",self->name, temp_job->job_id )); 
    557                                                                 pbsjob->update( temp_job, &status ); 
    558                                                         } 
    559                                                 } 
    560                                                 else if( job_found ) /* job_on_missing */ 
    561                                                 { 
    562                                                         fsd_log_debug(("Job_on_missing - updating job: %s", self->job->job_id )); 
    563                                                         pbsjob->update( self->job, &status ); 
    564                                                 } 
    565                                                  
    566                                                 if(self->job == NULL) 
    567                                                 { 
    568                                                         fsd_cond_broadcast( &temp_job->status_cond); 
     412                                                         { 
     413                                                                fsd_log_debug(("WT - updating job: %s", job->job_id )); 
     414                                                                status.name = job->job_id; 
     415                                                                status.attribs = attribs; 
     416 
     417                                                                ((pbsdrmaa_job_t *)job)->update( job, &status ); 
     418 
     419                                                                pbsdrmaa_free_attrl(attribs); /* TODO free on exception */ 
     420                                                         } 
     421 
     422                                                        fsd_cond_broadcast( &job->status_cond); 
    569423                                                        fsd_cond_broadcast( &self->session->wait_condition ); 
    570                                                 } 
    571                                                 if ( temp_job ) 
    572                                                         temp_job->release( temp_job ); 
    573          
    574                                                 fsd_free(struct_resource_cput.value); 
    575                                                 fsd_free(struct_resource_mem.value); 
    576                                                 fsd_free(struct_resource_vmem.value); 
    577                                                 fsd_free(struct_resource_walltime.value); 
    578                                                 fsd_free(struct_status.value); 
    579                                                 fsd_free(struct_state.value); 
    580                                                 fsd_free(struct_start_time.value); 
    581                                                 fsd_free(struct_mtime.value); 
    582                                                 fsd_free(struct_queue.value); 
    583                                                 fsd_free(struct_account_name.value); 
    584  
    585                                                 if ( status.name!=NULL )  
    586                                                         fsd_free(status.name); 
    587                                         } 
    588                                         else if(field_n == FLD_EVENT && strcmp(field,FLD_MSG_LOG) == 0) 
    589                                         { 
    590                                                 log_event = true; 
    591                                         } 
    592                                         else if (log_event && field_n == FLD_ID && strcmp(field,"Log") == 0 ) 
    593                                         { 
    594                                                 log_match = true; 
    595                                                 log_event = false; 
    596                                         } 
    597                                         else if( self->job == NULL && log_match && field_n == FLD_MSG && strncmp(field,"Log closed",10) == 0) 
    598                                         { 
    599                                                 fsd_log_debug(("%s - Date changed. Closing log file",self->name)); 
    600                                                 self->date_changed = true; 
    601                                                 log_match = false; 
    602                                         } 
     424 
     425                                                        if ( job ) 
     426                                                                job->release( job ); 
     427 
     428                                                        fsd_free(line); /* TODO free on exception */ 
     429                                                 } 
     430                                                else 
     431                                                 { 
     432                                                        fsd_assert(0); /*not reached */ 
     433                                                 } 
     434                                         } 
     435 
     436                                 } /* end of while getline loop */ 
     437 
     438                                 { /* poll on log file */ 
     439                                        struct timeval timeout_tv; 
     440                                        fd_set log_fds; 
     441 
     442                                        fsd_mutex_unlock( &self->session->mutex ); 
    603443                                         
    604                                         ptr += n;  
    605                                         if ( *ptr != ';' ) 
    606                                         { 
    607                                                 break; /* end of line */ 
    608                                         } 
    609                                         field_n++; 
    610                                         ++ptr; 
    611                                 }                
    612  
    613                                 fsd_free(temp_date); 
    614                         } /* end of while getline loop */ 
    615                          
    616                         if(self->job == NULL) 
     444                                        FD_ZERO(&log_fds); 
     445                                        FD_SET(fileno(self->fhandle), &log_fds); 
     446 
     447                                        timeout_tv.tv_sec = 1; 
     448                                        timeout_tv.tv_usec = 0; 
     449 
     450                                        /* ignore return value - the next get line call will handle IO errors */ 
     451                                        (void)select(1, &log_fds, NULL, NULL, &timeout_tv); 
     452 
     453                                        fsd_mutex_lock( &self->session->mutex ); 
     454 
     455                                        self->run_flag = self->session->wait_thread_run_flag; 
     456                                 } 
     457                        } 
     458                        EXCEPT_DEFAULT 
    617459                        { 
    618                                 struct timeval timeout_tv; 
    619                                 fd_set log_fds; 
    620          
    621                                 fsd_mutex_unlock( &self->session->mutex ); 
    622                                  
    623                                 FD_ZERO(&log_fds); 
    624                                 FD_SET(self->fd, &log_fds); 
    625  
    626                                 timeout_tv.tv_sec = 1; 
    627                                 timeout_tv.tv_usec = 0; 
    628  
    629                                 /* ignore return value - the next get line call will handle IO errors */ 
    630                                 (void)select(1, &log_fds, NULL, NULL, &timeout_tv); 
    631  
    632                                 fsd_mutex_lock( &self->session->mutex );         
    633  
    634                                 self->run_flag = self->session->wait_thread_run_flag; 
     460                                const fsd_exc_t *e = fsd_exc_get(); 
     461                                /* Its better to exit and communicate error rather then let the application to hang */ 
     462                                fsd_log_fatal(( "Exception in wait thread: <%d:%s>. Exiting !!!", e->code(e), e->message(e) )); 
     463                                exit(1); 
    635464                        } 
    636                 }                
    637                 EXCEPT_DEFAULT 
    638                 { 
    639                         const fsd_exc_t *e = fsd_exc_get(); 
    640                         /* Its better to exit and communicate error rather then let the application to hang */ 
    641                         fsd_log_fatal(( "Exception in wait thread %s: <%d:%s>. Exiting !!!", self->name, e->code(e), e->message(e) )); 
    642                         exit(1); 
    643                 } 
    644                 END_TRY 
    645  
    646                 if(self->fd != -1) 
    647                         close(self->fd); 
    648                 fsd_log_debug(("%s - Log file closed",self->name));      
     465                        END_TRY 
     466                 } 
     467 
     468                if(self->fhandle) 
     469                        fclose(self->fhandle); 
     470 
     471                fsd_log_debug(("WT - Log file closed")); 
    649472        } 
    650473        FINALLY 
    651474        { 
    652                 fsd_log_debug(("%s - Terminated.",self->name));  
    653                 if(self->job == NULL) 
    654                         fsd_mutex_unlock( &self->session->mutex ); /**/ 
     475                fsd_log_debug(("WT - Terminated.")); 
     476                fsd_mutex_unlock( &self->session->mutex ); /**/ 
    655477        } 
    656478        END_TRY 
    657479         
    658480        fsd_log_return(("")); 
    659         return true; 
    660481} 
    661482 
     
    665486        pbsdrmaa_session_t *pbssession = (pbsdrmaa_session_t*) self->session; 
    666487         
    667         if(self->date_changed) 
    668         { 
     488        if (self->date_changed) 
     489         { 
    669490                char * log_path = NULL; 
    670491                int num_tries = 0; 
     
    682503                #define DRMAA_WAIT_THREAD_MAX_TRIES (12) 
    683504                /* generate new date, close file and open new */ 
    684                 if((log_path = fsd_asprintf("%s/server_logs/%04d%02d%02d", 
    685                                         pbssession->pbs_home,     
    686                                         tm.tm_year + 1900, 
    687                                         tm.tm_mon + 1, 
    688                                         tm.tm_mday)) == NULL) { 
    689                         fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"WT - Memory allocation wasn't possible"); 
    690                 } 
    691  
    692                 if(self->fd != -1) 
    693                         close(self->fd); 
    694  
    695                 fsd_log_debug(("Log file: %s",log_path)); 
     505                log_path = fsd_asprintf("%s/server_logs/%04d%02d%02d", pbssession->pbs_home, tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday); 
     506 
     507                if(self->fhandle) 
     508                        fclose(self->fhandle); 
     509 
     510                fsd_log_info(("Opening log file: %s",log_path)); 
    696511                                 
    697512        retry: 
    698                 if((self->fd = open(log_path,O_RDONLY) ) == -1 && num_tries > DRMAA_WAIT_THREAD_MAX_TRIES ) 
    699                 { 
     513                if ((self->fhandle = fopen(log_path,"")) == NULL && (num_tries > DRMAA_WAIT_THREAD_MAX_TRIES || self->first_open)) 
     514                 { 
    700515                        fsd_log_error(("Can't open log file. Verify pbs_home. Running standard wait_thread.")); 
    701                         fsd_log_error(("Remember that without keep_completed set standard wait_thread won't run correctly")); 
     516                        fsd_log_error(("Remember that without keep_completed set the standard wait_thread won't provide information about job exit status")); 
    702517                        /*pbssession->super.enable_wait_thread = false;*/ /* run not wait_thread */ 
    703518                        pbssession->wait_thread_log = false; 
    704519                        pbssession->super.wait_thread = pbssession->super_wait_thread; 
    705520                        pbssession->super.wait_thread(self->session); 
    706                 } else if ( self->fd == -1 ) { 
     521                 } 
     522                else if ( self->fhandle == NULL ) 
     523                 { /* Torque seems not to create a new file immediately after the old one is closed */ 
    707524                        fsd_log_warning(("Can't open log file: %s. Retries count: %d", log_path, num_tries)); 
    708525                        num_tries++; 
    709                         sleep(5); 
     526                        sleep(2 * num_tries); 
    710527                        goto retry; 
    711                 } 
     528                 } 
    712529 
    713530                fsd_free(log_path); 
     
    715532                fsd_log_debug(("Log file opened")); 
    716533 
    717                 if(self->first_open) { 
     534                if(self->first_open) 
     535                 { 
    718536                        fsd_log_debug(("Log file lseek")); 
    719                         if(lseek(self->fd,pbssession->log_file_initial_size,SEEK_SET) == (off_t) -1) { 
    720                                 char errbuf[256] = "InternalError"; 
    721                                 (void)strerror_r(errno, errbuf, 256); 
    722                                 fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"lseek error: %s",errbuf); 
    723                         } 
     537 
     538                        if(fseek(self->fhandle, pbssession->log_file_initial_size, SEEK_SET) == (off_t) -1) 
     539                         { 
     540                                fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"fseek error"); 
     541                         } 
    724542                        self->first_open = false; 
    725                 } 
     543                 } 
    726544 
    727545                self->date_changed = false; 
     
    731549} 
    732550 
    733 ssize_t 
    734 pbsdrmaa_read_line_wait_thread ( pbsdrmaa_log_reader_t * self, char * line, char * buffer, ssize_t size, int * idx, int * end_idx, int * line_idx ) 
    735 { 
    736         return fsd_getline_buffered(line,buffer,size,self->fd,idx,end_idx,line_idx); 
     551time_t 
     552pbsdrmaa_parse_log_timestamp(const char *timestamp, char *unixtime_str, size_t size) 
     553{ 
     554        struct tm temp_time_tm; 
     555        memset(&temp_time_tm, 0, sizeof(temp_time_tm)); 
     556        temp_time_tm.tm_isdst = -1; 
     557 
     558        if (strptime(timestamp, "%m/%d/%Y %H:%M:%S", &temp_time_tm) == NULL) 
     559         { 
     560                fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"WT - failed to parse log timestamp: %s", timestamp); 
     561         } 
     562        else 
     563         { 
     564                time_t temp_time = mktime(&temp_time_tm); 
     565                snprintf(unixtime_str, size, "%lu", temp_time); 
     566                return temp_time; 
     567         } 
    737568} 
    738569 
    739 /* reverse date compare*/ 
    740 int  
    741 pbsdrmaa_date_compare(const void *a, const void *b)  
    742 { 
    743         const char *ia = *(const char **) a; 
    744         const char *ib = *(const char **) b; 
    745         return strcmp(ib, ia); 
     570char * 
     571pbsdrmaa_get_exec_host_from_accountig(pbsdrmaa_log_reader_t * log_reader, const char *job_id) 
     572{ 
     573        /* TODO: implement */ 
     574        return NULL; 
    746575} 
    747576 
    748 void 
    749 pbsdrmaa_select_file_job_on_missing( pbsdrmaa_log_reader_t * self ) 
    750 { 
    751         pbsdrmaa_session_t *pbssession = (pbsdrmaa_session_t*) self->session;    
    752          
    753         char * log_path = NULL; 
    754         int num_tries = 0; 
    755         static int file_number = 0; 
    756         fsd_log_enter(("")); 
    757                  
    758         if(self->first_open)  
    759         {                        
    760                 DIR *dp = NULL;          
    761                 char * path = NULL; 
    762                 struct dirent *ep = NULL; 
    763                  
    764                 if((path = fsd_asprintf("%s/server_logs/",pbssession->pbs_home)) == NULL) 
    765                         fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"Job_on_missing - Memory allocation wasn't possible"); 
    766                  
    767                 self->log_files_number = 0;      
    768                 dp = opendir (path); 
    769  
    770                 fsd_calloc(self->log_files,2,char*); 
    771          
    772                 if (dp != NULL) 
    773                 { 
    774                         while ((ep = readdir (dp))) 
    775                         { 
    776                                 self->log_files_number++; 
    777                                 if(self->log_files_number > 2) 
    778                                         fsd_realloc(self->log_files,self->log_files_number,char *); 
    779                                  
    780                                 self->log_files[self->log_files_number-1] = fsd_strdup(ep->d_name); 
    781                         } 
    782                         (void) closedir (dp); 
    783                 } 
    784                 else 
    785                         fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"Job_on_missing - Couldn't open the directory"); 
    786  
    787                 qsort(self->log_files,self->log_files_number,sizeof(char *),pbsdrmaa_date_compare); 
    788                  
    789                 if(self->log_files_number <= 2) 
    790                 { 
    791                         self->run_flag = false; 
    792                         fsd_log_error(("Job_on_missing - No log files available")); 
    793                 } 
    794                  
    795                 self->first_open = false; 
    796                 fsd_free(path); 
    797         }        
    798         else /* check previous day*/ 
    799         { 
    800                 if(++file_number > self->log_files_number - 2) 
    801                         fsd_log_error(("Job_on_missing - All available log files checked")); 
    802                 else 
    803                         fsd_log_debug(("Job_on_missing checking previous day")); 
    804                  
    805                 self->run_flag = false; 
    806                 pbsdrmaa_job_on_missing_standard( self->job );                           
    807         } 
    808          
    809         #define DRMAA_WAIT_THREAD_MAX_TRIES (12) 
    810         if((log_path = fsd_asprintf("%s/server_logs/%s", 
    811                                 pbssession->pbs_home,     
    812                                 self->log_files[file_number])) == NULL) { 
    813                 fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"Job_on_missing - Memory allocation wasn't possible"); 
    814         } 
    815  
    816         if(self->fd != -1) 
    817                 close(self->fd); 
    818  
    819         fsd_log_debug(("Log file: %s",log_path)); 
    820                                  
    821 retry: 
    822         if((self->fd = open(log_path,O_RDONLY) ) == -1 && num_tries > DRMAA_WAIT_THREAD_MAX_TRIES ) 
    823         { 
    824                 fsd_log_error(("Can't open log file. Verify pbs_home. Running standard job_on_missing")); 
    825                 fsd_log_error(("Remember that without keep_completed set standard job_on_missing won't run correctly")); 
    826                 self->run_flag = false; 
    827                 pbsdrmaa_job_on_missing_standard( self->job );                   
    828         } else if ( self->fd == -1 ) { 
    829                 fsd_log_warning(("Can't open log file: %s. Retries count: %d", log_path, num_tries)); 
    830                 num_tries++; 
    831                 sleep(5); 
    832                 goto retry; 
    833         } 
    834         else 
    835         { 
    836                 struct stat statbuf; 
    837                 if(stat(log_path,&statbuf) == -1) { 
    838                                 char errbuf[256] = "InternalError"; 
    839                                 (void)strerror_r(errno, errbuf, 256); 
    840                                 fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"stat error: %s",errbuf); 
    841                 } 
    842                 self->log_file_read_size = 0; 
    843                 self->log_file_initial_size = statbuf.st_size; 
    844                 fsd_log_debug(("Set log_file_initial_size %ld",self->log_file_initial_size)); 
    845         } 
    846  
    847         fsd_free(log_path); 
    848  
    849         fsd_log_debug(("Log file opened")); 
    850          
    851         fsd_log_return(("")); 
    852 } 
    853  
    854 ssize_t 
    855 pbsdrmaa_read_line_job_on_missing ( pbsdrmaa_log_reader_t * self, char * line, char * buffer, ssize_t size, int * idx, int * end_idx, int * line_idx ) 
    856 { 
    857         int n = fsd_getline_buffered(line,buffer,size,self->fd, idx, end_idx, line_idx); 
    858          
    859         if(n >= 0) 
    860                 self->log_file_read_size += n; 
    861                  
    862         if(self->log_file_read_size >= self->log_file_initial_size) 
    863                 return -1;  
    864  
    865         return n;  
    866 } 
    867  
    868 void 
    869 pbsdrmaa_select_file_accounting ( pbsdrmaa_log_reader_t * self ) 
    870 { 
    871         pbsdrmaa_session_t *pbssession = (pbsdrmaa_session_t*) self->session; 
    872                  
    873         char * log_path = NULL; 
    874  
    875         struct tm tm;  
    876                  
    877         fsd_log_enter(("")); 
    878                  
    879         time(&self->t);  
    880                          
    881         localtime_r(&self->t,&tm); 
    882                                  
    883         #define DRMAA_ACCOUNTING_MAX_TRIES (12) 
    884         /* generate new date, close file and open new */ 
    885         if((log_path = fsd_asprintf("%s/server_priv/accounting/%04d%02d%02d", 
    886                                 pbssession->pbs_home,     
    887                                 tm.tm_year + 1900, 
    888                                 tm.tm_mon + 1, 
    889                                 tm.tm_mday)) == NULL) { 
    890                 fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"Read accounting file - Memory allocation wasn't possible"); 
    891         } 
    892  
    893         if(self->fd != -1) 
    894                 close(self->fd); 
    895  
    896         fsd_log_debug(("Accounting Log file: %s",log_path)); 
    897  
    898         if((self->fd = open(log_path,O_RDONLY) ) == -1 ) 
    899         { 
    900                 fsd_log_error(("Can't open accounting log file. Change directory chmod and verify pbs_home.")); 
    901         }  
    902  
    903         fsd_free(log_path); 
    904  
    905         fsd_log_debug(("Accounting Log file opened")); 
    906  
    907         fsd_log_return((""));    
    908 } 
    909  
    910 ssize_t 
    911 pbsdrmaa_read_line_accounting ( pbsdrmaa_log_reader_t * self, char * line, char * buffer, ssize_t size, int * idx, int * end_idx, int * line_idx ) 
    912 { 
    913         return fsd_getline_buffered(line,buffer,size,self->fd,idx,end_idx,line_idx); 
    914 } 
    915  
    916 enum field_acc 
    917 {  
    918         FLD_ACC_DATE = 0, 
    919         FLD_ACC_EVENT = 1, 
    920         FLD_ACC_ID = 2, 
    921         FLD_ACC_MSG = 3 
    922 }; 
    923  
    924 bool  
    925 pbsdrmaa_read_log_accounting( pbsdrmaa_log_reader_t * self ) 
    926 { 
    927         pbsdrmaa_job_t *pbsjob = (pbsdrmaa_job_t*) self->job;    
    928         bool res = false; 
    929          
    930         fsd_job_t *volatile temp_job = NULL; 
    931                  
    932         fsd_log_enter(("")); 
    933         fsd_log_debug(("Accounting Log file opened")); 
    934         if(self->job == NULL) 
    935                 fsd_mutex_lock( &self->session->mutex ); 
    936  
    937         TRY 
    938         {                
    939                 TRY 
    940                 { 
    941                         char line[4096] = ""; 
    942                         char buffer[4096] = ""; 
    943                         int idx = 0, end_idx = 0, line_idx = 0; 
    944                          
    945                         self->select_file(self); 
    946                          
    947                         if(self->fd != -1)                                       
    948                         while ((self->read_line(self, line,buffer, sizeof(line), &idx,&end_idx,&line_idx)) > 0) 
    949                         { 
    950                                 const char *volatile ptr = line; 
    951                                 char field[256] = ""; 
    952                                 int volatile field_n = 0; 
    953                                 int n; 
    954                                  
    955                                 bool volatile job_id_match = false;      
    956                          
    957                                 bool volatile job_found = false; 
    958                                 char *  temp_date = NULL; 
    959                                  
    960                                 struct batch_status status; 
    961                                  
    962                                 while ( sscanf(ptr, "%255[^;]%n", field, &n) == 1 ) /* split current line into fields */ 
    963                                 { 
    964                                         status.next = NULL; 
    965                                         status.attribs = NULL; 
    966                                  
    967                                         if(field_n == FLD_ACC_DATE) 
    968                                         { 
    969                                                 temp_date = fsd_strdup(field); 
    970                                         } 
    971                                         else if(field_n == FLD_ACC_EVENT) 
    972                                         { 
    973                                                          
    974                                         } 
    975                                         else if(field_n == FLD_ACC_ID) 
    976                                         {                                                        
    977                                                 TRY 
    978                                                 {                                                                
    979                                                                 int diff = -1; 
    980                                                                 diff = fsd_job_id_cmp(self->job->job_id,field); 
    981                                                                 if( diff == 0) 
    982                                                                 { 
    983                                                                         /* read this file to the place we started and exit*/ 
    984                                                                         fsd_log_debug(("Accounting found job: %s",self->job->job_id)); 
    985                                                                         job_found = true; 
    986                                                                         job_id_match = true;  
    987                                                                         status.name = fsd_strdup(self->job->job_id); 
    988                                                                 }        
    989                                                 } 
    990                                                 END_TRY  
    991                                         } 
    992                                         else if(job_id_match && field_n == FLD_ACC_MSG) 
    993                                         {                                        
    994                                                 struct attrl * struct_attrl = calloc(10,sizeof(struct attrl)); 
    995                                                 int i; 
    996  
    997                                                 if(field[0] == 'q') 
    998                                                 { 
    999                                                         status.attribs = &struct_attrl[0]; 
    1000                                                         struct_attrl[0].name =  ATTR_queue; 
    1001                                                         struct_attrl[0].value = fsd_strdup(strchr(field,'=')+1); 
    1002                                                         struct_attrl[0].next = NULL; 
    1003                                                 } 
    1004                                                 else if(field[0] == 'u') 
    1005                                                 { 
    1006                                                         /* rusage */ 
    1007                                                         const char *ptr2 = field; 
    1008                                                         char  msg[ 256 ] = ""; 
    1009                                                         int n2 = 0; 
    1010                                                         int msg_field_n = 0; 
    1011                                  
    1012                                                         status.attribs = &struct_attrl[0]; 
    1013  
    1014                                                         while ( sscanf(ptr2, "%255[^ ]%n", msg, &n2) == 1 ) 
    1015                                                          {                                               
    1016                                                                 switch(msg_field_n)  
    1017                                                                 { 
    1018                                                                         case FLD_MSG_ACC_USER: 
    1019                                                                                 struct_attrl[msg_field_n].name = ATTR_euser; 
    1020                                                                                 break; 
    1021  
    1022                                                                         case FLD_MSG_ACC_GROUP: 
    1023                                                                                 struct_attrl[msg_field_n].name = ATTR_egroup; 
    1024                                                                                 break; 
    1025  
    1026                                                                         case FLD_MSG_ACC_JOBNAME: 
    1027                                                                                 struct_attrl[msg_field_n].name = ATTR_name; 
    1028                                                                                 break; 
    1029  
    1030                                                                         case FLD_MSG_ACC_QUEUE: 
    1031                                                                                 struct_attrl[msg_field_n].name = ATTR_queue; 
    1032                                                                                 break;  
    1033  
    1034                                                                         case FLD_MSG_ACC_CTIME: 
    1035                                                                                 struct_attrl[msg_field_n].name = ATTR_ctime; 
    1036                                                                                 break;  
    1037                                                                                  
    1038                                                                         case FLD_MSG_ACC_QTIME: 
    1039                                                                                 struct_attrl[msg_field_n].name = ATTR_qtime; 
    1040                                                                                 break;  
    1041                                                                                  
    1042                                                                         case FLD_MSG_ACC_ETIME: 
    1043                                                                                 struct_attrl[msg_field_n].name = ATTR_etime; 
    1044                                                                                 break;  
    1045 #ifndef PBS_PROFESSIONAL                 
    1046                                                                         case FLD_MSG_ACC_START: 
    1047                                                                                 struct_attrl[msg_field_n].name = ATTR_start_time; 
    1048 #else 
    1049                                                                         case FLD_MSG_ACC_START: 
    1050                                                                                 struct_attrl[msg_field_n].name = ATTR_stime; 
    1051 #endif 
    1052                                                                                  
    1053                                                                         case FLD_MSG_ACC_OWNER: 
    1054                                                                                 struct_attrl[msg_field_n].name = ATTR_owner; 
    1055                                                                                 break;  
    1056                                                                                  
    1057                                                                         case FLD_MSG_ACC_EXEC_HOST: 
    1058                                                                                 struct_attrl[msg_field_n].name = ATTR_exechost; 
    1059                                                                                 break;                                                                           
    1060                                                                 } 
    1061                                                                  
    1062                                                                 struct_attrl[msg_field_n].value  = fsd_strdup(strchr(msg,'=')+1); 
    1063                                                                 if(msg_field_n!=9) 
    1064                                                                 { 
    1065                                                                         struct_attrl[msg_field_n].next = &struct_attrl[msg_field_n+1]; 
    1066                                                                 } 
    1067                                                                 else 
    1068                                                                 { 
    1069                                                                         struct_attrl[msg_field_n].next = NULL; 
    1070                                                                         break; 
    1071                                                                 } 
    1072  
    1073                                                                 ptr2 += n2;  
    1074                                                                 msg_field_n++; 
    1075                                                                 if ( *ptr2 != ' ' ) 
    1076                                                                         break; 
    1077  
    1078                                                                 ++ptr2; 
    1079                                                         } 
    1080                                                 }                                                
    1081  
    1082                                                 if( job_found && status.attribs != NULL) 
    1083                                                 { 
    1084                                                         fsd_log_debug(("Accounting file - updating job: %s", self->job->job_id )); 
    1085                                                         pbsjob->update( self->job, &status ); 
    1086                                                         res = true; 
    1087                                                 } 
    1088                                                  
    1089                                                 if(self->job == NULL) 
    1090                                                 { 
    1091                                                         fsd_cond_broadcast( &temp_job->status_cond); 
    1092                                                         fsd_cond_broadcast( &self->session->wait_condition ); 
    1093                                                 } 
    1094                                                 if ( temp_job ) 
    1095                                                         temp_job->release( temp_job ); 
    1096          
    1097                                                 for(i = 0; i < 10; i++) 
    1098                                                 { 
    1099                                                         fsd_free(struct_attrl[i].value); 
    1100                                                 } 
    1101                                                 fsd_free(struct_attrl); 
    1102                                                 fsd_free(status.name); 
    1103                                         } 
    1104                                          
    1105                                          
    1106                                         ptr += n;  
    1107                                         if ( *ptr != ';' ) 
    1108                                         { 
    1109                                                 break; /* end of line */ 
    1110                                         } 
    1111                                         field_n++; 
    1112                                         ++ptr; 
    1113                                 }                
    1114  
    1115                                 fsd_free(temp_date);                     
    1116                         } /* end of while getline loop */        
    1117                          
    1118                 }                
    1119                 EXCEPT_DEFAULT 
    1120                  { 
    1121                         const fsd_exc_t *e = fsd_exc_get(); 
    1122                         /* Its better to exit and communicate error rather then let the application to hang */ 
    1123                         fsd_log_fatal(( "Exception in reading accounting file %s: <%d:%s>. Exiting !!!", self->name, e->code(e), e->message(e) )); 
    1124                         exit(1); 
    1125                  } 
    1126                 END_TRY 
    1127  
    1128                 if(self->fd != -1) 
    1129                         close(self->fd); 
    1130                 fsd_log_debug(("%s - Accounting log file closed",self->name));   
    1131         } 
    1132         FINALLY 
    1133         { 
    1134                 fsd_log_debug(("%s - Terminated.",self->name));  
    1135                 if(self->job == NULL) 
    1136                         fsd_mutex_unlock( &self->session->mutex ); /**/ 
    1137         } 
    1138         END_TRY 
    1139          
    1140         fsd_log_return(("")); 
    1141         return res; 
    1142 } 
    1143  
    1144 int  
    1145 fsd_job_id_cmp(const char *s1, const char *s2) /* maybe move to drmaa_utils? */ 
    1146 { 
    1147         int job1; 
    1148         int job2; 
    1149         char *rest = NULL; 
    1150         char *token = NULL; 
    1151         char *ptr = fsd_strdup(s1); 
    1152         token = strtok_r(ptr, ".", &rest); 
    1153         job1 = atoi(token); 
    1154          
    1155         fsd_free(token); 
    1156          
    1157         ptr = fsd_strdup(s2); 
    1158         token = strtok_r(ptr,".",&rest); 
    1159         job2 = atoi(token); 
    1160          
    1161         fsd_free(token); 
    1162         return job1 - job2; 
    1163 } 
    1164  
     577 
  • trunk/pbs_drmaa/log_reader.h

    r21 r29  
    2525#endif 
    2626 
     27#include <stdio.h> 
     28 
    2729#include <drmaa_utils/job.h> 
    2830#include <drmaa_utils/session.h> 
     
    3133 
    3234pbsdrmaa_log_reader_t *  
    33 pbsdrmaa_log_reader_new ( fsd_drmaa_session_t * session, fsd_job_t * job ); 
    34  
    35 pbsdrmaa_log_reader_t *  
    36 pbsdrmaa_log_reader_accounting_new ( fsd_drmaa_session_t * session, fsd_job_t * job ); 
     35pbsdrmaa_log_reader_new ( fsd_drmaa_session_t * session); 
    3736 
    3837void 
     
    4140struct pbsdrmaa_log_reader_s { 
    4241        fsd_drmaa_session_t *volatile session ; 
    43         fsd_job_t *volatile job; 
    4442         
    45         bool (* 
    46         read_log) ( pbsdrmaa_log_reader_t * self ); 
     43        void (*read_log) ( pbsdrmaa_log_reader_t * self ); 
    4744         
    48         void (* 
    49         select_file) ( pbsdrmaa_log_reader_t * self ); 
     45        void (*select_file) ( pbsdrmaa_log_reader_t * self ); 
    5046         
    51         /* line - read line, buffer - keeps read but not returned lines, idx, end_idx and line_idx values needed to be kept outside the function */ 
    52         ssize_t (* 
    53         read_line) ( pbsdrmaa_log_reader_t * self, char * line, char * buffer, ssize_t size, int * idx, int * end_idx, int * line_idx ); 
    54          
    55         /* specifies if function should run */ 
     47        /* determines if function should run */ 
    5648        bool run_flag; 
    5749         
     
    5951        time_t t;        
    6052         
    61         /* for job_on_missing  - available log files */ 
    62         char ** log_files; 
    63          
    64         /* for job_on_missing - number of log files */ 
    65         int log_files_number; 
    66          
    67         /* log file descriptor */ 
    68         int volatile fd; 
    69          
    70         /* for job_on_missing - log file size when function was ran */ 
    71         off_t log_file_initial_size; 
    72          
    73         /* for job_on_missing - read lines size */ 
    74         off_t log_file_read_size; 
     53        /* log file handle */ 
     54        FILE *fhandle; 
    7555         
    7656        /* for wait_thread - day changed */ 
     
    7959        /* for wait_thread - log file first open */ 
    8060        bool volatile first_open;        
    81          
    82         char * name; 
    8361}; 
    8462 
  • trunk/pbs_drmaa/pbs_attrib.h

    r26 r29  
    8181#define PBSDRMAA_EXTENSION              "extension" 
    8282#define PBSDRMAA_SUBMIT_ARGS            "submit_args" 
     83#define PBSDRMAA_MTIME                  "mtime" 
    8384 
    8485 
  • trunk/pbs_drmaa/session.c

    r12 r29  
    520520        TRY 
    521521        {        
    522                 log_reader = pbsdrmaa_log_reader_new( self, NULL); 
     522                log_reader = pbsdrmaa_log_reader_new( self ); 
    523523                log_reader->read_log( log_reader ); 
    524524        } 
  • trunk/pbs_drmaa/session.h

    r12 r29  
    3535        fsd_drmaa_session_t super; 
    3636 
    37         bool (* 
    38         do_drm_keeps_completed_jobs)( pbsdrmaa_session_t *self ); 
     37        bool (*do_drm_keeps_completed_jobs)( pbsdrmaa_session_t *self ); 
    3938 
    40         void (* 
    41         super_destroy)( fsd_drmaa_session_t *self ); 
     39        void (*super_destroy)( fsd_drmaa_session_t *self ); 
    4240 
    43         void (* 
    44         super_apply_configuration)(fsd_drmaa_session_t *self); 
     41        void (*super_apply_configuration)(fsd_drmaa_session_t *self); 
    4542 
    4643        /* 
    4744         * Pointer to standard wait_thread drmaa_utils function 
    4845         */ 
    49         void* (* 
    50         super_wait_thread)( fsd_drmaa_session_t *self ); 
     46        void* (*super_wait_thread)( fsd_drmaa_session_t *self ); 
    5147 
    5248        /* 
     
    7268 
    7369        /* 
    74          * Log file initial size - used by wait_thread which reads log files 
     70         * Log file initial size - used by wait_thread which reads log files TODO: check if it can be safely moved to log_reader 
    7571         */ 
    7672        off_t log_file_initial_size; 
    7773 
    7874        /* 
    79          * Time we checked log file initial size - used by wait_thread which reads log files 
     75         * Time we checked log file initial size - used by wait_thread which reads log files TODO: check if it can be safely moved to log_reader 
    8076         */ 
    8177        time_t log_file_initial_time; 
  • trunk/pbs_drmaa/submit.c

    r24 r29  
    151151                        if( name  &&  name[0] != '!' && pbs_tmpl->get_attr( pbs_tmpl, name ) ) 
    152152                         { 
    153                                 struct attrl *p; 
    154                                 const char *resource; 
    155153                                const char *value; 
     154 
    156155                                value = pbs_tmpl->get_attr( pbs_tmpl, name ); 
    157                                 fsd_malloc( p, struct attrl ); 
    158                                 memset( p, 0, sizeof(struct attrl) ); 
    159                                 p->next = pbs_attr; 
    160                                 pbs_attr = p; 
    161                                 resource = strchr( name, '.' ); 
    162                                 if( resource ) 
    163                                  { 
    164                                         p->name = fsd_strndup( name, resource-name ); 
    165                                         p->resource = fsd_strdup( resource+1 ); 
    166                                  } 
    167                                 else 
    168                                         p->name = fsd_strdup( name ); 
    169                                 fsd_log_debug(("set attr: %s = %s", name, value)); 
    170                                 p->value = fsd_strdup( value ); 
    171                                 p->op = SET; 
     156                                pbs_attr = pbsdrmaa_add_attr( pbs_attr, name, value ); 
    172157                         } 
    173158                 } 
  • trunk/pbs_drmaa/util.c

    r16 r29  
    7373                fsd_free( p ); 
    7474         } 
     75} 
     76 
     77struct attrl * 
     78pbsdrmaa_add_attr( struct attrl *head, const char *name, const char *value) 
     79{ 
     80        struct attrl *p = NULL; 
     81        char *resource = NULL; 
     82 
     83        fsd_malloc( p, struct attrl ); 
     84        memset( p, 0, sizeof(struct attrl) ); 
     85 
     86        resource = strchr( name, '.' ); 
     87 
     88        if( resource ) 
     89         { 
     90                p->name = fsd_strndup( name, resource - name ); 
     91                p->resource = fsd_strdup( resource+1 ); 
     92         } 
     93        else 
     94         { 
     95                p->name = fsd_strdup( name ); 
     96         } 
     97 
     98        p->value = fsd_strdup(value); 
     99        p->op = SET; 
     100 
     101        fsd_log_debug(("set attr: %s = %s", name, value)); 
     102 
     103        if (head) 
     104                p->next = head; 
     105        else 
     106                p->next = NULL; 
     107 
     108        return p; 
    75109} 
    76110 
     
    247281} 
    248282 
    249 ssize_t fsd_getline(char * line,ssize_t size, int fd) 
    250 { 
    251         char buf; 
    252         char * ptr = NULL; 
    253         ssize_t n = 0, rc; 
    254         ptr = line; 
    255         for(n = 1; n< size; n++) 
    256         {                
    257                 if( (rc = read(fd,&buf,1 )) == 1) { 
    258                         *ptr++ = buf; 
    259                         if(buf == '\n') 
    260                         { 
    261                                 break; 
    262                         } 
    263                 } 
    264                 else if (rc == 0) { 
    265                         if (n == 1) 
    266                                 return 0; 
    267                         else 
    268                                 break; 
    269                 }                
    270                 else 
    271                         return -1;  
    272         } 
    273  
    274         return n; 
    275 }  
    276  
    277 ssize_t fsd_getline_buffered(char * line,char * buf, ssize_t size, int fd, int * idx, int * end_idx, int * line_idx) 
    278 { 
    279         int i = -1; 
    280         int rc = -1; 
    281  
    282         memset(line,0,size); 
    283          
    284 start: 
    285         /* idx - start of data to parse (in buffer) 
    286            end_idx - end of data read from log (in buffer) 
    287            line_idx - place to write data in output line */ 
    288         if(*idx < *end_idx) 
    289         { 
    290                 /* take line from buffer */ 
    291                 for(i = *idx; i<= *end_idx;i++) 
    292                 {                
    293                         if(buf[i] == '\n') 
    294                         { 
    295                                 int tmp = i - *idx; 
    296                                 strncpy(line + *line_idx,buf + *idx,tmp);                                
    297                                 *idx = i + 1; 
    298                          
    299                                 tmp+= *line_idx; 
    300                                 *line_idx = 0; 
    301                                  
    302                                 return tmp; 
    303                         } 
    304                 } 
    305                  
    306                 /* there was no '\n' so next part of log needs to be read. save lines beginning */ 
    307                 if(*line_idx + i - *idx > size ) 
    308                         fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"Line longer than %d unsupported",size); 
    309                  
    310                 strncpy(line + *line_idx,buf + *idx,i - *idx); 
    311                 *line_idx += i - *idx; 
    312                 *idx = 0; 
    313                 *end_idx = 0; 
    314                 goto start; 
    315         } 
    316         else 
    317         {                
    318                 /* read log */ 
    319                 if((rc = read(fd,buf,size)) > 0) 
    320                 {                
    321                         *end_idx = rc - 1; 
    322                         *idx = 0; 
    323                         goto start; 
    324                 } 
    325                 else if (rc == 0)  
    326                         return 0; 
    327                 else 
    328                         return -1; 
    329         } 
    330 }  
    331  
     283 
  • trunk/pbs_drmaa/util.h

    r12 r29  
    3131 
    3232void pbsdrmaa_free_attrl( struct attrl *list ); 
    33 void pbsdrmaa_dump_attrl( 
    34                 const struct attrl *attribute_list, const char *prefix ); 
     33void pbsdrmaa_dump_attrl( const struct attrl *attribute_list, const char *prefix ); 
     34 
     35struct attrl *pbsdrmaa_add_attr( struct attrl *head, const char *name, const char *value); 
    3536 
    3637/** 
     
    4344pbsdrmaa_write_tmpfile( const char *content, size_t len ); 
    4445 
    45 ssize_t 
    46 fsd_getline(char * line,ssize_t size, int fd); 
    47  
    48 ssize_t  
    49 fsd_getline_buffered(char * line,char * buf, ssize_t size, int fd, int * idx, int * end_idx, int * line_idx); 
    5046 
    5147#endif /* __PBS_DRMAA__UTIL_H */ 
Note: See TracChangeset for help on using the changeset viewer.