source: trunk/src/testing/dim/sageGateBase.py @ 4

Revision 4, 27.4 KB checked in by ajaworski, 13 years ago (diff)

Added modified SAGE sources

Line 
1############################################################################
2#
3# DIM - A Direct Interaction Manager for SAGE
4# Copyright (C) 2007 Electronic Visualization Laboratory,
5# University of Illinois at Chicago
6#
7# All rights reserved.
8#
9# Redistribution and use in source and binary forms, with or without
10# modification, are permitted provided that the following conditions are met:
11#
12#  * Redistributions of source code must retain the above copyright
13#    notice, this list of conditions and the following disclaimer.
14#  * Redistributions in binary form must reproduce the above
15#    copyright notice, this list of conditions and the following disclaimer
16#    in the documentation and/or other materials provided with the distribution.
17#  * Neither the name of the University of Illinois at Chicago nor
18#    the names of its contributors may be used to endorse or promote
19#    products derived from this software without specific prior written permission.
20#
21# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
25# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
26# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
27# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
28# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
29# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
30# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
31# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32#
33# Direct questions, comments etc about SAGE UI to www.evl.uic.edu/cavern/forum
34#
35# Author: Ratko Jagodic
36#       
37############################################################################
38
39
40
41
42"""
43        SAGE UI --> SAGE
44        ---------------------------     
45        Register                        1000
46        Execute                         1001
47        Shutdown                        1002
48        Move                            1003
49        Resize                          1004
50        Start Performance               1005
51        Stop Performance                1006
52        BackColor                       1007
53        Bring to Front                  1010
54        App Properties Change           1011
55        App Frame Rate Change           1012
56        Stream Request                  1014
57        Rotate Window                   1018
58        SAGE Shutdown                   1100
59        Add Object                      1200
60        Move Object                     1201
61        Remove Object                   1202
62        Object Message                  1203
63        Show Object                     1204
64        Hide Object                     1205
65       
66
67        SAGE ----> SAGE UI
68        --------------------------
69        Status                          40000
70        App Info                        40001 APPID L R T B SAIL-ID
71        Performance                     40002
72        App Shutdown                    40003
73        Display Info                    40004
74        Z Order Change                  40005
75        App Exec Config                 40006
76        Display Connections             40007
77        Overlay Object Info             40018
78       
79
80        STORE INFORMATION
81"""
82
83
84from threading import Thread, RLock
85import socket, sys, string, os.path, xmlrpclib, time
86import traceback as tb
87from globals import *
88
89
90### GLOBALS ###
91
TIMEOUT_INTERVAL = 0.5     # seconds; set as the timeout of the SAGE socket so that a blocked recv wakes up periodically
BLANK = ' '                # separator put between the fields of an outgoing message's data
HEADER_ITEM_LEN = 8        # number of characters in the fixed-width size field at the start of every message
APP_LAUNCHER_PORT = 19010  #the default port, usually retrieved from the sage server though
SAGE_SERVER_PORT = 8009  #the xmlrpc port of the sage server
97
98
99
class AppLauncher:
    """ Describes one appLauncher and lazily opens the XML-RPC connection to it.

        launcherId, name, ip, port and appList are stored as given and are
        available through the get* methods below.
    """

    def __init__(self, launcherId, name, ip, port, appList):
        self.port = port
        self.appList = appList
        self.ip = ip
        self.launcherId = launcherId
        self.name = name
        self.connected = False   # becomes True once connect() got an answer


    def connect(self):
        """ Creates the XML-RPC proxy (unless already connected) and probes it.
            Returns True if the launcher answered (now or earlier), False otherwise.
            Note: changes the process-wide default socket timeout to 3 seconds.
        """
        if not self.connected:
            socket.setdefaulttimeout(3)  #set the timeout to 3 seconds so that we dont wait forever
            self.server = xmlrpclib.ServerProxy("http://" + self.ip + ":" + str(self.port))
            try:
                self.server.test() #just use this as a way of testing whether the server is running or not
                self.connected = True
            except socket.error:
                return False
            except Exception:
                # report unexpected failures but let KeyboardInterrupt / SystemExit through
                tb.print_exc()
                return False
        return True


    def getId(self):
        """ Returns the launcher id given to the constructor """
        return self.launcherId

    def getIP(self):
        """ Returns the launcher's ip/hostname given to the constructor """
        return self.ip

    def getAppList(self):
        """ Returns the currently stored application list """
        return self.appList

    def setAppList(self, appList):
        """ Replaces the stored application list """
        self.appList = appList

    def getPort(self):
        """ Returns the launcher's port given to the constructor """
        return self.port

    def getName(self):
        """ Returns the launcher's name given to the constructor """
        return self.name

    def getServerHandle(self):
        """ Makes sure connect() was attempted and returns the XML-RPC proxy
            (returned even if the probe failed).
        """
        self.connect()
        return self.server
