- Timestamp:
- 07/27/11 23:41:14 (14 years ago)
- Location:
- trunk/pbs_drmaa
- Files:
-
- 3 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/pbs_drmaa/job.c
r17 r21 64 64 static void 65 65 pbsdrmaa_job_update( fsd_job_t *self, struct batch_status* ); 66 67 bool 68 pbsdrmaa_job_update_status_accounting( fsd_job_t *self ); 66 69 67 70 … … 189 192 190 193 fsd_log_enter(( "({job_id=%s})", self->job_id )); 194 191 195 TRY 192 196 { … … 203 207 if( status == NULL ) 204 208 { 205 #ifndef PBS_PROFESSIONAL 206 fsd_log_error(("pbs_statjob error: %d, %s, %s", pbs_errno, pbse_to_txt(pbs_errno), pbs_strerror(pbs_errno))); 207 #else 208 # ifndef PBS_PROFESSIONAL_NO_LOG 209 fsd_log_error(("pbs_statjob error: %d, %s", pbs_errno, pbse_to_txt(pbs_errno))); 210 # else 211 fsd_log_error(("pbs_statjob error: %d", pbs_errno)); 212 # endif 213 #endif 214 switch( pbs_errno ) 215 { 216 case PBSE_UNKJOBID: 217 break; 218 case PBSE_PROTOCOL: 219 case PBSE_EXPIRED: 220 if ( session->pbs_conn >= 0 ) 221 pbs_disconnect( session->pbs_conn ); 222 sleep(1); 223 session->pbs_conn = pbs_connect( session->super.contact ); 224 if( session->pbs_conn < 0 ) 225 pbsdrmaa_exc_raise_pbs( "pbs_connect" ); 226 else 227 { 228 fsd_log_error(("retry:")); 229 goto retry; 230 } 231 default: 232 pbsdrmaa_exc_raise_pbs( "pbs_statjob" ); 233 break; 234 case 0: /* ? */ 235 fsd_exc_raise_code( FSD_ERRNO_INTERNAL_ERROR ); 236 break; 209 if(pbsdrmaa_job_update_status_accounting(self) == false) 210 { 211 #ifndef PBS_PROFESSIONAL 212 fsd_log_error(("pbs_statjob error: %d, %s, %s", pbs_errno, pbse_to_txt(pbs_errno), pbs_strerror(pbs_errno))); 213 #else 214 # ifndef PBS_PROFESSIONAL_NO_LOG 215 fsd_log_error(("pbs_statjob error: %d, %s", pbs_errno, pbse_to_txt(pbs_errno))); 216 # else 217 fsd_log_error(("pbs_statjob error: %d", pbs_errno)); 218 # endif 219 #endif 220 221 /**/ 222 223 switch( pbs_errno ) 224 { 225 case PBSE_UNKJOBID: 226 break; 227 case PBSE_PROTOCOL: 228 case PBSE_EXPIRED: 229 if ( session->pbs_conn >= 0 ) 230 pbs_disconnect( session->pbs_conn ); 231 sleep(1); 232 session->pbs_conn = pbs_connect( session->super.contact ); 233 if( session->pbs_conn < 0 ) 234 pbsdrmaa_exc_raise_pbs( "pbs_connect" ); 235 else 236 { 237 fsd_log_error(("retry:")); 238 goto retry; 239 } 240 default: 241 pbsdrmaa_exc_raise_pbs( "pbs_statjob" ); 242 break; 243 case 0: /* ? */ 244 fsd_exc_raise_code( FSD_ERRNO_INTERNAL_ERROR ); 245 break; 246 } 237 247 } 238 248 } … … 497 507 drmaa_job_ps_to_str(self->state), self->exit_status )); 498 508 } 509 510 bool 511 pbsdrmaa_job_update_status_accounting( fsd_job_t *self ) 512 { 513 fsd_drmaa_session_t *session = self->session; 514 pbsdrmaa_log_reader_t *log_reader = NULL; 515 bool res = false; 516 517 fsd_log_enter(( "({job_id=%s})", self->job_id )); 518 fsd_log_info(( "Reading job %s info from accounting file", self->job_id )); 519 520 TRY 521 { 522 log_reader = pbsdrmaa_log_reader_accounting_new( session, self); 523 bool res = log_reader->read_log( log_reader ); 524 } 525 FINALLY 526 { 527 pbsdrmaa_log_reader_destroy( log_reader ); 528 } 529 END_TRY 530 531 fsd_log_return(("")); 532 return res; 533 } -
trunk/pbs_drmaa/log_reader.c
r20 r21 49 49 #include <errno.h> 50 50 51 static void51 static bool 52 52 pbsdrmaa_read_log(); 53 53 … … 63 63 static ssize_t 64 64 pbsdrmaa_read_line_job_on_missing ( pbsdrmaa_log_reader_t * self, char * line, char * buffer, ssize_t size, int * idx, int * end_idx, int * line_idx ); 65 66 static void 67 pbsdrmaa_select_file_accounting ( pbsdrmaa_log_reader_t * self ); 68 69 static ssize_t 70 pbsdrmaa_read_line_accounting ( pbsdrmaa_log_reader_t * self, char * line, char * buffer, ssize_t size, int * idx, int * end_idx, int * line_idx ); 71 72 static bool 73 pbsdrmaa_read_log_accounting( pbsdrmaa_log_reader_t * self ); 65 74 66 75 int … … 122 131 } 123 132 133 pbsdrmaa_log_reader_t * 134 pbsdrmaa_log_reader_accounting_new ( fsd_drmaa_session_t *session, fsd_job_t *job ) 135 { 136 pbsdrmaa_log_reader_t *volatile self = NULL; 137 138 fsd_log_enter(("")); 139 TRY 140 { 141 fsd_malloc(self, pbsdrmaa_log_reader_t ); 142 143 self->session = session; 144 145 self->job = job; 146 self->name = "Accounting"; 147 self->select_file = pbsdrmaa_select_file_accounting; 148 self->read_line = pbsdrmaa_read_line_accounting; 149 150 self->read_log = pbsdrmaa_read_log_accounting; 151 152 self->log_files = NULL; 153 self->log_files_number = 0; 154 155 self->run_flag = true; 156 self->fd = -1; 157 self->date_changed = true; 158 self->first_open = true; 159 160 self->log_file_initial_size = 0; 161 self->log_file_read_size = 0; 162 } 163 EXCEPT_DEFAULT 164 { 165 if( self != NULL) 166 fsd_free(self); 167 168 fsd_exc_reraise(); 169 } 170 END_TRY 171 fsd_log_return(("")); 172 return self; 173 } 174 124 175 void 125 176 pbsdrmaa_log_reader_destroy ( pbsdrmaa_log_reader_t * self ) … … 165 216 }; 166 217 218 enum field_msg_accounting 219 { 220 FLD_MSG_ACC_USER = 0, 221 FLD_MSG_ACC_GROUP = 1, 222 FLD_MSG_ACC_JOBNAME = 2, 223 FLD_MSG_ACC_QUEUE = 3, 224 FLD_MSG_ACC_CTIME = 4, 225 FLD_MSG_ACC_QTIME = 5, 226 FLD_MSG_ACC_ETIME = 6, 227 FLD_MSG_ACC_START = 7, 228 FLD_MSG_ACC_OWNER = 8, 229 FLD_MSG_ACC_EXEC_HOST = 9, 230 FLD_MSG_ACC_RES_NEEDNODES = 10, 231 FLD_MSG_ACC_RES_NODECT = 11, 232 FLD_MSG_ACC_RES_NODES = 12, 233 FLD_MSG_ACC_RES_WALLTIME = 13 234 }; 235 167 236 #define FLD_MSG_STATUS "0010" 168 237 #define FLD_MSG_STATE "0008" 169 238 #define FLD_MSG_LOG "0002" 170 239 171 void240 bool 172 241 pbsdrmaa_read_log( pbsdrmaa_log_reader_t * self ) 173 242 { … … 541 610 542 611 fsd_log_return(("")); 612 return true; 543 613 } 544 614 … … 749 819 } 750 820 821 void 822 pbsdrmaa_select_file_accounting ( pbsdrmaa_log_reader_t * self ) 823 { 824 pbsdrmaa_session_t *pbssession = (pbsdrmaa_session_t*) self->session; 825 826 char * log_path = NULL; 827 828 struct tm tm; 829 830 fsd_log_enter(("")); 831 832 time(&self->t); 833 834 localtime_r(&self->t,&tm); 835 836 #define DRMAA_ACCOUNTING_MAX_TRIES (12) 837 /* generate new date, close file and open new */ 838 if((log_path = fsd_asprintf("%s/server_priv/accounting/%04d%02d%02d", 839 pbssession->pbs_home, 840 tm.tm_year + 1900, 841 tm.tm_mon + 1, 842 tm.tm_mday)) == NULL) { 843 fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"Read accounting file - Memory allocation wasn't possible"); 844 } 845 846 if(self->fd != -1) 847 close(self->fd); 848 849 fsd_log_debug(("Accounting Log file: %s",log_path)); 850 851 if((self->fd = open(log_path,O_RDONLY) ) == -1 ) 852 { 853 fsd_log_error(("Can't open accounting log file. Change directory chmod and verify pbs_home.")); 854 } 855 856 fsd_free(log_path); 857 858 fsd_log_debug(("Accounting Log file opened")); 859 860 fsd_log_return(("")); 861 } 862 863 ssize_t 864 pbsdrmaa_read_line_accounting ( pbsdrmaa_log_reader_t * self, char * line, char * buffer, ssize_t size, int * idx, int * end_idx, int * line_idx ) 865 { 866 return fsd_getline_buffered(line,buffer,size,self->fd,idx,end_idx,line_idx); 867 } 868 869 enum field_acc 870 { 871 FLD_ACC_DATE = 0, 872 FLD_ACC_EVENT = 1, 873 FLD_ACC_ID = 2, 874 FLD_ACC_MSG = 3 875 }; 876 877 bool 878 pbsdrmaa_read_log_accounting( pbsdrmaa_log_reader_t * self ) 879 { 880 pbsdrmaa_job_t *pbsjob = (pbsdrmaa_job_t*) self->job; 881 bool res = false; 882 883 fsd_job_t *volatile temp_job = NULL; 884 885 fsd_log_enter(("")); 886 fsd_log_debug(("Accounting Log file opened")); 887 if(self->job == NULL) 888 fsd_mutex_lock( &self->session->mutex ); 889 890 TRY 891 { 892 TRY 893 { 894 char line[4096] = ""; 895 char buffer[4096] = ""; 896 int idx = 0, end_idx = 0, line_idx = 0; 897 898 self->select_file(self); 899 900 if(self->fd != -1) 901 while ((self->read_line(self, line,buffer, sizeof(line), &idx,&end_idx,&line_idx)) > 0) 902 { 903 const char *volatile ptr = line; 904 char field[256] = ""; 905 int volatile field_n = 0; 906 int n; 907 908 bool volatile job_id_match = false; 909 910 bool volatile job_found = false; 911 char * temp_date = NULL; 912 913 struct batch_status status; 914 915 while ( sscanf(ptr, "%255[^;]%n", field, &n) == 1 ) /* split current line into fields */ 916 { 917 status.next = NULL; 918 status.attribs = NULL; 919 920 if(field_n == FLD_ACC_DATE) 921 { 922 temp_date = fsd_strdup(field); 923 } 924 else if(field_n == FLD_ACC_EVENT) 925 { 926 927 } 928 else if(field_n == FLD_ACC_ID) 929 { 930 TRY 931 { 932 int diff = -1; 933 diff = fsd_job_id_cmp(self->job->job_id,field); 934 if( diff == 0) 935 { 936 /* read this file to the place we started and exit*/ 937 fsd_log_debug(("Accounting found job: %s",self->job->job_id)); 938 job_found = true; 939 job_id_match = true; 940 status.name = fsd_strdup(self->job->job_id); 941 } 942 } 943 END_TRY 944 } 945 else if(job_id_match && field_n == FLD_ACC_MSG) 946 { 947 struct attrl * struct_attrl = calloc(10,sizeof(struct attrl)); 948 949 if(field[0] == 'q') 950 { 951 status.attribs = &struct_attrl[0]; 952 struct_attrl[0].name = ATTR_queue; 953 struct_attrl[0].value = fsd_strdup(strchr(field,'=')+1); 954 struct_attrl[0].next = NULL; 955 } 956 else if(field[0] == 'u') 957 { 958 /* rusage */ 959 const char *ptr2 = field; 960 char msg[ 256 ] = ""; 961 int n2 = 0; 962 int msg_field_n = 0; 963 964 status.attribs = &struct_attrl[0]; 965 966 while ( sscanf(ptr2, "%255[^ ]%n", msg, &n2) == 1 ) 967 { 968 switch(msg_field_n) 969 { 970 case FLD_MSG_ACC_USER: 971 struct_attrl[msg_field_n].name = ATTR_euser; 972 break; 973 974 case FLD_MSG_ACC_GROUP: 975 struct_attrl[msg_field_n].name = ATTR_egroup; 976 break; 977 978 case FLD_MSG_ACC_JOBNAME: 979 struct_attrl[msg_field_n].name = ATTR_name; 980 break; 981 982 case FLD_MSG_ACC_QUEUE: 983 struct_attrl[msg_field_n].name = ATTR_queue; 984 break; 985 986 case FLD_MSG_ACC_CTIME: 987 struct_attrl[msg_field_n].name = ATTR_ctime; 988 break; 989 990 case FLD_MSG_ACC_QTIME: 991 struct_attrl[msg_field_n].name = ATTR_qtime; 992 break; 993 994 case FLD_MSG_ACC_ETIME: 995 struct_attrl[msg_field_n].name = ATTR_etime; 996 break; 997 998 case FLD_MSG_ACC_START: 999 struct_attrl[msg_field_n].name = ATTR_start_time; 1000 break; 1001 1002 case FLD_MSG_ACC_OWNER: 1003 struct_attrl[msg_field_n].name = ATTR_owner; 1004 break; 1005 1006 case FLD_MSG_ACC_EXEC_HOST: 1007 struct_attrl[msg_field_n].name = ATTR_exechost; 1008 break; 1009 } 1010 1011 struct_attrl[msg_field_n].value = fsd_strdup(strchr(msg,'=')+1); 1012 if(msg_field_n!=9) 1013 { 1014 struct_attrl[msg_field_n].next = &struct_attrl[msg_field_n+1]; 1015 } 1016 else 1017 { 1018 struct_attrl[msg_field_n].next = NULL; 1019 break; 1020 } 1021 1022 ptr2 += n2; 1023 msg_field_n++; 1024 if ( *ptr2 != ' ' ) 1025 { 1026 break; 1027 } 1028 ++ptr2; 1029 } 1030 } 1031 1032 if( job_found && status.attribs != NULL) 1033 { 1034 fsd_log_debug(("Accounting file - updating job: %s", self->job->job_id )); 1035 pbsjob->update( self->job, &status ); 1036 res = true; 1037 } 1038 1039 if(self->job == NULL) 1040 { 1041 fsd_cond_broadcast( &temp_job->status_cond); 1042 fsd_cond_broadcast( &self->session->wait_condition ); 1043 } 1044 if ( temp_job ) 1045 temp_job->release( temp_job ); 1046 1047 int i = 0; 1048 for(i = 0; i < 10; i++) 1049 { 1050 fsd_free(struct_attrl[i].value); 1051 } 1052 fsd_free(struct_attrl); 1053 fsd_free(status.name); 1054 } 1055 1056 1057 ptr += n; 1058 if ( *ptr != ';' ) 1059 { 1060 break; /* end of line */ 1061 } 1062 field_n++; 1063 ++ptr; 1064 } 1065 1066 fsd_free(temp_date); 1067 } /* end of while getline loop */ 1068 1069 } 1070 EXCEPT_DEFAULT 1071 { 1072 const fsd_exc_t *e = fsd_exc_get(); 1073 /* Its better to exit and communicate error rather then let the application to hang */ 1074 fsd_log_fatal(( "Exception in reading accounting file %s: <%d:%s>. Exiting !!!", self->name, e->code(e), e->message(e) )); 1075 exit(1); 1076 } 1077 END_TRY 1078 1079 if(self->fd != -1) 1080 close(self->fd); 1081 fsd_log_debug(("%s - Accounting log file closed",self->name)); 1082 } 1083 FINALLY 1084 { 1085 fsd_log_debug(("%s - Terminated.",self->name)); 1086 if(self->job == NULL) 1087 fsd_mutex_unlock( &self->session->mutex ); /**/ 1088 } 1089 END_TRY 1090 1091 fsd_log_return(("")); 1092 return res; 1093 } 1094 751 1095 int 752 1096 fsd_job_id_cmp(const char *s1, const char *s2) /* maybe move to drmaa_utils? */ -
trunk/pbs_drmaa/log_reader.h
r12 r21 33 33 pbsdrmaa_log_reader_new ( fsd_drmaa_session_t * session, fsd_job_t * job ); 34 34 35 pbsdrmaa_log_reader_t * 36 pbsdrmaa_log_reader_accounting_new ( fsd_drmaa_session_t * session, fsd_job_t * job ); 37 35 38 void 36 39 pbsdrmaa_log_reader_destroy ( pbsdrmaa_log_reader_t * self ); … … 40 43 fsd_job_t *volatile job; 41 44 42 void(*45 bool (* 43 46 read_log) ( pbsdrmaa_log_reader_t * self ); 44 47
Note: See TracChangeset
for help on using the changeset viewer.