source: trunk/pbs_drmaa/session.c @ 1

Revision 1, 25.9 KB checked in by mmamonski, 14 years ago (diff)

Torque/PBS DRMAA initial commit

Line 
1/* $Id: session.c 385 2011-01-04 18:24:05Z mamonski $ */
2/*
3 *  FedStage DRMAA for PBS Pro
4 *  Copyright (C) 2006-2009  FedStage Systems
5 *
6 *  This program is free software: you can redistribute it and/or modify
7 *  it under the terms of the GNU General Public License as published by
8 *  the Free Software Foundation, either version 3 of the License, or
9 *  (at your option) any later version.
10 *
11 *  This program is distributed in the hope that it will be useful,
12 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 *  GNU General Public License for more details.
15 *
16 *  You should have received a copy of the GNU General Public License
17 *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20#ifdef HAVE_CONFIG_H
21#       include <config.h>
22#endif
23
24#include <stdlib.h>
25#include <string.h>
26#include <unistd.h>
27#include <sys/select.h>
28#include <sys/stat.h>
29#include <sys/types.h>
30#include <fcntl.h>
31
32#include <pbs_ifl.h>
33#include <pbs_error.h>
34
35#include <drmaa_utils/datetime.h>
36#include <drmaa_utils/drmaa.h>
37#include <drmaa_utils/iter.h>
38#include <drmaa_utils/conf.h>
39#include <drmaa_utils/session.h>
40#include <drmaa_utils/datetime.h>
41
42#include <pbs_drmaa/job.h>
43#include <pbs_drmaa/session.h>
44#include <pbs_drmaa/submit.h>
45#include <pbs_drmaa/util.h>
46
47#include <errno.h>
48
49#ifndef lint
50static char rcsid[]
51#       ifdef __GNUC__
52                __attribute__ ((unused))
53#       endif
54        = "$Id: session.c 385 2011-01-04 18:24:05Z mamonski $";
55#endif
56
57static void
58pbsdrmaa_session_destroy( fsd_drmaa_session_t *self );
59
60static void
61pbsdrmaa_session_apply_configuration( fsd_drmaa_session_t *self );
62
63static fsd_job_t *
64pbsdrmaa_session_new_job( fsd_drmaa_session_t *self, const char *job_id );
65
66static bool
67pbsdrmaa_session_do_drm_keeps_completed_jobs( pbsdrmaa_session_t *self );
68
69static void
70pbsdrmaa_session_update_all_jobs_status( fsd_drmaa_session_t *self );
71
72static void
73*pbsdrmaa_session_wait_thread( fsd_drmaa_session_t *self );
74
75static char *
76pbsdrmaa_session_run_impl(
77                fsd_drmaa_session_t *self,
78                const fsd_template_t *jt,
79                int bulk_idx
80                );
81
82static struct attrl *
83pbsdrmaa_create_status_attrl(void);
84
85
86fsd_drmaa_session_t *
87pbsdrmaa_session_new( const char *contact )
88{
89        pbsdrmaa_session_t *volatile self = NULL;
90
91        if( contact == NULL )
92                contact = "";
93        TRY
94         {
95                self = (pbsdrmaa_session_t*)fsd_drmaa_session_new(contact);
96                fsd_realloc( self, 1, pbsdrmaa_session_t );
97                self->super_wait_thread = NULL;
98
99                self->log_file_initial_size = 0;
100                self->pbs_conn = -1;
101                self->pbs_home = NULL;
102
103                self->wait_thread_log = false;
104                self->status_attrl = NULL;
105               
106                self->super_destroy = self->super.destroy;
107                self->super.destroy = pbsdrmaa_session_destroy;
108                self->super.new_job = pbsdrmaa_session_new_job;
109                self->super.update_all_jobs_status
110                                = pbsdrmaa_session_update_all_jobs_status;
111                self->super.run_impl = pbsdrmaa_session_run_impl;
112
113                self->super_apply_configuration = self->super.apply_configuration;
114                self->super.apply_configuration = pbsdrmaa_session_apply_configuration;
115
116                self->do_drm_keeps_completed_jobs =
117                        pbsdrmaa_session_do_drm_keeps_completed_jobs;
118
119                self->status_attrl = pbsdrmaa_create_status_attrl();
120
121                self->pbs_conn = pbs_connect( self->super.contact );
122                fsd_log_debug(( "pbs_connect(%s) =%d", self->super.contact,
123                                        self->pbs_conn ));
124                if( self->pbs_conn < 0 )
125                        pbsdrmaa_exc_raise_pbs( "pbs_connect" );
126
127                self->super.load_configuration( &self->super, "pbs_drmaa" );
128
129                self->super.missing_jobs = FSD_IGNORE_MISSING_JOBS;
130                if( self->do_drm_keeps_completed_jobs( self ) )
131                        self->super.missing_jobs = FSD_IGNORE_QUEUED_MISSING_JOBS;
132         }
133        EXCEPT_DEFAULT
134         {
135                if( self )
136                  {
137                        self->super.destroy( &self->super );
138                        self = NULL;
139                  }
140         }
141        END_TRY
142        return (fsd_drmaa_session_t*)self;
143}
144
145
146void
147pbsdrmaa_session_destroy( fsd_drmaa_session_t *self )
148{
149        pbsdrmaa_session_t *pbsself = (pbsdrmaa_session_t*)self;
150        self->stop_wait_thread( self );
151        if( pbsself->pbs_conn >= 0 )
152                pbs_disconnect( pbsself->pbs_conn );
153        fsd_free( pbsself->status_attrl );
154        pbsself->super_destroy( self );
155}
156
157
158static char *
159pbsdrmaa_session_run_impl(
160                fsd_drmaa_session_t *self,
161                const fsd_template_t *jt,
162                int bulk_idx
163                )
164{
165        char *volatile job_id = NULL;
166        fsd_job_t *volatile job = NULL;
167        pbsdrmaa_submit_t *volatile submit = NULL;
168
169        fsd_log_enter(( "(jt=%p, bulk_idx=%d)", (void*)jt, bulk_idx ));
170        TRY
171         {
172                submit = pbsdrmaa_submit_new( self, jt, bulk_idx );
173                submit->eval( submit );
174                job_id = submit->submit( submit );
175                job = self->new_job( self, job_id );
176                job->submit_time = time(NULL);
177                job->flags |= FSD_JOB_CURRENT_SESSION;
178                self->jobs->add( self->jobs, job );
179                job->release( job );  job = NULL;
180         }
181        EXCEPT_DEFAULT
182         {
183                fsd_free( job_id );
184                fsd_exc_reraise();
185         }
186        FINALLY
187         {
188                if( submit )
189                        submit->destroy( submit );
190                if( job )
191                        job->release( job );
192         }
193        END_TRY
194        fsd_log_return(( " =%s", job_id ));
195        return job_id;
196}
197
198
199static fsd_job_t *
200pbsdrmaa_session_new_job( fsd_drmaa_session_t *self, const char *job_id )
201{
202        fsd_job_t *job;
203        job = pbsdrmaa_job_new( fsd_strdup(job_id) );
204        job->session = self;
205        return job;
206}
207
208void
209pbsdrmaa_session_apply_configuration( fsd_drmaa_session_t *self )
210{
211        pbsdrmaa_session_t *pbsself = (pbsdrmaa_session_t*)self;
212        fsd_conf_option_t *pbs_home;
213        pbs_home = fsd_conf_dict_get(self->configuration, "pbs_home" );
214        if( pbs_home )
215         {
216                if( pbs_home->type == FSD_CONF_STRING )
217                 {
218                        struct stat statbuf;
219                        char * volatile log_path;
220                        time_t t;
221
222                        pbsself->pbs_home = pbs_home->val.string;
223                        fsd_log_debug(("pbs_home: %s",pbsself->pbs_home));
224                        pbsself->super_wait_thread = pbsself->super.wait_thread;
225                        pbsself->super.wait_thread = pbsdrmaa_session_wait_thread;             
226                        pbsself->wait_thread_log = true;
227       
228                        time(&t);       
229                        localtime_r(&t,&pbsself->log_file_initial_time);
230
231                        if((log_path = fsd_asprintf("%s/server_logs/%04d%02d%02d",
232                                pbsself->pbs_home,       
233                                pbsself->log_file_initial_time.tm_year + 1900,
234                                pbsself->log_file_initial_time.tm_mon + 1,
235                                pbsself->log_file_initial_time.tm_mday)) == NULL) {
236                                fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"WT - Memory allocation wasn't possible");
237                        }
238
239                        if(stat(log_path,&statbuf) == -1) {
240                                char errbuf[256] = "InternalError";
241                                (void)strerror_r(errno, errbuf, 256);
242                                fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"stat error: %s",errbuf);
243                        }
244       
245                        fsd_log_debug(("Log file %s size %d",log_path,(int) statbuf.st_size));
246                        pbsself->log_file_initial_size = statbuf.st_size;
247                        fsd_free(log_path);
248                 }
249                else
250                {
251                        pbsself->super.enable_wait_thread = false;
252                        pbsself->wait_thread_log = false;
253                        fsd_log_debug(("pbs_home not configured. Running standard wait_thread (pooling)."));
254                }
255         }
256
257
258
259        pbsself->super_apply_configuration(self); /* call method from the superclass */
260}
261
262
263void
264pbsdrmaa_session_update_all_jobs_status( fsd_drmaa_session_t *self )
265{
266        volatile bool conn_lock = false;
267        volatile bool jobs_lock = false;
268        pbsdrmaa_session_t *pbsself = (pbsdrmaa_session_t*)self;
269        fsd_job_set_t *jobs = self->jobs;
270        struct batch_status *volatile status = NULL;
271
272        fsd_log_enter((""));
273
274        TRY
275         {
276                conn_lock = fsd_mutex_lock( &self->drm_connection_mutex );
277retry:
278
279#ifdef PBS_PROFESSIONAL
280                status = pbs_statjob( pbsself->pbs_conn, NULL, NULL, NULL );
281#else
282                status = pbs_statjob( pbsself->pbs_conn, NULL, pbsself->status_attrl, NULL );
283#endif
284                fsd_log_debug(( "pbs_statjob( fd=%d, job_id=NULL, attribs={...} ) =%p",
285                                 pbsself->pbs_conn, (void*)status ));
286                if( status == NULL  &&  pbs_errno != 0 )
287                 {
288                        if (pbs_errno == PBSE_PROTOCOL || pbs_errno == PBSE_EXPIRED)
289                         {
290                                pbs_disconnect( pbsself->pbs_conn );
291                                sleep(1);
292                                pbsself->pbs_conn = pbs_connect( pbsself->super.contact );
293                                if( pbsself->pbs_conn < 0 )
294                                        pbsdrmaa_exc_raise_pbs( "pbs_connect" );
295                                else
296                                        goto retry;
297                         }
298                        else
299                         {
300                                pbsdrmaa_exc_raise_pbs( "pbs_statjob" );
301                         }
302                 }
303                conn_lock = fsd_mutex_unlock( &self->drm_connection_mutex );
304
305                 {
306                        size_t i;
307                        fsd_job_t *job;
308                        jobs_lock = fsd_mutex_lock( &jobs->mutex );
309                        for( i = 0;  i < jobs->tab_size;  i++ )
310                                for( job = jobs->tab[i];  job != NULL;  job = job->next )
311                                 {
312                                        fsd_mutex_lock( &job->mutex );
313                                        job->flags |= FSD_JOB_MISSING;
314                                        fsd_mutex_unlock( &job->mutex );
315                                 }
316                        jobs_lock = fsd_mutex_unlock( &jobs->mutex );
317                 }
318
319                 {
320                        struct batch_status *volatile i;
321                        for( i = status;  i != NULL;  i = i->next )
322                         {
323                                fsd_job_t *job = NULL;
324                                fsd_log_debug(( "job_id=%s", i->name ));
325                                job = self->get_job( self, i->name );
326                                if( job != NULL )
327                                 {
328                                        job->flags &= ~FSD_JOB_MISSING;
329                                        TRY
330                                         {
331                                                ((pbsdrmaa_job_t*)job)->update( job, i );
332                                         }
333                                        FINALLY
334                                         {
335                                                job->release( job );
336                                         }
337                                        END_TRY
338                                 }
339                         }
340                 }
341
342                 {
343                        size_t volatile i;
344                        fsd_job_t *volatile job;
345                        jobs_lock = fsd_mutex_lock( &jobs->mutex );
346                        for( i = 0;  i < jobs->tab_size;  i++ )
347                                for( job = jobs->tab[i];  job != NULL;  job = job->next )
348                                 {
349                                        fsd_mutex_lock( &job->mutex );
350                                        TRY
351                                         {
352                                                if( job->flags & FSD_JOB_MISSING )
353                                                        job->on_missing( job );
354                                         }
355                                        FINALLY{ fsd_mutex_unlock( &job->mutex ); }
356                                        END_TRY
357                                 }
358                        jobs_lock = fsd_mutex_unlock( &jobs->mutex );
359                 }
360         }
361        FINALLY
362         {
363                if( status != NULL )
364                        pbs_statfree( status );
365                if( conn_lock )
366                        conn_lock = fsd_mutex_unlock( &self->drm_connection_mutex );
367                if( jobs_lock )
368                        jobs_lock = fsd_mutex_unlock( &jobs->mutex );
369         }
370        END_TRY
371
372        fsd_log_return((""));
373}
374
375
376
377struct attrl *
378pbsdrmaa_create_status_attrl(void)
379{
380        struct attrl *result = NULL;
381        struct attrl *i;
382        const int max_attribs = 16;
383        int n_attribs;
384        int j = 0;
385
386        fsd_log_enter((""));
387        fsd_calloc( result, max_attribs, struct attrl );
388        result[j++].name="job_state";
389        result[j++].name="exit_status";
390        result[j++].name="resources_used";
391        result[j++].name="ctime";
392        result[j++].name="mtime";
393        result[j++].name="qtime";
394        result[j++].name="etime";
395
396        result[j++].name="queue";
397        result[j++].name="Account_Name";
398        result[j++].name="exec_host";
399        result[j++].name="start_time";
400        result[j++].name="mtime";
401#if 0
402        result[j].name="resources_used";  result[j].resource="walltime";  j++;
403        result[j].name="resources_used";  result[j].resource="cput";  j++;
404        result[j].name="resources_used";  result[j].resource="mem";  j++;
405        result[j].name="resources_used";  result[j].resource="vmem";  j++;
406        result[j].name="Resource_List";  result[j].resource="walltime";  j++;
407        result[j].name="Resource_List";  result[j].resource="cput";  j++;
408        result[j].name="Resource_List";  result[j].resource="mem";  j++;
409        result[j].name="Resource_List";  result[j].resource="vmem";  j++;
410#endif
411        n_attribs = j;
412        for( i = result;  true;  i++ )
413                if( i+1 < result + n_attribs )
414                        i->next = i+1;
415                else
416                 {
417                        i->next = NULL;
418                        break;
419                 }
420
421#ifdef DEBUGGING
422        fsd_log_return((":"));
423        pbsdrmaa_dump_attrl( result, NULL );
424#endif
425        return result;
426}
427
428
429bool
430pbsdrmaa_session_do_drm_keeps_completed_jobs( pbsdrmaa_session_t *self )
431{
432
433#ifndef PBS_PROFESSIONAL
434        struct attrl default_queue_query;
435        struct attrl keep_completed_query;
436        struct batch_status *default_queue_result = NULL;
437        struct batch_status *keep_completed_result = NULL;
438        const char *default_queue = NULL;
439        const char *keep_completed = NULL;
440        volatile bool result = false;
441        volatile bool conn_lock = false;
442
443        TRY
444         {
445                default_queue_query.next = NULL;
446                default_queue_query.name = "default_queue";
447                default_queue_query.resource = NULL;
448                default_queue_query.value = NULL;
449                keep_completed_query.next = NULL;
450                keep_completed_query.name = "keep_completed";
451                keep_completed_query.resource = NULL;
452                keep_completed_query.value = NULL;
453
454                conn_lock = fsd_mutex_lock( &self->super.drm_connection_mutex );
455
456                default_queue_result =
457                                pbs_statserver( self->pbs_conn, &default_queue_query, NULL );
458                if( default_queue_result == NULL )
459                        pbsdrmaa_exc_raise_pbs( "pbs_statserver" );
460                if( default_queue_result->attribs
461                                &&  !strcmp( default_queue_result->attribs->name,
462                                        "default_queue" ) )
463                        default_queue = default_queue_result->attribs->value;
464
465                fsd_log_debug(( "default_queue: %s", default_queue ));
466
467                if( default_queue )
468                 {
469                        keep_completed_result = pbs_statque( self->pbs_conn,
470                                        (char*)default_queue, &keep_completed_query, NULL );
471                        if( keep_completed_result == NULL )
472                                pbsdrmaa_exc_raise_pbs( "pbs_statque" );
473                        if( keep_completed_result->attribs
474                                        &&  !strcmp( keep_completed_result->attribs->name,
475                                                "keep_completed" ) )
476                                keep_completed = keep_completed_result->attribs->value;
477                 }
478
479                fsd_log_debug(( "keep_completed: %s", keep_completed ));
480         }
481        EXCEPT_DEFAULT
482         {
483                const fsd_exc_t *e = fsd_exc_get();
484                fsd_log_warning(( "PBS server seems not to keep completed jobs\n"
485                                "detail: %s", e->message(e) ));
486                result = false;
487         }
488        ELSE
489         {
490                result = false;
491                if( default_queue == NULL )
492                        fsd_log_warning(( "no default queue set on PBS server" ));
493                else if( keep_completed == NULL && self->pbs_home == NULL )
494                        fsd_log_warning(( "PBS server is not configured to keep completed jobs\n"
495                                                "in Torque: set keep_completed parameter of default queue\n"
496                                                "  $ qmgr -c 'set queue batch keep_completed = 60'\n"
497                                                " or configure DRMAA to utilize log files"
498                                                ));
499                else
500                        result = true;
501         }
502        FINALLY
503         {
504                if( default_queue_result )
505                        pbs_statfree( default_queue_result );
506                if( keep_completed_result )
507                        pbs_statfree( keep_completed_result );
508                if( conn_lock )
509                        conn_lock = fsd_mutex_unlock( &self->super.drm_connection_mutex );
510
511         }
512        END_TRY
513#endif
514        return false;
515}
516
517
518enum field
519  {
520  FLD_DATE = 0,
521  FLD_EVENT = 1,
522  FLD_OBJ = 2,
523  FLD_TYPE = 3,
524  FLD_ID = 4,
525  FLD_MSG = 5
526  };
527
528enum field_msg
529  {
530  FLD_MSG_EXIT_STATUS = 0,
531  FLD_MSG_CPUT = 1,
532  FLD_MSG_MEM = 2,
533  FLD_MSG_VMEM = 3,
534  FLD_MSG_WALLTIME = 4
535  };
536
537#define FLD_MSG_STATUS "0010"
538#define FLD_MSG_STATE "0008"
539#define FLD_MSG_LOG "0002"
540
541ssize_t fsd_getline(char * line,ssize_t size, int fd)
542{
543        char buf;
544        char * ptr = NULL;
545        ssize_t n = 0, rc;
546        ptr = line;
547        for(n = 1; n< size; n++)
548        {               
549                if( (rc = read(fd,&buf,1 )) == 1) {
550                        *ptr++ = buf;
551                        if(buf == '\n')
552                        {
553                                break;
554                        }
555                }
556                else if (rc == 0) {
557                        if (n == 1)
558                                return 0;
559                        else
560                                break;
561                }               
562                else
563                        return -1;
564        }
565
566        return n;
567}
568
569void *
570pbsdrmaa_session_wait_thread( fsd_drmaa_session_t *self )
571{
572        pbsdrmaa_session_t *pbsself = (pbsdrmaa_session_t*) self;
573        fsd_job_t *volatile job = NULL;
574        pbsdrmaa_job_t *volatile pbsjob = NULL;
575        char job_id[256] = "";
576        char event[256] = "";
577        time_t t;
578        struct tm tm;
579
580        tm = pbsself->log_file_initial_time;
581
582        fsd_log_enter(( "" ));
583        fsd_mutex_lock( &self->mutex );
584        TRY
585         {     
586                char * volatile log_path = NULL;
587                char buffer[4096] = "";
588                bool volatile date_changed = true;
589                int  volatile fd = -1;
590                bool first_open = true;
591
592                fsd_log_debug(("WT - reading log files"));
593
594                while( self->wait_thread_run_flag )
595                TRY
596                 {                     
597                        if(date_changed)
598                        {
599                                int num_tries = 0;
600                               
601                                time(&t);       
602                                localtime_r(&t,&tm);
603                       
604                                #define DRMAA_WAIT_THREAD_MAX_TRIES (12)
605                                /* generate new date, close file and open new */
606                                if((log_path = fsd_asprintf("%s/server_logs/%04d%02d%02d",
607                                        pbsself->pbs_home,       
608                                        tm.tm_year + 1900,
609                                        tm.tm_mon + 1,
610                                        tm.tm_mday)) == NULL) {
611                                        fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"WT - Memory allocation wasn't possible");
612                                }
613
614                                if(fd != -1)
615                                        close(fd);
616
617                                fsd_log_debug(("Log file: %s",log_path));
618                               
619                retry:
620                                if((fd = open(log_path,O_RDONLY) ) == -1 && num_tries > DRMAA_WAIT_THREAD_MAX_TRIES )
621                                {
622                                        fsd_log_error(("Can't open log file. Verify pbs_home. Running standard wait_thread."));
623                                        fsd_log_error(("Remember that without keep_completed set standard wait_thread won't run correctly"));
624                                        /*pbsself->super.enable_wait_thread = false;*/ /* run not wait_thread */
625                                        pbsself->wait_thread_log = false;
626                                        pbsself->super.wait_thread = pbsself->super_wait_thread;
627                                        pbsself->super.wait_thread(self);
628                                } else if ( fd == -1 ) {
629                                        fsd_log_warning(("Can't open log file: %s. Retries count: %d", log_path, num_tries));
630                                        num_tries++;
631                                        sleep(5);
632                                        goto retry;
633                                }
634
635                                fsd_free(log_path);
636
637                                fsd_log_debug(("Log file opened"));
638
639                                if(first_open) {
640                                        fsd_log_debug(("Log file lseek"));
641                                        if(lseek(fd,pbsself->log_file_initial_size,SEEK_SET) == (off_t) -1) {
642                                                char errbuf[256] = "InternalError";
643                                                (void)strerror_r(errno, errbuf, 256);
644                                                fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"lseek error: %s",errbuf);
645                                        }
646                                        first_open = false;
647                                }
648
649                                date_changed = false;
650                        }                               
651                       
652                        while ((fsd_getline(buffer,sizeof(buffer),fd)) > 0)                     
653                        {
654                                const char *volatile ptr = buffer;
655                                char field[256] = "";
656                                int volatile field_n = 0;
657                                int n;
658
659                                bool volatile job_id_match = false;
660                                bool volatile event_match = false;
661                                bool volatile log_event = false;
662                                bool volatile log_match = false;
663                                char *  temp_date = NULL;
664                               
665
666                                struct batch_status status;
667                                status.next = NULL;
668
669                                if( strlcpy(job_id,"",sizeof(job_id)) > sizeof(job_id) ) {
670                                        fsd_log_error(("WT - strlcpy error"));
671                                }
672                                if( strlcpy(event,"",sizeof(event)) > sizeof(event) ) {
673                                        fsd_log_error(("WT - strlcpy error"));
674                                }
675                                while ( sscanf(ptr, "%255[^;]%n", field, &n) == 1 ) /* divide current line into fields */
676                                {
677                                        if(field_n == FLD_DATE)
678                                        {
679                                                temp_date = fsd_strdup(field);
680                                        }
681                                        else if(field_n == FLD_EVENT && (strcmp(field,FLD_MSG_STATUS) == 0 ||
682                                                                    strcmp(field,FLD_MSG_STATE) == 0 ))
683                                        {
684                                                /* event described by log line*/
685                                                if(strlcpy(event, field,sizeof(event)) > sizeof(event)) {
686                                                        fsd_log_error(("WT - strlcpy error"));
687                                                }
688                                                event_match = true;                                                                     
689                                        }
690                                        else if(event_match && field_n == FLD_ID)
691                                        {       
692                                                TRY
693                                                {       
694                                                        job = self->get_job( self, field );
695                                                        pbsjob = (pbsdrmaa_job_t*) job;
696
697                                                        if( job )
698                                                        {
699                                                                if(strlcpy(job_id, field,sizeof(job_id)) > sizeof(job_id)) {
700                                                                        fsd_log_error(("WT - strlcpy error"));
701                                                                }
702                                                                fsd_log_debug(("WT - job_id: %s",job_id));
703                                                                status.name = fsd_strdup(job_id);
704                                                                job_id_match = true; /* job_id is in drmaa */
705                                                        }
706                                                        else
707                                                        {
708                                                                fsd_log_debug(("WT - Unknown job: %s", field));
709                                                        }
710                                                }
711                                                END_TRY
712                                        }
713                                        else if(job_id_match && field_n == FLD_MSG)
714                                        {                                               
715                                                /* parse msg - depends on FLD_EVENT*/
716                                                struct attrl struct_resource_cput,struct_resource_mem,struct_resource_vmem,
717                                                        struct_resource_walltime, struct_status, struct_state, struct_start_time,struct_mtime, struct_queue, struct_account_name;       
718                                               
719                                                bool state_running = false;
720
721                                                struct_status.name = NULL;
722                                                struct_status.value = NULL;
723                                                struct_status.next = NULL;
724                                                struct_status.resource = NULL;
725
726                                                struct_state.name = NULL;
727                                                struct_state.value = NULL;
728                                                struct_state.next = NULL;
729                                                struct_state.resource = NULL;
730
731                                                struct_resource_cput.name = NULL;
732                                                struct_resource_cput.value = NULL;
733                                                struct_resource_cput.next = NULL;
734                                                struct_resource_cput.resource = NULL;
735
736                                                struct_resource_mem.name = NULL;
737                                                struct_resource_mem.value = NULL;
738                                                struct_resource_mem.next = NULL;
739                                                struct_resource_mem.resource = NULL;
740
741                                                struct_resource_vmem.name = NULL;
742                                                struct_resource_vmem.value = NULL;
743                                                struct_resource_vmem.next = NULL;
744                                                struct_resource_vmem.resource = NULL;
745
746                                                struct_resource_walltime.name = NULL;
747                                                struct_resource_walltime.value = NULL;
748                                                struct_resource_walltime.next = NULL;
749                                                struct_resource_walltime.resource = NULL;
750
751                                                struct_start_time.name = NULL;
752                                                struct_start_time.value = NULL;
753                                                struct_start_time.next = NULL;
754                                                struct_start_time.resource = NULL;
755
756                                                struct_mtime.name = NULL;
757                                                struct_mtime.value = NULL;
758                                                struct_mtime.next = NULL;
759                                                struct_mtime.resource = NULL;
760
761                                                struct_queue.name = NULL;
762                                                struct_queue.value = NULL;
763                                                struct_queue.next = NULL;
764                                                struct_queue.resource = NULL;
765
766                                                struct_account_name.name = NULL;
767                                                struct_account_name.value = NULL;
768                                                struct_account_name.next = NULL;
769                                                struct_account_name.resource = NULL;
770
771                                                               
772                                                if (strcmp(event,FLD_MSG_STATE) == 0)
773                                                {
774                                                        /* job run, modified, queued etc */
775                                                        int n = 0;
776                                                        status.attribs = &struct_state;
777                                                        struct_state.next = NULL;
778                                                        struct_state.name = "job_state";
779                                                        if(field[0] == 'J') /* Job Queued, Job Modified, Job Run*/
780                                                        {
781                                                                n = 4;                                                         
782                                                        }               
783                                                        if(field[4] == 'M') {
784                                                                struct tm temp_time_tm;
785                                                                memset(&temp_time_tm, 0, sizeof(temp_time_tm));
786                                                                temp_time_tm.tm_isdst = -1;
787
788                                                                if (strptime(temp_date, "%m/%d/%Y %H:%M:%S", &temp_time_tm) == NULL)
789                                                                 {
790                                                                        fsd_log_error(("failed to parse mtime: %s", temp_date));
791                                                                 }
792                                                                else
793                                                                 {
794                                                                        time_t temp_time = mktime(&temp_time_tm);
795                                                                        status.attribs = &struct_mtime;
796                                                                        struct_mtime.name = "mtime";
797                                                                        struct_mtime.next = NULL;
798                                                                        struct_mtime.value = fsd_asprintf("%lu",temp_time);
799                                                                 }
800                                                        }               
801                                                        /* != Job deleted and Job to be deleted*/
802                                                        #ifdef PBS_PROFESSIONAL
803                                                        else if (field[4] != 't' && field[10] != 'd') {
804                                                        #else           
805                                                        else if(field[4] != 'd') {
806                                                        #endif
807
808                                                                if ((struct_state.value = fsd_asprintf("%c",field[n]) ) == NULL ) { /* 4 first letter of state */
809                                                                        fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"WT - Memory allocation wasn't possible");
810                                                                }
811                                                                if(struct_state.value[0] == 'R'){
812                                                                        state_running = true;
813                                                                }
814                                                        }
815                                                        else { /* job terminated - pbs drmaa detects failed as completed with exit_status !=0, aborted with status -1*/
816                                                                struct_status.name = "exit_status";
817                                                                struct_status.value = fsd_strdup("-1");
818                                                                struct_status.next = NULL;
819                                                                struct_state.next = &struct_status;
820                                                                struct_state.value = fsd_strdup("C");                                                           
821                                                        }
822                                                }                                                   
823                                                else /*if (strcmp(event,FLD_MSG_STATUS) == 0 )*/
824                                                {
825                                                        /* exit status and rusage */
826                                                        const char *ptr2 = field;
827                                                        char  msg[ 256 ] = "";
828                                                        int n2;
829                                                        int msg_field_n = 0;
830                                                       
831                                                        struct_resource_cput.name = "resources_used";
832                                                        struct_resource_mem.name = "resources_used";
833                                                        struct_resource_vmem.name = "resources_used";
834                                                        struct_resource_walltime.name = "resources_used";
835                                                        struct_status.name = "exit_status";
836                                                        struct_state.name = "job_state";
837                               
838                                                        status.attribs = &struct_resource_cput;
839                                                        struct_resource_cput.next = &struct_resource_mem;
840                                                        struct_resource_mem.next = &struct_resource_vmem;
841                                                        struct_resource_vmem.next = &struct_resource_walltime;
842                                                        struct_resource_walltime.next =  &struct_status;
843                                                        struct_status.next = &struct_state;
844                                                        struct_state.next = NULL;
845
846                                                        while ( sscanf(ptr2, "%255[^ ]%n", msg, &n2) == 1 )
847                                                         {                                             
848                                                                switch(msg_field_n)
849                                                                {
850                                                                        case FLD_MSG_EXIT_STATUS:
851                                                                                struct_status.value = fsd_strdup(strchr(msg,'=')+1);
852                                                                                break;
853
854                                                                        case FLD_MSG_CPUT:
855                                                                                struct_resource_cput.resource = "cput";
856                                                                                struct_resource_cput.value = fsd_strdup(strchr(msg,'=')+1);
857                                                                                break;
858
859                                                                        case FLD_MSG_MEM:
860                                                                                struct_resource_mem.resource = "mem";
861                                                                                struct_resource_mem.value  = fsd_strdup(strchr(msg,'=')+1);
862                                                                                break;
863
864                                                                        case FLD_MSG_VMEM:
865                                                                                struct_resource_vmem.resource = "vmem";
866                                                                                struct_resource_vmem.value  = fsd_strdup(strchr(msg,'=')+1);
867                                                                                break;
868
869                                                                        case FLD_MSG_WALLTIME:
870                                                                                struct_resource_walltime.resource = "walltime";
871                                                                                struct_resource_walltime.value  = fsd_strdup(strchr(msg,'=')+1);
872                                                                                break;
873                                                                }
874                                                             
875                                                                ptr2 += n2;
876                                                                msg_field_n++;
877                                                                if ( *ptr2 != ' ' )
878                                                                 {
879                                                                         break;
880                                                                 }
881                                                                ++ptr2;                                         
882                                                         }
883                                                        struct_state.value = fsd_strdup("C");   /* we got exit_status so we say that it has completed */
884                                                }                                               
885
886                                                if ( state_running )
887                                                 {
888                                                        fsd_log_debug(("WT - forcing update of job: %s", job->job_id ));
889                                                        job->update_status( job );
890                                                 }
891                                                else
892                                                 {
893                                                        fsd_log_debug(("WT - updating job: %s", job->job_id ));
894                                                        pbsjob->update( job, &status );
895                                                 }
896
897                               
898                                                fsd_cond_broadcast( &job->status_cond);
899                                                fsd_cond_broadcast( &self->wait_condition );
900
901                                                if ( job )
902                                                        job->release( job );
903       
904                                                fsd_free(struct_resource_cput.value);
905                                                fsd_free(struct_resource_mem.value);
906                                                fsd_free(struct_resource_vmem.value);
907                                                fsd_free(struct_resource_walltime.value);
908                                                fsd_free(struct_status.value);
909                                                fsd_free(struct_state.value);
910                                                fsd_free(struct_start_time.value);
911                                                fsd_free(struct_mtime.value);
912                                                fsd_free(struct_queue.value);
913                                                fsd_free(struct_account_name.value);
914
915                                                if ( status.name!=NULL )
916                                                        fsd_free(status.name);
917                                        }
918                                        else if(field_n == FLD_EVENT && strcmp(field,FLD_MSG_LOG) == 0)
919                                        {
920                                                log_event = true;                                       
921                                        }
922                                        else if (log_event && field_n == FLD_ID && strcmp(field,"Log") == 0 )
923                                        {
924                                                log_match = true;
925                                                log_event = false;
926                                        }
927                                        else if( log_match && field_n == FLD_MSG &&
928                                                field[0] == 'L' &&
929                                                field[1] == 'o' &&
930                                                field[2] == 'g' &&
931                                                field[3] == ' ' &&
932                                                field[4] == 'c' &&
933                                                field[5] == 'l' &&
934                                                field[6] == 'o' &&
935                                                field[7] == 's' &&
936                                                field[8] == 'e' &&
937                                                field[9] == 'd' )  /* last field in the file - strange bahaviour*/
938                                        {
939                                                fsd_log_debug(("WT - Date changed. Closing log file"));
940                                                date_changed = true;
941                                                log_match = false;
942                                        }
943                                       
944                                        ptr += n;
945                                        if ( *ptr != ';' )
946                                        {
947                                                break; /* end of line */
948                                        }
949                                        field_n++;
950                                        ++ptr;
951                                }               
952
953                                if( strlcpy(buffer,"",sizeof(buffer)) > sizeof(buffer) ) {
954                                        fsd_log_error(("WT - strlcpy error"));
955                                }
956
957                                fsd_free(temp_date);                   
958                        }
959
960                        fsd_mutex_unlock( &self->mutex );       
961                        usleep(1000000);
962                        fsd_mutex_lock( &self->mutex );
963                 }
964                EXCEPT_DEFAULT
965                 {
966                        const fsd_exc_t *e = fsd_exc_get();
967
968                        fsd_log_error(( "wait thread: <%d:%s>", e->code(e), e->message(e) ));
969                        fsd_exc_reraise();
970                 }
971                END_TRY
972
973                if(fd != -1)
974                        close(fd);
975                fsd_log_debug(("Log file closed"));
976         }
977        FINALLY
978         {
979                fsd_log_debug(("WT - Terminated."));   
980                fsd_mutex_unlock( &self->mutex );
981         }
982        END_TRY
983
984        fsd_log_return(( " =NULL" ));
985        return NULL;
986}
Note: See TracBrowser for help on using the repository browser.