source: trunk/src/testing/bin/appLauncher/request.py @ 4

Revision 4, 11.4 KB checked in by ajaworski, 13 years ago (diff)

Added modified SAGE sources

Line 
############################################################################
#
# AppLauncher - Application Launcher for SAGE
# Copyright (C) 2006 Electronic Visualization Laboratory,
# University of Illinois at Chicago
#
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
#  * Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
#  * Redistributions in binary form must reproduce the above
#    copyright notice, this list of conditions and the following disclaimer
#    in the documentation and/or other materials provided with the distribution.
#  * Neither the name of the University of Illinois at Chicago nor
#    the names of its contributors may be used to endorse or promote
#    products derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
# Direct questions, comments etc about AppLauncher to www.evl.uic.edu/cavern/forum
#
# Author: Ratko Jagodic
#
############################################################################
38
39
40import subprocess as sp
41import traceback as tb
42import os, sys, time, os.path
43from myprint import *   # handles the printing or logging
44from threading import RLock, Thread
45
# Short alias for os.path.join, used below to build pid-file paths.
from os.path import join as opj
47
#######################################################################
#####    Concurrency: after construction, the request tables and
#####    _nodeHash are shared with the background submit thread and are
#####    only touched while holding self.__submitLock (re-entrant).
#####    The class was written for a single-threaded xmlrpc server
#####    (requests processed one by one); use with a multithreaded
#####    server has not been verified.
#######################################################################

