source: trunk/pbs_drmaa/session.c @ 52

Revision 52, 12.7 KB checked in by mmamonski, 12 years ago (diff)

store job exit status in file

  • Property svn:keywords set to Id
Line 
1/* $Id$ */
2/*
3 *  FedStage DRMAA for PBS Pro
4 *  Copyright (C) 2006-2009  FedStage Systems
5 *
6 *  This program is free software: you can redistribute it and/or modify
7 *  it under the terms of the GNU General Public License as published by
8 *  the Free Software Foundation, either version 3 of the License, or
9 *  (at your option) any later version.
10 *
11 *  This program is distributed in the hope that it will be useful,
12 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 *  GNU General Public License for more details.
15 *
16 *  You should have received a copy of the GNU General Public License
17 *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20#ifdef HAVE_CONFIG_H
21#       include <config.h>
22#endif
23
24#include <stdlib.h>
25#include <string.h>
26#include <unistd.h>
27#include <sys/select.h>
28#include <sys/stat.h>
29#include <sys/types.h>
30#include <fcntl.h>
31
32#include <pbs_ifl.h>
33#include <pbs_error.h>
34
35#include <drmaa_utils/datetime.h>
36#include <drmaa_utils/drmaa.h>
37#include <drmaa_utils/iter.h>
38#include <drmaa_utils/conf.h>
39#include <drmaa_utils/session.h>
40#include <drmaa_utils/datetime.h>
41
42#include <pbs_drmaa/job.h>
43#include <pbs_drmaa/log_reader.h>
44#include <pbs_drmaa/session.h>
45#include <pbs_drmaa/submit.h>
46#include <pbs_drmaa/util.h>
47
48#include <errno.h>
49
50#ifndef lint
51static char rcsid[]
52#       ifdef __GNUC__
53                __attribute__ ((unused))
54#       endif
55        = "$Id$";
56#endif
57
58static void
59pbsdrmaa_session_destroy( fsd_drmaa_session_t *self );
60
61static void
62pbsdrmaa_session_apply_configuration( fsd_drmaa_session_t *self );
63
64static fsd_job_t *
65pbsdrmaa_session_new_job( fsd_drmaa_session_t *self, const char *job_id );
66
67static void
68pbsdrmaa_session_update_all_jobs_status( fsd_drmaa_session_t *self );
69
70static void
71*pbsdrmaa_session_wait_thread( fsd_drmaa_session_t *self );
72
73static char *
74pbsdrmaa_session_run_impl(
75                fsd_drmaa_session_t *self,
76                const fsd_template_t *jt,
77                int bulk_idx
78                );
79
80static struct attrl *
81pbsdrmaa_create_status_attrl(void);
82
83
84fsd_drmaa_session_t *
85pbsdrmaa_session_new( const char *contact )
86{
87        pbsdrmaa_session_t *volatile self = NULL;
88
89        if( contact == NULL )
90                contact = "";
91        TRY
92         {
93                self = (pbsdrmaa_session_t*)fsd_drmaa_session_new(contact);
94                fsd_realloc( self, 1, pbsdrmaa_session_t );
95                self->super_wait_thread = NULL;
96
97                self->log_file_initial_size = 0;
98                self->pbs_conn = -1;
99                self->pbs_home = NULL;
100
101                self->wait_thread_log = false;
102                self->status_attrl = NULL;
103               
104                self->super_destroy = self->super.destroy;
105                self->super.destroy = pbsdrmaa_session_destroy;
106                self->super.new_job = pbsdrmaa_session_new_job;
107                self->super.update_all_jobs_status
108                                = pbsdrmaa_session_update_all_jobs_status;
109                self->super.run_impl = pbsdrmaa_session_run_impl;
110
111                self->super_apply_configuration = self->super.apply_configuration;
112                self->super.apply_configuration = pbsdrmaa_session_apply_configuration;
113
114                self->status_attrl = pbsdrmaa_create_status_attrl();
115                self->max_retries_count = 3;
116                self->wait_thread_sleep_time = 1;
117
118                self->super.load_configuration( &self->super, "pbs_drmaa" );
119
120                self->super.missing_jobs = FSD_IGNORE_MISSING_JOBS;
121
122                 {
123                        int tries_left = self->max_retries_count;
124                        int sleep_time = 1;
125retry_connect: /* Life... */
126                        self->pbs_conn = pbs_connect( self->super.contact );
127                        fsd_log_info(( "pbs_connect(%s) =%d", self->super.contact, self->pbs_conn ));
128                        if( self->pbs_conn < 0 && tries_left-- )
129                         {
130                                sleep(sleep_time++);
131                                goto retry_connect;
132                         }
133
134                        if( self->pbs_conn < 0 )
135                                pbsdrmaa_exc_raise_pbs( "pbs_connect" );
136                 }
137
138                 self->job_exit_status_file_prefix = NULL;
139         }
140        EXCEPT_DEFAULT
141         {
142                if( self )
143                  {
144                        self->super.destroy( &self->super );
145                        self = NULL;
146                  }
147
148                fsd_exc_reraise();
149         }
150        END_TRY
151        return (fsd_drmaa_session_t*)self;
152}
153
154
155void
156pbsdrmaa_session_destroy( fsd_drmaa_session_t *self )
157{
158        pbsdrmaa_session_t *pbsself = (pbsdrmaa_session_t*)self;
159        self->stop_wait_thread( self );
160        if( pbsself->pbs_conn >= 0 )
161                pbs_disconnect( pbsself->pbs_conn );
162        fsd_free( pbsself->status_attrl );
163        fsd_free( pbsself->job_exit_status_file_prefix );
164
165        pbsself->super_destroy( self );
166}
167
168
169static char *
170pbsdrmaa_session_run_impl(
171                fsd_drmaa_session_t *self,
172                const fsd_template_t *jt,
173                int bulk_idx
174                )
175{
176        char *volatile job_id = NULL;
177        fsd_job_t *volatile job = NULL;
178        pbsdrmaa_submit_t *volatile submit = NULL;
179
180        fsd_log_enter(( "(jt=%p, bulk_idx=%d)", (void*)jt, bulk_idx ));
181        TRY
182         {
183                submit = pbsdrmaa_submit_new( self, jt, bulk_idx );
184                submit->eval( submit );
185                job_id = submit->submit( submit );
186                job = self->new_job( self, job_id );
187                job->submit_time = time(NULL);
188                job->flags |= FSD_JOB_CURRENT_SESSION;
189                self->jobs->add( self->jobs, job );
190                job->release( job );  job = NULL;
191         }
192        EXCEPT_DEFAULT
193         {
194                fsd_free( job_id );
195                fsd_exc_reraise();
196         }
197        FINALLY
198         {
199                if( submit )
200                        submit->destroy( submit );
201                if( job )
202                        job->release( job );
203         }
204        END_TRY
205        fsd_log_return(( " =%s", job_id ));
206        return job_id;
207}
208
209
210static fsd_job_t *
211pbsdrmaa_session_new_job( fsd_drmaa_session_t *self, const char *job_id )
212{
213        fsd_job_t *job;
214        job = pbsdrmaa_job_new( fsd_strdup(job_id) );
215        job->session = self;
216        return job;
217}
218
219void
220pbsdrmaa_session_apply_configuration( fsd_drmaa_session_t *self )
221{
222        pbsdrmaa_session_t *pbsself = (pbsdrmaa_session_t*)self;
223        fsd_conf_option_t *pbs_home = NULL;
224        fsd_conf_option_t *wait_thread_sleep_time = NULL;
225        fsd_conf_option_t *max_retries_count = NULL;
226        fsd_conf_option_t *user_state_dir = NULL;
227
228        pbs_home = fsd_conf_dict_get(self->configuration, "pbs_home" );
229        wait_thread_sleep_time = fsd_conf_dict_get(self->configuration, "wait_thread_sleep_time" );
230        max_retries_count = fsd_conf_dict_get(self->configuration, "max_retries_count" );
231        user_state_dir = fsd_conf_dict_get(self->configuration, "user_state_dir" );
232
233        if( pbs_home && pbs_home->type == FSD_CONF_STRING )
234          {
235                        struct stat statbuf;
236                        char * volatile log_path;
237                        struct tm tm;
238                       
239                        pbsself->pbs_home = pbs_home->val.string;
240                        fsd_log_info(("pbs_home: %s",pbsself->pbs_home));
241                        pbsself->super_wait_thread = pbsself->super.wait_thread;
242                        pbsself->super.wait_thread = pbsdrmaa_session_wait_thread;             
243                        pbsself->wait_thread_log = true;
244       
245                        time(&pbsself->log_file_initial_time); 
246                        localtime_r(&pbsself->log_file_initial_time,&tm);
247
248                        log_path = fsd_asprintf("%s/server_logs/%04d%02d%02d",
249                                        pbsself->pbs_home,
250                                        tm.tm_year + 1900,
251                                        tm.tm_mon + 1,
252                                        tm.tm_mday);
253
254                        if(stat(log_path,&statbuf) == -1)
255                          {
256                                char errbuf[256] = "InternalError";
257                                (void)strerror_r(errno, errbuf, sizeof(errbuf));
258                                fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"stat error on file %s: %s", log_path, errbuf);
259                          }
260       
261                        fsd_log_debug(("Log file %s size %d",log_path,(int) statbuf.st_size));
262                        pbsself->log_file_initial_size = statbuf.st_size;
263                        fsd_free(log_path);
264          }
265
266        if ( max_retries_count && max_retries_count->type == FSD_CONF_INTEGER)
267          {
268                pbsself->max_retries_count = max_retries_count->val.integer;
269                fsd_log_info(("Max retries count: %d", pbsself->max_retries_count));
270          }
271
272        if ( wait_thread_sleep_time && wait_thread_sleep_time->type == FSD_CONF_INTEGER)
273          {
274                pbsself->wait_thread_sleep_time = wait_thread_sleep_time->val.integer;
275                fsd_log_info(("Wait thread sleep time: %d", pbsself->wait_thread_sleep_time));
276          }
277
278        if( user_state_dir && user_state_dir->type == FSD_CONF_STRING )
279          {
280                struct passwd *pw = NULL;
281                uid_t uid;
282
283                uid = geteuid();
284                pw = getpwuid(uid); /* drmaa_init is always called in thread safely fashion */
285
286                if (!pw)
287                        fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"Failed to get pw_name of the user %d", uid);
288
289                pbsself->job_exit_status_file_prefix = fsd_asprintf(user_state_dir->val.string, pw->pw_name);
290          }
291        else
292          {
293                pbsself->job_exit_status_file_prefix = fsd_asprintf("%s/.drmaa", getenv("HOME"));
294          }
295
296        if (mkdir(pbsself->job_exit_status_file_prefix, 0600) == -1 && errno != EEXIST) /* TODO it would be much better to do stat before */
297          {
298                fsd_log_warning("Failed to create job state directory: %s.  Valid job exit status may not be available in some cases.", pbsself->job_exit_status_file_prefix)
299          }
300
301
302        /* TODO purge old exit statuses files */
303
304        pbsself->super_apply_configuration(self); /* call method from the superclass */
305}
306
307
308void
309pbsdrmaa_session_update_all_jobs_status( fsd_drmaa_session_t *self )
310{
311        volatile bool conn_lock = false;
312        volatile bool jobs_lock = false;
313        pbsdrmaa_session_t *pbsself = (pbsdrmaa_session_t*)self;
314        fsd_job_set_t *jobs = self->jobs;
315        struct batch_status *volatile status = NULL;
316        volatile int tries_left = pbsself->max_retries_count;
317        volatile int sleep_time = 1;
318
319        fsd_log_enter((""));
320
321        TRY
322         {
323                conn_lock = fsd_mutex_lock( &self->drm_connection_mutex );
324retry:
325/* TODO: query only for user's jobs pbs_selstat + ATTR_u */
326#ifdef PBS_PROFESSIONAL
327                status = pbs_statjob( pbsself->pbs_conn, NULL, NULL, NULL );
328#else
329                status = pbs_statjob( pbsself->pbs_conn, NULL, pbsself->status_attrl, NULL );
330#endif
331                fsd_log_info(( "pbs_statjob( fd=%d, job_id=NULL, attribs={...} ) =%p", pbsself->pbs_conn, (void*)status ));
332                if( status == NULL  &&  pbs_errno != 0 )
333                 {
334                        if (pbs_errno == PBSE_PROTOCOL || pbs_errno == PBSE_EXPIRED)
335                         {
336                                if ( pbsself->pbs_conn >= 0)
337                                        pbs_disconnect( pbsself->pbs_conn );
338retry_connect:
339                                sleep(sleep_time++);
340                                pbsself->pbs_conn = pbs_connect( pbsself->super.contact );
341                                if( pbsself->pbs_conn < 0)
342                                 {
343                                        if (tries_left--)
344                                                goto retry_connect;
345                                        else
346                                                pbsdrmaa_exc_raise_pbs( "pbs_connect" );
347                                 }
348                                else
349                                        goto retry;
350                         }
351                        else
352                         {
353                                pbsdrmaa_exc_raise_pbs( "pbs_statjob" );
354                         }
355                 }
356                conn_lock = fsd_mutex_unlock( &self->drm_connection_mutex );
357
358                 {
359                        size_t i;
360                        fsd_job_t *job;
361                        jobs_lock = fsd_mutex_lock( &jobs->mutex );
362                        for( i = 0;  i < jobs->tab_size;  i++ )
363                                for( job = jobs->tab[i];  job != NULL;  job = job->next )
364                                 {
365                                        fsd_mutex_lock( &job->mutex );
366                                        job->flags |= FSD_JOB_MISSING;
367                                        fsd_mutex_unlock( &job->mutex );
368                                 }
369                        jobs_lock = fsd_mutex_unlock( &jobs->mutex );
370                 }
371
372                 {
373                        struct batch_status *volatile i;
374                        for( i = status;  i != NULL;  i = i->next )
375                         {
376                                fsd_job_t *job = NULL;
377                                fsd_log_debug(( "job_id=%s", i->name ));
378                                job = self->get_job( self, i->name );
379                                if( job != NULL )
380                                 {
381                                        job->flags &= ~FSD_JOB_MISSING;
382                                        TRY
383                                         {
384                                                ((pbsdrmaa_job_t*)job)->update( job, i );
385                                         }
386                                        FINALLY
387                                         {
388                                                job->release( job );
389                                         }
390                                        END_TRY
391                                 }
392                         }
393                 }
394
395                 {
396                        size_t volatile i;
397                        fsd_job_t *volatile job;
398                        jobs_lock = fsd_mutex_lock( &jobs->mutex );
399                        for( i = 0;  i < jobs->tab_size;  i++ )
400                                for( job = jobs->tab[i];  job != NULL;  job = job->next )
401                                 {
402                                        fsd_mutex_lock( &job->mutex );
403                                        TRY
404                                         {
405                                                if( job->flags & FSD_JOB_MISSING )
406                                                        job->on_missing( job );
407                                         }
408                                        FINALLY{ fsd_mutex_unlock( &job->mutex ); }
409                                        END_TRY
410                                 }
411                        jobs_lock = fsd_mutex_unlock( &jobs->mutex );
412                 }
413         }
414        FINALLY
415         {
416                if( status != NULL )
417                        pbs_statfree( status );
418                if( conn_lock )
419                        conn_lock = fsd_mutex_unlock( &self->drm_connection_mutex );
420                if( jobs_lock )
421                        jobs_lock = fsd_mutex_unlock( &jobs->mutex );
422         }
423        END_TRY
424
425        fsd_log_return((""));
426}
427
428
429
430struct attrl *
431pbsdrmaa_create_status_attrl(void)
432{
433        struct attrl *result = NULL;
434        struct attrl *i;
435        const int max_attribs = 16;
436        int n_attribs;
437        int j = 0;
438
439        fsd_log_enter((""));
440        fsd_calloc( result, max_attribs, struct attrl );
441        result[j++].name="job_state";
442        result[j++].name="exit_status";
443        result[j++].name="resources_used";
444        result[j++].name="ctime";
445        result[j++].name="mtime";
446        result[j++].name="qtime";
447        result[j++].name="etime";
448
449        result[j++].name="queue";
450        result[j++].name="Account_Name";
451        result[j++].name="exec_host";
452        result[j++].name="start_time";
453        result[j++].name="mtime";
454#if 0
455        result[j].name="resources_used";  result[j].resource="walltime";  j++;
456        result[j].name="resources_used";  result[j].resource="cput";  j++;
457        result[j].name="resources_used";  result[j].resource="mem";  j++;
458        result[j].name="resources_used";  result[j].resource="vmem";  j++;
459        result[j].name="Resource_List";  result[j].resource="walltime";  j++;
460        result[j].name="Resource_List";  result[j].resource="cput";  j++;
461        result[j].name="Resource_List";  result[j].resource="mem";  j++;
462        result[j].name="Resource_List";  result[j].resource="vmem";  j++;
463#endif
464        n_attribs = j;
465        for( i = result;  true;  i++ )
466                if( i+1 < result + n_attribs )
467                        i->next = i+1;
468                else
469                 {
470                        i->next = NULL;
471                        break;
472                 }
473
474#ifdef DEBUGGING
475        fsd_log_return((":"));
476        pbsdrmaa_dump_attrl( result, NULL );
477#endif
478        return result;
479}
480
481void *
482pbsdrmaa_session_wait_thread( fsd_drmaa_session_t *self )
483{
484        pbsdrmaa_log_reader_t *log_reader = NULL;
485       
486        fsd_log_enter(( "" ));
487       
488        TRY
489        {       
490                log_reader = pbsdrmaa_log_reader_new( self );
491                log_reader->read_log( log_reader );
492        }
493        FINALLY
494        {
495                pbsdrmaa_log_reader_destroy( log_reader );
496        }
497        END_TRY
498       
499        fsd_log_return(( " =NULL" ));
500        return NULL;
501}
Note: See TracBrowser for help on using the repository browser.