/* $Id$ */ /* * FedStage DRMAA utilities library * Copyright (C) 2006-2008 FedStage Systems * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ #include #include #include #include #include #ifndef lint static char rcsid[] # ifdef __GNUC__ __attribute__ ((unused)) # endif = "$Id$"; #endif static void fsd_job_release( fsd_job_t *self ); static void fsd_job_destroy( fsd_job_t *self ); static void fsd_job_control( fsd_job_t *self, int action ); static void fsd_job_update_status( fsd_job_t *self ); static void fsd_job_get_termination_status( fsd_job_t *self, int *status, fsd_iter_t **rusage_out ); static void fsd_job_on_missing( fsd_job_t *self ); fsd_job_t * fsd_job_new( char *job_id ) { fsd_job_t *volatile self = NULL; fsd_log_enter(( "(%s)", job_id )); TRY { fsd_malloc( self, fsd_job_t ); self->release = fsd_job_release; self->destroy = fsd_job_destroy; self->control = fsd_job_control; self->update_status = fsd_job_update_status; self->get_termination_status = fsd_job_get_termination_status; self->on_missing = fsd_job_on_missing; self->next = NULL; self->ref_cnt = 1; self->job_id = job_id; self->session = NULL; self->last_update_time = 0; self->flags = 0; self->state = DRMAA_PS_UNDETERMINED; self->exit_status = 0; self->submit_time = 0; self->start_time = 0; self->end_time = 0; self->cpu_usage = 0; self->mem_usage = 0; self->vmem_usage = 0; self->walltime = 0; self->execution_hosts = NULL; self->queue = NULL; self->project = NULL; fsd_mutex_init( &self->mutex ); fsd_cond_init( &self->status_cond ); fsd_cond_init( &self->destroy_cond ); fsd_mutex_lock( &self->mutex ); } EXCEPT_DEFAULT { if( self ) self->destroy( self ); else fsd_free( job_id ); fsd_exc_reraise(); } END_TRY fsd_log_return(( "=%p: ref_cnt=%d [lock %s]", (void*)self, self->ref_cnt, self->job_id )); return self; } void fsd_job_release( fsd_job_t *self ) { bool destroy; fsd_log_enter(( "(%p={job_id=%s, ref_cnt=%d}) [unlock %s]", (void*)self, self->job_id, self->ref_cnt, self->job_id )); fsd_assert( self->ref_cnt > 0 ); destroy = ( --(self->ref_cnt) == 0 ); fsd_mutex_unlock( &self->mutex ); if( destroy ) self->destroy( self ); fsd_log_return(( "" )); } void fsd_job_destroy( fsd_job_t *self ) { fsd_log_enter(( "(%p={job_id=%s})", (void*)self, self->job_id )); fsd_cond_destroy( &self->status_cond ); fsd_cond_destroy( &self->destroy_cond ); fsd_mutex_destroy( &self->mutex ); fsd_free( self->job_id ); fsd_free( self->execution_hosts ); fsd_free( self->queue ); fsd_free( self->project ); fsd_free( self ); fsd_log_return(( "" )); } void fsd_job_control( fsd_job_t *self, int action ) { fsd_exc_raise_code( FSD_ERRNO_NOT_IMPLEMENTED ); } void fsd_job_update_status( fsd_job_t *self ) { fsd_exc_raise_code( FSD_ERRNO_NOT_IMPLEMENTED ); } void fsd_job_get_termination_status( fsd_job_t *self, int *status, fsd_iter_t **rusage_out ) { fsd_iter_t* volatile rusage = NULL; TRY { if( rusage_out ) { rusage = fsd_iter_new( NULL, 0 ); rusage->append( rusage, fsd_asprintf( "submission_time=%ld", (long)self->submit_time ) ); if (self->start_time) rusage->append( rusage, fsd_asprintf( "start_time=%ld", (long)self->start_time ) ); if (self->end_time) rusage->append( rusage, fsd_asprintf( "end_time=%ld", (long)self->end_time ) ); rusage->append( rusage, fsd_asprintf( "cpu=%ld", self->cpu_usage ) ); rusage->append( rusage, fsd_asprintf( "mem=%ld", self->mem_usage ) ); rusage->append( rusage, fsd_asprintf( "vmem=%ld", self->vmem_usage ) ); rusage->append( rusage, fsd_asprintf( "walltime=%ld", self->walltime ) ); rusage->append( rusage, fsd_asprintf( "hosts=%s", self->execution_hosts ) ); if (self->queue) { rusage->append( rusage, fsd_asprintf("queue=%s", self->queue ) ); } if (self->project) { rusage->append( rusage, fsd_asprintf("project=%s", self->project ) ); } } } EXCEPT_DEFAULT { if( rusage ) rusage->destroy( rusage ); if( rusage_out ) *rusage_out = NULL; fsd_exc_reraise(); } ELSE { if( status ) *status = self->exit_status; if( rusage_out ) *rusage_out = rusage; } END_TRY } void fsd_job_on_missing( fsd_job_t *self ) { fsd_log_warning(( "job %s missing from DRM queue", self->job_id )); } static void fsd_job_set_destroy( fsd_job_set_t *self ); static void fsd_job_set_add( fsd_job_set_t *self, fsd_job_t *job ); static void fsd_job_set_remove( fsd_job_set_t *self, fsd_job_t *job ); static fsd_job_t * fsd_job_set_get( fsd_job_set_t *self, const char *job_id ); static bool fsd_job_set_empty( fsd_job_set_t *self ); static fsd_job_t * fsd_job_set_find_terminated( fsd_job_set_t *self ); static char ** fsd_job_set_get_all_job_ids( fsd_job_set_t *self ); static void fsd_job_set_signal_all( fsd_job_set_t *self ); fsd_job_set_t * fsd_job_set_new(void) { fsd_job_set_t *volatile self = NULL; const size_t initial_size = 1024; fsd_log_enter(( "()" )); TRY { fsd_malloc( self, fsd_job_set_t ); self->destroy = fsd_job_set_destroy; self->add = fsd_job_set_add; self->remove = fsd_job_set_remove; self->get = fsd_job_set_get; self->empty = fsd_job_set_empty; self->find_terminated = fsd_job_set_find_terminated; self->get_all_job_ids = fsd_job_set_get_all_job_ids; self->signal_all = fsd_job_set_signal_all; self->tab = NULL; self->n_jobs = 0; fsd_calloc( self->tab, initial_size, fsd_job_t* ); self->tab_size = initial_size; self->tab_mask = self->tab_size - 1; fsd_mutex_init( &self->mutex ); } EXCEPT_DEFAULT { if( self ) { fsd_free( self->tab ); fsd_free( self ); } fsd_exc_reraise(); } END_TRY fsd_log_return(( " =%p", (void*)self )); return self; } void fsd_job_set_destroy( fsd_job_set_t *self ) { unsigned i; fsd_job_t *j; fsd_log_enter(( "()" )); for( i = 0; i < self->tab_size; i++ ) for( j = self->tab[i]; j != NULL; ) { fsd_job_t *job = j; j = j->next; fsd_mutex_lock( &job->mutex ); job->release( job ); } fsd_free( self->tab ); fsd_free( self ); fsd_log_return(( "" )); } void fsd_job_set_add( fsd_job_set_t *self, fsd_job_t *job ) { uint32_t h; fsd_log_enter(( "(job=%p, job_id=%s)", (void*)job, job->job_id )); fsd_mutex_lock( &self->mutex ); h = hashstr( job->job_id, strlen(job->job_id), 0 ); h &= self->tab_mask; job->next = self->tab[ h ]; self->tab[ h ] = job; self->n_jobs++; job->ref_cnt++; fsd_mutex_unlock( &self->mutex ); fsd_log_return(( ": job->ref_cnt=%d", job->ref_cnt )); } void fsd_job_set_remove( fsd_job_set_t *self, fsd_job_t *job ) { fsd_job_t **pjob = NULL; uint32_t h; fsd_log_enter(( "(job_id=%s)", job->job_id )); fsd_mutex_lock( &self->mutex ); TRY { h = hashstr( job->job_id, strlen(job->job_id), 0 ); h &= self->tab_mask; for( pjob = &self->tab[ h ]; *pjob; pjob = &(*pjob)->next ) { if( *pjob == job ) break; } if( *pjob ) { *pjob = (*pjob)->next; job->next = NULL; self->n_jobs--; job->ref_cnt--; } else fsd_exc_raise_code( FSD_DRMAA_ERRNO_INVALID_JOB ); } FINALLY { fsd_mutex_unlock( &self->mutex ); } END_TRY fsd_log_return(( ": job->ref_cnt=%d", job->ref_cnt )); } fsd_job_t * fsd_job_set_get( fsd_job_set_t *self, const char *job_id ) { uint32_t h; fsd_job_t *job = NULL; fsd_log_enter(( "(job_id=%s)", job_id )); fsd_mutex_lock( &self->mutex ); h = hashstr( job_id, strlen(job_id), 0 ); h &= self->tab_mask; for( job = self->tab[ h ]; job; job = job->next ) if( !strcmp( job->job_id, job_id ) ) break; if( job ) { fsd_mutex_lock( &job->mutex ); fsd_assert( !(job->flags & FSD_JOB_DISPOSED) ); job->ref_cnt ++; } fsd_mutex_unlock( &self->mutex ); if( job ) fsd_log_return(( "(job_id=%s) =%p: ref_cnt=%d [lock %s]", job_id, (void*)job, job->ref_cnt, job->job_id )); else fsd_log_return(( "(job_id=%s) =NULL", job_id )); return job; } bool fsd_job_set_empty( fsd_job_set_t *self ) { return self->n_jobs == 0; } fsd_job_t * fsd_job_set_find_terminated( fsd_job_set_t *self ) { fsd_job_t *job = NULL; size_t i; fsd_mutex_t* volatile mutex = & self->mutex; fsd_log_enter(( "()" )); fsd_mutex_lock( mutex ); TRY { for( i = 0; i < self->tab_size; i++ ) for( job = self->tab[ i ]; job; job = job->next ) if( job->state >= DRMAA_PS_DONE ) goto found; found: if( job ) { fsd_mutex_lock( &job->mutex ); fsd_assert( !(job->flags & FSD_JOB_DISPOSED) ); job->ref_cnt ++; } } FINALLY { fsd_mutex_unlock( mutex ); } END_TRY if( job ) fsd_log_return(( "() =%p: job_id=%s, ref_cnt=%d [lock %s]", (void*)job, job->job_id, job->ref_cnt, job->job_id )); else fsd_log_return(( "() =%p", (void*)job )); return job; } char ** fsd_job_set_get_all_job_ids( fsd_job_set_t *self ) { fsd_job_t *job = NULL; char** volatile job_ids = NULL; /* size_t n_jobs = 0, capacity = 0; */ size_t i; unsigned j = 0; fsd_mutex_t* volatile mutex = & self->mutex; fsd_log_enter(( "" )); fsd_mutex_lock( mutex ); TRY { fsd_calloc( job_ids, self->n_jobs+1, char* ); for( i = 0; i < self->tab_size; i++ ) for( job = self->tab[ i ]; job; job = job->next ) job_ids[ j++ ] = fsd_strdup( job->job_id ); fsd_realloc( job_ids, j+1, char* ); } FINALLY { fsd_mutex_unlock( mutex ); if( fsd_exc_get() ) fsd_free_vector( job_ids ); } END_TRY fsd_log_return(( " =%p", (void*)job_ids )); return job_ids; } void fsd_job_set_signal_all( fsd_job_set_t *self ) { fsd_job_t *volatile job = NULL; fsd_mutex_t *volatile mutex = & self->mutex; fsd_log_enter(( "" )); fsd_mutex_lock( mutex ); TRY { volatile size_t i; for( i = 0; i < self->tab_size; i++ ) for( job = self->tab[ i ]; job; job = job->next ) { fsd_mutex_lock( &job->mutex ); TRY{ fsd_cond_broadcast( &job->status_cond ); } FINALLY{ fsd_mutex_unlock( &job->mutex ); } END_TRY } } FINALLY { fsd_mutex_unlock( mutex ); } END_TRY fsd_log_return(( "" )); }