source: trunk/pbs_drmaa/pbs_conn.c @ 96

Revision 96, 14.5 KB checked in by mmamonski, 10 years ago (diff)

C -> C++ wrapper for pbs_conn

  • Property svn:keywords set to Id
RevLine 
[12]1/* $Id$ */
[7]2/*
3 *  FedStage DRMAA for PBS Pro
4 *  Copyright (C) 2006-2007  FedStage Systems
5 *
6 *  This program is free software: you can redistribute it and/or modify
7 *  it under the terms of the GNU General Public License as published by
8 *  the Free Software Foundation, either version 3 of the License, or
9 *  (at your option) any later version.
10 *
11 *  This program is distributed in the hope that it will be useful,
12 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 *  GNU General Public License for more details.
15 *
16 *  You should have received a copy of the GNU General Public License
17 *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
[96]20 #ifdef HAVE_CONFIG_H
[7]21#       include <config.h>
22#endif
23
24#include <pbs_error.h>
25
26#include <drmaa_utils/datetime.h>
27#include <drmaa_utils/drmaa.h>
28#include <drmaa_utils/iter.h>
29#include <drmaa_utils/conf.h>
30#include <drmaa_utils/datetime.h>
31
[85]32#include <pbs_drmaa/session.h>
[76]33#include <pbs_drmaa/pbs_conn.h>
[7]34#include <pbs_drmaa/util.h>
35
36#include <errno.h>
[76]37#include <signal.h>
38#include <unistd.h>
[7]39
40
[76]41static char* pbsdrmaa_pbs_submit( pbsdrmaa_pbs_conn_t *self, struct attropl *attrib, char *script, char *destination );
[7]42
[76]43static struct batch_status* pbsdrmaa_pbs_statjob( pbsdrmaa_pbs_conn_t *self,  char *job_id, struct attrl *attrib );
[7]44
[76]45static void pbsdrmaa_pbs_statjob_free( pbsdrmaa_pbs_conn_t *self, struct batch_status* job_status );
[7]46
[76]47static void pbsdrmaa_pbs_sigjob( pbsdrmaa_pbs_conn_t *self, char *job_id, char *signal );
[7]48
[76]49static void pbsdrmaa_pbs_deljob( pbsdrmaa_pbs_conn_t *self,  char *job_id );
[21]50
[76]51static void pbsdrmaa_pbs_rlsjob( pbsdrmaa_pbs_conn_t *self, char *job_id );
[21]52
[76]53static void pbsdrmaa_pbs_holdjob( pbsdrmaa_pbs_conn_t *self,  char *job_id );
[48]54
[87]55/* static void pbsdrmaa_pbs_connection_autoclose_thread_loop( pbsdrmaa_pbs_conn_t *self, bool reconnect); */
[83]56
57
[85]58static void check_reconnect( pbsdrmaa_pbs_conn_t *self, bool reconnect);
59
[86]60/*
[85]61static void start_autoclose_thread( pbsdrmaa_pbs_conn_t *self );
62
63static void stop_autoclose_thread( pbsdrmaa_pbs_conn_t *self );
64
[86]65static void autoclose_thread_loop( void *data ); */
[85]66
[86]67
[85]68#if defined PBS_PROFESSIONAL && defined PBSE_HISTJOBID
69        #define IS_MISSING_JOB (pbs_errno == PBSE_UNKJOBID || pbs_errno == PBSE_HISTJOBID)
70#else
71        #define IS_MISSING_JOB (pbs_errno == PBSE_UNKJOBID)
72#endif
[92]73#define IS_TRANSIENT_ERROR (pbs_errno == PBSE_PROTOCOL || pbs_errno == PBSE_EXPIRED || pbs_errno == PBSOLDE_PROTOCOL || pbs_errno == PBSOLDE_EXPIRED || pbs_errno == PBSE_BADCRED)
[83]74
[76]75pbsdrmaa_pbs_conn_t *
[85]76pbsdrmaa_pbs_conn_new( fsd_drmaa_session_t *session, const char *server )
[7]77{
[76]78        pbsdrmaa_pbs_conn_t *volatile self = NULL;
[7]79
80        fsd_log_enter((""));
[29]81
[7]82        TRY
[76]83          {
84                fsd_malloc(self, pbsdrmaa_pbs_conn_t );
[7]85               
86                self->session = session;
[76]87               
88                self->submit = pbsdrmaa_pbs_submit;
89                self->statjob = pbsdrmaa_pbs_statjob;
90                self->statjob_free = pbsdrmaa_pbs_statjob_free;
91                self->sigjob = pbsdrmaa_pbs_sigjob;
92                self->deljob = pbsdrmaa_pbs_deljob;
93                self->rlsjob = pbsdrmaa_pbs_rlsjob;
94                self->holdjob = pbsdrmaa_pbs_holdjob;
[29]95
[76]96                self->server = fsd_strdup(server);
97
98                self->connection_fd = -1;
[83]99
100                /*ignore SIGPIPE - otherwise pbs_disconnect cause the program to exit */
[76]101                signal(SIGPIPE, SIG_IGN);       
102
[85]103                check_reconnect(self, false);
[76]104          }
[7]105        EXCEPT_DEFAULT
[76]106          {
[7]107                if( self != NULL)
[76]108                  {
109                        fsd_free(self->server);
[7]110                        fsd_free(self);
[76]111
112                        if (self->connection_fd != -1)
[87]113                          {
114                                fsd_log_info(( "pbs_disconnect(%d)", self->connection_fd ));
[76]115                                pbs_disconnect(self->connection_fd);
[87]116                          }
[76]117                  }
[7]118                       
119                fsd_exc_reraise();
[76]120          }
[7]121        END_TRY
[29]122
[7]123        fsd_log_return((""));
[29]124
[7]125        return self;
126}
127
128void
[76]129pbsdrmaa_pbs_conn_destroy ( pbsdrmaa_pbs_conn_t * self )
[7]130{
131        fsd_log_enter((""));
[83]132
[7]133        TRY
134        {
135                if(self != NULL)
136                {
[87]137                        if (self->connection_fd != -1)
138                          {
139                                fsd_log_info(( "pbs_disconnect(%d)", self->connection_fd ));
140                                pbs_disconnect(self->connection_fd);
141
142                          }
[76]143                        fsd_free(self->server);
[7]144                        fsd_free(self);
[29]145                }
[7]146        }
147        EXCEPT_DEFAULT
148        {
149                fsd_exc_reraise();
150        }
151        END_TRY
152       
153        fsd_log_return((""));
154}
155
[96]156#define HAS_PBS_SUBMIT_HASH             
157#ifdef HAS_PBS_SUBMIT_HASH
[94]158
[96]159#include <qsub_functions.h>
[94]160
161void set_job_defaults(job_info *ji) {
[96]162  _Z16hash_add_or_exitPP6memmgrPP8job_dataPKcS6_i(&ji->mm, &ji->job_attr, ATTR_c, CHECKPOINT_UNSPECIFIED, STATIC_DATA);
[94]163
[96]164  _Z16hash_add_or_exitPP6memmgrPP8job_dataPKcS6_i(&ji->mm, &ji->job_attr, ATTR_h, NO_HOLD, STATIC_DATA);
[94]165
[96]166  _Z16hash_add_or_exitPP6memmgrPP8job_dataPKcS6_i(&ji->mm, &ji->job_attr, ATTR_j, NO_JOIN, STATIC_DATA);
[94]167
[96]168  _Z16hash_add_or_exitPP6memmgrPP8job_dataPKcS6_i(&ji->mm, &ji->job_attr, ATTR_k, NO_KEEP, STATIC_DATA);
[94]169
[96]170  _Z16hash_add_or_exitPP6memmgrPP8job_dataPKcS6_i(&ji->mm, &ji->job_attr, ATTR_m, MAIL_AT_ABORT, STATIC_DATA);
[94]171
[96]172  _Z16hash_add_or_exitPP6memmgrPP8job_dataPKcS6_i(&ji->mm, &ji->job_attr, ATTR_p, DEFAULT_PRIORITY, STATIC_DATA);
[94]173
[96]174  _Z16hash_add_or_exitPP6memmgrPP8job_dataPKcS6_i(&ji->mm, &ji->job_attr, ATTR_r, "FALSE", STATIC_DATA);
175  _Z16hash_add_or_exitPP6memmgrPP8job_dataPKcS6_i(&ji->mm, &ji->job_attr, ATTR_f, "FALSE", STATIC_DATA);
[94]176 
[96]177  _Z16hash_add_or_exitPP6memmgrPP8job_dataPKcS6_i(&ji->mm, &ji->client_attr, "pbs_dprefix", "#PBS", STATIC_DATA);
178  _Z16hash_add_or_exitPP6memmgrPP8job_dataPKcS6_i(&ji->mm, &ji->job_attr, ATTR_job_radix, "0", STATIC_DATA);
179  _Z16hash_add_or_exitPP6memmgrPP8job_dataPKcS6_i(&ji->mm, &ji->job_attr, ATTR_v, "");
[94]180
181
182
183static char *pbs_submit_4_wrapper(int connection_fd, struct attropl *attrib, char  *script, char *destination)
184{       
185        char *new_jobname = NULL;
186        char *jobname_copy = NULL;
187        char *errmsg = NULL;
188        job_info          ji;
189        int local_errno = 0;
190        struct attropl *p;
191
192        memset(&ji, 0, sizeof(job_info));
193
[96]194        if (_Z11memmgr_initPP6memmgri(&ji.mm, 8192) != PBSE_NONE) /* do not want to use g++ just for this file*/
[94]195          {
196                pbsdrmaa_exc_raise_pbs( "memmgr_init", connection_fd);
197          }
198
199        set_job_defaults(&ji);
200
201        for (p = attrib; p; p = p->next) {
202                if (p->resource) {
[96]203                        _Z16hash_add_or_exitPP6memmgrPP8job_dataPKcS6_i(&ji.mm, &ji.res_attr, p->resource, p->value);
[94]204                } else {
[96]205                        _Z16hash_add_or_exitPP6memmgrPP8job_dataPKcS6_i(&ji.mm, &ji.job_attr, p->name, p->value);
[94]206                }
207        }
208
209
210        pbs_errno = pbs_submit_hash(
211                  connection_fd,
212                  &ji.mm,
213                  ji.job_attr,
214                  ji.res_attr,
215                  script,
216                  destination,
217                  NULL,
218                  &new_jobname,
219                  &errmsg);             
220
221        fsd_log_info(("pbs_submit_hash(%s,%s) = %d (jobid=%s)", script, destination, local_errno, new_jobname));
222       
223        jobname_copy = fsd_strdup(new_jobname);
224
[96]225        _Z14memmgr_destroyPP6memmgr(&ji.mm);
[94]226
227        return jobname_copy;
228}
229#endif
230
[76]231char*
232pbsdrmaa_pbs_submit( pbsdrmaa_pbs_conn_t *self, struct attropl *attrib, char *script, char *destination )
[7]233{
[83]234        char *volatile job_id = NULL;
235        volatile bool first_try = true;
236        volatile bool conn_lock = false;
[8]237
[83]238        fsd_log_enter((""));
[7]239
[83]240        TRY
241         {
[85]242                conn_lock = fsd_mutex_lock(&self->session->drm_connection_mutex);
[83]243
[85]244                check_reconnect(self, false);
[83]245
246retry:
[94]247
[96]248#ifdef HAS_PBS_SUBMIT_HASH
[94]249                job_id = pbs_submit_4_wrapper(self->connection_fd, attrib, script, destination);
250#else
[83]251                job_id = pbs_submit(self->connection_fd, attrib, script, destination, NULL);
[94]252#endif
[83]253
254                fsd_log_info(("pbs_submit(%s, %s) = %s", script, destination, job_id));
255
256                if(job_id == NULL)
257                 {
258                        fsd_log_error(( "pbs_submit failed, pbs_errno = %d", pbs_errno ));
259                        if (IS_TRANSIENT_ERROR && first_try)
260                         {
[85]261                                check_reconnect(self, true);
[83]262                                first_try = false;
263                                goto retry;
264                         }
265                        else
266                         {
[85]267                                pbsdrmaa_exc_raise_pbs( "pbs_submit", self->connection_fd);
[83]268                         }
269                 }
270         }
271        EXCEPT_DEFAULT
272         {
273                fsd_free(job_id);
274                fsd_exc_reraise();
275         }
276        FINALLY
277         {
278                if(conn_lock)
[85]279                        conn_lock = fsd_mutex_unlock(&self->session->drm_connection_mutex);
[83]280         }
281        END_TRY
282
283
284        fsd_log_return(("%s", job_id));
285
286        return job_id;
[76]287}
[7]288
[76]289struct batch_status*
290pbsdrmaa_pbs_statjob( pbsdrmaa_pbs_conn_t *self,  char *job_id, struct attrl *attrib )
291{
[84]292        struct batch_status *volatile status = NULL;
293        volatile bool first_try = true;
294        volatile bool conn_lock = false;
[7]295
[84]296
297        fsd_log_enter((""));
298
299        TRY
300         {
[85]301                conn_lock = fsd_mutex_lock(&self->session->drm_connection_mutex);
[84]302
[85]303                check_reconnect(self, false);
[84]304
305retry:
306                status = pbs_statjob(self->connection_fd, job_id, attrib, NULL);
307
[87]308                fsd_log_info(( "pbs_statjob( fd=%d, job_id=%s, attribs={...} ) = %p", self->connection_fd, job_id, (void*)status));
[84]309
[90]310                if(status == NULL && pbs_errno)
[84]311                 {
[85]312                        if (IS_MISSING_JOB)
[84]313                         {
[85]314                                fsd_log_info(( "missing job = %s (code=%d)", job_id, pbs_errno ));
315                         }
316                        else if (IS_TRANSIENT_ERROR && first_try)
317                         {
[87]318                                fsd_log_info(( "pbs_statjob failed, pbs_errno = %d, retrying", pbs_errno ));
[85]319                                check_reconnect(self, true);
[84]320                                first_try = false;
321                                goto retry;
322                         }
323                        else
324                         {
[85]325                                pbsdrmaa_exc_raise_pbs( "pbs_statjob", self->connection_fd);
[84]326                         }
327                 }
328         }
329        EXCEPT_DEFAULT
330         {
331                if( status != NULL )
332                        pbs_statfree( status );
333
334                fsd_exc_reraise();
335         }
336        FINALLY
337         {
338                if(conn_lock)
[85]339                        conn_lock = fsd_mutex_unlock(&self->session->drm_connection_mutex);
[84]340         }
341        END_TRY
342
343
344        fsd_log_return((""));
345
346        return status;
[76]347}
[26]348
[76]349void
350pbsdrmaa_pbs_statjob_free( pbsdrmaa_pbs_conn_t *self, struct batch_status* job_status )
351{
[84]352        fsd_log_enter((""));
[31]353
[84]354        pbs_statfree( job_status );
[76]355}
[25]356
[76]357void
[84]358pbsdrmaa_pbs_sigjob( pbsdrmaa_pbs_conn_t *self, char *job_id, char *signal_name )
[76]359{
[84]360        int rc = PBSE_NONE;
361        volatile bool first_try = true;
362        volatile bool conn_lock = false;
[30]363
[25]364
[84]365        fsd_log_enter((""));
366
367        TRY
368         {
[85]369                conn_lock = fsd_mutex_lock(&self->session->drm_connection_mutex);
[84]370
[85]371                check_reconnect(self, false);
[84]372
373retry:
374                rc = pbs_sigjob(self->connection_fd, job_id, signal_name, NULL);
375
376                fsd_log_info(( "pbs_sigjob( fd=%d, job_id=%s, signal_name=%s) = %d", self->connection_fd, job_id, signal_name, rc));
377
378                if(rc != PBSE_NONE)
379                 {
380                        fsd_log_error(( "pbs_sigjob failed, pbs_errno = %d", pbs_errno ));
381                        if (IS_TRANSIENT_ERROR && first_try)
382                         {
[85]383                                check_reconnect(self, true);
[84]384                                first_try = false;
385                                goto retry;
386                         }
387                        else
388                         {
[85]389                                pbsdrmaa_exc_raise_pbs( "pbs_sigjob", self->connection_fd);
[84]390                         }
391                 }
392         }
393        EXCEPT_DEFAULT
394         {
395                fsd_exc_reraise();
396         }
397        FINALLY
398         {
399                if(conn_lock)
[85]400                        conn_lock = fsd_mutex_unlock(&self->session->drm_connection_mutex);
[84]401         }
402        END_TRY
403
404
405        fsd_log_return((""));
406
[76]407}
[30]408
[76]409void
410pbsdrmaa_pbs_deljob( pbsdrmaa_pbs_conn_t *self, char *job_id )
411{
[84]412        int rc = PBSE_NONE;
413        volatile bool first_try = true;
414        volatile bool conn_lock = false;
[30]415
[84]416
417        fsd_log_enter((""));
418
419        TRY
420         {
[85]421                conn_lock = fsd_mutex_lock(&self->session->drm_connection_mutex);
[84]422
[85]423                check_reconnect(self, false);
[84]424
425retry:
426                rc = pbs_deljob(self->connection_fd, job_id, NULL);
427
428                fsd_log_info(( "pbs_deljob( fd=%d, job_id=%s) = %d", self->connection_fd, job_id, rc));
429
430                if(rc != PBSE_NONE)
431                 {
432                        if (IS_TRANSIENT_ERROR && first_try)
433                         {
[87]434                                fsd_log_info(( "pbs_deljob failed, rc = %d, pbs_errno = %d. Retrying...", rc, pbs_errno ));
[85]435                                check_reconnect(self, true);
[84]436                                first_try = false;
437                                goto retry;
438                         }
439                        else
440                         {
[85]441                                pbsdrmaa_exc_raise_pbs( "pbs_deljob", self->connection_fd);
[84]442                         }
443                 }
444         }
445        EXCEPT_DEFAULT
446         {
447                fsd_exc_reraise();
448         }
449        FINALLY
450         {
451                if(conn_lock)
[85]452                        conn_lock = fsd_mutex_unlock(&self->session->drm_connection_mutex);
[84]453         }
454        END_TRY
455
456
457        fsd_log_return((""));
[7]458}
459
[76]460void
461pbsdrmaa_pbs_rlsjob( pbsdrmaa_pbs_conn_t *self, char *job_id )
[7]462{
[84]463        int rc = PBSE_NONE;
464        volatile bool first_try = true;
465        volatile bool conn_lock = false;
[7]466
[48]467
[84]468        fsd_log_enter((""));
469
470        TRY
471         {
[85]472                conn_lock = fsd_mutex_lock(&self->session->drm_connection_mutex);
[84]473
[85]474                check_reconnect(self, false);
[84]475
476retry:
477                rc = pbs_rlsjob(self->connection_fd, job_id, USER_HOLD, NULL);
478
479                fsd_log_info(( "pbs_rlsjob( fd=%d, job_id=%s) = %d", self->connection_fd, job_id, rc));
480
481                if(rc != PBSE_NONE)
482                 {
483                        fsd_log_error(( "pbs_rlsjob failed, rc = %d, pbs_errno = %d", rc,  pbs_errno ));
484                        if (IS_TRANSIENT_ERROR && first_try)
485                         {
[85]486                                check_reconnect(self, true);
[84]487                                first_try = false;
488                                goto retry;
489                         }
490                        else
491                         {
[85]492                                pbsdrmaa_exc_raise_pbs( "pbs_rlsjob", self->connection_fd);
[84]493                         }
494                 }
495         }
496        EXCEPT_DEFAULT
497         {
498                fsd_exc_reraise();
499         }
500        FINALLY
501         {
502                if(conn_lock)
[85]503                        conn_lock = fsd_mutex_unlock(&self->session->drm_connection_mutex);
[84]504         }
505        END_TRY
506
507
508        fsd_log_return((""));
[7]509}
510
[76]511void
512pbsdrmaa_pbs_holdjob( pbsdrmaa_pbs_conn_t *self,  char *job_id )
[7]513{
[84]514        int rc = PBSE_NONE;
515        volatile bool first_try = true;
516        volatile bool conn_lock = false;
[7]517
[84]518
519        fsd_log_enter((""));
520
521        TRY
522         {
[85]523                conn_lock = fsd_mutex_lock(&self->session->drm_connection_mutex);
[84]524
[85]525                check_reconnect(self, false);
[84]526
527retry:
528                rc = pbs_holdjob(self->connection_fd, job_id, USER_HOLD, NULL);
529
530                fsd_log_info(( "pbs_holdjob( fd=%d, job_id=%s) = %d", self->connection_fd, job_id, rc));
531
532                if(rc != PBSE_NONE)
533                 {
534                        fsd_log_error(( "pbs_holdjob failed, rc = %d, pbs_errno = %d", rc, pbs_errno ));
535                        if (IS_TRANSIENT_ERROR && first_try)
536                         {
[85]537                                check_reconnect(self, true);
[84]538                                first_try = false;
539                                goto retry;
540                         }
541                        else
542                         {
[85]543                                pbsdrmaa_exc_raise_pbs( "pbs_holdjob", self->connection_fd);
[84]544                         }
545                 }
546         }
547        EXCEPT_DEFAULT
548         {
549                fsd_exc_reraise();
550         }
551        FINALLY
552         {
553                if(conn_lock)
[85]554                        conn_lock = fsd_mutex_unlock(&self->session->drm_connection_mutex);
[84]555         }
556        END_TRY
557
558
559        fsd_log_return((""));
[7]560}
561
[76]562void
[85]563check_reconnect( pbsdrmaa_pbs_conn_t *self, bool force_reconnect)
[7]564{
[85]565        int tries_left = ((pbsdrmaa_session_t *)self->session)->max_retries_count;
[76]566        int sleep_time = 1;
[34]567
[76]568        fsd_log_enter(("(%d)", self->connection_fd));
[34]569
[76]570        if ( self->connection_fd != -1 )
571          {
572                if (!force_reconnect)
573                  {
574                        fsd_log_return(("(%d)", self->connection_fd));
575                        return;
576                  }
[34]577                else
578                 {
[87]579                        fsd_log_info(( "pbs_disconnect(%d)", self->connection_fd ));
[76]580                        pbs_disconnect(self->connection_fd);
581                        self->connection_fd = -1;
[34]582                 }
[76]583          }
[34]584
[85]585
586
[76]587retry_connect: /* Life... */
588        self->connection_fd = pbs_connect( self->server );
[87]589        fsd_log_info(( "pbs_connect(%s) = %d", self->server, self->connection_fd ));
[76]590        if( self->connection_fd < 0 && tries_left-- )
591          {
592                sleep(sleep_time);
593                sleep_time *=2;
594                goto retry_connect;
595          }
[49]596       
[76]597        if( self->connection_fd < 0 )
[85]598                pbsdrmaa_exc_raise_pbs( "pbs_connect", self->connection_fd );
[76]599       
600        fsd_log_return(("(%d)", self->connection_fd));
[48]601}
602
[85]603
[86]604/*
605void start_autoclose_thread( pbsdrmaa_pbs_conn_t *self )
[85]606{
607
608
609}
610
[86]611void stop_autoclose_thread( pbsdrmaa_pbs_conn_t *self )
[85]612{
613
614
615}
616
[86]617void autoclose_thread_loop( void *data )
618{
619        pbsdrmaa_pbs_conn_t *self = (pbsdrmaa_pbs_conn_t *)data;
620        struct timespec wait_time;
621
622        fsd_mutex_lock(&self->session->drm_connection_mutex);
623
624        if (fsd_cond_timedwait(&self->autoclose_cond, &self->session->drm_connection_mutex, wait_time);
625         {
626                fsd_log_debug("autoclose thread signaled, waiting again");
627         }
628        else
629         {
630                fsd_log_info("autoclosing PBS connection: fd=%d, time_diff=%d", self->connection_fd, (int)(time(NULL) - self->last_connect_time));
631                pbs_disconnect(self->connection_fd);
632                self->connection_fd = -1;
633         }
634
635        fsd_mutex_unlock(&self->session->drm_connection_mutex);
636}
637*/
Note: See TracBrowser for help on using the repository browser.