source: trunk/src/testing/app/bitplay/bpio.c @ 4

Revision 4, 11.4 KB checked in by ajaworski, 13 years ago (diff)

Added modified SAGE sources

Line 
1/******************************************************************************
2 * SAGE - Scalable Adaptive Graphics Environment
3 *
4 * Copyright (C) 2004 Electronic Visualization Laboratory,
5 * University of Illinois at Chicago
6 *
7 * All rights reserved.
8 *
9 * Redistribution and use in source and binary forms, with or without
10 * modification, are permitted provided that the following conditions are met:
11 *
12 *  * Redistributions of source code must retain the above copyright
13 *    notice, this list of conditions and the following disclaimer.
14 *  * Redistributions in binary form must reproduce the above
15 *    copyright notice, this list of conditions and the following disclaimer
16 *    in the documentation and/or other materials provided with the distribution.
17 *  * Neither the name of the University of Illinois at Chicago nor
18 *    the names of its contributors may be used to endorse or promote
19 *    products derived from this software without specific prior written permission.
20 *
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
25 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
26 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
27 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
28 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
29 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
30 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
31 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32 *
33 * Direct questions, comments etc about SAGE to http://www.evl.uic.edu/cavern/forum/
34 *
35 *****************************************************************************/
36#define _GNU_SOURCE        1            /* for O_DIRECT from <fcntl.h> */
37#define __EXTENSIONS__     1            /* for directio() in Solaris */
38
39#include "bpio.h"
40
41#include <stdio.h>
42#include <stdlib.h>
43#include <fcntl.h>
44#include <errno.h>
45#include <memory.h>
46#include <sys/time.h>
47
48extern int verbose;
49
50static bpbuf_t *bpbufinit( bpbuf_t *bpb, bpio_t *bpio, int nbufs )
51{
52    int pagesize = getpagesize();
53    int i, margin, stride;
54    unsigned char *p;
55
56    if(bpb == NULL)
57        bpb = (bpbuf_t *)malloc( sizeof(bpbuf_t) );
58    memset(bpb, 0, sizeof(bpbuf_t));
59
60    bpb->bpio = bpio;
61    pthread_mutex_init( &bpb->bmut, NULL );
62    pthread_cond_init( &bpb->bfillwait, NULL );
63    pthread_cond_init( &bpb->bdrainwait, NULL );
64    bpb->fillwaiting = bpb->drainwaiting = 0;
65    bpb->filepos = 0LL;         /* probably changed later by bpopen() */
66    bpb->wrappos = 0LL;         /* filled in later by bpstart */
67    bpb->eofpos  = 0LL;
68    bpb->wp = bpb->rp = 0;      /* initially empty */
69
70    bpb->bufsize = bpio->bufsize;
71    bpb->readsize = bpio->readsize;
72    stride = (bpb->bufsize + pagesize - 1) & ~(pagesize - 1);
73    bpb->bufspace = (unsigned char *)malloc( pagesize-1 + nbufs*stride );
74
75
76    p = bpb->bufspace;
77    margin = (unsigned long)p & (pagesize - 1);
78    if(margin != 0)
79        p += (pagesize - margin);
80    /* Now p is page-aligned */
81    bpb->bufs = (unsigned char **)malloc( nbufs * sizeof(unsigned char *) );
82    bpb->curpos = (off_t *)malloc( nbufs * sizeof(off_t) );
83    bpb->nbufs = nbufs;
84    for(i = 0; i < nbufs; i++) {
85        bpb->bufs[i] = p + i*stride;
86        bpb->curpos[i] = -1LL;
87    }
88
89    bpb->doloop = 0;
90    bpb->fd = -1;
91    bpb->filling = 0;
92
93    /* Start the thread.  It will realize that filling==0 and go to sleep.
94     */
95    pthread_create( &bpb->bthread, NULL, bpfiller, (void *)bpb );
96
97    return bpb;
98}
99
100void bpforward( bpio_t *bpio, int fwd )
101{
102    int i;
103    off_t incr = (off_t)bpio->bufsize * (fwd * bpio->nfillers);
104
105    if(incr != bpio->bpb[0].incrpos || bpio->fwd != fwd) {
106        off_t here = bptell( bpio );
107
108        for(i = 0; i < bpio->nfillers; i++) {
109            bpio->bpb[i].incrpos = incr;
110        }
111        bpio->fwd = fwd;
112        bpseek( bpio, here );
113    }
114
115}
116
117bpio_t *bpinit( bpio_t *bpio, int nfillers, int bufsize, int readsize, int nbufseach )
118{
119    int i;
120
121    if(bpio == NULL)
122        bpio = (bpio_t *)malloc( sizeof(bpio_t) );
123    memset(bpio, 0, sizeof(bpio_t));
124    bpio->nfillers = nfillers;
125    bpio->bufsize = bufsize;
126    bpio->readsize = readsize;
127    bpio->bpb = (bpbuf_t *)malloc( nfillers * sizeof(bpbuf_t) );
128    for(i = 0; i < nfillers; i++)
129        bpbufinit( &bpio->bpb[i], bpio, nbufseach );
130    bpforward( bpio, 1 );
131    return bpio;
132}
133
134int bpbempty( bpbuf_t *bpb )
135{
136    return bpb->wp == bpb->rp;
137}
138
139int bpbfull( bpbuf_t *bpb )
140{
141    return (bpb->wp+1) % bpb->nbufs == bpb->rp;
142}
143
144void bpbstop( bpbuf_t *bpb )
145{
146    bpb->filling = 0;
147    if(bpb->fillwaiting)
148        pthread_cond_signal( &bpb->bfillwait );
149    if(bpb->drainwaiting)
150        pthread_cond_signal( &bpb->bdrainwait );
151}
152
153/* file-reader (bpb-queue-filler) thread */
154void *bpfiller( void *vbpb )
155{
156    bpbuf_t *bpb = (bpbuf_t *)vbpb;
157    int want;
158    unsigned char *p;
159    off_t filepos;
160    int normaleof;
161
162    pthread_mutex_lock( &bpb->bmut );
163
164    for(;;) {
165
166        /* Can we run now? */
167        while(!bpb->filling || bpbfull(bpb)) {
168            /* No, sleep until told to start and have room to put data */
169            bpb->fillwaiting = 1;
170            bpb->busy = 0;
171            pthread_cond_wait( &bpb->bfillwait, &bpb->bmut );
172            bpb->fillwaiting = 0;
173        }
174
175
176        /* Now we fill bpb->bufs[ bpb->rp ] */
177        /* It's all ours now, so run unlocked */
178
179        want = bpb->bufsize;
180        p = bpb->bufs[ bpb->rp ];
181
182        if(bpb->filepos >= bpb->eofpos) {
183            if(!bpb->doloop || bpb->wrappos >= bpb->eofpos) {
184                bpb->filling = 0;
185            } else {
186                /* Going forwards beyond EOF, need to wrap backwards */
187                bpb->filepos = bpb->wrappos + (bpb->filepos - bpb->eofpos);
188            }
189
190        } else if(bpb->filepos < bpb->wrappos) {
191            if(!bpb->doloop || bpb->wrappos >= bpb->eofpos) {
192                bpb->filling = 0;
193            } else {
194                /* Going backwards, need to wrap forwards */
195                bpb->filepos = bpb->eofpos + (bpb->filepos - bpb->wrappos);
196            }
197        }
198
199        if(bpb->filepos <0||verbose>=3)
200            printf("T%02d r%02d w%02d: rpos %02lld -> %lld\n",
201                (int) (bpb - bpb->bpio->bpb), bpb->rp, bpb->wp,
202                (long long)(bpb->filepos / bpb->bufsize), (long long)bpb->filepos);
203
204
205        bpb->curpos[ bpb->rp ] = bpb->filepos;
206        filepos = bpb->filepos;
207        bpb->filepos += bpb->incrpos;
208        bpb->busy = 1;
209
210        pthread_mutex_unlock( &bpb->bmut );
211
212        lseek( bpb->fd, filepos, 0 );
213
214        normaleof = 0;
215        while(want > 0 && bpb->filling) {
216            int now = want < bpb->readsize ? want : bpb->readsize;
217            int got = read( bpb->fd, p, now );
218            if(got < 0 && errno == EINTR)
219                continue;
220            if(got <= 0) {
221                /* 0-fill remainder of this buffer and stop reading. */
222                memset(p, 0, want);
223                bpbstop( bpb );
224                if(got == 0) normaleof = 1;
225                break;
226
227            } else {
228                want -= got;
229                p += got;
230            }
231        }
232
233        pthread_mutex_lock( &bpb->bmut );
234
235        if(bpb->filling || normaleof) {
236            bpb->rp = (bpb->rp + 1) % bpb->nbufs;
237        } else {
238            bpb->curpos[ bpb->rp] = -1LL;       /* mark buffer as invalid */
239            bpb->busy = 0;
240        }
241
242        if(bpb->drainwaiting)   /* is drainer waiting for data? */
243            pthread_cond_signal( &bpb->bdrainwait );
244    }
245
246    /* In case anyone ever 'break's from above loop */
247    pthread_mutex_unlock( &bpb->bmut );
248    return NULL;
249}
250
251void bpclose( bpio_t *bpio )
252{
253    int i;
254
255    for(i = 0; i < bpio->nfillers; i++) {
256        bpbuf_t *bpb = &bpio->bpb[i];
257        bpbstop( bpb );
258        if(bpb->fd >= 0) {
259            close(bpb->fd);
260            bpb->fd = -1;
261        }
262    }
263}
264
265/*
266 * bpopen() opens a file across a collection of buffers.
267 */
268int bpopen( bpio_t *bpio, char *fname )
269{
270    int i;
271
272    bpclose( bpio );
273    for(i = 0; i < bpio->nfillers; i++) {
274        bpbuf_t *bpb = &bpio->bpb[i];
275#if defined(O_DIRECT) && !USE_NODIRECT
276        bpb->fd = open(fname, O_RDONLY | O_DIRECT);     /* try direct I/O if available */
277#else
278        bpb->fd = open(fname, O_RDONLY);
279#endif
280        if(bpb->fd < 0) {
281            perror(fname);
282            return -1;
283        }
284        /* Request direct I/O in Solaris' way too, but don't worry if we can't */
285#ifdef DIRECTIO_ON
286        directio( bpb->fd, DIRECTIO_ON );
287#endif
288
289    }
290    bprange( bpio, (off_t) 0, (off_t) lseek( bpio->bpb[0].fd, 0, SEEK_END ) );
291    bpseek( bpio, (off_t) 0 );
292    return 0;
293}
294
295/* return pointer to current buffer */
296unsigned char *bpcurbuf( bpio_t *bpio )
297{
298    bpbuf_t *bpb;
299    if(bpio == NULL || (unsigned int)bpio->drain >= bpio->nfillers)
300        return NULL;
301    bpb = &bpio->bpb[ bpio->drain ];
302    if(bpbempty(bpb) || bpb->bufs == NULL)
303        return NULL;
304    return bpb->bufs[ bpb->wp ];
305}
306
307/* Return file offset of current (drain'th) buffer. */
308/* Can be -1 if not read yet. */
309off_t bptell( bpio_t *bpio )
310{
311    bpbuf_t *bpb = &bpio->bpb[ bpio->drain ];
312    return bpb->curpos[ bpb->wp ];
313}
314
315void bpseek( bpio_t *bpio, off_t pos )
316{
317    int i, b;
318
319    if(verbose >= 3) printf("S %02lld -> %lld\n", (long long)(pos / bpio->bufsize), (long long)pos);
320
321    bpstop( bpio );
322    bpsync( bpio );             /* wait until all is stable */
323
324    bpio->drain = 0;            /* next-to-drain is first reader */
325    for(i = 0; i < bpio->nfillers; i++) {
326        bpbuf_t *bpb = &bpio->bpb[i];
327
328        pthread_mutex_lock( &bpb->bmut );
329
330        bpbstop( bpb );
331
332        bpb->rp = bpb->wp = 0;
333        for(b = 0; b < bpb->nbufs; b++)
334            bpb->curpos[b] = -1LL;
335
336        bpb->filepos = pos + bpio->bufsize * (bpio->fwd * i);
337
338        if(bpb->eofpos > bpb->wrappos) {
339            while(bpb->filepos > bpb->eofpos)
340                bpb->filepos -= bpb->eofpos - bpb->wrappos;
341            while(bpb->filepos < bpb->wrappos)
342                bpb->filepos += bpb->eofpos - bpb->wrappos;
343        }
344
345        /* we changed buffer pointers -- awaken any sleepers */
346        if(bpb->drainwaiting)
347            pthread_cond_signal( &bpb->bdrainwait );
348        if(bpb->fillwaiting)
349            pthread_cond_signal( &bpb->bfillwait );
350
351        pthread_mutex_unlock( &bpb->bmut );
352    }
353}
354
355void bprange( bpio_t *bpio, off_t from, off_t to )
356{
357    int i;
358    for(i = 0; i < bpio->nfillers; i++) {
359        bpbuf_t *bpb = &bpio->bpb[i];
360        if(from != -1LL) bpb->wrappos = from;
361        if(to != -1LL) bpb->eofpos = to;
362        bpbstop( bpb );
363    }
364}
365
366void bpstart( bpio_t *bpio, int wrap )
367{
368    int i;
369
370    for(i = 0; i < bpio->nfillers; i++) {
371        bpbuf_t *bpb = &bpio->bpb[i];
372        bpb->doloop = wrap;
373        bpb->filling = 1;
374        if(bpb->fillwaiting)
375            pthread_cond_signal( &bpb->bfillwait );
376    }
377}
378
379/*
380 * If we're supposed to be stopped,
381 * wait until all filler threads have noticed that.
382 */
383void bpsync( bpio_t *bpio )
384{
385    int i, some;
386    int around = 0;
387
388    do {
389        some = 0;
390        for(i = 0; i < bpio->nfillers; i++) {
391            bpbuf_t *bpb = &bpio->bpb[i];
392            pthread_mutex_lock( &bpb->bmut );
393            if(!bpb->filling && bpb->busy)
394                some = 1;
395            pthread_mutex_unlock( &bpb->bmut );
396        }
397        if(some)
398            usleep(50000);
399        around += some;
400    } while(some);
401    if(verbose>=2) printf("sync %d\n", around);
402}
403
404void bpstop( bpio_t *bpio )
405{
406    int i;
407    for(i = 0; i < bpio->nfillers; i++)
408        bpbstop( &bpio->bpb[i] );
409}
410
411
412int bpdrain( bpio_t *bpio, int (*sink)( unsigned char *, int, void * ), void *arg )
413{
414    int status;
415
416    for(;;) {
417        bpbuf_t *bpb = &bpio->bpb[ bpio->drain ];
418
419        if(!bpb->filling)       /* if EOF or bpstop() or etc. */
420            break;
421
422        pthread_mutex_lock( &bpb->bmut );
423        while(bpbempty( bpb )) {
424            bpb->drainwaiting = 1;
425            pthread_cond_wait( &bpb->bdrainwait, &bpb->bmut );
426            bpb->drainwaiting = 0;
427        }
428        pthread_mutex_unlock( &bpb->bmut );
429
430        /* Make use of bpb->bufs[ bpb->wp ] someday */
431
432        status = (*sink)( bpb->bufs[ bpb->wp ], bpb->bufsize, arg );
433        if(status)
434            return status;
435
436        pthread_mutex_lock( &bpb->bmut );
437        bpb->wp = (bpb->wp + 1) % bpb->nbufs;
438
439        if(bpb->fillwaiting)
440            pthread_cond_signal( &bpb->bfillwait );
441        pthread_mutex_unlock( &bpb->bmut );
442
443        bpio->drain = (bpio->drain+1) % bpio->nfillers;
444    }
445
446    return 0;
447}
Note: See TracBrowser for help on using the repository browser.