source: trunk/src/testing/include/sageSync.h @ 4

Revision 4, 16.3 KB checked in by ajaworski, 13 years ago (diff)

Added modified SAGE sources

Line 
1/******************************************************************************
2 * SAGE - Scalable Adaptive Graphics Environment
3 *
4 * Module: sageSync.h
5 * Author : Byungil Jeong, Rajvikram Singh,
6 * Description : This file includes classes to synchronize multiple groups
7 *               of synchronization slaves in the SAGE processes and a class
8 *               to synchronize multiple threads in a SAGE process.
9 *
10 * Copyright (C) 2004 Electronic Visualization Laboratory,
11 * University of Illinois at Chicago
12 *
13 * All rights reserved.
14 *
15 * Redistribution and use in source and binary forms, with or without
16 * modification, are permitted provided that the following conditions are met:
17 *
18 *  * Redistributions of source code must retain the above copyright
19 *    notice, this list of conditions and the following disclaimer.
20 *  * Redistributions in binary form must reproduce the above
21 *    copyright notice, this list of conditions and the following disclaimer
22 *    in the documentation and/or other materials provided with the distribution.
23 *  * Neither the name of the University of Illinois at Chicago nor
24 *    the names of its contributors may be used to endorse or promote
25 *    products derived from this software without specific prior written permission.
26 *
27 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
28 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
29 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
30 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
31 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
32 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
33 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
34 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
35 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
36 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
37 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
38 *
39 * Direct questions, comments etc about SAGE to sage_users@listserv.uic.edu or
40 * http://www.evl.uic.edu/cavern/forum/
41 *
42 *****************************************************************************/
43
44#ifndef _SAGESYNC_H
45#define _SAGESYNC_H
46
47#include "sageBase.h"
48#include <list>
49#include <map>
50#include <bitset>
51
/* ---- message sizing and limits ------------------------------------------ */
#define SAGE_SYNC_MSG_LEN    1280   // must be equal to SAGE_EVENT_SIZE
#define MAX_SYNC_GROUP       100    // only referenced by commented-out fixed-size arrays in this header
#define SYNC_MSG_BUF_LEN     64

/* ---- update types (see the 'type' argument of sageSyncClient::sendSlaveUpdate) ---- */
#define SAGE_UPDATE_SETUP    1
#define SAGE_UPDATE_FOLLOW   2
#define SAGE_UPDATE_FRAME    3
#define SAGE_UPDATE_AUDIO    4

/* ---- sync policies (stored in syncGroup::policy) ------------------------- */
#define SAGE_CONSTANT_SYNC   1
#define SAGE_ASAP_SYNC_HARD  2
#define SAGE_ASAP_SYNC_SOFT  3

/* ---- sync commands (the 'cmd' argument of sageSyncServer::sendSync) ------ */
#define NORMAL_SYNC          1
#define SKIP_FRAME           2

/*
 * Fallback for builds whose socket headers do not provide MSG_DONTWAIT:
 * the flag becomes 0, i.e. it contributes no bits when OR-ed into recv/send flags.
 */
#if !defined(MSG_DONTWAIT)
#define MSG_DONTWAIT         0
#endif
71
72
/**
 * class syncSlaveData.
 *
 * Per-client record kept by the sync servers: the client's sockets, its
 * address, its SDM id and the frame number associated with it.
 * Every member is given a defined value on construction; the socket
 * descriptors start at -1 so that an unassigned socket can never be mistaken
 * for a valid descriptor.
 */
class syncSlaveData {
public:

   /**
    * a syncClient's socket file descriptor (-1 until one is assigned)
    */
   int            clientSockFd;

   /**
    * another socket for refresh barrier - 2nd phase (-1 until one is assigned)
    */
   int barrierClientSockFd;

   /**
    * a syncClient's IP address (zero-filled on construction)
    */
   sockaddr_in      clientAddr;

   /**
    * SDM id of the client (-1 until known).
    * Original note (Sungwon): this will be filled in
    * sageSyncServer::syncServerThread() when the object is created.
    */
   int SDM;

   /**
    * frame number.
    */
   int            frame;

