Package flumotion :: Package twisted :: Module fdserver
[hide private]

Source Code for Module flumotion.twisted.fdserver

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  from flumotion.common import log 
 23  from flumotion.extern.fdpass import fdpass 
 24   
 25  from twisted.internet import unix, main, address, tcp 
 26  from twisted.spread import pb 
 27   
 28  import errno 
 29  import os 
 30  import socket 
 31  import struct 
 32  import time 
 33   
 34  __version__ = "$Rev$" 
 35   
 36   
 37  # Heavily based on 
 38  # http://twistedmatrix.com/trac/browser/sandbox/exarkun/copyover/server.py 
 39  # and client.py 
 40  # Thanks for the inspiration! 
 41   
 42  # Since we're doing this over a stream socket, our file descriptor messages 
 43  # aren't guaranteed to be received alone; they could arrive along with some 
 44  # unrelated data. 
 45  # So, we prefix the message with a 16 byte randomly generated magic signature, 
 46  # and a length, and if we receive file descriptors decode based on this. 
 47  # 
 48  # map() instead of a string to workaround gettext encoding problems. 
 49  # 
 50  MAGIC_SIGNATURE = ''.join(map(chr, [253, 252, 142, 127, 7, 71, 185, 234, 
 51                                      161, 117, 238, 216, 220, 54, 200, 163])) 
 52   
 53   
class FDServer(unix.Server):
    """
    A unix.Server transport that can additionally send a file descriptor
    to its peer.
    """

    def sendFileDescriptor(self, fileno, data=""):
        """
        Send a file descriptor, with optional accompanying data, to the peer.

        The data is framed with a header made of MAGIC_SIGNATURE followed by
        the length of the data, so that the receiver can locate it even if
        it arrives together with unrelated stream data.

        @param fileno: the file descriptor to pass.
        @param data:   payload to send along with the descriptor.

        @returns: the result of fdpass.writefds().
        """
        header = struct.pack("@16sI", MAGIC_SIGNATURE, len(data))
        framed = header + data
        return fdpass.writefds(self.fileno(), [fileno], framed)
59 60
class FDPort(unix.Port):
    """
    A unix.Port subclass whose transport class is L{FDServer}.
    """
    transport = FDServer
63 64
class FDClient(unix.Client):  # , log.Loggable):
    """
    A unix.Client transport that can receive file descriptors mixed in with
    the regular data stream.
    """

    def doRead(self):
        """
        Read pending data, and any file descriptors passed with it.

        Data that arrives without file descriptors is handed to
        self.protocol.dataReceived().  When file descriptors are present,
        the message is searched for MAGIC_SIGNATURE: data before the framed
        message and data after it go to dataReceived(), while the
        descriptors and the framed payload go to
        self.protocol.fileDescriptorsReceived().

        @returns: None if not connected or if the read would block;
                  main.CONNECTION_LOST on any other OSError;
                  main.CONNECTION_DONE if an empty message was read;
                  otherwise the value returned by the last protocol
                  callback that was invoked.
        """
        if not self.connected:
            return

        try:
            (fds, message) = fdpass.readfds(self.fileno(), 64 * 1024)
        except OSError as e:
            # "as" form: accepted by Python 2.6+ and Python 3 alike.
            if e.errno in (errno.EWOULDBLOCK, errno.EAGAIN):
                # Nothing to read right now; not an error.
                return
            return main.CONNECTION_LOST

        if not message:
            return main.CONNECTION_DONE

        if not fds:
            # self.debug("No FDs, passing to dataReceived")
            return self.protocol.dataReceived(message)

        # Look for our magic cookie in (possibly) the midst of other
        # data. Pass surrounding chunks, if any, onto dataReceived(),
        # which (undocumentedly) must return None unless a failure
        # occurred.
        # Pass the actual FDs and their message to
        # fileDescriptorsReceived()
        offset = message.find(MAGIC_SIGNATURE)
        if offset < 0:
            # Old servers did not send this; be hopeful that this
            # doesn't have bits of other protocol (i.e. PB) mixed up
            # in it.
            return self.protocol.fileDescriptorsReceived(fds, message)

        if offset > 0:
            # Ordinary stream data preceding the framed FD message.
            ret = self.protocol.dataReceived(message[0:offset])
            if ret:
                return ret

        # The 4 bytes following the 16-byte signature hold the length of
        # the payload that belongs to the descriptors.
        msglen = struct.unpack("@I", message[offset + 16:offset + 20])[0]
        offset += 20
        ret = self.protocol.fileDescriptorsReceived(
            fds, message[offset:offset + msglen])
        if ret:
            return ret

        if offset + msglen < len(message):
            # Ordinary stream data following the framed FD message.
            return self.protocol.dataReceived(message[offset + msglen:])
        return ret
