source: trunk/ll_drmaa/session.c @ 26

Revision 26, 12.7 KB checked in by mmamonski, 13 years ago (diff)

SupMUC on site fixes: 1. Polling mode 2. Handling missing jobs 3. monitor -> drmaa_monitor 4. force stderr file creation

  • Property svn:keywords set to Id Revision
RevLine 
[20]1/* $Id$ */
[1]2/*
3 * PSNC DRMAA for LL
4 * Copyright (C) 2010 Poznan Supercomputing and Networking Center
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 *    http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19#include <string.h>
20#include <unistd.h>
21#include <stdlib.h>
22#include <stdio.h>
23#include <errno.h>
24#include <sys/types.h>
25#include <sys/stat.h>
26#include <sys/socket.h>
27#include <sys/un.h>
28#include <signal.h>
29#include <assert.h>
30
31#include <drmaa_utils/drmaa.h>
32#include <drmaa_utils/iter.h>
33#include <drmaa_utils/conf.h>
34
35#include <ll_drmaa/job.h>
36#include <ll_drmaa/session.h>
37#include <ll_drmaa/util.h>
38
39#include <llapi.h>
40
41static char *lldrmaa_session_run_job(fsd_drmaa_session_t *self, const fsd_template_t *jt);
42
43static fsd_iter_t *lldrmaa_session_run_bulk(fsd_drmaa_session_t *self, const fsd_template_t *jt, int start, int end, int incr );
44
45static fsd_job_t *lldrmaa_session_new_job( fsd_drmaa_session_t *self,  const char *job_id );
46
47static void lldrmaa_session_apply_configuration( fsd_drmaa_session_t *self );
48
49static void lldrmaa_session_destroy_nowait(fsd_drmaa_session_t *self);
50
51static void *lldrmaa_session_wait_thread( fsd_drmaa_session_t *self );
52
53static void lldrmaa_create_unix_domain_socket(lldrmaa_session_t *self);
54
55fsd_drmaa_session_t *
56lldrmaa_session_new( const char *contact )
57{
58        lldrmaa_session_t *volatile self = NULL;
59
60        TRY
61         {
62                self = (lldrmaa_session_t*)fsd_drmaa_session_new(contact);
63
64                fsd_realloc( self, 1, lldrmaa_session_t );
65                self->super.run_job = lldrmaa_session_run_job;
66                self->super.run_bulk = lldrmaa_session_run_bulk;
67                self->super.new_job = lldrmaa_session_new_job;
68                self->super.wait_thread = lldrmaa_session_wait_thread;
69
70                self->super_destroy_nowait = self->super.destroy_nowait;
71                self->super.destroy_nowait = lldrmaa_session_destroy_nowait;
72
73                self->super_apply_configuration = self->super.apply_configuration;
74                self->super.apply_configuration = lldrmaa_session_apply_configuration;
75
76                self->terminate_job_on_vacated = true;
77                self->unix_socket_name = NULL;
78                self->socket_fd = -1;
79
80                lldrmaa_create_unix_domain_socket(self);
81
82                /* This must be called after creating socket as wait thread tries to accept new connections on it*/
83                self->super.load_configuration( &self->super, "ll_drmaa" );
84         }
85        EXCEPT_DEFAULT
86         {
87                fsd_free( self );
88                fsd_exc_reraise();
89         }
90        END_TRY
91        return (fsd_drmaa_session_t*)self;
92}
93
94
95char *
96lldrmaa_session_run_job(
97                fsd_drmaa_session_t *self,
98                const fsd_template_t *jt
99                )
100{
101        char *job_id = NULL;
102        fsd_iter_t *volatile job_ids = NULL;
103        TRY
104         {
105                job_ids = self->run_bulk( self, jt, 0, 0, 0 ); /* single job run as bulk job specialization */
106                job_id = fsd_strdup( job_ids->next( job_ids ) );
107         }
108        FINALLY
109         {
110                if( job_ids )
111                        job_ids->destroy( job_ids );
112         }
113        END_TRY
114        return job_id;
115}
116
117fsd_iter_t *
118lldrmaa_session_run_bulk(
119                fsd_drmaa_session_t *self,
120                const fsd_template_t *jt,
121                int start, int end, int incr )
122{
123        fsd_job_t *volatile job = NULL;
124        char **volatile job_ids = NULL;
125        unsigned n_jobs = 0;
126        volatile bool connection_lock = false;
127        lldrmaa_session_t *llself = (lldrmaa_session_t*) self;
128        fsd_environ_t *volatile env = NULL;
129
130        char *volatile  cmd_path = NULL;
131        int status;
132        LL_job job_info;
133
[26]134        char *monitor_program = LL_DRMAA_BIN_DIR"/lldrmaa_monitor";
[1]135
136        TRY
137        {
138                if( start != end )
139                        n_jobs = (end - start) / incr + 1;
140                else
141                        n_jobs = 1;
142
143                fsd_log_debug(("Create cmd file"));
144                cmd_path = (char *volatile) lldrmaa_job_create_req( self, jt, (fsd_environ_t**)&env , n_jobs, start, incr);
145
146                fsd_log_debug(("llsubmit: %s",cmd_path));
147
148                connection_lock = fsd_mutex_lock( &self->drm_connection_mutex );
[26]149                if (self->wait_thread_run_flag)
150                  {
151                        fsd_log_info(("llsubmit(%s, %s, %s, %p, %d)",cmd_path, monitor_program, llself->unix_socket_name, (void*)&job_info, LL_JOB_VERSION));
152                        status = llsubmit(cmd_path, monitor_program, llself->unix_socket_name, &job_info, LL_JOB_VERSION);
153                  }
154                else
155                  {
156                        fsd_log_info(("llsubmit(%s, NULL, NULL, %p, %d)",cmd_path, (void*)&job_info, LL_JOB_VERSION));
157                        status = llsubmit(cmd_path, NULL, NULL, &job_info, LL_JOB_VERSION);
158                  }
[1]159                connection_lock = fsd_mutex_unlock( &self->drm_connection_mutex );
160
[26]161                if(getenv("LLDRMAA_KEEP_CMD") == NULL && remove(cmd_path) == -1)
[1]162                        fsd_log_warning(("Can't delete cmd file: %s", cmd_path));
163
164                fsd_free(cmd_path); /* check if this affects llusbmit */
165
166                if(status) /* -1 Error, error messages written to stderr.*/
167                 {
168                        fsd_log_error(("llsubmit: returned non-zero"));
169                        fsd_exc_raise_fmt(lldrmaa_map_submit(status), "llsubmit returned non-zero. %s",lldrmaa_err_submit(status));
170                 }
171                else /* 0 */
[26]172                 {
[1]173                        fsd_log_debug(("llsubmit: %s",lldrmaa_err_submit(status)));
[26]174                 }
[1]175
176                if( start != end )
177                 {
178                        unsigned idx, i;
179                        fsd_calloc( job_ids, n_jobs+1, char* );
180                        for( idx = start, i = 0;  idx <= (unsigned)end;  idx += incr, i++ )
181                         {
182                                job_ids[i] = fsd_asprintf("%s.%d.%d", job_info.step_list[i]->id.from_host, job_info.step_list[i]->id.cluster, job_info.step_list[i]->id.proc);
[26]183                                fsd_log_info((" new array job id: %s", job_ids[i]));
[1]184                                job = lldrmaa_job_new( fsd_strdup(job_ids[i]) );
185                                job->session = self;
186                                job->submit_time = time(NULL);
187                                self->jobs->add( self->jobs, job );
188                                job->release( job );
189                                job = NULL;
190                         }
191                        fsd_assert( i == n_jobs );
192                 }
193                else /* ! bulk */
194                 {
195                        fsd_calloc( job_ids, n_jobs+1, char* );
196                        job_ids[0] = fsd_asprintf( "%s.%d.0", job_info.step_list[0]->id.from_host, job_info.step_list[0]->id.cluster);
197
[26]198                        fsd_log_info((" new job id: %s", job_ids[0]));
[1]199                        job = lldrmaa_job_new( fsd_strdup(job_ids[0]) );
200                        job->session = self;
201                        job->submit_time = time(NULL);
202                        self->jobs->add( self->jobs, job );
203                        job->release( job );
204                        job = NULL;
205                 }
206        }
207        ELSE
208        {
209                connection_lock = fsd_mutex_lock( &self->drm_connection_mutex );
210                llfree_job_info(&job_info,LL_JOB_VERSION);
211                connection_lock = fsd_mutex_unlock( &self->drm_connection_mutex );
212                fsd_log_debug(("llfree_job_info run"));
213        }
214        FINALLY
215        {
216                if( connection_lock )
217                        fsd_mutex_unlock( &self->drm_connection_mutex );
218                if( job )
219                        job->release( job );
220                if( fsd_exc_get() != NULL )
221                        fsd_free_vector( job_ids );
222        }
223        END_TRY
224
225        return fsd_iter_new( job_ids, n_jobs );
226}
227
228
229fsd_job_t *
230lldrmaa_session_new_job( fsd_drmaa_session_t *self, const char *job_id )
231{
232        fsd_job_t *job;
233        job = lldrmaa_job_new( fsd_strdup(job_id) );
234        job->session = self;
235        return job;
236}
237
238void
239lldrmaa_session_apply_configuration( fsd_drmaa_session_t *self )
240{
241        lldrmaa_session_t *llself = (lldrmaa_session_t*)self;
242        fsd_conf_option_t *terminate_job_on_vacated;
243
244        llself->super_apply_configuration(self); /* call method from the superclass */
245
246        terminate_job_on_vacated = fsd_conf_dict_get(self->configuration, "terminate_job_on_vacated" );
247        if( terminate_job_on_vacated )
248         {
249                if( terminate_job_on_vacated->type == FSD_CONF_INTEGER && (terminate_job_on_vacated->val.integer == 0 || terminate_job_on_vacated->val.integer == 1))
250                 {
251                        llself->terminate_job_on_vacated = (terminate_job_on_vacated->val.integer != 0 );
252                        fsd_log_debug(("terminate_job_on_vacated: %d",llself->terminate_job_on_vacated));
253                 }
254                else
255                        fsd_exc_raise_msg(
256                                        FSD_ERRNO_INTERNAL_ERROR,
257                                        "configuration: 'terminate_job_on_vacated' should be 0 or 1"
258                                        );
259         }
260
261}
262
263void
264lldrmaa_session_destroy_nowait(fsd_drmaa_session_t *self) {
265        lldrmaa_session_t *llself = (lldrmaa_session_t*) self;
266
267        fsd_log_enter(( "" ));
268
269        /*close socket */
270        if (close(llself->socket_fd) != 0) {
271                fsd_log_error(("Failed to close UNIX socket"));
272        } else {
273                llself->socket_fd = -1;
274        }
275
276        /*delete socket file*/
277        if (unlink(llself->unix_socket_name) != 0) {
278                fsd_log_error(("Failed to delete UNIX socket: %s", llself->unix_socket_name));
279        }
280
281        fsd_free(llself->unix_socket_name);
282
283        /* call the destroy_nowait method from the super class */
284        llself->super_destroy_nowait(self);
285        fsd_log_return(( "" ));
286}
287
288void *
289lldrmaa_session_wait_thread( fsd_drmaa_session_t *self )
290{
291        lldrmaa_session_t *llself = (lldrmaa_session_t*) self;
292        fsd_job_t *volatile job = NULL;
293        lldrmaa_job_t *lljob = NULL;
294        char state[256] = "";
295        char job_id[256] = "";
296        int status = -1;
297        char buffer[256] = "";
298        struct sockaddr_un saddress;
299        size_t address_length = 0;
300        int connection_fd = -1;
301
302        fsd_log_enter(( "" ));
303
304        fsd_mutex_lock( &self->mutex );
305        TRY
306         {
307
308                while( self->wait_thread_run_flag )
309                TRY
310                 {
311                        int bytes_read = -1;
312                        int rc = -1;
313                        fd_set fds;
314                        struct timeval tv;
315
316                        /*beginning of blocking part - release mutex */
317                        fsd_mutex_unlock( &self->mutex );
318
319                        fsd_log_debug(("WT - next iteration"));
320
321                        FD_ZERO(&fds);
322                        FD_SET(llself->socket_fd,&fds);
323                        tv.tv_sec = 1;
324                        tv.tv_usec = 0;
325
326                        rc = select(llself->socket_fd+1, &fds, NULL, NULL, &tv);
327
328                        if (rc < 0) {
329                                fsd_log_error(("WT - select() failed (errno=%d)", errno));
330                                fsd_mutex_lock( &self->mutex );
331                                continue;
332                        } else if (rc > 0 && FD_ISSET(llself->socket_fd,&fds)) {
333                                fsd_log_debug(("WT - server socket ready for accept... "));
334                        } else /*rc == 0 (timeout) || FD_ISSET == false */ {
335                                fsd_log_debug(("WT - select() timeout"));
336                                fsd_mutex_lock( &self->mutex );
337                                continue;
338                        }
339
340                        if ((connection_fd = accept(llself->socket_fd, (struct sockaddr *) &saddress, &address_length)) == -1) {
341                                fsd_log_error(("WT - failed to accept new connection (errno = %d)", errno));
342                                fsd_mutex_lock( &self->mutex );
343                                continue;
344                        }
345
346                        fsd_log_debug(("WT - connection accepted: fd = %d ", connection_fd));
347
348                        if ((bytes_read = read(connection_fd, buffer, 255)) < 0) {
349                                fsd_log_error(("WT - failed to read from socket (errno = %d)", errno));
350                                close(connection_fd);
351                                fsd_mutex_lock( &self->mutex );
352                                continue;
353                        } else {
354                                buffer[bytes_read] = '\0'; /* make sure that message is NULL terminated */
355                        }
356
357                        close(connection_fd);
358
359                        /*end of blocking part - acquire mutex */
360                        fsd_mutex_lock( &self->mutex );
361
362                        if (sscanf(buffer,"%s %s %d",state,job_id,&status) != 3) {
363                                fsd_log_error(("WT - failed to decode message: %s", buffer));
364                                continue;
365                        }
366
367
368                        fsd_log_info(("WT - monitor program: %s %s %d", state, job_id, status));
369                        /*update job status */
370                        TRY
371                         {
372                                job = self->get_job( self, job_id );
373                                lljob = (lldrmaa_job_t*) job;
374
375                                if( job )
376                                {
377                                        if( strcmp(state,"JOB_STARTED")==0 && job->state >= DRMAA_PS_DONE)
378                                        {
379                                                fsd_log_debug(("WT - received JOB_STARTED when job has already finished"));
380                                        }
381                                        else if ( strcmp(state, "JOB_COMPLETED")==0)
382                                        {
383                                                lljob->read_job_info_mon(job, state, status);
384                                        }
385                                        else
386                                        {
387                                                lljob->read_job_info_mon(job, state, -1);
388                                        }
389                                }
390                                else
391                                {
392                                        fsd_log_error(("WT - Unknown job: %s", job_id));
393                                }
394                         }
395                        FINALLY
396                         {
397                                if( job )
398                                        job->release( job );
399                         }
400                        END_TRY
401
402                        fsd_cond_broadcast( &self->wait_condition );
403                 }
404                EXCEPT_DEFAULT
405                 {
406                        const fsd_exc_t *e = fsd_exc_get();
407
408                        fsd_log_error(( "wait thread: <%d:%s>", e->code(e), e->message(e) ));
409                        fsd_exc_reraise();
410                 }
411                END_TRY
412         }
413        FINALLY
414         {
415                fsd_log_debug(("WT - Terminated."));
416                fsd_mutex_unlock( &self->mutex );
417         }
418        END_TRY
419
420        fsd_log_return(( " =NULL" ));
421        return NULL;
422}
423
424void
425lldrmaa_create_unix_domain_socket(lldrmaa_session_t *self)
426{
427        char socket_name_template[] = "/tmp/drmaa_socket_XXXXXX";
428        struct sockaddr_un address;
429        size_t address_length;
430        int rc;
431
432        if (mktemp(socket_name_template) == NULL)
433                fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR, "Can't generate socket tmp name: %s",socket_name_template);
434
435        fsd_log_info(("UNIX DOMAIN SOCKET name: %s",socket_name_template));
436
437        if ((self->socket_fd = socket(PF_UNIX, SOCK_STREAM, 0)) < 0)
438                fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR, "Unable to create UNIX socket");
439
440        address.sun_family = AF_UNIX;
441        address_length = sizeof(address.sun_family) + sprintf(address.sun_path, socket_name_template) + 1;
442
443        if(bind(self->socket_fd, (struct sockaddr *) &address, address_length) != 0)
444                fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR, "Bind failed (socket name: %s)", socket_name_template);
445
446        if((rc = chmod(socket_name_template, S_IRUSR|S_IWUSR)) == -1)
447         {
448                char errbuf[256] = "InternalError";
449                (void)strerror_r(errno, errbuf, 256); /*on error the default message would be returned */
450                fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR, "Chmod() returned -1 errmsg: %s", errbuf);
451         }
452        else if (rc != 0)
453         {
454                fsd_assert(false);
455         }
456        else
457         {
458                fsd_log_debug(("Socket permissions changed with chmod()"));
459         }
460
461        if(listen(self->socket_fd, 128) != 0)
462                fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR, "listen() failed");
463
464        self->unix_socket_name = fsd_strdup(socket_name_template);
465}
Note: See TracBrowser for help on using the repository browser.