/* $Id$ */
/*
* FedStage DRMAA for PBS Pro
* Copyright (C) 2006-2009 FedStage Systems
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifdef HAVE_CONFIG_H
# include
#endif
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
/*
 * Revision-control identification string ("$Id$" keyword).  It is not
 * referenced anywhere in the visible code, which is why GCC builds tag it
 * with the "unused" attribute.  The whole definition is compiled out when
 * `lint` is defined.
 */
#ifndef lint
static char rcsid[]
# ifdef __GNUC__
__attribute__ ((unused))
# endif
= "$Id$";
#endif
/*
 * Forward declarations of the file-local functions.  Most of them are
 * installed as method pointers on the session object by
 * pbsdrmaa_session_new() / pbsdrmaa_session_apply_configuration().
 */

/* Session destructor; replaces the base-class `destroy` method. */
static void
pbsdrmaa_session_destroy( fsd_drmaa_session_t *self );
/* Reads PBS-specific options from the session configuration. */
static void
pbsdrmaa_session_apply_configuration( fsd_drmaa_session_t *self );
/* Creates a job object for `job_id` bound to this session. */
static fsd_job_t *
pbsdrmaa_session_new_job( fsd_drmaa_session_t *self, const char *job_id );
/* Refreshes the state of every job known to the session via pbs_statjob(). */
static void
pbsdrmaa_session_update_all_jobs_status( fsd_drmaa_session_t *self );
/* Wait-thread body used when `pbs_home` is configured; always returns NULL. */
static void
*pbsdrmaa_session_wait_thread( fsd_drmaa_session_t *self );
/* Submits one job built from template `jt` and returns its job id. */
static char *
pbsdrmaa_session_run_impl(
fsd_drmaa_session_t *self,
const fsd_template_t *jt,
int bulk_idx
);
/* Builds the attribute list passed to pbs_statjob() in non-PBS-Pro builds. */
static struct attrl *
pbsdrmaa_create_status_attrl(void);
/**
 * Creates a PBS DRMAA session and connects it to the PBS server.
 *
 * The generic session produced by fsd_drmaa_session_new() is grown to a
 * pbsdrmaa_session_t, the PBS-specific fields are initialised, the base
 * class methods are overridden, the "pbs_drmaa" configuration is loaded and
 * finally pbs_connect() is attempted (with retries, sleeping 1, 2, 3, ...
 * seconds between attempts).
 *
 * @param contact  Contact string for the session; NULL is treated as "".
 * @return The new session, viewed through its base type.
 *
 * Any failure is reported through the fsd exception mechanism: the partially
 * built session is destroyed and the exception is re-raised.
 */
fsd_drmaa_session_t *
pbsdrmaa_session_new( const char *contact )
{
	pbsdrmaa_session_t *volatile self = NULL;

	if( contact == NULL )
		contact = "";

	TRY
	{
		self = (pbsdrmaa_session_t*)fsd_drmaa_session_new(contact);
		fsd_realloc( self, 1, pbsdrmaa_session_t );

		/*
		 * Initialise every PBS-specific field BEFORE installing
		 * pbsdrmaa_session_destroy and before calling anything that may
		 * raise.  The memory appended by the reallocation is not
		 * initialised here by anything else, and the destructor frees
		 * status_attrl and job_exit_status_file_prefix, so those must
		 * already be valid (NULL) if an error unwinds through the
		 * handler below.
		 */
		self->super_wait_thread = NULL;
		self->log_file_initial_size = 0;
		self->pbs_conn = -1;
		self->pbs_home = NULL;
		self->wait_thread_log = false;
		self->status_attrl = NULL;
		self->job_exit_status_file_prefix = NULL;
		self->max_retries_count = 3;
		self->wait_thread_sleep_time = 1;

		/* Keep the base-class methods so the overrides can chain to them. */
		self->super_destroy = self->super.destroy;
		self->super.destroy = pbsdrmaa_session_destroy;
		self->super.new_job = pbsdrmaa_session_new_job;
		self->super.update_all_jobs_status
			= pbsdrmaa_session_update_all_jobs_status;
		self->super.run_impl = pbsdrmaa_session_run_impl;
		self->super_apply_configuration = self->super.apply_configuration;
		self->super.apply_configuration = pbsdrmaa_session_apply_configuration;

		self->status_attrl = pbsdrmaa_create_status_attrl();

		self->super.load_configuration( &self->super, "pbs_drmaa" );
		self->super.missing_jobs = FSD_IGNORE_MISSING_JOBS;

		{
			/* max_retries_count may have been changed by the configuration. */
			int tries_left = self->max_retries_count;
			int sleep_time = 1;

			for( ;; )
			{
				self->pbs_conn = pbs_connect( self->super.contact );
				fsd_log_info(( "pbs_connect(%s) =%d", self->super.contact, self->pbs_conn ));
				/* Stop on success, or when the retry budget is exhausted. */
				if( self->pbs_conn >= 0 || tries_left-- == 0 )
					break;
				sleep(sleep_time++);
			}

			if( self->pbs_conn < 0 )
				pbsdrmaa_exc_raise_pbs( "pbs_connect" );
		}
	}
	EXCEPT_DEFAULT
	{
		if( self )
		{
			self->super.destroy( &self->super );
			self = NULL;
		}
		fsd_exc_reraise();
	}
	END_TRY

	return (fsd_drmaa_session_t*)self;
}
/**
 * Destructor of the PBS session (installed as the session's `destroy`
 * method by pbsdrmaa_session_new()).
 *
 * Stops the wait thread, closes the PBS connection if one is open, releases
 * the PBS-specific allocations and then chains to the saved base-class
 * destructor.
 *
 * @param self  Session to destroy; it is actually a pbsdrmaa_session_t.
 */
