source: trunk/slurm_drmaa/session.c @ 13

Revision 13, 5.1 KB checked in by mmatloka, 14 years ago (diff)

support for slurm 2.2

  • Property svn:executable set to *
  • Property svn:keywords set to Id
Rev Line
[5]1/* $Id$ */
[1]2/*
3 * PSNC DRMAA for SLURM
[13]4 * Copyright (C) 2011 Poznan Supercomputing and Networking Center
[1]5 *
6 *  This program is free software: you can redistribute it and/or modify
7 *  it under the terms of the GNU General Public License as published by
8 *  the Free Software Foundation, either version 3 of the License, or
9 *  (at your option) any later version.
10 *
11 *  This program is distributed in the hope that it will be useful,
12 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 *  GNU General Public License for more details.
15 *
16 *  You should have received a copy of the GNU General Public License
17 *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20#include <string.h>
21#include <unistd.h>
22
23#include <drmaa_utils/iter.h>
24#include <drmaa_utils/conf.h>
25#include <slurm_drmaa/job.h>
26#include <slurm_drmaa/session.h>
27#include <slurm_drmaa/util.h>
28
29#include <slurm/slurm.h>
30
31static char *slurmdrmaa_session_run_job(fsd_drmaa_session_t *self, const fsd_template_t *jt);
32
33static fsd_iter_t *slurmdrmaa_session_run_bulk( fsd_drmaa_session_t *self,const fsd_template_t *jt, int start, int end, int incr );
34
35static fsd_job_t *slurmdrmaa_session_new_job( fsd_drmaa_session_t *self, const char *job_id );
36
37fsd_drmaa_session_t *
38slurmdrmaa_session_new( const char *contact )
39{
40        slurmdrmaa_session_t *volatile self = NULL;
41        TRY
42         {
43                self = (slurmdrmaa_session_t*)fsd_drmaa_session_new(contact);
44
45                fsd_realloc( self, 1, slurmdrmaa_session_t );
46
47                self->super.run_job = slurmdrmaa_session_run_job;
48                self->super.run_bulk = slurmdrmaa_session_run_bulk;
49                self->super.new_job = slurmdrmaa_session_new_job;
50
51                self->super.load_configuration( &self->super, "slurm_drmaa" );
52         }
53        EXCEPT_DEFAULT
54         {
55                fsd_free( self );
56                fsd_exc_reraise();
57         }
58        END_TRY
59        return (fsd_drmaa_session_t*)self;
60}
61
62
63char *
64slurmdrmaa_session_run_job(
65                fsd_drmaa_session_t *self,
66                const fsd_template_t *jt
67                )
68{
69        char *job_id = NULL;
70        fsd_iter_t *volatile job_ids = NULL;
71
72        TRY
73         {
74                job_ids = self->run_bulk( self, jt, 0, 0, 0 ); /* single job run as bulk job specialization */
75                job_id = fsd_strdup( job_ids->next( job_ids ) );
76         }
77        FINALLY
78         {
79                if( job_ids )
80                        job_ids->destroy( job_ids );
81         }
82        END_TRY
83        return job_id;
84}
85
86
87fsd_iter_t *
88slurmdrmaa_session_run_bulk(
89                fsd_drmaa_session_t *self,
90                const fsd_template_t *jt,
91                int start, int end, int incr )
92{
93        fsd_job_t *volatile job = NULL;
94        char **volatile job_ids = NULL;
95        unsigned n_jobs = 0;
96        volatile bool connection_lock = false;
97        fsd_environ_t *volatile env = NULL;
98        job_desc_msg_t job_desc;
99        submit_response_msg_t *submit_response = NULL;
100       
101        TRY
102         {
103
104                if( start != end )
105                        n_jobs = (end - start) / incr + 1;
106                else
107                        n_jobs = 1;     
108
109                if(  start != end )
110                 {
111                        unsigned idx, i;
112
113                        fsd_calloc( job_ids, n_jobs+1, char* );
114
115                        for( idx = start, i = 0;  idx <= (unsigned)end;  idx += incr, i++ )
116                         {
117                                connection_lock = fsd_mutex_lock( &self->drm_connection_mutex );
118                                slurmdrmaa_job_create_req( self, jt, (fsd_environ_t**)&env , &job_desc, idx);
119                                if(slurm_submit_batch_job(&job_desc,&submit_response)){
120                                        fsd_exc_raise_fmt(
121                                                FSD_ERRNO_INTERNAL_ERROR,"slurm_submit_batch_job: %s",slurm_strerror(slurm_get_errno()));
122                                }
123
124                                fsd_log_debug(("job %u submitted", submit_response->job_id));         
125                                connection_lock = fsd_mutex_unlock( &self->drm_connection_mutex );
126
127
128                                job_ids[i] = fsd_asprintf("%d",submit_response->job_id); /*TODO  */
129
130                                job = slurmdrmaa_job_new( fsd_strdup(job_ids[i]) );
131                                job->session = self;
132                                job->submit_time = time(NULL);
133                                self->jobs->add( self->jobs, job );
134                                job->release( job ); 
135                                job = NULL;
136                         }
137                        fsd_assert( i == n_jobs );
138                 }
139                else /* ! bulk */
140                 {
141                        fsd_calloc( job_ids, n_jobs+1, char* );
142                       
143                        connection_lock = fsd_mutex_lock( &self->drm_connection_mutex );
144                        slurmdrmaa_job_create_req( self, jt, (fsd_environ_t**)&env , &job_desc, 0);
145                        if(slurm_submit_batch_job(&job_desc,&submit_response)){
146                                fsd_exc_raise_fmt(
147                                        FSD_ERRNO_INTERNAL_ERROR,"slurm_submit_batch_job: %s",slurm_strerror(slurm_get_errno()));
148                        }
149
150                        fsd_log_debug(("job %u submitted", submit_response->job_id));         
151                        connection_lock = fsd_mutex_unlock( &self->drm_connection_mutex );
152                       
153                        job_ids[0] = fsd_asprintf( "%d", submit_response->job_id); /* .0*/
154
155                        job = slurmdrmaa_job_new( fsd_strdup(job_ids[0]) ); /* TODO: ??? */
156                        job->session = self;
157                        job->submit_time = time(NULL);
158                        self->jobs->add( self->jobs, job );
159                        job->release( job );
160                        job = NULL;
161                 }
162         }
163         ELSE
164        {
165                if ( !connection_lock )
166                        connection_lock = fsd_mutex_lock( &self->drm_connection_mutex );
167
168                slurm_free_submit_response_response_msg ( submit_response );
169        }
170        FINALLY
171         {
172               
173                       
174                if( connection_lock )
175                        fsd_mutex_unlock( &self->drm_connection_mutex );
176
177                if( job )
178                        job->release( job );
179
180                if( fsd_exc_get() != NULL )
181                        fsd_free_vector( job_ids );
182                       
183                slurmdrmaa_free_job_desc(&job_desc);
184         }
185        END_TRY
186
187        return fsd_iter_new( job_ids, n_jobs );
188}
189
190
191fsd_job_t *
192slurmdrmaa_session_new_job( fsd_drmaa_session_t *self, const char *job_id )
193{
194        fsd_job_t *job;
195        job = slurmdrmaa_job_new( fsd_strdup(job_id) );
196        job->session = self;
197        return job;
198}
199
Note: See TracBrowser for help on using the repository browser.