1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """base classes for PB client-side mediums.
23 """
24
25 import time
26
27 from twisted.spread import pb
28 from twisted.internet import defer, reactor
29 from zope.interface import implements
30
31 from flumotion.common import log, interfaces, bundleclient, errors
32 from flumotion.common import messages
33 from flumotion.common.netutils import addressGetHost
34 from flumotion.configure import configure
35 from flumotion.twisted import pb as fpb
36
37 __version__ = "$Rev: 7371 $"
38
39
41 """
42 I am a base interface for PB clients interfacing with PB server-side
43 avatars.
44 Used by admin/worker/component to talk to manager's vishnu,
45 and by job to talk to worker's brain.
46
47 @ivar remote: a remote reference to the server-side object on
48 which perspective_(methodName) methods can be called
49 @type remote: L{twisted.spread.pb.RemoteReference}
50 @type bundleLoader: L{flumotion.common.bundleclient.BundleLoader}
51 """
52
53
54
55 implements(interfaces.IMedium)
56 logCategory = "basemedium"
57 remoteLogName = "baseavatar"
58
59 remote = None
60 bundleLoader = None
61
63 """
64 Set the given remoteReference as the reference to the server-side
65 avatar.
66
67 @param remoteReference: L{twisted.spread.pb.RemoteReference}
68 """
69 self.debug('%r.setRemoteReference: %r' % (self, remoteReference))
70 self.remote = remoteReference
71
def nullRemote(x):
    # Disconnect handler: x is whatever the disconnect notification
    # passes in and is not used here.
    # Log which reference went away first, then forget it so that
    # later truthiness checks on self.remote fail.
    lostReference = self.remote
    self.debug('%r: disconnected from %r' % (self, lostReference))
    self.remote = None
75 self.remote.notifyOnDisconnect(nullRemote)
76
77 self.bundleLoader = bundleclient.BundleLoader(self.callRemote)
78
79
80 tarzan = None
81 jane = None
82 try:
83 transport = remoteReference.broker.transport
84 tarzan = transport.getHost()
85 jane = transport.getPeer()
86 except Exception, e:
87 self.debug("could not get connection info, reason %r" % e)
88 if tarzan and jane:
89 self.debug("connection is from me on %s to remote on %s" % (
90 addressGetHost(tarzan),
91 addressGetHost(jane)))
92
94 """
95 Does the medium have a remote reference to a server-side avatar ?
96 """
97 return self.remote != None
98
100 """
101 Call the given method with the given arguments remotely on the
102 server-side avatar.
103
104 Gets serialized to server-side perspective_ methods.
105
106 @param level: the level we should log at (log.DEBUG, log.INFO, etc)
107 @type level: int
108 @param stackDepth: the number of stack frames to go back to get
109 file and line information, negative or zero.
110 @type stackDepth: non-positive int
111 @param name: name of the remote method
112 @type name: str
113 """
114 if level is not None:
115 debugClass = str(self.__class__).split(".")[-1].upper()
116 startArgs = [self.remoteLogName, debugClass, name]
117 format, debugArgs = log.getFormatArgs(
118 '%s --> %s: callRemote(%s, ', startArgs,
119 ')', (), args, kwargs)
120 logKwArgs = self.doLog(level, stackDepth - 1,
121 format, *debugArgs)
122
123 if not self.remote:
124 self.warning('Tried to callRemote(%s), but we are disconnected'
125 % name)
126 return defer.fail(errors.NotConnectedError())
127
def callback(result):
    # Success path of the remote call: log the reply, then hand the
    # result on untouched so later callbacks still receive it.
    # log.ellipsize presumably shortens long values for the log line
    # -- confirm against flumotion.common.log.
    shortResult = log.ellipsize(result)
    replyFormat, replyArgs = log.getFormatArgs(
        '%s <-- %s: callRemote(%s, ', startArgs,
        '): %s', (shortResult, ), args, kwargs)
    self.doLog(level, -1, replyFormat, *replyArgs, **logKwArgs)
    return result
134
def errback(failure):
    # Failure path of the remote call: log the failure with %r, then
    # return it so the error keeps propagating down the errback chain.
    errorFormat, errorArgs = log.getFormatArgs(
        '%s <-- %s: callRemote(%s, ', startArgs,
        '): %r', (failure, ), args, kwargs)
    self.doLog(level, -1, errorFormat, *errorArgs, **logKwArgs)
    return failure
