source: trunk/pbs_drmaa/pbs_conn.c @ 101

Revision 101, 14.4 KB checked in by pkopta, 10 years ago (diff)

'x' parameter for PBS Pro pbs_statjob - this allows to get information about finishind and moved jobs

  • Property svn:keywords set to Id
RevLine 
[12]1/* $Id$ */
[7]2/*
3 *  FedStage DRMAA for PBS Pro
4 *  Copyright (C) 2006-2007  FedStage Systems
5 *
6 *  This program is free software: you can redistribute it and/or modify
7 *  it under the terms of the GNU General Public License as published by
8 *  the Free Software Foundation, either version 3 of the License, or
9 *  (at your option) any later version.
10 *
11 *  This program is distributed in the hope that it will be useful,
12 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 *  GNU General Public License for more details.
15 *
16 *  You should have received a copy of the GNU General Public License
17 *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
[98]20#ifdef HAVE_CONFIG_H
[7]21#       include <config.h>
22#endif
23
24#include <pbs_error.h>
25
26#include <drmaa_utils/datetime.h>
27#include <drmaa_utils/drmaa.h>
28#include <drmaa_utils/iter.h>
29#include <drmaa_utils/conf.h>
30#include <drmaa_utils/datetime.h>
31
[85]32#include <pbs_drmaa/session.h>
[76]33#include <pbs_drmaa/pbs_conn.h>
[7]34#include <pbs_drmaa/util.h>
35
36#include <errno.h>
[76]37#include <signal.h>
38#include <unistd.h>
[7]39
40
[98]41
[76]42static char* pbsdrmaa_pbs_submit( pbsdrmaa_pbs_conn_t *self, struct attropl *attrib, char *script, char *destination );
[7]43
[76]44static struct batch_status* pbsdrmaa_pbs_statjob( pbsdrmaa_pbs_conn_t *self,  char *job_id, struct attrl *attrib );
[7]45
[76]46static void pbsdrmaa_pbs_statjob_free( pbsdrmaa_pbs_conn_t *self, struct batch_status* job_status );
[7]47
[76]48static void pbsdrmaa_pbs_sigjob( pbsdrmaa_pbs_conn_t *self, char *job_id, char *signal );
[7]49
[76]50static void pbsdrmaa_pbs_deljob( pbsdrmaa_pbs_conn_t *self,  char *job_id );
[21]51
[76]52static void pbsdrmaa_pbs_rlsjob( pbsdrmaa_pbs_conn_t *self, char *job_id );
[21]53
[76]54static void pbsdrmaa_pbs_holdjob( pbsdrmaa_pbs_conn_t *self,  char *job_id );
[48]55
[87]56/* static void pbsdrmaa_pbs_connection_autoclose_thread_loop( pbsdrmaa_pbs_conn_t *self, bool reconnect); */
[83]57
58
[85]59static void check_reconnect( pbsdrmaa_pbs_conn_t *self, bool reconnect);
60
[86]61/*
[85]62static void start_autoclose_thread( pbsdrmaa_pbs_conn_t *self );
63
64static void stop_autoclose_thread( pbsdrmaa_pbs_conn_t *self );
65
[86]66static void autoclose_thread_loop( void *data ); */
[85]67
[86]68
[85]69#if defined PBS_PROFESSIONAL && defined PBSE_HISTJOBID
70        #define IS_MISSING_JOB (pbs_errno == PBSE_UNKJOBID || pbs_errno == PBSE_HISTJOBID)
71#else
72        #define IS_MISSING_JOB (pbs_errno == PBSE_UNKJOBID)
73#endif
[92]74#define IS_TRANSIENT_ERROR (pbs_errno == PBSE_PROTOCOL || pbs_errno == PBSE_EXPIRED || pbs_errno == PBSOLDE_PROTOCOL || pbs_errno == PBSOLDE_EXPIRED || pbs_errno == PBSE_BADCRED)
[83]75
[76]76pbsdrmaa_pbs_conn_t *
[85]77pbsdrmaa_pbs_conn_new( fsd_drmaa_session_t *session, const char *server )
[7]78{
[76]79        pbsdrmaa_pbs_conn_t *volatile self = NULL;
[7]80
81        fsd_log_enter((""));
[29]82
[7]83        TRY
[76]84          {
85                fsd_malloc(self, pbsdrmaa_pbs_conn_t );
[7]86               
87                self->session = session;
[76]88               
89                self->submit = pbsdrmaa_pbs_submit;
90                self->statjob = pbsdrmaa_pbs_statjob;
91                self->statjob_free = pbsdrmaa_pbs_statjob_free;
92                self->sigjob = pbsdrmaa_pbs_sigjob;
93                self->deljob = pbsdrmaa_pbs_deljob;
94                self->rlsjob = pbsdrmaa_pbs_rlsjob;
95                self->holdjob = pbsdrmaa_pbs_holdjob;
[29]96
[76]97                self->server = fsd_strdup(server);
98
99                self->connection_fd = -1;
[83]100
101                /*ignore SIGPIPE - otherwise pbs_disconnect cause the program to exit */
[76]102                signal(SIGPIPE, SIG_IGN);       
103
[85]104                check_reconnect(self, false);
[76]105          }
[7]106        EXCEPT_DEFAULT
[76]107          {
[7]108                if( self != NULL)
[76]109                  {
110                        fsd_free(self->server);
[7]111                        fsd_free(self);
[76]112
113                        if (self->connection_fd != -1)
[87]114                          {
115                                fsd_log_info(( "pbs_disconnect(%d)", self->connection_fd ));
[76]116                                pbs_disconnect(self->connection_fd);
[87]117                          }
[76]118                  }
[7]119                       
120                fsd_exc_reraise();
[76]121          }
[7]122        END_TRY
[29]123
[7]124        fsd_log_return((""));
[29]125
[7]126        return self;
127}
128
129void
[76]130pbsdrmaa_pbs_conn_destroy ( pbsdrmaa_pbs_conn_t * self )
[7]131{
132        fsd_log_enter((""));
[83]133
[7]134        TRY
135        {
136                if(self != NULL)
137                {
[87]138                        if (self->connection_fd != -1)
139                          {
140                                fsd_log_info(( "pbs_disconnect(%d)", self->connection_fd ));
141                                pbs_disconnect(self->connection_fd);
142
143                          }
[76]144                        fsd_free(self->server);
[7]145                        fsd_free(self);
[29]146                }
[7]147        }
148        EXCEPT_DEFAULT
149        {
150                fsd_exc_reraise();
151        }
152        END_TRY
153       
154        fsd_log_return((""));
155}
156
[98]157#ifdef HAVE_PBS_SUBMIT_HASH
[94]158
159
[98]160#include <torque4.h>
161
[94]162void set_job_defaults(job_info *ji) {
[98]163  hash_add_or_exit_c(&ji->mm, &ji->job_attr, ATTR_c, CHECKPOINT_UNSPECIFIED, STATIC_DATA);
[94]164
[98]165  hash_add_or_exit_c(&ji->mm, &ji->job_attr, ATTR_h, NO_HOLD, STATIC_DATA);
[94]166
[98]167  hash_add_or_exit_c(&ji->mm, &ji->job_attr, ATTR_j, NO_JOIN, STATIC_DATA);
[94]168
[98]169  hash_add_or_exit_c(&ji->mm, &ji->job_attr, ATTR_k, NO_KEEP, STATIC_DATA);
[94]170
[98]171  hash_add_or_exit_c(&ji->mm, &ji->job_attr, ATTR_m, MAIL_AT_ABORT, STATIC_DATA);
[94]172
[98]173  hash_add_or_exit_c(&ji->mm, &ji->job_attr, ATTR_p, DEFAULT_PRIORITY, STATIC_DATA);
[94]174
[98]175  hash_add_or_exit_c(&ji->mm, &ji->job_attr, ATTR_r, "FALSE", STATIC_DATA);
176  hash_add_or_exit_c(&ji->mm, &ji->job_attr, ATTR_f, "FALSE", STATIC_DATA);
[94]177 
[98]178  hash_add_or_exit_c(&ji->mm, &ji->client_attr, "pbs_dprefix", "#PBS", STATIC_DATA);
179  hash_add_or_exit_c(&ji->mm, &ji->job_attr, ATTR_job_radix, "0", STATIC_DATA);
180  hash_add_or_exit_c(&ji->mm, &ji->job_attr, ATTR_v, "", ENV_DATA);
[94]181
182
183
184static char *pbs_submit_4_wrapper(int connection_fd, struct attropl *attrib, char  *script, char *destination)
185{       
186        char *new_jobname = NULL;
187        char *jobname_copy = NULL;
188        char *errmsg = NULL;
189        job_info          ji;
190        int local_errno = 0;
191        struct attropl *p;
192
193        memset(&ji, 0, sizeof(job_info));
194
[98]195        if (memmgr_init_c(&ji.mm, 8192) != PBSE_NONE) /* do not want to use g++ just for this file*/
[94]196          {
197                pbsdrmaa_exc_raise_pbs( "memmgr_init", connection_fd);
198          }
199
200        set_job_defaults(&ji);
201
202        for (p = attrib; p; p = p->next) {
203                if (p->resource) {
[98]204                        hash_add_or_exit_c(&ji.mm, &ji.res_attr, p->resource, p->value, CMDLINE_DATA);
[94]205                } else {
[98]206                        hash_add_or_exit_c(&ji.mm, &ji.job_attr, p->name, p->value, CMDLINE_DATA);
[94]207                }
208        }
209
210
211        pbs_errno = pbs_submit_hash(
212                  connection_fd,
213                  &ji.mm,
214                  ji.job_attr,
215                  ji.res_attr,
216                  script,
217                  destination,
218                  NULL,
219                  &new_jobname,
220                  &errmsg);             
221
222        fsd_log_info(("pbs_submit_hash(%s,%s) = %d (jobid=%s)", script, destination, local_errno, new_jobname));
223       
224        jobname_copy = fsd_strdup(new_jobname);
225
[98]226        memmgr_destroy_c(&ji.mm);
[94]227
228        return jobname_copy;
229}
230#endif
231
[76]232char*
233pbsdrmaa_pbs_submit( pbsdrmaa_pbs_conn_t *self, struct attropl *attrib, char *script, char *destination )
[7]234{
[83]235        char *volatile job_id = NULL;
236        volatile bool first_try = true;
237        volatile bool conn_lock = false;
[8]238
[83]239        fsd_log_enter((""));
[7]240
[83]241        TRY
242         {
[85]243                conn_lock = fsd_mutex_lock(&self->session->drm_connection_mutex);
[83]244
[85]245                check_reconnect(self, false);
[83]246
247retry:
[94]248
[98]249#ifdef HAVE_PBS_SUBMIT_HASH
[94]250                job_id = pbs_submit_4_wrapper(self->connection_fd, attrib, script, destination);
251#else
[83]252                job_id = pbs_submit(self->connection_fd, attrib, script, destination, NULL);
[94]253#endif
[83]254
255                fsd_log_info(("pbs_submit(%s, %s) = %s", script, destination, job_id));
256
257                if(job_id == NULL)
258                 {
259                        fsd_log_error(( "pbs_submit failed, pbs_errno = %d", pbs_errno ));
260                        if (IS_TRANSIENT_ERROR && first_try)
261                         {
[85]262                                check_reconnect(self, true);
[83]263                                first_try = false;
264                                goto retry;
265                         }
266                        else
267                         {
[85]268                                pbsdrmaa_exc_raise_pbs( "pbs_submit", self->connection_fd);
[83]269                         }
270                 }
271         }
272        EXCEPT_DEFAULT
273         {
274                fsd_free(job_id);
275                fsd_exc_reraise();
276         }
277        FINALLY
278         {
279                if(conn_lock)
[85]280                        conn_lock = fsd_mutex_unlock(&self->session->drm_connection_mutex);
[83]281         }
282        END_TRY
283
284
285        fsd_log_return(("%s", job_id));
286
287        return job_id;
[76]288}
[7]289
[76]290struct batch_status*
291pbsdrmaa_pbs_statjob( pbsdrmaa_pbs_conn_t *self,  char *job_id, struct attrl *attrib )
292{
[84]293        struct batch_status *volatile status = NULL;
294        volatile bool first_try = true;
295        volatile bool conn_lock = false;
[101]296#if defined PBS_PROFESSIONAL
297    char *stat_job_extend = "x";
298#else
299    char *stat_job_extend = NULL;
300#endif
[7]301
[84]302
[101]303
[84]304        fsd_log_enter((""));
305
306        TRY
307         {
[85]308                conn_lock = fsd_mutex_lock(&self->session->drm_connection_mutex);
[84]309
[85]310                check_reconnect(self, false);
[84]311
312retry:
[101]313                status = pbs_statjob(self->connection_fd, job_id, attrib, stat_job_extend);
[84]314
[101]315#if defined PBS_PROFESSIONAL
316                fsd_log_info(( "pbs_statjob( fd=%d, job_id=%s, attribs={...}, \"x\" ) = %p", self->connection_fd, job_id, (void*)status));
317#else
[87]318                fsd_log_info(( "pbs_statjob( fd=%d, job_id=%s, attribs={...} ) = %p", self->connection_fd, job_id, (void*)status));
[101]319#endif
[84]320
[90]321                if(status == NULL && pbs_errno)
[84]322                 {
[85]323                        if (IS_MISSING_JOB)
[84]324                         {
[85]325                                fsd_log_info(( "missing job = %s (code=%d)", job_id, pbs_errno ));
326                         }
327                        else if (IS_TRANSIENT_ERROR && first_try)
328                         {
[87]329                                fsd_log_info(( "pbs_statjob failed, pbs_errno = %d, retrying", pbs_errno ));
[85]330                                check_reconnect(self, true);
[84]331                                first_try = false;
332                                goto retry;
333                         }
334                        else
335                         {
[85]336                                pbsdrmaa_exc_raise_pbs( "pbs_statjob", self->connection_fd);
[84]337                         }
338                 }
339         }
340        EXCEPT_DEFAULT
341         {
342                if( status != NULL )
343                        pbs_statfree( status );
344
345                fsd_exc_reraise();
346         }
347        FINALLY
348         {
349                if(conn_lock)
[85]350                        conn_lock = fsd_mutex_unlock(&self->session->drm_connection_mutex);
[84]351         }
352        END_TRY
353
354
355        fsd_log_return((""));
356
357        return status;
[76]358}
[26]359
[76]360void
361pbsdrmaa_pbs_statjob_free( pbsdrmaa_pbs_conn_t *self, struct batch_status* job_status )
362{
[84]363        fsd_log_enter((""));
[31]364
[84]365        pbs_statfree( job_status );
[76]366}
[25]367
[76]368void
[84]369pbsdrmaa_pbs_sigjob( pbsdrmaa_pbs_conn_t *self, char *job_id, char *signal_name )
[76]370{
[84]371        int rc = PBSE_NONE;
372        volatile bool first_try = true;
373        volatile bool conn_lock = false;
[30]374
[25]375
[84]376        fsd_log_enter((""));
377
378        TRY
379         {
[85]380                conn_lock = fsd_mutex_lock(&self->session->drm_connection_mutex);
[84]381
[85]382                check_reconnect(self, false);
[84]383
384retry:
385                rc = pbs_sigjob(self->connection_fd, job_id, signal_name, NULL);
386
387                fsd_log_info(( "pbs_sigjob( fd=%d, job_id=%s, signal_name=%s) = %d", self->connection_fd, job_id, signal_name, rc));
388
389                if(rc != PBSE_NONE)
390                 {
391                        fsd_log_error(( "pbs_sigjob failed, pbs_errno = %d", pbs_errno ));
392                        if (IS_TRANSIENT_ERROR && first_try)
393                         {
[85]394                                check_reconnect(self, true);
[84]395                                first_try = false;
396                                goto retry;
397                         }
398                        else
399                         {
[85]400                                pbsdrmaa_exc_raise_pbs( "pbs_sigjob", self->connection_fd);
[84]401                         }
402                 }
403         }
404        EXCEPT_DEFAULT
405         {
406                fsd_exc_reraise();
407         }
408        FINALLY
409         {
410                if(conn_lock)
[85]411                        conn_lock = fsd_mutex_unlock(&self->session->drm_connection_mutex);
[84]412         }
413        END_TRY
414
415
416        fsd_log_return((""));
417
[76]418}
[30]419
[76]420void
421pbsdrmaa_pbs_deljob( pbsdrmaa_pbs_conn_t *self, char *job_id )
422{
[84]423        int rc = PBSE_NONE;
424        volatile bool first_try = true;
425        volatile bool conn_lock = false;
[30]426
[84]427
428        fsd_log_enter((""));
429
430        TRY
431         {
[85]432                conn_lock = fsd_mutex_lock(&self->session->drm_connection_mutex);
[84]433
[85]434                check_reconnect(self, false);
[84]435
436retry:
437                rc = pbs_deljob(self->connection_fd, job_id, NULL);
438
439                fsd_log_info(( "pbs_deljob( fd=%d, job_id=%s) = %d", self->connection_fd, job_id, rc));
440
441                if(rc != PBSE_NONE)
442                 {
443                        if (IS_TRANSIENT_ERROR && first_try)
444                         {
[87]445                                fsd_log_info(( "pbs_deljob failed, rc = %d, pbs_errno = %d. Retrying...", rc, pbs_errno ));
[85]446                                check_reconnect(self, true);
[84]447                                first_try = false;
448                                goto retry;
449                         }
450                        else
451                         {
[85]452                                pbsdrmaa_exc_raise_pbs( "pbs_deljob", self->connection_fd);
[84]453                         }
454                 }
455         }
456        EXCEPT_DEFAULT
457         {
458                fsd_exc_reraise();
459         }
460        FINALLY
461         {
462                if(conn_lock)
[85]463                        conn_lock = fsd_mutex_unlock(&self->session->drm_connection_mutex);
[84]464         }
465        END_TRY
466
467
468        fsd_log_return((""));
[7]469}
470
[76]471void
472pbsdrmaa_pbs_rlsjob( pbsdrmaa_pbs_conn_t *self, char *job_id )
[7]473{
[84]474        int rc = PBSE_NONE;
475        volatile bool first_try = true;
476        volatile bool conn_lock = false;
[7]477
[48]478
[84]479        fsd_log_enter((""));
480
481        TRY
482         {
[85]483                conn_lock = fsd_mutex_lock(&self->session->drm_connection_mutex);
[84]484
[85]485                check_reconnect(self, false);
[84]486
487retry:
488                rc = pbs_rlsjob(self->connection_fd, job_id, USER_HOLD, NULL);
489
490                fsd_log_info(( "pbs_rlsjob( fd=%d, job_id=%s) = %d", self->connection_fd, job_id, rc));
491
492                if(rc != PBSE_NONE)
493                 {
494                        fsd_log_error(( "pbs_rlsjob failed, rc = %d, pbs_errno = %d", rc,  pbs_errno ));
495                        if (IS_TRANSIENT_ERROR && first_try)
496                         {
[85]497                                check_reconnect(self, true);
[84]498                                first_try = false;
499                                goto retry;
500                         }
501                        else
502                         {
[85]503                                pbsdrmaa_exc_raise_pbs( "pbs_rlsjob", self->connection_fd);
[84]504                         }
505                 }
506         }
507        EXCEPT_DEFAULT
508         {
509                fsd_exc_reraise();
510         }
511        FINALLY
512         {
513                if(conn_lock)
[85]514                        conn_lock = fsd_mutex_unlock(&self->session->drm_connection_mutex);
[84]515         }
516        END_TRY
517
518
519        fsd_log_return((""));
[7]520}
521
[76]522void
523pbsdrmaa_pbs_holdjob( pbsdrmaa_pbs_conn_t *self,  char *job_id )
[7]524{
[84]525        int rc = PBSE_NONE;
526        volatile bool first_try = true;
527        volatile bool conn_lock = false;
[7]528
[84]529
530        fsd_log_enter((""));
531
532        TRY
533         {
[85]534                conn_lock = fsd_mutex_lock(&self->session->drm_connection_mutex);
[84]535
[85]536                check_reconnect(self, false);
[84]537
538retry:
539                rc = pbs_holdjob(self->connection_fd, job_id, USER_HOLD, NULL);
540
541                fsd_log_info(( "pbs_holdjob( fd=%d, job_id=%s) = %d", self->connection_fd, job_id, rc));
542
543                if(rc != PBSE_NONE)
544                 {
545                        fsd_log_error(( "pbs_holdjob failed, rc = %d, pbs_errno = %d", rc, pbs_errno ));
546                        if (IS_TRANSIENT_ERROR && first_try)
547                         {
[85]548                                check_reconnect(self, true);
[84]549                                first_try = false;
550                                goto retry;
551                         }
552                        else
553                         {
[85]554                                pbsdrmaa_exc_raise_pbs( "pbs_holdjob", self->connection_fd);
[84]555                         }
556                 }
557         }
558        EXCEPT_DEFAULT
559         {
560                fsd_exc_reraise();
561         }
562        FINALLY
563         {
564                if(conn_lock)
[85]565                        conn_lock = fsd_mutex_unlock(&self->session->drm_connection_mutex);
[84]566         }
567        END_TRY
568
569
570        fsd_log_return((""));
[7]571}
572
[76]573void
[85]574check_reconnect( pbsdrmaa_pbs_conn_t *self, bool force_reconnect)
[7]575{
[85]576        int tries_left = ((pbsdrmaa_session_t *)self->session)->max_retries_count;
[76]577        int sleep_time = 1;
[34]578
[76]579        fsd_log_enter(("(%d)", self->connection_fd));
[34]580
[76]581        if ( self->connection_fd != -1 )
582          {
583                if (!force_reconnect)
584                  {
585                        fsd_log_return(("(%d)", self->connection_fd));
586                        return;
587                  }
[34]588                else
589                 {
[87]590                        fsd_log_info(( "pbs_disconnect(%d)", self->connection_fd ));
[76]591                        pbs_disconnect(self->connection_fd);
592                        self->connection_fd = -1;
[34]593                 }
[76]594          }
[34]595
[85]596
597
[76]598retry_connect: /* Life... */
599        self->connection_fd = pbs_connect( self->server );
[87]600        fsd_log_info(( "pbs_connect(%s) = %d", self->server, self->connection_fd ));
[76]601        if( self->connection_fd < 0 && tries_left-- )
602          {
603                sleep(sleep_time);
604                sleep_time *=2;
605                goto retry_connect;
606          }
[49]607       
[76]608        if( self->connection_fd < 0 )
[85]609                pbsdrmaa_exc_raise_pbs( "pbs_connect", self->connection_fd );
[76]610       
611        fsd_log_return(("(%d)", self->connection_fd));
[48]612}
613
[85]614
[86]615/*
616void start_autoclose_thread( pbsdrmaa_pbs_conn_t *self )
[85]617{
618
619
620}
621
[86]622void stop_autoclose_thread( pbsdrmaa_pbs_conn_t *self )
[85]623{
624
625
626}
627
[86]628void autoclose_thread_loop( void *data )
629{
630        pbsdrmaa_pbs_conn_t *self = (pbsdrmaa_pbs_conn_t *)data;
631        struct timespec wait_time;
632
633        fsd_mutex_lock(&self->session->drm_connection_mutex);
634
635        if (fsd_cond_timedwait(&self->autoclose_cond, &self->session->drm_connection_mutex, wait_time);
636         {
637                fsd_log_debug("autoclose thread signaled, waiting again");
638         }
639        else
640         {
641                fsd_log_info("autoclosing PBS connection: fd=%d, time_diff=%d", self->connection_fd, (int)(time(NULL) - self->last_connect_time));
642                pbs_disconnect(self->connection_fd);
643                self->connection_fd = -1;
644         }
645
646        fsd_mutex_unlock(&self->session->drm_connection_mutex);
647}
648*/
Note: See TracBrowser for help on using the repository browser.