source: trunk/slurm_drmaa/session.c @ 35

Revision 35, 5.1 KB checked in by pkopta, 11 years ago (diff)

Michael Gutteridge <michael.gutteridge@…> patch for "mincpus" calculations.

  • Property svn:executable set to *
  • Property svn:keywords set to Id
RevLine 
[5]1/* $Id$ */
[1]2/*
3 * PSNC DRMAA for SLURM
[13]4 * Copyright (C) 2011 Poznan Supercomputing and Networking Center
[1]5 *
6 *  This program is free software: you can redistribute it and/or modify
7 *  it under the terms of the GNU General Public License as published by
8 *  the Free Software Foundation, either version 3 of the License, or
9 *  (at your option) any later version.
10 *
11 *  This program is distributed in the hope that it will be useful,
12 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 *  GNU General Public License for more details.
15 *
16 *  You should have received a copy of the GNU General Public License
17 *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20#include <string.h>
21#include <unistd.h>
22
23#include <drmaa_utils/iter.h>
24#include <drmaa_utils/conf.h>
25#include <slurm_drmaa/job.h>
26#include <slurm_drmaa/session.h>
27#include <slurm_drmaa/util.h>
28
29#include <slurm/slurm.h>
30
31static char *slurmdrmaa_session_run_job(fsd_drmaa_session_t *self, const fsd_template_t *jt);
32
33static fsd_iter_t *slurmdrmaa_session_run_bulk( fsd_drmaa_session_t *self,const fsd_template_t *jt, int start, int end, int incr );
34
35static fsd_job_t *slurmdrmaa_session_new_job( fsd_drmaa_session_t *self, const char *job_id );
36
37fsd_drmaa_session_t *
38slurmdrmaa_session_new( const char *contact )
39{
40        slurmdrmaa_session_t *volatile self = NULL;
41        TRY
42         {
43                self = (slurmdrmaa_session_t*)fsd_drmaa_session_new(contact);
44
45                fsd_realloc( self, 1, slurmdrmaa_session_t );
46
47                self->super.run_job = slurmdrmaa_session_run_job;
48                self->super.run_bulk = slurmdrmaa_session_run_bulk;
49                self->super.new_job = slurmdrmaa_session_new_job;
50
51                self->super.load_configuration( &self->super, "slurm_drmaa" );
52         }
53        EXCEPT_DEFAULT
54         {
55                fsd_free( self );
56                fsd_exc_reraise();
57         }
58        END_TRY
59        return (fsd_drmaa_session_t*)self;
60}
61
62
63char *
64slurmdrmaa_session_run_job(
65                fsd_drmaa_session_t *self,
66                const fsd_template_t *jt
67                )
68{
69        char *job_id = NULL;
70        fsd_iter_t *volatile job_ids = NULL;
71
72        TRY
73         {
74                job_ids = self->run_bulk( self, jt, 0, 0, 0 ); /* single job run as bulk job specialization */
75                job_id = fsd_strdup( job_ids->next( job_ids ) );
76         }
77        FINALLY
78         {
79                if( job_ids )
80                        job_ids->destroy( job_ids );
81         }
82        END_TRY
83        return job_id;
84}
85
86
87fsd_iter_t *
88slurmdrmaa_session_run_bulk(
89                fsd_drmaa_session_t *self,
90                const fsd_template_t *jt,
91                int start, int end, int incr )
92{
93        fsd_job_t *volatile job = NULL;
94        char **volatile job_ids = NULL;
[35]95        unsigned n_jobs = 1;
[1]96        volatile bool connection_lock = false;
97        fsd_environ_t *volatile env = NULL;
98        job_desc_msg_t job_desc;
99        submit_response_msg_t *submit_response = NULL;
100       
101        TRY
102         {
103
104                if( start != end )
105                 {
106                        unsigned idx, i;
107
[35]108                        n_jobs = (end - start) / incr + 1;
109
[1]110                        fsd_calloc( job_ids, n_jobs+1, char* );
111
112                        for( idx = start, i = 0;  idx <= (unsigned)end;  idx += incr, i++ )
113                         {
114                                connection_lock = fsd_mutex_lock( &self->drm_connection_mutex );
115                                slurmdrmaa_job_create_req( self, jt, (fsd_environ_t**)&env , &job_desc, idx);
116                                if(slurm_submit_batch_job(&job_desc,&submit_response)){
117                                        fsd_exc_raise_fmt(
118                                                FSD_ERRNO_INTERNAL_ERROR,"slurm_submit_batch_job: %s",slurm_strerror(slurm_get_errno()));
119                                }
120
121                                fsd_log_debug(("job %u submitted", submit_response->job_id));         
122                                connection_lock = fsd_mutex_unlock( &self->drm_connection_mutex );
123
124                                job_ids[i] = fsd_asprintf("%d",submit_response->job_id); /*TODO  */
125
126                                job = slurmdrmaa_job_new( fsd_strdup(job_ids[i]) );
127                                job->session = self;
128                                job->submit_time = time(NULL);
129                                self->jobs->add( self->jobs, job );
130                                job->release( job ); 
131                                job = NULL;
132                         }
133                        fsd_assert( i == n_jobs );
134                 }
135                else /* ! bulk */
136                 {
137                        fsd_calloc( job_ids, n_jobs+1, char* );
[35]138
[1]139                        connection_lock = fsd_mutex_lock( &self->drm_connection_mutex );
140                        slurmdrmaa_job_create_req( self, jt, (fsd_environ_t**)&env , &job_desc, 0);
141                        if(slurm_submit_batch_job(&job_desc,&submit_response)){
142                                fsd_exc_raise_fmt(
143                                        FSD_ERRNO_INTERNAL_ERROR,"slurm_submit_batch_job: %s",slurm_strerror(slurm_get_errno()));
144                        }
145
[35]146                        connection_lock = fsd_mutex_unlock( &self->drm_connection_mutex );
147
[1]148                        fsd_log_debug(("job %u submitted", submit_response->job_id));         
149                       
150                        job_ids[0] = fsd_asprintf( "%d", submit_response->job_id); /* .0*/
151
152                        job = slurmdrmaa_job_new( fsd_strdup(job_ids[0]) ); /* TODO: ??? */
153                        job->session = self;
154                        job->submit_time = time(NULL);
155                        self->jobs->add( self->jobs, job );
156                        job->release( job );
157                        job = NULL;
158                 }
159         }
160         ELSE
161        {
162                if ( !connection_lock )
163                        connection_lock = fsd_mutex_lock( &self->drm_connection_mutex );
164
165                slurm_free_submit_response_response_msg ( submit_response );
166        }
167        FINALLY
168         {
169               
170                       
171                if( connection_lock )
172                        fsd_mutex_unlock( &self->drm_connection_mutex );
173
174                if( job )
175                        job->release( job );
176
177                if( fsd_exc_get() != NULL )
178                        fsd_free_vector( job_ids );
179                       
180                slurmdrmaa_free_job_desc(&job_desc);
181         }
182        END_TRY
183
184        return fsd_iter_new( job_ids, n_jobs );
185}
186
187
188fsd_job_t *
189slurmdrmaa_session_new_job( fsd_drmaa_session_t *self, const char *job_id )
190{
191        fsd_job_t *job;
192        job = slurmdrmaa_job_new( fsd_strdup(job_id) );
193        job->session = self;
194        return job;
195}
196
Note: See TracBrowser for help on using the repository browser.