source: trunk/src/testing/bin/appLauncher/appLauncher.py @ 4

Revision 4, 12.4 KB checked in by ajaworski, 13 years ago (diff)

Added modified SAGE sources

Line 
1############################################################################
2#
3# AppLauncher - Application Launcher for SAGE
4# Copyright (C) 2006 Electronic Visualization Laboratory,
5# University of Illinois at Chicago
6#
7# All rights reserved.
8#
9# Redistribution and use in source and binary forms, with or without
10# modification, are permitted provided that the following conditions are met:
11#
12#  * Redistributions of source code must retain the above copyright
13#    notice, this list of conditions and the following disclaimer.
14#  * Redistributions in binary form must reproduce the above
15#    copyright notice, this list of conditions and the following disclaimer
16#    in the documentation and/or other materials provided with the distribution.
17#  * Neither the name of the University of Illinois at Chicago nor
18#    the names of its contributors may be used to endorse or promote
19#    products derived from this software without specific prior written permission.
20#
21# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
25# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
26# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
27# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
28# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
29# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
30# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
31# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32#
33# Direct questions, comments etc about AppLauncher to www.evl.uic.edu/cavern/forum
34#
35# Author: Ratko Jagodic
36#
37############################################################################
38
39
40from data import *
41from request import CurrentRequests, SSHRequest
42from SimpleXMLRPCServer import *
43import socket, os, sys, xmlrpclib, time, optparse, os.path
44import traceback as tb
45from threading import RLock
46from myprint import *   # handles the printing or logging
47
48opj = os.path.join
49sys.path.append( opj(os.environ["SAGE_DIRECTORY"], "bin" ) )
50from sagePath import getUserPath, SAGE_DIR, getPath, getDefaultPath
51
52
# Module-level settings. main() overwrites APP_CONFIG_FILE (only when -c/--config
# is given), XMLRPC_PORT, SAGE_SERVER, DO_REPORT and REPORT_NAME from the command
# line, so the values below act as defaults for code that runs before main().
APP_CONFIG_FILE = getPath("applications", "applications.conf")
print "AppConfigFile: ", APP_CONFIG_FILE
TILE_CONFIG_FILE = None    # filled in from the "tileConfiguration" line of fsManager.conf, if any
XMLRPC_PORT = 19010        # port the launcher's own XML-RPC server listens on
SAGE_SERVER_PORT = 8009    # sage server port for reporting to
SAGE_SERVER = "papyrus" # host of the sage server to report to (previously "sage.sl.startap.net")
STOP_LAUNCHER = False    # set to True by AppLauncher.killLauncher() to end the serve loop in main()
DO_REPORT = True      # for determining whether to report to the sage server or not
REPORT_NAME = socket.gethostname()   # name under which this launcher reports itself
DEFAULT_SYSTEM_IP = None   # for application pixel streaming; read from the "fsManager" line of fsManager.conf
DEFAULT_SYSTEM_PORT = None # read from the "systemPort" line of fsManager.conf (kept as a string)


# change to the directory of this script
os.chdir(sys.path[0])
68
69
# Read DEFAULT_SYSTEM_IP, DEFAULT_SYSTEM_PORT and TILE_CONFIG_FILE from
# fsManager.conf.  This is best effort: if the file cannot be opened or a
# line is malformed, parsing stops and DEFAULT_SYSTEM_IP is reset to None
# (values already assigned to the other two names are left untouched).
try:
    f = open( getPath("fsManager.conf"), "r")
    try:
        for line in f:
            lineTokens = line.split()   # split once instead of re-stripping per test
            if not lineTokens:
                continue                # blank line, nothing to match
            keyword = lineTokens[0]
            if keyword.startswith("fsManager"):
                # the third token is the system IP; shorter lines are ignored
                if len(lineTokens) >= 4:
                    DEFAULT_SYSTEM_IP = lineTokens[2]
            elif keyword.startswith("systemPort"):
                DEFAULT_SYSTEM_PORT = lineTokens[1]
            elif keyword.startswith("tileConfiguration"):
                TILE_CONFIG_FILE = getPath( lineTokens[1] )
    finally:
        # close the file even when a malformed line aborts the loop
        f.close()
except Exception:
    DEFAULT_SYSTEM_IP = None
