Package flumotion :: Package component :: Module eater
[hide private]

Source Code for Module flumotion.component.eater

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  import time 
 23   
 24  from twisted.internet import reactor 
 25   
 26  from flumotion.common import componentui 
 27   
 28  __version__ = "$Rev: 7162 $" 
 29   
 30   
31 -class Eater:
32 """ 33 This class groups eater-related information as used by a Feed Component. 34 35 @ivar eaterAlias: the alias of this eater (e.g. "default", "video", 36 ...) 37 @ivar feedId: id of the feed this is eating from 38 @ivar uiState: the serializable UI State for this eater 39 """ 40
41 - def __init__(self, eaterAlias, eaterName):
42 self.eaterAlias = eaterAlias 43 self.eaterName = eaterName 44 self.feedId = None 45 self.fd = None 46 self.elementName = 'eater:' + eaterAlias 47 self.depayName = self.elementName + '-depay' 48 self.setPadMonitor(None) 49 self.uiState = componentui.WorkerComponentUIState() 50 self.uiState.addKey('eater-alias') 51 self.uiState.set('eater-alias', eaterAlias) 52 self.uiState.addKey('eater-name') 53 self.uiState.set('eater-name', eaterName) 54 # dict for the current connection 55 connectionDict = { 56 "feed-id": None, 57 "time-timestamp-discont": None, 58 "timestamp-timestamp-discont": 0.0, # ts of buffer after discont, 59 # in float seconds 60 "last-timestamp-discont": 0.0, 61 "total-timestamp-discont": 0.0, 62 "count-timestamp-discont": 0, 63 "time-offset-discont": None, 64 "offset-offset-discont": 0, # offset of buffer 65 # after discont 66 "last-offset-discont": 0, 67 "total-offset-discont": 0, 68 "count-offset-discont": 0} 69 self.uiState.addDictKey('connection', connectionDict) 70 71 for key in ( 72 'last-connect', # last client connection, in epoch sec 73 'last-disconnect', # last client disconnect, in epoch sec 74 'total-connections', # number of connections by this client 75 'count-timestamp-discont', # number of timestamp disconts seen 76 'count-offset-discont', # number of timestamp disconts seen 77 ): 78 self.uiState.addKey(key, 0) 79 for key in ( 80 'total-timestamp-discont', # total timestamp discontinuity 81 'total-offset-discont', # total offset discontinuity 82 ): 83 self.uiState.addKey(key, 0.0) 84 self.uiState.addKey('fd', None)
85
86 - def __repr__(self):
87 return '<Eater %s %s>' % (self.eaterAlias, 88 (self.feedId and '(disconnected)' 89 or ('eating from %s' % self.feedId)))
90
91 - def connected(self, fd, feedId, when=None):
92 """ 93 The eater has been connected. 94 Update related stats. 95 """ 96 if not when: 97 when = time.time() 98 99 self.feedId = feedId 100 self.fd = fd 101 102 self.uiState.set('last-connect', when) 103 self.uiState.set('fd', fd) 104 self.uiState.set('total-connections', 105 self.uiState.get('total-connections', 0) + 1) 106 107 self.uiState.setitem("connection", 'feed-id', feedId) 108 self.uiState.setitem("connection", "count-timestamp-discont", 0) 109 self.uiState.setitem("connection", "time-timestamp-discont", None) 110 self.uiState.setitem("connection", "last-timestamp-discont", 0.0) 111 self.uiState.setitem("connection", "total-timestamp-discont", 0.0) 112 self.uiState.setitem("connection", "count-offset-discont", 0) 113 self.uiState.setitem("connection", "time-offset-discont", None) 114 self.uiState.setitem("connection", "last-offset-discont", 0) 115 self.uiState.setitem("connection", "total-offset-discont", 0)
116
117 - def disconnected(self, when=None):
118 """ 119 The eater has been disconnected. 120 Update related stats. 121 """ 122 if not when: 123 when = time.time() 124 125 def updateUIState(): 126 self.uiState.set('last-disconnect', when) 127 self.fd = None 128 self.uiState.set('fd', None)
129 130 reactor.callFromThread(updateUIState)
131
132 - def setPadMonitor(self, monitor):
133 self._padMonitor = monitor
134
135 - def isActive(self):
136 return self._padMonitor and self._padMonitor.isActive()
137
138 - def addWatch(self, setActive, setInactive):
139 self._padMonitor.addWatch(lambda _: setActive(self.eaterAlias), 140 lambda _: setInactive(self.eaterAlias))
141
142 - def timestampDiscont(self, seconds, timestamp):
143 """ 144 @param seconds: discont duration in seconds 145 @param timestamp: GStreamer timestamp of new buffer, in seconds. 146 147 Inform the eater of a timestamp discontinuity. 148 This is called from a bus message handler, so in the main thread. 149 """ 150 uiState = self.uiState 151 152 c = uiState.get('connection') # dict 153 uiState.setitem('connection', 'count-timestamp-discont', 154 c.get('count-timestamp-discont', 0) + 1) 155 uiState.set('count-timestamp-discont', 156 uiState.get('count-timestamp-discont', 0) + 1) 157 158 uiState.setitem('connection', 'time-timestamp-discont', time.time()) 159 uiState.setitem('connection', 'timestamp-timestamp-discont', timestamp) 160 uiState.setitem('connection', 'last-timestamp-discont', seconds) 161 uiState.setitem('connection', 'total-timestamp-discont', 162 c.get('total-timestamp-discont', 0) + seconds) 163 uiState.set('total-timestamp-discont', 164 uiState.get('total-timestamp-discont', 0) + seconds)
165
166 - def offsetDiscont(self, units, offset):
167 """ 168 Inform the eater of an offset discontinuity. 169 This is called from a bus message handler, so in the main thread. 170 """ 171 uiState = self.uiState 172 173 c = uiState.get('connection') # dict 174 uiState.setitem('connection', 'count-offset-discont', 175 c.get('count-offset-discont', 0) + 1) 176 uiState.set('count-offset-discont', 177 uiState.get('count-offset-discont', 0) + 1) 178 179 uiState.setitem('connection', 'time-offset-discont', time.time()) 180 uiState.setitem('connection', 'offset-offset-discont', offset) 181 uiState.setitem('connection', 'last-offset-discont', units) 182 uiState.setitem('connection', 'total-offset-discont', 183 c.get('total-offset-discont', 0) + units) 184 uiState.set('total-offset-discont', 185 uiState.get('total-offset-discont', 0) + units)
186