void
pbsdrmaa_session_destroy( fsd_drmaa_session_t *self )
{
	pbsdrmaa_session_t *const session = (pbsdrmaa_session_t*)self;

	self->stop_wait_thread( self );

	/* A negative descriptor means "not connected". */
	if( !(session->pbs_conn < 0) )
		pbs_disconnect( session->pbs_conn );

	/* status_attrl is a single array allocation, so one free suffices. */
	fsd_free( session->status_attrl );
	fsd_free( session->job_exit_status_file_prefix );

	session->super_destroy( self );
}
/**
 * Submits a single job described by a job template.
 *
 * A submit object is built from the template, evaluated and submitted; the
 * resulting job id is wrapped in a new job object which is flagged as
 * belonging to the current session and added to the session's job set.
 *
 * @param self      Session performing the submission.
 * @param jt        Job template to submit.
 * @param bulk_idx  Bulk index handed to pbsdrmaa_submit_new().
 * @return The job id string returned by the submit object.  On error it is
 *         freed here and the exception is re-raised.
 */
static char *
pbsdrmaa_session_run_impl(
		fsd_drmaa_session_t *self,
		const fsd_template_t *jt,
		int bulk_idx
		)
{
	/* Assigned inside TRY and read in the handlers, hence volatile. */
	pbsdrmaa_submit_t *volatile submission = NULL;
	fsd_job_t *volatile new_job = NULL;
	char *volatile submitted_id = NULL;

	fsd_log_enter(( "(jt=%p, bulk_idx=%d)", (void*)jt, bulk_idx ));

	TRY
	{
		submission = pbsdrmaa_submit_new( self, jt, bulk_idx );
		submission->eval( submission );
		submitted_id = submission->submit( submission );

		new_job = self->new_job( self, submitted_id );
		new_job->flags |= FSD_JOB_CURRENT_SESSION;
		new_job->submit_time = time(NULL);
		self->jobs->add( self->jobs, new_job );

		/* Released here; clearing the pointer keeps FINALLY from releasing twice. */
		new_job->release( new_job );
		new_job = NULL;
	}
	EXCEPT_DEFAULT
	{
		fsd_free( submitted_id );
		fsd_exc_reraise();
	}
	FINALLY
	{
		if( submission != NULL )
			submission->destroy( submission );
		if( new_job != NULL )
			new_job->release( new_job );
	}
	END_TRY

	fsd_log_return(( " =%s", submitted_id ));
	return submitted_id;
}
/**
 * Creates a PBS job object for the given job id and binds it to the session.
 *
 * @param self    Session that will own the job.
 * @param job_id  Job identifier; a duplicate of the string is handed to
 *                pbsdrmaa_job_new().
 * @return The job object returned by pbsdrmaa_job_new().
 */
static fsd_job_t *
pbsdrmaa_session_new_job( fsd_drmaa_session_t *self, const char *job_id )
{
	fsd_job_t *const created = pbsdrmaa_job_new( fsd_strdup(job_id) );

	created->session = self;
	return created;
}
/**
 * Applies the PBS-specific options of the session configuration and then
 * chains to the base-class implementation.
 *
 * Recognised options (each ignored when absent or of the wrong type):
 *  - pbs_home (string): enables the log-reading wait thread and records the
 *    current size of today's server log
 *    (<pbs_home>/server_logs/YYYYMMDD).
 *  - max_retries_count (integer): connection retry budget.
 *  - wait_thread_sleep_time (integer): stored in the session.
 *  - user_state_dir (string): printf-style template for the job state
 *    directory; it is formatted with the login name of the effective user.
 *    Without it the directory defaults to "<home>/.drmaa".
 *
 * The state directory is created with mode 0700; failure to create it only
 * produces a warning.
 *
 * @param self  Session being configured; it is actually a pbsdrmaa_session_t.
 *
 * Raises FSD_ERRNO_INTERNAL_ERROR (through the fsd exception mechanism) when
 * the server log cannot be stat()ed, when the passwd entry of the effective
 * user cannot be found for user_state_dir, or when no home directory can be
 * determined for the default state directory.
 */
