source: trunk/pbs_drmaa/log_reader.c @ 104

Revision 104, 23.0 KB checked in by pkopta, 4 years ago (diff)
  • few changes for PBS Pro
  • Property svn:keywords set to Id
Line 
1/* $Id$ */
2/*
3 *  FedStage DRMAA for PBS Pro
4 *  Copyright (C) 2006-2007  FedStage Systems
5 *
6 *  This program is free software: you can redistribute it and/or modify
7 *  it under the terms of the GNU General Public License as published by
8 *  the Free Software Foundation, either version 3 of the License, or
9 *  (at your option) any later version.
10 *
11 *  This program is distributed in the hope that it will be useful,
12 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 *  GNU General Public License for more details.
15 *
16 *  You should have received a copy of the GNU General Public License
17 *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #ifdef HAVE_CONFIG_H
21#       include <config.h>
22#endif
23
24#include <stdlib.h>
25#include <ctype.h>
26#include <string.h>
27#include <unistd.h>
28#include <sys/stat.h>
29#include <sys/types.h>
30#include <dirent.h>
31#include <fcntl.h>
32
33#include <pbs_ifl.h>
34#include <pbs_error.h>
35
36#include <drmaa_utils/datetime.h>
37#include <drmaa_utils/drmaa.h>
38#include <drmaa_utils/iter.h>
39#include <drmaa_utils/conf.h>
40#include <drmaa_utils/session.h>
41#include <drmaa_utils/datetime.h>
42
43#include <pbs_drmaa/job.h>
44#include <pbs_drmaa/log_reader.h>
45#include <pbs_drmaa/session.h>
46#include <pbs_drmaa/submit.h>
47#include <pbs_drmaa/util.h>
48#include <pbs_drmaa/pbs_attrib.h>
49
50#include <errno.h>
51
52enum pbsdrmaa_field_id
53{
54        PBSDRMAA_FLD_ID_DATE = 0,
55        PBSDRMAA_FLD_ID_EVENT = 1,
56        PBSDRMAA_FLD_ID_SRC = 2,
57        PBSDRMAA_FLD_ID_OBJ_TYPE = 3,
58        PBSDRMAA_FLD_ID_OBJ_ID = 4,
59        PBSDRMAA_FLD_ID_MSG = 5
60};
61
62
63#define PBSDRMAA_FLD_MSG_0008 "0008"
64#define PBSDRMAA_FLD_MSG_0010 "0010"
65
66enum pbsdrmaa_event_type
67{
68        pbsdrmaa_event_0008 = 8,
69        pbsdrmaa_event_0010 = 10
70};
71
72static void pbsdrmaa_read_log();
73
74static void pbsdrmaa_select_file( pbsdrmaa_log_reader_t * self);
75
76static void pbsdrmaa_close_log( pbsdrmaa_log_reader_t * self);
77
78static void pbsdrmaa_reopen_log( pbsdrmaa_log_reader_t * self);
79
80static time_t pbsdrmaa_parse_log_timestamp(const char *timestamp, char *unixtime_str, size_t size);
81
82static char *pbsdrmaa_get_exec_host_from_accountig(pbsdrmaa_log_reader_t * log_reader, const char *job_id);
83
84/*
85 * Snippets from log files
86 *
87 * PBS Pro
88 *
8910/11/2011 14:43:29;0008;Server@nova;Job;2127218.nova;Job Queued at request of mamonski@endor.wcss.wroc.pl, owner = mamonski@endor.wcss.wroc.pl, job name = STDIN, queue = normal
9010/11/2011 14:43:31;0008;Server@nova;Job;2127218.nova;Job Modified at request of Scheduler@nova.wcss.wroc.pl
9110/11/2011 14:43:31;0008;Server@nova;Job;2127218.nova;Job Run at request of Scheduler@nova.wcss.wroc.pl on exec_vnode (wn698:ncpus=3:mem=2048000kb)+(wn700:ncpus=3:mem=2048000kb)
9210/11/2011 14:43:31;0008;Server@nova;Job;2127218.nova;Job Modified at request of Scheduler@nova.wcss.wroc.pl
9310/11/2011 14:43:32;0010;Server@nova;Job;2127218.nova;Exit_status=0 resources_used.cpupercent=0 resources_used.cput=00:00:00 resources_used.mem=1768kb resources_used.ncpus=6 resources_used.vmem=19228kb resources_used.walltime=00:00:01
94
95 *
96 * Torque
97 *
9810/11/2011 14:47:59;0008;PBS_Server;Job;15545337.batch.grid.cyf-kr.edu.pl;Job Queued at request of plgmamonski@ui.cyf-kr.edu.pl, owner = plgmamonski@ui.cyf-kr.edu.pl, job name = STDIN, queue = l_short
9910/11/2011 14:48:23;0008;PBS_Server;Job;15545337.batch.grid.cyf-kr.edu.pl;Job Run at request of root@batch.grid.cyf-kr.edu.pl
10010/11/2011 14:48:24;0010;PBS_Server;Job;15545337.batch.grid.cyf-kr.edu.pl;Exit_status=0 resources_used.cput=00:00:00 resources_used.mem=720kb resources_used.vmem=13308kb resources_used.walltime=00:00:00
101
102deleting job:
103I . PBS Pro
104a) in Q state
10510/16/2011 09:49:25;0008;Server@grass1;Job;2178.grass1.man.poznan.pl;Job Queued at request of mmamonski@grass1.man.poznan.pl, owner = mmamonski@grass1.man.poznan.pl, job name = STDIN, queue = workq
10610/16/2011 09:49:25;0008;Server@grass1;Job;2178.grass1.man.poznan.pl;Job Modified at request of Scheduler@grass1.man.poznan.pl
10710/16/2011 09:49:37;0008;Server@grass1;Job;2178.grass1.man.poznan.pl;Job to be deleted at request of mmamonski@grass1.man.poznan.pl
10810/16/2011 09:49:37;0100;Server@grass1;Job;2178.grass1.man.poznan.pl;dequeuing from workq, state 5
109
110
111b) in R state
11210/16/2011 09:45:12;0080;Server@grass1;Job;2177.grass1.man.poznan.pl;delete job request received
11310/16/2011 09:45:12;0008;Server@grass1;Job;2177.grass1.man.poznan.pl;Job sent signal TermJob on delete
11410/16/2011 09:45:12;0008;Server@grass1;Job;2177.grass1.man.poznan.pl;Job to be deleted at request of mmamonski@grass1.man.poznan.pl
11510/16/2011 09:45:12;0010;Server@grass1;Job;2177.grass1.man.poznan.pl;Exit_status=271 resources_used.cpupercent=0 resources_used.cput=00:00:00 resources_used.mem=2772kb resources_used.ncpus=1 resources_used.vmem=199288kb resources_used.walltime=00:00:26
11610/16/2011 09:45:12;0100;Server@grass1;Job;2177.grass1.man.poznan.pl;dequeuing from workq, state 5
117
118II. Torque
119a) in Q state
12010/15/2011 21:19:25;0008;PBS_Server;Job;113045.grass1.man.poznan.pl;Job deleted at request of mmamonski@grass1.man.poznan.pl
12110/15/2011 21:19:25;0100;PBS_Server;Job;113045.grass1.man.poznan.pl;dequeuing from batch, state EXITING
122
123b) in R state
12410/15/2011 21:19:47;0008;PBS_Server;Job;113046.grass1.man.poznan.pl;Job deleted at request of mmamonski@grass1.man.poznan.pl
12510/15/2011 21:19:47;0008;PBS_Server;Job;113046.grass1.man.poznan.pl;Job sent signal SIGTERM on delete
12610/15/2011 21:19:47;0010;PBS_Server;Job;113046.grass1.man.poznan.pl;Exit_status=271 resources_used.cput=00:00:00 resources_used.mem=0kb resources_used.vmem=0kb resources_used.walltime=00:00:10
127
128Log closed:
12910/16/2011 00:00:17;0002;PBS_Server;Svr;Log;Log closed
130
131 */
132pbsdrmaa_log_reader_t *
133pbsdrmaa_log_reader_new( fsd_drmaa_session_t *session )
134{
135        pbsdrmaa_log_reader_t *volatile self = NULL;
136
137        fsd_log_enter((""));
138
139        TRY
140        {
141                fsd_malloc(self, pbsdrmaa_log_reader_t );
142               
143                self->session = session;
144
145                self->select_file = pbsdrmaa_select_file;
146                self->read_log = pbsdrmaa_read_log;     
147                self->close = pbsdrmaa_close_log;
148                self->reopen = pbsdrmaa_reopen_log;
149               
150                self->run_flag = true;
151                self->fhandle = NULL;
152                self->date_changed = true;
153                self->first_open = true;
154                self->log_path = NULL;
155                self->current_offset = 0;
156               
157        }
158        EXCEPT_DEFAULT
159        {
160                if( self != NULL)
161                        fsd_free(self);
162                       
163                fsd_exc_reraise();
164        }
165        END_TRY
166
167        fsd_log_return((""));
168
169        return self;
170}
171
172
173void
174pbsdrmaa_log_reader_destroy ( pbsdrmaa_log_reader_t * self )
175{
176        fsd_log_enter((""));
177        TRY
178        {
179                if(self != NULL)
180                {
181                        fsd_free(self);
182                }
183        }
184        EXCEPT_DEFAULT
185        {
186                fsd_exc_reraise();
187        }
188        END_TRY
189       
190        fsd_log_return((""));
191}
192
193
194void
195pbsdrmaa_read_log( pbsdrmaa_log_reader_t * self )
196{
197        fsd_log_enter((""));
198       
199        fsd_mutex_lock( &self->session->mutex );
200
201        TRY
202         {
203                while( self->run_flag )
204                 {
205                        TRY
206                        {
207                                char *line = NULL;
208                               
209                                self->select_file(self);
210
211                                while ((line = fsd_readline(self->fhandle)) != NULL)
212                                 {
213                                        int field_id = PBSDRMAA_FLD_ID_DATE;
214                                        char *tok_ctx = NULL;
215                                        char *field_token = NULL;
216                                        char *event_timestamp = NULL;
217                                        int event_type = -1;
218                                        fsd_job_t *job = NULL;
219
220                                        /* at first detect if this not the end of log file */
221                                        if (strstr(line, "Log;Log closed")) /*TODO try to be more effective and safe */
222                                         {
223                                                fsd_log_debug(("WT - Date changed. Closing log file"));
224                                                self->date_changed = true;
225                                                goto cleanup;
226                                         }
227
228                                        for (field_token = strtok_r(line, ";", &tok_ctx); field_token; field_token = strtok_r(NULL, ";", &tok_ctx), field_id++)
229                                         {
230                                                if ( field_id == PBSDRMAA_FLD_ID_DATE)
231                                                 {
232                                                        event_timestamp = field_token;
233#ifdef PBS_PBS_PROFESSIONAL
234                                                        /*additional check */
235                                                        TRY
236                                                        {
237                                                         (void)pbsdrmaa_parse_log_timestamp(event_timestamp, timestamp_unix, sizeof(timestamp_unix));
238                                                        }
239                                                        EXCEPT_DEFAULT
240                                                        {
241                                                                fsd_log_error(("Failed to parse timestamp: %s. Log corrupted?", event_timestamp));
242                                                        }
243                                                        END_TRY
244#endif
245                                                 }
246                                                else if ( field_id == PBSDRMAA_FLD_ID_EVENT)
247                                                 {
248                                                        if (strncmp(field_token, PBSDRMAA_FLD_MSG_0008, 4) == 0)
249                                                                event_type = pbsdrmaa_event_0008;
250                                                        else if (strncmp(field_token, PBSDRMAA_FLD_MSG_0010, 4) == 0)
251                                                                event_type = pbsdrmaa_event_0010;
252                                                        else
253                                                         {
254                                                                goto cleanup; /*we are interested only in the above log messages */
255                                                         }
256                                                 }
257                                                else if ( field_id == PBSDRMAA_FLD_ID_SRC)
258                                                 {
259                                                        /* not used ignore */
260                                                 }
261                                                else if (field_id  == PBSDRMAA_FLD_ID_OBJ_TYPE)
262                                                 {
263                                                        if (strncmp(field_token, "Job", 3) != 0)
264                                                         {
265                                                                goto cleanup; /* we are interested only in job events */
266                                                         }
267                                                 }
268                                                else if (field_id == PBSDRMAA_FLD_ID_OBJ_ID)
269                                                 {
270                                                        const char *event_jobid = field_token;
271                                                       
272                                                        if (!isdigit(event_jobid[0]))
273                                                         {
274                                                                fsd_log_debug(("WT - Invalid job: %s", event_jobid));
275                                                                goto cleanup;
276                                                         }
277
278                                                        job = self->session->get_job( self->session, event_jobid );
279
280                                                        if( job )
281                                                         {
282                                                                fsd_log_debug(("WT - Found job event: %s", event_jobid));
283                                                         }
284                                                        else
285                                                         {
286                                                                fsd_log_debug(("WT - Unknown job: %s", event_jobid)); /* Not a DRMAA job */
287                                                                goto cleanup;
288                                                         }
289                                                 }
290                                                else if (field_id == PBSDRMAA_FLD_ID_MSG)
291                                                 {
292                                                        char *msg = field_token;
293                                                        struct batch_status status;
294                                                        struct attrl *attribs = NULL;
295                                                        bool in_running_state = false;
296
297                                                        if (event_type == pbsdrmaa_event_0008 && strncmp(msg, "Job Queued", 10) == 0)
298                                                         {
299                                                                /* Queued
300                                                                 * PBS Pro: 10/11/2011 14:43:29;0008;Server@nova;Job;2127218.nova;Job Queued at request of mamonski@endor.wcss.wroc.pl, owner = mamonski@endor.wcss.wroc.pl, job name = STDIN, queue = normal
301                                                                 * Torque:  10/11/2011 14:47:59;0008;PBS_Server;Job;15545337.batch.grid.cyf-kr.edu.pl;Job Queued at request of plgmamonski@ui.cyf-kr.edu.pl, owner = plgmamonski@ui.cyf-kr.edu.pl, job name = STDIN, queue = l_short
302                                                                 */
303                                                                char *p_queue = NULL;
304
305                                                                fsd_log_info(("WT - Detected queuing of job %s", job->job_id));
306
307                                                                if ((p_queue = strstr(msg,"queue =")) == NULL)
308                                                                        fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"No queue attribute found in log line = %s", line);
309
310                                                                attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_JOB_STATE, "Q");
311                                                                attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_QUEUE, p_queue + 7);
312                                                         }
313                                                        else if (event_type == pbsdrmaa_event_0008 && strncmp(msg, "Job Run", 7) == 0)
314                                                        {
315                                                                /*
316                                                                 * Running
317                                                                 * Torque: 10/11/2011 14:48:23;0008;PBS_Server;Job;15545337.batch.grid.cyf-kr.edu.pl;Job Run at request of root@batch.grid.cyf-kr.edu.pl
318                                                                 * PBS Pro: 10/11/2011 14:43:31;0008;Server@nova;Job;2127218.nova;Job Run at request of Scheduler@nova.wcss.wroc.pl on exec_vnode (wn698:ncpus=3:mem=2048000kb)+(wn700:ncpus=3:mem=2048000kb)
319                                                                 */
320                                                                char timestamp_unix[64];
321
322                                                                fsd_log_info(("WT - Detected start of job %s", job->job_id));
323
324                                                                (void)pbsdrmaa_parse_log_timestamp(event_timestamp, timestamp_unix, sizeof(timestamp_unix));
325
326                                                                in_running_state = true;
327
328                                                                attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_JOB_STATE, "R");
329                                                                attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_START_TIME, timestamp_unix);
330#ifdef PBS_PROFESSIONAL
331                                                                        {
332                                                                                char *p_vnode = NULL;
333                                                                                if ((p_vnode = strstr(msg, "exec_vnode")))
334                                                                                 {
335                                                                                        attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_EXECUTION_VNODE, p_vnode + 11);
336                                                                                 }
337                                                                        }
338#endif
339                                                         }
340#ifndef PBS_PROFESSIONAL
341                                                        else if (event_type == pbsdrmaa_event_0008 && strncmp(msg, "Job deleted", 11) == 0)
342#else
343                                                        else if (event_type == pbsdrmaa_event_0008 && strncmp(msg, "Job to be deleted", 17) == 0)
344#endif
345                                                         {
346                                                        /* Deleted
347                                                         * PBS Pro: 10/16/2011 09:45:12;0008;Server@grass1;Job;2177.grass1.man.poznan.pl;Job to be deleted at request of mmamonski@grass1.man.poznan.pl
348                                                         * Torque: 10/15/2011 21:19:25;0008;PBS_Server;Job;113045.grass1.man.poznan.pl;Job deleted at request of mmamonski@grass1.man.poznan.pl
349                                                         */
350                                                                char timestamp_unix[64];
351
352                                                                fsd_log_info(("WT - Detected deletion of job %s", job->job_id));
353
354                                                                (void)pbsdrmaa_parse_log_timestamp(event_timestamp, timestamp_unix, sizeof(timestamp_unix));
355
356                                                                if (job->state < DRMAA_PS_RUNNING)
357                                                                 {
358                                                                        fsd_log_info(("WT - Job %s killed before entering running state (%d).", job->job_id, job->state));
359
360#ifdef PBS_PROFESSIONAL
361                                                                        attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_JOB_STATE, "F");
362#else
363                                                                        attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_JOB_STATE, "C");
364#endif
365
366                                                                        attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_MTIME, timestamp_unix);
367                                                                        attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_EXIT_STATUS, "-101");
368                                                                 }
369                                                                else
370                                                                 {
371                                                                        fsd_log_info(("WT - Job %s killed after entering running state (%d). Waiting for Completed event...", job->job_id, job->state));
372                                                                        goto cleanup; /* job was started, ignore, wait for Exit_status message */
373                                                                 }
374                                                         }
375                                                        else if (event_type == pbsdrmaa_event_0010 && (strncmp(msg, "Exit_status=", 12) == 0))
376                                                         {
377                                                        /* Completed:
378                                                         * PBS Pro: 10/11/2011 14:43:32;0010;Server@nova;Job;2127218.nova;Exit_status=0 resources_used.cpupercent=0 resources_used.cput=00:00:00 resources_used.mem=1768kb resources_used.ncpus=6 resources_used.vmem=19228kb resources_used.walltime=00:00:01
379                                                         * Torque: 10/11/2011 14:48:24;0010;PBS_Server;Job;15545337.batch.grid.cyf-kr.edu.pl;Exit_status=0 resources_used.cput=00:00:00 resources_used.mem=720kb resources_used.vmem=13308kb resources_used.walltime=00:00:00
380                                                         */
381                                                                char timestamp_unix[64];
382                                                                time_t timestamp_time_t = pbsdrmaa_parse_log_timestamp(event_timestamp, timestamp_unix, sizeof(timestamp_unix));
383                                                                char *tok_ctx2 = NULL;
384                                                                char *token = NULL;
385
386#ifdef PBS_PBS_PROFESSIONAL
387                                                                attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_JOB_STATE, "F");
388#else
389                                                                attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_JOB_STATE, "C");
390#endif
391                                                                attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_MTIME, timestamp_unix);
392
393                                                                /* tokenize !!! */
394                                                                for (token = strtok_r(msg, " ", &tok_ctx2); token; token = strtok_r(NULL, " ", &tok_ctx2))
395                                                                 {
396                                                                        if (strncmp(token, "Exit_status=", 12) == 0)
397                                                                         {
398                                                                                token[11] = '\0';
399                                                                                attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_EXIT_STATUS, token + 12);
400                                                                                fsd_log_info(("WT - Completion of job %s (Exit_status=%s) detected after %d seconds", job->job_id, token+12, (int)(time(NULL) - timestamp_time_t) ));
401                                                                         }
402                                                                        else if (strncmp(token, "resources_used.cput=", 20) == 0)
403                                                                         {
404                                                                                token[19] = '\0';
405                                                                                attribs = pbsdrmaa_add_attr(attribs, token, token + 20);
406                                                                         }
407                                                                        else if (strncmp(token, "resources_used.mem=", 19) == 0)
408                                                                         {
409                                                                                token[18] = '\0';
410                                                                                attribs = pbsdrmaa_add_attr(attribs, token, token + 19);
411                                                                         }
412                                                                        else if (strncmp(token, "resources_used.vmem=", 20) == 0)
413                                                                         {
414                                                                                token[19] = '\0';
415                                                                                attribs = pbsdrmaa_add_attr(attribs, token, token + 20);
416                                                                         }
417                                                                        else if (strncmp(token, "resources_used.walltime=", 24) == 0)
418                                                                         {
419                                                                                token[23] = '\0';
420                                                                                attribs = pbsdrmaa_add_attr(attribs, token, token + 24);
421                                                                         }
422                                                                 }
423
424                                                                if (!job->execution_hosts)
425                                                                 {
426                                                                        char *exec_host = NULL;
427                                                                        fsd_log_info(("WT - No execution host information for job %s. Reading accounting logs...", job->job_id));
428                                                                        exec_host = pbsdrmaa_get_exec_host_from_accountig(self, job->job_id);
429                                                                        if (exec_host)
430                                                                         {
431                                                                                attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_EXECUTION_HOST, exec_host);
432                                                                                fsd_free(exec_host);
433                                                                         }
434                                                                 }
435                                                         }
436                                                        else
437                                                        {
438                                                                fsd_log_debug(("Ignoring msg(type=%d) = %s", event_type,  msg));
439                                                                goto cleanup; /* ignore other job events*/
440                                                        }
441                                       
442                                                        fsd_log_debug(("WT - updating job: %s", job->job_id ));
443                                                        status.name = job->job_id;
444                                                        status.attribs = attribs;
445
446                                                        ((pbsdrmaa_job_t *)job)->update( job, &status );
447
448                                                        if ( in_running_state )
449                                                         {
450                                                                fsd_log_debug(("WT - forcing update of job: %s", job->job_id ));
451                                                                TRY
452                                                                {
453                                                                        job->update_status( job );
454                                                                }
455                                                                EXCEPT_DEFAULT
456                                                                {
457                                                                        /*TODO: distinguish between invalid job and internal errors */
458                                                                        fsd_log_debug(("Job finished just after entering running state: %s", job->job_id));
459                                                                }
460                                                                END_TRY
461                                                         }
462
463
464                                                        pbsdrmaa_free_attrl(attribs); /* TODO free on exception */
465
466                                                        fsd_cond_broadcast( &job->status_cond);
467                                                        fsd_cond_broadcast( &self->session->wait_condition );
468
469                                                 }
470                                                else
471                                                 {
472                                                        fsd_assert(0); /*not reached */
473                                                 }
474                                         }
475                                cleanup:
476                                        fsd_free(line); /* TODO what about exceptions */               
477                                        if ( job )
478                                                job->release( job );
479
480
481
482                                 } /* end of while getline loop */
483
484
485
486                                fsd_mutex_unlock( &self->session->mutex );
487
488                                /* close */
489                                self->close(self);
490
491                                sleep(((pbsdrmaa_session_t *)self->session)->wait_thread_sleep_time);
492
493                                /* and reopen log file */
494                                self->reopen(self);
495
496                                fsd_mutex_lock( &self->session->mutex );
497
498                                self->run_flag = self->session->wait_thread_run_flag;
499                        }
500                        EXCEPT_DEFAULT
501                        {
502                                const fsd_exc_t *e = fsd_exc_get();
503                                /* Its better to exit and communicate error rather then let the application to hang */
504                                fsd_log_fatal(( "Exception in wait thread: <%d:%s>. Exiting !!!", e->code(e), e->message(e) ));
505                                exit(1);
506                        }
507                        END_TRY
508                 }
509
510                if(self->fhandle)
511                        fclose(self->fhandle);
512
513                fsd_log_debug(("WT - Log file closed"));
514        }
515        FINALLY
516        {
517                fsd_log_debug(("WT - Terminated."));
518                fsd_mutex_unlock( &self->session->mutex ); /**/
519        }
520        END_TRY
521       
522        fsd_log_return((""));
523}
524
525void
526pbsdrmaa_select_file( pbsdrmaa_log_reader_t * self )
527{
528        pbsdrmaa_session_t *pbssession = (pbsdrmaa_session_t*) self->session;
529       
530        if (self->date_changed)
531         {
532                int num_tries = 0;
533                struct tm tm;
534                char *old_log_path = NULL;
535               
536                fsd_log_enter((""));
537               
538                if(!self->first_open)
539                        time(&self->t);
540                else
541                        self->t = pbssession->log_file_initial_time;
542                       
543                localtime_r(&self->t,&tm);
544                               
545                #define DRMAA_WAIT_THREAD_MAX_TRIES (12)
546                /* generate new date, close file and open new */
547                old_log_path = self->log_path;
548
549                self->log_path = fsd_asprintf("%s/server_logs/%04d%02d%02d", pbssession->pbs_home, tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday);
550
551                if(self->fhandle)
552                        fclose(self->fhandle);
553
554                fsd_log_info(("Opening log file: %s",self->log_path));
555                               
556        retry:
557                if ((self->fhandle = fopen(self->log_path,"r")) == NULL && (num_tries > DRMAA_WAIT_THREAD_MAX_TRIES || self->first_open))
558                 {
559                        fsd_log_error(("Can't open log file: %s. Verify pbs_home. Running standard wait_thread.", self->log_path));
560                        fsd_log_error(("Remember that without keep_completed set the standard wait_thread won't provide information about job exit status"));
561                        /*pbssession->super.enable_wait_thread = false;*/ /* run not wait_thread */
562                        pbssession->wait_thread_log = false;
563                        pbssession->super.wait_thread = pbssession->super_wait_thread;
564                        pbssession->super.wait_thread(self->session);
565                 }
566                else if ( self->fhandle == NULL )
567                 { /* Torque seems not to create a new file immediately after the old one is closed */
568                        fsd_log_warning(("Can't open log file: %s. Retries count: %d", self->log_path, num_tries));
569                        num_tries++;
570                        sleep(2 * num_tries);
571                        goto retry;
572                 }
573
574                fsd_log_debug(("Log file opened"));
575
576                if(self->first_open)
577                 {
578                        fsd_log_debug(("Log file lseek"));
579
580                        if(fseek(self->fhandle, pbssession->log_file_initial_size, SEEK_SET) == (off_t) -1)
581                         {
582                                fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"fseek error");
583                         }
584                        self->first_open = false;
585                 }
586                else if (old_log_path && strcmp(old_log_path, self->log_path) == 0)
587                 {
588                        fsd_log_info(("PBS restarted. Seeking log file %u", (unsigned int)self->current_offset));
589                        if(fseek(self->fhandle, self->current_offset, SEEK_SET) == (off_t) -1)
590                         {
591                                fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"fseek error");
592                         }
593                 }
594
595                self->date_changed = false;
596               
597                fsd_free(old_log_path);
598
599                fsd_log_return((""));
600        }       
601}
602
603time_t
604pbsdrmaa_parse_log_timestamp(const char *timestamp, char *unixtime_str, size_t size)
605{
606        struct tm temp_time_tm;
607        memset(&temp_time_tm, 0, sizeof(temp_time_tm));
608        temp_time_tm.tm_isdst = -1;
609
610        if (strptime(timestamp, "%m/%d/%Y %H:%M:%S", &temp_time_tm) == NULL)
611         {
612                fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"WT - failed to parse log timestamp: %s", timestamp);
613         }
614        else
615         {
616                time_t temp_time = mktime(&temp_time_tm);
617                snprintf(unixtime_str, size, "%lu", temp_time);
618                return temp_time;
619         }
620}
621
622char *
623pbsdrmaa_get_exec_host_from_accountig(pbsdrmaa_log_reader_t * log_reader, const char *job_id)
624{
625                pbsdrmaa_session_t *pbssession = (pbsdrmaa_session_t*) log_reader->session;
626                struct tm tm;
627                time_t tm_t;
628                char *line = NULL;
629                char *exec_host = NULL;
630                char *log_path = NULL;
631                FILE *fhandle = NULL;
632
633                fsd_log_enter(("(job_id=%s)", job_id));
634
635                tm_t = time(NULL);
636                localtime_r(&tm_t, &tm);
637
638                log_path = fsd_asprintf("%s/server_priv/accounting/%04d%02d%02d", pbssession->pbs_home, tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday);
639
640                fsd_log_info(("Opening accounting log file: %s", log_path));
641
642                if ((fhandle = fopen(log_path, "r")) == NULL)
643                 {
644                        fsd_log_error(("Failed to open accounting log file: %s", log_path));
645                        fsd_free(log_path);
646                        return NULL;
647                 }
648
649                fsd_free(log_path);
650/*
65110/27/2011 14:09:32;E;114249.grass1.man.poznan.pl;user=drmaa group=drmaa jobname=none queue=shortq ctime=1319717371 qtime=1319717371 etime=1319717371 start=1319717372 owner=drmaa@grass1.man.poznan.pl exec_host=grass4.man.poznan.pl/0 Resource_List.neednodes=1 Resource_List.nodect=1 Resource_List.nodes=1 Resource_List.walltime=02:00:00 session=28561 end=1319717372 Exit_status=0 resources_used.cput=00:00:00 resources_used.mem=0kb resources_used.vmem=0kb resources_used.walltime=00:00:00
652 */
653                while ((line = fsd_readline(fhandle)) != NULL)
654                 {
655
656                        if (line[20] == 'E'  && strncmp(line + 22, job_id, strlen(job_id)) == 0 )
657                         {
658                                char *p = NULL;
659
660                                fsd_log_debug(("Matched accounting log record = %s", line));
661
662                                if (!(exec_host = strstr(line, "exec_host")))
663                                 {
664                                        fsd_log_error(("Invalid accounting record: %s", exec_host));
665                                        break;
666                                 }
667
668                                exec_host += 10;
669
670                                p = exec_host;
671                                while (*p != ' ' && *p != '\0')
672                                        p++;
673                                *p = '\0';
674
675                                break;
676                         }
677
678                        fsd_free(line);
679                 }
680
681                if (exec_host)
682                 {
683                        fsd_log_info(("Job %s was executing on hosts %s.", job_id, exec_host));
684                        exec_host = fsd_strdup(exec_host);
685                 }
686                else
687                 {
688                        fsd_log_error(("Could not find executions hosts for %s.", job_id));
689                 }
690
691                if (line)
692                        fsd_free(line);
693
694                fclose(fhandle);
695
696                return exec_host;
697}
698
699void
700pbsdrmaa_close_log( pbsdrmaa_log_reader_t * self )
701{
702
703        self->current_offset = ftello(self->fhandle);
704       
705        fsd_log_debug(("Closing log  file (offset=%d)", (int)self->current_offset));
706
707        fclose(self->fhandle);
708
709        self->fhandle = NULL;
710}
711
712void
713pbsdrmaa_reopen_log( pbsdrmaa_log_reader_t * self )
714{
715        fsd_log_debug(("Reopening log file: %s (offset=%d)", self->log_path, (int)self->current_offset));
716
717        if ((self->fhandle = fopen(self->log_path,"r")) == NULL)
718         {
719                fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"Failed to reopen log file");
720         }
721
722        if(fseek(self->fhandle, self->current_offset, SEEK_SET) == (off_t) -1)
723         {
724                fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"fseek error");
725         }
726}
727
Note: See TracBrowser for help on using the repository browser.