source: trunk/pbs_drmaa/pbs_conn.c @ 98

Revision 98, 14.2 KB checked in by mmamonski, 10 years ago (diff)

Torque 4, on site fixes

  • Property svn:keywords set to Id
Line 
1/* $Id$ */
2/*
3 *  FedStage DRMAA for PBS Pro
4 *  Copyright (C) 2006-2007  FedStage Systems
5 *
6 *  This program is free software: you can redistribute it and/or modify
7 *  it under the terms of the GNU General Public License as published by
8 *  the Free Software Foundation, either version 3 of the License, or
9 *  (at your option) any later version.
10 *
11 *  This program is distributed in the hope that it will be useful,
12 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 *  GNU General Public License for more details.
15 *
16 *  You should have received a copy of the GNU General Public License
17 *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20#ifdef HAVE_CONFIG_H
21#       include <config.h>
22#endif
23
24#include <pbs_error.h>
25
26#include <drmaa_utils/datetime.h>
27#include <drmaa_utils/drmaa.h>
28#include <drmaa_utils/iter.h>
29#include <drmaa_utils/conf.h>
30#include <drmaa_utils/datetime.h>
31
32#include <pbs_drmaa/session.h>
33#include <pbs_drmaa/pbs_conn.h>
34#include <pbs_drmaa/util.h>
35
36#include <errno.h>
37#include <signal.h>
38#include <unistd.h>
39
40
41
42static char* pbsdrmaa_pbs_submit( pbsdrmaa_pbs_conn_t *self, struct attropl *attrib, char *script, char *destination );
43
44static struct batch_status* pbsdrmaa_pbs_statjob( pbsdrmaa_pbs_conn_t *self,  char *job_id, struct attrl *attrib );
45
46static void pbsdrmaa_pbs_statjob_free( pbsdrmaa_pbs_conn_t *self, struct batch_status* job_status );
47
48static void pbsdrmaa_pbs_sigjob( pbsdrmaa_pbs_conn_t *self, char *job_id, char *signal );
49
50static void pbsdrmaa_pbs_deljob( pbsdrmaa_pbs_conn_t *self,  char *job_id );
51
52static void pbsdrmaa_pbs_rlsjob( pbsdrmaa_pbs_conn_t *self, char *job_id );
53
54static void pbsdrmaa_pbs_holdjob( pbsdrmaa_pbs_conn_t *self,  char *job_id );
55
56/* static void pbsdrmaa_pbs_connection_autoclose_thread_loop( pbsdrmaa_pbs_conn_t *self, bool reconnect); */
57
58
59static void check_reconnect( pbsdrmaa_pbs_conn_t *self, bool reconnect);
60
61/*
62static void start_autoclose_thread( pbsdrmaa_pbs_conn_t *self );
63
64static void stop_autoclose_thread( pbsdrmaa_pbs_conn_t *self );
65
66static void autoclose_thread_loop( void *data ); */
67
68
69#if defined PBS_PROFESSIONAL && defined PBSE_HISTJOBID
70        #define IS_MISSING_JOB (pbs_errno == PBSE_UNKJOBID || pbs_errno == PBSE_HISTJOBID)
71#else
72        #define IS_MISSING_JOB (pbs_errno == PBSE_UNKJOBID)
73#endif
74#define IS_TRANSIENT_ERROR (pbs_errno == PBSE_PROTOCOL || pbs_errno == PBSE_EXPIRED || pbs_errno == PBSOLDE_PROTOCOL || pbs_errno == PBSOLDE_EXPIRED || pbs_errno == PBSE_BADCRED)
75
76pbsdrmaa_pbs_conn_t *
77pbsdrmaa_pbs_conn_new( fsd_drmaa_session_t *session, const char *server )
78{
79        pbsdrmaa_pbs_conn_t *volatile self = NULL;
80
81        fsd_log_enter((""));
82
83        TRY
84          {
85                fsd_malloc(self, pbsdrmaa_pbs_conn_t );
86               
87                self->session = session;
88               
89                self->submit = pbsdrmaa_pbs_submit;
90                self->statjob = pbsdrmaa_pbs_statjob;
91                self->statjob_free = pbsdrmaa_pbs_statjob_free;
92                self->sigjob = pbsdrmaa_pbs_sigjob;
93                self->deljob = pbsdrmaa_pbs_deljob;
94                self->rlsjob = pbsdrmaa_pbs_rlsjob;
95                self->holdjob = pbsdrmaa_pbs_holdjob;
96
97                self->server = fsd_strdup(server);
98
99                self->connection_fd = -1;
100
101                /*ignore SIGPIPE - otherwise pbs_disconnect cause the program to exit */
102                signal(SIGPIPE, SIG_IGN);       
103
104                check_reconnect(self, false);
105          }
106        EXCEPT_DEFAULT
107          {
108                if( self != NULL)
109                  {
110                        fsd_free(self->server);
111                        fsd_free(self);
112
113                        if (self->connection_fd != -1)
114                          {
115                                fsd_log_info(( "pbs_disconnect(%d)", self->connection_fd ));
116                                pbs_disconnect(self->connection_fd);
117                          }
118                  }
119                       
120                fsd_exc_reraise();
121          }
122        END_TRY
123
124        fsd_log_return((""));
125
126        return self;
127}
128
129void
130pbsdrmaa_pbs_conn_destroy ( pbsdrmaa_pbs_conn_t * self )
131{
132        fsd_log_enter((""));
133
134        TRY
135        {
136                if(self != NULL)
137                {
138                        if (self->connection_fd != -1)
139                          {
140                                fsd_log_info(( "pbs_disconnect(%d)", self->connection_fd ));
141                                pbs_disconnect(self->connection_fd);
142
143                          }
144                        fsd_free(self->server);
145                        fsd_free(self);
146                }
147        }
148        EXCEPT_DEFAULT
149        {
150                fsd_exc_reraise();
151        }
152        END_TRY
153       
154        fsd_log_return((""));
155}
156
157#ifdef HAVE_PBS_SUBMIT_HASH
158
159
160#include <torque4.h>
161
162void set_job_defaults(job_info *ji) {
163  hash_add_or_exit_c(&ji->mm, &ji->job_attr, ATTR_c, CHECKPOINT_UNSPECIFIED, STATIC_DATA);
164
165  hash_add_or_exit_c(&ji->mm, &ji->job_attr, ATTR_h, NO_HOLD, STATIC_DATA);
166
167  hash_add_or_exit_c(&ji->mm, &ji->job_attr, ATTR_j, NO_JOIN, STATIC_DATA);
168
169  hash_add_or_exit_c(&ji->mm, &ji->job_attr, ATTR_k, NO_KEEP, STATIC_DATA);
170
171  hash_add_or_exit_c(&ji->mm, &ji->job_attr, ATTR_m, MAIL_AT_ABORT, STATIC_DATA);
172
173  hash_add_or_exit_c(&ji->mm, &ji->job_attr, ATTR_p, DEFAULT_PRIORITY, STATIC_DATA);
174
175  hash_add_or_exit_c(&ji->mm, &ji->job_attr, ATTR_r, "FALSE", STATIC_DATA);
176  hash_add_or_exit_c(&ji->mm, &ji->job_attr, ATTR_f, "FALSE", STATIC_DATA);
177 
178  hash_add_or_exit_c(&ji->mm, &ji->client_attr, "pbs_dprefix", "#PBS", STATIC_DATA);
179  hash_add_or_exit_c(&ji->mm, &ji->job_attr, ATTR_job_radix, "0", STATIC_DATA);
180  hash_add_or_exit_c(&ji->mm, &ji->job_attr, ATTR_v, "", ENV_DATA);
181
182
183
184static char *pbs_submit_4_wrapper(int connection_fd, struct attropl *attrib, char  *script, char *destination)
185{       
186        char *new_jobname = NULL;
187        char *jobname_copy = NULL;
188        char *errmsg = NULL;
189        job_info          ji;
190        int local_errno = 0;
191        struct attropl *p;
192
193        memset(&ji, 0, sizeof(job_info));
194
195        if (memmgr_init_c(&ji.mm, 8192) != PBSE_NONE) /* do not want to use g++ just for this file*/
196          {
197                pbsdrmaa_exc_raise_pbs( "memmgr_init", connection_fd);
198          }
199
200        set_job_defaults(&ji);
201
202        for (p = attrib; p; p = p->next) {
203                if (p->resource) {
204                        hash_add_or_exit_c(&ji.mm, &ji.res_attr, p->resource, p->value, CMDLINE_DATA);
205                } else {
206                        hash_add_or_exit_c(&ji.mm, &ji.job_attr, p->name, p->value, CMDLINE_DATA);
207                }
208        }
209
210
211        pbs_errno = pbs_submit_hash(
212                  connection_fd,
213                  &ji.mm,
214                  ji.job_attr,
215                  ji.res_attr,
216                  script,
217                  destination,
218                  NULL,
219                  &new_jobname,
220                  &errmsg);             
221
222        fsd_log_info(("pbs_submit_hash(%s,%s) = %d (jobid=%s)", script, destination, local_errno, new_jobname));
223       
224        jobname_copy = fsd_strdup(new_jobname);
225
226        memmgr_destroy_c(&ji.mm);
227
228        return jobname_copy;
229}
230#endif
231
232char*
233pbsdrmaa_pbs_submit( pbsdrmaa_pbs_conn_t *self, struct attropl *attrib, char *script, char *destination )
234{
235        char *volatile job_id = NULL;
236        volatile bool first_try = true;
237        volatile bool conn_lock = false;
238
239        fsd_log_enter((""));
240
241        TRY
242         {
243                conn_lock = fsd_mutex_lock(&self->session->drm_connection_mutex);
244
245                check_reconnect(self, false);
246
247retry:
248
249#ifdef HAVE_PBS_SUBMIT_HASH
250                job_id = pbs_submit_4_wrapper(self->connection_fd, attrib, script, destination);
251#else
252                job_id = pbs_submit(self->connection_fd, attrib, script, destination, NULL);
253#endif
254
255                fsd_log_info(("pbs_submit(%s, %s) = %s", script, destination, job_id));
256
257                if(job_id == NULL)
258                 {
259                        fsd_log_error(( "pbs_submit failed, pbs_errno = %d", pbs_errno ));
260                        if (IS_TRANSIENT_ERROR && first_try)
261                         {
262                                check_reconnect(self, true);
263                                first_try = false;
264                                goto retry;
265                         }
266                        else
267                         {
268                                pbsdrmaa_exc_raise_pbs( "pbs_submit", self->connection_fd);
269                         }
270                 }
271         }
272        EXCEPT_DEFAULT
273         {
274                fsd_free(job_id);
275                fsd_exc_reraise();
276         }
277        FINALLY
278         {
279                if(conn_lock)
280                        conn_lock = fsd_mutex_unlock(&self->session->drm_connection_mutex);
281         }
282        END_TRY
283
284
285        fsd_log_return(("%s", job_id));
286
287        return job_id;
288}
289
290struct batch_status*
291pbsdrmaa_pbs_statjob( pbsdrmaa_pbs_conn_t *self,  char *job_id, struct attrl *attrib )
292{
293        struct batch_status *volatile status = NULL;
294        volatile bool first_try = true;
295        volatile bool conn_lock = false;
296
297
298        fsd_log_enter((""));
299
300        TRY
301         {
302                conn_lock = fsd_mutex_lock(&self->session->drm_connection_mutex);
303
304                check_reconnect(self, false);
305
306retry:
307                status = pbs_statjob(self->connection_fd, job_id, attrib, NULL);
308
309                fsd_log_info(( "pbs_statjob( fd=%d, job_id=%s, attribs={...} ) = %p", self->connection_fd, job_id, (void*)status));
310
311                if(status == NULL && pbs_errno)
312                 {
313                        if (IS_MISSING_JOB)
314                         {
315                                fsd_log_info(( "missing job = %s (code=%d)", job_id, pbs_errno ));
316                         }
317                        else if (IS_TRANSIENT_ERROR && first_try)
318                         {
319                                fsd_log_info(( "pbs_statjob failed, pbs_errno = %d, retrying", pbs_errno ));
320                                check_reconnect(self, true);
321                                first_try = false;
322                                goto retry;
323                         }
324                        else
325                         {
326                                pbsdrmaa_exc_raise_pbs( "pbs_statjob", self->connection_fd);
327                         }
328                 }
329         }
330        EXCEPT_DEFAULT
331         {
332                if( status != NULL )
333                        pbs_statfree( status );
334
335                fsd_exc_reraise();
336         }
337        FINALLY
338         {
339                if(conn_lock)
340                        conn_lock = fsd_mutex_unlock(&self->session->drm_connection_mutex);
341         }
342        END_TRY
343
344
345        fsd_log_return((""));
346
347        return status;
348}
349
350void
351pbsdrmaa_pbs_statjob_free( pbsdrmaa_pbs_conn_t *self, struct batch_status* job_status )
352{
353        fsd_log_enter((""));
354
355        pbs_statfree( job_status );
356}
357
358void
359pbsdrmaa_pbs_sigjob( pbsdrmaa_pbs_conn_t *self, char *job_id, char *signal_name )
360{
361        int rc = PBSE_NONE;
362        volatile bool first_try = true;
363        volatile bool conn_lock = false;
364
365
366        fsd_log_enter((""));
367
368        TRY
369         {
370                conn_lock = fsd_mutex_lock(&self->session->drm_connection_mutex);
371
372                check_reconnect(self, false);
373
374retry:
375                rc = pbs_sigjob(self->connection_fd, job_id, signal_name, NULL);
376
377                fsd_log_info(( "pbs_sigjob( fd=%d, job_id=%s, signal_name=%s) = %d", self->connection_fd, job_id, signal_name, rc));
378
379                if(rc != PBSE_NONE)
380                 {
381                        fsd_log_error(( "pbs_sigjob failed, pbs_errno = %d", pbs_errno ));
382                        if (IS_TRANSIENT_ERROR && first_try)
383                         {
384                                check_reconnect(self, true);
385                                first_try = false;
386                                goto retry;
387                         }
388                        else
389                         {
390                                pbsdrmaa_exc_raise_pbs( "pbs_sigjob", self->connection_fd);
391                         }
392                 }
393         }
394        EXCEPT_DEFAULT
395         {
396                fsd_exc_reraise();
397         }
398        FINALLY
399         {
400                if(conn_lock)
401                        conn_lock = fsd_mutex_unlock(&self->session->drm_connection_mutex);
402         }
403        END_TRY
404
405
406        fsd_log_return((""));
407
408}
409
410void
411pbsdrmaa_pbs_deljob( pbsdrmaa_pbs_conn_t *self, char *job_id )
412{
413        int rc = PBSE_NONE;
414        volatile bool first_try = true;
415        volatile bool conn_lock = false;
416
417
418        fsd_log_enter((""));
419
420        TRY
421         {
422                conn_lock = fsd_mutex_lock(&self->session->drm_connection_mutex);
423
424                check_reconnect(self, false);
425
426retry:
427                rc = pbs_deljob(self->connection_fd, job_id, NULL);
428
429                fsd_log_info(( "pbs_deljob( fd=%d, job_id=%s) = %d", self->connection_fd, job_id, rc));
430
431                if(rc != PBSE_NONE)
432                 {
433                        if (IS_TRANSIENT_ERROR && first_try)
434                         {
435                                fsd_log_info(( "pbs_deljob failed, rc = %d, pbs_errno = %d. Retrying...", rc, pbs_errno ));
436                                check_reconnect(self, true);
437                                first_try = false;
438                                goto retry;
439                         }
440                        else
441                         {
442                                pbsdrmaa_exc_raise_pbs( "pbs_deljob", self->connection_fd);
443                         }
444                 }
445         }
446        EXCEPT_DEFAULT
447         {
448                fsd_exc_reraise();
449         }
450        FINALLY
451         {
452                if(conn_lock)
453                        conn_lock = fsd_mutex_unlock(&self->session->drm_connection_mutex);
454         }
455        END_TRY
456
457
458        fsd_log_return((""));
459}
460
461void
462pbsdrmaa_pbs_rlsjob( pbsdrmaa_pbs_conn_t *self, char *job_id )
463{
464        int rc = PBSE_NONE;
465        volatile bool first_try = true;
466        volatile bool conn_lock = false;
467
468
469        fsd_log_enter((""));
470
471        TRY
472         {
473                conn_lock = fsd_mutex_lock(&self->session->drm_connection_mutex);
474
475                check_reconnect(self, false);
476
477retry:
478                rc = pbs_rlsjob(self->connection_fd, job_id, USER_HOLD, NULL);
479
480                fsd_log_info(( "pbs_rlsjob( fd=%d, job_id=%s) = %d", self->connection_fd, job_id, rc));
481
482                if(rc != PBSE_NONE)
483                 {
484                        fsd_log_error(( "pbs_rlsjob failed, rc = %d, pbs_errno = %d", rc,  pbs_errno ));
485                        if (IS_TRANSIENT_ERROR && first_try)
486                         {
487                                check_reconnect(self, true);
488                                first_try = false;
489                                goto retry;
490                         }
491                        else
492                         {
493                                pbsdrmaa_exc_raise_pbs( "pbs_rlsjob", self->connection_fd);
494                         }
495                 }
496         }
497        EXCEPT_DEFAULT
498         {
499                fsd_exc_reraise();
500         }
501        FINALLY
502         {
503                if(conn_lock)
504                        conn_lock = fsd_mutex_unlock(&self->session->drm_connection_mutex);
505         }
506        END_TRY
507
508
509        fsd_log_return((""));
510}
511
512void
513pbsdrmaa_pbs_holdjob( pbsdrmaa_pbs_conn_t *self,  char *job_id )
514{
515        int rc = PBSE_NONE;
516        volatile bool first_try = true;
517        volatile bool conn_lock = false;
518
519
520        fsd_log_enter((""));
521
522        TRY
523         {
524                conn_lock = fsd_mutex_lock(&self->session->drm_connection_mutex);
525
526                check_reconnect(self, false);
527
528retry:
529                rc = pbs_holdjob(self->connection_fd, job_id, USER_HOLD, NULL);
530
531                fsd_log_info(( "pbs_holdjob( fd=%d, job_id=%s) = %d", self->connection_fd, job_id, rc));
532
533                if(rc != PBSE_NONE)
534                 {
535                        fsd_log_error(( "pbs_holdjob failed, rc = %d, pbs_errno = %d", rc, pbs_errno ));
536                        if (IS_TRANSIENT_ERROR && first_try)
537                         {
538                                check_reconnect(self, true);
539                                first_try = false;
540                                goto retry;
541                         }
542                        else
543                         {
544                                pbsdrmaa_exc_raise_pbs( "pbs_holdjob", self->connection_fd);
545                         }
546                 }
547         }
548        EXCEPT_DEFAULT
549         {
550                fsd_exc_reraise();
551         }
552        FINALLY
553         {
554                if(conn_lock)
555                        conn_lock = fsd_mutex_unlock(&self->session->drm_connection_mutex);
556         }
557        END_TRY
558
559
560        fsd_log_return((""));
561}
562
563void
564check_reconnect( pbsdrmaa_pbs_conn_t *self, bool force_reconnect)
565{
566        int tries_left = ((pbsdrmaa_session_t *)self->session)->max_retries_count;
567        int sleep_time = 1;
568
569        fsd_log_enter(("(%d)", self->connection_fd));
570
571        if ( self->connection_fd != -1 )
572          {
573                if (!force_reconnect)
574                  {
575                        fsd_log_return(("(%d)", self->connection_fd));
576                        return;
577                  }
578                else
579                 {
580                        fsd_log_info(( "pbs_disconnect(%d)", self->connection_fd ));
581                        pbs_disconnect(self->connection_fd);
582                        self->connection_fd = -1;
583                 }
584          }
585
586
587
588retry_connect: /* Life... */
589        self->connection_fd = pbs_connect( self->server );
590        fsd_log_info(( "pbs_connect(%s) = %d", self->server, self->connection_fd ));
591        if( self->connection_fd < 0 && tries_left-- )
592          {
593                sleep(sleep_time);
594                sleep_time *=2;
595                goto retry_connect;
596          }
597       
598        if( self->connection_fd < 0 )
599                pbsdrmaa_exc_raise_pbs( "pbs_connect", self->connection_fd );
600       
601        fsd_log_return(("(%d)", self->connection_fd));
602}
603
604
605/*
606void start_autoclose_thread( pbsdrmaa_pbs_conn_t *self )
607{
608
609
610}
611
612void stop_autoclose_thread( pbsdrmaa_pbs_conn_t *self )
613{
614
615
616}
617
618void autoclose_thread_loop( void *data )
619{
620        pbsdrmaa_pbs_conn_t *self = (pbsdrmaa_pbs_conn_t *)data;
621        struct timespec wait_time;
622
623        fsd_mutex_lock(&self->session->drm_connection_mutex);
624
625        if (fsd_cond_timedwait(&self->autoclose_cond, &self->session->drm_connection_mutex, wait_time);
626         {
627                fsd_log_debug("autoclose thread signaled, waiting again");
628         }
629        else
630         {
631                fsd_log_info("autoclosing PBS connection: fd=%d, time_diff=%d", self->connection_fd, (int)(time(NULL) - self->last_connect_time));
632                pbs_disconnect(self->connection_fd);
633                self->connection_fd = -1;
634         }
635
636        fsd_mutex_unlock(&self->session->drm_connection_mutex);
637}
638*/
Note: See TracBrowser for help on using the repository browser.