void
pbsdrmaa_session_apply_configuration( fsd_drmaa_session_t *self )
{
	pbsdrmaa_session_t *pbsself = (pbsdrmaa_session_t*)self;
	fsd_conf_option_t *pbs_home = NULL;
	fsd_conf_option_t *wait_thread_sleep_time = NULL;
	fsd_conf_option_t *max_retries_count = NULL;
	fsd_conf_option_t *user_state_dir = NULL;

	pbs_home = fsd_conf_dict_get(self->configuration, "pbs_home" );
	wait_thread_sleep_time = fsd_conf_dict_get(self->configuration, "wait_thread_sleep_time" );
	max_retries_count = fsd_conf_dict_get(self->configuration, "max_retries_count" );
	user_state_dir = fsd_conf_dict_get(self->configuration, "user_state_dir" );

	if( pbs_home && pbs_home->type == FSD_CONF_STRING )
	{
		struct stat statbuf;
		char *volatile log_path = NULL;
		struct tm tm;

		/* Not a copy: pbs_home points at the configuration's own string. */
		pbsself->pbs_home = pbs_home->val.string;
		fsd_log_info(("pbs_home: %s",pbsself->pbs_home));

		/* Switch the session to the log-reading wait thread. */
		pbsself->super_wait_thread = pbsself->super.wait_thread;
		pbsself->super.wait_thread = pbsdrmaa_session_wait_thread;
		pbsself->wait_thread_log = true;

		time(&pbsself->log_file_initial_time);
		localtime_r(&pbsself->log_file_initial_time,&tm);

		/*
		 * log_path is needed for the error message, so it cannot be freed
		 * before raising; FINALLY releases it on both the normal and the
		 * error path.
		 */
		TRY
		{
			log_path = fsd_asprintf("%s/server_logs/%04d%02d%02d",
					pbsself->pbs_home,
					tm.tm_year + 1900,
					tm.tm_mon + 1,
					tm.tm_mday);

			if(stat(log_path,&statbuf) == -1)
			{
				char errbuf[256] = "InternalError";
				(void)strerror_r(errno, errbuf, sizeof(errbuf));
				fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"stat error on file %s: %s", log_path, errbuf);
			}

			fsd_log_debug(("Log file %s size %d",log_path,(int) statbuf.st_size));
			pbsself->log_file_initial_size = statbuf.st_size;
		}
		FINALLY
		{
			fsd_free(log_path);
		}
		END_TRY
	}

	if ( max_retries_count && max_retries_count->type == FSD_CONF_INTEGER)
	{
		pbsself->max_retries_count = max_retries_count->val.integer;
		fsd_log_info(("Max retries count: %d", pbsself->max_retries_count));
	}

	if ( wait_thread_sleep_time && wait_thread_sleep_time->type == FSD_CONF_INTEGER)
	{
		pbsself->wait_thread_sleep_time = wait_thread_sleep_time->val.integer;
		fsd_log_info(("Wait thread sleep time: %d", pbsself->wait_thread_sleep_time));
	}

	if( user_state_dir && user_state_dir->type == FSD_CONF_STRING )
	{
		struct passwd *pw = NULL;
		uid_t uid;

		uid = geteuid();
		pw = getpwuid(uid); /* drmaa_init is always called in thread safely fashion */

		if (!pw)
			fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"Failed to get pw_name of the user %d", (int)uid);

		/*
		 * NOTE(review): the configured value is used as the format string
		 * and receives exactly one string argument.  A template with other
		 * or additional conversions is undefined behaviour; consider
		 * validating it.
		 */
		pbsself->job_exit_status_file_prefix = fsd_asprintf(user_state_dir->val.string, pw->pw_name);
	}
	else
	{
		/* getenv() returns NULL when HOME is unset; never pass that to %s. */
		const char *home = getenv("HOME");

		if( home == NULL )
		{
			struct passwd *pw = getpwuid( geteuid() );
			if( pw != NULL )
				home = pw->pw_dir;
		}

		if( home == NULL )
			fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"Failed to determine the home directory of the user %d", (int)geteuid());

		pbsself->job_exit_status_file_prefix = fsd_asprintf("%s/.drmaa", home);
	}

	fsd_log_debug(("Trying to create state directory: %s", pbsself->job_exit_status_file_prefix));

	if (mkdir(pbsself->job_exit_status_file_prefix, 0700) == -1 && errno != EEXIST) /* TODO it would be much better to do stat before */
	{
		fsd_log_warning(("Failed to create job state directory: %s. Valid job exit status may not be available in some cases.", pbsself->job_exit_status_file_prefix));
	}

	/* TODO purge old exit statuses files */

	pbsself->super_apply_configuration(self); /* call method from the superclass */
}
/*
 * Refreshes the state of every job tracked by the session.
 *
 * One pbs_statjob() call (job id NULL) fetches the status list from the
 * server.  Every job in the session's job set is then flagged
 * FSD_JOB_MISSING, the flag is cleared again for each job that appears in
 * the returned list (and that job is updated from its status record), and
 * finally on_missing() is invoked for every job that is still flagged.
 *
 * Errors are reported through the fsd exception mechanism; the FINALLY
 * section frees the status list and releases whichever mutexes are still
 * recorded as held.
 */
