source: trunk/drmaa_utils/drmaa_utils/fsd_job.c @ 1

Revision 1, 10.7 KB checked in by mmamonski, 13 years ago (diff)

Torque/PBS DRMAA initial commit

Line 
1/* $Id: fsd_job.c 308 2010-09-21 07:29:41Z mamonski $ */
2/*
3 * FedStage DRMAA utilities library
4 * Copyright (C) 2006-2008  FedStage Systems
5 *
6 * This program is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20#include <string.h>
21
22#include <drmaa_utils/drmaa.h>
23#include <drmaa_utils/iter.h>
24#include <drmaa_utils/job.h>
25#include <drmaa_utils/lookup3.h>
26
27#ifndef lint
28static char rcsid[]
29#       ifdef __GNUC__
30                __attribute__ ((unused))
31#       endif
32        = "$Id: fsd_job.c 308 2010-09-21 07:29:41Z mamonski $";
33#endif
34
35
36static void fsd_job_release( fsd_job_t *self );
37static void fsd_job_destroy( fsd_job_t *self );
38static void fsd_job_control( fsd_job_t *self, int action );
39static void fsd_job_update_status( fsd_job_t *self );
40static void fsd_job_get_termination_status( fsd_job_t *self,
41                        int *status, fsd_iter_t **rusage_out );
42static void fsd_job_on_missing( fsd_job_t *self );
43
44fsd_job_t *
45fsd_job_new( char *job_id )
46{
47        fsd_job_t *volatile self = NULL;
48        fsd_log_enter(( "(%s)", job_id ));
49        TRY
50         {
51                fsd_malloc( self, fsd_job_t );
52                self->release = fsd_job_release;
53                self->destroy = fsd_job_destroy;
54                self->control = fsd_job_control;
55                self->update_status = fsd_job_update_status;
56                self->get_termination_status = fsd_job_get_termination_status;
57                self->on_missing = fsd_job_on_missing;
58                self->next              = NULL;
59                self->ref_cnt           = 1;
60                self->job_id            = job_id;
61                self->session           = NULL;
62                self->last_update_time  = 0;
63                self->flags             = 0;
64                self->state             = DRMAA_PS_UNDETERMINED;
65                self->exit_status       = 0;
66                self->submit_time       = 0;
67                self->start_time        = 0;
68                self->end_time          = 0;
69                self->cpu_usage         = 0;
70                self->mem_usage         = 0;
71                self->vmem_usage        = 0;
72                self->walltime          = 0;
73                self->execution_hosts   = NULL;
74                self->queue                             = NULL;
75                self->project                   = NULL;
76                fsd_mutex_init( &self->mutex );
77                fsd_cond_init( &self->status_cond );
78                fsd_cond_init( &self->destroy_cond );
79                fsd_mutex_lock( &self->mutex );
80         }
81        EXCEPT_DEFAULT
82         {
83                if( self )
84                        self->destroy( self );
85                else
86                        fsd_free( job_id );
87                fsd_exc_reraise();
88         }
89        END_TRY
90        fsd_log_return(( "=%p: ref_cnt=%d [lock %s]",
91                                (void*)self, self->ref_cnt, self->job_id ));
92        return self;
93}
94
95
96void
97fsd_job_release( fsd_job_t *self )
98{
99        bool destroy;
100        fsd_log_enter(( "(%p={job_id=%s, ref_cnt=%d}) [unlock %s]",
101                                (void*)self, self->job_id, self->ref_cnt, self->job_id ));
102        fsd_assert( self->ref_cnt > 0 );
103        destroy = ( --(self->ref_cnt) == 0 );
104        fsd_mutex_unlock( &self->mutex );
105        if( destroy )
106                self->destroy( self );
107        fsd_log_return(( "" ));
108}
109
110
111void
112fsd_job_destroy( fsd_job_t *self )
113{
114        fsd_log_enter(( "(%p={job_id=%s})", (void*)self, self->job_id ));
115        fsd_cond_destroy( &self->status_cond );
116        fsd_cond_destroy( &self->destroy_cond );
117        fsd_mutex_destroy( &self->mutex );
118        fsd_free( self->job_id );
119        fsd_free( self->execution_hosts );
120        fsd_free( self->queue );
121        fsd_free( self->project );
122        fsd_free( self );
123        fsd_log_return(( "" ));
124}
125
126
127void
128fsd_job_control( fsd_job_t *self, int action )
129{
130        fsd_exc_raise_code( FSD_ERRNO_NOT_IMPLEMENTED );
131}
132
133void
134fsd_job_update_status( fsd_job_t *self )
135{
136        fsd_exc_raise_code( FSD_ERRNO_NOT_IMPLEMENTED );
137}
138
139void
140fsd_job_get_termination_status( fsd_job_t *self,
141                        int *status, fsd_iter_t **rusage_out )
142{
143        fsd_iter_t* volatile rusage = NULL;
144
145        TRY
146         {
147                if( rusage_out )
148                 {
149                        rusage = fsd_iter_new( NULL, 0 );
150                        rusage->append( rusage, fsd_asprintf(
151                                                "submission_time=%ld", (long)self->submit_time ) );
152                        rusage->append( rusage, fsd_asprintf(
153                                                "start_time=%ld", (long)self->start_time ) );
154                        rusage->append( rusage, fsd_asprintf(
155                                                "end_time=%ld", (long)self->end_time ) );
156                        rusage->append( rusage, fsd_asprintf(
157                                                "cpu=%ld", self->cpu_usage ) );
158                        rusage->append( rusage, fsd_asprintf(
159                                                "mem=%ld", self->mem_usage ) );
160                        rusage->append( rusage, fsd_asprintf(
161                                                "vmem=%ld", self->vmem_usage ) );
162                        rusage->append( rusage, fsd_asprintf(
163                                                "walltime=%ld", self->walltime ) );
164                        rusage->append( rusage, fsd_asprintf(
165                                                "hosts=%s", self->execution_hosts ) );
166
167                        if (self->queue) {
168                                rusage->append( rusage, fsd_asprintf("queue=%s", self->queue ) );
169                        }
170
171                        if (self->project) {
172                                rusage->append( rusage, fsd_asprintf("project=%s", self->project ) );
173                        }
174                 }
175         }
176        EXCEPT_DEFAULT
177         {
178                if( rusage )
179                        rusage->destroy( rusage );
180                if( rusage_out )
181                        *rusage_out = NULL;
182                fsd_exc_reraise();
183         }
184        ELSE
185         {
186                if( status )
187                        *status = self->exit_status;
188                if( rusage_out )
189                        *rusage_out = rusage;
190         }
191        END_TRY
192}
193
194void
195fsd_job_on_missing( fsd_job_t *self )
196{
197        fsd_log_warning(( "job %s missing from DRM queue", self->job_id ));
198}
199
200
201static void
202fsd_job_set_destroy( fsd_job_set_t *self );
203static void
204fsd_job_set_add( fsd_job_set_t *self, fsd_job_t *job );
205static void
206fsd_job_set_remove( fsd_job_set_t *self, fsd_job_t *job );
207static fsd_job_t *
208fsd_job_set_get( fsd_job_set_t *self, const char *job_id );
209static bool
210fsd_job_set_empty( fsd_job_set_t *self );
211static fsd_job_t *
212fsd_job_set_find_terminated( fsd_job_set_t *self );
213static char **
214fsd_job_set_get_all_job_ids( fsd_job_set_t *self );
215static void fsd_job_set_signal_all( fsd_job_set_t *self );
216
217
218fsd_job_set_t *
219fsd_job_set_new(void)
220{
221        fsd_job_set_t *volatile self = NULL;
222        const size_t initial_size = 1024;
223
224        fsd_log_enter(( "()" ));
225        TRY
226         {
227                fsd_malloc( self, fsd_job_set_t );
228                self->destroy = fsd_job_set_destroy;
229                self->add = fsd_job_set_add;
230                self->remove = fsd_job_set_remove;
231                self->get = fsd_job_set_get;
232                self->empty = fsd_job_set_empty;
233                self->find_terminated = fsd_job_set_find_terminated;
234                self->get_all_job_ids = fsd_job_set_get_all_job_ids;
235                self->signal_all = fsd_job_set_signal_all;
236                self->tab = NULL;
237                self->n_jobs = 0;
238                fsd_calloc( self->tab, initial_size, fsd_job_t* );
239                self->tab_size = initial_size;
240                self->tab_mask = self->tab_size - 1;
241                fsd_mutex_init( &self->mutex );
242         }
243        EXCEPT_DEFAULT
244         {
245                if( self )
246                 {
247                        fsd_free( self->tab );
248                        fsd_free( self );
249                 }
250                fsd_exc_reraise();
251         }
252        END_TRY
253
254        fsd_log_return(( " =%p", (void*)self ));
255        return self;
256}
257
258
259void
260fsd_job_set_destroy( fsd_job_set_t *self )
261{
262        unsigned i;
263        fsd_job_t *j;
264
265        fsd_log_enter(( "()" ));
266        for( i = 0;  i < self->tab_size;  i++ )
267                for( j = self->tab[i];  j != NULL;  )
268                 {
269                        fsd_job_t *job = j;
270                        j = j->next;
271                        fsd_mutex_lock( &job->mutex );
272                        job->release( job );
273                 }
274        fsd_free( self->tab );
275        fsd_free( self );
276        fsd_log_return(( "" ));
277}
278
279
280void
281fsd_job_set_add( fsd_job_set_t *self, fsd_job_t *job )
282{
283        uint32_t h;
284        fsd_log_enter(( "(job=%p, job_id=%s)", (void*)job, job->job_id ));
285        fsd_mutex_lock( &self->mutex );
286        h = hashstr( job->job_id, strlen(job->job_id), 0 );
287        h &= self->tab_mask;
288        job->next = self->tab[ h ];
289        self->tab[ h ] = job;
290        self->n_jobs++;
291        job->ref_cnt++;
292        fsd_mutex_unlock( &self->mutex );
293        fsd_log_return(( ": job->ref_cnt=%d", job->ref_cnt ));
294}
295
296
297void
298fsd_job_set_remove( fsd_job_set_t *self, fsd_job_t *job )
299{
300        fsd_job_t **pjob = NULL;
301        uint32_t h;
302
303        fsd_log_enter(( "(job_id=%s)", job->job_id ));
304        fsd_mutex_lock( &self->mutex );
305        TRY
306         {
307                h = hashstr( job->job_id, strlen(job->job_id), 0 );
308                h &= self->tab_mask;
309                for( pjob = &self->tab[ h ];  *pjob;  pjob = &(*pjob)->next )
310                 {
311                        if( *pjob == job )
312                                break;
313                 }
314                if( *pjob )
315                 {
316                        *pjob = (*pjob)->next;
317                        job->next = NULL;
318                        self->n_jobs--;
319                        job->ref_cnt--;
320                 }
321                else
322                        fsd_exc_raise_code( FSD_DRMAA_ERRNO_INVALID_JOB );
323         }
324        FINALLY
325         { fsd_mutex_unlock( &self->mutex ); }
326        END_TRY
327        fsd_log_return(( ": job->ref_cnt=%d", job->ref_cnt ));
328}
329
330
331fsd_job_t *
332fsd_job_set_get( fsd_job_set_t *self, const char *job_id )
333{
334        uint32_t h;
335        fsd_job_t *job = NULL;
336
337        fsd_log_enter(( "(job_id=%s)", job_id ));
338        fsd_mutex_lock( &self->mutex );
339        h = hashstr( job_id, strlen(job_id), 0 );
340        h &= self->tab_mask;
341        for( job = self->tab[ h ];  job;  job = job->next )
342                if( !strcmp( job->job_id, job_id ) )
343                        break;
344        if( job )
345         {
346                fsd_mutex_lock( &job->mutex );
347                fsd_assert( !(job->flags & FSD_JOB_DISPOSED) );
348                job->ref_cnt ++;
349         }
350        fsd_mutex_unlock( &self->mutex );
351        if( job )
352                fsd_log_return(( "(job_id=%s) =%p: ref_cnt=%d [lock %s]",
353                                        job_id, (void*)job, job->ref_cnt, job->job_id ));
354        else
355                fsd_log_return(( "(job_id=%s) =NULL", job_id ));
356        return job;
357}
358
359
360bool
361fsd_job_set_empty( fsd_job_set_t *self )
362{
363        return self->n_jobs == 0;
364}
365
366
367fsd_job_t *
368fsd_job_set_find_terminated( fsd_job_set_t *self )
369{
370        fsd_job_t *job = NULL;
371        size_t i;
372        fsd_mutex_t* volatile mutex = & self->mutex;
373
374        fsd_log_enter(( "()" ));
375        fsd_mutex_lock( mutex );
376        TRY
377         {
378                for( i = 0;  i < self->tab_size;  i++ )
379                        for( job = self->tab[ i ];  job;  job = job->next )
380                                if( job->state >= DRMAA_PS_DONE )
381                                        goto found;
382found:
383                if( job )
384                 {
385                        fsd_mutex_lock( &job->mutex );
386                        fsd_assert( !(job->flags & FSD_JOB_DISPOSED) );
387                        job->ref_cnt ++;
388                 }
389         }
390        FINALLY
391         { fsd_mutex_unlock( mutex ); }
392        END_TRY
393        if( job )
394                fsd_log_return(( "() =%p: job_id=%s, ref_cnt=%d [lock %s]",
395                                        (void*)job, job->job_id, job->ref_cnt, job->job_id ));
396        else
397                fsd_log_return(( "() =%p", (void*)job ));
398        return job;
399}
400
401
402char **
403fsd_job_set_get_all_job_ids( fsd_job_set_t *self )
404{
405        fsd_job_t *job = NULL;
406        char** volatile job_ids = NULL;
407        /* size_t n_jobs = 0, capacity = 0; */
408        size_t i;
409        unsigned j = 0;
410        fsd_mutex_t* volatile mutex = & self->mutex;
411
412        fsd_log_enter(( "" ));
413        fsd_mutex_lock( mutex );
414        TRY
415         {
416                fsd_calloc( job_ids, self->n_jobs+1, char* );
417                for( i = 0;  i < self->tab_size;  i++ )
418                        for( job = self->tab[ i ];  job;  job = job->next )
419                                job_ids[ j++ ] = fsd_strdup( job->job_id );
420                fsd_realloc( job_ids, j+1, char* );
421         }
422        FINALLY
423         {
424                fsd_mutex_unlock( mutex );
425                if( fsd_exc_get() )
426                        fsd_free_vector( job_ids );
427         }
428        END_TRY
429
430        fsd_log_return(( " =%p", (void*)job_ids ));
431        return job_ids;
432}
433
434
435void
436fsd_job_set_signal_all( fsd_job_set_t *self )
437{
438        fsd_job_t *volatile job = NULL;
439        fsd_mutex_t *volatile mutex = & self->mutex;
440
441        fsd_log_enter(( "" ));
442        fsd_mutex_lock( mutex );
443        TRY
444         {
445                volatile size_t i;
446                for( i = 0;  i < self->tab_size;  i++ )
447                        for( job = self->tab[ i ];  job;  job = job->next )
448                         {
449                                fsd_mutex_lock( &job->mutex );
450                                TRY{ fsd_cond_broadcast( &job->status_cond ); }
451                                FINALLY{ fsd_mutex_unlock( &job->mutex ); }
452                                END_TRY
453                         }
454         }
455        FINALLY
456         { fsd_mutex_unlock( mutex ); }
457        END_TRY
458
459        fsd_log_return(( "" ));
460}
461
Note: See TracBrowser for help on using the repository browser.