85
86
87
88
class AppLauncher:
    """ Object whose methods main() registers on the XML-RPC server.

        It keeps the application configurations (self.configs), the
        container of submitted requests (self.requests) and a proxy to the
        sage server (self.sageServer) that this launcher reports itself to.
    """

    def __init__(self):
        """ build the node table, load the app configurations and report
            ourselves to the sage server for the first time
        """
        # key=IP, value=[0...n]   --> how many apps are running on that node
        self._nodeHash = {}

        # the nodes come from the tile config; without one only localhost is used
        if TILE_CONFIG_FILE:
            self.__readNodeIPs()
        else:
            self._nodeHash = {"127.0.0.1":0}

        # read the sage app configuration file and create the container for all the requests
        WriteLog( "\nUsing config file: " + APP_CONFIG_FILE )
        self.configs = Configurations(APP_CONFIG_FILE)
        self.requests = CurrentRequests(self._nodeHash)

        # report ourselves to the sage server (port taken from SAGE_SERVER_PORT
        # instead of a second hard-coded copy of the number)
        self.sageServer = xmlrpclib.ServerProxy("http://%s:%s" % (SAGE_SERVER, SAGE_SERVER_PORT))
        self.launcherId = ""
        self.reportLauncher()


    def __readNodeIPs(self):
        """ add every address found on an "IP <address>" line of the tile
            config file to self._nodeHash with a count of 0
        """
        f = open(TILE_CONFIG_FILE, "r")
        try:
            for line in f:
                tokens = line.split()
                # an "IP" line without an address is skipped rather than
                # aborting the launcher with an IndexError
                if len(tokens) >= 2 and tokens[0].startswith("IP"):
                    self._nodeHash[tokens[1]] = 0   #add the machine ip to the hash
        finally:
            f.close()
        WriteLog( "\n\nNodeHash = \n" + str(self._nodeHash) + "\n\n" )


    def startDefaultApp(self, appName, fsIP, fsPort, useBridge, configName, pos=False, size=False, optionalArgs=""):
        """ starts the application based on the entry in applications.conf
            (thin wrapper around startApp; returns whatever startApp returns)
        """
        return self.startApp(appName, fsIP, fsPort, useBridge, configName, optionalArgs=optionalArgs, pos=pos, size=size)


    def startApp(self, appName, fsIP, fsPort, useBridge, configName="", command="", optionalArgs="", pos=False, size=False, nodeNum=False, protocol=False):
        """ receive the request for starting an app
            either start the default app from a predefined config
            or start a completely custom app with dynamic config

            fsIP / fsPort fall back to DEFAULT_SYSTEM_IP / DEFAULT_SYSTEM_PORT
            when they are empty.  pos, size, nodeNum and protocol override
            the corresponding config values only when they are truthy.

            Returns the result of self.requests.addRequest(config).
            Raises ValueError when neither configName nor command is given.
        """
        if configName != "":   # this assumes a default config and app startup
            if configName == "default":
                config = self.configs.getDefaultConfig(appName)
            else:
                config = self.configs.getConfig(appName, configName)
        elif command != "":    # this assumes a completely custom app startup
            config = OneConfig(appName+"_temp", appName, True)
            # optionalArgs are appended once further down for both kinds of
            # startup; appending them here as well duplicated them
            config.setCommand(command)
        else:
            # previously this fell through and failed later with an
            # UnboundLocalError on "config"
            raise ValueError("startApp needs either a configName or a command")

        # common set of parameters that can be changed in either case
        if pos:
            config.setPosition(pos)
        if size:
            config.setSize(size)
        if nodeNum:
            config.setNodeNum(nodeNum)
        if protocol:
            config.setProtocol(protocol)

        if not fsIP:
            fsIP = DEFAULT_SYSTEM_IP
        config.setFSIP(fsIP)

        if not fsPort:
            fsPort = DEFAULT_SYSTEM_PORT
        config.setFSPort(fsPort)

        config.setUseBridge(useBridge)
        config.setLauncherId(socket.gethostbyname(socket.gethostname())+":"+str(XMLRPC_PORT))
        if useBridge:
            if config.getBridgeIP() == "":   #means that there was no app-specific bridge info set so use the default
                config.setBridgeIP(self.configs.getBridgeIP())
                config.setBridgePort(self.configs.getBridgePort())

        config.setCommand(config.getCommand()+" "+optionalArgs)

        # create the request, set the appropriate appId, write the config file and submit it
        return self.requests.addRequest(config)


    def stopApp(self, appId):
        """ forcefully kills the application """
        return self.requests.stopRequest(appId)


    def appStatus(self):
        """ returns a list of currently running applications as a hash key=appId, value=appName-command """
        return self.requests.getStatus()


    def getAppList(self):
        """ return configurations for all apps in a hash of strings
            (the config file is reloaded on every call)
        """
        self.configs.reloadConfigFile()
        return self.configs.getConfigHash()


    def getAppConfigInfo(self, appId):
        """ return the configuration information for the running app,
            or -1 when there is no request with that appId
        """
        # look the request up once so it cannot disappear between the
        # existence check and the use
        request = self.requests.getRequest(appId)
        if request:
            return request.config.getConfigString()
        return -1


    def killLauncher(self):
        """ used for killing the appLauncher remotely; always returns 1 """
        global STOP_LAUNCHER
        STOP_LAUNCHER = True

        # unregister from the sage server (best effort, never fails the call)
        if self.launcherId:
            try:
                self.sageServer.UnregisterLauncher(self.launcherId)
            except socket.error:
                pass    # sage server not reachable, same policy as reportLauncher
            except Exception:
                WriteLog( tb.format_exc() )

        return 1


    def reportLauncher(self):
        """ only used to report ourselves to the sage server that we are still alive
            every time pretend like it's our first time connecting so that even if the
            sage server was started after the appLauncher, it will still be reported
        """
        if not DO_REPORT:
            return

        try:
            self.launcherId = self.sageServer.ReportLauncher( REPORT_NAME, socket.gethostbyname(socket.gethostname()),
                                                              XMLRPC_PORT, self.getAppList() )
        except socket.error:
            pass    # sage server not reachable right now; try again on the next report
        except Exception:
            # anything else is logged; KeyboardInterrupt is no longer swallowed here
            WriteLog( tb.format_exc() )


    def test(self):
        """ trivial call that always returns 1 """
        return 1
