source: trunk/pbs_drmaa/log_reader.c @ 34

Revision 34, 20.9 KB checked in by mmamonski, 12 years ago (diff)

parsing accounting logs

  • Property svn:keywords set to Id
Line 
1/* $Id$ */
2/*
3 *  FedStage DRMAA for PBS Pro
4 *  Copyright (C) 2006-2007  FedStage Systems
5 *
6 *  This program is free software: you can redistribute it and/or modify
7 *  it under the terms of the GNU General Public License as published by
8 *  the Free Software Foundation, either version 3 of the License, or
9 *  (at your option) any later version.
10 *
11 *  This program is distributed in the hope that it will be useful,
12 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 *  GNU General Public License for more details.
15 *
16 *  You should have received a copy of the GNU General Public License
17 *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #ifdef HAVE_CONFIG_H
21#       include <config.h>
22#endif
23
24#include <stdlib.h>
25#include <string.h>
26#include <unistd.h>
27#include <sys/stat.h>
28#include <sys/types.h>
29#include <dirent.h>
30#include <fcntl.h>
31
32#include <pbs_ifl.h>
33#include <pbs_error.h>
34
35#include <drmaa_utils/datetime.h>
36#include <drmaa_utils/drmaa.h>
37#include <drmaa_utils/iter.h>
38#include <drmaa_utils/conf.h>
39#include <drmaa_utils/session.h>
40#include <drmaa_utils/datetime.h>
41
42#include <pbs_drmaa/job.h>
43#include <pbs_drmaa/log_reader.h>
44#include <pbs_drmaa/session.h>
45#include <pbs_drmaa/submit.h>
46#include <pbs_drmaa/util.h>
47#include <pbs_drmaa/pbs_attrib.h>
48
49#include <errno.h>
50
51enum pbsdrmaa_field_id
52{
53        PBSDRMAA_FLD_ID_DATE = 0,
54        PBSDRMAA_FLD_ID_EVENT = 1,
55        PBSDRMAA_FLD_ID_SRC = 2,
56        PBSDRMAA_FLD_ID_OBJ_TYPE = 3,
57        PBSDRMAA_FLD_ID_OBJ_ID = 4,
58        PBSDRMAA_FLD_ID_MSG = 5
59};
60
61
62#define PBSDRMAA_FLD_MSG_0008 "0008"
63#define PBSDRMAA_FLD_MSG_0010 "0010"
64
65enum pbsdrmaa_event_type
66{
67        pbsdrmaa_event_0008 = 8,
68        pbsdrmaa_event_0010 = 10
69};
70
71static void pbsdrmaa_read_log();
72
73static void pbsdrmaa_select_file_wait_thread( pbsdrmaa_log_reader_t * self);
74
75char *pbsdrmaa_read_line_wait_thread( pbsdrmaa_log_reader_t * self);
76
77static time_t pbsdrmaa_parse_log_timestamp(const char *timestamp, char *unixtime_str, size_t size);
78
79static char *pbsdrmaa_get_exec_host_from_accountig(pbsdrmaa_log_reader_t * log_reader, const char *job_id);
80
81/*
82 * Snippets from log files
83 *
84 * PBS Pro
85 *
8610/11/2011 14:43:29;0008;Server@nova;Job;2127218.nova;Job Queued at request of mamonski@endor.wcss.wroc.pl, owner = mamonski@endor.wcss.wroc.pl, job name = STDIN, queue = normal
8710/11/2011 14:43:31;0008;Server@nova;Job;2127218.nova;Job Modified at request of Scheduler@nova.wcss.wroc.pl
8810/11/2011 14:43:31;0008;Server@nova;Job;2127218.nova;Job Run at request of Scheduler@nova.wcss.wroc.pl on exec_vnode (wn698:ncpus=3:mem=2048000kb)+(wn700:ncpus=3:mem=2048000kb)
8910/11/2011 14:43:31;0008;Server@nova;Job;2127218.nova;Job Modified at request of Scheduler@nova.wcss.wroc.pl
9010/11/2011 14:43:32;0010;Server@nova;Job;2127218.nova;Exit_status=0 resources_used.cpupercent=0 resources_used.cput=00:00:00 resources_used.mem=1768kb resources_used.ncpus=6 resources_used.vmem=19228kb resources_used.walltime=00:00:01
91
92 *
93 * Torque
94 *
9510/11/2011 14:47:59;0008;PBS_Server;Job;15545337.batch.grid.cyf-kr.edu.pl;Job Queued at request of plgmamonski@ui.cyf-kr.edu.pl, owner = plgmamonski@ui.cyf-kr.edu.pl, job name = STDIN, queue = l_short
9610/11/2011 14:48:23;0008;PBS_Server;Job;15545337.batch.grid.cyf-kr.edu.pl;Job Run at request of root@batch.grid.cyf-kr.edu.pl
9710/11/2011 14:48:24;0010;PBS_Server;Job;15545337.batch.grid.cyf-kr.edu.pl;Exit_status=0 resources_used.cput=00:00:00 resources_used.mem=720kb resources_used.vmem=13308kb resources_used.walltime=00:00:00
98
99deleting job:
100I . PBS Pro
101a) in Q state
10210/16/2011 09:49:25;0008;Server@grass1;Job;2178.grass1.man.poznan.pl;Job Queued at request of mmamonski@grass1.man.poznan.pl, owner = mmamonski@grass1.man.poznan.pl, job name = STDIN, queue = workq
10310/16/2011 09:49:25;0008;Server@grass1;Job;2178.grass1.man.poznan.pl;Job Modified at request of Scheduler@grass1.man.poznan.pl
10410/16/2011 09:49:37;0008;Server@grass1;Job;2178.grass1.man.poznan.pl;Job to be deleted at request of mmamonski@grass1.man.poznan.pl
10510/16/2011 09:49:37;0100;Server@grass1;Job;2178.grass1.man.poznan.pl;dequeuing from workq, state 5
106
107
108b) in R state
10910/16/2011 09:45:12;0080;Server@grass1;Job;2177.grass1.man.poznan.pl;delete job request received
11010/16/2011 09:45:12;0008;Server@grass1;Job;2177.grass1.man.poznan.pl;Job sent signal TermJob on delete
11110/16/2011 09:45:12;0008;Server@grass1;Job;2177.grass1.man.poznan.pl;Job to be deleted at request of mmamonski@grass1.man.poznan.pl
11210/16/2011 09:45:12;0010;Server@grass1;Job;2177.grass1.man.poznan.pl;Exit_status=271 resources_used.cpupercent=0 resources_used.cput=00:00:00 resources_used.mem=2772kb resources_used.ncpus=1 resources_used.vmem=199288kb resources_used.walltime=00:00:26
11310/16/2011 09:45:12;0100;Server@grass1;Job;2177.grass1.man.poznan.pl;dequeuing from workq, state 5
114
115II. Torque
116a) in Q state
11710/15/2011 21:19:25;0008;PBS_Server;Job;113045.grass1.man.poznan.pl;Job deleted at request of mmamonski@grass1.man.poznan.pl
11810/15/2011 21:19:25;0100;PBS_Server;Job;113045.grass1.man.poznan.pl;dequeuing from batch, state EXITING
119
120b) in R state
12110/15/2011 21:19:47;0008;PBS_Server;Job;113046.grass1.man.poznan.pl;Job deleted at request of mmamonski@grass1.man.poznan.pl
12210/15/2011 21:19:47;0008;PBS_Server;Job;113046.grass1.man.poznan.pl;Job sent signal SIGTERM on delete
12310/15/2011 21:19:47;0010;PBS_Server;Job;113046.grass1.man.poznan.pl;Exit_status=271 resources_used.cput=00:00:00 resources_used.mem=0kb resources_used.vmem=0kb resources_used.walltime=00:00:10
124
125Log closed:
12610/16/2011 00:00:17;0002;PBS_Server;Svr;Log;Log closed
127
128 */
129pbsdrmaa_log_reader_t *
130pbsdrmaa_log_reader_new( fsd_drmaa_session_t *session )
131{
132        pbsdrmaa_log_reader_t *volatile self = NULL;
133
134        fsd_log_enter((""));
135
136        TRY
137        {
138                fsd_malloc(self, pbsdrmaa_log_reader_t );
139               
140                self->session = session;
141
142                self->select_file = pbsdrmaa_select_file_wait_thread;
143                self->read_log = pbsdrmaa_read_log;     
144               
145                self->run_flag = true;
146                self->fhandle = NULL;
147                self->date_changed = true;
148                self->first_open = true;
149               
150        }
151        EXCEPT_DEFAULT
152        {
153                if( self != NULL)
154                        fsd_free(self);
155                       
156                fsd_exc_reraise();
157        }
158        END_TRY
159
160        fsd_log_return((""));
161
162        return self;
163}
164
165
166void
167pbsdrmaa_log_reader_destroy ( pbsdrmaa_log_reader_t * self )
168{
169        fsd_log_enter((""));
170        TRY
171        {
172                if(self != NULL)
173                {
174                        fsd_free(self);
175                }
176        }
177        EXCEPT_DEFAULT
178        {
179                fsd_exc_reraise();
180        }
181        END_TRY
182       
183        fsd_log_return((""));
184}
185
186
187void
188pbsdrmaa_read_log( pbsdrmaa_log_reader_t * self )
189{
190        fsd_log_enter((""));
191       
192        fsd_mutex_lock( &self->session->mutex );
193
194        TRY
195         {
196                while( self->run_flag )
197                 {
198                        TRY
199                        {
200                                char *line = NULL;
201                               
202                                self->select_file(self);
203
204                                while ((line = fsd_readline(self->fhandle)) != NULL)
205                                 {
206                                        int field_id = PBSDRMAA_FLD_ID_DATE;
207                                        char *tok_ctx = NULL;
208                                        char *field_token = NULL;
209                                        char *event_timestamp = NULL;
210                                        int event_type = -1;
211                                        fsd_job_t *job = NULL;
212
213                                        /* at first detect if this not the end of log file */
214                                        if (strstr(line, "Log;Log closed")) /*TODO try to be more effective and safe */
215                                         {
216                                                fsd_log_debug(("WT - Date changed. Closing log file"));
217                                                self->date_changed = true;
218                                                goto cleanup;
219                                         }
220
221                                        for (field_token = strtok_r(line, ";", &tok_ctx); field_token; field_token = strtok_r(NULL, ";", &tok_ctx), field_id++)
222                                         {
223                                                if ( field_id == PBSDRMAA_FLD_ID_DATE)
224                                                 {
225                                                        event_timestamp = field_token;
226                                                 }
227                                                else if ( field_id == PBSDRMAA_FLD_ID_EVENT)
228                                                 {
229                                                        if (strncmp(field_token, PBSDRMAA_FLD_MSG_0008, 4) == 0)
230                                                                event_type = pbsdrmaa_event_0008;
231                                                        else if (strncmp(field_token, PBSDRMAA_FLD_MSG_0010, 4) == 0)
232                                                                event_type = pbsdrmaa_event_0010;
233                                                        else
234                                                         {
235                                                                goto cleanup; /*we are interested only in the above log messages */
236                                                         }
237                                                 }
238                                                else if ( field_id == PBSDRMAA_FLD_ID_SRC)
239                                                 {
240                                                        /* not used ignore */
241                                                 }
242                                                else if (field_id  == PBSDRMAA_FLD_ID_OBJ_TYPE)
243                                                 {
244                                                        if (strncmp(field_token, "Job", 3) != 0)
245                                                         {
246                                                                goto cleanup; /* we are interested only in job events */
247                                                         }
248                                                 }
249                                                else if (field_id == PBSDRMAA_FLD_ID_OBJ_ID)
250                                                 {
251                                                        const char *event_jobid = field_token;
252                                                       
253                                                        if (!isdigit(event_jobid[0]))
254                                                         {
255                                                                fsd_log_debug(("WT - Invalid job: %s", event_jobid));
256                                                                goto cleanup;
257                                                         }
258
259                                                        job = self->session->get_job( self->session, event_jobid );
260
261                                                        if( job )
262                                                         {
263                                                                fsd_log_debug(("WT - Found job event: %s", event_jobid));
264                                                         }
265                                                        else
266                                                         {
267                                                                fsd_log_debug(("WT - Unknown job: %s", event_jobid)); /* Not a DRMAA job */
268                                                                goto cleanup;
269                                                         }
270                                                 }
271                                                else if (field_id == PBSDRMAA_FLD_ID_MSG)
272                                                 {
273                                                        char *msg = field_token;
274                                                        struct batch_status status;
275                                                        struct attrl *attribs = NULL;
276                                                        bool in_running_state = false;
277
278                                                        if (event_type == pbsdrmaa_event_0008 && strncmp(msg, "Job Queued", 10) == 0)
279                                                         {
280                                                                /* Queued
281                                                                 * PBS Pro: 10/11/2011 14:43:29;0008;Server@nova;Job;2127218.nova;Job Queued at request of mamonski@endor.wcss.wroc.pl, owner = mamonski@endor.wcss.wroc.pl, job name = STDIN, queue = normal
282                                                                 * Torque:  10/11/2011 14:47:59;0008;PBS_Server;Job;15545337.batch.grid.cyf-kr.edu.pl;Job Queued at request of plgmamonski@ui.cyf-kr.edu.pl, owner = plgmamonski@ui.cyf-kr.edu.pl, job name = STDIN, queue = l_short
283                                                                 */
284                                                                char *p_queue = NULL;
285
286                                                                fsd_log_info(("WT - Detected queuing of job %s", job->job_id));
287
288                                                                if ((p_queue = strstr(msg,"queue =")) == NULL)
289                                                                        fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"No queue attribute found in log line = %s", line);
290
291                                                                attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_JOB_STATE, "Q");
292                                                                attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_QUEUE, p_queue + 7);
293                                                         }
294                                                        else if (event_type == pbsdrmaa_event_0008 && strncmp(msg, "Job Run", 7) == 0)
295                                                        {
296                                                                /*
297                                                                 * Running
298                                                                 * Torque: 10/11/2011 14:48:23;0008;PBS_Server;Job;15545337.batch.grid.cyf-kr.edu.pl;Job Run at request of root@batch.grid.cyf-kr.edu.pl
299                                                                 * PBS Pro: 10/11/2011 14:43:31;0008;Server@nova;Job;2127218.nova;Job Run at request of Scheduler@nova.wcss.wroc.pl on exec_vnode (wn698:ncpus=3:mem=2048000kb)+(wn700:ncpus=3:mem=2048000kb)
300                                                                 */
301                                                                char timestamp_unix[64];
302
303                                                                fsd_log_info(("WT - Detected start of job %s", job->job_id));
304
305                                                                (void)pbsdrmaa_parse_log_timestamp(event_timestamp, timestamp_unix, sizeof(timestamp_unix));
306
307                                                                in_running_state = true;
308
309                                                                attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_JOB_STATE, "R");
310                                                                attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_START_TIME, timestamp_unix);
311#ifdef PBS_PROFESSIONAL
312                                                                        {
313                                                                                char *p_vnode = NULL;
314                                                                                if ((p_vnode = strstr(msg, "exec_vnode")))
315                                                                                 {
316                                                                                        attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_EXECUTION_VNODE, p_vnode + 11);
317                                                                                 }
318                                                                        }
319#endif
320                                                         }
321#ifndef PBS_PBS_PROFESSIONAL
322                                                        else if (event_type == pbsdrmaa_event_0008 && strncmp(msg, "Job deleted", 11) == 0)
323#else
324                                                        else if (event_type == pbsdrmaa_event_0008 && strncmp(msg, "Job to be deleted", 17) == 0)
325#endif
326                                                         {
327                                                        /* Deleted
328                                                         * PBS Pro: 10/16/2011 09:45:12;0008;Server@grass1;Job;2177.grass1.man.poznan.pl;Job to be deleted at request of mmamonski@grass1.man.poznan.pl
329                                                         * Torque: 10/15/2011 21:19:25;0008;PBS_Server;Job;113045.grass1.man.poznan.pl;Job deleted at request of mmamonski@grass1.man.poznan.pl
330                                                         */
331                                                                char timestamp_unix[64];
332
333                                                                fsd_log_info(("WT - Detected deletion of job %s", job->job_id));
334
335                                                                (void)pbsdrmaa_parse_log_timestamp(event_timestamp, timestamp_unix, sizeof(timestamp_unix));
336
337                                                                if (job->state < DRMAA_PS_RUNNING)
338                                                                 {
339                                                                        fsd_log_info(("WT - Job %s killed before entering running state (%d).", job->job_id, job->state));
340                                                                        attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_JOB_STATE, "C");
341                                                                        attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_MTIME, timestamp_unix);
342                                                                        attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_EXIT_STATUS, "-2");
343                                                                 }
344                                                                else
345                                                                 {
346                                                                        goto cleanup; /* job was started, ignore, wait for Exit_status message */
347                                                                 }
348                                                         }
349                                                        else if (event_type == pbsdrmaa_event_0010 && (strncmp(msg, "Exit_status=", 12) == 0))
350                                                         {
351                                                        /* Completed:
352                                                         * PBS Pro: 10/11/2011 14:43:32;0010;Server@nova;Job;2127218.nova;Exit_status=0 resources_used.cpupercent=0 resources_used.cput=00:00:00 resources_used.mem=1768kb resources_used.ncpus=6 resources_used.vmem=19228kb resources_used.walltime=00:00:01
353                                                         * Torque: 10/11/2011 14:48:24;0010;PBS_Server;Job;15545337.batch.grid.cyf-kr.edu.pl;Exit_status=0 resources_used.cput=00:00:00 resources_used.mem=720kb resources_used.vmem=13308kb resources_used.walltime=00:00:00
354                                                         */
355                                                                char timestamp_unix[64];
356                                                                time_t timestamp_time_t = pbsdrmaa_parse_log_timestamp(event_timestamp, timestamp_unix, sizeof(timestamp_unix));
357                                                                char *tok_ctx2 = NULL;
358                                                                char *token = NULL;
359
360                                                                attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_JOB_STATE, "C");
361                                                                attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_MTIME, timestamp_unix);
362
363                                                                /* tokenize !!! */
364                                                                for (token = strtok_r(msg, " ", &tok_ctx2); token; token = strtok_r(NULL, " ", &tok_ctx2))
365                                                                 {
366                                                                        if (strncmp(token, "Exit_status=", 12) == 0)
367                                                                         {
368                                                                                token[11] = '\0';
369                                                                                attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_EXIT_STATUS, token + 12);
370                                                                                fsd_log_info(("WT - Completion of job %s (Exit_status=%s) detected after %d seconds", job->job_id, token+12, (int)(time(NULL) - timestamp_time_t) ));
371                                                                         }
372                                                                        else if (strncmp(token, "resources_used.cput=", 20) == 0)
373                                                                         {
374                                                                                token[19] = '\0';
375                                                                                attribs = pbsdrmaa_add_attr(attribs, token, token + 20);
376                                                                         }
377                                                                        else if (strncmp(token, "resources_used.mem=", 19) == 0)
378                                                                         {
379                                                                                token[18] = '\0';
380                                                                                attribs = pbsdrmaa_add_attr(attribs, token, token + 19);
381                                                                         }
382                                                                        else if (strncmp(token, "resources_used.vmem=", 20) == 0)
383                                                                         {
384                                                                                token[19] = '\0';
385                                                                                attribs = pbsdrmaa_add_attr(attribs, token, token + 20);
386                                                                         }
387                                                                        else if (strncmp(token, "resources_used.walltime=", 24) == 0)
388                                                                         {
389                                                                                token[23] = '\0';
390                                                                                attribs = pbsdrmaa_add_attr(attribs, token, token + 24);
391                                                                         }
392                                                                 }
393
394                                                                if (!job->execution_hosts)
395                                                                 {
396                                                                        char *exec_host = NULL;
397                                                                        fsd_log_info(("WT - No execution host information for job %s. Reading accounting logs...", job->job_id));
398                                                                        exec_host = pbsdrmaa_get_exec_host_from_accountig(self, job->job_id);
399                                                                        if (exec_host)
400                                                                         {
401                                                                                attribs = pbsdrmaa_add_attr(attribs, PBSDRMAA_EXECUTION_HOST, exec_host);
402                                                                                fsd_free(exec_host);
403                                                                         }
404                                                                 }
405                                                         }
406                                                        else
407                                                        {
408                                                                goto cleanup; /* ignore other job events*/
409                                                        }
410
411                                                        if ( in_running_state )
412                                                         {
413                                                                fsd_log_debug(("WT - forcing update of job: %s", job->job_id ));
414                                                                TRY
415                                                                {
416                                                                        job->update_status( job );
417                                                                }
418                                                                EXCEPT_DEFAULT
419                                                                {
420                                                                        /*TODO: distinguish between invalid job and internal errors */
421                                                                        fsd_log_debug(("Job finished just after entering running state: %s", job->job_id));
422                                                                }
423                                                                END_TRY
424                                                         }
425                                                        else
426                                                         {
427                                                                fsd_log_debug(("WT - updating job: %s", job->job_id ));
428                                                                status.name = job->job_id;
429                                                                status.attribs = attribs;
430
431                                                                ((pbsdrmaa_job_t *)job)->update( job, &status );
432
433                                                                pbsdrmaa_free_attrl(attribs); /* TODO free on exception */
434                                                         }
435
436                                                        fsd_cond_broadcast( &job->status_cond);
437                                                        fsd_cond_broadcast( &self->session->wait_condition );
438
439                                                 }
440                                                else
441                                                 {
442                                                        fsd_assert(0); /*not reached */
443                                                 }
444                                         }
445                                cleanup:
446                                        fsd_free(line); /* TODO what about exceptions */               
447                                        if ( job )
448                                                job->release( job );
449
450
451
452                                 } /* end of while getline loop */
453
454
455
456                                fsd_mutex_unlock( &self->session->mutex );
457                               
458                                usleep(1500000); /* 500 ms - consider using inotify - but this would not work with NFS */                               
459
460                                fsd_mutex_lock( &self->session->mutex );
461
462                                self->run_flag = self->session->wait_thread_run_flag;
463                        }
464                        EXCEPT_DEFAULT
465                        {
466                                const fsd_exc_t *e = fsd_exc_get();
467                                /* Its better to exit and communicate error rather then let the application to hang */
468                                fsd_log_fatal(( "Exception in wait thread: <%d:%s>. Exiting !!!", e->code(e), e->message(e) ));
469                                exit(1);
470                        }
471                        END_TRY
472                 }
473
474                if(self->fhandle)
475                        fclose(self->fhandle);
476
477                fsd_log_debug(("WT - Log file closed"));
478        }
479        FINALLY
480        {
481                fsd_log_debug(("WT - Terminated."));
482                fsd_mutex_unlock( &self->session->mutex ); /**/
483        }
484        END_TRY
485       
486        fsd_log_return((""));
487}
488
489void
490pbsdrmaa_select_file_wait_thread ( pbsdrmaa_log_reader_t * self )
491{
492        pbsdrmaa_session_t *pbssession = (pbsdrmaa_session_t*) self->session;
493       
494        if (self->date_changed)
495         {
496                char * log_path = NULL;
497                int num_tries = 0;
498                struct tm tm;
499               
500                fsd_log_enter((""));
501               
502                if(!self->first_open)
503                        time(&self->t);
504                else
505                        self->t = pbssession->log_file_initial_time;
506                       
507                localtime_r(&self->t,&tm);
508                               
509                #define DRMAA_WAIT_THREAD_MAX_TRIES (12)
510                /* generate new date, close file and open new */
511                log_path = fsd_asprintf("%s/server_logs/%04d%02d%02d", pbssession->pbs_home, tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday);
512
513                if(self->fhandle)
514                        fclose(self->fhandle);
515
516                fsd_log_info(("Opening log file: %s",log_path));
517                               
518        retry:
519                if ((self->fhandle = fopen(log_path,"r")) == NULL && (num_tries > DRMAA_WAIT_THREAD_MAX_TRIES || self->first_open))
520                 {
521                        fsd_log_error(("Can't open log file. Verify pbs_home. Running standard wait_thread."));
522                        fsd_log_error(("Remember that without keep_completed set the standard wait_thread won't provide information about job exit status"));
523                        /*pbssession->super.enable_wait_thread = false;*/ /* run not wait_thread */
524                        pbssession->wait_thread_log = false;
525                        pbssession->super.wait_thread = pbssession->super_wait_thread;
526                        pbssession->super.wait_thread(self->session);
527                 }
528                else if ( self->fhandle == NULL )
529                 { /* Torque seems not to create a new file immediately after the old one is closed */
530                        fsd_log_warning(("Can't open log file: %s. Retries count: %d", log_path, num_tries));
531                        num_tries++;
532                        sleep(2 * num_tries);
533                        goto retry;
534                 }
535
536                fsd_free(log_path);
537
538                fsd_log_debug(("Log file opened"));
539
540                if(self->first_open)
541                 {
542                        fsd_log_debug(("Log file lseek"));
543
544                        if(fseek(self->fhandle, pbssession->log_file_initial_size, SEEK_SET) == (off_t) -1)
545                         {
546                                fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"fseek error");
547                         }
548                        self->first_open = false;
549                 }
550
551                self->date_changed = false;
552               
553                fsd_log_return((""));
554        }       
555}
556
557time_t
558pbsdrmaa_parse_log_timestamp(const char *timestamp, char *unixtime_str, size_t size)
559{
560        struct tm temp_time_tm;
561        memset(&temp_time_tm, 0, sizeof(temp_time_tm));
562        temp_time_tm.tm_isdst = -1;
563
564        if (strptime(timestamp, "%m/%d/%Y %H:%M:%S", &temp_time_tm) == NULL)
565         {
566                fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"WT - failed to parse log timestamp: %s", timestamp);
567         }
568        else
569         {
570                time_t temp_time = mktime(&temp_time_tm);
571                snprintf(unixtime_str, size, "%lu", temp_time);
572                return temp_time;
573         }
574}
575
576char *
577pbsdrmaa_get_exec_host_from_accountig(pbsdrmaa_log_reader_t * log_reader, const char *job_id)
578{
579                pbsdrmaa_session_t *pbssession = (pbsdrmaa_session_t*) log_reader->session;
580
581                struct tm tm;
582                time_t tm_t;
583                char *line = NULL;
584                FILE *fhandle = NULL;
585                char *exec_host = NULL;
586
587                fsd_log_enter((""));
588
589                tm_t = time(NULL);
590                localtime_r(&time_t, &tm);
591
592                log_path = fsd_asprintf("%s/server_priv/accounting/%04d%02d%02d", pbssession->pbs_home, tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday);
593
594                fsd_log_info(("Opening accounting log file: %s", log_path));
595
596                if ((fhandle = fopen(log_path, "r")) == NULL)
597                 {
598                        fsd_log_error("Failed to open accounting log file: %s", log_path);
599                        fsd_free(log_path);
600                        return NULL;
601                 }
602
603                fsd_free(log_path);
604/*
60510/27/2011 14:09:32;E;114249.grass1.man.poznan.pl;user=drmaa group=drmaa jobname=none queue=shortq ctime=1319717371 qtime=1319717371 etime=1319717371 start=1319717372 owner=drmaa@grass1.man.poznan.pl exec_host=grass4.man.poznan.pl/0 Resource_List.neednodes=1 Resource_List.nodect=1 Resource_List.nodes=1 Resource_List.walltime=02:00:00 session=28561 end=1319717372 Exit_status=0 resources_used.cput=00:00:00 resources_used.mem=0kb resources_used.vmem=0kb resources_used.walltime=00:00:00
606 */
607                while ((line = fsd_readline(fhandle)) != NULL)
608                 {
609                        if (line[20] == 'E' && strncmp(line + 23, job_id, strlen(job_id)) == 0 )
610                         {
611                                char *p = NULL;
612
613                                fsd_log_debug(("Matched accounting log record = %s", line));
614
615                                if (!(exec_host = strstr(line, "exec_host")))
616                                 {
617                                        fsd_log_error(("Invalid accounting record: %s", exec_host));
618                                        break;
619                                 }
620
621                                exec_host += 10;
622
623                                p = exec_host;
624                                while (p != ' ' && p != '\0')
625                                        p++;
626                                p = '\0';
627
628                                break;
629                         }
630
631                        fsd_free(line);
632                 }
633
634                if (exec_host)
635                 {
636                        fsd_log_info(("Job %s was executing on hosts %s.", job_id, exec_host));
637                        exec_host = fsd_strdup(exec_host);
638                 }
639                else
640                 {
641                        fsd_log_error(("Could not find executions hosts for %s.", job_id))
642                 }
643
644                if (line)
645                        fsd_free(line);
646
647                fclose(fhandle);
648
649                return exec_host;
650}
651
652
Note: See TracBrowser for help on using the repository browser.