1 | /****************************************************************************** |
---|
2 | * SAGE - Scalable Adaptive Graphics Environment |
---|
3 | * |
---|
4 | * Copyright (C) 2004 Electronic Visualization Laboratory, |
---|
5 | * University of Illinois at Chicago |
---|
6 | * |
---|
7 | * All rights reserved. |
---|
8 | * |
---|
9 | * Redistribution and use in source and binary forms, with or without |
---|
10 | * modification, are permitted provided that the following conditions are met: |
---|
11 | * |
---|
12 | * * Redistributions of source code must retain the above copyright |
---|
13 | * notice, this list of conditions and the following disclaimer. |
---|
14 | * * Redistributions in binary form must reproduce the above |
---|
15 | * copyright notice, this list of conditions and the following disclaimer |
---|
16 | * in the documentation and/or other materials provided with the distribution. |
---|
17 | * * Neither the name of the University of Illinois at Chicago nor |
---|
18 | * the names of its contributors may be used to endorse or promote |
---|
19 | * products derived from this software without specific prior written permission. |
---|
20 | * |
---|
21 | * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
---|
22 | * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
---|
23 | * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
---|
24 | * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR |
---|
25 | * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, |
---|
26 | * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, |
---|
27 | * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR |
---|
28 | * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF |
---|
29 | * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING |
---|
30 | * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS |
---|
31 | * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
---|
32 | * |
---|
33 | * Direct questions, comments etc about SAGE to http://www.evl.uic.edu/cavern/forum/ |
---|
34 | * |
---|
35 | *****************************************************************************/ |
---|
36 | #define _GNU_SOURCE 1 /* for O_DIRECT from <fcntl.h> */ |
---|
37 | #define __EXTENSIONS__ 1 /* for directio() in Solaris */ |
---|
38 | |
---|
39 | #include "bpio.h" |
---|
40 | |
---|
41 | #include <stdio.h> |
---|
42 | #include <stdlib.h> |
---|
43 | #include <fcntl.h> |
---|
44 | #include <errno.h> |
---|
45 | #include <memory.h> |
---|
46 | #include <sys/time.h> |
---|
47 | |
---|
48 | extern int verbose; |
---|
49 | |
---|
50 | static bpbuf_t *bpbufinit( bpbuf_t *bpb, bpio_t *bpio, int nbufs ) |
---|
51 | { |
---|
52 | int pagesize = getpagesize(); |
---|
53 | int i, margin, stride; |
---|
54 | unsigned char *p; |
---|
55 | |
---|
56 | if(bpb == NULL) |
---|
57 | bpb = (bpbuf_t *)malloc( sizeof(bpbuf_t) ); |
---|
58 | memset(bpb, 0, sizeof(bpbuf_t)); |
---|
59 | |
---|
60 | bpb->bpio = bpio; |
---|
61 | pthread_mutex_init( &bpb->bmut, NULL ); |
---|
62 | pthread_cond_init( &bpb->bfillwait, NULL ); |
---|
63 | pthread_cond_init( &bpb->bdrainwait, NULL ); |
---|
64 | bpb->fillwaiting = bpb->drainwaiting = 0; |
---|
65 | bpb->filepos = 0LL; /* probably changed later by bpopen() */ |
---|
66 | bpb->wrappos = 0LL; /* filled in later by bpstart */ |
---|
67 | bpb->eofpos = 0LL; |
---|
68 | bpb->wp = bpb->rp = 0; /* initially empty */ |
---|
69 | |
---|
70 | bpb->bufsize = bpio->bufsize; |
---|
71 | bpb->readsize = bpio->readsize; |
---|
72 | stride = (bpb->bufsize + pagesize - 1) & ~(pagesize - 1); |
---|
73 | bpb->bufspace = (unsigned char *)malloc( pagesize-1 + nbufs*stride ); |
---|
74 | |
---|
75 | |
---|
76 | p = bpb->bufspace; |
---|
77 | margin = (unsigned long)p & (pagesize - 1); |
---|
78 | if(margin != 0) |
---|
79 | p += (pagesize - margin); |
---|
80 | /* Now p is page-aligned */ |
---|
81 | bpb->bufs = (unsigned char **)malloc( nbufs * sizeof(unsigned char *) ); |
---|
82 | bpb->curpos = (off_t *)malloc( nbufs * sizeof(off_t) ); |
---|
83 | bpb->nbufs = nbufs; |
---|
84 | for(i = 0; i < nbufs; i++) { |
---|
85 | bpb->bufs[i] = p + i*stride; |
---|
86 | bpb->curpos[i] = -1LL; |
---|
87 | } |
---|
88 | |
---|
89 | bpb->doloop = 0; |
---|
90 | bpb->fd = -1; |
---|
91 | bpb->filling = 0; |
---|
92 | |
---|
93 | /* Start the thread. It will realize that filling==0 and go to sleep. |
---|
94 | */ |
---|
95 | pthread_create( &bpb->bthread, NULL, bpfiller, (void *)bpb ); |
---|
96 | |
---|
97 | return bpb; |
---|
98 | } |
---|
99 | |
---|
100 | void bpforward( bpio_t *bpio, int fwd ) |
---|
101 | { |
---|
102 | int i; |
---|
103 | off_t incr = (off_t)bpio->bufsize * (fwd * bpio->nfillers); |
---|
104 | |
---|
105 | if(incr != bpio->bpb[0].incrpos || bpio->fwd != fwd) { |
---|
106 | off_t here = bptell( bpio ); |
---|
107 | |
---|
108 | for(i = 0; i < bpio->nfillers; i++) { |
---|
109 | bpio->bpb[i].incrpos = incr; |
---|
110 | } |
---|
111 | bpio->fwd = fwd; |
---|
112 | bpseek( bpio, here ); |
---|
113 | } |
---|
114 | |
---|
115 | } |
---|
116 | |
---|
117 | bpio_t *bpinit( bpio_t *bpio, int nfillers, int bufsize, int readsize, int nbufseach ) |
---|
118 | { |
---|
119 | int i; |
---|
120 | |
---|
121 | if(bpio == NULL) |
---|
122 | bpio = (bpio_t *)malloc( sizeof(bpio_t) ); |
---|
123 | memset(bpio, 0, sizeof(bpio_t)); |
---|
124 | bpio->nfillers = nfillers; |
---|
125 | bpio->bufsize = bufsize; |
---|
126 | bpio->readsize = readsize; |
---|
127 | bpio->bpb = (bpbuf_t *)malloc( nfillers * sizeof(bpbuf_t) ); |
---|
128 | for(i = 0; i < nfillers; i++) |
---|
129 | bpbufinit( &bpio->bpb[i], bpio, nbufseach ); |
---|
130 | bpforward( bpio, 1 ); |
---|
131 | return bpio; |
---|
132 | } |
---|
133 | |
---|
134 | int bpbempty( bpbuf_t *bpb ) |
---|
135 | { |
---|
136 | return bpb->wp == bpb->rp; |
---|
137 | } |
---|
138 | |
---|
139 | int bpbfull( bpbuf_t *bpb ) |
---|
140 | { |
---|
141 | return (bpb->wp+1) % bpb->nbufs == bpb->rp; |
---|
142 | } |
---|
143 | |
---|
144 | void bpbstop( bpbuf_t *bpb ) |
---|
145 | { |
---|
146 | bpb->filling = 0; |
---|
147 | if(bpb->fillwaiting) |
---|
148 | pthread_cond_signal( &bpb->bfillwait ); |
---|
149 | if(bpb->drainwaiting) |
---|
150 | pthread_cond_signal( &bpb->bdrainwait ); |
---|
151 | } |
---|
152 | |
---|
153 | /* file-reader (bpb-queue-filler) thread */ |
---|
154 | void *bpfiller( void *vbpb ) |
---|
155 | { |
---|
156 | bpbuf_t *bpb = (bpbuf_t *)vbpb; |
---|
157 | int want; |
---|
158 | unsigned char *p; |
---|
159 | off_t filepos; |
---|
160 | int normaleof; |
---|
161 | |
---|
162 | pthread_mutex_lock( &bpb->bmut ); |
---|
163 | |
---|
164 | for(;;) { |
---|
165 | |
---|
166 | /* Can we run now? */ |
---|
167 | while(!bpb->filling || bpbfull(bpb)) { |
---|
168 | /* No, sleep until told to start and have room to put data */ |
---|
169 | bpb->fillwaiting = 1; |
---|
170 | bpb->busy = 0; |
---|
171 | pthread_cond_wait( &bpb->bfillwait, &bpb->bmut ); |
---|
172 | bpb->fillwaiting = 0; |
---|
173 | } |
---|
174 | |
---|
175 | |
---|
176 | /* Now we fill bpb->bufs[ bpb->rp ] */ |
---|
177 | /* It's all ours now, so run unlocked */ |
---|
178 | |
---|
179 | want = bpb->bufsize; |
---|
180 | p = bpb->bufs[ bpb->rp ]; |
---|
181 | |
---|
182 | if(bpb->filepos >= bpb->eofpos) { |
---|
183 | if(!bpb->doloop || bpb->wrappos >= bpb->eofpos) { |
---|
184 | bpb->filling = 0; |
---|
185 | } else { |
---|
186 | /* Going forwards beyond EOF, need to wrap backwards */ |
---|
187 | bpb->filepos = bpb->wrappos + (bpb->filepos - bpb->eofpos); |
---|
188 | } |
---|
189 | |
---|
190 | } else if(bpb->filepos < bpb->wrappos) { |
---|
191 | if(!bpb->doloop || bpb->wrappos >= bpb->eofpos) { |
---|
192 | bpb->filling = 0; |
---|
193 | } else { |
---|
194 | /* Going backwards, need to wrap forwards */ |
---|
195 | bpb->filepos = bpb->eofpos + (bpb->filepos - bpb->wrappos); |
---|
196 | } |
---|
197 | } |
---|
198 | |
---|
199 | if(bpb->filepos <0||verbose>=3) |
---|
200 | printf("T%02d r%02d w%02d: rpos %02lld -> %lld\n", |
---|
201 | (int) (bpb - bpb->bpio->bpb), bpb->rp, bpb->wp, |
---|
202 | (long long)(bpb->filepos / bpb->bufsize), (long long)bpb->filepos); |
---|
203 | |
---|
204 | |
---|
205 | bpb->curpos[ bpb->rp ] = bpb->filepos; |
---|
206 | filepos = bpb->filepos; |
---|
207 | bpb->filepos += bpb->incrpos; |
---|
208 | bpb->busy = 1; |
---|
209 | |
---|
210 | pthread_mutex_unlock( &bpb->bmut ); |
---|
211 | |
---|
212 | lseek( bpb->fd, filepos, 0 ); |
---|
213 | |
---|
214 | normaleof = 0; |
---|
215 | while(want > 0 && bpb->filling) { |
---|
216 | int now = want < bpb->readsize ? want : bpb->readsize; |
---|
217 | int got = read( bpb->fd, p, now ); |
---|
218 | if(got < 0 && errno == EINTR) |
---|
219 | continue; |
---|
220 | if(got <= 0) { |
---|
221 | /* 0-fill remainder of this buffer and stop reading. */ |
---|
222 | memset(p, 0, want); |
---|
223 | bpbstop( bpb ); |
---|
224 | if(got == 0) normaleof = 1; |
---|
225 | break; |
---|
226 | |
---|
227 | } else { |
---|
228 | want -= got; |
---|
229 | p += got; |
---|
230 | } |
---|
231 | } |
---|
232 | |
---|
233 | pthread_mutex_lock( &bpb->bmut ); |
---|
234 | |
---|
235 | if(bpb->filling || normaleof) { |
---|
236 | bpb->rp = (bpb->rp + 1) % bpb->nbufs; |
---|
237 | } else { |
---|
238 | bpb->curpos[ bpb->rp] = -1LL; /* mark buffer as invalid */ |
---|
239 | bpb->busy = 0; |
---|
240 | } |
---|
241 | |
---|
242 | if(bpb->drainwaiting) /* is drainer waiting for data? */ |
---|
243 | pthread_cond_signal( &bpb->bdrainwait ); |
---|
244 | } |
---|
245 | |
---|
246 | /* In case anyone ever 'break's from above loop */ |
---|
247 | pthread_mutex_unlock( &bpb->bmut ); |
---|
248 | return NULL; |
---|
249 | } |
---|
250 | |
---|
251 | void bpclose( bpio_t *bpio ) |
---|
252 | { |
---|
253 | int i; |
---|
254 | |
---|
255 | for(i = 0; i < bpio->nfillers; i++) { |
---|
256 | bpbuf_t *bpb = &bpio->bpb[i]; |
---|
257 | bpbstop( bpb ); |
---|
258 | if(bpb->fd >= 0) { |
---|
259 | close(bpb->fd); |
---|
260 | bpb->fd = -1; |
---|
261 | } |
---|
262 | } |
---|
263 | } |
---|
264 | |
---|
265 | /* |
---|
266 | * bpopen() opens a file across a collection of buffers. |
---|
267 | */ |
---|
268 | int bpopen( bpio_t *bpio, char *fname ) |
---|
269 | { |
---|
270 | int i; |
---|
271 | |
---|
272 | bpclose( bpio ); |
---|
273 | for(i = 0; i < bpio->nfillers; i++) { |
---|
274 | bpbuf_t *bpb = &bpio->bpb[i]; |
---|
275 | #if defined(O_DIRECT) && !USE_NODIRECT |
---|
276 | bpb->fd = open(fname, O_RDONLY | O_DIRECT); /* try direct I/O if available */ |
---|
277 | #else |
---|
278 | bpb->fd = open(fname, O_RDONLY); |
---|
279 | #endif |
---|
280 | if(bpb->fd < 0) { |
---|
281 | perror(fname); |
---|
282 | return -1; |
---|
283 | } |
---|
284 | /* Request direct I/O in Solaris' way too, but don't worry if we can't */ |
---|
285 | #ifdef DIRECTIO_ON |
---|
286 | directio( bpb->fd, DIRECTIO_ON ); |
---|
287 | #endif |
---|
288 | |
---|
289 | } |
---|
290 | bprange( bpio, (off_t) 0, (off_t) lseek( bpio->bpb[0].fd, 0, SEEK_END ) ); |
---|
291 | bpseek( bpio, (off_t) 0 ); |
---|
292 | return 0; |
---|
293 | } |
---|
294 | |
---|
295 | /* return pointer to current buffer */ |
---|
296 | unsigned char *bpcurbuf( bpio_t *bpio ) |
---|
297 | { |
---|
298 | bpbuf_t *bpb; |
---|
299 | if(bpio == NULL || (unsigned int)bpio->drain >= bpio->nfillers) |
---|
300 | return NULL; |
---|
301 | bpb = &bpio->bpb[ bpio->drain ]; |
---|
302 | if(bpbempty(bpb) || bpb->bufs == NULL) |
---|
303 | return NULL; |
---|
304 | return bpb->bufs[ bpb->wp ]; |
---|
305 | } |
---|
306 | |
---|
307 | /* Return file offset of current (drain'th) buffer. */ |
---|
308 | /* Can be -1 if not read yet. */ |
---|
309 | off_t bptell( bpio_t *bpio ) |
---|
310 | { |
---|
311 | bpbuf_t *bpb = &bpio->bpb[ bpio->drain ]; |
---|
312 | return bpb->curpos[ bpb->wp ]; |
---|
313 | } |
---|
314 | |
---|
315 | void bpseek( bpio_t *bpio, off_t pos ) |
---|
316 | { |
---|
317 | int i, b; |
---|
318 | |
---|
319 | if(verbose >= 3) printf("S %02lld -> %lld\n", (long long)(pos / bpio->bufsize), (long long)pos); |
---|
320 | |
---|
321 | bpstop( bpio ); |
---|
322 | bpsync( bpio ); /* wait until all is stable */ |
---|
323 | |
---|
324 | bpio->drain = 0; /* next-to-drain is first reader */ |
---|
325 | for(i = 0; i < bpio->nfillers; i++) { |
---|
326 | bpbuf_t *bpb = &bpio->bpb[i]; |
---|
327 | |
---|
328 | pthread_mutex_lock( &bpb->bmut ); |
---|
329 | |
---|
330 | bpbstop( bpb ); |
---|
331 | |
---|
332 | bpb->rp = bpb->wp = 0; |
---|
333 | for(b = 0; b < bpb->nbufs; b++) |
---|
334 | bpb->curpos[b] = -1LL; |
---|
335 | |
---|
336 | bpb->filepos = pos + bpio->bufsize * (bpio->fwd * i); |
---|
337 | |
---|
338 | if(bpb->eofpos > bpb->wrappos) { |
---|
339 | while(bpb->filepos > bpb->eofpos) |
---|
340 | bpb->filepos -= bpb->eofpos - bpb->wrappos; |
---|
341 | while(bpb->filepos < bpb->wrappos) |
---|
342 | bpb->filepos += bpb->eofpos - bpb->wrappos; |
---|
343 | } |
---|
344 | |
---|
345 | /* we changed buffer pointers -- awaken any sleepers */ |
---|
346 | if(bpb->drainwaiting) |
---|
347 | pthread_cond_signal( &bpb->bdrainwait ); |
---|
348 | if(bpb->fillwaiting) |
---|
349 | pthread_cond_signal( &bpb->bfillwait ); |
---|
350 | |
---|
351 | pthread_mutex_unlock( &bpb->bmut ); |
---|
352 | } |
---|
353 | } |
---|
354 | |
---|
355 | void bprange( bpio_t *bpio, off_t from, off_t to ) |
---|
356 | { |
---|
357 | int i; |
---|
358 | for(i = 0; i < bpio->nfillers; i++) { |
---|
359 | bpbuf_t *bpb = &bpio->bpb[i]; |
---|
360 | if(from != -1LL) bpb->wrappos = from; |
---|
361 | if(to != -1LL) bpb->eofpos = to; |
---|
362 | bpbstop( bpb ); |
---|
363 | } |
---|
364 | } |
---|
365 | |
---|
366 | void bpstart( bpio_t *bpio, int wrap ) |
---|
367 | { |
---|
368 | int i; |
---|
369 | |
---|
370 | for(i = 0; i < bpio->nfillers; i++) { |
---|
371 | bpbuf_t *bpb = &bpio->bpb[i]; |
---|
372 | bpb->doloop = wrap; |
---|
373 | bpb->filling = 1; |
---|
374 | if(bpb->fillwaiting) |
---|
375 | pthread_cond_signal( &bpb->bfillwait ); |
---|
376 | } |
---|
377 | } |
---|
378 | |
---|
379 | /* |
---|
380 | * If we're supposed to be stopped, |
---|
381 | * wait until all filler threads have noticed that. |
---|
382 | */ |
---|
383 | void bpsync( bpio_t *bpio ) |
---|
384 | { |
---|
385 | int i, some; |
---|
386 | int around = 0; |
---|
387 | |
---|
388 | do { |
---|
389 | some = 0; |
---|
390 | for(i = 0; i < bpio->nfillers; i++) { |
---|
391 | bpbuf_t *bpb = &bpio->bpb[i]; |
---|
392 | pthread_mutex_lock( &bpb->bmut ); |
---|
393 | if(!bpb->filling && bpb->busy) |
---|
394 | some = 1; |
---|
395 | pthread_mutex_unlock( &bpb->bmut ); |
---|
396 | } |
---|
397 | if(some) |
---|
398 | usleep(50000); |
---|
399 | around += some; |
---|
400 | } while(some); |
---|
401 | if(verbose>=2) printf("sync %d\n", around); |
---|
402 | } |
---|
403 | |
---|
404 | void bpstop( bpio_t *bpio ) |
---|
405 | { |
---|
406 | int i; |
---|
407 | for(i = 0; i < bpio->nfillers; i++) |
---|
408 | bpbstop( &bpio->bpb[i] ); |
---|
409 | } |
---|
410 | |
---|
411 | |
---|
412 | int bpdrain( bpio_t *bpio, int (*sink)( unsigned char *, int, void * ), void *arg ) |
---|
413 | { |
---|
414 | int status; |
---|
415 | |
---|
416 | for(;;) { |
---|
417 | bpbuf_t *bpb = &bpio->bpb[ bpio->drain ]; |
---|
418 | |
---|
419 | if(!bpb->filling) /* if EOF or bpstop() or etc. */ |
---|
420 | break; |
---|
421 | |
---|
422 | pthread_mutex_lock( &bpb->bmut ); |
---|
423 | while(bpbempty( bpb )) { |
---|
424 | bpb->drainwaiting = 1; |
---|
425 | pthread_cond_wait( &bpb->bdrainwait, &bpb->bmut ); |
---|
426 | bpb->drainwaiting = 0; |
---|
427 | } |
---|
428 | pthread_mutex_unlock( &bpb->bmut ); |
---|
429 | |
---|
430 | /* Make use of bpb->bufs[ bpb->wp ] someday */ |
---|
431 | |
---|
432 | status = (*sink)( bpb->bufs[ bpb->wp ], bpb->bufsize, arg ); |
---|
433 | if(status) |
---|
434 | return status; |
---|
435 | |
---|
436 | pthread_mutex_lock( &bpb->bmut ); |
---|
437 | bpb->wp = (bpb->wp + 1) % bpb->nbufs; |
---|
438 | |
---|
439 | if(bpb->fillwaiting) |
---|
440 | pthread_cond_signal( &bpb->bfillwait ); |
---|
441 | pthread_mutex_unlock( &bpb->bmut ); |
---|
442 | |
---|
443 | bpio->drain = (bpio->drain+1) % bpio->nfillers; |
---|
444 | } |
---|
445 | |
---|
446 | return 0; |
---|
447 | } |
---|