source: trunk/src/testing/ui/connectionManager/ConnectionManager.py @ 4

Revision 4, 49.3 KB checked in by ajaworski, 13 years ago (diff)

Added modified SAGE sources

Line 
1#!/usr/bin/env python
2
3############################################################################
4#
5# SAGE UI Users Server - A server that handles users, fsManagers and chat for SAGE
6#
7# Copyright (C) 2005 Electronic Visualization Laboratory,
8# University of Illinois at Chicago
9#
10# All rights reserved.
11#
12# Redistribution and use in source and binary forms, with or without
13# modification, are permitted provided that the following conditions are met:
14#
15#  * Redistributions of source code must retain the above copyright
16#    notice, this list of conditions and the following disclaimer.
17#  * Redistributions in binary form must reproduce the above
18#    copyright notice, this list of conditions and the following disclaimer
19#    in the documentation and/or other materials provided with the distribution.
20#  * Neither the name of the University of Illinois at Chicago nor
21#    the names of its contributors may be used to endorse or promote
22#    products derived from this software without specific prior written permission.
23#
24# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
27# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
28# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
29# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
30# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
31# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
32# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
33# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
34# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
35#
36# Direct questions, comments etc about SAGE UI to www.evl.uic.edu/cavern/forum
37#
38# Author: Ratko Jagodic
39#       
40############################################################################
41
42
43from threading import *
44import string, socket, os, os.path, sys, time, SimpleXMLRPCServer, xmlrpclib
45
46
47# some global constants
48
# Wire format: every message travels as one fixed-size chunk.
CHUNK_SIZE = 2048            # size of every message, in bytes
MSGLEN = CHUNK_SIZE          # outgoing messages are space-padded to this length
SEPARATOR = '\0'             # NUL separates the blocks of data inside one message
SOCKET_TIMEOUT = 1           # seconds; applied to every socket (see below)
NEW_STYLE_VER = "2.82"       # UIs below this version don't support system IP/port for machines

# Listening ports and connection limits.
USER_SERVER_PORT = 15558     # port on which the server listens for User connections
SAGE_SERVER_PORT = 15557     # port on which the server listens for SAGE connections
MAX_USER_CONNECTIONS = 100   # maximum number of simultaneous User connections
MAX_SAGE_CONNECTIONS = 50    # maximum number of simultaneous SAGE connections

# Debugging aid: when True, WriteToFile/WriteStats also print every message
# to the screen in addition to writing it to the file.
PRINT_TO_SCREEN = False

socket.setdefaulttimeout(SOCKET_TIMEOUT)

# Human-readable names of the message codes; used when logging traffic.
messageNames = {
    2000: "Register",
    2001: "Incoming Chat Message",
    2002: "Check Username",
    2003: "Unregister from room",
    30000: "Machine Status",
    30001: "User Status",
    30002: "Outgoing Chat Message",
    30003: "Username OK",
    31000: "Info Message",
}
76
77
78
79
80#  MESSAGES:
81#
82#
83#  Notes:
84#  --------------------
85#  All messages are sent in this format (as strings):
86#  code
87#  data
88#
89#  For example:
90#  "2002"
91#  "Ratko"
92#
93#
94#  All machines are always keyed by the machineId that users connect to in order to control SAGE
95#
96#
97#  <<< UI  --->  SERVER >>>
98#  CODE    FORMAT                 MESSAGE
99#  ----------------------------------------------------------------
100#  2000    username                register this user with the Users server
101#          info
102#          machine_id              the machines the user is connected to
103#
104#  2001    from={username}         send a chat message to one person or to all
105#          to={"all" | id}         id = specific to users connected to a specific SAGE machine
106#          message
107#
108#  2002    username                check username for duplicates
109#
110#  2003    username                unregister this username from the machine specified
111#          machine_id
112#
113#         
114#  <<< SERVER  --->  UI >>>
115#  CODE    FORMAT                 MESSAGE
116#  ----------------------------------------------------------------
117#
118#  30000   machine_name            a status list of all the MACHINES registered with the server
119#          ip
120#          port
121#          machineId
122#          alive={"1" | "0"}       if the machine is alive, send 1, otherwise send 0
123#          displayInfo             in this format: "Xtiles Ytiles tileWidth tileHeight"
124#          systemIP systemPort     older UIs don't support this so not sent to them
125#          "\x00"               < --- a break between different blocks of data
126#          machine_name
127#          ip
128#          port
129#          machineId
130#          alive={"1" | "0"}       if the machine is alive, send 1, otherwise send 0
131#          displayInfo             in this format: "Xtiles Ytiles tileWidth tileHeight"
132#          systemIP systemPort     older UIs don't support this so not sent to them
133#          "\x00"
134#          ....
135#
136#
137#  30001   username                receive a list of USERS that are connected and their info
138#          info
139#          machine_id              the machines the user is connected to
140#          machine_id
141#          "\x00"               < --- a break between different users' info
142#          username
143#          info
144#          machine_id
145#          "\x00"
146#          ....
147#
148#  30002   from={username}         receive a chat message from one person,
149#          to={"all" | id}        id = specific to users connected to a specific SAGE machine
150#          message
151#
152#  30003   "1" | "0"              1=username OK, 0=username already in use
153#
154#  31000   message                an informative message... just any string
155#
156#
157#
158#  <<< SAGE  --->  SERVER >>>
159#  CODE    FORMAT                 MESSAGE
160#  ----------------------------------------------------------------
161
162#  100     machine_name version   "i am alive" message from SAGE
163#          pubIP pubPort          < --- machine ip and port that SAGE UIs should connect to
164#          sysIP sysPort          < --- machine ip and port that the apps should use for streaming
165#          displayInfo
166
167
168
169# log the actions in a file
os.chdir(sys.path[0])  # change to the folder where the script is running
logFile = open("/tmp/log.txt", "a")
logFileLock = RLock()  # multiple threads write to the same file


def WriteToFile(text):
    """Append *text* to the log file; also print it when PRINT_TO_SCREEN is set.

    Writes are serialized with logFileLock because every connection thread
    logs through this function. The lock is released in a ``finally`` so a
    failing write/flush cannot leave it held and deadlock every other
    thread that tries to log.
    """
    logFileLock.acquire()
    try:
        logFile.write(text)
        if PRINT_TO_SCREEN:
            print(text)
        # flush the buffer to the file (especially needed if running as a daemon)
        logFile.flush()
    finally:
        logFileLock.release()
181
182
183# record some stats to a file
statsFile = open("/tmp/stats.txt", "a")
statsFileLock = RLock()  # multiple threads record stats to the same file


