source: trunk/pbs_drmaa/log_reader.c @ 8

Revision 8, 20.6 KB checked in by mmatloka, 13 years ago (diff)

log reader improvements

Line 
1/* $Id: log_reader.c 323 2010-09-21 21:31:29Z mmatloka $ */
2/*
3 *  FedStage DRMAA for PBS Pro
4 *  Copyright (C) 2006-2007  FedStage Systems
5 *
6 *  This program is free software: you can redistribute it and/or modify
7 *  it under the terms of the GNU General Public License as published by
8 *  the Free Software Foundation, either version 3 of the License, or
9 *  (at your option) any later version.
10 *
11 *  This program is distributed in the hope that it will be useful,
12 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 *  GNU General Public License for more details.
15 *
16 *  You should have received a copy of the GNU General Public License
17 *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #ifdef HAVE_CONFIG_H
21#       include <config.h>
22#endif
23
24#include <stdlib.h>
25#include <string.h>
26#include <unistd.h>
27#include <sys/select.h>
28#include <sys/stat.h>
29#include <sys/types.h>
30#include <dirent.h>
31#include <fcntl.h>
32
33#include <pbs_ifl.h>
34#include <pbs_error.h>
35
36#include <drmaa_utils/datetime.h>
37#include <drmaa_utils/drmaa.h>
38#include <drmaa_utils/iter.h>
39#include <drmaa_utils/conf.h>
40#include <drmaa_utils/session.h>
41#include <drmaa_utils/datetime.h>
42
43#include <pbs_drmaa/job.h>
44#include <pbs_drmaa/log_reader.h>
45#include <pbs_drmaa/session.h>
46#include <pbs_drmaa/submit.h>
47#include <pbs_drmaa/util.h>
48
49#include <errno.h>
50
51static void
52pbsdrmaa_read_log();
53
54static void
55pbsdrmaa_select_file_wait_thread ( pbsdrmaa_log_reader_t * self);
56
57static ssize_t
58pbsdrmaa_read_line_wait_thread ( pbsdrmaa_log_reader_t * self, char * line, char * buffer, ssize_t size, int * idx, int * end_idx, int * line_idx );
59
60static void
61pbsdrmaa_select_file_job_on_missing ( pbsdrmaa_log_reader_t * self );
62
63static ssize_t
64pbsdrmaa_read_line_job_on_missing ( pbsdrmaa_log_reader_t * self, char * line, char * buffer, ssize_t size, int * idx, int * end_idx, int * line_idx );
65
66int
67fsd_job_id_cmp(const char *s1, const char *s2);
68
69int
70pbsdrmaa_date_compare(const void *a, const void *b) ;
71
72pbsdrmaa_log_reader_t *
73pbsdrmaa_log_reader_new ( fsd_drmaa_session_t *session, fsd_job_t *job )
74{
75        pbsdrmaa_log_reader_t *volatile self = NULL;
76
77        fsd_log_enter((""));
78        TRY
79        {
80                fsd_malloc(self, pbsdrmaa_log_reader_t );
81               
82                self->session = session;
83               
84                /* ~templete method pattern */
85                if(job != NULL) /* job on missing */
86                {
87                        self->job = job;
88                        self->name = "Job_on_missing";
89                        self->select_file = pbsdrmaa_select_file_job_on_missing;
90                        self->read_line = pbsdrmaa_read_line_job_on_missing;
91                }
92                else /* wait thread */
93                {
94                        self->job = NULL;
95                        self->name = "WT";
96                        self->select_file = pbsdrmaa_select_file_wait_thread;
97                        self->read_line = pbsdrmaa_read_line_wait_thread;
98                }               
99                self->read_log = pbsdrmaa_read_log;     
100               
101                self->log_files = NULL;
102                self->log_files_number = 0;
103               
104                self->run_flag = true;
105                self->fd = -1;
106                self->date_changed = true;
107                self->first_open = true;
108               
109                self->log_file_initial_size = 0;
110                self->log_file_read_size = 0;
111        }
112        EXCEPT_DEFAULT
113        {
114                if( self != NULL)
115                        fsd_free(self);
116                       
117                fsd_exc_reraise();
118        }
119        END_TRY
120        fsd_log_return((""));
121        return self;
122}
123
124void
125pbsdrmaa_log_reader_destroy ( pbsdrmaa_log_reader_t * self )
126{
127        fsd_log_enter((""));
128        TRY
129        {
130                if(self != NULL)
131                {
132                        int i = -1;
133                        for(i = 0; i < self->log_files_number ; i++)
134                                fsd_free(self->log_files[i]);
135                        fsd_free(self->log_files);
136                        fsd_free(self);
137                }                       
138        }
139        EXCEPT_DEFAULT
140        {
141                fsd_exc_reraise();
142        }
143        END_TRY
144       
145        fsd_log_return((""));
146}
147
148enum field
149{
150        FLD_DATE = 0,
151        FLD_EVENT = 1,
152        FLD_OBJ = 2,
153        FLD_TYPE = 3,
154        FLD_ID = 4,
155        FLD_MSG = 5
156};
157
158enum field_msg
159{
160        FLD_MSG_EXIT_STATUS = 0,
161        FLD_MSG_CPUT = 1,
162        FLD_MSG_MEM = 2,
163        FLD_MSG_VMEM = 3,
164        FLD_MSG_WALLTIME = 4
165};
166
167#define FLD_MSG_STATUS "0010"
168#define FLD_MSG_STATE "0008"
169#define FLD_MSG_LOG "0002"
170
171void
172pbsdrmaa_read_log( pbsdrmaa_log_reader_t * self )
173{
174        pbsdrmaa_job_t *pbsjob = (pbsdrmaa_job_t*) self->job;   
175        fsd_job_t *volatile temp_job = NULL;
176               
177        fsd_log_enter((""));
178       
179        if(self->job == NULL)
180                fsd_mutex_lock( &self->session->mutex );
181
182        TRY
183        {               
184                while( self->run_flag )
185                TRY
186                {
187                        char line[4096] = "";
188                        char buffer[4096] = "";
189                        int idx = 0, end_idx = 0, line_idx = 0;
190                       
191                        self->select_file(self);
192
193                        while ((self->read_line(self, line,buffer, sizeof(line), &idx,&end_idx,&line_idx)) > 0)                         
194                        {
195                                const char *volatile ptr = line;
196                                char field[256] = "";
197                                char job_id[256] = "";
198                                char event[256] = "";
199                                int volatile field_n = 0;
200                                int n;
201                               
202                                bool volatile job_id_match = false;
203                                bool volatile event_match = false;
204                                bool volatile log_event = false;
205                                bool volatile log_match = false;
206                                bool volatile older_job_found = false;
207                                bool volatile job_found = false;
208                                char *  temp_date = NULL;
209                               
210                                struct batch_status status;
211                                status.next = NULL;
212
213                                while ( sscanf(ptr, "%255[^;]%n", field, &n) == 1 ) /* divide current line into fields */
214                                {
215                                        if(field_n == FLD_DATE)
216                                        {
217                                                temp_date = fsd_strdup(field);
218                                        }
219                                        else if(field_n == FLD_EVENT && (strcmp(field,FLD_MSG_STATUS) == 0 ||
220                                                                    strcmp(field,FLD_MSG_STATE) == 0 ))
221                                        {
222                                                /* event described by log line*/
223                                                if(strlcpy(event, field,sizeof(event)) > sizeof(event)) {
224                                                        fsd_log_error(("%s - strlcpy error",self->name));
225                                                }
226                                                event_match = true;                                                                     
227                                        }
228                                        else if(event_match && field_n == FLD_ID)
229                                        {       
230                                                TRY
231                                                {       
232                                                        if(self->job == NULL) /* wait_thread */
233                                                        {
234                                                                temp_job = self->session->get_job( self->session, field );
235                                                                pbsjob = (pbsdrmaa_job_t*) temp_job;
236
237                                                                if( temp_job )
238                                                                {
239                                                                        if(strlcpy(job_id, field,sizeof(job_id)) > sizeof(job_id)) {
240                                                                                fsd_log_error(("%s - strlcpy error",self->name));
241                                                                        }
242                                                                        fsd_log_debug(("%s - job_id: %s",self->name,job_id));
243                                                                        status.name = fsd_strdup(job_id);
244                                                                        job_id_match = true; /* job_id is in drmaa */   
245                                                                }
246                                                                else
247                                                                {
248                                                                        fsd_log_debug(("%s - Unknown job: %s", self->name,field));
249                                                                }
250                                                        }
251                                                        else /* job_on_missing */
252                                                        {
253                                                                int diff = -1;
254                                                                diff = fsd_job_id_cmp(self->job->job_id,field);
255                                                                if( diff == 0)
256                                                                {
257                                                                        /* read this file to the place we started and exit*/
258                                                                        fsd_log_debug(("Job_on_missing found job: %s",self->job->job_id));
259                                                                        job_found = true;
260                                                                        older_job_found = false;
261                                                                        self->run_flag = false;
262                                                                        job_id_match = true;
263                                                                        status.name = fsd_strdup(self->job->job_id);                                                                   
264                                                                }
265                                                                else if ( !job_found && diff >= 1)
266                                                                {
267                                                                        /* older job, find its beginning */
268                                                                        fsd_log_debug(("Job_on_missing found older job than %s : %s",self->job->job_id,field));
269                                                                        older_job_found = true;
270                                                                        job_id_match = true;
271                                                                        status.name = fsd_strdup(self->job->job_id);
272                                                                }
273                                                                else  if( !job_found )
274                                                                {
275                                                                        fsd_log_debug(("Job_on_missing found newer job than %s : %s",self->job->job_id,field));
276                                                                }                                                               
277                                                        }
278                                                }
279                                                END_TRY
280                                        }
281                                        else if(job_id_match && field_n == FLD_MSG)
282                                        {                                               
283                                                /* parse msg - depends on FLD_EVENT */
284                                                struct attrl struct_resource_cput,struct_resource_mem,struct_resource_vmem,
285                                                        struct_resource_walltime, struct_status, struct_state, struct_start_time,struct_mtime, struct_queue, struct_account_name;       
286                                               
287                                                bool state_running = false;
288
289                                                memset(&struct_status,0,sizeof(struct attrl)); /**/
290                                                memset(&struct_state,0,sizeof(struct attrl));
291                                                memset(&struct_resource_cput,0,sizeof(struct attrl));
292                                                memset(&struct_resource_mem,0,sizeof(struct attrl));
293                                                memset(&struct_resource_vmem,0,sizeof(struct attrl));
294                                                memset(&struct_resource_walltime,0,sizeof(struct attrl));
295                                                memset(&struct_start_time,0,sizeof(struct attrl));
296                                                memset(&struct_mtime,0,sizeof(struct attrl));
297                                                memset(&struct_queue,0,sizeof(struct attrl));
298                                                memset(&struct_account_name,0,sizeof(struct attrl));
299                                                               
300                                                if (strcmp(event,FLD_MSG_STATE) == 0)
301                                                {
302                                                        /* job run, modified, queued etc */
303                                                        int n = 0;
304                                                        status.attribs = &struct_state;
305                                                        struct_state.next = NULL;
306                                                        struct_state.name = "job_state";
307                                                        if(field[0] == 'J') /* Job Queued, Job Modified, Job Run*/
308                                                        {
309                                                                n = 4;
310                                                                if(older_job_found) /* job_on_missing - older job beginning - read this file and end */
311                                                                {
312                                                                        self->run_flag = false;
313                                                                        fsd_log_debug(("Job_on_missing found older job beginning"));
314                                                                        fsd_free(status.name);
315                                                                        break;
316                                                                }
317                                                        }               
318                                                        if(field[4] == 'M') { /* modified */
319                                                                struct tm temp_time_tm;
320                                                                memset(&temp_time_tm, 0, sizeof(temp_time_tm));
321                                                                temp_time_tm.tm_isdst = -1;
322
323                                                                if (strptime(temp_date, "%m/%d/%Y %H:%M:%S", &temp_time_tm) == NULL)
324                                                                 {
325                                                                        fsd_log_error(("failed to parse mtime: %s", temp_date));
326                                                                 }
327                                                                else
328                                                                 {
329                                                                        time_t temp_time = mktime(&temp_time_tm);
330                                                                        status.attribs = &struct_mtime;
331                                                                        struct_mtime.name = "mtime";
332                                                                        struct_mtime.next = NULL;
333                                                                        struct_mtime.value = fsd_asprintf("%lu",temp_time);
334                                                                 }
335                                                        }               
336                                                        /* != Job deleted and Job to be deleted*/
337                                                        #ifdef PBS_PROFESSIONAL
338                                                        else if (field[4] != 't' && field[10] != 'd') {
339                                                        #else           
340                                                        else if(field[4] != 'd') {
341                                                        #endif
342
343                                                                if ((struct_state.value = fsd_asprintf("%c",field[n]) ) == NULL ) { /* 4 first letter of state */
344                                                                        fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"%s - Memory allocation wasn't possible",self->name);
345                                                                }
346                                                                if(struct_state.value[0] == 'R'){
347                                                                        state_running = true;
348                                                                }
349                                                        }
350                                                        else { /* job terminated - pbs drmaa detects failed as completed with exit_status !=0, aborted with status -1*/
351                                                                struct_status.name = "exit_status";
352                                                                struct_status.value = fsd_strdup("-1");
353                                                                struct_status.next = NULL;
354                                                                struct_state.next = &struct_status;
355                                                                struct_state.value = fsd_strdup("C");                                                           
356                                                        }
357                                                }                                                   
358                                                else /*if (strcmp(event,FLD_MSG_STATUS) == 0 )*/
359                                                {
360                                                        /* exit status and rusage */
361                                                        const char *ptr2 = field;
362                                                        char  msg[ 256 ] = "";
363                                                        int n2;
364                                                        int msg_field_n = 0;
365                                                       
366                                                        struct_resource_cput.name = "resources_used";
367                                                        struct_resource_mem.name = "resources_used";
368                                                        struct_resource_vmem.name = "resources_used";
369                                                        struct_resource_walltime.name = "resources_used";
370                                                        struct_status.name = "exit_status";
371                                                        struct_state.name = "job_state";
372                               
373                                                        status.attribs = &struct_resource_cput;
374                                                        struct_resource_cput.next = &struct_resource_mem;
375                                                        struct_resource_mem.next = &struct_resource_vmem;
376                                                        struct_resource_vmem.next = &struct_resource_walltime;
377                                                        struct_resource_walltime.next =  &struct_status;
378                                                        struct_status.next = &struct_state;
379                                                        struct_state.next = NULL;
380
381                                                        while ( sscanf(ptr2, "%255[^ ]%n", msg, &n2) == 1 )
382                                                         {                                             
383                                                                switch(msg_field_n)
384                                                                {
385                                                                        case FLD_MSG_EXIT_STATUS:
386                                                                                struct_status.value = fsd_strdup(strchr(msg,'=')+1);
387                                                                                break;
388
389                                                                        case FLD_MSG_CPUT:
390                                                                                struct_resource_cput.resource = "cput";
391                                                                                struct_resource_cput.value = fsd_strdup(strchr(msg,'=')+1);
392                                                                                break;
393
394                                                                        case FLD_MSG_MEM:
395                                                                                struct_resource_mem.resource = "mem";
396                                                                                struct_resource_mem.value  = fsd_strdup(strchr(msg,'=')+1);
397                                                                                break;
398
399                                                                        case FLD_MSG_VMEM:
400                                                                                struct_resource_vmem.resource = "vmem";
401                                                                                struct_resource_vmem.value  = fsd_strdup(strchr(msg,'=')+1);
402                                                                                break;
403
404                                                                        case FLD_MSG_WALLTIME:
405                                                                                struct_resource_walltime.resource = "walltime";
406                                                                                struct_resource_walltime.value  = fsd_strdup(strchr(msg,'=')+1);
407                                                                                break;
408                                                                }
409                                                             
410                                                                ptr2 += n2;
411                                                                msg_field_n++;
412                                                                if ( *ptr2 != ' ' )
413                                                                 {
414                                                                         break;
415                                                                 }
416                                                                ++ptr2;                                         
417                                                         }
418                                                        struct_state.value = fsd_strdup("C");   /* we got exit_status so we say that it has completed */
419                                                }                                               
420                                                 
421                                                if(self->job == NULL) /* wait_thread */
422                                                {
423                                                        if ( state_running )
424                                                        {
425                                                                fsd_log_debug(("WT - forcing update of job: %s", temp_job->job_id ));
426                                                                temp_job->update_status( temp_job );
427                                                        }
428                                                        else
429                                                        {
430                                                                fsd_log_debug(("%s - updating job: %s",self->name, temp_job->job_id ));                                                 
431                                                                pbsjob->update( temp_job, &status );
432                                                        }
433                                                 }
434                                                 else if( job_found ) /* job_on_missing */
435                                                 {
436                                                        fsd_log_debug(("Job_on_missing - updating job: %s", self->job->job_id ));                                                       
437                                                        pbsjob->update( self->job, &status );
438                                                 }
439                                               
440                                                if(self->job == NULL)
441                                                {
442                                                        fsd_cond_broadcast( &temp_job->status_cond);
443                                                        fsd_cond_broadcast( &self->session->wait_condition );
444                                                }
445                                                if ( temp_job )
446                                                        temp_job->release( temp_job );
447       
448                                                fsd_free(struct_resource_cput.value);
449                                                fsd_free(struct_resource_mem.value);
450                                                fsd_free(struct_resource_vmem.value);
451                                                fsd_free(struct_resource_walltime.value);
452                                                fsd_free(struct_status.value);
453                                                fsd_free(struct_state.value);
454                                                fsd_free(struct_start_time.value);
455                                                fsd_free(struct_mtime.value);
456                                                fsd_free(struct_queue.value);
457                                                fsd_free(struct_account_name.value);
458
459                                                if ( status.name!=NULL )
460                                                        fsd_free(status.name);
461                                        }
462                                        else if(field_n == FLD_EVENT && strcmp(field,FLD_MSG_LOG) == 0)
463                                        {
464                                                log_event = true;                                       
465                                        }
466                                        else if (log_event && field_n == FLD_ID && strcmp(field,"Log") == 0 )
467                                        {
468                                                log_match = true;
469                                                log_event = false;
470                                        }
471                                        else if( self->job == NULL && log_match && field_n == FLD_MSG && strncmp(field,"Log closed",10) == 0)
472                                        {
473                                                fsd_log_debug(("%s - Date changed. Closing log file",self->name));
474                                                self->date_changed = true;
475                                                log_match = false;
476                                        }
477                                       
478                                        ptr += n;
479                                        if ( *ptr != ';' )
480                                        {
481                                                break; /* end of line */
482                                        }
483                                        field_n++;
484                                        ++ptr;
485                                }               
486
487                                fsd_free(temp_date);                   
488                        } /* end of while getline loop */                       
489                       
490                        if(self->job == NULL)
491                        {
492                                fsd_mutex_unlock( &self->session->mutex );                     
493                                usleep(1000000);
494                                fsd_mutex_lock( &self->session->mutex );       
495                        }
496                       
497                        if(self->job == NULL)
498                        {
499                                self->run_flag = self->session->wait_thread_run_flag;
500                        }
501                }               
502                EXCEPT_DEFAULT
503                 {
504                        const fsd_exc_t *e = fsd_exc_get();
505
506                        fsd_log_error(( "%s: <%d:%s>", self->name, e->code(e), e->message(e) ));
507                        fsd_exc_reraise();
508                 }
509                END_TRY
510
511                if(self->fd != -1)
512                        close(self->fd);
513                fsd_log_debug(("%s - Log file closed",self->name));     
514        }
515        FINALLY
516        {
517                fsd_log_debug(("%s - Terminated.",self->name));
518                if(self->job == NULL)
519                        fsd_mutex_unlock( &self->session->mutex ); /**/
520        }
521        END_TRY
522       
523        fsd_log_return((""));
524}
525
526void
527pbsdrmaa_select_file_wait_thread ( pbsdrmaa_log_reader_t * self )
528{
529        pbsdrmaa_session_t *pbssession = (pbsdrmaa_session_t*) self->session;
530       
531        if(self->date_changed)
532        {
533                char * log_path = NULL;
534                int num_tries = 0;
535                struct tm tm;
536               
537                fsd_log_enter((""));
538               
539                if(!self->first_open)
540                        time(&self->t);
541                else
542                        self->t = pbssession->log_file_initial_time;
543                       
544                localtime_r(&self->t,&tm);
545                               
546                #define DRMAA_WAIT_THREAD_MAX_TRIES (12)
547                /* generate new date, close file and open new */
548                if((log_path = fsd_asprintf("%s/server_logs/%04d%02d%02d",
549                                        pbssession->pbs_home,   
550                                        tm.tm_year + 1900,
551                                        tm.tm_mon + 1,
552                                        tm.tm_mday)) == NULL) {
553                        fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"WT - Memory allocation wasn't possible");
554                }
555
556                if(self->fd != -1)
557                        close(self->fd);
558
559                fsd_log_debug(("Log file: %s",log_path));
560                               
561        retry:
562                if((self->fd = open(log_path,O_RDONLY) ) == -1 && num_tries > DRMAA_WAIT_THREAD_MAX_TRIES )
563                {
564                        fsd_log_error(("Can't open log file. Verify pbs_home. Running standard wait_thread."));
565                        fsd_log_error(("Remember that without keep_completed set standard wait_thread won't run correctly"));
566                        /*pbssession->super.enable_wait_thread = false;*/ /* run not wait_thread */
567                        pbssession->wait_thread_log = false;
568                        pbssession->super.wait_thread = pbssession->super_wait_thread;
569                        pbssession->super.wait_thread(self->session);
570                } else if ( self->fd == -1 ) {
571                        fsd_log_warning(("Can't open log file: %s. Retries count: %d", log_path, num_tries));
572                        num_tries++;
573                        sleep(5);
574                        goto retry;
575                }
576
577                fsd_free(log_path);
578
579                fsd_log_debug(("Log file opened"));
580
581                if(self->first_open) {
582                        fsd_log_debug(("Log file lseek"));
583                        if(lseek(self->fd,pbssession->log_file_initial_size,SEEK_SET) == (off_t) -1) {
584                                char errbuf[256] = "InternalError";
585                                (void)strerror_r(errno, errbuf, 256);
586                                fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"lseek error: %s",errbuf);
587                        }
588                        self->first_open = false;
589                }
590
591                self->date_changed = false;
592               
593                fsd_log_return((""));
594        }       
595}
596
597ssize_t
598pbsdrmaa_read_line_wait_thread ( pbsdrmaa_log_reader_t * self, char * line, char * buffer, ssize_t size, int * idx, int * end_idx, int * line_idx )
599{
600        return fsd_getline_buffered(line,buffer,size,self->fd,idx,end_idx,line_idx);
601}
602
603/* reverse date compare*/
604int
605pbsdrmaa_date_compare(const void *a, const void *b)
606{
607   const char *ia = *(const char **) a;
608   const char *ib = *(const char **) b;
609   return strcmp(ib, ia);
610}
611
612void
613pbsdrmaa_select_file_job_on_missing( pbsdrmaa_log_reader_t * self )
614{
615        pbsdrmaa_session_t *pbssession = (pbsdrmaa_session_t*) self->session;   
616       
617        char * log_path = NULL;
618        int num_tries = 0;
619        static int file_number = 0;
620        fsd_log_enter((""));
621               
622        if(self->first_open)
623        {                       
624                DIR *dp = NULL;         
625                char * path = NULL;
626                struct dirent *ep = NULL;
627               
628                if((path = fsd_asprintf("%s/server_logs/",pbssession->pbs_home)) == NULL)
629                        fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"Job_on_missing - Memory allocation wasn't possible");
630               
631                self->log_files_number = 0;     
632                dp = opendir (path);
633
634                fsd_calloc(self->log_files,2,char*);
635       
636                if (dp != NULL)
637                {
638                        while ((ep = readdir (dp)))
639                        {
640                                self->log_files_number++;
641                                if(self->log_files_number > 2)
642                                        fsd_realloc(self->log_files,self->log_files_number,char *);
643                               
644                                self->log_files[self->log_files_number-1] = fsd_strdup(ep->d_name);
645                        }
646                        (void) closedir (dp);
647                }
648                else
649                        fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"Job_on_missing - Couldn't open the directory");
650
651                qsort(self->log_files,self->log_files_number,sizeof(char *),pbsdrmaa_date_compare);
652               
653                if(self->log_files_number <= 2)
654                {
655                        self->run_flag = false;
656                        fsd_log_error(("Job_on_missing - No log files available"));
657                }
658               
659                self->first_open = false;
660                fsd_free(path);
661        }       
662        else /* check previous day*/
663        {
664                if(++file_number > self->log_files_number - 2)
665                        fsd_log_error(("Job_on_missing - All available log files checked"));
666                else
667                        fsd_log_debug(("Job_on_missing checking previous day"));
668               
669                self->run_flag = false;
670                pbsdrmaa_job_on_missing_standard( self->job );                         
671        }
672       
673        #define DRMAA_WAIT_THREAD_MAX_TRIES (12)
674        if((log_path = fsd_asprintf("%s/server_logs/%s",
675                                pbssession->pbs_home,   
676                                self->log_files[file_number])) == NULL) {
677                fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"Job_on_missing - Memory allocation wasn't possible");
678        }
679
680        if(self->fd != -1)
681                close(self->fd);
682
683        fsd_log_debug(("Log file: %s",log_path));
684                               
685retry:
686        if((self->fd = open(log_path,O_RDONLY) ) == -1 && num_tries > DRMAA_WAIT_THREAD_MAX_TRIES )
687        {
688                fsd_log_error(("Can't open log file. Verify pbs_home. Running standard job_on_missing"));
689                fsd_log_error(("Remember that without keep_completed set standard job_on_missing won't run correctly"));
690                self->run_flag = false;
691                pbsdrmaa_job_on_missing_standard( self->job );                 
692        } else if ( self->fd == -1 ) {
693                fsd_log_warning(("Can't open log file: %s. Retries count: %d", log_path, num_tries));
694                num_tries++;
695                sleep(5);
696                goto retry;
697        }
698        else
699        {
700                struct stat statbuf;
701                if(stat(log_path,&statbuf) == -1) {
702                                char errbuf[256] = "InternalError";
703                                (void)strerror_r(errno, errbuf, 256);
704                                fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"stat error: %s",errbuf);
705                }
706                self->log_file_read_size = 0;
707                self->log_file_initial_size = statbuf.st_size;
708                fsd_log_debug(("Set log_file_initial_size %ld",self->log_file_initial_size));
709        }
710
711        fsd_free(log_path);
712
713        fsd_log_debug(("Log file opened"));
714       
715        fsd_log_return((""));
716}
717
718ssize_t
719pbsdrmaa_read_line_job_on_missing ( pbsdrmaa_log_reader_t * self, char * line, char * buffer, ssize_t size, int * idx, int * end_idx, int * line_idx )
720{
721        int n = fsd_getline_buffered(line,buffer,size,self->fd, idx, end_idx, line_idx);
722       
723        if(n >= 0)
724                self->log_file_read_size += n;
725               
726        if(self->log_file_read_size >= self->log_file_initial_size)
727                return -1;
728
729        return n;
730}
731
732int
733fsd_job_id_cmp(const char *s1, const char *s2) /* maybe move to drmaa_utils? */
734{
735        int job1;
736        int job2;
737        char *rest = NULL;
738        char *token = NULL;
739        char *ptr = fsd_strdup(s1);
740        token = strtok_r(ptr, ".", &rest);
741        job1 = atoi(token);
742       
743        fsd_free(token);
744       
745        ptr = fsd_strdup(s2);
746        token = strtok_r(ptr,".",&rest);
747        job2 = atoi(token);
748       
749        fsd_free(token);
750        return job1 - job2;
751}
752
Note: See TracBrowser for help on using the repository browser.