source: trunk/pbs_drmaa/pbs_conn.c @ 86

Revision 86, 11.6 KB checked in by mmamonski, 11 years ago (diff)

DRMAA pbs connection

  • Property svn:keywords set to Id
Line 
1/* $Id$ */
2/*
3 *  FedStage DRMAA for PBS Pro
4 *  Copyright (C) 2006-2007  FedStage Systems
5 *
6 *  This program is free software: you can redistribute it and/or modify
7 *  it under the terms of the GNU General Public License as published by
8 *  the Free Software Foundation, either version 3 of the License, or
9 *  (at your option) any later version.
10 *
11 *  This program is distributed in the hope that it will be useful,
12 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 *  GNU General Public License for more details.
15 *
16 *  You should have received a copy of the GNU General Public License
17 *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #ifdef HAVE_CONFIG_H
21#       include <config.h>
22#endif
23
24#include <pbs_error.h>
25
26#include <drmaa_utils/datetime.h>
27#include <drmaa_utils/drmaa.h>
28#include <drmaa_utils/iter.h>
29#include <drmaa_utils/conf.h>
30#include <drmaa_utils/datetime.h>
31
32#include <pbs_drmaa/session.h>
33#include <pbs_drmaa/pbs_conn.h>
34#include <pbs_drmaa/util.h>
35
36#include <errno.h>
37#include <signal.h>
38#include <unistd.h>
39
40
41static char* pbsdrmaa_pbs_submit( pbsdrmaa_pbs_conn_t *self, struct attropl *attrib, char *script, char *destination );
42
43static struct batch_status* pbsdrmaa_pbs_statjob( pbsdrmaa_pbs_conn_t *self,  char *job_id, struct attrl *attrib );
44
45static void pbsdrmaa_pbs_statjob_free( pbsdrmaa_pbs_conn_t *self, struct batch_status* job_status );
46
47static void pbsdrmaa_pbs_sigjob( pbsdrmaa_pbs_conn_t *self, char *job_id, char *signal );
48
49static void pbsdrmaa_pbs_deljob( pbsdrmaa_pbs_conn_t *self,  char *job_id );
50
51static void pbsdrmaa_pbs_rlsjob( pbsdrmaa_pbs_conn_t *self, char *job_id );
52
53static void pbsdrmaa_pbs_holdjob( pbsdrmaa_pbs_conn_t *self,  char *job_id );
54
55static void pbsdrmaa_pbs_connection_autoclose_thread_loop( pbsdrmaa_pbs_conn_t *self, bool reconnect);
56
57
58static void check_reconnect( pbsdrmaa_pbs_conn_t *self, bool reconnect);
59
60/*
61static void start_autoclose_thread( pbsdrmaa_pbs_conn_t *self );
62
63static void stop_autoclose_thread( pbsdrmaa_pbs_conn_t *self );
64
65static void autoclose_thread_loop( void *data ); */
66
67
68#if defined PBS_PROFESSIONAL && defined PBSE_HISTJOBID
69        #define IS_MISSING_JOB (pbs_errno == PBSE_UNKJOBID || pbs_errno == PBSE_HISTJOBID)
70#else
71        #define IS_MISSING_JOB (pbs_errno == PBSE_UNKJOBID)
72#endif
73#define IS_TRANSIENT_ERROR (pbs_errno == PBSE_PROTOCOL || pbs_errno == PBSE_EXPIRED || pbs_errno == PBSOLDE_PROTOCOL || pbs_errno == PBSOLDE_EXPIRED)
74
75pbsdrmaa_pbs_conn_t *
76pbsdrmaa_pbs_conn_new( fsd_drmaa_session_t *session, const char *server )
77{
78        pbsdrmaa_pbs_conn_t *volatile self = NULL;
79
80        fsd_log_enter((""));
81
82        TRY
83          {
84                fsd_malloc(self, pbsdrmaa_pbs_conn_t );
85               
86                self->session = session;
87               
88                self->submit = pbsdrmaa_pbs_submit;
89                self->statjob = pbsdrmaa_pbs_statjob;
90                self->statjob_free = pbsdrmaa_pbs_statjob_free;
91                self->sigjob = pbsdrmaa_pbs_sigjob;
92                self->deljob = pbsdrmaa_pbs_deljob;
93                self->rlsjob = pbsdrmaa_pbs_rlsjob;
94                self->holdjob = pbsdrmaa_pbs_holdjob;
95
96                self->server = fsd_strdup(server);
97
98                self->connection_fd = -1;
99
100                /*ignore SIGPIPE - otherwise pbs_disconnect cause the program to exit */
101                signal(SIGPIPE, SIG_IGN);       
102
103                check_reconnect(self, false);
104          }
105        EXCEPT_DEFAULT
106          {
107                if( self != NULL)
108                  {
109                        fsd_free(self->server);
110                        fsd_free(self);
111
112                        if (self->connection_fd != -1)
113                                pbs_disconnect(self->connection_fd);
114                  }
115                       
116                fsd_exc_reraise();
117          }
118        END_TRY
119
120        fsd_log_return((""));
121
122        return self;
123}
124
125void
126pbsdrmaa_pbs_conn_destroy ( pbsdrmaa_pbs_conn_t * self )
127{
128        fsd_log_enter((""));
129
130        TRY
131        {
132                if(self != NULL)
133                {
134                        fsd_free(self->server);
135                        fsd_free(self);
136
137                        if (self->connection_fd != -1)
138                                pbs_disconnect(self->connection_fd);
139                }
140        }
141        EXCEPT_DEFAULT
142        {
143                fsd_exc_reraise();
144        }
145        END_TRY
146       
147        fsd_log_return((""));
148}
149
150char*
151pbsdrmaa_pbs_submit( pbsdrmaa_pbs_conn_t *self, struct attropl *attrib, char *script, char *destination )
152{
153        char *volatile job_id = NULL;
154        volatile bool first_try = true;
155        volatile bool conn_lock = false;
156
157        fsd_log_enter((""));
158
159        TRY
160         {
161                conn_lock = fsd_mutex_lock(&self->session->drm_connection_mutex);
162
163                check_reconnect(self, false);
164
165retry:
166                job_id = pbs_submit(self->connection_fd, attrib, script, destination, NULL);
167
168                fsd_log_info(("pbs_submit(%s, %s) = %s", script, destination, job_id));
169
170                if(job_id == NULL)
171                 {
172                        fsd_log_error(( "pbs_submit failed, pbs_errno = %d", pbs_errno ));
173                        if (IS_TRANSIENT_ERROR && first_try)
174                         {
175                                check_reconnect(self, true);
176                                first_try = false;
177                                goto retry;
178                         }
179                        else
180                         {
181                                pbsdrmaa_exc_raise_pbs( "pbs_submit", self->connection_fd);
182                         }
183                 }
184         }
185        EXCEPT_DEFAULT
186         {
187                fsd_free(job_id);
188                fsd_exc_reraise();
189         }
190        FINALLY
191         {
192                if(conn_lock)
193                        conn_lock = fsd_mutex_unlock(&self->session->drm_connection_mutex);
194         }
195        END_TRY
196
197
198        fsd_log_return(("%s", job_id));
199
200        return job_id;
201}
202
203struct batch_status*
204pbsdrmaa_pbs_statjob( pbsdrmaa_pbs_conn_t *self,  char *job_id, struct attrl *attrib )
205{
206        struct batch_status *volatile status = NULL;
207        volatile bool first_try = true;
208        volatile bool conn_lock = false;
209
210
211        fsd_log_enter((""));
212
213        TRY
214         {
215                conn_lock = fsd_mutex_lock(&self->session->drm_connection_mutex);
216
217                check_reconnect(self, false);
218
219retry:
220                status = pbs_statjob(self->connection_fd, job_id, attrib, NULL);
221
222                fsd_log_info(( "pbs_statjob( fd=%d, job_id=%s, attribs={...} ) =%p", self->connection_fd, job_id, (void*)status));
223
224                if(status == NULL)
225                 {
226                        if (IS_MISSING_JOB)
227                         {
228                                fsd_log_info(( "missing job = %s (code=%d)", job_id, pbs_errno ));
229                         }
230                        else if (IS_TRANSIENT_ERROR && first_try)
231                         {
232                                fsd_log_error(( "pbs_statjob failed, pbs_errno = %d", pbs_errno ));
233                                check_reconnect(self, true);
234                                first_try = false;
235                                goto retry;
236                         }
237                        else
238                         {
239                                pbsdrmaa_exc_raise_pbs( "pbs_statjob", self->connection_fd);
240                         }
241                 }
242         }
243        EXCEPT_DEFAULT
244         {
245                if( status != NULL )
246                        pbs_statfree( status );
247
248                fsd_exc_reraise();
249         }
250        FINALLY
251         {
252                if(conn_lock)
253                        conn_lock = fsd_mutex_unlock(&self->session->drm_connection_mutex);
254         }
255        END_TRY
256
257
258        fsd_log_return((""));
259
260        return status;
261}
262
263void
264pbsdrmaa_pbs_statjob_free( pbsdrmaa_pbs_conn_t *self, struct batch_status* job_status )
265{
266        fsd_log_enter((""));
267
268        pbs_statfree( job_status );
269}
270
271void
272pbsdrmaa_pbs_sigjob( pbsdrmaa_pbs_conn_t *self, char *job_id, char *signal_name )
273{
274        int rc = PBSE_NONE;
275        volatile bool first_try = true;
276        volatile bool conn_lock = false;
277
278
279        fsd_log_enter((""));
280
281        TRY
282         {
283                conn_lock = fsd_mutex_lock(&self->session->drm_connection_mutex);
284
285                check_reconnect(self, false);
286
287retry:
288                rc = pbs_sigjob(self->connection_fd, job_id, signal_name, NULL);
289
290                fsd_log_info(( "pbs_sigjob( fd=%d, job_id=%s, signal_name=%s) = %d", self->connection_fd, job_id, signal_name, rc));
291
292                if(rc != PBSE_NONE)
293                 {
294                        fsd_log_error(( "pbs_sigjob failed, pbs_errno = %d", pbs_errno ));
295                        if (IS_TRANSIENT_ERROR && first_try)
296                         {
297                                check_reconnect(self, true);
298                                first_try = false;
299                                goto retry;
300                         }
301                        else
302                         {
303                                pbsdrmaa_exc_raise_pbs( "pbs_sigjob", self->connection_fd);
304                         }
305                 }
306         }
307        EXCEPT_DEFAULT
308         {
309                fsd_exc_reraise();
310         }
311        FINALLY
312         {
313                if(conn_lock)
314                        conn_lock = fsd_mutex_unlock(&self->session->drm_connection_mutex);
315         }
316        END_TRY
317
318
319        fsd_log_return((""));
320
321}
322
323void
324pbsdrmaa_pbs_deljob( pbsdrmaa_pbs_conn_t *self, char *job_id )
325{
326        int rc = PBSE_NONE;
327        volatile bool first_try = true;
328        volatile bool conn_lock = false;
329
330
331        fsd_log_enter((""));
332
333        TRY
334         {
335                conn_lock = fsd_mutex_lock(&self->session->drm_connection_mutex);
336
337                check_reconnect(self, false);
338
339retry:
340                rc = pbs_deljob(self->connection_fd, job_id, NULL);
341
342                fsd_log_info(( "pbs_deljob( fd=%d, job_id=%s) = %d", self->connection_fd, job_id, rc));
343
344                if(rc != PBSE_NONE)
345                 {
346                        fsd_log_error(( "pbs_deljob failed, rc = %d, pbs_errno = %d", rc, pbs_errno ));
347                        if (IS_TRANSIENT_ERROR && first_try)
348                         {
349                                check_reconnect(self, true);
350                                first_try = false;
351                                goto retry;
352                         }
353                        else
354                         {
355                                pbsdrmaa_exc_raise_pbs( "pbs_deljob", self->connection_fd);
356                         }
357                 }
358         }
359        EXCEPT_DEFAULT
360         {
361                fsd_exc_reraise();
362         }
363        FINALLY
364         {
365                if(conn_lock)
366                        conn_lock = fsd_mutex_unlock(&self->session->drm_connection_mutex);
367         }
368        END_TRY
369
370
371        fsd_log_return((""));
372}
373
374void
375pbsdrmaa_pbs_rlsjob( pbsdrmaa_pbs_conn_t *self, char *job_id )
376{
377        int rc = PBSE_NONE;
378        volatile bool first_try = true;
379        volatile bool conn_lock = false;
380
381
382        fsd_log_enter((""));
383
384        TRY
385         {
386                conn_lock = fsd_mutex_lock(&self->session->drm_connection_mutex);
387
388                check_reconnect(self, false);
389
390retry:
391                rc = pbs_rlsjob(self->connection_fd, job_id, USER_HOLD, NULL);
392
393                fsd_log_info(( "pbs_rlsjob( fd=%d, job_id=%s) = %d", self->connection_fd, job_id, rc));
394
395                if(rc != PBSE_NONE)
396                 {
397                        fsd_log_error(( "pbs_rlsjob failed, rc = %d, pbs_errno = %d", rc,  pbs_errno ));
398                        if (IS_TRANSIENT_ERROR && first_try)
399                         {
400                                check_reconnect(self, true);
401                                first_try = false;
402                                goto retry;
403                         }
404                        else
405                         {
406                                pbsdrmaa_exc_raise_pbs( "pbs_rlsjob", self->connection_fd);
407                         }
408                 }
409         }
410        EXCEPT_DEFAULT
411         {
412                fsd_exc_reraise();
413         }
414        FINALLY
415         {
416                if(conn_lock)
417                        conn_lock = fsd_mutex_unlock(&self->session->drm_connection_mutex);
418         }
419        END_TRY
420
421
422        fsd_log_return((""));
423}
424
425void
426pbsdrmaa_pbs_holdjob( pbsdrmaa_pbs_conn_t *self,  char *job_id )
427{
428        int rc = PBSE_NONE;
429        volatile bool first_try = true;
430        volatile bool conn_lock = false;
431
432
433        fsd_log_enter((""));
434
435        TRY
436         {
437                conn_lock = fsd_mutex_lock(&self->session->drm_connection_mutex);
438
439                check_reconnect(self, false);
440
441retry:
442                rc = pbs_holdjob(self->connection_fd, job_id, USER_HOLD, NULL);
443
444                fsd_log_info(( "pbs_holdjob( fd=%d, job_id=%s) = %d", self->connection_fd, job_id, rc));
445
446                if(rc != PBSE_NONE)
447                 {
448                        fsd_log_error(( "pbs_holdjob failed, rc = %d, pbs_errno = %d", rc, pbs_errno ));
449                        if (IS_TRANSIENT_ERROR && first_try)
450                         {
451                                check_reconnect(self, true);
452                                first_try = false;
453                                goto retry;
454                         }
455                        else
456                         {
457                                pbsdrmaa_exc_raise_pbs( "pbs_holdjob", self->connection_fd);
458                         }
459                 }
460         }
461        EXCEPT_DEFAULT
462         {
463                fsd_exc_reraise();
464         }
465        FINALLY
466         {
467                if(conn_lock)
468                        conn_lock = fsd_mutex_unlock(&self->session->drm_connection_mutex);
469         }
470        END_TRY
471
472
473        fsd_log_return((""));
474}
475
476void
477check_reconnect( pbsdrmaa_pbs_conn_t *self, bool force_reconnect)
478{
479        int tries_left = ((pbsdrmaa_session_t *)self->session)->max_retries_count;
480        int sleep_time = 1;
481
482        fsd_log_enter(("(%d)", self->connection_fd));
483
484        if ( self->connection_fd != -1 )
485          {
486                if (!force_reconnect)
487                  {
488                        fsd_log_return(("(%d)", self->connection_fd));
489                        return;
490                  }
491                else
492                 {
493                        stop_autoclose_thread(self);
494                        pbs_disconnect(self->connection_fd);
495                        self->connection_fd = -1;
496                 }
497          }
498
499
500
501retry_connect: /* Life... */
502        self->connection_fd = pbs_connect( self->server );
503        fsd_log_info(( "pbs_connect(%s) =%d", self->server, self->connection_fd ));
504        if( self->connection_fd < 0 && tries_left-- )
505          {
506                sleep(sleep_time);
507                sleep_time *=2;
508                goto retry_connect;
509          }
510       
511        if( self->connection_fd < 0 )
512                pbsdrmaa_exc_raise_pbs( "pbs_connect", self->connection_fd );
513       
514        fsd_log_return(("(%d)", self->connection_fd));
515}
516
517
518/*
519void start_autoclose_thread( pbsdrmaa_pbs_conn_t *self )
520{
521
522
523}
524
525void stop_autoclose_thread( pbsdrmaa_pbs_conn_t *self )
526{
527
528
529}
530
531void autoclose_thread_loop( void *data )
532{
533        pbsdrmaa_pbs_conn_t *self = (pbsdrmaa_pbs_conn_t *)data;
534        struct timespec wait_time;
535
536        fsd_mutex_lock(&self->session->drm_connection_mutex);
537
538        if (fsd_cond_timedwait(&self->autoclose_cond, &self->session->drm_connection_mutex, wait_time);
539         {
540                fsd_log_debug("autoclose thread signaled, waiting again");
541         }
542        else
543         {
544                fsd_log_info("autoclosing PBS connection: fd=%d, time_diff=%d", self->connection_fd, (int)(time(NULL) - self->last_connect_time));
545                pbs_disconnect(self->connection_fd);
546                self->connection_fd = -1;
547         }
548
549        fsd_mutex_unlock(&self->session->drm_connection_mutex);
550}
551*/
Note: See TracBrowser for help on using the repository browser.