source: trunk/pbs_drmaa/log_reader.c @ 18

Revision 18, 21.0 KB checked in by mmamonski, 13 years ago (diff)

log reader race condition causing hanging of application

  • Property svn:keywords set to Id
Line 
1/* $Id$ */
2/*
3 *  FedStage DRMAA for PBS Pro
4 *  Copyright (C) 2006-2007  FedStage Systems
5 *
6 *  This program is free software: you can redistribute it and/or modify
7 *  it under the terms of the GNU General Public License as published by
8 *  the Free Software Foundation, either version 3 of the License, or
9 *  (at your option) any later version.
10 *
11 *  This program is distributed in the hope that it will be useful,
12 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 *  GNU General Public License for more details.
15 *
16 *  You should have received a copy of the GNU General Public License
17 *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #ifdef HAVE_CONFIG_H
21#       include <config.h>
22#endif
23
24#include <stdlib.h>
25#include <string.h>
26#include <unistd.h>
27#include <sys/select.h>
28#include <sys/stat.h>
29#include <sys/types.h>
30#include <dirent.h>
31#include <fcntl.h>
32
33#include <pbs_ifl.h>
34#include <pbs_error.h>
35
36#include <drmaa_utils/datetime.h>
37#include <drmaa_utils/drmaa.h>
38#include <drmaa_utils/iter.h>
39#include <drmaa_utils/conf.h>
40#include <drmaa_utils/session.h>
41#include <drmaa_utils/datetime.h>
42
43#include <pbs_drmaa/job.h>
44#include <pbs_drmaa/log_reader.h>
45#include <pbs_drmaa/session.h>
46#include <pbs_drmaa/submit.h>
47#include <pbs_drmaa/util.h>
48
49#include <errno.h>
50
51static void
52pbsdrmaa_read_log();
53
54static void
55pbsdrmaa_select_file_wait_thread ( pbsdrmaa_log_reader_t * self);
56
57static ssize_t
58pbsdrmaa_read_line_wait_thread ( pbsdrmaa_log_reader_t * self, char * line, char * buffer, ssize_t size, int * idx, int * end_idx, int * line_idx );
59
60static void
61pbsdrmaa_select_file_job_on_missing ( pbsdrmaa_log_reader_t * self );
62
63static ssize_t
64pbsdrmaa_read_line_job_on_missing ( pbsdrmaa_log_reader_t * self, char * line, char * buffer, ssize_t size, int * idx, int * end_idx, int * line_idx );
65
66int
67fsd_job_id_cmp(const char *s1, const char *s2);
68
69int
70pbsdrmaa_date_compare(const void *a, const void *b) ;
71
72pbsdrmaa_log_reader_t *
73pbsdrmaa_log_reader_new ( fsd_drmaa_session_t *session, fsd_job_t *job )
74{
75        pbsdrmaa_log_reader_t *volatile self = NULL;
76
77        fsd_log_enter((""));
78        TRY
79        {
80                fsd_malloc(self, pbsdrmaa_log_reader_t );
81               
82                self->session = session;
83               
84                /* ~templete method pattern */
85                if(job != NULL) /* job on missing */
86                {
87                        self->job = job;
88                        self->name = "Job_on_missing";
89                        self->select_file = pbsdrmaa_select_file_job_on_missing;
90                        self->read_line = pbsdrmaa_read_line_job_on_missing;
91                }
92                else /* wait thread */
93                {
94                        self->job = NULL;
95                        self->name = "WT";
96                        self->select_file = pbsdrmaa_select_file_wait_thread;
97                        self->read_line = pbsdrmaa_read_line_wait_thread;
98                }               
99                self->read_log = pbsdrmaa_read_log;     
100               
101                self->log_files = NULL;
102                self->log_files_number = 0;
103               
104                self->run_flag = true;
105                self->fd = -1;
106                self->date_changed = true;
107                self->first_open = true;
108               
109                self->log_file_initial_size = 0;
110                self->log_file_read_size = 0;
111        }
112        EXCEPT_DEFAULT
113        {
114                if( self != NULL)
115                        fsd_free(self);
116                       
117                fsd_exc_reraise();
118        }
119        END_TRY
120        fsd_log_return((""));
121        return self;
122}
123
124void
125pbsdrmaa_log_reader_destroy ( pbsdrmaa_log_reader_t * self )
126{
127        fsd_log_enter((""));
128        TRY
129        {
130                if(self != NULL)
131                {
132                        int i = -1;
133                        for(i = 0; i < self->log_files_number ; i++)
134                                fsd_free(self->log_files[i]);
135                        fsd_free(self->log_files);
136                        fsd_free(self);
137                }                       
138        }
139        EXCEPT_DEFAULT
140        {
141                fsd_exc_reraise();
142        }
143        END_TRY
144       
145        fsd_log_return((""));
146}
147
148enum field
149{
150        FLD_DATE = 0,
151        FLD_EVENT = 1,
152        FLD_OBJ = 2,
153        FLD_TYPE = 3,
154        FLD_ID = 4,
155        FLD_MSG = 5
156};
157
158enum field_msg
159{
160        FLD_MSG_EXIT_STATUS = 0,
161        FLD_MSG_CPUT = 1,
162        FLD_MSG_MEM = 2,
163        FLD_MSG_VMEM = 3,
164        FLD_MSG_WALLTIME = 4
165};
166
167#define FLD_MSG_STATUS "0010"
168#define FLD_MSG_STATE "0008"
169#define FLD_MSG_LOG "0002"
170
171void
172pbsdrmaa_read_log( pbsdrmaa_log_reader_t * self )
173{
174        pbsdrmaa_job_t *pbsjob = (pbsdrmaa_job_t*) self->job;   
175        fsd_job_t *volatile temp_job = NULL;
176               
177        fsd_log_enter((""));
178       
179        if(self->job == NULL)
180                fsd_mutex_lock( &self->session->mutex );
181
182        TRY
183        {               
184                while( self->run_flag )
185                TRY
186                {
187                        char line[4096] = "";
188                        char buffer[4096] = "";
189                        int idx = 0, end_idx = 0, line_idx = 0;
190                       
191                        self->select_file(self);
192
193                        while ((self->read_line(self, line,buffer, sizeof(line), &idx,&end_idx,&line_idx)) > 0)                         
194                        {
195                                const char *volatile ptr = line;
196                                char field[256] = "";
197                                char job_id[256] = "";
198                                char event[256] = "";
199                                int volatile field_n = 0;
200                                int n;
201                               
202                                bool volatile job_id_match = false;
203                                bool volatile event_match = false;
204                                bool volatile log_event = false;
205                                bool volatile log_match = false;
206                                bool volatile older_job_found = false;
207                                bool volatile job_found = false;
208                                char *  temp_date = NULL;
209                               
210                                struct batch_status status;
211                                status.next = NULL;
212
213                                while ( sscanf(ptr, "%255[^;]%n", field, &n) == 1 ) /* divide current line into fields */
214                                {
215                                        if(field_n == FLD_DATE)
216                                        {
217                                                temp_date = fsd_strdup(field);
218                                        }
219                                        else if(field_n == FLD_EVENT && (strcmp(field,FLD_MSG_STATUS) == 0 ||
220                                                                    strcmp(field,FLD_MSG_STATE) == 0 ))
221                                        {
222                                                /* event described by log line*/
223                                                if(strlcpy(event, field,sizeof(event)) > sizeof(event)) {
224                                                        fsd_log_error(("%s - strlcpy error",self->name));
225                                                }
226                                                event_match = true;                                                                     
227                                        }
228                                        else if(event_match && field_n == FLD_ID)
229                                        {       
230                                                TRY
231                                                {       
232                                                        if(self->job == NULL) /* wait_thread */
233                                                        {
234                                                                temp_job = self->session->get_job( self->session, field );
235                                                                pbsjob = (pbsdrmaa_job_t*) temp_job;
236
237                                                                if( temp_job )
238                                                                {
239                                                                        if(strlcpy(job_id, field,sizeof(job_id)) > sizeof(job_id)) {
240                                                                                fsd_log_error(("%s - strlcpy error",self->name));
241                                                                        }
242                                                                        fsd_log_debug(("%s - job_id: %s",self->name,job_id));
243                                                                        status.name = fsd_strdup(job_id);
244                                                                        job_id_match = true; /* job_id is in drmaa */   
245                                                                }
246                                                                else
247                                                                {
248                                                                        fsd_log_debug(("%s - Unknown job: %s", self->name,field));
249                                                                }
250                                                        }
251                                                        else /* job_on_missing */
252                                                        {
253                                                                int diff = -1;
254                                                                diff = fsd_job_id_cmp(self->job->job_id,field);
255                                                                if( diff == 0)
256                                                                {
257                                                                        /* read this file to the place we started and exit*/
258                                                                        fsd_log_debug(("Job_on_missing found job: %s",self->job->job_id));
259                                                                        job_found = true;
260                                                                        older_job_found = false;
261                                                                        self->run_flag = false;
262                                                                        job_id_match = true;
263                                                                        status.name = fsd_strdup(self->job->job_id);                                                                   
264                                                                }
265                                                                else if ( !job_found && diff >= 1)
266                                                                {
267                                                                        /* older job, find its beginning */
268                                                                        fsd_log_debug(("Job_on_missing found older job than %s : %s",self->job->job_id,field));
269                                                                        older_job_found = true;
270                                                                        job_id_match = true;
271                                                                        status.name = fsd_strdup(self->job->job_id);
272                                                                }
273                                                                else  if( !job_found )
274                                                                {
275                                                                        fsd_log_debug(("Job_on_missing found newer job than %s : %s",self->job->job_id,field));
276                                                                }                                                               
277                                                        }
278                                                }
279                                                END_TRY
280                                        }
281                                        else if(job_id_match && field_n == FLD_MSG)
282                                        {                                               
283                                                /* parse msg - depends on FLD_EVENT */
284                                                struct attrl struct_resource_cput,struct_resource_mem,struct_resource_vmem,
285                                                        struct_resource_walltime, struct_status, struct_state, struct_start_time,struct_mtime, struct_queue, struct_account_name;       
286                                               
287                                                bool state_running = false;
288
289                                                memset(&struct_status,0,sizeof(struct attrl)); /**/
290                                                memset(&struct_state,0,sizeof(struct attrl));
291                                                memset(&struct_resource_cput,0,sizeof(struct attrl));
292                                                memset(&struct_resource_mem,0,sizeof(struct attrl));
293                                                memset(&struct_resource_vmem,0,sizeof(struct attrl));
294                                                memset(&struct_resource_walltime,0,sizeof(struct attrl));
295                                                memset(&struct_start_time,0,sizeof(struct attrl));
296                                                memset(&struct_mtime,0,sizeof(struct attrl));
297                                                memset(&struct_queue,0,sizeof(struct attrl));
298                                                memset(&struct_account_name,0,sizeof(struct attrl));
299                                                               
300                                                if (strcmp(event,FLD_MSG_STATE) == 0)
301                                                {
302                                                        /* job run, modified, queued etc */
303                                                        int n = 0;
304                                                        status.attribs = &struct_state;
305                                                        struct_state.next = NULL;
306                                                        struct_state.name = "job_state";
307                                                        if(field[0] == 'J') /* Job Queued, Job Modified, Job Run*/
308                                                        {
309                                                                n = 4;
310                                                                if(older_job_found) /* job_on_missing - older job beginning - read this file and end */
311                                                                {
312                                                                        self->run_flag = false;
313                                                                        fsd_log_debug(("Job_on_missing found older job beginning"));
314                                                                        fsd_free(status.name);
315                                                                        break;
316                                                                }
317                                                        }               
318                                                        if(field[4] == 'M') { /* modified */
319                                                                struct tm temp_time_tm;
320                                                                memset(&temp_time_tm, 0, sizeof(temp_time_tm));
321                                                                temp_time_tm.tm_isdst = -1;
322
323                                                                if (strptime(temp_date, "%m/%d/%Y %H:%M:%S", &temp_time_tm) == NULL)
324                                                                 {
325                                                                        fsd_log_error(("failed to parse mtime: %s", temp_date));
326                                                                 }
327                                                                else
328                                                                 {
329                                                                        time_t temp_time = mktime(&temp_time_tm);
330                                                                        status.attribs = &struct_mtime;
331                                                                        struct_mtime.name = "mtime";
332                                                                        struct_mtime.next = NULL;
333                                                                        struct_mtime.value = fsd_asprintf("%lu",temp_time);
334                                                                 }
335                                                        }               
336                                                        /* != Job deleted and Job to be deleted*/
337                                                        #ifdef PBS_PROFESSIONAL
338                                                        else if (field[4] != 't' && field[10] != 'd') {
339                                                        #else           
340                                                        else if(field[4] != 'd') {
341                                                        #endif
342
343                                                                if ((struct_state.value = fsd_asprintf("%c",field[n]) ) == NULL ) { /* 4 first letter of state */
344                                                                        fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"%s - Memory allocation wasn't possible",self->name);
345                                                                }
346                                                                if(struct_state.value[0] == 'R'){
347                                                                        state_running = true;
348                                                                }
349                                                        }
350                                                        else { /* job terminated - pbs drmaa detects failed as completed with exit_status !=0, aborted with status -1*/
351                                                                struct_status.name = "exit_status";
352                                                                struct_status.value = fsd_strdup("-1");
353                                                                struct_status.next = NULL;
354                                                                struct_state.next = &struct_status;
355                                                                struct_state.value = fsd_strdup("C");                                                           
356                                                        }
357                                                }                                                   
358                                                else /*if (strcmp(event,FLD_MSG_STATUS) == 0 )*/
359                                                {
360                                                        /* exit status and rusage */
361                                                        const char *ptr2 = field;
362                                                        char  msg[ 256 ] = "";
363                                                        int n2;
364                                                        int msg_field_n = 0;
365                                                       
366                                                        struct_resource_cput.name = "resources_used";
367                                                        struct_resource_mem.name = "resources_used";
368                                                        struct_resource_vmem.name = "resources_used";
369                                                        struct_resource_walltime.name = "resources_used";
370                                                        struct_status.name = "exit_status";
371                                                        struct_state.name = "job_state";
372                               
373                                                        status.attribs = &struct_resource_cput;
374                                                        struct_resource_cput.next = &struct_resource_mem;
375                                                        struct_resource_mem.next = &struct_resource_vmem;
376                                                        struct_resource_vmem.next = &struct_resource_walltime;
377                                                        struct_resource_walltime.next =  &struct_status;
378                                                        struct_status.next = &struct_state;
379                                                        struct_state.next = NULL;
380
381                                                        while ( sscanf(ptr2, "%255[^ ]%n", msg, &n2) == 1 )
382                                                         {                                             
383                                                                switch(msg_field_n)
384                                                                {
385                                                                        case FLD_MSG_EXIT_STATUS:
386                                                                                struct_status.value = fsd_strdup(strchr(msg,'=')+1);
387                                                                                break;
388
389                                                                        case FLD_MSG_CPUT:
390                                                                                struct_resource_cput.resource = "cput";
391                                                                                struct_resource_cput.value = fsd_strdup(strchr(msg,'=')+1);
392                                                                                break;
393
394                                                                        case FLD_MSG_MEM:
395                                                                                struct_resource_mem.resource = "mem";
396                                                                                struct_resource_mem.value  = fsd_strdup(strchr(msg,'=')+1);
397                                                                                break;
398
399                                                                        case FLD_MSG_VMEM:
400                                                                                struct_resource_vmem.resource = "vmem";
401                                                                                struct_resource_vmem.value  = fsd_strdup(strchr(msg,'=')+1);
402                                                                                break;
403
404                                                                        case FLD_MSG_WALLTIME:
405                                                                                struct_resource_walltime.resource = "walltime";
406                                                                                struct_resource_walltime.value  = fsd_strdup(strchr(msg,'=')+1);
407                                                                                break;
408                                                                }
409                                                             
410                                                                ptr2 += n2;
411                                                                msg_field_n++;
412                                                                if ( *ptr2 != ' ' )
413                                                                 {
414                                                                         break;
415                                                                 }
416                                                                ++ptr2;                                         
417                                                         }
418                                                        struct_state.value = fsd_strdup("C");   /* we got exit_status so we say that it has completed */
419                                                }                                               
420                                                 
421                                                if(self->job == NULL) /* wait_thread */
422                                                {
423                                                        if ( state_running )
424                                                        {
425                                                                fsd_log_debug(("WT - forcing update of job: %s", temp_job->job_id ));
426                                                                TRY
427                                                                {
428                                                                        temp_job->update_status( temp_job );
429                                                                }
430                                                                EXCEPT_DEFAULT
431                                                                {
432                                                                        /*TODO: distinguish between invalid job and internal errors */
433                                                                        fsd_log_debug(("Job finished just after entering running state: %s", temp_job->job_id));
434                                                                }
435                                                                END_TRY
436                                                        }
437                                                        else
438                                                        {
439                                                                fsd_log_debug(("%s - updating job: %s",self->name, temp_job->job_id ));                                                 
440                                                                pbsjob->update( temp_job, &status );
441                                                        }
442                                                 }
443                                                 else if( job_found ) /* job_on_missing */
444                                                 {
445                                                        fsd_log_debug(("Job_on_missing - updating job: %s", self->job->job_id ));                                                       
446                                                        pbsjob->update( self->job, &status );
447                                                 }
448                                               
449                                                if(self->job == NULL)
450                                                {
451                                                        fsd_cond_broadcast( &temp_job->status_cond);
452                                                        fsd_cond_broadcast( &self->session->wait_condition );
453                                                }
454                                                if ( temp_job )
455                                                        temp_job->release( temp_job );
456       
457                                                fsd_free(struct_resource_cput.value);
458                                                fsd_free(struct_resource_mem.value);
459                                                fsd_free(struct_resource_vmem.value);
460                                                fsd_free(struct_resource_walltime.value);
461                                                fsd_free(struct_status.value);
462                                                fsd_free(struct_state.value);
463                                                fsd_free(struct_start_time.value);
464                                                fsd_free(struct_mtime.value);
465                                                fsd_free(struct_queue.value);
466                                                fsd_free(struct_account_name.value);
467
468                                                if ( status.name!=NULL )
469                                                        fsd_free(status.name);
470                                        }
471                                        else if(field_n == FLD_EVENT && strcmp(field,FLD_MSG_LOG) == 0)
472                                        {
473                                                log_event = true;                                       
474                                        }
475                                        else if (log_event && field_n == FLD_ID && strcmp(field,"Log") == 0 )
476                                        {
477                                                log_match = true;
478                                                log_event = false;
479                                        }
480                                        else if( self->job == NULL && log_match && field_n == FLD_MSG && strncmp(field,"Log closed",10) == 0)
481                                        {
482                                                fsd_log_debug(("%s - Date changed. Closing log file",self->name));
483                                                self->date_changed = true;
484                                                log_match = false;
485                                        }
486                                       
487                                        ptr += n;
488                                        if ( *ptr != ';' )
489                                        {
490                                                break; /* end of line */
491                                        }
492                                        field_n++;
493                                        ++ptr;
494                                }               
495
496                                fsd_free(temp_date);                   
497                        } /* end of while getline loop */                       
498                       
499                        if(self->job == NULL)
500                        {
501                                fsd_mutex_unlock( &self->session->mutex );                     
502                                usleep(1000000);
503                                fsd_mutex_lock( &self->session->mutex );       
504                        }
505                       
506                        if(self->job == NULL)
507                        {
508                                self->run_flag = self->session->wait_thread_run_flag;
509                        }
510                }               
511                EXCEPT_DEFAULT
512                 {
513                        const fsd_exc_t *e = fsd_exc_get();
514                        /* Its better to exit and communicate error rather then let the application to hang */
515                        fsd_log_fatal(( "Exception in wait thread %s: <%d:%s>. Exiting !!!", self->name, e->code(e), e->message(e) ));
516                        exit(1);
517                 }
518                END_TRY
519
520                if(self->fd != -1)
521                        close(self->fd);
522                fsd_log_debug(("%s - Log file closed",self->name));     
523        }
524        FINALLY
525        {
526                fsd_log_debug(("%s - Terminated.",self->name));
527                if(self->job == NULL)
528                        fsd_mutex_unlock( &self->session->mutex ); /**/
529        }
530        END_TRY
531       
532        fsd_log_return((""));
533}
534
535void
536pbsdrmaa_select_file_wait_thread ( pbsdrmaa_log_reader_t * self )
537{
538        pbsdrmaa_session_t *pbssession = (pbsdrmaa_session_t*) self->session;
539       
540        if(self->date_changed)
541        {
542                char * log_path = NULL;
543                int num_tries = 0;
544                struct tm tm;
545               
546                fsd_log_enter((""));
547               
548                if(!self->first_open)
549                        time(&self->t);
550                else
551                        self->t = pbssession->log_file_initial_time;
552                       
553                localtime_r(&self->t,&tm);
554                               
555                #define DRMAA_WAIT_THREAD_MAX_TRIES (12)
556                /* generate new date, close file and open new */
557                if((log_path = fsd_asprintf("%s/server_logs/%04d%02d%02d",
558                                        pbssession->pbs_home,   
559                                        tm.tm_year + 1900,
560                                        tm.tm_mon + 1,
561                                        tm.tm_mday)) == NULL) {
562                        fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"WT - Memory allocation wasn't possible");
563                }
564
565                if(self->fd != -1)
566                        close(self->fd);
567
568                fsd_log_debug(("Log file: %s",log_path));
569                               
570        retry:
571                if((self->fd = open(log_path,O_RDONLY) ) == -1 && num_tries > DRMAA_WAIT_THREAD_MAX_TRIES )
572                {
573                        fsd_log_error(("Can't open log file. Verify pbs_home. Running standard wait_thread."));
574                        fsd_log_error(("Remember that without keep_completed set standard wait_thread won't run correctly"));
575                        /*pbssession->super.enable_wait_thread = false;*/ /* run not wait_thread */
576                        pbssession->wait_thread_log = false;
577                        pbssession->super.wait_thread = pbssession->super_wait_thread;
578                        pbssession->super.wait_thread(self->session);
579                } else if ( self->fd == -1 ) {
580                        fsd_log_warning(("Can't open log file: %s. Retries count: %d", log_path, num_tries));
581                        num_tries++;
582                        sleep(5);
583                        goto retry;
584                }
585
586                fsd_free(log_path);
587
588                fsd_log_debug(("Log file opened"));
589
590                if(self->first_open) {
591                        fsd_log_debug(("Log file lseek"));
592                        if(lseek(self->fd,pbssession->log_file_initial_size,SEEK_SET) == (off_t) -1) {
593                                char errbuf[256] = "InternalError";
594                                (void)strerror_r(errno, errbuf, 256);
595                                fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"lseek error: %s",errbuf);
596                        }
597                        self->first_open = false;
598                }
599
600                self->date_changed = false;
601               
602                fsd_log_return((""));
603        }       
604}
605
606ssize_t
607pbsdrmaa_read_line_wait_thread ( pbsdrmaa_log_reader_t * self, char * line, char * buffer, ssize_t size, int * idx, int * end_idx, int * line_idx )
608{
609        return fsd_getline_buffered(line,buffer,size,self->fd,idx,end_idx,line_idx);
610}
611
612/* reverse date compare*/
613int
614pbsdrmaa_date_compare(const void *a, const void *b)
615{
616   const char *ia = *(const char **) a;
617   const char *ib = *(const char **) b;
618   return strcmp(ib, ia);
619}
620
621void
622pbsdrmaa_select_file_job_on_missing( pbsdrmaa_log_reader_t * self )
623{
624        pbsdrmaa_session_t *pbssession = (pbsdrmaa_session_t*) self->session;   
625       
626        char * log_path = NULL;
627        int num_tries = 0;
628        static int file_number = 0;
629        fsd_log_enter((""));
630               
631        if(self->first_open)
632        {                       
633                DIR *dp = NULL;         
634                char * path = NULL;
635                struct dirent *ep = NULL;
636               
637                if((path = fsd_asprintf("%s/server_logs/",pbssession->pbs_home)) == NULL)
638                        fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"Job_on_missing - Memory allocation wasn't possible");
639               
640                self->log_files_number = 0;     
641                dp = opendir (path);
642
643                fsd_calloc(self->log_files,2,char*);
644       
645                if (dp != NULL)
646                {
647                        while ((ep = readdir (dp)))
648                        {
649                                self->log_files_number++;
650                                if(self->log_files_number > 2)
651                                        fsd_realloc(self->log_files,self->log_files_number,char *);
652                               
653                                self->log_files[self->log_files_number-1] = fsd_strdup(ep->d_name);
654                        }
655                        (void) closedir (dp);
656                }
657                else
658                        fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"Job_on_missing - Couldn't open the directory");
659
660                qsort(self->log_files,self->log_files_number,sizeof(char *),pbsdrmaa_date_compare);
661               
662                if(self->log_files_number <= 2)
663                {
664                        self->run_flag = false;
665                        fsd_log_error(("Job_on_missing - No log files available"));
666                }
667               
668                self->first_open = false;
669                fsd_free(path);
670        }       
671        else /* check previous day*/
672        {
673                if(++file_number > self->log_files_number - 2)
674                        fsd_log_error(("Job_on_missing - All available log files checked"));
675                else
676                        fsd_log_debug(("Job_on_missing checking previous day"));
677               
678                self->run_flag = false;
679                pbsdrmaa_job_on_missing_standard( self->job );                         
680        }
681       
682        #define DRMAA_WAIT_THREAD_MAX_TRIES (12)
683        if((log_path = fsd_asprintf("%s/server_logs/%s",
684                                pbssession->pbs_home,   
685                                self->log_files[file_number])) == NULL) {
686                fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"Job_on_missing - Memory allocation wasn't possible");
687        }
688
689        if(self->fd != -1)
690                close(self->fd);
691
692        fsd_log_debug(("Log file: %s",log_path));
693                               
694retry:
695        if((self->fd = open(log_path,O_RDONLY) ) == -1 && num_tries > DRMAA_WAIT_THREAD_MAX_TRIES )
696        {
697                fsd_log_error(("Can't open log file. Verify pbs_home. Running standard job_on_missing"));
698                fsd_log_error(("Remember that without keep_completed set standard job_on_missing won't run correctly"));
699                self->run_flag = false;
700                pbsdrmaa_job_on_missing_standard( self->job );                 
701        } else if ( self->fd == -1 ) {
702                fsd_log_warning(("Can't open log file: %s. Retries count: %d", log_path, num_tries));
703                num_tries++;
704                sleep(5);
705                goto retry;
706        }
707        else
708        {
709                struct stat statbuf;
710                if(stat(log_path,&statbuf) == -1) {
711                                char errbuf[256] = "InternalError";
712                                (void)strerror_r(errno, errbuf, 256);
713                                fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"stat error: %s",errbuf);
714                }
715                self->log_file_read_size = 0;
716                self->log_file_initial_size = statbuf.st_size;
717                fsd_log_debug(("Set log_file_initial_size %ld",self->log_file_initial_size));
718        }
719
720        fsd_free(log_path);
721
722        fsd_log_debug(("Log file opened"));
723       
724        fsd_log_return((""));
725}
726
727ssize_t
728pbsdrmaa_read_line_job_on_missing ( pbsdrmaa_log_reader_t * self, char * line, char * buffer, ssize_t size, int * idx, int * end_idx, int * line_idx )
729{
730        int n = fsd_getline_buffered(line,buffer,size,self->fd, idx, end_idx, line_idx);
731       
732        if(n >= 0)
733                self->log_file_read_size += n;
734               
735        if(self->log_file_read_size >= self->log_file_initial_size)
736                return -1;
737
738        return n;
739}
740
741int
742fsd_job_id_cmp(const char *s1, const char *s2) /* maybe move to drmaa_utils? */
743{
744        int job1;
745        int job2;
746        char *rest = NULL;
747        char *token = NULL;
748        char *ptr = fsd_strdup(s1);
749        token = strtok_r(ptr, ".", &rest);
750        job1 = atoi(token);
751       
752        fsd_free(token);
753       
754        ptr = fsd_strdup(s2);
755        token = strtok_r(ptr,".",&rest);
756        job2 = atoi(token);
757       
758        fsd_free(token);
759        return job1 - job2;
760}
761
Note: See TracBrowser for help on using the repository browser.