227
228       
229
230    # a mix-in class for adding the threading support to the XMLRPC server
class ThreadedXMLRPCServer(SocketServer.ThreadingMixIn, SimpleXMLRPCServer):
    # SimpleXMLRPCServer combined with the threading mix-in.
    # NOTE(review): main() in this file instantiates MySimpleXMLRPCServer, not
    # this class - confirm whether anything else still uses it.
    allow_reuse_address = True
233
class MySimpleXMLRPCServer(SimpleXMLRPCServer):
    # Plain (non-threaded) XML-RPC server with address reuse switched on;
    # this is the server class that main() creates.
    allow_reuse_address = True
236   
237
238
239
def get_commandline_options():
    """ declare the appLauncher command line switches and parse the process
        arguments with optparse; returns the (options, args) pair produced
        by OptionParser.parse_args()
    """
    parser = optparse.OptionParser()

    parser.add_option(
        "-v", "--verbose", dest="verbose", action="store_true", default=False,
        help="if set, prints output to console, otherwise to output.txt")

    parser.add_option(
        "-p", "--port", dest="port", type="int", default=19010,
        help="change the port number that the server listens on (default is 19010)")

    parser.add_option(
        "-c", "--config", dest="config", default="",
        help="specify the application configuration file to use (default is applications.conf)")

    parser.add_option(
        "-s", "--server", dest="server", default="sage.sl.startap.net",
        help="which sage server to report to (default is sage.sl.startap.net)")

    parser.add_option(
        "-a", "--name", dest="name", default=socket.gethostname(),
        help="report yourself as *name* (default is the local machine name)")

    # note: the flag is stored under "report" although it means "do not report"
    parser.add_option(
        "-n", "--noreport", dest="report", action="store_true", default=False,
        help="set this flag if you don't want the appLauncher to report to the sage server and become visible by other people")

    parsed = parser.parse_args()
    return parsed
261
262
263
264
def main(argv):
    """ configure the launcher from the command line, create the AppLauncher
        and serve XML-RPC requests until killLauncher() sets STOP_LAUNCHER or
        the user interrupts; finishes with sys.exit(0)

        argv is accepted for compatibility but is not read: the options are
        parsed by get_commandline_options()
    """
    global STOP_LAUNCHER
    global APP_CONFIG_FILE
    global XMLRPC_PORT
    global SAGE_SERVER
    global DO_REPORT
    global REPORT_NAME

    # parse the command line params
    (options, args) = get_commandline_options()
    doRedirect(not options.verbose)
    if options.config != "":
        APP_CONFIG_FILE = options.config
    XMLRPC_PORT = options.port
    SAGE_SERVER = options.server
    DO_REPORT = not options.report    # the option is a "noreport" flag
    REPORT_NAME = options.name

    # set the default timeout so that we don't wait forever
    socket.setdefaulttimeout(2)

    # create the main object for handling requests
    appLauncher = AppLauncher()

    reportInterval = 6    # seconds between "still alive" reports
    server = None
    try:
        # start the local server and listen for sage ui connections
        server = MySimpleXMLRPCServer(("", XMLRPC_PORT), logRequests=False)
        for rpcMethod in (appLauncher.startDefaultApp,
                          appLauncher.startApp,
                          appLauncher.stopApp,
                          appLauncher.getAppList,
                          appLauncher.getAppConfigInfo,
                          appLauncher.killLauncher,
                          appLauncher.test,
                          appLauncher.appStatus):
            server.register_function(rpcMethod)
        server.register_introspection_functions()

        # now loop and serve the requests until asked to stop
        oldT = time.time()
        while not STOP_LAUNCHER:
            try:
                # report periodically that we are still alive
                if (time.time() - oldT) > reportInterval:
                    appLauncher.reportLauncher()
                    oldT = time.time()

                # accept and process any xmlrpc requests
                # this times out every few seconds as defined by the setdefaulttimeout above
                server.handle_request()
            except KeyboardInterrupt:
                break
            except Exception:
                # log and keep serving; SystemExit is deliberately not caught
                WriteLog( "\n=======> XML_RPC SERVER ERROR: ")
                WriteLog( tb.format_exc() )
    finally:
        # always release the submit thread and the listening socket, even
        # when creating or setting up the server failed
        appLauncher.requests.stopSubmitThread()
        if server is not None:
            server.server_close()

    sys.exit(0)
323
324
325
# Script entry point. NOTE: main() does not read the list passed here; the
# command line options are parsed inside get_commandline_options().
if __name__ == '__main__':
    main(['', os.path.basename(sys.argv[0])] + sys.argv[1:])
Note: See TracBrowser for help on using the repository browser.