source: trunk/pbs_drmaa/pbs_conn.c @ 96

Revision 96, 14.5 KB checked in by mmamonski, 10 years ago (diff)

C -> C++ wrapper for pbs_conn

  • Property svn:keywords set to Id
Line 
1/* $Id$ */
2/*
3 *  FedStage DRMAA for PBS Pro
4 *  Copyright (C) 2006-2007  FedStage Systems
5 *
6 *  This program is free software: you can redistribute it and/or modify
7 *  it under the terms of the GNU General Public License as published by
8 *  the Free Software Foundation, either version 3 of the License, or
9 *  (at your option) any later version.
10 *
11 *  This program is distributed in the hope that it will be useful,
12 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 *  GNU General Public License for more details.
15 *
16 *  You should have received a copy of the GNU General Public License
17 *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #ifdef HAVE_CONFIG_H
21#       include <config.h>
22#endif
23
24#include <pbs_error.h>
25
26#include <drmaa_utils/datetime.h>
27#include <drmaa_utils/drmaa.h>
28#include <drmaa_utils/iter.h>
29#include <drmaa_utils/conf.h>
30#include <drmaa_utils/datetime.h>
31
32#include <pbs_drmaa/session.h>
33#include <pbs_drmaa/pbs_conn.h>
34#include <pbs_drmaa/util.h>
35
36#include <errno.h>
37#include <signal.h>
38#include <unistd.h>
39
40
41static char* pbsdrmaa_pbs_submit( pbsdrmaa_pbs_conn_t *self, struct attropl *attrib, char *script, char *destination );
42
43static struct batch_status* pbsdrmaa_pbs_statjob( pbsdrmaa_pbs_conn_t *self,  char *job_id, struct attrl *attrib );
44
45static void pbsdrmaa_pbs_statjob_free( pbsdrmaa_pbs_conn_t *self, struct batch_status* job_status );
46
47static void pbsdrmaa_pbs_sigjob( pbsdrmaa_pbs_conn_t *self, char *job_id, char *signal );
48
49static void pbsdrmaa_pbs_deljob( pbsdrmaa_pbs_conn_t *self,  char *job_id );
50
51static void pbsdrmaa_pbs_rlsjob( pbsdrmaa_pbs_conn_t *self, char *job_id );
52
53static void pbsdrmaa_pbs_holdjob( pbsdrmaa_pbs_conn_t *self,  char *job_id );
54
55/* static void pbsdrmaa_pbs_connection_autoclose_thread_loop( pbsdrmaa_pbs_conn_t *self, bool reconnect); */
56
57
58static void check_reconnect( pbsdrmaa_pbs_conn_t *self, bool reconnect);
59
60/*
61static void start_autoclose_thread( pbsdrmaa_pbs_conn_t *self );
62
63static void stop_autoclose_thread( pbsdrmaa_pbs_conn_t *self );
64
65static void autoclose_thread_loop( void *data ); */
66
67
68#if defined PBS_PROFESSIONAL && defined PBSE_HISTJOBID
69        #define IS_MISSING_JOB (pbs_errno == PBSE_UNKJOBID || pbs_errno == PBSE_HISTJOBID)
70#else
71        #define IS_MISSING_JOB (pbs_errno == PBSE_UNKJOBID)
72#endif
73#define IS_TRANSIENT_ERROR (pbs_errno == PBSE_PROTOCOL || pbs_errno == PBSE_EXPIRED || pbs_errno == PBSOLDE_PROTOCOL || pbs_errno == PBSOLDE_EXPIRED || pbs_errno == PBSE_BADCRED)
74
75pbsdrmaa_pbs_conn_t *
76pbsdrmaa_pbs_conn_new( fsd_drmaa_session_t *session, const char *server )
77{
78        pbsdrmaa_pbs_conn_t *volatile self = NULL;
79
80        fsd_log_enter((""));
81
82        TRY
83          {
84                fsd_malloc(self, pbsdrmaa_pbs_conn_t );
85               
86                self->session = session;
87               
88                self->submit = pbsdrmaa_pbs_submit;
89                self->statjob = pbsdrmaa_pbs_statjob;
90                self->statjob_free = pbsdrmaa_pbs_statjob_free;
91                self->sigjob = pbsdrmaa_pbs_sigjob;
92                self->deljob = pbsdrmaa_pbs_deljob;
93                self->rlsjob = pbsdrmaa_pbs_rlsjob;
94                self->holdjob = pbsdrmaa_pbs_holdjob;
95
96                self->server = fsd_strdup(server);
97
98                self->connection_fd = -1;
99
100                /*ignore SIGPIPE - otherwise pbs_disconnect cause the program to exit */
101                signal(SIGPIPE, SIG_IGN);       
102
103                check_reconnect(self, false);
104          }
105        EXCEPT_DEFAULT
106          {
107                if( self != NULL)
108                  {
109                        fsd_free(self->server);
110                        fsd_free(self);
111
112                        if (self->connection_fd != -1)
113                          {
114                                fsd_log_info(( "pbs_disconnect(%d)", self->connection_fd ));
115                                pbs_disconnect(self->connection_fd);
116                          }
117                  }
118                       
119                fsd_exc_reraise();
120          }
121        END_TRY
122
123        fsd_log_return((""));
124
125        return self;
126}
127
128void
129pbsdrmaa_pbs_conn_destroy ( pbsdrmaa_pbs_conn_t * self )
130{
131        fsd_log_enter((""));
132
133        TRY
134        {
135                if(self != NULL)
136                {
137                        if (self->connection_fd != -1)
138                          {
139                                fsd_log_info(( "pbs_disconnect(%d)", self->connection_fd ));
140                                pbs_disconnect(self->connection_fd);
141
142                          }
143                        fsd_free(self->server);
144                        fsd_free(self);
145                }
146        }
147        EXCEPT_DEFAULT
148        {
149                fsd_exc_reraise();
150        }
151        END_TRY
152       
153        fsd_log_return((""));
154}
155
156#define HAS_PBS_SUBMIT_HASH             
157#ifdef HAS_PBS_SUBMIT_HASH
158
159#include <qsub_functions.h>
160
161void set_job_defaults(job_info *ji) {
162  _Z16hash_add_or_exitPP6memmgrPP8job_dataPKcS6_i(&ji->mm, &ji->job_attr, ATTR_c, CHECKPOINT_UNSPECIFIED, STATIC_DATA);
163
164  _Z16hash_add_or_exitPP6memmgrPP8job_dataPKcS6_i(&ji->mm, &ji->job_attr, ATTR_h, NO_HOLD, STATIC_DATA);
165
166  _Z16hash_add_or_exitPP6memmgrPP8job_dataPKcS6_i(&ji->mm, &ji->job_attr, ATTR_j, NO_JOIN, STATIC_DATA);
167
168  _Z16hash_add_or_exitPP6memmgrPP8job_dataPKcS6_i(&ji->mm, &ji->job_attr, ATTR_k, NO_KEEP, STATIC_DATA);
169
170  _Z16hash_add_or_exitPP6memmgrPP8job_dataPKcS6_i(&ji->mm, &ji->job_attr, ATTR_m, MAIL_AT_ABORT, STATIC_DATA);
171
172  _Z16hash_add_or_exitPP6memmgrPP8job_dataPKcS6_i(&ji->mm, &ji->job_attr, ATTR_p, DEFAULT_PRIORITY, STATIC_DATA);
173
174  _Z16hash_add_or_exitPP6memmgrPP8job_dataPKcS6_i(&ji->mm, &ji->job_attr, ATTR_r, "FALSE", STATIC_DATA);
175  _Z16hash_add_or_exitPP6memmgrPP8job_dataPKcS6_i(&ji->mm, &ji->job_attr, ATTR_f, "FALSE", STATIC_DATA);
176 
177  _Z16hash_add_or_exitPP6memmgrPP8job_dataPKcS6_i(&ji->mm, &ji->client_attr, "pbs_dprefix", "#PBS", STATIC_DATA);
178  _Z16hash_add_or_exitPP6memmgrPP8job_dataPKcS6_i(&ji->mm, &ji->job_attr, ATTR_job_radix, "0", STATIC_DATA);
179  _Z16hash_add_or_exitPP6memmgrPP8job_dataPKcS6_i(&ji->mm, &ji->job_attr, ATTR_v, "");
180
181
182
183static char *pbs_submit_4_wrapper(int connection_fd, struct attropl *attrib, char  *script, char *destination)
184{       
185        char *new_jobname = NULL;
186        char *jobname_copy = NULL;
187        char *errmsg = NULL;
188        job_info          ji;
189        int local_errno = 0;
190        struct attropl *p;
191
192        memset(&ji, 0, sizeof(job_info));
193
194        if (_Z11memmgr_initPP6memmgri(&ji.mm, 8192) != PBSE_NONE) /* do not want to use g++ just for this file*/
195          {
196                pbsdrmaa_exc_raise_pbs( "memmgr_init", connection_fd);
197          }
198
199        set_job_defaults(&ji);
200
201        for (p = attrib; p; p = p->next) {
202                if (p->resource) {
203                        _Z16hash_add_or_exitPP6memmgrPP8job_dataPKcS6_i(&ji.mm, &ji.res_attr, p->resource, p->value);
204                } else {
205                        _Z16hash_add_or_exitPP6memmgrPP8job_dataPKcS6_i(&ji.mm, &ji.job_attr, p->name, p->value);
206                }
207        }
208
209
210        pbs_errno = pbs_submit_hash(
211                  connection_fd,
212                  &ji.mm,
213                  ji.job_attr,
214                  ji.res_attr,
215                  script,
216                  destination,
217                  NULL,
218                  &new_jobname,
219                  &errmsg);             
220
221        fsd_log_info(("pbs_submit_hash(%s,%s) = %d (jobid=%s)", script, destination, local_errno, new_jobname));
222       
223        jobname_copy = fsd_strdup(new_jobname);
224
225        _Z14memmgr_destroyPP6memmgr(&ji.mm);
226
227        return jobname_copy;
228}
229#endif
230
231char*
232pbsdrmaa_pbs_submit( pbsdrmaa_pbs_conn_t *self, struct attropl *attrib, char *script, char *destination )
233{
234        char *volatile job_id = NULL;
235        volatile bool first_try = true;
236        volatile bool conn_lock = false;
237
238        fsd_log_enter((""));
239
240        TRY
241         {
242                conn_lock = fsd_mutex_lock(&self->session->drm_connection_mutex);
243
244                check_reconnect(self, false);
245
246retry:
247
248#ifdef HAS_PBS_SUBMIT_HASH
249                job_id = pbs_submit_4_wrapper(self->connection_fd, attrib, script, destination);
250#else
251                job_id = pbs_submit(self->connection_fd, attrib, script, destination, NULL);
252#endif
253
254                fsd_log_info(("pbs_submit(%s, %s) = %s", script, destination, job_id));
255
256                if(job_id == NULL)
257                 {
258                        fsd_log_error(( "pbs_submit failed, pbs_errno = %d", pbs_errno ));
259                        if (IS_TRANSIENT_ERROR && first_try)
260                         {
261                                check_reconnect(self, true);
262                                first_try = false;
263                                goto retry;
264                         }
265                        else
266                         {
267                                pbsdrmaa_exc_raise_pbs( "pbs_submit", self->connection_fd);
268                         }
269                 }
270         }
271        EXCEPT_DEFAULT
272         {
273                fsd_free(job_id);
274                fsd_exc_reraise();
275         }
276        FINALLY
277         {
278                if(conn_lock)
279                        conn_lock = fsd_mutex_unlock(&self->session->drm_connection_mutex);
280         }
281        END_TRY
282
283
284        fsd_log_return(("%s", job_id));
285
286        return job_id;
287}
288
289struct batch_status*
290pbsdrmaa_pbs_statjob( pbsdrmaa_pbs_conn_t *self,  char *job_id, struct attrl *attrib )
291{
292        struct batch_status *volatile status = NULL;
293        volatile bool first_try = true;
294        volatile bool conn_lock = false;
295
296
297        fsd_log_enter((""));
298
299        TRY
300         {
301                conn_lock = fsd_mutex_lock(&self->session->drm_connection_mutex);
302
303                check_reconnect(self, false);
304
305retry:
306                status = pbs_statjob(self->connection_fd, job_id, attrib, NULL);
307
308                fsd_log_info(( "pbs_statjob( fd=%d, job_id=%s, attribs={...} ) = %p", self->connection_fd, job_id, (void*)status));
309
310                if(status == NULL && pbs_errno)
311                 {
312                        if (IS_MISSING_JOB)
313                         {
314                                fsd_log_info(( "missing job = %s (code=%d)", job_id, pbs_errno ));
315                         }
316                        else if (IS_TRANSIENT_ERROR && first_try)
317                         {
318                                fsd_log_info(( "pbs_statjob failed, pbs_errno = %d, retrying", pbs_errno ));
319                                check_reconnect(self, true);
320                                first_try = false;
321                                goto retry;
322                         }
323                        else
324                         {
325                                pbsdrmaa_exc_raise_pbs( "pbs_statjob", self->connection_fd);
326                         }
327                 }
328         }
329        EXCEPT_DEFAULT
330         {
331                if( status != NULL )
332                        pbs_statfree( status );
333
334                fsd_exc_reraise();
335         }
336        FINALLY
337         {
338                if(conn_lock)
339                        conn_lock = fsd_mutex_unlock(&self->session->drm_connection_mutex);
340         }
341        END_TRY
342
343
344        fsd_log_return((""));
345
346        return status;
347}
348
349void
350pbsdrmaa_pbs_statjob_free( pbsdrmaa_pbs_conn_t *self, struct batch_status* job_status )
351{
352        fsd_log_enter((""));
353
354        pbs_statfree( job_status );
355}
356
357void
358pbsdrmaa_pbs_sigjob( pbsdrmaa_pbs_conn_t *self, char *job_id, char *signal_name )
359{
360        int rc = PBSE_NONE;
361        volatile bool first_try = true;
362        volatile bool conn_lock = false;
363
364
365        fsd_log_enter((""));
366
367        TRY
368         {
369                conn_lock = fsd_mutex_lock(&self->session->drm_connection_mutex);
370
371                check_reconnect(self, false);
372
373retry:
374                rc = pbs_sigjob(self->connection_fd, job_id, signal_name, NULL);
375
376                fsd_log_info(( "pbs_sigjob( fd=%d, job_id=%s, signal_name=%s) = %d", self->connection_fd, job_id, signal_name, rc));
377
378                if(rc != PBSE_NONE)
379                 {
380                        fsd_log_error(( "pbs_sigjob failed, pbs_errno = %d", pbs_errno ));
381                        if (IS_TRANSIENT_ERROR && first_try)
382                         {
383                                check_reconnect(self, true);
384                                first_try = false;
385                                goto retry;
386                         }
387                        else
388                         {
389                                pbsdrmaa_exc_raise_pbs( "pbs_sigjob", self->connection_fd);
390                         }
391                 }
392         }
393        EXCEPT_DEFAULT
394         {
395                fsd_exc_reraise();
396         }
397        FINALLY
398         {
399                if(conn_lock)
400                        conn_lock = fsd_mutex_unlock(&self->session->drm_connection_mutex);
401         }
402        END_TRY
403
404
405        fsd_log_return((""));
406
407}
408
409void
410pbsdrmaa_pbs_deljob( pbsdrmaa_pbs_conn_t *self, char *job_id )
411{
412        int rc = PBSE_NONE;
413        volatile bool first_try = true;
414        volatile bool conn_lock = false;
415
416
417        fsd_log_enter((""));
418
419        TRY
420         {
421                conn_lock = fsd_mutex_lock(&self->session->drm_connection_mutex);
422
423                check_reconnect(self, false);
424
425retry:
426                rc = pbs_deljob(self->connection_fd, job_id, NULL);
427
428                fsd_log_info(( "pbs_deljob( fd=%d, job_id=%s) = %d", self->connection_fd, job_id, rc));
429
430                if(rc != PBSE_NONE)
431                 {
432                        if (IS_TRANSIENT_ERROR && first_try)
433                         {
434                                fsd_log_info(( "pbs_deljob failed, rc = %d, pbs_errno = %d. Retrying...", rc, pbs_errno ));
435                                check_reconnect(self, true);
436                                first_try = false;
437                                goto retry;
438                         }
439                        else
440                         {
441                                pbsdrmaa_exc_raise_pbs( "pbs_deljob", self->connection_fd);
442                         }
443                 }
444         }
445        EXCEPT_DEFAULT
446         {
447                fsd_exc_reraise();
448         }
449        FINALLY
450         {
451                if(conn_lock)
452                        conn_lock = fsd_mutex_unlock(&self->session->drm_connection_mutex);
453         }
454        END_TRY
455
456
457        fsd_log_return((""));
458}
459
460void
461pbsdrmaa_pbs_rlsjob( pbsdrmaa_pbs_conn_t *self, char *job_id )
462{
463        int rc = PBSE_NONE;
464        volatile bool first_try = true;
465        volatile bool conn_lock = false;
466
467
468        fsd_log_enter((""));
469
470        TRY
471         {
472                conn_lock = fsd_mutex_lock(&self->session->drm_connection_mutex);
473
474                check_reconnect(self, false);
475
476retry:
477                rc = pbs_rlsjob(self->connection_fd, job_id, USER_HOLD, NULL);
478
479                fsd_log_info(( "pbs_rlsjob( fd=%d, job_id=%s) = %d", self->connection_fd, job_id, rc));
480
481                if(rc != PBSE_NONE)
482                 {
483                        fsd_log_error(( "pbs_rlsjob failed, rc = %d, pbs_errno = %d", rc,  pbs_errno ));
484                        if (IS_TRANSIENT_ERROR && first_try)
485                         {
486                                check_reconnect(self, true);
487                                first_try = false;
488                                goto retry;
489                         }
490                        else
491                         {
492                                pbsdrmaa_exc_raise_pbs( "pbs_rlsjob", self->connection_fd);
493                         }
494                 }
495         }
496        EXCEPT_DEFAULT
497         {
498                fsd_exc_reraise();
499         }
500        FINALLY
501         {
502                if(conn_lock)
503                        conn_lock = fsd_mutex_unlock(&self->session->drm_connection_mutex);
504         }
505        END_TRY
506
507
508        fsd_log_return((""));
509}
510
511void
512pbsdrmaa_pbs_holdjob( pbsdrmaa_pbs_conn_t *self,  char *job_id )
513{
514        int rc = PBSE_NONE;
515        volatile bool first_try = true;
516        volatile bool conn_lock = false;
517
518
519        fsd_log_enter((""));
520
521        TRY
522         {
523                conn_lock = fsd_mutex_lock(&self->session->drm_connection_mutex);
524
525                check_reconnect(self, false);
526
527retry:
528                rc = pbs_holdjob(self->connection_fd, job_id, USER_HOLD, NULL);
529
530                fsd_log_info(( "pbs_holdjob( fd=%d, job_id=%s) = %d", self->connection_fd, job_id, rc));
531
532                if(rc != PBSE_NONE)
533                 {
534                        fsd_log_error(( "pbs_holdjob failed, rc = %d, pbs_errno = %d", rc, pbs_errno ));
535                        if (IS_TRANSIENT_ERROR && first_try)
536                         {
537                                check_reconnect(self, true);
538                                first_try = false;
539                                goto retry;
540                         }
541                        else
542                         {
543                                pbsdrmaa_exc_raise_pbs( "pbs_holdjob", self->connection_fd);
544                         }
545                 }
546         }
547        EXCEPT_DEFAULT
548         {
549                fsd_exc_reraise();
550         }
551        FINALLY
552         {
553                if(conn_lock)
554                        conn_lock = fsd_mutex_unlock(&self->session->drm_connection_mutex);
555         }
556        END_TRY
557
558
559        fsd_log_return((""));
560}
561
562void
563check_reconnect( pbsdrmaa_pbs_conn_t *self, bool force_reconnect)
564{
565        int tries_left = ((pbsdrmaa_session_t *)self->session)->max_retries_count;
566        int sleep_time = 1;
567
568        fsd_log_enter(("(%d)", self->connection_fd));
569
570        if ( self->connection_fd != -1 )
571          {
572                if (!force_reconnect)
573                  {
574                        fsd_log_return(("(%d)", self->connection_fd));
575                        return;
576                  }
577                else
578                 {
579                        fsd_log_info(( "pbs_disconnect(%d)", self->connection_fd ));
580                        pbs_disconnect(self->connection_fd);
581                        self->connection_fd = -1;
582                 }
583          }
584
585
586
587retry_connect: /* Life... */
588        self->connection_fd = pbs_connect( self->server );
589        fsd_log_info(( "pbs_connect(%s) = %d", self->server, self->connection_fd ));
590        if( self->connection_fd < 0 && tries_left-- )
591          {
592                sleep(sleep_time);
593                sleep_time *=2;
594                goto retry_connect;
595          }
596       
597        if( self->connection_fd < 0 )
598                pbsdrmaa_exc_raise_pbs( "pbs_connect", self->connection_fd );
599       
600        fsd_log_return(("(%d)", self->connection_fd));
601}
602
603
604/*
605void start_autoclose_thread( pbsdrmaa_pbs_conn_t *self )
606{
607
608
609}
610
611void stop_autoclose_thread( pbsdrmaa_pbs_conn_t *self )
612{
613
614
615}
616
617void autoclose_thread_loop( void *data )
618{
619        pbsdrmaa_pbs_conn_t *self = (pbsdrmaa_pbs_conn_t *)data;
620        struct timespec wait_time;
621
622        fsd_mutex_lock(&self->session->drm_connection_mutex);
623
624        if (fsd_cond_timedwait(&self->autoclose_cond, &self->session->drm_connection_mutex, wait_time);
625         {
626                fsd_log_debug("autoclose thread signaled, waiting again");
627         }
628        else
629         {
630                fsd_log_info("autoclosing PBS connection: fd=%d, time_diff=%d", self->connection_fd, (int)(time(NULL) - self->last_connect_time));
631                pbs_disconnect(self->connection_fd);
632                self->connection_fd = -1;
633         }
634
635        fsd_mutex_unlock(&self->session->drm_connection_mutex);
636}
637*/
Note: See TracBrowser for help on using the repository browser.