source: branches/2.0/drmaa2_utils/drmaa_utils/fsd_session.c @ 77

Revision 77, 23.8 KB checked in by mmamonski, 12 years ago (diff)

DRMAA 2.0 utils - first skeleton

Line 
1/* $Id: fsd_session.c 28 2011-10-29 21:31:46Z mmamonski $ */
2/*
3 *  PSNC DRMAA 2.0 utilities library
4 *  Copyright (C) 2012  Poznan Supercomputing and Networking Center
5 *
6 * This program is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20#include <ctype.h>
21#include <stdlib.h>
22#include <string.h>
23
24#include <drmaa_utils/conf.h>
25#include <drmaa_utils/drmaa.h>
26#include <drmaa_utils/iter.h>
27#include <drmaa_utils/job.h>
28#include <drmaa_utils/session.h>
29
30#ifndef lint
31static char rcsid[]
32#       ifdef __GNUC__
33                __attribute__ ((unused))
34#       endif
35        = "$Id: fsd_session.c 28 2011-10-29 21:31:46Z mmamonski $";
36#endif
37
38
39static void
40fsd_drmaa_session_release( fsd_drmaa_session_t *self );
41
42static void
43fsd_drmaa_session_destroy(
44                fsd_drmaa_session_t *self );
45
46static void
47fsd_drmaa_session_destroy_nowait( fsd_drmaa_session_t *self );
48
49static char*
50fsd_drmaa_session_run_job(
51                fsd_drmaa_session_t *self,
52                const fsd_template_t *jt
53                );
54
55static fsd_iter_t*
56fsd_drmaa_session_run_bulk(
57                fsd_drmaa_session_t *self,
58                const fsd_template_t *jt,
59                int start, int end, int incr
60                );
61
62static void
63fsd_drmaa_session_control_job(
64                fsd_drmaa_session_t *self,
65                const char *job_id, int action
66                );
67
68static void
69fsd_drmaa_session_job_ps(
70                fsd_drmaa_session_t *self,
71                const char *job_id, int *remote_ps
72                );
73
74static void
75fsd_drmaa_session_synchronize(
76                fsd_drmaa_session_t *self,
77                const char **input_job_ids, const struct timespec *timeout,
78                bool dispose
79                );
80
81static char*
82fsd_drmaa_session_wait(
83                fsd_drmaa_session_t *self,
84                const char *job_id, const struct timespec *timeout,
85                int *status, fsd_iter_t **rusage
86                );
87
88static fsd_job_t *
89fsd_drmaa_session_new_job(
90                fsd_drmaa_session_t *self,
91                const char *job_id
92                );
93
94static char*
95fsd_drmaa_session_run_impl(
96                fsd_drmaa_session_t *self,
97                const fsd_template_t *jt, int bulk_incr
98                );
99
100static void
101fsd_drmaa_session_wait_for_single_job(
102                fsd_drmaa_session_t *self,
103                const char *job_id, const struct timespec *timeout,
104                int *status, fsd_iter_t **rusage, bool dispose
105                );
106
107static char*
108fsd_drmaa_session_wait_for_any_job(
109                fsd_drmaa_session_t *self,
110                const struct timespec *timeout,
111                int *status, fsd_iter_t **rusage,
112                bool dispose
113                );
114
115static void
116fsd_drmaa_session_wait_for_job_status_change(
117                fsd_drmaa_session_t *self,
118                fsd_cond_t *wait_condition,
119                fsd_mutex_t *mutex,
120                const struct timespec *timeout
121                );
122
123static void*
124fsd_drmaa_session_wait_thread( fsd_drmaa_session_t *self );
125
126static void
127fsd_drmaa_session_stop_wait_thread( fsd_drmaa_session_t *self );
128
129static void
130fsd_drmaa_session_update_all_jobs_status( fsd_drmaa_session_t *self );
131
132static char**
133fsd_drmaa_session_get_submited_job_ids(
134                fsd_drmaa_session_t *self
135                );
136
137static fsd_job_t*
138fsd_drmaa_session_get_job(
139                fsd_drmaa_session_t *self, const char *job_id
140                );
141
142static void
143fsd_drmaa_session_load_configuration(
144                fsd_drmaa_session_t *self, const char *basename
145                );
146
147static void
148fsd_drmaa_session_read_configuration(
149                fsd_drmaa_session_t *self,
150                const char *filename, bool must_exist,
151                const char *configuration, size_t config_len
152                );
153
154static void
155fsd_drmaa_session_apply_configuration(
156                fsd_drmaa_session_t *self
157                );
158
159
160
161fsd_drmaa_session_t *
162fsd_drmaa_session_new( const char *contact )
163{
164        fsd_drmaa_session_t *volatile self = NULL;
165
166        fsd_log_enter(( "(%s)", contact ));
167        TRY
168         {
169                fsd_malloc( self, fsd_drmaa_session_t );
170
171                self->release = fsd_drmaa_session_release;
172                self->destroy = fsd_drmaa_session_destroy;
173                self->destroy_nowait = fsd_drmaa_session_destroy_nowait;
174                self->run_job = fsd_drmaa_session_run_job;
175                self->run_bulk = fsd_drmaa_session_run_bulk;
176                self->control_job = fsd_drmaa_session_control_job;
177                self->job_ps = fsd_drmaa_session_job_ps;
178                self->synchronize = fsd_drmaa_session_synchronize;
179                self->wait = fsd_drmaa_session_wait;
180                self->new_job = fsd_drmaa_session_new_job;
181                self->run_impl = fsd_drmaa_session_run_impl;
182                self->wait_for_single_job = fsd_drmaa_session_wait_for_single_job;
183                self->wait_for_any_job = fsd_drmaa_session_wait_for_any_job;
184                self->wait_for_job_status_change =
185                        fsd_drmaa_session_wait_for_job_status_change;
186                self->wait_thread = fsd_drmaa_session_wait_thread;
187                self->stop_wait_thread = fsd_drmaa_session_stop_wait_thread;
188                self->update_all_jobs_status = fsd_drmaa_session_update_all_jobs_status;
189                self->get_submited_job_ids = fsd_drmaa_session_get_submited_job_ids;
190                self->get_job = fsd_drmaa_session_get_job;
191                self->load_configuration = fsd_drmaa_session_load_configuration;
192                self->read_configuration = fsd_drmaa_session_read_configuration;
193                self->apply_configuration = fsd_drmaa_session_apply_configuration;
194
195                self->ref_cnt = 1;
196                self->destroy_requested = false;
197                self->contact = NULL;
198                self->jobs = NULL;
199                self->configuration = NULL;
200                self->pool_delay.tv_sec = 10;
201                self->pool_delay.tv_nsec = 0;
202                self->cache_job_state = 0;
203                self->enable_wait_thread = false;
204                self->job_categories = NULL;
205                self->missing_jobs = FSD_REVEAL_MISSING_JOBS;
206                self->wait_thread_started = false;
207                self->wait_thread_run_flag = false;
208
209                fsd_mutex_init( &self->mutex );
210                fsd_cond_init( &self->wait_condition );
211                fsd_cond_init( &self->destroy_condition );
212                fsd_mutex_init( &self->drm_connection_mutex );
213                self->jobs = fsd_job_set_new();
214                self->contact = fsd_strdup( contact );
215               
216         }
217        EXCEPT_DEFAULT
218         {
219                if( self != NULL )
220                        self->destroy( self );
221                fsd_exc_reraise();
222         }
223        END_TRY
224
225        return self;
226}
227
228
229void
230fsd_drmaa_session_release( fsd_drmaa_session_t *self )
231{
232        fsd_mutex_lock( &self->mutex );
233        self->ref_cnt--;
234        fsd_assert( self->ref_cnt > 0 );
235        if( self->ref_cnt == 1 )
236                fsd_cond_broadcast( &self->destroy_condition );
237        fsd_mutex_unlock( &self->mutex );
238}
239
240
241void
242fsd_drmaa_session_destroy( fsd_drmaa_session_t *self )
243{
244        bool already_destroying = false;
245
246        fsd_log_enter(( "" ));
247        fsd_mutex_lock( &self->mutex );
248        TRY
249         {
250                if( self->destroy_requested )
251                        already_destroying = true;
252                else
253                 {
254                        self->destroy_requested = true;
255                        fsd_cond_broadcast( &self->wait_condition );
256                 }
257         }
258        FINALLY
259         { fsd_mutex_unlock( &self->mutex ); }
260        END_TRY
261
262        if( already_destroying )
263         { /* XXX: actually it can not happen in current implementation
264                                when using DRMAA API */
265                self->release( self );
266                fsd_exc_raise_code( FSD_DRMAA_ERRNO_NO_ACTIVE_SESSION );
267         }
268
269        self->jobs->signal_all( self->jobs );
270
271        fsd_mutex_lock( &self->mutex );
272        TRY
273         {
274                while( self->ref_cnt > 1 )
275                        fsd_cond_wait( &self->destroy_condition, &self->mutex );
276                fsd_log_debug(("started = %d  run_flag = %d", self->wait_thread_started, self->wait_thread_run_flag ));
277                if( self->wait_thread_started )
278                        self->stop_wait_thread( self );
279         }
280        FINALLY
281         { fsd_mutex_unlock( &self->mutex ); }
282        END_TRY
283
284        self->destroy_nowait( self );
285        fsd_log_return(( "" ));
286}
287
288
289void
290fsd_drmaa_session_destroy_nowait( fsd_drmaa_session_t *self )
291{
292        fsd_log_enter(( "" ));
293        fsd_conf_dict_destroy( self->configuration );
294        fsd_free( self->contact );
295
296        if( self->jobs )
297                self->jobs->destroy( self->jobs );
298
299        fsd_mutex_destroy( &self->mutex );
300        fsd_cond_destroy( &self->wait_condition );
301        fsd_cond_destroy( &self->destroy_condition );
302        fsd_mutex_destroy( &self->drm_connection_mutex );
303
304        fsd_free( self );
305        fsd_log_return(( "" ));
306}
307
308
309char *
310fsd_drmaa_session_run_job(
311                fsd_drmaa_session_t *self,
312                const fsd_template_t *jt )
313{
314        return self->run_impl( self, jt, -1 );
315}
316
317
318fsd_iter_t *
319fsd_drmaa_session_run_bulk(
320                fsd_drmaa_session_t *self,
321                const fsd_template_t *jt,
322                int start, int end, int incr )
323{
324        volatile unsigned n_jobs;
325        char **volatile result = NULL;
326
327        if( incr > 0 )
328                n_jobs = (end-start) / incr + 1;
329        else
330                n_jobs = (start-end) / -incr + 1;
331
332        TRY
333         {
334                unsigned i;
335                int idx;
336                fsd_calloc( result, n_jobs + 1, char* );
337                for( i=0, idx=start;  i < n_jobs;  i++, idx+=incr )
338                        result[i] = self->run_impl( self, jt, idx );
339         }
340        EXCEPT_DEFAULT
341         {
342                if( result )
343                        fsd_free_vector( result );
344                fsd_exc_reraise();
345         }
346        END_TRY
347
348        return fsd_iter_new( result, -1 );
349}
350
351
352void
353fsd_drmaa_session_control_job(
354                fsd_drmaa_session_t *self,
355                const char *job_id, int action )
356{
357        char **job_ids = NULL;
358        char **i;
359
360        TRY
361         {
362                if( !strcmp( job_id, DRMAA_JOB_IDS_SESSION_ALL ) )
363                        job_ids = self->get_submited_job_ids( self );
364                else
365                 {
366                        fsd_calloc( job_ids, 2, char* );
367                        job_ids[0] = fsd_strdup( job_id );
368                 }
369
370                for( i = job_ids;  *i != NULL;  i++ )
371                 {
372                        fsd_job_t *job = NULL;
373                        TRY
374                         {
375                                job = self->get_job( self, *i );
376                                if( job == NULL )
377                                 {
378                                        if( !strcmp( job_id, DRMAA_JOB_IDS_SESSION_ALL ) )
379                                         { /* job was just removed from session */ }
380                                        else
381                                                job = self->new_job( self, *i );
382                                 }
383                                if( job )
384                                        job->control( job, action );
385                         }
386                        FINALLY
387                         {
388                                if ( job )
389                                 job->release( job );
390                         }
391                        END_TRY
392                 }
393         }
394        FINALLY
395         {
396                fsd_free_vector( job_ids );
397         }
398        END_TRY
399}
400
401
402void
403fsd_drmaa_session_job_ps(
404                fsd_drmaa_session_t *self, const char *job_id, int *remote_ps )
405{
406        fsd_job_t *volatile job = NULL;
407        TRY
408         {
409                job = self->get_job( self, job_id );
410                if( job == NULL )
411                 {
412                        fsd_log_info(( "job_ps: recreating job object: %s", job_id ));
413                        job = self->new_job( self, job_id );
414                 }
415                fsd_log_debug((" job->last_update_time = %u",  (unsigned int)job->last_update_time));
416                if( time(NULL) - job->last_update_time >= self->cache_job_state
417                                || job->state == DRMAA_PS_UNDETERMINED )
418                  {
419                        fsd_log_debug(("updating status of job: %s ", job_id));
420                        job->update_status( job );
421                        job->last_update_time = time(NULL);
422                  }
423                *remote_ps = job->state;
424         }
425        FINALLY
426         {
427                if( job )
428                        job->release( job );
429         }
430        END_TRY
431}
432
433
434void
435fsd_drmaa_session_synchronize(
436                fsd_drmaa_session_t *self,
437                const char **input_job_ids, const struct timespec *timeout,
438                bool dispose
439                )
440{
441        volatile bool wait_for_all = false;
442        char **volatile job_ids_buf = NULL;
443        const char **job_ids = NULL;
444        const char **i;
445
446        fsd_log_enter(( "(job_ids={...}, timeout=..., dispose=%d)",
447                        (int)dispose ));
448
449        if( input_job_ids == NULL )
450                fsd_exc_raise_code( FSD_ERRNO_INVALID_ARGUMENT );
451
452        TRY
453         {
454                for( i = input_job_ids;  *i != NULL;  i++ )
455                        if( !strcmp(*i, DRMAA_JOB_IDS_SESSION_ALL) )
456                                wait_for_all = true;
457
458                if( wait_for_all )
459                 {
460                        job_ids_buf = self->get_submited_job_ids( self );
461                        job_ids = (const char**)job_ids_buf;
462                 }
463                else
464                        job_ids = input_job_ids;
465
466                for( i = job_ids;  *i != NULL;  i++ )
467                        TRY
468                         {
469                                self->wait_for_single_job( self, *i, timeout, NULL, NULL, dispose );
470                         }
471                        EXCEPT( FSD_DRMAA_ERRNO_INVALID_JOB )
472                         { /* job was ripped by another thread */ }
473                        END_TRY
474         }
475        FINALLY
476         {
477                fsd_free_vector( job_ids_buf );
478         }
479        END_TRY
480}
481
482
483char *
484fsd_drmaa_session_wait(
485                fsd_drmaa_session_t *self,
486                const char *job_id, const struct timespec *timeout,
487                int *stat, fsd_iter_t **rusage
488                )
489{
490        if( 0==strcmp(job_id, DRMAA_JOB_IDS_SESSION_ANY) )
491                return self->wait_for_any_job( self, timeout, stat, rusage, true );
492        else
493         {
494                self->wait_for_single_job( self, job_id, timeout, stat, rusage, true );
495                return fsd_strdup( job_id );
496         }
497}
498
499
500fsd_job_t *
501fsd_drmaa_session_new_job( fsd_drmaa_session_t *self, const char *job_id )
502{
503        fsd_job_t *job;
504        job = fsd_job_new( fsd_strdup(job_id) );
505        job->session = self;
506        return job;
507}
508
509
510char *
511fsd_drmaa_session_run_impl(
512                fsd_drmaa_session_t *self,
513                const fsd_template_t *jt, int bulk_incr )
514{
515        fsd_exc_raise_code( FSD_ERRNO_NOT_IMPLEMENTED );
516}
517
518
519void
520fsd_drmaa_session_wait_for_single_job(
521                fsd_drmaa_session_t *self,
522                const char *job_id, const struct timespec *timeout,
523                int *status, fsd_iter_t **rusage,
524                bool dispose
525                )
526{
527        fsd_job_t *volatile job = NULL;
528        volatile bool locked = false;
529
530        fsd_log_enter(( "(%s)", job_id ));
531        TRY
532         {
533                job = self->get_job( self, job_id );
534                if( job == NULL )
535                        fsd_exc_raise_fmt( FSD_DRMAA_ERRNO_INVALID_JOB,
536                                        "Job '%s' not found in DRMS queue", job_id );
537                job->update_status( job );
538                while( !self->destroy_requested  &&  job->state < DRMAA_PS_DONE )
539                 {
540                        bool signaled = true;
541                        fsd_log_debug(( "fsd_drmaa_session_wait_for_single_job: "
542                                                "waiting for %s to terminate", job_id ));
543                        if( self->enable_wait_thread )
544                         {
545                                if( timeout )
546                                        signaled = fsd_cond_timedwait(
547                                                        &job->status_cond, &job->mutex, timeout );
548                                else
549                                 {
550                                        fsd_cond_wait( &job->status_cond, &job->mutex );
551                                 }
552                                if( !signaled )
553                                        fsd_exc_raise_code( FSD_DRMAA_ERRNO_EXIT_TIMEOUT );
554                         }
555                        else
556                         {
557                                self->wait_for_job_status_change(
558                                                self, &job->status_cond, &job->mutex, timeout );
559                         }
560
561                        fsd_log_debug(( "fsd_drmaa_session_wait_for_single_job: woken up" ));
562                        if( !self->enable_wait_thread )
563                                job->update_status( job );
564                 }
565
566                if( self->destroy_requested )
567                        fsd_exc_raise_code( FSD_DRMAA_ERRNO_EXIT_TIMEOUT );
568
569                job->get_termination_status( job, status, rusage );
570                if( dispose )
571                 {
572                        job->release( job ); /*release mutex in order to ensure proper order of locking: first job_set mutex then job mutex */
573
574                        locked = fsd_mutex_lock( &self->mutex );
575
576                        job = self->get_job( self, job_id );
577                        if (job != NULL)
578                         {
579                                self->jobs->remove( self->jobs, job );
580                                job->flags |= FSD_JOB_DISPOSED;
581                         }
582                        else
583                         {
584                                fsd_log_error(("Some other thread has already reaped job %s", job_id ));
585                         }
586
587                        locked = fsd_mutex_unlock( &self->mutex );
588                 }
589         }
590        FINALLY
591         {
592                if ( job )
593                        job->release( job );
594                if ( locked )
595                        fsd_mutex_unlock( &self->mutex );
596         }
597        END_TRY
598        fsd_log_return((""));
599}
600
601
602char *
603fsd_drmaa_session_wait_for_any_job(
604                fsd_drmaa_session_t *self,
605                const struct timespec *timeout,
606                int *status, fsd_iter_t **rusage,
607                bool dispose
608                )
609{
610        fsd_job_set_t *set = self->jobs;
611        fsd_job_t *volatile job = NULL;
612        char *volatile job_id = NULL;
613        volatile bool locked = false;
614
615        fsd_log_enter(( "" ));
616
617        TRY
618         {
619                while( job == NULL )
620                 {
621                        bool signaled = true;
622
623                        if( self->destroy_requested )
624                                fsd_exc_raise_code( FSD_DRMAA_ERRNO_NO_ACTIVE_SESSION );
625
626                        if( !self->enable_wait_thread )
627                                self->update_all_jobs_status( self );
628
629                        locked = fsd_mutex_lock( &self->mutex );
630                        if( set->empty( set ) )
631                                fsd_exc_raise_msg( FSD_DRMAA_ERRNO_INVALID_JOB,
632                                                "No job found to be waited for" );
633
634                        if( (job = set->find_terminated( set )) != NULL )
635                                break;
636
637                        if( self->destroy_requested )
638                                fsd_exc_raise_code( FSD_DRMAA_ERRNO_NO_ACTIVE_SESSION );
639                        if( self->enable_wait_thread )
640                         {
641                                fsd_log_debug(( "wait_for_any_job: waiting for wait thread" ));
642                                if( timeout )
643                                        signaled = fsd_cond_timedwait(
644                                                        &self->wait_condition, &self->mutex, timeout );
645                                else
646                                        fsd_cond_wait( &self->wait_condition, &self->mutex );
647                         }
648                        else
649                         {
650                                fsd_log_debug(( "wait_for_any_job: waiting for next check" ));
651                                self->wait_for_job_status_change( self,
652                                                &self->wait_condition, &self->mutex, timeout );
653                         }
654                        locked = fsd_mutex_unlock( &self->mutex );
655                        fsd_log_debug((
656                                                "wait_for_any_job: woken up; signaled=%d", signaled ));
657
658                        if( !signaled )
659                                fsd_exc_raise_code( FSD_DRMAA_ERRNO_EXIT_TIMEOUT );
660
661                 }
662                fsd_log_debug(( "wait_for_any_job: waiting finished" ));
663
664                job_id = fsd_strdup( job->job_id );
665                job->get_termination_status( job, status, rusage );
666         }
667        EXCEPT_DEFAULT
668         {
669                if( job_id )
670                        fsd_free( job_id );
671                fsd_exc_reraise();
672         }
673        FINALLY
674         {
675                if( job )
676                 {
677                        if( fsd_exc_get() == NULL  &&  dispose )
678                         {
679                                set->remove( set, job );
680                                job->flags |= FSD_JOB_DISPOSED;
681                         }
682                        job->release( job );
683                 }
684                if( locked )
685                        fsd_mutex_unlock( &self->mutex );
686         }
687        END_TRY
688
689        fsd_log_return(( " =%s", job_id ));
690        return job_id;
691}
692
693
694void
695fsd_drmaa_session_wait_for_job_status_change(
696                fsd_drmaa_session_t *self,
697                fsd_cond_t *wait_condition,
698                fsd_mutex_t *mutex,
699                const struct timespec *timeout
700                )
701{
702        struct timespec ts, *next_check = &ts;
703        bool status_changed;
704
705        if( timeout )
706                fsd_log_enter((
707                                        "(timeout=%ld.%09ld)",
708                                        timeout->tv_sec, timeout->tv_nsec ));
709        else
710                fsd_log_enter(( "(timeout=(null))" ));
711        fsd_get_time( next_check );
712        fsd_ts_add( next_check, &self->pool_delay );
713        if( timeout  &&  fsd_ts_cmp( timeout, next_check ) < 0 )
714                next_check = (struct timespec*)timeout;
715        fsd_log_debug(( "wait_for_job_status_change: waiting untill %ld.%09ld",
716                                next_check->tv_sec, next_check->tv_nsec ));
717        status_changed = fsd_cond_timedwait(wait_condition, mutex,(const struct timespec *) next_check );
718        if( !status_changed  &&  next_check == timeout )
719                fsd_exc_raise_code( FSD_DRMAA_ERRNO_EXIT_TIMEOUT );
720
721        fsd_log_return(( ": next_check=%ld.%09ld, status_changed=%d",
722                                next_check->tv_sec, next_check->tv_nsec,
723                                (int)status_changed
724                                ));
725}
726
727
728void *
729fsd_drmaa_session_wait_thread( fsd_drmaa_session_t *self )
730{
731        struct timespec ts, *next_check = &ts;
732        bool volatile locked = false;
733
734        fsd_log_enter(( "" ));
735        locked = fsd_mutex_lock( &self->mutex );
736        TRY
737         {
738                while( self->wait_thread_run_flag )
739                        TRY
740                         {
741                                fsd_log_debug(( "wait thread: next iteration" ));
742                                self->update_all_jobs_status( self );
743                                fsd_cond_broadcast( &self->wait_condition );
744                               
745                                fsd_get_time( next_check );
746                                fsd_ts_add( next_check, &self->pool_delay );
747                                fsd_cond_timedwait( &self->wait_condition, &self->mutex, (const struct timespec *) next_check );
748                               
749                         }
750                        EXCEPT_DEFAULT
751                         {
752                                const fsd_exc_t *e = fsd_exc_get();
753                                fsd_log_error(( "wait thread: <%d:%s>", e->code(e), e->message(e) ));
754                         }
755                        END_TRY
756         }
757        FINALLY
758         {
759                if (locked)
760                        fsd_mutex_unlock( &self->mutex );
761         }
762        END_TRY
763
764        fsd_log_return(( " =NULL" ));
765        return NULL;
766}
767
768
769void
770fsd_drmaa_session_stop_wait_thread( fsd_drmaa_session_t *self )
771{
772        volatile int lock_count = 0;
773        fsd_log_enter(( "" ));
774        fsd_mutex_lock( &self->mutex );
775        TRY
776         {
777                fsd_log_debug(("started = %d  run_flag = %d", self->wait_thread_started, self->wait_thread_run_flag ));
778                if( self->wait_thread_started )
779                 {
780                        self->wait_thread_run_flag = false;
781                        fsd_log_debug(("started = %d  run_flag = %d", self->wait_thread_started, self->wait_thread_run_flag ));
782                        fsd_cond_broadcast( &self->wait_condition );
783                        TRY
784                         {
785                                lock_count = fsd_mutex_unlock_times( &self->mutex );
786                                fsd_thread_join( self->wait_thread_handle, NULL );
787                         }
788                        FINALLY
789                         {
790                                int i;
791                                for( i = 0;  i < lock_count;  i++ )
792                                        fsd_mutex_lock( &self->mutex );
793                         }
794                        END_TRY
795                        self->wait_thread_started = false;
796                 }
797
798         }
799        FINALLY
800         { fsd_mutex_unlock( &self->mutex ); }
801        END_TRY
802        fsd_log_return(( "" ));
803}
804
805
806void
807fsd_drmaa_session_update_all_jobs_status(
808                fsd_drmaa_session_t *self )
809{
810        char **volatile job_ids = NULL;
811        fsd_log_enter(( "" ));
812        TRY
813         {
814                const char **i;
815                fsd_job_t *volatile job = NULL;
816                job_ids = self->get_submited_job_ids( self );
817                for( i = (const char **)job_ids;  *i;  i++ )
818                        TRY
819                         {
820                                job = self->get_job( self, *i );
821                                if( job )
822                                        job->update_status( job );
823                         }
824                        FINALLY
825                         {
826                                if( job )
827                                        job->release( job );
828                         }
829                        END_TRY
830         }
831        FINALLY
832         {
833                fsd_free_vector( job_ids );
834         }
835        END_TRY
836        fsd_log_return(( "" ));
837}
838
839
840char **
841fsd_drmaa_session_get_submited_job_ids( fsd_drmaa_session_t *self )
842{
843        return self->jobs->get_all_job_ids( self->jobs );
844}
845
846
847fsd_job_t *
848fsd_drmaa_session_get_job( fsd_drmaa_session_t *self, const char *job_id )
849{
850        return self->jobs->get( self->jobs, job_id );
851}
852
853
854void
855fsd_drmaa_session_load_configuration(
856                fsd_drmaa_session_t *self, const char *basename
857                )
858{
859        char *volatile system_conf = NULL;
860        char *volatile user_conf = NULL;
861        char *volatile varname = NULL;
862        TRY
863         {
864                const char *home;
865                const char *envvalue;
866                char *i;
867
868                system_conf = fsd_asprintf( DRMAA_DIR_SYSCONF"/%s.conf", basename );
869
870                home = getenv( "HOME" );
871                if( home == NULL )
872                 { home = "."; }
873                user_conf = fsd_asprintf( "%s/.%s.conf", home, basename );
874
875                varname = fsd_asprintf( "%s_CONF", basename );
876                for( i = varname;  *i;  i++ )
877                        *i = toupper( *(unsigned char*)i );
878                envvalue = getenv( varname );
879
880                self->configuration = fsd_conf_read(
881                         self->configuration, system_conf, false, NULL, 0 );
882                self->configuration = fsd_conf_read(
883                         self->configuration, user_conf, false, NULL, 0 );
884                if( envvalue )
885                        self->configuration = fsd_conf_read(
886                                 self->configuration, envvalue, true, NULL, 0 );
887                self->apply_configuration( self );
888         }
889        FINALLY
890         {
891                fsd_free( system_conf );
892                fsd_free( user_conf );
893                fsd_free( varname );
894         }
895        END_TRY
896}
897
898
899void
900fsd_drmaa_session_read_configuration(
901                fsd_drmaa_session_t *self,
902                const char *filename, bool must_exist,
903                const char *configuration, size_t configuration_len
904                )
905{
906        self->configuration = fsd_conf_read(
907                 self->configuration,
908                 filename, must_exist,
909                 configuration, configuration_len );
910        self->apply_configuration( self );
911}
912
913
914void
915fsd_drmaa_session_apply_configuration( fsd_drmaa_session_t *self )
916{
917        fsd_conf_option_t *pool_delay = NULL;
918        fsd_conf_option_t *cache_job_state = NULL;
919        fsd_conf_option_t *wait_thread = NULL;
920        fsd_conf_option_t *job_categories = NULL;
921        fsd_conf_option_t *missing_jobs = NULL;
922
923        fsd_log_enter((""));
924        if( self->configuration  !=  NULL ) {
925
926                pool_delay = fsd_conf_dict_get(
927                                self->configuration, "pool_delay" );
928                cache_job_state = fsd_conf_dict_get(
929                                self->configuration, "cache_job_state" );
930                wait_thread = fsd_conf_dict_get(
931                                self->configuration, "wait_thread" );
932                job_categories = fsd_conf_dict_get(
933                                self->configuration, "job_categories" );
934                missing_jobs = fsd_conf_dict_get(
935                                self->configuration, "missing_jobs" );
936        }
937
938        if( pool_delay )
939         {
940                if( pool_delay->type == FSD_CONF_INTEGER
941                                &&  pool_delay->val.integer > 0 )
942                 {
943                        fsd_log_debug(("pool_delay=%d", pool_delay->val.integer));
944                        self->pool_delay.tv_sec = pool_delay->val.integer;
945                 }
946                else
947                        fsd_exc_raise_msg(
948                                        FSD_ERRNO_INTERNAL_ERROR,
949                                        "configuration: 'pool_delay' must be positive integer"
950                                        );
951         }
952        if( cache_job_state )
953         {
954                if( cache_job_state->type == FSD_CONF_INTEGER
955                                &&   cache_job_state->val.integer >= 0 )
956                 {
957                        fsd_log_debug(("cache_job_state=%d", cache_job_state->val.integer));
958                        self->cache_job_state = cache_job_state->val.integer;
959                 }
960                else
961                        fsd_exc_raise_msg(
962                                        FSD_ERRNO_INTERNAL_ERROR,
963                                        "configuration: 'cache_job_state' must be nonnegative integer"
964                                        );
965         }
966        if( wait_thread )
967         {
968                if( wait_thread->type == FSD_CONF_INTEGER )
969                 {
970
971                        fsd_log_info(("wait_thread=%d", wait_thread->val.integer));
972                        self->enable_wait_thread = (wait_thread->val.integer != 0 );
973                 }
974                else
975                        fsd_exc_raise_msg(
976                                        FSD_ERRNO_INTERNAL_ERROR,
977                                        "configuration: 'wait_thread' should be 0 or 1"
978                                        );
979         }
980        if( job_categories )
981         {
982                if( job_categories->type == FSD_CONF_DICT )
983                        self->job_categories = job_categories->val.dict;
984                else
985                        fsd_exc_raise_msg(
986                                        FSD_ERRNO_INTERNAL_ERROR,
987                                        "configuration: 'job_categories' should be dictionary"
988                                        );
989         }
990        if( missing_jobs )
991         {
992                bool ok = true;
993                if( missing_jobs->type != FSD_CONF_STRING )
994                 {
995                        const char *value = missing_jobs->val.string;
996                        if( !strcmp( value, "ignore" ) )
997                                self->missing_jobs = FSD_IGNORE_MISSING_JOBS;
998                        else if( !strcmp( value, "ignore-queued" ) )
999                                self->missing_jobs = FSD_IGNORE_QUEUED_MISSING_JOBS;
1000                        else if( !strcmp( value, "reveal" ) )
1001                                self->missing_jobs = FSD_REVEAL_MISSING_JOBS;
1002                        else
1003                                ok = false;
1004                 }
1005                else
1006                        ok = false;
1007
1008                if( !ok )
1009                        fsd_exc_raise_msg(
1010                                        FSD_ERRNO_INTERNAL_ERROR,
1011                                        "configuration: 'missing_jobs' should be one of: "
1012                                        "'ignore', 'ignore-queued' or 'reveal'"
1013                                        );
1014         }
1015
1016        if( self->enable_wait_thread  &&  !self->wait_thread_started )
1017         {
1018                fsd_log_debug(("Starting wait thread"));
1019                self->wait_thread_run_flag = true;
1020                fsd_thread_create( &self->wait_thread_handle,
1021                                (void*(*)(void*))self->wait_thread, self );
1022                self->wait_thread_started = true;
1023                fsd_log_debug(( "wait thread started" ));
1024         }
1025        else if( !self->enable_wait_thread  &&  self->wait_thread_started )
1026         {
1027                fsd_log_debug(("Stopping wait thread"));
1028                self->stop_wait_thread( self );
1029         }
1030
1031        fsd_log_return((""));
1032}
1033
Note: See TracBrowser for help on using the repository browser.