/*
 * Multi-threaded DRMAA exerciser: main() starts four threads
 * (create_job_thread, check_job_thread, wait_thread, kill_thread) that share
 * the job-id table `all_jobids`, and runs them until SIGTSTP clears `run`.
 *
 * NOTE(review): this copy of the file is damaged. Text that was enclosed in
 * angle brackets appears to have been stripped: the header names after every
 * #include are missing, and two spans of code (marked below) are gone,
 * including the whole definition of wait_thread and the head of what is
 * presumably kill_thread. Restore from the original before building.
 */
#include
#include
#include
#include
#include
#include
#include
#include
#include

/* Argument handed to /bin/sleep for every submitted job; parsed from argv[2]. */
int SLEEP_TIME;
/*
 * Loop flag polled by the worker threads; set to 1 in main() and to 0 by
 * catch_stop().
 * NOTE(review): plain int written from a signal handler and read from several
 * threads without synchronization.
 */
int run;
/* Number of entries in main()'s pthread_t array. */
#define NTHREADS 4

/* Growable table of job-id strings shared by the threads. */
typedef struct{
    int size;              /* number of entries stored in tab */
    int finished;          /* not referenced in the visible part of this file */
    char **tab;            /* strdup()'d job ids, appended by do_submit() */
    pthread_mutex_t mutex; /* taken around accesses to this table */
}vectorChar;

vectorChar all_jobids;

/*
 * Allocate a DRMAA job template that runs "/bin/sleep <seconds>" with the
 * working directory set to DRMAA_PLACEHOLDER_HD and the output path set to
 * ":/dev/null".
 *
 * Returns the template, or NULL if any DRMAA call fails.
 *
 * NOTE(review): on the error returns after a successful allocation the
 * template is not released with drmaa_delete_job_template().
 */
static drmaa_job_template_t *create_sleeper_job_template(int seconds) {
    const char *job_argv[2];
    drmaa_job_template_t *jt = NULL;
    char buffer[100];
    int ret = DRMAA_ERRNO_SUCCESS;

    ret = drmaa_allocate_job_template(&jt, NULL, 0);
    if (ret != DRMAA_ERRNO_SUCCESS) return NULL;

    ret = drmaa_set_attribute(jt, DRMAA_WD, DRMAA_PLACEHOLDER_HD, NULL, 0);
    if (ret != DRMAA_ERRNO_SUCCESS) return NULL;

    ret = drmaa_set_attribute(jt, DRMAA_REMOTE_COMMAND, "/bin/sleep", NULL, 0);
    if (ret != DRMAA_ERRNO_SUCCESS) return NULL;

    ret = drmaa_set_attribute(jt, DRMAA_OUTPUT_PATH, ":/dev/null",NULL, 0);
    if (ret != DRMAA_ERRNO_SUCCESS) return NULL;

    /* The single job argument is the sleep duration rendered as decimal text. */
    sprintf(buffer, "%d", seconds);
    job_argv[0] = buffer;
    job_argv[1] = NULL;
    ret = drmaa_set_vector_attribute(jt, DRMAA_V_ARGV, job_argv, NULL, 0);
    if (ret != DRMAA_ERRNO_SUCCESS) return NULL;

    return jt;
}

/*
 * Submit one job from template `jt`, retrying once per second for as long as
 * drmaa_run_job() reports DRMAA_ERRNO_TRY_LATER, then append the job id to
 * all_jobids under its mutex.
 *
 * Returns DRMAA_ERRNO_SUCCESS, or the error code of the last drmaa_run_job()
 * call.
 *
 * NOTE(review): the append runs even when submission failed; `jobid` has no
 * initializer, so strdup() may then read a buffer that was never written.
 * NOTE(review): the realloc() size multiplies by sizeof(char)*256 per entry
 * although the elements are char pointers, and neither the realloc() nor the
 * strdup() result is checked.
 */
static int do_submit(drmaa_job_template_t *jt) {
    char diagnosis[1024];
    char jobid[100];
    int drmaa_errno = DRMAA_ERRNO_SUCCESS;
    int error = DRMAA_ERRNO_SUCCESS;
    int done;

    done = 0;
    while (!done) {
        drmaa_errno = drmaa_run_job(jobid, sizeof(jobid)-1, jt, diagnosis,
                                    sizeof(diagnosis)-1);
        if (drmaa_errno != DRMAA_ERRNO_SUCCESS) {
            printf("failed submitting job (%s)\n", drmaa_strerror(drmaa_errno));
        }
        /* Only retry on "try again" error. */
        if (drmaa_errno == DRMAA_ERRNO_TRY_LATER) {
            printf("retry: %s\n", diagnosis);
            sleep(1);
        } else {
            done = 1;
            break; /* while */
        }
    }

    if (drmaa_errno == DRMAA_ERRNO_SUCCESS) {
        printf("Submitted job \"%s\"\n", jobid);
    } else {
        printf("Unable to submit job\n");
    }

    /* Grow the shared table by one slot and store a private copy of the id. */
    pthread_mutex_lock(&all_jobids.mutex);
    all_jobids.tab = realloc(all_jobids.tab,(all_jobids.size+1)*sizeof(char)*256);
    all_jobids.tab[all_jobids.size] = strdup(jobid);
    all_jobids.size++;
    pthread_mutex_unlock(&all_jobids.mutex);

    if (drmaa_errno != DRMAA_ERRNO_SUCCESS) {
        /* If there is ever an error, we will return an error. */
        error = drmaa_errno;
    }
    return error;
}