141
142 d = self.remote.callRemote(name, *args, **kwargs)
143 if level is not None:
144 d.addCallbacks(callback, errback)
145 return d
146
148 """
149 Call the given method with the given arguments remotely on the
150 server-side avatar.
151
152 Gets serialized to server-side perspective_ methods.
153 """
154 return self.callRemoteLogging(log.DEBUG, -1, name, *args,
155 **kwargs)
156
158 """
159 Returns the given function in the given module, loading the
160 module from a bundle.
161
162 If we can't find the bundle for the given module, or if the
163 given module does not contain the requested function, we will
164 raise L{flumotion.common.errors.RemoteRunError} (perhaps a
165 poorly chosen error). If importing the module raises an
166 exception, that exception will be passed through unmodified.
167
168 @param module: module the function lives in
169 @type module: str
170 @param function: function to run
171 @type function: str
172
173 @returns: a callable, the given function in the given module.
174 """
175
def gotModule(mod):
    """
    Look up the requested function on the freshly loaded module.

    @param mod: the module object delivered by the bundle loader

    @returns: the attribute of mod named by the enclosing call's
              function argument
    @raises errors.RemoteRunError: if mod has no such attribute
    """
    if not hasattr(mod, function):
        # Warn locally before raising, so the problem shows up in this
        # process' log as well as in the failure.
        problem = 'No procedure named %s in module %s' % (function,
                                                          module)
        self.warning('%s', problem)
        raise errors.RemoteRunError(problem)

    return getattr(mod, function)
184
def gotModuleError(failure):
    """
    Turn a missing-bundle failure into a RemoteRunError.

    @param failure: the failure produced while loading the module

    @raises errors.RemoteRunError: when the failure is a NoBundleError
    """
    # trap() re-raises anything that is not a NoBundleError, so other
    # errors pass through this errback unmodified.
    failure.trap(errors.NoBundleError)

    problem = 'Failed to find bundle for module %s' % module
    self.warning('%s', problem)
    raise errors.RemoteRunError(problem)
190
191 d = self.bundleLoader.loadModule(module)
192 d.addCallbacks(gotModule, gotModuleError)
193 return d
194
196 """
197 Runs the given function in the given module with the given
198 arguments.
199
200 This method calls getBundledFunction and then invokes the
201 function. Any error raised by getBundledFunction or by invoking
202 the function will be passed through unmodified.
203
204 Callers that expect to return their result over a PB connection
205 should catch nonserializable exceptions so as to prevent nasty
206 backtraces in the logs.
207
208 @param module: module the function lives in
209 @type module: str
210 @param function: function to run
211 @type function: str
212
213 @returns: the return value of the given function in the module.
214 """
215 self.debug('runBundledFunction(%r, %r)', module, function)
216
def gotFunction(proc):
    """
    Invoke the callable obtained from getBundledFunction.

    @param proc: the callable to run with the enclosing call's
                 args and kwargs

    @returns: the deferred created by defer.maybeDeferred for the
              invocation; failures are logged and passed through.
    """

    def logInvocationFailure(failure):
        # Report the failure, then return it so that it continues down
        # the errback chain unmodified.
        reason = log.getFailureMessage(failure)
        self.warning('Exception raised while calling '
                     '%s.%s(*args=%r, **kwargs=%r): %s',
                     module, function, args, kwargs, reason)
        return failure

    self.debug('calling %s.%s(%r, %r)', module, function, args, kwargs)
    # maybeDeferred wraps both plain return values and raised
    # exceptions, so the caller always gets a deferred back.
    invocation = defer.maybeDeferred(proc, *args, **kwargs)
    invocation.addErrback(logInvocationFailure)
    return invocation
231
232 d = self.getBundledFunction(module, function)
233 d.addCallback(gotFunction)
234 return d
235
236
268
269 if self.remote:
270 self.log('pinging')
271 d = self.callRemoteLogging(log.LOG, 0, 'ping')
272 d.addCallbacks(pingback, pingFailed)
273 else:
274 self.info('tried to ping, but disconnected yo')
275
276 self._pingDC = reactor.callLater(self._pingInterval,
277 self._ping)
278
289
298
300 if self.remote:
301 self.remote.broker.transport.loseConnection()
302
309 self.remote.notifyOnDisconnect(stopPingingCb)
310
311 self.startPinging(self._disconnect)
312
314 """
315 Sets a marker that will be prefixed to the log strings. Setting this
316 marker to multiple elements at a time helps debugging.
317 @param marker: A string to prefix all the log strings.
318 @type marker: str
319 """
320 self.writeMarker(marker, level)
321