source: trunk/pbs_drmaa/log_reader.c @ 25

Revision 25, 30.8 KB checked in by mmamonski, 8 years ago (diff)

pbs log reader - clean a little

  • Property svn:keywords set to Id
Line 
1/* $Id$ */
2/*
3 *  FedStage DRMAA for PBS Pro
4 *  Copyright (C) 2006-2007  FedStage Systems
5 *
6 *  This program is free software: you can redistribute it and/or modify
7 *  it under the terms of the GNU General Public License as published by
8 *  the Free Software Foundation, either version 3 of the License, or
9 *  (at your option) any later version.
10 *
11 *  This program is distributed in the hope that it will be useful,
12 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 *  GNU General Public License for more details.
15 *
16 *  You should have received a copy of the GNU General Public License
17 *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #ifdef HAVE_CONFIG_H
21#       include <config.h>
22#endif
23
24#include <stdlib.h>
25#include <string.h>
26#include <unistd.h>
27#include <sys/select.h>
28#include <sys/stat.h>
29#include <sys/types.h>
30#include <dirent.h>
31#include <fcntl.h>
32
33#include <pbs_ifl.h>
34#include <pbs_error.h>
35
36#include <drmaa_utils/datetime.h>
37#include <drmaa_utils/drmaa.h>
38#include <drmaa_utils/iter.h>
39#include <drmaa_utils/conf.h>
40#include <drmaa_utils/session.h>
41#include <drmaa_utils/datetime.h>
42
43#include <pbs_drmaa/job.h>
44#include <pbs_drmaa/log_reader.h>
45#include <pbs_drmaa/session.h>
46#include <pbs_drmaa/submit.h>
47#include <pbs_drmaa/util.h>
48
49#include <errno.h>
50
51static bool
52pbsdrmaa_read_log();
53
54static void
55pbsdrmaa_select_file_wait_thread ( pbsdrmaa_log_reader_t * self);
56
57static ssize_t
58pbsdrmaa_read_line_wait_thread ( pbsdrmaa_log_reader_t * self, char * line, char * buffer, ssize_t size, int * idx, int * end_idx, int * line_idx );
59
60static void
61pbsdrmaa_select_file_job_on_missing ( pbsdrmaa_log_reader_t * self );
62
63static ssize_t
64pbsdrmaa_read_line_job_on_missing ( pbsdrmaa_log_reader_t * self, char * line, char * buffer, ssize_t size, int * idx, int * end_idx, int * line_idx );
65
66static void
67pbsdrmaa_select_file_accounting ( pbsdrmaa_log_reader_t * self );
68
69static ssize_t
70pbsdrmaa_read_line_accounting ( pbsdrmaa_log_reader_t * self, char * line, char * buffer, ssize_t size, int * idx, int * end_idx, int * line_idx );
71
72static bool
73pbsdrmaa_read_log_accounting( pbsdrmaa_log_reader_t * self );
74
75int
76fsd_job_id_cmp(const char *s1, const char *s2);
77
78int
79pbsdrmaa_date_compare(const void *a, const void *b) ;
80
81/*
82 * Snippets from log files
83 *
84 * PBS Pro
85 *
8610/11/2011 14:43:29;0008;Server@nova;Job;2127218.nova;Job Queued at request of mamonski@endor.wcss.wroc.pl, owner = mamonski@endor.wcss.wroc.pl, job name = STDIN, queue = normal
8710/11/2011 14:43:31;0008;Server@nova;Job;2127218.nova;Job Modified at request of Scheduler@nova.wcss.wroc.pl
8810/11/2011 14:43:31;0008;Server@nova;Job;2127218.nova;Job Run at request of Scheduler@nova.wcss.wroc.pl on exec_vnode (wn698:ncpus=3:mem=2048000kb)+(wn700:ncpus=3:mem=2048000kb)
8910/11/2011 14:43:31;0008;Server@nova;Job;2127218.nova;Job Modified at request of Scheduler@nova.wcss.wroc.pl
9010/11/2011 14:43:32;0010;Server@nova;Job;2127218.nova;Exit_status=0 resources_used.cpupercent=0 resources_used.cput=00:00:00 resources_used.mem=1768kb resources_used.ncpus=6 resources_used.vmem=19228kb resources_used.walltime=00:00:01
91
92 *
93 * Torque
94 *
9510/11/2011 14:47:59;0008;PBS_Server;Job;15545337.batch.grid.cyf-kr.edu.pl;Job Queued at request of plgmamonski@ui.cyf-kr.edu.pl, owner = plgmamonski@ui.cyf-kr.edu.pl, job name = STDIN, queue = l_short
9610/11/2011 14:48:23;0008;PBS_Server;Job;15545337.batch.grid.cyf-kr.edu.pl;Job Run at request of root@batch.grid.cyf-kr.edu.pl
9710/11/2011 14:48:24;0010;PBS_Server;Job;15545337.batch.grid.cyf-kr.edu.pl;Exit_status=0 resources_used.cput=00:00:00 resources_used.mem=720kb resources_used.vmem=13308kb resources_used.walltime=00:00:00
98
99 */
100pbsdrmaa_log_reader_t *
101pbsdrmaa_log_reader_new ( fsd_drmaa_session_t *session, fsd_job_t *job )
102{
103        pbsdrmaa_log_reader_t *volatile self = NULL;
104
105        fsd_log_enter((""));
106        TRY
107        {
108                fsd_malloc(self, pbsdrmaa_log_reader_t );
109               
110                self->session = session;
111               
112                /* ~templete method pattern */
113                if(job != NULL) /* job on missing */
114                {
115                        self->job = job;
116                        self->name = "Job_on_missing";
117                        self->select_file = pbsdrmaa_select_file_job_on_missing;
118                        self->read_line = pbsdrmaa_read_line_job_on_missing;
119                }
120                else /* wait thread */
121                {
122                        self->job = NULL;
123                        self->name = "WT";
124                        self->select_file = pbsdrmaa_select_file_wait_thread;
125                        self->read_line = pbsdrmaa_read_line_wait_thread;
126                }               
127                self->read_log = pbsdrmaa_read_log;     
128               
129                self->log_files = NULL;
130                self->log_files_number = 0;
131               
132                self->run_flag = true;
133                self->fd = -1;
134                self->date_changed = true;
135                self->first_open = true;
136               
137                self->log_file_initial_size = 0;
138                self->log_file_read_size = 0;
139        }
140        EXCEPT_DEFAULT
141        {
142                if( self != NULL)
143                        fsd_free(self);
144                       
145                fsd_exc_reraise();
146        }
147        END_TRY
148        fsd_log_return((""));
149        return self;
150}
151
152pbsdrmaa_log_reader_t *
153pbsdrmaa_log_reader_accounting_new ( fsd_drmaa_session_t *session, fsd_job_t *job )
154{
155        pbsdrmaa_log_reader_t *volatile self = NULL;
156
157        fsd_log_enter((""));
158        TRY
159        {
160                fsd_malloc(self, pbsdrmaa_log_reader_t );
161               
162                self->session = session;
163               
164                self->job = job;
165                self->name = "Accounting";
166                self->select_file = pbsdrmaa_select_file_accounting;
167                self->read_line = pbsdrmaa_read_line_accounting;
168                               
169                self->read_log = pbsdrmaa_read_log_accounting; 
170               
171                self->log_files = NULL;
172                self->log_files_number = 0;
173               
174                self->run_flag = true;
175                self->fd = -1;
176                self->date_changed = true;
177                self->first_open = true;
178               
179                self->log_file_initial_size = 0;
180                self->log_file_read_size = 0;
181        }
182        EXCEPT_DEFAULT
183        {
184                if( self != NULL)
185                        fsd_free(self);
186                       
187                fsd_exc_reraise();
188        }
189        END_TRY
190        fsd_log_return((""));
191        return self;
192}
193
194void
195pbsdrmaa_log_reader_destroy ( pbsdrmaa_log_reader_t * self )
196{
197        fsd_log_enter((""));
198        TRY
199        {
200                if(self != NULL)
201                {
202                        int i = -1;
203                        for(i = 0; i < self->log_files_number ; i++)
204                                fsd_free(self->log_files[i]);
205                        fsd_free(self->log_files);
206                        fsd_free(self);
207                }                       
208        }
209        EXCEPT_DEFAULT
210        {
211                fsd_exc_reraise();
212        }
213        END_TRY
214       
215        fsd_log_return((""));
216}
217
218enum field
219{
220        FLD_DATE = 0,
221        FLD_EVENT = 1,
222        FLD_OBJ = 2,
223        FLD_TYPE = 3,
224        FLD_ID = 4,
225        FLD_MSG = 5
226};
227
228enum field_msg
229{
230        FLD_MSG_EXIT_STATUS = 0,
231        FLD_MSG_CPUT = 1,
232        FLD_MSG_MEM = 2,
233        FLD_MSG_VMEM = 3,
234        FLD_MSG_WALLTIME = 4
235};
236
237enum field_msg_accounting
238{
239        FLD_MSG_ACC_USER = 0,
240        FLD_MSG_ACC_GROUP = 1,
241        FLD_MSG_ACC_JOBNAME = 2,
242        FLD_MSG_ACC_QUEUE = 3,
243        FLD_MSG_ACC_CTIME = 4,
244        FLD_MSG_ACC_QTIME = 5,
245        FLD_MSG_ACC_ETIME = 6,
246        FLD_MSG_ACC_START = 7,
247        FLD_MSG_ACC_OWNER = 8,
248        FLD_MSG_ACC_EXEC_HOST = 9,
249        FLD_MSG_ACC_RES_NEEDNODES = 10,
250        FLD_MSG_ACC_RES_NODECT = 11,
251        FLD_MSG_ACC_RES_NODES = 12,
252        FLD_MSG_ACC_RES_WALLTIME = 13
253};
254
255#define FLD_MSG_STATUS "0010"
256#define FLD_MSG_STATE "0008"
257#define FLD_MSG_LOG "0002"
258
259bool
260pbsdrmaa_read_log( pbsdrmaa_log_reader_t * self )
261{
262        pbsdrmaa_job_t *pbsjob = (pbsdrmaa_job_t*) self->job;   
263        fsd_job_t *volatile temp_job = NULL;
264               
265        fsd_log_enter((""));
266       
267        if(self->job == NULL)
268                fsd_mutex_lock( &self->session->mutex );
269
270        TRY
271        {               
272                while( self->run_flag )
273                TRY
274                {
275                        char line[4096] = "";
276                        char buffer[4096] = "";
277                        int idx = 0, end_idx = 0, line_idx = 0;
278                       
279                        self->select_file(self);
280
281                        while ((self->read_line(self, line,buffer, sizeof(line), &idx,&end_idx,&line_idx)) > 0)                         
282                        {
283                                const char *volatile ptr = line;
284                                char field[256] = "";
285                                char job_id[256] = "";
286                                char event[256] = "";
287                                int volatile field_n = 0;
288                                int n;
289                               
290                                bool volatile job_id_match = false;
291                                bool volatile event_match = false;
292                                bool volatile log_event = false;
293                                bool volatile log_match = false;
294                                bool volatile older_job_found = false;
295                                bool volatile job_found = false;
296                                char * temp_date = NULL;
297                               
298                                struct batch_status status;
299                                status.next = NULL;
300
301                                while ( sscanf(ptr, "%255[^;]%n", field, &n) == 1 ) /* split current line into fields */
302                                {
303                                        if(field_n == FLD_DATE)
304                                        {
305                                                temp_date = fsd_strdup(field);
306                                        }
307                                        else if(field_n == FLD_EVENT && (strcmp(field,FLD_MSG_STATUS) == 0 || strcmp(field,FLD_MSG_STATE) == 0 ))
308                                        {
309                                                /* event described by log line*/
310                                                if(strlcpy(event, field,sizeof(event)) > sizeof(event))
311                                                {
312                                                        fsd_log_error(("%s - strlcpy error",self->name));
313                                                }
314                                                event_match = true;
315                                        }
316                                        else if(event_match && field_n == FLD_ID)
317                                        {       
318                                                TRY
319                                                {       
320                                                        if(self->job == NULL) /* wait_thread */
321                                                        {
322                                                                temp_job = self->session->get_job( self->session, field );
323                                                                pbsjob = (pbsdrmaa_job_t*) temp_job;
324
325                                                                if( temp_job )
326                                                                {
327                                                                        if(strlcpy(job_id, field,sizeof(job_id)) > sizeof(job_id)) {
328                                                                                fsd_log_error(("%s - strlcpy error",self->name));
329                                                                        }
330                                                                        fsd_log_debug(("%s - job_id: %s",self->name,job_id));
331                                                                        status.name = fsd_strdup(job_id);
332                                                                        job_id_match = true; /* job_id is in drmaa */   
333                                                                }
334                                                                else
335                                                                {
336                                                                        fsd_log_debug(("%s - Unknown job: %s", self->name,field));
337                                                                }
338                                                        }
339                                                        else /* job_on_missing */
340                                                        {
341                                                                int diff = -1;
342                                                                diff = fsd_job_id_cmp(self->job->job_id,field);
343                                                                if( diff == 0)
344                                                                {
345                                                                        /* read this file to the place we started and exit*/
346                                                                        fsd_log_debug(("Job_on_missing found job: %s",self->job->job_id));
347                                                                        job_found = true;
348                                                                        older_job_found = false;
349                                                                        self->run_flag = false;
350                                                                        job_id_match = true;
351                                                                        status.name = fsd_strdup(self->job->job_id);                                                                   
352                                                                }
353                                                                else if ( !job_found && diff >= 1)
354                                                                {
355                                                                        /* older job, find its beginning */
356                                                                        fsd_log_debug(("Job_on_missing found older job than %s : %s",self->job->job_id,field));
357                                                                        older_job_found = true;
358                                                                        job_id_match = true;
359                                                                        status.name = fsd_strdup(self->job->job_id);
360                                                                }
361                                                                else  if( !job_found )
362                                                                {
363                                                                        fsd_log_debug(("Job_on_missing found newer job than %s : %s",self->job->job_id,field));
364                                                                }                                                               
365                                                        }
366                                                }
367                                                END_TRY
368                                        }
369                                        else if(job_id_match && field_n == FLD_MSG)
370                                        {                                               
371                                                /* parse msg - depends on FLD_EVENT */
372                                                struct attrl struct_resource_cput,
373                                                        struct_resource_mem,
374                                                        struct_resource_vmem,
375                                                        struct_resource_walltime,
376                                                        struct_status,
377                                                        struct_state,
378                                                        struct_start_time,
379                                                        struct_mtime,
380                                                        struct_queue,
381                                                        struct_account_name;
382                                               
383                                                bool state_running = false;
384
385                                                memset(&struct_status,0,sizeof(struct attrl));
386                                                memset(&struct_state,0,sizeof(struct attrl));
387                                                memset(&struct_resource_cput,0,sizeof(struct attrl));
388                                                memset(&struct_resource_mem,0,sizeof(struct attrl));
389                                                memset(&struct_resource_vmem,0,sizeof(struct attrl));
390                                                memset(&struct_resource_walltime,0,sizeof(struct attrl));
391                                                memset(&struct_start_time,0,sizeof(struct attrl));
392                                                memset(&struct_mtime,0,sizeof(struct attrl));
393                                                memset(&struct_queue,0,sizeof(struct attrl));
394                                                memset(&struct_account_name,0,sizeof(struct attrl));
395                                                               
396                                                if (strcmp(event,FLD_MSG_STATE) == 0)
397                                                {
398                                                        /* job run, modified, queued etc */
399                                                        int n = 0;
400                                                        status.attribs = &struct_state;
401                                                        struct_state.next = NULL;
402                                                        struct_state.name = "job_state";
403                                                        if(field[0] == 'J') /* Job Queued, Job Modified, Job Run*/
404                                                         {
405                                                                n = 4;
406                                                                if(older_job_found) /* job_on_missing - older job beginning - read this file and end */
407                                                                 {
408                                                                        self->run_flag = false;
409                                                                        fsd_log_debug(("Job_on_missing found older job beginning"));
410                                                                        fsd_free(status.name);
411                                                                        break;
412                                                                 }
413
414                                                                 { /* modified */
415                                                                        struct tm temp_time_tm;
416                                                                        memset(&temp_time_tm, 0, sizeof(temp_time_tm));
417                                                                        temp_time_tm.tm_isdst = -1;
418
419                                                                        if (strptime(temp_date, "%m/%d/%Y %H:%M:%S", &temp_time_tm) == NULL)
420                                                                         {
421                                                                                fsd_log_error(("failed to parse mtime: %s (line = %s)", temp_date, line));
422                                                                         }
423                                                                        else
424                                                                         {
425                                                                                time_t temp_time = mktime(&temp_time_tm);
426                                                                                status.attribs = &struct_mtime;
427                                                                                struct_mtime.name = "mtime";
428                                                                                struct_mtime.next = NULL;
429                                                                                struct_mtime.value = fsd_asprintf("%lu",temp_time);
430                                                                         }
431                                                                 }
432                                                         }
433
434                                                        /* != Job deleted and Job to be deleted*/
435#ifdef PBS_PROFESSIONAL
436                                                        else if (field[4] != 't' && field[10] != 'd') {
437#else
438                                                        else if(field[4] != 'd') {
439#endif
440
441                                                                if ((struct_state.value = fsd_asprintf("%c",field[n]) ) == NULL ) { /* 4 first letter of state */
442                                                                        fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"%s - Memory allocation wasn't possible",self->name);
443                                                                }
444                                                                if(struct_state.value[0] == 'R'){
445                                                                        state_running = true;
446                                                                }
447                                                        }
448                                                        else { /* job terminated - pbs drmaa detects failed as completed with exit_status !=0, aborted with status -1*/
449                                                                struct_status.name = "exit_status";
450                                                                struct_status.value = fsd_strdup("-1");
451                                                                struct_status.next = NULL;
452                                                                struct_state.next = &struct_status;
453                                                                struct_state.value = fsd_strdup("C");
454                                                        }
455                                                }
456                                                else /*if (strcmp(event,FLD_MSG_STATUS) == 0 )*/
457                                                {
458                                                        /* exit status and rusage */
459                                                        const char *ptr2 = field;
460                                                        char  msg[ 256 ] = "";
461                                                        int n2;
462                                                        int msg_field_n = 0;
463                                                       
464                                                        struct_resource_cput.name = "resources_used";
465                                                        struct_resource_mem.name = "resources_used";
466                                                        struct_resource_vmem.name = "resources_used";
467                                                        struct_resource_walltime.name = "resources_used";
468                                                        struct_status.name = "exit_status";
469                                                        struct_state.name = "job_state";
470                               
471                                                        status.attribs = &struct_resource_cput;
472                                                        struct_resource_cput.next = &struct_resource_mem;
473                                                        struct_resource_mem.next = &struct_resource_vmem;
474                                                        struct_resource_vmem.next = &struct_resource_walltime;
475                                                        struct_resource_walltime.next =  &struct_status;
476                                                        struct_status.next = &struct_state;
477                                                        struct_state.next = NULL;
478
479                                                        while ( sscanf(ptr2, "%255[^ ]%n", msg, &n2) == 1 )
480                                                         {                                             
481                                                                switch(msg_field_n)
482                                                                {
483                                                                        case FLD_MSG_EXIT_STATUS:
484                                                                                struct_status.value = fsd_strdup(strchr(msg,'=')+1);
485                                                                                break;
486
487                                                                        case FLD_MSG_CPUT:
488                                                                                struct_resource_cput.resource = "cput";
489                                                                                struct_resource_cput.value = fsd_strdup(strchr(msg,'=')+1);
490                                                                                break;
491
492                                                                        case FLD_MSG_MEM:
493                                                                                struct_resource_mem.resource = "mem";
494                                                                                struct_resource_mem.value  = fsd_strdup(strchr(msg,'=')+1);
495                                                                                break;
496
497                                                                        case FLD_MSG_VMEM:
498                                                                                struct_resource_vmem.resource = "vmem";
499                                                                                struct_resource_vmem.value  = fsd_strdup(strchr(msg,'=')+1);
500                                                                                break;
501
502                                                                        case FLD_MSG_WALLTIME:
503                                                                                struct_resource_walltime.resource = "walltime";
504                                                                                struct_resource_walltime.value  = fsd_strdup(strchr(msg,'=')+1);
505                                                                                break;
506                                                                }
507
508                                                                ptr2 += n2;
509                                                                msg_field_n++;
510                                                                if ( *ptr2 != ' ' )
511                                                                         break;
512                                                                ++ptr2;
513                                                        }
514                                                        struct_state.value = fsd_strdup("C");   /* we got exit_status so we say that it has completed */
515                                                        fsd_log_info(("WT - job %s found as finished on %u", temp_job->job_id, (unsigned int)time(NULL)));
516                                                }                                               
517                                                 
518                                                if(self->job == NULL) /* wait_thread */
519                                                {
520                                                        if ( state_running )
521                                                        {
522                                                                fsd_log_debug(("WT - forcing update of job: %s", temp_job->job_id ));
523                                                                TRY
524                                                                {
525                                                                        temp_job->update_status( temp_job );
526                                                                }
527                                                                EXCEPT_DEFAULT
528                                                                {
529                                                                        /*TODO: distinguish between invalid job and internal errors */
530                                                                        fsd_log_debug(("Job finished just after entering running state: %s", temp_job->job_id));
531                                                                }
532                                                                END_TRY
533                                                        }
534                                                        else
535                                                        {
536                                                                fsd_log_debug(("%s - updating job: %s",self->name, temp_job->job_id ));                                                 
537                                                                pbsjob->update( temp_job, &status );
538                                                        }
539                                                }
540                                                else if( job_found ) /* job_on_missing */
541                                                {
542                                                        fsd_log_debug(("Job_on_missing - updating job: %s", self->job->job_id ));                                                       
543                                                        pbsjob->update( self->job, &status );
544                                                }
545                                               
546                                                if(self->job == NULL)
547                                                {
548                                                        fsd_cond_broadcast( &temp_job->status_cond);
549                                                        fsd_cond_broadcast( &self->session->wait_condition );
550                                                }
551                                                if ( temp_job )
552                                                        temp_job->release( temp_job );
553       
554                                                fsd_free(struct_resource_cput.value);
555                                                fsd_free(struct_resource_mem.value);
556                                                fsd_free(struct_resource_vmem.value);
557                                                fsd_free(struct_resource_walltime.value);
558                                                fsd_free(struct_status.value);
559                                                fsd_free(struct_state.value);
560                                                fsd_free(struct_start_time.value);
561                                                fsd_free(struct_mtime.value);
562                                                fsd_free(struct_queue.value);
563                                                fsd_free(struct_account_name.value);
564
565                                                if ( status.name!=NULL )
566                                                        fsd_free(status.name);
567                                        }
568                                        else if(field_n == FLD_EVENT && strcmp(field,FLD_MSG_LOG) == 0)
569                                        {
570                                                log_event = true;
571                                        }
572                                        else if (log_event && field_n == FLD_ID && strcmp(field,"Log") == 0 )
573                                        {
574                                                log_match = true;
575                                                log_event = false;
576                                        }
577                                        else if( self->job == NULL && log_match && field_n == FLD_MSG && strncmp(field,"Log closed",10) == 0)
578                                        {
579                                                fsd_log_debug(("%s - Date changed. Closing log file",self->name));
580                                                self->date_changed = true;
581                                                log_match = false;
582                                        }
583                                       
584                                        ptr += n;
585                                        if ( *ptr != ';' )
586                                        {
587                                                break; /* end of line */
588                                        }
589                                        field_n++;
590                                        ++ptr;
591                                }               
592
593                                fsd_free(temp_date);                   
594                        } /* end of while getline loop */                       
595                       
596                        if(self->job == NULL)
597                        {
598                                struct timeval timeout_tv;
599                                fd_set log_fds;
600       
601                                fsd_mutex_unlock( &self->session->mutex );                     
602                               
603                                FD_ZERO(&log_fds);
604                                FD_SET(self->fd, &log_fds);
605
606                                timeout_tv.tv_sec = 1;
607                                timeout_tv.tv_usec = 0;
608
609                                /* ignore return value - the next get line call will handle IO errors */
610                                (void)select(1, &log_fds, NULL, NULL, &timeout_tv);
611
612                                fsd_mutex_lock( &self->session->mutex );       
613
614                                self->run_flag = self->session->wait_thread_run_flag;
615                        }
616                }               
617                EXCEPT_DEFAULT
618                {
619                        const fsd_exc_t *e = fsd_exc_get();
620                        /* Its better to exit and communicate error rather then let the application to hang */
621                        fsd_log_fatal(( "Exception in wait thread %s: <%d:%s>. Exiting !!!", self->name, e->code(e), e->message(e) ));
622                        exit(1);
623                }
624                END_TRY
625
626                if(self->fd != -1)
627                        close(self->fd);
628                fsd_log_debug(("%s - Log file closed",self->name));     
629        }
630        FINALLY
631        {
632                fsd_log_debug(("%s - Terminated.",self->name));
633                if(self->job == NULL)
634                        fsd_mutex_unlock( &self->session->mutex ); /**/
635        }
636        END_TRY
637       
638        fsd_log_return((""));
639        return true;
640}
641
642void
643pbsdrmaa_select_file_wait_thread ( pbsdrmaa_log_reader_t * self )
644{
645        pbsdrmaa_session_t *pbssession = (pbsdrmaa_session_t*) self->session;
646       
647        if(self->date_changed)
648        {
649                char * log_path = NULL;
650                int num_tries = 0;
651                struct tm tm;
652               
653                fsd_log_enter((""));
654               
655                if(!self->first_open)
656                        time(&self->t);
657                else
658                        self->t = pbssession->log_file_initial_time;
659                       
660                localtime_r(&self->t,&tm);
661                               
662                #define DRMAA_WAIT_THREAD_MAX_TRIES (12)
663                /* generate new date, close file and open new */
664                if((log_path = fsd_asprintf("%s/server_logs/%04d%02d%02d",
665                                        pbssession->pbs_home,   
666                                        tm.tm_year + 1900,
667                                        tm.tm_mon + 1,
668                                        tm.tm_mday)) == NULL) {
669                        fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"WT - Memory allocation wasn't possible");
670                }
671
672                if(self->fd != -1)
673                        close(self->fd);
674
675                fsd_log_debug(("Log file: %s",log_path));
676                               
677        retry:
678                if((self->fd = open(log_path,O_RDONLY) ) == -1 && num_tries > DRMAA_WAIT_THREAD_MAX_TRIES )
679                {
680                        fsd_log_error(("Can't open log file. Verify pbs_home. Running standard wait_thread."));
681                        fsd_log_error(("Remember that without keep_completed set standard wait_thread won't run correctly"));
682                        /*pbssession->super.enable_wait_thread = false;*/ /* run not wait_thread */
683                        pbssession->wait_thread_log = false;
684                        pbssession->super.wait_thread = pbssession->super_wait_thread;
685                        pbssession->super.wait_thread(self->session);
686                } else if ( self->fd == -1 ) {
687                        fsd_log_warning(("Can't open log file: %s. Retries count: %d", log_path, num_tries));
688                        num_tries++;
689                        sleep(5);
690                        goto retry;
691                }
692
693                fsd_free(log_path);
694
695                fsd_log_debug(("Log file opened"));
696
697                if(self->first_open) {
698                        fsd_log_debug(("Log file lseek"));
699                        if(lseek(self->fd,pbssession->log_file_initial_size,SEEK_SET) == (off_t) -1) {
700                                char errbuf[256] = "InternalError";
701                                (void)strerror_r(errno, errbuf, 256);
702                                fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"lseek error: %s",errbuf);
703                        }
704                        self->first_open = false;
705                }
706
707                self->date_changed = false;
708               
709                fsd_log_return((""));
710        }       
711}
712
713ssize_t
714pbsdrmaa_read_line_wait_thread ( pbsdrmaa_log_reader_t * self, char * line, char * buffer, ssize_t size, int * idx, int * end_idx, int * line_idx )
715{
716        return fsd_getline_buffered(line,buffer,size,self->fd,idx,end_idx,line_idx);
717}
718
719/* reverse date compare*/
720int
721pbsdrmaa_date_compare(const void *a, const void *b)
722{
723        const char *ia = *(const char **) a;
724        const char *ib = *(const char **) b;
725        return strcmp(ib, ia);
726}
727
728void
729pbsdrmaa_select_file_job_on_missing( pbsdrmaa_log_reader_t * self )
730{
731        pbsdrmaa_session_t *pbssession = (pbsdrmaa_session_t*) self->session;   
732       
733        char * log_path = NULL;
734        int num_tries = 0;
735        static int file_number = 0;
736        fsd_log_enter((""));
737               
738        if(self->first_open)
739        {                       
740                DIR *dp = NULL;         
741                char * path = NULL;
742                struct dirent *ep = NULL;
743               
744                if((path = fsd_asprintf("%s/server_logs/",pbssession->pbs_home)) == NULL)
745                        fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"Job_on_missing - Memory allocation wasn't possible");
746               
747                self->log_files_number = 0;     
748                dp = opendir (path);
749
750                fsd_calloc(self->log_files,2,char*);
751       
752                if (dp != NULL)
753                {
754                        while ((ep = readdir (dp)))
755                        {
756                                self->log_files_number++;
757                                if(self->log_files_number > 2)
758                                        fsd_realloc(self->log_files,self->log_files_number,char *);
759                               
760                                self->log_files[self->log_files_number-1] = fsd_strdup(ep->d_name);
761                        }
762                        (void) closedir (dp);
763                }
764                else
765                        fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"Job_on_missing - Couldn't open the directory");
766
767                qsort(self->log_files,self->log_files_number,sizeof(char *),pbsdrmaa_date_compare);
768               
769                if(self->log_files_number <= 2)
770                {
771                        self->run_flag = false;
772                        fsd_log_error(("Job_on_missing - No log files available"));
773                }
774               
775                self->first_open = false;
776                fsd_free(path);
777        }       
778        else /* check previous day*/
779        {
780                if(++file_number > self->log_files_number - 2)
781                        fsd_log_error(("Job_on_missing - All available log files checked"));
782                else
783                        fsd_log_debug(("Job_on_missing checking previous day"));
784               
785                self->run_flag = false;
786                pbsdrmaa_job_on_missing_standard( self->job );                         
787        }
788       
789        #define DRMAA_WAIT_THREAD_MAX_TRIES (12)
790        if((log_path = fsd_asprintf("%s/server_logs/%s",
791                                pbssession->pbs_home,   
792                                self->log_files[file_number])) == NULL) {
793                fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"Job_on_missing - Memory allocation wasn't possible");
794        }
795
796        if(self->fd != -1)
797                close(self->fd);
798
799        fsd_log_debug(("Log file: %s",log_path));
800                               
801retry:
802        if((self->fd = open(log_path,O_RDONLY) ) == -1 && num_tries > DRMAA_WAIT_THREAD_MAX_TRIES )
803        {
804                fsd_log_error(("Can't open log file. Verify pbs_home. Running standard job_on_missing"));
805                fsd_log_error(("Remember that without keep_completed set standard job_on_missing won't run correctly"));
806                self->run_flag = false;
807                pbsdrmaa_job_on_missing_standard( self->job );                 
808        } else if ( self->fd == -1 ) {
809                fsd_log_warning(("Can't open log file: %s. Retries count: %d", log_path, num_tries));
810                num_tries++;
811                sleep(5);
812                goto retry;
813        }
814        else
815        {
816                struct stat statbuf;
817                if(stat(log_path,&statbuf) == -1) {
818                                char errbuf[256] = "InternalError";
819                                (void)strerror_r(errno, errbuf, 256);
820                                fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"stat error: %s",errbuf);
821                }
822                self->log_file_read_size = 0;
823                self->log_file_initial_size = statbuf.st_size;
824                fsd_log_debug(("Set log_file_initial_size %ld",self->log_file_initial_size));
825        }
826
827        fsd_free(log_path);
828
829        fsd_log_debug(("Log file opened"));
830       
831        fsd_log_return((""));
832}
833
834ssize_t
835pbsdrmaa_read_line_job_on_missing ( pbsdrmaa_log_reader_t * self, char * line, char * buffer, ssize_t size, int * idx, int * end_idx, int * line_idx )
836{
837        int n = fsd_getline_buffered(line,buffer,size,self->fd, idx, end_idx, line_idx);
838       
839        if(n >= 0)
840                self->log_file_read_size += n;
841               
842        if(self->log_file_read_size >= self->log_file_initial_size)
843                return -1;
844
845        return n;
846}
847
848void
849pbsdrmaa_select_file_accounting ( pbsdrmaa_log_reader_t * self )
850{
851        pbsdrmaa_session_t *pbssession = (pbsdrmaa_session_t*) self->session;
852               
853        char * log_path = NULL;
854
855        struct tm tm;
856               
857        fsd_log_enter((""));
858               
859        time(&self->t);
860                       
861        localtime_r(&self->t,&tm);
862                               
863        #define DRMAA_ACCOUNTING_MAX_TRIES (12)
864        /* generate new date, close file and open new */
865        if((log_path = fsd_asprintf("%s/server_priv/accounting/%04d%02d%02d",
866                                pbssession->pbs_home,   
867                                tm.tm_year + 1900,
868                                tm.tm_mon + 1,
869                                tm.tm_mday)) == NULL) {
870                fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"Read accounting file - Memory allocation wasn't possible");
871        }
872
873        if(self->fd != -1)
874                close(self->fd);
875
876        fsd_log_debug(("Accounting Log file: %s",log_path));                           
877
878        if((self->fd = open(log_path,O_RDONLY) ) == -1 )
879        {
880                fsd_log_error(("Can't open accounting log file. Change directory chmod and verify pbs_home."));
881        }
882
883        fsd_free(log_path);
884
885        fsd_log_debug(("Accounting Log file opened"));
886
887        fsd_log_return((""));   
888}
889
890ssize_t
891pbsdrmaa_read_line_accounting ( pbsdrmaa_log_reader_t * self, char * line, char * buffer, ssize_t size, int * idx, int * end_idx, int * line_idx )
892{
893        return fsd_getline_buffered(line,buffer,size,self->fd,idx,end_idx,line_idx);
894}
895
896enum field_acc
897{
898        FLD_ACC_DATE = 0,
899        FLD_ACC_EVENT = 1,
900        FLD_ACC_ID = 2,
901        FLD_ACC_MSG = 3
902};
903
904bool
905pbsdrmaa_read_log_accounting( pbsdrmaa_log_reader_t * self )
906{
907        pbsdrmaa_job_t *pbsjob = (pbsdrmaa_job_t*) self->job;   
908        bool res = false;
909       
910        fsd_job_t *volatile temp_job = NULL;
911               
912        fsd_log_enter((""));
913        fsd_log_debug(("Accounting Log file opened"));
914        if(self->job == NULL)
915                fsd_mutex_lock( &self->session->mutex );
916
917        TRY
918        {               
919                TRY
920                {
921                        char line[4096] = "";
922                        char buffer[4096] = "";
923                        int idx = 0, end_idx = 0, line_idx = 0;
924                       
925                        self->select_file(self);
926                       
927                        if(self->fd != -1)                                     
928                        while ((self->read_line(self, line,buffer, sizeof(line), &idx,&end_idx,&line_idx)) > 0)                         
929                        {
930                                const char *volatile ptr = line;
931                                char field[256] = "";
932                                int volatile field_n = 0;
933                                int n;
934                               
935                                bool volatile job_id_match = false;     
936                       
937                                bool volatile job_found = false;
938                                char *  temp_date = NULL;
939                               
940                                struct batch_status status;
941                               
942                                while ( sscanf(ptr, "%255[^;]%n", field, &n) == 1 ) /* split current line into fields */
943                                {
944                                        status.next = NULL;
945                                        status.attribs = NULL;
946                               
947                                        if(field_n == FLD_ACC_DATE)
948                                        {
949                                                temp_date = fsd_strdup(field);
950                                        }
951                                        else if(field_n == FLD_ACC_EVENT)
952                                        {
953                                                       
954                                        }
955                                        else if(field_n == FLD_ACC_ID)
956                                        {                                                       
957                                                TRY
958                                                {                                                               
959                                                                int diff = -1;
960                                                                diff = fsd_job_id_cmp(self->job->job_id,field);
961                                                                if( diff == 0)
962                                                                {
963                                                                        /* read this file to the place we started and exit*/
964                                                                        fsd_log_debug(("Accounting found job: %s",self->job->job_id));
965                                                                        job_found = true;
966                                                                        job_id_match = true;
967                                                                        status.name = fsd_strdup(self->job->job_id);                                                                   
968                                                                }       
969                                                }
970                                                END_TRY
971                                        }
972                                        else if(job_id_match && field_n == FLD_ACC_MSG)
973                                        {                                       
974                                                struct attrl * struct_attrl = calloc(10,sizeof(struct attrl));
975                                                                                             
976                                                if(field[0] == 'q')
977                                                {
978                                                        status.attribs = &struct_attrl[0];
979                                                        struct_attrl[0].name =  ATTR_queue;
980                                                        struct_attrl[0].value = fsd_strdup(strchr(field,'=')+1);
981                                                        struct_attrl[0].next = NULL;
982                                                }
983                                                else if(field[0] == 'u')
984                                                {
985                                                        /* rusage */
986                                                        const char *ptr2 = field;
987                                                        char  msg[ 256 ] = "";
988                                                        int n2 = 0;
989                                                        int msg_field_n = 0;   
990                               
991                                                        status.attribs = &struct_attrl[0];
992
993                                                        while ( sscanf(ptr2, "%255[^ ]%n", msg, &n2) == 1 )
994                                                         {                                             
995                                                                switch(msg_field_n)
996                                                                {
997                                                                        case FLD_MSG_ACC_USER:
998                                                                                struct_attrl[msg_field_n].name = ATTR_euser;                                                                   
999                                                                                break;
1000
1001                                                                        case FLD_MSG_ACC_GROUP:
1002                                                                                struct_attrl[msg_field_n].name = ATTR_egroup;
1003                                                                                break;
1004
1005                                                                        case FLD_MSG_ACC_JOBNAME:
1006                                                                                struct_attrl[msg_field_n].name = ATTR_name;
1007                                                                                break;
1008
1009                                                                        case FLD_MSG_ACC_QUEUE:
1010                                                                                struct_attrl[msg_field_n].name = ATTR_queue;
1011                                                                                break;
1012
1013                                                                        case FLD_MSG_ACC_CTIME:
1014                                                                                struct_attrl[msg_field_n].name = ATTR_ctime;
1015                                                                                break;
1016                                                                               
1017                                                                        case FLD_MSG_ACC_QTIME:
1018                                                                                struct_attrl[msg_field_n].name = ATTR_qtime;
1019                                                                                break;
1020                                                                               
1021                                                                        case FLD_MSG_ACC_ETIME:
1022                                                                                struct_attrl[msg_field_n].name = ATTR_etime;
1023                                                                                break;
1024#ifndef PBS_PROFESSIONAL               
1025                                                                        case FLD_MSG_ACC_START:
1026                                                                                struct_attrl[msg_field_n].name = ATTR_start_time;
1027#else
1028                                                                        case FLD_MSG_ACC_START:
1029                                                                                struct_attrl[msg_field_n].name = ATTR_stime;
1030#endif
1031                                                                               
1032                                                                        case FLD_MSG_ACC_OWNER:
1033                                                                                struct_attrl[msg_field_n].name = ATTR_owner;
1034                                                                                break;
1035                                                                               
1036                                                                        case FLD_MSG_ACC_EXEC_HOST:
1037                                                                                struct_attrl[msg_field_n].name = ATTR_exechost;
1038                                                                                break;                                                                         
1039                                                                }
1040                                                               
1041                                                                struct_attrl[msg_field_n].value  = fsd_strdup(strchr(msg,'=')+1);
1042                                                                if(msg_field_n!=9)
1043                                                                {
1044                                                                        struct_attrl[msg_field_n].next = &struct_attrl[msg_field_n+1];
1045                                                                }
1046                                                                else
1047                                                                {
1048                                                                        struct_attrl[msg_field_n].next = NULL;
1049                                                                        break;
1050                                                                }
1051
1052                                                                ptr2 += n2;
1053                                                                msg_field_n++;
1054                                                                if ( *ptr2 != ' ' )
1055                                                                        break;
1056
1057                                                                ++ptr2;
1058                                                        }
1059                                                }                                               
1060
1061                                                if( job_found && status.attribs != NULL)
1062                                                {
1063                                                        fsd_log_debug(("Accounting file - updating job: %s", self->job->job_id ));                                     
1064                                                        pbsjob->update( self->job, &status );
1065                                                        res = true;
1066                                                }
1067                                               
1068                                                if(self->job == NULL)
1069                                                {
1070                                                        fsd_cond_broadcast( &temp_job->status_cond);
1071                                                        fsd_cond_broadcast( &self->session->wait_condition );
1072                                                }
1073                                                if ( temp_job )
1074                                                        temp_job->release( temp_job );
1075       
1076                                                int i = 0;
1077                                                for(i = 0; i < 10; i++)
1078                                                {
1079                                                        fsd_free(struct_attrl[i].value);
1080                                                }
1081                                                fsd_free(struct_attrl);
1082                                                fsd_free(status.name);
1083                                        }
1084                                       
1085                                       
1086                                        ptr += n;
1087                                        if ( *ptr != ';' )
1088                                        {
1089                                                break; /* end of line */
1090                                        }
1091                                        field_n++;
1092                                        ++ptr;
1093                                }               
1094
1095                                fsd_free(temp_date);                   
1096                        } /* end of while getline loop */       
1097                       
1098                }               
1099                EXCEPT_DEFAULT
1100                 {
1101                        const fsd_exc_t *e = fsd_exc_get();
1102                        /* Its better to exit and communicate error rather then let the application to hang */
1103                        fsd_log_fatal(( "Exception in reading accounting file %s: <%d:%s>. Exiting !!!", self->name, e->code(e), e->message(e) ));
1104                        exit(1);
1105                 }
1106                END_TRY
1107
1108                if(self->fd != -1)
1109                        close(self->fd);
1110                fsd_log_debug(("%s - Accounting log file closed",self->name)); 
1111        }
1112        FINALLY
1113        {
1114                fsd_log_debug(("%s - Terminated.",self->name));
1115                if(self->job == NULL)
1116                        fsd_mutex_unlock( &self->session->mutex ); /**/
1117        }
1118        END_TRY
1119       
1120        fsd_log_return((""));
1121        return res;
1122}
1123
1124int
1125fsd_job_id_cmp(const char *s1, const char *s2) /* maybe move to drmaa_utils? */
1126{
1127        int job1;
1128        int job2;
1129        char *rest = NULL;
1130        char *token = NULL;
1131        char *ptr = fsd_strdup(s1);
1132        token = strtok_r(ptr, ".", &rest);
1133        job1 = atoi(token);
1134       
1135        fsd_free(token);
1136       
1137        ptr = fsd_strdup(s2);
1138        token = strtok_r(ptr,".",&rest);
1139        job2 = atoi(token);
1140       
1141        fsd_free(token);
1142        return job1 - job2;
1143}
1144
Note: See TracBrowser for help on using the repository browser.