/*
 * Thread entry point: builds a sleeper template from SLEEP_TIME, then submits
 * a job and sleeps in a loop until `run` becomes 0, and finally deletes the
 * template.
 *
 * vp: optional pointer to an int delay passed to sleep() between submissions;
 *     1 is used when vp is NULL.
 *
 * NOTE(review): main() labels this value "msec" but sleep() takes seconds.
 * NOTE(review): failures are handled only by assert(0); if assertions are
 * compiled out, a NULL template would be passed on to do_submit().
 */
static void *create_job_thread (void *vp) {
    int n;
    if (vp != NULL) {
        n = *(int *)vp;
    } else {
        n = 1;
    }

    drmaa_job_template_t *jt = NULL;
    int ret = DRMAA_ERRNO_SUCCESS;

    if ((jt = create_sleeper_job_template(SLEEP_TIME)) == NULL) {
        fprintf(stderr, "create_sleeper_job_template() failed\n");
        assert(0);
    }

    while(run) {
        ret = do_submit(jt);
        if(ret != DRMAA_ERRNO_SUCCESS) {
            fprintf(stderr,"submit failed()\n");
            assert(0);
        }
        sleep(n);
    }

    drmaa_delete_job_template(jt, NULL, 0);
    return (void *)NULL;
}

/*
 * Thread entry point that walks all_jobids under its mutex while `run` is set.
 *
 * vp: optional pointer to an int interval; 1 is used when vp is NULL.
 *
 * NOTE(review): only the head of this function survives in this copy; the
 * loop body is part of the missing text (see the marker below).
 */
static void *check_job_thread (void *vp) {
    int n,pos,job_state;
    char diagnosis[1024];
    if (vp != NULL) {
        n = *(int *)vp;
    } else {
        n = 1;
    }

    while(run) {
        pthread_mutex_lock(&all_jobids.mutex);
        /*
         * NOTE(review): DAMAGED SPAN. Source text is missing between `pos`
         * and `0)` on the next line. The lost text presumably held the rest
         * of check_job_thread, all of wait_thread, and the start of
         * kill_thread (both are referenced by main() but not defined in the
         * visible text) -- confirm against the original file.
         */
        for(pos = 0; pos0) {
            /*
             * From here to the end of this function the code uses `i` and
             * `drmaa_errno`, which are not declared in the visible text, so
             * it presumably belongs to kill_thread.
             *
             * It picks a random table entry, queries its state and, unless
             * the job is already done or failed, asks DRMAA to terminate it.
             */
            i = rand()% all_jobids.size;
            if(all_jobids.tab[i] != NULL) {
                if (drmaa_job_ps(all_jobids.tab[i], &job_state, diagnosis,
                                 sizeof(diagnosis)-1)!=DRMAA_ERRNO_SUCCESS) {
                    fprintf(stderr, "drmaa_job_ps(\"%s\")) failed: %s\n",
                            all_jobids.tab[i], diagnosis);
                } else {
                    if(job_state != DRMAA_PS_DONE && job_state != DRMAA_PS_FAILED) {
                        drmaa_errno = drmaa_control(all_jobids.tab[i],DRMAA_CONTROL_TERMINATE,
                                                    diagnosis,sizeof(diagnosis)-1);
                        printf("Terminating %s\n",all_jobids.tab[i]);
                        /* Timeout and invalid-job results are not reported as failures. */
                        if (drmaa_errno != DRMAA_ERRNO_SUCCESS &&
                            drmaa_errno != DRMAA_ERRNO_EXIT_TIMEOUT &&
                            drmaa_errno != DRMAA_ERRNO_INVALID_JOB) {
                            fprintf(stderr, "drmaa_control() failed: %s\n", diagnosis);
                        }
                    }
                }
            }
        }
        /*
         * The mutex is released only for the random pause and re-taken before
         * the loop condition is tested again; the unlock after the loop pairs
         * with that re-lock.
         * NOTE(review): rand() % n is undefined when n is 0.
         */
        pthread_mutex_unlock(&all_jobids.mutex);
        sleep(rand()% n);
        pthread_mutex_lock(&all_jobids.mutex);
    }
    pthread_mutex_unlock(&all_jobids.mutex);
    return (void *)NULL;
}