def WriteStats(text):
    """Append *text* to the stats file; also print it when PRINT_TO_SCREEN is set.

    The lock is released in a ``finally`` so a failing write/flush cannot
    leave it held and block every other thread that records stats.
    """
    statsFileLock.acquire()
    try:
        statsFile.write(text)
        if PRINT_TO_SCREEN:
            print(text)
        # flush the buffer to the file (especially needed if running as a daemon)
        statsFile.flush()
    finally:
        statsFileLock.release()
194
195
196
197
198############################################################################
199#
200#  CLASS: SingleMachine
201
202#  DESCRIPTION: This is basically a thread that deals with the connection
203#               to ONLY ONE SAGE. It's created by the Server class upon
204#               connection from sage and it receives a clientsocket that
205#               was internally created by the Server. It then uses this
206#               socket for all the communication. One of these exists for
207#               every sage that is connected to this Server and the Server class
208#               keeps these SingleMachine objects in a list.
209#
210#  DATE:        May, 2005
211#
212############################################################################
213
class SingleMachine(Thread):
    """Thread that owns the connection to exactly ONE SAGE (fsManager).

    The Server creates it when a SAGE connects and hands over the client
    socket it accepted; all communication with that SAGE goes through this
    socket. The Server keeps one SingleMachine per connected SAGE.
    """

    def __init__(self, socket, address, server):
        Thread.__init__(self)
        self.socket = socket
        self.socket.settimeout(SOCKET_TIMEOUT)
        self.server = server
        self.threadKilled = False
        self.name = ""
        self.ip = address[0]   # these 2 are used for control communication
        self.port = ""
        self.systemIP = ""     # these 2 are used for streaming the data
        self.systemPort = ""
        self.sageVersion = None   # version of SAGE (only sent by new-style SAGE)
        self.oldStyleSAGE = False
        self.machineId = self.ip+":"+str(self.port)
        self.displayInfo = ""
        # in case SAGE connects but never registers; checked by WaitForRegisterMessage
        self.receivedRegisterMessage = False
        self.lastReportTime = None   # time of the last report from the fsManager
        # allow up to this many seconds between SAGE reports before the
        # fsManager is considered disconnected
        self.maxReportInterval = 6.0

        t = Timer(10, self.WaitForRegisterMessage)
        t.start()

    def WaitForRegisterMessage(self):
        """Timer callback: drop the connection if SAGE has not registered
        within 10 seconds. It was never registered, so don't unregister."""
        if not self.receivedRegisterMessage:
            self.Stop(False)

    def Stop(self, unregister=True):
        """Stop the thread, close the socket and (optionally) unregister
        this machine from the server.

        Only the first call does anything. Stop() is reached both from the
        register-timeout Timer thread and from this thread's own error
        paths. Without the guard, the disconnect stats, the log line and
        UnregisterMachine would be repeated.
        """
        if self.threadKilled:
            return
        self.threadKilled = True

        # record the stats
        stats = "\nSAGE disconnected: "+self.name+" "+str(self.machineId)+" "+str(self.displayInfo)+" "+str(time.asctime())
        WriteStats(stats)

        WriteToFile( "\n*** Connection closed with SAGE: \"" + self.name + "\"  <" + time.asctime() + ">")
        if unregister:
            self.server.UnregisterMachine(self.GetId())
        self.socket.close()

    # -----------------------------------------------
    # data access methods
    # -----------------------------------------------

    def GetIP(self):
        return self.ip

    def GetName(self):
        return self.name

    def GetSystemIP(self):
        return self.systemIP

    def GetSystemPort(self):
        return self.systemPort

    def GetPort(self):
        return self.port

    def IsAlive(self):
        return not self.threadKilled

    def GetDisplayInfo(self):
        return self.displayInfo

    def GetId(self):
        return self.machineId

    # -------------------------------------------------------
    #  RECEIVING
    # -------------------------------------------------------

    def run(self):
        """Thread body (started by the Server): receive fixed-size messages
        from SAGE and dispatch them until the connection ends.

        Every exit path goes through Stop(), so the socket is always closed
        and the machine unregistered.
        """
        while not self.threadKilled:
            try:
                cleanMsg = self.GetMessage()
                if not cleanMsg:
                    # connection was closed (GetMessage already stopped us)
                    # or the message was empty; either way we're done
                    self.Stop()
                    break

                # every message is "code\ndata"
                msgHeader = cleanMsg.split("\n", 1)
                if len(msgHeader) < 2:
                    WriteToFile( "\n ERROR: message could not be split correctly into (code, data)")
                    self.Stop()
                    break
                code = int(msgHeader[0])
                data = msgHeader[1]

                # call the appropriate function to handle the incoming data
                if code == 100:
                    self.OnRegisterMachineMessage(data)

            except socket.timeout:
                # if the fsManager hasn't reported in a while, assume it's dead and quit
                if self.lastReportTime:
                    if time.time() - self.lastReportTime > self.maxReportInterval:
                        WriteToFile( "\nERROR: Time expired with SAGE connection " + self.name)
                        self.Stop()
                continue
            except socket.error:
                WriteToFile( "\nERROR: UsersServer: socket error in connection with SAGE: " + self.name)
                self.Stop()
                break
            except (ValueError, IndexError):
                # non-numeric code or a registration with missing fields;
                # previously this escaped and killed the thread without cleanup
                WriteToFile( "\nERROR: UsersServer: malformed message from SAGE: " + self.name)
                self.Stop()
                break

        WriteToFile("\nThread from "+self.name +" died")

    def GetMessage(self, MSG_SIZE=CHUNK_SIZE):
        """Receive exactly MSG_SIZE bytes (one message) and return it cleaned
        and stripped, or False (after calling Stop) if the connection closed.

        A socket.timeout on the first recv propagates to run(), which uses
        it to check how long SAGE has been silent.
        """
        msg = self.socket.recv(MSG_SIZE)
        if len( msg ) < 2:
            WriteToFile("\nERROR: message length < 2. Closing the connection with SAGE "+self.name)
            self.Stop()   # connection was closed
            return False

        # recv is not guaranteed to return MSG_SIZE bytes at once,
        # so keep receiving until the whole chunk has arrived
        while len(msg) < MSG_SIZE:
            try:
                chunk = self.socket.recv(MSG_SIZE - len(msg))
            except socket.timeout:
                # the rest of the chunk is still in flight; keep waiting
                # (giving up here would desynchronize the fixed-size framing)
                if self.threadKilled:
                    return False
                continue
            if not chunk:
                # peer closed mid-message: recv would keep returning ""
                # and this loop would spin forever
                WriteToFile("\nERROR: connection closed in the middle of a message. Closing the connection with SAGE "+self.name)
                self.Stop()
                return False
            msg = msg + chunk

        # replace the padding/non-printables with spaces and strip them
        return self.CleanBuffer(msg).strip()

    def CleanBuffer(self, stBuffer):
        """Return *stBuffer* with every non-printable character replaced by
        a space, so that strip() can remove the padding."""
        cleaned = []
        for ch in stBuffer:
            if ch in string.printable:
                cleaned.append(ch)
            else:
                cleaned.append(" ")
        return "".join(cleaned)

    # -------------------------------------------------------
    #  MESSAGE CALLBACKS
    # -------------------------------------------------------

    def OnRegisterMachineMessage(self, data):
        """Handle code 100 ("i am alive") from SAGE.

        The first such message registers this machine with the server; later
        ones only refresh lastReportTime. There are two fsManager styles,
        told apart by the first line:
          old style:  "name\\nip\\nport\\ndisplayInfo"
          new style:  "name version\\nsysIP sysPort\\npubIP pubPort\\ndisplayInfo"
        NOTE(review): the protocol comment at the top of the file lists
        pubIP/pubPort before sysIP/sysPort; this parser (unchanged) reads the
        system address first -- confirm against the fsManager.

        Raises ValueError/IndexError on a malformed registration; run()
        handles that by closing the connection. Nothing is assigned until
        the whole message has parsed.
        """
        now = time.time()
        if self.receivedRegisterMessage:
            # record the time when it was reported
            self.lastReportTime = now
            return

        tokens = data.split("\n", 3)
        firstLine = tokens[0].split()
        if len(firstLine) == 1:   # based on this we know which fsManager this is
            name = tokens[0]
            ip = tokens[1]
            port = int(tokens[2])
            systemIP = ip         # same as public if not given
            systemPort = port
            version = None
            oldStyle = True
        else:   # new-style SAGE also sends the system ip/port and its version
            (name, version) = firstLine
            sysTokens = tokens[1].split()
            systemIP = sysTokens[0].strip()
            systemPort = int(sysTokens[1].strip())
            pubTokens = tokens[2].split()
            ip = pubTokens[0].strip()
            port = int(pubTokens[1].strip())
            oldStyle = False
        displayInfo = tokens[3]

        self.name = name
        self.ip = ip
        self.port = port
        self.systemIP = systemIP
        self.systemPort = systemPort
        if oldStyle:
            self.oldStyleSAGE = True
        else:
            self.sageVersion = version
        self.machineId = self.ip+":"+str(self.port)
        self.displayInfo = displayInfo
        self.receivedRegisterMessage = True
        # start the report clock now so a SAGE that goes silent right after
        # registering is still timed out by run()
        self.lastReportTime = now
        self.server.RegisterMachine(self)

        # record the stats
        stats = "\nSAGE connected: "+self.name+" "+str(self.machineId)+" "+str(self.displayInfo)+" "+str(time.asctime())
        WriteStats(stats)
