Package flumotion :: Package component :: Package base :: Module http
[hide private]

Source Code for Module flumotion.component.base.http

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_http -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  import struct 
 23  import socket 
 24   
 25  from twisted.web import http, server 
 26  from twisted.web import resource as web_resource 
 27  from twisted.internet import reactor, defer 
 28  from twisted.python import reflect, failure 
 29   
 30  from flumotion.configure import configure 
 31  from flumotion.common import errors 
 32  from flumotion.twisted.credentials import cryptChallenge 
 33   
 34  from flumotion.common import common, log, keycards 
 35   
 36  #__all__ = ['HTTPStreamingResource', 'MultifdSinkStreamer'] 
 37  __version__ = "$Rev$" 
 38   
 39   
 40  HTTP_SERVER_NAME = 'FlumotionHTTPServer' 
 41  HTTP_SERVER_VERSION = configure.version 
 42   
 43  ERROR_TEMPLATE = """<!doctype html public "-//IETF//DTD HTML 2.0//EN"> 
 44  <html> 
 45  <head> 
 46    <title>%(code)d %(error)s</title> 
 47  </head> 
 48  <body> 
 49  <h2>%(code)d %(error)s</h2> 
 50  </body> 
 51  </html> 
 52  """ 
 53   
 54  HTTP_SERVER = '%s/%s' % (HTTP_SERVER_NAME, HTTP_SERVER_VERSION) 
 55   
 56  ### This is new Issuer code that eventually should move to e.g. 
 57  ### flumotion.common.keycards or related 
 58   
 59   
class HTTPGenericIssuer(log.Loggable):
    """
    I issue L{flumotion.common.keycards.KeycardGeneric} keycards filled in
    from an HTTP request.

    The resulting keycard can be used to authenticate on server-side checks
    such as time, as well as on client credentials such as HTTP Auth,
    GET parameters, IP address and token.
    """

    def issue(self, request):
        """
        Create a keycard describing the given request.

        The keycard carries the request's user, password, client IP,
        'token' argument, full argument mapping and path.

        @param request: the HTTP request to issue a keycard for

        @returns: the filled-in keycard
        @rtype:   L{flumotion.common.keycards.KeycardGeneric}
        """
        card = keycards.KeycardGeneric()

        # credentials and origin of the client
        card.username = request.getUser()
        card.password = request.getPassword()
        card.address = request.getClientIP()

        # An argument given more than once shows up as a list of values;
        # in that case only the first token is used.
        arguments = request.args
        tokenValue = arguments.get('token', '')
        if isinstance(tokenValue, str):
            card.token = tokenValue
        else:
            card.token = tokenValue[0]

        card.arguments = arguments
        card.path = request.path

        self.debug("Asking for authentication, generic HTTP")
        return card
