source: trunk/pbs_drmaa/session.c @ 12

Revision 12, 13.5 KB checked in by mmamonski, 13 years ago (diff)

version 1.0.7 release candidate

  • Property svn:keywords set to Id
RevLine 
[12]1/* $Id$ */
[1]2/*
3 *  FedStage DRMAA for PBS Pro
4 *  Copyright (C) 2006-2009  FedStage Systems
5 *
6 *  This program is free software: you can redistribute it and/or modify
7 *  it under the terms of the GNU General Public License as published by
8 *  the Free Software Foundation, either version 3 of the License, or
9 *  (at your option) any later version.
10 *
11 *  This program is distributed in the hope that it will be useful,
12 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 *  GNU General Public License for more details.
15 *
16 *  You should have received a copy of the GNU General Public License
17 *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20#ifdef HAVE_CONFIG_H
21#       include <config.h>
22#endif
23
24#include <stdlib.h>
25#include <string.h>
26#include <unistd.h>
27#include <sys/select.h>
28#include <sys/stat.h>
29#include <sys/types.h>
30#include <fcntl.h>
31
32#include <pbs_ifl.h>
33#include <pbs_error.h>
34
35#include <drmaa_utils/datetime.h>
36#include <drmaa_utils/drmaa.h>
37#include <drmaa_utils/iter.h>
38#include <drmaa_utils/conf.h>
39#include <drmaa_utils/session.h>
40#include <drmaa_utils/datetime.h>
41
42#include <pbs_drmaa/job.h>
[7]43#include <pbs_drmaa/log_reader.h>
[1]44#include <pbs_drmaa/session.h>
45#include <pbs_drmaa/submit.h>
46#include <pbs_drmaa/util.h>
47
48#include <errno.h>
49
50#ifndef lint
51static char rcsid[]
52#       ifdef __GNUC__
53                __attribute__ ((unused))
54#       endif
[12]55        = "$Id$";
[1]56#endif
57
58static void
59pbsdrmaa_session_destroy( fsd_drmaa_session_t *self );
60
61static void
62pbsdrmaa_session_apply_configuration( fsd_drmaa_session_t *self );
63
64static fsd_job_t *
65pbsdrmaa_session_new_job( fsd_drmaa_session_t *self, const char *job_id );
66
67static bool
68pbsdrmaa_session_do_drm_keeps_completed_jobs( pbsdrmaa_session_t *self );
69
70static void
71pbsdrmaa_session_update_all_jobs_status( fsd_drmaa_session_t *self );
72
73static void
74*pbsdrmaa_session_wait_thread( fsd_drmaa_session_t *self );
75
76static char *
77pbsdrmaa_session_run_impl(
78                fsd_drmaa_session_t *self,
79                const fsd_template_t *jt,
80                int bulk_idx
81                );
82
83static struct attrl *
84pbsdrmaa_create_status_attrl(void);
85
86
87fsd_drmaa_session_t *
88pbsdrmaa_session_new( const char *contact )
89{
90        pbsdrmaa_session_t *volatile self = NULL;
91
92        if( contact == NULL )
93                contact = "";
94        TRY
95         {
96                self = (pbsdrmaa_session_t*)fsd_drmaa_session_new(contact);
97                fsd_realloc( self, 1, pbsdrmaa_session_t );
98                self->super_wait_thread = NULL;
99
100                self->log_file_initial_size = 0;
101                self->pbs_conn = -1;
102                self->pbs_home = NULL;
103
104                self->wait_thread_log = false;
105                self->status_attrl = NULL;
106               
107                self->super_destroy = self->super.destroy;
108                self->super.destroy = pbsdrmaa_session_destroy;
109                self->super.new_job = pbsdrmaa_session_new_job;
110                self->super.update_all_jobs_status
111                                = pbsdrmaa_session_update_all_jobs_status;
112                self->super.run_impl = pbsdrmaa_session_run_impl;
113
114                self->super_apply_configuration = self->super.apply_configuration;
115                self->super.apply_configuration = pbsdrmaa_session_apply_configuration;
116
117                self->do_drm_keeps_completed_jobs =
118                        pbsdrmaa_session_do_drm_keeps_completed_jobs;
119
120                self->status_attrl = pbsdrmaa_create_status_attrl();
121
122                self->pbs_conn = pbs_connect( self->super.contact );
[10]123                fsd_log_info(( "pbs_connect(%s) =%d", self->super.contact,
[1]124                                        self->pbs_conn ));
125                if( self->pbs_conn < 0 )
126                        pbsdrmaa_exc_raise_pbs( "pbs_connect" );
127
128                self->super.load_configuration( &self->super, "pbs_drmaa" );
129
130                self->super.missing_jobs = FSD_IGNORE_MISSING_JOBS;
131                if( self->do_drm_keeps_completed_jobs( self ) )
132                        self->super.missing_jobs = FSD_IGNORE_QUEUED_MISSING_JOBS;
133         }
134        EXCEPT_DEFAULT
135         {
136                if( self )
137                  {
138                        self->super.destroy( &self->super );
139                        self = NULL;
140                  }
141         }
142        END_TRY
143        return (fsd_drmaa_session_t*)self;
144}
145
146
147void
148pbsdrmaa_session_destroy( fsd_drmaa_session_t *self )
149{
150        pbsdrmaa_session_t *pbsself = (pbsdrmaa_session_t*)self;
151        self->stop_wait_thread( self );
152        if( pbsself->pbs_conn >= 0 )
153                pbs_disconnect( pbsself->pbs_conn );
154        fsd_free( pbsself->status_attrl );
155        pbsself->super_destroy( self );
156}
157
158
159static char *
160pbsdrmaa_session_run_impl(
161                fsd_drmaa_session_t *self,
162                const fsd_template_t *jt,
163                int bulk_idx
164                )
165{
166        char *volatile job_id = NULL;
167        fsd_job_t *volatile job = NULL;
168        pbsdrmaa_submit_t *volatile submit = NULL;
169
170        fsd_log_enter(( "(jt=%p, bulk_idx=%d)", (void*)jt, bulk_idx ));
171        TRY
172         {
173                submit = pbsdrmaa_submit_new( self, jt, bulk_idx );
174                submit->eval( submit );
175                job_id = submit->submit( submit );
176                job = self->new_job( self, job_id );
177                job->submit_time = time(NULL);
178                job->flags |= FSD_JOB_CURRENT_SESSION;
179                self->jobs->add( self->jobs, job );
180                job->release( job );  job = NULL;
181         }
182        EXCEPT_DEFAULT
183         {
184                fsd_free( job_id );
185                fsd_exc_reraise();
186         }
187        FINALLY
188         {
189                if( submit )
190                        submit->destroy( submit );
191                if( job )
192                        job->release( job );
193         }
194        END_TRY
195        fsd_log_return(( " =%s", job_id ));
196        return job_id;
197}
198
199
200static fsd_job_t *
201pbsdrmaa_session_new_job( fsd_drmaa_session_t *self, const char *job_id )
202{
203        fsd_job_t *job;
204        job = pbsdrmaa_job_new( fsd_strdup(job_id) );
205        job->session = self;
206        return job;
207}
208
209void
210pbsdrmaa_session_apply_configuration( fsd_drmaa_session_t *self )
211{
212        pbsdrmaa_session_t *pbsself = (pbsdrmaa_session_t*)self;
213        fsd_conf_option_t *pbs_home;
214        pbs_home = fsd_conf_dict_get(self->configuration, "pbs_home" );
[12]215
216        if( pbs_home && pbs_home->type == FSD_CONF_STRING )
217          {
[1]218                        struct stat statbuf;
219                        char * volatile log_path;
[7]220                        struct tm tm;
221                       
[1]222                        pbsself->pbs_home = pbs_home->val.string;
[12]223                        fsd_log_info(("pbs_home: %s",pbsself->pbs_home));
[1]224                        pbsself->super_wait_thread = pbsself->super.wait_thread;
225                        pbsself->super.wait_thread = pbsdrmaa_session_wait_thread;             
226                        pbsself->wait_thread_log = true;
227       
[7]228                        time(&pbsself->log_file_initial_time); 
229                        localtime_r(&pbsself->log_file_initial_time,&tm);
[1]230
[12]231                        log_path = fsd_asprintf("%s/server_logs/%04d%02d%02d",
232                                        pbsself->pbs_home,
233                                        tm.tm_year + 1900,
234                                        tm.tm_mon + 1,
235                                        tm.tm_mday);
[1]236
[12]237                        if(stat(log_path,&statbuf) == -1)
238                          {
[1]239                                char errbuf[256] = "InternalError";
[12]240                                (void)strerror_r(errno, errbuf, sizeof(errbuf));
[1]241                                fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"stat error: %s",errbuf);
[12]242                          }
[1]243       
244                        fsd_log_debug(("Log file %s size %d",log_path,(int) statbuf.st_size));
245                        pbsself->log_file_initial_size = statbuf.st_size;
246                        fsd_free(log_path);
[12]247          }
[1]248
249        pbsself->super_apply_configuration(self); /* call method from the superclass */
250}
251
252
253void
254pbsdrmaa_session_update_all_jobs_status( fsd_drmaa_session_t *self )
255{
256        volatile bool conn_lock = false;
257        volatile bool jobs_lock = false;
258        pbsdrmaa_session_t *pbsself = (pbsdrmaa_session_t*)self;
259        fsd_job_set_t *jobs = self->jobs;
260        struct batch_status *volatile status = NULL;
261
262        fsd_log_enter((""));
263
264        TRY
265         {
266                conn_lock = fsd_mutex_lock( &self->drm_connection_mutex );
267retry:
[12]268/* TODO: query only for user's jobs pbs_selstat + ATTR_u */
[1]269#ifdef PBS_PROFESSIONAL
270                status = pbs_statjob( pbsself->pbs_conn, NULL, NULL, NULL );
271#else
272                status = pbs_statjob( pbsself->pbs_conn, NULL, pbsself->status_attrl, NULL );
273#endif
[9]274                fsd_log_info(( "pbs_statjob( fd=%d, job_id=NULL, attribs={...} ) =%p",
[1]275                                 pbsself->pbs_conn, (void*)status ));
276                if( status == NULL  &&  pbs_errno != 0 )
277                 {
278                        if (pbs_errno == PBSE_PROTOCOL || pbs_errno == PBSE_EXPIRED)
279                         {
[3]280                                if ( pbsself->pbs_conn >= 0)
281                                        pbs_disconnect( pbsself->pbs_conn );
[1]282                                sleep(1);
283                                pbsself->pbs_conn = pbs_connect( pbsself->super.contact );
284                                if( pbsself->pbs_conn < 0 )
285                                        pbsdrmaa_exc_raise_pbs( "pbs_connect" );
286                                else
287                                        goto retry;
288                         }
289                        else
290                         {
291                                pbsdrmaa_exc_raise_pbs( "pbs_statjob" );
292                         }
293                 }
294                conn_lock = fsd_mutex_unlock( &self->drm_connection_mutex );
295
296                 {
297                        size_t i;
298                        fsd_job_t *job;
299                        jobs_lock = fsd_mutex_lock( &jobs->mutex );
300                        for( i = 0;  i < jobs->tab_size;  i++ )
301                                for( job = jobs->tab[i];  job != NULL;  job = job->next )
302                                 {
303                                        fsd_mutex_lock( &job->mutex );
304                                        job->flags |= FSD_JOB_MISSING;
305                                        fsd_mutex_unlock( &job->mutex );
306                                 }
307                        jobs_lock = fsd_mutex_unlock( &jobs->mutex );
308                 }
309
310                 {
311                        struct batch_status *volatile i;
312                        for( i = status;  i != NULL;  i = i->next )
313                         {
314                                fsd_job_t *job = NULL;
315                                fsd_log_debug(( "job_id=%s", i->name ));
316                                job = self->get_job( self, i->name );
317                                if( job != NULL )
318                                 {
319                                        job->flags &= ~FSD_JOB_MISSING;
320                                        TRY
321                                         {
322                                                ((pbsdrmaa_job_t*)job)->update( job, i );
323                                         }
324                                        FINALLY
325                                         {
326                                                job->release( job );
327                                         }
328                                        END_TRY
329                                 }
330                         }
331                 }
332
333                 {
334                        size_t volatile i;
335                        fsd_job_t *volatile job;
336                        jobs_lock = fsd_mutex_lock( &jobs->mutex );
337                        for( i = 0;  i < jobs->tab_size;  i++ )
338                                for( job = jobs->tab[i];  job != NULL;  job = job->next )
339                                 {
340                                        fsd_mutex_lock( &job->mutex );
341                                        TRY
342                                         {
343                                                if( job->flags & FSD_JOB_MISSING )
344                                                        job->on_missing( job );
345                                         }
346                                        FINALLY{ fsd_mutex_unlock( &job->mutex ); }
347                                        END_TRY
348                                 }
349                        jobs_lock = fsd_mutex_unlock( &jobs->mutex );
350                 }
351         }
352        FINALLY
353         {
354                if( status != NULL )
355                        pbs_statfree( status );
356                if( conn_lock )
357                        conn_lock = fsd_mutex_unlock( &self->drm_connection_mutex );
358                if( jobs_lock )
359                        jobs_lock = fsd_mutex_unlock( &jobs->mutex );
360         }
361        END_TRY
362
363        fsd_log_return((""));
364}
365
366
367
368struct attrl *
369pbsdrmaa_create_status_attrl(void)
370{
371        struct attrl *result = NULL;
372        struct attrl *i;
373        const int max_attribs = 16;
374        int n_attribs;
375        int j = 0;
376
377        fsd_log_enter((""));
378        fsd_calloc( result, max_attribs, struct attrl );
379        result[j++].name="job_state";
380        result[j++].name="exit_status";
381        result[j++].name="resources_used";
382        result[j++].name="ctime";
383        result[j++].name="mtime";
384        result[j++].name="qtime";
385        result[j++].name="etime";
386
387        result[j++].name="queue";
388        result[j++].name="Account_Name";
389        result[j++].name="exec_host";
390        result[j++].name="start_time";
391        result[j++].name="mtime";
392#if 0
393        result[j].name="resources_used";  result[j].resource="walltime";  j++;
394        result[j].name="resources_used";  result[j].resource="cput";  j++;
395        result[j].name="resources_used";  result[j].resource="mem";  j++;
396        result[j].name="resources_used";  result[j].resource="vmem";  j++;
397        result[j].name="Resource_List";  result[j].resource="walltime";  j++;
398        result[j].name="Resource_List";  result[j].resource="cput";  j++;
399        result[j].name="Resource_List";  result[j].resource="mem";  j++;
400        result[j].name="Resource_List";  result[j].resource="vmem";  j++;
401#endif
402        n_attribs = j;
403        for( i = result;  true;  i++ )
404                if( i+1 < result + n_attribs )
405                        i->next = i+1;
406                else
407                 {
408                        i->next = NULL;
409                        break;
410                 }
411
412#ifdef DEBUGGING
413        fsd_log_return((":"));
414        pbsdrmaa_dump_attrl( result, NULL );
415#endif
416        return result;
417}
418
419
420bool
421pbsdrmaa_session_do_drm_keeps_completed_jobs( pbsdrmaa_session_t *self )
422{
423
424#ifndef PBS_PROFESSIONAL
425        struct attrl default_queue_query;
426        struct attrl keep_completed_query;
427        struct batch_status *default_queue_result = NULL;
428        struct batch_status *keep_completed_result = NULL;
429        const char *default_queue = NULL;
430        const char *keep_completed = NULL;
431        volatile bool result = false;
432        volatile bool conn_lock = false;
433
434        TRY
435         {
436                default_queue_query.next = NULL;
437                default_queue_query.name = "default_queue";
438                default_queue_query.resource = NULL;
439                default_queue_query.value = NULL;
440                keep_completed_query.next = NULL;
441                keep_completed_query.name = "keep_completed";
442                keep_completed_query.resource = NULL;
443                keep_completed_query.value = NULL;
444
445                conn_lock = fsd_mutex_lock( &self->super.drm_connection_mutex );
446
447                default_queue_result =
448                                pbs_statserver( self->pbs_conn, &default_queue_query, NULL );
449                if( default_queue_result == NULL )
450                        pbsdrmaa_exc_raise_pbs( "pbs_statserver" );
451                if( default_queue_result->attribs
452                                &&  !strcmp( default_queue_result->attribs->name,
453                                        "default_queue" ) )
454                        default_queue = default_queue_result->attribs->value;
455
456                fsd_log_debug(( "default_queue: %s", default_queue ));
457
458                if( default_queue )
459                 {
460                        keep_completed_result = pbs_statque( self->pbs_conn,
461                                        (char*)default_queue, &keep_completed_query, NULL );
462                        if( keep_completed_result == NULL )
463                                pbsdrmaa_exc_raise_pbs( "pbs_statque" );
464                        if( keep_completed_result->attribs
465                                        &&  !strcmp( keep_completed_result->attribs->name,
466                                                "keep_completed" ) )
467                                keep_completed = keep_completed_result->attribs->value;
468                 }
469
470                fsd_log_debug(( "keep_completed: %s", keep_completed ));
471         }
472        EXCEPT_DEFAULT
473         {
474                const fsd_exc_t *e = fsd_exc_get();
475                fsd_log_warning(( "PBS server seems not to keep completed jobs\n"
476                                "detail: %s", e->message(e) ));
477                result = false;
478         }
479        ELSE
480         {
481                result = false;
482                if( default_queue == NULL )
483                        fsd_log_warning(( "no default queue set on PBS server" ));
484                else if( keep_completed == NULL && self->pbs_home == NULL )
[12]485                        fsd_log_warning(( "Torque server is not configured to keep completed jobs\n"
[1]486                                                "in Torque: set keep_completed parameter of default queue\n"
487                                                "  $ qmgr -c 'set queue batch keep_completed = 60'\n"
488                                                " or configure DRMAA to utilize log files"
489                                                ));
490                else
491                        result = true;
492         }
493        FINALLY
494         {
495                if( default_queue_result )
496                        pbs_statfree( default_queue_result );
497                if( keep_completed_result )
498                        pbs_statfree( keep_completed_result );
499                if( conn_lock )
500                        conn_lock = fsd_mutex_unlock( &self->super.drm_connection_mutex );
501
502         }
503        END_TRY
[12]504
505        return result;
[1]506#endif
[12]507        fsd_log_warning(( "PBS Professional does not keep information about the completed jobs\n"
508                                " You must configure DRMAA to utilize log files in order to always get valid job exit status"
509                                ));
[1]510        return false;
511}
512
513void *
514pbsdrmaa_session_wait_thread( fsd_drmaa_session_t *self )
515{
[7]516        pbsdrmaa_log_reader_t *log_reader = NULL;
517       
[1]518        fsd_log_enter(( "" ));
[7]519       
[1]520        TRY
[7]521        {       
522                log_reader = pbsdrmaa_log_reader_new( self, NULL);
523                log_reader->read_log( log_reader );
524        }
[1]525        FINALLY
[7]526        {
527                pbsdrmaa_log_reader_destroy( log_reader );
528        }
[1]529        END_TRY
[7]530       
[1]531        fsd_log_return(( " =NULL" ));
532        return NULL;
533}
Note: See TracBrowser for help on using the repository browser.