source: branches/slurm-drmaa-1.2.0/slurm_drmaa/session.c @ 38

Revision 38, 5.0 KB checked in by pkopta, 11 years ago (diff)
  • Property svn:executable set to *
  • Property svn:keywords set to Id
Line 
1/* $Id$ */
2/*
3 * PSNC DRMAA for SLURM
4 * Copyright (C) 2011 Poznan Supercomputing and Networking Center
5 *
6 *  This program is free software: you can redistribute it and/or modify
7 *  it under the terms of the GNU General Public License as published by
8 *  the Free Software Foundation, either version 3 of the License, or
9 *  (at your option) any later version.
10 *
11 *  This program is distributed in the hope that it will be useful,
12 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 *  GNU General Public License for more details.
15 *
16 *  You should have received a copy of the GNU General Public License
17 *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20#include <string.h>
21#include <unistd.h>
22
23#include <drmaa_utils/iter.h>
24#include <drmaa_utils/conf.h>
25#include <slurm_drmaa/job.h>
26#include <slurm_drmaa/session.h>
27#include <slurm_drmaa/util.h>
28
29#include <slurm/slurm.h>
30
31static char *slurmdrmaa_session_run_job(fsd_drmaa_session_t *self, const fsd_template_t *jt);
32
33static fsd_iter_t *slurmdrmaa_session_run_bulk( fsd_drmaa_session_t *self,const fsd_template_t *jt, int start, int end, int incr );
34
35static fsd_job_t *slurmdrmaa_session_new_job( fsd_drmaa_session_t *self, const char *job_id );
36
37fsd_drmaa_session_t *
38slurmdrmaa_session_new( const char *contact )
39{
40        slurmdrmaa_session_t *volatile self = NULL;
41        TRY
42         {
43                self = (slurmdrmaa_session_t*)fsd_drmaa_session_new(contact);
44
45                fsd_realloc( self, 1, slurmdrmaa_session_t );
46
47                self->super.run_job = slurmdrmaa_session_run_job;
48                self->super.run_bulk = slurmdrmaa_session_run_bulk;
49                self->super.new_job = slurmdrmaa_session_new_job;
50
51                self->super.load_configuration( &self->super, "slurm_drmaa" );
52         }
53        EXCEPT_DEFAULT
54         {
55                fsd_free( self );
56                fsd_exc_reraise();
57         }
58        END_TRY
59        return (fsd_drmaa_session_t*)self;
60}
61
62
63char *
64slurmdrmaa_session_run_job(
65                fsd_drmaa_session_t *self,
66                const fsd_template_t *jt
67                )
68{
69        char *job_id = NULL;
70        fsd_iter_t *volatile job_ids = NULL;
71
72        TRY
73         {
74                job_ids = self->run_bulk( self, jt, 0, 0, 0 ); /* single job run as bulk job specialization */
75                job_id = fsd_strdup( job_ids->next( job_ids ) );
76         }
77        FINALLY
78         {
79                if( job_ids )
80                        job_ids->destroy( job_ids );
81         }
82        END_TRY
83        return job_id;
84}
85
86
87fsd_iter_t *
88slurmdrmaa_session_run_bulk(
89                fsd_drmaa_session_t *self,
90                const fsd_template_t *jt,
91                int start, int end, int incr )
92{
93        fsd_job_t *volatile job = NULL;
94        char **volatile job_ids = NULL;
95        unsigned n_jobs = 1;
96        volatile bool connection_lock = false;
97        fsd_environ_t *volatile env = NULL;
98        job_desc_msg_t job_desc;
99        submit_response_msg_t *submit_response = NULL;
100        job_info_msg_t *job_info = NULL;
101
102    /* zero out the struct, and set default vaules */
103        slurm_init_job_desc_msg( &job_desc );
104       
105        TRY
106         {
107        unsigned i;
108                if( start != end )
109                 {
110                        n_jobs = (end - start) / incr + 1;
111
112                        fsd_calloc( job_ids, n_jobs+1, char* );
113                        job_desc.array_inx = fsd_asprintf( "%d-%d:%d", start, end, incr );
114         }
115
116                connection_lock = fsd_mutex_lock( &self->drm_connection_mutex );
117                slurmdrmaa_job_create_req( self, jt, (fsd_environ_t**)&env , &job_desc );
118                if(slurm_submit_batch_job(&job_desc,&submit_response)){
119                        fsd_exc_raise_fmt(
120                                FSD_ERRNO_INTERNAL_ERROR,"slurm_submit_batch_job: %s",slurm_strerror(slurm_get_errno()));
121                }
122
123                connection_lock = fsd_mutex_unlock( &self->drm_connection_mutex );
124
125                fsd_log_debug(("job %u submitted", submit_response->job_id));
126
127                if( start != end )
128                {
129                        if ( SLURM_SUCCESS == slurm_load_job( &job_info, submit_response->job_id, 0) )
130                        {
131                                fsd_assert(  job_info->record_count == n_jobs );
132                                for(i=0; i < job_info->record_count; i++)
133                                {
134                                        job_ids[i] = fsd_asprintf( "%d", job_info->job_array[i].job_id);
135
136                                        job = slurmdrmaa_job_new( fsd_strdup(job_ids[i]) );
137                                        job->session = self;
138                                        job->submit_time = time(NULL);
139                                        self->jobs->add( self->jobs, job );
140                                        job->release( job );
141                                        job = NULL;
142                                }
143                        } else  {
144                                fsd_exc_raise_fmt( FSD_ERRNO_INTERNAL_ERROR,"slurm_load_job: %s",slurm_strerror(slurm_get_errno()));
145                        }
146                } else {
147                        fsd_calloc( job_ids, n_jobs+1, char* );
148
149                        job_ids[0] = fsd_asprintf( "%d", submit_response->job_id); /* .0*/
150
151                        job = slurmdrmaa_job_new( fsd_strdup(job_ids[0]) ); /* TODO: ??? */
152                        job->session = self;
153                        job->submit_time = time(NULL);
154                        self->jobs->add( self->jobs, job );
155                        job->release( job );
156                        job = NULL;
157                }
158         }
159         ELSE
160        {
161                if ( !connection_lock )
162                        connection_lock = fsd_mutex_lock( &self->drm_connection_mutex );
163
164                slurm_free_submit_response_response_msg ( submit_response );
165        }
166        FINALLY
167         {
168               
169                       
170                if( connection_lock )
171                        fsd_mutex_unlock( &self->drm_connection_mutex );
172
173                if( job )
174                        job->release( job );
175
176                if( fsd_exc_get() != NULL )
177                        fsd_free_vector( job_ids );
178                       
179                slurmdrmaa_free_job_desc(&job_desc);
180         }
181        END_TRY
182
183        return fsd_iter_new( job_ids, n_jobs );
184}
185
186
187fsd_job_t *
188slurmdrmaa_session_new_job( fsd_drmaa_session_t *self, const char *job_id )
189{
190        fsd_job_t *job;
191        job = slurmdrmaa_job_new( fsd_strdup(job_id) );
192        job->session = self;
193        return job;
194}
Note: See TracBrowser for help on using the repository browser.