# Plug socket identifiers.  BOUNCER_SOCKET is the key under which a
# component's bouncer plug is looked up in its ``plugs`` mapping.
BOUNCER_SOCKET = 'flumotion.component.bouncers.plug.BouncerPlug'
BUS_SOCKET = 'flumotion.component.plugs.bus.BusPlug'
class HTTPAuthentication(log.Loggable):
    """
    Helper object for handling HTTP authentication for twisted.web
    Resources, using issuers and bouncers.

    A keycard is issued for every request.  It is authenticated by the
    component's bouncer plug when there is one, otherwise by the remote
    bouncer set with L{setBouncerName}; when neither is configured every
    keycard is accepted.

    @cvar KEYCARD_TTL:                ttl set on keycards sent to a remote
                                      bouncer, and passed along with every
                                      keepAlive call
    @cvar KEYCARD_KEEPALIVE_INTERVAL: seconds between two keepAlive calls
    @cvar KEYCARD_TRYAGAIN_INTERVAL:  seconds to wait before retrying after
                                      a failed keepAlive call
    """

    logCategory = 'httpauth'

    KEYCARD_TTL = 60 * 60
    KEYCARD_KEEPALIVE_INTERVAL = 20 * 60
    KEYCARD_TRYAGAIN_INTERVAL = 1 * 60

    def __init__(self, component):
        """
        @param component: the component requests are authenticated for.
                          Its name is used as requester id, its plugs are
                          searched for a bouncer plug, and its medium is
                          used to talk to a remote bouncer.
        """
        self.component = component
        self._fdToKeycard = {} # request fd -> Keycard
        self._idToKeycard = {} # keycard id -> Keycard
        self._fdToDurationCall = {} # request fd -> IDelayedCall
                                    # for duration
        self._domain = None # used for auth challenge and on keycard
        self._issuer = HTTPGenericIssuer() # issues keycards;default for compat
        self.bouncerName = None
        self.setRequesterId(component.getName())
        self._defaultDuration = None # default duration to use if the keycard
                                     # doesn't specify one.
        self._allowDefault = False # whether failures communicating with
                                   # the bouncer should result in HTTP 500
                                   # or with allowing the connection
        self._pendingCleanups = [] # (bouncerName, keycard) pairs whose
                                   # removal failed and must be retried
        self._keepAlive = None # IDelayedCall for the next keepAlive

        if (BOUNCER_SOCKET in self.component.plugs
            and self.component.plugs[BOUNCER_SOCKET]):
            assert len(self.component.plugs[BOUNCER_SOCKET]) == 1
            self.plug = self.component.plugs[BOUNCER_SOCKET][0]
            # let the plug tell us which keycards to expire
            self.plug.set_expire_function(self.expireKeycards)
        else:
            self.plug = None

    def scheduleKeepAlive(self, tryingAgain=False):
        """
        Schedule the next keepAlive call to the remote bouncer.

        When the scheduled call fires and a bouncer name is set, keepAlive
        is called on that bouncer; its outcome schedules the next round,
        using the shorter retry interval after a failure.  When no bouncer
        name is set the next round is simply scheduled again.

        @param tryingAgain: whether to wait KEYCARD_TRYAGAIN_INTERVAL
                            instead of KEYCARD_KEEPALIVE_INTERVAL
        @type  tryingAgain: bool
        """

        def timeout():

            def reschedule(res):
                # used both as callback and as errback
                if isinstance(res, failure.Failure):
                    self.info('keepAlive failed, rescheduling in %d '
                              'seconds', self.KEYCARD_TRYAGAIN_INTERVAL)
                    self._keepAlive = None
                    self.scheduleKeepAlive(tryingAgain=True)
                else:
                    self.info('keepAlive successful')
                    self._keepAlive = None
                    self.scheduleKeepAlive(tryingAgain=False)

            if self.bouncerName is not None:
                self.debug('calling keepAlive on bouncer %s',
                           self.bouncerName)
                d = self.keepAlive(self.bouncerName, self.issuerName,
                                   self.KEYCARD_TTL)
                d.addCallbacks(reschedule, reschedule)
            else:
                self.scheduleKeepAlive()

        if tryingAgain:
            self._keepAlive = reactor.callLater(
                self.KEYCARD_TRYAGAIN_INTERVAL, timeout)
        else:
            self._keepAlive = reactor.callLater(
                self.KEYCARD_KEEPALIVE_INTERVAL, timeout)

    def stopKeepAlive(self):
        """
        Cancel the pending keepAlive call, if any.

        Note that a keepAlive request that is already in flight is not
        interrupted; its result handler schedules the next round again.
        """
        if self._keepAlive is not None:
            # While a keepAlive request is in flight, _keepAlive still
            # refers to the delayed call that already fired; cancelling
            # that one would raise AlreadyCalled.
            if self._keepAlive.active():
                self._keepAlive.cancel()
            self._keepAlive = None

    def setDomain(self, domain):
        """
        Set a domain name on the resource, used in HTTP auth challenges and
        on the keycard.

        @type domain: string
        """
        self._domain = domain

    def setBouncerName(self, bouncerName):
        """
        Set the name of the remote bouncer to authenticate against.

        @param bouncerName: name of the bouncer, or None for no bouncer
        """
        self.bouncerName = bouncerName

    def setRequesterId(self, requesterId):
        """
        Set the requester id put on issued keycards, and derive a new
        issuer name from it.

        @param requesterId: the id to set on keycards as requesterId
        """
        self.requesterId = requesterId
        # make something uniquey
        self.issuerName = str(self.requesterId) + '-' + cryptChallenge()

    def setDefaultDuration(self, defaultDuration):
        """
        Set the duration used for keycards that do not specify one.

        @param defaultDuration: duration passed to reactor.callLater, or
                                None/0 for no default expiration
        """
        self._defaultDuration = defaultDuration

    def setAllowDefault(self, allowDefault):
        """
        Set whether a request is allowed when authentication fails with
        anything else than L{errors.NotAuthenticatedError}.

        @type allowDefault: bool
        """
        self._allowDefault = allowDefault

    def authenticate(self, request):
        """
        Issue a keycard for the request and have it authenticated.

        @param request: the HTTP request to authenticate

        @returns: a deferred returning a keycard or None
        """
        keycard = self._issuer.issue(request)
        if not keycard:
            self.debug('no keycard from issuer, firing None')
            return defer.succeed(None)

        keycard.requesterId = self.requesterId
        keycard.issuerName = self.issuerName
        keycard._fd = request.transport.fileno()
        keycard.domain = self._domain

        if self.plug:
            self.debug('authenticating against plug')
            return self.plug.authenticate(keycard)
        elif self.bouncerName is None:
            self.debug('no bouncer, accepting')
            return defer.succeed(keycard)
        else:
            keycard.ttl = self.KEYCARD_TTL
            self.debug('sending keycard to remote bouncer %r',
                       self.bouncerName)
            return self.authenticateKeycard(self.bouncerName, keycard)

    def authenticateKeycard(self, bouncerName, keycard):
        """
        Ask the named bouncer, through the component's medium, to
        authenticate the keycard.

        @returns: the result of the medium's authenticate call
        """
        return self.component.medium.authenticate(bouncerName, keycard)

    def keepAlive(self, bouncerName, issuerName, ttl):
        """
        Ask the named bouncer, through the component's medium, to keep the
        keycards of the given issuer alive for the given ttl.

        @returns: the result of the medium's keepAlive call
        """
        return self.component.medium.keepAlive(bouncerName, issuerName, ttl)

    def cleanupKeycard(self, bouncerName, keycard):
        """
        Ask the named bouncer, through the component's medium, to remove
        the keycard with the given keycard's id.

        @returns: the result of the medium's removeKeycardId call
        """
        return self.component.medium.removeKeycardId(bouncerName, keycard.id)

    # FIXME: check this

    def clientDone(self, fd):
        """
        Ask the component to remove the client connected on fd.

        @returns: the result of the component's remove_client call
        """
        return self.component.remove_client(fd)

    def doCleanupKeycard(self, bouncerName, keycard):
        """
        Have the bouncer remove this keycard, and retry all removals that
        failed before.  Removals that fail are queued for the next call.
        """
        # cleanup this one keycard, and take the opportunity to retry
        # previous failed cleanups

        def cleanup(bouncerName, keycard):

            def cleanupLater(res, pair):
                self.log('failed to clean up keycard %r, will do '
                         'so later', keycard)
                self._pendingCleanups.append(pair)

            d = self.cleanupKeycard(bouncerName, keycard)
            d.addErrback(cleanupLater, (bouncerName, keycard))

        # start from an empty queue so that removals failing again are
        # queued once, not duplicated
        pending = self._pendingCleanups
        self._pendingCleanups = []
        cleanup(bouncerName, keycard)
        for bouncerName, keycard in pending:
            cleanup(bouncerName, keycard)

    # public

    def cleanupAuth(self, fd):
        """
        Forget the keycard registered for fd; when a remote bouncer is set,
        ask it to remove the keycard too.
        """
        if self.bouncerName and fd in self._fdToKeycard:
            keycard = self._fdToKeycard[fd]
            self.debug('[fd %5d] asking bouncer %s to remove keycard id %s',
                       fd, self.bouncerName, keycard.id)
            self.doCleanupKeycard(self.bouncerName, keycard)
        self._removeKeycard(fd)

    def _removeKeycard(self, fd):
        """
        Drop the local bookkeeping for fd: the keycard mappings and the
        pending duration expiration call, if any.
        """
        if (self.bouncerName or self.plug) and fd in self._fdToKeycard:
            keycard = self._fdToKeycard[fd]
            del self._fdToKeycard[fd]
            del self._idToKeycard[keycard.id]
        if fd in self._fdToDurationCall:
            self.debug('[fd %5d] canceling later expiration call' % fd)
            self._fdToDurationCall[fd].cancel()
            del self._fdToDurationCall[fd]

    def _durationCallLater(self, fd):
        """
        Expire a client due to a duration expiration.
        """
        self.debug('[fd %5d] duration exceeded, expiring client' % fd)

        # we're called from a callLater, so we've already run; just delete
        if fd in self._fdToDurationCall:
            del self._fdToDurationCall[fd]

        self.debug('[fd %5d] asking streamer to remove client' % fd)
        self.clientDone(fd)

    def expireKeycard(self, keycardId):
        """
        Expire a client's connection associated with the keycard Id.

        @raise KeyError: if no keycard with this id is registered
        """
        keycard = self._idToKeycard[keycardId]

        fd = keycard._fd

        self.debug('[fd %5d] expiring client' % fd)

        self._removeKeycard(fd)

        self.debug('[fd %5d] asking streamer to remove client' % fd)
        self.clientDone(fd)

    def expireKeycards(self, keycardIds):
        """
        Expire client's connections associated with the keycard Ids.

        Ids that cannot be expired because of a KeyError are logged and
        skipped.

        @returns: the number of keycards that were expired
        @rtype:   int
        """
        expired = 0
        for keycardId in keycardIds:
            try:
                self.expireKeycard(keycardId)
                expired += 1
            except KeyError as e:
                self.warning("Failed to expire keycard %r: %s",
                             keycardId, log.getExceptionMessage(e))
        return expired

    ### resource.Resource methods

    def startAuthentication(self, request):
        """
        Authenticate the request and register the resulting keycard.

        On failure an error response is written to the request, unless
        the failure is to be ignored because of L{setAllowDefault}.

        @returns: a deferred firing None when the request is allowed, or
                  failing when it was refused
        """
        d = self.authenticate(request)
        d.addCallback(self._authenticatedCallback, request)
        d.addErrback(self._authenticatedErrback, request)
        d.addErrback(self._defaultErrback, request)

        return d

    def _authenticatedCallback(self, keycard, request):
        """
        Register an authenticated keycard for a GET request, and schedule
        its expiration when a duration applies.

        @raise errors.NotAuthenticatedError: if there is no keycard, or a
                                             keycard with the same id is
                                             already registered
        """
        # !: since we are a callback, the incoming fd might have gone away
        # and closed
        self.debug('_authenticatedCallback: keycard %r' % keycard)
        if not keycard:
            raise errors.NotAuthenticatedError()

        # properly authenticated
        if request.method == 'GET':
            fd = request.transport.fileno()

            if self.bouncerName or self.plug:
                # the request was finished before the callback was executed
                if fd == -1:
                    self.debug('Request interrupted before authentification '
                               'was finished: asking bouncer %s to remove '
                               'keycard id %s', self.bouncerName, keycard.id)
                    self.doCleanupKeycard(self.bouncerName, keycard)
                    return None
                if keycard.id in self._idToKeycard:
                    self.warning("Duplicate keycard id: refusing")
                    raise errors.NotAuthenticatedError()

                self._fdToKeycard[fd] = keycard
                self._idToKeycard[keycard.id] = keycard

            duration = keycard.duration or self._defaultDuration

            if duration:
                self.debug('new connection on %d will expire in %f seconds' % (
                    fd, duration))
                self._fdToDurationCall[fd] = reactor.callLater(
                    duration, self._durationCallLater, fd)

        return None

    def _authenticatedErrback(self, failure, request):
        """
        Answer 401 when authentication was refused; any other failure is
        passed on untouched to the next errback.
        """
        failure.trap(errors.NotAuthenticatedError)
        self._handleUnauthorized(request, http.UNAUTHORIZED)
        return failure

    def _defaultErrback(self, failure, request):
        """
        Handle failures other than L{errors.NotAuthenticatedError}: either
        allow the request anyway or answer 500, depending on
        L{setAllowDefault}.  Refusals were already answered and are just
        passed on.
        """
        if failure.check(errors.NotAuthenticatedError) is None:
            # If something else went wrong, we want to either disconnect the
            # client and give them a 500 Internal Server Error or just allow
            # them, depending on the configuration.
            self.debug("Authorization request failed: %s",
                       log.getFailureMessage(failure))
            if self._allowDefault:
                self.debug("Authorization failed, but allowing anyway")
                return None
            self._handleUnauthorized(request, http.INTERNAL_SERVER_ERROR)
        return failure

    def _handleUnauthorized(self, request, code):
        """
        Write an HTML error response with the given code to the request
        and finish it.

        @param code: the HTTP response code, a key of http.RESPONSES
        @type  code: int
        """
        self.debug('client from %s is unauthorized, returning code %r' %
                   (request.getClientIP(), code))
        request.setHeader('content-type', 'text/html')
        # NOTE(review): this sends the bare version only; HTTP_SERVER
        # ("name/version") may have been intended here -- confirm.
        request.setHeader('server', HTTP_SERVER_VERSION)
        request.setHeader('Connection', 'close')
        if self._domain and code == http.UNAUTHORIZED:
            request.setHeader('WWW-Authenticate',
                              'Basic realm="%s"' % self._domain)

        request.setResponseCode(code)

        # we have to write data ourselves,
        # since we already returned NOT_DONE_YET
        html = ERROR_TEMPLATE % {'code': code,
                                 'error': http.RESPONSES[code]}
        request.write(html)
        request.finish()
