/* $Id$ */
/*
* PSNC DRMAA for SLURM
* Copyright (C) 2010 Poznan Supercomputing and Networking Center
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see .
*/
#include
#include
#include
#include
#include
#include
#include
#include
static char *slurmdrmaa_session_run_job(fsd_drmaa_session_t *self, const fsd_template_t *jt);
static fsd_iter_t *slurmdrmaa_session_run_bulk( fsd_drmaa_session_t *self,const fsd_template_t *jt, int start, int end, int incr );
static fsd_job_t *slurmdrmaa_session_new_job( fsd_drmaa_session_t *self, const char *job_id );
fsd_drmaa_session_t *
slurmdrmaa_session_new( const char *contact )
{
slurmdrmaa_session_t *volatile self = NULL;
TRY
{
self = (slurmdrmaa_session_t*)fsd_drmaa_session_new(contact);
fsd_realloc( self, 1, slurmdrmaa_session_t );
self->super.run_job = slurmdrmaa_session_run_job;
self->super.run_bulk = slurmdrmaa_session_run_bulk;
self->super.new_job = slurmdrmaa_session_new_job;
self->super.load_configuration( &self->super, "slurm_drmaa" );
}
EXCEPT_DEFAULT
{
fsd_free( self );
fsd_exc_reraise();
}
END_TRY
return (fsd_drmaa_session_t*)self;
}
char *
slurmdrmaa_session_run_job(
fsd_drmaa_session_t *self,
const fsd_template_t *jt
)
{
char *job_id = NULL;
fsd_iter_t *volatile job_ids = NULL;
TRY
{
job_ids = self->run_bulk( self, jt, 0, 0, 0 ); /* single job run as bulk job specialization */
job_id = fsd_strdup( job_ids->next( job_ids ) );
}
FINALLY
{
if( job_ids )
job_ids->destroy( job_ids );
}
END_TRY
return job_id;
}
fsd_iter_t *
slurmdrmaa_session_run_bulk(
fsd_drmaa_session_t *self,
const fsd_template_t *jt,
int start, int end, int incr )
{
fsd_job_t *volatile job = NULL;
char **volatile job_ids = NULL;
unsigned n_jobs = 0;
volatile bool connection_lock = false;
fsd_environ_t *volatile env = NULL;
job_desc_msg_t job_desc;
submit_response_msg_t *submit_response = NULL;
TRY
{
if( start != end )
n_jobs = (end - start) / incr + 1;
else
n_jobs = 1;
if( start != end )
{
unsigned idx, i;
fsd_calloc( job_ids, n_jobs+1, char* );
for( idx = start, i = 0; idx <= (unsigned)end; idx += incr, i++ )
{
connection_lock = fsd_mutex_lock( &self->drm_connection_mutex );
slurmdrmaa_job_create_req( self, jt, (fsd_environ_t**)&env , &job_desc, idx);
if(slurm_submit_batch_job(&job_desc,&submit_response)){
fsd_exc_raise_fmt(
FSD_ERRNO_INTERNAL_ERROR,"slurm_submit_batch_job: %s",slurm_strerror(slurm_get_errno()));
}
fsd_log_debug(("job %u submitted", submit_response->job_id));
connection_lock = fsd_mutex_unlock( &self->drm_connection_mutex );
job_ids[i] = fsd_asprintf("%d",submit_response->job_id); /*TODO */
job = slurmdrmaa_job_new( fsd_strdup(job_ids[i]) );
job->session = self;
job->submit_time = time(NULL);
self->jobs->add( self->jobs, job );
job->release( job );
job = NULL;
}
fsd_assert( i == n_jobs );
}
else /* ! bulk */
{
fsd_calloc( job_ids, n_jobs+1, char* );
connection_lock = fsd_mutex_lock( &self->drm_connection_mutex );
slurmdrmaa_job_create_req( self, jt, (fsd_environ_t**)&env , &job_desc, 0);
if(slurm_submit_batch_job(&job_desc,&submit_response)){
fsd_exc_raise_fmt(
FSD_ERRNO_INTERNAL_ERROR,"slurm_submit_batch_job: %s",slurm_strerror(slurm_get_errno()));
}
fsd_log_debug(("job %u submitted", submit_response->job_id));
connection_lock = fsd_mutex_unlock( &self->drm_connection_mutex );
job_ids[0] = fsd_asprintf( "%d", submit_response->job_id); /* .0*/
job = slurmdrmaa_job_new( fsd_strdup(job_ids[0]) ); /* TODO: ??? */
job->session = self;
job->submit_time = time(NULL);
self->jobs->add( self->jobs, job );
job->release( job );
job = NULL;
}
}
ELSE
{
if ( !connection_lock )
connection_lock = fsd_mutex_lock( &self->drm_connection_mutex );
slurm_free_submit_response_response_msg ( submit_response );
}
FINALLY
{
if( connection_lock )
fsd_mutex_unlock( &self->drm_connection_mutex );
if( job )
job->release( job );
if( fsd_exc_get() != NULL )
fsd_free_vector( job_ids );
slurmdrmaa_free_job_desc(&job_desc);
}
END_TRY
return fsd_iter_new( job_ids, n_jobs );
}
fsd_job_t *
slurmdrmaa_session_new_job( fsd_drmaa_session_t *self, const char *job_id )
{
fsd_job_t *job;
job = slurmdrmaa_job_new( fsd_strdup(job_id) );
job->session = self;
return job;
}