source: trunk/pbs_drmaa/log_reader.c @ 7

Revision 7, 21.6 KB checked in by mmamonski, 13 years ago (diff)

lost updates...

RevLine 
[7]1/* $Id: log_reader.c 323 2010-09-21 21:31:29Z mmatloka $ */
2/*
3 *  FedStage DRMAA for PBS Pro
4 *  Copyright (C) 2006-2007  FedStage Systems
5 *
6 *  This program is free software: you can redistribute it and/or modify
7 *  it under the terms of the GNU General Public License as published by
8 *  the Free Software Foundation, either version 3 of the License, or
9 *  (at your option) any later version.
10 *
11 *  This program is distributed in the hope that it will be useful,
12 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 *  GNU General Public License for more details.
15 *
16 *  You should have received a copy of the GNU General Public License
17 *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #ifdef HAVE_CONFIG_H
21#       include <config.h>
22#endif
23
24#include <stdlib.h>
25#include <string.h>
26#include <unistd.h>
27#include <sys/select.h>
28#include <sys/stat.h>
29#include <sys/types.h>
30#include <dirent.h>
31#include <fcntl.h>
32
33#include <pbs_ifl.h>
34#include <pbs_error.h>
35
36#include <drmaa_utils/datetime.h>
37#include <drmaa_utils/drmaa.h>
38#include <drmaa_utils/iter.h>
39#include <drmaa_utils/conf.h>
40#include <drmaa_utils/session.h>
41#include <drmaa_utils/datetime.h>
42
43#include <pbs_drmaa/job.h>
44#include <pbs_drmaa/log_reader.h>
45#include <pbs_drmaa/session.h>
46#include <pbs_drmaa/submit.h>
47#include <pbs_drmaa/util.h>
48
49#include <errno.h>
50
51static void
52pbsdrmaa_read_log();
53
54static void
55pbsdrmaa_chose_file_wait_thread ( pbsdrmaa_log_reader_t * self);
56
57static ssize_t
58pbsdrmaa_read_line_wait_thread ( pbsdrmaa_log_reader_t * self, char * buffer, ssize_t size );
59
60static void
61pbsdrmaa_chose_file_job_on_missing ( pbsdrmaa_log_reader_t * self );
62
63static ssize_t
64pbsdrmaa_read_line_job_on_missing ( pbsdrmaa_log_reader_t * self, char * buffer, ssize_t size );
65
66int
67fsd_job_id_cmp(const char *s1, const char *s2);
68
69int
70pbsdrmaa_date_compare(const void *a, const void *b) ;
71
72pbsdrmaa_log_reader_t *
73pbsdrmaa_log_reader_new ( fsd_drmaa_session_t *session, fsd_job_t *job )
74{
75        pbsdrmaa_log_reader_t *volatile self = NULL;
76
77        fsd_log_enter((""));
78        TRY
79        {
80                fsd_malloc(self, pbsdrmaa_log_reader_t );
81               
82                self->session = session;
83               
84                /* ~templete method pattern */
85                if(job != NULL) /* job on missing */
86                {
87                        self->job = job;
88                        self->name = "Job_on_missing";
89                        self->chose_file = pbsdrmaa_chose_file_job_on_missing;
90                        self->read_line = pbsdrmaa_read_line_job_on_missing;
91                }
92                else /* wait thread */
93                {
94                        self->job = NULL;
95                        self->name = "WT";
96                        self->chose_file = pbsdrmaa_chose_file_wait_thread;
97                        self->read_line = pbsdrmaa_read_line_wait_thread;
98                }               
99                self->read_log = pbsdrmaa_read_log;     
100               
101                self->log_files = NULL;
102                self->log_files_number = 0;
103               
104                self->run_flag = true;
105                self->fd = -1;
106                self->date_changed = true;
107                self->first_open = true;
108               
109                self->log_file_initial_size = 0;
110                self->log_file_read_size = 0;
111        }
112        EXCEPT_DEFAULT
113        {
114                if( self != NULL)
115                        fsd_free(self);
116                       
117                fsd_exc_reraise();
118        }
119        END_TRY
120        fsd_log_return((""));
121        return self;
122}
123
124void
125pbsdrmaa_log_reader_destroy ( pbsdrmaa_log_reader_t * self )
126{
127        fsd_log_enter((""));
128        TRY
129        {
130                if(self != NULL)
131                {
132                        int i = -1;
133                        for(i = 0; i < self->log_files_number ; i++)
134                                fsd_free(self->log_files[i]);
135                        fsd_free(self->log_files);
136                        fsd_free(self);
137                }                       
138        }
139        EXCEPT_DEFAULT
140        {
141                fsd_exc_reraise();
142        }
143        END_TRY
144       
145        fsd_log_return((""));
146}
147
148enum field
149{
150        FLD_DATE = 0,
151        FLD_EVENT = 1,
152        FLD_OBJ = 2,
153        FLD_TYPE = 3,
154        FLD_ID = 4,
155        FLD_MSG = 5
156};
157
158enum field_msg
159{
160        FLD_MSG_EXIT_STATUS = 0,
161        FLD_MSG_CPUT = 1,
162        FLD_MSG_MEM = 2,
163        FLD_MSG_VMEM = 3,
164        FLD_MSG_WALLTIME = 4
165};
166
167#define FLD_MSG_STATUS "0010"
168#define FLD_MSG_STATE "0008"
169#define FLD_MSG_LOG "0002"
170
171void
172pbsdrmaa_read_log( pbsdrmaa_log_reader_t * self )
173{
174        pbsdrmaa_job_t *pbsjob = (pbsdrmaa_job_t*) self->job;   
175        fsd_job_t *volatile temp_job = NULL;
176               
177        fsd_log_enter((""));
178       
179        if(self->job == NULL)
180                fsd_mutex_lock( &self->session->mutex );
181        /*else
182                fsd_mutex_lock( &self->job->mutex );*/
183        TRY
184        {               
185                while( self->run_flag )
186                TRY
187                {
188                        char buffer[4096] = "";         
189                       
190                        self->chose_file(self);
191
192                        while ((self->read_line(self, buffer, sizeof(buffer))) > 0)                     
193                        {
194                                const char *volatile ptr = buffer;
195                                char field[256] = "";
196                                char job_id[256] = "";
197                                char event[256] = "";
198                                int volatile field_n = 0;
199                                int n;
200
201                                bool volatile job_id_match = false;
202                                bool volatile event_match = false;
203                                bool volatile log_event = false;
204                                bool volatile log_match = false;
205                                bool volatile older_job_found = false;
206                                bool volatile job_found = false;
207                                char *  temp_date = NULL;
208                               
209                                struct batch_status status;
210                                status.next = NULL;
211
212                                while ( sscanf(ptr, "%255[^;]%n", field, &n) == 1 ) /* divide current line into fields */
213                                {
214                                        if(field_n == FLD_DATE)
215                                        {
216                                                temp_date = fsd_strdup(field);
217                                        }
218                                        else if(field_n == FLD_EVENT && (strcmp(field,FLD_MSG_STATUS) == 0 ||
219                                                                    strcmp(field,FLD_MSG_STATE) == 0 ))
220                                        {
221                                                /* event described by log line*/
222                                                if(strlcpy(event, field,sizeof(event)) > sizeof(event)) {
223                                                        fsd_log_error(("%s - strlcpy error",self->name));
224                                                }
225                                                event_match = true;                                                                     
226                                        }
227                                        else if(event_match && field_n == FLD_ID)
228                                        {       
229                                                TRY
230                                                {       
231                                                        if(self->job == NULL) /* wait_thread */
232                                                        {
233                                                                temp_job = self->session->get_job( self->session, field );
234                                                                pbsjob = (pbsdrmaa_job_t*) temp_job;
235
236                                                                if( temp_job )
237                                                                {
238                                                                        if(strlcpy(job_id, field,sizeof(job_id)) > sizeof(job_id)) {
239                                                                                fsd_log_error(("%s - strlcpy error",self->name));
240                                                                        }
241                                                                        fsd_log_debug(("%s - job_id: %s",self->name,job_id));
242                                                                        status.name = fsd_strdup(job_id);
243                                                                        job_id_match = true; /* job_id is in drmaa */   
244                                                                }
245                                                                else
246                                                                {
247                                                                        fsd_log_debug(("%s - Unknown job: %s", self->name,field));
248                                                                }
249                                                        }
250                                                        else /* job_on_missing */
251                                                        {
252                                                                int diff = -1;
253                                                                diff = fsd_job_id_cmp(self->job->job_id,field);
254                                                                if( diff == 0)
255                                                                {
256                                                                        /* read this file to the place we started and exit*/
257                                                                        fsd_log_debug(("Job_on_missing found job: %s",self->job->job_id));
258                                                                        job_found = true;
259                                                                        older_job_found = false;
260                                                                        self->run_flag = false;
261                                                                        job_id_match = true;
262                                                                        status.name = fsd_strdup(self->job->job_id);                                                                   
263                                                                }
264                                                                else if ( !job_found && diff >= 1)
265                                                                {
266                                                                        /* older job, find its beginning */
267                                                                        fsd_log_debug(("Job_on_missing found older job than %s : %s",self->job->job_id,field));
268                                                                        older_job_found = true;
269                                                                        job_id_match = true;
270                                                                        status.name = fsd_strdup(self->job->job_id);
271                                                                }
272                                                                else  if( !job_found )
273                                                                {
274                                                                        fsd_log_debug(("Job_on_missing found newer job than %s : %s",self->job->job_id,field));
275                                                                }                                                               
276                                                        }
277                                                }
278                                                END_TRY
279                                        }
280                                        else if(job_id_match && field_n == FLD_MSG)
281                                        {                                               
282                                                /* parse msg - depends on FLD_EVENT */
283                                                struct attrl struct_resource_cput,struct_resource_mem,struct_resource_vmem,
284                                                        struct_resource_walltime, struct_status, struct_state, struct_start_time,struct_mtime, struct_queue, struct_account_name;       
285                                               
286                                                bool state_running = false;
287
288                                                struct_status.name = NULL;
289                                                struct_status.value = NULL;
290                                                struct_status.next = NULL;
291                                                struct_status.resource = NULL;
292
293                                                struct_state.name = NULL;
294                                                struct_state.value = NULL;
295                                                struct_state.next = NULL;
296                                                struct_state.resource = NULL;
297
298                                                struct_resource_cput.name = NULL;
299                                                struct_resource_cput.value = NULL;
300                                                struct_resource_cput.next = NULL;
301                                                struct_resource_cput.resource = NULL;
302
303                                                struct_resource_mem.name = NULL;
304                                                struct_resource_mem.value = NULL;
305                                                struct_resource_mem.next = NULL;
306                                                struct_resource_mem.resource = NULL;
307
308                                                struct_resource_vmem.name = NULL;
309                                                struct_resource_vmem.value = NULL;
310                                                struct_resource_vmem.next = NULL;
311                                                struct_resource_vmem.resource = NULL;
312
313                                                struct_resource_walltime.name = NULL;
314                                                struct_resource_walltime.value = NULL;
315                                                struct_resource_walltime.next = NULL;
316                                                struct_resource_walltime.resource = NULL;
317
318                                                struct_start_time.name = NULL;
319                                                struct_start_time.value = NULL;
320                                                struct_start_time.next = NULL;
321                                                struct_start_time.resource = NULL;
322
323                                                struct_mtime.name = NULL;
324                                                struct_mtime.value = NULL;
325                                                struct_mtime.next = NULL;
326                                                struct_mtime.resource = NULL;
327
328                                                struct_queue.name = NULL;
329                                                struct_queue.value = NULL;
330                                                struct_queue.next = NULL;
331                                                struct_queue.resource = NULL;
332
333                                                struct_account_name.name = NULL;
334                                                struct_account_name.value = NULL;
335                                                struct_account_name.next = NULL;
336                                                struct_account_name.resource = NULL;
337                                                               
338                                                if (strcmp(event,FLD_MSG_STATE) == 0)
339                                                {
340                                                        /* job run, modified, queued etc */
341                                                        int n = 0;
342                                                        status.attribs = &struct_state;
343                                                        struct_state.next = NULL;
344                                                        struct_state.name = "job_state";
345                                                        if(field[0] == 'J') /* Job Queued, Job Modified, Job Run*/
346                                                        {
347                                                                n = 4;
348                                                                if(older_job_found) /* job_on_missing - older job beginning - read this file and end */
349                                                                {
350                                                                        self->run_flag = false;
351                                                                        fsd_log_debug(("Job_on_missing found older job beginning"));
352                                                                        fsd_free(status.name);
353                                                                        break;
354                                                                }
355                                                        }               
356                                                        if(field[4] == 'M') { /* modified */
357                                                                struct tm temp_time_tm;
358                                                                memset(&temp_time_tm, 0, sizeof(temp_time_tm));
359                                                                temp_time_tm.tm_isdst = -1;
360
361                                                                if (strptime(temp_date, "%m/%d/%Y %H:%M:%S", &temp_time_tm) == NULL)
362                                                                 {
363                                                                        fsd_log_error(("failed to parse mtime: %s", temp_date));
364                                                                 }
365                                                                else
366                                                                 {
367                                                                        time_t temp_time = mktime(&temp_time_tm);
368                                                                        status.attribs = &struct_mtime;
369                                                                        struct_mtime.name = "mtime";
370                                                                        struct_mtime.next = NULL;
371                                                                        struct_mtime.value = fsd_asprintf("%lu",temp_time);
372                                                                 }
373                                                        }               
374                                                        /* != Job deleted and Job to be deleted*/
375                                                        #ifdef PBS_PROFESSIONAL
376                                                        else if (field[4] != 't' && field[10] != 'd') {
377                                                        #else           
378                                                        else if(field[4] != 'd') {
379                                                        #endif
380
381                                                                if ((struct_state.value = fsd_asprintf("%c",field[n]) ) == NULL ) { /* 4 first letter of state */
382                                                                        fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"%s - Memory allocation wasn't possible",self->name);
383                                                                }
384                                                                if(struct_state.value[0] == 'R'){
385                                                                        state_running = true;
386                                                                }
387                                                        }
388                                                        else { /* job terminated - pbs drmaa detects failed as completed with exit_status !=0, aborted with status -1*/
389                                                                struct_status.name = "exit_status";
390                                                                struct_status.value = fsd_strdup("-1");
391                                                                struct_status.next = NULL;
392                                                                struct_state.next = &struct_status;
393                                                                struct_state.value = fsd_strdup("C");                                                           
394                                                        }
395                                                }                                                   
396                                                else /*if (strcmp(event,FLD_MSG_STATUS) == 0 )*/
397                                                {
398                                                        /* exit status and rusage */
399                                                        const char *ptr2 = field;
400                                                        char  msg[ 256 ] = "";
401                                                        int n2;
402                                                        int msg_field_n = 0;
403                                                       
404                                                        struct_resource_cput.name = "resources_used";
405                                                        struct_resource_mem.name = "resources_used";
406                                                        struct_resource_vmem.name = "resources_used";
407                                                        struct_resource_walltime.name = "resources_used";
408                                                        struct_status.name = "exit_status";
409                                                        struct_state.name = "job_state";
410                               
411                                                        status.attribs = &struct_resource_cput;
412                                                        struct_resource_cput.next = &struct_resource_mem;
413                                                        struct_resource_mem.next = &struct_resource_vmem;
414                                                        struct_resource_vmem.next = &struct_resource_walltime;
415                                                        struct_resource_walltime.next =  &struct_status;
416                                                        struct_status.next = &struct_state;
417                                                        struct_state.next = NULL;
418
419                                                        while ( sscanf(ptr2, "%255[^ ]%n", msg, &n2) == 1 )
420                                                         {                                             
421                                                                switch(msg_field_n)
422                                                                {
423                                                                        case FLD_MSG_EXIT_STATUS:
424                                                                                struct_status.value = fsd_strdup(strchr(msg,'=')+1);
425                                                                                break;
426
427                                                                        case FLD_MSG_CPUT:
428                                                                                struct_resource_cput.resource = "cput";
429                                                                                struct_resource_cput.value = fsd_strdup(strchr(msg,'=')+1);
430                                                                                break;
431
432                                                                        case FLD_MSG_MEM:
433                                                                                struct_resource_mem.resource = "mem";
434                                                                                struct_resource_mem.value  = fsd_strdup(strchr(msg,'=')+1);
435                                                                                break;
436
437                                                                        case FLD_MSG_VMEM:
438                                                                                struct_resource_vmem.resource = "vmem";
439                                                                                struct_resource_vmem.value  = fsd_strdup(strchr(msg,'=')+1);
440                                                                                break;
441
442                                                                        case FLD_MSG_WALLTIME:
443                                                                                struct_resource_walltime.resource = "walltime";
444                                                                                struct_resource_walltime.value  = fsd_strdup(strchr(msg,'=')+1);
445                                                                                break;
446                                                                }
447                                                             
448                                                                ptr2 += n2;
449                                                                msg_field_n++;
450                                                                if ( *ptr2 != ' ' )
451                                                                 {
452                                                                         break;
453                                                                 }
454                                                                ++ptr2;                                         
455                                                         }
456                                                        struct_state.value = fsd_strdup("C");   /* we got exit_status so we say that it has completed */
457                                                }                                               
458                                                 
459                                                if(self->job == NULL) /* wait_thread */
460                                                {
461                                                        if ( state_running )
462                                                        {
463                                                                fsd_log_debug(("WT - forcing update of job: %s", temp_job->job_id ));
464                                                                temp_job->update_status( temp_job );
465                                                        }
466                                                        else
467                                                        {
468                                                                fsd_log_debug(("%s - updating job: %s",self->name, temp_job->job_id ));                                                 
469                                                                pbsjob->update( temp_job, &status );
470                                                        }
471                                                 }
472                                                 else if( job_found ) /* job_on_missing */
473                                                 {
474                                                        fsd_log_debug(("Job_on_missing - updating job: %s", self->job->job_id ));                                                       
475                                                        pbsjob->update( self->job, &status );
476                                                 }
477                                               
478                                                if(self->job == NULL)
479                                                {
480                                                        fsd_cond_broadcast( &temp_job->status_cond);
481                                                        fsd_cond_broadcast( &self->session->wait_condition );
482                                                }
483                                                if ( temp_job )
484                                                        temp_job->release( temp_job );
485       
486                                                fsd_free(struct_resource_cput.value);
487                                                fsd_free(struct_resource_mem.value);
488                                                fsd_free(struct_resource_vmem.value);
489                                                fsd_free(struct_resource_walltime.value);
490                                                fsd_free(struct_status.value);
491                                                fsd_free(struct_state.value);
492                                                fsd_free(struct_start_time.value);
493                                                fsd_free(struct_mtime.value);
494                                                fsd_free(struct_queue.value);
495                                                fsd_free(struct_account_name.value);
496
497                                                if ( status.name!=NULL )
498                                                        fsd_free(status.name);
499                                        }
500                                        else if(field_n == FLD_EVENT && strcmp(field,FLD_MSG_LOG) == 0)
501                                        {
502                                                log_event = true;                                       
503                                        }
504                                        else if (log_event && field_n == FLD_ID && strcmp(field,"Log") == 0 )
505                                        {
506                                                log_match = true;
507                                                log_event = false;
508                                        }
509                                        else if( self->job == NULL && log_match && field_n == FLD_MSG &&
510                                                field[0] == 'L' &&
511                                                field[1] == 'o' &&
512                                                field[2] == 'g' &&
513                                                field[3] == ' ' &&
514                                                field[4] == 'c' &&
515                                                field[5] == 'l' &&
516                                                field[6] == 'o' &&
517                                                field[7] == 's' &&
518                                                field[8] == 'e' &&
519                                                field[9] == 'd' )  /* last field in the file - strange bahaviour*/
520                                        {
521                                                fsd_log_debug(("%s - Date changed. Closing log file",self->name));
522                                                self->date_changed = true;
523                                                log_match = false;
524                                        }
525                                       
526                                        ptr += n;
527                                        if ( *ptr != ';' )
528                                        {
529                                                break; /* end of line */
530                                        }
531                                        field_n++;
532                                        ++ptr;
533                                }               
534
535                                if( strlcpy(buffer,"",sizeof(buffer)) > sizeof(buffer) ) {
536                                        fsd_log_error(("%s - strlcpy error",self->name));
537                                }
538
539                                fsd_free(temp_date);                   
540                        } /* end of while getline loop */                       
541                       
542                        if(self->job == NULL)
543                        {
544                                fsd_mutex_unlock( &self->session->mutex );                     
545                                usleep(1000000);
546                                fsd_mutex_lock( &self->session->mutex );       
547                        }
548                       
549                        if(self->job == NULL)
550                        {
551                                self->run_flag = self->session->wait_thread_run_flag;
552                        }
553                }               
554                EXCEPT_DEFAULT
555                 {
556                        const fsd_exc_t *e = fsd_exc_get();
557
558                        fsd_log_error(( "%s: <%d:%s>", self->name, e->code(e), e->message(e) ));
559                        fsd_exc_reraise();
560                 }
561                END_TRY
562
563                if(self->fd != -1)
564                        close(self->fd);
565                fsd_log_debug(("%s - Log file closed",self->name));     
566        }
567        FINALLY
568        {
569                fsd_log_debug(("%s - Terminated.",self->name));
570                if(self->job == NULL)
571                        fsd_mutex_unlock( &self->session->mutex ); /**/
572        }
573        END_TRY
574       
575        fsd_log_return((""));
576}
577
578void
579pbsdrmaa_chose_file_wait_thread ( pbsdrmaa_log_reader_t * self )
580{
581        pbsdrmaa_session_t *pbssession = (pbsdrmaa_session_t*) self->session;
582       
583        if(self->date_changed)
584        {
585                char * log_path = NULL;
586                int num_tries = 0;
587                struct tm tm;
588               
589                fsd_log_enter((""));
590               
591                if(!self->first_open)
592                        time(&self->t);
593                else
594                        self->t = pbssession->log_file_initial_time;
595                       
596                localtime_r(&self->t,&tm);
597                               
598                #define DRMAA_WAIT_THREAD_MAX_TRIES (12)
599                /* generate new date, close file and open new */
600                if((log_path = fsd_asprintf("%s/server_logs/%04d%02d%02d",
601                                        pbssession->pbs_home,   
602                                        tm.tm_year + 1900,
603                                        tm.tm_mon + 1,
604                                        tm.tm_mday)) == NULL) {
605                        fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"WT - Memory allocation wasn't possible");
606                }
607
608                if(self->fd != -1)
609                        close(self->fd);
610
611                fsd_log_debug(("Log file: %s",log_path));
612                               
613        retry:
614                if((self->fd = open(log_path,O_RDONLY) ) == -1 && num_tries > DRMAA_WAIT_THREAD_MAX_TRIES )
615                {
616                        fsd_log_error(("Can't open log file. Verify pbs_home. Running standard wait_thread."));
617                        fsd_log_error(("Remember that without keep_completed set standard wait_thread won't run correctly"));
618                        /*pbssession->super.enable_wait_thread = false;*/ /* run not wait_thread */
619                        pbssession->wait_thread_log = false;
620                        pbssession->super.wait_thread = pbssession->super_wait_thread;
621                        pbssession->super.wait_thread(self->session);
622                } else if ( self->fd == -1 ) {
623                        fsd_log_warning(("Can't open log file: %s. Retries count: %d", log_path, num_tries));
624                        num_tries++;
625                        sleep(5);
626                        goto retry;
627                }
628
629                fsd_free(log_path);
630
631                fsd_log_debug(("Log file opened"));
632
633                if(self->first_open) {
634                        fsd_log_debug(("Log file lseek"));
635                        if(lseek(self->fd,pbssession->log_file_initial_size,SEEK_SET) == (off_t) -1) {
636                                char errbuf[256] = "InternalError";
637                                (void)strerror_r(errno, errbuf, 256);
638                                fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"lseek error: %s",errbuf);
639                        }
640                        self->first_open = false;
641                }
642
643                self->date_changed = false;
644               
645                fsd_log_return((""));
646        }       
647}
648
649ssize_t
650pbsdrmaa_read_line_wait_thread ( pbsdrmaa_log_reader_t * self, char * buffer, ssize_t size )
651{
652        return fsd_getline(buffer,size,self->fd);
653}
654
655/* reverse date compare*/
656int
657pbsdrmaa_date_compare(const void *a, const void *b)
658{
659   const char *ia = *(const char **) a;
660   const char *ib = *(const char **) b;
661   return strcmp(ib, ia);
662}
663
664void
665pbsdrmaa_chose_file_job_on_missing( pbsdrmaa_log_reader_t * self )
666{
667        pbsdrmaa_session_t *pbssession = (pbsdrmaa_session_t*) self->session;   
668       
669        char * log_path = NULL;
670        int num_tries = 0;
671        static int file_number = 0;
672        fsd_log_enter((""));
673               
674        if(self->first_open)
675        {                       
676                DIR *dp = NULL;         
677                char * path = NULL;
678                struct dirent *ep = NULL;
679               
680                if((path = fsd_asprintf("%s/server_logs/",pbssession->pbs_home)) == NULL)
681                        fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"Job_on_missing - Memory allocation wasn't possible");
682               
683                self->log_files_number = 0;     
684                dp = opendir (path);
685
686                fsd_calloc(self->log_files,2,char*);
687       
688                if (dp != NULL)
689                {
690                        while ((ep = readdir (dp)))
691                        {
692                                self->log_files_number++;
693                                if(self->log_files_number > 2)
694                                        fsd_realloc(self->log_files,self->log_files_number,char *);
695                               
696                                self->log_files[self->log_files_number-1] = fsd_strdup(ep->d_name);
697                        }
698                        (void) closedir (dp);
699                }
700                else
701                        fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"Job_on_missing - Couldn't open the directory");
702
703                qsort(self->log_files,self->log_files_number,sizeof(char *),pbsdrmaa_date_compare);
704               
705                if(self->log_files_number <= 2)
706                {
707                        self->run_flag = false;
708                        fsd_log_error(("Job_on_missing - No log files available"));
709                }
710               
711                self->first_open = false;
712                fsd_free(path);
713        }       
714        else /* check previous day*/
715        {
716                if(++file_number > self->log_files_number - 2)
717                        fsd_log_error(("Job_on_missing - All available log files checked"));
718                else
719                        fsd_log_debug(("Job_on_missing checking previous day"));
720               
721                self->run_flag = false;
722                pbsdrmaa_job_on_missing_standard( self->job );                         
723        }
724       
725        #define DRMAA_WAIT_THREAD_MAX_TRIES (12)
726        if((log_path = fsd_asprintf("%s/server_logs/%s",
727                                pbssession->pbs_home,   
728                                self->log_files[file_number])) == NULL) {
729                fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"Job_on_missing - Memory allocation wasn't possible");
730        }
731
732        if(self->fd != -1)
733                close(self->fd);
734
735        fsd_log_debug(("Log file: %s",log_path));
736                               
737retry:
738        if((self->fd = open(log_path,O_RDONLY) ) == -1 && num_tries > DRMAA_WAIT_THREAD_MAX_TRIES )
739        {
740                fsd_log_error(("Can't open log file. Verify pbs_home. Running standard job_on_missing"));
741                fsd_log_error(("Remember that without keep_completed set standard job_on_missing won't run correctly"));
742                self->run_flag = false;
743                pbsdrmaa_job_on_missing_standard( self->job );                 
744        } else if ( self->fd == -1 ) {
745                fsd_log_warning(("Can't open log file: %s. Retries count: %d", log_path, num_tries));
746                num_tries++;
747                sleep(5);
748                goto retry;
749        }
750        else
751        {
752                struct stat statbuf;
753                if(stat(log_path,&statbuf) == -1) {
754                                char errbuf[256] = "InternalError";
755                                (void)strerror_r(errno, errbuf, 256);
756                                fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"stat error: %s",errbuf);
757                }
758                self->log_file_read_size = 0;
759                self->log_file_initial_size = statbuf.st_size;
760                fsd_log_debug(("Set log_file_initial_size %ld",self->log_file_initial_size));
761        }
762
763        fsd_free(log_path);
764
765        fsd_log_debug(("Log file opened"));
766       
767        fsd_log_return((""));
768}
769
770ssize_t
771pbsdrmaa_read_line_job_on_missing ( pbsdrmaa_log_reader_t * self, char * buffer, ssize_t size )
772{
773        int n = fsd_getline(buffer,size,self->fd);
774       
775        if(n >= 0)
776                self->log_file_read_size += n;
777               
778        if(self->log_file_read_size >= self->log_file_initial_size)
779                return -1;
780
781        return n;
782}
783
784int
785fsd_job_id_cmp(const char *s1, const char *s2) /* maybe move to drmaa_utils? */
786{
787        int job1;
788        int job2;
789        char *rest = NULL;
790        char *token = NULL;
791        char *ptr = strdup(s1);
792        token = strtok_r(ptr, ".", &rest);
793        job1 = atoi(token);
794       
795        fsd_free(token);
796       
797        ptr = strdup(s2);
798        token = strtok_r(ptr,".",&rest);
799        job2 = atoi(token);
800       
801        fsd_free(token);
802        return job1 - job2;
803}
804
Note: See TracBrowser for help on using the repository browser.