source: trunk/slurm_drmaa/job.c @ 1

Revision 1, 16.7 KB checked in by mmatloka, 14 years ago (diff)

init commit

Line 
1/* $Id: job.c 351 2010-10-01 13:14:55Z mamonski $ */
2/*
3 * PSNC DRMAA for SLURM
4 * Copyright (C) 2010 Poznan Supercomputing and Networking Center
5 *
6 *  This program is free software: you can redistribute it and/or modify
7 *  it under the terms of the GNU General Public License as published by
8 *  the Free Software Foundation, either version 3 of the License, or
9 *  (at your option) any later version.
10 *
11 *  This program is distributed in the hope that it will be useful,
12 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 *  GNU General Public License for more details.
15 *
16 *  You should have received a copy of the GNU General Public License
17 *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19#include <string.h>
20#include <stdlib.h>
21#include <unistd.h>
22#include <signal.h>
23
24#include <drmaa_utils/common.h>
25#include <drmaa_utils/conf.h>
26#include <drmaa_utils/datetime.h>
27#include <drmaa_utils/drmaa.h>
28#include <drmaa_utils/drmaa_util.h>
29#include <drmaa_utils/environ.h>
30#include <drmaa_utils/template.h>
31
32#include <slurm_drmaa/job.h>
33#include <slurm_drmaa/session.h>
34#include <slurm_drmaa/util.h>
35
36#include <slurm/slurm.h>
37#include <stdint.h>
38
39static void
40slurmdrmaa_job_control( fsd_job_t *self, int action )
41{
42        slurmdrmaa_job_t *slurm_self = (slurmdrmaa_job_t*)self;
43        job_desc_msg_t job_desc;
44
45        fsd_log_enter(( "({job_id=%s}, action=%d)", self->job_id, action ));
46
47        fsd_mutex_lock( &self->session->drm_connection_mutex );
48        TRY
49         {
50                switch( action )
51                 {
52                        case DRMAA_CONTROL_SUSPEND:
53                                if(slurm_suspend(fsd_atoi(self->job_id)) == -1) {
54                                        fsd_exc_raise_fmt(      FSD_ERRNO_INTERNAL_ERROR,"slurm_suspend error: %s,job_id: %s",slurm_strerror(slurm_get_errno()),self->job_id);
55                                }
56                                break;
57                        case DRMAA_CONTROL_HOLD:
58                                /* change priority to 0*/
59                                slurm_init_job_desc_msg(&job_desc);
60                                slurm_self->old_priority = job_desc.priority;
61                                job_desc.job_id = atoi(self->job_id);
62                                job_desc.priority = 0;
63                                if(slurm_update_job(&job_desc) == -1) {
64                                        fsd_exc_raise_fmt(      FSD_ERRNO_INTERNAL_ERROR,"slurm_update_job error: %s,job_id: %s",slurm_strerror(slurm_get_errno()),self->job_id);
65                                }
66                                break;
67                        case DRMAA_CONTROL_RESUME:
68                                if(slurm_resume(fsd_atoi(self->job_id)) == -1) {
69                                        fsd_exc_raise_fmt(      FSD_ERRNO_INTERNAL_ERROR,"slurm_resume error: %s,job_id: %s",slurm_strerror(slurm_get_errno()),self->job_id);
70                                }
71                                break;
72                        case DRMAA_CONTROL_RELEASE:
73                          /* change priority back*/
74                                slurm_init_job_desc_msg(&job_desc);
75                                job_desc.priority = 1;
76                                job_desc.job_id = atoi(self->job_id);
77                                if(slurm_update_job(&job_desc) == -1) {
78                                        fsd_exc_raise_fmt(      FSD_ERRNO_INTERNAL_ERROR,"slurm_update_job error: %s,job_id: %s",slurm_strerror(slurm_get_errno()),self->job_id);
79                                }
80                                break;
81                        case DRMAA_CONTROL_TERMINATE:
82                                if(slurm_kill_job(fsd_atoi(self->job_id),SIGKILL,0) == -1) {
83                                        fsd_exc_raise_fmt(      FSD_ERRNO_INTERNAL_ERROR,"slurm_terminate_job error: %s,job_id: %s",slurm_strerror(slurm_get_errno()),self->job_id);
84                                }
85                                break;
86                        default:
87                                fsd_exc_raise_fmt(
88                                                FSD_ERRNO_INVALID_ARGUMENT,
89                                                "job::control: unknown action %d", action );
90                 }
91                                       
92                fsd_log_debug(("job::control: successful"));
93         }
94        FINALLY
95         {
96                fsd_mutex_unlock( &self->session->drm_connection_mutex );
97         }
98        END_TRY
99
100        fsd_log_return(( "" ));
101}
102
103
104static void
105slurmdrmaa_job_update_status( fsd_job_t *self )
106{
107        job_info_msg_t *job_info = NULL;
108        fsd_log_enter(( "({job_id=%s})", self->job_id ));
109
110        fsd_mutex_lock( &self->session->drm_connection_mutex );
111        TRY
112        {
113                if ( slurm_load_job( &job_info, fsd_atoi(self->job_id), SHOW_ALL) ) {
114                        fsd_exc_raise_fmt(      FSD_ERRNO_INTERNAL_ERROR,"slurm_load_jobs error: %s,job_id: %s",slurm_strerror(slurm_get_errno()),self->job_id);
115        }
116               
117                self->exit_status = job_info->job_array[0].exit_code;
118                fsd_log_debug(("exit_status = %d -> %d",self->exit_status, WEXITSTATUS(self->exit_status)));
119
120                switch(job_info->job_array[0].job_state)
121                {
122                        case JOB_PENDING:
123                                switch(job_info->job_array[0].state_reason)
124                                {
125                                        case WAIT_NO_REASON:
126                                        case WAIT_PRIORITY:
127                                        case WAIT_DEPENDENCY:
128                                        case WAIT_RESOURCES:
129                                        case WAIT_PART_NODE_LIMIT:
130                                        case WAIT_PART_TIME_LIMIT:
131                                        case WAIT_PART_STATE:
132                                                self->state = DRMAA_PS_QUEUED_ACTIVE;
133                                                break;
134                                        case WAIT_HELD:
135                                                self->state = DRMAA_PS_USER_ON_HOLD;
136                                                break;
137                                        case WAIT_TIME:
138                                        case WAIT_LICENSES:
139                                        case WAIT_ASSOC_JOB_LIMIT:
140                                        case WAIT_ASSOC_RESOURCE_LIMIT:
141                                        case WAIT_ASSOC_TIME_LIMIT:
142                                        case WAIT_RESERVATION:
143                                        case WAIT_NODE_NOT_AVAIL:
144                                        case WAIT_TBD1:
145                                        case WAIT_TBD2:
146                                                self->state = DRMAA_PS_QUEUED_ACTIVE;
147                                                break;
148                                        case FAIL_DOWN_PARTITION:
149                                        case FAIL_DOWN_NODE:
150                                        case FAIL_BAD_CONSTRAINTS:
151                                        case FAIL_SYSTEM:
152                                        case FAIL_LAUNCH:
153                                        case FAIL_EXIT_CODE:
154                                        case FAIL_TIMEOUT:
155                                        case FAIL_INACTIVE_LIMIT:
156                                        case FAIL_BANK_ACCOUNT:
157                                                self->state = DRMAA_PS_FAILED;
158                                                break;
159                                        default:
160                                                fsd_log_error(("job_state_reason = %d, assert(0)",job_info->job_array[0].state_reason));
161                                                fsd_assert(false);
162       
163                                }
164                                break;
165                        case JOB_RUNNING:
166                                self->state = DRMAA_PS_RUNNING;
167                                break;
168                        case JOB_SUSPENDED:
169                                self->state = DRMAA_PS_SYSTEM_SUSPENDED; /* assume SYSTEM - suspendig jobs is administrator only */
170                                break;
171                        case JOB_COMPLETE:
172                                self->state = DRMAA_PS_DONE;
173                                break;
174                        case JOB_CANCELLED:
175                                self->exit_status = -1;
176                        case JOB_FAILED:
177                        case JOB_TIMEOUT:
178                        case JOB_NODE_FAIL:
179                                self->state = DRMAA_PS_FAILED;
180                                break;
181                        default: /*transient states */
182                                if(job_info->job_array[0].job_state >= 0x8000) {
183                                        fsd_log_debug(("state COMPLETING"));
184                                }
185                                else if (job_info->job_array[0].job_state >= 0x4000) {
186                                        fsd_log_debug(("state Allocated nodes booting"));
187                                }
188                                else {
189                                        fsd_log_error(("job_state = %d, assert(0)",job_info->job_array[0].job_state));
190                                        fsd_assert(false);
191                                }
192                }
193
194                if(self->exit_status == -1) /* input,output,error path failure etc*/
195                        self->state = DRMAA_PS_FAILED;
196
197                fsd_log_debug(("state: %d ,state_reason: %d-> %s", job_info->job_array[0].job_state, job_info->job_array[0].state_reason, drmaa_job_ps_to_str(self->state)));
198
199                self->last_update_time = time(NULL);
200       
201                if( self->state >= DRMAA_PS_DONE )
202                        fsd_cond_broadcast( &self->status_cond );
203        }
204        FINALLY
205        {
206                if(job_info != NULL)
207                        slurm_free_job_info_msg (job_info);
208
209                fsd_mutex_unlock( &self->session->drm_connection_mutex );
210        }
211        END_TRY
212       
213        fsd_log_return(( "" ));
214}
215
216fsd_job_t *
217slurmdrmaa_job_new( char *job_id )
218{
219        slurmdrmaa_job_t *self = NULL;
220        self = (slurmdrmaa_job_t*)fsd_job_new( job_id );
221
222        fsd_realloc( self, 1, slurmdrmaa_job_t );
223
224        self->super.control = slurmdrmaa_job_control;
225        self->super.update_status = slurmdrmaa_job_update_status;
226        self->old_priority = UINT32_MAX;
227
228        return (fsd_job_t*)self;
229}
230
231
232void
233slurmdrmaa_job_create_req(
234                fsd_drmaa_session_t *session,
235                const fsd_template_t *jt,
236                fsd_environ_t **envp,
237                job_desc_msg_t * job_desc,
238                int n_job /* ~job_step */
239                )
240{
241        fsd_expand_drmaa_ph_t *volatile expand = NULL;
242
243        TRY
244         {
245                expand = fsd_expand_drmaa_ph_new( NULL, NULL, fsd_asprintf("%d",n_job) );
246                slurmdrmaa_job_create( session, jt, envp, expand, job_desc, n_job);
247         }
248        EXCEPT_DEFAULT
249         {
250                fsd_exc_reraise();
251         }
252        FINALLY
253         {
254                if( expand )
255                        expand->destroy( expand );
256         }
257        END_TRY
258}
259
260static char *
261internal_map_file( fsd_expand_drmaa_ph_t *expand, const char *path,
262                bool *host_given, const char *name )
263{
264        const char *p;
265
266        for( p = path;  *p != ':';  p++ )
267                if( *p == '\0' )
268                        fsd_exc_raise_fmt( FSD_DRMAA_ERRNO_INVALID_ATTRIBUTE_FORMAT,
269                                                        "invalid format of drmaa_%s_path: missing colon", name );
270        if( host_given )
271                *host_given = ( p != path );
272
273        p++;
274
275        return expand->expand( expand, fsd_strdup(p), FSD_DRMAA_PH_HD | FSD_DRMAA_PH_WD | FSD_DRMAA_PH_INCR );
276}
277
278void
279slurmdrmaa_job_create(
280                fsd_drmaa_session_t *session,
281                const fsd_template_t *jt,
282                fsd_environ_t **envp,
283                fsd_expand_drmaa_ph_t *expand,
284                job_desc_msg_t * job_desc,
285                int n_job
286                )
287{
288        const char *input_path_orig = NULL;
289        const char *output_path_orig = NULL;
290        const char *error_path_orig = NULL;
291        char *volatile input_path = NULL;
292        char *volatile output_path = NULL;
293        char *volatile error_path = NULL;
294        bool input_host = false;
295        bool output_host = false;
296        bool error_host = false;
297        bool join_files = false;
298        const char *value;
299        const char *const *vector;
300        const char *job_category = "default";
301       
302        slurmdrmaa_init_job_desc( job_desc );
303
304        slurm_init_job_desc_msg( job_desc );
305       
306        job_desc->user_id = getuid();
307        job_desc->group_id = getgid();
308
309        job_desc->env_size = 0;
310       
311        /* job name */
312        value = jt->get_attr( jt, DRMAA_JOB_NAME );
313        if( value )
314        {
315                job_desc->name = fsd_strdup(value);
316                fsd_log_debug(("# job_name = %s",job_desc->name));
317        }
318       
319        /* job state at submit */
320        value = jt->get_attr( jt, DRMAA_JS_STATE );
321        if( value )
322        {
323                if( 0 == strcmp( value, DRMAA_SUBMISSION_STATE_ACTIVE ) )
324                {}
325                else if( 0 == strcmp( value, DRMAA_SUBMISSION_STATE_HOLD ) )
326                {
327                        job_desc->priority = 0;
328                        fsd_log_debug(("# hold = user"));
329                }
330                else
331                {
332                        fsd_exc_raise_msg(FSD_DRMAA_ERRNO_INVALID_ATTRIBUTE_VALUE, "invalid value of drmaa_js_state attribute" );
333                }
334        }
335       
336        TRY
337        {
338                const char *command = NULL;
339                char *command_expanded = NULL;
340                char *temp_script_old = NULL;
341                char *temp_script = "";
342                const char *const *i;
343                int j;
344
345                /* remote command */
346                command = jt->get_attr( jt, DRMAA_REMOTE_COMMAND );
347                if( command == NULL )
348                        fsd_exc_raise_msg(
349                                        FSD_DRMAA_ERRNO_CONFLICTING_ATTRIBUTE_VALUES,
350                                        "drmaa_remote_command not set for job template"
351                                        );
352
353                command_expanded = expand->expand( expand, fsd_strdup(command), FSD_DRMAA_PH_HD | FSD_DRMAA_PH_WD );
354
355                temp_script = fsd_asprintf("#!/bin/bash\n%s",command_expanded);
356                fsd_free(command_expanded);
357
358                /* arguments list */
359                vector = jt->get_v_attr( jt, DRMAA_V_ARGV );
360
361                if( vector )
362                {
363                        for( i = vector, j = 2;  *i;  i++, j++ )
364                        {
365                                char *arg_expanded = expand->expand( expand, fsd_strdup(*i), FSD_DRMAA_PH_HD | FSD_DRMAA_PH_WD );
366                               
367                                temp_script_old = fsd_strdup(temp_script);
368                               
369                                if (strcmp(temp_script, "") != 0) {
370                                        fsd_free(temp_script);
371                                }
372                                /* add too script */
373                                temp_script = fsd_asprintf("%s '%s'", temp_script_old, arg_expanded);
374                                fsd_free(temp_script_old);
375                                fsd_free(arg_expanded);
376                        }
377                }
378               
379                job_desc->script = fsd_asprintf("%s\n", temp_script);
380                fsd_log_debug(("# Script:\n%s", job_desc->script));
381                fsd_free(temp_script);
382        }
383        END_TRY
384       
385
386        /* start time */
387        value = jt->get_attr( jt, DRMAA_START_TIME );
388        if( value )
389        {
390                job_desc->begin_time = fsd_datetime_parse( value );
391                fsd_log_debug(( "\n  drmaa_start_time: %s -> %ld", value, (long)job_desc->begin_time));
392        }
393
394        /* environment */
395        vector = jt->get_v_attr( jt, DRMAA_V_ENV );
396        if( vector )
397        {
398                const char *const *i;
399                unsigned j = 0;
400
401                for( i = vector;  *i;  i++ )
402                {
403                        job_desc->env_size++;
404                }
405                fsd_log_debug(("env_size = %d",job_desc->env_size));
406
407                fsd_log_debug(("# environment ="));
408                fsd_calloc(job_desc->environment, job_desc->env_size+1, char *);
409
410                for( i = vector;  *i;  i++,j++ )
411                {
412                        job_desc->environment[j] = fsd_strdup(*i);
413                        fsd_log_debug((" %s", job_desc->environment[j]));
414                }
415         }
416       
417        /* wall clock time hard limit */
418        value = jt->get_attr( jt, DRMAA_WCT_HLIMIT );
419        if (value)
420        {
421                job_desc->time_limit = slurmdrmaa_datetime_parse( value );
422                fsd_log_debug(("# wct_hlimit = %s -> %ld",value,slurmdrmaa_datetime_parse( value )));
423        }
424
425               
426        /*expand->set(expand, FSD_DRMAA_PH_INCR,fsd_asprintf("%d", n_job));*/ /* set current value */
427        /* TODO: test drmaa_ph_incr */
428        /* job working directory */
429        value = jt->get_attr( jt, DRMAA_WD );
430        if( value )
431        {
432                char *cwd_expanded = expand->expand( expand, fsd_strdup(value), FSD_DRMAA_PH_HD | FSD_DRMAA_PH_INCR );
433
434                expand->set( expand, FSD_DRMAA_PH_WD, fsd_strdup(cwd_expanded));
435
436                fsd_log_debug(("# work_dir = %s",cwd_expanded));
437                job_desc->work_dir = fsd_strdup(cwd_expanded);
438                fsd_free(cwd_expanded);
439        }
440        else
441        {
442                char cwdbuf[4096] = "";
443
444                if ((getcwd(cwdbuf, 4095)) == NULL) {
445                        char errbuf[256] = "InternalError";
446                        (void)strerror_r(errno, errbuf, 256); /*on error the default message would be returned */
447                        fsd_log_error(("getcwd failed: %s", errbuf));
448                        job_desc->work_dir = fsd_strdup(".");
449                } else {
450                        job_desc->work_dir = fsd_strdup(cwdbuf);
451                }
452
453                fsd_log_debug(("work_dir(default:CWD) %s", job_desc->work_dir));
454        }
455
456        TRY
457        {
458                /* input path */
459                input_path_orig = jt->get_attr( jt, DRMAA_INPUT_PATH );
460                if( input_path_orig )
461                {
462                        input_path = internal_map_file( expand, input_path_orig, &input_host,"input" );
463                        fsd_log_debug(( "\n  drmaa_input_path: %s -> %s", input_path_orig, input_path ));
464                }
465
466                /* output path */
467                output_path_orig = jt->get_attr( jt, DRMAA_OUTPUT_PATH );
468                if( output_path_orig )
469                {
470                        output_path = internal_map_file( expand, output_path_orig, &output_host,"output" );
471                        fsd_log_debug(( "\n  drmaa_output_path: %s -> %s", output_path_orig, output_path ));
472                }
473
474                /* error path */
475                error_path_orig = jt->get_attr( jt, DRMAA_ERROR_PATH );
476                if( error_path_orig )
477                {
478                        error_path = internal_map_file( expand, error_path_orig, &error_host,"error" );
479                        fsd_log_debug(( "\n  drmaa_error_path: %s -> %s", error_path_orig, error_path ));
480                }
481
482                /* join files */
483                value = jt->get_attr( jt, DRMAA_JOIN_FILES );
484                if( value )
485                {
486                        if( (value[0] == 'y' || value[0] == 'Y')  &&  value[1] == '\0' )
487                                join_files = true;
488                        else if( (value[0] == 'n' || value[0] == 'N')  &&  value[1] == '\0' )
489                                join_files = false;
490                        else
491                                fsd_exc_raise_msg(
492                                                FSD_DRMAA_ERRNO_INVALID_ATTRIBUTE_VALUE,
493                                                "invalid value of drmaa_join_files attribute" );
494                }
495
496                if( join_files )
497                {
498                        if( output_path == NULL )
499                                fsd_exc_raise_msg(FSD_DRMAA_ERRNO_CONFLICTING_ATTRIBUTE_VALUES, "drmaa_join_files is set and output file is not given" );
500                        if( error_path!=NULL && 0 != strcmp( output_path, error_path ) )
501                                fsd_log_warning(( "Error file was given but will be ignored since drmaa_join_files was set." ));
502
503                        if (error_path)
504                                fsd_free(error_path);
505
506                         error_path = fsd_strdup(output_path);
507                }
508                else
509                {
510                        if( error_path == NULL  &&  output_path )
511                                error_path = fsd_strdup( "/dev/null" );
512                        if( output_path == NULL  &&  error_path )
513                                output_path = fsd_strdup( "/dev/null" );
514                }
515
516
517                /* email addresses to send notifications */
518                vector = jt->get_v_attr( jt, DRMAA_V_EMAIL );
519                if( vector  &&  vector[0] )
520                {
521                        /* only to one email address message may be send */
522                        job_desc->mail_user = fsd_strdup(vector[0]);
523                        fsd_log_debug(("# mail_user = %s\n",vector[0]));
524                        if( vector[1] != NULL )
525                        {
526                                fsd_log_error(( "LL only supports one e-mail notification address" ));
527                                fsd_exc_raise_msg(FSD_DRMAA_ERRNO_INVALID_ATTRIBUTE_VALUE,"LL only supports one e-mail notification address");
528                        }
529                }
530
531                /* block email */
532                value = jt->get_attr( jt, DRMAA_BLOCK_EMAIL );
533                if( value )
534                {
535                        bool block;
536                        if( strcmp(value, "0") == 0 )
537                        {
538                                block = true;
539                                fsd_log_debug(("# block_email = true"));
540                                fsd_log_debug(("# mail_user delated"));
541                                fsd_free(job_desc->mail_user);
542                                job_desc->mail_user = NULL;
543                        }
544                        else if( strcmp(value, "1") == 0 )
545                                block = false;
546                        else
547                                fsd_exc_raise_msg(FSD_DRMAA_ERRNO_INVALID_ATTRIBUTE_VALUE,"invalid value of drmaa_block_email attribute" );
548
549                        if( block && output_path == NULL )
550                        {
551                                fsd_log_debug(( "output path not set and we want to block e-mail, set to /dev/null" ));
552                                output_path = fsd_strdup( "/dev/null" );
553                        }
554                }
555
556                if( input_path )
557                {
558                        job_desc->std_in = fsd_strdup(input_path);
559                        fsd_log_debug(("# input = %s", input_path));
560                }
561
562                if( output_path )
563                {
564                        job_desc->std_out = fsd_strdup(output_path);
565                        fsd_log_debug(("# output = %s", output_path));
566                }
567
568                if( error_path )
569                {
570                        job_desc->std_err = fsd_strdup(error_path);
571                        fsd_log_debug(("# error = %s", error_path));
572                }
573         }
574        FINALLY
575        {
576                fsd_free( input_path );
577                fsd_free( output_path );
578                fsd_free( error_path );
579                input_path = NULL;
580                output_path = NULL;
581                error_path = NULL;
582        }
583        END_TRY                 
584       
585        /* native specification */
586        value = jt->get_attr( jt, DRMAA_NATIVE_SPECIFICATION );
587        if( value )
588        {
589                fsd_log_debug(("# Native specification: %s\n", value));
590                slurmdrmaa_parse_native(job_desc, value);
591        }
592               
593        /* job category */
594        value = jt->get_attr( jt, DRMAA_JOB_CATEGORY );
595        if( value )
596                job_category = value;
597
598        {
599                fsd_conf_option_t *category_value = NULL;
600                category_value = fsd_conf_dict_get( session->job_categories, job_category );
601
602                if( category_value != NULL )
603                {
604                        if( category_value->type != FSD_CONF_STRING )
605                                fsd_exc_raise_fmt(
606                                                FSD_ERRNO_INTERNAL_ERROR,
607                                                "configuration error: job category should be string"
608                                                );
609
610                        fsd_log_debug(("# Job category %s : %s\n",value,category_value->val.string));                   
611                        slurmdrmaa_parse_native(job_desc,category_value->val.string);                   
612                }
613                else
614                {
615                        if( value != NULL )
616                                fsd_exc_raise_fmt(
617                                                FSD_DRMAA_ERRNO_INVALID_ATTRIBUTE_VALUE,
618                                                "invalid job category: %s", job_category
619                                                );
620                }
621        }
622       
623}               
624               
625
Note: See TracBrowser for help on using the repository browser.