source: trunk/pbs_drmaa/log_reader.c @ 28

Revision 28, 31.2 KB checked in by mmamonski, 10 years ago (diff)

hanging else

  • Property svn:keywords set to Id
Line 
1/* $Id$ */
2/*
3 *  FedStage DRMAA for PBS Pro
4 *  Copyright (C) 2006-2007  FedStage Systems
5 *
6 *  This program is free software: you can redistribute it and/or modify
7 *  it under the terms of the GNU General Public License as published by
8 *  the Free Software Foundation, either version 3 of the License, or
9 *  (at your option) any later version.
10 *
11 *  This program is distributed in the hope that it will be useful,
12 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 *  GNU General Public License for more details.
15 *
16 *  You should have received a copy of the GNU General Public License
17 *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #ifdef HAVE_CONFIG_H
21#       include <config.h>
22#endif
23
24#include <stdlib.h>
25#include <string.h>
26#include <unistd.h>
27#include <sys/select.h>
28#include <sys/stat.h>
29#include <sys/types.h>
30#include <dirent.h>
31#include <fcntl.h>
32
33#include <pbs_ifl.h>
34#include <pbs_error.h>
35
36#include <drmaa_utils/datetime.h>
37#include <drmaa_utils/drmaa.h>
38#include <drmaa_utils/iter.h>
39#include <drmaa_utils/conf.h>
40#include <drmaa_utils/session.h>
41#include <drmaa_utils/datetime.h>
42
43#include <pbs_drmaa/job.h>
44#include <pbs_drmaa/log_reader.h>
45#include <pbs_drmaa/session.h>
46#include <pbs_drmaa/submit.h>
47#include <pbs_drmaa/util.h>
48
49#include <errno.h>
50
51static bool
52pbsdrmaa_read_log();
53
54static void
55pbsdrmaa_select_file_wait_thread ( pbsdrmaa_log_reader_t * self);
56
57static ssize_t
58pbsdrmaa_read_line_wait_thread ( pbsdrmaa_log_reader_t * self, char * line, char * buffer, ssize_t size, int * idx, int * end_idx, int * line_idx );
59
60static void
61pbsdrmaa_select_file_job_on_missing ( pbsdrmaa_log_reader_t * self );
62
63static ssize_t
64pbsdrmaa_read_line_job_on_missing ( pbsdrmaa_log_reader_t * self, char * line, char * buffer, ssize_t size, int * idx, int * end_idx, int * line_idx );
65
66static void
67pbsdrmaa_select_file_accounting ( pbsdrmaa_log_reader_t * self );
68
69static ssize_t
70pbsdrmaa_read_line_accounting ( pbsdrmaa_log_reader_t * self, char * line, char * buffer, ssize_t size, int * idx, int * end_idx, int * line_idx );
71
72static bool
73pbsdrmaa_read_log_accounting( pbsdrmaa_log_reader_t * self );
74
75int
76fsd_job_id_cmp(const char *s1, const char *s2);
77
78int
79pbsdrmaa_date_compare(const void *a, const void *b) ;
80
81/*
82 * Snippets from log files
83 *
84 * PBS Pro
85 *
8610/11/2011 14:43:29;0008;Server@nova;Job;2127218.nova;Job Queued at request of mamonski@endor.wcss.wroc.pl, owner = mamonski@endor.wcss.wroc.pl, job name = STDIN, queue = normal
8710/11/2011 14:43:31;0008;Server@nova;Job;2127218.nova;Job Modified at request of Scheduler@nova.wcss.wroc.pl
8810/11/2011 14:43:31;0008;Server@nova;Job;2127218.nova;Job Run at request of Scheduler@nova.wcss.wroc.pl on exec_vnode (wn698:ncpus=3:mem=2048000kb)+(wn700:ncpus=3:mem=2048000kb)
8910/11/2011 14:43:31;0008;Server@nova;Job;2127218.nova;Job Modified at request of Scheduler@nova.wcss.wroc.pl
9010/11/2011 14:43:32;0010;Server@nova;Job;2127218.nova;Exit_status=0 resources_used.cpupercent=0 resources_used.cput=00:00:00 resources_used.mem=1768kb resources_used.ncpus=6 resources_used.vmem=19228kb resources_used.walltime=00:00:01
91
92 *
93 * Torque
94 *
9510/11/2011 14:47:59;0008;PBS_Server;Job;15545337.batch.grid.cyf-kr.edu.pl;Job Queued at request of plgmamonski@ui.cyf-kr.edu.pl, owner = plgmamonski@ui.cyf-kr.edu.pl, job name = STDIN, queue = l_short
9610/11/2011 14:48:23;0008;PBS_Server;Job;15545337.batch.grid.cyf-kr.edu.pl;Job Run at request of root@batch.grid.cyf-kr.edu.pl
9710/11/2011 14:48:24;0010;PBS_Server;Job;15545337.batch.grid.cyf-kr.edu.pl;Exit_status=0 resources_used.cput=00:00:00 resources_used.mem=720kb resources_used.vmem=13308kb resources_used.walltime=00:00:00
98
99 */
100pbsdrmaa_log_reader_t *
101pbsdrmaa_log_reader_new ( fsd_drmaa_session_t *session, fsd_job_t *job )
102{
103        pbsdrmaa_log_reader_t *volatile self = NULL;
104
105        fsd_log_enter((""));
106        TRY
107        {
108                fsd_malloc(self, pbsdrmaa_log_reader_t );
109               
110                self->session = session;
111               
112                /* ~templete method pattern */
113                if(job != NULL) /* job on missing */
114                {
115                        self->job = job;
116                        self->name = "Job_on_missing";
117                        self->select_file = pbsdrmaa_select_file_job_on_missing;
118                        self->read_line = pbsdrmaa_read_line_job_on_missing;
119                }
120                else /* wait thread */
121                {
122                        self->job = NULL;
123                        self->name = "WT";
124                        self->select_file = pbsdrmaa_select_file_wait_thread;
125                        self->read_line = pbsdrmaa_read_line_wait_thread;
126                }               
127                self->read_log = pbsdrmaa_read_log;     
128               
129                self->log_files = NULL;
130                self->log_files_number = 0;
131               
132                self->run_flag = true;
133                self->fd = -1;
134                self->date_changed = true;
135                self->first_open = true;
136               
137                self->log_file_initial_size = 0;
138                self->log_file_read_size = 0;
139        }
140        EXCEPT_DEFAULT
141        {
142                if( self != NULL)
143                        fsd_free(self);
144                       
145                fsd_exc_reraise();
146        }
147        END_TRY
148        fsd_log_return((""));
149        return self;
150}
151
152pbsdrmaa_log_reader_t *
153pbsdrmaa_log_reader_accounting_new ( fsd_drmaa_session_t *session, fsd_job_t *job )
154{
155        pbsdrmaa_log_reader_t *volatile self = NULL;
156
157        fsd_log_enter((""));
158        TRY
159        {
160                fsd_malloc(self, pbsdrmaa_log_reader_t );
161               
162                self->session = session;
163               
164                self->job = job;
165                self->name = "Accounting";
166                self->select_file = pbsdrmaa_select_file_accounting;
167                self->read_line = pbsdrmaa_read_line_accounting;
168                               
169                self->read_log = pbsdrmaa_read_log_accounting; 
170               
171                self->log_files = NULL;
172                self->log_files_number = 0;
173               
174                self->run_flag = true;
175                self->fd = -1;
176                self->date_changed = true;
177                self->first_open = true;
178               
179                self->log_file_initial_size = 0;
180                self->log_file_read_size = 0;
181        }
182        EXCEPT_DEFAULT
183        {
184                if( self != NULL)
185                        fsd_free(self);
186                       
187                fsd_exc_reraise();
188        }
189        END_TRY
190        fsd_log_return((""));
191        return self;
192}
193
194void
195pbsdrmaa_log_reader_destroy ( pbsdrmaa_log_reader_t * self )
196{
197        fsd_log_enter((""));
198        TRY
199        {
200                if(self != NULL)
201                {
202                        int i = -1;
203                        for(i = 0; i < self->log_files_number ; i++)
204                                fsd_free(self->log_files[i]);
205                        fsd_free(self->log_files);
206                        fsd_free(self);
207                }                       
208        }
209        EXCEPT_DEFAULT
210        {
211                fsd_exc_reraise();
212        }
213        END_TRY
214       
215        fsd_log_return((""));
216}
217
218enum field
219{
220        FLD_DATE = 0,
221        FLD_EVENT = 1,
222        FLD_OBJ = 2,
223        FLD_TYPE = 3,
224        FLD_ID = 4,
225        FLD_MSG = 5
226};
227
228enum field_msg
229{
230        FLD_MSG_EXIT_STATUS = 0,
231        FLD_MSG_CPUT = 1,
232        FLD_MSG_MEM = 2,
233        FLD_MSG_VMEM = 3,
234        FLD_MSG_WALLTIME = 4
235};
236
237enum field_msg_accounting
238{
239        FLD_MSG_ACC_USER = 0,
240        FLD_MSG_ACC_GROUP = 1,
241        FLD_MSG_ACC_JOBNAME = 2,
242        FLD_MSG_ACC_QUEUE = 3,
243        FLD_MSG_ACC_CTIME = 4,
244        FLD_MSG_ACC_QTIME = 5,
245        FLD_MSG_ACC_ETIME = 6,
246        FLD_MSG_ACC_START = 7,
247        FLD_MSG_ACC_OWNER = 8,
248        FLD_MSG_ACC_EXEC_HOST = 9,
249        FLD_MSG_ACC_RES_NEEDNODES = 10,
250        FLD_MSG_ACC_RES_NODECT = 11,
251        FLD_MSG_ACC_RES_NODES = 12,
252        FLD_MSG_ACC_RES_WALLTIME = 13
253};
254
255#define FLD_MSG_STATUS "0010"
256#define FLD_MSG_STATE "0008"
257#define FLD_MSG_LOG "0002"
258
259bool
260pbsdrmaa_read_log( pbsdrmaa_log_reader_t * self )
261{
262        pbsdrmaa_job_t *pbsjob = (pbsdrmaa_job_t*) self->job;
263        fsd_job_t *volatile temp_job = NULL;
264               
265        fsd_log_enter((""));
266       
267        if(self->job == NULL)
268                fsd_mutex_lock( &self->session->mutex );
269
270        TRY
271        {               
272                while( self->run_flag )
273                TRY
274                {
275                        char line[4096] = "";
276                        char buffer[4096] = "";
277                        int idx = 0, end_idx = 0, line_idx = 0;
278                       
279                        self->select_file(self);
280
281                        while ((self->read_line(self, line,buffer, sizeof(line), &idx,&end_idx,&line_idx)) > 0)
282                        {
283                                const char *volatile ptr = line;
284                                char field[256] = "";
285                                char job_id[256] = "";
286                                char event[256] = "";
287                                int volatile field_n = 0;
288                                int n;
289                               
290                                bool volatile job_id_match = false;
291                                bool volatile event_match = false;
292                                bool volatile log_event = false;
293                                bool volatile log_match = false;
294                                bool volatile older_job_found = false;
295                                bool volatile job_found = false;
296                                char * temp_date = NULL;
297                               
298                                struct batch_status status;
299                                status.next = NULL;
300
301                                while ( sscanf(ptr, "%255[^;]%n", field, &n) == 1 ) /* split current line into fields */
302                                {
303                                        if(field_n == FLD_DATE)
304                                        {
305                                                temp_date = fsd_strdup(field);
306                                        }
307                                        else if(field_n == FLD_EVENT && (strcmp(field,FLD_MSG_STATUS) == 0 || strcmp(field,FLD_MSG_STATE) == 0 ))
308                                        {
309                                                /* event described by log line*/
310                                                if(strlcpy(event, field,sizeof(event)) > sizeof(event))
311                                                {
312                                                        fsd_log_error(("%s - strlcpy error",self->name));
313                                                }
314                                                event_match = true;
315                                        }
316                                        else if(event_match && field_n == FLD_ID)
317                                        {       
318                                                TRY
319                                                {       
320                                                        if(self->job == NULL) /* wait_thread */
321                                                        {
322                                                                temp_job = self->session->get_job( self->session, field );
323                                                                pbsjob = (pbsdrmaa_job_t*) temp_job;
324
325                                                                if( temp_job )
326                                                                {
327                                                                        if(strlcpy(job_id, field,sizeof(job_id)) > sizeof(job_id)) {
328                                                                                fsd_log_error(("%s - strlcpy error",self->name));
329                                                                        }
330                                                                        fsd_log_debug(("%s - job_id: %s",self->name,job_id));
331                                                                        status.name = fsd_strdup(job_id);
332                                                                        job_id_match = true; /* job_id is in drmaa */   
333                                                                }
334                                                                else
335                                                                {
336                                                                        fsd_log_debug(("%s - Unknown job: %s", self->name,field));
337                                                                }
338                                                        }
339                                                        else /* job_on_missing */
340                                                        {
341                                                                int diff = -1;
342                                                                diff = fsd_job_id_cmp(self->job->job_id,field);
343                                                                if( diff == 0)
344                                                                {
345                                                                        /* read this file to the place we started and exit*/
346                                                                        fsd_log_debug(("Job_on_missing found job: %s",self->job->job_id));
347                                                                        job_found = true;
348                                                                        older_job_found = false;
349                                                                        self->run_flag = false;
350                                                                        job_id_match = true;
351                                                                        status.name = fsd_strdup(self->job->job_id);
352                                                                }
353                                                                else if ( !job_found && diff >= 1)
354                                                                {
355                                                                        /* older job, find its beginning */
356                                                                        fsd_log_debug(("Job_on_missing found older job than %s : %s",self->job->job_id,field));
357                                                                        older_job_found = true;
358                                                                        job_id_match = true;
359                                                                        status.name = fsd_strdup(self->job->job_id);
360                                                                }
361                                                                else  if( !job_found )
362                                                                {
363                                                                        fsd_log_debug(("Job_on_missing found newer job than %s : %s",self->job->job_id,field));
364                                                                }                                                               
365                                                        }
366                                                }
367                                                END_TRY
368                                        }
369                                        else if(job_id_match && field_n == FLD_MSG)
370                                        {                                               
371                                                /* parse msg - depends on FLD_EVENT */
372                                                struct attrl struct_resource_cput,
373                                                        struct_resource_mem,
374                                                        struct_resource_vmem,
375                                                        struct_resource_walltime,
376                                                        struct_status,
377                                                        struct_state,
378                                                        struct_start_time,
379                                                        struct_mtime,
380                                                        struct_queue,
381                                                        struct_account_name,
382                                                        struct_exec_vnode;
383                                                struct attrl *last_attr = NULL;
384                                               
385                                                bool state_running = false;
386
387                                                memset(&struct_status,0,sizeof(struct attrl));
388                                                memset(&struct_state,0,sizeof(struct attrl));
389                                                memset(&struct_resource_cput,0,sizeof(struct attrl));
390                                                memset(&struct_resource_mem,0,sizeof(struct attrl));
391                                                memset(&struct_resource_vmem,0,sizeof(struct attrl));
392                                                memset(&struct_resource_walltime,0,sizeof(struct attrl));
393                                                memset(&struct_start_time,0,sizeof(struct attrl));
394                                                memset(&struct_mtime,0,sizeof(struct attrl));
395                                                memset(&struct_queue,0,sizeof(struct attrl));
396                                                memset(&struct_account_name,0,sizeof(struct attrl));
397                                                memset(&struct_exec_vnode,0,sizeof(struct attrl));
398                                                               
399                                                if (strcmp(event,FLD_MSG_STATE) == 0)
400                                                {
401                                                        /* job run, modified, queued etc */
402                                                        int n = 0;
403                                                        status.attribs = &struct_state;
404                                                        struct_state.next = NULL;
405                                                        struct_state.name = "job_state";
406                                                        last_attr = &struct_state;
407
408                                                        if(field[0] == 'J') /* Job Queued, Job Modified, Job Run*/
409                                                         {
410                                                                n = 4;
411                                                                if(older_job_found) /* job_on_missing - older job beginning - read this file and end */
412                                                                 {
413                                                                        self->run_flag = false;
414                                                                        fsd_log_debug(("Job_on_missing found older job beginning"));
415                                                                        fsd_free(status.name);
416                                                                        break;
417                                                                 }
418
419                                                                 { /* modified */
420                                                                        struct tm temp_time_tm;
421                                                                        memset(&temp_time_tm, 0, sizeof(temp_time_tm));
422                                                                        temp_time_tm.tm_isdst = -1;
423
424                                                                        if (strptime(temp_date, "%m/%d/%Y %H:%M:%S", &temp_time_tm) == NULL)
425                                                                         {
426                                                                                fsd_log_error(("failed to parse mtime: %s (line = %s)", temp_date, line));
427                                                                         }
428                                                                        else
429                                                                         {
430                                                                                time_t temp_time = mktime(&temp_time_tm);
431                                                                                last_attr->next = &struct_mtime;
432                                                                                last_attr = &struct_mtime;
433                                                                                struct_mtime.name = "mtime";
434                                                                                struct_mtime.next = NULL;
435                                                                                struct_mtime.value = fsd_asprintf("%lu",temp_time);
436                                                                         }
437                                                                 }
438                                                         }
439
440                                                        /* != Job deleted and Job to be deleted*/
441#ifdef PBS_PROFESSIONAL
442                                                        if      (field[4] != 't' && field[10] != 'd')
443                                                         {
444#else
445                                                        if (field[4] != 'd')
446                                                         {
447#endif
448                                                                struct_state.value = fsd_asprintf("%c",field[n]);
449                                                                if(struct_state.value[0] == 'R')
450                                                                 {
451                                                                        state_running = true;
452#ifdef PBS_PROFESSIONAL
453                                                                        {
454                                                                                char *p_vnode = NULL;
455                                                                                if ((p_vnode = strstr(field, "exec_vnode")))
456                                                                                 {
457                                                                                        last_attr->next = &struct_exec_vnode;
458                                                                                        last_attr =  &struct_exec_vnode;
459                                                                                        struct_exec_vnode.name = "exec_vnode";
460                                                                                        struct_exec_vnode.next = NULL;
461                                                                                        struct_exec_vnode.value = fsd_strdup(p_vnode + 11);
462                                                                                 }
463                                                                        }
464#endif
465                                                                 }
466                                                         }
467                                                        else
468                                                         { /* job terminated - pbs drmaa detects failed as completed with exit_status !=0, aborted with status -1*/
469                                                                struct_status.name = "exit_status";
470                                                                struct_status.value = fsd_strdup("-1");
471                                                                struct_status.next = NULL;
472                                                                struct_state.next = &struct_status;
473                                                                struct_state.value = fsd_strdup("C");
474                                                         }
475                                                }
476                                                else /*if (strcmp(event,FLD_MSG_STATUS) == 0 )*/
477                                                {
478                                                        /* exit status and rusage */
479                                                        const char *ptr2 = field;
480                                                        char  msg[ 256 ] = "";
481                                                        int n2;
482                                                        int msg_field_n = 0;
483                                                       
484                                                        struct_resource_cput.name = "resources_used";
485                                                        struct_resource_mem.name = "resources_used";
486                                                        struct_resource_vmem.name = "resources_used";
487                                                        struct_resource_walltime.name = "resources_used";
488                                                        struct_status.name = "exit_status";
489                                                        struct_state.name = "job_state";
490                               
491                                                        status.attribs = &struct_resource_cput;
492                                                        struct_resource_cput.next = &struct_resource_mem;
493                                                        struct_resource_mem.next = &struct_resource_vmem;
494                                                        struct_resource_vmem.next = &struct_resource_walltime;
495                                                        struct_resource_walltime.next =  &struct_status;
496                                                        struct_status.next = &struct_state;
497                                                        struct_state.next = NULL;
498
499                                                        while ( sscanf(ptr2, "%255[^ ]%n", msg, &n2) == 1 )
500                                                         {                                             
501                                                                switch(msg_field_n)
502                                                                {
503                                                                        case FLD_MSG_EXIT_STATUS:
504                                                                                struct_status.value = fsd_strdup(strchr(msg,'=')+1);
505                                                                                break;
506
507                                                                        case FLD_MSG_CPUT:
508                                                                                struct_resource_cput.resource = "cput";
509                                                                                struct_resource_cput.value = fsd_strdup(strchr(msg,'=')+1);
510                                                                                break;
511
512                                                                        case FLD_MSG_MEM:
513                                                                                struct_resource_mem.resource = "mem";
514                                                                                struct_resource_mem.value  = fsd_strdup(strchr(msg,'=')+1);
515                                                                                break;
516
517                                                                        case FLD_MSG_VMEM:
518                                                                                struct_resource_vmem.resource = "vmem";
519                                                                                struct_resource_vmem.value  = fsd_strdup(strchr(msg,'=')+1);
520                                                                                break;
521
522                                                                        case FLD_MSG_WALLTIME:
523                                                                                struct_resource_walltime.resource = "walltime";
524                                                                                struct_resource_walltime.value  = fsd_strdup(strchr(msg,'=')+1);
525                                                                                break;
526                                                                }
527
528                                                                ptr2 += n2;
529                                                                msg_field_n++;
530                                                                if ( *ptr2 != ' ' )
531                                                                         break;
532                                                                ++ptr2;
533                                                        }
534                                                        struct_state.value = fsd_strdup("C");   /* we got exit_status so we say that it has completed */
535                                                        fsd_log_info(("WT - job %s found as finished on %u", temp_job->job_id, (unsigned int)time(NULL)));
536                                                }                                               
537                                                 
538                                                if(self->job == NULL) /* wait_thread */
539                                                {
540                                                        if ( state_running )
541                                                        {
542                                                                fsd_log_debug(("WT - forcing update of job: %s", temp_job->job_id ));
543                                                                TRY
544                                                                {
545                                                                        temp_job->update_status( temp_job );
546                                                                }
547                                                                EXCEPT_DEFAULT
548                                                                {
549                                                                        /*TODO: distinguish between invalid job and internal errors */
550                                                                        fsd_log_debug(("Job finished just after entering running state: %s", temp_job->job_id));
551                                                                }
552                                                                END_TRY
553                                                        }
554                                                        else
555                                                        {
556                                                                fsd_log_debug(("%s - updating job: %s",self->name, temp_job->job_id ));
557                                                                pbsjob->update( temp_job, &status );
558                                                        }
559                                                }
560                                                else if( job_found ) /* job_on_missing */
561                                                {
562                                                        fsd_log_debug(("Job_on_missing - updating job: %s", self->job->job_id ));
563                                                        pbsjob->update( self->job, &status );
564                                                }
565                                               
566                                                if(self->job == NULL)
567                                                {
568                                                        fsd_cond_broadcast( &temp_job->status_cond);
569                                                        fsd_cond_broadcast( &self->session->wait_condition );
570                                                }
571                                                if ( temp_job )
572                                                        temp_job->release( temp_job );
573       
574                                                fsd_free(struct_resource_cput.value);
575                                                fsd_free(struct_resource_mem.value);
576                                                fsd_free(struct_resource_vmem.value);
577                                                fsd_free(struct_resource_walltime.value);
578                                                fsd_free(struct_status.value);
579                                                fsd_free(struct_state.value);
580                                                fsd_free(struct_start_time.value);
581                                                fsd_free(struct_mtime.value);
582                                                fsd_free(struct_queue.value);
583                                                fsd_free(struct_account_name.value);
584
585                                                if ( status.name!=NULL )
586                                                        fsd_free(status.name);
587                                        }
588                                        else if(field_n == FLD_EVENT && strcmp(field,FLD_MSG_LOG) == 0)
589                                        {
590                                                log_event = true;
591                                        }
592                                        else if (log_event && field_n == FLD_ID && strcmp(field,"Log") == 0 )
593                                        {
594                                                log_match = true;
595                                                log_event = false;
596                                        }
597                                        else if( self->job == NULL && log_match && field_n == FLD_MSG && strncmp(field,"Log closed",10) == 0)
598                                        {
599                                                fsd_log_debug(("%s - Date changed. Closing log file",self->name));
600                                                self->date_changed = true;
601                                                log_match = false;
602                                        }
603                                       
604                                        ptr += n;
605                                        if ( *ptr != ';' )
606                                        {
607                                                break; /* end of line */
608                                        }
609                                        field_n++;
610                                        ++ptr;
611                                }               
612
613                                fsd_free(temp_date);
614                        } /* end of while getline loop */
615                       
616                        if(self->job == NULL)
617                        {
618                                struct timeval timeout_tv;
619                                fd_set log_fds;
620       
621                                fsd_mutex_unlock( &self->session->mutex );
622                               
623                                FD_ZERO(&log_fds);
624                                FD_SET(self->fd, &log_fds);
625
626                                timeout_tv.tv_sec = 1;
627                                timeout_tv.tv_usec = 0;
628
629                                /* ignore return value - the next get line call will handle IO errors */
630                                (void)select(1, &log_fds, NULL, NULL, &timeout_tv);
631
632                                fsd_mutex_lock( &self->session->mutex );       
633
634                                self->run_flag = self->session->wait_thread_run_flag;
635                        }
636                }               
637                EXCEPT_DEFAULT
638                {
639                        const fsd_exc_t *e = fsd_exc_get();
640                        /* Its better to exit and communicate error rather then let the application to hang */
641                        fsd_log_fatal(( "Exception in wait thread %s: <%d:%s>. Exiting !!!", self->name, e->code(e), e->message(e) ));
642                        exit(1);
643                }
644                END_TRY
645
646                if(self->fd != -1)
647                        close(self->fd);
648                fsd_log_debug(("%s - Log file closed",self->name));     
649        }
650        FINALLY
651        {
652                fsd_log_debug(("%s - Terminated.",self->name));
653                if(self->job == NULL)
654                        fsd_mutex_unlock( &self->session->mutex ); /**/
655        }
656        END_TRY
657       
658        fsd_log_return((""));
659        return true;
660}
661
662void
663pbsdrmaa_select_file_wait_thread ( pbsdrmaa_log_reader_t * self )
664{
665        pbsdrmaa_session_t *pbssession = (pbsdrmaa_session_t*) self->session;
666       
667        if(self->date_changed)
668        {
669                char * log_path = NULL;
670                int num_tries = 0;
671                struct tm tm;
672               
673                fsd_log_enter((""));
674               
675                if(!self->first_open)
676                        time(&self->t);
677                else
678                        self->t = pbssession->log_file_initial_time;
679                       
680                localtime_r(&self->t,&tm);
681                               
682                #define DRMAA_WAIT_THREAD_MAX_TRIES (12)
683                /* generate new date, close file and open new */
684                if((log_path = fsd_asprintf("%s/server_logs/%04d%02d%02d",
685                                        pbssession->pbs_home,   
686                                        tm.tm_year + 1900,
687                                        tm.tm_mon + 1,
688                                        tm.tm_mday)) == NULL) {
689                        fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"WT - Memory allocation wasn't possible");
690                }
691
692                if(self->fd != -1)
693                        close(self->fd);
694
695                fsd_log_debug(("Log file: %s",log_path));
696                               
697        retry:
698                if((self->fd = open(log_path,O_RDONLY) ) == -1 && num_tries > DRMAA_WAIT_THREAD_MAX_TRIES )
699                {
700                        fsd_log_error(("Can't open log file. Verify pbs_home. Running standard wait_thread."));
701                        fsd_log_error(("Remember that without keep_completed set standard wait_thread won't run correctly"));
702                        /*pbssession->super.enable_wait_thread = false;*/ /* run not wait_thread */
703                        pbssession->wait_thread_log = false;
704                        pbssession->super.wait_thread = pbssession->super_wait_thread;
705                        pbssession->super.wait_thread(self->session);
706                } else if ( self->fd == -1 ) {
707                        fsd_log_warning(("Can't open log file: %s. Retries count: %d", log_path, num_tries));
708                        num_tries++;
709                        sleep(5);
710                        goto retry;
711                }
712
713                fsd_free(log_path);
714
715                fsd_log_debug(("Log file opened"));
716
717                if(self->first_open) {
718                        fsd_log_debug(("Log file lseek"));
719                        if(lseek(self->fd,pbssession->log_file_initial_size,SEEK_SET) == (off_t) -1) {
720                                char errbuf[256] = "InternalError";
721                                (void)strerror_r(errno, errbuf, 256);
722                                fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"lseek error: %s",errbuf);
723                        }
724                        self->first_open = false;
725                }
726
727                self->date_changed = false;
728               
729                fsd_log_return((""));
730        }       
731}
732
733ssize_t
734pbsdrmaa_read_line_wait_thread ( pbsdrmaa_log_reader_t * self, char * line, char * buffer, ssize_t size, int * idx, int * end_idx, int * line_idx )
735{
736        return fsd_getline_buffered(line,buffer,size,self->fd,idx,end_idx,line_idx);
737}
738
739/* reverse date compare*/
740int
741pbsdrmaa_date_compare(const void *a, const void *b)
742{
743        const char *ia = *(const char **) a;
744        const char *ib = *(const char **) b;
745        return strcmp(ib, ia);
746}
747
748void
749pbsdrmaa_select_file_job_on_missing( pbsdrmaa_log_reader_t * self )
750{
751        pbsdrmaa_session_t *pbssession = (pbsdrmaa_session_t*) self->session;   
752       
753        char * log_path = NULL;
754        int num_tries = 0;
755        static int file_number = 0;
756        fsd_log_enter((""));
757               
758        if(self->first_open)
759        {                       
760                DIR *dp = NULL;         
761                char * path = NULL;
762                struct dirent *ep = NULL;
763               
764                if((path = fsd_asprintf("%s/server_logs/",pbssession->pbs_home)) == NULL)
765                        fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"Job_on_missing - Memory allocation wasn't possible");
766               
767                self->log_files_number = 0;     
768                dp = opendir (path);
769
770                fsd_calloc(self->log_files,2,char*);
771       
772                if (dp != NULL)
773                {
774                        while ((ep = readdir (dp)))
775                        {
776                                self->log_files_number++;
777                                if(self->log_files_number > 2)
778                                        fsd_realloc(self->log_files,self->log_files_number,char *);
779                               
780                                self->log_files[self->log_files_number-1] = fsd_strdup(ep->d_name);
781                        }
782                        (void) closedir (dp);
783                }
784                else
785                        fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"Job_on_missing - Couldn't open the directory");
786
787                qsort(self->log_files,self->log_files_number,sizeof(char *),pbsdrmaa_date_compare);
788               
789                if(self->log_files_number <= 2)
790                {
791                        self->run_flag = false;
792                        fsd_log_error(("Job_on_missing - No log files available"));
793                }
794               
795                self->first_open = false;
796                fsd_free(path);
797        }       
798        else /* check previous day*/
799        {
800                if(++file_number > self->log_files_number - 2)
801                        fsd_log_error(("Job_on_missing - All available log files checked"));
802                else
803                        fsd_log_debug(("Job_on_missing checking previous day"));
804               
805                self->run_flag = false;
806                pbsdrmaa_job_on_missing_standard( self->job );                         
807        }
808       
809        #define DRMAA_WAIT_THREAD_MAX_TRIES (12)
810        if((log_path = fsd_asprintf("%s/server_logs/%s",
811                                pbssession->pbs_home,   
812                                self->log_files[file_number])) == NULL) {
813                fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"Job_on_missing - Memory allocation wasn't possible");
814        }
815
816        if(self->fd != -1)
817                close(self->fd);
818
819        fsd_log_debug(("Log file: %s",log_path));
820                               
821retry:
822        if((self->fd = open(log_path,O_RDONLY) ) == -1 && num_tries > DRMAA_WAIT_THREAD_MAX_TRIES )
823        {
824                fsd_log_error(("Can't open log file. Verify pbs_home. Running standard job_on_missing"));
825                fsd_log_error(("Remember that without keep_completed set standard job_on_missing won't run correctly"));
826                self->run_flag = false;
827                pbsdrmaa_job_on_missing_standard( self->job );                 
828        } else if ( self->fd == -1 ) {
829                fsd_log_warning(("Can't open log file: %s. Retries count: %d", log_path, num_tries));
830                num_tries++;
831                sleep(5);
832                goto retry;
833        }
834        else
835        {
836                struct stat statbuf;
837                if(stat(log_path,&statbuf) == -1) {
838                                char errbuf[256] = "InternalError";
839                                (void)strerror_r(errno, errbuf, 256);
840                                fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"stat error: %s",errbuf);
841                }
842                self->log_file_read_size = 0;
843                self->log_file_initial_size = statbuf.st_size;
844                fsd_log_debug(("Set log_file_initial_size %ld",self->log_file_initial_size));
845        }
846
847        fsd_free(log_path);
848
849        fsd_log_debug(("Log file opened"));
850       
851        fsd_log_return((""));
852}
853
854ssize_t
855pbsdrmaa_read_line_job_on_missing ( pbsdrmaa_log_reader_t * self, char * line, char * buffer, ssize_t size, int * idx, int * end_idx, int * line_idx )
856{
857        int n = fsd_getline_buffered(line,buffer,size,self->fd, idx, end_idx, line_idx);
858       
859        if(n >= 0)
860                self->log_file_read_size += n;
861               
862        if(self->log_file_read_size >= self->log_file_initial_size)
863                return -1;
864
865        return n;
866}
867
868void
869pbsdrmaa_select_file_accounting ( pbsdrmaa_log_reader_t * self )
870{
871        pbsdrmaa_session_t *pbssession = (pbsdrmaa_session_t*) self->session;
872               
873        char * log_path = NULL;
874
875        struct tm tm;
876               
877        fsd_log_enter((""));
878               
879        time(&self->t);
880                       
881        localtime_r(&self->t,&tm);
882                               
883        #define DRMAA_ACCOUNTING_MAX_TRIES (12)
884        /* generate new date, close file and open new */
885        if((log_path = fsd_asprintf("%s/server_priv/accounting/%04d%02d%02d",
886                                pbssession->pbs_home,   
887                                tm.tm_year + 1900,
888                                tm.tm_mon + 1,
889                                tm.tm_mday)) == NULL) {
890                fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"Read accounting file - Memory allocation wasn't possible");
891        }
892
893        if(self->fd != -1)
894                close(self->fd);
895
896        fsd_log_debug(("Accounting Log file: %s",log_path));
897
898        if((self->fd = open(log_path,O_RDONLY) ) == -1 )
899        {
900                fsd_log_error(("Can't open accounting log file. Change directory chmod and verify pbs_home."));
901        }
902
903        fsd_free(log_path);
904
905        fsd_log_debug(("Accounting Log file opened"));
906
907        fsd_log_return((""));   
908}
909
910ssize_t
911pbsdrmaa_read_line_accounting ( pbsdrmaa_log_reader_t * self, char * line, char * buffer, ssize_t size, int * idx, int * end_idx, int * line_idx )
912{
913        return fsd_getline_buffered(line,buffer,size,self->fd,idx,end_idx,line_idx);
914}
915
916enum field_acc
917{
918        FLD_ACC_DATE = 0,
919        FLD_ACC_EVENT = 1,
920        FLD_ACC_ID = 2,
921        FLD_ACC_MSG = 3
922};
923
924bool
925pbsdrmaa_read_log_accounting( pbsdrmaa_log_reader_t * self )
926{
927        pbsdrmaa_job_t *pbsjob = (pbsdrmaa_job_t*) self->job;   
928        bool res = false;
929       
930        fsd_job_t *volatile temp_job = NULL;
931               
932        fsd_log_enter((""));
933        fsd_log_debug(("Accounting Log file opened"));
934        if(self->job == NULL)
935                fsd_mutex_lock( &self->session->mutex );
936
937        TRY
938        {               
939                TRY
940                {
941                        char line[4096] = "";
942                        char buffer[4096] = "";
943                        int idx = 0, end_idx = 0, line_idx = 0;
944                       
945                        self->select_file(self);
946                       
947                        if(self->fd != -1)                                     
948                        while ((self->read_line(self, line,buffer, sizeof(line), &idx,&end_idx,&line_idx)) > 0)
949                        {
950                                const char *volatile ptr = line;
951                                char field[256] = "";
952                                int volatile field_n = 0;
953                                int n;
954                               
955                                bool volatile job_id_match = false;     
956                       
957                                bool volatile job_found = false;
958                                char *  temp_date = NULL;
959                               
960                                struct batch_status status;
961                               
962                                while ( sscanf(ptr, "%255[^;]%n", field, &n) == 1 ) /* split current line into fields */
963                                {
964                                        status.next = NULL;
965                                        status.attribs = NULL;
966                               
967                                        if(field_n == FLD_ACC_DATE)
968                                        {
969                                                temp_date = fsd_strdup(field);
970                                        }
971                                        else if(field_n == FLD_ACC_EVENT)
972                                        {
973                                                       
974                                        }
975                                        else if(field_n == FLD_ACC_ID)
976                                        {                                                       
977                                                TRY
978                                                {                                                               
979                                                                int diff = -1;
980                                                                diff = fsd_job_id_cmp(self->job->job_id,field);
981                                                                if( diff == 0)
982                                                                {
983                                                                        /* read this file to the place we started and exit*/
984                                                                        fsd_log_debug(("Accounting found job: %s",self->job->job_id));
985                                                                        job_found = true;
986                                                                        job_id_match = true;
987                                                                        status.name = fsd_strdup(self->job->job_id);
988                                                                }       
989                                                }
990                                                END_TRY
991                                        }
992                                        else if(job_id_match && field_n == FLD_ACC_MSG)
993                                        {                                       
994                                                struct attrl * struct_attrl = calloc(10,sizeof(struct attrl));
995                                                int i;
996
997                                                if(field[0] == 'q')
998                                                {
999                                                        status.attribs = &struct_attrl[0];
1000                                                        struct_attrl[0].name =  ATTR_queue;
1001                                                        struct_attrl[0].value = fsd_strdup(strchr(field,'=')+1);
1002                                                        struct_attrl[0].next = NULL;
1003                                                }
1004                                                else if(field[0] == 'u')
1005                                                {
1006                                                        /* rusage */
1007                                                        const char *ptr2 = field;
1008                                                        char  msg[ 256 ] = "";
1009                                                        int n2 = 0;
1010                                                        int msg_field_n = 0;
1011                               
1012                                                        status.attribs = &struct_attrl[0];
1013
1014                                                        while ( sscanf(ptr2, "%255[^ ]%n", msg, &n2) == 1 )
1015                                                         {                                             
1016                                                                switch(msg_field_n)
1017                                                                {
1018                                                                        case FLD_MSG_ACC_USER:
1019                                                                                struct_attrl[msg_field_n].name = ATTR_euser;
1020                                                                                break;
1021
1022                                                                        case FLD_MSG_ACC_GROUP:
1023                                                                                struct_attrl[msg_field_n].name = ATTR_egroup;
1024                                                                                break;
1025
1026                                                                        case FLD_MSG_ACC_JOBNAME:
1027                                                                                struct_attrl[msg_field_n].name = ATTR_name;
1028                                                                                break;
1029
1030                                                                        case FLD_MSG_ACC_QUEUE:
1031                                                                                struct_attrl[msg_field_n].name = ATTR_queue;
1032                                                                                break;
1033
1034                                                                        case FLD_MSG_ACC_CTIME:
1035                                                                                struct_attrl[msg_field_n].name = ATTR_ctime;
1036                                                                                break;
1037                                                                               
1038                                                                        case FLD_MSG_ACC_QTIME:
1039                                                                                struct_attrl[msg_field_n].name = ATTR_qtime;
1040                                                                                break;
1041                                                                               
1042                                                                        case FLD_MSG_ACC_ETIME:
1043                                                                                struct_attrl[msg_field_n].name = ATTR_etime;
1044                                                                                break;
1045#ifndef PBS_PROFESSIONAL               
1046                                                                        case FLD_MSG_ACC_START:
1047                                                                                struct_attrl[msg_field_n].name = ATTR_start_time;
1048#else
1049                                                                        case FLD_MSG_ACC_START:
1050                                                                                struct_attrl[msg_field_n].name = ATTR_stime;
1051#endif
1052                                                                               
1053                                                                        case FLD_MSG_ACC_OWNER:
1054                                                                                struct_attrl[msg_field_n].name = ATTR_owner;
1055                                                                                break;
1056                                                                               
1057                                                                        case FLD_MSG_ACC_EXEC_HOST:
1058                                                                                struct_attrl[msg_field_n].name = ATTR_exechost;
1059                                                                                break;                                                                         
1060                                                                }
1061                                                               
1062                                                                struct_attrl[msg_field_n].value  = fsd_strdup(strchr(msg,'=')+1);
1063                                                                if(msg_field_n!=9)
1064                                                                {
1065                                                                        struct_attrl[msg_field_n].next = &struct_attrl[msg_field_n+1];
1066                                                                }
1067                                                                else
1068                                                                {
1069                                                                        struct_attrl[msg_field_n].next = NULL;
1070                                                                        break;
1071                                                                }
1072
1073                                                                ptr2 += n2;
1074                                                                msg_field_n++;
1075                                                                if ( *ptr2 != ' ' )
1076                                                                        break;
1077
1078                                                                ++ptr2;
1079                                                        }
1080                                                }                                               
1081
1082                                                if( job_found && status.attribs != NULL)
1083                                                {
1084                                                        fsd_log_debug(("Accounting file - updating job: %s", self->job->job_id ));
1085                                                        pbsjob->update( self->job, &status );
1086                                                        res = true;
1087                                                }
1088                                               
1089                                                if(self->job == NULL)
1090                                                {
1091                                                        fsd_cond_broadcast( &temp_job->status_cond);
1092                                                        fsd_cond_broadcast( &self->session->wait_condition );
1093                                                }
1094                                                if ( temp_job )
1095                                                        temp_job->release( temp_job );
1096       
1097                                                for(i = 0; i < 10; i++)
1098                                                {
1099                                                        fsd_free(struct_attrl[i].value);
1100                                                }
1101                                                fsd_free(struct_attrl);
1102                                                fsd_free(status.name);
1103                                        }
1104                                       
1105                                       
1106                                        ptr += n;
1107                                        if ( *ptr != ';' )
1108                                        {
1109                                                break; /* end of line */
1110                                        }
1111                                        field_n++;
1112                                        ++ptr;
1113                                }               
1114
1115                                fsd_free(temp_date);                   
1116                        } /* end of while getline loop */       
1117                       
1118                }               
1119                EXCEPT_DEFAULT
1120                 {
1121                        const fsd_exc_t *e = fsd_exc_get();
1122                        /* Its better to exit and communicate error rather then let the application to hang */
1123                        fsd_log_fatal(( "Exception in reading accounting file %s: <%d:%s>. Exiting !!!", self->name, e->code(e), e->message(e) ));
1124                        exit(1);
1125                 }
1126                END_TRY
1127
1128                if(self->fd != -1)
1129                        close(self->fd);
1130                fsd_log_debug(("%s - Accounting log file closed",self->name)); 
1131        }
1132        FINALLY
1133        {
1134                fsd_log_debug(("%s - Terminated.",self->name));
1135                if(self->job == NULL)
1136                        fsd_mutex_unlock( &self->session->mutex ); /**/
1137        }
1138        END_TRY
1139       
1140        fsd_log_return((""));
1141        return res;
1142}
1143
1144int
1145fsd_job_id_cmp(const char *s1, const char *s2) /* maybe move to drmaa_utils? */
1146{
1147        int job1;
1148        int job2;
1149        char *rest = NULL;
1150        char *token = NULL;
1151        char *ptr = fsd_strdup(s1);
1152        token = strtok_r(ptr, ".", &rest);
1153        job1 = atoi(token);
1154       
1155        fsd_free(token);
1156       
1157        ptr = fsd_strdup(s2);
1158        token = strtok_r(ptr,".",&rest);
1159        job2 = atoi(token);
1160       
1161        fsd_free(token);
1162        return job1 - job2;
1163}
1164
Note: See TracBrowser for help on using the repository browser.