void
pbsdrmaa_session_update_all_jobs_status( fsd_drmaa_session_t *self )
{
/* Track which mutexes are held so FINALLY can release them after an error.
 * Presumably fsd_mutex_lock() yields true and fsd_mutex_unlock() false --
 * TODO confirm. */
volatile bool conn_lock = false;
volatile bool jobs_lock = false;
pbsdrmaa_session_t *pbsself = (pbsdrmaa_session_t*)self;
fsd_job_set_t *jobs = self->jobs;
struct batch_status *volatile status = NULL;
/* Budget of failed reconnect attempts; delay grows by one second per attempt. */
volatile int tries_left = pbsself->max_retries_count;
volatile int sleep_time = 1;
fsd_log_enter((""));
TRY
{
conn_lock = fsd_mutex_lock( &self->drm_connection_mutex );
retry:
/* TODO: query only for user's jobs pbs_selstat + ATTR_u */
#ifdef PBS_PROFESSIONAL
/* PBS Professional build: no attribute filter is passed. */
status = pbs_statjob( pbsself->pbs_conn, NULL, NULL, NULL );
#else
/* Other builds restrict the reply to the attributes in status_attrl. */
status = pbs_statjob( pbsself->pbs_conn, NULL, pbsself->status_attrl, NULL );
#endif
fsd_log_info(( "pbs_statjob( fd=%d, job_id=NULL, attribs={...} ) =%p", pbsself->pbs_conn, (void*)status ));
/* A NULL list with pbs_errno == 0 is not treated as an error. */
if( status == NULL && pbs_errno != 0 )
{
/* Protocol/expired errors trigger a reconnect; anything else is fatal. */
if (pbs_errno == PBSE_PROTOCOL || pbs_errno == PBSE_EXPIRED)
{
if ( pbsself->pbs_conn >= 0)
pbs_disconnect( pbsself->pbs_conn );
retry_connect:
/* Sleeps before every connect attempt, including the first one.
 * The connection mutex stays held for the whole reconnect sequence. */
sleep(sleep_time++);
pbsself->pbs_conn = pbs_connect( pbsself->super.contact );
if( pbsself->pbs_conn < 0)
{
/* NOTE(review): tries_left is only decremented on a failed connect, so a
 * server that accepts connections but keeps failing pbs_statjob with a
 * protocol error is retried without bound -- confirm this is intended.
 * A negative max_retries_count is also truthy here. */
if (tries_left--)
goto retry_connect;
else
pbsdrmaa_exc_raise_pbs( "pbs_connect" );
}
else
goto retry;
}
else
{
pbsdrmaa_exc_raise_pbs( "pbs_statjob" );
}
}
conn_lock = fsd_mutex_unlock( &self->drm_connection_mutex );
/* Pass 1: assume every known job is missing. */
{
size_t i;
fsd_job_t *job;
jobs_lock = fsd_mutex_lock( &jobs->mutex );
for( i = 0; i < jobs->tab_size; i++ )
for( job = jobs->tab[i]; job != NULL; job = job->next )
{
fsd_mutex_lock( &job->mutex );
job->flags |= FSD_JOB_MISSING;
fsd_mutex_unlock( &job->mutex );
}
jobs_lock = fsd_mutex_unlock( &jobs->mutex );
}
/* Pass 2: walk the server's list; jobs found there are not missing. */
{
struct batch_status *volatile i;
for( i = status; i != NULL; i = i->next )
{
fsd_job_t *job = NULL;
fsd_log_debug(( "job_id=%s", i->name ));
/* Jobs unknown to this session yield NULL and are skipped.
 * Presumably get_job() returns the job locked and release() unlocks it
 * -- TODO confirm; no explicit job mutex is taken in this pass. */
job = self->get_job( self, i->name );
if( job != NULL )
{
job->flags &= ~FSD_JOB_MISSING;
TRY
{
((pbsdrmaa_job_t*)job)->update( job, i );
}
FINALLY
{
job->release( job );
}
END_TRY
}
}
}
/* Pass 3: notify every job that is still flagged as missing. */
{
size_t volatile i;
fsd_job_t *volatile job;
jobs_lock = fsd_mutex_lock( &jobs->mutex );
for( i = 0; i < jobs->tab_size; i++ )
for( job = jobs->tab[i]; job != NULL; job = job->next )
{
fsd_mutex_lock( &job->mutex );
TRY
{
if( job->flags & FSD_JOB_MISSING )
job->on_missing( job );
}
/* The job mutex is released even if on_missing() raises. */
FINALLY{ fsd_mutex_unlock( &job->mutex ); }
END_TRY
}
jobs_lock = fsd_mutex_unlock( &jobs->mutex );
}
}
FINALLY
{
/* Runs on both the normal and the error path. */
if( status != NULL )
pbs_statfree( status );
if( conn_lock )
conn_lock = fsd_mutex_unlock( &self->drm_connection_mutex );
if( jobs_lock )
jobs_lock = fsd_mutex_unlock( &jobs->mutex );
}
END_TRY
fsd_log_return((""));
}
/**
 * Builds the attribute list that restricts pbs_statjob() replies to the job
 * attributes this library is interested in.
 *
 * The list lives in ONE contiguous array whose elements are chained through
 * their `next` pointers, so the caller releases it with a single fsd_free()
 * of the returned pointer.  The `name` members point at string literals and
 * must not be freed or modified.
 *
 * Only whole attributes are requested.  In particular "resources_used" is
 * requested without a `resource` qualifier, so per-resource entries such as
 * walltime, cput, mem and vmem are not listed individually.
 *
 * @return Head of the attribute list (first element of the array).
 */
