source: experiments/DRMAA_tests/DRMAA_Scalability/scalability.c @ 16

Revision 16, 8.3 KB checked in by mmatloka, 14 years ago (diff)

add experiments

RevLine 
[16]1#include <stdlib.h>
2#include <stdio.h>
3#include <assert.h>
4#include <limits.h>
5#include <signal.h>
6#include <string.h>
7#include <unistd.h>
8#include <pthread.h>
9
10#include <drmaa.h>
11
12int SLEEP_TIME;
13int run;
14
15#define NTHREADS 4
16
17typedef struct{
18        int size;
19        int finished;
20        char **tab;
21        pthread_mutex_t mutex;
22}vectorChar;
23
24vectorChar all_jobids;
25
26static drmaa_job_template_t *create_sleeper_job_template(int seconds)
27{
28        const char *job_argv[2];
29        drmaa_job_template_t *jt = NULL;
30        char buffer[100];
31        int ret = DRMAA_ERRNO_SUCCESS;
32
33        ret = drmaa_allocate_job_template(&jt, NULL, 0);
34        if (ret != DRMAA_ERRNO_SUCCESS) return NULL;
35       
36        ret = drmaa_set_attribute(jt, DRMAA_WD, DRMAA_PLACEHOLDER_HD, NULL, 0);
37        if (ret != DRMAA_ERRNO_SUCCESS) return NULL;
38
39        ret = drmaa_set_attribute(jt, DRMAA_REMOTE_COMMAND, "/bin/sleep", NULL, 0);
40        if (ret != DRMAA_ERRNO_SUCCESS) return NULL;
41
42        ret = drmaa_set_attribute(jt, DRMAA_OUTPUT_PATH, ":/dev/null",NULL, 0);
43        if (ret != DRMAA_ERRNO_SUCCESS) return NULL;
44       
45        sprintf(buffer, "%d", seconds);
46        job_argv[0] = buffer;
47        job_argv[1] = NULL;
48        ret = drmaa_set_vector_attribute(jt, DRMAA_V_ARGV, job_argv, NULL, 0);
49        if (ret != DRMAA_ERRNO_SUCCESS) return NULL;
50
51        return jt;
52}
53
54
55static int do_submit(drmaa_job_template_t *jt)
56{
57        char diagnosis[1024];
58        char jobid[100];
59        int drmaa_errno = DRMAA_ERRNO_SUCCESS;
60        int error = DRMAA_ERRNO_SUCCESS;
61        int done;
62
63        done = 0;
64        while (!done)
65        {
66                drmaa_errno = drmaa_run_job(jobid, sizeof(jobid)-1, jt, diagnosis, sizeof(diagnosis)-1);
67
68                if (drmaa_errno != DRMAA_ERRNO_SUCCESS)
69                {
70                        printf("failed submitting job (%s)\n", drmaa_strerror(drmaa_errno));
71                }
72
73                /* Only retry on "try again" error. */
74                if (drmaa_errno == DRMAA_ERRNO_TRY_LATER)
75                {
76                        printf("retry: %s\n", diagnosis);
77                        sleep(1);
78                } else {
79                        done = 1;
80                        break; /* while */
81                }
82        }
83
84        if (drmaa_errno == DRMAA_ERRNO_SUCCESS) {
85                printf("Submitted job \"%s\"\n", jobid);
86        } else {
87                printf("Unable to submit job\n");
88        }
89       
90        pthread_mutex_lock(&all_jobids.mutex);
91        all_jobids.tab = realloc(all_jobids.tab,(all_jobids.size+1)*sizeof(char)*256);
92        all_jobids.tab[all_jobids.size] = strdup(jobid);
93        all_jobids.size++;
94        pthread_mutex_unlock(&all_jobids.mutex);
95       
96        if (drmaa_errno != DRMAA_ERRNO_SUCCESS) {
97                /* If there is ever an error, we will return an error. */
98                error = drmaa_errno;
99        }
100
101        return error;
102}
103
104
105static void *create_job_thread (void *vp) {
106        int n;
107
108        if (vp != NULL) {
109                n = *(int *)vp;
110        }
111        else {
112                n = 1;
113        }
114
115        drmaa_job_template_t *jt = NULL;
116        int ret = DRMAA_ERRNO_SUCCESS;
117
118        if ((jt = create_sleeper_job_template(SLEEP_TIME)) == NULL)
119        {
120                fprintf(stderr, "create_sleeper_job_template() failed\n");
121                assert(0);
122        }
123
124        while(run)
125        {               
126                ret = do_submit(jt);
127                if(ret != DRMAA_ERRNO_SUCCESS)
128                {
129                        fprintf(stderr,"submit failed()\n");
130                        assert(0);
131                }
132
133                usleep(n*1000);
134        }
135       
136        drmaa_delete_job_template(jt, NULL, 0);
137       
138        return (void *)NULL;
139}
140
141static void *check_job_thread (void *vp)
142{
143        int n,pos,job_state;
144        char diagnosis[1024];
145       
146        if (vp != NULL) {
147                n = *(int *)vp;
148        }
149        else {
150                n = 1;
151        }
152
153        while(run)
154        {
155                pthread_mutex_lock(&all_jobids.mutex);
156               
157                for(pos = 0; pos<all_jobids.size; pos++)
158                {
159                        if(all_jobids.tab[pos]!=NULL)   /* if not ended */
160                        {
161                                if (drmaa_job_ps(all_jobids.tab[pos], &job_state, diagnosis, sizeof(diagnosis)-1)!=DRMAA_ERRNO_SUCCESS)
162                                {
163                                                fprintf(stderr, "drmaa_job_ps(\"%s\")) failed: %s\n", all_jobids.tab[pos], diagnosis);
164                                }
165                                printf("Job: %s - state: %d\n",all_jobids.tab[pos],job_state);
166                        }       
167                }
168               
169                pthread_mutex_unlock(&all_jobids.mutex);
170               
171                sleep(n);
172        }
173       
174        return (void *)NULL;
175}
176
177static void *wait_thread (void *vp)
178{
179        int n,i;
180        char jobid[256] = "";
181        int stat;
182        char diagnosis[1024];
183        int drmaa_errno = DRMAA_ERRNO_SUCCESS;
184        if (vp != NULL) {
185                n = *(int *)vp;
186        }
187        else {
188                n = 1;
189        }
190        pthread_mutex_lock(&all_jobids.mutex);
191        while(run || all_jobids.finished!=all_jobids.size)
192        {
193                pthread_mutex_unlock(&all_jobids.mutex);
194
195                drmaa_errno = drmaa_wait(DRMAA_JOB_IDS_SESSION_ANY, jobid, sizeof(jobid)-1, &stat,n, NULL, diagnosis, sizeof(diagnosis)-1);
196                if (drmaa_errno != DRMAA_ERRNO_SUCCESS && drmaa_errno != DRMAA_ERRNO_EXIT_TIMEOUT && drmaa_errno != DRMAA_ERRNO_INVALID_JOB)
197                {
198                        fprintf(stderr, "drmaa_wait() failed: %s\n", diagnosis);
199                        assert(0);
200                }
201
202                pthread_mutex_lock(&all_jobids.mutex);
203
204                for(i=0;i<all_jobids.size;i++)
205                {
206                        if(all_jobids.tab[i] != NULL  && strcmp(all_jobids.tab[i],jobid) == 0)
207                        {
208                                free(all_jobids.tab[i]);
209                                all_jobids.tab[i] = NULL;
210                                all_jobids.finished++;
211                                printf("Job %s ended. %d/%d\n",jobid,all_jobids.finished,all_jobids.size);
212                                strcpy(jobid , "");
213                                break;
214                        }
215                }               
216        }
217        pthread_mutex_unlock(&all_jobids.mutex);
218        return (void *)NULL;
219}
220
221static void *kill_thread (void *vp)
222{
223        int n,i,job_state;
224        char diagnosis[1024];
225        int drmaa_errno = DRMAA_ERRNO_SUCCESS;
226
227        if (vp != NULL) {
228                n = *(int *)vp;
229        }
230        else {
231                n = 1;
232        }
233
234        srand((unsigned)time(NULL));
235       
236        pthread_mutex_lock(&all_jobids.mutex); 
237        while(run || all_jobids.finished!=all_jobids.size)
238        {               
239                if(all_jobids.size>0)
240                {
241                        i = rand()% all_jobids.size;
242
243                        if(all_jobids.tab[i] != NULL)
244                        {
245                                if (drmaa_job_ps(all_jobids.tab[i], &job_state, diagnosis, sizeof(diagnosis)-1)!=DRMAA_ERRNO_SUCCESS)
246                                {
247                                                fprintf(stderr, "drmaa_job_ps(\"%s\")) failed: %s\n", all_jobids.tab[i], diagnosis);
248                                }
249                                else
250                                {
251                                        if(job_state != DRMAA_PS_DONE && job_state != DRMAA_PS_FAILED)
252                                        {
253                                                drmaa_errno = drmaa_control(all_jobids.tab[i],DRMAA_CONTROL_TERMINATE, diagnosis,sizeof(diagnosis)-1);
254                               
255                                                printf("Terminating %s\n",all_jobids.tab[i]);
256                                                if (drmaa_errno != DRMAA_ERRNO_SUCCESS && drmaa_errno != DRMAA_ERRNO_EXIT_TIMEOUT && drmaa_errno != DRMAA_ERRNO_INVALID_JOB)
257                                                {
258                                                        fprintf(stderr, "drmaa_control() failed: %s\n", diagnosis);
259                                                }
260                                        }
261                                }
262                        }
263                }
264       
265                pthread_mutex_unlock(&all_jobids.mutex);
266                sleep(rand()% n);               
267                pthread_mutex_lock(&all_jobids.mutex);
268                               
269        }
270        pthread_mutex_unlock(&all_jobids.mutex);
271        return (void *)NULL;
272}
273
274void catch_stop()
275{
276        printf("Caught ctrl+c\n");
277        run = 0;
278}
279
280int main(int argc, char* argv[])
281{
282        int i;
283        char diagnosis[DRMAA_ERROR_STRING_BUFFER];
284        int SUB_INT, POOL_INTERVAL,WAIT_INTERVAL,KILL_RANGE;
285        pthread_t submitter_threads[NTHREADS];
286        run = 1;
287       
288        if(argc < 6)
289        {
290                fprintf(stderr,"To less arguments\n");
291                return 1;
292        }
293       
294        printf("SUB_INT: %s msec\n",argv[1]);
295        printf("SLEEP_TIME: %s sec\n",argv[2]);
296        printf("POOL_INTERVAL: %s sec\n",argv[3]);
297        printf("WAIT_INTERVAL: %s sec\n",argv[4]);
298        printf("KILL_RANGE: %s sec\n",argv[5]);
299
300        printf("Stop by ctrl+z\n");
301
302        if(sscanf(argv[1],"%d",&SUB_INT) != 1)
303        {
304                fprintf(stderr,"Conversion of argument can't be done");
305                return 1;
306        }
307       
308        if(sscanf(argv[2],"%d",&SLEEP_TIME) != 1)
309        {
310                fprintf(stderr,"Conversion of argument can't be done");
311                return 1;
312        }
313       
314        if(sscanf(argv[3],"%d",&POOL_INTERVAL) != 1)
315        {
316                fprintf(stderr,"Conversion of argument can't be done");
317                return 1;
318        }
319       
320        if(sscanf(argv[4],"%d",&WAIT_INTERVAL) != 1)
321        {
322                fprintf(stderr,"Conversion of argument can't be done");
323                return 1;
324        }
325
326        if(sscanf(argv[5],"%d",&KILL_RANGE) != 1)
327        {
328                fprintf(stderr,"Conversion of argument can't be done");
329                return 1;
330        }
331       
332        all_jobids.size = 0;
333        all_jobids.tab = (char **)calloc(1,sizeof(char)*256);
334        pthread_mutex_init(&all_jobids.mutex,NULL);
335       
336        if (drmaa_init(NULL, diagnosis, sizeof(diagnosis)-1) != DRMAA_ERRNO_SUCCESS)
337        {
338                fprintf(stderr, "drmaa_init() failed: %s\n", diagnosis);
339                return 1;
340        }
341               
342        signal(SIGINT,catch_stop);
343       
344        printf("Running create_job_thread\n");
345        pthread_create(&submitter_threads[0], NULL, create_job_thread, &SUB_INT);
346       
347        printf("Running check_job_thread\n");
348        pthread_create(&submitter_threads[1], NULL, check_job_thread, &POOL_INTERVAL);
349       
350        printf("Running wait_thread\n");
351        pthread_create(&submitter_threads[2], NULL, wait_thread, &WAIT_INTERVAL);
352       
353        printf("Running kill_thread\n");
354        pthread_create(&submitter_threads[3], NULL, kill_thread, &KILL_RANGE);
355
356        printf("Joining threads\n");   
357        for (i=0; i<NTHREADS; i++)
358                if (pthread_join(submitter_threads[i], NULL))
359                        printf("pthread_join() returned != 0\n");
360               
361        while(all_jobids.size>0)
362        {
363                free(all_jobids.tab[all_jobids.size-1]);
364                all_jobids.size--;
365        }
366        free(all_jobids.tab);
367       
368        pthread_mutex_destroy(&all_jobids.mutex);
369       
370        if (drmaa_exit(diagnosis, sizeof(diagnosis)-1) != DRMAA_ERRNO_SUCCESS)
371        {
372                fprintf(stderr, "drmaa_exit() failed: %s\n", diagnosis);
373                return 1;
374        }
375        printf("Successfully ended\n");
376        return 0;
377}
Note: See TracBrowser for help on using the repository browser.