Package flumotion :: Package worker :: Module feedserver
[hide private]

Source Code for Module flumotion.worker.feedserver

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_worker_feed -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """ 
 23  implementation of a PB Server through which other components can request 
 24  to eat from or feed to this worker's components. 
 25  """ 
 26   
 27  from twisted.internet import reactor 
 28  from twisted.spread import pb 
 29  from twisted.cred import portal 
 30  from zope.interface import implements 
 31   
 32  from flumotion.common import log, common 
 33  from flumotion.twisted import fdserver 
 34  from flumotion.twisted import portal as fportal 
 35  from flumotion.twisted import pb as fpb 
 36   
 37  __version__ = "$Rev: 7162 $" 
 38   
 39   
class FeedServer(log.Loggable):
    """
    I am the feed server. PHEAR

    I listen for PB connections on a TCP port and act as the realm that
    hands out a L{FeedAvatar} for each login.  Requests to feed to or eat
    from a file descriptor are proxied to my brain.
    """

    implements(portal.IRealm)

    logCategory = 'dispatcher'

    def __init__(self, brain, bouncer, portNum):
        """
        Create the feed server and immediately start listening.

        @param brain:   L{flumotion.worker.worker.WorkerBrain}
        @param bouncer: the bouncer handed to the
                        L{flumotion.twisted.portal.BouncerPortal}
        @param portNum: the TCP port number to listen on
        """
        self._brain = brain
        # the listening port; None while not listening
        self._tport = None
        self.listen(bouncer, portNum)

    def getPortNum(self):
        """
        Get the number of the port I am listening on.

        @returns: the port number of my listening port, or 0 (after
                  logging a warning) when I am not listening.
        """
        if not self._tport:
            self.warning('not listening!')
            return 0
        return self._tport.getHost().port

    def listen(self, bouncer, portNum, unsafeTracebacks=0):
        """
        Start listening for feed requests.

        @param bouncer:          the bouncer handed to the BouncerPortal
        @param portNum:          the TCP port number to listen on
        @param unsafeTracebacks: passed through to the PB server factory
        """
        # named bouncerPortal so the twisted.cred 'portal' module imported
        # at the top of the file is not shadowed
        bouncerPortal = fportal.BouncerPortal(self, bouncer)
        factory = pb.PBServerFactory(bouncerPortal,
                                     unsafeTracebacks=unsafeTracebacks)

        tport = reactor.listenWith(fdserver.PassableServerPort, portNum,
                                   factory)

        self._tport = tport
        self.debug('Listening for feed requests on TCP port %d',
                   self.getPortNum())

    def shutdown(self):
        """
        Stop listening for feed requests.

        Calling me while I am not listening (for example a second time) is
        harmless.

        @returns: the result of stopListening() on my listening port, or
                  an already-fired deferred when I am not listening.
        """
        if self._tport is None:
            # nothing to stop; still give callers something to chain on
            from twisted.internet import defer
            self.debug('shutdown requested while not listening')
            return defer.succeed(None)

        d = self._tport.stopListening()
        self._tport = None
        return d

    ### IRealm method

    def requestAvatar(self, avatarId, keycard, mind, *ifaces):
        """
        Create a L{FeedAvatar} for the given avatarId and mind.

        @returns: a tuple of (pb.IPerspective, avatar, logout callable)
        """
        avatar = FeedAvatar(self, avatarId, mind)
        return (pb.IPerspective, avatar,
                lambda: self.avatarLogout(avatar))

    def avatarLogout(self, avatar):
        """
        Called through the logout callable returned by requestAvatar.
        """
        self.debug('feed avatar logged out: %s', avatar.avatarId)

    ## proxy these to the brain

    def feedToFD(self, componentId, feedId, fd, eaterId):
        """
        Proxy to my brain's feedToFD; returns whatever the brain returns.
        """
        return self._brain.feedToFD(componentId, feedId, fd, eaterId)

    def eatFromFD(self, componentId, eaterAlias, fd, feedId):
        """
        Proxy to my brain's eatFromFD; returns whatever the brain returns.
        """
        return self._brain.eatFromFD(componentId, eaterAlias, fd, feedId)
97 98
class FeedAvatar(fpb.Avatar):
    """
    I am the avatar a L{FeedServer} creates for a component that logs in
    and asks to eat from or feed to one of my components.

    My mind is a reference to a L{flumotion.component.feed.FeedMedium}
    """
    logCategory = "feedavatar"
    remoteLogName = "feedmedium"

    def __init__(self, feedServer, avatarId, mind):
        """
        @param feedServer: the feed server whose feedToFD and eatFromFD
                           I call to hand off file descriptors
        @param avatarId:   the id this avatar was requested with
        @param mind:       the mind, handed to setMind()
        """
        fpb.Avatar.__init__(self, avatarId)
        self._transport = None
        self.feedServer = feedServer
        self.avatarId = avatarId
        self.setMind(mind)

    def perspective_sendFeed(self, fullFeedId):
        """
        Called when the PB client wants us to send them the given feed.
        """
        # For proper switching the PB message has to originate from the
        # side that owns the feeder, so the answer goes out as a remote
        # call of our own and the hand-off happens once that call returns.
        replyDeferred = self.mindCallRemote('sendFeedReply', fullFeedId)
        replyDeferred.addCallback(self._sendFeedReplyCb, fullFeedId)

    def _sendFeedReplyCb(self, result, fullFeedId):
        """
        Hand the file descriptor of the PB connection to the feeding
        component once the sendFeedReply call has come back.
        """
        # compare with startStreaming in prototype
        # From here on nothing may be read from or written to the
        # transport, so take it out of the reactor first.
        transport = self.mind.broker.transport
        transport.stopReading()
        transport.stopWriting()

        # hand off the fd to the component
        self.debug("Attempting to send FD: %d", transport.fileno())

        flowName, componentName, feedName = common.parseFullFeedId(
            fullFeedId)
        componentId = common.componentId(flowName, componentName)

        handedOff = self.feedServer.feedToFD(
            componentId, feedName, transport.fileno(), self.avatarId)
        if handedOff:
            transport.keepSocketAlive = True

        # The transport was taken out of the reactor before the FD was
        # sent; what is left to do is have the socket cleaned up.
        transport.loseConnection()

    def perspective_receiveFeed(self, fullFeedId):
        """
        Called when the PB client wants to send the given feedId to the
        given component
        """
        # The result of this call still has to make it back to the client,
        # so only reading is stopped here; the rest is done in a later
        # reactor iteration.
        transport = self.mind.broker.transport
        transport.stopReading()
        reactor.callLater(0, self._doReceiveFeed, fullFeedId)

    def _doReceiveFeed(self, fullFeedId):
        """
        Flush what is still queued for writing, then hand the file
        descriptor of the PB connection to the eating component.
        """
        transport = self.mind.broker.transport

        self.debug('flushing PB write queue')
        transport.doWrite()
        self.debug('stop writing to transport')
        transport.stopWriting()

        # hand off the fd to the component
        self.debug("Attempting to send FD: %d", transport.fileno())

        flowName, componentName, eaterAlias = common.parseFullFeedId(
            fullFeedId)
        componentId = common.componentId(flowName, componentName)

        handedOff = self.feedServer.eatFromFD(
            componentId, eaterAlias, transport.fileno(), self.avatarId)
        if handedOff:
            transport.keepSocketAlive = True

        transport.loseConnection()
180