Changeset 7 for trunk/pbs_drmaa/session.c
- Timestamp:
- 03/02/11 17:42:40 (13 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/pbs_drmaa/session.c
r3 r7 41 41 42 42 #include <pbs_drmaa/job.h> 43 #include <pbs_drmaa/log_reader.h> 43 44 #include <pbs_drmaa/session.h> 44 45 #include <pbs_drmaa/submit.h> … … 218 219 struct stat statbuf; 219 220 char * volatile log_path; 220 time_t t;221 221 struct tm tm; 222 222 223 pbsself->pbs_home = pbs_home->val.string; 223 224 fsd_log_debug(("pbs_home: %s",pbsself->pbs_home)); … … 226 227 pbsself->wait_thread_log = true; 227 228 228 time(& t);229 localtime_r(& t,&pbsself->log_file_initial_time);229 time(&pbsself->log_file_initial_time); 230 localtime_r(&pbsself->log_file_initial_time,&tm); 230 231 231 232 if((log_path = fsd_asprintf("%s/server_logs/%04d%02d%02d", 232 233 pbsself->pbs_home, 233 pbsself->log_file_initial_time.tm_year + 1900,234 pbsself->log_file_initial_time.tm_mon + 1,235 pbsself->log_file_initial_time.tm_mday)) == NULL) {234 tm.tm_year + 1900, 235 tm.tm_mon + 1, 236 tm.tm_mday)) == NULL) { 236 237 fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"WT - Memory allocation wasn't possible"); 237 238 } … … 516 517 } 517 518 518 519 enum field520 {521 FLD_DATE = 0,522 FLD_EVENT = 1,523 FLD_OBJ = 2,524 FLD_TYPE = 3,525 FLD_ID = 4,526 FLD_MSG = 5527 };528 529 enum field_msg530 {531 FLD_MSG_EXIT_STATUS = 0,532 FLD_MSG_CPUT = 1,533 FLD_MSG_MEM = 2,534 FLD_MSG_VMEM = 3,535 FLD_MSG_WALLTIME = 4536 };537 538 #define FLD_MSG_STATUS "0010"539 #define FLD_MSG_STATE "0008"540 #define FLD_MSG_LOG "0002"541 542 ssize_t fsd_getline(char * line,ssize_t size, int fd)543 {544 char buf;545 char * ptr = NULL;546 ssize_t n = 0, rc;547 ptr = line;548 for(n = 1; n< size; n++)549 {550 if( (rc = read(fd,&buf,1 )) == 1) {551 *ptr++ = buf;552 if(buf == '\n')553 {554 break;555 }556 }557 else if (rc == 0) {558 if (n == 1)559 return 0;560 else561 break;562 }563 else564 return -1;565 }566 567 return n;568 }569 570 519 void * 571 520 pbsdrmaa_session_wait_thread( fsd_drmaa_session_t *self ) 572 521 { 573 pbsdrmaa_session_t *pbsself = (pbsdrmaa_session_t*) self; 574 fsd_job_t *volatile job = NULL; 575 pbsdrmaa_job_t *volatile pbsjob = NULL; 576 char job_id[256] = ""; 577 char event[256] = ""; 578 time_t t; 579 struct tm tm; 580 581 tm = pbsself->log_file_initial_time; 582 522 pbsdrmaa_log_reader_t *log_reader = NULL; 523 583 524 fsd_log_enter(( "" )); 584 fsd_mutex_lock( &self->mutex );525 585 526 TRY 586 { 587 char * volatile log_path = NULL; 588 char buffer[4096] = ""; 589 bool volatile date_changed = true; 590 int volatile fd = -1; 591 bool first_open = true; 592 593 fsd_log_debug(("WT - reading log files")); 594 595 while( self->wait_thread_run_flag ) 596 TRY 597 { 598 if(date_changed) 599 { 600 int num_tries = 0; 601 602 time(&t); 603 localtime_r(&t,&tm); 604 605 #define DRMAA_WAIT_THREAD_MAX_TRIES (12) 606 /* generate new date, close file and open new */ 607 if((log_path = fsd_asprintf("%s/server_logs/%04d%02d%02d", 608 pbsself->pbs_home, 609 tm.tm_year + 1900, 610 tm.tm_mon + 1, 611 tm.tm_mday)) == NULL) { 612 fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"WT - Memory allocation wasn't possible"); 613 } 614 615 if(fd != -1) 616 close(fd); 617 618 fsd_log_debug(("Log file: %s",log_path)); 619 620 retry: 621 if((fd = open(log_path,O_RDONLY) ) == -1 && num_tries > DRMAA_WAIT_THREAD_MAX_TRIES ) 622 { 623 fsd_log_error(("Can't open log file. Verify pbs_home. Running standard wait_thread.")); 624 fsd_log_error(("Remember that without keep_completed set standard wait_thread won't run correctly")); 625 /*pbsself->super.enable_wait_thread = false;*/ /* run not wait_thread */ 626 pbsself->wait_thread_log = false; 627 pbsself->super.wait_thread = pbsself->super_wait_thread; 628 pbsself->super.wait_thread(self); 629 } else if ( fd == -1 ) { 630 fsd_log_warning(("Can't open log file: %s. Retries count: %d", log_path, num_tries)); 631 num_tries++; 632 sleep(5); 633 goto retry; 634 } 635 636 fsd_free(log_path); 637 638 fsd_log_debug(("Log file opened")); 639 640 if(first_open) { 641 fsd_log_debug(("Log file lseek")); 642 if(lseek(fd,pbsself->log_file_initial_size,SEEK_SET) == (off_t) -1) { 643 char errbuf[256] = "InternalError"; 644 (void)strerror_r(errno, errbuf, 256); 645 fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"lseek error: %s",errbuf); 646 } 647 first_open = false; 648 } 649 650 date_changed = false; 651 } 652 653 while ((fsd_getline(buffer,sizeof(buffer),fd)) > 0) 654 { 655 const char *volatile ptr = buffer; 656 char field[256] = ""; 657 int volatile field_n = 0; 658 int n; 659 660 bool volatile job_id_match = false; 661 bool volatile event_match = false; 662 bool volatile log_event = false; 663 bool volatile log_match = false; 664 char * temp_date = NULL; 665 666 667 struct batch_status status; 668 status.next = NULL; 669 670 if( strlcpy(job_id,"",sizeof(job_id)) > sizeof(job_id) ) { 671 fsd_log_error(("WT - strlcpy error")); 672 } 673 if( strlcpy(event,"",sizeof(event)) > sizeof(event) ) { 674 fsd_log_error(("WT - strlcpy error")); 675 } 676 while ( sscanf(ptr, "%255[^;]%n", field, &n) == 1 ) /* divide current line into fields */ 677 { 678 if(field_n == FLD_DATE) 679 { 680 temp_date = fsd_strdup(field); 681 } 682 else if(field_n == FLD_EVENT && (strcmp(field,FLD_MSG_STATUS) == 0 || 683 strcmp(field,FLD_MSG_STATE) == 0 )) 684 { 685 /* event described by log line*/ 686 if(strlcpy(event, field,sizeof(event)) > sizeof(event)) { 687 fsd_log_error(("WT - strlcpy error")); 688 } 689 event_match = true; 690 } 691 else if(event_match && field_n == FLD_ID) 692 { 693 TRY 694 { 695 job = self->get_job( self, field ); 696 pbsjob = (pbsdrmaa_job_t*) job; 697 698 if( job ) 699 { 700 if(strlcpy(job_id, field,sizeof(job_id)) > sizeof(job_id)) { 701 fsd_log_error(("WT - strlcpy error")); 702 } 703 fsd_log_debug(("WT - job_id: %s",job_id)); 704 status.name = fsd_strdup(job_id); 705 job_id_match = true; /* job_id is in drmaa */ 706 } 707 else 708 { 709 fsd_log_debug(("WT - Unknown job: %s", field)); 710 } 711 } 712 END_TRY 713 } 714 else if(job_id_match && field_n == FLD_MSG) 715 { 716 /* parse msg - depends on FLD_EVENT*/ 717 struct attrl struct_resource_cput,struct_resource_mem,struct_resource_vmem, 718 struct_resource_walltime, struct_status, struct_state, struct_start_time,struct_mtime, struct_queue, struct_account_name; 719 720 bool state_running = false; 721 722 struct_status.name = NULL; 723 struct_status.value = NULL; 724 struct_status.next = NULL; 725 struct_status.resource = NULL; 726 727 struct_state.name = NULL; 728 struct_state.value = NULL; 729 struct_state.next = NULL; 730 struct_state.resource = NULL; 731 732 struct_resource_cput.name = NULL; 733 struct_resource_cput.value = NULL; 734 struct_resource_cput.next = NULL; 735 struct_resource_cput.resource = NULL; 736 737 struct_resource_mem.name = NULL; 738 struct_resource_mem.value = NULL; 739 struct_resource_mem.next = NULL; 740 struct_resource_mem.resource = NULL; 741 742 struct_resource_vmem.name = NULL; 743 struct_resource_vmem.value = NULL; 744 struct_resource_vmem.next = NULL; 745 struct_resource_vmem.resource = NULL; 746 747 struct_resource_walltime.name = NULL; 748 struct_resource_walltime.value = NULL; 749 struct_resource_walltime.next = NULL; 750 struct_resource_walltime.resource = NULL; 751 752 struct_start_time.name = NULL; 753 struct_start_time.value = NULL; 754 struct_start_time.next = NULL; 755 struct_start_time.resource = NULL; 756 757 struct_mtime.name = NULL; 758 struct_mtime.value = NULL; 759 struct_mtime.next = NULL; 760 struct_mtime.resource = NULL; 761 762 struct_queue.name = NULL; 763 struct_queue.value = NULL; 764 struct_queue.next = NULL; 765 struct_queue.resource = NULL; 766 767 struct_account_name.name = NULL; 768 struct_account_name.value = NULL; 769 struct_account_name.next = NULL; 770 struct_account_name.resource = NULL; 771 772 773 if (strcmp(event,FLD_MSG_STATE) == 0) 774 { 775 /* job run, modified, queued etc */ 776 int n = 0; 777 status.attribs = &struct_state; 778 struct_state.next = NULL; 779 struct_state.name = "job_state"; 780 if(field[0] == 'J') /* Job Queued, Job Modified, Job Run*/ 781 { 782 n = 4; 783 } 784 if(field[4] == 'M') { 785 struct tm temp_time_tm; 786 memset(&temp_time_tm, 0, sizeof(temp_time_tm)); 787 temp_time_tm.tm_isdst = -1; 788 789 if (strptime(temp_date, "%m/%d/%Y %H:%M:%S", &temp_time_tm) == NULL) 790 { 791 fsd_log_error(("failed to parse mtime: %s", temp_date)); 792 } 793 else 794 { 795 time_t temp_time = mktime(&temp_time_tm); 796 status.attribs = &struct_mtime; 797 struct_mtime.name = "mtime"; 798 struct_mtime.next = NULL; 799 struct_mtime.value = fsd_asprintf("%lu",temp_time); 800 } 801 } 802 /* != Job deleted and Job to be deleted*/ 803 #ifdef PBS_PROFESSIONAL 804 else if (field[4] != 't' && field[10] != 'd') { 805 #else 806 else if(field[4] != 'd') { 807 #endif 808 809 if ((struct_state.value = fsd_asprintf("%c",field[n]) ) == NULL ) { /* 4 first letter of state */ 810 fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"WT - Memory allocation wasn't possible"); 811 } 812 if(struct_state.value[0] == 'R'){ 813 state_running = true; 814 } 815 } 816 else { /* job terminated - pbs drmaa detects failed as completed with exit_status !=0, aborted with status -1*/ 817 struct_status.name = "exit_status"; 818 struct_status.value = fsd_strdup("-1"); 819 struct_status.next = NULL; 820 struct_state.next = &struct_status; 821 struct_state.value = fsd_strdup("C"); 822 } 823 } 824 else /*if (strcmp(event,FLD_MSG_STATUS) == 0 )*/ 825 { 826 /* exit status and rusage */ 827 const char *ptr2 = field; 828 char msg[ 256 ] = ""; 829 int n2; 830 int msg_field_n = 0; 831 832 struct_resource_cput.name = "resources_used"; 833 struct_resource_mem.name = "resources_used"; 834 struct_resource_vmem.name = "resources_used"; 835 struct_resource_walltime.name = "resources_used"; 836 struct_status.name = "exit_status"; 837 struct_state.name = "job_state"; 838 839 status.attribs = &struct_resource_cput; 840 struct_resource_cput.next = &struct_resource_mem; 841 struct_resource_mem.next = &struct_resource_vmem; 842 struct_resource_vmem.next = &struct_resource_walltime; 843 struct_resource_walltime.next = &struct_status; 844 struct_status.next = &struct_state; 845 struct_state.next = NULL; 846 847 while ( sscanf(ptr2, "%255[^ ]%n", msg, &n2) == 1 ) 848 { 849 switch(msg_field_n) 850 { 851 case FLD_MSG_EXIT_STATUS: 852 struct_status.value = fsd_strdup(strchr(msg,'=')+1); 853 break; 854 855 case FLD_MSG_CPUT: 856 struct_resource_cput.resource = "cput"; 857 struct_resource_cput.value = fsd_strdup(strchr(msg,'=')+1); 858 break; 859 860 case FLD_MSG_MEM: 861 struct_resource_mem.resource = "mem"; 862 struct_resource_mem.value = fsd_strdup(strchr(msg,'=')+1); 863 break; 864 865 case FLD_MSG_VMEM: 866 struct_resource_vmem.resource = "vmem"; 867 struct_resource_vmem.value = fsd_strdup(strchr(msg,'=')+1); 868 break; 869 870 case FLD_MSG_WALLTIME: 871 struct_resource_walltime.resource = "walltime"; 872 struct_resource_walltime.value = fsd_strdup(strchr(msg,'=')+1); 873 break; 874 } 875 876 ptr2 += n2; 877 msg_field_n++; 878 if ( *ptr2 != ' ' ) 879 { 880 break; 881 } 882 ++ptr2; 883 } 884 struct_state.value = fsd_strdup("C"); /* we got exit_status so we say that it has completed */ 885 } 886 887 if ( state_running ) 888 { 889 fsd_log_debug(("WT - forcing update of job: %s", job->job_id )); 890 job->update_status( job ); 891 } 892 else 893 { 894 fsd_log_debug(("WT - updating job: %s", job->job_id )); 895 pbsjob->update( job, &status ); 896 } 897 898 899 fsd_cond_broadcast( &job->status_cond); 900 fsd_cond_broadcast( &self->wait_condition ); 901 902 if ( job ) 903 job->release( job ); 527 { 528 log_reader = pbsdrmaa_log_reader_new( self, NULL); 529 log_reader->read_log( log_reader ); 530 } 531 FINALLY 532 { 533 pbsdrmaa_log_reader_destroy( log_reader ); 534 } 535 END_TRY 904 536 905 fsd_free(struct_resource_cput.value);906 fsd_free(struct_resource_mem.value);907 fsd_free(struct_resource_vmem.value);908 fsd_free(struct_resource_walltime.value);909 fsd_free(struct_status.value);910 fsd_free(struct_state.value);911 fsd_free(struct_start_time.value);912 fsd_free(struct_mtime.value);913 fsd_free(struct_queue.value);914 fsd_free(struct_account_name.value);915 916 if ( status.name!=NULL )917 fsd_free(status.name);918 }919 else if(field_n == FLD_EVENT && strcmp(field,FLD_MSG_LOG) == 0)920 {921 log_event = true;922 }923 else if (log_event && field_n == FLD_ID && strcmp(field,"Log") == 0 )924 {925 log_match = true;926 log_event = false;927 }928 else if( log_match && field_n == FLD_MSG &&929 field[0] == 'L' &&930 field[1] == 'o' &&931 field[2] == 'g' &&932 field[3] == ' ' &&933 field[4] == 'c' &&934 field[5] == 'l' &&935 field[6] == 'o' &&936 field[7] == 's' &&937 field[8] == 'e' &&938 field[9] == 'd' ) /* last field in the file - strange bahaviour*/939 {940 fsd_log_debug(("WT - Date changed. Closing log file"));941 date_changed = true;942 log_match = false;943 }944 945 ptr += n;946 if ( *ptr != ';' )947 {948 break; /* end of line */949 }950 field_n++;951 ++ptr;952 }953 954 if( strlcpy(buffer,"",sizeof(buffer)) > sizeof(buffer) ) {955 fsd_log_error(("WT - strlcpy error"));956 }957 958 fsd_free(temp_date);959 }960 961 fsd_mutex_unlock( &self->mutex );962 usleep(1000000);963 fsd_mutex_lock( &self->mutex );964 }965 EXCEPT_DEFAULT966 {967 const fsd_exc_t *e = fsd_exc_get();968 969 fsd_log_error(( "wait thread: <%d:%s>", e->code(e), e->message(e) ));970 fsd_exc_reraise();971 }972 END_TRY973 974 if(fd != -1)975 close(fd);976 fsd_log_debug(("Log file closed"));977 }978 FINALLY979 {980 fsd_log_debug(("WT - Terminated."));981 fsd_mutex_unlock( &self->mutex );982 }983 END_TRY984 985 537 fsd_log_return(( " =NULL" )); 986 538 return NULL;
Note: See TracChangeset
for help on using the changeset viewer.