source: trunk/ll_drmaa/job.c @ 26

Revision 26, 22.5 KB checked in by mmamonski, 13 years ago (diff)

SupMUC on site fixes: 1. Polling mode 2. Handling missing jobs 3. monitor -> drmaa_monitor 4. force stderr file creation

  • Property svn:keywords set to Id Revision
Line 
1/* $Id$ */
2/*
3 * PSNC DRMAA for LL
4 * Copyright (C) 2010 Poznan Supercomputing and Networking Center
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 *    http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19#include <string.h>
20#include <stdlib.h>
21#include <unistd.h>
22#include <time.h>
23
24#include <drmaa_utils/common.h>
25#include <drmaa_utils/conf.h>
26#include <drmaa_utils/datetime.h>
27#include <drmaa_utils/drmaa.h>
28#include <drmaa_utils/drmaa_util.h>
29#include <drmaa_utils/environ.h>
30#include <drmaa_utils/template.h>
31
32#include <ll_drmaa/job.h>
33#include <ll_drmaa/session.h>
34#include <ll_drmaa/util.h>
35
36#include <llapi.h>
37
38#define LL_DRMAA_MAX_MISSING_TIME (60)
39
40static void
41lldrmaa_job_control( fsd_job_t *self, int action )
42{
43        int rc = 0;
44        LL_element *errObj = NULL;
45        LL_terminate_job_info cancel_info;
46        char *rest = NULL;
47        char *token = NULL;
48        char *ptr = NULL;
49        char *volatile job_list[2];
50
51        job_list[0] = self->job_id;
52        job_list[1] = NULL;
53
54        cancel_info.StepId.from_host = NULL;
55        ptr = fsd_strdup(self->job_id);
56        fsd_log_enter(( "({job_id=%s}, action=%s)", self->job_id, drmaa_control_to_str(action) ));
57
58        fsd_mutex_lock( &self->session->drm_connection_mutex );
59        TRY
60        {
61                switch( action )
62                {
63                        case DRMAA_CONTROL_SUSPEND:
64                                        fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"job::control: could not send %s to job %s. Function not implemented.",drmaa_control_to_str(action), self->job_id);
65                                break;
66                        case DRMAA_CONTROL_HOLD:
67                                rc = ll_control(LL_CONTROL_VERSION, LL_CONTROL_HOLD_USER, NULL, NULL, (char **)job_list, NULL,0);
68                                fsd_log_warning(("This command does not affect a job step that is running unless the job step attempts to enter the Idle state."));
69                                if(rc)
70                                        fsd_exc_raise_fmt(lldrmaa_map_control(rc),
71                                        "job::control: could not send %s to job %s. Error code: %d,%s",
72                                        drmaa_control_to_str(action), self->job_id, rc, lldrmaa_err_control(rc)
73                                        );
74                                break;
75                        case DRMAA_CONTROL_RESUME:
76                                        fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"job::control: could not send %s to job %s. Function not implemented.",drmaa_control_to_str(action), self->job_id);
77                                break;
78                        case DRMAA_CONTROL_RELEASE:
79                                rc = ll_control(LL_CONTROL_VERSION, LL_CONTROL_HOLD_RELEASE, NULL,NULL, (char **)job_list,NULL,0);
80                                if(rc)
81                                        fsd_exc_raise_fmt(lldrmaa_map_control(rc),
82                                        "job::control: could not send %s to job %s. Error code: %d,%s",
83                                        drmaa_control_to_str(action), self->job_id, rc, lldrmaa_err_control(rc)
84                                        );
85                                break;
86                        case DRMAA_CONTROL_TERMINATE:
87                                cancel_info.version_num=LL_PROC_VERSION;
88                                cancel_info.msg=NULL;
89
90                                token = strtok_r(ptr, ".", &rest);
91                                if(token == NULL)
92                                {
93                                        fsd_exc_raise_fmt(FSD_DRMAA_ERRNO_INVALID_JOB,"Invalid job_id format");
94                                }
95                                cancel_info.StepId.from_host = fsd_strdup(token);
96
97                                token = strtok_r(NULL, ".", &rest);
98                                if(token == NULL)
99                                {
100                                        fsd_exc_raise_fmt(FSD_DRMAA_ERRNO_INVALID_JOB,"Invalid job_id format");
101                                }
102
103                                cancel_info.StepId.cluster = fsd_atoi(token);
104
105                                token = strtok_r(NULL, ".", &rest);
106                                if(token!=NULL)
107                                {
108                                        cancel_info.StepId.proc = fsd_atoi(token);
109                                        rc = ll_terminate_job(&cancel_info);
110                                }
111                                else /* No StepId specified */
112                                {
113                                        fsd_exc_raise_fmt(FSD_DRMAA_ERRNO_INVALID_JOB, "Stepid is NULL");
114                                }
115                                if(rc)
116                                        fsd_exc_raise_fmt(
117                                        lldrmaa_map_terminate_job(rc),
118                                        "job::control: could not send %s to job %s. Error code: %d,%s",
119                                        drmaa_control_to_str(action), self->job_id, rc, lldrmaa_err_terminate_job(rc)
120                                        );
121
122                                break;
123                        default:
124                                fsd_exc_raise_fmt(
125                                                FSD_ERRNO_INVALID_ARGUMENT,
126                                                "job::control: unknown action %d", action );
127                }
128
129                fsd_log_debug(("job::control: successful"));
130         }
131        FINALLY
132        {
133                fsd_free(ptr);
134                if(cancel_info.StepId.from_host!=NULL)
135                        fsd_free(cancel_info.StepId.from_host);
136                if(errObj != NULL)
137                {
138                        ll_free_objs(errObj);
139                        ll_deallocate(errObj);
140                }
141
142                fsd_mutex_unlock( &self->session->drm_connection_mutex );
143        }
144        END_TRY
145
146        fsd_log_return(( "" ));
147}
148
149void
150lldrmaa_job_read_job_info( fsd_job_t *self, enum StepState step_state, enum HoldType hold_type )
151{
152        fsd_log_enter(( "" ));
153
154        fsd_log_debug(("step state: %d",step_state));
155        fsd_log_debug(("hold type: %d",hold_type));
156        if( hold_type && ( step_state == STATE_HOLD || step_state == STATE_IDLE ))
157        {
158                if (hold_type == HOLDTYPE_USER)
159                        self->state = DRMAA_PS_USER_ON_HOLD;
160                else if (hold_type == HOLDTYPE_SYSTEM)
161                        self->state = DRMAA_PS_SYSTEM_ON_HOLD;
162                else if (hold_type == HOLDTYPE_USERSYS)
163                        self->state = DRMAA_PS_USER_SYSTEM_ON_HOLD;
164                else
165                {
166                        fsd_log_error(("hold_type does not match step_state, %d", hold_type));
167                        fsd_assert(false);
168                }
169        }
170        else if( step_state == STATE_DEFERRED || step_state == STATE_IDLE || step_state == STATE_PENDING )
171                self->state = DRMAA_PS_QUEUED_ACTIVE;
172        else if( step_state == STATE_RUNNING )
173                self->state = DRMAA_PS_RUNNING;
174        else if( step_state == STATE_PREEMPTED )
175                self->state = DRMAA_PS_SYSTEM_SUSPENDED;
176        else if( step_state == STATE_COMPLETED )
177                self->state = DRMAA_PS_DONE;
178        else if ( step_state == STATE_VACATED)
179        {
180                if( ((lldrmaa_session_t*) self->session)->terminate_job_on_vacated )
181                {
182                        lldrmaa_job_control(self, DRMAA_CONTROL_TERMINATE);
183                }
184                self->state = DRMAA_PS_FAILED;
185        }
186        else if( step_state == STATE_CANCELED || step_state == STATE_NOTQUEUED || step_state == STATE_NOTRUN ||
187                         step_state == STATE_REJECTED || step_state == STATE_REMOVED || step_state == STATE_TERMINATED ||
188                         step_state == STATE_SUBMISSION_ERR)
189                self->state = DRMAA_PS_FAILED;
190        else if ( step_state == STATE_COMPLETE_PENDING ||  step_state == STATE_PREEMPT_PENDING ||
191                          step_state == STATE_REJECT_PENDING ||  step_state == STATE_REMOVE_PENDING ||
192                          step_state == STATE_RESUME_PENDING || step_state == STATE_STARTING || step_state == STATE_VACATE_PENDING)
193                fsd_log_debug(("STATE_X_PENDING"));
194        else if ( step_state == STATE_UNEXPANDED)
195                self->state = DRMAA_PS_UNDETERMINED;
196        else
197        {
198                fsd_log_error(("step_state = %d, assert(0)",step_state));
199                fsd_assert(false);
200        }
201
202        self->last_update_time = time(NULL);
203
204        fsd_log_return(( "" ));
205}
206
207void
208lldrmaa_job_read_job_info_mon( fsd_job_t *self, const char * state , unsigned status)
209{
210        fsd_log_enter(( "" ));
211
212        self->exit_status = status;
213
214        if( strcmp(state,"JOB_STARTED") == 0)
215        {
216                self->state = DRMAA_PS_RUNNING;
217        }
218        else if( strcmp(state,"JOB_COMPLETED") == 0)
219        {
220                self->state = DRMAA_PS_DONE;
221        }
222        else if( strcmp(state,"JOB_VACATED") == 0 )
223        {
224                if( ((lldrmaa_session_t*) self->session)->terminate_job_on_vacated )
225                {
226                        lldrmaa_job_control(self, DRMAA_CONTROL_TERMINATE);
227                }
228                self->state = DRMAA_PS_FAILED;
229        }
230        else if( strcmp(state,"JOB_REJECTED") == 0)
231        {
232                self->state = DRMAA_PS_FAILED;
233        }
234        else if( strcmp(state,"JOB_REMOVED") == 0)
235        {
236                self->state = DRMAA_PS_FAILED;
237        }
238        else if( strcmp(state,"JOB_NOTRUN") == 0)
239        {
240                self->state = DRMAA_PS_FAILED;
241        }
242        else
243        {
244                fsd_log_error(("state = %s, assert(0)",state));
245                fsd_assert(false);
246        }
247
248        fsd_log_info(("state: %s -> %s",state, drmaa_job_ps_to_str(self->state)));
249
250        if( self->state >= DRMAA_PS_DONE )
251                fsd_cond_broadcast( &self->status_cond );
252
253        fsd_log_return(( "" ));
254}
255
256static void
257lldrmaa_job_update_status( fsd_job_t *self )
258{
259        enum StepState step_state;
260        enum HoldType hold_type;
261        int rc;
262        int  obj_count,err_code;
263        LL_element * job=NULL,*data=NULL,*step=NULL;
264        char **host_list = NULL;
265        lldrmaa_job_t *llself = (lldrmaa_job_t*) self;
266
267        fsd_log_enter(( "({job_id=%s})", self->job_id ));
268
269        fsd_mutex_lock( &self->session->drm_connection_mutex );
270        TRY
271        {
272                fsd_log_debug(( "drm connection locked" ));
273                job = ll_query(JOBS);
274                if (!job) {
275                                fsd_exc_raise_msg(FSD_ERRNO_INTERNAL_ERROR,
276                                                        "ll_query() returns NULL. The subroutine was unable to create the appropriate pointer.");
277                }
278
279                fsd_calloc(host_list, 2, char *);
280                host_list[0] = fsd_strdup(self->job_id);
281                host_list[1] = NULL;
282
283                fsd_log_debug(("host_list[0]: %s",host_list[0]));
284
285                rc = ll_set_request(job,QUERY_STEPID,host_list,ALL_DATA);
286                if (rc) {
287                                fsd_exc_raise_fmt(lldrmaa_map_set_request(rc),
288                                                        "ll_set_request() return code is non-zero. %s",lldrmaa_err_set_request(rc));
289                }
290
291                data = ll_get_objs(job, LL_CM, NULL, &obj_count, &err_code);
292                if (data == NULL) {
293                        if (err_code == LL_GET_OBJS_NO_OBJECTS_ERR)
294                          {
295                                if (self->state != DRMAA_PS_UNDETERMINED)
296                                  {
297                                        fsd_log_info(("Job %s missing. Assuming finished", self->job_id));
298                                        self->state = DRMAA_PS_DONE;
299                                        self->exit_status = 0;
300                                  }
301                                else if (llself->missing_time == 0)
302                                 {
303                                        llself->missing_time = time(NULL);
304                                        fsd_log_debug(("Job %s missing for the first time", self->job_id)); /* Job may not yet be visible in LL */
305                                 }
306                                else if (time(NULL) - llself->missing_time > LL_DRMAA_MAX_MISSING_TIME)
307                                 {
308                                        fsd_log_error(("Job %s missing for more then %d seconds. Assuming failef", self->job_id, LL_DRMAA_MAX_MISSING_TIME));
309                                        /* Job may not yet be visible in LL */
310                                        self->state = DRMAA_PS_FAILED;
311                                        self->exit_status = -1;
312                                 }
313                                else
314                                 {
315                                        fsd_log_debug(("Job %s still missing", self->job_id));
316                                 }
317                          }
318                        else
319                          {
320                                fsd_log_error(("Code: %d,%d ll_get_objs() returns NULL. %s", err_code, lldrmaa_map_get_objs(err_code), lldrmaa_err_get_objs(err_code) ));
321                          }
322                } else {
323
324                        rc = ll_get_data(data, LL_JobGetFirstStep, &step);
325                        if(rc)
326                        {
327                                fsd_exc_raise_fmt(lldrmaa_map_get_data(rc),
328                                                        "ll_get_data() return code is non-zero. %s",lldrmaa_err_get_data(rc));
329                        }
330
331                        rc = ll_get_data(step, LL_StepState,&step_state);
332                        if (rc) {
333                                fsd_exc_raise_fmt(lldrmaa_map_get_data(rc),
334                                                        "ll_get_data() return code is non-zero. %s",lldrmaa_err_get_data(rc));
335                        }
336
337                        rc = ll_get_data(step, LL_StepHoldType, &hold_type);
338                        if (rc) {
339                                fsd_exc_raise_fmt(lldrmaa_map_get_data(rc),
340                                                        "ll_get_data() return code is non-zero. %s",lldrmaa_err_get_data(rc));
341                        }
342
343                        llself->read_job_info(self, step_state, hold_type);
344
345                        fsd_log_info(("LL State: %d -> DRMAA: %s", step_state, drmaa_job_ps_to_str(self->state)));
346                }
347        }
348        FINALLY
349        {
350                if(job!=NULL)
351                {
352                        ll_free_objs(job);
353                        ll_deallocate(job);
354                }
355                if(data!=NULL)
356                {
357                        ll_free_objs(data);
358                        ll_deallocate(data);
359                }
360                if(step!=NULL)
361                {
362                        ll_free_objs(step);
363                        ll_deallocate(step);
364                }
365                if(host_list!=NULL)
366                        fsd_free_vector (host_list);
367
368                fsd_mutex_unlock( &self->session->drm_connection_mutex );
369        }
370        END_TRY
371
372        fsd_log_return(( "" ));
373}
374
375fsd_job_t *
376lldrmaa_job_new( char *job_id )
377{
378        lldrmaa_job_t *self = NULL;
379        self = (lldrmaa_job_t*)fsd_job_new( job_id );
380        fsd_realloc( self, 1, lldrmaa_job_t );
381        self->super.control = lldrmaa_job_control;
382        self->super.update_status = lldrmaa_job_update_status;
383        self->missing_time = 0;
384        self->read_job_info_mon = lldrmaa_job_read_job_info_mon;
385        self->read_job_info = lldrmaa_job_read_job_info;
386        return (fsd_job_t*)self;
387}
388
389char *
390lldrmaa_job_create_req(
391                fsd_drmaa_session_t *session,
392                const fsd_template_t *jt,
393                fsd_environ_t **envp,
394                unsigned bulk,
395                int start,
396                int incr
397                )
398{
399        char *volatile cmd_file = NULL;
400        fsd_expand_drmaa_ph_t *volatile expand = NULL;
401        TRY
402         {
403                expand = fsd_expand_drmaa_ph_new( NULL, NULL, fsd_strdup("0") );
404                cmd_file = (char *) lldrmaa_job_create_command_file( session, jt, envp, expand, bulk, start, incr );
405         }
406        EXCEPT_DEFAULT
407         {
408                remove(cmd_file);
409                fsd_exc_reraise();
410         }
411        FINALLY
412         {
413                if( expand )
414                        expand->destroy( expand );
415         }
416        END_TRY
417        return cmd_file;
418}
419
420
421static char *
422internal_map_file( fsd_expand_drmaa_ph_t *expand, const char *path,
423                bool *host_given, const char *name )
424{
425        const char *p;
426        for( p = path;  *p != ':';  p++ )
427                if( *p == '\0' )
428                        fsd_exc_raise_fmt( FSD_DRMAA_ERRNO_INVALID_ATTRIBUTE_FORMAT,
429                                                        "invalid format of drmaa_%s_path: missing colon", name );
430        if( host_given )
431                *host_given = ( p != path );
432        p++;
433        return expand->expand( expand, fsd_strdup(p),
434                        FSD_DRMAA_PH_HD | FSD_DRMAA_PH_WD | FSD_DRMAA_PH_INCR );
435}
436
437char *
438lldrmaa_job_create_command_file(
439                fsd_drmaa_session_t *session,
440                const fsd_template_t *jt,
441                fsd_environ_t **envp,
442                fsd_expand_drmaa_ph_t *expand,
443                unsigned n_jobs,
444                int start,
445                int incr
446                )
447{
448        char template_cmd[] = "/tmp/drmaa_cmd_XXXXXX";
449        int fdt;
450        FILE * fd;
451
452
453        if((fdt = mkstemp(template_cmd) ) == -1)
454        {
455                fsd_log_error(("Can't create cmd file"));
456                fsd_exc_raise_msg(FSD_ERRNO_INTERNAL_ERROR,"Can't create cmd file");
457        }
458        close(fdt);
459
460        if((fd = fopen(template_cmd,"w") ) != NULL )
461        {
462                const char *input_path_orig = NULL;
463                const char *output_path_orig = NULL;
464                const char *error_path_orig = NULL;
465                char *volatile input_path = NULL;
466                char *volatile output_path = NULL;
467                char *volatile error_path = NULL;
468                bool input_host = false;
469                bool output_host = false;
470                bool error_host = false;
471                bool join_files = false;
472                const char *value;
473                const char *const *vector;
474                const char *job_category = "default";
475                fsd_log_debug(("Cmd file: %s opened",template_cmd));
476                fprintf(fd,"# File created by LoadLeveler DRMAA. If no LL DRMAA applications are running you can delete this file\n");
477
478                /* job name */
479                value = jt->get_attr( jt, DRMAA_JOB_NAME );
480                if( value )
481                {
482                        fprintf(fd,"# @ job_name = %s\n",value);
483                        fsd_log_debug(("# @ job_name = %s",value));
484                }
485
486                /* job state at submit */
487                value = jt->get_attr( jt, DRMAA_JS_STATE );
488                if( value )
489                {
490                        if( 0 == strcmp( value, DRMAA_SUBMISSION_STATE_ACTIVE ) )
491                        {}
492                        else if( 0 == strcmp( value, DRMAA_SUBMISSION_STATE_HOLD ) )
493                        {
494                                fprintf(fd,"# @ hold = user\n");
495                                fsd_log_debug(("# @ hold = user"));
496                        }
497                        else
498                                fsd_exc_raise_msg(
499                                        FSD_DRMAA_ERRNO_INVALID_ATTRIBUTE_VALUE,
500                                        "invalid value of drmaa_js_state attribute" );
501                }
502
503                /* start time */
504                value = jt->get_attr( jt, DRMAA_START_TIME );
505                if( value )
506                {
507                        char buf[30];
508                        const time_t time_ll = fsd_datetime_parse( value );
509                        struct tm temp;
510                        localtime_r(&time_ll,&temp);
511                        strftime(buf, sizeof(buf),"%m/%d/%Y %H:%M:%S", &temp );
512                        fprintf(fd,"# @ startdate = %s\n", buf);
513                        fsd_log_debug(("# @ startdate = %s", buf));
514                        fsd_log_debug(( "\n  drmaa_start_time: %s -> %ld",      value, (long)time_ll ));
515                 }
516
517                /* environment */
518                vector = jt->get_v_attr( jt, DRMAA_V_ENV );
519                if( vector )
520                {
521                        const char *const *i;
522
523                        fprintf(fd,"# @ environment =");
524                        fsd_log_debug(("# @ environment ="));
525                        for( i = vector;  *i;  i++ )
526                        {
527                                fprintf(fd," %s;",*i);
528                                fsd_log_debug((" %s;",*i));
529                        }
530                        fprintf(fd," \n");
531                 }
532
533                 /* wall clock time soft and hard limit */
534                {
535                        const char *value2;
536                        value = jt->get_attr( jt, DRMAA_WCT_SLIMIT );
537                        value2 = jt->get_attr( jt, DRMAA_WCT_HLIMIT );
538                        if( value && value2)
539                        {
540                                fprintf(fd,"# @ wall_clock_limit = %s,%s\n",value,value2);
541                                fsd_log_debug(("# @ wall_clock_limit = %s,%s",value,value2));
542                        }
543                        else if (value)
544                        {
545                                fprintf(fd,"# @ wall_clock_limit = %s\n",value);
546                                fsd_log_debug(("# @ wall_clock_limit = %s",value));
547                        }
548                        else if (value2)
549                        {
550                                fprintf(fd,"# @ wall_clock_limit = ,%s\n",value);
551                                fsd_log_debug(("# @ wall_clock_limit = ,%s",value));
552                        }
553                }
554
555                /* native specification */
556                value = jt->get_attr( jt, DRMAA_NATIVE_SPECIFICATION );
557                if( value )
558                {
559                        char * temp = fsd_strdup(value);
560                        temp = fsd_replace(temp,"@","\n# $"); /* infinite loop when try replace @ with # @ */
561                        temp = fsd_replace(temp,"# $","# @ ");
562
563                        fprintf(fd,"# Native specification:%s\n", temp);
564                        fsd_log_debug(("Native specification:%s", temp));
565                        fsd_free(temp);
566                }
567
568                /* job category */
569                value = jt->get_attr( jt, DRMAA_JOB_CATEGORY );
570                if( value )
571                        job_category = value;
572
573                {
574                        char * temp;
575                        fsd_conf_option_t *category_value = NULL;
576                        category_value = fsd_conf_dict_get( session->job_categories, job_category );
577                        if( category_value != NULL )
578                        {
579                                if( category_value->type != FSD_CONF_STRING )
580                                        fsd_exc_raise_fmt(
581                                                        FSD_ERRNO_INTERNAL_ERROR,
582                                                        "configuration error: job category should be string"
583                                                        );
584
585                                temp = fsd_strdup(category_value->val.string);
586                                temp = fsd_replace(temp,"@","\n# $"); /* infinite loop when try replace @ with # @ */
587                                temp = fsd_replace(temp,"# $","# @ ");
588
589                                fprintf(fd,"# Job category:%s\n", temp);
590                                fsd_log_debug(("Job category:\n%s",temp));
591                                fsd_free(temp);
592                        }
593                        else
594                        {
595                                if( value != NULL )
596                                        fsd_exc_raise_fmt(
597                                                        FSD_DRMAA_ERRNO_INVALID_ATTRIBUTE_VALUE,
598                                                        "invalid job category: %s", job_category
599                                                        );
600                        }
601                }
602
603
604                {
605                        unsigned i;
606                        for(i=0;i<n_jobs;i++)
607                        {
608                                expand->set(expand, FSD_DRMAA_PH_INCR,fsd_asprintf("%d", start+incr*i)); /* set current value */
609
610                                /* job working directory */
611                                value = jt->get_attr( jt, DRMAA_WD );
612                                if( value )
613                                {
614                                        char *cwd_expanded = expand->expand( expand, fsd_strdup(value), FSD_DRMAA_PH_HD | FSD_DRMAA_PH_INCR );
615
616                                        expand->set( expand, FSD_DRMAA_PH_WD, fsd_strdup(cwd_expanded));
617                                        fprintf(fd,"# @ initialdir = %s\n",cwd_expanded);
618                                        fsd_log_debug(("# @ initialdir = %s",cwd_expanded));
619
620                                        fsd_free(cwd_expanded);
621                                }
622
623                                TRY
624                                {
625                                        /* input path */
626                                        input_path_orig = jt->get_attr( jt, DRMAA_INPUT_PATH );
627                                        if( input_path_orig )
628                                        {
629                                                input_path = internal_map_file( expand, input_path_orig, &input_host,
630                                                                                "input" );
631                                                fsd_log_debug(( "\n  drmaa_input_path: %s -> %s",
632                                                                        input_path_orig, input_path ));
633                                        }
634
635                                        /* output path */
636                                        output_path_orig = jt->get_attr( jt, DRMAA_OUTPUT_PATH );
637                                        if( output_path_orig )
638                                        {
639                                                output_path = internal_map_file( expand, output_path_orig, &output_host,
640                                                                                "output" );
641                                                fsd_log_debug(( "\n  drmaa_output_path: %s -> %s",
642                                                                                output_path_orig, output_path ));
643                                        }
644
645                                        /* error path */
646                                        error_path_orig = jt->get_attr( jt, DRMAA_ERROR_PATH );
647                                        if( error_path_orig )
648                                        {
649                                                error_path = internal_map_file( expand, error_path_orig, &error_host,
650                                                                                "error" );
651                                                fsd_log_debug(( "\n  drmaa_error_path: %s -> %s",
652                                                                        error_path_orig, error_path ));
653                                        }
654
655                                        /* join files */
656                                        value = jt->get_attr( jt, DRMAA_JOIN_FILES );
657                                        if( value )
658                                        {
659                                                if( (value[0] == 'y' || value[0] == 'Y')  &&  value[1] == '\0' )
660                                                        join_files = true;
661                                                else if( (value[0] == 'n' || value[0] == 'N')  &&  value[1] == '\0' )
662                                                        join_files = false;
663                                                else
664                                                        fsd_exc_raise_msg(
665                                                                        FSD_DRMAA_ERRNO_INVALID_ATTRIBUTE_VALUE,
666                                                                        "invalid value of drmaa_join_files attribute" );
667                                        }
668
669                                        if( join_files )
670                                        {
671                                                if( output_path == NULL )
672                                                        fsd_exc_raise_msg(
673                                                                        FSD_DRMAA_ERRNO_CONFLICTING_ATTRIBUTE_VALUES,
674                                                                        "drmaa_join_files is set and output file is not given" );
675                                                if( error_path!=NULL && 0 != strcmp( output_path, error_path ) )
676                                                        fsd_log_warning(( "Error file was given but will be ignored "
677                                                                                "since drmaa_join_files was set." ));
678
679                                                if (error_path)
680                                                        fsd_free(error_path);
681
682                                                 error_path = fsd_strdup(output_path);
683                                        }
684                                        else
685                                        {
686                                                if( error_path == NULL  &&  output_path )
687                                                        error_path = fsd_strdup( "/dev/null" );
688                                                if( output_path == NULL  &&  error_path )
689                                                        output_path = fsd_strdup( "/dev/null" );
690                                        }
691
692
693                                        /* email addresses to send notifications */
694                                        vector = jt->get_v_attr( jt, DRMAA_V_EMAIL );
695                                        if( vector  &&  vector[0] )
696                                        {
697                                                /* only to one email address message may be send */
698                                                fprintf(fd,"# @ notify_user = %s\n",vector[0]);
699                                                fsd_log_debug(("# @ notify_user = %s\n",vector[0]));
700                                                if( vector[1] != NULL )
701                                                {
702                                                        fsd_log_error(( "LL only supports one e-mail "
703                                                                                "notification address" ));
704                                                        fsd_exc_raise_msg(FSD_DRMAA_ERRNO_INVALID_ATTRIBUTE_VALUE,"LL only supports one e-mail "
705                                                                                "notification address");
706                                                }
707                                        }
708
709                                        /* block email */
710                                        value = jt->get_attr( jt, DRMAA_BLOCK_EMAIL );
711                                        if( value )
712                                        {
713                                                bool block;
714                                                if( strcmp(value, "0") == 0 )
715                                                {
716                                                        block = true;
717                                                        fprintf(fd,"# @ notification = never\n");
718                                                        fsd_log_debug(("# @ notification = never"));
719                                                }
720                                                else if( strcmp(value, "1") == 0 )
721                                                        block = false;
722                                                else
723                                                        fsd_exc_raise_msg(
724                                                                        FSD_DRMAA_ERRNO_INVALID_ATTRIBUTE_VALUE,
725                                                                        "invalid value of drmaa_block_email attribute" );
726
727                                                if( block && output_path == NULL )
728                                                {
729                                                        fsd_log_debug(( "output path not set and we want to block e-mail, "
730                                                                                "set to /dev/null" ));
731                                                        output_path = fsd_strdup( "/dev/null" );
732                                                }
733                                        }
734
735                                        if( input_path )
736                                        {
737                                                fprintf(fd,"# @ input = %s\n", input_path);
738                                                fsd_log_debug(("# @ input = %s", input_path));
739                                        }
740
741                                        if( output_path )
742                                        {
743                                                fprintf(fd,"# @ output = %s\n", output_path);
744                                                fsd_log_debug(("# @ output = %s", output_path));
745                                        }
746
747                                        if( error_path )
748                                        {
749                                                fprintf(fd,"# @ error = %s\n", error_path);
750                                                fsd_log_debug(("# @ error = %s", error_path));
751                                        }
752                                }
753                                FINALLY
754                                {
755                                        fsd_free( input_path );
756                                        fsd_free( output_path );
757                                        fsd_free( error_path );
758                                        input_path = NULL;
759                                        output_path = NULL;
760                                        error_path = NULL;
761                                }
762                                END_TRY
763
764                                fprintf(fd,"# @ queue\n");
765                                fsd_log_debug(("# @ queue"));
766                               
767                                fprintf(fd,"\n");
768                                fprintf(fd,"echo >&2\n"); /* this line forces creation of stderr file */
769
770                                TRY
771                                {
772                                        const char *command = NULL;
773                                        char *command_expanded = NULL;
774                                        const char *const *i;
775                                        int j;
776
777                                        /* remote command */
778                                        command = jt->get_attr( jt, DRMAA_REMOTE_COMMAND );
779                                        if( command == NULL )
780                                                fsd_exc_raise_msg(
781                                                                FSD_DRMAA_ERRNO_CONFLICTING_ATTRIBUTE_VALUES,
782                                                                "drmaa_remote_command not set for job template"
783                                                                );
784
785                                        command_expanded = expand->expand( expand, fsd_strdup(command), FSD_DRMAA_PH_HD | FSD_DRMAA_PH_WD );
786
787                                        fprintf(fd,"%s", command_expanded); /* we put the commmand at the end of script (instead of using @executable keyword
788                                                                                 in roder to avoid coping of the binary */
789                                        fsd_log_debug(("command = %s\n", command_expanded));
790
791                                        fsd_free(command_expanded);
792
793                                        /* arguments list */
794                                        vector = jt->get_v_attr( jt, DRMAA_V_ARGV );
795
796                                        if( vector )
797                                        {
798                                                fsd_log_debug(("arguments = "));
799
800                                                for( i = vector, j = 2;  *i;  i++, j++ )
801                                                {
802                                                        char *arg_expanded = expand->expand( expand, fsd_strdup(*i), FSD_DRMAA_PH_HD | FSD_DRMAA_PH_WD );
803
804                                                        fprintf(fd," '%s'", arg_expanded);
805                                                        fsd_log_debug(("%s", arg_expanded));
806
807                                                        fsd_free(arg_expanded);
808                                                }
809                                        }
810
811                                        fprintf(fd," \n");
812                                }
813                                END_TRY
814
815
816                        }
817                }
818
819                fclose(fd);
820        }
821        else
822        {
823                fsd_log_error(("Can't create cmd file"));
824                fsd_exc_raise_msg(FSD_ERRNO_INTERNAL_ERROR,"Can't create cmd file");
825        }
826
827        return fsd_strdup(template_cmd);
828}
829
Note: See TracBrowser for help on using the repository browser.