Changeset 7
- Timestamp:
- 03/02/11 17:42:40 (14 years ago)
- Location:
- trunk/pbs_drmaa
- Files:
-
- 2 added
- 6 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/pbs_drmaa/Makefile.am
r1 r7 30 30 session.c session.h \ 31 31 submit.c submit.h \ 32 util.c util.h 32 util.c util.h \ 33 log_reader.c log_reader.h 33 34 BUILT_SOURCES = pbs_attrib.c 34 35 EXTRA_DIST = pbs_attrib.c -
trunk/pbs_drmaa/job.c
r3 r7 33 33 34 34 #include <pbs_drmaa/job.h> 35 #include <pbs_drmaa/log_reader.h> 35 36 #include <pbs_drmaa/pbs_attrib.h> 36 37 #include <pbs_drmaa/session.h> … … 54 55 static void 55 56 pbsdrmaa_job_on_missing( fsd_job_t *self ); 57 58 void 59 pbsdrmaa_job_on_missing_standard( fsd_job_t *self ); 60 61 void 62 pbsdrmaa_job_on_missing_log_based( fsd_job_t *self ); 56 63 57 64 static void … … 227 234 ((pbsdrmaa_job_t*)self)->update( self, status ); 228 235 } 229 else 236 else if( self->state < DRMAA_PS_DONE ) 230 237 self->on_missing( self ); 231 238 } … … 398 405 } 399 406 400 401 407 void 402 408 pbsdrmaa_job_on_missing( fsd_job_t *self ) 403 409 { 410 pbsdrmaa_session_t *pbssession = (pbsdrmaa_session_t*)self->session; 411 412 if( pbssession->pbs_home == NULL ) 413 pbsdrmaa_job_on_missing_standard( self ); 414 else 415 pbsdrmaa_job_on_missing_log_based( self ); 416 } 417 418 void 419 pbsdrmaa_job_on_missing_standard( fsd_job_t *self ) 420 { 404 421 fsd_drmaa_session_t *session = self->session; 405 pbsdrmaa_session_t *pbssession = (pbsdrmaa_session_t*)session; 406 if( !pbssession->wait_thread_log){ 422 407 423 unsigned missing_mask = 0; 408 424 … … 411 427 412 428 switch( session->missing_jobs ) 413 429 { 414 430 case FSD_REVEAL_MISSING_JOBS: missing_mask = 0; break; 415 431 case FSD_IGNORE_MISSING_JOBS: missing_mask = 0x73; break; 416 432 case FSD_IGNORE_QUEUED_MISSING_JOBS: missing_mask = 0x13; break; 417 433 } 418 434 fsd_log_debug(( "last job_ps: %s (0x%02x); mask: 0x%02x", 419 435 drmaa_job_ps_to_str(self->state), self->state, missing_mask )); … … 427 443 428 444 if( (self->flags & FSD_JOB_TERMINATED_MASK) == 0 ) 429 445 { 430 446 self->flags &= FSD_JOB_TERMINATED_MASK; 431 447 self->flags |= FSD_JOB_TERMINATED; 432 448 } 433 449 434 450 if( (self->flags & FSD_JOB_ABORTED) == 0 435 451 && session->missing_jobs == FSD_IGNORE_MISSING_JOBS ) 436 452 { /* assume everthing was ok */ 437 453 self->state = DRMAA_PS_DONE; 438 454 self->exit_status = 0; 439 455 } 440 456 else 441 457 { /* job aborted */ 442 458 self->state = DRMAA_PS_FAILED; 443 459 self->exit_status = -1; 444 460 } 445 461 446 462 fsd_log_return(( "; job_ps=%s, exit_status=%d", 447 463 drmaa_job_ps_to_str(self->state), self->exit_status )); 448 } 449 } 450 464 } 465 466 void 467 pbsdrmaa_job_on_missing_log_based( fsd_job_t *self ) 468 { 469 fsd_drmaa_session_t *session = self->session; 470 pbsdrmaa_log_reader_t *log_reader = NULL; 471 472 fsd_log_enter(( "({job_id=%s})", self->job_id )); 473 fsd_log_warning(( "self %s missing from DRM queue", self->job_id )); 474 475 TRY 476 { 477 log_reader = pbsdrmaa_log_reader_new( session, self); 478 log_reader->read_log( log_reader ); 479 } 480 FINALLY 481 { 482 pbsdrmaa_log_reader_destroy( log_reader ); 483 } 484 END_TRY 485 486 fsd_log_return(( "; job_ps=%s, exit_status=%d", 487 drmaa_job_ps_to_str(self->state), self->exit_status )); 488 } -
trunk/pbs_drmaa/session.c
r3 r7 41 41 42 42 #include <pbs_drmaa/job.h> 43 #include <pbs_drmaa/log_reader.h> 43 44 #include <pbs_drmaa/session.h> 44 45 #include <pbs_drmaa/submit.h> … … 218 219 struct stat statbuf; 219 220 char * volatile log_path; 220 time_t t;221 221 struct tm tm; 222 222 223 pbsself->pbs_home = pbs_home->val.string; 223 224 fsd_log_debug(("pbs_home: %s",pbsself->pbs_home)); … … 226 227 pbsself->wait_thread_log = true; 227 228 228 time(& t);229 localtime_r(& t,&pbsself->log_file_initial_time);229 time(&pbsself->log_file_initial_time); 230 localtime_r(&pbsself->log_file_initial_time,&tm); 230 231 231 232 if((log_path = fsd_asprintf("%s/server_logs/%04d%02d%02d", 232 233 pbsself->pbs_home, 233 pbsself->log_file_initial_time.tm_year + 1900,234 pbsself->log_file_initial_time.tm_mon + 1,235 pbsself->log_file_initial_time.tm_mday)) == NULL) {234 tm.tm_year + 1900, 235 tm.tm_mon + 1, 236 tm.tm_mday)) == NULL) { 236 237 fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"WT - Memory allocation wasn't possible"); 237 238 } … … 516 517 } 517 518 518 519 enum field520 {521 FLD_DATE = 0,522 FLD_EVENT = 1,523 FLD_OBJ = 2,524 FLD_TYPE = 3,525 FLD_ID = 4,526 FLD_MSG = 5527 };528 529 enum field_msg530 {531 FLD_MSG_EXIT_STATUS = 0,532 FLD_MSG_CPUT = 1,533 FLD_MSG_MEM = 2,534 FLD_MSG_VMEM = 3,535 FLD_MSG_WALLTIME = 4536 };537 538 #define FLD_MSG_STATUS "0010"539 #define FLD_MSG_STATE "0008"540 #define FLD_MSG_LOG "0002"541 542 ssize_t fsd_getline(char * line,ssize_t size, int fd)543 {544 char buf;545 char * ptr = NULL;546 ssize_t n = 0, rc;547 ptr = line;548 for(n = 1; n< size; n++)549 {550 if( (rc = read(fd,&buf,1 )) == 1) {551 *ptr++ = buf;552 if(buf == '\n')553 {554 break;555 }556 }557 else if (rc == 0) {558 if (n == 1)559 return 0;560 else561 break;562 }563 else564 return -1;565 }566 567 return n;568 }569 570 519 void * 571 520 pbsdrmaa_session_wait_thread( fsd_drmaa_session_t *self ) 572 521 { 573 pbsdrmaa_session_t *pbsself = (pbsdrmaa_session_t*) self; 574 fsd_job_t *volatile job = NULL; 575 pbsdrmaa_job_t *volatile pbsjob = NULL; 576 char job_id[256] = ""; 577 char event[256] = ""; 578 time_t t; 579 struct tm tm; 580 581 tm = pbsself->log_file_initial_time; 582 522 pbsdrmaa_log_reader_t *log_reader = NULL; 523 583 524 fsd_log_enter(( "" )); 584 fsd_mutex_lock( &self->mutex );525 585 526 TRY 586 { 587 char * volatile log_path = NULL; 588 char buffer[4096] = ""; 589 bool volatile date_changed = true; 590 int volatile fd = -1; 591 bool first_open = true; 592 593 fsd_log_debug(("WT - reading log files")); 594 595 while( self->wait_thread_run_flag ) 596 TRY 597 { 598 if(date_changed) 599 { 600 int num_tries = 0; 601 602 time(&t); 603 localtime_r(&t,&tm); 604 605 #define DRMAA_WAIT_THREAD_MAX_TRIES (12) 606 /* generate new date, close file and open new */ 607 if((log_path = fsd_asprintf("%s/server_logs/%04d%02d%02d", 608 pbsself->pbs_home, 609 tm.tm_year + 1900, 610 tm.tm_mon + 1, 611 tm.tm_mday)) == NULL) { 612 fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"WT - Memory allocation wasn't possible"); 613 } 614 615 if(fd != -1) 616 close(fd); 617 618 fsd_log_debug(("Log file: %s",log_path)); 619 620 retry: 621 if((fd = open(log_path,O_RDONLY) ) == -1 && num_tries > DRMAA_WAIT_THREAD_MAX_TRIES ) 622 { 623 fsd_log_error(("Can't open log file. Verify pbs_home. Running standard wait_thread.")); 624 fsd_log_error(("Remember that without keep_completed set standard wait_thread won't run correctly")); 625 /*pbsself->super.enable_wait_thread = false;*/ /* run not wait_thread */ 626 pbsself->wait_thread_log = false; 627 pbsself->super.wait_thread = pbsself->super_wait_thread; 628 pbsself->super.wait_thread(self); 629 } else if ( fd == -1 ) { 630 fsd_log_warning(("Can't open log file: %s. Retries count: %d", log_path, num_tries)); 631 num_tries++; 632 sleep(5); 633 goto retry; 634 } 635 636 fsd_free(log_path); 637 638 fsd_log_debug(("Log file opened")); 639 640 if(first_open) { 641 fsd_log_debug(("Log file lseek")); 642 if(lseek(fd,pbsself->log_file_initial_size,SEEK_SET) == (off_t) -1) { 643 char errbuf[256] = "InternalError"; 644 (void)strerror_r(errno, errbuf, 256); 645 fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"lseek error: %s",errbuf); 646 } 647 first_open = false; 648 } 649 650 date_changed = false; 651 } 652 653 while ((fsd_getline(buffer,sizeof(buffer),fd)) > 0) 654 { 655 const char *volatile ptr = buffer; 656 char field[256] = ""; 657 int volatile field_n = 0; 658 int n; 659 660 bool volatile job_id_match = false; 661 bool volatile event_match = false; 662 bool volatile log_event = false; 663 bool volatile log_match = false; 664 char * temp_date = NULL; 665 666 667 struct batch_status status; 668 status.next = NULL; 669 670 if( strlcpy(job_id,"",sizeof(job_id)) > sizeof(job_id) ) { 671 fsd_log_error(("WT - strlcpy error")); 672 } 673 if( strlcpy(event,"",sizeof(event)) > sizeof(event) ) { 674 fsd_log_error(("WT - strlcpy error")); 675 } 676 while ( sscanf(ptr, "%255[^;]%n", field, &n) == 1 ) /* divide current line into fields */ 677 { 678 if(field_n == FLD_DATE) 679 { 680 temp_date = fsd_strdup(field); 681 } 682 else if(field_n == FLD_EVENT && (strcmp(field,FLD_MSG_STATUS) == 0 || 683 strcmp(field,FLD_MSG_STATE) == 0 )) 684 { 685 /* event described by log line*/ 686 if(strlcpy(event, field,sizeof(event)) > sizeof(event)) { 687 fsd_log_error(("WT - strlcpy error")); 688 } 689 event_match = true; 690 } 691 else if(event_match && field_n == FLD_ID) 692 { 693 TRY 694 { 695 job = self->get_job( self, field ); 696 pbsjob = (pbsdrmaa_job_t*) job; 697 698 if( job ) 699 { 700 if(strlcpy(job_id, field,sizeof(job_id)) > sizeof(job_id)) { 701 fsd_log_error(("WT - strlcpy error")); 702 } 703 fsd_log_debug(("WT - job_id: %s",job_id)); 704 status.name = fsd_strdup(job_id); 705 job_id_match = true; /* job_id is in drmaa */ 706 } 707 else 708 { 709 fsd_log_debug(("WT - Unknown job: %s", field)); 710 } 711 } 712 END_TRY 713 } 714 else if(job_id_match && field_n == FLD_MSG) 715 { 716 /* parse msg - depends on FLD_EVENT*/ 717 struct attrl struct_resource_cput,struct_resource_mem,struct_resource_vmem, 718 struct_resource_walltime, struct_status, struct_state, struct_start_time,struct_mtime, struct_queue, struct_account_name; 719 720 bool state_running = false; 721 722 struct_status.name = NULL; 723 struct_status.value = NULL; 724 struct_status.next = NULL; 725 struct_status.resource = NULL; 726 727 struct_state.name = NULL; 728 struct_state.value = NULL; 729 struct_state.next = NULL; 730 struct_state.resource = NULL; 731 732 struct_resource_cput.name = NULL; 733 struct_resource_cput.value = NULL; 734 struct_resource_cput.next = NULL; 735 struct_resource_cput.resource = NULL; 736 737 struct_resource_mem.name = NULL; 738 struct_resource_mem.value = NULL; 739 struct_resource_mem.next = NULL; 740 struct_resource_mem.resource = NULL; 741 742 struct_resource_vmem.name = NULL; 743 struct_resource_vmem.value = NULL; 744 struct_resource_vmem.next = NULL; 745 struct_resource_vmem.resource = NULL; 746 747 struct_resource_walltime.name = NULL; 748 struct_resource_walltime.value = NULL; 749 struct_resource_walltime.next = NULL; 750 struct_resource_walltime.resource = NULL; 751 752 struct_start_time.name = NULL; 753 struct_start_time.value = NULL; 754 struct_start_time.next = NULL; 755 struct_start_time.resource = NULL; 756 757 struct_mtime.name = NULL; 758 struct_mtime.value = NULL; 759 struct_mtime.next = NULL; 760 struct_mtime.resource = NULL; 761 762 struct_queue.name = NULL; 763 struct_queue.value = NULL; 764 struct_queue.next = NULL; 765 struct_queue.resource = NULL; 766 767 struct_account_name.name = NULL; 768 struct_account_name.value = NULL; 769 struct_account_name.next = NULL; 770 struct_account_name.resource = NULL; 771 772 773 if (strcmp(event,FLD_MSG_STATE) == 0) 774 { 775 /* job run, modified, queued etc */ 776 int n = 0; 777 status.attribs = &struct_state; 778 struct_state.next = NULL; 779 struct_state.name = "job_state"; 780 if(field[0] == 'J') /* Job Queued, Job Modified, Job Run*/ 781 { 782 n = 4; 783 } 784 if(field[4] == 'M') { 785 struct tm temp_time_tm; 786 memset(&temp_time_tm, 0, sizeof(temp_time_tm)); 787 temp_time_tm.tm_isdst = -1; 788 789 if (strptime(temp_date, "%m/%d/%Y %H:%M:%S", &temp_time_tm) == NULL) 790 { 791 fsd_log_error(("failed to parse mtime: %s", temp_date)); 792 } 793 else 794 { 795 time_t temp_time = mktime(&temp_time_tm); 796 status.attribs = &struct_mtime; 797 struct_mtime.name = "mtime"; 798 struct_mtime.next = NULL; 799 struct_mtime.value = fsd_asprintf("%lu",temp_time); 800 } 801 } 802 /* != Job deleted and Job to be deleted*/ 803 #ifdef PBS_PROFESSIONAL 804 else if (field[4] != 't' && field[10] != 'd') { 805 #else 806 else if(field[4] != 'd') { 807 #endif 808 809 if ((struct_state.value = fsd_asprintf("%c",field[n]) ) == NULL ) { /* 4 first letter of state */ 810 fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"WT - Memory allocation wasn't possible"); 811 } 812 if(struct_state.value[0] == 'R'){ 813 state_running = true; 814 } 815 } 816 else { /* job terminated - pbs drmaa detects failed as completed with exit_status !=0, aborted with status -1*/ 817 struct_status.name = "exit_status"; 818 struct_status.value = fsd_strdup("-1"); 819 struct_status.next = NULL; 820 struct_state.next = &struct_status; 821 struct_state.value = fsd_strdup("C"); 822 } 823 } 824 else /*if (strcmp(event,FLD_MSG_STATUS) == 0 )*/ 825 { 826 /* exit status and rusage */ 827 const char *ptr2 = field; 828 char msg[ 256 ] = ""; 829 int n2; 830 int msg_field_n = 0; 831 832 struct_resource_cput.name = "resources_used"; 833 struct_resource_mem.name = "resources_used"; 834 struct_resource_vmem.name = "resources_used"; 835 struct_resource_walltime.name = "resources_used"; 836 struct_status.name = "exit_status"; 837 struct_state.name = "job_state"; 838 839 status.attribs = &struct_resource_cput; 840 struct_resource_cput.next = &struct_resource_mem; 841 struct_resource_mem.next = &struct_resource_vmem; 842 struct_resource_vmem.next = &struct_resource_walltime; 843 struct_resource_walltime.next = &struct_status; 844 struct_status.next = &struct_state; 845 struct_state.next = NULL; 846 847 while ( sscanf(ptr2, "%255[^ ]%n", msg, &n2) == 1 ) 848 { 849 switch(msg_field_n) 850 { 851 case FLD_MSG_EXIT_STATUS: 852 struct_status.value = fsd_strdup(strchr(msg,'=')+1); 853 break; 854 855 case FLD_MSG_CPUT: 856 struct_resource_cput.resource = "cput"; 857 struct_resource_cput.value = fsd_strdup(strchr(msg,'=')+1); 858 break; 859 860 case FLD_MSG_MEM: 861 struct_resource_mem.resource = "mem"; 862 struct_resource_mem.value = fsd_strdup(strchr(msg,'=')+1); 863 break; 864 865 case FLD_MSG_VMEM: 866 struct_resource_vmem.resource = "vmem"; 867 struct_resource_vmem.value = fsd_strdup(strchr(msg,'=')+1); 868 break; 869 870 case FLD_MSG_WALLTIME: 871 struct_resource_walltime.resource = "walltime"; 872 struct_resource_walltime.value = fsd_strdup(strchr(msg,'=')+1); 873 break; 874 } 875 876 ptr2 += n2; 877 msg_field_n++; 878 if ( *ptr2 != ' ' ) 879 { 880 break; 881 } 882 ++ptr2; 883 } 884 struct_state.value = fsd_strdup("C"); /* we got exit_status so we say that it has completed */ 885 } 886 887 if ( state_running ) 888 { 889 fsd_log_debug(("WT - forcing update of job: %s", job->job_id )); 890 job->update_status( job ); 891 } 892 else 893 { 894 fsd_log_debug(("WT - updating job: %s", job->job_id )); 895 pbsjob->update( job, &status ); 896 } 897 898 899 fsd_cond_broadcast( &job->status_cond); 900 fsd_cond_broadcast( &self->wait_condition ); 901 902 if ( job ) 903 job->release( job ); 527 { 528 log_reader = pbsdrmaa_log_reader_new( self, NULL); 529 log_reader->read_log( log_reader ); 530 } 531 FINALLY 532 { 533 pbsdrmaa_log_reader_destroy( log_reader ); 534 } 535 END_TRY 904 536 905 fsd_free(struct_resource_cput.value);906 fsd_free(struct_resource_mem.value);907 fsd_free(struct_resource_vmem.value);908 fsd_free(struct_resource_walltime.value);909 fsd_free(struct_status.value);910 fsd_free(struct_state.value);911 fsd_free(struct_start_time.value);912 fsd_free(struct_mtime.value);913 fsd_free(struct_queue.value);914 fsd_free(struct_account_name.value);915 916 if ( status.name!=NULL )917 fsd_free(status.name);918 }919 else if(field_n == FLD_EVENT && strcmp(field,FLD_MSG_LOG) == 0)920 {921 log_event = true;922 }923 else if (log_event && field_n == FLD_ID && strcmp(field,"Log") == 0 )924 {925 log_match = true;926 log_event = false;927 }928 else if( log_match && field_n == FLD_MSG &&929 field[0] == 'L' &&930 field[1] == 'o' &&931 field[2] == 'g' &&932 field[3] == ' ' &&933 field[4] == 'c' &&934 field[5] == 'l' &&935 field[6] == 'o' &&936 field[7] == 's' &&937 field[8] == 'e' &&938 field[9] == 'd' ) /* last field in the file - strange bahaviour*/939 {940 fsd_log_debug(("WT - Date changed. Closing log file"));941 date_changed = true;942 log_match = false;943 }944 945 ptr += n;946 if ( *ptr != ';' )947 {948 break; /* end of line */949 }950 field_n++;951 ++ptr;952 }953 954 if( strlcpy(buffer,"",sizeof(buffer)) > sizeof(buffer) ) {955 fsd_log_error(("WT - strlcpy error"));956 }957 958 fsd_free(temp_date);959 }960 961 fsd_mutex_unlock( &self->mutex );962 usleep(1000000);963 fsd_mutex_lock( &self->mutex );964 }965 EXCEPT_DEFAULT966 {967 const fsd_exc_t *e = fsd_exc_get();968 969 fsd_log_error(( "wait thread: <%d:%s>", e->code(e), e->message(e) ));970 fsd_exc_reraise();971 }972 END_TRY973 974 if(fd != -1)975 close(fd);976 fsd_log_debug(("Log file closed"));977 }978 FINALLY979 {980 fsd_log_debug(("WT - Terminated."));981 fsd_mutex_unlock( &self->mutex );982 }983 END_TRY984 985 537 fsd_log_return(( " =NULL" )); 986 538 return NULL; -
trunk/pbs_drmaa/session.h
r1 r7 79 79 * Time we checked log file initial size - used by wait_thread which reads log files 80 80 */ 81 struct tmlog_file_initial_time;81 time_t log_file_initial_time; 82 82 }; 83 83 -
trunk/pbs_drmaa/util.c
r1 r7 258 258 } 259 259 260 260 ssize_t fsd_getline(char * line,ssize_t size, int fd) 261 { 262 char buf; 263 char * ptr = NULL; 264 ssize_t n = 0, rc; 265 ptr = line; 266 for(n = 1; n< size; n++) 267 { 268 if( (rc = read(fd,&buf,1 )) == 1) { 269 *ptr++ = buf; 270 if(buf == '\n') 271 { 272 break; 273 } 274 } 275 else if (rc == 0) { 276 if (n == 1) 277 return 0; 278 else 279 break; 280 } 281 else 282 return -1; 283 } 284 285 return n; 286 } 287 -
trunk/pbs_drmaa/util.h
r1 r7 43 43 pbsdrmaa_write_tmpfile( const char *content, size_t len ); 44 44 45 ssize_t 46 fsd_getline(char * line,ssize_t size, int fd); 47 45 48 #endif /* __PBS_DRMAA__UTIL_H */ 46 49
Note: See TracChangeset
for help on using the changeset viewer.