source: trunk/pbs_drmaa/log_reader.c @ 21

Revision 21, 29.5 KB checked in by mmatloka, 9 years ago (diff)

exec host from accounting file

  • Property svn:keywords set to Id
Line 
1/* $Id$ */
2/*
3 *  FedStage DRMAA for PBS Pro
4 *  Copyright (C) 2006-2007  FedStage Systems
5 *
6 *  This program is free software: you can redistribute it and/or modify
7 *  it under the terms of the GNU General Public License as published by
8 *  the Free Software Foundation, either version 3 of the License, or
9 *  (at your option) any later version.
10 *
11 *  This program is distributed in the hope that it will be useful,
12 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 *  GNU General Public License for more details.
15 *
16 *  You should have received a copy of the GNU General Public License
17 *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #ifdef HAVE_CONFIG_H
21#       include <config.h>
22#endif
23
24#include <stdlib.h>
25#include <string.h>
26#include <unistd.h>
27#include <sys/select.h>
28#include <sys/stat.h>
29#include <sys/types.h>
30#include <dirent.h>
31#include <fcntl.h>
32
33#include <pbs_ifl.h>
34#include <pbs_error.h>
35
36#include <drmaa_utils/datetime.h>
37#include <drmaa_utils/drmaa.h>
38#include <drmaa_utils/iter.h>
39#include <drmaa_utils/conf.h>
40#include <drmaa_utils/session.h>
41#include <drmaa_utils/datetime.h>
42
43#include <pbs_drmaa/job.h>
44#include <pbs_drmaa/log_reader.h>
45#include <pbs_drmaa/session.h>
46#include <pbs_drmaa/submit.h>
47#include <pbs_drmaa/util.h>
48
49#include <errno.h>
50
51static bool
52pbsdrmaa_read_log();
53
54static void
55pbsdrmaa_select_file_wait_thread ( pbsdrmaa_log_reader_t * self);
56
57static ssize_t
58pbsdrmaa_read_line_wait_thread ( pbsdrmaa_log_reader_t * self, char * line, char * buffer, ssize_t size, int * idx, int * end_idx, int * line_idx );
59
60static void
61pbsdrmaa_select_file_job_on_missing ( pbsdrmaa_log_reader_t * self );
62
63static ssize_t
64pbsdrmaa_read_line_job_on_missing ( pbsdrmaa_log_reader_t * self, char * line, char * buffer, ssize_t size, int * idx, int * end_idx, int * line_idx );
65
66static void
67pbsdrmaa_select_file_accounting ( pbsdrmaa_log_reader_t * self );
68
69static ssize_t
70pbsdrmaa_read_line_accounting ( pbsdrmaa_log_reader_t * self, char * line, char * buffer, ssize_t size, int * idx, int * end_idx, int * line_idx );
71
72static bool
73pbsdrmaa_read_log_accounting( pbsdrmaa_log_reader_t * self );
74
75int
76fsd_job_id_cmp(const char *s1, const char *s2);
77
78int
79pbsdrmaa_date_compare(const void *a, const void *b) ;
80
81pbsdrmaa_log_reader_t *
82pbsdrmaa_log_reader_new ( fsd_drmaa_session_t *session, fsd_job_t *job )
83{
84        pbsdrmaa_log_reader_t *volatile self = NULL;
85
86        fsd_log_enter((""));
87        TRY
88        {
89                fsd_malloc(self, pbsdrmaa_log_reader_t );
90               
91                self->session = session;
92               
93                /* ~templete method pattern */
94                if(job != NULL) /* job on missing */
95                {
96                        self->job = job;
97                        self->name = "Job_on_missing";
98                        self->select_file = pbsdrmaa_select_file_job_on_missing;
99                        self->read_line = pbsdrmaa_read_line_job_on_missing;
100                }
101                else /* wait thread */
102                {
103                        self->job = NULL;
104                        self->name = "WT";
105                        self->select_file = pbsdrmaa_select_file_wait_thread;
106                        self->read_line = pbsdrmaa_read_line_wait_thread;
107                }               
108                self->read_log = pbsdrmaa_read_log;     
109               
110                self->log_files = NULL;
111                self->log_files_number = 0;
112               
113                self->run_flag = true;
114                self->fd = -1;
115                self->date_changed = true;
116                self->first_open = true;
117               
118                self->log_file_initial_size = 0;
119                self->log_file_read_size = 0;
120        }
121        EXCEPT_DEFAULT
122        {
123                if( self != NULL)
124                        fsd_free(self);
125                       
126                fsd_exc_reraise();
127        }
128        END_TRY
129        fsd_log_return((""));
130        return self;
131}
132
133pbsdrmaa_log_reader_t *
134pbsdrmaa_log_reader_accounting_new ( fsd_drmaa_session_t *session, fsd_job_t *job )
135{
136        pbsdrmaa_log_reader_t *volatile self = NULL;
137
138        fsd_log_enter((""));
139        TRY
140        {
141                fsd_malloc(self, pbsdrmaa_log_reader_t );
142               
143                self->session = session;
144               
145                self->job = job;
146                self->name = "Accounting";
147                self->select_file = pbsdrmaa_select_file_accounting;
148                self->read_line = pbsdrmaa_read_line_accounting;
149                               
150                self->read_log = pbsdrmaa_read_log_accounting; 
151               
152                self->log_files = NULL;
153                self->log_files_number = 0;
154               
155                self->run_flag = true;
156                self->fd = -1;
157                self->date_changed = true;
158                self->first_open = true;
159               
160                self->log_file_initial_size = 0;
161                self->log_file_read_size = 0;
162        }
163        EXCEPT_DEFAULT
164        {
165                if( self != NULL)
166                        fsd_free(self);
167                       
168                fsd_exc_reraise();
169        }
170        END_TRY
171        fsd_log_return((""));
172        return self;
173}
174
175void
176pbsdrmaa_log_reader_destroy ( pbsdrmaa_log_reader_t * self )
177{
178        fsd_log_enter((""));
179        TRY
180        {
181                if(self != NULL)
182                {
183                        int i = -1;
184                        for(i = 0; i < self->log_files_number ; i++)
185                                fsd_free(self->log_files[i]);
186                        fsd_free(self->log_files);
187                        fsd_free(self);
188                }                       
189        }
190        EXCEPT_DEFAULT
191        {
192                fsd_exc_reraise();
193        }
194        END_TRY
195       
196        fsd_log_return((""));
197}
198
199enum field
200{
201        FLD_DATE = 0,
202        FLD_EVENT = 1,
203        FLD_OBJ = 2,
204        FLD_TYPE = 3,
205        FLD_ID = 4,
206        FLD_MSG = 5
207};
208
209enum field_msg
210{
211        FLD_MSG_EXIT_STATUS = 0,
212        FLD_MSG_CPUT = 1,
213        FLD_MSG_MEM = 2,
214        FLD_MSG_VMEM = 3,
215        FLD_MSG_WALLTIME = 4
216};
217
218enum field_msg_accounting
219{
220        FLD_MSG_ACC_USER = 0,
221        FLD_MSG_ACC_GROUP = 1,
222        FLD_MSG_ACC_JOBNAME = 2,
223        FLD_MSG_ACC_QUEUE = 3,
224        FLD_MSG_ACC_CTIME = 4,
225        FLD_MSG_ACC_QTIME = 5,
226        FLD_MSG_ACC_ETIME = 6,
227        FLD_MSG_ACC_START = 7,
228        FLD_MSG_ACC_OWNER = 8,
229        FLD_MSG_ACC_EXEC_HOST = 9,
230        FLD_MSG_ACC_RES_NEEDNODES = 10,
231        FLD_MSG_ACC_RES_NODECT = 11,
232        FLD_MSG_ACC_RES_NODES = 12,
233        FLD_MSG_ACC_RES_WALLTIME = 13
234};
235
236#define FLD_MSG_STATUS "0010"
237#define FLD_MSG_STATE "0008"
238#define FLD_MSG_LOG "0002"
239
240bool
241pbsdrmaa_read_log( pbsdrmaa_log_reader_t * self )
242{
243        pbsdrmaa_job_t *pbsjob = (pbsdrmaa_job_t*) self->job;   
244        fsd_job_t *volatile temp_job = NULL;
245               
246        fsd_log_enter((""));
247       
248        if(self->job == NULL)
249                fsd_mutex_lock( &self->session->mutex );
250
251        TRY
252        {               
253                while( self->run_flag )
254                TRY
255                {
256                        char line[4096] = "";
257                        char buffer[4096] = "";
258                        int idx = 0, end_idx = 0, line_idx = 0;
259                       
260                        self->select_file(self);
261
262                        while ((self->read_line(self, line,buffer, sizeof(line), &idx,&end_idx,&line_idx)) > 0)                         
263                        {
264                                const char *volatile ptr = line;
265                                char field[256] = "";
266                                char job_id[256] = "";
267                                char event[256] = "";
268                                int volatile field_n = 0;
269                                int n;
270                               
271                                bool volatile job_id_match = false;
272                                bool volatile event_match = false;
273                                bool volatile log_event = false;
274                                bool volatile log_match = false;
275                                bool volatile older_job_found = false;
276                                bool volatile job_found = false;
277                                char *  temp_date = NULL;
278                               
279                                struct batch_status status;
280                                status.next = NULL;
281
282                                while ( sscanf(ptr, "%255[^;]%n", field, &n) == 1 ) /* split current line into fields */
283                                {
284                                        if(field_n == FLD_DATE)
285                                        {
286                                                temp_date = fsd_strdup(field);
287                                        }
288                                        else if(field_n == FLD_EVENT && (strcmp(field,FLD_MSG_STATUS) == 0 ||
289                                                                    strcmp(field,FLD_MSG_STATE) == 0 ))
290                                        {
291                                                /* event described by log line*/
292                                                if(strlcpy(event, field,sizeof(event)) > sizeof(event)) {
293                                                        fsd_log_error(("%s - strlcpy error",self->name));
294                                                }
295                                                event_match = true;                                                                     
296                                        }
297                                        else if(event_match && field_n == FLD_ID)
298                                        {       
299                                                TRY
300                                                {       
301                                                        if(self->job == NULL) /* wait_thread */
302                                                        {
303                                                                temp_job = self->session->get_job( self->session, field );
304                                                                pbsjob = (pbsdrmaa_job_t*) temp_job;
305
306                                                                if( temp_job )
307                                                                {
308                                                                        if(strlcpy(job_id, field,sizeof(job_id)) > sizeof(job_id)) {
309                                                                                fsd_log_error(("%s - strlcpy error",self->name));
310                                                                        }
311                                                                        fsd_log_debug(("%s - job_id: %s",self->name,job_id));
312                                                                        status.name = fsd_strdup(job_id);
313                                                                        job_id_match = true; /* job_id is in drmaa */   
314                                                                }
315                                                                else
316                                                                {
317                                                                        fsd_log_debug(("%s - Unknown job: %s", self->name,field));
318                                                                }
319                                                        }
320                                                        else /* job_on_missing */
321                                                        {
322                                                                int diff = -1;
323                                                                diff = fsd_job_id_cmp(self->job->job_id,field);
324                                                                if( diff == 0)
325                                                                {
326                                                                        /* read this file to the place we started and exit*/
327                                                                        fsd_log_debug(("Job_on_missing found job: %s",self->job->job_id));
328                                                                        job_found = true;
329                                                                        older_job_found = false;
330                                                                        self->run_flag = false;
331                                                                        job_id_match = true;
332                                                                        status.name = fsd_strdup(self->job->job_id);                                                                   
333                                                                }
334                                                                else if ( !job_found && diff >= 1)
335                                                                {
336                                                                        /* older job, find its beginning */
337                                                                        fsd_log_debug(("Job_on_missing found older job than %s : %s",self->job->job_id,field));
338                                                                        older_job_found = true;
339                                                                        job_id_match = true;
340                                                                        status.name = fsd_strdup(self->job->job_id);
341                                                                }
342                                                                else  if( !job_found )
343                                                                {
344                                                                        fsd_log_debug(("Job_on_missing found newer job than %s : %s",self->job->job_id,field));
345                                                                }                                                               
346                                                        }
347                                                }
348                                                END_TRY
349                                        }
350                                        else if(job_id_match && field_n == FLD_MSG)
351                                        {                                               
352                                                /* parse msg - depends on FLD_EVENT */
353                                                struct attrl struct_resource_cput,struct_resource_mem,struct_resource_vmem,
354                                                        struct_resource_walltime, struct_status, struct_state, struct_start_time,struct_mtime, struct_queue, struct_account_name;       
355                                               
356                                                bool state_running = false;
357
358                                                memset(&struct_status,0,sizeof(struct attrl)); /**/
359                                                memset(&struct_state,0,sizeof(struct attrl));
360                                                memset(&struct_resource_cput,0,sizeof(struct attrl));
361                                                memset(&struct_resource_mem,0,sizeof(struct attrl));
362                                                memset(&struct_resource_vmem,0,sizeof(struct attrl));
363                                                memset(&struct_resource_walltime,0,sizeof(struct attrl));
364                                                memset(&struct_start_time,0,sizeof(struct attrl));
365                                                memset(&struct_mtime,0,sizeof(struct attrl));
366                                                memset(&struct_queue,0,sizeof(struct attrl));
367                                                memset(&struct_account_name,0,sizeof(struct attrl));
368                                                               
369                                                if (strcmp(event,FLD_MSG_STATE) == 0)
370                                                {
371                                                        /* job run, modified, queued etc */
372                                                        int n = 0;
373                                                        status.attribs = &struct_state;
374                                                        struct_state.next = NULL;
375                                                        struct_state.name = "job_state";
376                                                        if(field[0] == 'J') /* Job Queued, Job Modified, Job Run*/
377                                                        {
378                                                                n = 4;
379                                                                if(older_job_found) /* job_on_missing - older job beginning - read this file and end */
380                                                                {
381                                                                        self->run_flag = false;
382                                                                        fsd_log_debug(("Job_on_missing found older job beginning"));
383                                                                        fsd_free(status.name);
384                                                                        break;
385                                                                }
386                                                        }               
387                                                        if(field[4] == 'M') { /* modified */
388                                                                struct tm temp_time_tm;
389                                                                memset(&temp_time_tm, 0, sizeof(temp_time_tm));
390                                                                temp_time_tm.tm_isdst = -1;
391
392                                                                if (strptime(temp_date, "%m/%d/%Y %H:%M:%S", &temp_time_tm) == NULL)
393                                                                 {
394                                                                        fsd_log_error(("failed to parse mtime: %s", temp_date));
395                                                                 }
396                                                                else
397                                                                 {
398                                                                        time_t temp_time = mktime(&temp_time_tm);
399                                                                        status.attribs = &struct_mtime;
400                                                                        struct_mtime.name = "mtime";
401                                                                        struct_mtime.next = NULL;
402                                                                        struct_mtime.value = fsd_asprintf("%lu",temp_time);
403                                                                 }
404                                                        }               
405                                                        /* != Job deleted and Job to be deleted*/
406                                                        #ifdef PBS_PROFESSIONAL
407                                                        else if (field[4] != 't' && field[10] != 'd') {
408                                                        #else           
409                                                        else if(field[4] != 'd') {
410                                                        #endif
411
412                                                                if ((struct_state.value = fsd_asprintf("%c",field[n]) ) == NULL ) { /* 4 first letter of state */
413                                                                        fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"%s - Memory allocation wasn't possible",self->name);
414                                                                }
415                                                                if(struct_state.value[0] == 'R'){
416                                                                        state_running = true;
417                                                                }
418                                                        }
419                                                        else { /* job terminated - pbs drmaa detects failed as completed with exit_status !=0, aborted with status -1*/
420                                                                struct_status.name = "exit_status";
421                                                                struct_status.value = fsd_strdup("-1");
422                                                                struct_status.next = NULL;
423                                                                struct_state.next = &struct_status;
424                                                                struct_state.value = fsd_strdup("C");                                                           
425                                                        }
426                                                }                                                   
427                                                else /*if (strcmp(event,FLD_MSG_STATUS) == 0 )*/
428                                                {
429                                                        /* exit status and rusage */
430                                                        const char *ptr2 = field;
431                                                        char  msg[ 256 ] = "";
432                                                        int n2;
433                                                        int msg_field_n = 0;
434                                                       
435                                                        struct_resource_cput.name = "resources_used";
436                                                        struct_resource_mem.name = "resources_used";
437                                                        struct_resource_vmem.name = "resources_used";
438                                                        struct_resource_walltime.name = "resources_used";
439                                                        struct_status.name = "exit_status";
440                                                        struct_state.name = "job_state";
441                               
442                                                        status.attribs = &struct_resource_cput;
443                                                        struct_resource_cput.next = &struct_resource_mem;
444                                                        struct_resource_mem.next = &struct_resource_vmem;
445                                                        struct_resource_vmem.next = &struct_resource_walltime;
446                                                        struct_resource_walltime.next =  &struct_status;
447                                                        struct_status.next = &struct_state;
448                                                        struct_state.next = NULL;
449
450                                                        while ( sscanf(ptr2, "%255[^ ]%n", msg, &n2) == 1 )
451                                                         {                                             
452                                                                switch(msg_field_n)
453                                                                {
454                                                                        case FLD_MSG_EXIT_STATUS:
455                                                                                struct_status.value = fsd_strdup(strchr(msg,'=')+1);
456                                                                                break;
457
458                                                                        case FLD_MSG_CPUT:
459                                                                                struct_resource_cput.resource = "cput";
460                                                                                struct_resource_cput.value = fsd_strdup(strchr(msg,'=')+1);
461                                                                                break;
462
463                                                                        case FLD_MSG_MEM:
464                                                                                struct_resource_mem.resource = "mem";
465                                                                                struct_resource_mem.value  = fsd_strdup(strchr(msg,'=')+1);
466                                                                                break;
467
468                                                                        case FLD_MSG_VMEM:
469                                                                                struct_resource_vmem.resource = "vmem";
470                                                                                struct_resource_vmem.value  = fsd_strdup(strchr(msg,'=')+1);
471                                                                                break;
472
473                                                                        case FLD_MSG_WALLTIME:
474                                                                                struct_resource_walltime.resource = "walltime";
475                                                                                struct_resource_walltime.value  = fsd_strdup(strchr(msg,'=')+1);
476                                                                                break;
477                                                                }
478                                                             
479                                                                ptr2 += n2;
480                                                                msg_field_n++;
481                                                                if ( *ptr2 != ' ' )
482                                                                 {
483                                                                         break;
484                                                                 }
485                                                                ++ptr2;                                         
486                                                         }
487                                                        struct_state.value = fsd_strdup("C");   /* we got exit_status so we say that it has completed */
488                                                        fsd_log_info(("WT - job %s found as finished on %u", temp_job->job_id, (unsigned int)time(NULL)));
489                                                }                                               
490                                                 
491                                                if(self->job == NULL) /* wait_thread */
492                                                {
493                                                        if ( state_running )
494                                                        {
495                                                                fsd_log_debug(("WT - forcing update of job: %s", temp_job->job_id ));
496                                                                TRY
497                                                                {
498                                                                        temp_job->update_status( temp_job );
499                                                                }
500                                                                EXCEPT_DEFAULT
501                                                                {
502                                                                        /*TODO: distinguish between invalid job and internal errors */
503                                                                        fsd_log_debug(("Job finished just after entering running state: %s", temp_job->job_id));
504                                                                }
505                                                                END_TRY
506                                                        }
507                                                        else
508                                                        {
509                                                                fsd_log_debug(("%s - updating job: %s",self->name, temp_job->job_id ));                                                 
510                                                                pbsjob->update( temp_job, &status );
511                                                        }
512                                                 }
513                                                 else if( job_found ) /* job_on_missing */
514                                                 {
515                                                        fsd_log_debug(("Job_on_missing - updating job: %s", self->job->job_id ));                                                       
516                                                        pbsjob->update( self->job, &status );
517                                                 }
518                                               
519                                                if(self->job == NULL)
520                                                {
521                                                        fsd_cond_broadcast( &temp_job->status_cond);
522                                                        fsd_cond_broadcast( &self->session->wait_condition );
523                                                }
524                                                if ( temp_job )
525                                                        temp_job->release( temp_job );
526       
527                                                fsd_free(struct_resource_cput.value);
528                                                fsd_free(struct_resource_mem.value);
529                                                fsd_free(struct_resource_vmem.value);
530                                                fsd_free(struct_resource_walltime.value);
531                                                fsd_free(struct_status.value);
532                                                fsd_free(struct_state.value);
533                                                fsd_free(struct_start_time.value);
534                                                fsd_free(struct_mtime.value);
535                                                fsd_free(struct_queue.value);
536                                                fsd_free(struct_account_name.value);
537
538                                                if ( status.name!=NULL )
539                                                        fsd_free(status.name);
540                                        }
541                                        else if(field_n == FLD_EVENT && strcmp(field,FLD_MSG_LOG) == 0)
542                                        {
543                                                log_event = true;                                       
544                                        }
545                                        else if (log_event && field_n == FLD_ID && strcmp(field,"Log") == 0 )
546                                        {
547                                                log_match = true;
548                                                log_event = false;
549                                        }
550                                        else if( self->job == NULL && log_match && field_n == FLD_MSG && strncmp(field,"Log closed",10) == 0)
551                                        {
552                                                fsd_log_debug(("%s - Date changed. Closing log file",self->name));
553                                                self->date_changed = true;
554                                                log_match = false;
555                                        }
556                                       
557                                        ptr += n;
558                                        if ( *ptr != ';' )
559                                        {
560                                                break; /* end of line */
561                                        }
562                                        field_n++;
563                                        ++ptr;
564                                }               
565
566                                fsd_free(temp_date);                   
567                        } /* end of while getline loop */                       
568                       
569                        if(self->job == NULL)
570                        {
571                                struct timeval timeout_tv;
572                                fd_set log_fds;
573       
574                                fsd_mutex_unlock( &self->session->mutex );                     
575                               
576                                FD_ZERO(&log_fds);
577                                FD_SET(self->fd, &log_fds);
578
579                                timeout_tv.tv_sec = 1;
580                                timeout_tv.tv_usec = 0;
581
582                                /* ignore return value - the next get line call will handle IO errors */
583                                (void)select(1, &log_fds, NULL, NULL, &timeout_tv);
584
585                                fsd_mutex_lock( &self->session->mutex );       
586
587                                self->run_flag = self->session->wait_thread_run_flag;
588                        }
589                }               
590                EXCEPT_DEFAULT
591                 {
592                        const fsd_exc_t *e = fsd_exc_get();
593                        /* Its better to exit and communicate error rather then let the application to hang */
594                        fsd_log_fatal(( "Exception in wait thread %s: <%d:%s>. Exiting !!!", self->name, e->code(e), e->message(e) ));
595                        exit(1);
596                 }
597                END_TRY
598
599                if(self->fd != -1)
600                        close(self->fd);
601                fsd_log_debug(("%s - Log file closed",self->name));     
602        }
603        FINALLY
604        {
605                fsd_log_debug(("%s - Terminated.",self->name));
606                if(self->job == NULL)
607                        fsd_mutex_unlock( &self->session->mutex ); /**/
608        }
609        END_TRY
610       
611        fsd_log_return((""));
612        return true;
613}
614
615void
616pbsdrmaa_select_file_wait_thread ( pbsdrmaa_log_reader_t * self )
617{
618        pbsdrmaa_session_t *pbssession = (pbsdrmaa_session_t*) self->session;
619       
620        if(self->date_changed)
621        {
622                char * log_path = NULL;
623                int num_tries = 0;
624                struct tm tm;
625               
626                fsd_log_enter((""));
627               
628                if(!self->first_open)
629                        time(&self->t);
630                else
631                        self->t = pbssession->log_file_initial_time;
632                       
633                localtime_r(&self->t,&tm);
634                               
635                #define DRMAA_WAIT_THREAD_MAX_TRIES (12)
636                /* generate new date, close file and open new */
637                if((log_path = fsd_asprintf("%s/server_logs/%04d%02d%02d",
638                                        pbssession->pbs_home,   
639                                        tm.tm_year + 1900,
640                                        tm.tm_mon + 1,
641                                        tm.tm_mday)) == NULL) {
642                        fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"WT - Memory allocation wasn't possible");
643                }
644
645                if(self->fd != -1)
646                        close(self->fd);
647
648                fsd_log_debug(("Log file: %s",log_path));
649                               
650        retry:
651                if((self->fd = open(log_path,O_RDONLY) ) == -1 && num_tries > DRMAA_WAIT_THREAD_MAX_TRIES )
652                {
653                        fsd_log_error(("Can't open log file. Verify pbs_home. Running standard wait_thread."));
654                        fsd_log_error(("Remember that without keep_completed set standard wait_thread won't run correctly"));
655                        /*pbssession->super.enable_wait_thread = false;*/ /* run not wait_thread */
656                        pbssession->wait_thread_log = false;
657                        pbssession->super.wait_thread = pbssession->super_wait_thread;
658                        pbssession->super.wait_thread(self->session);
659                } else if ( self->fd == -1 ) {
660                        fsd_log_warning(("Can't open log file: %s. Retries count: %d", log_path, num_tries));
661                        num_tries++;
662                        sleep(5);
663                        goto retry;
664                }
665
666                fsd_free(log_path);
667
668                fsd_log_debug(("Log file opened"));
669
670                if(self->first_open) {
671                        fsd_log_debug(("Log file lseek"));
672                        if(lseek(self->fd,pbssession->log_file_initial_size,SEEK_SET) == (off_t) -1) {
673                                char errbuf[256] = "InternalError";
674                                (void)strerror_r(errno, errbuf, 256);
675                                fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"lseek error: %s",errbuf);
676                        }
677                        self->first_open = false;
678                }
679
680                self->date_changed = false;
681               
682                fsd_log_return((""));
683        }       
684}
685
686ssize_t
687pbsdrmaa_read_line_wait_thread ( pbsdrmaa_log_reader_t * self, char * line, char * buffer, ssize_t size, int * idx, int * end_idx, int * line_idx )
688{
689        return fsd_getline_buffered(line,buffer,size,self->fd,idx,end_idx,line_idx);
690}
691
692/* reverse date compare*/
693int
694pbsdrmaa_date_compare(const void *a, const void *b)
695{
696   const char *ia = *(const char **) a;
697   const char *ib = *(const char **) b;
698   return strcmp(ib, ia);
699}
700
701void
702pbsdrmaa_select_file_job_on_missing( pbsdrmaa_log_reader_t * self )
703{
704        pbsdrmaa_session_t *pbssession = (pbsdrmaa_session_t*) self->session;   
705       
706        char * log_path = NULL;
707        int num_tries = 0;
708        static int file_number = 0;
709        fsd_log_enter((""));
710               
711        if(self->first_open)
712        {                       
713                DIR *dp = NULL;         
714                char * path = NULL;
715                struct dirent *ep = NULL;
716               
717                if((path = fsd_asprintf("%s/server_logs/",pbssession->pbs_home)) == NULL)
718                        fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"Job_on_missing - Memory allocation wasn't possible");
719               
720                self->log_files_number = 0;     
721                dp = opendir (path);
722
723                fsd_calloc(self->log_files,2,char*);
724       
725                if (dp != NULL)
726                {
727                        while ((ep = readdir (dp)))
728                        {
729                                self->log_files_number++;
730                                if(self->log_files_number > 2)
731                                        fsd_realloc(self->log_files,self->log_files_number,char *);
732                               
733                                self->log_files[self->log_files_number-1] = fsd_strdup(ep->d_name);
734                        }
735                        (void) closedir (dp);
736                }
737                else
738                        fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"Job_on_missing - Couldn't open the directory");
739
740                qsort(self->log_files,self->log_files_number,sizeof(char *),pbsdrmaa_date_compare);
741               
742                if(self->log_files_number <= 2)
743                {
744                        self->run_flag = false;
745                        fsd_log_error(("Job_on_missing - No log files available"));
746                }
747               
748                self->first_open = false;
749                fsd_free(path);
750        }       
751        else /* check previous day*/
752        {
753                if(++file_number > self->log_files_number - 2)
754                        fsd_log_error(("Job_on_missing - All available log files checked"));
755                else
756                        fsd_log_debug(("Job_on_missing checking previous day"));
757               
758                self->run_flag = false;
759                pbsdrmaa_job_on_missing_standard( self->job );                         
760        }
761       
762        #define DRMAA_WAIT_THREAD_MAX_TRIES (12)
763        if((log_path = fsd_asprintf("%s/server_logs/%s",
764                                pbssession->pbs_home,   
765                                self->log_files[file_number])) == NULL) {
766                fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"Job_on_missing - Memory allocation wasn't possible");
767        }
768
769        if(self->fd != -1)
770                close(self->fd);
771
772        fsd_log_debug(("Log file: %s",log_path));
773                               
774retry:
775        if((self->fd = open(log_path,O_RDONLY) ) == -1 && num_tries > DRMAA_WAIT_THREAD_MAX_TRIES )
776        {
777                fsd_log_error(("Can't open log file. Verify pbs_home. Running standard job_on_missing"));
778                fsd_log_error(("Remember that without keep_completed set standard job_on_missing won't run correctly"));
779                self->run_flag = false;
780                pbsdrmaa_job_on_missing_standard( self->job );                 
781        } else if ( self->fd == -1 ) {
782                fsd_log_warning(("Can't open log file: %s. Retries count: %d", log_path, num_tries));
783                num_tries++;
784                sleep(5);
785                goto retry;
786        }
787        else
788        {
789                struct stat statbuf;
790                if(stat(log_path,&statbuf) == -1) {
791                                char errbuf[256] = "InternalError";
792                                (void)strerror_r(errno, errbuf, 256);
793                                fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"stat error: %s",errbuf);
794                }
795                self->log_file_read_size = 0;
796                self->log_file_initial_size = statbuf.st_size;
797                fsd_log_debug(("Set log_file_initial_size %ld",self->log_file_initial_size));
798        }
799
800        fsd_free(log_path);
801
802        fsd_log_debug(("Log file opened"));
803       
804        fsd_log_return((""));
805}
806
807ssize_t
808pbsdrmaa_read_line_job_on_missing ( pbsdrmaa_log_reader_t * self, char * line, char * buffer, ssize_t size, int * idx, int * end_idx, int * line_idx )
809{
810        int n = fsd_getline_buffered(line,buffer,size,self->fd, idx, end_idx, line_idx);
811       
812        if(n >= 0)
813                self->log_file_read_size += n;
814               
815        if(self->log_file_read_size >= self->log_file_initial_size)
816                return -1;
817
818        return n;
819}
820
821void
822pbsdrmaa_select_file_accounting ( pbsdrmaa_log_reader_t * self )
823{
824        pbsdrmaa_session_t *pbssession = (pbsdrmaa_session_t*) self->session;
825               
826        char * log_path = NULL;
827
828        struct tm tm;
829               
830        fsd_log_enter((""));
831               
832        time(&self->t);
833                       
834        localtime_r(&self->t,&tm);
835                               
836        #define DRMAA_ACCOUNTING_MAX_TRIES (12)
837        /* generate new date, close file and open new */
838        if((log_path = fsd_asprintf("%s/server_priv/accounting/%04d%02d%02d",
839                                pbssession->pbs_home,   
840                                tm.tm_year + 1900,
841                                tm.tm_mon + 1,
842                                tm.tm_mday)) == NULL) {
843                fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"Read accounting file - Memory allocation wasn't possible");
844        }
845
846        if(self->fd != -1)
847                close(self->fd);
848
849        fsd_log_debug(("Accounting Log file: %s",log_path));                           
850
851        if((self->fd = open(log_path,O_RDONLY) ) == -1 )
852        {
853                fsd_log_error(("Can't open accounting log file. Change directory chmod and verify pbs_home."));
854        }
855
856        fsd_free(log_path);
857
858        fsd_log_debug(("Accounting Log file opened"));
859
860        fsd_log_return((""));   
861}
862
863ssize_t
864pbsdrmaa_read_line_accounting ( pbsdrmaa_log_reader_t * self, char * line, char * buffer, ssize_t size, int * idx, int * end_idx, int * line_idx )
865{
866        return fsd_getline_buffered(line,buffer,size,self->fd,idx,end_idx,line_idx);
867}
868
869enum field_acc
870{
871        FLD_ACC_DATE = 0,
872        FLD_ACC_EVENT = 1,
873        FLD_ACC_ID = 2,
874        FLD_ACC_MSG = 3
875};
876
877bool
878pbsdrmaa_read_log_accounting( pbsdrmaa_log_reader_t * self )
879{
880        pbsdrmaa_job_t *pbsjob = (pbsdrmaa_job_t*) self->job;   
881        bool res = false;
882       
883        fsd_job_t *volatile temp_job = NULL;
884               
885        fsd_log_enter((""));
886        fsd_log_debug(("Accounting Log file opened"));
887        if(self->job == NULL)
888                fsd_mutex_lock( &self->session->mutex );
889
890        TRY
891        {               
892                TRY
893                {
894                        char line[4096] = "";
895                        char buffer[4096] = "";
896                        int idx = 0, end_idx = 0, line_idx = 0;
897                       
898                        self->select_file(self);
899                       
900                        if(self->fd != -1)                                     
901                        while ((self->read_line(self, line,buffer, sizeof(line), &idx,&end_idx,&line_idx)) > 0)                         
902                        {
903                                const char *volatile ptr = line;
904                                char field[256] = "";
905                                int volatile field_n = 0;
906                                int n;
907                               
908                                bool volatile job_id_match = false;     
909                       
910                                bool volatile job_found = false;
911                                char *  temp_date = NULL;
912                               
913                                struct batch_status status;
914                               
915                                while ( sscanf(ptr, "%255[^;]%n", field, &n) == 1 ) /* split current line into fields */
916                                {
917                                        status.next = NULL;
918                                        status.attribs = NULL;
919                               
920                                        if(field_n == FLD_ACC_DATE)
921                                        {
922                                                temp_date = fsd_strdup(field);
923                                        }
924                                        else if(field_n == FLD_ACC_EVENT)
925                                        {
926                                                       
927                                        }
928                                        else if(field_n == FLD_ACC_ID)
929                                        {                                                       
930                                                TRY
931                                                {                                                               
932                                                                int diff = -1;
933                                                                diff = fsd_job_id_cmp(self->job->job_id,field);
934                                                                if( diff == 0)
935                                                                {
936                                                                        /* read this file to the place we started and exit*/
937                                                                        fsd_log_debug(("Accounting found job: %s",self->job->job_id));
938                                                                        job_found = true;
939                                                                        job_id_match = true;
940                                                                        status.name = fsd_strdup(self->job->job_id);                                                                   
941                                                                }       
942                                                }
943                                                END_TRY
944                                        }
945                                        else if(job_id_match && field_n == FLD_ACC_MSG)
946                                        {                                       
947                                                struct attrl * struct_attrl = calloc(10,sizeof(struct attrl));
948                                                                                             
949                                                if(field[0] == 'q')
950                                                {
951                                                        status.attribs = &struct_attrl[0];
952                                                        struct_attrl[0].name =  ATTR_queue;
953                                                        struct_attrl[0].value = fsd_strdup(strchr(field,'=')+1);
954                                                        struct_attrl[0].next = NULL;
955                                                }
956                                                else if(field[0] == 'u')
957                                                {
958                                                        /* rusage */
959                                                        const char *ptr2 = field;
960                                                        char  msg[ 256 ] = "";
961                                                        int n2 = 0;
962                                                        int msg_field_n = 0;   
963                               
964                                                        status.attribs = &struct_attrl[0];
965
966                                                        while ( sscanf(ptr2, "%255[^ ]%n", msg, &n2) == 1 )
967                                                         {                                             
968                                                                switch(msg_field_n)
969                                                                {
970                                                                        case FLD_MSG_ACC_USER:
971                                                                                struct_attrl[msg_field_n].name = ATTR_euser;                                                                   
972                                                                                break;
973
974                                                                        case FLD_MSG_ACC_GROUP:
975                                                                                struct_attrl[msg_field_n].name = ATTR_egroup;
976                                                                                break;
977
978                                                                        case FLD_MSG_ACC_JOBNAME:
979                                                                                struct_attrl[msg_field_n].name = ATTR_name;
980                                                                                break;
981
982                                                                        case FLD_MSG_ACC_QUEUE:
983                                                                                struct_attrl[msg_field_n].name = ATTR_queue;
984                                                                                break;
985
986                                                                        case FLD_MSG_ACC_CTIME:
987                                                                                struct_attrl[msg_field_n].name = ATTR_ctime;
988                                                                                break;
989                                                                               
990                                                                        case FLD_MSG_ACC_QTIME:
991                                                                                struct_attrl[msg_field_n].name = ATTR_qtime;
992                                                                                break;
993                                                                               
994                                                                        case FLD_MSG_ACC_ETIME:
995                                                                                struct_attrl[msg_field_n].name = ATTR_etime;
996                                                                                break;
997                                                                               
998                                                                        case FLD_MSG_ACC_START:
999                                                                                struct_attrl[msg_field_n].name = ATTR_start_time;
1000                                                                                break;
1001                                                                               
1002                                                                        case FLD_MSG_ACC_OWNER:
1003                                                                                struct_attrl[msg_field_n].name = ATTR_owner;
1004                                                                                break;
1005                                                                               
1006                                                                        case FLD_MSG_ACC_EXEC_HOST:
1007                                                                                struct_attrl[msg_field_n].name = ATTR_exechost;
1008                                                                                break;                                                                         
1009                                                                }
1010                                                               
1011                                                                struct_attrl[msg_field_n].value  = fsd_strdup(strchr(msg,'=')+1);
1012                                                                if(msg_field_n!=9)
1013                                                                {
1014                                                                        struct_attrl[msg_field_n].next = &struct_attrl[msg_field_n+1];
1015                                                                }
1016                                                                else
1017                                                                {
1018                                                                        struct_attrl[msg_field_n].next = NULL;
1019                                                                        break;
1020                                                                }
1021                                                             
1022                                                                ptr2 += n2;
1023                                                                msg_field_n++;
1024                                                                if ( *ptr2 != ' ' )
1025                                                                 {
1026                                                                         break;
1027                                                                 }
1028                                                                ++ptr2;                                         
1029                                                         }
1030                                                }                                               
1031                                                 
1032                                            if( job_found && status.attribs != NULL)
1033                                                 {
1034                                                        fsd_log_debug(("Accounting file - updating job: %s", self->job->job_id ));                                     
1035                                                        pbsjob->update( self->job, &status );
1036                                                        res = true;
1037                                                 }
1038                                               
1039                                                if(self->job == NULL)
1040                                                {
1041                                                        fsd_cond_broadcast( &temp_job->status_cond);
1042                                                        fsd_cond_broadcast( &self->session->wait_condition );
1043                                                }
1044                                                if ( temp_job )
1045                                                        temp_job->release( temp_job );
1046       
1047                                                int i = 0;
1048                                                for(i = 0; i < 10; i++)
1049                                                {
1050                                                        fsd_free(struct_attrl[i].value);
1051                                                }
1052                                                fsd_free(struct_attrl);
1053                                                fsd_free(status.name);
1054                                        }
1055                                       
1056                                       
1057                                        ptr += n;
1058                                        if ( *ptr != ';' )
1059                                        {
1060                                                break; /* end of line */
1061                                        }
1062                                        field_n++;
1063                                        ++ptr;
1064                                }               
1065
1066                                fsd_free(temp_date);                   
1067                        } /* end of while getline loop */       
1068                       
1069                }               
1070                EXCEPT_DEFAULT
1071                 {
1072                        const fsd_exc_t *e = fsd_exc_get();
1073                        /* Its better to exit and communicate error rather then let the application to hang */
1074                        fsd_log_fatal(( "Exception in reading accounting file %s: <%d:%s>. Exiting !!!", self->name, e->code(e), e->message(e) ));
1075                        exit(1);
1076                 }
1077                END_TRY
1078
1079                if(self->fd != -1)
1080                        close(self->fd);
1081                fsd_log_debug(("%s - Accounting log file closed",self->name)); 
1082        }
1083        FINALLY
1084        {
1085                fsd_log_debug(("%s - Terminated.",self->name));
1086                if(self->job == NULL)
1087                        fsd_mutex_unlock( &self->session->mutex ); /**/
1088        }
1089        END_TRY
1090       
1091        fsd_log_return((""));
1092        return res;
1093}
1094
1095int
1096fsd_job_id_cmp(const char *s1, const char *s2) /* maybe move to drmaa_utils? */
1097{
1098        int job1;
1099        int job2;
1100        char *rest = NULL;
1101        char *token = NULL;
1102        char *ptr = fsd_strdup(s1);
1103        token = strtok_r(ptr, ".", &rest);
1104        job1 = atoi(token);
1105       
1106        fsd_free(token);
1107       
1108        ptr = fsd_strdup(s2);
1109        token = strtok_r(ptr,".",&rest);
1110        job2 = atoi(token);
1111       
1112        fsd_free(token);
1113        return job1 - job2;
1114}
1115
Note: See TracBrowser for help on using the repository browser.