source: trunk/drmaa_utils/drmaa_utils/fsd_session.c @ 1

Revision 1, 23.6 KB checked in by mmamonski, 13 years ago (diff)

Torque/PBS DRMAA initial commit

Line 
1/* $Id: fsd_session.c 359 2010-11-02 17:47:19Z mamonski $ */
2/*
3 * FedStage DRMAA utilities library
4 * Copyright (C) 2006-2008  FedStage Systems
5 *
6 * This program is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20#include <ctype.h>
21#include <stdlib.h>
22#include <string.h>
23
24#include <drmaa_utils/conf.h>
25#include <drmaa_utils/drmaa.h>
26#include <drmaa_utils/iter.h>
27#include <drmaa_utils/job.h>
28#include <drmaa_utils/session.h>
29
30#ifndef lint
31static char rcsid[]
32#       ifdef __GNUC__
33                __attribute__ ((unused))
34#       endif
35        = "$Id: fsd_session.c 359 2010-11-02 17:47:19Z mamonski $";
36#endif
37
38
39static void
40fsd_drmaa_session_release( fsd_drmaa_session_t *self );
41
42static void
43fsd_drmaa_session_destroy(
44                fsd_drmaa_session_t *self );
45
46static void
47fsd_drmaa_session_destroy_nowait( fsd_drmaa_session_t *self );
48
49static char*
50fsd_drmaa_session_run_job(
51                fsd_drmaa_session_t *self,
52                const fsd_template_t *jt
53                );
54
55static fsd_iter_t*
56fsd_drmaa_session_run_bulk(
57                fsd_drmaa_session_t *self,
58                const fsd_template_t *jt,
59                int start, int end, int incr
60                );
61
62static void
63fsd_drmaa_session_control_job(
64                fsd_drmaa_session_t *self,
65                const char *job_id, int action
66                );
67
68static void
69fsd_drmaa_session_job_ps(
70                fsd_drmaa_session_t *self,
71                const char *job_id, int *remote_ps
72                );
73
74static void
75fsd_drmaa_session_synchronize(
76                fsd_drmaa_session_t *self,
77                const char **input_job_ids, const struct timespec *timeout,
78                bool dispose
79                );
80
81static char*
82fsd_drmaa_session_wait(
83                fsd_drmaa_session_t *self,
84                const char *job_id, const struct timespec *timeout,
85                int *status, fsd_iter_t **rusage
86                );
87
88static fsd_job_t *
89fsd_drmaa_session_new_job(
90                fsd_drmaa_session_t *self,
91                const char *job_id
92                );
93
94static char*
95fsd_drmaa_session_run_impl(
96                fsd_drmaa_session_t *self,
97                const fsd_template_t *jt, int bulk_incr
98                );
99
100static void
101fsd_drmaa_session_wait_for_single_job(
102                fsd_drmaa_session_t *self,
103                const char *job_id, const struct timespec *timeout,
104                int *status, fsd_iter_t **rusage, bool dispose
105                );
106
107static char*
108fsd_drmaa_session_wait_for_any_job(
109                fsd_drmaa_session_t *self,
110                const struct timespec *timeout,
111                int *status, fsd_iter_t **rusage,
112                bool dispose
113                );
114
115static void
116fsd_drmaa_session_wait_for_job_status_change(
117                fsd_drmaa_session_t *self,
118                fsd_cond_t *wait_condition,
119                fsd_mutex_t *mutex,
120                const struct timespec *timeout
121                );
122
123static void*
124fsd_drmaa_session_wait_thread( fsd_drmaa_session_t *self );
125
126static void
127fsd_drmaa_session_stop_wait_thread( fsd_drmaa_session_t *self );
128
129static void
130fsd_drmaa_session_update_all_jobs_status( fsd_drmaa_session_t *self );
131
132static char**
133fsd_drmaa_session_get_submited_job_ids(
134                fsd_drmaa_session_t *self
135                );
136
137static fsd_job_t*
138fsd_drmaa_session_get_job(
139                fsd_drmaa_session_t *self, const char *job_id
140                );
141
142static void
143fsd_drmaa_session_load_configuration(
144                fsd_drmaa_session_t *self, const char *basename
145                );
146
147static void
148fsd_drmaa_session_read_configuration(
149                fsd_drmaa_session_t *self,
150                const char *filename, bool must_exist,
151                const char *configuration, size_t config_len
152                );
153
154static void
155fsd_drmaa_session_apply_configuration(
156                fsd_drmaa_session_t *self
157                );
158
159
160
161fsd_drmaa_session_t *
162fsd_drmaa_session_new( const char *contact )
163{
164        fsd_drmaa_session_t *volatile self = NULL;
165
166        fsd_log_enter(( "(%s)", contact ));
167        TRY
168         {
169                fsd_malloc( self, fsd_drmaa_session_t );
170
171                self->release = fsd_drmaa_session_release;
172                self->destroy = fsd_drmaa_session_destroy;
173                self->destroy_nowait = fsd_drmaa_session_destroy_nowait;
174                self->run_job = fsd_drmaa_session_run_job;
175                self->run_bulk = fsd_drmaa_session_run_bulk;
176                self->control_job = fsd_drmaa_session_control_job;
177                self->job_ps = fsd_drmaa_session_job_ps;
178                self->synchronize = fsd_drmaa_session_synchronize;
179                self->wait = fsd_drmaa_session_wait;
180                self->new_job = fsd_drmaa_session_new_job;
181                self->run_impl = fsd_drmaa_session_run_impl;
182                self->wait_for_single_job = fsd_drmaa_session_wait_for_single_job;
183                self->wait_for_any_job = fsd_drmaa_session_wait_for_any_job;
184                self->wait_for_job_status_change =
185                        fsd_drmaa_session_wait_for_job_status_change;
186                self->wait_thread = fsd_drmaa_session_wait_thread;
187                self->stop_wait_thread = fsd_drmaa_session_stop_wait_thread;
188                self->update_all_jobs_status = fsd_drmaa_session_update_all_jobs_status;
189                self->get_submited_job_ids = fsd_drmaa_session_get_submited_job_ids;
190                self->get_job = fsd_drmaa_session_get_job;
191                self->load_configuration = fsd_drmaa_session_load_configuration;
192                self->read_configuration = fsd_drmaa_session_read_configuration;
193                self->apply_configuration = fsd_drmaa_session_apply_configuration;
194
195                self->ref_cnt = 1;
196                self->destroy_requested = false;
197                self->contact = NULL;
198                self->jobs = NULL;
199                self->configuration = NULL;
200                self->pool_delay.tv_sec = 5;
201                self->pool_delay.tv_nsec = 0;
202                self->cache_job_state = 0;
203                self->enable_wait_thread = true;
204                self->job_categories = NULL;
205                self->missing_jobs = FSD_REVEAL_MISSING_JOBS;
206                self->wait_thread_started = false;
207                self->wait_thread_run_flag = false;
208
209                fsd_mutex_init( &self->mutex );
210                fsd_cond_init( &self->wait_condition );
211                fsd_cond_init( &self->destroy_condition );
212                fsd_mutex_init( &self->drm_connection_mutex );
213                self->jobs = fsd_job_set_new();
214                self->contact = fsd_strdup( contact );
215         }
216        EXCEPT_DEFAULT
217         {
218                if( self != NULL )
219                        self->destroy( self );
220                fsd_exc_reraise();
221         }
222        END_TRY
223
224        return self;
225}
226
227
228void
229fsd_drmaa_session_release( fsd_drmaa_session_t *self )
230{
231        fsd_mutex_lock( &self->mutex );
232        self->ref_cnt--;
233        fsd_assert( self->ref_cnt > 0 );
234        if( self->ref_cnt == 1 )
235                fsd_cond_broadcast( &self->destroy_condition );
236        fsd_mutex_unlock( &self->mutex );
237}
238
239
240void
241fsd_drmaa_session_destroy( fsd_drmaa_session_t *self )
242{
243        bool already_destroying = false;
244
245        fsd_log_enter(( "" ));
246        fsd_mutex_lock( &self->mutex );
247        TRY
248         {
249                if( self->destroy_requested )
250                        already_destroying = true;
251                else
252                 {
253                        self->destroy_requested = true;
254                        fsd_cond_broadcast( &self->wait_condition );
255                 }
256         }
257        FINALLY
258         { fsd_mutex_unlock( &self->mutex ); }
259        END_TRY
260
261        if( already_destroying )
262         { /* XXX: actually it can not happen in current implementation
263                                when using DRMAA API */
264                self->release( self );
265                fsd_exc_raise_code( FSD_DRMAA_ERRNO_NO_ACTIVE_SESSION );
266         }
267
268        self->jobs->signal_all( self->jobs );
269
270        fsd_mutex_lock( &self->mutex );
271        TRY
272         {
273                while( self->ref_cnt > 1 )
274                        fsd_cond_wait( &self->destroy_condition, &self->mutex );
275                fsd_log_debug(("started = %d  run_flag = %d", self->wait_thread_started, self->wait_thread_run_flag ));
276                if( self->wait_thread_started )
277                        self->stop_wait_thread( self );
278         }
279        FINALLY
280         { fsd_mutex_unlock( &self->mutex ); }
281        END_TRY
282
283        self->destroy_nowait( self );
284        fsd_log_return(( "" ));
285}
286
287
288void
289fsd_drmaa_session_destroy_nowait( fsd_drmaa_session_t *self )
290{
291        fsd_log_enter(( "" ));
292        fsd_conf_dict_destroy( self->configuration );
293        fsd_free( self->contact );
294
295        if( self->jobs )
296                self->jobs->destroy( self->jobs );
297
298        fsd_mutex_destroy( &self->mutex );
299        fsd_cond_destroy( &self->wait_condition );
300        fsd_cond_destroy( &self->destroy_condition );
301        fsd_mutex_destroy( &self->drm_connection_mutex );
302
303        fsd_free( self );
304        fsd_log_return(( "" ));
305}
306
307
308char *
309fsd_drmaa_session_run_job(
310                fsd_drmaa_session_t *self,
311                const fsd_template_t *jt )
312{
313        return self->run_impl( self, jt, -1 );
314}
315
316
317fsd_iter_t *
318fsd_drmaa_session_run_bulk(
319                fsd_drmaa_session_t *self,
320                const fsd_template_t *jt,
321                int start, int end, int incr )
322{
323        volatile unsigned n_jobs;
324        char **volatile result = NULL;
325
326        if( incr > 0 )
327                n_jobs = (end-start) / incr + 1;
328        else
329                n_jobs = (start-end) / -incr + 1;
330
331        TRY
332         {
333                unsigned i;
334                int idx;
335                fsd_calloc( result, n_jobs + 1, char* );
336                for( i=0, idx=start;  i < n_jobs;  i++, idx+=incr )
337                        result[i] = self->run_impl( self, jt, idx );
338         }
339        EXCEPT_DEFAULT
340         {
341                if( result )
342                        fsd_free_vector( result );
343                fsd_exc_reraise();
344         }
345        END_TRY
346
347        return fsd_iter_new( result, -1 );
348}
349
350
351void
352fsd_drmaa_session_control_job(
353                fsd_drmaa_session_t *self,
354                const char *job_id, int action )
355{
356        char **job_ids = NULL;
357        char **i;
358
359        TRY
360         {
361                if( !strcmp( job_id, DRMAA_JOB_IDS_SESSION_ALL ) )
362                        job_ids = self->get_submited_job_ids( self );
363                else
364                 {
365                        fsd_calloc( job_ids, 2, char* );
366                        job_ids[0] = fsd_strdup( job_id );
367                 }
368
369                for( i = job_ids;  *i != NULL;  i++ )
370                 {
371                        fsd_job_t *job = NULL;
372                        TRY
373                         {
374                                job = self->get_job( self, *i );
375                                if( job == NULL )
376                                 {
377                                        if( !strcmp( job_id, DRMAA_JOB_IDS_SESSION_ALL ) )
378                                         { /* job was just removed from session */ }
379                                        else
380                                                job = self->new_job( self, *i );
381                                 }
382                                if( job )
383                                        job->control( job, action );
384                         }
385                        FINALLY
386                         {
387                                if ( job )
388                                 job->release( job );
389                         }
390                        END_TRY
391                 }
392         }
393        FINALLY
394         {
395                fsd_free_vector( job_ids );
396         }
397        END_TRY
398}
399
400
401void
402fsd_drmaa_session_job_ps(
403                fsd_drmaa_session_t *self, const char *job_id, int *remote_ps )
404{
405        fsd_job_t *volatile job = NULL;
406        TRY
407         {
408                job = self->get_job( self, job_id );
409                if( job == NULL )
410                        job = self->new_job( self, job_id );
411                fsd_log_debug((" job->last_update_time = %u",  (unsigned int)job->last_update_time));
412                if( time(NULL) - job->last_update_time >= self->cache_job_state
413                                || job->state == DRMAA_PS_UNDETERMINED )
414                  {
415                        fsd_log_debug(("updating status of job: %s ", job_id));
416                        job->update_status( job );
417                        job->last_update_time = time(NULL);
418                  }
419                *remote_ps = job->state;
420         }
421        FINALLY
422         {
423                if( job )
424                        job->release( job );
425         }
426        END_TRY
427}
428
429
430void
431fsd_drmaa_session_synchronize(
432                fsd_drmaa_session_t *self,
433                const char **input_job_ids, const struct timespec *timeout,
434                bool dispose
435                )
436{
437        volatile bool wait_for_all = false;
438        char **volatile job_ids_buf = NULL;
439        const char **job_ids = NULL;
440        const char **i;
441
442        fsd_log_enter(( "(job_ids={...}, timeout=..., dispose=%d)",
443                        (int)dispose ));
444
445        if( input_job_ids == NULL )
446                fsd_exc_raise_code( FSD_ERRNO_INVALID_ARGUMENT );
447
448        TRY
449         {
450                for( i = input_job_ids;  *i != NULL;  i++ )
451                        if( !strcmp(*i, DRMAA_JOB_IDS_SESSION_ALL) )
452                                wait_for_all = true;
453
454                if( wait_for_all )
455                 {
456                        job_ids_buf = self->get_submited_job_ids( self );
457                        job_ids = (const char**)job_ids_buf;
458                 }
459                else
460                        job_ids = input_job_ids;
461
462                for( i = job_ids;  *i != NULL;  i++ )
463                        TRY
464                         {
465                                self->wait_for_single_job( self, *i, timeout, NULL, NULL, dispose );
466                         }
467                        EXCEPT( FSD_DRMAA_ERRNO_INVALID_JOB )
468                         { /* job was ripped by another thread */ }
469                        END_TRY
470         }
471        FINALLY
472         {
473                fsd_free_vector( job_ids_buf );
474         }
475        END_TRY
476}
477
478
479char *
480fsd_drmaa_session_wait(
481                fsd_drmaa_session_t *self,
482                const char *job_id, const struct timespec *timeout,
483                int *stat, fsd_iter_t **rusage
484                )
485{
486        if( 0==strcmp(job_id, DRMAA_JOB_IDS_SESSION_ANY) )
487                return self->wait_for_any_job( self, timeout, stat, rusage, true );
488        else
489         {
490                self->wait_for_single_job( self, job_id, timeout, stat, rusage, true );
491                return fsd_strdup( job_id );
492         }
493}
494
495
496fsd_job_t *
497fsd_drmaa_session_new_job( fsd_drmaa_session_t *self, const char *job_id )
498{
499        fsd_job_t *job;
500        job = fsd_job_new( fsd_strdup(job_id) );
501        job->session = self;
502        return job;
503}
504
505
506char *
507fsd_drmaa_session_run_impl(
508                fsd_drmaa_session_t *self,
509                const fsd_template_t *jt, int bulk_incr )
510{
511        fsd_exc_raise_code( FSD_ERRNO_NOT_IMPLEMENTED );
512}
513
514
515void
516fsd_drmaa_session_wait_for_single_job(
517                fsd_drmaa_session_t *self,
518                const char *job_id, const struct timespec *timeout,
519                int *status, fsd_iter_t **rusage,
520                bool dispose
521                )
522{
523        fsd_job_t *volatile job = NULL;
524        volatile bool locked = false;
525
526        fsd_log_enter(( "(%s)", job_id ));
527        TRY
528         {
529                job = self->get_job( self, job_id );
530                if( job == NULL )
531                        fsd_exc_raise_fmt( FSD_DRMAA_ERRNO_INVALID_JOB,
532                                        "Job '%s' not found in DRMS queue", job_id );
533                job->update_status( job );
534                while( !self->destroy_requested  &&  job->state < DRMAA_PS_DONE )
535                 {
536                        bool signaled = true;
537                        fsd_log_debug(( "fsd_drmaa_session_wait_for_single_job: "
538                                                "waiting for %s to terminate", job_id ));
539                        if( self->enable_wait_thread )
540                         {
541                                if( timeout )
542                                        signaled = fsd_cond_timedwait(
543                                                        &job->status_cond, &job->mutex, timeout );
544                                else
545                                 {
546                                        fsd_cond_wait( &job->status_cond, &job->mutex );
547                                 }
548                                if( !signaled )
549                                        fsd_exc_raise_code( FSD_DRMAA_ERRNO_EXIT_TIMEOUT );
550                         }
551                        else
552                         {
553                                self->wait_for_job_status_change(
554                                                self, &job->status_cond, &job->mutex, timeout );
555                         }
556
557                        fsd_log_debug(( "fsd_drmaa_session_wait_for_single_job: woken up" ));
558                        if( !self->enable_wait_thread )
559                                job->update_status( job );
560                 }
561
562                if( self->destroy_requested )
563                        fsd_exc_raise_code( FSD_DRMAA_ERRNO_EXIT_TIMEOUT );
564
565                job->get_termination_status( job, status, rusage );
566                if( dispose )
567                 {
568                        job->release( job ); /*release mutex in order to ensure proper order of locking: first job_set mutex then job mutex */
569
570                        locked = fsd_mutex_lock( &self->mutex );
571
572                        job = self->get_job( self, job_id );
573                        if (job != NULL)
574                         {
575                                self->jobs->remove( self->jobs, job );
576                                job->flags |= FSD_JOB_DISPOSED;
577                         }
578                        else
579                         {
580                                fsd_log_error(("Some other thread has already reaped job %s", job_id ));
581                         }
582
583                        locked = fsd_mutex_unlock( &self->mutex );
584                 }
585         }
586        FINALLY
587         {
588                if ( job )
589                        job->release( job );
590                if ( locked )
591                        fsd_mutex_unlock( &self->mutex );
592         }
593        END_TRY
594        fsd_log_return((""));
595}
596
597
598char *
599fsd_drmaa_session_wait_for_any_job(
600                fsd_drmaa_session_t *self,
601                const struct timespec *timeout,
602                int *status, fsd_iter_t **rusage,
603                bool dispose
604                )
605{
606        fsd_job_set_t *set = self->jobs;
607        fsd_job_t *volatile job = NULL;
608        char *volatile job_id = NULL;
609        volatile bool locked = false;
610
611        fsd_log_enter(( "" ));
612
613        TRY
614         {
615                while( job == NULL )
616                 {
617                        bool signaled = true;
618
619                        if( self->destroy_requested )
620                                fsd_exc_raise_code( FSD_DRMAA_ERRNO_NO_ACTIVE_SESSION );
621
622                        if( !self->enable_wait_thread )
623                                self->update_all_jobs_status( self );
624
625                        locked = fsd_mutex_lock( &self->mutex );
626                        if( set->empty( set ) )
627                                fsd_exc_raise_msg( FSD_DRMAA_ERRNO_INVALID_JOB,
628                                                "No job found to be waited for" );
629
630                        if( (job = set->find_terminated( set )) != NULL )
631                                break;
632
633                        if( self->destroy_requested )
634                                fsd_exc_raise_code( FSD_DRMAA_ERRNO_NO_ACTIVE_SESSION );
635                        if( self->enable_wait_thread )
636                         {
637                                fsd_log_debug(( "wait_for_any_job: waiting for wait thread" ));
638                                if( timeout )
639                                        signaled = fsd_cond_timedwait(
640                                                        &self->wait_condition, &self->mutex, timeout );
641                                else
642                                        fsd_cond_wait( &self->wait_condition, &self->mutex );
643                         }
644                        else
645                         {
646                                fsd_log_debug(( "wait_for_any_job: waiting for next check" ));
647                                self->wait_for_job_status_change( self,
648                                                &self->wait_condition, &self->mutex, timeout );
649                         }
650                        locked = fsd_mutex_unlock( &self->mutex );
651                        fsd_log_debug((
652                                                "wait_for_any_job: woken up; signaled=%d", signaled ));
653
654                        if( !signaled )
655                                fsd_exc_raise_code( FSD_DRMAA_ERRNO_EXIT_TIMEOUT );
656
657                 }
658                fsd_log_debug(( "wait_for_any_job: waiting finished" ));
659
660                job_id = fsd_strdup( job->job_id );
661                job->get_termination_status( job, status, rusage );
662         }
663        EXCEPT_DEFAULT
664         {
665                if( job_id )
666                        fsd_free( job_id );
667                fsd_exc_reraise();
668         }
669        FINALLY
670         {
671                if( job )
672                 {
673                        if( fsd_exc_get() == NULL  &&  dispose )
674                         {
675                                set->remove( set, job );
676                                job->flags |= FSD_JOB_DISPOSED;
677                         }
678                        job->release( job );
679                 }
680                if( locked )
681                        fsd_mutex_unlock( &self->mutex );
682         }
683        END_TRY
684
685        fsd_log_return(( " =%s", job_id ));
686        return job_id;
687}
688
689
690void
691fsd_drmaa_session_wait_for_job_status_change(
692                fsd_drmaa_session_t *self,
693                fsd_cond_t *wait_condition,
694                fsd_mutex_t *mutex,
695                const struct timespec *timeout
696                )
697{
698        struct timespec ts, *next_check = &ts;
699        bool status_changed;
700
701        if( timeout )
702                fsd_log_enter((
703                                        "(timeout=%ld.%09ld)",
704                                        timeout->tv_sec, timeout->tv_nsec ));
705        else
706                fsd_log_enter(( "(timeout=(null))" ));
707        fsd_get_time( next_check );
708        fsd_ts_add( next_check, &self->pool_delay );
709        if( timeout  &&  fsd_ts_cmp( timeout, next_check ) < 0 )
710                next_check = (struct timespec*)timeout;
711        fsd_log_debug(( "wait_for_job_status_change: waiting untill %ld.%09ld",
712                                next_check->tv_sec, next_check->tv_nsec ));
713        status_changed = fsd_cond_timedwait(
714                        wait_condition, mutex, next_check );
715        if( !status_changed  &&  next_check == timeout )
716                fsd_exc_raise_code( FSD_DRMAA_ERRNO_EXIT_TIMEOUT );
717
718        fsd_log_return(( ": next_check=%ld.%09ld, status_changed=%d",
719                                next_check->tv_sec, next_check->tv_nsec,
720                                (int)status_changed
721                                ));
722}
723
724
725void *
726fsd_drmaa_session_wait_thread( fsd_drmaa_session_t *self )
727{
728        struct timespec ts, *next_check = &ts;
729        bool volatile locked = false;
730
731        fsd_log_enter(( "" ));
732        locked = fsd_mutex_lock( &self->mutex );
733        TRY
734         {
735                while( self->wait_thread_run_flag )
736                        TRY
737                         {
738                                fsd_log_debug(( "wait thread: next iteration" ));
739                                self->update_all_jobs_status( self );
740                                fsd_cond_broadcast( &self->wait_condition );
741                               
742                                fsd_get_time( next_check );
743                                fsd_ts_add( next_check, &self->pool_delay );
744                                fsd_cond_timedwait( &self->wait_condition, &self->mutex, next_check );
745                               
746                         }
747                        EXCEPT_DEFAULT
748                         {
749                                const fsd_exc_t *e = fsd_exc_get();
750                                fsd_log_error(( "wait thread: <%d:%s>", e->code(e), e->message(e) ));
751                         }
752                        END_TRY
753         }
754        FINALLY
755         {
756                if (locked)
757                        fsd_mutex_unlock( &self->mutex );
758         }
759        END_TRY
760
761        fsd_log_return(( " =NULL" ));
762        return NULL;
763}
764
765
766void
767fsd_drmaa_session_stop_wait_thread( fsd_drmaa_session_t *self )
768{
769        volatile int lock_count = 0;
770        fsd_log_enter(( "" ));
771        fsd_mutex_lock( &self->mutex );
772        TRY
773         {
774                fsd_log_debug(("started = %d  run_flag = %d", self->wait_thread_started, self->wait_thread_run_flag ));
775                if( self->wait_thread_started )
776                 {
777                        self->wait_thread_run_flag = false;
778                        fsd_log_debug(("started = %d  run_flag = %d", self->wait_thread_started, self->wait_thread_run_flag ));
779                        fsd_cond_broadcast( &self->wait_condition );
780                        TRY
781                         {
782                                lock_count = fsd_mutex_unlock_times( &self->mutex );
783                                fsd_thread_join( self->wait_thread_handle, NULL );
784                         }
785                        FINALLY
786                         {
787                                int i;
788                                for( i = 0;  i < lock_count;  i++ )
789                                        fsd_mutex_lock( &self->mutex );
790                         }
791                        END_TRY
792                        self->wait_thread_started = false;
793                 }
794
795         }
796        FINALLY
797         { fsd_mutex_unlock( &self->mutex ); }
798        END_TRY
799        fsd_log_return(( "" ));
800}
801
802
803void
804fsd_drmaa_session_update_all_jobs_status(
805                fsd_drmaa_session_t *self )
806{
807        char **volatile job_ids = NULL;
808        fsd_log_enter(( "" ));
809        TRY
810         {
811                const char **i;
812                fsd_job_t *volatile job = NULL;
813                job_ids = self->get_submited_job_ids( self );
814                for( i = (const char **)job_ids;  *i;  i++ )
815                        TRY
816                         {
817                                job = self->get_job( self, *i );
818                                if( job )
819                                        job->update_status( job );
820                         }
821                        FINALLY
822                         {
823                                if( job )
824                                        job->release( job );
825                         }
826                        END_TRY
827         }
828        FINALLY
829         {
830                fsd_free_vector( job_ids );
831         }
832        END_TRY
833        fsd_log_return(( "" ));
834}
835
836
837char **
838fsd_drmaa_session_get_submited_job_ids( fsd_drmaa_session_t *self )
839{
840        return self->jobs->get_all_job_ids( self->jobs );
841}
842
843
844fsd_job_t *
845fsd_drmaa_session_get_job( fsd_drmaa_session_t *self, const char *job_id )
846{
847        return self->jobs->get( self->jobs, job_id );
848}
849
850
851void
852fsd_drmaa_session_load_configuration(
853                fsd_drmaa_session_t *self, const char *basename
854                )
855{
856        char *volatile system_conf = NULL;
857        char *volatile user_conf = NULL;
858        char *volatile varname = NULL;
859        TRY
860         {
861                const char *home;
862                const char *envvalue;
863                char *i;
864
865                system_conf = fsd_asprintf( "/etc/%s.conf", basename );
866
867                home = getenv( "HOME" );
868                if( home == NULL )
869                 { home = ""; }
870                user_conf = fsd_asprintf( "%s/.%s.conf", home, basename );
871
872                varname = fsd_asprintf( "%s_CONF", basename );
873                for( i = varname;  *i;  i++ )
874                        *i = toupper( *(unsigned char*)i );
875                envvalue = getenv( varname );
876
877                self->configuration = fsd_conf_read(
878                         self->configuration, system_conf, false, NULL, 0 );
879                self->configuration = fsd_conf_read(
880                         self->configuration, user_conf, false, NULL, 0 );
881                if( envvalue )
882                        self->configuration = fsd_conf_read(
883                                 self->configuration, envvalue, true, NULL, 0 );
884                self->apply_configuration( self );
885         }
886        FINALLY
887         {
888                fsd_free( system_conf );
889                fsd_free( user_conf );
890                fsd_free( varname );
891         }
892        END_TRY
893}
894
895
896void
897fsd_drmaa_session_read_configuration(
898                fsd_drmaa_session_t *self,
899                const char *filename, bool must_exist,
900                const char *configuration, size_t configuration_len
901                )
902{
903        self->configuration = fsd_conf_read(
904                 self->configuration,
905                 filename, must_exist,
906                 configuration, configuration_len );
907        self->apply_configuration( self );
908}
909
910
911void
912fsd_drmaa_session_apply_configuration( fsd_drmaa_session_t *self )
913{
914        fsd_conf_option_t *pool_delay = NULL;
915        fsd_conf_option_t *cache_job_state = NULL;
916        fsd_conf_option_t *wait_thread = NULL;
917        fsd_conf_option_t *job_categories = NULL;
918        fsd_conf_option_t *missing_jobs = NULL;
919
920        fsd_log_enter((""));
921        if( self->configuration  !=  NULL ) {
922
923                pool_delay = fsd_conf_dict_get(
924                                self->configuration, "pool_delay" );
925                cache_job_state = fsd_conf_dict_get(
926                                self->configuration, "cache_job_state" );
927                wait_thread = fsd_conf_dict_get(
928                                self->configuration, "wait_thread" );
929                job_categories = fsd_conf_dict_get(
930                                self->configuration, "job_categories" );
931                missing_jobs = fsd_conf_dict_get(
932                                self->configuration, "missing_jobs" );
933        }
934
935        if( pool_delay )
936         {
937                if( pool_delay->type == FSD_CONF_INTEGER
938                                &&  pool_delay->val.integer > 0 )
939                 {
940                        fsd_log_debug(("pool_delay=%d", pool_delay->val.integer));
941                        self->pool_delay.tv_sec = pool_delay->val.integer;
942                 }
943                else
944                        fsd_exc_raise_msg(
945                                        FSD_ERRNO_INTERNAL_ERROR,
946                                        "configuration: 'pool_delay' must be positive integer"
947                                        );
948         }
949        if( cache_job_state )
950         {
951                if( cache_job_state->type == FSD_CONF_INTEGER
952                                &&   cache_job_state->val.integer >= 0 )
953                 {
954                        fsd_log_debug(("cache_job_state=%d", cache_job_state->val.integer));
955                        self->cache_job_state = cache_job_state->val.integer;
956                 }
957                else
958                        fsd_exc_raise_msg(
959                                        FSD_ERRNO_INTERNAL_ERROR,
960                                        "configuration: 'cache_job_state' must be nonnegative integer"
961                                        );
962         }
963        if( wait_thread )
964         {
965                if( wait_thread->type == FSD_CONF_INTEGER )
966                 {
967
968                        fsd_log_debug(("wait_thread=%d", wait_thread->val.integer));
969                        self->enable_wait_thread = (wait_thread->val.integer != 0 );
970                 }
971                else
972                        fsd_exc_raise_msg(
973                                        FSD_ERRNO_INTERNAL_ERROR,
974                                        "configuration: 'wait_thread' should be 0 or 1"
975                                        );
976         }
977        if( job_categories )
978         {
979                if( job_categories->type == FSD_CONF_DICT )
980                        self->job_categories = job_categories->val.dict;
981                else
982                        fsd_exc_raise_msg(
983                                        FSD_ERRNO_INTERNAL_ERROR,
984                                        "configuration: 'job_categories' should be dictionary"
985                                        );
986         }
987        if( missing_jobs )
988         {
989                bool ok = true;
990                if( missing_jobs->type != FSD_CONF_STRING )
991                 {
992                        const char *value = missing_jobs->val.string;
993                        if( !strcmp( value, "ignore" ) )
994                                self->missing_jobs = FSD_IGNORE_MISSING_JOBS;
995                        else if( !strcmp( value, "ignore-queued" ) )
996                                self->missing_jobs = FSD_IGNORE_QUEUED_MISSING_JOBS;
997                        else if( !strcmp( value, "reveal" ) )
998                                self->missing_jobs = FSD_REVEAL_MISSING_JOBS;
999                        else
1000                                ok = false;
1001                 }
1002                else
1003                        ok = false;
1004
1005                if( !ok )
1006                        fsd_exc_raise_msg(
1007                                        FSD_ERRNO_INTERNAL_ERROR,
1008                                        "configuration: 'missing_jobs' should be one of: "
1009                                        "'ignore', 'ignore-queued' or 'reveal'"
1010                                        );
1011         }
1012
1013        if( self->enable_wait_thread  &&  !self->wait_thread_started )
1014         {
1015                fsd_log_debug(("Starting wait thread"));
1016                self->wait_thread_run_flag = true;
1017                fsd_thread_create( &self->wait_thread_handle,
1018                                (void*(*)(void*))self->wait_thread, self );
1019                self->wait_thread_started = true;
1020                fsd_log_debug(( "wait thread started" ));
1021         }
1022        else if( !self->enable_wait_thread  &&  self->wait_thread_started )
1023         {
1024                fsd_log_debug(("Stopping wait thread"));
1025                self->stop_wait_thread( self );
1026         }
1027
1028        fsd_log_return((""));
1029}
1030
Note: See TracBrowser for help on using the repository browser.