   /**
    * Constructor.
    * When a syncClient calls sageSyncClient::connectToServer(), sageSyncServer::syncServerThread() accepts it<BR>
    * then the thread creates syncSlaveData object for the client.
    *
    * The initializers are listed in declaration order, which is the order in
    * which the members are actually initialized.
    */
   syncSlaveData()
      : clientSockFd(-1),
        barrierClientSockFd(-1),
        clientAddr(),          // value-initialization zero-fills the address
        SDM(-1),
        frame(0)
   {}
};
113
// Forward declarations: syncGroup names both classes as friends and keeps a
// sageSyncServer pointer before either class is defined.
class sageSyncServer;
class sageSyncClient;

// NOTE(review): MAX_INTERVAL_ERROR is not used in this header; presumably a
// tolerance applied to the sync interval - confirm in the implementation file.
#define MAX_INTERVAL_ERROR   0.1
#define SYNC_TIMEOUT         1667   /**< 1sec/60 * 10% = 1667 us */
120
121/**
122 * class syncGroup.
123 */
124class syncGroup {
125protected:
126   int noOfUpdates; /**< how many displays are updated ? */
127   int slaveNum; /**< how many syncSlaves are involved in this sync group */
128   pthread_mutex_t   *syncMsgLock;
129   std::deque<char *> syncMsgQueue;
130
131   int id, policy, curFrame, audioFrame, videoFrame, keyFrame, skipFrame, audioSyncCnt;
132
133   /**
134    * minimum sync signal interval<BR>
135    * Even though a sync group is ready to proceed to the next frame,<BR>
136    * the sync master waits until the minimum interval is reached.
137    */
138   double interval;
139
140   double timeError;
141   bool syncEnd;
142
143   /**
144    * when sageSyncServer::addSyncGroup() is called, the sync server object calling addSyncGroup() is assigned.
145    */
146   sageSyncServer *syncServer;
147   pthread_t threadID;
148   sageTimer timer;
149   pthread_mutex_t  syncLock;
150   pthread_cond_t   resumeSync; /**< condition variable */
151
152   bool holdSync; /**< if true, then it will make checkHold() to wait on condition resumeSync */
153   bool waitingInterval; /**< whether syncGroup is waiting to reach the interval */
154   bool waitForKeyFrame;
155
156public:
157   /**
158    * Constructor.
159        */
160   syncGroup() : id(0), noOfUpdates(0), curFrame(0), slaveNum(0), syncEnd(false), interval(0.0),
161      policy(SAGE_ASAP_SYNC_SOFT), videoFrame(0), audioFrame(-1), keyFrame(100), holdSync(false),
162      timeError(0.0), waitingInterval(false), waitForKeyFrame(false), skipFrame(0), audioSyncCnt(0)
163   { syncMsgQueue.clear(); }
164
165   int init(int startFrame, int _policy_, int groupID, int frameRate = 1, int sNum = 1); /**< syncGroup::init() */
166   int setFrameRate(float frameRate);
167
168   /**
169    * TRUE, if this syncGroup reached the interval
170    */
171   bool checkInterval();
172
173   /**
174    * IMPORTANT<BR>
175    * This function is called by sageSyncServer::manageUpdate()<BR>
176    *
177    * HARD_SYNC implemented<BR>
178    * A syncGroup maintains the number of syncSlaves. Whenever slave sends an update, noOfUpdates increases<BR>
179    * HARD_SYNC ensures every display for an app can maintain same frame number<BR>
180    * by updated frame can be propagated only when noOfUpdates == number of slaves<BR>
181    * i.e. all syncSlaves for an application should sends udpate
182    *
183    * @return 0 or NORMAL_SYNC
184    */
185   int processUpdate(char *data);
186
187   int enqueSyncMsg(char *msg); /**< enqueue sync message<BR>thread safe*/
188   char* dequeSyncMsg(); /**< dequeue sync message<BR>thread sage*/
189   int getSyncID() { return id; }
190   bool checkTimeOut();
191   void blockSync(); /**< sets holdSync = true */
192   void unblockSync(); /**< sets holdSync = false */
193
194   /**
195    * waits on condition resumeSync if holdSync is true
196    */
197   void checkHold();
198   inline bool isWaiting() { return waitingInterval; }
199
200   friend class sageSyncServer;
201   friend class sageSyncClient;
202};
203
204/**
205 * class sageSyncBBServer
206 */
207class sageSyncBBServer {
208protected:
209   int serverSockFd; /**< syncServer's socket */
210   int maxSyncGroupID;
211   int maxSlaveSockFd;
212   bool syncEnd;
213
214   /**
215    * -1 old sync
216    *  0 no sync
217    *  1 data sync only
218    *  2 two phase
219    *  3 one phase
220    */
221   int syncLevel;
222
223   sockaddr_in serverAddr;
224   fd_set slaveFds;
225
226   pthread_t syncThreadID, syncServerThreadID;
227
228   std::vector<syncSlaveData> syncSlaves; /**< a list of sync slaves */
229
230   /**
231    * a list of sync slaves
232    * key is SDM id
233    * value is its data(mainly socket)
234    */
235   std::map<int,syncSlaveData> syncSlavesMap;
236
237   sockaddr_in barrierServerAddr;
238   int barrierServerSockFd;
239   int barrierPort;
240
241   fd_set barrierSlaveFds;
242   int maxBarrierSlaveSockFd;
243
244   pthread_t syncBarrierServerThreadID;
245   static void* syncBarrierServerThread(void *);
246
247   /**
248    * waits syncClient's connection by calling accept() syscall<br>
249    * if one connects, then it creates syncSlaveData object for it, and adds it to the vector syncSlaves
250    */
251   static void* syncServerThread(void*);
252
253   /**
254    * This thread is created whenever addSyncGroup() is called and under some condition.<BR>
255    * 1. when the policy is SAGE_CONSTANT_SYNC<BR>
256    * 2. 1 is not true and asapSyncGroupNum == 0<BR>
257    * <BR>
258    * if syncGroup's policy is SAGE_CONSTANT_SYNC, then this thread periodicaly sends sync(sendSync()) signal to its syncGroup.<BR>
259    * Otherwise it sends sync signal(by calling manageUpdate()) only when syncClient asks ??<BR>
260    * The period is defined in the syncGroup.interval
261    */
262   static void* mainLoopThread(void*);
263
264   sageTimer timer;
265
266public:
267        /**
268         *  Constructor. Called in sageDisplayManager::sageDisplayManager() and sail::init()
269         */
270   sageSyncBBServer(int syncLevel = 2);
271   ~sageSyncBBServer();
272
273   /**
274    * creates syncServer socket, and starts syncServerThread
275    */
276   int init(int port);
277
278   /**
279    * create barrierServerSockFd
280    */
281   int initBarrier(int barrierPort);
282
283   int totalRcvNum;
284   int refreshInterval;
285   int syncMasterPollingInterval;
286   int startManagerThread(int totalRcvNum, int refreshInterval, int syncMasterPollingInterval);
287
288   void killAllClients();  /**< closes all the open client sockets */
289   int checkTimeOut();
290};
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310/**
311 * class sageSyncServer. It is created when syncMaster == true
312 */
313class sageSyncServer {
314protected:
315   int serverSockFd; /**< syncServer's socket */
316   int asapSyncGroupNum, maxSyncGroupID;
317   int maxSlaveSockFd;
318   bool syncEnd;
319
320   sockaddr_in serverAddr;
321   fd_set slaveFds;
322
323   pthread_t   syncThreadID, syncServerThreadID;
324   pthread_t   groupThreadID;
325
326   std::vector<syncSlaveData> syncSlaves; /**< a list of sync slaves */
327
328   /**
329    * key - sdm id
330        */
331   std::map<int,syncSlaveData> syncSlavesMap;
332
333
334   //syncGroup *syncGroupArray[MAX_SYNC_GROUP]; /**< an array of sync groups */
335   std::vector<syncGroup *> syncGroupArray; /**< an array of sync groups */
336
337   /**
338    * waits syncClient's connection by calling accept() syscall<br>
339    * if one connects, then it creates syncSlaveData object for it, and adds it to the vector syncSlaves
340    */
341   static void* syncServerThread(void*);
342
343   /**
344    * This thread is created whenever addSyncGroup() is called and under some condition.<BR>
345    * 1. when the policy is SAGE_CONSTANT_SYNC<BR>
346    * 2. 1 is not true and asapSyncGroupNum == 0<BR>
347    * <BR>
348    * if syncGroup's policy is SAGE_CONSTANT_SYNC, then this thread periodicaly sends sync(sendSync()) signal to its syncGroup.<BR>
349    * Otherwise it sends sync signal(by calling manageUpdate()) only when syncClient asks ??<BR>
350    * The period is defined in the syncGroup.interval
351    */
352   static void* managerThread(void*);
353   static void* managerConstantThread(void*);
354
355   /**
356    * It seems that each syncGroup has corresponding managerThread.<BR>
357    *
358    * This function waits this syncServer object's syncSlaves' update from socket connection<BR>
359    * then, extracts and identifies groupID from a slave's message<BR>
360    * then, calls corresponding syncGroup's syncGroup::processUpdate()<BR>
361    * If syncGroup's processUpdate() returns NORMAL_SYNC<BR>
362    * then, syncGroup::checkInterval(), syncGroup::checkHold(), syndSync() could be called.
363    *
364    * respond with sync signal ( sendSync() ) when a syncClient sends sageSyncClient::sendSlaveUpdate().
365    */
366   int manageUpdate();
367
368   /**
369    * sends sync message (group's current frame: grp->curFrame) to the syncSlaves.<BR>
370    * sync message includes grp->id, grp->curFrame, dataLen, cmd<BR>
371    * if grp->syncGroup::dequeSyncMsg() returns data then sends the data as well
372    */
373   int sendSync(syncGroup *grp, int cmd = NORMAL_SYNC);
374   sageTimer timer;
375
376        syncGroup* findSyncGroup(int id, int& index);
377
378public:
379        /**
380         *  Constructor. Called in sageDisplayManager::sageDisplayManager() and sail::init()
381         */
382   sageSyncServer();
383   ~sageSyncServer();
384
385   /**
386    * creates syncServer socket, and starts syncServerThread
387    */
388   int init(int port);
389
390   /**
391    * Adds syncGroup into syncGroupArray and creates corresponding managerThread() if group's policy is SAGE_CONSTANT_SYNC<BR>
392    * or policy is not SAGE_CONSTANT_SYNC and asapSyncGroupNum == 0<BR>
393    * triggered by sageDisplayManager::initStreams() and sail
394    */
395   int addSyncGroup(syncGroup *grp);
396
397   int removeSyncGroup(int id);
398
399   void killAllClients();  /**< closes all the open client sockets */
400   int checkTimeOut();
401}; // End of sageSyncServer
402
class sageCircBufSingle;