class CurrentRequests:
    """ Book-keeping for every application launched by this AppLauncher.

        addRequest() queues requests; a background thread (submitRequests)
        submits them one at a time.  Successfully submitted requests live in
        self._requests, keyed by appId, until cleanup() finds that their
        process is no longer alive.
    """

    def __init__(self, nodeHash):
        """ nodeHash maps node IP -> number of apps running on that node and
            is updated in place.  The submit thread is started immediately.
        """
        self._requests = {}        # key = appId, value = Request()
        self._nodeHash = nodeHash  # key = IP, value = [0...n] --> how many apps are running on that node

        self.__submitThread = Thread(target=self.submitRequests)
        self.__requestsToSubmit = []   # FIFO of requests waiting for, or undergoing, submission
        self.__submitLock = RLock()
        self.__doRunSubmitThread = True
        self.__submitThread.start()


    def __getFirstAvailableId(self):
        """ Returns the lowest appId in range(0, 9999) that is used neither by a
            running request nor by a queued one.  Dead requests are cleaned up
            first so their ids can be recycled.  Returns None if all are taken.
        """
        self.cleanup()  # first clean up all of the dead requests and their appIds

        self.__submitLock.acquire()
        try:
            # the queue also holds the request currently being submitted (see submitRequests)
            queued = set([r.config.getAppId() for r in self.__requestsToSubmit])
            for appId in range(0, 9999):
                if appId not in self._requests and appId not in queued:
                    return appId
            return None
        finally:
            self.__submitLock.release()


    def __getNextAvailableNode(self):
        """ Returns the IP of the node running the fewest apps and increments
            its count.  An idle node (0 apps) is taken immediately; among
            equally loaded nodes the last one in iteration order wins.
            Returns None, leaving _nodeHash untouched, if there are no nodes.
        """
        self.__submitLock.acquire()
        try:
            currentMin = None
            currentIP = None
            for ip, numApps in self._nodeHash.items():
                if numApps == 0:
                    self._nodeHash[ip] = numApps + 1  # increase the number of apps running on this node
                    return ip
                if currentMin is None or numApps <= currentMin:  # ties go to the later node
                    currentMin = numApps
                    currentIP = ip

            if currentIP is not None:
                self._nodeHash[currentIP] = currentMin + 1  # increase the number of apps running on this node
            return currentIP
        finally:
            self.__submitLock.release()


    def __releaseNode(self, machine):
        """ Decrements the app count of machine if it is one of our nodes. """
        self.__submitLock.acquire()
        try:
            if machine in self._nodeHash:
                self._nodeHash[machine] -= 1
        finally:
            self.__submitLock.release()


    def getRequest(self, appId):
        """ Returns the running request for appId (port number in fact), or False. """
        self.__submitLock.acquire()
        try:
            return self._requests.get(appId, False)
        finally:
            self.__submitLock.release()


    def getStatus(self):
        """ Purges dead requests, then returns
            {str(appId): (appName, command, targetMachine)} for the rest.
        """
        self.cleanup()
        status = {}
        self.__submitLock.acquire()
        try:
            for appId, request in self._requests.items():
                status[str(appId)] = (request.config.getAppName(), request.command, request.targetMachine)
        finally:
            self.__submitLock.release()
        return status


    def addRequest(self, config):
        """ Sets a free appId (and, for apps that may run on the nodes, the
            least loaded node) on config, wraps it in an SSHRequest and queues
            it for the submit thread.  Returns the assigned appId.
        """
        self.__submitLock.acquire()
        try:
            # the lock is held until the request is queued so the chosen appId
            # cannot be handed to anybody else in between
            appId = self.__getFirstAvailableId()
            config.setAppId(appId)

            # if the app can be run on the nodes, get the machine with the lowest load (fewest apps running)
            if config.getRunOnNodes():
                nodeIP = self.__getNextAvailableNode()
                if nodeIP is None:
                    WriteLog( "\n===>  No nodes available, keeping target machine " + str(config.getTargetMachine()) + "\n\n")
                else:
                    config.setTargetMachine(nodeIP)
                    WriteLog( "\nNODE IP = " + str(nodeIP) + "  (apps running: " + str(self._nodeHash[nodeIP]) + ")\n\n")
            elif config.getTargetMachine() in self._nodeHash:
                self._nodeHash[config.getTargetMachine()] += 1

            # make the request; it is submitted in a separate thread
            self.__requestsToSubmit.append(SSHRequest(config))
        finally:
            self.__submitLock.release()

        return appId


    def submitRequests(self):
        """ Body of the submit thread.  On each pass it submits the oldest
            queued request, if any, then sleeps 1.5 s.  Successful requests move
            into self._requests; failed ones give their node slot back.  Runs
            until stopSubmitThread() is called.
        """
        while self.__doRunSubmitThread:
            # peek instead of pop: the request stays queued while it is being
            # submitted, so __getFirstAvailableId still sees its appId as taken
            self.__submitLock.acquire()
            try:
                if self.__requestsToSubmit:
                    request = self.__requestsToSubmit[0]
                else:
                    request = None
            finally:
                self.__submitLock.release()

            if request is not None:
                res = -1
                try:
                    res = request.submit()
                except Exception:
                    # keep this thread alive: one bad request must not block all later ones
                    WriteLog( "===>  ERROR submitting request ---------> :")
                    WriteLog( tb.format_exc() )

                self.__submitLock.acquire()
                try:
                    # only this method removes from the queue, so its head is still `request`
                    self.__requestsToSubmit.pop(0)
                    if res != -1:
                        self._requests[ request.config.getAppId() ] = request
                    else:
                        # never tracked, so cleanup() would never give this slot back
                        self.__releaseNode(request.config.getTargetMachine())
                finally:
                    self.__submitLock.release()

            time.sleep(1.5)


    def stopSubmitThread(self):
        """ Asks the submit thread to exit after its current iteration. """
        self.__doRunSubmitThread = False


    def stopRequest(self, appId):
        """ Stops the request forcefully.  Returns the result of its kill()
            (for SSHRequest: True, or -1 on error), or False if appId is not running.
        """
        self.__submitLock.acquire()
        try:
            request = self._requests.get(appId)
            if request is None:
                return False
            ret = request.kill()
        finally:
            self.__submitLock.release()

        # presumably gives the asynchronous remote kill time to take effect before reaping
        time.sleep(1)
        self.cleanup()
        return ret


    def cleanup(self):
        """ Removes requests whose process is no longer alive: deletes their pid
            file, gives back their node slot and frees their port (appId) for reuse.
        """
        self.__submitLock.acquire()
        try:
            # iterate over a snapshot because entries are deleted inside the loop
            for appId, request in list(self._requests.items()):
                if not request.isAlive():
                    WriteLog( ">>>> Cleaning up: " + request.config.getCommand() + "  appId = " + str(appId) )
                    request.deletePIDFile()
                    self.__releaseNode(request.config.getTargetMachine())  # decrease the num of apps running on this node
                    del self._requests[appId]
        finally:
            self.__submitLock.release()
212
213
214
class Request:
    """ Base record for a single application launch.

        Keeps a reference to `config` and caches its target machine, command
        and config-file name.  processObj stays None until a subclass starts
        the process.
    """

    def __init__(self, config):
        self.config = config
        self.processObj = None    # handle of the process we start (set by subclasses)
        self.configFilename = config.getConfigFilename()
        self.command = config.getCommand()
        self.targetMachine = config.getTargetMachine()
