source: trunk/pbs_drmaa/log_reader.c @ 26

Revision 26, 31.2 KB checked in by mmamonski, 8 years ago (diff)

try pbs

  • Property svn:keywords set to Id
Line 
1/* $Id$ */
2/*
3 *  FedStage DRMAA for PBS Pro
4 *  Copyright (C) 2006-2007  FedStage Systems
5 *
6 *  This program is free software: you can redistribute it and/or modify
7 *  it under the terms of the GNU General Public License as published by
8 *  the Free Software Foundation, either version 3 of the License, or
9 *  (at your option) any later version.
10 *
11 *  This program is distributed in the hope that it will be useful,
12 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 *  GNU General Public License for more details.
15 *
16 *  You should have received a copy of the GNU General Public License
17 *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #ifdef HAVE_CONFIG_H
21#       include <config.h>
22#endif
23
24#include <stdlib.h>
25#include <string.h>
26#include <unistd.h>
27#include <sys/select.h>
28#include <sys/stat.h>
29#include <sys/types.h>
30#include <dirent.h>
31#include <fcntl.h>
32
33#include <pbs_ifl.h>
34#include <pbs_error.h>
35
36#include <drmaa_utils/datetime.h>
37#include <drmaa_utils/drmaa.h>
38#include <drmaa_utils/iter.h>
39#include <drmaa_utils/conf.h>
40#include <drmaa_utils/session.h>
41#include <drmaa_utils/datetime.h>
42
43#include <pbs_drmaa/job.h>
44#include <pbs_drmaa/log_reader.h>
45#include <pbs_drmaa/session.h>
46#include <pbs_drmaa/submit.h>
47#include <pbs_drmaa/util.h>
48
49#include <errno.h>
50
51static bool
52pbsdrmaa_read_log();
53
54static void
55pbsdrmaa_select_file_wait_thread ( pbsdrmaa_log_reader_t * self);
56
57static ssize_t
58pbsdrmaa_read_line_wait_thread ( pbsdrmaa_log_reader_t * self, char * line, char * buffer, ssize_t size, int * idx, int * end_idx, int * line_idx );
59
60static void
61pbsdrmaa_select_file_job_on_missing ( pbsdrmaa_log_reader_t * self );
62
63static ssize_t
64pbsdrmaa_read_line_job_on_missing ( pbsdrmaa_log_reader_t * self, char * line, char * buffer, ssize_t size, int * idx, int * end_idx, int * line_idx );
65
66static void
67pbsdrmaa_select_file_accounting ( pbsdrmaa_log_reader_t * self );
68
69static ssize_t
70pbsdrmaa_read_line_accounting ( pbsdrmaa_log_reader_t * self, char * line, char * buffer, ssize_t size, int * idx, int * end_idx, int * line_idx );
71
72static bool
73pbsdrmaa_read_log_accounting( pbsdrmaa_log_reader_t * self );
74
75int
76fsd_job_id_cmp(const char *s1, const char *s2);
77
78int
79pbsdrmaa_date_compare(const void *a, const void *b) ;
80
81/*
82 * Snippets from log files
83 *
84 * PBS Pro
85 *
8610/11/2011 14:43:29;0008;Server@nova;Job;2127218.nova;Job Queued at request of mamonski@endor.wcss.wroc.pl, owner = mamonski@endor.wcss.wroc.pl, job name = STDIN, queue = normal
8710/11/2011 14:43:31;0008;Server@nova;Job;2127218.nova;Job Modified at request of Scheduler@nova.wcss.wroc.pl
8810/11/2011 14:43:31;0008;Server@nova;Job;2127218.nova;Job Run at request of Scheduler@nova.wcss.wroc.pl on exec_vnode (wn698:ncpus=3:mem=2048000kb)+(wn700:ncpus=3:mem=2048000kb)
8910/11/2011 14:43:31;0008;Server@nova;Job;2127218.nova;Job Modified at request of Scheduler@nova.wcss.wroc.pl
9010/11/2011 14:43:32;0010;Server@nova;Job;2127218.nova;Exit_status=0 resources_used.cpupercent=0 resources_used.cput=00:00:00 resources_used.mem=1768kb resources_used.ncpus=6 resources_used.vmem=19228kb resources_used.walltime=00:00:01
91
92 *
93 * Torque
94 *
9510/11/2011 14:47:59;0008;PBS_Server;Job;15545337.batch.grid.cyf-kr.edu.pl;Job Queued at request of plgmamonski@ui.cyf-kr.edu.pl, owner = plgmamonski@ui.cyf-kr.edu.pl, job name = STDIN, queue = l_short
9610/11/2011 14:48:23;0008;PBS_Server;Job;15545337.batch.grid.cyf-kr.edu.pl;Job Run at request of root@batch.grid.cyf-kr.edu.pl
9710/11/2011 14:48:24;0010;PBS_Server;Job;15545337.batch.grid.cyf-kr.edu.pl;Exit_status=0 resources_used.cput=00:00:00 resources_used.mem=720kb resources_used.vmem=13308kb resources_used.walltime=00:00:00
98
99 */
100pbsdrmaa_log_reader_t *
101pbsdrmaa_log_reader_new ( fsd_drmaa_session_t *session, fsd_job_t *job )
102{
103        pbsdrmaa_log_reader_t *volatile self = NULL;
104
105        fsd_log_enter((""));
106        TRY
107        {
108                fsd_malloc(self, pbsdrmaa_log_reader_t );
109               
110                self->session = session;
111               
112                /* ~templete method pattern */
113                if(job != NULL) /* job on missing */
114                {
115                        self->job = job;
116                        self->name = "Job_on_missing";
117                        self->select_file = pbsdrmaa_select_file_job_on_missing;
118                        self->read_line = pbsdrmaa_read_line_job_on_missing;
119                }
120                else /* wait thread */
121                {
122                        self->job = NULL;
123                        self->name = "WT";
124                        self->select_file = pbsdrmaa_select_file_wait_thread;
125                        self->read_line = pbsdrmaa_read_line_wait_thread;
126                }               
127                self->read_log = pbsdrmaa_read_log;     
128               
129                self->log_files = NULL;
130                self->log_files_number = 0;
131               
132                self->run_flag = true;
133                self->fd = -1;
134                self->date_changed = true;
135                self->first_open = true;
136               
137                self->log_file_initial_size = 0;
138                self->log_file_read_size = 0;
139        }
140        EXCEPT_DEFAULT
141        {
142                if( self != NULL)
143                        fsd_free(self);
144                       
145                fsd_exc_reraise();
146        }
147        END_TRY
148        fsd_log_return((""));
149        return self;
150}
151
152pbsdrmaa_log_reader_t *
153pbsdrmaa_log_reader_accounting_new ( fsd_drmaa_session_t *session, fsd_job_t *job )
154{
155        pbsdrmaa_log_reader_t *volatile self = NULL;
156
157        fsd_log_enter((""));
158        TRY
159        {
160                fsd_malloc(self, pbsdrmaa_log_reader_t );
161               
162                self->session = session;
163               
164                self->job = job;
165                self->name = "Accounting";
166                self->select_file = pbsdrmaa_select_file_accounting;
167                self->read_line = pbsdrmaa_read_line_accounting;
168                               
169                self->read_log = pbsdrmaa_read_log_accounting; 
170               
171                self->log_files = NULL;
172                self->log_files_number = 0;
173               
174                self->run_flag = true;
175                self->fd = -1;
176                self->date_changed = true;
177                self->first_open = true;
178               
179                self->log_file_initial_size = 0;
180                self->log_file_read_size = 0;
181        }
182        EXCEPT_DEFAULT
183        {
184                if( self != NULL)
185                        fsd_free(self);
186                       
187                fsd_exc_reraise();
188        }
189        END_TRY
190        fsd_log_return((""));
191        return self;
192}
193
194void
195pbsdrmaa_log_reader_destroy ( pbsdrmaa_log_reader_t * self )
196{
197        fsd_log_enter((""));
198        TRY
199        {
200                if(self != NULL)
201                {
202                        int i = -1;
203                        for(i = 0; i < self->log_files_number ; i++)
204                                fsd_free(self->log_files[i]);
205                        fsd_free(self->log_files);
206                        fsd_free(self);
207                }                       
208        }
209        EXCEPT_DEFAULT
210        {
211                fsd_exc_reraise();
212        }
213        END_TRY
214       
215        fsd_log_return((""));
216}
217
218enum field
219{
220        FLD_DATE = 0,
221        FLD_EVENT = 1,
222        FLD_OBJ = 2,
223        FLD_TYPE = 3,
224        FLD_ID = 4,
225        FLD_MSG = 5
226};
227
228enum field_msg
229{
230        FLD_MSG_EXIT_STATUS = 0,
231        FLD_MSG_CPUT = 1,
232        FLD_MSG_MEM = 2,
233        FLD_MSG_VMEM = 3,
234        FLD_MSG_WALLTIME = 4
235};
236
237enum field_msg_accounting
238{
239        FLD_MSG_ACC_USER = 0,
240        FLD_MSG_ACC_GROUP = 1,
241        FLD_MSG_ACC_JOBNAME = 2,
242        FLD_MSG_ACC_QUEUE = 3,
243        FLD_MSG_ACC_CTIME = 4,
244        FLD_MSG_ACC_QTIME = 5,
245        FLD_MSG_ACC_ETIME = 6,
246        FLD_MSG_ACC_START = 7,
247        FLD_MSG_ACC_OWNER = 8,
248        FLD_MSG_ACC_EXEC_HOST = 9,
249        FLD_MSG_ACC_RES_NEEDNODES = 10,
250        FLD_MSG_ACC_RES_NODECT = 11,
251        FLD_MSG_ACC_RES_NODES = 12,
252        FLD_MSG_ACC_RES_WALLTIME = 13
253};
254
255#define FLD_MSG_STATUS "0010"
256#define FLD_MSG_STATE "0008"
257#define FLD_MSG_LOG "0002"
258
259bool
260pbsdrmaa_read_log( pbsdrmaa_log_reader_t * self )
261{
262        pbsdrmaa_job_t *pbsjob = (pbsdrmaa_job_t*) self->job;
263        fsd_job_t *volatile temp_job = NULL;
264               
265        fsd_log_enter((""));
266       
267        if(self->job == NULL)
268                fsd_mutex_lock( &self->session->mutex );
269
270        TRY
271        {               
272                while( self->run_flag )
273                TRY
274                {
275                        char line[4096] = "";
276                        char buffer[4096] = "";
277                        int idx = 0, end_idx = 0, line_idx = 0;
278                       
279                        self->select_file(self);
280
281                        while ((self->read_line(self, line,buffer, sizeof(line), &idx,&end_idx,&line_idx)) > 0)
282                        {
283                                const char *volatile ptr = line;
284                                char field[256] = "";
285                                char job_id[256] = "";
286                                char event[256] = "";
287                                int volatile field_n = 0;
288                                int n;
289                               
290                                bool volatile job_id_match = false;
291                                bool volatile event_match = false;
292                                bool volatile log_event = false;
293                                bool volatile log_match = false;
294                                bool volatile older_job_found = false;
295                                bool volatile job_found = false;
296                                char * temp_date = NULL;
297                               
298                                struct batch_status status;
299                                status.next = NULL;
300
301                                while ( sscanf(ptr, "%255[^;]%n", field, &n) == 1 ) /* split current line into fields */
302                                {
303                                        if(field_n == FLD_DATE)
304                                        {
305                                                temp_date = fsd_strdup(field);
306                                        }
307                                        else if(field_n == FLD_EVENT && (strcmp(field,FLD_MSG_STATUS) == 0 || strcmp(field,FLD_MSG_STATE) == 0 ))
308                                        {
309                                                /* event described by log line*/
310                                                if(strlcpy(event, field,sizeof(event)) > sizeof(event))
311                                                {
312                                                        fsd_log_error(("%s - strlcpy error",self->name));
313                                                }
314                                                event_match = true;
315                                        }
316                                        else if(event_match && field_n == FLD_ID)
317                                        {       
318                                                TRY
319                                                {       
320                                                        if(self->job == NULL) /* wait_thread */
321                                                        {
322                                                                temp_job = self->session->get_job( self->session, field );
323                                                                pbsjob = (pbsdrmaa_job_t*) temp_job;
324
325                                                                if( temp_job )
326                                                                {
327                                                                        if(strlcpy(job_id, field,sizeof(job_id)) > sizeof(job_id)) {
328                                                                                fsd_log_error(("%s - strlcpy error",self->name));
329                                                                        }
330                                                                        fsd_log_debug(("%s - job_id: %s",self->name,job_id));
331                                                                        status.name = fsd_strdup(job_id);
332                                                                        job_id_match = true; /* job_id is in drmaa */   
333                                                                }
334                                                                else
335                                                                {
336                                                                        fsd_log_debug(("%s - Unknown job: %s", self->name,field));
337                                                                }
338                                                        }
339                                                        else /* job_on_missing */
340                                                        {
341                                                                int diff = -1;
342                                                                diff = fsd_job_id_cmp(self->job->job_id,field);
343                                                                if( diff == 0)
344                                                                {
345                                                                        /* read this file to the place we started and exit*/
346                                                                        fsd_log_debug(("Job_on_missing found job: %s",self->job->job_id));
347                                                                        job_found = true;
348                                                                        older_job_found = false;
349                                                                        self->run_flag = false;
350                                                                        job_id_match = true;
351                                                                        status.name = fsd_strdup(self->job->job_id);
352                                                                }
353                                                                else if ( !job_found && diff >= 1)
354                                                                {
355                                                                        /* older job, find its beginning */
356                                                                        fsd_log_debug(("Job_on_missing found older job than %s : %s",self->job->job_id,field));
357                                                                        older_job_found = true;
358                                                                        job_id_match = true;
359                                                                        status.name = fsd_strdup(self->job->job_id);
360                                                                }
361                                                                else  if( !job_found )
362                                                                {
363                                                                        fsd_log_debug(("Job_on_missing found newer job than %s : %s",self->job->job_id,field));
364                                                                }                                                               
365                                                        }
366                                                }
367                                                END_TRY
368                                        }
369                                        else if(job_id_match && field_n == FLD_MSG)
370                                        {                                               
371                                                /* parse msg - depends on FLD_EVENT */
372                                                struct attrl struct_resource_cput,
373                                                        struct_resource_mem,
374                                                        struct_resource_vmem,
375                                                        struct_resource_walltime,
376                                                        struct_status,
377                                                        struct_state,
378                                                        struct_start_time,
379                                                        struct_mtime,
380                                                        struct_queue,
381                                                        struct_account_name,
382                                                        struct_exec_vnode;
383                                                struct attrl *last_attr = NULL;
384                                               
385                                                bool state_running = false;
386
387                                                memset(&struct_status,0,sizeof(struct attrl));
388                                                memset(&struct_state,0,sizeof(struct attrl));
389                                                memset(&struct_resource_cput,0,sizeof(struct attrl));
390                                                memset(&struct_resource_mem,0,sizeof(struct attrl));
391                                                memset(&struct_resource_vmem,0,sizeof(struct attrl));
392                                                memset(&struct_resource_walltime,0,sizeof(struct attrl));
393                                                memset(&struct_start_time,0,sizeof(struct attrl));
394                                                memset(&struct_mtime,0,sizeof(struct attrl));
395                                                memset(&struct_queue,0,sizeof(struct attrl));
396                                                memset(&struct_account_name,0,sizeof(struct attrl));
397                                                memset(&struct_exec_vnode,0,sizeof(struct attrl));
398                                                               
399                                                if (strcmp(event,FLD_MSG_STATE) == 0)
400                                                {
401                                                        /* job run, modified, queued etc */
402                                                        int n = 0;
403                                                        status.attribs = &struct_state;
404                                                        struct_state.next = NULL;
405                                                        struct_state.name = "job_state";
406                                                        last_attr = &struct_state;
407
408                                                        if(field[0] == 'J') /* Job Queued, Job Modified, Job Run*/
409                                                         {
410                                                                n = 4;
411                                                                if(older_job_found) /* job_on_missing - older job beginning - read this file and end */
412                                                                 {
413                                                                        self->run_flag = false;
414                                                                        fsd_log_debug(("Job_on_missing found older job beginning"));
415                                                                        fsd_free(status.name);
416                                                                        break;
417                                                                 }
418
419                                                                 { /* modified */
420                                                                        struct tm temp_time_tm;
421                                                                        memset(&temp_time_tm, 0, sizeof(temp_time_tm));
422                                                                        temp_time_tm.tm_isdst = -1;
423
424                                                                        if (strptime(temp_date, "%m/%d/%Y %H:%M:%S", &temp_time_tm) == NULL)
425                                                                         {
426                                                                                fsd_log_error(("failed to parse mtime: %s (line = %s)", temp_date, line));
427                                                                         }
428                                                                        else
429                                                                         {
430                                                                                time_t temp_time = mktime(&temp_time_tm);
431                                                                                last_attr->next = &struct_mtime;
432                                                                                last_attr = &struct_mtime;
433                                                                                struct_mtime.name = "mtime";
434                                                                                struct_mtime.next = NULL;
435                                                                                struct_mtime.value = fsd_asprintf("%lu",temp_time);
436                                                                         }
437                                                                 }
438                                                         }
439
440                                                        /* != Job deleted and Job to be deleted*/
441#ifdef PBS_PROFESSIONAL
442                                                        else if (field[4] != 't' && field[10] != 'd')
443                                                         {
444#else
445                                                        else if (field[4] != 'd')
446                                                         {
447#endif
448                                                                struct_state.value = fsd_asprintf("%c",field[n]);
449
450                                                                if(struct_state.value[0] == 'R')
451                                                                 {
452                                                                        state_running = true;
453#ifdef PBS_PROFESSIONAL
454                                                                        {
455                                                                                char *p_vnode = NULL;
456                                                                                if (p_vnode = strstr(field, "exec_vnode"))
457                                                                                 {
458                                                                                        last_attr->next = &struct_exec_vnode;
459                                                                                        last_attr =  &struct_exec_vnode;
460                                                                                        struct_exec_vnode.name = "exec_vnode";
461                                                                                        struct_exec_vnode.next = NULL;
462                                                                                        struct_exec_vnode.value = fsd_strdup(p + 11);
463                                                                                 }
464                                                                        }
465#endif
466                                                                 }
467                                                         }
468                                                        else
469                                                         { /* job terminated - pbs drmaa detects failed as completed with exit_status !=0, aborted with status -1*/
470                                                                struct_status.name = "exit_status";
471                                                                struct_status.value = fsd_strdup("-1");
472                                                                struct_status.next = NULL;
473                                                                struct_state.next = &struct_status;
474                                                                struct_state.value = fsd_strdup("C");
475                                                         }
476                                                }
477                                                else /*if (strcmp(event,FLD_MSG_STATUS) == 0 )*/
478                                                {
479                                                        /* exit status and rusage */
480                                                        const char *ptr2 = field;
481                                                        char  msg[ 256 ] = "";
482                                                        int n2;
483                                                        int msg_field_n = 0;
484                                                       
485                                                        struct_resource_cput.name = "resources_used";
486                                                        struct_resource_mem.name = "resources_used";
487                                                        struct_resource_vmem.name = "resources_used";
488                                                        struct_resource_walltime.name = "resources_used";
489                                                        struct_status.name = "exit_status";
490                                                        struct_state.name = "job_state";
491                               
492                                                        status.attribs = &struct_resource_cput;
493                                                        struct_resource_cput.next = &struct_resource_mem;
494                                                        struct_resource_mem.next = &struct_resource_vmem;
495                                                        struct_resource_vmem.next = &struct_resource_walltime;
496                                                        struct_resource_walltime.next =  &struct_status;
497                                                        struct_status.next = &struct_state;
498                                                        struct_state.next = NULL;
499
500                                                        while ( sscanf(ptr2, "%255[^ ]%n", msg, &n2) == 1 )
501                                                         {                                             
502                                                                switch(msg_field_n)
503                                                                {
504                                                                        case FLD_MSG_EXIT_STATUS:
505                                                                                struct_status.value = fsd_strdup(strchr(msg,'=')+1);
506                                                                                break;
507
508                                                                        case FLD_MSG_CPUT:
509                                                                                struct_resource_cput.resource = "cput";
510                                                                                struct_resource_cput.value = fsd_strdup(strchr(msg,'=')+1);
511                                                                                break;
512
513                                                                        case FLD_MSG_MEM:
514                                                                                struct_resource_mem.resource = "mem";
515                                                                                struct_resource_mem.value  = fsd_strdup(strchr(msg,'=')+1);
516                                                                                break;
517
518                                                                        case FLD_MSG_VMEM:
519                                                                                struct_resource_vmem.resource = "vmem";
520                                                                                struct_resource_vmem.value  = fsd_strdup(strchr(msg,'=')+1);
521                                                                                break;
522
523                                                                        case FLD_MSG_WALLTIME:
524                                                                                struct_resource_walltime.resource = "walltime";
525                                                                                struct_resource_walltime.value  = fsd_strdup(strchr(msg,'=')+1);
526                                                                                break;
527                                                                }
528
529                                                                ptr2 += n2;
530                                                                msg_field_n++;
531                                                                if ( *ptr2 != ' ' )
532                                                                         break;
533                                                                ++ptr2;
534                                                        }
535                                                        struct_state.value = fsd_strdup("C");   /* we got exit_status so we say that it has completed */
536                                                        fsd_log_info(("WT - job %s found as finished on %u", temp_job->job_id, (unsigned int)time(NULL)));
537                                                }                                               
538                                                 
539                                                if(self->job == NULL) /* wait_thread */
540                                                {
541                                                        if ( state_running )
542                                                        {
543                                                                fsd_log_debug(("WT - forcing update of job: %s", temp_job->job_id ));
544                                                                TRY
545                                                                {
546                                                                        temp_job->update_status( temp_job );
547                                                                }
548                                                                EXCEPT_DEFAULT
549                                                                {
550                                                                        /*TODO: distinguish between invalid job and internal errors */
551                                                                        fsd_log_debug(("Job finished just after entering running state: %s", temp_job->job_id));
552                                                                }
553                                                                END_TRY
554                                                        }
555                                                        else
556                                                        {
557                                                                fsd_log_debug(("%s - updating job: %s",self->name, temp_job->job_id ));
558                                                                pbsjob->update( temp_job, &status );
559                                                        }
560                                                }
561                                                else if( job_found ) /* job_on_missing */
562                                                {
563                                                        fsd_log_debug(("Job_on_missing - updating job: %s", self->job->job_id ));
564                                                        pbsjob->update( self->job, &status );
565                                                }
566                                               
567                                                if(self->job == NULL)
568                                                {
569                                                        fsd_cond_broadcast( &temp_job->status_cond);
570                                                        fsd_cond_broadcast( &self->session->wait_condition );
571                                                }
572                                                if ( temp_job )
573                                                        temp_job->release( temp_job );
574       
575                                                fsd_free(struct_resource_cput.value);
576                                                fsd_free(struct_resource_mem.value);
577                                                fsd_free(struct_resource_vmem.value);
578                                                fsd_free(struct_resource_walltime.value);
579                                                fsd_free(struct_status.value);
580                                                fsd_free(struct_state.value);
581                                                fsd_free(struct_start_time.value);
582                                                fsd_free(struct_mtime.value);
583                                                fsd_free(struct_queue.value);
584                                                fsd_free(struct_account_name.value);
585
586                                                if ( status.name!=NULL )
587                                                        fsd_free(status.name);
588                                        }
589                                        else if(field_n == FLD_EVENT && strcmp(field,FLD_MSG_LOG) == 0)
590                                        {
591                                                log_event = true;
592                                        }
593                                        else if (log_event && field_n == FLD_ID && strcmp(field,"Log") == 0 )
594                                        {
595                                                log_match = true;
596                                                log_event = false;
597                                        }
598                                        else if( self->job == NULL && log_match && field_n == FLD_MSG && strncmp(field,"Log closed",10) == 0)
599                                        {
600                                                fsd_log_debug(("%s - Date changed. Closing log file",self->name));
601                                                self->date_changed = true;
602                                                log_match = false;
603                                        }
604                                       
605                                        ptr += n;
606                                        if ( *ptr != ';' )
607                                        {
608                                                break; /* end of line */
609                                        }
610                                        field_n++;
611                                        ++ptr;
612                                }               
613
614                                fsd_free(temp_date);
615                        } /* end of while getline loop */
616                       
617                        if(self->job == NULL)
618                        {
619                                struct timeval timeout_tv;
620                                fd_set log_fds;
621       
622                                fsd_mutex_unlock( &self->session->mutex );
623                               
624                                FD_ZERO(&log_fds);
625                                FD_SET(self->fd, &log_fds);
626
627                                timeout_tv.tv_sec = 1;
628                                timeout_tv.tv_usec = 0;
629
630                                /* ignore return value - the next get line call will handle IO errors */
631                                (void)select(1, &log_fds, NULL, NULL, &timeout_tv);
632
633                                fsd_mutex_lock( &self->session->mutex );       
634
635                                self->run_flag = self->session->wait_thread_run_flag;
636                        }
637                }               
638                EXCEPT_DEFAULT
639                {
640                        const fsd_exc_t *e = fsd_exc_get();
641                        /* Its better to exit and communicate error rather then let the application to hang */
642                        fsd_log_fatal(( "Exception in wait thread %s: <%d:%s>. Exiting !!!", self->name, e->code(e), e->message(e) ));
643                        exit(1);
644                }
645                END_TRY
646
647                if(self->fd != -1)
648                        close(self->fd);
649                fsd_log_debug(("%s - Log file closed",self->name));     
650        }
651        FINALLY
652        {
653                fsd_log_debug(("%s - Terminated.",self->name));
654                if(self->job == NULL)
655                        fsd_mutex_unlock( &self->session->mutex ); /**/
656        }
657        END_TRY
658       
659        fsd_log_return((""));
660        return true;
661}
662
663void
664pbsdrmaa_select_file_wait_thread ( pbsdrmaa_log_reader_t * self )
665{
666        pbsdrmaa_session_t *pbssession = (pbsdrmaa_session_t*) self->session;
667       
668        if(self->date_changed)
669        {
670                char * log_path = NULL;
671                int num_tries = 0;
672                struct tm tm;
673               
674                fsd_log_enter((""));
675               
676                if(!self->first_open)
677                        time(&self->t);
678                else
679                        self->t = pbssession->log_file_initial_time;
680                       
681                localtime_r(&self->t,&tm);
682                               
683                #define DRMAA_WAIT_THREAD_MAX_TRIES (12)
684                /* generate new date, close file and open new */
685                if((log_path = fsd_asprintf("%s/server_logs/%04d%02d%02d",
686                                        pbssession->pbs_home,   
687                                        tm.tm_year + 1900,
688                                        tm.tm_mon + 1,
689                                        tm.tm_mday)) == NULL) {
690                        fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"WT - Memory allocation wasn't possible");
691                }
692
693                if(self->fd != -1)
694                        close(self->fd);
695
696                fsd_log_debug(("Log file: %s",log_path));
697                               
698        retry:
699                if((self->fd = open(log_path,O_RDONLY) ) == -1 && num_tries > DRMAA_WAIT_THREAD_MAX_TRIES )
700                {
701                        fsd_log_error(("Can't open log file. Verify pbs_home. Running standard wait_thread."));
702                        fsd_log_error(("Remember that without keep_completed set standard wait_thread won't run correctly"));
703                        /*pbssession->super.enable_wait_thread = false;*/ /* run not wait_thread */
704                        pbssession->wait_thread_log = false;
705                        pbssession->super.wait_thread = pbssession->super_wait_thread;
706                        pbssession->super.wait_thread(self->session);
707                } else if ( self->fd == -1 ) {
708                        fsd_log_warning(("Can't open log file: %s. Retries count: %d", log_path, num_tries));
709                        num_tries++;
710                        sleep(5);
711                        goto retry;
712                }
713
714                fsd_free(log_path);
715
716                fsd_log_debug(("Log file opened"));
717
718                if(self->first_open) {
719                        fsd_log_debug(("Log file lseek"));
720                        if(lseek(self->fd,pbssession->log_file_initial_size,SEEK_SET) == (off_t) -1) {
721                                char errbuf[256] = "InternalError";
722                                (void)strerror_r(errno, errbuf, 256);
723                                fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"lseek error: %s",errbuf);
724                        }
725                        self->first_open = false;
726                }
727
728                self->date_changed = false;
729               
730                fsd_log_return((""));
731        }       
732}
733
734ssize_t
735pbsdrmaa_read_line_wait_thread ( pbsdrmaa_log_reader_t * self, char * line, char * buffer, ssize_t size, int * idx, int * end_idx, int * line_idx )
736{
737        return fsd_getline_buffered(line,buffer,size,self->fd,idx,end_idx,line_idx);
738}
739
740/* reverse date compare*/
741int
742pbsdrmaa_date_compare(const void *a, const void *b)
743{
744        const char *ia = *(const char **) a;
745        const char *ib = *(const char **) b;
746        return strcmp(ib, ia);
747}
748
749void
750pbsdrmaa_select_file_job_on_missing( pbsdrmaa_log_reader_t * self )
751{
752        pbsdrmaa_session_t *pbssession = (pbsdrmaa_session_t*) self->session;   
753       
754        char * log_path = NULL;
755        int num_tries = 0;
756        static int file_number = 0;
757        fsd_log_enter((""));
758               
759        if(self->first_open)
760        {                       
761                DIR *dp = NULL;         
762                char * path = NULL;
763                struct dirent *ep = NULL;
764               
765                if((path = fsd_asprintf("%s/server_logs/",pbssession->pbs_home)) == NULL)
766                        fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"Job_on_missing - Memory allocation wasn't possible");
767               
768                self->log_files_number = 0;     
769                dp = opendir (path);
770
771                fsd_calloc(self->log_files,2,char*);
772       
773                if (dp != NULL)
774                {
775                        while ((ep = readdir (dp)))
776                        {
777                                self->log_files_number++;
778                                if(self->log_files_number > 2)
779                                        fsd_realloc(self->log_files,self->log_files_number,char *);
780                               
781                                self->log_files[self->log_files_number-1] = fsd_strdup(ep->d_name);
782                        }
783                        (void) closedir (dp);
784                }
785                else
786                        fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"Job_on_missing - Couldn't open the directory");
787
788                qsort(self->log_files,self->log_files_number,sizeof(char *),pbsdrmaa_date_compare);
789               
790                if(self->log_files_number <= 2)
791                {
792                        self->run_flag = false;
793                        fsd_log_error(("Job_on_missing - No log files available"));
794                }
795               
796                self->first_open = false;
797                fsd_free(path);
798        }       
799        else /* check previous day*/
800        {
801                if(++file_number > self->log_files_number - 2)
802                        fsd_log_error(("Job_on_missing - All available log files checked"));
803                else
804                        fsd_log_debug(("Job_on_missing checking previous day"));
805               
806                self->run_flag = false;
807                pbsdrmaa_job_on_missing_standard( self->job );                         
808        }
809       
810        #define DRMAA_WAIT_THREAD_MAX_TRIES (12)
811        if((log_path = fsd_asprintf("%s/server_logs/%s",
812                                pbssession->pbs_home,   
813                                self->log_files[file_number])) == NULL) {
814                fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"Job_on_missing - Memory allocation wasn't possible");
815        }
816
817        if(self->fd != -1)
818                close(self->fd);
819
820        fsd_log_debug(("Log file: %s",log_path));
821                               
822retry:
823        if((self->fd = open(log_path,O_RDONLY) ) == -1 && num_tries > DRMAA_WAIT_THREAD_MAX_TRIES )
824        {
825                fsd_log_error(("Can't open log file. Verify pbs_home. Running standard job_on_missing"));
826                fsd_log_error(("Remember that without keep_completed set standard job_on_missing won't run correctly"));
827                self->run_flag = false;
828                pbsdrmaa_job_on_missing_standard( self->job );                 
829        } else if ( self->fd == -1 ) {
830                fsd_log_warning(("Can't open log file: %s. Retries count: %d", log_path, num_tries));
831                num_tries++;
832                sleep(5);
833                goto retry;
834        }
835        else
836        {
837                struct stat statbuf;
838                if(stat(log_path,&statbuf) == -1) {
839                                char errbuf[256] = "InternalError";
840                                (void)strerror_r(errno, errbuf, 256);
841                                fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"stat error: %s",errbuf);
842                }
843                self->log_file_read_size = 0;
844                self->log_file_initial_size = statbuf.st_size;
845                fsd_log_debug(("Set log_file_initial_size %ld",self->log_file_initial_size));
846        }
847
848        fsd_free(log_path);
849
850        fsd_log_debug(("Log file opened"));
851       
852        fsd_log_return((""));
853}
854
855ssize_t
856pbsdrmaa_read_line_job_on_missing ( pbsdrmaa_log_reader_t * self, char * line, char * buffer, ssize_t size, int * idx, int * end_idx, int * line_idx )
857{
858        int n = fsd_getline_buffered(line,buffer,size,self->fd, idx, end_idx, line_idx);
859       
860        if(n >= 0)
861                self->log_file_read_size += n;
862               
863        if(self->log_file_read_size >= self->log_file_initial_size)
864                return -1;
865
866        return n;
867}
868
869void
870pbsdrmaa_select_file_accounting ( pbsdrmaa_log_reader_t * self )
871{
872        pbsdrmaa_session_t *pbssession = (pbsdrmaa_session_t*) self->session;
873               
874        char * log_path = NULL;
875
876        struct tm tm;
877               
878        fsd_log_enter((""));
879               
880        time(&self->t);
881                       
882        localtime_r(&self->t,&tm);
883                               
884        #define DRMAA_ACCOUNTING_MAX_TRIES (12)
885        /* generate new date, close file and open new */
886        if((log_path = fsd_asprintf("%s/server_priv/accounting/%04d%02d%02d",
887                                pbssession->pbs_home,   
888                                tm.tm_year + 1900,
889                                tm.tm_mon + 1,
890                                tm.tm_mday)) == NULL) {
891                fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"Read accounting file - Memory allocation wasn't possible");
892        }
893
894        if(self->fd != -1)
895                close(self->fd);
896
897        fsd_log_debug(("Accounting Log file: %s",log_path));
898
899        if((self->fd = open(log_path,O_RDONLY) ) == -1 )
900        {
901                fsd_log_error(("Can't open accounting log file. Change directory chmod and verify pbs_home."));
902        }
903
904        fsd_free(log_path);
905
906        fsd_log_debug(("Accounting Log file opened"));
907
908        fsd_log_return((""));   
909}
910
911ssize_t
912pbsdrmaa_read_line_accounting ( pbsdrmaa_log_reader_t * self, char * line, char * buffer, ssize_t size, int * idx, int * end_idx, int * line_idx )
913{
914        return fsd_getline_buffered(line,buffer,size,self->fd,idx,end_idx,line_idx);
915}
916
917enum field_acc
918{
919        FLD_ACC_DATE = 0,
920        FLD_ACC_EVENT = 1,
921        FLD_ACC_ID = 2,
922        FLD_ACC_MSG = 3
923};
924
925bool
926pbsdrmaa_read_log_accounting( pbsdrmaa_log_reader_t * self )
927{
928        pbsdrmaa_job_t *pbsjob = (pbsdrmaa_job_t*) self->job;   
929        bool res = false;
930       
931        fsd_job_t *volatile temp_job = NULL;
932               
933        fsd_log_enter((""));
934        fsd_log_debug(("Accounting Log file opened"));
935        if(self->job == NULL)
936                fsd_mutex_lock( &self->session->mutex );
937
938        TRY
939        {               
940                TRY
941                {
942                        char line[4096] = "";
943                        char buffer[4096] = "";
944                        int idx = 0, end_idx = 0, line_idx = 0;
945                       
946                        self->select_file(self);
947                       
948                        if(self->fd != -1)                                     
949                        while ((self->read_line(self, line,buffer, sizeof(line), &idx,&end_idx,&line_idx)) > 0)
950                        {
951                                const char *volatile ptr = line;
952                                char field[256] = "";
953                                int volatile field_n = 0;
954                                int n;
955                               
956                                bool volatile job_id_match = false;     
957                       
958                                bool volatile job_found = false;
959                                char *  temp_date = NULL;
960                               
961                                struct batch_status status;
962                               
963                                while ( sscanf(ptr, "%255[^;]%n", field, &n) == 1 ) /* split current line into fields */
964                                {
965                                        status.next = NULL;
966                                        status.attribs = NULL;
967                               
968                                        if(field_n == FLD_ACC_DATE)
969                                        {
970                                                temp_date = fsd_strdup(field);
971                                        }
972                                        else if(field_n == FLD_ACC_EVENT)
973                                        {
974                                                       
975                                        }
976                                        else if(field_n == FLD_ACC_ID)
977                                        {                                                       
978                                                TRY
979                                                {                                                               
980                                                                int diff = -1;
981                                                                diff = fsd_job_id_cmp(self->job->job_id,field);
982                                                                if( diff == 0)
983                                                                {
984                                                                        /* read this file to the place we started and exit*/
985                                                                        fsd_log_debug(("Accounting found job: %s",self->job->job_id));
986                                                                        job_found = true;
987                                                                        job_id_match = true;
988                                                                        status.name = fsd_strdup(self->job->job_id);
989                                                                }       
990                                                }
991                                                END_TRY
992                                        }
993                                        else if(job_id_match && field_n == FLD_ACC_MSG)
994                                        {                                       
995                                                struct attrl * struct_attrl = calloc(10,sizeof(struct attrl));
996                                                int i;
997
998                                                if(field[0] == 'q')
999                                                {
1000                                                        status.attribs = &struct_attrl[0];
1001                                                        struct_attrl[0].name =  ATTR_queue;
1002                                                        struct_attrl[0].value = fsd_strdup(strchr(field,'=')+1);
1003                                                        struct_attrl[0].next = NULL;
1004                                                }
1005                                                else if(field[0] == 'u')
1006                                                {
1007                                                        /* rusage */
1008                                                        const char *ptr2 = field;
1009                                                        char  msg[ 256 ] = "";
1010                                                        int n2 = 0;
1011                                                        int msg_field_n = 0;
1012                               
1013                                                        status.attribs = &struct_attrl[0];
1014
1015                                                        while ( sscanf(ptr2, "%255[^ ]%n", msg, &n2) == 1 )
1016                                                         {                                             
1017                                                                switch(msg_field_n)
1018                                                                {
1019                                                                        case FLD_MSG_ACC_USER:
1020                                                                                struct_attrl[msg_field_n].name = ATTR_euser;
1021                                                                                break;
1022
1023                                                                        case FLD_MSG_ACC_GROUP:
1024                                                                                struct_attrl[msg_field_n].name = ATTR_egroup;
1025                                                                                break;
1026
1027                                                                        case FLD_MSG_ACC_JOBNAME:
1028                                                                                struct_attrl[msg_field_n].name = ATTR_name;
1029                                                                                break;
1030
1031                                                                        case FLD_MSG_ACC_QUEUE:
1032                                                                                struct_attrl[msg_field_n].name = ATTR_queue;
1033                                                                                break;
1034
1035                                                                        case FLD_MSG_ACC_CTIME:
1036                                                                                struct_attrl[msg_field_n].name = ATTR_ctime;
1037                                                                                break;
1038                                                                               
1039                                                                        case FLD_MSG_ACC_QTIME:
1040                                                                                struct_attrl[msg_field_n].name = ATTR_qtime;
1041                                                                                break;
1042                                                                               
1043                                                                        case FLD_MSG_ACC_ETIME:
1044                                                                                struct_attrl[msg_field_n].name = ATTR_etime;
1045                                                                                break;
1046#ifndef PBS_PROFESSIONAL               
1047                                                                        case FLD_MSG_ACC_START:
1048                                                                                struct_attrl[msg_field_n].name = ATTR_start_time;
1049#else
1050                                                                        case FLD_MSG_ACC_START:
1051                                                                                struct_attrl[msg_field_n].name = ATTR_stime;
1052#endif
1053                                                                               
1054                                                                        case FLD_MSG_ACC_OWNER:
1055                                                                                struct_attrl[msg_field_n].name = ATTR_owner;
1056                                                                                break;
1057                                                                               
1058                                                                        case FLD_MSG_ACC_EXEC_HOST:
1059                                                                                struct_attrl[msg_field_n].name = ATTR_exechost;
1060                                                                                break;                                                                         
1061                                                                }
1062                                                               
1063                                                                struct_attrl[msg_field_n].value  = fsd_strdup(strchr(msg,'=')+1);
1064                                                                if(msg_field_n!=9)
1065                                                                {
1066                                                                        struct_attrl[msg_field_n].next = &struct_attrl[msg_field_n+1];
1067                                                                }
1068                                                                else
1069                                                                {
1070                                                                        struct_attrl[msg_field_n].next = NULL;
1071                                                                        break;
1072                                                                }
1073
1074                                                                ptr2 += n2;
1075                                                                msg_field_n++;
1076                                                                if ( *ptr2 != ' ' )
1077                                                                        break;
1078
1079                                                                ++ptr2;
1080                                                        }
1081                                                }                                               
1082
1083                                                if( job_found && status.attribs != NULL)
1084                                                {
1085                                                        fsd_log_debug(("Accounting file - updating job: %s", self->job->job_id ));
1086                                                        pbsjob->update( self->job, &status );
1087                                                        res = true;
1088                                                }
1089                                               
1090                                                if(self->job == NULL)
1091                                                {
1092                                                        fsd_cond_broadcast( &temp_job->status_cond);
1093                                                        fsd_cond_broadcast( &self->session->wait_condition );
1094                                                }
1095                                                if ( temp_job )
1096                                                        temp_job->release( temp_job );
1097       
1098                                                for(i = 0; i < 10; i++)
1099                                                {
1100                                                        fsd_free(struct_attrl[i].value);
1101                                                }
1102                                                fsd_free(struct_attrl);
1103                                                fsd_free(status.name);
1104                                        }
1105                                       
1106                                       
1107                                        ptr += n;
1108                                        if ( *ptr != ';' )
1109                                        {
1110                                                break; /* end of line */
1111                                        }
1112                                        field_n++;
1113                                        ++ptr;
1114                                }               
1115
1116                                fsd_free(temp_date);                   
1117                        } /* end of while getline loop */       
1118                       
1119                }               
1120                EXCEPT_DEFAULT
1121                 {
1122                        const fsd_exc_t *e = fsd_exc_get();
1123                        /* Its better to exit and communicate error rather then let the application to hang */
1124                        fsd_log_fatal(( "Exception in reading accounting file %s: <%d:%s>. Exiting !!!", self->name, e->code(e), e->message(e) ));
1125                        exit(1);
1126                 }
1127                END_TRY
1128
1129                if(self->fd != -1)
1130                        close(self->fd);
1131                fsd_log_debug(("%s - Accounting log file closed",self->name)); 
1132        }
1133        FINALLY
1134        {
1135                fsd_log_debug(("%s - Terminated.",self->name));
1136                if(self->job == NULL)
1137                        fsd_mutex_unlock( &self->session->mutex ); /**/
1138        }
1139        END_TRY
1140       
1141        fsd_log_return((""));
1142        return res;
1143}
1144
1145int
1146fsd_job_id_cmp(const char *s1, const char *s2) /* maybe move to drmaa_utils? */
1147{
1148        int job1;
1149        int job2;
1150        char *rest = NULL;
1151        char *token = NULL;
1152        char *ptr = fsd_strdup(s1);
1153        token = strtok_r(ptr, ".", &rest);
1154        job1 = atoi(token);
1155       
1156        fsd_free(token);
1157       
1158        ptr = fsd_strdup(s2);
1159        token = strtok_r(ptr,".",&rest);
1160        job2 = atoi(token);
1161       
1162        fsd_free(token);
1163        return job1 - job2;
1164}
1165
Note: See TracBrowser for help on using the repository browser.