402       
403
404
405
406
407
408
409############################################################################
410#
411#  CLASS: SingleUser
412
413#  DESCRIPTION: This is basically a thread that deals with the connection
414#               to ONLY ONE SAGE UI. It's created by the Server class upon
415#               connection from a user and it receives a clientsocket that
416#               was internally created by the Server. It then uses this
417#               socket for all the communication. One of these exists for
418#               every user that is connected to this Server and the Server class
419#               keeps these SingleUser objects in a list.
420#
421#  DATE:        May, 2005
422#
423############################################################################
424
class SingleUser(Thread):
    """Thread that owns the connection to exactly ONE SAGE UI (one user).

    The Server creates it when a UI connects and hands over the client
    socket it accepted; every message to and from that UI goes through this
    socket. The Server keeps one SingleUser per connected UI.
    """

    def __init__(self, socket, address, server):
        Thread.__init__(self)
        self.socket = socket
        self.socket.settimeout(SOCKET_TIMEOUT)
        self.server = server
        self.threadKilled = False
        # Stop() checks this flag, so it is set before OnConnectUser() runs
        # below and is never reset afterwards
        self.stopped = False
        self.username = ""
        self.info = ""
        self.machineList = []   # machineIds of the SAGE machines this user is connected to
        self.ip = address[0]
        self.registered = False
        self.ui_version = "2.0"  # default; v2.82+ UIs send theirs in "Check Username"
        self.newStyle = False

        # message code -> handler for messages coming from the UI
        self.messageCallback = {
            2000: self.OnRegister,
            2001: self.OnChatMessage,
            2002: self.OnCheckUsername,
            2003: self.OnUnregisterUser,
        }

        # send the first reply message with machine status
        self.server.OnConnectUser(self)

    def Stop(self, unregister=True):
        """Close the connection, record stats and notify the server.

        Only the first call does anything. Stop() is reachable from
        GetMessage(), from run() on socket errors and on exit, and possibly
        from other threads. Repeating it would duplicate the stats and the
        UnregisterUser/OnDisconnectUser callbacks.
        """
        if self.stopped:
            return
        self.stopped = True
        # record the stats
        stats = "\nUser disconnected: "+self.GetName()+" "+str(self.ip)+" "+str(time.asctime())
        WriteStats(stats)
        self.threadKilled = True
        WriteToFile( "\n*** Connection closed with user: \"" + self.username + "\"  <" + time.asctime() + ">")
        if unregister and self.registered:
            self.server.UnregisterUser(self, self.username)
        self.server.OnDisconnectUser(self)
        self.socket.close()

    def GetInfo(self):
        return self.info

    def GetName(self):
        return self.username

    def GetMachines(self):
        return self.machineList

    def IsNewStyle(self):
        return self.newStyle

    # -------------------------------------------------------
    #  RECEIVING
    # -------------------------------------------------------

    def run(self):
        """Thread body (started by the Server): receive fixed-size messages
        and dispatch them to messageCallback until the connection ends."""
        while not self.threadKilled:
            try:
                cleanMsg = self.GetMessage()
                if not cleanMsg:
                    break  # connection was closed

                # every message is "code\ndata"
                msgHeader = cleanMsg.split("\n", 1)
                if len(msgHeader) < 2:
                    WriteToFile( "\n ERROR: message could not be split correctly into (code, data)")
                    break
                code = int(msgHeader[0])
                data = msgHeader[1]

                # print what we received
                if code in messageNames:
                    WriteToFile( "RECEIVED: \"" + messageNames[code] + "\" from " + self.username + "   (" + str(self.ip) + ")")

                # call the appropriate function to handle the incoming data
                callback = self.messageCallback.get(code)
                if callback is not None:
                    callback(data)
                else:
                    WriteToFile("\nERROR: Message code " + str(code) + " unrecognized")

            except socket.timeout:
                continue
            except socket.error:
                WriteToFile( "\nERROR: UsersServer: socket error in connection with: " + self.username)
                self.Stop()
                break
            except (ValueError, IndexError):
                # non-numeric code or a message with missing fields; this
                # used to escape and kill the thread without any cleanup
                WriteToFile( "\nERROR: UsersServer: malformed message from: " + self.username)
                break

        if not self.stopped:
            WriteToFile("Stopping the thread")
            self.Stop()
        WriteToFile("Thread from " + self.username + " has died")

    def GetMessage(self, clean=True, MSG_SIZE=CHUNK_SIZE):
        """Receive exactly MSG_SIZE bytes (one message) from the UI.

        Returns False (after calling Stop) if the connection was closed.
        Otherwise returns the message: passed through CleanBuffer() and
        stripped when *clean* is true, raw otherwise. A socket.timeout on
        the first recv propagates to run(), which simply polls again.
        """
        msg = self.socket.recv(MSG_SIZE)
        if len( msg ) < 2:
            self.Stop()   # connection was closed
            return False

        # recv is not guaranteed to return MSG_SIZE bytes at once,
        # so keep receiving until the whole chunk has arrived
        while len(msg) < MSG_SIZE:
            try:
                chunk = self.socket.recv(MSG_SIZE - len(msg))
            except socket.timeout:
                # the rest of the chunk is still in flight; keep waiting
                # (giving up here would desynchronize the fixed-size framing)
                if self.threadKilled:
                    return False
                continue
            if not chunk:
                # peer closed mid-message: recv would keep returning ""
                # and this loop would spin forever
                self.Stop()
                return False
            msg = msg + chunk

        if clean:
            # strip all the empty spaces from the message
            return self.CleanBuffer(msg).strip()
        return msg

    def CleanBuffer(self, stBuffer):
        """Return *stBuffer* with every non-printable character replaced by
        a space (so strip() can remove the padding). NUL is kept because it
        is our block separator inside a message."""
        cleaned = []
        for ch in stBuffer:
            if ch == '\0' or ch in string.printable:
                cleaned.append(ch)
            else:
                cleaned.append(" ")
        return "".join(cleaned)

    # -------------------------------------------------------
    #  SENDING
    # -------------------------------------------------------

    def MakeMsg(self, code, data):
        """Format "code\\ndata", pad it with spaces to MSGLEN and send it.

        NOTE(review): a message longer than MSGLEN is sent unpadded and
        untruncated, which a receiver reading fixed MSGLEN-byte chunks would
        mis-frame -- large status lists may hit this.
        """
        msg = '%8s\n%s' %(code,data)
        WriteToFile( "SEND: \"" + messageNames[code] + "\" to " + self.username)
        msg = msg + ' ' * (MSGLEN-len(msg))
        self.Send(msg)

    def Send(self, msg):
        """Send the whole of *msg*; on a socket error log it and close the socket.

        sendall() is used because send() may transmit only part of the
        buffer, which would break the fixed-size framing. Cleanup is left to
        run(): its next recv on the closed socket fails and it calls Stop(),
        which records the "User disconnected" stats. That is why they are
        not also written here.
        """
        try:
            self.socket.sendall(msg)
        except socket.error:
            WriteToFile( "\nERROR: UsersServer: Could not send message: socket error with: "+self.username )
            self.socket.close()

    # these functions are called by the main Server class in order to send
    # messages to all the users via their own sockets used in this SingleUser class
    def SendChatMessage(self, message):
        self.MakeMsg(30002, message)

    def _MakeStatusMessage(self, statusList):
        """Flatten a list of info lists into one message string.

        Each item goes on its own line. Consecutive blocks are separated by
        SEPARATOR, but none is added while the message is still empty.
        """
        message = ""
        for infoList in statusList:
            if message != "":
                message += SEPARATOR
            message += "".join([str(infoItem) + "\n" for infoItem in infoList])
        return message

    def SendUsersStatusMessage(self, statusList):
        """Send the users status list (code 30001)."""
        self.MakeMsg(30001, self._MakeStatusMessage(statusList))

    def SendMachinesStatusMessage(self, statusList):
        """Send the machines status list (code 30000)."""
        self.MakeMsg(30000, self._MakeStatusMessage(statusList))

    def SendUsernameOKMessage(self, usernameOK):
        self.MakeMsg( 30003, str(int(usernameOK)) )

    # -------------------------------------------------------
    #  MESSAGE CALLBACKS
    # -------------------------------------------------------

    def OnRegister(self, data):
        """Handle "Register" (2000): data is "username\\ninfo\\nmachine_id"."""
        tokens = data.split("\n")
        self.username = tokens[0]
        self.info = tokens[1]
        machineId = tokens[2]
        if machineId not in self.machineList:
            self.machineList.append(machineId)

        # record the stats
        stats = "\nUser registered: "+self.GetName()+" "+str(self.ip)+" "+str(machineId)+" "+str(time.asctime())
        WriteStats(stats)

        self.server.RegisterUser(self, self.username)

        # if the version of the connected UI handles system ip/port
        # for each machine, send it after the registration message
        if self.IsNewStyle():
            status = self.server.MakeNewMachinesStatusList()
            self.SendMachinesStatusMessage(status)

    def OnChatMessage(self, data):
        """Handle an incoming chat message (2001): "from\\nto\\nmessage",
        where "to" is "all" or a machine id; forwarded as-is."""
        toRoom = data.split("\n", 2)[1]
        self.server.ForwardChatMessage(self, toRoom, data)

    def OnCheckUsername(self, data):
        """Handle "Check Username" (2002) and reply whether it is free."""
        tokens = data.split("\n")
        if len(tokens) > 1:  # sageui v2.82+ sends a version number
            self.ui_version = tokens[1].strip()   # was "==", which discarded it
            self.newStyle = True
        self.SendUsernameOKMessage(self.server.IsUsernameOK(self, tokens[0]) )

    def OnUnregisterUser(self, data):
        """Handle "Unregister from room" (2003): "username\\nmachine_id"."""
        tokens = data.split("\n")
        machineId = tokens[1]   # tokens[0] is the username; not needed here
        if machineId in self.machineList:
            self.machineList.remove(machineId)
            # record the stats (leading "\n" like every other stats line)
            stats = "\nUser unregistered: "+self.GetName()+" "+str(self.ip)+" "+str(machineId)+" "+str(time.asctime())
            WriteStats(stats)
            self.server.UpdateUsers()
