source: trunk/pbs_drmaa/session.c @ 3

Revision 3, 26.0 KB checked in by mmamonski, 9 years ago (diff)

try to be resistant to torque restarts - avoid double disconnect == double free

Line 
1/* $Id: session.c 385 2011-01-04 18:24:05Z mamonski $ */
2/*
3 *  FedStage DRMAA for PBS Pro
4 *  Copyright (C) 2006-2009  FedStage Systems
5 *
6 *  This program is free software: you can redistribute it and/or modify
7 *  it under the terms of the GNU General Public License as published by
8 *  the Free Software Foundation, either version 3 of the License, or
9 *  (at your option) any later version.
10 *
11 *  This program is distributed in the hope that it will be useful,
12 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 *  GNU General Public License for more details.
15 *
16 *  You should have received a copy of the GNU General Public License
17 *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20#ifdef HAVE_CONFIG_H
21#       include <config.h>
22#endif
23
24#include <stdlib.h>
25#include <string.h>
26#include <unistd.h>
27#include <sys/select.h>
28#include <sys/stat.h>
29#include <sys/types.h>
30#include <fcntl.h>
31
32#include <pbs_ifl.h>
33#include <pbs_error.h>
34
35#include <drmaa_utils/datetime.h>
36#include <drmaa_utils/drmaa.h>
37#include <drmaa_utils/iter.h>
38#include <drmaa_utils/conf.h>
39#include <drmaa_utils/session.h>
40#include <drmaa_utils/datetime.h>
41
42#include <pbs_drmaa/job.h>
43#include <pbs_drmaa/session.h>
44#include <pbs_drmaa/submit.h>
45#include <pbs_drmaa/util.h>
46
47#include <errno.h>
48
49#ifndef lint
50static char rcsid[]
51#       ifdef __GNUC__
52                __attribute__ ((unused))
53#       endif
54        = "$Id: session.c 385 2011-01-04 18:24:05Z mamonski $";
55#endif
56
57static void
58pbsdrmaa_session_destroy( fsd_drmaa_session_t *self );
59
60static void
61pbsdrmaa_session_apply_configuration( fsd_drmaa_session_t *self );
62
63static fsd_job_t *
64pbsdrmaa_session_new_job( fsd_drmaa_session_t *self, const char *job_id );
65
66static bool
67pbsdrmaa_session_do_drm_keeps_completed_jobs( pbsdrmaa_session_t *self );
68
69static void
70pbsdrmaa_session_update_all_jobs_status( fsd_drmaa_session_t *self );
71
72static void
73*pbsdrmaa_session_wait_thread( fsd_drmaa_session_t *self );
74
75static char *
76pbsdrmaa_session_run_impl(
77                fsd_drmaa_session_t *self,
78                const fsd_template_t *jt,
79                int bulk_idx
80                );
81
82static struct attrl *
83pbsdrmaa_create_status_attrl(void);
84
85
86fsd_drmaa_session_t *
87pbsdrmaa_session_new( const char *contact )
88{
89        pbsdrmaa_session_t *volatile self = NULL;
90
91        if( contact == NULL )
92                contact = "";
93        TRY
94         {
95                self = (pbsdrmaa_session_t*)fsd_drmaa_session_new(contact);
96                fsd_realloc( self, 1, pbsdrmaa_session_t );
97                self->super_wait_thread = NULL;
98
99                self->log_file_initial_size = 0;
100                self->pbs_conn = -1;
101                self->pbs_home = NULL;
102
103                self->wait_thread_log = false;
104                self->status_attrl = NULL;
105               
106                self->super_destroy = self->super.destroy;
107                self->super.destroy = pbsdrmaa_session_destroy;
108                self->super.new_job = pbsdrmaa_session_new_job;
109                self->super.update_all_jobs_status
110                                = pbsdrmaa_session_update_all_jobs_status;
111                self->super.run_impl = pbsdrmaa_session_run_impl;
112
113                self->super_apply_configuration = self->super.apply_configuration;
114                self->super.apply_configuration = pbsdrmaa_session_apply_configuration;
115
116                self->do_drm_keeps_completed_jobs =
117                        pbsdrmaa_session_do_drm_keeps_completed_jobs;
118
119                self->status_attrl = pbsdrmaa_create_status_attrl();
120
121                self->pbs_conn = pbs_connect( self->super.contact );
122                fsd_log_debug(( "pbs_connect(%s) =%d", self->super.contact,
123                                        self->pbs_conn ));
124                if( self->pbs_conn < 0 )
125                        pbsdrmaa_exc_raise_pbs( "pbs_connect" );
126
127                self->super.load_configuration( &self->super, "pbs_drmaa" );
128
129                self->super.missing_jobs = FSD_IGNORE_MISSING_JOBS;
130                if( self->do_drm_keeps_completed_jobs( self ) )
131                        self->super.missing_jobs = FSD_IGNORE_QUEUED_MISSING_JOBS;
132         }
133        EXCEPT_DEFAULT
134         {
135                if( self )
136                  {
137                        self->super.destroy( &self->super );
138                        self = NULL;
139                  }
140         }
141        END_TRY
142        return (fsd_drmaa_session_t*)self;
143}
144
145
146void
147pbsdrmaa_session_destroy( fsd_drmaa_session_t *self )
148{
149        pbsdrmaa_session_t *pbsself = (pbsdrmaa_session_t*)self;
150        self->stop_wait_thread( self );
151        if( pbsself->pbs_conn >= 0 )
152                pbs_disconnect( pbsself->pbs_conn );
153        fsd_free( pbsself->status_attrl );
154        pbsself->super_destroy( self );
155}
156
157
158static char *
159pbsdrmaa_session_run_impl(
160                fsd_drmaa_session_t *self,
161                const fsd_template_t *jt,
162                int bulk_idx
163                )
164{
165        char *volatile job_id = NULL;
166        fsd_job_t *volatile job = NULL;
167        pbsdrmaa_submit_t *volatile submit = NULL;
168
169        fsd_log_enter(( "(jt=%p, bulk_idx=%d)", (void*)jt, bulk_idx ));
170        TRY
171         {
172                submit = pbsdrmaa_submit_new( self, jt, bulk_idx );
173                submit->eval( submit );
174                job_id = submit->submit( submit );
175                job = self->new_job( self, job_id );
176                job->submit_time = time(NULL);
177                job->flags |= FSD_JOB_CURRENT_SESSION;
178                self->jobs->add( self->jobs, job );
179                job->release( job );  job = NULL;
180         }
181        EXCEPT_DEFAULT
182         {
183                fsd_free( job_id );
184                fsd_exc_reraise();
185         }
186        FINALLY
187         {
188                if( submit )
189                        submit->destroy( submit );
190                if( job )
191                        job->release( job );
192         }
193        END_TRY
194        fsd_log_return(( " =%s", job_id ));
195        return job_id;
196}
197
198
199static fsd_job_t *
200pbsdrmaa_session_new_job( fsd_drmaa_session_t *self, const char *job_id )
201{
202        fsd_job_t *job;
203        job = pbsdrmaa_job_new( fsd_strdup(job_id) );
204        job->session = self;
205        return job;
206}
207
208void
209pbsdrmaa_session_apply_configuration( fsd_drmaa_session_t *self )
210{
211        pbsdrmaa_session_t *pbsself = (pbsdrmaa_session_t*)self;
212        fsd_conf_option_t *pbs_home;
213        pbs_home = fsd_conf_dict_get(self->configuration, "pbs_home" );
214        if( pbs_home )
215         {
216                if( pbs_home->type == FSD_CONF_STRING )
217                 {
218                        struct stat statbuf;
219                        char * volatile log_path;
220                        time_t t;
221
222                        pbsself->pbs_home = pbs_home->val.string;
223                        fsd_log_debug(("pbs_home: %s",pbsself->pbs_home));
224                        pbsself->super_wait_thread = pbsself->super.wait_thread;
225                        pbsself->super.wait_thread = pbsdrmaa_session_wait_thread;             
226                        pbsself->wait_thread_log = true;
227       
228                        time(&t);       
229                        localtime_r(&t,&pbsself->log_file_initial_time);
230
231                        if((log_path = fsd_asprintf("%s/server_logs/%04d%02d%02d",
232                                pbsself->pbs_home,       
233                                pbsself->log_file_initial_time.tm_year + 1900,
234                                pbsself->log_file_initial_time.tm_mon + 1,
235                                pbsself->log_file_initial_time.tm_mday)) == NULL) {
236                                fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"WT - Memory allocation wasn't possible");
237                        }
238
239                        if(stat(log_path,&statbuf) == -1) {
240                                char errbuf[256] = "InternalError";
241                                (void)strerror_r(errno, errbuf, 256);
242                                fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"stat error: %s",errbuf);
243                        }
244       
245                        fsd_log_debug(("Log file %s size %d",log_path,(int) statbuf.st_size));
246                        pbsself->log_file_initial_size = statbuf.st_size;
247                        fsd_free(log_path);
248                 }
249                else
250                {
251                        pbsself->super.enable_wait_thread = false;
252                        pbsself->wait_thread_log = false;
253                        fsd_log_debug(("pbs_home not configured. Running standard wait_thread (pooling)."));
254                }
255         }
256
257
258
259        pbsself->super_apply_configuration(self); /* call method from the superclass */
260}
261
262
263void
264pbsdrmaa_session_update_all_jobs_status( fsd_drmaa_session_t *self )
265{
266        volatile bool conn_lock = false;
267        volatile bool jobs_lock = false;
268        pbsdrmaa_session_t *pbsself = (pbsdrmaa_session_t*)self;
269        fsd_job_set_t *jobs = self->jobs;
270        struct batch_status *volatile status = NULL;
271
272        fsd_log_enter((""));
273
274        TRY
275         {
276                conn_lock = fsd_mutex_lock( &self->drm_connection_mutex );
277retry:
278
279#ifdef PBS_PROFESSIONAL
280                status = pbs_statjob( pbsself->pbs_conn, NULL, NULL, NULL );
281#else
282                status = pbs_statjob( pbsself->pbs_conn, NULL, pbsself->status_attrl, NULL );
283#endif
284                fsd_log_debug(( "pbs_statjob( fd=%d, job_id=NULL, attribs={...} ) =%p",
285                                 pbsself->pbs_conn, (void*)status ));
286                if( status == NULL  &&  pbs_errno != 0 )
287                 {
288                        if (pbs_errno == PBSE_PROTOCOL || pbs_errno == PBSE_EXPIRED)
289                         {
290                                if ( pbsself->pbs_conn >= 0)
291                                        pbs_disconnect( pbsself->pbs_conn );
292                                sleep(1);
293                                pbsself->pbs_conn = pbs_connect( pbsself->super.contact );
294                                if( pbsself->pbs_conn < 0 )
295                                        pbsdrmaa_exc_raise_pbs( "pbs_connect" );
296                                else
297                                        goto retry;
298                         }
299                        else
300                         {
301                                pbsdrmaa_exc_raise_pbs( "pbs_statjob" );
302                         }
303                 }
304                conn_lock = fsd_mutex_unlock( &self->drm_connection_mutex );
305
306                 {
307                        size_t i;
308                        fsd_job_t *job;
309                        jobs_lock = fsd_mutex_lock( &jobs->mutex );
310                        for( i = 0;  i < jobs->tab_size;  i++ )
311                                for( job = jobs->tab[i];  job != NULL;  job = job->next )
312                                 {
313                                        fsd_mutex_lock( &job->mutex );
314                                        job->flags |= FSD_JOB_MISSING;
315                                        fsd_mutex_unlock( &job->mutex );
316                                 }
317                        jobs_lock = fsd_mutex_unlock( &jobs->mutex );
318                 }
319
320                 {
321                        struct batch_status *volatile i;
322                        for( i = status;  i != NULL;  i = i->next )
323                         {
324                                fsd_job_t *job = NULL;
325                                fsd_log_debug(( "job_id=%s", i->name ));
326                                job = self->get_job( self, i->name );
327                                if( job != NULL )
328                                 {
329                                        job->flags &= ~FSD_JOB_MISSING;
330                                        TRY
331                                         {
332                                                ((pbsdrmaa_job_t*)job)->update( job, i );
333                                         }
334                                        FINALLY
335                                         {
336                                                job->release( job );
337                                         }
338                                        END_TRY
339                                 }
340                         }
341                 }
342
343                 {
344                        size_t volatile i;
345                        fsd_job_t *volatile job;
346                        jobs_lock = fsd_mutex_lock( &jobs->mutex );
347                        for( i = 0;  i < jobs->tab_size;  i++ )
348                                for( job = jobs->tab[i];  job != NULL;  job = job->next )
349                                 {
350                                        fsd_mutex_lock( &job->mutex );
351                                        TRY
352                                         {
353                                                if( job->flags & FSD_JOB_MISSING )
354                                                        job->on_missing( job );
355                                         }
356                                        FINALLY{ fsd_mutex_unlock( &job->mutex ); }
357                                        END_TRY
358                                 }
359                        jobs_lock = fsd_mutex_unlock( &jobs->mutex );
360                 }
361         }
362        FINALLY
363         {
364                if( status != NULL )
365                        pbs_statfree( status );
366                if( conn_lock )
367                        conn_lock = fsd_mutex_unlock( &self->drm_connection_mutex );
368                if( jobs_lock )
369                        jobs_lock = fsd_mutex_unlock( &jobs->mutex );
370         }
371        END_TRY
372
373        fsd_log_return((""));
374}
375
376
377
378struct attrl *
379pbsdrmaa_create_status_attrl(void)
380{
381        struct attrl *result = NULL;
382        struct attrl *i;
383        const int max_attribs = 16;
384        int n_attribs;
385        int j = 0;
386
387        fsd_log_enter((""));
388        fsd_calloc( result, max_attribs, struct attrl );
389        result[j++].name="job_state";
390        result[j++].name="exit_status";
391        result[j++].name="resources_used";
392        result[j++].name="ctime";
393        result[j++].name="mtime";
394        result[j++].name="qtime";
395        result[j++].name="etime";
396
397        result[j++].name="queue";
398        result[j++].name="Account_Name";
399        result[j++].name="exec_host";
400        result[j++].name="start_time";
401        result[j++].name="mtime";
402#if 0
403        result[j].name="resources_used";  result[j].resource="walltime";  j++;
404        result[j].name="resources_used";  result[j].resource="cput";  j++;
405        result[j].name="resources_used";  result[j].resource="mem";  j++;
406        result[j].name="resources_used";  result[j].resource="vmem";  j++;
407        result[j].name="Resource_List";  result[j].resource="walltime";  j++;
408        result[j].name="Resource_List";  result[j].resource="cput";  j++;
409        result[j].name="Resource_List";  result[j].resource="mem";  j++;
410        result[j].name="Resource_List";  result[j].resource="vmem";  j++;
411#endif
412        n_attribs = j;
413        for( i = result;  true;  i++ )
414                if( i+1 < result + n_attribs )
415                        i->next = i+1;
416                else
417                 {
418                        i->next = NULL;
419                        break;
420                 }
421
422#ifdef DEBUGGING
423        fsd_log_return((":"));
424        pbsdrmaa_dump_attrl( result, NULL );
425#endif
426        return result;
427}
428
429
430bool
431pbsdrmaa_session_do_drm_keeps_completed_jobs( pbsdrmaa_session_t *self )
432{
433
434#ifndef PBS_PROFESSIONAL
435        struct attrl default_queue_query;
436        struct attrl keep_completed_query;
437        struct batch_status *default_queue_result = NULL;
438        struct batch_status *keep_completed_result = NULL;
439        const char *default_queue = NULL;
440        const char *keep_completed = NULL;
441        volatile bool result = false;
442        volatile bool conn_lock = false;
443
444        TRY
445         {
446                default_queue_query.next = NULL;
447                default_queue_query.name = "default_queue";
448                default_queue_query.resource = NULL;
449                default_queue_query.value = NULL;
450                keep_completed_query.next = NULL;
451                keep_completed_query.name = "keep_completed";
452                keep_completed_query.resource = NULL;
453                keep_completed_query.value = NULL;
454
455                conn_lock = fsd_mutex_lock( &self->super.drm_connection_mutex );
456
457                default_queue_result =
458                                pbs_statserver( self->pbs_conn, &default_queue_query, NULL );
459                if( default_queue_result == NULL )
460                        pbsdrmaa_exc_raise_pbs( "pbs_statserver" );
461                if( default_queue_result->attribs
462                                &&  !strcmp( default_queue_result->attribs->name,
463                                        "default_queue" ) )
464                        default_queue = default_queue_result->attribs->value;
465
466                fsd_log_debug(( "default_queue: %s", default_queue ));
467
468                if( default_queue )
469                 {
470                        keep_completed_result = pbs_statque( self->pbs_conn,
471                                        (char*)default_queue, &keep_completed_query, NULL );
472                        if( keep_completed_result == NULL )
473                                pbsdrmaa_exc_raise_pbs( "pbs_statque" );
474                        if( keep_completed_result->attribs
475                                        &&  !strcmp( keep_completed_result->attribs->name,
476                                                "keep_completed" ) )
477                                keep_completed = keep_completed_result->attribs->value;
478                 }
479
480                fsd_log_debug(( "keep_completed: %s", keep_completed ));
481         }
482        EXCEPT_DEFAULT
483         {
484                const fsd_exc_t *e = fsd_exc_get();
485                fsd_log_warning(( "PBS server seems not to keep completed jobs\n"
486                                "detail: %s", e->message(e) ));
487                result = false;
488         }
489        ELSE
490         {
491                result = false;
492                if( default_queue == NULL )
493                        fsd_log_warning(( "no default queue set on PBS server" ));
494                else if( keep_completed == NULL && self->pbs_home == NULL )
495                        fsd_log_warning(( "PBS server is not configured to keep completed jobs\n"
496                                                "in Torque: set keep_completed parameter of default queue\n"
497                                                "  $ qmgr -c 'set queue batch keep_completed = 60'\n"
498                                                " or configure DRMAA to utilize log files"
499                                                ));
500                else
501                        result = true;
502         }
503        FINALLY
504         {
505                if( default_queue_result )
506                        pbs_statfree( default_queue_result );
507                if( keep_completed_result )
508                        pbs_statfree( keep_completed_result );
509                if( conn_lock )
510                        conn_lock = fsd_mutex_unlock( &self->super.drm_connection_mutex );
511
512         }
513        END_TRY
514#endif
515        return false;
516}
517
518
519enum field
520  {
521  FLD_DATE = 0,
522  FLD_EVENT = 1,
523  FLD_OBJ = 2,
524  FLD_TYPE = 3,
525  FLD_ID = 4,
526  FLD_MSG = 5
527  };
528
529enum field_msg
530  {
531  FLD_MSG_EXIT_STATUS = 0,
532  FLD_MSG_CPUT = 1,
533  FLD_MSG_MEM = 2,
534  FLD_MSG_VMEM = 3,
535  FLD_MSG_WALLTIME = 4
536  };
537
538#define FLD_MSG_STATUS "0010"
539#define FLD_MSG_STATE "0008"
540#define FLD_MSG_LOG "0002"
541
542ssize_t fsd_getline(char * line,ssize_t size, int fd)
543{
544        char buf;
545        char * ptr = NULL;
546        ssize_t n = 0, rc;
547        ptr = line;
548        for(n = 1; n< size; n++)
549        {               
550                if( (rc = read(fd,&buf,1 )) == 1) {
551                        *ptr++ = buf;
552                        if(buf == '\n')
553                        {
554                                break;
555                        }
556                }
557                else if (rc == 0) {
558                        if (n == 1)
559                                return 0;
560                        else
561                                break;
562                }               
563                else
564                        return -1;
565        }
566
567        return n;
568}
569
570void *
571pbsdrmaa_session_wait_thread( fsd_drmaa_session_t *self )
572{
573        pbsdrmaa_session_t *pbsself = (pbsdrmaa_session_t*) self;
574        fsd_job_t *volatile job = NULL;
575        pbsdrmaa_job_t *volatile pbsjob = NULL;
576        char job_id[256] = "";
577        char event[256] = "";
578        time_t t;
579        struct tm tm;
580
581        tm = pbsself->log_file_initial_time;
582
583        fsd_log_enter(( "" ));
584        fsd_mutex_lock( &self->mutex );
585        TRY
586         {     
587                char * volatile log_path = NULL;
588                char buffer[4096] = "";
589                bool volatile date_changed = true;
590                int  volatile fd = -1;
591                bool first_open = true;
592
593                fsd_log_debug(("WT - reading log files"));
594
595                while( self->wait_thread_run_flag )
596                TRY
597                 {                     
598                        if(date_changed)
599                        {
600                                int num_tries = 0;
601                               
602                                time(&t);       
603                                localtime_r(&t,&tm);
604                       
605                                #define DRMAA_WAIT_THREAD_MAX_TRIES (12)
606                                /* generate new date, close file and open new */
607                                if((log_path = fsd_asprintf("%s/server_logs/%04d%02d%02d",
608                                        pbsself->pbs_home,       
609                                        tm.tm_year + 1900,
610                                        tm.tm_mon + 1,
611                                        tm.tm_mday)) == NULL) {
612                                        fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"WT - Memory allocation wasn't possible");
613                                }
614
615                                if(fd != -1)
616                                        close(fd);
617
618                                fsd_log_debug(("Log file: %s",log_path));
619                               
620                retry:
621                                if((fd = open(log_path,O_RDONLY) ) == -1 && num_tries > DRMAA_WAIT_THREAD_MAX_TRIES )
622                                {
623                                        fsd_log_error(("Can't open log file. Verify pbs_home. Running standard wait_thread."));
624                                        fsd_log_error(("Remember that without keep_completed set standard wait_thread won't run correctly"));
625                                        /*pbsself->super.enable_wait_thread = false;*/ /* run not wait_thread */
626                                        pbsself->wait_thread_log = false;
627                                        pbsself->super.wait_thread = pbsself->super_wait_thread;
628                                        pbsself->super.wait_thread(self);
629                                } else if ( fd == -1 ) {
630                                        fsd_log_warning(("Can't open log file: %s. Retries count: %d", log_path, num_tries));
631                                        num_tries++;
632                                        sleep(5);
633                                        goto retry;
634                                }
635
636                                fsd_free(log_path);
637
638                                fsd_log_debug(("Log file opened"));
639
640                                if(first_open) {
641                                        fsd_log_debug(("Log file lseek"));
642                                        if(lseek(fd,pbsself->log_file_initial_size,SEEK_SET) == (off_t) -1) {
643                                                char errbuf[256] = "InternalError";
644                                                (void)strerror_r(errno, errbuf, 256);
645                                                fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"lseek error: %s",errbuf);
646                                        }
647                                        first_open = false;
648                                }
649
650                                date_changed = false;
651                        }                               
652                       
653                        while ((fsd_getline(buffer,sizeof(buffer),fd)) > 0)                     
654                        {
655                                const char *volatile ptr = buffer;
656                                char field[256] = "";
657                                int volatile field_n = 0;
658                                int n;
659
660                                bool volatile job_id_match = false;
661                                bool volatile event_match = false;
662                                bool volatile log_event = false;
663                                bool volatile log_match = false;
664                                char *  temp_date = NULL;
665                               
666
667                                struct batch_status status;
668                                status.next = NULL;
669
670                                if( strlcpy(job_id,"",sizeof(job_id)) > sizeof(job_id) ) {
671                                        fsd_log_error(("WT - strlcpy error"));
672                                }
673                                if( strlcpy(event,"",sizeof(event)) > sizeof(event) ) {
674                                        fsd_log_error(("WT - strlcpy error"));
675                                }
676                                while ( sscanf(ptr, "%255[^;]%n", field, &n) == 1 ) /* divide current line into fields */
677                                {
678                                        if(field_n == FLD_DATE)
679                                        {
680                                                temp_date = fsd_strdup(field);
681                                        }
682                                        else if(field_n == FLD_EVENT && (strcmp(field,FLD_MSG_STATUS) == 0 ||
683                                                                    strcmp(field,FLD_MSG_STATE) == 0 ))
684                                        {
685                                                /* event described by log line*/
686                                                if(strlcpy(event, field,sizeof(event)) > sizeof(event)) {
687                                                        fsd_log_error(("WT - strlcpy error"));
688                                                }
689                                                event_match = true;                                                                     
690                                        }
691                                        else if(event_match && field_n == FLD_ID)
692                                        {       
693                                                TRY
694                                                {       
695                                                        job = self->get_job( self, field );
696                                                        pbsjob = (pbsdrmaa_job_t*) job;
697
698                                                        if( job )
699                                                        {
700                                                                if(strlcpy(job_id, field,sizeof(job_id)) > sizeof(job_id)) {
701                                                                        fsd_log_error(("WT - strlcpy error"));
702                                                                }
703                                                                fsd_log_debug(("WT - job_id: %s",job_id));
704                                                                status.name = fsd_strdup(job_id);
705                                                                job_id_match = true; /* job_id is in drmaa */
706                                                        }
707                                                        else
708                                                        {
709                                                                fsd_log_debug(("WT - Unknown job: %s", field));
710                                                        }
711                                                }
712                                                END_TRY
713                                        }
714                                        else if(job_id_match && field_n == FLD_MSG)
715                                        {                                               
716                                                /* parse msg - depends on FLD_EVENT*/
717                                                struct attrl struct_resource_cput,struct_resource_mem,struct_resource_vmem,
718                                                        struct_resource_walltime, struct_status, struct_state, struct_start_time,struct_mtime, struct_queue, struct_account_name;       
719                                               
720                                                bool state_running = false;
721
722                                                struct_status.name = NULL;
723                                                struct_status.value = NULL;
724                                                struct_status.next = NULL;
725                                                struct_status.resource = NULL;
726
727                                                struct_state.name = NULL;
728                                                struct_state.value = NULL;
729                                                struct_state.next = NULL;
730                                                struct_state.resource = NULL;
731
732                                                struct_resource_cput.name = NULL;
733                                                struct_resource_cput.value = NULL;
734                                                struct_resource_cput.next = NULL;
735                                                struct_resource_cput.resource = NULL;
736
737                                                struct_resource_mem.name = NULL;
738                                                struct_resource_mem.value = NULL;
739                                                struct_resource_mem.next = NULL;
740                                                struct_resource_mem.resource = NULL;
741
742                                                struct_resource_vmem.name = NULL;
743                                                struct_resource_vmem.value = NULL;
744                                                struct_resource_vmem.next = NULL;
745                                                struct_resource_vmem.resource = NULL;
746
747                                                struct_resource_walltime.name = NULL;
748                                                struct_resource_walltime.value = NULL;
749                                                struct_resource_walltime.next = NULL;
750                                                struct_resource_walltime.resource = NULL;
751
752                                                struct_start_time.name = NULL;
753                                                struct_start_time.value = NULL;
754                                                struct_start_time.next = NULL;
755                                                struct_start_time.resource = NULL;
756
757                                                struct_mtime.name = NULL;
758                                                struct_mtime.value = NULL;
759                                                struct_mtime.next = NULL;
760                                                struct_mtime.resource = NULL;
761
762                                                struct_queue.name = NULL;
763                                                struct_queue.value = NULL;
764                                                struct_queue.next = NULL;
765                                                struct_queue.resource = NULL;
766
767                                                struct_account_name.name = NULL;
768                                                struct_account_name.value = NULL;
769                                                struct_account_name.next = NULL;
770                                                struct_account_name.resource = NULL;
771
772                                                               
773                                                if (strcmp(event,FLD_MSG_STATE) == 0)
774                                                {
775                                                        /* job run, modified, queued etc */
776                                                        int n = 0;
777                                                        status.attribs = &struct_state;
778                                                        struct_state.next = NULL;
779                                                        struct_state.name = "job_state";
780                                                        if(field[0] == 'J') /* Job Queued, Job Modified, Job Run*/
781                                                        {
782                                                                n = 4;                                                         
783                                                        }               
784                                                        if(field[4] == 'M') {
785                                                                struct tm temp_time_tm;
786                                                                memset(&temp_time_tm, 0, sizeof(temp_time_tm));
787                                                                temp_time_tm.tm_isdst = -1;
788
789                                                                if (strptime(temp_date, "%m/%d/%Y %H:%M:%S", &temp_time_tm) == NULL)
790                                                                 {
791                                                                        fsd_log_error(("failed to parse mtime: %s", temp_date));
792                                                                 }
793                                                                else
794                                                                 {
795                                                                        time_t temp_time = mktime(&temp_time_tm);
796                                                                        status.attribs = &struct_mtime;
797                                                                        struct_mtime.name = "mtime";
798                                                                        struct_mtime.next = NULL;
799                                                                        struct_mtime.value = fsd_asprintf("%lu",temp_time);
800                                                                 }
801                                                        }               
802                                                        /* != Job deleted and Job to be deleted*/
803                                                        #ifdef PBS_PROFESSIONAL
804                                                        else if (field[4] != 't' && field[10] != 'd') {
805                                                        #else           
806                                                        else if(field[4] != 'd') {
807                                                        #endif
808
809                                                                if ((struct_state.value = fsd_asprintf("%c",field[n]) ) == NULL ) { /* 4 first letter of state */
810                                                                        fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"WT - Memory allocation wasn't possible");
811                                                                }
812                                                                if(struct_state.value[0] == 'R'){
813                                                                        state_running = true;
814                                                                }
815                                                        }
816                                                        else { /* job terminated - pbs drmaa detects failed as completed with exit_status !=0, aborted with status -1*/
817                                                                struct_status.name = "exit_status";
818                                                                struct_status.value = fsd_strdup("-1");
819                                                                struct_status.next = NULL;
820                                                                struct_state.next = &struct_status;
821                                                                struct_state.value = fsd_strdup("C");                                                           
822                                                        }
823                                                }                                                   
824                                                else /*if (strcmp(event,FLD_MSG_STATUS) == 0 )*/
825                                                {
826                                                        /* exit status and rusage */
827                                                        const char *ptr2 = field;
828                                                        char  msg[ 256 ] = "";
829                                                        int n2;
830                                                        int msg_field_n = 0;
831                                                       
832                                                        struct_resource_cput.name = "resources_used";
833                                                        struct_resource_mem.name = "resources_used";
834                                                        struct_resource_vmem.name = "resources_used";
835                                                        struct_resource_walltime.name = "resources_used";
836                                                        struct_status.name = "exit_status";
837                                                        struct_state.name = "job_state";
838                               
839                                                        status.attribs = &struct_resource_cput;
840                                                        struct_resource_cput.next = &struct_resource_mem;
841                                                        struct_resource_mem.next = &struct_resource_vmem;
842                                                        struct_resource_vmem.next = &struct_resource_walltime;
843                                                        struct_resource_walltime.next =  &struct_status;
844                                                        struct_status.next = &struct_state;
845                                                        struct_state.next = NULL;
846
847                                                        while ( sscanf(ptr2, "%255[^ ]%n", msg, &n2) == 1 )
848                                                         {                                             
849                                                                switch(msg_field_n)
850                                                                {
851                                                                        case FLD_MSG_EXIT_STATUS:
852                                                                                struct_status.value = fsd_strdup(strchr(msg,'=')+1);
853                                                                                break;
854
855                                                                        case FLD_MSG_CPUT:
856                                                                                struct_resource_cput.resource = "cput";
857                                                                                struct_resource_cput.value = fsd_strdup(strchr(msg,'=')+1);
858                                                                                break;
859
860                                                                        case FLD_MSG_MEM:
861                                                                                struct_resource_mem.resource = "mem";
862                                                                                struct_resource_mem.value  = fsd_strdup(strchr(msg,'=')+1);
863                                                                                break;
864
865                                                                        case FLD_MSG_VMEM:
866                                                                                struct_resource_vmem.resource = "vmem";
867                                                                                struct_resource_vmem.value  = fsd_strdup(strchr(msg,'=')+1);
868                                                                                break;
869
870                                                                        case FLD_MSG_WALLTIME:
871                                                                                struct_resource_walltime.resource = "walltime";
872                                                                                struct_resource_walltime.value  = fsd_strdup(strchr(msg,'=')+1);
873                                                                                break;
874                                                                }
875                                                             
876                                                                ptr2 += n2;
877                                                                msg_field_n++;
878                                                                if ( *ptr2 != ' ' )
879                                                                 {
880                                                                         break;
881                                                                 }
882                                                                ++ptr2;                                         
883                                                         }
884                                                        struct_state.value = fsd_strdup("C");   /* we got exit_status so we say that it has completed */
885                                                }                                               
886
887                                                if ( state_running )
888                                                 {
889                                                        fsd_log_debug(("WT - forcing update of job: %s", job->job_id ));
890                                                        job->update_status( job );
891                                                 }
892                                                else
893                                                 {
894                                                        fsd_log_debug(("WT - updating job: %s", job->job_id ));
895                                                        pbsjob->update( job, &status );
896                                                 }
897
898                               
899                                                fsd_cond_broadcast( &job->status_cond);
900                                                fsd_cond_broadcast( &self->wait_condition );
901
902                                                if ( job )
903                                                        job->release( job );
904       
905                                                fsd_free(struct_resource_cput.value);
906                                                fsd_free(struct_resource_mem.value);
907                                                fsd_free(struct_resource_vmem.value);
908                                                fsd_free(struct_resource_walltime.value);
909                                                fsd_free(struct_status.value);
910                                                fsd_free(struct_state.value);
911                                                fsd_free(struct_start_time.value);
912                                                fsd_free(struct_mtime.value);
913                                                fsd_free(struct_queue.value);
914                                                fsd_free(struct_account_name.value);
915
916                                                if ( status.name!=NULL )
917                                                        fsd_free(status.name);
918                                        }
919                                        else if(field_n == FLD_EVENT && strcmp(field,FLD_MSG_LOG) == 0)
920                                        {
921                                                log_event = true;                                       
922                                        }
923                                        else if (log_event && field_n == FLD_ID && strcmp(field,"Log") == 0 )
924                                        {
925                                                log_match = true;
926                                                log_event = false;
927                                        }
928                                        else if( log_match && field_n == FLD_MSG &&
929                                                field[0] == 'L' &&
930                                                field[1] == 'o' &&
931                                                field[2] == 'g' &&
932                                                field[3] == ' ' &&
933                                                field[4] == 'c' &&
934                                                field[5] == 'l' &&
935                                                field[6] == 'o' &&
936                                                field[7] == 's' &&
937                                                field[8] == 'e' &&
938                                                field[9] == 'd' )  /* last field in the file - strange bahaviour*/
939                                        {
940                                                fsd_log_debug(("WT - Date changed. Closing log file"));
941                                                date_changed = true;
942                                                log_match = false;
943                                        }
944                                       
945                                        ptr += n;
946                                        if ( *ptr != ';' )
947                                        {
948                                                break; /* end of line */
949                                        }
950                                        field_n++;
951                                        ++ptr;
952                                }               
953
954                                if( strlcpy(buffer,"",sizeof(buffer)) > sizeof(buffer) ) {
955                                        fsd_log_error(("WT - strlcpy error"));
956                                }
957
958                                fsd_free(temp_date);                   
959                        }
960
961                        fsd_mutex_unlock( &self->mutex );       
962                        usleep(1000000);
963                        fsd_mutex_lock( &self->mutex );
964                 }
965                EXCEPT_DEFAULT
966                 {
967                        const fsd_exc_t *e = fsd_exc_get();
968
969                        fsd_log_error(( "wait thread: <%d:%s>", e->code(e), e->message(e) ));
970                        fsd_exc_reraise();
971                 }
972                END_TRY
973
974                if(fd != -1)
975                        close(fd);
976                fsd_log_debug(("Log file closed"));
977         }
978        FINALLY
979         {
980                fsd_log_debug(("WT - Terminated."));   
981                fsd_mutex_unlock( &self->mutex );
982         }
983        END_TRY
984
985        fsd_log_return(( " =NULL" ));
986        return NULL;
987}
Note: See TracBrowser for help on using the repository browser.