/* $Id: session.c 385 2011-01-04 18:24:05Z mamonski $ */ /* * FedStage DRMAA for PBS Pro * Copyright (C) 2006-2009 FedStage Systems * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ #ifdef HAVE_CONFIG_H # include #endif #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #ifndef lint static char rcsid[] # ifdef __GNUC__ __attribute__ ((unused)) # endif = "$Id: session.c 385 2011-01-04 18:24:05Z mamonski $"; #endif static void pbsdrmaa_session_destroy( fsd_drmaa_session_t *self ); static void pbsdrmaa_session_apply_configuration( fsd_drmaa_session_t *self ); static fsd_job_t * pbsdrmaa_session_new_job( fsd_drmaa_session_t *self, const char *job_id ); static bool pbsdrmaa_session_do_drm_keeps_completed_jobs( pbsdrmaa_session_t *self ); static void pbsdrmaa_session_update_all_jobs_status( fsd_drmaa_session_t *self ); static void *pbsdrmaa_session_wait_thread( fsd_drmaa_session_t *self ); static char * pbsdrmaa_session_run_impl( fsd_drmaa_session_t *self, const fsd_template_t *jt, int bulk_idx ); static struct attrl * pbsdrmaa_create_status_attrl(void); fsd_drmaa_session_t * pbsdrmaa_session_new( const char *contact ) { pbsdrmaa_session_t *volatile self = NULL; if( contact == NULL ) contact = ""; TRY { self = (pbsdrmaa_session_t*)fsd_drmaa_session_new(contact); fsd_realloc( self, 1, pbsdrmaa_session_t ); self->super_wait_thread = NULL; self->log_file_initial_size = 0; self->pbs_conn = -1; self->pbs_home = NULL; self->wait_thread_log = false; self->status_attrl = NULL; self->super_destroy = self->super.destroy; self->super.destroy = pbsdrmaa_session_destroy; self->super.new_job = pbsdrmaa_session_new_job; self->super.update_all_jobs_status = pbsdrmaa_session_update_all_jobs_status; self->super.run_impl = pbsdrmaa_session_run_impl; self->super_apply_configuration = self->super.apply_configuration; self->super.apply_configuration = pbsdrmaa_session_apply_configuration; self->do_drm_keeps_completed_jobs = pbsdrmaa_session_do_drm_keeps_completed_jobs; self->status_attrl = pbsdrmaa_create_status_attrl(); self->pbs_conn = pbs_connect( self->super.contact ); fsd_log_debug(( "pbs_connect(%s) =%d", self->super.contact, self->pbs_conn )); if( self->pbs_conn < 0 ) pbsdrmaa_exc_raise_pbs( "pbs_connect" ); self->super.load_configuration( &self->super, "pbs_drmaa" ); self->super.missing_jobs = FSD_IGNORE_MISSING_JOBS; if( self->do_drm_keeps_completed_jobs( self ) ) self->super.missing_jobs = FSD_IGNORE_QUEUED_MISSING_JOBS; } EXCEPT_DEFAULT { if( self ) { self->super.destroy( &self->super ); self = NULL; } } END_TRY return (fsd_drmaa_session_t*)self; } void pbsdrmaa_session_destroy( fsd_drmaa_session_t *self ) { pbsdrmaa_session_t *pbsself = (pbsdrmaa_session_t*)self; self->stop_wait_thread( self ); if( pbsself->pbs_conn >= 0 ) pbs_disconnect( pbsself->pbs_conn ); fsd_free( pbsself->status_attrl ); pbsself->super_destroy( self ); } static char * pbsdrmaa_session_run_impl( fsd_drmaa_session_t *self, const fsd_template_t *jt, int bulk_idx ) { char *volatile job_id = NULL; fsd_job_t *volatile job = NULL; pbsdrmaa_submit_t *volatile submit = NULL; fsd_log_enter(( "(jt=%p, bulk_idx=%d)", (void*)jt, bulk_idx )); TRY { submit = pbsdrmaa_submit_new( self, jt, bulk_idx ); submit->eval( submit ); job_id = submit->submit( submit ); job = self->new_job( self, job_id ); job->submit_time = time(NULL); job->flags |= FSD_JOB_CURRENT_SESSION; self->jobs->add( self->jobs, job ); job->release( job ); job = NULL; } EXCEPT_DEFAULT { fsd_free( job_id ); fsd_exc_reraise(); } FINALLY { if( submit ) submit->destroy( submit ); if( job ) job->release( job ); } END_TRY fsd_log_return(( " =%s", job_id )); return job_id; } static fsd_job_t * pbsdrmaa_session_new_job( fsd_drmaa_session_t *self, const char *job_id ) { fsd_job_t *job; job = pbsdrmaa_job_new( fsd_strdup(job_id) ); job->session = self; return job; } void pbsdrmaa_session_apply_configuration( fsd_drmaa_session_t *self ) { pbsdrmaa_session_t *pbsself = (pbsdrmaa_session_t*)self; fsd_conf_option_t *pbs_home; pbs_home = fsd_conf_dict_get(self->configuration, "pbs_home" ); if( pbs_home ) { if( pbs_home->type == FSD_CONF_STRING ) { struct stat statbuf; char * volatile log_path; time_t t; pbsself->pbs_home = pbs_home->val.string; fsd_log_debug(("pbs_home: %s",pbsself->pbs_home)); pbsself->super_wait_thread = pbsself->super.wait_thread; pbsself->super.wait_thread = pbsdrmaa_session_wait_thread; pbsself->wait_thread_log = true; time(&t); localtime_r(&t,&pbsself->log_file_initial_time); if((log_path = fsd_asprintf("%s/server_logs/%04d%02d%02d", pbsself->pbs_home, pbsself->log_file_initial_time.tm_year + 1900, pbsself->log_file_initial_time.tm_mon + 1, pbsself->log_file_initial_time.tm_mday)) == NULL) { fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"WT - Memory allocation wasn't possible"); } if(stat(log_path,&statbuf) == -1) { char errbuf[256] = "InternalError"; (void)strerror_r(errno, errbuf, 256); fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"stat error: %s",errbuf); } fsd_log_debug(("Log file %s size %d",log_path,(int) statbuf.st_size)); pbsself->log_file_initial_size = statbuf.st_size; fsd_free(log_path); } else { pbsself->super.enable_wait_thread = false; pbsself->wait_thread_log = false; fsd_log_debug(("pbs_home not configured. Running standard wait_thread (pooling).")); } } pbsself->super_apply_configuration(self); /* call method from the superclass */ } void pbsdrmaa_session_update_all_jobs_status( fsd_drmaa_session_t *self ) { volatile bool conn_lock = false; volatile bool jobs_lock = false; pbsdrmaa_session_t *pbsself = (pbsdrmaa_session_t*)self; fsd_job_set_t *jobs = self->jobs; struct batch_status *volatile status = NULL; fsd_log_enter(("")); TRY { conn_lock = fsd_mutex_lock( &self->drm_connection_mutex ); retry: #ifdef PBS_PROFESSIONAL status = pbs_statjob( pbsself->pbs_conn, NULL, NULL, NULL ); #else status = pbs_statjob( pbsself->pbs_conn, NULL, pbsself->status_attrl, NULL ); #endif fsd_log_debug(( "pbs_statjob( fd=%d, job_id=NULL, attribs={...} ) =%p", pbsself->pbs_conn, (void*)status )); if( status == NULL && pbs_errno != 0 ) { if (pbs_errno == PBSE_PROTOCOL || pbs_errno == PBSE_EXPIRED) { if ( pbsself->pbs_conn >= 0) pbs_disconnect( pbsself->pbs_conn ); sleep(1); pbsself->pbs_conn = pbs_connect( pbsself->super.contact ); if( pbsself->pbs_conn < 0 ) pbsdrmaa_exc_raise_pbs( "pbs_connect" ); else goto retry; } else { pbsdrmaa_exc_raise_pbs( "pbs_statjob" ); } } conn_lock = fsd_mutex_unlock( &self->drm_connection_mutex ); { size_t i; fsd_job_t *job; jobs_lock = fsd_mutex_lock( &jobs->mutex ); for( i = 0; i < jobs->tab_size; i++ ) for( job = jobs->tab[i]; job != NULL; job = job->next ) { fsd_mutex_lock( &job->mutex ); job->flags |= FSD_JOB_MISSING; fsd_mutex_unlock( &job->mutex ); } jobs_lock = fsd_mutex_unlock( &jobs->mutex ); } { struct batch_status *volatile i; for( i = status; i != NULL; i = i->next ) { fsd_job_t *job = NULL; fsd_log_debug(( "job_id=%s", i->name )); job = self->get_job( self, i->name ); if( job != NULL ) { job->flags &= ~FSD_JOB_MISSING; TRY { ((pbsdrmaa_job_t*)job)->update( job, i ); } FINALLY { job->release( job ); } END_TRY } } } { size_t volatile i; fsd_job_t *volatile job; jobs_lock = fsd_mutex_lock( &jobs->mutex ); for( i = 0; i < jobs->tab_size; i++ ) for( job = jobs->tab[i]; job != NULL; job = job->next ) { fsd_mutex_lock( &job->mutex ); TRY { if( job->flags & FSD_JOB_MISSING ) job->on_missing( job ); } FINALLY{ fsd_mutex_unlock( &job->mutex ); } END_TRY } jobs_lock = fsd_mutex_unlock( &jobs->mutex ); } } FINALLY { if( status != NULL ) pbs_statfree( status ); if( conn_lock ) conn_lock = fsd_mutex_unlock( &self->drm_connection_mutex ); if( jobs_lock ) jobs_lock = fsd_mutex_unlock( &jobs->mutex ); } END_TRY fsd_log_return(("")); } struct attrl * pbsdrmaa_create_status_attrl(void) { struct attrl *result = NULL; struct attrl *i; const int max_attribs = 16; int n_attribs; int j = 0; fsd_log_enter(("")); fsd_calloc( result, max_attribs, struct attrl ); result[j++].name="job_state"; result[j++].name="exit_status"; result[j++].name="resources_used"; result[j++].name="ctime"; result[j++].name="mtime"; result[j++].name="qtime"; result[j++].name="etime"; result[j++].name="queue"; result[j++].name="Account_Name"; result[j++].name="exec_host"; result[j++].name="start_time"; result[j++].name="mtime"; #if 0 result[j].name="resources_used"; result[j].resource="walltime"; j++; result[j].name="resources_used"; result[j].resource="cput"; j++; result[j].name="resources_used"; result[j].resource="mem"; j++; result[j].name="resources_used"; result[j].resource="vmem"; j++; result[j].name="Resource_List"; result[j].resource="walltime"; j++; result[j].name="Resource_List"; result[j].resource="cput"; j++; result[j].name="Resource_List"; result[j].resource="mem"; j++; result[j].name="Resource_List"; result[j].resource="vmem"; j++; #endif n_attribs = j; for( i = result; true; i++ ) if( i+1 < result + n_attribs ) i->next = i+1; else { i->next = NULL; break; } #ifdef DEBUGGING fsd_log_return((":")); pbsdrmaa_dump_attrl( result, NULL ); #endif return result; } bool pbsdrmaa_session_do_drm_keeps_completed_jobs( pbsdrmaa_session_t *self ) { #ifndef PBS_PROFESSIONAL struct attrl default_queue_query; struct attrl keep_completed_query; struct batch_status *default_queue_result = NULL; struct batch_status *keep_completed_result = NULL; const char *default_queue = NULL; const char *keep_completed = NULL; volatile bool result = false; volatile bool conn_lock = false; TRY { default_queue_query.next = NULL; default_queue_query.name = "default_queue"; default_queue_query.resource = NULL; default_queue_query.value = NULL; keep_completed_query.next = NULL; keep_completed_query.name = "keep_completed"; keep_completed_query.resource = NULL; keep_completed_query.value = NULL; conn_lock = fsd_mutex_lock( &self->super.drm_connection_mutex ); default_queue_result = pbs_statserver( self->pbs_conn, &default_queue_query, NULL ); if( default_queue_result == NULL ) pbsdrmaa_exc_raise_pbs( "pbs_statserver" ); if( default_queue_result->attribs && !strcmp( default_queue_result->attribs->name, "default_queue" ) ) default_queue = default_queue_result->attribs->value; fsd_log_debug(( "default_queue: %s", default_queue )); if( default_queue ) { keep_completed_result = pbs_statque( self->pbs_conn, (char*)default_queue, &keep_completed_query, NULL ); if( keep_completed_result == NULL ) pbsdrmaa_exc_raise_pbs( "pbs_statque" ); if( keep_completed_result->attribs && !strcmp( keep_completed_result->attribs->name, "keep_completed" ) ) keep_completed = keep_completed_result->attribs->value; } fsd_log_debug(( "keep_completed: %s", keep_completed )); } EXCEPT_DEFAULT { const fsd_exc_t *e = fsd_exc_get(); fsd_log_warning(( "PBS server seems not to keep completed jobs\n" "detail: %s", e->message(e) )); result = false; } ELSE { result = false; if( default_queue == NULL ) fsd_log_warning(( "no default queue set on PBS server" )); else if( keep_completed == NULL && self->pbs_home == NULL ) fsd_log_warning(( "PBS server is not configured to keep completed jobs\n" "in Torque: set keep_completed parameter of default queue\n" " $ qmgr -c 'set queue batch keep_completed = 60'\n" " or configure DRMAA to utilize log files" )); else result = true; } FINALLY { if( default_queue_result ) pbs_statfree( default_queue_result ); if( keep_completed_result ) pbs_statfree( keep_completed_result ); if( conn_lock ) conn_lock = fsd_mutex_unlock( &self->super.drm_connection_mutex ); } END_TRY #endif return false; } enum field { FLD_DATE = 0, FLD_EVENT = 1, FLD_OBJ = 2, FLD_TYPE = 3, FLD_ID = 4, FLD_MSG = 5 }; enum field_msg { FLD_MSG_EXIT_STATUS = 0, FLD_MSG_CPUT = 1, FLD_MSG_MEM = 2, FLD_MSG_VMEM = 3, FLD_MSG_WALLTIME = 4 }; #define FLD_MSG_STATUS "0010" #define FLD_MSG_STATE "0008" #define FLD_MSG_LOG "0002" ssize_t fsd_getline(char * line,ssize_t size, int fd) { char buf; char * ptr = NULL; ssize_t n = 0, rc; ptr = line; for(n = 1; n< size; n++) { if( (rc = read(fd,&buf,1 )) == 1) { *ptr++ = buf; if(buf == '\n') { break; } } else if (rc == 0) { if (n == 1) return 0; else break; } else return -1; } return n; } void * pbsdrmaa_session_wait_thread( fsd_drmaa_session_t *self ) { pbsdrmaa_session_t *pbsself = (pbsdrmaa_session_t*) self; fsd_job_t *volatile job = NULL; pbsdrmaa_job_t *volatile pbsjob = NULL; char job_id[256] = ""; char event[256] = ""; time_t t; struct tm tm; tm = pbsself->log_file_initial_time; fsd_log_enter(( "" )); fsd_mutex_lock( &self->mutex ); TRY { char * volatile log_path = NULL; char buffer[4096] = ""; bool volatile date_changed = true; int volatile fd = -1; bool first_open = true; fsd_log_debug(("WT - reading log files")); while( self->wait_thread_run_flag ) TRY { if(date_changed) { int num_tries = 0; time(&t); localtime_r(&t,&tm); #define DRMAA_WAIT_THREAD_MAX_TRIES (12) /* generate new date, close file and open new */ if((log_path = fsd_asprintf("%s/server_logs/%04d%02d%02d", pbsself->pbs_home, tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday)) == NULL) { fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"WT - Memory allocation wasn't possible"); } if(fd != -1) close(fd); fsd_log_debug(("Log file: %s",log_path)); retry: if((fd = open(log_path,O_RDONLY) ) == -1 && num_tries > DRMAA_WAIT_THREAD_MAX_TRIES ) { fsd_log_error(("Can't open log file. Verify pbs_home. Running standard wait_thread.")); fsd_log_error(("Remember that without keep_completed set standard wait_thread won't run correctly")); /*pbsself->super.enable_wait_thread = false;*/ /* run not wait_thread */ pbsself->wait_thread_log = false; pbsself->super.wait_thread = pbsself->super_wait_thread; pbsself->super.wait_thread(self); } else if ( fd == -1 ) { fsd_log_warning(("Can't open log file: %s. Retries count: %d", log_path, num_tries)); num_tries++; sleep(5); goto retry; } fsd_free(log_path); fsd_log_debug(("Log file opened")); if(first_open) { fsd_log_debug(("Log file lseek")); if(lseek(fd,pbsself->log_file_initial_size,SEEK_SET) == (off_t) -1) { char errbuf[256] = "InternalError"; (void)strerror_r(errno, errbuf, 256); fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"lseek error: %s",errbuf); } first_open = false; } date_changed = false; } while ((fsd_getline(buffer,sizeof(buffer),fd)) > 0) { const char *volatile ptr = buffer; char field[256] = ""; int volatile field_n = 0; int n; bool volatile job_id_match = false; bool volatile event_match = false; bool volatile log_event = false; bool volatile log_match = false; char * temp_date = NULL; struct batch_status status; status.next = NULL; if( strlcpy(job_id,"",sizeof(job_id)) > sizeof(job_id) ) { fsd_log_error(("WT - strlcpy error")); } if( strlcpy(event,"",sizeof(event)) > sizeof(event) ) { fsd_log_error(("WT - strlcpy error")); } while ( sscanf(ptr, "%255[^;]%n", field, &n) == 1 ) /* divide current line into fields */ { if(field_n == FLD_DATE) { temp_date = fsd_strdup(field); } else if(field_n == FLD_EVENT && (strcmp(field,FLD_MSG_STATUS) == 0 || strcmp(field,FLD_MSG_STATE) == 0 )) { /* event described by log line*/ if(strlcpy(event, field,sizeof(event)) > sizeof(event)) { fsd_log_error(("WT - strlcpy error")); } event_match = true; } else if(event_match && field_n == FLD_ID) { TRY { job = self->get_job( self, field ); pbsjob = (pbsdrmaa_job_t*) job; if( job ) { if(strlcpy(job_id, field,sizeof(job_id)) > sizeof(job_id)) { fsd_log_error(("WT - strlcpy error")); } fsd_log_debug(("WT - job_id: %s",job_id)); status.name = fsd_strdup(job_id); job_id_match = true; /* job_id is in drmaa */ } else { fsd_log_debug(("WT - Unknown job: %s", field)); } } END_TRY } else if(job_id_match && field_n == FLD_MSG) { /* parse msg - depends on FLD_EVENT*/ struct attrl struct_resource_cput,struct_resource_mem,struct_resource_vmem, struct_resource_walltime, struct_status, struct_state, struct_start_time,struct_mtime, struct_queue, struct_account_name; bool state_running = false; struct_status.name = NULL; struct_status.value = NULL; struct_status.next = NULL; struct_status.resource = NULL; struct_state.name = NULL; struct_state.value = NULL; struct_state.next = NULL; struct_state.resource = NULL; struct_resource_cput.name = NULL; struct_resource_cput.value = NULL; struct_resource_cput.next = NULL; struct_resource_cput.resource = NULL; struct_resource_mem.name = NULL; struct_resource_mem.value = NULL; struct_resource_mem.next = NULL; struct_resource_mem.resource = NULL; struct_resource_vmem.name = NULL; struct_resource_vmem.value = NULL; struct_resource_vmem.next = NULL; struct_resource_vmem.resource = NULL; struct_resource_walltime.name = NULL; struct_resource_walltime.value = NULL; struct_resource_walltime.next = NULL; struct_resource_walltime.resource = NULL; struct_start_time.name = NULL; struct_start_time.value = NULL; struct_start_time.next = NULL; struct_start_time.resource = NULL; struct_mtime.name = NULL; struct_mtime.value = NULL; struct_mtime.next = NULL; struct_mtime.resource = NULL; struct_queue.name = NULL; struct_queue.value = NULL; struct_queue.next = NULL; struct_queue.resource = NULL; struct_account_name.name = NULL; struct_account_name.value = NULL; struct_account_name.next = NULL; struct_account_name.resource = NULL; if (strcmp(event,FLD_MSG_STATE) == 0) { /* job run, modified, queued etc */ int n = 0; status.attribs = &struct_state; struct_state.next = NULL; struct_state.name = "job_state"; if(field[0] == 'J') /* Job Queued, Job Modified, Job Run*/ { n = 4; } if(field[4] == 'M') { struct tm temp_time_tm; memset(&temp_time_tm, 0, sizeof(temp_time_tm)); temp_time_tm.tm_isdst = -1; if (strptime(temp_date, "%m/%d/%Y %H:%M:%S", &temp_time_tm) == NULL) { fsd_log_error(("failed to parse mtime: %s", temp_date)); } else { time_t temp_time = mktime(&temp_time_tm); status.attribs = &struct_mtime; struct_mtime.name = "mtime"; struct_mtime.next = NULL; struct_mtime.value = fsd_asprintf("%lu",temp_time); } } /* != Job deleted and Job to be deleted*/ #ifdef PBS_PROFESSIONAL else if (field[4] != 't' && field[10] != 'd') { #else else if(field[4] != 'd') { #endif if ((struct_state.value = fsd_asprintf("%c",field[n]) ) == NULL ) { /* 4 first letter of state */ fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"WT - Memory allocation wasn't possible"); } if(struct_state.value[0] == 'R'){ state_running = true; } } else { /* job terminated - pbs drmaa detects failed as completed with exit_status !=0, aborted with status -1*/ struct_status.name = "exit_status"; struct_status.value = fsd_strdup("-1"); struct_status.next = NULL; struct_state.next = &struct_status; struct_state.value = fsd_strdup("C"); } } else /*if (strcmp(event,FLD_MSG_STATUS) == 0 )*/ { /* exit status and rusage */ const char *ptr2 = field; char msg[ 256 ] = ""; int n2; int msg_field_n = 0; struct_resource_cput.name = "resources_used"; struct_resource_mem.name = "resources_used"; struct_resource_vmem.name = "resources_used"; struct_resource_walltime.name = "resources_used"; struct_status.name = "exit_status"; struct_state.name = "job_state"; status.attribs = &struct_resource_cput; struct_resource_cput.next = &struct_resource_mem; struct_resource_mem.next = &struct_resource_vmem; struct_resource_vmem.next = &struct_resource_walltime; struct_resource_walltime.next = &struct_status; struct_status.next = &struct_state; struct_state.next = NULL; while ( sscanf(ptr2, "%255[^ ]%n", msg, &n2) == 1 ) { switch(msg_field_n) { case FLD_MSG_EXIT_STATUS: struct_status.value = fsd_strdup(strchr(msg,'=')+1); break; case FLD_MSG_CPUT: struct_resource_cput.resource = "cput"; struct_resource_cput.value = fsd_strdup(strchr(msg,'=')+1); break; case FLD_MSG_MEM: struct_resource_mem.resource = "mem"; struct_resource_mem.value = fsd_strdup(strchr(msg,'=')+1); break; case FLD_MSG_VMEM: struct_resource_vmem.resource = "vmem"; struct_resource_vmem.value = fsd_strdup(strchr(msg,'=')+1); break; case FLD_MSG_WALLTIME: struct_resource_walltime.resource = "walltime"; struct_resource_walltime.value = fsd_strdup(strchr(msg,'=')+1); break; } ptr2 += n2; msg_field_n++; if ( *ptr2 != ' ' ) { break; } ++ptr2; } struct_state.value = fsd_strdup("C"); /* we got exit_status so we say that it has completed */ } if ( state_running ) { fsd_log_debug(("WT - forcing update of job: %s", job->job_id )); job->update_status( job ); } else { fsd_log_debug(("WT - updating job: %s", job->job_id )); pbsjob->update( job, &status ); } fsd_cond_broadcast( &job->status_cond); fsd_cond_broadcast( &self->wait_condition ); if ( job ) job->release( job ); fsd_free(struct_resource_cput.value); fsd_free(struct_resource_mem.value); fsd_free(struct_resource_vmem.value); fsd_free(struct_resource_walltime.value); fsd_free(struct_status.value); fsd_free(struct_state.value); fsd_free(struct_start_time.value); fsd_free(struct_mtime.value); fsd_free(struct_queue.value); fsd_free(struct_account_name.value); if ( status.name!=NULL ) fsd_free(status.name); } else if(field_n == FLD_EVENT && strcmp(field,FLD_MSG_LOG) == 0) { log_event = true; } else if (log_event && field_n == FLD_ID && strcmp(field,"Log") == 0 ) { log_match = true; log_event = false; } else if( log_match && field_n == FLD_MSG && field[0] == 'L' && field[1] == 'o' && field[2] == 'g' && field[3] == ' ' && field[4] == 'c' && field[5] == 'l' && field[6] == 'o' && field[7] == 's' && field[8] == 'e' && field[9] == 'd' ) /* last field in the file - strange bahaviour*/ { fsd_log_debug(("WT - Date changed. Closing log file")); date_changed = true; log_match = false; } ptr += n; if ( *ptr != ';' ) { break; /* end of line */ } field_n++; ++ptr; } if( strlcpy(buffer,"",sizeof(buffer)) > sizeof(buffer) ) { fsd_log_error(("WT - strlcpy error")); } fsd_free(temp_date); } fsd_mutex_unlock( &self->mutex ); usleep(1000000); fsd_mutex_lock( &self->mutex ); } EXCEPT_DEFAULT { const fsd_exc_t *e = fsd_exc_get(); fsd_log_error(( "wait thread: <%d:%s>", e->code(e), e->message(e) )); fsd_exc_reraise(); } END_TRY if(fd != -1) close(fd); fsd_log_debug(("Log file closed")); } FINALLY { fsd_log_debug(("WT - Terminated.")); fsd_mutex_unlock( &self->mutex ); } END_TRY fsd_log_return(( " =NULL" )); return NULL; }