Trees | Indices | Help |
---|
|
1 # -*- Mode: Python -*- 2 # vi:si:et:sw=4:sts=4:ts=4 3 4 # Flumotion - a streaming media server 5 # Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L. 6 # Copyright (C) 2010,2011 Flumotion Services, S.A. 7 # All rights reserved. 8 # 9 # This file may be distributed and/or modified under the terms of 10 # the GNU Lesser General Public License version 2.1 as published by 11 # the Free Software Foundation. 12 # This file is distributed without any warranty; without even the implied 13 # warranty of merchantability or fitness for a particular purpose. 14 # See "LICENSE.LGPL" in the source distribution for more information. 15 # 16 # Headers in this file shall remain intact. 17 18 import gst 19 from twisted.internet import defer 20 from flumotion.component import feedcomponent 21 from flumotion.twisted.defer import RetryingDeferred 22 from flumotion.common import errors 23 24 __version__ = "$Rev$" 25 2628 29 configured = False 30 3312035 # Basing on the cappabilities plug additional gst compoponents: 36 # 1. If we have pure audio (http src doesn't support ICY) plug parser 37 # 2. If we have application/x-icy plug the icydemuxer and than parser 38 capsname = caps[0].get_name() 39 tf_src_pad = tf.get_pad('src') 40 gdp_sink_pad = tf_src_pad.get_peer() 41 # unlink the typefind from the gdp pad so that we can put another 42 # component in it's place 43 tf_src_pad.unlink(gdp_sink_pad) 44 45 if capsname == 'application/x-icy': 46 demuxer = gst.element_factory_make("icydemux") 47 demuxer.set_state(gst.STATE_PLAYING) 48 self._demuxer_name = demuxer.get_name() 49 self.pipeline.add(demuxer) 50 tf.link(demuxer) 51 # demuxer src pad is dynamic, we need to register a callback 52 demuxer.connect('pad-added', self._link_parser, gdp_sink_pad) 53 else: 54 self._demuxer_name = None 55 self._link_parser(tf, tf_src_pad, gdp_sink_pad)5658 # Append the audio parser to the end of the pipeline 59 caps = pad.get_caps() 60 capsname = caps.get_structure(0).get_name() 61 self._parser_name = None 62 parser = None 63 if self.passthrough: 64 self.info("Acting in passthrough mode, not parsing the audio") 65 pad.link(gdp_sink_pad) 66 return 67 if capsname == 'application/ogg': 68 parser = gst.element_factory_make('oggparse') 69 elif capsname == 'audio/mpeg': 70 mpegversion = caps[0]['mpegversion'] 71 if mpegversion == 1: 72 self.info("Detecting MP3 stream. Adding 'mp3parse'") 73 parser = gst.element_factory_make('mp3parse') 74 elif mpegversion in [2, 4]: 75 self.info("Detecting AAC stream. Adding 'aacparse'") 76 parser = gst.element_factory_make('aacparse') 77 if parser: 78 self._parser_name = parser.get_name() 79 parser.set_state(gst.STATE_PLAYING) 80 self.pipeline.add(parser) 81 element.link(parser) 82 parser.get_pad('src').link(gdp_sink_pad) 83 else: 84 # in case we good sth else than mp3 or ogg just connect the 85 # gdb back 86 self.warning("Couldn't find the correct parser for caps: %s",\ 87 capsname) 88 pad.link(gdp_sink_pad)8991 # Later, when the typefind element has successfully found the type 92 # of the data, we'll rebuild the pipeline. 93 self.src = pipeline.get_by_name('src') 94 self.url = properties['url'] 95 self.passthrough = properties.get('passthrough', False) 96 self.src.set_property('location', self.url) 97 self.src.set_property('iradio-mode', True) 98 99 typefind = pipeline.get_by_name('tf') 100 self.signal_id = typefind.connect('have-type',\ 101 self._typefind_have_caps_cb) 102 103 if not self.configured: 104 self.attachPadMonitorToElement('src', 105 self._src_connected, 106 self._src_disconnected) 107 self.reconnecting = False 108 self.reconnector = RetryingDeferred(self.connect) 109 self.reconnector.initialDelay = 1.0 110 self.reconnector.maxDelay = 300 111 self.attemptD = None 112 113 def _drop_eos(pad, event): 114 self.debug('Swallowing event %r', event) 115 if event.type == gst.EVENT_EOS: 116 return False 117 return True118 self.configured = True 119 self.src.get_pad('src').add_event_probe(_drop_eos)122 if message.type == gst.MESSAGE_ERROR and message.src == self.src: 123 gerror, debug = message.parse_error() 124 self.warning('element %s error %s %s', 125 message.src.get_path_string(), gerror, debug) 126 if self.reconnecting: 127 self._retry() 128 return True 129 feedcomponent.ParseLaunchComponent.bus_message_received_cb( 130 self, bus, message)131133 self.info('Connecting to icecast server on %s', self.url) 134 self.src.set_state(gst.STATE_READY) 135 # can't just self.src.set_state(gst.STATE_PLAYING), 136 # because the pipeline might NOT be in PLAYING, 137 # if we never connected to Icecast and never went to PLAYING 138 self.try_start_pipeline(force=True) 139 self.attemptD = defer.Deferred() 140 return self.attemptD141143 self.info('Connected to icecast server on %s', self.url) 144 if self.reconnecting: 145 assert self.attemptD 146 self.attemptD.callback(None) 147 self.reconnecting = False148150 # remove all the elements downstream souphttpsrc. 151 if not self._parser_name: 152 self.reconnecting = True 153 self.reconnector.start() 154 return 155 156 tf = self.get_element('tf') 157 pad.unlink(tf.get_pad('sink')) 158 159 parser = self.get_element(self._parser_name) 160 tf.get_pad('src').unlink(parser.get_pad('sink')) 161 peer = parser.get_pad('src').get_peer() 162 parser.get_pad('src').unlink(peer) 163 164 parser.set_state(gst.STATE_NULL) 165 self.pipeline.remove(parser) 166 self._parser_name = None 167 tf.set_state(gst.STATE_NULL) 168 self.pipeline.remove(tf) 169 if self._demuxer_name is not None: 170 demuxer = self.get_element(self._demuxer_name) 171 demuxer.set_state(gst.STATE_NULL) 172 self.pipeline.remove(demuxer) 173 self._demuxer_name = None 174 175 # recreate the typefind element in order to be in the same state as 176 # when the component was first initiated 177 tf = gst.element_factory_make('typefind', 'tf') 178 self.pipeline.add(tf) 179 tf.set_state(gst.STATE_PLAYING) 180 pad.link(tf.get_pad('sink')) 181 tf.get_pad('src').link(peer) 182 183 # reconfigure the pipeline 184 self.configure_pipeline(self.pipeline, self.config['properties']) 185 self.pipeline.set_state(gst.STATE_PLAYING) 186 self.reconnecting = True 187 self.reconnector.start()188190 self.info('Disconnected from icecast server on %s', self.url) 191 if not self.reconnecting: 192 src = self.get_element('src') 193 pad = src.get_pad('src') 194 self._reset(pad)195197 assert self.attemptD 198 if not self.attemptD.called: 199 self.debug('Retrying connection to icecast server on %s', self.url) 200 self.attemptD.errback(errors.ConnectionError)201
Trees | Indices | Help |
---|
Generated by Epydoc 3.0.1 on Wed Jun 17 06:48:42 2015 | http://epydoc.sourceforge.net |