Package flumotion :: Package component :: Module feed
[hide private]

Source Code for Module flumotion.component.feed

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_worker_feed -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """ 
 23  implementation of a PB Client to interface with feedserver.py 
 24  """ 
 25   
 26  import socket 
 27  import os 
 28   
 29  from twisted.internet import reactor, main, defer, tcp 
 30  from twisted.python import failure 
 31  from zope.interface import implements 
 32   
 33  from flumotion.common import log, common, interfaces 
 34  from flumotion.twisted import pb as fpb 
 35   
 36  __version__ = "$Rev$" 
 37   
 38   
 39  # copied from fdserver.py so that it can be bundled 
 40   
 41   
42 -class _SocketMaybeCloser(tcp._SocketCloser):
43 keepSocketAlive = False 44
45 - def _closeSocket(self):
46 # We override this (from tcp._SocketCloser) so that we can close 47 # sockets properly in the normal case, but once we've passed our 48 # socket on via the FD-channel, we just close() it (not calling 49 # shutdown() which will close the TCP channel without closing 50 # the FD itself) 51 if self.keepSocketAlive: 52 try: 53 self.socket.close() 54 except socket.error: 55 pass 56 else: 57 tcp._SocketCloser._closeSocket(self)
58 59
60 -class PassableClientConnection(_SocketMaybeCloser, tcp.Client):
61 pass
62 63
64 -class PassableClientConnector(tcp.Connector):
65 # It is unfortunate, but it seems that either we override this 66 # private-ish method or reimplement BaseConnector.connect(). This is 67 # the path that tcp.py takes, so we take it too. 68
69 - def _makeTransport(self):
70 return PassableClientConnection(self.host, self.port, 71 self.bindAddress, self, 72 self.reactor)
73 74
75 -class FeedClientFactory(fpb.FPBClientFactory, log.Loggable):
76 """ 77 I am a client factory used by a feed component's medium to log into 78 a worker and exchange feeds. 79 """ 80 logCategory = 'feedclient' 81 perspectiveInterface = interfaces.IFeedMedium 82
83 - def __init__(self, medium):
84 fpb.FPBClientFactory.__init__(self) 85 self.medium = medium
86 87 # not a BaseMedium because we are going to do strange things to the transport 88 89
90 -class FeedMedium(fpb.Referenceable):
91 """ 92 I am a client for a Feed Server. 93 94 I am used as the remote interface between a component and another 95 component. 96 97 @ivar component: the component this is a feed client for 98 @type component: L{flumotion.component.feedcomponent.FeedComponent} 99 @ivar remote: a reference to a 100 L{flumotion.worker.feedserver.FeedAvatar} 101 @type remote: L{twisted.spread.pb.RemoteReference} 102 """ 103 logCategory = 'feedmedium' 104 remoteLogName = 'feedserver' 105 implements(interfaces.IFeedMedium) 106 107 remote = None 108
109 - def __init__(self, logName=None):
110 if logName: 111 assert isinstance(logName, str) 112 self.logName = logName 113 self._factory = None 114 self._feedToDeferred = defer.Deferred()
115
116 - def startConnecting(self, host, port, authenticator, timeout=30, 117 bindAddress=None):
118 """Optional helper method to connect to a remote feed server. 119 120 This method starts a client factory connecting via a 121 L{PassableClientConnector}. It offers the possibility of 122 cancelling an in-progress connection via the stopConnecting() 123 method. 124 125 @param host: the remote host name 126 @type host: str 127 @param port: the tcp port on which to connect 128 @param port int 129 @param authenticator: the authenticator, normally provided by 130 the worker 131 @type authenticator: L{flumotion.twisted.pb.Authenticator} 132 133 @returns: a deferred that will fire with the remote reference, 134 once we have authenticated. 135 """ 136 assert self._factory is None 137 self._factory = FeedClientFactory(self) 138 reactor.connectWith(PassableClientConnector, host, port, 139 self._factory, timeout, bindAddress) 140 return self._factory.login(authenticator)
141
142 - def requestFeed(self, host, port, authenticator, fullFeedId):
143 """Request a feed from a remote feed server. 144 145 This helper method calls startConnecting() to make the 146 connection and authenticate, and will return the feed file 147 descriptor or an error. A pending connection attempt can be 148 cancelled via stopConnecting(). 149 150 @param host: the remote host name 151 @type host: str 152 @param port: the tcp port on which to connect 153 @type port: int 154 @param authenticator: the authenticator, normally provided by 155 the worker 156 @type authenticator: L{flumotion.twisted.pb.Authenticator} 157 @param fullFeedId: the full feed id (/flow/component:feed) 158 offered by the remote side 159 @type fullFeedId: str 160 161 @returns: a deferred that, if successful, will fire with a pair 162 (feedId, fd). In an error case it will errback and close the 163 remote connection. 164 """ 165 166 def connected(remote): 167 self.setRemoteReference(remote) 168 return remote.callRemote('sendFeed', fullFeedId)
169 170 def feedSent(res): 171 # res is None 172 # either just before or just after this, we received a 173 # sendFeedReply call from the feedserver. so now we're 174 # waiting on the component to get its fd 175 return self._feedToDeferred
176 177 def error(failure): 178 self.warning('failed to retrieve %s from %s:%d', fullFeedId, 179 host, port) 180 self.debug('failure: %s', log.getFailureMessage(failure)) 181 self.debug('closing connection') 182 self.stopConnecting() 183 return failure 184 185 d = self.startConnecting(host, port, authenticator) 186 d.addCallback(connected) 187 d.addCallback(feedSent) 188 d.addErrback(error) 189 return d 190
191 - def sendFeed(self, host, port, authenticator, fullFeedId):
192 """Send a feed to a remote feed server. 193 194 This helper method calls startConnecting() to make the 195 connection and authenticate, and will return the feed file 196 descriptor or an error. A pending connection attempt can be 197 cancelled via stopConnecting(). 198 199 @param host: the remote host name 200 @type host: str 201 @param port: the tcp port on which to connect 202 @type port: int 203 @param authenticator: the authenticator, normally provided by 204 the worker 205 @type authenticator: L{flumotion.twisted.pb.Authenticator} 206 @param fullFeedId: the full feed id (/flow/component:eaterAlias) 207 to feed to on the remote size 208 @type fullFeedId: str 209 210 @returns: a deferred that, if successful, will fire with a pair 211 (feedId, fd). In an error case it will errback and close the 212 remote connection. 213 """ 214 215 def connected(remote): 216 assert isinstance(remote.broker.transport, _SocketMaybeCloser) 217 self.setRemoteReference(remote) 218 return remote.callRemote('receiveFeed', fullFeedId)
219 220 def feedSent(res): 221 t = self.remote.broker.transport 222 self.debug('stop reading from transport') 223 t.stopReading() 224 225 self.debug('flushing PB write queue') 226 t.doWrite() 227 self.debug('stop writing to transport') 228 t.stopWriting() 229 230 t.keepSocketAlive = True 231 fd = os.dup(t.fileno()) 232 233 # avoid refcount cycles 234 self.setRemoteReference(None) 235 236 d = defer.Deferred() 237 238 def loseConnection(): 239 t.connectionLost(failure.Failure(main.CONNECTION_DONE)) 240 d.callback((fullFeedId, fd)) 241 242 reactor.callLater(0, loseConnection) 243 return d 244 245 def error(failure): 246 self.warning('failed to retrieve %s from %s:%d', fullFeedId, 247 host, port) 248 self.debug('failure: %s', log.getFailureMessage(failure)) 249 self.debug('closing connection') 250 self.stopConnecting() 251 return failure 252 253 d = self.startConnecting(host, port, authenticator) 254 d.addCallback(connected) 255 d.addCallback(feedSent) 256 d.addErrback(error) 257 return d 258
259 - def stopConnecting(self):
260 """Stop a pending or established connection made via 261 startConnecting(). 262 263 Stops any established or pending connection to a remote feed 264 server started via the startConnecting() method. Safe to call 265 even if connection has not been started. 266 """ 267 if self._factory: 268 self._factory.disconnect() 269 self._factory = None 270 # not sure if this is necessary; call it just in case, so we 271 # don't leave a lingering reference cycle 272 self.setRemoteReference(None)
273 274 ### IMedium methods 275
276 - def setRemoteReference(self, remoteReference):
277 self.remote = remoteReference
278
279 - def hasRemoteReference(self):
280 return self.remote is not None
281
282 - def callRemote(self, name, *args, **kwargs):
283 return self.remote.callRemote(name, args, kwargs)
284
285 - def remote_sendFeedReply(self, fullFeedId):
286 t = self.remote.broker.transport 287 # make sure we stop receiving PB messages 288 self.debug('stop reading from transport') 289 t.stopReading() 290 reactor.callLater(0, self._doFeedTo, fullFeedId, t)
291
292 - def _doFeedTo(self, fullFeedId, t):
293 self.debug('flushing PB write queue') 294 t.doWrite() 295 self.debug('stop writing to transport') 296 t.stopWriting() 297 298 # make sure shutdown() is not called on the socket 299 t.keepSocketAlive = True 300 301 fd = os.dup(t.fileno()) 302 # Similar to feedserver._sendFeedReplyCb, but since we are in a 303 # callLater, not doReadOrWrite, we call connectionLost directly 304 # on the transport. 305 t.connectionLost(failure.Failure(main.CONNECTION_DONE)) 306 307 # This medium object is of no use any more; drop our reference 308 # to the remote so we can avoid cycles. 309 self.setRemoteReference(None) 310 311 (flowName, componentName, feedName) = common.parseFullFeedId( 312 fullFeedId) 313 feedId = common.feedId(componentName, feedName) 314 315 self.debug('firing deferred with feedId %s on fd %d', feedId, 316 fd) 317 self._feedToDeferred.callback((feedId, fd))
318