Changeset 76 for trunk


Ignore:
Timestamp:
09/17/12 23:25:29 (12 years ago)
Author:
mmamonski
Message:

Wrap PBS API in single class

Location:
trunk/pbs_drmaa
Files:
1 edited
2 copied

Legend:

Unmodified
Added
Removed
  • trunk/pbs_drmaa/Makefile.am

    r12 r76  
    3131 submit.c submit.h \ 
    3232 util.c util.h \ 
    33  log_reader.c log_reader.h  
     33 log_reader.c log_reader.h \ 
     34 pbs_conn.c pbs_conn.h 
    3435BUILT_SOURCES       = pbs_attrib.c 
    3536EXTRA_DIST          = pbs_attrib.c 
  • trunk/pbs_drmaa/pbs_conn.c

    r60 r76  
    2222#endif 
    2323 
    24 #include <stdlib.h> 
    25 #include <ctype.h> 
    26 #include <string.h> 
    27 #include <unistd.h> 
    28 #include <sys/stat.h> 
    29 #include <sys/types.h> 
    30 #include <dirent.h> 
    31 #include <fcntl.h> 
    32  
    33 #include <pbs_ifl.h> 
    3424#include <pbs_error.h> 
    3525 
     
    4131#include <drmaa_utils/datetime.h> 
    4232 
    43 #include <pbs_drmaa/job.h> 
    44 #include <pbs_drmaa/log_reader.h> 
    45 #include <pbs_drmaa/session.h> 
    46 #include <pbs_drmaa/submit.h> 
     33#include <pbs_drmaa/pbs_conn.h> 
    4734#include <pbs_drmaa/util.h> 
    48 #include <pbs_drmaa/pbs_attrib.h> 
    4935 
    5036#include <errno.h> 
    51  
    52 enum pbsdrmaa_field_id 
    53 { 
    54         PBSDRMAA_FLD_ID_DATE = 0, 
    55         PBSDRMAA_FLD_ID_EVENT = 1, 
    56         PBSDRMAA_FLD_ID_SRC = 2, 
    57         PBSDRMAA_FLD_ID_OBJ_TYPE = 3, 
    58         PBSDRMAA_FLD_ID_OBJ_ID = 4, 
    59         PBSDRMAA_FLD_ID_MSG = 5 
    60 }; 
    61  
    62  
    63 #define PBSDRMAA_FLD_MSG_0008 "0008" 
    64 #define PBSDRMAA_FLD_MSG_0010 "0010" 
    65  
    66 enum pbsdrmaa_event_type 
    67 { 
    68         pbsdrmaa_event_0008 = 8, 
    69         pbsdrmaa_event_0010 = 10 
    70 }; 
    71  
    72 static void pbsdrmaa_read_log(); 
    73  
    74 static void pbsdrmaa_select_file( pbsdrmaa_log_reader_t * self); 
    75  
    76 static void pbsdrmaa_close_log( pbsdrmaa_log_reader_t * self); 
    77  
    78 static void pbsdrmaa_reopen_log( pbsdrmaa_log_reader_t * self); 
    79  
    80 static time_t pbsdrmaa_parse_log_timestamp(const char *timestamp, char *unixtime_str, size_t size); 
    81  
    82 static char *pbsdrmaa_get_exec_host_from_accountig(pbsdrmaa_log_reader_t * log_reader, const char *job_id); 
    83  
    84 /* 
    85  * Snippets from log files 
    86  * 
    87  * PBS Pro 
    88  * 
    89 10/11/2011 14:43:29;0008;Server@nova;Job;2127218.nova;Job Queued at request of mamonski@endor.wcss.wroc.pl, owner = mamonski@endor.wcss.wroc.pl, job name = STDIN, queue = normal 
    90 10/11/2011 14:43:31;0008;Server@nova;Job;2127218.nova;Job Modified at request of Scheduler@nova.wcss.wroc.pl 
    91 10/11/2011 14:43:31;0008;Server@nova;Job;2127218.nova;Job Run at request of Scheduler@nova.wcss.wroc.pl on exec_vnode (wn698:ncpus=3:mem=2048000kb)+(wn700:ncpus=3:mem=2048000kb) 
    92 10/11/2011 14:43:31;0008;Server@nova;Job;2127218.nova;Job Modified at request of Scheduler@nova.wcss.wroc.pl 
    93 10/11/2011 14:43:32;0010;Server@nova;Job;2127218.nova;Exit_status=0 resources_used.cpupercent=0 resources_used.cput=00:00:00 resources_used.mem=1768kb resources_used.ncpus=6 resources_used.vmem=19228kb resources_used.walltime=00:00:01 
    94  
    95  * 
    96  * Torque 
    97  * 
    98 10/11/2011 14:47:59;0008;PBS_Server;Job;15545337.batch.grid.cyf-kr.edu.pl;Job Queued at request of plgmamonski@ui.cyf-kr.edu.pl, owner = plgmamonski@ui.cyf-kr.edu.pl, job name = STDIN, queue = l_short 
    99 10/11/2011 14:48:23;0008;PBS_Server;Job;15545337.batch.grid.cyf-kr.edu.pl;Job Run at request of root@batch.grid.cyf-kr.edu.pl 
    100 10/11/2011 14:48:24;0010;PBS_Server;Job;15545337.batch.grid.cyf-kr.edu.pl;Exit_status=0 resources_used.cput=00:00:00 resources_used.mem=720kb resources_used.vmem=13308kb resources_used.walltime=00:00:00 
    101  
    102 deleting job: 
    103 I . PBS Pro 
    104 a) in Q state 
    105 10/16/2011 09:49:25;0008;Server@grass1;Job;2178.grass1.man.poznan.pl;Job Queued at request of mmamonski@grass1.man.poznan.pl, owner = mmamonski@grass1.man.poznan.pl, job name = STDIN, queue = workq 
    106 10/16/2011 09:49:25;0008;Server@grass1;Job;2178.grass1.man.poznan.pl;Job Modified at request of Scheduler@grass1.man.poznan.pl 
    107 10/16/2011 09:49:37;0008;Server@grass1;Job;2178.grass1.man.poznan.pl;Job to be deleted at request of mmamonski@grass1.man.poznan.pl 
    108 10/16/2011 09:49:37;0100;Server@grass1;Job;2178.grass1.man.poznan.pl;dequeuing from workq, state 5 
    109  
    110  
    111 b) in R state 
    112 10/16/2011 09:45:12;0080;Server@grass1;Job;2177.grass1.man.poznan.pl;delete job request received 
    113 10/16/2011 09:45:12;0008;Server@grass1;Job;2177.grass1.man.poznan.pl;Job sent signal TermJob on delete 
    114 10/16/2011 09:45:12;0008;Server@grass1;Job;2177.grass1.man.poznan.pl;Job to be deleted at request of mmamonski@grass1.man.poznan.pl 
    115 10/16/2011 09:45:12;0010;Server@grass1;Job;2177.grass1.man.poznan.pl;Exit_status=271 resources_used.cpupercent=0 resources_used.cput=00:00:00 resources_used.mem=2772kb resources_used.ncpus=1 resources_used.vmem=199288kb resources_used.walltime=00:00:26 
    116 10/16/2011 09:45:12;0100;Server@grass1;Job;2177.grass1.man.poznan.pl;dequeuing from workq, state 5 
    117  
    118 II. Torque 
    119 a) in Q state 
    120 10/15/2011 21:19:25;0008;PBS_Server;Job;113045.grass1.man.poznan.pl;Job deleted at request of mmamonski@grass1.man.poznan.pl 
    121 10/15/2011 21:19:25;0100;PBS_Server;Job;113045.grass1.man.poznan.pl;dequeuing from batch, state EXITING 
    122  
    123 b) in R state 
    124 10/15/2011 21:19:47;0008;PBS_Server;Job;113046.grass1.man.poznan.pl;Job deleted at request of mmamonski@grass1.man.poznan.pl 
    125 10/15/2011 21:19:47;0008;PBS_Server;Job;113046.grass1.man.poznan.pl;Job sent signal SIGTERM on delete 
    126 10/15/2011 21:19:47;0010;PBS_Server;Job;113046.grass1.man.poznan.pl;Exit_status=271 resources_used.cput=00:00:00 resources_used.mem=0kb resources_used.vmem=0kb resources_used.walltime=00:00:10 
    127  
    128 Log closed: 
    129 10/16/2011 00:00:17;0002;PBS_Server;Svr;Log;Log closed 
    130  
    131  */ 
    132 pbsdrmaa_log_reader_t *  
    133 pbsdrmaa_log_reader_new( fsd_drmaa_session_t *session ) 
    134 { 
    135         pbsdrmaa_log_reader_t *volatile self = NULL; 
     37#include <signal.h> 
     38#include <unistd.h> 
     39 
     40 
     41static char* pbsdrmaa_pbs_submit( pbsdrmaa_pbs_conn_t *self, struct attropl *attrib, char *script, char *destination ); 
     42 
     43static struct batch_status* pbsdrmaa_pbs_statjob( pbsdrmaa_pbs_conn_t *self,  char *job_id, struct attrl *attrib ); 
     44 
     45static void pbsdrmaa_pbs_statjob_free( pbsdrmaa_pbs_conn_t *self, struct batch_status* job_status ); 
     46 
     47static void pbsdrmaa_pbs_sigjob( pbsdrmaa_pbs_conn_t *self, char *job_id, char *signal ); 
     48 
     49static void pbsdrmaa_pbs_deljob( pbsdrmaa_pbs_conn_t *self,  char *job_id ); 
     50 
     51static void pbsdrmaa_pbs_rlsjob( pbsdrmaa_pbs_conn_t *self, char *job_id ); 
     52 
     53static void pbsdrmaa_pbs_holdjob( pbsdrmaa_pbs_conn_t *self,  char *job_id ); 
     54 
     55static void pbsdrmaa_pbs_reconnect_internal( pbsdrmaa_pbs_conn_t *self, bool reconnect); 
     56         
     57pbsdrmaa_pbs_conn_t *  
     58pbsdrmaa_pbs_conn_new( pbsdrmaa_session_t *session, char *server ) 
     59{ 
     60        pbsdrmaa_pbs_conn_t *volatile self = NULL; 
    13661 
    13762        fsd_log_enter(("")); 
    13863 
    13964        TRY 
    140         { 
    141                 fsd_malloc(self, pbsdrmaa_log_reader_t ); 
     65          { 
     66                fsd_malloc(self, pbsdrmaa_pbs_conn_t ); 
    14267                 
    14368                self->session = session; 
    144  
    145                 self->select_file = pbsdrmaa_select_file; 
    146                 self->read_log = pbsdrmaa_read_log;      
    147                 self->close = pbsdrmaa_close_log; 
    148                 self->reopen = pbsdrmaa_reopen_log; 
    14969                 
    150                 self->run_flag = true; 
    151                 self->fhandle = NULL; 
    152                 self->date_changed = true; 
    153                 self->first_open = true; 
    154                 self->log_path = NULL; 
    155                 self->current_offset = 0; 
    156                  
    157         } 
     70                self->submit = pbsdrmaa_pbs_submit; 
     71                self->statjob = pbsdrmaa_pbs_statjob; 
     72                self->statjob_free = pbsdrmaa_pbs_statjob_free; 
     73                self->sigjob = pbsdrmaa_pbs_sigjob; 
     74                self->deljob = pbsdrmaa_pbs_deljob; 
     75                self->rlsjob = pbsdrmaa_pbs_rlsjob; 
     76                self->holdjob = pbsdrmaa_pbs_holdjob; 
     77 
     78                self->server = fsd_strdup(server); 
     79 
     80                self->connection_fd = -1; 
     81                self->last_usage = time(NULL); 
     82                 
     83                /*ignore SIGPIPE - otheriwse pbs_disconnect cause the program to exit */ 
     84                signal(SIGPIPE, SIG_IGN);        
     85 
     86                pbsdrmaa_pbs_reconnect_internal(self, false); 
     87          } 
    15888        EXCEPT_DEFAULT 
    159         { 
     89          { 
    16090                if( self != NULL) 
     91                  { 
     92                        fsd_free(self->server); 
    16193                        fsd_free(self); 
     94 
     95                        if (self->connection_fd != -1) 
     96                                pbs_disconnect(self->connection_fd); 
     97                  } 
    16298                         
    16399                fsd_exc_reraise(); 
    164         } 
     100          } 
    165101        END_TRY 
    166102 
     
    172108 
    173109void 
    174 pbsdrmaa_log_reader_destroy ( pbsdrmaa_log_reader_t * self ) 
     110pbsdrmaa_pbs_conn_destroy ( pbsdrmaa_pbs_conn_t * self ) 
    175111{ 
    176112        fsd_log_enter(("")); 
     
    179115                if(self != NULL) 
    180116                { 
     117                        fsd_free(self->server); 
    181118                        fsd_free(self);  
     119 
     120                        if (self->connection_fd != -1) 
     121                                pbs_disconnect(self->connection_fd); 
    182122                } 
    183123        } 
     
    191131} 
    192132 
    193  
    194 void 
    195 pbsdrmaa_read_log( pbsdrmaa_log_reader_t * self ) 
    196 { 
    197         fsd_log_enter(("")); 
    198          
    199         fsd_mutex_lock( &self->session->mutex ); 
    200  
    201         TRY 
    202          { 
    203                 while( self->run_flag ) 
    204                  { 
    205                         TRY 
    206                         { 
    207                                 char *line = NULL; 
    208                                  
    209                                 self->select_file(self); 
    210  
    211                                 while ((line = fsd_readline(self->fhandle)) != NULL) 
    212                                  { 
    213                                         int field_id = PBSDRMAA_FLD_ID_DATE; 
    214                                         char *tok_ctx = NULL; 
    215                                         char *field_token = NULL; 
    216                                         char *event_timestamp = NULL; 
    217                                         int event_type = -1; 
    218                                         fsd_job_t *job = NULL; 
    219  
    220                                         /* at first detect if this not the end of log file */ 
    221                                         if (strstr(line, "Log;Log closed")) /*TODO try to be more effective and safe */ 
    222                                          { 
    223                                                 fsd_log_debug(("WT - Date changed. Closing log file")); 
    224                                                 self->date_changed = true; 
    225                                                 goto cleanup; 
    226                                          } 
    227  
    228                                         for (field_token = strtok_r(line, ";", &tok_ctx); field_token; field_token = strtok_r(NULL, ";", &tok_ctx), field_id++) 
    229                                          { 
    230                                                 if ( field_id == PBSDRMAA_FLD_ID_DATE) 
    231                                                  { 
    232                                                         event_timestamp = field_token; 
    233 #ifdef PBS_PBS_PROFESSIONAL 
    234                                                         /*additional check */ 
    235                                                         TRY 
    236                                                         { 
    237                                                          (void)pbsdrmaa_parse_log_timestamp(event_timestamp, timestamp_unix, sizeof(timestamp_unix)); 
    238                                                         } 
    239                                                         EXCEPT_DEFAULT 
    240                                                         { 
    241                                                                 fsd_log_error(("Failed to parse timestamp: %s. Log corrupted?", event_timestamp)); 
    242                                                         } 
    243                                                         END_TRY 
    244 #endif 
    245                                                  } 
    246                                                 else if ( field_id == PBSDRMAA_FLD_ID_EVENT) 
    247                                                  { 
    248                                                         if (strncmp(field_token, PBSDRMAA_FLD_MSG_0008, 4) == 0) 
    249                                                                 event_type = pbsdrmaa_event_0008; 
    250                                                         else if (strncmp(field_token, PBSDRMAA_FLD_MSG_0010, 4) == 0) 
    251                                                                 event_type = pbsdrmaa_event_0010; 
    252                                                         else 
    253                                                          { 
    254                                                                 goto cleanup; /*we are interested only in the above log messages */ 
    255                                                          } 
    256                                                  } 
    257                                                 else if ( field_id == PBSDRMAA_FLD_ID_SRC) 
    258                                                  { 
    259                                                         /* not used ignore */ 
    260                                                  } 
    261                                                 else if (field_id  == PBSDRMAA_FLD_ID_OBJ_TYPE) 
    262                                                  { 
    263                                                         if (strncmp(field_token, "Job", 3) != 0) 
    264                                                          { 
    265                                                                 goto cleanup; /* we are interested only in job events */ 
    266                                                          } 
    267                                                  } 
    268                                                 else if (field_id == PBSDRMAA_FLD_ID_OBJ_ID) 
    269                                                  { 
    270                                                         const char *event_jobid = field_token; 
    271                                                          
    272                                                         if (!isdigit(event_jobid[0])) 
    273                                                          { 
    274                                                                 fsd_log_debug(("WT - Invalid job: %s", event_jobid));  
    275                                                                 goto cleanup; 
    276                                                          } 
    277  
    278                                                         job = self->session->get_job( self->session, event_jobid ); 
    279  
    280                                                         if( job ) 
    281                                                          { 
    282                                                                 fsd_log_debug(("WT - Found job event: %s", event_jobid)); 
    283                                                          } 
    284                                                         else 
    285                                                          { 
    286                                                                 fsd_log_debug(("WT - Unknown job: %s", event_jobid)); /* Not a DRMAA job */ 
    287                                                                 goto cleanup; 
    288                                                          } 
    289                                                  } 
    290                                                 else if (field_id == PBSDRMAA_FLD_ID_MSG) 
    291                                                  { 
    292                                                         char *msg = field_token; 
    293                                                         struct batch_status status; 
    294                                                         struct attrl *attribs = NULL; 
    295                                                         bool in_running_state = false; 
    296  
    297                                                         if (event_type == pbsdrmaa_event_0008 && strncmp(msg, "Job Queued", 10) == 0) 
    298                                                          { 
    299                                                                 /* Queued 
    300                                                                  * PBS Pro: 10/11/2011 14:43:29;0008;Server@nova;Job;2127218.nova;Job Queued at request of mamonski@endor.wcss.wroc.pl, owner = mamonski@endor.wcss.wroc.pl, job name = STDIN, queue = normal 
    301                                                                  * Torque:  10/11/2011 14:47:59;0008;PBS_Server;Job;15545337.batch.grid.cyf-kr.edu.pl;Job Queued at request of plgmamonski@ui.cyf-kr.edu.pl, owner = plgmamonski@ui.cyf-kr.edu.pl, job name = STDIN, queue = l_short 
    302                                                                  */ 
    303                                                                 char *p_queue = NULL; 
    304  
    305                                                                 fsd_log_info(("WT - Detected queuing of job %s", job->job_id)); 
    306  
    307                                                                 if ((p_queue = strstr(msg,"queue =")) == NULL) 
    308                                                                         fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"No queue attribute found in log line = %s", line); 
    309  
    310                                                                 attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_JOB_STATE, "Q"); 
    311                                                                 attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_QUEUE, p_queue + 7); 
    312                                                          } 
    313                                                         else if (event_type == pbsdrmaa_event_0008 && strncmp(msg, "Job Run", 7) == 0) 
    314                                                         { 
    315                                                                 /* 
    316                                                                  * Running 
    317                                                                  * Torque: 10/11/2011 14:48:23;0008;PBS_Server;Job;15545337.batch.grid.cyf-kr.edu.pl;Job Run at request of root@batch.grid.cyf-kr.edu.pl 
    318                                                                  * PBS Pro: 10/11/2011 14:43:31;0008;Server@nova;Job;2127218.nova;Job Run at request of Scheduler@nova.wcss.wroc.pl on exec_vnode (wn698:ncpus=3:mem=2048000kb)+(wn700:ncpus=3:mem=2048000kb) 
    319                                                                  */ 
    320                                                                 char timestamp_unix[64]; 
    321  
    322                                                                 fsd_log_info(("WT - Detected start of job %s", job->job_id)); 
    323  
    324                                                                 (void)pbsdrmaa_parse_log_timestamp(event_timestamp, timestamp_unix, sizeof(timestamp_unix)); 
    325  
    326                                                                 in_running_state = true; 
    327  
    328                                                                 attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_JOB_STATE, "R"); 
    329                                                                 attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_START_TIME, timestamp_unix); 
    330 #ifdef PBS_PROFESSIONAL 
    331                                                                         { 
    332                                                                                 char *p_vnode = NULL; 
    333                                                                                 if ((p_vnode = strstr(msg, "exec_vnode"))) 
    334                                                                                  { 
    335                                                                                         attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_EXECUTION_VNODE, p_vnode + 11); 
    336                                                                                  } 
    337                                                                         } 
    338 #endif 
    339                                                          } 
    340 #ifndef PBS_PROFESSIONAL 
    341                                                         else if (event_type == pbsdrmaa_event_0008 && strncmp(msg, "Job deleted", 11) == 0) 
    342 #else 
    343                                                         else if (event_type == pbsdrmaa_event_0008 && strncmp(msg, "Job to be deleted", 17) == 0) 
    344 #endif 
    345                                                          { 
    346                                                         /* Deleted 
    347                                                          * PBS Pro: 10/16/2011 09:45:12;0008;Server@grass1;Job;2177.grass1.man.poznan.pl;Job to be deleted at request of mmamonski@grass1.man.poznan.pl 
    348                                                          * Torque: 10/15/2011 21:19:25;0008;PBS_Server;Job;113045.grass1.man.poznan.pl;Job deleted at request of mmamonski@grass1.man.poznan.pl 
    349                                                          */ 
    350                                                                 char timestamp_unix[64]; 
    351  
    352                                                                 fsd_log_info(("WT - Detected deletion of job %s", job->job_id)); 
    353  
    354                                                                 (void)pbsdrmaa_parse_log_timestamp(event_timestamp, timestamp_unix, sizeof(timestamp_unix)); 
    355  
    356                                                                 if (job->state < DRMAA_PS_RUNNING) 
    357                                                                  { 
    358                                                                         fsd_log_info(("WT - Job %s killed before entering running state (%d).", job->job_id, job->state)); 
    359                                                                         attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_JOB_STATE, "C"); 
    360                                                                         attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_MTIME, timestamp_unix); 
    361                                                                         attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_EXIT_STATUS, "-2"); 
    362                                                                  } 
    363                                                                 else 
    364                                                                  { 
    365                                                                         fsd_log_info(("WT - Job %s killed after entering running state (%d). Waiting for Completed event...", job->job_id, job->state)); 
    366                                                                         goto cleanup; /* job was started, ignore, wait for Exit_status message */ 
    367                                                                  } 
    368                                                          } 
    369                                                         else if (event_type == pbsdrmaa_event_0010 && (strncmp(msg, "Exit_status=", 12) == 0)) 
    370                                                          { 
    371                                                         /* Completed: 
    372                                                          * PBS Pro: 10/11/2011 14:43:32;0010;Server@nova;Job;2127218.nova;Exit_status=0 resources_used.cpupercent=0 resources_used.cput=00:00:00 resources_used.mem=1768kb resources_used.ncpus=6 resources_used.vmem=19228kb resources_used.walltime=00:00:01 
    373                                                          * Torque: 10/11/2011 14:48:24;0010;PBS_Server;Job;15545337.batch.grid.cyf-kr.edu.pl;Exit_status=0 resources_used.cput=00:00:00 resources_used.mem=720kb resources_used.vmem=13308kb resources_used.walltime=00:00:00 
    374                                                          */ 
    375                                                                 char timestamp_unix[64]; 
    376                                                                 time_t timestamp_time_t = pbsdrmaa_parse_log_timestamp(event_timestamp, timestamp_unix, sizeof(timestamp_unix)); 
    377                                                                 char *tok_ctx2 = NULL; 
    378                                                                 char *token = NULL; 
    379  
    380                                                                 attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_JOB_STATE, "C"); 
    381                                                                 attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_MTIME, timestamp_unix); 
    382  
    383                                                                 /* tokenize !!! */ 
    384                                                                 for (token = strtok_r(msg, " ", &tok_ctx2); token; token = strtok_r(NULL, " ", &tok_ctx2)) 
    385                                                                  { 
    386                                                                         if (strncmp(token, "Exit_status=", 12) == 0) 
    387                                                                          { 
    388                                                                                 token[11] = '\0'; 
    389                                                                                 attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_EXIT_STATUS, token + 12); 
    390                                                                                 fsd_log_info(("WT - Completion of job %s (Exit_status=%s) detected after %d seconds", job->job_id, token+12, (int)(time(NULL) - timestamp_time_t) )); 
    391                                                                          } 
    392                                                                         else if (strncmp(token, "resources_used.cput=", 20) == 0) 
    393                                                                          { 
    394                                                                                 token[19] = '\0'; 
    395                                                                                 attribs = pbsdrmaa_add_attr(attribs, token, token + 20); 
    396                                                                          } 
    397                                                                         else if (strncmp(token, "resources_used.mem=", 19) == 0) 
    398                                                                          { 
    399                                                                                 token[18] = '\0'; 
    400                                                                                 attribs = pbsdrmaa_add_attr(attribs, token, token + 19); 
    401                                                                          } 
    402                                                                         else if (strncmp(token, "resources_used.vmem=", 20) == 0) 
    403                                                                          { 
    404                                                                                 token[19] = '\0'; 
    405                                                                                 attribs = pbsdrmaa_add_attr(attribs, token, token + 20); 
    406                                                                          } 
    407                                                                         else if (strncmp(token, "resources_used.walltime=", 24) == 0) 
    408                                                                          { 
    409                                                                                 token[23] = '\0'; 
    410                                                                                 attribs = pbsdrmaa_add_attr(attribs, token, token + 24); 
    411                                                                          } 
    412                                                                  } 
    413  
    414                                                                 if (!job->execution_hosts) 
    415                                                                  { 
    416                                                                         char *exec_host = NULL; 
    417                                                                         fsd_log_info(("WT - No execution host information for job %s. Reading accounting logs...", job->job_id)); 
    418                                                                         exec_host = pbsdrmaa_get_exec_host_from_accountig(self, job->job_id); 
    419                                                                         if (exec_host) 
    420                                                                          { 
    421                                                                                 attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_EXECUTION_HOST, exec_host); 
    422                                                                                 fsd_free(exec_host); 
    423                                                                          } 
    424                                                                  } 
    425                                                          } 
    426                                                         else 
    427                                                         { 
    428                                                                 fsd_log_debug(("Ignoring msg(type=%d) = %s", event_type,  msg)); 
    429                                                                 goto cleanup; /* ignore other job events*/ 
    430                                                         } 
    431                                          
    432                                                         fsd_log_debug(("WT - updating job: %s", job->job_id )); 
    433                                                         status.name = job->job_id; 
    434                                                         status.attribs = attribs; 
    435  
    436                                                         ((pbsdrmaa_job_t *)job)->update( job, &status ); 
    437  
    438                                                         if ( in_running_state ) 
    439                                                          { 
    440                                                                 fsd_log_debug(("WT - forcing update of job: %s", job->job_id )); 
    441                                                                 TRY 
    442                                                                 { 
    443                                                                         job->update_status( job ); 
    444                                                                 } 
    445                                                                 EXCEPT_DEFAULT 
    446                                                                 { 
    447                                                                         /*TODO: distinguish between invalid job and internal errors */ 
    448                                                                         fsd_log_debug(("Job finished just after entering running state: %s", job->job_id)); 
    449                                                                 } 
    450                                                                 END_TRY 
    451                                                          } 
    452  
    453  
    454                                                         pbsdrmaa_free_attrl(attribs); /* TODO free on exception */ 
    455  
    456                                                         fsd_cond_broadcast( &job->status_cond); 
    457                                                         fsd_cond_broadcast( &self->session->wait_condition ); 
    458  
    459                                                  } 
    460                                                 else 
    461                                                  { 
    462                                                         fsd_assert(0); /*not reached */ 
    463                                                  } 
    464                                          } 
    465                                 cleanup: 
    466                                         fsd_free(line); /* TODO what about exceptions */                 
    467                                         if ( job ) 
    468                                                 job->release( job ); 
    469  
    470  
    471  
    472                                  } /* end of while getline loop */ 
    473  
    474  
    475  
    476                                 fsd_mutex_unlock( &self->session->mutex ); 
    477  
    478                                 /* close */ 
    479                                 self->close(self); 
    480  
    481                                 sleep(((pbsdrmaa_session_t *)self->session)->wait_thread_sleep_time); 
    482  
    483                                 /* and reopen log file */ 
    484                                 self->reopen(self); 
    485  
    486                                 fsd_mutex_lock( &self->session->mutex ); 
    487  
    488                                 self->run_flag = self->session->wait_thread_run_flag; 
    489                         } 
    490                         EXCEPT_DEFAULT 
    491                         { 
    492                                 const fsd_exc_t *e = fsd_exc_get(); 
    493                                 /* Its better to exit and communicate error rather then let the application to hang */ 
    494                                 fsd_log_fatal(( "Exception in wait thread: <%d:%s>. Exiting !!!", e->code(e), e->message(e) )); 
    495                                 exit(1); 
    496                         } 
    497                         END_TRY 
    498                  } 
    499  
    500                 if(self->fhandle) 
    501                         fclose(self->fhandle); 
    502  
    503                 fsd_log_debug(("WT - Log file closed")); 
    504         } 
    505         FINALLY 
    506         { 
    507                 fsd_log_debug(("WT - Terminated.")); 
    508                 fsd_mutex_unlock( &self->session->mutex ); /**/ 
    509         } 
    510         END_TRY 
    511          
    512         fsd_log_return(("")); 
    513 } 
    514  
    515 void 
    516 pbsdrmaa_select_file( pbsdrmaa_log_reader_t * self ) 
    517 { 
    518         pbsdrmaa_session_t *pbssession = (pbsdrmaa_session_t*) self->session; 
    519          
    520         if (self->date_changed) 
    521          { 
    522                 int num_tries = 0; 
    523                 struct tm tm;  
    524                 char *old_log_path = NULL; 
    525                  
    526                 fsd_log_enter(("")); 
    527                  
    528                 if(!self->first_open) 
    529                         time(&self->t);  
    530                 else 
    531                         self->t = pbssession->log_file_initial_time; 
    532                          
    533                 localtime_r(&self->t,&tm); 
    534                                  
    535                 #define DRMAA_WAIT_THREAD_MAX_TRIES (12) 
    536                 /* generate new date, close file and open new */ 
    537                 old_log_path = self->log_path; 
    538  
    539                 self->log_path = fsd_asprintf("%s/server_logs/%04d%02d%02d", pbssession->pbs_home, tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday); 
    540  
    541                 if(self->fhandle) 
    542                         fclose(self->fhandle); 
    543  
    544                 fsd_log_info(("Opening log file: %s",self->log_path)); 
    545                                  
    546         retry: 
    547                 if ((self->fhandle = fopen(self->log_path,"r")) == NULL && (num_tries > DRMAA_WAIT_THREAD_MAX_TRIES || self->first_open)) 
    548                  { 
    549                         fsd_log_error(("Can't open log file: %s. Verify pbs_home. Running standard wait_thread.", self->log_path)); 
    550                         fsd_log_error(("Remember that without keep_completed set the standard wait_thread won't provide information about job exit status")); 
    551                         /*pbssession->super.enable_wait_thread = false;*/ /* run not wait_thread */ 
    552                         pbssession->wait_thread_log = false; 
    553                         pbssession->super.wait_thread = pbssession->super_wait_thread; 
    554                         pbssession->super.wait_thread(self->session); 
    555                  } 
    556                 else if ( self->fhandle == NULL ) 
    557                  { /* Torque seems not to create a new file immediately after the old one is closed */ 
    558                         fsd_log_warning(("Can't open log file: %s. Retries count: %d", self->log_path, num_tries)); 
    559                         num_tries++; 
    560                         sleep(2 * num_tries); 
    561                         goto retry; 
    562                  } 
    563  
    564                 fsd_log_debug(("Log file opened")); 
    565  
    566                 if(self->first_open) 
    567                  { 
    568                         fsd_log_debug(("Log file lseek")); 
    569  
    570                         if(fseek(self->fhandle, pbssession->log_file_initial_size, SEEK_SET) == (off_t) -1) 
    571                          { 
    572                                 fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"fseek error"); 
    573                          } 
    574                         self->first_open = false; 
    575                  } 
    576                 else if (old_log_path && strcmp(old_log_path, self->log_path) == 0) 
    577                  { 
    578                         fsd_log_info(("PBS restarted. Seeking log file %u", (unsigned int)self->current_offset)); 
    579                         if(fseek(self->fhandle, self->current_offset, SEEK_SET) == (off_t) -1) 
    580                          { 
    581                                 fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"fseek error"); 
    582                          } 
    583                  } 
    584  
    585                 self->date_changed = false; 
    586                  
    587                 fsd_free(old_log_path); 
    588  
    589                 fsd_log_return(("")); 
    590         }        
    591 } 
    592  
    593 time_t 
    594 pbsdrmaa_parse_log_timestamp(const char *timestamp, char *unixtime_str, size_t size) 
    595 { 
    596         struct tm temp_time_tm; 
    597         memset(&temp_time_tm, 0, sizeof(temp_time_tm)); 
    598         temp_time_tm.tm_isdst = -1; 
    599  
    600         if (strptime(timestamp, "%m/%d/%Y %H:%M:%S", &temp_time_tm) == NULL) 
    601          { 
    602                 fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"WT - failed to parse log timestamp: %s", timestamp); 
    603          } 
    604         else 
    605          { 
    606                 time_t temp_time = mktime(&temp_time_tm); 
    607                 snprintf(unixtime_str, size, "%lu", temp_time); 
    608                 return temp_time; 
    609          } 
    610 } 
    611  
    612 char * 
    613 pbsdrmaa_get_exec_host_from_accountig(pbsdrmaa_log_reader_t * log_reader, const char *job_id) 
    614 { 
    615                 pbsdrmaa_session_t *pbssession = (pbsdrmaa_session_t*) log_reader->session; 
    616                 struct tm tm; 
    617                 time_t tm_t; 
    618                 char *line = NULL; 
    619                 char *exec_host = NULL; 
    620                 char *log_path = NULL; 
    621                 FILE *fhandle = NULL; 
    622  
    623                 fsd_log_enter(("(job_id=%s)", job_id)); 
    624  
    625                 tm_t = time(NULL); 
    626                 localtime_r(&tm_t, &tm); 
    627  
    628                 log_path = fsd_asprintf("%s/server_priv/accounting/%04d%02d%02d", pbssession->pbs_home, tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday); 
    629  
    630                 fsd_log_info(("Opening accounting log file: %s", log_path)); 
    631  
    632                 if ((fhandle = fopen(log_path, "r")) == NULL) 
    633                  { 
    634                         fsd_log_error(("Failed to open accounting log file: %s", log_path)); 
    635                         fsd_free(log_path); 
    636                         return NULL; 
    637                  } 
    638  
    639                 fsd_free(log_path); 
    640 /* 
    641 10/27/2011 14:09:32;E;114249.grass1.man.poznan.pl;user=drmaa group=drmaa jobname=none queue=shortq ctime=1319717371 qtime=1319717371 etime=1319717371 start=1319717372 owner=drmaa@grass1.man.poznan.pl exec_host=grass4.man.poznan.pl/0 Resource_List.neednodes=1 Resource_List.nodect=1 Resource_List.nodes=1 Resource_List.walltime=02:00:00 session=28561 end=1319717372 Exit_status=0 resources_used.cput=00:00:00 resources_used.mem=0kb resources_used.vmem=0kb resources_used.walltime=00:00:00 
    642  */ 
    643                 while ((line = fsd_readline(fhandle)) != NULL) 
    644                  { 
    645  
    646                         if (line[20] == 'E'  && strncmp(line + 22, job_id, strlen(job_id)) == 0 ) 
    647                          { 
    648                                 char *p = NULL; 
    649  
    650                                 fsd_log_debug(("Matched accounting log record = %s", line)); 
    651  
    652                                 if (!(exec_host = strstr(line, "exec_host"))) 
    653                                  { 
    654                                         fsd_log_error(("Invalid accounting record: %s", exec_host)); 
    655                                         break; 
    656                                  } 
    657  
    658                                 exec_host += 10; 
    659  
    660                                 p = exec_host; 
    661                                 while (*p != ' ' && *p != '\0') 
    662                                         p++; 
    663                                 *p = '\0'; 
    664  
    665                                 break; 
    666                          } 
    667  
    668                         fsd_free(line); 
    669                  } 
    670  
    671                 if (exec_host) 
    672                  { 
    673                         fsd_log_info(("Job %s was executing on hosts %s.", job_id, exec_host)); 
    674                         exec_host = fsd_strdup(exec_host); 
    675                  } 
     133char*  
     134pbsdrmaa_pbs_submit( pbsdrmaa_pbs_conn_t *self, struct attropl *attrib, char *script, char *destination ) 
     135{ 
     136 
     137 
     138} 
     139 
     140struct batch_status*  
     141pbsdrmaa_pbs_statjob( pbsdrmaa_pbs_conn_t *self,  char *job_id, struct attrl *attrib ) 
     142{ 
     143 
     144} 
     145 
     146void  
     147pbsdrmaa_pbs_statjob_free( pbsdrmaa_pbs_conn_t *self, struct batch_status* job_status ) 
     148{ 
     149 
     150 
     151} 
     152 
     153void  
     154pbsdrmaa_pbs_sigjob( pbsdrmaa_pbs_conn_t *self, char *job_id, char *signal ) 
     155{ 
     156 
     157 
     158} 
     159 
     160void  
     161pbsdrmaa_pbs_deljob( pbsdrmaa_pbs_conn_t *self, char *job_id ) 
     162{ 
     163 
     164} 
     165 
     166void  
     167pbsdrmaa_pbs_rlsjob( pbsdrmaa_pbs_conn_t *self, char *job_id ) 
     168{ 
     169 
     170 
     171} 
     172 
     173void  
     174pbsdrmaa_pbs_holdjob( pbsdrmaa_pbs_conn_t *self,  char *job_id ) 
     175{ 
     176 
     177} 
     178 
     179void  
     180pbsdrmaa_pbs_reconnect_internal( pbsdrmaa_pbs_conn_t *self, bool force_reconnect) 
     181{ 
     182        int tries_left = self->session->max_retries_count; 
     183        int sleep_time = 1; 
     184 
     185 
     186        fsd_log_enter(("(%d)", self->connection_fd)); 
     187 
     188        if ( self->connection_fd != -1 )  
     189          { 
     190                if (!force_reconnect) 
     191                  { 
     192                        fsd_log_return(("(%d)", self->connection_fd)); 
     193                        return; 
     194                  } 
    676195                else 
    677196                 { 
    678                         fsd_log_error(("Could not find executions hosts for %s.", job_id)); 
     197                        pbs_disconnect(self->connection_fd); 
     198                        self->connection_fd = -1; 
    679199                 } 
    680  
    681                 if (line) 
    682                         fsd_free(line); 
    683  
    684                 fclose(fhandle); 
    685  
    686                 return exec_host; 
    687 } 
    688  
    689 void 
    690 pbsdrmaa_close_log( pbsdrmaa_log_reader_t * self ) 
    691 { 
    692  
    693         self->current_offset = ftello(self->fhandle); 
    694          
    695         fsd_log_debug(("Closing log  file (offset=%d)", self->current_offset));   
    696  
    697         fclose(self->fhandle); 
    698  
    699         self->fhandle = NULL; 
    700 } 
    701  
    702 void 
    703 pbsdrmaa_reopen_log( pbsdrmaa_log_reader_t * self ) 
    704 { 
    705         fsd_log_debug(("Reopening log file: %s (offset=%d)", self->log_path, self->current_offset));   
    706  
    707         if ((self->fhandle = fopen(self->log_path,"r")) == NULL) 
    708          { 
    709                 fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"Failed to reopen log file"); 
    710          } 
    711  
    712         if(fseek(self->fhandle, self->current_offset, SEEK_SET) == (off_t) -1) 
    713          { 
    714                 fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"fseek error"); 
    715          } 
    716 } 
    717  
     200          } 
     201 
     202retry_connect: /* Life... */ 
     203        self->connection_fd = pbs_connect( self->server ); 
     204        fsd_log_info(( "pbs_connect(%s) =%d", self->server, self->connection_fd )); 
     205        if( self->connection_fd < 0 && tries_left-- ) 
     206          { 
     207                sleep(sleep_time); 
     208                sleep_time *=2; 
     209                goto retry_connect; 
     210          } 
     211         
     212        if( self->connection_fd < 0 ) 
     213                pbsdrmaa_exc_raise_pbs( "pbs_connect" ); 
     214         
     215        fsd_log_return(("(%d)", self->connection_fd)); 
     216} 
     217 
  • trunk/pbs_drmaa/pbs_conn.h

    r48 r76  
    11/* $Id$ */ 
    22/* 
    3  *  FedStage DRMAA for PBS Pro 
    4  *  Copyright (C) 2006-2009  FedStage Systems 
     3 *  PSNC DRMAA for Torque/PBS Pro 
     4 *  Copyright (C) 2012 Poznan Supercomputing and Networking Center 
    55 * 
    66 *  This program is free software: you can redistribute it and/or modify 
     
    1818 */ 
    1919  
    20 #ifndef __PBS_DRMAA__LOG_READER_H 
    21 #define __PBS_DRMAA__LOG_READER_H 
     20#ifndef __PBS_DRMAA__PBS_CONN_H 
     21#define __PBS_DRMAA__PBS_CONN_H 
    2222 
    2323#ifdef HAVE_CONFIG_H 
     
    3030#include <drmaa_utils/session.h> 
    3131 
    32 typedef struct pbsdrmaa_log_reader_s pbsdrmaa_log_reader_t; 
     32#include <session.h> 
    3333 
    34 pbsdrmaa_log_reader_t *  
    35 pbsdrmaa_log_reader_new ( fsd_drmaa_session_t * session); 
     34#include <pbs_ifl.h> 
     35 
     36typedef struct pbsdrmaa_pbs_conn_s pbsdrmaa_pbs_conn_t; 
     37 
     38pbsdrmaa_pbs_conn_t * pbsdrmaa_pbs_conn_new ( pbsdrmaa_session_t * session, char *server); 
    3639 
    3740void 
    38 pbsdrmaa_log_reader_destroy ( pbsdrmaa_log_reader_t * self ); 
     41pbsdrmaa_pbs_conn_destroy ( pbsdrmaa_pbs_conn_t * self ); 
    3942 
    40 struct pbsdrmaa_log_reader_s { 
    41         fsd_drmaa_session_t *volatile session ; 
     43struct pbsdrmaa_pbs_conn_s { 
     44        pbsdrmaa_session_t *volatile session; 
     45 
     46        char* (*submit) ( pbsdrmaa_pbs_conn_t *self, struct attropl *attrib, char *script, char *destination ); 
     47 
     48        struct batch_status* (*statjob) ( pbsdrmaa_pbs_conn_t *self,  char *job_id, struct attrl *attrib ); 
     49 
     50        void (*statjob_free) ( pbsdrmaa_pbs_conn_t *self, struct batch_status* job_status ); 
     51 
     52        void (*sigjob) ( pbsdrmaa_pbs_conn_t *self, char *job_id, char *signal ); 
     53 
     54        void (*deljob) ( pbsdrmaa_pbs_conn_t *self, char *job_id ); 
     55 
     56        void (*rlsjob) ( pbsdrmaa_pbs_conn_t *self, char *job_id ); 
     57 
     58        void (*holdjob) ( pbsdrmaa_pbs_conn_t *self, char *job_id ); 
    4259         
    43         void (*read_log) ( pbsdrmaa_log_reader_t * self ); 
     60        /* contact string */ 
     61        char *server; 
     62        /* connection descriptor */ 
     63        int connection_fd; 
    4464         
    45         void (*select_file) ( pbsdrmaa_log_reader_t * self ); 
    46          
    47         void (*close) ( pbsdrmaa_log_reader_t * self ); 
    48  
    49         void (*reopen) ( pbsdrmaa_log_reader_t * self ); 
    50  
    51  
    52         /* determines if function should run */ 
    53         bool run_flag; 
    54          
    55         /* date of current file */ 
    56         time_t t;        
    57          
    58         /* log file handle */ 
    59         FILE *fhandle; 
    60          
    61         /* for wait_thread - day changed */ 
    62         bool volatile date_changed; 
    63          
    64         /* for wait_thread - log file first open */ 
    65         bool volatile first_open;        
    66  
    67         char *volatile log_path; 
    68  
    69         off_t volatile current_offset; 
     65        /* timestamp of last usage */ 
     66        time_t last_usage;       
    7067}; 
    7168 
    72 #endif /* __PBS_DRMAA__LOG_READER_H */ 
     69#endif /* __PBS_DRMAA__PBS_CONN_H */ 
Note: See TracChangeset for help on using the changeset viewer.