222
223       
224
class SSHRequest(Request):
    """ A Request whose application is started, killed and cleaned up on
        self.targetMachine through /usr/bin/ssh.  The config file is copied
        there first with /usr/bin/scp.
    """

    def __pidFile(self):
        """ Returns the pid-file path used in the remote kill/rm commands:
            <basename(configFilename)>/pid/<appName>_<appId>.pid
        """
        # NOTE(review): this path is relative to the remote login directory and
        # uses the config *file* name (which submit() scp's into that same
        # directory) as a directory name, so it likely never matches.  It looks
        # like os.path.dirname() was intended -- confirm where the app writes
        # its pid file before changing it.
        pidPath = opj(os.path.basename(self.configFilename), "pid")
        return opj(pidPath, self.config.getAppName()+"_"+str(self.config.getAppId())+".pid")


    def submit(self):
        """ Writes and copies the config file to the target machine, then starts
            the app there via ssh.  Returns the appId on success, or -1 if the
            ssh process could not be started.  Copy problems are only logged
            (per the log message, the app then uses its default configuration).
        """
        # copy the configuration file over
        try:
            self.config.writeToFile()
            sp.call(["chmod", "g+w", self.config.getConfigFilename()])  # change the permissions of the temp file

            # NOTE(review): scp copies into the remote login dir under the basename,
            # but the chmod below uses the full local path -- these only coincide
            # if that path is the same file on the remote; confirm.
            retcode = sp.call(["/usr/bin/scp", self.configFilename, self.targetMachine+":"+os.path.basename(self.configFilename)])
            if retcode != 0:
                WriteLog( "===>  WARNING: scp of config file exited with code " + str(retcode) + "... application may use the default configuration")
            sp.call(["/usr/bin/ssh", "-x", self.targetMachine, "chmod a+rw "+self.configFilename])
        except Exception:
            WriteLog( "===>  ERROR copying config file... application will use the default configuration:")
            WriteLog( tb.format_exc() )

        # launch the application via ssh (ssh joins the trailing arguments
        # with spaces into a single remote command line)
        try:
            WriteLog( "\n\nRunning with command: /usr/bin/ssh -x " + self.targetMachine + " cd "+self.config.getBinDir()+" ;env DISPLAY=:0.0 "+ self.command)
            self.processObj = sp.Popen(["/usr/bin/ssh", "-x", self.targetMachine, "cd "+self.config.getBinDir(), ";env DISPLAY=:0.0 ", self.command])
            WriteLog( ">>>>  EXECUTING:  " + self.command + "\nPID = " + str(self.processObj.pid) + "\n")
        except Exception:
            WriteLog( "===>  ERROR launching application ---------> :")
            WriteLog( tb.format_exc() )
            return -1

        return self.config.getAppId()


    def isAlive(self):
        """ True while the local ssh process started by submit() has not exited;
            False if it has exited or was never started.
        """
        if self.processObj is None:
            return False
        return self.processObj.poll() is None


    def kill(self):
        """ Forcefully kills the application via ssh (kill -9 of the pid read from
            its pid file) and deletes the pid file.  The ssh command is started
            asynchronously.  Returns True once it is launched, -1 on error.
        """
        pidFile = self.__pidFile()
        killCmd = "/bin/kill -9 `cat "+pidFile+"`"
        delCmd = "/bin/rm -rf "+pidFile
        try:
            sp.Popen(["/usr/bin/ssh", "-x", self.targetMachine, killCmd, ";", delCmd])
            WriteLog( ">>>>  KILLING:  " + killCmd + "\nPID = " + str(self.processObj.pid) + "\n")
        except Exception:
            WriteLog( "===>  ERROR killing application ---------> :")
            WriteLog( tb.format_exc() )
            return -1

        return True


    def deletePIDFile(self):
        """ Best-effort, asynchronous removal of the temp file where the app
            writes its pid.  Errors are only logged; always returns True.
        """
        delCmd = "/bin/rm -rf "+self.__pidFile()
        # add DISPLAY on top of our own environment: passing {"DISPLAY": ...}
        # alone would strip PATH, HOME, SSH_AUTH_SOCK, ... from the ssh process
        env = os.environ.copy()
        env["DISPLAY"] = ":0.0"
        try:
            sp.Popen(["/usr/bin/ssh", "-x", self.targetMachine, delCmd], env=env)
            WriteLog(">>>>  DELETING:  " + delCmd + "\nPID = " + str(self.processObj.pid) + "\n")
        except Exception:
            WriteLog("===>  ERROR deleting temporary pid file ---------> :")
            WriteLog( tb.format_exc() )

        return True
Note: See TracBrowser for help on using the repository browser.