/* $Id$ */ /* * FedStage DRMAA for PBS Pro * Copyright (C) 2006-2009 FedStage Systems * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ #ifdef HAVE_CONFIG_H # include #endif #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #ifndef lint static char rcsid[] # ifdef __GNUC__ __attribute__ ((unused)) # endif = "$Id$"; #endif static void pbsdrmaa_session_destroy( fsd_drmaa_session_t *self ); static void pbsdrmaa_session_apply_configuration( fsd_drmaa_session_t *self ); static fsd_job_t * pbsdrmaa_session_new_job( fsd_drmaa_session_t *self, const char *job_id ); static bool pbsdrmaa_session_do_drm_keeps_completed_jobs( pbsdrmaa_session_t *self ); static void pbsdrmaa_session_update_all_jobs_status( fsd_drmaa_session_t *self ); static void *pbsdrmaa_session_wait_thread( fsd_drmaa_session_t *self ); static char * pbsdrmaa_session_run_impl( fsd_drmaa_session_t *self, const fsd_template_t *jt, int bulk_idx ); static struct attrl * pbsdrmaa_create_status_attrl(void); fsd_drmaa_session_t * pbsdrmaa_session_new( const char *contact ) { pbsdrmaa_session_t *volatile self = NULL; if( contact == NULL ) contact = ""; TRY { self = (pbsdrmaa_session_t*)fsd_drmaa_session_new(contact); fsd_realloc( self, 1, pbsdrmaa_session_t ); self->super_wait_thread = NULL; self->log_file_initial_size = 0; self->pbs_conn = -1; self->pbs_home = NULL; self->wait_thread_log = false; self->status_attrl = NULL; self->super_destroy = self->super.destroy; self->super.destroy = pbsdrmaa_session_destroy; self->super.new_job = pbsdrmaa_session_new_job; self->super.update_all_jobs_status = pbsdrmaa_session_update_all_jobs_status; self->super.run_impl = pbsdrmaa_session_run_impl; self->super_apply_configuration = self->super.apply_configuration; self->super.apply_configuration = pbsdrmaa_session_apply_configuration; self->do_drm_keeps_completed_jobs = pbsdrmaa_session_do_drm_keeps_completed_jobs; self->status_attrl = pbsdrmaa_create_status_attrl(); self->pbs_conn = pbs_connect( self->super.contact ); fsd_log_info(( "pbs_connect(%s) =%d", self->super.contact, self->pbs_conn )); if( self->pbs_conn < 0 ) pbsdrmaa_exc_raise_pbs( "pbs_connect" ); self->super.load_configuration( &self->super, "pbs_drmaa" ); self->super.missing_jobs = FSD_IGNORE_MISSING_JOBS; if( self->do_drm_keeps_completed_jobs( self ) ) self->super.missing_jobs = FSD_IGNORE_QUEUED_MISSING_JOBS; } EXCEPT_DEFAULT { if( self ) { self->super.destroy( &self->super ); self = NULL; } } END_TRY return (fsd_drmaa_session_t*)self; } void pbsdrmaa_session_destroy( fsd_drmaa_session_t *self ) { pbsdrmaa_session_t *pbsself = (pbsdrmaa_session_t*)self; self->stop_wait_thread( self ); if( pbsself->pbs_conn >= 0 ) pbs_disconnect( pbsself->pbs_conn ); fsd_free( pbsself->status_attrl ); pbsself->super_destroy( self ); } static char * pbsdrmaa_session_run_impl( fsd_drmaa_session_t *self, const fsd_template_t *jt, int bulk_idx ) { char *volatile job_id = NULL; fsd_job_t *volatile job = NULL; pbsdrmaa_submit_t *volatile submit = NULL; fsd_log_enter(( "(jt=%p, bulk_idx=%d)", (void*)jt, bulk_idx )); TRY { submit = pbsdrmaa_submit_new( self, jt, bulk_idx ); submit->eval( submit ); job_id = submit->submit( submit ); job = self->new_job( self, job_id ); job->submit_time = time(NULL); job->flags |= FSD_JOB_CURRENT_SESSION; self->jobs->add( self->jobs, job ); job->release( job ); job = NULL; } EXCEPT_DEFAULT { fsd_free( job_id ); fsd_exc_reraise(); } FINALLY { if( submit ) submit->destroy( submit ); if( job ) job->release( job ); } END_TRY fsd_log_return(( " =%s", job_id )); return job_id; } static fsd_job_t * pbsdrmaa_session_new_job( fsd_drmaa_session_t *self, const char *job_id ) { fsd_job_t *job; job = pbsdrmaa_job_new( fsd_strdup(job_id) ); job->session = self; return job; } void pbsdrmaa_session_apply_configuration( fsd_drmaa_session_t *self ) { pbsdrmaa_session_t *pbsself = (pbsdrmaa_session_t*)self; fsd_conf_option_t *pbs_home; pbs_home = fsd_conf_dict_get(self->configuration, "pbs_home" ); if( pbs_home && pbs_home->type == FSD_CONF_STRING ) { struct stat statbuf; char * volatile log_path; struct tm tm; pbsself->pbs_home = pbs_home->val.string; fsd_log_info(("pbs_home: %s",pbsself->pbs_home)); pbsself->super_wait_thread = pbsself->super.wait_thread; pbsself->super.wait_thread = pbsdrmaa_session_wait_thread; pbsself->wait_thread_log = true; time(&pbsself->log_file_initial_time); localtime_r(&pbsself->log_file_initial_time,&tm); log_path = fsd_asprintf("%s/server_logs/%04d%02d%02d", pbsself->pbs_home, tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday); if(stat(log_path,&statbuf) == -1) { char errbuf[256] = "InternalError"; (void)strerror_r(errno, errbuf, sizeof(errbuf)); fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"stat error: %s",errbuf); } fsd_log_debug(("Log file %s size %d",log_path,(int) statbuf.st_size)); pbsself->log_file_initial_size = statbuf.st_size; fsd_free(log_path); } pbsself->super_apply_configuration(self); /* call method from the superclass */ } void pbsdrmaa_session_update_all_jobs_status( fsd_drmaa_session_t *self ) { volatile bool conn_lock = false; volatile bool jobs_lock = false; pbsdrmaa_session_t *pbsself = (pbsdrmaa_session_t*)self; fsd_job_set_t *jobs = self->jobs; struct batch_status *volatile status = NULL; fsd_log_enter(("")); TRY { conn_lock = fsd_mutex_lock( &self->drm_connection_mutex ); retry: /* TODO: query only for user's jobs pbs_selstat + ATTR_u */ #ifdef PBS_PROFESSIONAL status = pbs_statjob( pbsself->pbs_conn, NULL, NULL, NULL ); #else status = pbs_statjob( pbsself->pbs_conn, NULL, pbsself->status_attrl, NULL ); #endif fsd_log_info(( "pbs_statjob( fd=%d, job_id=NULL, attribs={...} ) =%p", pbsself->pbs_conn, (void*)status )); if( status == NULL && pbs_errno != 0 ) { if (pbs_errno == PBSE_PROTOCOL || pbs_errno == PBSE_EXPIRED) { if ( pbsself->pbs_conn >= 0) pbs_disconnect( pbsself->pbs_conn ); sleep(1); pbsself->pbs_conn = pbs_connect( pbsself->super.contact ); if( pbsself->pbs_conn < 0 ) pbsdrmaa_exc_raise_pbs( "pbs_connect" ); else goto retry; } else { pbsdrmaa_exc_raise_pbs( "pbs_statjob" ); } } conn_lock = fsd_mutex_unlock( &self->drm_connection_mutex ); { size_t i; fsd_job_t *job; jobs_lock = fsd_mutex_lock( &jobs->mutex ); for( i = 0; i < jobs->tab_size; i++ ) for( job = jobs->tab[i]; job != NULL; job = job->next ) { fsd_mutex_lock( &job->mutex ); job->flags |= FSD_JOB_MISSING; fsd_mutex_unlock( &job->mutex ); } jobs_lock = fsd_mutex_unlock( &jobs->mutex ); } { struct batch_status *volatile i; for( i = status; i != NULL; i = i->next ) { fsd_job_t *job = NULL; fsd_log_debug(( "job_id=%s", i->name )); job = self->get_job( self, i->name ); if( job != NULL ) { job->flags &= ~FSD_JOB_MISSING; TRY { ((pbsdrmaa_job_t*)job)->update( job, i ); } FINALLY { job->release( job ); } END_TRY } } } { size_t volatile i; fsd_job_t *volatile job; jobs_lock = fsd_mutex_lock( &jobs->mutex ); for( i = 0; i < jobs->tab_size; i++ ) for( job = jobs->tab[i]; job != NULL; job = job->next ) { fsd_mutex_lock( &job->mutex ); TRY { if( job->flags & FSD_JOB_MISSING ) job->on_missing( job ); } FINALLY{ fsd_mutex_unlock( &job->mutex ); } END_TRY } jobs_lock = fsd_mutex_unlock( &jobs->mutex ); } } FINALLY { if( status != NULL ) pbs_statfree( status ); if( conn_lock ) conn_lock = fsd_mutex_unlock( &self->drm_connection_mutex ); if( jobs_lock ) jobs_lock = fsd_mutex_unlock( &jobs->mutex ); } END_TRY fsd_log_return(("")); } struct attrl * pbsdrmaa_create_status_attrl(void) { struct attrl *result = NULL; struct attrl *i; const int max_attribs = 16; int n_attribs; int j = 0; fsd_log_enter(("")); fsd_calloc( result, max_attribs, struct attrl ); result[j++].name="job_state"; result[j++].name="exit_status"; result[j++].name="resources_used"; result[j++].name="ctime"; result[j++].name="mtime"; result[j++].name="qtime"; result[j++].name="etime"; result[j++].name="queue"; result[j++].name="Account_Name"; result[j++].name="exec_host"; result[j++].name="start_time"; result[j++].name="mtime"; #if 0 result[j].name="resources_used"; result[j].resource="walltime"; j++; result[j].name="resources_used"; result[j].resource="cput"; j++; result[j].name="resources_used"; result[j].resource="mem"; j++; result[j].name="resources_used"; result[j].resource="vmem"; j++; result[j].name="Resource_List"; result[j].resource="walltime"; j++; result[j].name="Resource_List"; result[j].resource="cput"; j++; result[j].name="Resource_List"; result[j].resource="mem"; j++; result[j].name="Resource_List"; result[j].resource="vmem"; j++; #endif n_attribs = j; for( i = result; true; i++ ) if( i+1 < result + n_attribs ) i->next = i+1; else { i->next = NULL; break; } #ifdef DEBUGGING fsd_log_return((":")); pbsdrmaa_dump_attrl( result, NULL ); #endif return result; } bool pbsdrmaa_session_do_drm_keeps_completed_jobs( pbsdrmaa_session_t *self ) { #ifndef PBS_PROFESSIONAL struct attrl default_queue_query; struct attrl keep_completed_query; struct batch_status *default_queue_result = NULL; struct batch_status *keep_completed_result = NULL; const char *default_queue = NULL; const char *keep_completed = NULL; volatile bool result = false; volatile bool conn_lock = false; TRY { default_queue_query.next = NULL; default_queue_query.name = "default_queue"; default_queue_query.resource = NULL; default_queue_query.value = NULL; keep_completed_query.next = NULL; keep_completed_query.name = "keep_completed"; keep_completed_query.resource = NULL; keep_completed_query.value = NULL; conn_lock = fsd_mutex_lock( &self->super.drm_connection_mutex ); default_queue_result = pbs_statserver( self->pbs_conn, &default_queue_query, NULL ); if( default_queue_result == NULL ) pbsdrmaa_exc_raise_pbs( "pbs_statserver" ); if( default_queue_result->attribs && !strcmp( default_queue_result->attribs->name, "default_queue" ) ) default_queue = default_queue_result->attribs->value; fsd_log_debug(( "default_queue: %s", default_queue )); if( default_queue ) { keep_completed_result = pbs_statque( self->pbs_conn, (char*)default_queue, &keep_completed_query, NULL ); if( keep_completed_result == NULL ) pbsdrmaa_exc_raise_pbs( "pbs_statque" ); if( keep_completed_result->attribs && !strcmp( keep_completed_result->attribs->name, "keep_completed" ) ) keep_completed = keep_completed_result->attribs->value; } fsd_log_debug(( "keep_completed: %s", keep_completed )); } EXCEPT_DEFAULT { const fsd_exc_t *e = fsd_exc_get(); fsd_log_warning(( "PBS server seems not to keep completed jobs\n" "detail: %s", e->message(e) )); result = false; } ELSE { result = false; if( default_queue == NULL ) fsd_log_warning(( "no default queue set on PBS server" )); else if( keep_completed == NULL && self->pbs_home == NULL ) fsd_log_warning(( "Torque server is not configured to keep completed jobs\n" "in Torque: set keep_completed parameter of default queue\n" " $ qmgr -c 'set queue batch keep_completed = 60'\n" " or configure DRMAA to utilize log files" )); else result = true; } FINALLY { if( default_queue_result ) pbs_statfree( default_queue_result ); if( keep_completed_result ) pbs_statfree( keep_completed_result ); if( conn_lock ) conn_lock = fsd_mutex_unlock( &self->super.drm_connection_mutex ); } END_TRY return result; #endif fsd_log_warning(( "PBS Professional does not keep information about the completed jobs\n" " You must configure DRMAA to utilize log files in order to always get valid job exit status" )); return false; } void * pbsdrmaa_session_wait_thread( fsd_drmaa_session_t *self ) { pbsdrmaa_log_reader_t *log_reader = NULL; fsd_log_enter(( "" )); TRY { log_reader = pbsdrmaa_log_reader_new( self, NULL); log_reader->read_log( log_reader ); } FINALLY { pbsdrmaa_log_reader_destroy( log_reader ); } END_TRY fsd_log_return(( " =NULL" )); return NULL; }