676       
677   
678
679
680############################################################################
681#
682#  CLASS: SingleLauncher
683
684#  DESCRIPTION: This class describes one appLauncher connection for use by
685#               SAGE UIs. It mainly contains a list of applications and their configs
686#
687#
688############################################################################
689
class SingleLauncher:
    """One appLauncher connection, for use by SAGE UIs.

    Holds the launcher's id, name and address, its list of applications and
    their configs, and the time of its last report, which isAlive() uses to
    decide whether the launcher is still running.
    """

    def __init__(self, launcherId, name, ip, port, appList, maxReportTime=10):
        self.port = port
        self.appList = appList
        self.ip = ip
        self.launcherId = launcherId
        self.name = name
        self.oldT = time.time()   # time of the last report
        # allow at most this many seconds between reports (default 10)
        self.maxReportTime = maxReportTime

    def getId(self):
        return self.launcherId

    def getIP(self):
        return self.ip

    def getAppList(self):
        return self.appList

    def setAppList(self, appList):
        self.appList = appList

    def getPort(self):
        return self.port

    def getName(self):
        return self.name

    def isAlive(self):
        """True if the last report came in less than maxReportTime seconds ago."""
        return (time.time() - self.oldT) < self.maxReportTime

    def report(self):
        """Record that the launcher has just reported in."""
        self.oldT = time.time()
