source: trunk/ll_drmaa/session.c @ 20

Revision 20, 12.3 KB checked in by mmatloka, 14 years ago (diff)

svn:keywords

  • Property svn:keywords set to Id Revision
RevLine 
[20]1/* $Id$ */
[1]2/*
3 * PSNC DRMAA for LL
4 * Copyright (C) 2010 Poznan Supercomputing and Networking Center
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 *    http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19#include <string.h>
20#include <unistd.h>
21#include <stdlib.h>
22#include <stdio.h>
23#include <errno.h>
24#include <sys/types.h>
25#include <sys/stat.h>
26#include <sys/socket.h>
27#include <sys/un.h>
28#include <signal.h>
29#include <assert.h>
30
31#include <drmaa_utils/drmaa.h>
32#include <drmaa_utils/iter.h>
33#include <drmaa_utils/conf.h>
34
35#include <ll_drmaa/job.h>
36#include <ll_drmaa/session.h>
37#include <ll_drmaa/util.h>
38
39#include <llapi.h>
40
41static char *lldrmaa_session_run_job(fsd_drmaa_session_t *self, const fsd_template_t *jt);
42
43static fsd_iter_t *lldrmaa_session_run_bulk(fsd_drmaa_session_t *self, const fsd_template_t *jt, int start, int end, int incr );
44
45static fsd_job_t *lldrmaa_session_new_job( fsd_drmaa_session_t *self,  const char *job_id );
46
47static void lldrmaa_session_apply_configuration( fsd_drmaa_session_t *self );
48
49static void lldrmaa_session_destroy_nowait(fsd_drmaa_session_t *self);
50
51static void *lldrmaa_session_wait_thread( fsd_drmaa_session_t *self );
52
53static void lldrmaa_create_unix_domain_socket(lldrmaa_session_t *self);
54
55fsd_drmaa_session_t *
56lldrmaa_session_new( const char *contact )
57{
58        lldrmaa_session_t *volatile self = NULL;
59
60        TRY
61         {
62                self = (lldrmaa_session_t*)fsd_drmaa_session_new(contact);
63
64                fsd_realloc( self, 1, lldrmaa_session_t );
65                self->super.run_job = lldrmaa_session_run_job;
66                self->super.run_bulk = lldrmaa_session_run_bulk;
67                self->super.new_job = lldrmaa_session_new_job;
68                self->super.wait_thread = lldrmaa_session_wait_thread;
69
70                self->super_destroy_nowait = self->super.destroy_nowait;
71                self->super.destroy_nowait = lldrmaa_session_destroy_nowait;
72
73                self->super_apply_configuration = self->super.apply_configuration;
74                self->super.apply_configuration = lldrmaa_session_apply_configuration;
75
76                self->terminate_job_on_vacated = true;
77                self->unix_socket_name = NULL;
78                self->socket_fd = -1;
79
80                lldrmaa_create_unix_domain_socket(self);
81
82                /* This must be called after creating socket as wait thread tries to accept new connections on it*/
83                self->super.load_configuration( &self->super, "ll_drmaa" );
84         }
85        EXCEPT_DEFAULT
86         {
87                fsd_free( self );
88                fsd_exc_reraise();
89         }
90        END_TRY
91        return (fsd_drmaa_session_t*)self;
92}
93
94
95char *
96lldrmaa_session_run_job(
97                fsd_drmaa_session_t *self,
98                const fsd_template_t *jt
99                )
100{
101        char *job_id = NULL;
102        fsd_iter_t *volatile job_ids = NULL;
103        TRY
104         {
105                job_ids = self->run_bulk( self, jt, 0, 0, 0 ); /* single job run as bulk job specialization */
106                job_id = fsd_strdup( job_ids->next( job_ids ) );
107         }
108        FINALLY
109         {
110                if( job_ids )
111                        job_ids->destroy( job_ids );
112         }
113        END_TRY
114        return job_id;
115}
116
117fsd_iter_t *
118lldrmaa_session_run_bulk(
119                fsd_drmaa_session_t *self,
120                const fsd_template_t *jt,
121                int start, int end, int incr )
122{
123        fsd_job_t *volatile job = NULL;
124        char **volatile job_ids = NULL;
125        unsigned n_jobs = 0;
126        volatile bool connection_lock = false;
127        lldrmaa_session_t *llself = (lldrmaa_session_t*) self;
128        fsd_environ_t *volatile env = NULL;
129
130        char *volatile  cmd_path = NULL;
131        int status;
132        LL_job job_info;
133
134        char *monitor_program = LL_DRMAA_BIN_DIR"/monitor";
135
136        TRY
137        {
138                if( start != end )
139                        n_jobs = (end - start) / incr + 1;
140                else
141                        n_jobs = 1;
142
143                fsd_log_debug(("Create cmd file"));
144                cmd_path = (char *volatile) lldrmaa_job_create_req( self, jt, (fsd_environ_t**)&env , n_jobs, start, incr);
145
146                fsd_log_debug(("llsubmit: %s",cmd_path));
147
148                connection_lock = fsd_mutex_lock( &self->drm_connection_mutex );
149                status = llsubmit(cmd_path, monitor_program, llself->unix_socket_name, &job_info, LL_JOB_VERSION);
150                connection_lock = fsd_mutex_unlock( &self->drm_connection_mutex );
151
152                if(remove(cmd_path) == -1)
153                        fsd_log_warning(("Can't delete cmd file: %s", cmd_path));
154
155                fsd_free(cmd_path); /* check if this affects llusbmit */
156
157                if(status) /* -1 Error, error messages written to stderr.*/
158                 {
159                        fsd_log_error(("llsubmit: returned non-zero"));
160                        fsd_exc_raise_fmt(lldrmaa_map_submit(status), "llsubmit returned non-zero. %s",lldrmaa_err_submit(status));
161                 }
162                else /* 0 */
163                        fsd_log_debug(("llsubmit: %s",lldrmaa_err_submit(status)));
164
165                if( start != end )
166                 {
167                        unsigned idx, i;
168                        fsd_calloc( job_ids, n_jobs+1, char* );
169                        for( idx = start, i = 0;  idx <= (unsigned)end;  idx += incr, i++ )
170                         {
171                                job_ids[i] = fsd_asprintf("%s.%d.%d", job_info.step_list[i]->id.from_host, job_info.step_list[i]->id.cluster, job_info.step_list[i]->id.proc);
172
173                                job = lldrmaa_job_new( fsd_strdup(job_ids[i]) );
174                                job->session = self;
175                                job->submit_time = time(NULL);
176                                self->jobs->add( self->jobs, job );
177                                job->release( job );
178                                job = NULL;
179                         }
180                        fsd_assert( i == n_jobs );
181                 }
182                else /* ! bulk */
183                 {
184                        fsd_calloc( job_ids, n_jobs+1, char* );
185                        job_ids[0] = fsd_asprintf( "%s.%d.0", job_info.step_list[0]->id.from_host, job_info.step_list[0]->id.cluster);
186
187                        job = lldrmaa_job_new( fsd_strdup(job_ids[0]) );
188                        job->session = self;
189                        job->submit_time = time(NULL);
190                        self->jobs->add( self->jobs, job );
191                        job->release( job );
192                        job = NULL;
193                 }
194        }
195        ELSE
196        {
197                connection_lock = fsd_mutex_lock( &self->drm_connection_mutex );
198                llfree_job_info(&job_info,LL_JOB_VERSION);
199                connection_lock = fsd_mutex_unlock( &self->drm_connection_mutex );
200                fsd_log_debug(("llfree_job_info run"));
201        }
202        FINALLY
203        {
204                if( connection_lock )
205                        fsd_mutex_unlock( &self->drm_connection_mutex );
206                if( job )
207                        job->release( job );
208                if( fsd_exc_get() != NULL )
209                        fsd_free_vector( job_ids );
210        }
211        END_TRY
212
213        return fsd_iter_new( job_ids, n_jobs );
214}
215
216
217fsd_job_t *
218lldrmaa_session_new_job( fsd_drmaa_session_t *self, const char *job_id )
219{
220        fsd_job_t *job;
221        job = lldrmaa_job_new( fsd_strdup(job_id) );
222        job->session = self;
223        return job;
224}
225
226void
227lldrmaa_session_apply_configuration( fsd_drmaa_session_t *self )
228{
229        lldrmaa_session_t *llself = (lldrmaa_session_t*)self;
230        fsd_conf_option_t *terminate_job_on_vacated;
231
232        llself->super_apply_configuration(self); /* call method from the superclass */
233
234        terminate_job_on_vacated = fsd_conf_dict_get(self->configuration, "terminate_job_on_vacated" );
235        if( terminate_job_on_vacated )
236         {
237                if( terminate_job_on_vacated->type == FSD_CONF_INTEGER && (terminate_job_on_vacated->val.integer == 0 || terminate_job_on_vacated->val.integer == 1))
238                 {
239                        llself->terminate_job_on_vacated = (terminate_job_on_vacated->val.integer != 0 );
240                        fsd_log_debug(("terminate_job_on_vacated: %d",llself->terminate_job_on_vacated));
241                 }
242                else
243                        fsd_exc_raise_msg(
244                                        FSD_ERRNO_INTERNAL_ERROR,
245                                        "configuration: 'terminate_job_on_vacated' should be 0 or 1"
246                                        );
247         }
248
249        if ( !self->wait_thread_started )
250                fsd_exc_raise_msg(FSD_ERRNO_INTERNAL_ERROR, "DRMAA for LL requires that wait thread is enable. Don't disable it in configuration file!" );
251}
252
253void
254lldrmaa_session_destroy_nowait(fsd_drmaa_session_t *self) {
255        lldrmaa_session_t *llself = (lldrmaa_session_t*) self;
256
257        fsd_log_enter(( "" ));
258
259        /*close socket */
260        if (close(llself->socket_fd) != 0) {
261                fsd_log_error(("Failed to close UNIX socket"));
262        } else {
263                llself->socket_fd = -1;
264        }
265
266        /*delete socket file*/
267        if (unlink(llself->unix_socket_name) != 0) {
268                fsd_log_error(("Failed to delete UNIX socket: %s", llself->unix_socket_name));
269        }
270
271        fsd_free(llself->unix_socket_name);
272
273        /* call the destroy_nowait method from the super class */
274        llself->super_destroy_nowait(self);
275        fsd_log_return(( "" ));
276}
277
278void *
279lldrmaa_session_wait_thread( fsd_drmaa_session_t *self )
280{
281        lldrmaa_session_t *llself = (lldrmaa_session_t*) self;
282        fsd_job_t *volatile job = NULL;
283        lldrmaa_job_t *lljob = NULL;
284        char state[256] = "";
285        char job_id[256] = "";
286        int status = -1;
287        char buffer[256] = "";
288        struct sockaddr_un saddress;
289        size_t address_length = 0;
290        int connection_fd = -1;
291
292        fsd_log_enter(( "" ));
293
294        fsd_mutex_lock( &self->mutex );
295        TRY
296         {
297
298                while( self->wait_thread_run_flag )
299                TRY
300                 {
301                        int bytes_read = -1;
302                        int rc = -1;
303                        fd_set fds;
304                        struct timeval tv;
305
306                        /*beginning of blocking part - release mutex */
307                        fsd_mutex_unlock( &self->mutex );
308
309                        fsd_log_debug(("WT - next iteration"));
310
311                        FD_ZERO(&fds);
312                        FD_SET(llself->socket_fd,&fds);
313                        tv.tv_sec = 1;
314                        tv.tv_usec = 0;
315
316                        rc = select(llself->socket_fd+1, &fds, NULL, NULL, &tv);
317
318                        if (rc < 0) {
319                                fsd_log_error(("WT - select() failed (errno=%d)", errno));
320                                fsd_mutex_lock( &self->mutex );
321                                continue;
322                        } else if (rc > 0 && FD_ISSET(llself->socket_fd,&fds)) {
323                                fsd_log_debug(("WT - server socket ready for accept... "));
324                        } else /*rc == 0 (timeout) || FD_ISSET == false */ {
325                                fsd_log_debug(("WT - select() timeout"));
326                                fsd_mutex_lock( &self->mutex );
327                                continue;
328                        }
329
330                        if ((connection_fd = accept(llself->socket_fd, (struct sockaddr *) &saddress, &address_length)) == -1) {
331                                fsd_log_error(("WT - failed to accept new connection (errno = %d)", errno));
332                                fsd_mutex_lock( &self->mutex );
333                                continue;
334                        }
335
336                        fsd_log_debug(("WT - connection accepted: fd = %d ", connection_fd));
337
338                        if ((bytes_read = read(connection_fd, buffer, 255)) < 0) {
339                                fsd_log_error(("WT - failed to read from socket (errno = %d)", errno));
340                                close(connection_fd);
341                                fsd_mutex_lock( &self->mutex );
342                                continue;
343                        } else {
344                                buffer[bytes_read] = '\0'; /* make sure that message is NULL terminated */
345                        }
346
347                        close(connection_fd);
348
349                        /*end of blocking part - acquire mutex */
350                        fsd_mutex_lock( &self->mutex );
351
352                        if (sscanf(buffer,"%s %s %d",state,job_id,&status) != 3) {
353                                fsd_log_error(("WT - failed to decode message: %s", buffer));
354                                continue;
355                        }
356
357
358                        fsd_log_info(("WT - monitor program: %s %s %d", state, job_id, status));
359                        /*update job status */
360                        TRY
361                         {
362                                job = self->get_job( self, job_id );
363                                lljob = (lldrmaa_job_t*) job;
364
365                                if( job )
366                                {
367                                        if( strcmp(state,"JOB_STARTED")==0 && job->state >= DRMAA_PS_DONE)
368                                        {
369                                                fsd_log_debug(("WT - received JOB_STARTED when job has already finished"));
370                                        }
371                                        else if ( strcmp(state, "JOB_COMPLETED")==0)
372                                        {
373                                                lljob->read_job_info_mon(job, state, status);
374                                        }
375                                        else
376                                        {
377                                                lljob->read_job_info_mon(job, state, -1);
378                                        }
379                                }
380                                else
381                                {
382                                        fsd_log_error(("WT - Unknown job: %s", job_id));
383                                }
384                         }
385                        FINALLY
386                         {
387                                if( job )
388                                        job->release( job );
389                         }
390                        END_TRY
391
392                        fsd_cond_broadcast( &self->wait_condition );
393                 }
394                EXCEPT_DEFAULT
395                 {
396                        const fsd_exc_t *e = fsd_exc_get();
397
398                        fsd_log_error(( "wait thread: <%d:%s>", e->code(e), e->message(e) ));
399                        fsd_exc_reraise();
400                 }
401                END_TRY
402         }
403        FINALLY
404         {
405                fsd_log_debug(("WT - Terminated."));
406                fsd_mutex_unlock( &self->mutex );
407         }
408        END_TRY
409
410        fsd_log_return(( " =NULL" ));
411        return NULL;
412}
413
414void
415lldrmaa_create_unix_domain_socket(lldrmaa_session_t *self)
416{
417        char socket_name_template[] = "/tmp/drmaa_socket_XXXXXX";
418        struct sockaddr_un address;
419        size_t address_length;
420        int rc;
421
422        if (mktemp(socket_name_template) == NULL)
423                fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR, "Can't generate socket tmp name: %s",socket_name_template);
424
425        fsd_log_info(("UNIX DOMAIN SOCKET name: %s",socket_name_template));
426
427        if ((self->socket_fd = socket(PF_UNIX, SOCK_STREAM, 0)) < 0)
428                fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR, "Unable to create UNIX socket");
429
430        address.sun_family = AF_UNIX;
431        address_length = sizeof(address.sun_family) + sprintf(address.sun_path, socket_name_template) + 1;
432
433        if(bind(self->socket_fd, (struct sockaddr *) &address, address_length) != 0)
434                fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR, "Bind failed (socket name: %s)", socket_name_template);
435
436        if((rc = chmod(socket_name_template, S_IRUSR|S_IWUSR)) == -1)
437         {
438                char errbuf[256] = "InternalError";
439                (void)strerror_r(errno, errbuf, 256); /*on error the default message would be returned */
440                fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR, "Chmod() returned -1 errmsg: %s", errbuf);
441         }
442        else if (rc != 0)
443         {
444                fsd_assert(false);
445         }
446        else
447         {
448                fsd_log_debug(("Socket permissions changed with chmod()"));
449         }
450
451        if(listen(self->socket_fd, 128) != 0)
452                fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR, "listen() failed");
453
454        self->unix_socket_name = fsd_strdup(socket_name_template);
455}
Note: See TracBrowser for help on using the repository browser.