387 388
class LogFilter:
    """
    I hold a list of IPv4 networks and can tell whether an address falls
    inside any of them.

    @ivar filters: the networks added so far
    @type filters: list of (network, mask) tuples of int
    """

    def __init__(self):
        self.filters = [] # list of (network, mask)

    def addIPFilter(self, filter):
        """
        Add an IP filter of the form IP/prefix-length (CIDR syntax), or just
        a single IP address

        @param filter: the network or address to add, in dotted IPv4
                       notation
        @type  filter: str

        @raise errors.ConfigError: if the definition has more than one '/',
                                   the prefix length is not an integer
                                   between 0 and 32, or the address cannot
                                   be parsed
        """
        definition = filter.split('/')
        if len(definition) == 2:
            (net, prefixlen) = definition
            try:
                prefixlen = int(prefixlen)
            except ValueError:
                # report a non-numeric prefix the same way as any other
                # unparsable definition, not as a bare ValueError
                raise errors.ConfigError(
                    "Cannot parse filter definition %s" % filter)
        elif len(definition) == 1:
            net = definition[0]
            prefixlen = 32
        else:
            raise errors.ConfigError(
                "Cannot parse filter definition %s" % filter)

        if prefixlen < 0 or prefixlen > 32:
            raise errors.ConfigError("Invalid prefix length")

        # all bits set except the (32 - prefixlen) host bits
        mask = ~((1 << (32 - prefixlen)) - 1)
        try:
            net = struct.unpack(">I", socket.inet_pton(socket.AF_INET, net))[0]
        except socket.error:
            raise errors.ConfigError(
                "Failed to parse network address %s" % net)
        net = net & mask # just in case

        self.filters.append((net, mask))

    def isInRange(self, ip):
        """
        Return true if ip is in any of the defined network(s) for this filter

        @param ip: the address to check, in dotted IPv4 notation
        @type  ip: str

        @rtype: bool

        @raise socket.error: if ip cannot be parsed as an IPv4 address
        """
        # Handles IPv4 only.
        realip = struct.unpack(">I", socket.inet_pton(socket.AF_INET, ip))[0]
        for net, mask in self.filters:
            if (realip & mask) == net:
                return True
        return False
433