Package flumotion :: Package worker :: Module base
[hide private]

Source Code for Module flumotion.worker.base

  1  # -*- Mode: Python; test-case-name:flumotion.test.test_worker_worker -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """ 
 23  worker-side objects to handle worker clients 
 24  """ 
 25   
 26  import os 
 27  import sys 
 28  import signal 
 29   
 30  from twisted.cred import portal 
 31  from twisted.internet import defer, reactor 
 32  from twisted.spread import pb 
 33  from zope.interface import implements 
 34   
 35  from flumotion.common import errors, log 
 36  from flumotion.common import worker, startset 
 37  from flumotion.common.process import signalPid 
 38  from flumotion.twisted import checkers, fdserver 
 39  from flumotion.twisted import pb as fpb 
 40   
__version__ = "$Rev$"

# Grace period in seconds: BaseJobHeaven.shutdown() schedules kill() via
# reactor.callLater with this delay, so jobs that have not stopped nicely
# by then are shut down "less nicely" (SIGKILL by default).
JOB_SHUTDOWN_TIMEOUT = 5


def _getSocketPath():
    """
    Reserve a unique path to use for the worker's job-facing unix socket.

    A regular temporary file is created under that name and left on disk;
    the caller needs to delete it before binding a socket to the path.

    @returns: path ending in '.<worker pid>', with a 'flumotion.worker.'
              file name prefix
    @rtype:   str
    """
    # FIXME: there is no mkstemp() equivalent for unix sockets, so we
    # reserve the name with an ordinary temporary file instead. Because
    # the caller has to unlink it before binding, there is a small window
    # in which the socket path could be created by something else.
    # Preparing a socket file directly under that name did not work either.
    import tempfile
    fd, name = tempfile.mkstemp(suffix='.%d' % os.getpid(),
                                prefix='flumotion.worker.')
    # only the unique name is needed, not the open descriptor
    os.close(fd)

    return name


class JobInfo(object):
    """
    I am the record the worker keeps about one job.

    @ivar pid:        PID of the child process
    @type pid:        int
    @ivar avatarId:   avatar identification string
    @type avatarId:   str
    @ivar type:       type of the component to create
    @type type:       str
    @ivar moduleName: name of the module to create the component from
    @type moduleName: str
    @ivar methodName: the factory method to use to create the component
    @type methodName: str
    @ivar nice:       the nice level to run the job as
    @type nice:       int
    @ivar bundles:    ordered list of (bundleName, bundlePath) needed to
                      create the component
    @type bundles:    list of (str, str)
    """
    # fixed attribute set: instances get no per-instance __dict__
    __slots__ = (
        'pid',
        'avatarId',
        'type',
        'moduleName',
        'methodName',
        'nice',
        'bundles',
    )

    def __init__(self, pid, avatarId, type, moduleName, methodName, nice,
                 bundles):
        (self.pid, self.avatarId, self.type) = (pid, avatarId, type)
        (self.moduleName, self.methodName) = (moduleName, methodName)
        (self.nice, self.bundles) = (nice, bundles)


class JobProcessProtocol(worker.ProcessProtocol):
    """
    Process protocol for a job process spawned by the worker.

    Messages about the job are forwarded through the heaven's worker brain.
    When the process ends, create/shutdown requests still pending for the
    job in the start set are resolved before the heaven is told that the
    job stopped.
    """

    def __init__(self, heaven, avatarId, startSet):
        """
        @param heaven:   the job heaven tracking this job; it must provide
                         getWorkerName(), jobStopped() and a brain attribute
        @param avatarId: avatar id of the component run by the job
        @type  avatarId: str
        @param startSet: start set holding the pending create/shutdown
                         deferreds for the job
        """
        self._startSet = startSet
        # Remember the create deferred registered for this job right now;
        # processEnded() compares against it to find out whether the job
        # died before it attached to the worker.
        self._deferredStart = startSet.createRegistered(avatarId)
        worker.ProcessProtocol.__init__(self, heaven, avatarId, 'component',
                                        heaven.getWorkerName())

    def sendMessage(self, message):
        """
        Forward a message about this job by calling componentAddMessage
        on the worker brain.
        """
        brain = self.loggable.brain
        brain.callRemote('componentAddMessage', self.avatarId, message)

    def processEnded(self, status):
        """
        Clean up after the job process exited.

        @param status: why the process ended; status.value.signal holds the
                       terminating signal, if any
        """
        jobHeaven = self.loggable
        starts = self._startSet
        exitSignal = status.value.signal

        # A job that dies before logging in to the worker leaves the very
        # create deferred we saw at construction time pending; fail it, or
        # the manager keeps thinking the component is still starting up.
        # A job that already attached has had its create deferred
        # called back.
        pending = starts.createRegistered(self.avatarId)
        if pending is self._deferredStart:
            if exitSignal:
                why = "received signal %d" % exitSignal
            else:
                why = "unknown reason"
            text = ("Component '%s' has exited early (%s)." %
                    (self.avatarId, why))
            starts.createFailed(self.avatarId,
                                errors.ComponentCreateError(text))

        # a pending shutdown request is satisfied now that the process is gone
        if starts.shutdownRegistered(self.avatarId):
            starts.shutdownSuccess(self.avatarId)

        jobHeaven.jobStopped(self.pid)

        # chain up
        worker.ProcessProtocol.processEnded(self, status)


class BaseJobHeaven(pb.Root, log.Loggable):
    """
    I am similar to but not quite the same as a manager-side Heaven.
    I manage avatars inside the worker for job processes spawned by the
    worker.

    @ivar avatars: dict of avatarId -> avatar
    @type avatars: dict of str -> L{base.BaseJobAvatar}
    @ivar brain: the worker brain
    @type brain: L{worker.WorkerBrain}
    """

    logCategory = "job-heaven"
    implements(portal.IRealm)

    # Avatar class instantiated by requestAvatar(); it must be set (e.g. by
    # a subclass) before listen() is called.
    avatarClass = None

    def __init__(self, brain):
        """
        @param brain: a reference to the worker brain
        @type brain: L{worker.WorkerBrain}
        """
        self.avatars = {} # avatarId -> avatar
        self.brain = brain
        self._socketPath = _getSocketPath()
        self._port = None
        # If set, a deferred to fire when our last child process exits;
        # only set while shutdown() is waiting for jobs to go away.
        self._onShutdown = None

        self._jobInfos = {} # processid -> JobInfo

        # presumably StartSet uses the predicate to tell whether a
        # component is already running (i.e. its avatar has logged in)
        self._startSet = startset.StartSet(
            lambda x: x in self.avatars,
            errors.ComponentAlreadyStartingError,
            errors.ComponentAlreadyRunningError)

    def listen(self):
        """
        Start listening on our unix socket for job processes to log in.

        Whatever file is at the socket path (such as the placeholder
        created by _getSocketPath()) is removed first.
        """
        assert self._port is None
        assert self.avatarClass is not None
        # FIXME: we should hand a username and password to log in with to
        # the job process instead of allowing anonymous
        checker = checkers.FlexibleCredentialsChecker()
        checker.allowPasswordless(True)
        p = portal.Portal(self, [checker])
        f = pb.PBServerFactory(p)
        try:
            os.unlink(self._socketPath)
        except OSError:
            pass

        # Rather than a listenUNIX(), we use listenWith so that we can specify
        # our particular Port, which creates Transports that we know how to
        # pass FDs over.
        self.debug("Listening for FD's on unix socket %s", self._socketPath)
        port = reactor.listenWith(fdserver.FDPort, self._socketPath, f)
        self._port = port

    ### portal.IRealm method

    def requestAvatar(self, avatarId, mind, *interfaces):
        """
        Create an avatar of avatarClass for a job logging in, and register
        it in self.avatars.

        @returns: (pb.IPerspective, avatar, avatar.logout)
        @raises NotImplementedError: if pb.IPerspective is not requested
        """
        if pb.IPerspective in interfaces:
            avatar = self.avatarClass(self, avatarId, mind)
            assert avatarId not in self.avatars
            self.avatars[avatarId] = avatar
            return pb.IPerspective, avatar, avatar.logout
        else:
            raise NotImplementedError("no interface")

    def removeAvatar(self, avatarId):
        """
        Forget the avatar registered under avatarId; unknown ids are only
        logged as a warning.
        """
        if avatarId in self.avatars:
            del self.avatars[avatarId]
        else:
            self.warning("some programmer is telling me about an avatar "
                         "I have no idea about: %s", avatarId)

    def getWorkerName(self):
        """
        Gets the name of the worker that spawns the process.

        @rtype: str
        """
        return self.brain.workerName

    def addJobInfo(self, processId, jobInfo):
        """
        Start tracking a job process.

        @type processId: int
        @type jobInfo:   L{JobInfo}
        """
        self._jobInfos[processId] = jobInfo

    def getJobInfo(self, processId):
        """
        @rtype:  L{JobInfo}
        @raises KeyError: if no job with that process id is tracked
        """
        return self._jobInfos[processId]

    def getJobInfos(self):
        """
        @returns: the JobInfo objects of all tracked jobs
        """
        return self._jobInfos.values()

    def getJobPids(self):
        """
        @returns: the process ids of all tracked jobs
        """
        return self._jobInfos.keys()

    def rotateChildLogFDs(self):
        """
        Tell every logged-in job to log to our current stdout and stderr
        file descriptors (presumably after the worker's logs were rotated).
        """
        self.debug('telling kids about new log file descriptors')
        for avatar in self.avatars.values():
            avatar.logTo(sys.stdout.fileno(), sys.stderr.fileno())

    def jobStopped(self, pid):
        """
        Stop tracking the job process with the given pid, and finish a
        pending shutdown if it was the last one.

        @type pid: int
        """
        if pid in self._jobInfos:
            self.debug('Removing job info for %d', pid)
            del self._jobInfos[pid]
            self._maybeFinishShutdown()
        else:
            self.warning("some programmer is telling me about a pid "
                         "I have no idea about: %d", pid)

    def _maybeFinishShutdown(self):
        # Fire the pending shutdown deferred once no job process remains.
        # The reference is dropped before firing so the deferred can only
        # ever be fired once; firing a Deferred twice raises
        # AlreadyCalledError (e.g. if a job were added and stopped again
        # after the shutdown had completed).
        if self._onShutdown is not None and not self._jobInfos:
            self.debug("Last child exited")
            d, self._onShutdown = self._onShutdown, None
            d.callback(None)

    def shutdown(self):
        """
        Stop all jobs, then stop listening.

        Every avatar is asked to stop. If there were avatars, the returned
        deferred fires once no job processes are tracked any more; jobs
        still alive after JOB_SHUTDOWN_TIMEOUT seconds are killed.

        @rtype: L{twisted.internet.defer.Deferred}
        """
        self.debug('Shutting down JobHeaven')
        self.debug('Stopping all jobs')
        for avatar in self.avatars.values():
            avatar.stop()

        if self.avatars:
            # If our jobs fail to shut down nicely within some period of
            # time, shut them down less nicely
            dc = reactor.callLater(JOB_SHUTDOWN_TIMEOUT, self.kill)

            def cancelDelayedCall(res, dc):
                # be nice to unit tests
                if dc.active():
                    dc.cancel()
                return res

            self._onShutdown = defer.Deferred()
            self._onShutdown.addCallback(cancelDelayedCall, dc)
            ret = self._onShutdown
            # The job processes may all have exited already while their
            # avatars have not logged out yet; jobStopped() would then
            # never be called again, so check right away instead of
            # waiting forever.
            self._maybeFinishShutdown()
        else:
            # everything's gone already, return success
            ret = defer.succeed(None)

        def stopListening(_):
            # possible for it to be None, if we haven't been told to
            # listen yet, as in some test cases
            if self._port:
                port = self._port
                self._port = None
                return port.stopListening()
        ret.addCallback(stopListening)
        return ret

    def kill(self, signum=signal.SIGKILL):
        """
        Send signum (SIGKILL by default) to every tracked job process.
        """
        self.warning("Killing all children immediately")
        for pid in self.getJobPids():
            self.killJobByPid(pid, signum)

    def killJobByPid(self, pid, signum):
        """
        Send signum to the tracked job process with the given pid.

        @raises errors.UnknownComponentError: if pid is not a tracked job
        """
        if pid not in self._jobInfos:
            raise errors.UnknownComponentError(pid)

        jobInfo = self._jobInfos[pid]
        self.debug("Sending signal %d to job %s at pid %d", signum,
                   jobInfo.avatarId, jobInfo.pid)
        signalPid(jobInfo.pid, signum)

    def killJob(self, avatarId, signum):
        """
        Send signum to every tracked job process running avatarId.
        """
        for job in self._jobInfos.values():
            if job.avatarId == avatarId:
                self.killJobByPid(job.pid, signum)


302 -class BaseJobAvatar(fpb.Avatar, log.Loggable):
303 """ 304 I am an avatar for the job living in the worker. 305 """ 306 logCategory = 'job-avatar' 307
308 - def __init__(self, heaven, avatarId, mind):
309 """ 310 @type heaven: L{flumotion.worker.base.BaseJobHeaven} 311 @type avatarId: str 312 """ 313 fpb.Avatar.__init__(self, avatarId) 314 self._heaven = heaven 315 self.setMind(mind) 316 self.pid = None
317
318 - def setMind(self, mind):
319 """ 320 @param mind: reference to the job's JobMedium on which we can call 321 @type mind: L{twisted.spread.pb.RemoteReference} 322 """ 323 fpb.Avatar.setMind(self, mind) 324 self.haveMind()
325
326 - def haveMind(self):
327 # implement me in subclasses 328 pass
329
330 - def logout(self):
331 self.log('logout called, %s disconnected', self.avatarId) 332 333 self._heaven.removeAvatar(self.avatarId)
334
335 - def stop(self):
336 """ 337 returns: a deferred marking completed stop. 338 """ 339 raise NotImplementedError
340
341 - def _sendFileDescriptor(self, fd, message):
342 try: 343 # FIXME: pay attention to the return value of 344 # sendFileDescriptor; is the same as the return value of 345 # sendmsg(2) 346 self.mind.broker.transport.sendFileDescriptor(fd, message) 347 return True 348 except OSError, e: 349 # OSError is what is thrown by the C code doing this 350 # when there are issues 351 self.warning("Error %s sending file descriptors", 352 log.getExceptionMessage(e)) 353 return False
354
355 - def logTo(self, stdout, stderr):
356 """ 357 Tell the job to log to the given file descriptors. 358 """ 359 self.debug('Giving job new stdout and stderr') 360 if self.mind: 361 self._sendFileDescriptor(stdout, "redirectStdout") 362 self._sendFileDescriptor(stdout, "redirectStderr")
363