/*
 * Handler installed for SIGTSTP by main(): announces the signal and clears
 * `run` so the worker loops stop.
 *
 * NOTE(review): printf() is not async-signal-safe, and the handler is
 * declared with an empty parameter list rather than taking the int signal
 * number.
 */
void catch_stop() {
    printf("Caught ctrl+z\n");
    run = 0;
}

/*
 * Entry point. Expects five integer arguments:
 *   argv[1] SUB_INT        passed to create_job_thread
 *   argv[2] SLEEP_TIME     argument given to each /bin/sleep job
 *   argv[3] POOL_INTERVAL  passed to check_job_thread
 *   argv[4] WAIT_INTERVAL  passed to wait_thread
 *   argv[5] KILL_RANGE     passed to kill_thread
 *
 * Initializes the shared table and the DRMAA session, installs the SIGTSTP
 * handler, starts the four threads, and tears everything down at the end.
 *
 * Returns 0 on success, 1 on argument, drmaa_init() or drmaa_exit() failure.
 *
 * NOTE(review): pthread_create() return values are not checked.
 */
int main(int argc, char* argv[]) {
    int i;
    char diagnosis[DRMAA_ERROR_STRING_BUFFER];
    int SUB_INT, POOL_INTERVAL,WAIT_INTERVAL,KILL_RANGE;
    pthread_t submitter_threads[NTHREADS];

    run = 1;

    if(argc < 6) {
        fprintf(stderr,"To less arguments\n");
        return 1;
    }

    printf("SUB_INT: %s msec\n",argv[1]);
    printf("SLEEP_TIME: %s sec\n",argv[2]);
    printf("POOL_INTERVAL: %s sec\n",argv[3]);
    printf("WAIT_INTERVAL: %s sec\n",argv[4]);
    printf("KILL_RANGE: %s sec\n",argv[5]);
    printf("Stop by ctrl+z\n");

    /* Each argument must parse as a decimal int. */
    if(sscanf(argv[1],"%d",&SUB_INT) != 1) {
        fprintf(stderr,"Conversion of argument can't be done");
        return 1;
    }
    if(sscanf(argv[2],"%d",&SLEEP_TIME) != 1) {
        fprintf(stderr,"Conversion of argument can't be done");
        return 1;
    }
    if(sscanf(argv[3],"%d",&POOL_INTERVAL) != 1) {
        fprintf(stderr,"Conversion of argument can't be done");
        return 1;
    }
    if(sscanf(argv[4],"%d",&WAIT_INTERVAL) != 1) {
        fprintf(stderr,"Conversion of argument can't be done");
        return 1;
    }
    if(sscanf(argv[5],"%d",&KILL_RANGE) != 1) {
        fprintf(stderr,"Conversion of argument can't be done");
        return 1;
    }

    /* Start with an empty table backed by a zeroed 256-byte allocation. */
    all_jobids.size = 0;
    all_jobids.tab = (char **)calloc(1,sizeof(char)*256);
    pthread_mutex_init(&all_jobids.mutex,NULL);

    if (drmaa_init(NULL, diagnosis, sizeof(diagnosis)-1) != DRMAA_ERRNO_SUCCESS) {
        fprintf(stderr, "drmaa_init() failed: %s\n", diagnosis);
        return 1;
    }

    signal(SIGTSTP,catch_stop);

    printf("Running create_job_thread\n");
    pthread_create(&submitter_threads[0], NULL, create_job_thread, &SUB_INT);
    printf("Running check_job_thread\n");
    pthread_create(&submitter_threads[1], NULL, check_job_thread, &POOL_INTERVAL);
    printf("Running wait_thread\n");
    pthread_create(&submitter_threads[2], NULL, wait_thread, &WAIT_INTERVAL);
    printf("Running kill_thread\n");
    pthread_create(&submitter_threads[3], NULL, kill_thread, &KILL_RANGE);

    printf("Joining threads\n");
    /*
     * NOTE(review): DAMAGED SPAN. Source text is missing between `i` and
     * `0)` on the next line; it presumably held the thread-join loop and the
     * head of the loop that drains all_jobids -- confirm against the
     * original file. The surviving body frees the table entries from the
     * last one down.
     */
    for (i=0; i0) {
        free(all_jobids.tab[all_jobids.size-1]);
        all_jobids.size--;
    }
    free(all_jobids.tab);
    pthread_mutex_destroy(&all_jobids.mutex);

    if (drmaa_exit(diagnosis, sizeof(diagnosis)-1) != DRMAA_ERRNO_SUCCESS) {
        fprintf(stderr, "drmaa_exit() failed: %s\n", diagnosis);
        return 1;
    }

    printf("Successfully ended\n");
    return 0;
}