147
148
149
150
151
152
153####################################################################
154#
155# DESCRIPTION: This is the base class for communication with SAGE.
156#              You need to inherit from this class and override
157#              the "onMessage" method. You can use the registerCallbackFunction
158#              and the hashCallbackFunction to store your callbacks
159#              and then retrieve them from your overridden onMessage.
160#
161# DATE: Aug, 2007
162#
163####################################################################
164
165class SageGateBase:
166       
167    def __init__(self, sageServerHost="sage.sl.startap.net", useAppLauncher=False, forceAppLauncher=None, onDisconnect=None, verbose=False):
168        self.hashCallbackFunction = {}
169        self.threadkilled = False
170        self.connected = False
171        self.sageHost = None
172        self.sagePort = 20001
173        self.forceAppLauncher = forceAppLauncher         # use this appLauncher if specified
174        self.useAppLauncher = useAppLauncher   # should we use the appLauncher at all?
175        self.onDisconnect = onDisconnect       # call this function if disconnected
176        self.verbose = verbose                 # print the output?
177        self.sageServerHost = sageServerHost           # where the sage server is running
178
179        # used for printing out informative messages (on sending and receiving)
180        self.hashOutgoingMessages = {} 
181        self.hashOutgoingMessages[1000] = "Register UI"
182        self.hashOutgoingMessages[1001] = "App Start"
183        self.hashOutgoingMessages[1002] = "App Shutdown"
184        self.hashOutgoingMessages[1003] = "Move"
185        self.hashOutgoingMessages[1004] = "Resize"
186        self.hashOutgoingMessages[1005] = "Request Performance"
187        self.hashOutgoingMessages[1006] = "Stop Performance"
188        self.hashOutgoingMessages[1007] = "Background Color"
189        self.hashOutgoingMessages[1010] = "Bring To Front"
190        self.hashOutgoingMessages[1011] = "App Properties Change"
191        self.hashOutgoingMessages[1012] = "App Frame Rate Change"
192        self.hashOutgoingMessages[1014] = "Stream Request"
193        self.hashOutgoingMessages[1018] = "Rotate Window"
194        self.hashOutgoingMessages[1200] = "Add Object"
195        self.hashOutgoingMessages[1201] = "Move Object"
196        self.hashOutgoingMessages[1202] = "Remove Object"
197        self.hashOutgoingMessages[1203] = "Object Message"
198        self.hashOutgoingMessages[1100] = "Shutdown"
199
200        self.hashIncomingMessages = {}
201        self.hashIncomingMessages[40000] = "SAGE Status Message"
202        self.hashIncomingMessages[40001] = "App Info Return"
203        self.hashIncomingMessages[40002] = "Performance Info"
204        self.hashIncomingMessages[40003] = "App Shutdown"
205        self.hashIncomingMessages[40004] = "Display Info"
206        self.hashIncomingMessages[40005] = "Z Change"
207        self.hashIncomingMessages[40006] = "App Exec Config"
208        self.hashIncomingMessages[40007] = "Display Connections"
209        self.hashIncomingMessages[40018] = "Overlay Object Info"
210
211
212
213    def makemsg(self, dst, code, appcode, size, data):
214        # assemble the message into a string
215        msg = '%8s\0%8s\0%8s\0%s\0' % (dst, code, appcode, data)
216        size = len(msg) + 9
217        msg = '%8s\0%s' % (size, msg)
218
219        # print the output if requested
220        if self.verbose and int(code) < 1200:  # dont print the draw object messages cause there are many of them
221            print "\n\tSEND:  " + self.hashOutgoingMessages[int(code)]
222            print "\t       [" + data + "]\n\n"
223           
224        return msg     
225
226
227    def connectToAppLauncher(self, host=socket.gethostname()):
228        socket.setdefaulttimeout(3)  #set the timeout to 3 seconds so that we dont wait forever
229        if self.forceAppLauncher:    # overriding with the one from the command line
230            self.appLauncher = xmlrpclib.ServerProxy("http://" + self.forceAppLauncher)
231        else:                         # try to find the appropriate app launcher
232            self.appLauncher = self.__getMyAppLauncher(host)
233            if type(self.appLauncher) is type(None):  # in case we couldn't find one, just assume it's running
234                self.appLauncher = xmlrpclib.ServerProxy("http://" + host + ":" + str(APP_LAUNCHER_PORT))
235
236        # now test the connection
237        try:
238            self.appLauncher.listMethods() #just use this as a way of testing whether the server is running or not
239        except socket.error:
240            return False
241        except:
242            pass
243        return True
244
245
246    def getPort(self):
247        return self.sagePort
248
249    def getHost(self):
250        return self.sageHost
251
252
253    ################################################################## 
254    #  Connect To Sage
255    ################################################################## 
256    def connectToSage(self, host=socket.gethostname(), port=20001):
257        ''' returns 1 if succeeded, 0 if no connection to SAGE and -1 if no connection to appLauncher'''
258        if self.connected == True: return 0
259
260        ## create socket
261        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
262        if self.sock is None: return 0
263
264
265        #### RJ 2005-01-21
266        # without this, this thread will never exit because sock.recv is blocking and it will wait until
267        # something comes in. What happens is, "while" checks if threadkilled is false, it is so it goes
268        # in here and blocks until something is received. In the meantime, we want to quit so we set
269        # threadkilled to True but since we are blocked by sock.recv, we wont be able to get to the "while" again
270        self.sock.settimeout(TIMEOUT_INTERVAL)
271       
272
273        if host: self.sageHost = host
274        if port > 0: self.sagePort = port
275        ## connect
276        try:
277            self.sock.connect((host, port))
278        except socket.error:
279            print 'can\'t connect to SAGE', self.sageHost, self.sagePort       
280            return 0
281
282        ##### FOR Thread
283        self.threadkilled = False
284        self.senderLock = RLock()
285        self.t = Thread(target=self.receiverThread, args=())
286        self.t.start()
287        self.connected = True
288
289        # start the overlay sender thread
290        self.overlayMsgLock = RLock()
291        self.overlayMsgQueue = []   # combined overlay messages (list of string messages)
292        self.overlayMsgFreq = 50    # send messages roughly 50 times a sec
293        self.overlaySender = Thread(target=self.overlaySenderThread)
294        self.overlaySenderKilled = False
295        self.overlaySender.start()
296
297        # now connecting to the appLauncher running on the
298        # same machine as SAGE we are connecting to
299        if self.useAppLauncher and not self.connectToAppLauncher(host):
300            print "\n===> Can't connect to the appLauncher"
301            return -1
302        else:
303            print 'connected to SAGE', self.sageHost, self.sagePort
304            return 1
305
306
307
308        ### attempts to find the appLauncher that's running on the same machine as myHost
309        ### (by comparing IP addresses of the sage machine and reported appLaunchers)
310    def __getMyAppLauncher(self, myHost):
311        self.updateLauncherList()  #get a new fresh list of appLaunchers
312       
313        def sameByIP(first, second):
314            try:
315                first = socket.gethostbyname(first)[0]
316                second = socket.gethostbyname(second)[0]
317                if first == second:
318                    return True
319                else:
320                    return False
321            except:
322                return False
323
324        # loop through all the known launchers and try to find the one that matches myHost
325        for launcher in self.launchers.itervalues():
326            try:
327                if myHost == launcher.getIP():
328                    return launcher.getServerHandle()  #returns the server object that we can call functions on
329                elif sameByIP(myHost, launcher.getIP):
330                    return launcher.getServerHandle()
331                else:
332                    return None
333            except:
334                return None
335               
336   
337   
338    ### connects to the sage server and retrieves the list of all app launchers running
339    def updateLauncherList(self):
340        self.launchers = {}
341        sageServer = xmlrpclib.ServerProxy("http://" + self.sageServerHost + ":" + str(SAGE_SERVER_PORT))
342        try:
343            # a hash comes back (key=launcherId, value=appList - that's another hash of appNames and configs)
344            launcherHash = sageServer.GetRegisteredLaunchers()
345        except socket.error:
346            print "no connection to the sage server... can't get the list of appLaunchers"
347        except:
348            tb.print_exc()
349        else:
350            for launcherString, appList in launcherHash.iteritems():
351                (name, launcherId) = launcherString.split(":", 1)
352                (ip, port) = launcherId.split(":", 1)
353                self.launchers[launcherId] = AppLauncher(launcherId, name, ip, port, appList)
354               
355        return self.launchers
356
357
358    ### returns a hash of all the appLaunchers currently running
359    def getLaunchers(self):
360        return self.updateLauncherList()
361   
362
363    # get the applist from the applauncher first and call the appropriate function
364    def getAppList(self):
365        appList = {}
366        try:
367            appList = self.appLauncher.getAppList()
368            self.hashCallbackFunction[ 40000 ](appList)
369        except socket.error:
370            self.hashCallbackFunction[ 40000 ](appList) #return appList  #the server is not running
371        except:
372            tb.print_exc()
373            return False
374
375
376    def isConnected(self):
377        return self.connected
378       
379
380    def disconnectFromSage(self, isSocketError=False):
381        """ isSocketError should be True when we didn't close
382            the connection intentionally but rather the connection
383            broke for some reason. In that case the onDisconnect
384            callback is called.
385        """
386        if self.connected == False: return 0
387
388        self.threadkilled = True
389        self.overlaySenderKilled = True
390        self.connected = False         
391        #self.t.join()
392        self.sock.close()
393        del self.sock
394        print 'disconnected from SAGE', self.sageHost, self.sagePort
395
396        if isSocketError and self.onDisconnect:
397            self.onDisconnect()
398       
399        return 1
400
401
402    def sendmsg(self, data, code, sailId=''):
403        if not self.connected:
404            return 0
405
406        self.senderLock.acquire()
407
408        msg = self.makemsg(sailId, code, '', len(data), data)
409        totalcount = 0
410        try:
411                totalcount = self.sock.send(msg)
412        except socket.error:
413            print 'SageGateBase: socket error on send'
414            totalcount = 0
415            self.disconnectFromSage(isSocketError=True)
416        except Exception:
417            tb.print_exc()
418            totalcount = 0
419
420        self.senderLock.release()
421        return totalcount
422
423
424    ################################################################## 
425    # Register
426    ################################################################## 
427    # 1000 none
428    # 40004 display info format
429    ################################################################## 
430    def registerSage(self):
431        if not self.connected:
432            return 0
433        return self.sendmsg('', 1000)
434
435
436    ################################################################## 
437    # Execute
438    ################################################################## 
439    # 1001 app-name 100 100 (app-name(?))
440    # 40001 app-inst-ID left right top bottom sail-ID
441    ################################################################## 
442    def executeApp(self, appName, configName="default", pos=False, size=False, optionalArgs="", useBridge=False, sageIP=None, sagePort=None):
443        if self.connected == False: return 0
444        if not appName: return 0
445
446        if not sageIP:  sageIP = self.sageHost
447        if not sagePort: sagePort = self.sagePort + 1
448       
449        # try running the app (return -1 if failed for whatever reason)
450        try:
451            res = self.appLauncher.startDefaultApp(appName, sageIP, sagePort, useBridge, configName, pos, size, optionalArgs)
452        except socket.error:
453            return -1
454        else:
455            return res
456
457
458    def executeRemoteApp(self, launcherId, appName, configName="default", pos=False, size=False, optionalArgs="", useBridge=False, sageIP=None, sagePort=None):
459        if self.connected == False: return 0
460        if not appName: return 0
461
462        if not sageIP:  sageIP = self.sageHost
463        if not sagePort: sagePort = self.sagePort + 1
464
465        # try running the app (return -1 if failed for whatever reason)
466        if launcherId in self.launchers:
467            server = self.launchers[launcherId].getServerHandle()
468            try:
469                res = server.startDefaultApp(appName, sageIP, sagePort, useBridge, configName, pos, size, optionalArgs)
470            except socket.error:
471                return -1
472            else:
473                return res
474        else:
475            print "Launcher not found: ", launcherId
476            return -1
477
478
479       
480    ################################################################## 
481    # Shutdown
482    ################################################################## 
483    # 1002 app-instance
484    # 40003 app-Inst-ID (?)
485    ################################################################## 
486    def shutdownApp(self, appId):
487        if self.connected == False: return 0
488
489        data = str(appId)
490        return self.sendmsg(data, 1002)
491
492
493    ################################################################## 
494    # Forceful Shutdown
495    ################################################################## 
496    def forceShutdownApp(self, portNum):
497        if self.connected == False: return 0
498
499        # the portNum is basically the appId in the appLauncher context
500        return self.appLauncher.stopApp(portNum)   
501
502
503    ################################################################## 
504    # Move
505    ################################################################## 
506    # 1003 app-instance dist-X,dist-Y
507    ################################################################## 
508    def moveWindow(self, appId, distX, distY):
509        if self.connected == False: return 0
510
511        #make sure all the coordinates are ints
512        distX = int(distX)
513        distY = int(distY)
514
515        data = str(appId) + BLANK + str(distX) + BLANK + str(distY)
516        return self.sendmsg(data, 1003)
517       
518
519    ##############################################################
520    # Resize
521    # 1004 app-instance left,right,bottom,top
522    ##################################################################
523    def resizeWindow(self, appId, left=0, right=0, bottom=0, top=0):
524        if self.connected == False: return 0
525        #if not appId: return 0
526
527        #make sure all the coordinates are ints
528        left = int(left)
529        right = int(right)
530        bottom = int(bottom)
531        top = int(top)
532
533        data = str(appId) + BLANK + str(left) + BLANK + str(right) + BLANK + str(bottom) + BLANK + str(top)
534        return self.sendmsg(data, 1004)
535       
536
537    ###########################################################
538    # Performance Information
539    ###########################################################
540    # 1005 app-instance sending-rate
541    # 1006 app-instance         
542    ###########################################################
543    def startPerformance(self, appId, sendingrate=2):
544        if self.connected == False: return 0
545
546        data = "%d %d" % (appId, sendingrate)
547        return self.sendmsg(data, 1005)
548       
549
550    def stopPerformance(self, appId):
551        if self.connected == False: return 0
552
553        data = str(appId) # convert the data to string format
554        return self.sendmsg(data, 1006)
555       
556
557
558    ####################################       
559    # Background Color
560    # 1007 red,green,blue
561    ##################################################################
562    def setBgColor(self, (red, green, blue)=(1, 1, 1)):
563        if self.connected == False: return 0
564
565        data = str(red) + BLANK + str(green) + BLANK + str(blue)
566        return self.sendmsg(data, 1007)
567       
568
569    ####################################       
570    # bring the application window to the top (front)
571    # 1010 app-inst-ID
572    ##################################################################
573    def bringToFront(self, appId):
574        if self.connected == False: return 0
575
576        data = str(appId)
577        return self.sendmsg(data, 1010)
578       
579
580
581    ####################################       
582    # Change App Properties
583    # 1011 appId, newTitle, titleColor (r g b), borderColor (r g b)
584    ##################################################################
585    def changeAppProperties(self, appId, newTitle, newTitleColor=(-1, -1, -1), newBorderColor=(-1, -1, -1)):
586        if self.connected == False: return 0
587
588        data = str(appId) + BLANK + str(newTitle)
589        data = data + BLANK + str(newTitleColor[0]) + BLANK + str(newTitleColor[1]) + BLANK + str(newTitleColor[2])
590        data = data + BLANK + str(newBorderColor[0]) + BLANK + str(newBorderColor[1]) + BLANK + str(newBorderColor[2])
591        return self.sendmsg(data, 1011)
592       
593
594    ####################################       
595    # Change App Frame Rate
596    # 1012 appId, newFrameRate
597    ##################################################################
598    def changeAppFrameRate(self, appId, newFrameRate):
599        if self.connected == False: return 0
600
601        data = str(appId) + BLANK + str(newFrameRate)
602        return self.sendmsg(data, 1012)
603       
604
605
606    ####################################       
607    # Stream Request
608    # 1014 appId, fsmIP, fsmPort
609    ##################################################################
610    def streamApp(self, appId, fsmIP, fsmPort):
611        if self.connected == False: return 0
612
613        data = str(appId) + BLANK + str(fsmIP) + BLANK + str(fsmPort)
614        return self.sendmsg(data, 1014)
615       
616
617
618    ####################################       
619    # Rotate Window
620    # 1018 appId, degree
621    ##################################################################
622    def rotateWindow(self, appId, degree):
623        if self.connected == False: return 0
624
625        data = str(appId) + BLANK + str(degree)
626        return self.sendmsg(data, 1018)
627
628
629
630    ####################################       
631    # Overlay Messages
632    # 1200 - 1205
633    ##################################################################
634    def addOverlay(self, overlayType, x, y, w, h, isGlobal, drawOrder, displayId=0):
635        data = '%s %s %s %s %s %s %s %s' % (overlayType, x, y, w, h, int(isGlobal), drawOrder, displayId)
636        return self.sendmsg(data, 1200)
637
638
639    def moveOverlay(self, id, dx, dy):
640        """ relative movement """
641        data = '%s %s %s' % (id, dx, dy)
642        return self.sendmsg(data, 1201)
643   
644
645    def showOverlay(self, id, doShow):
646        data = '%s %s' % (id, str(int(doShow)))
647        return self.sendmsg(data, 1204)
648
649
650    def sendOverlayMessage(self, id, *data):
651        """ this actually puts the messages in a queue
652            which are then sent at fixed intervals """
653
654        # first assemble the data into a string
655        msg = '%s' % (id)
656        for d in data:   
657            msg += " " + str(d)
658
659        self.overlayMsgLock.acquire()
660        self.overlayMsgQueue.append(msg)
661        self.overlayMsgLock.release()
662
663
664    def __sendMultipleOverlayMessages(self, msg):  # a bunch of messages combined into one
665        return self.sendmsg(msg, 1203)
666
667
668    def removeOverlay(self, id):
669        data = '%s' % (id)
670        return self.sendmsg(data, 1202)
671       
672
673    ####################################       
674    # SAGE App events
675    # 31000 - 31007
676    ##################################################################
677
678    def sendAppEvent(self, eventId, sailId, *data):
679        # first assemble the data into a string
680        msg = ''
681        for d in data:   
682            msg += " " + str(d)
683        self.sendmsg(msg, 31000 + eventId, sailId)
684
685
686    ####################################       
687    # SAGE shutdown
688    # 1100      <none>
689    ##################################################################
690    def shutdownSAGE(self):
691        if self.connected == False: return 0
692        return self.sendmsg('', 1100)
693
694
695
696    ##############
697    #  Overlay Sender Thread
698    #    - Sends combined overlay messages at fixed intervals
699    ##################################################################
700    def overlaySenderThread(self):
701        while not self.overlaySenderKilled and doRun():
702            self.overlayMsgLock.acquire()
703
704            # iterate through the msg queue and assemble the messages into a string
705            msg = ""
706            for m in self.overlayMsgQueue:
707                msg += m + "\n"            # separate messages with \n
708            self.overlayMsgQueue = []      # clear the queue
709            self.overlayMsgLock.release()
710
711            # send the message if there is something to send
712            msg = msg.strip()
713            if msg != "":
714                self.__sendMultipleOverlayMessages(msg)
715               
716            # sleep for a certain time
717            time.sleep(1.0 / self.overlayMsgFreq)
718
719        print "Overlay message sender thread closed"
720       
721
722    ##############
723    #  Receiver Thread
724    #    - receives messages from SAGE in a thread
725    ##################################################################
726    def receiverThread(self):
727
728        while self.threadkilled == False and doRun():   #doesn't work as expected without the sock.settimeout (look below)
729
730            #############################
731            try:
732
733                code = ""
734                incomingMsg = ""
735                msgSize = ""
736
737                # first make sure you read the whole 8 bytes for the size
738                while len(msgSize) < HEADER_ITEM_LEN:
739                    msgSize = self.sock.recv(HEADER_ITEM_LEN)
740                    if len(msgSize) == 0:
741                        self.threadkilled = True
742                        self.overlaySenderKilled = True
743                        self.disconnectFromSage(isSocketError=True)
744                        break
745
746                if self.threadkilled: break
747               
748                # this is the number of bytes that the total message contains
749                msgSize = msgSize.replace('\x00', '')
750                sizeLeft = int(msgSize) - HEADER_ITEM_LEN    # we already read the size so read the rest of the bytes
751
752                # read the rest of the message
753                while len(incomingMsg) < sizeLeft:
754                    incomingMsg = incomingMsg + self.sock.recv(sizeLeft - len(incomingMsg))
755
756                # extract the tokens from the message
757                if len(incomingMsg) > 0:
758                    incomingMsg = incomingMsg.replace('\x00', ' ')
759                    dst = incomingMsg[ 1:9 ].strip()
760                    code = int(incomingMsg[ 10:18 ].strip())
761                    appCode = incomingMsg[ 19:27 ].strip()
762                    data = incomingMsg[ 28: ].strip()
763
764                    # print the message out (except performance info since there are many of them)
765                    if self.verbose and code in self.hashIncomingMessages and code != 40002:
766                        print "\n\tRECEIVED:  " + self.hashIncomingMessages[code]
767                        lines = data.split('\n')
768                        if len(lines) < 2:
769                            print "\t\t   [" + lines[0] + "]\n\n"
770                        else:
771                            for i in range(0, len(lines)):
772                                if i == 0:
773                                    print "\t\t   [" + lines[i]
774                                elif i == len(lines) - 1:
775                                    print "\t\t    " + lines[i] + "]\n\n"
776                                else:
777                                    print "\t\t    " + lines[i]
778
779
780            except socket.timeout:
781                continue
782            except socket.error:
783                print 'SageGateBase: socket error on receive'
784                self.disconnectFromSage(isSocketError=True)
785                continue
786            #except:
787            #   print 'exception: ', sys.exc_info()[0], sys.exc_info()[1]
788            #   break
789            ############################
790
791            if self.threadkilled:
792                break
793
794            # finally, do something with this message (ie call the subclass' message handler)
795            self.onMessage(code, data)
796
797        print "SageGate receiver thread closed"
798       
799
800    def cleanBuffer(self, stBuffer):
801        """ converts all non-printable characters from the buffer to white spaces
802            (so that they can be removed using string.strip() function)
803        """
804        stNewBuffer = ""
805
806        for ch in stBuffer:
807            if (ch in string.printable):
808                stNewBuffer = stNewBuffer + ch
809            else:
810                stNewBuffer = stNewBuffer + " "
811
812        return stNewBuffer
813
814
815
816    def onMessage(self, code, data):
817        """ this is the function that gets called after a message arrives successfully
818            it must be overridden by the subclass
819        """
820        raise NotImplementedError
821
822
823    def registerCallbackFunction(self, msgID, function):
824        self.hashCallbackFunction[ msgID ] = function
825
826
827
828
829
830       
831
832
Note: See TracBrowser for help on using the repository browser.