struct attrl *
pbsdrmaa_create_status_attrl(void)
{
	/* Each attribute appears exactly once ("mtime" used to be listed twice). */
	static const char *const attr_names[] = {
		"job_state",
		"exit_status",
		"resources_used",
		"ctime",
		"mtime",
		"qtime",
		"etime",
		"queue",
		"Account_Name",
		"exec_host",
		"start_time"
	};
	enum { n_attribs = sizeof(attr_names) / sizeof(attr_names[0]) };
	struct attrl *result = NULL;
	int j;

	fsd_log_enter((""));

	/* Zero-filled allocation: `resource` and `value` stay NULL. */
	fsd_calloc( result, n_attribs, struct attrl );

	for( j = 0; j < n_attribs; j++ )
	{
		/* struct attrl stores a non-const pointer; the literal is never written. */
		result[j].name = (char*)attr_names[j];
		result[j].next = ( j + 1 < n_attribs ) ? &result[j + 1] : NULL;
	}

#ifdef DEBUGGING
	fsd_log_return((":"));
	pbsdrmaa_dump_attrl( result, NULL );
#endif
	return result;
}
/**
 * Body of the wait thread used when `pbs_home` is configured: creates a log
 * reader for the session, runs its read_log() method and destroys the
 * reader afterwards, whether or not an error occurred.
 *
 * @param self  Session whose server log is followed.
 * @return Always NULL.
 */
void *
pbsdrmaa_session_wait_thread( fsd_drmaa_session_t *self )
{
	/*
	 * Assigned inside TRY and read in FINALLY.  It is volatile like the
	 * comparable locals elsewhere in this file, so its value is well
	 * defined when FINALLY runs after an exception (the TRY macros appear
	 * to be setjmp/longjmp based).
	 */
	pbsdrmaa_log_reader_t *volatile log_reader = NULL;

	fsd_log_enter(( "" ));

	TRY
	{
		log_reader = pbsdrmaa_log_reader_new( self );
		log_reader->read_log( log_reader );
	}
	FINALLY
	{
		/* Nothing to destroy if construction itself failed. */
		if( log_reader != NULL )
			pbsdrmaa_log_reader_destroy( log_reader );
	}
	END_TRY

	fsd_log_return(( " =NULL" ));
	return NULL;
}