source: trunk/pbs_drmaa/session.c @ 58

Revision 58, 12.9 KB checked in by mmamonski, 12 years ago (diff)

on site fixies

  • Property svn:keywords set to Id
Line 
1/* $Id$ */
2/*
3 *  FedStage DRMAA for PBS Pro
4 *  Copyright (C) 2006-2009  FedStage Systems
5 *
6 *  This program is free software: you can redistribute it and/or modify
7 *  it under the terms of the GNU General Public License as published by
8 *  the Free Software Foundation, either version 3 of the License, or
9 *  (at your option) any later version.
10 *
11 *  This program is distributed in the hope that it will be useful,
12 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 *  GNU General Public License for more details.
15 *
16 *  You should have received a copy of the GNU General Public License
17 *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20#ifdef HAVE_CONFIG_H
21#       include <config.h>
22#endif
23
24#include <pwd.h>
25#include <stdlib.h>
26#include <string.h>
27#include <unistd.h>
28#include <sys/select.h>
29#include <sys/stat.h>
30#include <sys/types.h>
31#include <fcntl.h>
32#include <signal.h>
33
34#include <pbs_ifl.h>
35#include <pbs_error.h>
36
37#include <drmaa_utils/datetime.h>
38#include <drmaa_utils/drmaa.h>
39#include <drmaa_utils/iter.h>
40#include <drmaa_utils/conf.h>
41#include <drmaa_utils/session.h>
42#include <drmaa_utils/datetime.h>
43
44#include <pbs_drmaa/job.h>
45#include <pbs_drmaa/log_reader.h>
46#include <pbs_drmaa/session.h>
47#include <pbs_drmaa/submit.h>
48#include <pbs_drmaa/util.h>
49
50#include <errno.h>
51
52#ifndef lint
53static char rcsid[]
54#       ifdef __GNUC__
55                __attribute__ ((unused))
56#       endif
57        = "$Id$";
58#endif
59
60static void
61pbsdrmaa_session_destroy( fsd_drmaa_session_t *self );
62
63static void
64pbsdrmaa_session_apply_configuration( fsd_drmaa_session_t *self );
65
66static fsd_job_t *
67pbsdrmaa_session_new_job( fsd_drmaa_session_t *self, const char *job_id );
68
69static void
70pbsdrmaa_session_update_all_jobs_status( fsd_drmaa_session_t *self );
71
72static void
73*pbsdrmaa_session_wait_thread( fsd_drmaa_session_t *self );
74
75static char *
76pbsdrmaa_session_run_impl(
77                fsd_drmaa_session_t *self,
78                const fsd_template_t *jt,
79                int bulk_idx
80                );
81
82static struct attrl *
83pbsdrmaa_create_status_attrl(void);
84
85
86fsd_drmaa_session_t *
87pbsdrmaa_session_new( const char *contact )
88{
89        pbsdrmaa_session_t *volatile self = NULL;
90
91        if( contact == NULL )
92                contact = "";
93        TRY
94         {
95                self = (pbsdrmaa_session_t*)fsd_drmaa_session_new(contact);
96                fsd_realloc( self, 1, pbsdrmaa_session_t );
97                self->super_wait_thread = NULL;
98
99                self->log_file_initial_size = 0;
100                self->pbs_conn = -1;
101                self->pbs_home = NULL;
102
103                self->wait_thread_log = false;
104                self->status_attrl = NULL;
105               
106                self->super_destroy = self->super.destroy;
107                self->super.destroy = pbsdrmaa_session_destroy;
108                self->super.new_job = pbsdrmaa_session_new_job;
109                self->super.update_all_jobs_status
110                                = pbsdrmaa_session_update_all_jobs_status;
111                self->super.run_impl = pbsdrmaa_session_run_impl;
112
113                self->super_apply_configuration = self->super.apply_configuration;
114                self->super.apply_configuration = pbsdrmaa_session_apply_configuration;
115
116                self->status_attrl = pbsdrmaa_create_status_attrl();
117                self->max_retries_count = 3;
118                self->wait_thread_sleep_time = 1;
119                self->job_exit_status_file_prefix = NULL;
120
121                self->super.load_configuration( &self->super, "pbs_drmaa" );
122
123                self->super.missing_jobs = FSD_IGNORE_MISSING_JOBS;
124
125                 {
126                        int tries_left = self->max_retries_count;
127                        int sleep_time = 1;
128                        /*ignore SIGPIPE - otheriwse pbs_disconnect cause the program to exit */
129                        signal(SIGPIPE, SIG_IGN);
130retry_connect: /* Life... */
131                        self->pbs_conn = pbs_connect( self->super.contact );
132                        fsd_log_info(( "pbs_connect(%s) =%d", self->super.contact, self->pbs_conn ));
133                        if( self->pbs_conn < 0 && tries_left-- )
134                         {
135                                sleep(sleep_time++);
136                                goto retry_connect;
137                         }
138
139                        if( self->pbs_conn < 0 )
140                                pbsdrmaa_exc_raise_pbs( "pbs_connect" );
141                 }
142
143         }
144        EXCEPT_DEFAULT
145         {
146                if( self )
147                  {
148                        self->super.destroy( &self->super );
149                        self = NULL;
150                  }
151
152                fsd_exc_reraise();
153         }
154        END_TRY
155        return (fsd_drmaa_session_t*)self;
156}
157
158
159void
160pbsdrmaa_session_destroy( fsd_drmaa_session_t *self )
161{
162        pbsdrmaa_session_t *pbsself = (pbsdrmaa_session_t*)self;
163        self->stop_wait_thread( self );
164        if( pbsself->pbs_conn >= 0 )
165                pbs_disconnect( pbsself->pbs_conn );
166        fsd_free( pbsself->status_attrl );
167        fsd_free( pbsself->job_exit_status_file_prefix );
168
169        pbsself->super_destroy( self );
170}
171
172
173static char *
174pbsdrmaa_session_run_impl(
175                fsd_drmaa_session_t *self,
176                const fsd_template_t *jt,
177                int bulk_idx
178                )
179{
180        char *volatile job_id = NULL;
181        fsd_job_t *volatile job = NULL;
182        pbsdrmaa_submit_t *volatile submit = NULL;
183
184        fsd_log_enter(( "(jt=%p, bulk_idx=%d)", (void*)jt, bulk_idx ));
185        TRY
186         {
187                submit = pbsdrmaa_submit_new( self, jt, bulk_idx );
188                submit->eval( submit );
189                job_id = submit->submit( submit );
190                job = self->new_job( self, job_id );
191                job->submit_time = time(NULL);
192                job->flags |= FSD_JOB_CURRENT_SESSION;
193                self->jobs->add( self->jobs, job );
194                job->release( job );  job = NULL;
195         }
196        EXCEPT_DEFAULT
197         {
198                fsd_free( job_id );
199                fsd_exc_reraise();
200         }
201        FINALLY
202         {
203                if( submit )
204                        submit->destroy( submit );
205                if( job )
206                        job->release( job );
207         }
208        END_TRY
209        fsd_log_return(( " =%s", job_id ));
210        return job_id;
211}
212
213
214static fsd_job_t *
215pbsdrmaa_session_new_job( fsd_drmaa_session_t *self, const char *job_id )
216{
217        fsd_job_t *job;
218        job = pbsdrmaa_job_new( fsd_strdup(job_id) );
219        job->session = self;
220        return job;
221}
222
223void
224pbsdrmaa_session_apply_configuration( fsd_drmaa_session_t *self )
225{
226        pbsdrmaa_session_t *pbsself = (pbsdrmaa_session_t*)self;
227        fsd_conf_option_t *pbs_home = NULL;
228        fsd_conf_option_t *wait_thread_sleep_time = NULL;
229        fsd_conf_option_t *max_retries_count = NULL;
230        fsd_conf_option_t *user_state_dir = NULL;
231
232        pbs_home = fsd_conf_dict_get(self->configuration, "pbs_home" );
233        wait_thread_sleep_time = fsd_conf_dict_get(self->configuration, "wait_thread_sleep_time" );
234        max_retries_count = fsd_conf_dict_get(self->configuration, "max_retries_count" );
235        user_state_dir = fsd_conf_dict_get(self->configuration, "user_state_dir" );
236
237        if( pbs_home && pbs_home->type == FSD_CONF_STRING )
238          {
239                        struct stat statbuf;
240                        char * volatile log_path;
241                        struct tm tm;
242                       
243                        pbsself->pbs_home = pbs_home->val.string;
244                        fsd_log_info(("pbs_home: %s",pbsself->pbs_home));
245                        pbsself->super_wait_thread = pbsself->super.wait_thread;
246                        pbsself->super.wait_thread = pbsdrmaa_session_wait_thread;             
247                        pbsself->wait_thread_log = true;
248       
249                        time(&pbsself->log_file_initial_time); 
250                        localtime_r(&pbsself->log_file_initial_time,&tm);
251
252                        log_path = fsd_asprintf("%s/server_logs/%04d%02d%02d",
253                                        pbsself->pbs_home,
254                                        tm.tm_year + 1900,
255                                        tm.tm_mon + 1,
256                                        tm.tm_mday);
257
258                        if(stat(log_path,&statbuf) == -1)
259                          {
260                                char errbuf[256] = "InternalError";
261                                (void)strerror_r(errno, errbuf, sizeof(errbuf));
262                                fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"stat error on file %s: %s", log_path, errbuf);
263                          }
264       
265                        fsd_log_debug(("Log file %s size %d",log_path,(int) statbuf.st_size));
266                        pbsself->log_file_initial_size = statbuf.st_size;
267                        fsd_free(log_path);
268          }
269
270        if ( max_retries_count && max_retries_count->type == FSD_CONF_INTEGER)
271          {
272                pbsself->max_retries_count = max_retries_count->val.integer;
273                fsd_log_info(("Max retries count: %d", pbsself->max_retries_count));
274          }
275
276        if ( wait_thread_sleep_time && wait_thread_sleep_time->type == FSD_CONF_INTEGER)
277          {
278                pbsself->wait_thread_sleep_time = wait_thread_sleep_time->val.integer;
279                fsd_log_info(("Wait thread sleep time: %d", pbsself->wait_thread_sleep_time));
280          }
281
282        if( user_state_dir && user_state_dir->type == FSD_CONF_STRING )
283          {
284                struct passwd *pw = NULL;
285                uid_t uid;
286
287                uid = geteuid();
288                pw = getpwuid(uid); /* drmaa_init is always called in thread safely fashion */
289
290                if (!pw)
291                        fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"Failed to get pw_name of the user %d", uid);
292
293                pbsself->job_exit_status_file_prefix = fsd_asprintf(user_state_dir->val.string, pw->pw_name);
294          }
295        else
296          {
297                pbsself->job_exit_status_file_prefix = fsd_asprintf("%s/.drmaa", getenv("HOME"));
298          }
299
300        fsd_log_debug(("Trying to create state directory: %s", pbsself->job_exit_status_file_prefix));
301
302        if (mkdir(pbsself->job_exit_status_file_prefix, 0700) == -1 && errno != EEXIST) /* TODO it would be much better to do stat before */
303          {
304                fsd_log_warning(("Failed to create job state directory: %s. Valid job exit status may not be available in some cases.", pbsself->job_exit_status_file_prefix));
305          }
306
307
308        /* TODO purge old exit statuses files */
309
310        pbsself->super_apply_configuration(self); /* call method from the superclass */
311}
312
313
314void
315pbsdrmaa_session_update_all_jobs_status( fsd_drmaa_session_t *self )
316{
317        volatile bool conn_lock = false;
318        volatile bool jobs_lock = false;
319        pbsdrmaa_session_t *pbsself = (pbsdrmaa_session_t*)self;
320        fsd_job_set_t *jobs = self->jobs;
321        struct batch_status *volatile status = NULL;
322        volatile int tries_left = pbsself->max_retries_count;
323        volatile int sleep_time = 1;
324
325        fsd_log_enter((""));
326
327        TRY
328         {
329                conn_lock = fsd_mutex_lock( &self->drm_connection_mutex );
330retry:
331/* TODO: query only for user's jobs pbs_selstat + ATTR_u */
332#ifdef PBS_PROFESSIONAL
333                status = pbs_statjob( pbsself->pbs_conn, NULL, NULL, NULL );
334#else
335                status = pbs_statjob( pbsself->pbs_conn, NULL, pbsself->status_attrl, NULL );
336#endif
337                fsd_log_info(( "pbs_statjob( fd=%d, job_id=NULL, attribs={...} ) =%p", pbsself->pbs_conn, (void*)status ));
338                if( status == NULL  &&  pbs_errno != 0 )
339                 {
340                        if (pbs_errno == PBSE_PROTOCOL || pbs_errno == PBSE_EXPIRED)
341                         {
342                                if ( pbsself->pbs_conn >= 0)
343                                        pbs_disconnect( pbsself->pbs_conn );
344retry_connect:
345                                sleep(sleep_time++);
346                                pbsself->pbs_conn = pbs_connect( pbsself->super.contact );
347                                if( pbsself->pbs_conn < 0)
348                                 {
349                                        if (tries_left--)
350                                                goto retry_connect;
351                                        else
352                                                pbsdrmaa_exc_raise_pbs( "pbs_connect" );
353                                 }
354                                else
355                                        goto retry;
356                         }
357                        else
358                         {
359                                pbsdrmaa_exc_raise_pbs( "pbs_statjob" );
360                         }
361                 }
362                conn_lock = fsd_mutex_unlock( &self->drm_connection_mutex );
363
364                 {
365                        size_t i;
366                        fsd_job_t *job;
367                        jobs_lock = fsd_mutex_lock( &jobs->mutex );
368                        for( i = 0;  i < jobs->tab_size;  i++ )
369                                for( job = jobs->tab[i];  job != NULL;  job = job->next )
370                                 {
371                                        fsd_mutex_lock( &job->mutex );
372                                        job->flags |= FSD_JOB_MISSING;
373                                        fsd_mutex_unlock( &job->mutex );
374                                 }
375                        jobs_lock = fsd_mutex_unlock( &jobs->mutex );
376                 }
377
378                 {
379                        struct batch_status *volatile i;
380                        for( i = status;  i != NULL;  i = i->next )
381                         {
382                                fsd_job_t *job = NULL;
383                                fsd_log_debug(( "job_id=%s", i->name ));
384                                job = self->get_job( self, i->name );
385                                if( job != NULL )
386                                 {
387                                        job->flags &= ~FSD_JOB_MISSING;
388                                        TRY
389                                         {
390                                                ((pbsdrmaa_job_t*)job)->update( job, i );
391                                         }
392                                        FINALLY
393                                         {
394                                                job->release( job );
395                                         }
396                                        END_TRY
397                                 }
398                         }
399                 }
400
401                 {
402                        size_t volatile i;
403                        fsd_job_t *volatile job;
404                        jobs_lock = fsd_mutex_lock( &jobs->mutex );
405                        for( i = 0;  i < jobs->tab_size;  i++ )
406                                for( job = jobs->tab[i];  job != NULL;  job = job->next )
407                                 {
408                                        fsd_mutex_lock( &job->mutex );
409                                        TRY
410                                         {
411                                                if( job->flags & FSD_JOB_MISSING )
412                                                        job->on_missing( job );
413                                         }
414                                        FINALLY{ fsd_mutex_unlock( &job->mutex ); }
415                                        END_TRY
416                                 }
417                        jobs_lock = fsd_mutex_unlock( &jobs->mutex );
418                 }
419         }
420        FINALLY
421         {
422                if( status != NULL )
423                        pbs_statfree( status );
424                if( conn_lock )
425                        conn_lock = fsd_mutex_unlock( &self->drm_connection_mutex );
426                if( jobs_lock )
427                        jobs_lock = fsd_mutex_unlock( &jobs->mutex );
428         }
429        END_TRY
430
431        fsd_log_return((""));
432}
433
434
435
436struct attrl *
437pbsdrmaa_create_status_attrl(void)
438{
439        struct attrl *result = NULL;
440        struct attrl *i;
441        const int max_attribs = 16;
442        int n_attribs;
443        int j = 0;
444
445        fsd_log_enter((""));
446        fsd_calloc( result, max_attribs, struct attrl );
447        result[j++].name="job_state";
448        result[j++].name="exit_status";
449        result[j++].name="resources_used";
450        result[j++].name="ctime";
451        result[j++].name="mtime";
452        result[j++].name="qtime";
453        result[j++].name="etime";
454
455        result[j++].name="queue";
456        result[j++].name="Account_Name";
457        result[j++].name="exec_host";
458        result[j++].name="start_time";
459        result[j++].name="mtime";
460#if 0
461        result[j].name="resources_used";  result[j].resource="walltime";  j++;
462        result[j].name="resources_used";  result[j].resource="cput";  j++;
463        result[j].name="resources_used";  result[j].resource="mem";  j++;
464        result[j].name="resources_used";  result[j].resource="vmem";  j++;
465        result[j].name="Resource_List";  result[j].resource="walltime";  j++;
466        result[j].name="Resource_List";  result[j].resource="cput";  j++;
467        result[j].name="Resource_List";  result[j].resource="mem";  j++;
468        result[j].name="Resource_List";  result[j].resource="vmem";  j++;
469#endif
470        n_attribs = j;
471        for( i = result;  true;  i++ )
472                if( i+1 < result + n_attribs )
473                        i->next = i+1;
474                else
475                 {
476                        i->next = NULL;
477                        break;
478                 }
479
480#ifdef DEBUGGING
481        fsd_log_return((":"));
482        pbsdrmaa_dump_attrl( result, NULL );
483#endif
484        return result;
485}
486
487void *
488pbsdrmaa_session_wait_thread( fsd_drmaa_session_t *self )
489{
490        pbsdrmaa_log_reader_t *log_reader = NULL;
491       
492        fsd_log_enter(( "" ));
493       
494        TRY
495        {       
496                log_reader = pbsdrmaa_log_reader_new( self );
497                log_reader->read_log( log_reader );
498        }
499        FINALLY
500        {
501                pbsdrmaa_log_reader_destroy( log_reader );
502        }
503        END_TRY
504       
505        fsd_log_return(( " =NULL" ));
506        return NULL;
507}
Note: See TracBrowser for help on using the repository browser.