/* $Id: fsd_session.c 28 2011-10-29 21:31:46Z mmamonski $ */
/*
* PSNC DRMAA 2.0 utilities library
* Copyright (C) 2012 Poznan Supercomputing and Networking Center
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see .
*/
#include
#include
#include
#include
#include
#include
#include
#include
#ifndef lint
static char rcsid[]
# ifdef __GNUC__
__attribute__ ((unused))
# endif
= "$Id: fsd_session.c 28 2011-10-29 21:31:46Z mmamonski $";
#endif
static void
fsd_drmaa_session_release( fsd_drmaa_session_t *self );
static void
fsd_drmaa_session_destroy(
fsd_drmaa_session_t *self );
static void
fsd_drmaa_session_destroy_nowait( fsd_drmaa_session_t *self );
static char*
fsd_drmaa_session_run_job(
fsd_drmaa_session_t *self,
const fsd_template_t *jt
);
static fsd_iter_t*
fsd_drmaa_session_run_bulk(
fsd_drmaa_session_t *self,
const fsd_template_t *jt,
int start, int end, int incr
);
static void
fsd_drmaa_session_control_job(
fsd_drmaa_session_t *self,
const char *job_id, int action
);
static void
fsd_drmaa_session_job_ps(
fsd_drmaa_session_t *self,
const char *job_id, int *remote_ps
);
static void
fsd_drmaa_session_synchronize(
fsd_drmaa_session_t *self,
const char **input_job_ids, const struct timespec *timeout,
bool dispose
);
static char*
fsd_drmaa_session_wait(
fsd_drmaa_session_t *self,
const char *job_id, const struct timespec *timeout,
int *status, fsd_iter_t **rusage
);
static fsd_job_t *
fsd_drmaa_session_new_job(
fsd_drmaa_session_t *self,
const char *job_id
);
static char*
fsd_drmaa_session_run_impl(
fsd_drmaa_session_t *self,
const fsd_template_t *jt, int bulk_incr
);
static void
fsd_drmaa_session_wait_for_single_job(
fsd_drmaa_session_t *self,
const char *job_id, const struct timespec *timeout,
int *status, fsd_iter_t **rusage, bool dispose
);
static char*
fsd_drmaa_session_wait_for_any_job(
fsd_drmaa_session_t *self,
const struct timespec *timeout,
int *status, fsd_iter_t **rusage,
bool dispose
);
static void
fsd_drmaa_session_wait_for_job_status_change(
fsd_drmaa_session_t *self,
fsd_cond_t *wait_condition,
fsd_mutex_t *mutex,
const struct timespec *timeout
);
static void*
fsd_drmaa_session_wait_thread( fsd_drmaa_session_t *self );
static void
fsd_drmaa_session_stop_wait_thread( fsd_drmaa_session_t *self );
static void
fsd_drmaa_session_update_all_jobs_status( fsd_drmaa_session_t *self );
static char**
fsd_drmaa_session_get_submited_job_ids(
fsd_drmaa_session_t *self
);
static fsd_job_t*
fsd_drmaa_session_get_job(
fsd_drmaa_session_t *self, const char *job_id
);
static void
fsd_drmaa_session_load_configuration(
fsd_drmaa_session_t *self, const char *basename
);
static void
fsd_drmaa_session_read_configuration(
fsd_drmaa_session_t *self,
const char *filename, bool must_exist,
const char *configuration, size_t config_len
);
static void
fsd_drmaa_session_apply_configuration(
fsd_drmaa_session_t *self
);
fsd_drmaa_session_t *
fsd_drmaa_session_new( const char *contact )
{
fsd_drmaa_session_t *volatile self = NULL;
fsd_log_enter(( "(%s)", contact ));
TRY
{
fsd_malloc( self, fsd_drmaa_session_t );
self->release = fsd_drmaa_session_release;
self->destroy = fsd_drmaa_session_destroy;
self->destroy_nowait = fsd_drmaa_session_destroy_nowait;
self->run_job = fsd_drmaa_session_run_job;
self->run_bulk = fsd_drmaa_session_run_bulk;
self->control_job = fsd_drmaa_session_control_job;
self->job_ps = fsd_drmaa_session_job_ps;
self->synchronize = fsd_drmaa_session_synchronize;
self->wait = fsd_drmaa_session_wait;
self->new_job = fsd_drmaa_session_new_job;
self->run_impl = fsd_drmaa_session_run_impl;
self->wait_for_single_job = fsd_drmaa_session_wait_for_single_job;
self->wait_for_any_job = fsd_drmaa_session_wait_for_any_job;
self->wait_for_job_status_change =
fsd_drmaa_session_wait_for_job_status_change;
self->wait_thread = fsd_drmaa_session_wait_thread;
self->stop_wait_thread = fsd_drmaa_session_stop_wait_thread;
self->update_all_jobs_status = fsd_drmaa_session_update_all_jobs_status;
self->get_submited_job_ids = fsd_drmaa_session_get_submited_job_ids;
self->get_job = fsd_drmaa_session_get_job;
self->load_configuration = fsd_drmaa_session_load_configuration;
self->read_configuration = fsd_drmaa_session_read_configuration;
self->apply_configuration = fsd_drmaa_session_apply_configuration;
self->ref_cnt = 1;
self->destroy_requested = false;
self->contact = NULL;
self->jobs = NULL;
self->configuration = NULL;
self->pool_delay.tv_sec = 10;
self->pool_delay.tv_nsec = 0;
self->cache_job_state = 0;
self->enable_wait_thread = false;
self->job_categories = NULL;
self->missing_jobs = FSD_REVEAL_MISSING_JOBS;
self->wait_thread_started = false;
self->wait_thread_run_flag = false;
fsd_mutex_init( &self->mutex );
fsd_cond_init( &self->wait_condition );
fsd_cond_init( &self->destroy_condition );
fsd_mutex_init( &self->drm_connection_mutex );
self->jobs = fsd_job_set_new();
self->contact = fsd_strdup( contact );
}
EXCEPT_DEFAULT
{
if( self != NULL )
self->destroy( self );
fsd_exc_reraise();
}
END_TRY
return self;
}
void
fsd_drmaa_session_release( fsd_drmaa_session_t *self )
{
fsd_mutex_lock( &self->mutex );
self->ref_cnt--;
fsd_assert( self->ref_cnt > 0 );
if( self->ref_cnt == 1 )
fsd_cond_broadcast( &self->destroy_condition );
fsd_mutex_unlock( &self->mutex );
}
void
fsd_drmaa_session_destroy( fsd_drmaa_session_t *self )
{
bool already_destroying = false;
fsd_log_enter(( "" ));
fsd_mutex_lock( &self->mutex );
TRY
{
if( self->destroy_requested )
already_destroying = true;
else
{
self->destroy_requested = true;
fsd_cond_broadcast( &self->wait_condition );
}
}
FINALLY
{ fsd_mutex_unlock( &self->mutex ); }
END_TRY
if( already_destroying )
{ /* XXX: actually it can not happen in current implementation
when using DRMAA API */
self->release( self );
fsd_exc_raise_code( FSD_DRMAA_ERRNO_NO_ACTIVE_SESSION );
}
self->jobs->signal_all( self->jobs );
fsd_mutex_lock( &self->mutex );
TRY
{
while( self->ref_cnt > 1 )
fsd_cond_wait( &self->destroy_condition, &self->mutex );
fsd_log_debug(("started = %d run_flag = %d", self->wait_thread_started, self->wait_thread_run_flag ));
if( self->wait_thread_started )
self->stop_wait_thread( self );
}
FINALLY
{ fsd_mutex_unlock( &self->mutex ); }
END_TRY
self->destroy_nowait( self );
fsd_log_return(( "" ));
}
void
fsd_drmaa_session_destroy_nowait( fsd_drmaa_session_t *self )
{
fsd_log_enter(( "" ));
fsd_conf_dict_destroy( self->configuration );
fsd_free( self->contact );
if( self->jobs )
self->jobs->destroy( self->jobs );
fsd_mutex_destroy( &self->mutex );
fsd_cond_destroy( &self->wait_condition );
fsd_cond_destroy( &self->destroy_condition );
fsd_mutex_destroy( &self->drm_connection_mutex );
fsd_free( self );
fsd_log_return(( "" ));
}
char *
fsd_drmaa_session_run_job(
fsd_drmaa_session_t *self,
const fsd_template_t *jt )
{
return self->run_impl( self, jt, -1 );
}
fsd_iter_t *
fsd_drmaa_session_run_bulk(
fsd_drmaa_session_t *self,
const fsd_template_t *jt,
int start, int end, int incr )
{
volatile unsigned n_jobs;
char **volatile result = NULL;
if( incr > 0 )
n_jobs = (end-start) / incr + 1;
else
n_jobs = (start-end) / -incr + 1;
TRY
{
unsigned i;
int idx;
fsd_calloc( result, n_jobs + 1, char* );
for( i=0, idx=start; i < n_jobs; i++, idx+=incr )
result[i] = self->run_impl( self, jt, idx );
}
EXCEPT_DEFAULT
{
if( result )
fsd_free_vector( result );
fsd_exc_reraise();
}
END_TRY
return fsd_iter_new( result, -1 );
}
void
fsd_drmaa_session_control_job(
fsd_drmaa_session_t *self,
const char *job_id, int action )
{
char **job_ids = NULL;
char **i;
TRY
{
if( !strcmp( job_id, DRMAA_JOB_IDS_SESSION_ALL ) )
job_ids = self->get_submited_job_ids( self );
else
{
fsd_calloc( job_ids, 2, char* );
job_ids[0] = fsd_strdup( job_id );
}
for( i = job_ids; *i != NULL; i++ )
{
fsd_job_t *job = NULL;
TRY
{
job = self->get_job( self, *i );
if( job == NULL )
{
if( !strcmp( job_id, DRMAA_JOB_IDS_SESSION_ALL ) )
{ /* job was just removed from session */ }
else
job = self->new_job( self, *i );
}
if( job )
job->control( job, action );
}
FINALLY
{
if ( job )
job->release( job );
}
END_TRY
}
}
FINALLY
{
fsd_free_vector( job_ids );
}
END_TRY
}
void
fsd_drmaa_session_job_ps(
fsd_drmaa_session_t *self, const char *job_id, int *remote_ps )
{
fsd_job_t *volatile job = NULL;
TRY
{
job = self->get_job( self, job_id );
if( job == NULL )
{
fsd_log_info(( "job_ps: recreating job object: %s", job_id ));
job = self->new_job( self, job_id );
}
fsd_log_debug((" job->last_update_time = %u", (unsigned int)job->last_update_time));
if( time(NULL) - job->last_update_time >= self->cache_job_state
|| job->state == DRMAA_PS_UNDETERMINED )
{
fsd_log_debug(("updating status of job: %s ", job_id));
job->update_status( job );
job->last_update_time = time(NULL);
}
*remote_ps = job->state;
}
FINALLY
{
if( job )
job->release( job );
}
END_TRY
}
void
fsd_drmaa_session_synchronize(
fsd_drmaa_session_t *self,
const char **input_job_ids, const struct timespec *timeout,
bool dispose
)
{
volatile bool wait_for_all = false;
char **volatile job_ids_buf = NULL;
const char **job_ids = NULL;
const char **i;
fsd_log_enter(( "(job_ids={...}, timeout=..., dispose=%d)",
(int)dispose ));
if( input_job_ids == NULL )
fsd_exc_raise_code( FSD_ERRNO_INVALID_ARGUMENT );
TRY
{
for( i = input_job_ids; *i != NULL; i++ )
if( !strcmp(*i, DRMAA_JOB_IDS_SESSION_ALL) )
wait_for_all = true;
if( wait_for_all )
{
job_ids_buf = self->get_submited_job_ids( self );
job_ids = (const char**)job_ids_buf;
}
else
job_ids = input_job_ids;
for( i = job_ids; *i != NULL; i++ )
TRY
{
self->wait_for_single_job( self, *i, timeout, NULL, NULL, dispose );
}
EXCEPT( FSD_DRMAA_ERRNO_INVALID_JOB )
{ /* job was ripped by another thread */ }
END_TRY
}
FINALLY
{
fsd_free_vector( job_ids_buf );
}
END_TRY
}
char *
fsd_drmaa_session_wait(
fsd_drmaa_session_t *self,
const char *job_id, const struct timespec *timeout,
int *stat, fsd_iter_t **rusage
)
{
if( 0==strcmp(job_id, DRMAA_JOB_IDS_SESSION_ANY) )
return self->wait_for_any_job( self, timeout, stat, rusage, true );
else
{
self->wait_for_single_job( self, job_id, timeout, stat, rusage, true );
return fsd_strdup( job_id );
}
}
fsd_job_t *
fsd_drmaa_session_new_job( fsd_drmaa_session_t *self, const char *job_id )
{
fsd_job_t *job;
job = fsd_job_new( fsd_strdup(job_id) );
job->session = self;
return job;
}
char *
fsd_drmaa_session_run_impl(
fsd_drmaa_session_t *self,
const fsd_template_t *jt, int bulk_incr )
{
fsd_exc_raise_code( FSD_ERRNO_NOT_IMPLEMENTED );
}
void
fsd_drmaa_session_wait_for_single_job(
fsd_drmaa_session_t *self,
const char *job_id, const struct timespec *timeout,
int *status, fsd_iter_t **rusage,
bool dispose
)
{
fsd_job_t *volatile job = NULL;
volatile bool locked = false;
fsd_log_enter(( "(%s)", job_id ));
TRY
{
job = self->get_job( self, job_id );
if( job == NULL )
fsd_exc_raise_fmt( FSD_DRMAA_ERRNO_INVALID_JOB,
"Job '%s' not found in DRMS queue", job_id );
job->update_status( job );
while( !self->destroy_requested && job->state < DRMAA_PS_DONE )
{
bool signaled = true;
fsd_log_debug(( "fsd_drmaa_session_wait_for_single_job: "
"waiting for %s to terminate", job_id ));
if( self->enable_wait_thread )
{
if( timeout )
signaled = fsd_cond_timedwait(
&job->status_cond, &job->mutex, timeout );
else
{
fsd_cond_wait( &job->status_cond, &job->mutex );
}
if( !signaled )
fsd_exc_raise_code( FSD_DRMAA_ERRNO_EXIT_TIMEOUT );
}
else
{
self->wait_for_job_status_change(
self, &job->status_cond, &job->mutex, timeout );
}
fsd_log_debug(( "fsd_drmaa_session_wait_for_single_job: woken up" ));
if( !self->enable_wait_thread )
job->update_status( job );
}
if( self->destroy_requested )
fsd_exc_raise_code( FSD_DRMAA_ERRNO_EXIT_TIMEOUT );
job->get_termination_status( job, status, rusage );
if( dispose )
{
job->release( job ); /*release mutex in order to ensure proper order of locking: first job_set mutex then job mutex */
locked = fsd_mutex_lock( &self->mutex );
job = self->get_job( self, job_id );
if (job != NULL)
{
self->jobs->remove( self->jobs, job );
job->flags |= FSD_JOB_DISPOSED;
}
else
{
fsd_log_error(("Some other thread has already reaped job %s", job_id ));
}
locked = fsd_mutex_unlock( &self->mutex );
}
}
FINALLY
{
if ( job )
job->release( job );
if ( locked )
fsd_mutex_unlock( &self->mutex );
}
END_TRY
fsd_log_return((""));
}
char *
fsd_drmaa_session_wait_for_any_job(
fsd_drmaa_session_t *self,
const struct timespec *timeout,
int *status, fsd_iter_t **rusage,
bool dispose
)
{
fsd_job_set_t *set = self->jobs;
fsd_job_t *volatile job = NULL;
char *volatile job_id = NULL;
volatile bool locked = false;
fsd_log_enter(( "" ));
TRY
{
while( job == NULL )
{
bool signaled = true;
if( self->destroy_requested )
fsd_exc_raise_code( FSD_DRMAA_ERRNO_NO_ACTIVE_SESSION );
if( !self->enable_wait_thread )
self->update_all_jobs_status( self );
locked = fsd_mutex_lock( &self->mutex );
if( set->empty( set ) )
fsd_exc_raise_msg( FSD_DRMAA_ERRNO_INVALID_JOB,
"No job found to be waited for" );
if( (job = set->find_terminated( set )) != NULL )
break;
if( self->destroy_requested )
fsd_exc_raise_code( FSD_DRMAA_ERRNO_NO_ACTIVE_SESSION );
if( self->enable_wait_thread )
{
fsd_log_debug(( "wait_for_any_job: waiting for wait thread" ));
if( timeout )
signaled = fsd_cond_timedwait(
&self->wait_condition, &self->mutex, timeout );
else
fsd_cond_wait( &self->wait_condition, &self->mutex );
}
else
{
fsd_log_debug(( "wait_for_any_job: waiting for next check" ));
self->wait_for_job_status_change( self,
&self->wait_condition, &self->mutex, timeout );
}
locked = fsd_mutex_unlock( &self->mutex );
fsd_log_debug((
"wait_for_any_job: woken up; signaled=%d", signaled ));
if( !signaled )
fsd_exc_raise_code( FSD_DRMAA_ERRNO_EXIT_TIMEOUT );
}
fsd_log_debug(( "wait_for_any_job: waiting finished" ));
job_id = fsd_strdup( job->job_id );
job->get_termination_status( job, status, rusage );
}
EXCEPT_DEFAULT
{
if( job_id )
fsd_free( job_id );
fsd_exc_reraise();
}
FINALLY
{
if( job )
{
if( fsd_exc_get() == NULL && dispose )
{
set->remove( set, job );
job->flags |= FSD_JOB_DISPOSED;
}
job->release( job );
}
if( locked )
fsd_mutex_unlock( &self->mutex );
}
END_TRY
fsd_log_return(( " =%s", job_id ));
return job_id;
}
void
fsd_drmaa_session_wait_for_job_status_change(
fsd_drmaa_session_t *self,
fsd_cond_t *wait_condition,
fsd_mutex_t *mutex,
const struct timespec *timeout
)
{
struct timespec ts, *next_check = &ts;
bool status_changed;
if( timeout )
fsd_log_enter((
"(timeout=%ld.%09ld)",
timeout->tv_sec, timeout->tv_nsec ));
else
fsd_log_enter(( "(timeout=(null))" ));
fsd_get_time( next_check );
fsd_ts_add( next_check, &self->pool_delay );
if( timeout && fsd_ts_cmp( timeout, next_check ) < 0 )
next_check = (struct timespec*)timeout;
fsd_log_debug(( "wait_for_job_status_change: waiting untill %ld.%09ld",
next_check->tv_sec, next_check->tv_nsec ));
status_changed = fsd_cond_timedwait(wait_condition, mutex,(const struct timespec *) next_check );
if( !status_changed && next_check == timeout )
fsd_exc_raise_code( FSD_DRMAA_ERRNO_EXIT_TIMEOUT );
fsd_log_return(( ": next_check=%ld.%09ld, status_changed=%d",
next_check->tv_sec, next_check->tv_nsec,
(int)status_changed
));
}
void *
fsd_drmaa_session_wait_thread( fsd_drmaa_session_t *self )
{
struct timespec ts, *next_check = &ts;
bool volatile locked = false;
fsd_log_enter(( "" ));
locked = fsd_mutex_lock( &self->mutex );
TRY
{
while( self->wait_thread_run_flag )
TRY
{
fsd_log_debug(( "wait thread: next iteration" ));
self->update_all_jobs_status( self );
fsd_cond_broadcast( &self->wait_condition );
fsd_get_time( next_check );
fsd_ts_add( next_check, &self->pool_delay );
fsd_cond_timedwait( &self->wait_condition, &self->mutex, (const struct timespec *) next_check );
}
EXCEPT_DEFAULT
{
const fsd_exc_t *e = fsd_exc_get();
fsd_log_error(( "wait thread: <%d:%s>", e->code(e), e->message(e) ));
}
END_TRY
}
FINALLY
{
if (locked)
fsd_mutex_unlock( &self->mutex );
}
END_TRY
fsd_log_return(( " =NULL" ));
return NULL;
}
void
fsd_drmaa_session_stop_wait_thread( fsd_drmaa_session_t *self )
{
volatile int lock_count = 0;
fsd_log_enter(( "" ));
fsd_mutex_lock( &self->mutex );
TRY
{
fsd_log_debug(("started = %d run_flag = %d", self->wait_thread_started, self->wait_thread_run_flag ));
if( self->wait_thread_started )
{
self->wait_thread_run_flag = false;
fsd_log_debug(("started = %d run_flag = %d", self->wait_thread_started, self->wait_thread_run_flag ));
fsd_cond_broadcast( &self->wait_condition );
TRY
{
lock_count = fsd_mutex_unlock_times( &self->mutex );
fsd_thread_join( self->wait_thread_handle, NULL );
}
FINALLY
{
int i;
for( i = 0; i < lock_count; i++ )
fsd_mutex_lock( &self->mutex );
}
END_TRY
self->wait_thread_started = false;
}
}
FINALLY
{ fsd_mutex_unlock( &self->mutex ); }
END_TRY
fsd_log_return(( "" ));
}
void
fsd_drmaa_session_update_all_jobs_status(
fsd_drmaa_session_t *self )
{
char **volatile job_ids = NULL;
fsd_log_enter(( "" ));
TRY
{
const char **i;
fsd_job_t *volatile job = NULL;
job_ids = self->get_submited_job_ids( self );
for( i = (const char **)job_ids; *i; i++ )
TRY
{
job = self->get_job( self, *i );
if( job )
job->update_status( job );
}
FINALLY
{
if( job )
job->release( job );
}
END_TRY
}
FINALLY
{
fsd_free_vector( job_ids );
}
END_TRY
fsd_log_return(( "" ));
}
char **
fsd_drmaa_session_get_submited_job_ids( fsd_drmaa_session_t *self )
{
return self->jobs->get_all_job_ids( self->jobs );
}
fsd_job_t *
fsd_drmaa_session_get_job( fsd_drmaa_session_t *self, const char *job_id )
{
return self->jobs->get( self->jobs, job_id );
}
void
fsd_drmaa_session_load_configuration(
fsd_drmaa_session_t *self, const char *basename
)
{
char *volatile system_conf = NULL;
char *volatile user_conf = NULL;
char *volatile varname = NULL;
TRY
{
const char *home;
const char *envvalue;
char *i;
system_conf = fsd_asprintf( DRMAA_DIR_SYSCONF"/%s.conf", basename );
home = getenv( "HOME" );
if( home == NULL )
{ home = "."; }
user_conf = fsd_asprintf( "%s/.%s.conf", home, basename );
varname = fsd_asprintf( "%s_CONF", basename );
for( i = varname; *i; i++ )
*i = toupper( *(unsigned char*)i );
envvalue = getenv( varname );
self->configuration = fsd_conf_read(
self->configuration, system_conf, false, NULL, 0 );
self->configuration = fsd_conf_read(
self->configuration, user_conf, false, NULL, 0 );
if( envvalue )
self->configuration = fsd_conf_read(
self->configuration, envvalue, true, NULL, 0 );
self->apply_configuration( self );
}
FINALLY
{
fsd_free( system_conf );
fsd_free( user_conf );
fsd_free( varname );
}
END_TRY
}
void
fsd_drmaa_session_read_configuration(
fsd_drmaa_session_t *self,
const char *filename, bool must_exist,
const char *configuration, size_t configuration_len
)
{
self->configuration = fsd_conf_read(
self->configuration,
filename, must_exist,
configuration, configuration_len );
self->apply_configuration( self );
}
void
fsd_drmaa_session_apply_configuration( fsd_drmaa_session_t *self )
{
fsd_conf_option_t *pool_delay = NULL;
fsd_conf_option_t *cache_job_state = NULL;
fsd_conf_option_t *wait_thread = NULL;
fsd_conf_option_t *job_categories = NULL;
fsd_conf_option_t *missing_jobs = NULL;
fsd_log_enter((""));
if( self->configuration != NULL ) {
pool_delay = fsd_conf_dict_get(
self->configuration, "pool_delay" );
cache_job_state = fsd_conf_dict_get(
self->configuration, "cache_job_state" );
wait_thread = fsd_conf_dict_get(
self->configuration, "wait_thread" );
job_categories = fsd_conf_dict_get(
self->configuration, "job_categories" );
missing_jobs = fsd_conf_dict_get(
self->configuration, "missing_jobs" );
}
if( pool_delay )
{
if( pool_delay->type == FSD_CONF_INTEGER
&& pool_delay->val.integer > 0 )
{
fsd_log_debug(("pool_delay=%d", pool_delay->val.integer));
self->pool_delay.tv_sec = pool_delay->val.integer;
}
else
fsd_exc_raise_msg(
FSD_ERRNO_INTERNAL_ERROR,
"configuration: 'pool_delay' must be positive integer"
);
}
if( cache_job_state )
{
if( cache_job_state->type == FSD_CONF_INTEGER
&& cache_job_state->val.integer >= 0 )
{
fsd_log_debug(("cache_job_state=%d", cache_job_state->val.integer));
self->cache_job_state = cache_job_state->val.integer;
}
else
fsd_exc_raise_msg(
FSD_ERRNO_INTERNAL_ERROR,
"configuration: 'cache_job_state' must be nonnegative integer"
);
}
if( wait_thread )
{
if( wait_thread->type == FSD_CONF_INTEGER )
{
fsd_log_info(("wait_thread=%d", wait_thread->val.integer));
self->enable_wait_thread = (wait_thread->val.integer != 0 );
}
else
fsd_exc_raise_msg(
FSD_ERRNO_INTERNAL_ERROR,
"configuration: 'wait_thread' should be 0 or 1"
);
}
if( job_categories )
{
if( job_categories->type == FSD_CONF_DICT )
self->job_categories = job_categories->val.dict;
else
fsd_exc_raise_msg(
FSD_ERRNO_INTERNAL_ERROR,
"configuration: 'job_categories' should be dictionary"
);
}
if( missing_jobs )
{
bool ok = true;
if( missing_jobs->type != FSD_CONF_STRING )
{
const char *value = missing_jobs->val.string;
if( !strcmp( value, "ignore" ) )
self->missing_jobs = FSD_IGNORE_MISSING_JOBS;
else if( !strcmp( value, "ignore-queued" ) )
self->missing_jobs = FSD_IGNORE_QUEUED_MISSING_JOBS;
else if( !strcmp( value, "reveal" ) )
self->missing_jobs = FSD_REVEAL_MISSING_JOBS;
else
ok = false;
}
else
ok = false;
if( !ok )
fsd_exc_raise_msg(
FSD_ERRNO_INTERNAL_ERROR,
"configuration: 'missing_jobs' should be one of: "
"'ignore', 'ignore-queued' or 'reveal'"
);
}
if( self->enable_wait_thread && !self->wait_thread_started )
{
fsd_log_debug(("Starting wait thread"));
self->wait_thread_run_flag = true;
fsd_thread_create( &self->wait_thread_handle,
(void*(*)(void*))self->wait_thread, self );
self->wait_thread_started = true;
fsd_log_debug(( "wait thread started" ));
}
else if( !self->enable_wait_thread && self->wait_thread_started )
{
fsd_log_debug(("Stopping wait thread"));
self->stop_wait_thread( self );
}
fsd_log_return((""));
}