source: trunk/pbs_drmaa/session.c @ 93

Revision 93, 12.1 KB checked in by mmamonski, 11 years ago (diff)

Segfault on wait thread enabled

  • Property svn:keywords set to Id
Line 
1/* $Id$ */
2/*
3 *  FedStage DRMAA for PBS Pro
4 *  Copyright (C) 2006-2009  FedStage Systems
5 *
6 *  This program is free software: you can redistribute it and/or modify
7 *  it under the terms of the GNU General Public License as published by
8 *  the Free Software Foundation, either version 3 of the License, or
9 *  (at your option) any later version.
10 *
11 *  This program is distributed in the hope that it will be useful,
12 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 *  GNU General Public License for more details.
15 *
16 *  You should have received a copy of the GNU General Public License
17 *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20#ifdef HAVE_CONFIG_H
21#       include <config.h>
22#endif
23
24#include <pwd.h>
25#include <stdlib.h>
26#include <string.h>
27#include <unistd.h>
28#include <sys/select.h>
29#include <sys/stat.h>
30#include <sys/types.h>
31#include <fcntl.h>
32#include <signal.h>
33
34#include <pbs_ifl.h>
35#include <pbs_error.h>
36
37#include <drmaa_utils/datetime.h>
38#include <drmaa_utils/drmaa.h>
39#include <drmaa_utils/iter.h>
40#include <drmaa_utils/conf.h>
41#include <drmaa_utils/session.h>
42#include <drmaa_utils/datetime.h>
43
44#include <pbs_drmaa/job.h>
45#include <pbs_drmaa/log_reader.h>
46#include <pbs_drmaa/session.h>
47#include <pbs_drmaa/submit.h>
48#include <pbs_drmaa/util.h>
49
50#include <errno.h>
51
52#ifndef lint
53static char rcsid[]
54#       ifdef __GNUC__
55                __attribute__ ((unused))
56#       endif
57        = "$Id$";
58#endif
59
60static void
61pbsdrmaa_session_destroy( fsd_drmaa_session_t *self );
62
63static void
64pbsdrmaa_session_apply_configuration( fsd_drmaa_session_t *self );
65
66static fsd_job_t *
67pbsdrmaa_session_new_job( fsd_drmaa_session_t *self, const char *job_id );
68
69static void
70pbsdrmaa_session_update_all_jobs_status( fsd_drmaa_session_t *self );
71
72static void
73*pbsdrmaa_session_wait_thread( fsd_drmaa_session_t *self );
74
75static char *
76pbsdrmaa_session_run_impl(
77                fsd_drmaa_session_t *self,
78                const fsd_template_t *jt,
79                int bulk_idx
80                );
81
82static struct attrl *
83pbsdrmaa_create_status_attrl(void);
84
85
86fsd_drmaa_session_t *
87pbsdrmaa_session_new( const char *contact )
88{
89        pbsdrmaa_session_t *volatile self = NULL;
90
91        if( contact == NULL )
92                contact = "";
93        TRY
94         {
95                self = (pbsdrmaa_session_t*)fsd_drmaa_session_new(contact);
96                fsd_realloc( self, 1, pbsdrmaa_session_t );
97
98                fsd_mutex_lock( &self->super.mutex );
99
100                self->super_wait_thread = NULL;
101
102                self->log_file_initial_size = 0;
103                self->pbs_home = NULL;
104
105                self->wait_thread_log = false;
106                self->status_attrl = NULL;
107               
108                self->super_destroy = self->super.destroy;
109                self->super.destroy = pbsdrmaa_session_destroy;
110                self->super.new_job = pbsdrmaa_session_new_job;
111                self->super.update_all_jobs_status
112                                = pbsdrmaa_session_update_all_jobs_status;
113                self->super.run_impl = pbsdrmaa_session_run_impl;
114
115                self->super_apply_configuration = self->super.apply_configuration;
116                self->super.apply_configuration = pbsdrmaa_session_apply_configuration;
117
118                self->status_attrl = pbsdrmaa_create_status_attrl();
119                self->max_retries_count = 3;
120                self->wait_thread_sleep_time = 1;
121                self->job_exit_status_file_prefix = NULL;
122
123                self->super.load_configuration( &self->super, "pbs_drmaa" );
124
125                self->super.missing_jobs = FSD_IGNORE_MISSING_JOBS;
126
127                self->pbs_connection = pbsdrmaa_pbs_conn_new( (fsd_drmaa_session_t *)self, contact );
128                self->connection_max_lifetime =  30; /* 30 seconds */
129
130                fsd_mutex_unlock( &self->super.mutex );
131         }
132        EXCEPT_DEFAULT
133         {
134                if( self )
135                  {
136                        fsd_mutex_unlock( &self->super.mutex );
137                        self->super.destroy( &self->super );
138                        self = NULL;
139                  }
140
141                fsd_exc_reraise();
142         }
143        END_TRY
144        return (fsd_drmaa_session_t*)self;
145}
146
147
148void
149pbsdrmaa_session_destroy( fsd_drmaa_session_t *self )
150{
151        pbsdrmaa_session_t *pbsself = (pbsdrmaa_session_t*)self;
152        self->stop_wait_thread( self );
153        pbsdrmaa_pbs_conn_destroy(pbsself->pbs_connection);
154        fsd_free( pbsself->status_attrl );
155        fsd_free( pbsself->job_exit_status_file_prefix );
156
157        pbsself->super_destroy( self );
158}
159
160
161static char *
162pbsdrmaa_session_run_impl(
163                fsd_drmaa_session_t *self,
164                const fsd_template_t *jt,
165                int bulk_idx
166                )
167{
168        char *volatile job_id = NULL;
169        fsd_job_t *volatile job = NULL;
170        pbsdrmaa_submit_t *volatile submit = NULL;
171
172        fsd_log_enter(( "(jt=%p, bulk_idx=%d)", (void*)jt, bulk_idx ));
173        TRY
174         {
175                submit = pbsdrmaa_submit_new( self, jt, bulk_idx );
176                submit->eval( submit );
177                job_id = submit->submit( submit );
178                job = self->new_job( self, job_id );
179                job->submit_time = time(NULL);
180                job->flags |= FSD_JOB_CURRENT_SESSION;
181                self->jobs->add( self->jobs, job );
182                job->release( job );  job = NULL;
183         }
184        EXCEPT_DEFAULT
185         {
186                fsd_free( job_id );
187                fsd_exc_reraise();
188         }
189        FINALLY
190         {
191                if( submit )
192                        submit->destroy( submit );
193                if( job )
194                        job->release( job );
195         }
196        END_TRY
197        fsd_log_return(( " =%s", job_id ));
198        return job_id;
199}
200
201
202static fsd_job_t *
203pbsdrmaa_session_new_job( fsd_drmaa_session_t *self, const char *job_id )
204{
205        fsd_job_t *job;
206        job = pbsdrmaa_job_new( fsd_strdup(job_id) );
207        job->session = self;
208        return job;
209}
210
211void
212pbsdrmaa_session_apply_configuration( fsd_drmaa_session_t *self )
213{
214        pbsdrmaa_session_t *pbsself = (pbsdrmaa_session_t*)self;
215        fsd_conf_option_t *pbs_home = NULL;
216        fsd_conf_option_t *wait_thread_sleep_time = NULL;
217        fsd_conf_option_t *max_retries_count = NULL;
218        fsd_conf_option_t *user_state_dir = NULL;
219        fsd_conf_option_t *connection_max_lifetime = NULL;
220
221
222        pbs_home = fsd_conf_dict_get(self->configuration, "pbs_home" );
223        wait_thread_sleep_time = fsd_conf_dict_get(self->configuration, "wait_thread_sleep_time" );
224        max_retries_count = fsd_conf_dict_get(self->configuration, "max_retries_count" );
225        user_state_dir = fsd_conf_dict_get(self->configuration, "user_state_dir" );
226        connection_max_lifetime = fsd_conf_dict_get(self->configuration, "connection_max_lifetime");
227
228        if( pbs_home && pbs_home->type == FSD_CONF_STRING )
229          {
230                        struct stat statbuf;
231                        char * volatile log_path;
232                        struct tm tm;
233                       
234                        pbsself->pbs_home = pbs_home->val.string;
235                        fsd_log_info(("pbs_home: %s",pbsself->pbs_home));
236                        pbsself->super_wait_thread = pbsself->super.wait_thread;
237                        pbsself->super.wait_thread = pbsdrmaa_session_wait_thread;             
238                        pbsself->wait_thread_log = true;
239       
240                        time(&pbsself->log_file_initial_time); 
241                        localtime_r(&pbsself->log_file_initial_time,&tm);
242
243                        log_path = fsd_asprintf("%s/server_logs/%04d%02d%02d",
244                                        pbsself->pbs_home,
245                                        tm.tm_year + 1900,
246                                        tm.tm_mon + 1,
247                                        tm.tm_mday);
248
249                        if(stat(log_path,&statbuf) == -1)
250                          {
251                                char errbuf[256] = "InternalError";
252                                (void)strerror_r(errno, errbuf, sizeof(errbuf));
253                                fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"stat error on file %s: %s", log_path, errbuf);
254                          }
255       
256                        fsd_log_debug(("Log file %s size %d",log_path,(int) statbuf.st_size));
257                        pbsself->log_file_initial_size = statbuf.st_size;
258                        fsd_free(log_path);
259          }
260
261        if ( max_retries_count && max_retries_count->type == FSD_CONF_INTEGER)
262          {
263                pbsself->max_retries_count = max_retries_count->val.integer;
264                fsd_log_info(("Max retries count: %d", pbsself->max_retries_count));
265          }
266
267        if ( connection_max_lifetime && connection_max_lifetime->type == FSD_CONF_INTEGER)
268          {
269                pbsself->connection_max_lifetime = connection_max_lifetime->val.integer;
270                fsd_log_info(("Max connection lifetime: %d", pbsself->connection_max_lifetime));
271          }
272
273        if ( wait_thread_sleep_time && wait_thread_sleep_time->type == FSD_CONF_INTEGER)
274          {
275                pbsself->wait_thread_sleep_time = wait_thread_sleep_time->val.integer;
276                fsd_log_info(("Wait thread sleep time: %d", pbsself->wait_thread_sleep_time));
277          }
278
279        if( user_state_dir && user_state_dir->type == FSD_CONF_STRING )
280          {
281                struct passwd *pw = NULL;
282                uid_t uid;
283
284                uid = geteuid();
285                pw = getpwuid(uid); /* drmaa_init is always called in thread safely fashion */
286
287                if (!pw)
288                        fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"Failed to get pw_name of the user %d", uid);
289
290                pbsself->job_exit_status_file_prefix = fsd_asprintf(user_state_dir->val.string, pw->pw_name);
291          }
292        else
293          {
294                pbsself->job_exit_status_file_prefix = fsd_asprintf("%s/.drmaa", getenv("HOME"));
295          }
296
297        fsd_log_debug(("Trying to create state directory: %s", pbsself->job_exit_status_file_prefix));
298
299        if (mkdir(pbsself->job_exit_status_file_prefix, 0700) == -1 && errno != EEXIST) /* TODO it would be much better to do stat before */
300          {
301                fsd_log_warning(("Failed to create job state directory: %s. Valid job exit status may not be available in some cases.", pbsself->job_exit_status_file_prefix));
302          }
303
304
305        /* TODO purge old exit statuses files */
306
307        pbsself->super_apply_configuration(self); /* call method from the superclass */
308}
309
310
311void
312pbsdrmaa_session_update_all_jobs_status( fsd_drmaa_session_t *self )
313{
314        volatile bool jobs_lock = false;
315        pbsdrmaa_session_t *pbsself = (pbsdrmaa_session_t*)self;
316        fsd_job_set_t *jobs = self->jobs;
317        struct batch_status *volatile status = NULL;
318
319        fsd_log_enter((""));
320
321        TRY
322         {
323
324/* TODO: query only for user's jobs pbs_selstat + ATTR_u */
325#ifdef PBS_PROFESSIONAL
326                status = pbsself->pbs_connection->statjob(pbsself->pbs_connection, NULL, NULL);
327#else
328                status = pbsself->pbs_connection->statjob(pbsself->pbs_connection, NULL, pbsself->status_attrl);
329#endif
330
331                 {
332                        size_t i;
333                        fsd_job_t *job;
334                        jobs_lock = fsd_mutex_lock( &jobs->mutex );
335                        for( i = 0;  i < jobs->tab_size;  i++ )
336                                for( job = jobs->tab[i];  job != NULL;  job = job->next )
337                                 {
338                                        fsd_mutex_lock( &job->mutex );
339                                        job->flags |= FSD_JOB_MISSING;
340                                        fsd_mutex_unlock( &job->mutex );
341                                 }
342                        jobs_lock = fsd_mutex_unlock( &jobs->mutex );
343                 }
344
345                 {
346                        struct batch_status *volatile i;
347                        for( i = status;  i != NULL;  i = i->next )
348                         {
349                                fsd_job_t *job = NULL;
350                                fsd_log_debug(( "job_id=%s", i->name ));
351                                job = self->get_job( self, i->name );
352                                if( job != NULL )
353                                 {
354                                        job->flags &= ~FSD_JOB_MISSING;
355                                        TRY
356                                         {
357                                                ((pbsdrmaa_job_t*)job)->update( job, i );
358                                         }
359                                        FINALLY
360                                         {
361                                                job->release( job );
362                                         }
363                                        END_TRY
364                                 }
365                         }
366                 }
367
368                 {
369                        size_t volatile i;
370                        fsd_job_t *volatile job;
371                        jobs_lock = fsd_mutex_lock( &jobs->mutex );
372                        for( i = 0;  i < jobs->tab_size;  i++ )
373                                for( job = jobs->tab[i];  job != NULL;  job = job->next )
374                                 {
375                                        fsd_mutex_lock( &job->mutex );
376                                        TRY
377                                         {
378                                                if( job->flags & FSD_JOB_MISSING )
379                                                        job->on_missing( job );
380                                         }
381                                        FINALLY{ fsd_mutex_unlock( &job->mutex ); }
382                                        END_TRY
383                                 }
384                        jobs_lock = fsd_mutex_unlock( &jobs->mutex );
385                 }
386         }
387        FINALLY
388         {
389                if( status != NULL )
390                         pbsself->pbs_connection->statjob_free(pbsself->pbs_connection, status );
391                if( jobs_lock )
392                        jobs_lock = fsd_mutex_unlock( &jobs->mutex );
393         }
394        END_TRY
395
396        fsd_log_return((""));
397}
398
399
400
401struct attrl *
402pbsdrmaa_create_status_attrl(void)
403{
404        struct attrl *result = NULL;
405        struct attrl *i;
406        const int max_attribs = 16;
407        int n_attribs;
408        int j = 0;
409
410        fsd_log_enter((""));
411        fsd_calloc( result, max_attribs, struct attrl );
412        result[j++].name="job_state";
413        result[j++].name="exit_status";
414        result[j++].name="resources_used";
415        result[j++].name="ctime";
416        result[j++].name="mtime";
417        result[j++].name="qtime";
418        result[j++].name="etime";
419
420        result[j++].name="queue";
421        result[j++].name="Account_Name";
422        result[j++].name="exec_host";
423        result[j++].name="start_time";
424        result[j++].name="mtime";
425#if 0
426        result[j].name="resources_used";  result[j].resource="walltime";  j++;
427        result[j].name="resources_used";  result[j].resource="cput";  j++;
428        result[j].name="resources_used";  result[j].resource="mem";  j++;
429        result[j].name="resources_used";  result[j].resource="vmem";  j++;
430        result[j].name="Resource_List";  result[j].resource="walltime";  j++;
431        result[j].name="Resource_List";  result[j].resource="cput";  j++;
432        result[j].name="Resource_List";  result[j].resource="mem";  j++;
433        result[j].name="Resource_List";  result[j].resource="vmem";  j++;
434#endif
435        n_attribs = j;
436        for( i = result;  true;  i++ )
437                if( i+1 < result + n_attribs )
438                        i->next = i+1;
439                else
440                 {
441                        i->next = NULL;
442                        break;
443                 }
444
445#ifdef DEBUGGING
446        fsd_log_return((":"));
447        pbsdrmaa_dump_attrl( result, NULL );
448#endif
449        return result;
450}
451
452void *
453pbsdrmaa_session_wait_thread( fsd_drmaa_session_t *self )
454{
455        pbsdrmaa_log_reader_t *log_reader = NULL;
456       
457        fsd_log_enter(( "" ));
458       
459        TRY
460        {       
461                log_reader = pbsdrmaa_log_reader_new( self );
462                log_reader->read_log( log_reader );
463        }
464        FINALLY
465        {
466                pbsdrmaa_log_reader_destroy( log_reader );
467        }
468        END_TRY
469       
470        fsd_log_return(( " =NULL" ));
471        return NULL;
472}
Note: See TracBrowser for help on using the repository browser.