/**
 * class syncMsgStruct
 *
 * A sync message: a frame id plus an optional heap buffer that this object
 * owns and releases with delete[] in its destructor.
 *
 * Copying is disabled. The buffer size is not recorded, so a deep copy is not
 * possible, and a member-wise copy would leave two objects deleting the same
 * buffer.
 */
class syncMsgStruct {
public:
   int frameID;
   char *data;   /**< owned buffer, or NULL when the message carries no data */

   /** Creates a message without a data buffer. */
   syncMsgStruct() : frameID(0), data(NULL) {}

   /**
    * Creates a message with an uninitialized buffer of 'size' bytes.
    *
    * @param size number of bytes to allocate
    */
   syncMsgStruct(int size) : frameID(0), data(new char[size]) {}

   /** Releases the buffer; delete[] on a NULL pointer is a no-op. */
   ~syncMsgStruct() { delete [] data; }

private:
   // Declared but intentionally not defined: copying would double-delete 'data'.
   syncMsgStruct(const syncMsgStruct&);
   syncMsgStruct& operator=(const syncMsgStruct&);
};
417
418/**
419 * class sageSyncClient
420 */
421class sageSyncClient {
422private:
423        int syncLevel;
424
425   int clientSockFd;
426
427   int barrierClientSockFd;
428   int refreshBarrierDeltaT;
429
430   int maxGroupID;
431   //sageCircBufSingle *syncMsgBuf[MAX_SYNC_GROUP];
432   std::vector<sageCircBufSingle *> syncMsgBuf;
433   bool syncEnd;
434   pthread_t syncThreadID;
435
436   /**
437    * keeps trying to receive message from syncServer by calling readSyncMsg()<BR>
438    * started by addSyncGroup()
439        */
440   static void* syncClientThread(void *args);
441
442   /**
443    * receive messages from sageSyncServer::sendSync()<BR>
444    * receives message(syncgroupID, frameNum, dataLen) from syncServer by calling sage::recv() and reads sync data.<BR>
445    * This function is continuously called by syncClientThread().
446        */
447   int readSyncMsg();
448
449        sageCircBufSingle* findSyncGroup(int id, int& index);
450
451public:
452   /**
453        * creates socket(clientSockFd) and sets TCP_NODELAY, SO_OOBINLINE socket option
454        */
455   sageSyncClient(int syncLevel = -1);
456
457   /**
458    * shuts down the socket, and calls pthread_join
459        */
460   ~sageSyncClient();
461
462   /**
463    * connects to a syncServer
464        *
465        * @param syncServerIP a char *
466        * @param port an integer
467        * @return -1 on error, 0 otherwise
468        */
469   int connectToServer(const char *serverIP, int port, int SDMnum=-1);
470
471   int connectToBarrierServer(const char *serverIP, int port, int SDMnum=-1);
472
473
474   /**
475    * creates sageCircBufSingle object, assigns it into syncMsgBuf array<BR>
476    * starts syncClientThread() if no other syncgroup was added.<BR>
477    * So, syncClientThread() is started when the first syncgroup is added.
478        *
479        * @param id an integer, sync group id
480        * @return -1 on error, 0 otherwise
481        */
482   int addSyncGroup(int id);
483
484
485   /**
486    * removes element(id) from syncMsgBuf array
487        *
488        * @param id an integer
489        * @return -1 on error, 0 otherwise
490        */
491   int removeSyncGroup(int id);
492
493
494   /**
495    * sends update msg to the syncServer.<BR>
496    * sageSyncServer::manageUpdate() will respond to this.<BR>
497    *
498    * Triggered by below events and message<BR>
499    * EVENT_READ_BLOCK (pixelDownloader::fetchSageBlocks()) <BR>
500    * EVENT_SYNC_MESSAGE (sageDisplayManager::processSync() -> pixelDownloader::fetchSageBlocks()) <BR>
501    * RCV_UPDATE_DISPLAY (sageDisplayManager::updateDisplay() -> pixelDownloader::fetchSageBlocks()) <BR>
502    * <BR>
503    * in the sageAudioManager::initStream()<BR>
504    * in the sageAudioCircBuf::updateReadIndex() <- (sageAudioReceiver::readData(), sageAudioStreamer::streamLoop())<BR>
505    *
506        *
507        * @param frame an integer frame number
508        * @param id an integer, default 0
509        * @param rcvNum an integer, default 0
510        * @param type an integer, default SAGE_UPDATE_FOLLOW
511        * @return -1 on error, 0 otherwise
512        */
513   int sendSlaveUpdate(int frame, int id = 0, int rcvNum = 0, int type = SAGE_UPDATE_FOLLOW, int nodeID=-1);
514
515   /**
516    * when a PDL received new frame (END_FRAME flag) it reports to the syncMaster before doing swapMontage()
517        */
518   int sendSlaveUpdateToBBS(int frame, int id = 0, int rcvNum = 0, int SDMnum = -1, int delayCompenLatency=0);
519
520   /** THE FINAL CASE , sageSync_theFinal.cpp */
521   int sendRefreshBarrier(int nodeID);
522   int recvRefreshBarrier(bool nonblock=false); // block or nonblock
523   /** end THE FINAL CASE */
524
525   /**
526    * receives sync message without additional data
527        *
528        * @param msg a char *
529        * @return -1 on error, 0 otherwise
530        */
531   int waitForSync(char* msg, int len = -1);
532
533   /**
534    * It uses MSG_PEEK to find out the length of sync message
535    *
536    * @param void
537    * @return -1 on error, message size otherwise
538    */
539   int waitForSyncPeek();
540
541   /**
542    * receives sync message with group ID and additional data
543        *
544        * @param id an integer
545        * @return syncMsg, NULL on error
546        */
547   syncMsgStruct* waitForSync(int id);
548
549   /**
550    * receives sync message with additional data
551        *
552        * @param data a char *
553        * @return -1 on error, 0 otherwise
554        */
555   int waitForSyncData(char* &data);
556
557
558   /**
559    * ch
560        */
561   int checkSync(char* &msg);
562}; // End of tvSyncClient
563
564
/**
 * For synchronization among master and slave threads
 *
 * NOTE(review): the two mutex pointers are presumably arrays sized from the
 * constructor argument and released by the destructor - confirm in the
 * implementation file before copying objects of this class.
 */
class sageThreadSync {
   // members are private (class default)
   pthread_mutex_t   *slaveLock;
   pthread_mutex_t   *masterLock;
   int slaveNum;

public:
   /** @param sNum an integer, default 1 */
   sageThreadSync(int sNum = 1);
   ~sageThreadSync();

   /** @param rank an integer, default 0 */
   int synchronize(int rank = 0);
};
580
581#endif
Note: See TracBrowser for help on using the repository browser.