source: trunk/pbs_drmaa/session.c @ 53

Revision 53, 12.8 KB checked in by mmamonski, 12 years ago (diff)

on site fixies

  • Property svn:keywords set to Id
Line 
1/* $Id$ */
2/*
3 *  FedStage DRMAA for PBS Pro
4 *  Copyright (C) 2006-2009  FedStage Systems
5 *
6 *  This program is free software: you can redistribute it and/or modify
7 *  it under the terms of the GNU General Public License as published by
8 *  the Free Software Foundation, either version 3 of the License, or
9 *  (at your option) any later version.
10 *
11 *  This program is distributed in the hope that it will be useful,
12 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 *  GNU General Public License for more details.
15 *
16 *  You should have received a copy of the GNU General Public License
17 *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20#ifdef HAVE_CONFIG_H
21#       include <config.h>
22#endif
23
24#include <pwd.h>
25#include <stdlib.h>
26#include <string.h>
27#include <unistd.h>
28#include <sys/select.h>
29#include <sys/stat.h>
30#include <sys/types.h>
31#include <fcntl.h>
32
33#include <pbs_ifl.h>
34#include <pbs_error.h>
35
36#include <drmaa_utils/datetime.h>
37#include <drmaa_utils/drmaa.h>
38#include <drmaa_utils/iter.h>
39#include <drmaa_utils/conf.h>
40#include <drmaa_utils/session.h>
41#include <drmaa_utils/datetime.h>
42
43#include <pbs_drmaa/job.h>
44#include <pbs_drmaa/log_reader.h>
45#include <pbs_drmaa/session.h>
46#include <pbs_drmaa/submit.h>
47#include <pbs_drmaa/util.h>
48
49#include <errno.h>
50
51#ifndef lint
52static char rcsid[]
53#       ifdef __GNUC__
54                __attribute__ ((unused))
55#       endif
56        = "$Id$";
57#endif
58
59static void
60pbsdrmaa_session_destroy( fsd_drmaa_session_t *self );
61
62static void
63pbsdrmaa_session_apply_configuration( fsd_drmaa_session_t *self );
64
65static fsd_job_t *
66pbsdrmaa_session_new_job( fsd_drmaa_session_t *self, const char *job_id );
67
68static void
69pbsdrmaa_session_update_all_jobs_status( fsd_drmaa_session_t *self );
70
71static void
72*pbsdrmaa_session_wait_thread( fsd_drmaa_session_t *self );
73
74static char *
75pbsdrmaa_session_run_impl(
76                fsd_drmaa_session_t *self,
77                const fsd_template_t *jt,
78                int bulk_idx
79                );
80
81static struct attrl *
82pbsdrmaa_create_status_attrl(void);
83
84
85fsd_drmaa_session_t *
86pbsdrmaa_session_new( const char *contact )
87{
88        pbsdrmaa_session_t *volatile self = NULL;
89
90        if( contact == NULL )
91                contact = "";
92        TRY
93         {
94                self = (pbsdrmaa_session_t*)fsd_drmaa_session_new(contact);
95                fsd_realloc( self, 1, pbsdrmaa_session_t );
96                self->super_wait_thread = NULL;
97
98                self->log_file_initial_size = 0;
99                self->pbs_conn = -1;
100                self->pbs_home = NULL;
101
102                self->wait_thread_log = false;
103                self->status_attrl = NULL;
104               
105                self->super_destroy = self->super.destroy;
106                self->super.destroy = pbsdrmaa_session_destroy;
107                self->super.new_job = pbsdrmaa_session_new_job;
108                self->super.update_all_jobs_status
109                                = pbsdrmaa_session_update_all_jobs_status;
110                self->super.run_impl = pbsdrmaa_session_run_impl;
111
112                self->super_apply_configuration = self->super.apply_configuration;
113                self->super.apply_configuration = pbsdrmaa_session_apply_configuration;
114
115                self->status_attrl = pbsdrmaa_create_status_attrl();
116                self->max_retries_count = 3;
117                self->wait_thread_sleep_time = 1;
118                self->job_exit_status_file_prefix = NULL;
119
120                self->super.load_configuration( &self->super, "pbs_drmaa" );
121
122                self->super.missing_jobs = FSD_IGNORE_MISSING_JOBS;
123
124                 {
125                        int tries_left = self->max_retries_count;
126                        int sleep_time = 1;
127retry_connect: /* Life... */
128                        self->pbs_conn = pbs_connect( self->super.contact );
129                        fsd_log_info(( "pbs_connect(%s) =%d", self->super.contact, self->pbs_conn ));
130                        if( self->pbs_conn < 0 && tries_left-- )
131                         {
132                                sleep(sleep_time++);
133                                goto retry_connect;
134                         }
135
136                        if( self->pbs_conn < 0 )
137                                pbsdrmaa_exc_raise_pbs( "pbs_connect" );
138                 }
139
140         }
141        EXCEPT_DEFAULT
142         {
143                if( self )
144                  {
145                        self->super.destroy( &self->super );
146                        self = NULL;
147                  }
148
149                fsd_exc_reraise();
150         }
151        END_TRY
152        return (fsd_drmaa_session_t*)self;
153}
154
155
156void
157pbsdrmaa_session_destroy( fsd_drmaa_session_t *self )
158{
159        pbsdrmaa_session_t *pbsself = (pbsdrmaa_session_t*)self;
160        self->stop_wait_thread( self );
161        if( pbsself->pbs_conn >= 0 )
162                pbs_disconnect( pbsself->pbs_conn );
163        fsd_free( pbsself->status_attrl );
164        fsd_free( pbsself->job_exit_status_file_prefix );
165
166        pbsself->super_destroy( self );
167}
168
169
170static char *
171pbsdrmaa_session_run_impl(
172                fsd_drmaa_session_t *self,
173                const fsd_template_t *jt,
174                int bulk_idx
175                )
176{
177        char *volatile job_id = NULL;
178        fsd_job_t *volatile job = NULL;
179        pbsdrmaa_submit_t *volatile submit = NULL;
180
181        fsd_log_enter(( "(jt=%p, bulk_idx=%d)", (void*)jt, bulk_idx ));
182        TRY
183         {
184                submit = pbsdrmaa_submit_new( self, jt, bulk_idx );
185                submit->eval( submit );
186                job_id = submit->submit( submit );
187                job = self->new_job( self, job_id );
188                job->submit_time = time(NULL);
189                job->flags |= FSD_JOB_CURRENT_SESSION;
190                self->jobs->add( self->jobs, job );
191                job->release( job );  job = NULL;
192         }
193        EXCEPT_DEFAULT
194         {
195                fsd_free( job_id );
196                fsd_exc_reraise();
197         }
198        FINALLY
199         {
200                if( submit )
201                        submit->destroy( submit );
202                if( job )
203                        job->release( job );
204         }
205        END_TRY
206        fsd_log_return(( " =%s", job_id ));
207        return job_id;
208}
209
210
211static fsd_job_t *
212pbsdrmaa_session_new_job( fsd_drmaa_session_t *self, const char *job_id )
213{
214        fsd_job_t *job;
215        job = pbsdrmaa_job_new( fsd_strdup(job_id) );
216        job->session = self;
217        return job;
218}
219
220void
221pbsdrmaa_session_apply_configuration( fsd_drmaa_session_t *self )
222{
223        pbsdrmaa_session_t *pbsself = (pbsdrmaa_session_t*)self;
224        fsd_conf_option_t *pbs_home = NULL;
225        fsd_conf_option_t *wait_thread_sleep_time = NULL;
226        fsd_conf_option_t *max_retries_count = NULL;
227        fsd_conf_option_t *user_state_dir = NULL;
228
229        pbs_home = fsd_conf_dict_get(self->configuration, "pbs_home" );
230        wait_thread_sleep_time = fsd_conf_dict_get(self->configuration, "wait_thread_sleep_time" );
231        max_retries_count = fsd_conf_dict_get(self->configuration, "max_retries_count" );
232        user_state_dir = fsd_conf_dict_get(self->configuration, "user_state_dir" );
233
234        if( pbs_home && pbs_home->type == FSD_CONF_STRING )
235          {
236                        struct stat statbuf;
237                        char * volatile log_path;
238                        struct tm tm;
239                       
240                        pbsself->pbs_home = pbs_home->val.string;
241                        fsd_log_info(("pbs_home: %s",pbsself->pbs_home));
242                        pbsself->super_wait_thread = pbsself->super.wait_thread;
243                        pbsself->super.wait_thread = pbsdrmaa_session_wait_thread;             
244                        pbsself->wait_thread_log = true;
245       
246                        time(&pbsself->log_file_initial_time); 
247                        localtime_r(&pbsself->log_file_initial_time,&tm);
248
249                        log_path = fsd_asprintf("%s/server_logs/%04d%02d%02d",
250                                        pbsself->pbs_home,
251                                        tm.tm_year + 1900,
252                                        tm.tm_mon + 1,
253                                        tm.tm_mday);
254
255                        if(stat(log_path,&statbuf) == -1)
256                          {
257                                char errbuf[256] = "InternalError";
258                                (void)strerror_r(errno, errbuf, sizeof(errbuf));
259                                fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"stat error on file %s: %s", log_path, errbuf);
260                          }
261       
262                        fsd_log_debug(("Log file %s size %d",log_path,(int) statbuf.st_size));
263                        pbsself->log_file_initial_size = statbuf.st_size;
264                        fsd_free(log_path);
265          }
266
267        if ( max_retries_count && max_retries_count->type == FSD_CONF_INTEGER)
268          {
269                pbsself->max_retries_count = max_retries_count->val.integer;
270                fsd_log_info(("Max retries count: %d", pbsself->max_retries_count));
271          }
272
273        if ( wait_thread_sleep_time && wait_thread_sleep_time->type == FSD_CONF_INTEGER)
274          {
275                pbsself->wait_thread_sleep_time = wait_thread_sleep_time->val.integer;
276                fsd_log_info(("Wait thread sleep time: %d", pbsself->wait_thread_sleep_time));
277          }
278
279        if( user_state_dir && user_state_dir->type == FSD_CONF_STRING )
280          {
281                struct passwd *pw = NULL;
282                uid_t uid;
283
284                uid = geteuid();
285                pw = getpwuid(uid); /* drmaa_init is always called in thread safely fashion */
286
287                if (!pw)
288                        fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"Failed to get pw_name of the user %d", uid);
289
290                pbsself->job_exit_status_file_prefix = fsd_asprintf(user_state_dir->val.string, pw->pw_name);
291          }
292        else
293          {
294                pbsself->job_exit_status_file_prefix = fsd_asprintf("%s/.drmaa", getenv("HOME"));
295          }
296
297        fsd_log_debug(("Trying to create state directory: %s", pbsself->job_exit_status_file_prefix));
298
299        if (mkdir(pbsself->job_exit_status_file_prefix, 0700) == -1 && errno != EEXIST) /* TODO it would be much better to do stat before */
300          {
301                fsd_log_warning(("Failed to create job state directory: %s. Valid job exit status may not be available in some cases.", pbsself->job_exit_status_file_prefix));
302          }
303
304
305        /* TODO purge old exit statuses files */
306
307        pbsself->super_apply_configuration(self); /* call method from the superclass */
308}
309
310
311void
312pbsdrmaa_session_update_all_jobs_status( fsd_drmaa_session_t *self )
313{
314        volatile bool conn_lock = false;
315        volatile bool jobs_lock = false;
316        pbsdrmaa_session_t *pbsself = (pbsdrmaa_session_t*)self;
317        fsd_job_set_t *jobs = self->jobs;
318        struct batch_status *volatile status = NULL;
319        volatile int tries_left = pbsself->max_retries_count;
320        volatile int sleep_time = 1;
321
322        fsd_log_enter((""));
323
324        TRY
325         {
326                conn_lock = fsd_mutex_lock( &self->drm_connection_mutex );
327retry:
328/* TODO: query only for user's jobs pbs_selstat + ATTR_u */
329#ifdef PBS_PROFESSIONAL
330                status = pbs_statjob( pbsself->pbs_conn, NULL, NULL, NULL );
331#else
332                status = pbs_statjob( pbsself->pbs_conn, NULL, pbsself->status_attrl, NULL );
333#endif
334                fsd_log_info(( "pbs_statjob( fd=%d, job_id=NULL, attribs={...} ) =%p", pbsself->pbs_conn, (void*)status ));
335                if( status == NULL  &&  pbs_errno != 0 )
336                 {
337                        if (pbs_errno == PBSE_PROTOCOL || pbs_errno == PBSE_EXPIRED)
338                         {
339                                if ( pbsself->pbs_conn >= 0)
340                                        pbs_disconnect( pbsself->pbs_conn );
341retry_connect:
342                                sleep(sleep_time++);
343                                pbsself->pbs_conn = pbs_connect( pbsself->super.contact );
344                                if( pbsself->pbs_conn < 0)
345                                 {
346                                        if (tries_left--)
347                                                goto retry_connect;
348                                        else
349                                                pbsdrmaa_exc_raise_pbs( "pbs_connect" );
350                                 }
351                                else
352                                        goto retry;
353                         }
354                        else
355                         {
356                                pbsdrmaa_exc_raise_pbs( "pbs_statjob" );
357                         }
358                 }
359                conn_lock = fsd_mutex_unlock( &self->drm_connection_mutex );
360
361                 {
362                        size_t i;
363                        fsd_job_t *job;
364                        jobs_lock = fsd_mutex_lock( &jobs->mutex );
365                        for( i = 0;  i < jobs->tab_size;  i++ )
366                                for( job = jobs->tab[i];  job != NULL;  job = job->next )
367                                 {
368                                        fsd_mutex_lock( &job->mutex );
369                                        job->flags |= FSD_JOB_MISSING;
370                                        fsd_mutex_unlock( &job->mutex );
371                                 }
372                        jobs_lock = fsd_mutex_unlock( &jobs->mutex );
373                 }
374
375                 {
376                        struct batch_status *volatile i;
377                        for( i = status;  i != NULL;  i = i->next )
378                         {
379                                fsd_job_t *job = NULL;
380                                fsd_log_debug(( "job_id=%s", i->name ));
381                                job = self->get_job( self, i->name );
382                                if( job != NULL )
383                                 {
384                                        job->flags &= ~FSD_JOB_MISSING;
385                                        TRY
386                                         {
387                                                ((pbsdrmaa_job_t*)job)->update( job, i );
388                                         }
389                                        FINALLY
390                                         {
391                                                job->release( job );
392                                         }
393                                        END_TRY
394                                 }
395                         }
396                 }
397
398                 {
399                        size_t volatile i;
400                        fsd_job_t *volatile job;
401                        jobs_lock = fsd_mutex_lock( &jobs->mutex );
402                        for( i = 0;  i < jobs->tab_size;  i++ )
403                                for( job = jobs->tab[i];  job != NULL;  job = job->next )
404                                 {
405                                        fsd_mutex_lock( &job->mutex );
406                                        TRY
407                                         {
408                                                if( job->flags & FSD_JOB_MISSING )
409                                                        job->on_missing( job );
410                                         }
411                                        FINALLY{ fsd_mutex_unlock( &job->mutex ); }
412                                        END_TRY
413                                 }
414                        jobs_lock = fsd_mutex_unlock( &jobs->mutex );
415                 }
416         }
417        FINALLY
418         {
419                if( status != NULL )
420                        pbs_statfree( status );
421                if( conn_lock )
422                        conn_lock = fsd_mutex_unlock( &self->drm_connection_mutex );
423                if( jobs_lock )
424                        jobs_lock = fsd_mutex_unlock( &jobs->mutex );
425         }
426        END_TRY
427
428        fsd_log_return((""));
429}
430
431
432
433struct attrl *
434pbsdrmaa_create_status_attrl(void)
435{
436        struct attrl *result = NULL;
437        struct attrl *i;
438        const int max_attribs = 16;
439        int n_attribs;
440        int j = 0;
441
442        fsd_log_enter((""));
443        fsd_calloc( result, max_attribs, struct attrl );
444        result[j++].name="job_state";
445        result[j++].name="exit_status";
446        result[j++].name="resources_used";
447        result[j++].name="ctime";
448        result[j++].name="mtime";
449        result[j++].name="qtime";
450        result[j++].name="etime";
451
452        result[j++].name="queue";
453        result[j++].name="Account_Name";
454        result[j++].name="exec_host";
455        result[j++].name="start_time";
456        result[j++].name="mtime";
457#if 0
458        result[j].name="resources_used";  result[j].resource="walltime";  j++;
459        result[j].name="resources_used";  result[j].resource="cput";  j++;
460        result[j].name="resources_used";  result[j].resource="mem";  j++;
461        result[j].name="resources_used";  result[j].resource="vmem";  j++;
462        result[j].name="Resource_List";  result[j].resource="walltime";  j++;
463        result[j].name="Resource_List";  result[j].resource="cput";  j++;
464        result[j].name="Resource_List";  result[j].resource="mem";  j++;
465        result[j].name="Resource_List";  result[j].resource="vmem";  j++;
466#endif
467        n_attribs = j;
468        for( i = result;  true;  i++ )
469                if( i+1 < result + n_attribs )
470                        i->next = i+1;
471                else
472                 {
473                        i->next = NULL;
474                        break;
475                 }
476
477#ifdef DEBUGGING
478        fsd_log_return((":"));
479        pbsdrmaa_dump_attrl( result, NULL );
480#endif
481        return result;
482}
483
484void *
485pbsdrmaa_session_wait_thread( fsd_drmaa_session_t *self )
486{
487        pbsdrmaa_log_reader_t *log_reader = NULL;
488       
489        fsd_log_enter(( "" ));
490       
491        TRY
492        {       
493                log_reader = pbsdrmaa_log_reader_new( self );
494                log_reader->read_log( log_reader );
495        }
496        FINALLY
497        {
498                pbsdrmaa_log_reader_destroy( log_reader );
499        }
500        END_TRY
501       
502        fsd_log_return(( " =NULL" ));
503        return NULL;
504}
Note: See TracBrowser for help on using the repository browser.