726
727
728
729############################################################################
730#
731#  CLASS: Server
732
733#  DESCRIPTION: This server should run as a daemon and constantly listen for
734#               connections on a certain port. Once it accepts a connection
735#               it spawns a thread (SingleUser) that takes care of listening for the
736#               incoming messages and sending messages on that socket.
737#               It also relays all the incoming messages to their corresponding
738#               recipients. To do that it keeps track of all the users connected
739#               in a list registeredUsers[]. It also listens for connections
740#               from fsManagers and keeps track of them in connectedMachines[] (
741#               a hash of singleMachines).
742#
743#  DATE:        May, 2005
744#
745############################################################################
746
747class Server:
748    def __init__(self):
749        self.serverRunning = True
750
751        # start the XMLRPC server in a thread
752        xmlrpcServer = Thread(target=self.StartXMLRPCServer)
753        xmlrpcServer.start()
754
755        # start the two servers listening on separate ports
756        machinesServer = Thread(target=self.StartMachinesServer)
757        machinesServer.start()
758
759        try:
760            self.StartUsersServer()  #start this one in the main thread so that we can capture
761                                 #keystrokes such as Ctrl-C
762        except KeyboardInterrupt:
763            WriteToFile ("\n******  Shutting down the server  *******")
764            self.serverRunning = False
765            self.CloseAllUserConnections()
766            self.CloseAllSAGEConnections()
767            self.xmlrpc.server_close()
768            #logFile.close()
769
770
771
772#------------------------------------------------------------------------------
773#  XML-RPC STUFF  -  FOR REMOTE ADMINISTRATION, APPLAUNCHER AND SAGE UI PROXY
774#------------------------------------------------------------------------------
775
776    def StartXMLRPCServer(self):
777        self.registeredLaunchers = {}  #key=launcherID, value=SingleLauncher object
778       
779        # start the XML-RPC server
780        self.xmlrpc = XMLRPCServer(("", 8009))
781
782        # users
783        self.xmlrpc.register_function(self.GetRegisteredUsers)
784        self.xmlrpc.register_function(self.GetUserInfo)
785        self.xmlrpc.register_function(self.DisconnectUser)
786
787        # machines
788        self.xmlrpc.register_function(self.GetMachineInfo)
789        self.xmlrpc.register_function(self.GetRegisteredMachines)
790        self.xmlrpc.register_function(self.DisconnectMachine)
791
792        # appLauncher
793        self.xmlrpc.register_function(self.ReportLauncher)
794        self.xmlrpc.register_function(self.GetRegisteredLaunchers)
795        self.xmlrpc.register_function(self.UnregisterLauncher)
796
797        WriteToFile ("Starting the XML-RPC Server...\n")
798        while self.serverRunning:
799            try:
800                self.xmlrpc.handle_request()  #accept and process xmlrpc requests
801                self.__checkLaunchers()   #check to see whether every launcher is still alive
802            except socket.timeout:
803                continue
804            except:
805                WriteToFile( "\n=====> XMLRPC Server ERROR:" )
806                WriteToFile( "".join(tb.format_exception(sys.exc_info()[0], sys.exc_info()[1], sys.exc_info()[2])) )
807                continue
808
809
810
811        ### loops through all the app launchers and checks whether they are still alive
812        ### the minimum frequency of checks is defined by the timeout set in the XMLRPCServer constructor
813    def __checkLaunchers(self):
814        for l in self.registeredLaunchers.values():
815            if not l.isAlive():
816                WriteToFile("Launcher "+l.getName()+"("+l.getId()+") unregistered")
817                self.UnregisterLauncher(l.getId())
818               
819           
820        ### called by each appLauncher in order to register with the server
821    def ReportLauncher(self, launcherName, launcherIP, launcherPort, appList):
822        launcherId = launcherIP+":"+str(launcherPort)
823        if launcherId in self.registeredLaunchers:
824            self.registeredLaunchers[launcherId].report()  # launcher already exists so just update its last report time
825            self.registeredLaunchers[launcherId].setAppList(appList)
826        else:
827            l = SingleLauncher(launcherId, launcherName, launcherIP, launcherPort, appList)
828            WriteToFile("Launcher "+l.getName()+"("+l.getId()+") registered")
829            self.registeredLaunchers[launcherId] = l
830        return launcherId
831
832
833        ### removes the appLauncher from a list of registered ones
834    def UnregisterLauncher(self, launcherId):
835        if launcherId in self.registeredLaunchers:
836            del self.registeredLaunchers[ launcherId ]
837        return 1
838   
839
840        ### return a hash of all the app launchers running
841        ### key= "name:launcherId" , value=appList  (that's another hash of appNames and their configs)
842    def GetRegisteredLaunchers(self):
843        tempHash = {}
844        for l in self.registeredLaunchers.itervalues():
845            tempHash[ l.getName()+":"+l.getId() ] = l.getAppList()
846        return tempHash
847   
848
849
850        ### return a list of currently registered users and machines
851    def GetRegisteredUsers(self):
852        self.registeredUsersLock.acquire()
853        users = self.registeredUsers.keys()
854        self.registeredUsersLock.release()
855        return users
856
857    def GetRegisteredMachines(self):
858        self.connectedMachinesLock.acquire()
859        machineList = []
860        for machineId, singleMachine in self.connectedMachines.iteritems():
861            machineList.append(singleMachine.GetName() + " - " + str(machineId))
862        self.connectedMachinesLock.release()
863        return machineList
864
865##     def GetConnectedUsers(self):
866##         self.connectedUsersLock.acquire()
867##         users = []
868##         for user in self.connectedUsers:
869##             users.append(user.GetName())
870##         self.connectedUsersLock.release()
871##         return users
872
873        ### return user and machine info
874    def GetUserInfo(self, username):
875        self.registeredUsersLock.acquire()
876        if self.registeredUsers.has_key(username):
877            singleUser = self.registeredUsers[username]
878            machineList = []
879            self.connectedMachinesLock.acquire()
880            for machineId in singleUser.GetMachines():
881                if self.connectedMachines.has_key(machineId):
882                    machineList.append(self.connectedMachines[machineId].GetName())
883            self.connectedMachinesLock.release()
884            self.registeredUsersLock.release()
885            return machineList  #singleUser.GetMachines()
886        else:
887            self.registeredUsersLock.release()
888            return -1
889
890    def GetMachineInfo(self, machineId):
891        self.connectedMachinesLock.acquire()
892        if self.connectedMachines.has_key(machineId):
893            m = self.connectedMachines[machineId]
894            self.connectedMachinesLock.release()
895
896            #now make a list of all the users that are connected to this machine
897            self.registeredUsersLock.acquire()
898            userList = []
899            for name, singleUser in self.registeredUsers.iteritems():
900                if machineId in singleUser.GetMachines():
901                    userList.append(name)
902            self.registeredUsersLock.release()
903            return (m.GetName(), m.GetIP(), m.GetPort(), m.GetId(), m.IsAlive(), m.GetDisplayInfo(), userList)
904        else:
905            self.connectedMachinesLock.release()
906            return (-1,-1,-1,-1,-1,-1,-1)
907
908
909        ### allow the user to close individual connections with SAGE and users
910    def DisconnectUser(self, username):
911        self.registeredUsersLock.acquire()
912        if self.registeredUsers.has_key(username):
913            singleUser = self.registeredUsers[username]
914            singleUser.Stop()
915        self.registeredUsersLock.release()
916        return True
917
918
919    def DisconnectMachine(self, machineId):
920        self.connectedMachinesLock.acquire()
921        if self.connectedMachines.has_key(machineId):
922            singleMachine = self.connectedMachines[machineId]
923            singleMachine.Stop()
924        self.connectedMachinesLock.release()
925        return True
926
927
928
929#----------------------------------------------------------------------------------------
930#   THE SERVERS RUNNING IN THREADS RECEIVING CONNECTIONS FROM USERS AND SAGES
931#----------------------------------------------------------------------------------------
932
933
934        # runs in the main thread
935    def StartUsersServer(self):
936        self.registeredUsers = {}   # a hash of SingleUsers for every registered user  (keyed by username)
937        self.registeredUsersLock = RLock()  #used to lock the access to self.registeredUsers hash
938
939        self.connectedUsers = []    # this includes all the users that are connected to the server
940        self.connectedUsersLock = RLock()  # but not necessarily registered with it. (so registeredUsers is a subset of this)
941
942        self.pendingUsernames = []   # usernames that have been checked with the server but not yet registered
943        self.pendingUsernamesLock = RLock()
944       
945        # create the server socket and accept a connection
946        serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
947        if serversocket is None:
948                WriteToFile( "\n\nERROR: Server socket could not be created... exiting" )
949                return False
950        serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
951        serversocket.bind(("", USER_SERVER_PORT))
952        serversocket.listen(MAX_USER_CONNECTIONS)
953        serversocket.settimeout(SOCKET_TIMEOUT)
954
955        WriteToFile( "Users Server waiting for connections on port " +  str(USER_SERVER_PORT) +  "...\n" )
956        while self.serverRunning:
957            try:
958                (clientsocket, address) = serversocket.accept()
959            except socket.timeout:
960                continue
961            except:
962                WriteToFile( "\n\nUsers Server Not accepting any more connections... exiting  <" + time.asctime() + ">\n" )
963                self.CloseAllUserConnections()
964                self.serverRunning = False
965                break
966
967            WriteToFile( "\n*** Connection accepted from " + str(address[0]) + " <" + time.asctime() + ">" )
968            # create a SingleUser instance and start the receiver in a thread
969            t = SingleUser(clientsocket, address, self)
970            self.connectedUsersLock.acquire()
971            self.connectedUsers.append(t)   #add the user to the list of all connected users
972            self.connectedUsersLock.release()
973            t.start()
974        WriteToFile("\nUsers Server exited")
975       
976
977        # runs in a thread
978    def StartMachinesServer(self):
979        self.connectedMachines = {}   # a hash of SingleMachines for every connected SAGE (keyed by id)           
980        self.connectedMachinesLock = RLock()
981
982                # create the server socket and accept a connection
983        serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
984        if serversocket is None:
985                WriteToFile( "\n\nERROR: SAGE Server socket could not be created... exiting" )
986                return False
987        serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
988        serversocket.bind(("", SAGE_SERVER_PORT))
989        serversocket.listen(MAX_SAGE_CONNECTIONS) 
990        serversocket.settimeout(SOCKET_TIMEOUT)
991       
992        WriteToFile( "SAGE Server waiting for connections on port " +  str(SAGE_SERVER_PORT) +  "...\n" )
993        while self.serverRunning:
994            try:
995                (clientsocket, address) = serversocket.accept()
996            except socket.timeout:
997                continue
998            except:
999                WriteToFile( "\n\nSAGE Server Not accepting any more connections... exiting  <" + time.asctime() + ">\n" )
1000                self.CloseAllSAGEConnections()
1001                self.serverRunning = False
1002                break
1003
1004            WriteToFile( "\n*** SAGE Connection accepted from " + str(address[0]) + " <" + time.asctime() + ">" )
1005            # create a SingleMachine instance and start the receiver in a thread
1006            t = SingleMachine(clientsocket, address, self)
1007            t.start()
1008        WriteToFile("\nSAGE Server exited")
1009        self.xmlrpc.server_close()
1010
1011
1012    def CloseAllUserConnections(self):
1013        for singleUser in self.registeredUsers.itervalues():
1014            singleUser.Stop(unregister=False) #we dont want to unregister because the server is closing anyway
1015
1016    def CloseAllSAGEConnections(self):
1017        for singleMachine in self.connectedMachines.itervalues():
1018            singleMachine.Stop(unregister=False) #we dont want to unregister because the server is closing anyway
1019           
1020
1021
1022#-------------------------------------------------------
1023#  MESSAGE CALLBACKS - USERS
1024#-------------------------------------------------------
1025
1026        # when the user connects, we need to send him the list of all the SAGE
1027        # machines registered with this server
1028    def OnConnectUser(self, singleUser):
1029        self.connectedMachinesLock.acquire()
1030        singleUser.SendMachinesStatusMessage(self.MakeMachinesStatusList())
1031        self.connectedMachinesLock.release()
1032
1033
1034        # if the user connected but never registered, we still have to remove him from this list
1035    def OnDisconnectUser(self, singleUser):
1036
1037        # remove from the pending usernames if connected
1038        self.pendingUsernamesLock.acquire()
1039        if singleUser.GetName() in self.pendingUsernames:
1040            self.pendingUsernames.remove(singleUser.GetName())
1041        self.pendingUsernamesLock.release()
1042
1043        # remove from connected users list
1044        self.connectedUsersLock.acquire()
1045        if singleUser in self.connectedUsers:
1046            self.connectedUsers.remove(singleUser)
1047        self.connectedUsersLock.release()
1048       
1049
1050        # adds the new user keyed by its name and makes the new status list
1051        # returns: true if successful, false otherwise (if username already exists)
1052    def RegisterUser(self, singleUser, username):
1053        self.registeredUsersLock.acquire()
1054        if not self.registeredUsers.has_key(username) or self.registeredUsers[username].ip == singleUser.ip:
1055            self.registeredUsers[ username ] = singleUser  # add the user to the list
1056            singleUser.registered = True
1057           
1058        # remove from the list of pending usernames
1059        self.pendingUsernamesLock.acquire()
1060        if username in self.pendingUsernames:
1061            self.pendingUsernames.remove(username)
1062        self.pendingUsernamesLock.release()
1063
1064        # update the status of other users
1065        self.UpdateUsers()
1066        self.registeredUsersLock.release()
1067
1068
1069
1070    def UnregisterUser(self, singleUser, username):
1071       
1072        # remove the user from the list of all the connected users
1073        self.connectedUsersLock.acquire()
1074        if singleUser in self.connectedUsers:
1075            self.connectedUsers.remove(singleUser)
1076            WriteToFile("removed "+username+" from connectedUsers")
1077        self.connectedUsersLock.release()
1078
1079        # now remove him from the list of registered users
1080        self.registeredUsersLock.acquire()
1081        if self.registeredUsers.has_key( username ):
1082            del self.registeredUsers[ username ]
1083            self.UpdateUsers()
1084            WriteToFile("removed "+username+" from registeredUsers")
1085        self.registeredUsersLock.release()
1086
1087        # now, check all the rooms that the user was connected to and see if any
1088        # of them are empty now that the user has left... if there are empty rooms,
1089        # close them
1090        emptyRooms = False
1091        self.connectedMachinesLock.acquire()
1092        for room in self.connectedMachines.keys()[:]:  #loop through all the machines just in case there are some daemons
1093            if self.connectedMachines.has_key(room) and (not self.connectedMachines[room].IsAlive()) and self.IsRoomEmpty(room):
1094                emptyRooms = True
1095                del self.connectedMachines[room]
1096                WriteToFile("closed the room "+room)
1097        if emptyRooms:
1098            self.UpdateMachines()
1099        self.connectedMachinesLock.release()
1100
1101
1102        # updates all the users with the new status (based on self.registeredUsers)
1103    def UpdateUsers(self):
1104        self.registeredUsersLock.acquire()
1105        statusList = self.MakeUsersStatusList()
1106        for username, singleUser in self.registeredUsers.iteritems():
1107            if len(singleUser.GetMachines()) > 0:
1108                singleUser.SendUsersStatusMessage(statusList)
1109        self.registeredUsersLock.release()
1110
1111       
1112        # forwads the chat message either to all the chat rooms or a specific one
1113    def ForwardChatMessage(self, sender, toRoom, message):
1114        self.registeredUsersLock.acquire()
1115        for name, singleUser in self.registeredUsers.iteritems():
1116            singleUser.SendChatMessage(message)
1117        self.registeredUsersLock.release()
1118
1119
1120        # checks for duplicates in usernames
1121    def IsUsernameOK(self, singleUser, username):
1122        self.registeredUsersLock.acquire()
1123        self.pendingUsernamesLock.acquire()
1124        if username in self.registeredUsers:   # username already exists
1125            if self.registeredUsers[username].ip == singleUser.ip:  # its the same user reconnecting so it's OK
1126                usernameTaken = False
1127            else:
1128                usernameTaken = True
1129        elif username in self.pendingUsernames:
1130            usernameTaken = True
1131        else:
1132            usernameTaken = False
1133           
1134        if not usernameTaken:
1135            self.pendingUsernames.append(username)
1136            t = Timer(2, self.ExpireUsername, [username])
1137            t.start()
1138        self.pendingUsernamesLock.release()
1139        self.registeredUsersLock.release()
1140        return not usernameTaken
1141
1142
1143    def ExpireUsername(self, username):
1144        # remove from the list of pending usernames
1145        self.pendingUsernamesLock.acquire()
1146        if username in self.pendingUsernames:
1147            self.pendingUsernames.remove(username)
1148        self.pendingUsernamesLock.release()
1149       
1150   
1151        #make the status list consisting of name,info,machine,name,info,machine...
1152    def MakeUsersStatusList(self):
1153        statusList = []   
1154        keys = self.registeredUsers.keys()
1155        keys.sort()
1156        for username in keys:
1157            user = self.registeredUsers[username]
1158            tempList = []
1159            tempList.append(username)
1160            tempList.append(user.GetInfo())
1161            for machine in user.GetMachines():
1162                tempList.append(machine)
1163            statusList.append(tempList)
1164        return statusList
1165
1166
1167
1168#-------------------------------------------------------
1169#  MESSAGE CALLBACKS - MACHINES
1170#-------------------------------------------------------
1171
1172        #registers SAGE with the server so that it's visible to the users
1173    def RegisterMachine(self, singleMachine):
1174        machineId = singleMachine.GetId()
1175        self.connectedMachinesLock.acquire()
1176        if not self.connectedMachines.has_key( machineId ):
1177            self.connectedMachines[ machineId ] = singleMachine
1178            self.UpdateMachines()  #update all the users with the new machine status
1179        else:  #the old singleMachine was still preserved since there were some users in it still
1180            WriteToFile("\n* The machine "+str(machineId)+" already exists so trying to close the connection with the previous one")
1181            self.connectedMachines[ machineId ].Stop(False)  # this is a preventative measure just in case it was a zombie
1182            del self.connectedMachines[ machineId ]  #delete the old one and save the new one
1183            self.connectedMachines[ machineId ] = singleMachine
1184            self.UpdateMachines()
1185        self.connectedMachinesLock.release()
1186
1187
1188        # updates all the users with the new machine status (based on self.connectedMachines)
1189    def UpdateMachines(self):
1190        self.connectedUsersLock.acquire()
1191        statusList = self.MakeMachinesStatusList()
1192        newStatusList = self.MakeNewMachinesStatusList()
1193        for singleUser in self.connectedUsers:
1194            if singleUser.IsNewStyle():  # ui 2.82 and above gets the systemip/port info as well
1195                singleUser.SendMachinesStatusMessage(newStatusList)
1196            else:
1197                singleUser.SendMachinesStatusMessage(statusList)
1198        self.connectedUsersLock.release()
1199
1200
1201        # removes the machine keyed by its machineId
1202    def UnregisterMachine(self, machineId):
1203        self.connectedMachinesLock.acquire()
1204        if self.connectedMachines.has_key( machineId ):
1205            if self.IsRoomEmpty( machineId ):  #if the room was determined to be empty, close it
1206                del self.connectedMachines[machineId]   
1207            self.UpdateMachines()
1208        self.connectedMachinesLock.release()
1209
1210
1211        # check if there are any users still left in this room,
1212        # if there are, return FALSE, otherwise return TRUE
1213    def IsRoomEmpty(self, machineId):
1214        roomEmpty = True
1215        self.registeredUsersLock.acquire()
1216        registeredUsers = self.registeredUsers.copy()
1217        self.registeredUsersLock.release()
1218        for singleUser in registeredUsers.values():
1219            if machineId in singleUser.GetMachines():
1220                roomEmpty = False
1221                break   # there is at least one user still left in the room, so leave it open
1222        return roomEmpty
1223               
1224
1225        # it makes the list of currently connected machines
1226    def MakeMachinesStatusList(self):
1227        statusList = []   #make the status list consisting of [name,ip,port,id,alive], [name,ip,port,id,alive], ....
1228        keys = self.connectedMachines.keys()
1229        keys.sort()
1230        for machineId in keys:
1231            tempList = []
1232            singleMachine = self.connectedMachines[machineId]
1233            tempList.append( singleMachine.GetName() )
1234            tempList.append( singleMachine.GetIP() )
1235            tempList.append( singleMachine.GetPort() )
1236            tempList.append( machineId )
1237            tempList.append( int(singleMachine.IsAlive()) )
1238            tempList.append( singleMachine.GetDisplayInfo() )
1239            statusList.append(tempList)
1240        return statusList
1241
1242   
1243        # this is for new style UIs that accept system ip/port
1244        # as well
1245    def MakeNewMachinesStatusList(self):
1246        statusList = []
1247        keys = self.connectedMachines.keys()
1248        keys.sort()
1249        for machineId in keys:
1250            tempList = []
1251            singleMachine = self.connectedMachines[machineId]
1252            tempList.append( singleMachine.GetName() )
1253            tempList.append( singleMachine.GetIP() )
1254            tempList.append( singleMachine.GetPort() )
1255            tempList.append( machineId )
1256            tempList.append( int(singleMachine.IsAlive()) )
1257            tempList.append( singleMachine.GetDisplayInfo() )
1258            tempList.append( singleMachine.GetSystemIP()+" "+str(singleMachine.GetSystemPort()))
1259            statusList.append(tempList)
1260        return statusList
1261
1262
1263       
1264
1265
1266#-----------------------------------------------------------------------------------------------
1267
1268
class XMLRPCServer(SimpleXMLRPCServer.SimpleXMLRPCServer):
    """SimpleXMLRPCServer with request logging off and a timed-out socket.

    The socket timeout keeps handle_request() from blocking indefinitely, so
    the caller's loop can periodically check all appLaunchers.
    """
    allow_reuse_address = True

    def __init__(self, addr, timeout=2):
        """addr    -- (host, port) to listen on
        timeout -- listening socket timeout in seconds (default 2, the
                   previously hard-coded value)
        """
        SimpleXMLRPCServer.SimpleXMLRPCServer.__init__(self, addr, logRequests=False)
        self.socket.settimeout(timeout)  # so that handle_request times out and we can check all appLaunchers