112 113
class FDConnector(unix.Connector):
    """
    A unix.Connector subclass that creates L{FDClient} transports.
    """

    def _makeTransport(self):
        """
        Build the transport for this connector.

        @returns: a new L{FDClient} created from this connector's address,
                  the connector itself and its reactor.
        """
        transport = FDClient(self.address, self, self.reactor)
        return transport
118 119
class FDPassingBroker(pb.Broker, log.Loggable):
    """
    A pb.Broker subclass that handles FDs being passed to it (with associated
    data) over the same connection as the normal PB data stream.
    When an FD is seen, it creates new protocol objects for them from the
    childFactory attribute.
    """
    # FIXME: looks like we can only use our own subclasses that take
    # three __init__ args

    def __init__(self, childFactory, connectionClass, **kwargs):
        """
        @param childFactory:    factory whose buildProtocol() is called to
                                create a protocol for each received FD.
        @param connectionClass: subclass of L{twisted.internet.tcp.Connection}
        @param kwargs:          passed through to pb.Broker.__init__().
        """
        pb.Broker.__init__(self, **kwargs)

        self.childFactory = childFactory
        self._connectionClass = connectionClass

    # This is the complex bit. If our underlying transport receives a file
    # descriptor, this gets called - along with the data we got with the FD.
    # We create an appropriate protocol object, and attach it to the reactor.

    def fileDescriptorsReceived(self, fds, message):
        """
        Handle file descriptors received over the connection.

        Exactly one descriptor is expected.  It is wrapped in a socket, a
        protocol is built for its peer address with the child factory, and
        both are handed to the connection class together with the message.

        @param fds:     the received file descriptors.
        @param message: the data that accompanied the descriptors.
        """
        if len(fds) != 1:
            self.warning("Unexpected: FD-passing message with len(fds) != 1")
            # The descriptors now belong to this process and nothing else
            # will ever use them; close them so they are not leaked.
            for unwanted in fds:
                try:
                    os.close(unwanted)
                except OSError:
                    self.debug("Could not close unexpected fd %r", unwanted)
            return

        fd = fds[0]

        # Undocumentedly (other than a comment in
        # Python/Modules/socketmodule.c), socket.fromfd() calls dup() on
        # the passed FD before it actually wraps it in a socket object.
        # So, we need to close the FD that we originally had; do so even
        # if fromfd() fails, so that the descriptor is never leaked.
        try:
            # Note that we hardcode IPv4 here!
            sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
        finally:
            os.close(fd)

        # PROBE: received fd; see porter.py
        self.debug("[fd %5d] (ts %f) received fd from %d, created socket",
                   sock.fileno(), time.time(), fd)

        try:
            peeraddr = sock.getpeername()
        except socket.error:
            self.info("Socket disconnected before being passed to client")
            sock.close()
            return

        # Based on bits in tcp.Port.doRead()
        addr = address._ServerFactoryIPv4Address('TCP',
                                                 peeraddr[0], peeraddr[1])
        protocol = self.childFactory.buildProtocol(addr)

        self._connectionClass(sock, protocol, peeraddr, message)
175 176
class _SocketMaybeCloser(tcp._SocketCloser):
    """
    A tcp._SocketCloser variant that can close its socket without shutting
    the connection down, controlled by the keepSocketAlive attribute.
    """
    # When true, _closeSocket() only close()s the socket object.
    keepSocketAlive = False

    def _closeSocket(self):
        """
        Close the socket, honouring keepSocketAlive.
        """
        # We override this (from tcp._SocketCloser) so that sockets are
        # closed properly in the normal case.  Once the socket has been
        # passed on via the FD-channel, however, we only close() it and do
        # not call shutdown(), which would close the TCP channel without
        # closing the FD itself.
        if not self.keepSocketAlive:
            tcp.Server._closeSocket(self)
            return

        try:
            self.socket.close()
        except socket.error:
            pass
193 194
class PassableServerConnection(_SocketMaybeCloser, tcp.Server):
    """
    A tcp.Server subclass whose FDs may be passed on to other processes,
    because they can be closed with just close(2) instead of shutdown(2).
    """
201 202
class PassableServerPort(tcp.Port):
    """
    A tcp.Port subclass whose transport class is
    L{PassableServerConnection}.
    """
    transport = PassableServerConnection
205