source: trunk/src/testing/include/sageStreamer.h @ 4

Revision 4, 8.3 KB checked in by ajaworski, 13 years ago (diff)

Added modified SAGE sources

Line 
1/******************************************************************************
2 * SAGE - Scalable Adaptive Graphics Environment
3 *
4 * Module: sageStreamer.h
5 * Author : Byungil Jeong
6 *
7 * Copyright (C) 2004 Electronic Visualization Laboratory,
8 * University of Illinois at Chicago
9 *
10 * All rights reserved.
11 *
12 * Redistribution and use in source and binary forms, with or without
13 * modification, are permitted provided that the following conditions are met:
14 *
15 *  * Redistributions of source code must retain the above copyright
16 *    notice, this list of conditions and the following disclaimer.
17 *  * Redistributions in binary form must reproduce the above
18 *    copyright notice, this list of conditions and the following disclaimer
19 *    in the documentation and/or other materials provided with the distribution.
20 *  * Neither the name of the University of Illinois at Chicago nor
21 *    the names of its contributors may be used to endorse or promote
22 *    products derived from this software without specific prior written permission.
23 *
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
27 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
28 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
29 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
30 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
31 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
32 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
33 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
34 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
35 *
36 * Direct questions, comments etc about SAGE to sage_users@listserv.uic.edu or
37 * http://www.evl.uic.edu/cavern/forum/
38 *
39 *****************************************************************************/
40
41#ifndef SAGESTREAMER_H_
42#define SAGESTREAMER_H_
43
44#include "sageConfig.h"
45#include "sageBlock.h"
46#include "streamProtocol.h"
47#include "sageDoubleBuf.h"
48#include "sageAudioCircBuf.h"
49#include "sageSync.h"
50#include "streamInfo.h"
51#include "sageTcpModule.h"
52#include "sageUdpModule.h"
53
54// sungwon exp
55#include <list>
56
57#define ALL_CONNECTION       0
58#define ACTIVE_CONNECTION    1
59#define INACTIVE_CONNECTION  2
60
61typedef struct {
62   int  rcvID;
63   int  nodeID;
64   bool active;
65} streamParam;
66
67class sageBlockFrame;
68class sagePixelBlock;
69class sageBlockPartition;
70class sageBlockGroup;
71
72/**
73 * sageStreamer
74 */
75class sageStreamer {
76protected:
77   streamerConfig config;
78   streamProtocol *nwObj;
79   sageNwConfig nwCfg;
80   int winID;    // display window ID of the application
81   
82   int streamNum;
83   int rcvNodeNum;
84   bool streamerOn;
85   
86   streamParam *params;
87   streamGroup sGrp;
88   bridgeStreamGroup bStreamGrp;
89   sageBlockPartition *partition;
90   
91   int blockSize;
92   //bool reconfigured;   
93   //bool timeBlockFlag;
94   unsigned long totalBandWidth;
95   int frameID;
96   int configID;
97   
98   bool firstConfiguration;
99   pthread_mutex_t *reconfigMutex;
100   std::deque<char *> msgQueue;
101   sageTimer streamTimer;
102   sageCounter frameCounter;
103   double interval;
104   double timeError;
105
106   pthread_t thId;
107
108   /**
109    * invokes streamLoop
110    */
111   static void* nwThread(void *args);
112   
113   //int sendTimeBlock(int frame);
114   
115   virtual int streamLoop() = 0;
116   virtual int reconfigureStreams(char *msgStr);
117
118   /**
119    * called by sageStreamer::initNetworks()
120    * it calls nwObj->connect()
121    *
122    * this function makes connection between sender(SAIL) and receivers(SDM)
123    * sends 1KByte of regMsg to receivers
124    */
125   virtual int connectToRcv(sageToken &tokenBuf, bool localPort = false);
126   virtual void setupBlockPool() {}
127   
128public:
129   sageStreamer();
130   virtual ~sageStreamer();
131
132   /**
133    * creates sageTcpModule object. member var *nwObj will point it
134    * calls connectToRcv()
135    * starts nwThread
136    */
137   virtual int initNetworks(char *data, bool localPort = false);
138   
139   void setWinID(int id) { winID = id; }
140   //void setSyncClient(sageSyncClient *sClient) { syncClientObj = sClient; }
141   int getFrameID() { return frameID; }
142   bool isFirstConfig() { return firstConfiguration; }
143   int enqueMsg(char *data);
144   void checkInterval();
145   void setFrameRate(float frate) { //nwObj->setFrameRate((double)frate);
146         interval = 1000000.0/frate; }
147   syncGroup* getSyncGroup() { return config.sGroup; }
148   
149   virtual sageDoubleBuf* getDoubleBuffer() { return NULL; }
150   unsigned long getBandWidth() { return totalBandWidth; }
151   void resetBandWidth() { totalBandWidth = 0; }
152   bool isActive() { return streamerOn; }
153   unsigned int getFrameCount() { return frameCounter.getValue(); }
154   void resetFrameCounter() { frameCounter.reset(); }
155   virtual int regeneratePixelBlocks() { return 0; }
156
157   /**
158    * blockSize is calculated here. ceil((blockX * blockY * bytesPerPixel)/compFactor) + BLOCK_HEADER_SIZE
159    * sageBlockGroup object is created here
160    */
161   virtual void setNwConfig(sageNwConfig &nc) { nwCfg = nc; }
162   virtual void shutdown() { streamerOn = false; }
163   inline bool isStreamerOn() { return streamerOn; }
164};
165
166/**
167 * sageBlockStreamer
168 * sail uses this to stream.
169 *
170 * This class is responsible for creating doubleBuffer for sending, getting frame from an application, and streaming it to sage
171 * the size of double buffer is two times of ceil((resX*resY*bytesPerPixel)/(compX*compY)) + BLOCK_HEADER_SIZE
172 */
173class sageBlockStreamer : public sageStreamer {
174protected:
175   sageDoubleBuf  *doubleBuf;
176   int bytesPerPixel;
177   float compX, compY, compFactor;
178        sageBlockGroup *nbg;
179
180        /**
181         * keeps calling streamPixelData() with one part of the doubleBuf
182         */
183   virtual int streamLoop();
184   void setupBlockPool();
185   int createDoubleBuffer();
186   int sendPixelBlock(sagePixelBlock *block);
187   int sendControlBlock(int flag, int cond);
188
189   /**
190    * keeps calling sendPixelBlock() with sagePixelBlock extracted from nbg->front()
191    */
192   int streamPixelData(sageBlockFrame *buf);
193   
194   // sungwon exp
195   bool affinityFlag;
196   std::list<int> cpulist;
197   pthread_mutex_t affinityMutex;
198
199
200public:
201   sageBlockStreamer(streamerConfig &conf, int pixSize);
202   
203   sageDoubleBuf* getDoubleBuffer() { return doubleBuf; }
204
205   /**
206    * creates sageBlockPartition object.
207    * calls initFrame with the front and the back buffer of the double buffer
208    *
209    * creates sageBlockGroup object and assigns it to the member variable sageBlockGroup *nbg
210    */
211   virtual void setNwConfig(sageNwConfig &nc);
212   void shutdown();
213   virtual ~sageBlockStreamer();
214
215   // sungwon exp
216   void setAffinity(const char *msgdata);
217};
218
219class sageBlockBuf;
220
221class bridgeStreamer : public sageStreamer {
222protected:
223   sageBlockBuf *blockBuffer;
224   int updateType;
225   sageTimer frameTimer;
226   double accInterval;
227   bool firstFrame;
228   
229   virtual int streamLoop();
230   int streamPixelData();
231   void setupBlockPool();
232   int sendPixelBlock(sagePixelBlock *block);
233   int sendControlBlock(int flag, int cond);
234   
235public:
236   bridgeStreamer(streamerConfig &conf, sageBlockBuf *buf, streamProtocol *obj);
237   
238   int initNetworks(char *data, bool localPort = false);
239   int storeStreamConfig(char *msgStr);
240   void shutdown();
241   //void setBlockMutex(pthread_mutex_t *mutex) { blockMutex = mutex; }
242   
243   virtual ~bridgeStreamer() {}
244};
245
246/**
247 * sageAudioStreamer
248 */
249class sageAudioStreamer : public sageStreamer {
250protected:
251   sageAudioCircBuf* buffer;
252   int bytesPerSample;
253
254   virtual int streamLoop();
255   virtual int reconfigureStreams(char *msgStr);   
256   virtual int connectToRcv(sageToken &tokenBuf, bool localPort = false);
257
258   int sendAudioBlock();
259   int sendControlBlock(int flag, int cond);
260   int streamAudioData();
261   
262public:
263   sageAudioStreamer(streamerConfig &conf, int sampleSize, sageAudioCircBuf* buff);
264
265   sageAudioCircBuf* getBuffer() { return buffer; }
266   virtual void setNwConfig(sageNwConfig &nc);
267   virtual ~sageAudioStreamer();
268protected:
269   sageAudioBlock aBlock;
270   int initializeBlock();
271   
272};
273
274#endif
Note: See TracBrowser for help on using the repository browser.