1274
1275       
1276
def main(argv):
    """Parse the command line, log the restart and start the server.

    argv -- ['', scriptName, userArg1, userArg2, ...] (see the __main__ block)

    Accepted user arguments: an optional "-v" anywhere (also print the log to
    the screen), optionally followed or preceded by USER_SERVER_PORT
    SAGE_SERVER_PORT.  A single or non-integer port prints the usage and
    exits.
    """
    global USER_SERVER_PORT
    global SAGE_SERVER_PORT
    global PRINT_TO_SCREEN

    # Parse the arguments first so that "-v" also applies to the banner below.
    # "-v" is stripped out before reading the ports; it used to be passed to
    # int() when it came before the port numbers.
    userArgs = argv[2:]
    if "-v" in userArgs:
        PRINT_TO_SCREEN = True
        userArgs = [a for a in userArgs if a != "-v"]
    if userArgs:
        try:
            ports = [int(a) for a in userArgs[:2]]
        except ValueError:
            ports = []
        if len(ports) != 2:
            print("Usage: python " + argv[1] + " [-v] [USER_SERVER_PORT SAGE_SERVER_PORT]\n")
            sys.exit(0)
        USER_SERVER_PORT, SAGE_SERVER_PORT = ports

    WriteToFile("\n\n\n#####################################################################\n")
    WriteToFile("   SAGE Server HAS BEEN RESTARTED\t<" + time.asctime() + ">")
    WriteToFile("\n#####################################################################\n\n\n")

    WriteStats("\n\nSERVER RESTARTED\t"+str(time.asctime())+"\n\n")

    # start the server accepting connections at the specified port
    Server()
1304
1305   
1306       
if __name__ == '__main__':
    # main() reads the script name from argv[1] and the user arguments from
    # argv[2:], so argv[0] is padded with an empty string
    scriptName = os.path.basename(sys.argv[0])
    main([''] + [scriptName] + sys.argv[1:])
Note: See TracBrowser for help on using the repository browser.