Changeset 21


Ignore:
Timestamp:
07/27/11 23:41:14 (13 years ago)
Author:
mmatloka
Message:

exec host from accounting file

Location:
trunk/pbs_drmaa
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • trunk/pbs_drmaa/job.c

    r17 r21  
    6464static void 
    6565pbsdrmaa_job_update( fsd_job_t *self, struct batch_status* ); 
     66 
     67bool 
     68pbsdrmaa_job_update_status_accounting( fsd_job_t *self ); 
    6669 
    6770 
     
    189192 
    190193        fsd_log_enter(( "({job_id=%s})", self->job_id )); 
     194         
    191195        TRY 
    192196         { 
     
    203207                if( status == NULL ) 
    204208                 { 
    205 #ifndef PBS_PROFESSIONAL 
    206                         fsd_log_error(("pbs_statjob error: %d, %s, %s", pbs_errno, pbse_to_txt(pbs_errno), pbs_strerror(pbs_errno))); 
    207 #else 
    208 #  ifndef PBS_PROFESSIONAL_NO_LOG 
    209                         fsd_log_error(("pbs_statjob error: %d, %s", pbs_errno, pbse_to_txt(pbs_errno))); 
    210 #  else 
    211                         fsd_log_error(("pbs_statjob error: %d", pbs_errno)); 
    212 #  endif 
    213 #endif 
    214                         switch( pbs_errno ) 
    215                          { 
    216                                 case PBSE_UNKJOBID: 
    217                                         break; 
    218                                 case PBSE_PROTOCOL: 
    219                                 case PBSE_EXPIRED: 
    220                                         if ( session->pbs_conn >= 0 ) 
    221                                                 pbs_disconnect( session->pbs_conn ); 
    222                                         sleep(1); 
    223                                         session->pbs_conn = pbs_connect( session->super.contact ); 
    224                                         if( session->pbs_conn < 0 ) 
    225                                                 pbsdrmaa_exc_raise_pbs( "pbs_connect" ); 
    226                                         else  
    227                                          { 
    228                                                 fsd_log_error(("retry:")); 
    229                                                 goto retry; 
    230                                          } 
    231                                 default: 
    232                                         pbsdrmaa_exc_raise_pbs( "pbs_statjob" ); 
    233                                         break; 
    234                                 case 0:  /* ? */ 
    235                                         fsd_exc_raise_code( FSD_ERRNO_INTERNAL_ERROR ); 
    236                                         break; 
     209                        if(pbsdrmaa_job_update_status_accounting(self) == false) 
     210                        { 
     211        #ifndef PBS_PROFESSIONAL 
     212                                fsd_log_error(("pbs_statjob error: %d, %s, %s", pbs_errno, pbse_to_txt(pbs_errno), pbs_strerror(pbs_errno))); 
     213        #else 
     214        #  ifndef PBS_PROFESSIONAL_NO_LOG 
     215                                fsd_log_error(("pbs_statjob error: %d, %s", pbs_errno, pbse_to_txt(pbs_errno))); 
     216        #  else 
     217                                fsd_log_error(("pbs_statjob error: %d", pbs_errno)); 
     218        #  endif 
     219        #endif 
     220 
     221                                /**/ 
     222 
     223                                switch( pbs_errno ) 
     224                                 { 
     225                                        case PBSE_UNKJOBID: 
     226                                                break; 
     227                                        case PBSE_PROTOCOL: 
     228                                        case PBSE_EXPIRED: 
     229                                                if ( session->pbs_conn >= 0 ) 
     230                                                        pbs_disconnect( session->pbs_conn ); 
     231                                                sleep(1); 
     232                                                session->pbs_conn = pbs_connect( session->super.contact ); 
     233                                                if( session->pbs_conn < 0 ) 
     234                                                        pbsdrmaa_exc_raise_pbs( "pbs_connect" ); 
     235                                                else  
     236                                                 { 
     237                                                        fsd_log_error(("retry:")); 
     238                                                        goto retry; 
     239                                                 } 
     240                                        default: 
     241                                                pbsdrmaa_exc_raise_pbs( "pbs_statjob" ); 
     242                                                break; 
     243                                        case 0:  /* ? */ 
     244                                                fsd_exc_raise_code( FSD_ERRNO_INTERNAL_ERROR ); 
     245                                                break; 
     246                                 } 
    237247                         } 
    238248                 } 
     
    497507                                drmaa_job_ps_to_str(self->state), self->exit_status ));  
    498508} 
     509 
     510bool 
     511pbsdrmaa_job_update_status_accounting( fsd_job_t *self ) 
     512{ 
     513        fsd_drmaa_session_t *session = self->session; 
     514        pbsdrmaa_log_reader_t *log_reader = NULL; 
     515        bool res = false; 
     516         
     517        fsd_log_enter(( "({job_id=%s})", self->job_id )); 
     518        fsd_log_info(( "Reading job %s info from accounting file", self->job_id )); 
     519         
     520        TRY 
     521        {        
     522                log_reader = pbsdrmaa_log_reader_accounting_new( session, self); 
     523                bool res = log_reader->read_log( log_reader );  
     524        } 
     525        FINALLY 
     526        { 
     527                pbsdrmaa_log_reader_destroy( log_reader ); 
     528        } 
     529        END_TRY 
     530 
     531        fsd_log_return(("")); 
     532        return res; 
     533} 
  • trunk/pbs_drmaa/log_reader.c

    r20 r21  
    4949#include <errno.h> 
    5050 
    51 static void 
     51static bool 
    5252pbsdrmaa_read_log(); 
    5353 
     
    6363static ssize_t 
    6464pbsdrmaa_read_line_job_on_missing ( pbsdrmaa_log_reader_t * self, char * line, char * buffer, ssize_t size, int * idx, int * end_idx, int * line_idx ); 
     65 
     66static void 
     67pbsdrmaa_select_file_accounting ( pbsdrmaa_log_reader_t * self ); 
     68 
     69static ssize_t 
     70pbsdrmaa_read_line_accounting ( pbsdrmaa_log_reader_t * self, char * line, char * buffer, ssize_t size, int * idx, int * end_idx, int * line_idx ); 
     71 
     72static bool  
     73pbsdrmaa_read_log_accounting( pbsdrmaa_log_reader_t * self ); 
    6574 
    6675int  
     
    122131} 
    123132 
     133pbsdrmaa_log_reader_t *  
     134pbsdrmaa_log_reader_accounting_new ( fsd_drmaa_session_t *session, fsd_job_t *job ) 
     135{ 
     136        pbsdrmaa_log_reader_t *volatile self = NULL; 
     137 
     138        fsd_log_enter(("")); 
     139        TRY 
     140        { 
     141                fsd_malloc(self, pbsdrmaa_log_reader_t ); 
     142                 
     143                self->session = session; 
     144                 
     145                self->job = job; 
     146                self->name = "Accounting"; 
     147                self->select_file = pbsdrmaa_select_file_accounting; 
     148                self->read_line = pbsdrmaa_read_line_accounting; 
     149                                 
     150                self->read_log = pbsdrmaa_read_log_accounting;   
     151                 
     152                self->log_files = NULL; 
     153                self->log_files_number = 0; 
     154                 
     155                self->run_flag = true; 
     156                self->fd = -1; 
     157                self->date_changed = true; 
     158                self->first_open = true; 
     159                 
     160                self->log_file_initial_size = 0; 
     161                self->log_file_read_size = 0; 
     162        } 
     163        EXCEPT_DEFAULT 
     164        { 
     165                if( self != NULL) 
     166                        fsd_free(self); 
     167                         
     168                fsd_exc_reraise(); 
     169        } 
     170        END_TRY 
     171        fsd_log_return(("")); 
     172        return self; 
     173} 
     174 
    124175void 
    125176pbsdrmaa_log_reader_destroy ( pbsdrmaa_log_reader_t * self ) 
     
    165216}; 
    166217 
     218enum field_msg_accounting 
     219{ 
     220        FLD_MSG_ACC_USER = 0, 
     221        FLD_MSG_ACC_GROUP = 1, 
     222        FLD_MSG_ACC_JOBNAME = 2, 
     223        FLD_MSG_ACC_QUEUE = 3, 
     224        FLD_MSG_ACC_CTIME = 4, 
     225        FLD_MSG_ACC_QTIME = 5, 
     226        FLD_MSG_ACC_ETIME = 6, 
     227        FLD_MSG_ACC_START = 7, 
     228        FLD_MSG_ACC_OWNER = 8, 
     229        FLD_MSG_ACC_EXEC_HOST = 9, 
     230        FLD_MSG_ACC_RES_NEEDNODES = 10, 
     231        FLD_MSG_ACC_RES_NODECT = 11, 
     232        FLD_MSG_ACC_RES_NODES = 12, 
     233        FLD_MSG_ACC_RES_WALLTIME = 13 
     234}; 
     235 
    167236#define FLD_MSG_STATUS "0010" 
    168237#define FLD_MSG_STATE "0008" 
    169238#define FLD_MSG_LOG "0002" 
    170239 
    171 void  
     240bool  
    172241pbsdrmaa_read_log( pbsdrmaa_log_reader_t * self ) 
    173242{ 
     
    541610         
    542611        fsd_log_return(("")); 
     612        return true; 
    543613} 
    544614 
     
    749819} 
    750820 
     821void 
     822pbsdrmaa_select_file_accounting ( pbsdrmaa_log_reader_t * self ) 
     823{ 
     824        pbsdrmaa_session_t *pbssession = (pbsdrmaa_session_t*) self->session; 
     825                 
     826        char * log_path = NULL; 
     827 
     828        struct tm tm;  
     829                 
     830        fsd_log_enter(("")); 
     831                 
     832        time(&self->t);  
     833                         
     834        localtime_r(&self->t,&tm); 
     835                                 
     836        #define DRMAA_ACCOUNTING_MAX_TRIES (12) 
     837        /* generate new date, close file and open new */ 
     838        if((log_path = fsd_asprintf("%s/server_priv/accounting/%04d%02d%02d", 
     839                                pbssession->pbs_home,     
     840                                tm.tm_year + 1900, 
     841                                tm.tm_mon + 1, 
     842                                tm.tm_mday)) == NULL) { 
     843                fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"Read accounting file - Memory allocation wasn't possible"); 
     844        } 
     845 
     846        if(self->fd != -1) 
     847                close(self->fd); 
     848 
     849        fsd_log_debug(("Accounting Log file: %s",log_path));                             
     850 
     851        if((self->fd = open(log_path,O_RDONLY) ) == -1 ) 
     852        { 
     853                fsd_log_error(("Can't open accounting log file. Change directory chmod and verify pbs_home.")); 
     854        }  
     855 
     856        fsd_free(log_path); 
     857 
     858        fsd_log_debug(("Accounting Log file opened")); 
     859 
     860        fsd_log_return((""));    
     861} 
     862 
     863ssize_t 
     864pbsdrmaa_read_line_accounting ( pbsdrmaa_log_reader_t * self, char * line, char * buffer, ssize_t size, int * idx, int * end_idx, int * line_idx ) 
     865{ 
     866        return fsd_getline_buffered(line,buffer,size,self->fd,idx,end_idx,line_idx); 
     867} 
     868 
     869enum field_acc 
     870{  
     871        FLD_ACC_DATE = 0, 
     872        FLD_ACC_EVENT = 1, 
     873        FLD_ACC_ID = 2, 
     874        FLD_ACC_MSG = 3 
     875}; 
     876 
     877bool  
     878pbsdrmaa_read_log_accounting( pbsdrmaa_log_reader_t * self ) 
     879{ 
     880        pbsdrmaa_job_t *pbsjob = (pbsdrmaa_job_t*) self->job;    
     881        bool res = false; 
     882         
     883        fsd_job_t *volatile temp_job = NULL; 
     884                 
     885        fsd_log_enter(("")); 
     886        fsd_log_debug(("Accounting Log file opened")); 
     887        if(self->job == NULL) 
     888                fsd_mutex_lock( &self->session->mutex ); 
     889 
     890        TRY 
     891        {                
     892                TRY 
     893                { 
     894                        char line[4096] = ""; 
     895                        char buffer[4096] = ""; 
     896                        int idx = 0, end_idx = 0, line_idx = 0; 
     897                         
     898                        self->select_file(self); 
     899                         
     900                        if(self->fd != -1)                                       
     901                        while ((self->read_line(self, line,buffer, sizeof(line), &idx,&end_idx,&line_idx)) > 0)                          
     902                        { 
     903                                const char *volatile ptr = line; 
     904                                char field[256] = ""; 
     905                                int volatile field_n = 0; 
     906                                int n; 
     907                                 
     908                                bool volatile job_id_match = false;      
     909                         
     910                                bool volatile job_found = false; 
     911                                char *  temp_date = NULL; 
     912                                 
     913                                struct batch_status status; 
     914                                 
     915                                while ( sscanf(ptr, "%255[^;]%n", field, &n) == 1 ) /* split current line into fields */ 
     916                                { 
     917                                        status.next = NULL; 
     918                                        status.attribs = NULL; 
     919                                 
     920                                        if(field_n == FLD_ACC_DATE) 
     921                                        { 
     922                                                temp_date = fsd_strdup(field); 
     923                                        } 
     924                                        else if(field_n == FLD_ACC_EVENT) 
     925                                        { 
     926                                                         
     927                                        } 
     928                                        else if(field_n == FLD_ACC_ID) 
     929                                        {                                                        
     930                                                TRY 
     931                                                {                                                                
     932                                                                int diff = -1; 
     933                                                                diff = fsd_job_id_cmp(self->job->job_id,field); 
     934                                                                if( diff == 0) 
     935                                                                { 
     936                                                                        /* read this file to the place we started and exit*/ 
     937                                                                        fsd_log_debug(("Accounting found job: %s",self->job->job_id)); 
     938                                                                        job_found = true; 
     939                                                                        job_id_match = true;  
     940                                                                        status.name = fsd_strdup(self->job->job_id);                                                                     
     941                                                                }        
     942                                                } 
     943                                                END_TRY  
     944                                        } 
     945                                        else if(job_id_match && field_n == FLD_ACC_MSG) 
     946                                        {                                        
     947                                                struct attrl * struct_attrl = calloc(10,sizeof(struct attrl)); 
     948                                                                                              
     949                                                if(field[0] == 'q') 
     950                                                { 
     951                                                        status.attribs = &struct_attrl[0]; 
     952                                                        struct_attrl[0].name =  ATTR_queue; 
     953                                                        struct_attrl[0].value = fsd_strdup(strchr(field,'=')+1); 
     954                                                        struct_attrl[0].next = NULL; 
     955                                                } 
     956                                                else if(field[0] == 'u') 
     957                                                { 
     958                                                        /* rusage */ 
     959                                                        const char *ptr2 = field; 
     960                                                        char  msg[ 256 ] = ""; 
     961                                                        int n2 = 0; 
     962                                                        int msg_field_n = 0;     
     963                                 
     964                                                        status.attribs = &struct_attrl[0]; 
     965 
     966                                                        while ( sscanf(ptr2, "%255[^ ]%n", msg, &n2) == 1 ) 
     967                                                         {                                               
     968                                                                switch(msg_field_n)  
     969                                                                { 
     970                                                                        case FLD_MSG_ACC_USER: 
     971                                                                                struct_attrl[msg_field_n].name = ATTR_euser;                                                                     
     972                                                                                break; 
     973 
     974                                                                        case FLD_MSG_ACC_GROUP: 
     975                                                                                struct_attrl[msg_field_n].name = ATTR_egroup; 
     976                                                                                break; 
     977 
     978                                                                        case FLD_MSG_ACC_JOBNAME: 
     979                                                                                struct_attrl[msg_field_n].name = ATTR_name; 
     980                                                                                break; 
     981 
     982                                                                        case FLD_MSG_ACC_QUEUE: 
     983                                                                                struct_attrl[msg_field_n].name = ATTR_queue; 
     984                                                                                break;  
     985 
     986                                                                        case FLD_MSG_ACC_CTIME: 
     987                                                                                struct_attrl[msg_field_n].name = ATTR_ctime; 
     988                                                                                break;  
     989                                                                                 
     990                                                                        case FLD_MSG_ACC_QTIME: 
     991                                                                                struct_attrl[msg_field_n].name = ATTR_qtime; 
     992                                                                                break;  
     993                                                                                 
     994                                                                        case FLD_MSG_ACC_ETIME: 
     995                                                                                struct_attrl[msg_field_n].name = ATTR_etime; 
     996                                                                                break;  
     997                                                                                 
     998                                                                        case FLD_MSG_ACC_START: 
     999                                                                                struct_attrl[msg_field_n].name = ATTR_start_time; 
     1000                                                                                break;  
     1001                                                                                 
     1002                                                                        case FLD_MSG_ACC_OWNER: 
     1003                                                                                struct_attrl[msg_field_n].name = ATTR_owner; 
     1004                                                                                break;  
     1005                                                                                 
     1006                                                                        case FLD_MSG_ACC_EXEC_HOST: 
     1007                                                                                struct_attrl[msg_field_n].name = ATTR_exechost; 
     1008                                                                                break;                                                                           
     1009                                                                } 
     1010                                                                 
     1011                                                                struct_attrl[msg_field_n].value  = fsd_strdup(strchr(msg,'=')+1); 
     1012                                                                if(msg_field_n!=9) 
     1013                                                                { 
     1014                                                                        struct_attrl[msg_field_n].next = &struct_attrl[msg_field_n+1]; 
     1015                                                                } 
     1016                                                                else 
     1017                                                                { 
     1018                                                                        struct_attrl[msg_field_n].next = NULL; 
     1019                                                                        break; 
     1020                                                                } 
     1021                                                               
     1022                                                                ptr2 += n2;  
     1023                                                                msg_field_n++; 
     1024                                                                if ( *ptr2 != ' ' ) 
     1025                                                                 { 
     1026                                                                         break;  
     1027                                                                 } 
     1028                                                                ++ptr2;                                          
     1029                                                         } 
     1030                                                }                                                
     1031                                                  
     1032                                            if( job_found && status.attribs != NULL)  
     1033                                                 { 
     1034                                                        fsd_log_debug(("Accounting file - updating job: %s", self->job->job_id ));                                       
     1035                                                        pbsjob->update( self->job, &status ); 
     1036                                                        res = true; 
     1037                                                 } 
     1038                                                 
     1039                                                if(self->job == NULL) 
     1040                                                { 
     1041                                                        fsd_cond_broadcast( &temp_job->status_cond); 
     1042                                                        fsd_cond_broadcast( &self->session->wait_condition ); 
     1043                                                } 
     1044                                                if ( temp_job ) 
     1045                                                        temp_job->release( temp_job ); 
     1046         
     1047                                                int i = 0; 
     1048                                                for(i = 0; i < 10; i++) 
     1049                                                { 
     1050                                                        fsd_free(struct_attrl[i].value); 
     1051                                                } 
     1052                                                fsd_free(struct_attrl); 
     1053                                                fsd_free(status.name); 
     1054                                        } 
     1055                                         
     1056                                         
     1057                                        ptr += n;  
     1058                                        if ( *ptr != ';' ) 
     1059                                        { 
     1060                                                break; /* end of line */ 
     1061                                        } 
     1062                                        field_n++; 
     1063                                        ++ptr; 
     1064                                }                
     1065 
     1066                                fsd_free(temp_date);                     
     1067                        } /* end of while getline loop */        
     1068                         
     1069                }                
     1070                EXCEPT_DEFAULT 
     1071                 { 
     1072                        const fsd_exc_t *e = fsd_exc_get(); 
     1073                        /* Its better to exit and communicate error rather then let the application to hang */ 
     1074                        fsd_log_fatal(( "Exception in reading accounting file %s: <%d:%s>. Exiting !!!", self->name, e->code(e), e->message(e) )); 
     1075                        exit(1); 
     1076                 } 
     1077                END_TRY 
     1078 
     1079                if(self->fd != -1) 
     1080                        close(self->fd); 
     1081                fsd_log_debug(("%s - Accounting log file closed",self->name));   
     1082        } 
     1083        FINALLY 
     1084        { 
     1085                fsd_log_debug(("%s - Terminated.",self->name));  
     1086                if(self->job == NULL) 
     1087                        fsd_mutex_unlock( &self->session->mutex ); /**/ 
     1088        } 
     1089        END_TRY 
     1090         
     1091        fsd_log_return(("")); 
     1092        return res; 
     1093} 
     1094 
    7511095int  
    7521096fsd_job_id_cmp(const char *s1, const char *s2) /* maybe move to drmaa_utils? */ 
  • trunk/pbs_drmaa/log_reader.h

    r12 r21  
    3333pbsdrmaa_log_reader_new ( fsd_drmaa_session_t * session, fsd_job_t * job ); 
    3434 
     35pbsdrmaa_log_reader_t *  
     36pbsdrmaa_log_reader_accounting_new ( fsd_drmaa_session_t * session, fsd_job_t * job ); 
     37 
    3538void 
    3639pbsdrmaa_log_reader_destroy ( pbsdrmaa_log_reader_t * self ); 
     
    4043        fsd_job_t *volatile job; 
    4144         
    42         void (* 
     45        bool (* 
    4346        read_log) ( pbsdrmaa_log_reader_t * self ); 
    4447         
Note: See TracChangeset for help on using the changeset viewer.