source: trunk/pbs_drmaa/log_reader.c @ 31

Revision 31, 19.5 KB checked in by mmamonski, 13 years ago (diff)

on site fixies

  • Property svn:keywords set to Id
RevLine 
[12]1/* $Id$ */
[7]2/*
3 *  FedStage DRMAA for PBS Pro
4 *  Copyright (C) 2006-2007  FedStage Systems
5 *
6 *  This program is free software: you can redistribute it and/or modify
7 *  it under the terms of the GNU General Public License as published by
8 *  the Free Software Foundation, either version 3 of the License, or
9 *  (at your option) any later version.
10 *
11 *  This program is distributed in the hope that it will be useful,
12 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 *  GNU General Public License for more details.
15 *
16 *  You should have received a copy of the GNU General Public License
17 *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #ifdef HAVE_CONFIG_H
21#       include <config.h>
22#endif
23
24#include <stdlib.h>
25#include <string.h>
26#include <unistd.h>
27#include <sys/select.h>
28#include <sys/stat.h>
29#include <sys/types.h>
30#include <dirent.h>
31#include <fcntl.h>
32
33#include <pbs_ifl.h>
34#include <pbs_error.h>
35
36#include <drmaa_utils/datetime.h>
37#include <drmaa_utils/drmaa.h>
38#include <drmaa_utils/iter.h>
39#include <drmaa_utils/conf.h>
40#include <drmaa_utils/session.h>
41#include <drmaa_utils/datetime.h>
42
43#include <pbs_drmaa/job.h>
44#include <pbs_drmaa/log_reader.h>
45#include <pbs_drmaa/session.h>
46#include <pbs_drmaa/submit.h>
47#include <pbs_drmaa/util.h>
[29]48#include <pbs_drmaa/pbs_attrib.h>
[7]49
50#include <errno.h>
51
[29]52enum pbsdrmaa_field_id
53{
54        PBSDRMAA_FLD_ID_DATE = 0,
55        PBSDRMAA_FLD_ID_EVENT = 1,
56        PBSDRMAA_FLD_ID_SRC = 2,
57        PBSDRMAA_FLD_ID_OBJ_TYPE = 3,
58        PBSDRMAA_FLD_ID_OBJ_ID = 4,
59        PBSDRMAA_FLD_ID_MSG = 5
60};
[7]61
62
[29]63#define PBSDRMAA_FLD_MSG_0008 "0008"
64#define PBSDRMAA_FLD_MSG_0010 "0010"
[7]65
[29]66enum pbsdrmaa_event_type
67{
68        pbsdrmaa_event_0008 = 8,
69        pbsdrmaa_event_0010 = 10
70};
[7]71
[29]72static void pbsdrmaa_read_log();
[7]73
[29]74static void pbsdrmaa_select_file_wait_thread( pbsdrmaa_log_reader_t * self);
[21]75
[29]76char *pbsdrmaa_read_line_wait_thread( pbsdrmaa_log_reader_t * self);
[21]77
[29]78static time_t pbsdrmaa_parse_log_timestamp(const char *timestamp, char *unixtime_str, size_t size);
[21]79
[29]80static char *pbsdrmaa_get_exec_host_from_accountig(pbsdrmaa_log_reader_t * log_reader, const char *job_id);
[7]81
[25]82/*
83 * Snippets from log files
84 *
85 * PBS Pro
86 *
8710/11/2011 14:43:29;0008;Server@nova;Job;2127218.nova;Job Queued at request of mamonski@endor.wcss.wroc.pl, owner = mamonski@endor.wcss.wroc.pl, job name = STDIN, queue = normal
8810/11/2011 14:43:31;0008;Server@nova;Job;2127218.nova;Job Modified at request of Scheduler@nova.wcss.wroc.pl
8910/11/2011 14:43:31;0008;Server@nova;Job;2127218.nova;Job Run at request of Scheduler@nova.wcss.wroc.pl on exec_vnode (wn698:ncpus=3:mem=2048000kb)+(wn700:ncpus=3:mem=2048000kb)
9010/11/2011 14:43:31;0008;Server@nova;Job;2127218.nova;Job Modified at request of Scheduler@nova.wcss.wroc.pl
9110/11/2011 14:43:32;0010;Server@nova;Job;2127218.nova;Exit_status=0 resources_used.cpupercent=0 resources_used.cput=00:00:00 resources_used.mem=1768kb resources_used.ncpus=6 resources_used.vmem=19228kb resources_used.walltime=00:00:01
92
93 *
94 * Torque
95 *
9610/11/2011 14:47:59;0008;PBS_Server;Job;15545337.batch.grid.cyf-kr.edu.pl;Job Queued at request of plgmamonski@ui.cyf-kr.edu.pl, owner = plgmamonski@ui.cyf-kr.edu.pl, job name = STDIN, queue = l_short
9710/11/2011 14:48:23;0008;PBS_Server;Job;15545337.batch.grid.cyf-kr.edu.pl;Job Run at request of root@batch.grid.cyf-kr.edu.pl
9810/11/2011 14:48:24;0010;PBS_Server;Job;15545337.batch.grid.cyf-kr.edu.pl;Exit_status=0 resources_used.cput=00:00:00 resources_used.mem=720kb resources_used.vmem=13308kb resources_used.walltime=00:00:00
99
[29]100deleting job:
101I . PBS Pro
102a) in Q state
10310/16/2011 09:49:25;0008;Server@grass1;Job;2178.grass1.man.poznan.pl;Job Queued at request of mmamonski@grass1.man.poznan.pl, owner = mmamonski@grass1.man.poznan.pl, job name = STDIN, queue = workq
10410/16/2011 09:49:25;0008;Server@grass1;Job;2178.grass1.man.poznan.pl;Job Modified at request of Scheduler@grass1.man.poznan.pl
10510/16/2011 09:49:37;0008;Server@grass1;Job;2178.grass1.man.poznan.pl;Job to be deleted at request of mmamonski@grass1.man.poznan.pl
10610/16/2011 09:49:37;0100;Server@grass1;Job;2178.grass1.man.poznan.pl;dequeuing from workq, state 5
107
108
109b) in R state
11010/16/2011 09:45:12;0080;Server@grass1;Job;2177.grass1.man.poznan.pl;delete job request received
11110/16/2011 09:45:12;0008;Server@grass1;Job;2177.grass1.man.poznan.pl;Job sent signal TermJob on delete
11210/16/2011 09:45:12;0008;Server@grass1;Job;2177.grass1.man.poznan.pl;Job to be deleted at request of mmamonski@grass1.man.poznan.pl
11310/16/2011 09:45:12;0010;Server@grass1;Job;2177.grass1.man.poznan.pl;Exit_status=271 resources_used.cpupercent=0 resources_used.cput=00:00:00 resources_used.mem=2772kb resources_used.ncpus=1 resources_used.vmem=199288kb resources_used.walltime=00:00:26
11410/16/2011 09:45:12;0100;Server@grass1;Job;2177.grass1.man.poznan.pl;dequeuing from workq, state 5
115
116II. Torque
117a) in Q state
11810/15/2011 21:19:25;0008;PBS_Server;Job;113045.grass1.man.poznan.pl;Job deleted at request of mmamonski@grass1.man.poznan.pl
11910/15/2011 21:19:25;0100;PBS_Server;Job;113045.grass1.man.poznan.pl;dequeuing from batch, state EXITING
120
121b) in R state
12210/15/2011 21:19:47;0008;PBS_Server;Job;113046.grass1.man.poznan.pl;Job deleted at request of mmamonski@grass1.man.poznan.pl
12310/15/2011 21:19:47;0008;PBS_Server;Job;113046.grass1.man.poznan.pl;Job sent signal SIGTERM on delete
12410/15/2011 21:19:47;0010;PBS_Server;Job;113046.grass1.man.poznan.pl;Exit_status=271 resources_used.cput=00:00:00 resources_used.mem=0kb resources_used.vmem=0kb resources_used.walltime=00:00:10
125
126Log closed:
12710/16/2011 00:00:17;0002;PBS_Server;Svr;Log;Log closed
128
[25]129 */
[7]130pbsdrmaa_log_reader_t *
[29]131pbsdrmaa_log_reader_new( fsd_drmaa_session_t *session )
[7]132{
133        pbsdrmaa_log_reader_t *volatile self = NULL;
134
135        fsd_log_enter((""));
[29]136
[7]137        TRY
138        {
139                fsd_malloc(self, pbsdrmaa_log_reader_t );
140               
141                self->session = session;
[29]142
143                self->select_file = pbsdrmaa_select_file_wait_thread;
[7]144                self->read_log = pbsdrmaa_read_log;     
145               
146                self->run_flag = true;
[29]147                self->fhandle = NULL;
[7]148                self->date_changed = true;
149                self->first_open = true;
150               
151        }
152        EXCEPT_DEFAULT
153        {
154                if( self != NULL)
155                        fsd_free(self);
156                       
157                fsd_exc_reraise();
158        }
159        END_TRY
[29]160
[7]161        fsd_log_return((""));
[29]162
[7]163        return self;
164}
165
[21]166
[7]167void
168pbsdrmaa_log_reader_destroy ( pbsdrmaa_log_reader_t * self )
169{
170        fsd_log_enter((""));
171        TRY
172        {
173                if(self != NULL)
174                {
175                        fsd_free(self);
[29]176                }
[7]177        }
178        EXCEPT_DEFAULT
179        {
180                fsd_exc_reraise();
181        }
182        END_TRY
183       
184        fsd_log_return((""));
185}
186
187
[29]188void
[7]189pbsdrmaa_read_log( pbsdrmaa_log_reader_t * self )
190{
191        fsd_log_enter((""));
192       
[29]193        fsd_mutex_lock( &self->session->mutex );
[8]194
[7]195        TRY
[29]196         {
[7]197                while( self->run_flag )
[29]198                 {
199                        TRY
[7]200                        {
[29]201                                char *line = NULL;
[8]202                               
[29]203                                self->select_file(self);
[7]204
[29]205                                while ((line = fsd_readline(self->fhandle)) != NULL)
206                                 {
207                                        int field_id = PBSDRMAA_FLD_ID_DATE;
208                                        char *tok_ctx = NULL;
209                                        char *field_token = NULL;
210                                        char *event_timestamp = NULL;
211                                        int event_type = -1;
212                                        fsd_job_t *job = NULL;
[7]213
[29]214                                        /* at first detect if this not the end of log file */
215                                        if (strstr(line, "Log;Log closed")) /*TODO try to be more effective and safe */
216                                         {
217                                                fsd_log_debug(("WT - Date changed. Closing log file"));
218                                                self->date_changed = true;
[31]219                                                goto cleanup;
[29]220                                         }
[7]221
[29]222                                        for (field_token = strtok_r(line, ";", &tok_ctx); field_token; field_token = strtok_r(NULL, ";", &tok_ctx), field_id++)
223                                         {
224                                                if ( field_id == PBSDRMAA_FLD_ID_DATE)
225                                                 {
226                                                        event_timestamp = field_token;
227                                                 }
228                                                else if ( field_id == PBSDRMAA_FLD_ID_EVENT)
229                                                 {
230                                                        if (strncmp(field_token, PBSDRMAA_FLD_MSG_0008, 4) == 0)
231                                                                event_type = pbsdrmaa_event_0008;
232                                                        else if (strncmp(field_token, PBSDRMAA_FLD_MSG_0010, 4) == 0)
233                                                                event_type = pbsdrmaa_event_0010;
234                                                        else
[30]235                                                         {
[31]236                                                                goto cleanup; /*we are interested only in the above log messages */
[30]237                                                         }
[29]238                                                 }
239                                                else if ( field_id == PBSDRMAA_FLD_ID_SRC)
240                                                 {
241                                                        /* not used ignore */
242                                                 }
243                                                else if (field_id  == PBSDRMAA_FLD_ID_OBJ_TYPE)
244                                                 {
245                                                        if (strncmp(field_token, "Job", 3) != 0)
[30]246                                                         {
[31]247                                                                goto cleanup; /* we are interested only in job events */
[30]248                                                         }
[29]249                                                 }
250                                                else if (field_id == PBSDRMAA_FLD_ID_OBJ_ID)
251                                                 {
252                                                        const char *event_jobid = field_token;
[31]253                                                       
254                                                        if (!isdigit(event_jobid[0]))
255                                                         {
256                                                                fsd_log_debug(("WT - Invalid job: %s", event_jobid));
257                                                                goto cleanup;
258                                                         }
[26]259
[31]260                                                        job = self->session->get_job( self->session, event_jobid );
261
262                                                        if( job )
[25]263                                                         {
[31]264                                                                fsd_log_debug(("WT - Found job event: %s", event_jobid));
[29]265                                                         }
[31]266                                                        else
267                                                         {
268                                                                fsd_log_debug(("WT - Unknown job: %s", event_jobid)); /* Not a DRMAA job */
269                                                                goto cleanup;
270                                                         }
271                                                 }
[29]272                                                else if (field_id == PBSDRMAA_FLD_ID_MSG)
273                                                 {
274                                                        char *msg = field_token;
275                                                        struct batch_status status;
276                                                        struct attrl *attribs = NULL;
277                                                        bool in_running_state = false;
[7]278
[29]279                                                        if (event_type == pbsdrmaa_event_0008 && strncmp(msg, "Job Queued", 10) == 0)
280                                                         {
281                                                                /* Queued
282                                                                 * PBS Pro: 10/11/2011 14:43:29;0008;Server@nova;Job;2127218.nova;Job Queued at request of mamonski@endor.wcss.wroc.pl, owner = mamonski@endor.wcss.wroc.pl, job name = STDIN, queue = normal
283                                                                 * Torque:  10/11/2011 14:47:59;0008;PBS_Server;Job;15545337.batch.grid.cyf-kr.edu.pl;Job Queued at request of plgmamonski@ui.cyf-kr.edu.pl, owner = plgmamonski@ui.cyf-kr.edu.pl, job name = STDIN, queue = l_short
284                                                                 */
285                                                                char *p_queue = NULL;
[25]286
[31]287                                                                fsd_log_info(("WT - Detected queuing of job %s", job->job_id));
[30]288
[29]289                                                                if ((p_queue = strstr(msg,"queue =")) == NULL)
290                                                                        fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"No queue attribute found in log line = %s", line);
[25]291
[30]292                                                                attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_JOB_STATE, "Q");
293                                                                attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_QUEUE, p_queue + 7);
294                                                         }
295                                                        else if (event_type == pbsdrmaa_event_0008 && strncmp(msg, "Job Run", 7) == 0)
296                                                        {
297                                                                /*
298                                                                 * Running
299                                                                 * Torque: 10/11/2011 14:48:23;0008;PBS_Server;Job;15545337.batch.grid.cyf-kr.edu.pl;Job Run at request of root@batch.grid.cyf-kr.edu.pl
300                                                                 * PBS Pro: 10/11/2011 14:43:31;0008;Server@nova;Job;2127218.nova;Job Run at request of Scheduler@nova.wcss.wroc.pl on exec_vnode (wn698:ncpus=3:mem=2048000kb)+(wn700:ncpus=3:mem=2048000kb)
301                                                                 */
302                                                                char timestamp_unix[64];
303
304                                                                fsd_log_info(("WT - Detected start of job %s", job->job_id));
305
[29]306                                                                (void)pbsdrmaa_parse_log_timestamp(event_timestamp, timestamp_unix, sizeof(timestamp_unix));
307
308                                                                in_running_state = true;
309
310                                                                attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_JOB_STATE, "R");
311                                                                attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_START_TIME, timestamp_unix);
[26]312#ifdef PBS_PROFESSIONAL
313                                                                        {
314                                                                                char *p_vnode = NULL;
[27]315                                                                                if ((p_vnode = strstr(field, "exec_vnode")))
[26]316                                                                                 {
317                                                                                        last_attr->next = &struct_exec_vnode;
318                                                                                        last_attr =  &struct_exec_vnode;
319                                                                                        struct_exec_vnode.name = "exec_vnode";
320                                                                                        struct_exec_vnode.next = NULL;
[27]321                                                                                        struct_exec_vnode.value = fsd_strdup(p_vnode + 11);
[26]322                                                                                 }
323                                                                        }
324#endif
325                                                         }
[29]326#ifndef PBS_PBS_PROFESSIONAL
327                                                        else if (event_type == pbsdrmaa_event_0008 && strncmp(msg, "Job deleted", 11))
328#else
329                                                        else if (event_type == pbsdrmaa_event_0008 && strncmp(msg, "Job to be deleted", 17))
330#endif
331                                                         {
332                                                        /* Deleted
333                                                         * PBS Pro: 10/16/2011 09:45:12;0008;Server@grass1;Job;2177.grass1.man.poznan.pl;Job to be deleted at request of mmamonski@grass1.man.poznan.pl
334                                                         * Torque: 10/15/2011 21:19:25;0008;PBS_Server;Job;113045.grass1.man.poznan.pl;Job deleted at request of mmamonski@grass1.man.poznan.pl
335                                                         */
336                                                                char timestamp_unix[64];
[7]337
[30]338                                                                fsd_log_info(("WT - Detected deletion of job %s", job->job_id));
339
[29]340                                                                (void)pbsdrmaa_parse_log_timestamp(event_timestamp, timestamp_unix, sizeof(timestamp_unix));
[7]341
[29]342                                                                if (job->state < DRMAA_PS_RUNNING)
343                                                                 {
344                                                                        fsd_log_info(("Job %s killed before entering running state (%d).", job->job_id, job->state));
345                                                                        attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_JOB_STATE, "C");
346                                                                        attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_MTIME, timestamp_unix);
347                                                                        attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_EXIT_STATUS, "-2");
348                                                                 }
349                                                                else
350                                                                 {
[31]351                                                                        goto cleanup; /* job was started, ignore, wait for Exit_status message */
[29]352                                                                 }
353                                                         }
[31]354                                                        else if (event_type == pbsdrmaa_event_0010 && (strncmp(msg, "Exit_status=", 12) == 0))
[29]355                                                         {
356                                                        /* Completed:
357                                                         * PBS Pro: 10/11/2011 14:43:32;0010;Server@nova;Job;2127218.nova;Exit_status=0 resources_used.cpupercent=0 resources_used.cput=00:00:00 resources_used.mem=1768kb resources_used.ncpus=6 resources_used.vmem=19228kb resources_used.walltime=00:00:01
358                                                         * Torque: 10/11/2011 14:48:24;0010;PBS_Server;Job;15545337.batch.grid.cyf-kr.edu.pl;Exit_status=0 resources_used.cput=00:00:00 resources_used.mem=720kb resources_used.vmem=13308kb resources_used.walltime=00:00:00
359                                                         */
360                                                                char timestamp_unix[64];
361                                                                time_t timestamp_time_t = pbsdrmaa_parse_log_timestamp(event_timestamp, timestamp_unix, sizeof(timestamp_unix));
362                                                                char *tok_ctx2 = NULL;
363                                                                char *token = NULL;
[7]364
[29]365                                                                attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_JOB_STATE, "C");
366                                                                attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_MTIME, timestamp_unix);
[7]367
[29]368                                                                /* tokenize !!! */
369                                                                for (token = strtok_r(msg, " ", &tok_ctx2); token; token = strtok_r(NULL, " ", &tok_ctx2))
370                                                                 {
371                                                                        if (strncmp(token, "Exit_status=", 12) == 0)
372                                                                         {
[31]373                                                                                token[11] = '\0';
[29]374                                                                                attribs = pbsdrmaa_add_attr(attribs, token, token + 12);
375                                                                                fsd_log_info(("WT - Completion of job %s (Exit_status=%s) detected after %d seconds", job->job_id, token+12, (int)(time(NULL) - timestamp_time_t) ));
376                                                                         }
377                                                                        else if (strncmp(token, "resources_used.cput=", 20) == 0)
378                                                                         {
[31]379                                                                                token[19] = '\0';
[29]380                                                                                attribs = pbsdrmaa_add_attr(attribs, token, token + 20);
381                                                                         }
382                                                                        else if (strncmp(token, "resources_used.mem=", 19) == 0)
383                                                                         {
[31]384                                                                                token[18] = '\0';
[29]385                                                                                attribs = pbsdrmaa_add_attr(attribs, token, token + 19);
386                                                                         }
387                                                                        else if (strncmp(token, "resources_used.vmem=", 20) == 0)
388                                                                         {
[31]389                                                                                token[19] = '\0';
[29]390                                                                                attribs = pbsdrmaa_add_attr(attribs, token, token + 20);
391                                                                         }
392                                                                        else if (strncmp(token, "resources_used.walltime=", 24) == 0)
393                                                                         {
[31]394                                                                                token[23] = '\0';
[29]395                                                                                attribs = pbsdrmaa_add_attr(attribs, token, token + 24);
396                                                                         }
397                                                                 }
[7]398
[29]399                                                                if (!job->execution_hosts)
400                                                                 {
401                                                                        char *exec_host = NULL;
402                                                                        fsd_log_info(("WT - No execution host information for job %s. Reading accounting logs...", job->job_id));
403                                                                        exec_host = pbsdrmaa_get_exec_host_from_accountig(self, job->job_id);
404                                                                        attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_EXECUTION_HOST, exec_host);
405                                                                        fsd_free(exec_host);
406                                                                 }
407                                                         }
408                                                        else
409                                                        {
[31]410                                                                goto cleanup; /* ignore other job events*/
[29]411                                                        }
[25]412
[29]413                                                        if ( in_running_state )
414                                                         {
415                                                                fsd_log_debug(("WT - forcing update of job: %s", job->job_id ));
[18]416                                                                TRY
417                                                                {
[29]418                                                                        job->update_status( job );
[18]419                                                                }
420                                                                EXCEPT_DEFAULT
421                                                                {
422                                                                        /*TODO: distinguish between invalid job and internal errors */
[29]423                                                                        fsd_log_debug(("Job finished just after entering running state: %s", job->job_id));
[18]424                                                                }
425                                                                END_TRY
[29]426                                                         }
[7]427                                                        else
[29]428                                                         {
429                                                                fsd_log_debug(("WT - updating job: %s", job->job_id ));
430                                                                status.name = job->job_id;
431                                                                status.attribs = attribs;
432
433                                                                ((pbsdrmaa_job_t *)job)->update( job, &status );
434
435                                                                pbsdrmaa_free_attrl(attribs); /* TODO free on exception */
436                                                         }
437
438                                                        fsd_cond_broadcast( &job->status_cond);
[7]439                                                        fsd_cond_broadcast( &self->session->wait_condition );
440
[29]441                                                 }
442                                                else
443                                                 {
444                                                        fsd_assert(0); /*not reached */
445                                                 }
446                                         }
[31]447                                cleanup:
448                                        fsd_free(line); /* TODO what about exceptions */               
449                                        if ( job )
450                                                job->release( job );
[29]451
[31]452
453
[29]454                                 } /* end of while getline loop */
455
[30]456
[29]457                                 { /* poll on log file */
458                                        struct timeval timeout_tv;
459                                        fd_set log_fds;
460
461                                        fsd_mutex_unlock( &self->session->mutex );
[7]462                                       
[29]463                                        FD_ZERO(&log_fds);
464                                        FD_SET(fileno(self->fhandle), &log_fds);
[7]465
[29]466                                        timeout_tv.tv_sec = 1;
467                                        timeout_tv.tv_usec = 0;
[31]468                                        fsd_log_debug(("Polling log file for %d seconds", timeout_tv.tv_sec));
[29]469                                        /* ignore return value - the next get line call will handle IO errors */
470                                        (void)select(1, &log_fds, NULL, NULL, &timeout_tv);
[19]471
[29]472                                        fsd_mutex_lock( &self->session->mutex );
[19]473
[29]474                                        self->run_flag = self->session->wait_thread_run_flag;
475                                 }
[7]476                        }
[29]477                        EXCEPT_DEFAULT
478                        {
479                                const fsd_exc_t *e = fsd_exc_get();
480                                /* Its better to exit and communicate error rather then let the application to hang */
481                                fsd_log_fatal(( "Exception in wait thread: <%d:%s>. Exiting !!!", e->code(e), e->message(e) ));
482                                exit(1);
483                        }
484                        END_TRY
485                 }
[7]486
[29]487                if(self->fhandle)
488                        fclose(self->fhandle);
489
490                fsd_log_debug(("WT - Log file closed"));
[7]491        }
492        FINALLY
493        {
[29]494                fsd_log_debug(("WT - Terminated."));
495                fsd_mutex_unlock( &self->session->mutex ); /**/
[7]496        }
497        END_TRY
498       
499        fsd_log_return((""));
500}
501
502void
[8]503pbsdrmaa_select_file_wait_thread ( pbsdrmaa_log_reader_t * self )
[7]504{
505        pbsdrmaa_session_t *pbssession = (pbsdrmaa_session_t*) self->session;
506       
[29]507        if (self->date_changed)
508         {
[7]509                char * log_path = NULL;
510                int num_tries = 0;
511                struct tm tm;
512               
513                fsd_log_enter((""));
514               
515                if(!self->first_open)
516                        time(&self->t);
517                else
518                        self->t = pbssession->log_file_initial_time;
519                       
520                localtime_r(&self->t,&tm);
521                               
522                #define DRMAA_WAIT_THREAD_MAX_TRIES (12)
523                /* generate new date, close file and open new */
[29]524                log_path = fsd_asprintf("%s/server_logs/%04d%02d%02d", pbssession->pbs_home, tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday);
[7]525
[29]526                if(self->fhandle)
527                        fclose(self->fhandle);
[7]528
[29]529                fsd_log_info(("Opening log file: %s",log_path));
[7]530                               
531        retry:
[31]532                if ((self->fhandle = fopen(log_path,"r")) == NULL && (num_tries > DRMAA_WAIT_THREAD_MAX_TRIES || self->first_open))
[29]533                 {
[7]534                        fsd_log_error(("Can't open log file. Verify pbs_home. Running standard wait_thread."));
[29]535                        fsd_log_error(("Remember that without keep_completed set the standard wait_thread won't provide information about job exit status"));
[7]536                        /*pbssession->super.enable_wait_thread = false;*/ /* run not wait_thread */
537                        pbssession->wait_thread_log = false;
538                        pbssession->super.wait_thread = pbssession->super_wait_thread;
539                        pbssession->super.wait_thread(self->session);
[29]540                 }
541                else if ( self->fhandle == NULL )
542                 { /* Torque seems not to create a new file immediately after the old one is closed */
[7]543                        fsd_log_warning(("Can't open log file: %s. Retries count: %d", log_path, num_tries));
544                        num_tries++;
[29]545                        sleep(2 * num_tries);
[7]546                        goto retry;
[29]547                 }
[7]548
549                fsd_free(log_path);
550
551                fsd_log_debug(("Log file opened"));
552
[29]553                if(self->first_open)
554                 {
[7]555                        fsd_log_debug(("Log file lseek"));
[29]556
557                        if(fseek(self->fhandle, pbssession->log_file_initial_size, SEEK_SET) == (off_t) -1)
558                         {
559                                fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"fseek error");
560                         }
[7]561                        self->first_open = false;
[29]562                 }
[7]563
564                self->date_changed = false;
565               
566                fsd_log_return((""));
567        }       
568}
569
[29]570time_t
571pbsdrmaa_parse_log_timestamp(const char *timestamp, char *unixtime_str, size_t size)
[7]572{
[29]573        struct tm temp_time_tm;
574        memset(&temp_time_tm, 0, sizeof(temp_time_tm));
575        temp_time_tm.tm_isdst = -1;
[7]576
[29]577        if (strptime(timestamp, "%m/%d/%Y %H:%M:%S", &temp_time_tm) == NULL)
578         {
579                fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"WT - failed to parse log timestamp: %s", timestamp);
580         }
[7]581        else
[29]582         {
583                time_t temp_time = mktime(&temp_time_tm);
584                snprintf(unixtime_str, size, "%lu", temp_time);
585                return temp_time;
586         }
[7]587}
588
[29]589char *
590pbsdrmaa_get_exec_host_from_accountig(pbsdrmaa_log_reader_t * log_reader, const char *job_id)
[7]591{
[29]592        /* TODO: implement */
593        return NULL;
[7]594}
595
[21]596
Note: See TracBrowser for help on using the repository browser.