source: trunk/pbs_drmaa/submit.c @ 73

Revision 73, 23.6 KB checked in by mmamonski, 7 years ago (diff)

PBSDRMAA submit filter

  • Property svn:keywords set to Id
Line 
1/* $Id$ */
2/*
3 *  FedStage DRMAA for PBS Pro
4 *  Copyright (C) 2006-2009  FedStage Systems
5 *
6 *  This program is free software: you can redistribute it and/or modify
7 *  it under the terms of the GNU General Public License as published by
8 *  the Free Software Foundation, either version 3 of the License, or
9 *  (at your option) any later version.
10 *
11 *  This program is distributed in the hope that it will be useful,
12 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 *  GNU General Public License for more details.
15 *
16 *  You should have received a copy of the GNU General Public License
17 *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20#ifdef HAVE_CONFIG_H
21#       include <config.h>
22#endif
23
24#include <unistd.h>
25#include <string.h>
26#include <stdlib.h>
27
28#include <pbs_ifl.h>
29#include <pbs_error.h>
30
31#include <drmaa_utils/conf.h>
32#include <drmaa_utils/exec.h>
33#include <drmaa_utils/drmaa.h>
34#include <drmaa_utils/drmaa_util.h>
35#include <drmaa_utils/datetime.h>
36#include <drmaa_utils/iter.h>
37#include <drmaa_utils/template.h>
38#include <pbs_drmaa/pbs_attrib.h>
39#include <pbs_drmaa/session.h>
40#include <pbs_drmaa/submit.h>
41#include <pbs_drmaa/util.h>
42
43
44
45#ifndef lint
46static char rcsid[]
47#       ifdef __GNUC__
48                __attribute__ ((unused))
49#       endif
50        = "$Id$";
51#endif
52
53static void pbsdrmaa_submit_destroy( pbsdrmaa_submit_t *self );
54
55static char *pbsdrmaa_submit_submit( pbsdrmaa_submit_t *self );
56
57static void pbsdrmaa_submit_eval( pbsdrmaa_submit_t *self );
58
59static void pbsdrmaa_submit_set( pbsdrmaa_submit_t *self, const char *pbs_attr, char *value, unsigned placeholders );
60
61static struct attrl *pbsdrmaa_submit_filter(struct attrl *pbs_attr);
62
63
64static void pbsdrmaa_submit_apply_defaults( pbsdrmaa_submit_t *self );
65static void pbsdrmaa_submit_apply_job_script( pbsdrmaa_submit_t *self );
66static void pbsdrmaa_submit_apply_job_state( pbsdrmaa_submit_t *self );
67static void pbsdrmaa_submit_apply_job_files( pbsdrmaa_submit_t *self );
68static void pbsdrmaa_submit_apply_file_staging( pbsdrmaa_submit_t *self );
69static void pbsdrmaa_submit_apply_job_resources( pbsdrmaa_submit_t *self );
70static void pbsdrmaa_submit_apply_job_environment( pbsdrmaa_submit_t *self );
71static void pbsdrmaa_submit_apply_email_notification( pbsdrmaa_submit_t *self );
72static void pbsdrmaa_submit_apply_job_category( pbsdrmaa_submit_t *self );
73
74
75pbsdrmaa_submit_t *
76pbsdrmaa_submit_new( fsd_drmaa_session_t *session,
77                const fsd_template_t *job_template, int bulk_idx )
78{
79        pbsdrmaa_submit_t *volatile self = NULL;
80        TRY
81         {
82                fsd_malloc( self, pbsdrmaa_submit_t );
83                self->session = session;
84                self->job_template = job_template;
85                self->script_filename = NULL;
86                self->destination_queue = NULL;
87                self->pbs_job_attributes = NULL;
88                self->expand_ph = NULL;
89                self->destroy = pbsdrmaa_submit_destroy;
90                self->submit = pbsdrmaa_submit_submit;
91                self->eval = pbsdrmaa_submit_eval;
92                self->set = pbsdrmaa_submit_set;
93                self->apply_defaults = pbsdrmaa_submit_apply_defaults;
94                self->apply_job_category = pbsdrmaa_submit_apply_job_category;
95                self->apply_job_script = pbsdrmaa_submit_apply_job_script;
96                self->apply_job_state = pbsdrmaa_submit_apply_job_state;
97                self->apply_job_files = pbsdrmaa_submit_apply_job_files;
98                self->apply_file_staging = pbsdrmaa_submit_apply_file_staging;
99                self->apply_job_resources = pbsdrmaa_submit_apply_job_resources;
100                self->apply_job_environment = pbsdrmaa_submit_apply_job_environment;
101                self->apply_email_notification = pbsdrmaa_submit_apply_email_notification;
102                self->apply_native_specification = pbsdrmaa_submit_apply_native_specification;
103
104                self->pbs_job_attributes = pbsdrmaa_pbs_template_new();
105                self->expand_ph = fsd_expand_drmaa_ph_new( NULL, NULL,
106                                (bulk_idx >= 0) ? fsd_asprintf("%d", bulk_idx) : NULL );
107         }
108        EXCEPT_DEFAULT
109         {
110                if( self )
111                        self->destroy( self );
112         }
113        END_TRY
114        return self;
115}
116
117
118void
119pbsdrmaa_submit_destroy( pbsdrmaa_submit_t *self )
120{
121        if( self->script_filename )
122         {
123                unlink( self->script_filename );
124                fsd_free( self->script_filename );
125         }
126        if( self->pbs_job_attributes )
127                self->pbs_job_attributes->destroy( self->pbs_job_attributes );
128        if( self->expand_ph )
129                self->expand_ph->destroy( self->expand_ph );
130        fsd_free( self->destination_queue );
131        fsd_free( self );
132}
133
134
135char *
136pbsdrmaa_submit_submit( pbsdrmaa_submit_t *self )
137{
138        volatile bool conn_lock = false;
139        struct attrl *volatile pbs_attr = NULL;
140        char *volatile job_id = NULL;
141        TRY
142         {
143                fsd_template_t *pbs_tmpl = self->pbs_job_attributes;
144                int i;
145                int tries_left = ((pbsdrmaa_session_t *)self->session)->max_retries_count;
146                int sleep_time = 1;
147
148                for( i = PBSDRMAA_N_PBS_ATTRIBUTES - 1; i >= 0; i-- ) /* down loop -> start with custom resources */
149                 {
150                        const char *name = pbs_tmpl->by_code( pbs_tmpl, i )->name;
151                        const char *value = pbs_tmpl->get_attr( pbs_tmpl, name );
152
153                        if (!value)
154                                continue;
155
156                        if ( i == PBSDRMAA_ATTR_CUSTOM_RESOURCES)
157                         {
158                                char *value_copy = fsd_strdup(value);
159                                char *tok_comma_ctx = NULL;
160                                char *res_token = NULL;
161                                /* matlab:2,simulink:1 */
162
163                                for (res_token = strtok_r(value_copy, ";", &tok_comma_ctx); res_token; res_token = strtok_r(NULL, ";", &tok_comma_ctx))
164                                 {
165                                        char *value_p = strstr(res_token, ":");
166
167                                        if (value_p)
168                                         {
169                                                char *name_p = NULL;
170                                                *value_p = '\0';
171                                                value_p++;
172                                                name_p = fsd_asprintf("Resource_List.%s",res_token);
173                                                pbs_attr = pbsdrmaa_add_attr( pbs_attr, name_p, value_p );
174                                                fsd_free(name_p);
175                                         }
176                                        else
177                                         {
178                                                fsd_exc_raise_code( FSD_DRMAA_ERRNO_INVALID_ATTRIBUTE_VALUE );
179                                         }
180                                 }
181
182                                fsd_free(value_copy);
183                         }
184                        else if (i == PBSDRMAA_ATTR_NODE_PROPERTIES)
185                         {
186                                const char *nodes_value = pbs_tmpl->get_attr( pbs_tmpl, PBSDRMAA_NODES );
187                                char *final_value = NULL;
188
189                                if (nodes_value)
190                                 {
191                                        final_value = fsd_asprintf("%s:%s",nodes_value, value);
192                                 }
193                                else
194                                 {
195                                        final_value = fsd_asprintf("1:%s", value);
196                                 }
197
198                                pbs_tmpl->set_attr( pbs_tmpl, PBSDRMAA_NODES, final_value);
199                                fsd_free(final_value);
200                         }
201                        else
202                         {
203                                pbs_attr = pbsdrmaa_add_attr( pbs_attr, name, value );
204                         }
205                 }
206
207
208                pbs_attr = pbsdrmaa_submit_filter(pbs_attr);
209
210                conn_lock = fsd_mutex_lock( &self->session->drm_connection_mutex );
211retry:
212                job_id = pbs_submit( ((pbsdrmaa_session_t*)self->session)->pbs_conn,
213                                (struct attropl*)pbs_attr, self->script_filename,
214                                self->destination_queue, NULL );
215
216                fsd_log_info(("pbs_submit(%s, %s) =%s", self->script_filename, self->destination_queue, job_id));
217
218                if( job_id == NULL )
219                {
220                        fsd_log_error(( "pbs_submit failed, pbs_errno = %d", pbs_errno ));
221                        if (pbs_errno == PBSE_PROTOCOL || pbs_errno == PBSE_EXPIRED || pbs_errno == PBSOLDE_PROTOCOL || pbs_errno == PBSOLDE_EXPIRED)
222                         {
223                                pbsdrmaa_session_t *pbsself = (pbsdrmaa_session_t*)self->session;
224
225                                fsd_log_error(( "Protocol error. Retrying..." ));
226
227                                if (pbsself->pbs_conn >= 0 )
228                                        pbs_disconnect( pbsself->pbs_conn );
229retry_connect:
230                                sleep(sleep_time++);
231                                pbsself->pbs_conn = pbs_connect( pbsself->super.contact );
232                                if( pbsself->pbs_conn < 0)
233                                 {
234                                        if (tries_left--)
235                                                goto retry_connect;
236                                        else
237                                                pbsdrmaa_exc_raise_pbs( "pbs_connect" );
238                                 }
239                                else
240                                 {
241                                        if (tries_left--)
242                                                goto retry;
243                                        else
244                                                pbsdrmaa_exc_raise_pbs( "pbs_submit" );
245                                 }
246                         }
247                        else
248                         {
249                                pbsdrmaa_exc_raise_pbs( "pbs_submit" );
250                         }
251                }
252                conn_lock = fsd_mutex_unlock( &self->session->drm_connection_mutex );
253         }
254        EXCEPT_DEFAULT
255         {
256                fsd_free( job_id );
257                fsd_exc_reraise();
258         }
259        FINALLY
260         {
261                if( conn_lock )
262                        conn_lock = fsd_mutex_unlock( &self->session->drm_connection_mutex );
263                if( pbs_attr )
264                        pbsdrmaa_free_attrl( pbs_attr );
265         }
266        END_TRY
267        return job_id;
268}
269
270
271void
272pbsdrmaa_submit_eval( pbsdrmaa_submit_t *self )
273{
274        self->apply_defaults( self );
275        self->apply_job_category( self );
276        self->apply_job_script( self );
277        self->apply_job_state( self );
278        self->apply_job_files( self );
279        self->apply_file_staging( self );
280        self->apply_job_resources( self );
281        self->apply_job_environment( self );
282        self->apply_email_notification( self );
283        self->apply_native_specification( self, NULL );
284}
285
286
287void
288pbsdrmaa_submit_set( pbsdrmaa_submit_t *self, const char *name,
289                 char *value, unsigned placeholders )
290{
291        fsd_template_t *pbs_attr = self->pbs_job_attributes;
292        TRY
293         {
294                if( placeholders )
295                        value = self->expand_ph->expand(
296                                        self->expand_ph, value, placeholders );
297                pbs_attr->set_attr( pbs_attr, name, value );
298         }
299        FINALLY
300         {
301                fsd_free( value );
302         }
303        END_TRY
304}
305
306
307void
308pbsdrmaa_submit_apply_defaults( pbsdrmaa_submit_t *self )
309{
310        fsd_template_t *pbs_attr = self->pbs_job_attributes;
311        pbs_attr->set_attr( pbs_attr, PBSDRMAA_CHECKPOINT, "u" );
312        pbs_attr->set_attr( pbs_attr, PBSDRMAA_KEEP_FILES, "n" );
313        pbs_attr->set_attr( pbs_attr, PBSDRMAA_PRIORITY, "0" );
314}
315
316
317void
318pbsdrmaa_submit_apply_job_script( pbsdrmaa_submit_t *self )
319{
320        const fsd_template_t *jt = self->job_template;
321        /* fsd_template_t *pbs_attr = self->pbs_job_attributes; */
322        fsd_expand_drmaa_ph_t *expand = self->expand_ph;
323        char *script = NULL;
324        size_t script_len;
325        const char *executable;
326        const char *wd;
327        const char *const *argv;
328        const char *input_path;
329        const char *const *i;
330
331        executable   = jt->get_attr( jt, DRMAA_REMOTE_COMMAND );
332        wd           = jt->get_attr( jt, DRMAA_WD );
333        argv         = jt->get_v_attr( jt, DRMAA_V_ARGV );
334        input_path   = jt->get_attr( jt, DRMAA_INPUT_PATH );
335
336        if( wd )
337         {
338                char *cwd = NULL;
339                cwd = expand->expand( expand, fsd_strdup(wd),
340                                FSD_DRMAA_PH_HD | FSD_DRMAA_PH_INCR );
341                expand->set( expand, FSD_DRMAA_PH_WD, cwd );
342         }
343
344        if( executable == NULL )
345                fsd_exc_raise_code( FSD_DRMAA_ERRNO_INVALID_ATTRIBUTE_VALUE );
346
347        if( input_path != NULL )
348         {
349                if( input_path[0] == ':' )
350                        input_path++;
351         }
352
353         { /* compute script length */
354                script_len = 0;
355                if( wd != NULL )
356                        script_len += strlen("cd ") + strlen(wd) + strlen("; ");
357                script_len += strlen("touch ") + strlen(((pbsdrmaa_session_t *)self->session)->job_exit_status_file_prefix) + strlen("/$PBS_JOBID.started;");
358                script_len += strlen(executable);
359                if( argv != NULL )
360                        for( i = argv;  *i != NULL;  i++ )
361                                script_len += 3+strlen(*i);
362                if( input_path != NULL )
363                        script_len += strlen(" <") + strlen(input_path);
364
365                script_len += strlen(";EXIT_CODE=$?; echo $EXIT_CODE >") + strlen(((pbsdrmaa_session_t *)self->session)->job_exit_status_file_prefix) + strlen("/$PBS_JOBID.exitcode; exit $EXIT_CODE");
366         }
367
368        fsd_calloc( script, script_len+1, char );
369
370         {
371                char *s;
372                s = script;
373                if( wd != NULL )
374                        s += sprintf( s, "cd %s; ", wd );
375                s += sprintf( s, "touch %s/$PBS_JOBID.started;", ((pbsdrmaa_session_t *)self->session)->job_exit_status_file_prefix);
376                s += sprintf( s, "%s", executable );
377                if( argv != NULL )
378                        for( i = argv;  *i != NULL;  i++ )
379                                s += sprintf( s, " '%s'", *i );
380                if( input_path != NULL )
381                        s += sprintf( s, " <%s", input_path );
382
383                s += sprintf( s, ";EXIT_CODE=$?; echo $EXIT_CODE >%s/$PBS_JOBID.exitcode; exit $EXIT_CODE", ((pbsdrmaa_session_t *)self->session)->job_exit_status_file_prefix);
384
385                fsd_assert( s == script+script_len );
386         }
387
388        script = expand->expand( expand, script,
389                        FSD_DRMAA_PH_HD | FSD_DRMAA_PH_WD | FSD_DRMAA_PH_INCR );
390
391        /* pbs_attr->set_attr( pbs_attr, "!script", script ); */
392
393        self->script_filename = pbsdrmaa_write_tmpfile( script, strlen(script) );
394        fsd_free( script );
395}
396
397
398void
399pbsdrmaa_submit_apply_job_state( pbsdrmaa_submit_t *self )
400{
401        const fsd_template_t *jt = self->job_template;
402        fsd_template_t *pbs_attr = self->pbs_job_attributes;
403        const char *job_name = NULL;
404        const char *submit_state = NULL;
405        const char *drmaa_start_time = NULL;
406
407        job_name = jt->get_attr( jt, DRMAA_JOB_NAME );
408        submit_state = jt->get_attr( jt, DRMAA_JS_STATE );
409        drmaa_start_time = jt->get_attr( jt, DRMAA_START_TIME );
410
411        if( job_name != NULL )
412                pbs_attr->set_attr( pbs_attr, PBSDRMAA_JOB_NAME, job_name );
413
414        if( submit_state != NULL )
415         {
416                const char *hold_types;
417                if( !strcmp(submit_state, DRMAA_SUBMISSION_STATE_ACTIVE) )
418                        hold_types = "n";
419                else if( !strcmp(submit_state, DRMAA_SUBMISSION_STATE_HOLD) )
420                        hold_types = "u";
421                else
422                        fsd_exc_raise_fmt( FSD_ERRNO_INVALID_VALUE,
423                                        "invalid value of %s attribute (%s|%s)",
424                                        DRMAA_JS_STATE, DRMAA_SUBMISSION_STATE_ACTIVE,
425                                        DRMAA_SUBMISSION_STATE_HOLD );
426                pbs_attr->set_attr( pbs_attr, PBSDRMAA_HOLD_TYPES, hold_types );
427         }
428
429        if( drmaa_start_time != NULL )
430         {
431                time_t start_time;
432                char pbs_start_time[20];
433                struct tm start_time_tm;
434                start_time = fsd_datetime_parse( drmaa_start_time );
435                localtime_r( &start_time, &start_time_tm );
436                sprintf( pbs_start_time, "%04d%02d%02d%02d%02d.%02d",
437                                start_time_tm.tm_year + 1900,
438                                start_time_tm.tm_mon + 1,
439                                start_time_tm.tm_mday,
440                                start_time_tm.tm_hour,
441                                start_time_tm.tm_min,
442                                start_time_tm.tm_sec
443                                );
444                pbs_attr->set_attr( pbs_attr, PBSDRMAA_EXECUTION_TIME, pbs_start_time );
445         }
446}
447
448
449void
450pbsdrmaa_submit_apply_job_files( pbsdrmaa_submit_t *self )
451{
452        const fsd_template_t *jt = self->job_template;
453        fsd_template_t *pbs_attr = self->pbs_job_attributes;
454        const char *join_files;
455        bool b_join_files;
456        int i;
457
458        for( i = 0;  i < 2;  i++ )
459         {
460                const char *drmaa_name;
461                const char *pbs_name;
462                const char *path;
463
464                if( i == 0 )
465                 {
466                        drmaa_name = DRMAA_OUTPUT_PATH;
467                        pbs_name = PBSDRMAA_OUTPUT_PATH;
468                 }
469                else
470                 {
471                        drmaa_name = DRMAA_ERROR_PATH;
472                        pbs_name = PBSDRMAA_ERROR_PATH;
473                 }
474
475                path = jt->get_attr( jt, drmaa_name );
476                if( path != NULL )
477                 {
478                        if( path[0] == ':' )
479                                path++;
480                        self->set(self, pbs_name, fsd_strdup(path), FSD_DRMAA_PH_HD | FSD_DRMAA_PH_WD | FSD_DRMAA_PH_INCR);
481                 }
482         }
483
484        join_files = jt->get_attr( jt, DRMAA_JOIN_FILES );
485        b_join_files = join_files != NULL  &&  !strcmp(join_files,"1");
486        pbs_attr->set_attr( pbs_attr, PBSDRMAA_JOIN_FILES, (b_join_files ? "y" : "n") );
487}
488
489
490void
491pbsdrmaa_submit_apply_file_staging( pbsdrmaa_submit_t *self )
492{
493        /* TODO */
494}
495
496
497void
498pbsdrmaa_submit_apply_job_resources( pbsdrmaa_submit_t *self )
499{
500        const fsd_template_t *jt = self->job_template;
501        fsd_template_t *pbs_attr = self->pbs_job_attributes;
502        const char *cpu_time_limit = NULL;
503        const char *walltime_limit = NULL;
504
505        cpu_time_limit = jt->get_attr( jt, DRMAA_DURATION_HLIMIT );
506        walltime_limit = jt->get_attr( jt, DRMAA_WCT_HLIMIT );
507        if( cpu_time_limit )
508         {
509                pbs_attr->set_attr( pbs_attr, "Resource_List.pcput", cpu_time_limit );
510                pbs_attr->set_attr( pbs_attr, "Resource_List.cput", cpu_time_limit );
511         }
512        if( walltime_limit )
513                pbs_attr->set_attr( pbs_attr, "Resource_List.walltime", walltime_limit );
514}
515
516
517
518void
519pbsdrmaa_submit_apply_job_environment( pbsdrmaa_submit_t *self )
520{
521        const fsd_template_t *jt = self->job_template;
522        const char *const *env_v;
523        const char *jt_wd;
524        char *wd;
525        char *env_c = NULL;
526        int ii = 0, len = 0;
527
528        env_v = jt->get_v_attr( jt, DRMAA_V_ENV);
529        jt_wd    = jt->get_attr( jt, DRMAA_WD );
530       
531        if (!jt_wd)
532        {
533                wd = fsd_getcwd();
534        }
535        else
536        {
537                wd = fsd_strdup(jt_wd);
538        }
539
540        if (env_v)
541         {
542                ii = 0;
543                while (env_v[ii])
544                 {
545                        len += strlen(env_v[ii]) + 1;
546                        ii++;
547                 }
548         }
549       
550        len+= strlen("PBS_O_WORKDIR=") + strlen(wd);
551
552        fsd_calloc(env_c, len + 1, char);
553        env_c[0] = '\0';
554
555        if (env_v)
556        {
557                ii = 0;
558                while (env_v[ii]) {
559                        strcat(env_c, env_v[ii]);
560                        strcat(env_c, ",");
561                        ii++;
562                }
563
564        }
565       
566        strcat(env_c, "PBS_O_WORKDIR=");
567        strcat(env_c, wd);
568
569        self->pbs_job_attributes->set_attr(self->pbs_job_attributes, "Variable_List", env_c);
570
571        fsd_free(env_c);
572        fsd_free(wd);
573}
574
575
576void
577pbsdrmaa_submit_apply_email_notification( pbsdrmaa_submit_t *self )
578{
579        /* TODO */
580}
581
582
583void
584pbsdrmaa_submit_apply_job_category( pbsdrmaa_submit_t *self )
585{
586        const char *job_category = NULL;
587        const char *category_spec = NULL;
588        fsd_conf_option_t *value = NULL;
589
590        job_category = self->job_template->get_attr(
591                        self->job_template, DRMAA_JOB_CATEGORY );
592        if( job_category == NULL  ||  job_category[0] == '\0' )
593                job_category = "default";
594        value = fsd_conf_dict_get( self->session->job_categories,
595                        job_category );
596        if( value != NULL  &&  value->type == FSD_CONF_STRING )
597                category_spec = value->val.string;
598        if( category_spec != NULL )
599                self->apply_native_specification( self, category_spec );
600}
601
602static const char *get_job_env(pbsdrmaa_submit_t *self, const char *env_name)
603{
604        const fsd_template_t *jt = self->job_template;
605        const char *const *env_v = jt->get_v_attr( jt, DRMAA_V_ENV);
606        int ii = 0;
607
608        while (env_v[ii])
609         {
610                char *eq_p = strstr(env_v[ii], "=");
611
612                if ((eq_p) && (strncmp(env_v[ii], env_name, eq_p - env_v[ii]) == 0))
613                                return ++eq_p;
614
615                ii++;
616         }
617
618        return NULL;
619}
620
621static void parse_resources(pbsdrmaa_submit_t *self, fsd_template_t *pbs_attr,const char *resources)
622{
623        char * volatile name = NULL;
624        char *arg = NULL;
625        char *value = NULL;
626        char *ctxt = NULL;
627        char * volatile resources_copy = fsd_strdup(resources);
628
629        TRY
630          {
631                for (arg = strtok_r(resources_copy, ",", &ctxt); arg; arg = strtok_r(NULL, ",",&ctxt) )
632                {
633                        char *psep = strchr(arg, '=');
634
635                        if (psep)
636                        {
637                                *psep = '\0';
638                                name = fsd_asprintf("Resource_List.%s", arg);
639                                value = ++psep;
640                                if (value[0] == '$' && get_job_env(self, value + 1))
641                                        pbs_attr->set_attr( pbs_attr, name , get_job_env(self, value + 1) ); /*get value from job env variable */
642                                else
643                                        pbs_attr->set_attr( pbs_attr, name , value );
644                                fsd_free(name);
645                                name = NULL;
646                        }
647                        else
648                        {
649                                fsd_exc_raise_fmt(FSD_DRMAA_ERRNO_INVALID_ATTRIBUTE_VALUE, "Invalid native specification: %s (Invalid resource specification: %s)", resources, arg);
650                        }
651                }
652          }
653        FINALLY
654          {
655                fsd_free(name);
656                fsd_free(resources_copy);
657          }
658        END_TRY
659}
660
661static void parse_additional_attr(fsd_template_t *pbs_attr,const char *add_attr)
662{
663        char * volatile name = NULL;
664        char *arg = NULL;
665        char *value = NULL;
666        char *ctxt = NULL, *ctxt2 = NULL;
667        char * volatile add_attr_copy = fsd_strdup(add_attr);
668
669        TRY
670          {
671                for (arg = strtok_r(add_attr_copy, ";", &ctxt); arg; arg = strtok_r(NULL, ";",&ctxt) )
672                {
673                        name = fsd_strdup(strtok_r(arg, "=", &ctxt2));
674                        value = strtok_r(NULL, "=", &ctxt2);
675                        pbs_attr->set_attr( pbs_attr, name , value );
676                        fsd_free(name);
677                        name = NULL;
678                }
679          }
680        FINALLY
681          {
682                fsd_free(name);
683                fsd_free(add_attr_copy);
684          }
685        END_TRY
686}
687
688
689void
690pbsdrmaa_submit_apply_native_specification( pbsdrmaa_submit_t *self,
691                const char *native_specification )
692{
693        fsd_log_enter(( "({native_specification=%s})", native_specification ));
694
695        if( native_specification == NULL )
696                native_specification = self->job_template->get_attr(
697                                self->job_template, DRMAA_NATIVE_SPECIFICATION );
698        if( native_specification == NULL )
699                return;
700
701        {
702                fsd_template_t *pbs_attr = self->pbs_job_attributes;
703                char *arg = NULL;
704                volatile char * native_spec_copy = fsd_strdup(native_specification);
705                char * ctxt = NULL;
706                int opt = 0;
707
708                TRY
709                  {
710                        for (arg = strtok_r((char *)native_spec_copy, " \t", &ctxt); arg; arg = strtok_r(NULL, " \t",&ctxt) ) {
711                                if (!opt)
712                                  {
713                                        if ( (arg[0] != '-') || (strlen(arg) != 2) )
714                                                fsd_exc_raise_fmt(FSD_DRMAA_ERRNO_INVALID_ATTRIBUTE_VALUE,
715                                                        "Invalid native specification: -o(ption) expected (arg=%s native=%s).",
716                                                        arg, native_specification);
717
718                                        opt = arg[1];
719
720                                        /* handle NO-arg options */
721
722                                        switch (opt) {
723                                                case 'h' :
724                                                        pbs_attr->set_attr( pbs_attr, "Hold_Types" , "u" );
725                                                        break;
726                                                default :
727                                                        continue; /*no NO-ARG option */
728                                        }
729
730                                        opt = 0;
731                                  }
732                                else
733                                  {
734                                        switch (opt) {
735                                               
736                                                case 'W' :
737                                                        parse_additional_attr(pbs_attr, arg);
738                                                        break;
739                                                case 'N' :
740                                                        pbs_attr->set_attr( pbs_attr, "Job_Name" , arg );
741                                                        break;
742                                                case 'o' :
743                                                        pbs_attr->set_attr( pbs_attr, "Output_Path" , arg );
744                                                        break;
745                                                case 'e' :
746                                                        pbs_attr->set_attr( pbs_attr, "Error_Path" , arg );
747                                                        break;
748                                                case 'j' :
749                                                        pbs_attr->set_attr( pbs_attr, "Join_Path" , arg );
750                                                        break;
751                                                case 'm' :
752                                                        pbs_attr->set_attr( pbs_attr, "Mail_Points" , arg );
753                                                        break;
754                                                case 'a' :
755                                                        pbs_attr->set_attr( pbs_attr, "Execution_Time" , arg );
756                                                        break;
757                                                case 'A' :
758                                                        pbs_attr->set_attr( pbs_attr, "Account_Name" , arg );
759                                                        break;
760                                                case 'c' :
761                                                        pbs_attr->set_attr( pbs_attr, "Checkpoint" , arg );
762                                                        break;
763                                                case 'k' :
764                                                        pbs_attr->set_attr( pbs_attr, "Keep_Files" , arg );
765                                                        break;
766                                                case 'p' :
767                                                        pbs_attr->set_attr( pbs_attr, "Priority" , arg );
768                                                        break;
769                                                case 'q' :
770                                                        if (self->destination_queue)
771                                                                fsd_free(self->destination_queue);
772
773                                                        self->destination_queue = fsd_strdup( arg );
774                                                        fsd_log_debug(("self->destination_queue = %s", self->destination_queue));
775                                                        break;
776                                                case 'r' :
777                                                        pbs_attr->set_attr( pbs_attr, "Rerunable" , arg );
778                                                        break;
779                                                case 'S' :
780                                                        pbs_attr->set_attr( pbs_attr, "Shell_Path_List" , arg );
781                                                        break;
782                                                case 'u' :
783                                                        pbs_attr->set_attr( pbs_attr, "User_List" , arg );
784                                                        break;
785                                                case 'v' :
786                                                        pbs_attr->set_attr( pbs_attr, "Variable_List" , arg );
787                                                        break;
788                                                case 'M' :
789                                                        pbs_attr->set_attr( pbs_attr, "Mail_Users" , arg );
790                                                        break;
791                                                case 'l' :
792                                                        parse_resources( self, pbs_attr, arg);
793                                                        break;                                                 
794                                                default :
795                                                       
796                                                        fsd_exc_raise_fmt(FSD_DRMAA_ERRNO_INVALID_ATTRIBUTE_VALUE,
797                                                                        "Invalid native specification: %s (Unsupported option: -%c)",
798                                                                        native_specification, opt);
799                                        }
800                                        opt = 0;
801                                }
802                        }
803
804                        if (opt) /* option without optarg */
805                                fsd_exc_raise_fmt(FSD_DRMAA_ERRNO_INVALID_ATTRIBUTE_VALUE,
806                                                "Invalid native specification: %s",
807                                                native_specification);
808                 }
809                FINALLY
810                 {
811#ifndef PBS_PROFESSIONAL
812                        pbs_attr->set_attr( pbs_attr, "submit_args", native_specification);
813#endif
814                        fsd_free((char *)native_spec_copy);
815                 }
816                END_TRY
817        }
818}
819
820struct attrl *
821pbsdrmaa_submit_filter(struct attrl *pbs_attr)
822{
823        fsd_log_enter(( "({pbs_attr=%p})", (void*)pbs_attr));
824
825        if (getenv(PBSDRMAA_SUBMIT_FILTER_ENV) == NULL)
826          {
827                return pbs_attr;
828          }
829        else
830          {
831                struct attrl *ii = NULL;
832                char *empty_args[] = { NULL };
833                int exit_status = -1;
834                const char *submit_filter = getenv(PBSDRMAA_SUBMIT_FILTER_ENV);
835                char *stdin_buf = NULL;
836                int stdin_size = 0;
837                char *stdout_buf = NULL;
838                char *stderr_buf = NULL;
839                char *output_line = NULL;
840                char *ctx = NULL;
841
842                fsd_log_debug(("Executing filter script: %s", submit_filter));
843               
844               
845                for (ii = pbs_attr; ii; ii = ii->next)
846                  {
847                        stdin_size += strlen(ii->name) + strlen(ii->value) + 2 /* '=' and '\n' */;
848                        if (ii->resource)
849                          {
850                                stdin_size += strlen(ii->resource) +  1; /* '.' */
851                          }
852                  }
853
854                stdin_size+=1; /* '\0' */
855
856                stdin_buf = fsd_calloc(stdin_buf, stdin_size, char);
857                stdin_buf[0] = '\0';
858
859                for (ii = pbs_attr; ii; ii = ii->next)
860                  {
861                        strcat(stdin_buf, ii->name);
862                        if (ii->resource)
863                          {
864                                strcat(stdin_buf, ".");
865                                strcat(stdin_buf, ii->resource);
866                          }
867                        strcat(stdin_buf, "=");
868                        strcat(stdin_buf, ii->value);
869                        strcat(stdin_buf, "\n");
870                  }
871               
872                exit_status = fsd_exec_sync(submit_filter, empty_args, stdin_buf, &stdout_buf, &stderr_buf);
873
874                if (exit_status != 0)
875                  {
876                        fsd_log_error(("Filter script %s exited with non-zero code: %d", submit_filter, exit_status));
877                        fsd_exc_raise_fmt(FSD_DRMAA_ERRNO_INVALID_ATTRIBUTE_VALUE, "Submit filter script failed (code: %d, message: %s)", exit_status, stderr_buf);
878                  }
879               
880                fsd_log_debug(("Submit filter exit_status=%d, stderr=%s", exit_status, stderr_buf));
881
882                pbsdrmaa_free_attrl(pbs_attr);
883                pbs_attr = NULL;
884
885                /* exit_status == 0 */
886                for (output_line = strtok_r(stdin_buf, "\n", &ctx);  output_line ; output_line = strtok_r(NULL, "\n", &ctx))
887                 {
888                        char *attr_name = NULL;
889                        char *attr_value = NULL;
890
891                        attr_value = strstr(output_line,"=");
892                        attr_name = output_line;
893
894                        if (!attr_value)
895                          {
896                                fsd_exc_raise_fmt(FSD_DRMAA_ERRNO_INVALID_ATTRIBUTE_FORMAT, "Invalid output line of submit filter:", output_line);
897                          }
898                        else
899                          {
900                                *attr_value = '\0';
901                                attr_value++;
902                          }
903                       
904
905                        fsd_log_debug(("Submit filter generated attribute: name=%s, value=%s", attr_name, attr_value));
906                        pbs_attr = pbsdrmaa_add_attr( pbs_attr, attr_name, attr_value );
907                 }
908               
909
910                return pbs_attr;
911          }
912
913}
914
Note: See TracBrowser for help on using the repository browser.