source: trunk/slurm_drmaa/job.c @ 7

Revision 7, 17.0 KB checked in by mmatloka, 14 years ago (diff)

release package

  • Property svn:executable set to *
  • Property svn:keywords set to Id
RevLine 
[5]1/* $Id$ */
[1]2/*
3 * PSNC DRMAA for SLURM
4 * Copyright (C) 2010 Poznan Supercomputing and Networking Center
5 *
6 *  This program is free software: you can redistribute it and/or modify
7 *  it under the terms of the GNU General Public License as published by
8 *  the Free Software Foundation, either version 3 of the License, or
9 *  (at your option) any later version.
10 *
11 *  This program is distributed in the hope that it will be useful,
12 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 *  GNU General Public License for more details.
15 *
16 *  You should have received a copy of the GNU General Public License
17 *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19#include <string.h>
20#include <stdlib.h>
21#include <unistd.h>
22#include <signal.h>
23
24#include <drmaa_utils/common.h>
25#include <drmaa_utils/conf.h>
26#include <drmaa_utils/datetime.h>
27#include <drmaa_utils/drmaa.h>
28#include <drmaa_utils/drmaa_util.h>
29#include <drmaa_utils/environ.h>
30#include <drmaa_utils/template.h>
31
32#include <slurm_drmaa/job.h>
33#include <slurm_drmaa/session.h>
34#include <slurm_drmaa/util.h>
35
36#include <slurm/slurm.h>
37#include <stdint.h>
38
39static void
40slurmdrmaa_job_control( fsd_job_t *self, int action )
41{
42        slurmdrmaa_job_t *slurm_self = (slurmdrmaa_job_t*)self;
43        job_desc_msg_t job_desc;
44
45        fsd_log_enter(( "({job_id=%s}, action=%d)", self->job_id, action ));
46
47        fsd_mutex_lock( &self->session->drm_connection_mutex );
48        TRY
49         {
50                switch( action )
51                 {
52                        case DRMAA_CONTROL_SUSPEND:
53                                if(slurm_suspend(fsd_atoi(self->job_id)) == -1) {
54                                        fsd_exc_raise_fmt(      FSD_ERRNO_INTERNAL_ERROR,"slurm_suspend error: %s,job_id: %s",slurm_strerror(slurm_get_errno()),self->job_id);
55                                }
[7]56                                slurm_self->user_suspended = true;
[1]57                                break;
58                        case DRMAA_CONTROL_HOLD:
59                                /* change priority to 0*/
60                                slurm_init_job_desc_msg(&job_desc);
61                                slurm_self->old_priority = job_desc.priority;
62                                job_desc.job_id = atoi(self->job_id);
63                                job_desc.priority = 0;
64                                if(slurm_update_job(&job_desc) == -1) {
65                                        fsd_exc_raise_fmt(      FSD_ERRNO_INTERNAL_ERROR,"slurm_update_job error: %s,job_id: %s",slurm_strerror(slurm_get_errno()),self->job_id);
66                                }
67                                break;
68                        case DRMAA_CONTROL_RESUME:
69                                if(slurm_resume(fsd_atoi(self->job_id)) == -1) {
70                                        fsd_exc_raise_fmt(      FSD_ERRNO_INTERNAL_ERROR,"slurm_resume error: %s,job_id: %s",slurm_strerror(slurm_get_errno()),self->job_id);
71                                }
[7]72                                slurm_self->user_suspended = false;
[1]73                                break;
74                        case DRMAA_CONTROL_RELEASE:
75                          /* change priority back*/
76                                slurm_init_job_desc_msg(&job_desc);
77                                job_desc.priority = 1;
78                                job_desc.job_id = atoi(self->job_id);
79                                if(slurm_update_job(&job_desc) == -1) {
80                                        fsd_exc_raise_fmt(      FSD_ERRNO_INTERNAL_ERROR,"slurm_update_job error: %s,job_id: %s",slurm_strerror(slurm_get_errno()),self->job_id);
81                                }
82                                break;
83                        case DRMAA_CONTROL_TERMINATE:
84                                if(slurm_kill_job(fsd_atoi(self->job_id),SIGKILL,0) == -1) {
85                                        fsd_exc_raise_fmt(      FSD_ERRNO_INTERNAL_ERROR,"slurm_terminate_job error: %s,job_id: %s",slurm_strerror(slurm_get_errno()),self->job_id);
86                                }
87                                break;
88                        default:
89                                fsd_exc_raise_fmt(
90                                                FSD_ERRNO_INVALID_ARGUMENT,
91                                                "job::control: unknown action %d", action );
92                 }
93                                       
94                fsd_log_debug(("job::control: successful"));
95         }
96        FINALLY
97         {
98                fsd_mutex_unlock( &self->session->drm_connection_mutex );
99         }
100        END_TRY
101
102        fsd_log_return(( "" ));
103}
104
105
106static void
107slurmdrmaa_job_update_status( fsd_job_t *self )
108{
109        job_info_msg_t *job_info = NULL;
[7]110        slurmdrmaa_job_t * slurm_self = (slurmdrmaa_job_t *) self;
[1]111        fsd_log_enter(( "({job_id=%s})", self->job_id ));
112
113        fsd_mutex_lock( &self->session->drm_connection_mutex );
114        TRY
115        {
116                if ( slurm_load_job( &job_info, fsd_atoi(self->job_id), SHOW_ALL) ) {
117                        fsd_exc_raise_fmt(      FSD_ERRNO_INTERNAL_ERROR,"slurm_load_jobs error: %s,job_id: %s",slurm_strerror(slurm_get_errno()),self->job_id);
118        }
119               
120                self->exit_status = job_info->job_array[0].exit_code;
121                fsd_log_debug(("exit_status = %d -> %d",self->exit_status, WEXITSTATUS(self->exit_status)));
122
123                switch(job_info->job_array[0].job_state)
124                {
125                        case JOB_PENDING:
126                                switch(job_info->job_array[0].state_reason)
127                                {
128                                        case WAIT_NO_REASON:
129                                        case WAIT_PRIORITY:
130                                        case WAIT_DEPENDENCY:
131                                        case WAIT_RESOURCES:
132                                        case WAIT_PART_NODE_LIMIT:
133                                        case WAIT_PART_TIME_LIMIT:
134                                        case WAIT_PART_STATE:
135                                                self->state = DRMAA_PS_QUEUED_ACTIVE;
136                                                break;
137                                        case WAIT_HELD:
138                                                self->state = DRMAA_PS_USER_ON_HOLD;
139                                                break;
140                                        case WAIT_TIME:
141                                        case WAIT_LICENSES:
142                                        case WAIT_ASSOC_JOB_LIMIT:
143                                        case WAIT_ASSOC_RESOURCE_LIMIT:
144                                        case WAIT_ASSOC_TIME_LIMIT:
145                                        case WAIT_RESERVATION:
146                                        case WAIT_NODE_NOT_AVAIL:
147                                        case WAIT_TBD1:
148                                        case WAIT_TBD2:
149                                                self->state = DRMAA_PS_QUEUED_ACTIVE;
150                                                break;
151                                        case FAIL_DOWN_PARTITION:
152                                        case FAIL_DOWN_NODE:
153                                        case FAIL_BAD_CONSTRAINTS:
154                                        case FAIL_SYSTEM:
155                                        case FAIL_LAUNCH:
156                                        case FAIL_EXIT_CODE:
157                                        case FAIL_TIMEOUT:
158                                        case FAIL_INACTIVE_LIMIT:
159                                        case FAIL_BANK_ACCOUNT:
160                                                self->state = DRMAA_PS_FAILED;
161                                                break;
162                                        default:
163                                                fsd_log_error(("job_state_reason = %d, assert(0)",job_info->job_array[0].state_reason));
164                                                fsd_assert(false);
165       
166                                }
167                                break;
168                        case JOB_RUNNING:
169                                self->state = DRMAA_PS_RUNNING;
170                                break;
171                        case JOB_SUSPENDED:
[7]172                                if(slurm_self->user_suspended == true)
173                                        self->state = DRMAA_PS_USER_SUSPENDED;
174                                else
175                                        self->state = DRMAA_PS_SYSTEM_SUSPENDED; /* assume SYSTEM - suspendig jobs is administrator only */
[1]176                                break;
177                        case JOB_COMPLETE:
178                                self->state = DRMAA_PS_DONE;
179                                break;
180                        case JOB_CANCELLED:
181                                self->exit_status = -1;
182                        case JOB_FAILED:
183                        case JOB_TIMEOUT:
184                        case JOB_NODE_FAIL:
185                                self->state = DRMAA_PS_FAILED;
186                                break;
187                        default: /*transient states */
188                                if(job_info->job_array[0].job_state >= 0x8000) {
189                                        fsd_log_debug(("state COMPLETING"));
190                                }
191                                else if (job_info->job_array[0].job_state >= 0x4000) {
192                                        fsd_log_debug(("state Allocated nodes booting"));
193                                }
194                                else {
195                                        fsd_log_error(("job_state = %d, assert(0)",job_info->job_array[0].job_state));
196                                        fsd_assert(false);
197                                }
198                }
199
200                if(self->exit_status == -1) /* input,output,error path failure etc*/
201                        self->state = DRMAA_PS_FAILED;
202
203                fsd_log_debug(("state: %d ,state_reason: %d-> %s", job_info->job_array[0].job_state, job_info->job_array[0].state_reason, drmaa_job_ps_to_str(self->state)));
204
205                self->last_update_time = time(NULL);
206       
207                if( self->state >= DRMAA_PS_DONE )
208                        fsd_cond_broadcast( &self->status_cond );
209        }
210        FINALLY
211        {
212                if(job_info != NULL)
213                        slurm_free_job_info_msg (job_info);
214
215                fsd_mutex_unlock( &self->session->drm_connection_mutex );
216        }
217        END_TRY
218       
219        fsd_log_return(( "" ));
220}
221
222fsd_job_t *
223slurmdrmaa_job_new( char *job_id )
224{
225        slurmdrmaa_job_t *self = NULL;
226        self = (slurmdrmaa_job_t*)fsd_job_new( job_id );
227
228        fsd_realloc( self, 1, slurmdrmaa_job_t );
229
230        self->super.control = slurmdrmaa_job_control;
231        self->super.update_status = slurmdrmaa_job_update_status;
232        self->old_priority = UINT32_MAX;
[7]233        self->user_suspended = true;
[1]234        return (fsd_job_t*)self;
235}
236
237
238void
239slurmdrmaa_job_create_req(
240                fsd_drmaa_session_t *session,
241                const fsd_template_t *jt,
242                fsd_environ_t **envp,
243                job_desc_msg_t * job_desc,
244                int n_job /* ~job_step */
245                )
246{
247        fsd_expand_drmaa_ph_t *volatile expand = NULL;
248
249        TRY
250         {
251                expand = fsd_expand_drmaa_ph_new( NULL, NULL, fsd_asprintf("%d",n_job) );
252                slurmdrmaa_job_create( session, jt, envp, expand, job_desc, n_job);
253         }
254        EXCEPT_DEFAULT
255         {
256                fsd_exc_reraise();
257         }
258        FINALLY
259         {
260                if( expand )
261                        expand->destroy( expand );
262         }
263        END_TRY
264}
265
266static char *
267internal_map_file( fsd_expand_drmaa_ph_t *expand, const char *path,
268                bool *host_given, const char *name )
269{
270        const char *p;
271
272        for( p = path;  *p != ':';  p++ )
273                if( *p == '\0' )
274                        fsd_exc_raise_fmt( FSD_DRMAA_ERRNO_INVALID_ATTRIBUTE_FORMAT,
275                                                        "invalid format of drmaa_%s_path: missing colon", name );
276        if( host_given )
277                *host_given = ( p != path );
278
279        p++;
280
281        return expand->expand( expand, fsd_strdup(p), FSD_DRMAA_PH_HD | FSD_DRMAA_PH_WD | FSD_DRMAA_PH_INCR );
282}
283
284void
285slurmdrmaa_job_create(
286                fsd_drmaa_session_t *session,
287                const fsd_template_t *jt,
288                fsd_environ_t **envp,
289                fsd_expand_drmaa_ph_t *expand,
290                job_desc_msg_t * job_desc,
291                int n_job
292                )
293{
294        const char *input_path_orig = NULL;
295        const char *output_path_orig = NULL;
296        const char *error_path_orig = NULL;
297        char *volatile input_path = NULL;
298        char *volatile output_path = NULL;
299        char *volatile error_path = NULL;
300        bool input_host = false;
301        bool output_host = false;
302        bool error_host = false;
303        bool join_files = false;
304        const char *value;
305        const char *const *vector;
306        const char *job_category = "default";
307       
308        slurmdrmaa_init_job_desc( job_desc );
309
310        slurm_init_job_desc_msg( job_desc );
311       
312        job_desc->user_id = getuid();
313        job_desc->group_id = getgid();
314
315        job_desc->env_size = 0;
316       
317        /* job name */
318        value = jt->get_attr( jt, DRMAA_JOB_NAME );
319        if( value )
320        {
321                job_desc->name = fsd_strdup(value);
322                fsd_log_debug(("# job_name = %s",job_desc->name));
323        }
324       
325        /* job state at submit */
326        value = jt->get_attr( jt, DRMAA_JS_STATE );
327        if( value )
328        {
329                if( 0 == strcmp( value, DRMAA_SUBMISSION_STATE_ACTIVE ) )
330                {}
331                else if( 0 == strcmp( value, DRMAA_SUBMISSION_STATE_HOLD ) )
332                {
333                        job_desc->priority = 0;
334                        fsd_log_debug(("# hold = user"));
335                }
336                else
337                {
338                        fsd_exc_raise_msg(FSD_DRMAA_ERRNO_INVALID_ATTRIBUTE_VALUE, "invalid value of drmaa_js_state attribute" );
339                }
340        }
341       
342        TRY
343        {
344                const char *command = NULL;
345                char *command_expanded = NULL;
346                char *temp_script_old = NULL;
347                char *temp_script = "";
348                const char *const *i;
349                int j;
350
351                /* remote command */
352                command = jt->get_attr( jt, DRMAA_REMOTE_COMMAND );
353                if( command == NULL )
354                        fsd_exc_raise_msg(
355                                        FSD_DRMAA_ERRNO_CONFLICTING_ATTRIBUTE_VALUES,
356                                        "drmaa_remote_command not set for job template"
357                                        );
358
359                command_expanded = expand->expand( expand, fsd_strdup(command), FSD_DRMAA_PH_HD | FSD_DRMAA_PH_WD );
360
361                temp_script = fsd_asprintf("#!/bin/bash\n%s",command_expanded);
362                fsd_free(command_expanded);
363
364                /* arguments list */
365                vector = jt->get_v_attr( jt, DRMAA_V_ARGV );
366
367                if( vector )
368                {
369                        for( i = vector, j = 2;  *i;  i++, j++ )
370                        {
371                                char *arg_expanded = expand->expand( expand, fsd_strdup(*i), FSD_DRMAA_PH_HD | FSD_DRMAA_PH_WD );
372                               
373                                temp_script_old = fsd_strdup(temp_script);
374                               
375                                if (strcmp(temp_script, "") != 0) {
376                                        fsd_free(temp_script);
377                                }
378                                /* add too script */
379                                temp_script = fsd_asprintf("%s '%s'", temp_script_old, arg_expanded);
380                                fsd_free(temp_script_old);
381                                fsd_free(arg_expanded);
382                        }
383                }
384               
385                job_desc->script = fsd_asprintf("%s\n", temp_script);
386                fsd_log_debug(("# Script:\n%s", job_desc->script));
387                fsd_free(temp_script);
388        }
389        END_TRY
390       
391
392        /* start time */
393        value = jt->get_attr( jt, DRMAA_START_TIME );
394        if( value )
395        {
396                job_desc->begin_time = fsd_datetime_parse( value );
397                fsd_log_debug(( "\n  drmaa_start_time: %s -> %ld", value, (long)job_desc->begin_time));
398        }
399
400        /* environment */
401        vector = jt->get_v_attr( jt, DRMAA_V_ENV );
402        if( vector )
403        {
404                const char *const *i;
405                unsigned j = 0;
406
407                for( i = vector;  *i;  i++ )
408                {
409                        job_desc->env_size++;
410                }
411                fsd_log_debug(("env_size = %d",job_desc->env_size));
412
413                fsd_log_debug(("# environment ="));
414                fsd_calloc(job_desc->environment, job_desc->env_size+1, char *);
415
416                for( i = vector;  *i;  i++,j++ )
417                {
418                        job_desc->environment[j] = fsd_strdup(*i);
419                        fsd_log_debug((" %s", job_desc->environment[j]));
420                }
421         }
422       
423        /* wall clock time hard limit */
424        value = jt->get_attr( jt, DRMAA_WCT_HLIMIT );
425        if (value)
426        {
427                job_desc->time_limit = slurmdrmaa_datetime_parse( value );
428                fsd_log_debug(("# wct_hlimit = %s -> %ld",value,slurmdrmaa_datetime_parse( value )));
429        }
430
431               
432        /*expand->set(expand, FSD_DRMAA_PH_INCR,fsd_asprintf("%d", n_job));*/ /* set current value */
433        /* TODO: test drmaa_ph_incr */
434        /* job working directory */
435        value = jt->get_attr( jt, DRMAA_WD );
436        if( value )
437        {
438                char *cwd_expanded = expand->expand( expand, fsd_strdup(value), FSD_DRMAA_PH_HD | FSD_DRMAA_PH_INCR );
439
440                expand->set( expand, FSD_DRMAA_PH_WD, fsd_strdup(cwd_expanded));
441
442                fsd_log_debug(("# work_dir = %s",cwd_expanded));
443                job_desc->work_dir = fsd_strdup(cwd_expanded);
444                fsd_free(cwd_expanded);
445        }
446        else
447        {
448                char cwdbuf[4096] = "";
449
450                if ((getcwd(cwdbuf, 4095)) == NULL) {
451                        char errbuf[256] = "InternalError";
452                        (void)strerror_r(errno, errbuf, 256); /*on error the default message would be returned */
453                        fsd_log_error(("getcwd failed: %s", errbuf));
454                        job_desc->work_dir = fsd_strdup(".");
455                } else {
456                        job_desc->work_dir = fsd_strdup(cwdbuf);
457                }
458
459                fsd_log_debug(("work_dir(default:CWD) %s", job_desc->work_dir));
460        }
461
462        TRY
463        {
464                /* input path */
465                input_path_orig = jt->get_attr( jt, DRMAA_INPUT_PATH );
466                if( input_path_orig )
467                {
468                        input_path = internal_map_file( expand, input_path_orig, &input_host,"input" );
469                        fsd_log_debug(( "\n  drmaa_input_path: %s -> %s", input_path_orig, input_path ));
470                }
471
472                /* output path */
473                output_path_orig = jt->get_attr( jt, DRMAA_OUTPUT_PATH );
474                if( output_path_orig )
475                {
476                        output_path = internal_map_file( expand, output_path_orig, &output_host,"output" );
477                        fsd_log_debug(( "\n  drmaa_output_path: %s -> %s", output_path_orig, output_path ));
478                }
479
480                /* error path */
481                error_path_orig = jt->get_attr( jt, DRMAA_ERROR_PATH );
482                if( error_path_orig )
483                {
484                        error_path = internal_map_file( expand, error_path_orig, &error_host,"error" );
485                        fsd_log_debug(( "\n  drmaa_error_path: %s -> %s", error_path_orig, error_path ));
486                }
487
488                /* join files */
489                value = jt->get_attr( jt, DRMAA_JOIN_FILES );
490                if( value )
491                {
492                        if( (value[0] == 'y' || value[0] == 'Y')  &&  value[1] == '\0' )
493                                join_files = true;
494                        else if( (value[0] == 'n' || value[0] == 'N')  &&  value[1] == '\0' )
495                                join_files = false;
496                        else
497                                fsd_exc_raise_msg(
498                                                FSD_DRMAA_ERRNO_INVALID_ATTRIBUTE_VALUE,
499                                                "invalid value of drmaa_join_files attribute" );
500                }
501
502                if( join_files )
503                {
504                        if( output_path == NULL )
505                                fsd_exc_raise_msg(FSD_DRMAA_ERRNO_CONFLICTING_ATTRIBUTE_VALUES, "drmaa_join_files is set and output file is not given" );
506                        if( error_path!=NULL && 0 != strcmp( output_path, error_path ) )
507                                fsd_log_warning(( "Error file was given but will be ignored since drmaa_join_files was set." ));
508
509                        if (error_path)
510                                fsd_free(error_path);
511
512                         error_path = fsd_strdup(output_path);
513                }
514                else
515                {
516                        if( error_path == NULL  &&  output_path )
517                                error_path = fsd_strdup( "/dev/null" );
518                        if( output_path == NULL  &&  error_path )
519                                output_path = fsd_strdup( "/dev/null" );
520                }
521
522
523                /* email addresses to send notifications */
524                vector = jt->get_v_attr( jt, DRMAA_V_EMAIL );
525                if( vector  &&  vector[0] )
526                {
527                        /* only to one email address message may be send */
528                        job_desc->mail_user = fsd_strdup(vector[0]);
529                        fsd_log_debug(("# mail_user = %s\n",vector[0]));
530                        if( vector[1] != NULL )
531                        {
532                                fsd_log_error(( "LL only supports one e-mail notification address" ));
533                                fsd_exc_raise_msg(FSD_DRMAA_ERRNO_INVALID_ATTRIBUTE_VALUE,"LL only supports one e-mail notification address");
534                        }
535                }
536
537                /* block email */
538                value = jt->get_attr( jt, DRMAA_BLOCK_EMAIL );
539                if( value )
540                {
541                        bool block;
542                        if( strcmp(value, "0") == 0 )
543                        {
544                                block = true;
545                                fsd_log_debug(("# block_email = true"));
546                                fsd_log_debug(("# mail_user delated"));
547                                fsd_free(job_desc->mail_user);
548                                job_desc->mail_user = NULL;
549                        }
550                        else if( strcmp(value, "1") == 0 )
551                                block = false;
552                        else
553                                fsd_exc_raise_msg(FSD_DRMAA_ERRNO_INVALID_ATTRIBUTE_VALUE,"invalid value of drmaa_block_email attribute" );
554
555                        if( block && output_path == NULL )
556                        {
557                                fsd_log_debug(( "output path not set and we want to block e-mail, set to /dev/null" ));
558                                output_path = fsd_strdup( "/dev/null" );
559                        }
560                }
561
562                if( input_path )
563                {
564                        job_desc->std_in = fsd_strdup(input_path);
565                        fsd_log_debug(("# input = %s", input_path));
566                }
567
568                if( output_path )
569                {
570                        job_desc->std_out = fsd_strdup(output_path);
571                        fsd_log_debug(("# output = %s", output_path));
572                }
573
574                if( error_path )
575                {
576                        job_desc->std_err = fsd_strdup(error_path);
577                        fsd_log_debug(("# error = %s", error_path));
578                }
579         }
580        FINALLY
581        {
582                fsd_free( input_path );
583                fsd_free( output_path );
584                fsd_free( error_path );
585                input_path = NULL;
586                output_path = NULL;
587                error_path = NULL;
588        }
589        END_TRY                 
590       
591        /* native specification */
592        value = jt->get_attr( jt, DRMAA_NATIVE_SPECIFICATION );
593        if( value )
594        {
595                fsd_log_debug(("# Native specification: %s\n", value));
596                slurmdrmaa_parse_native(job_desc, value);
597        }
598               
599        /* job category */
600        value = jt->get_attr( jt, DRMAA_JOB_CATEGORY );
601        if( value )
602                job_category = value;
603
604        {
605                fsd_conf_option_t *category_value = NULL;
606                category_value = fsd_conf_dict_get( session->job_categories, job_category );
607
608                if( category_value != NULL )
609                {
610                        if( category_value->type != FSD_CONF_STRING )
611                                fsd_exc_raise_fmt(
612                                                FSD_ERRNO_INTERNAL_ERROR,
613                                                "configuration error: job category should be string"
614                                                );
615
616                        fsd_log_debug(("# Job category %s : %s\n",value,category_value->val.string));                   
617                        slurmdrmaa_parse_native(job_desc,category_value->val.string);                   
618                }
619                else
620                {
621                        if( value != NULL )
622                                fsd_exc_raise_fmt(
623                                                FSD_DRMAA_ERRNO_INVALID_ATTRIBUTE_VALUE,
624                                                "invalid job category: %s", job_category
625                                                );
626                }
627        }
628       
629}               
630               
631
Note: See TracBrowser for help on using the repository browser.