1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 """
18 Servicer object used in service scripts
19 """
20
21 import os
22 import glob
23 import time
24
25 from flumotion.configure import configure
26 from flumotion.common import errors, log
27 from flumotion.common.python import makedirs
28 from flumotion.common.process import checkPidRunning, deletePidFile, getPid, \
29 killPid, termPid, waitPidFile
30
# Source-control revision of this module, in Subversion "$Rev$" keyword
# format.
__version__ = "$Rev: 7917 $"
32
33
"""
I manage running managers and workers on behalf of a service script.
"""

# Log category used by the inherited logging helpers (self.log, self.debug,
# self.info, self.warning) -- presumably provided by a flumotion
# log.Loggable base class; confirm against the class header.
logCategory = 'servicer'
40
def __init__(self, configDir=None, logDir=None, runDir=None):
    """
    @type configDir: string
    @param configDir: overridden path to the configuration directory;
                      when not given, C{configure.configdir} is used.
    @type logDir: string
    @param logDir: overridden path to the log directory.
    @type runDir: string
    @param runDir: overridden path to the run directory.
    """
    # configDir used to be accepted (and documented) but silently ignored;
    # honour it so managers and workers are looked up where requested.
    configDir = configDir or configure.configdir
    self.managersDir = os.path.join(configDir, 'managers')
    self.workersDir = os.path.join(configDir, 'workers')
    # Directory overrides forwarded to spawned processes as --key=value
    # options by _getDirOptions(); unset (falsy) entries are skipped there.
    self._overrideDir = {
        'logdir': logDir,
        'rundir': runDir,
    }
56
def _parseManagersWorkers(self, command, args):
    """
    Work out which managers and workers a service command applies to.

    With no arguments every configured manager and worker is selected;
    otherwise C{args} must start with 'manager' or 'worker' followed by the
    name of one configured manager or worker.

    @param command: name of the service command, used in error messages.
    @param args:    the command-line arguments following the command.
    @returns: a tuple of (list of manager names, list of worker names)
    @raise errors.FatalError: when the arguments do not name a known
                              manager or worker.
    """
    if not args:
        # Everything, each list in sorted order.
        return (sorted(self.getManagers()), sorted(self.getWorkers()))

    kind = args[0]
    if kind not in ('manager', 'worker'):
        raise errors.FatalError('Please specify either manager or worker')
    if len(args) < 2:
        raise errors.FatalError('Please specify which %s to %s' % (
            kind, command))

    name = args[1]
    if kind == 'manager':
        if name not in self.getManagers():
            raise errors.FatalError('No manager "%s"' % name)
        return ([name], [])

    if name not in self.getWorkers():
        raise errors.FatalError('No worker with name %s' % name)
    return ([], [name])
91
def _getDirOptions(self):
    """
    Build the directory-override options for configure.configure that are
    passed on to spawned manager/worker processes.

    @returns: a single space-separated string of C{--key=value} options
              (empty when nothing is overridden), suitable for splicing
              into a command line.
    @rtype: str
    """
    overrides = self._overrideDir
    # Sorted key order keeps the generated command line deterministic
    # regardless of dict ordering.
    # NOTE(review): values are not shell-quoted; a directory containing
    # spaces or shell metacharacters will break the command line.
    return " ".join(['--%s=%s' % (key, overrides[key])
                     for key in sorted(overrides)
                     if overrides[key]])
102
def getManagers(self):
    """
    Scan the managers directory for configured managers.

    Every sub-directory of C{self.managersDir} is a manager; its flows are
    the C{*.xml} files in its C{flows} sub-directory.

    @returns: a dictionary of manager names -> sorted list of flow names
    """
    managers = {}

    self.log('getManagers()')
    if not os.path.exists(self.managersDir):
        return managers

    for managerDir in glob.glob(os.path.join(self.managersDir, '*')):
        # Skip stray files: a manager is a directory holding planet.xml.
        if not os.path.isdir(managerDir):
            continue

        flows = []
        flowsDir = os.path.join(managerDir, 'flows')
        if os.path.exists(flowsDir):
            for flowFile in glob.glob(os.path.join(flowsDir, '*.xml')):
                # Strip only the trailing .xml; split(".xml") would
                # truncate names containing ".xml" elsewhere.
                flows.append(
                    os.path.splitext(os.path.basename(flowFile))[0])
        # glob order is filesystem-dependent; sort for a stable result.
        flows.sort()

        managerName = os.path.basename(managerDir)
        self.log('Adding flows %r to manager %s' % (flows, managerName))
        managers[managerName] = flows
    self.log('returning managers: %r' % managers)
    return managers
128
def getWorkers(self):
    """
    Scan the workers directory for configured workers.

    Every C{*.xml} file directly inside C{self.workersDir} is a worker
    config.

    @returns: a sorted list of worker names
    """
    if not os.path.exists(self.workersDir):
        return []

    pattern = os.path.join(self.workersDir, '*.xml')
    # Strip only the trailing .xml; split(".xml") would truncate names
    # containing ".xml" elsewhere, and startWorker would then look for a
    # different "<name>.xml" file.
    workers = [os.path.splitext(os.path.basename(workerFile))[0]
               for workerFile in glob.glob(pattern)]
    workers.sort()
    return workers
144
def start(self, args):
    """
    Start the managers and/or workers selected by C{args}.

    Without arguments every configured manager and worker is started;
    C{['manager', name]} or C{['worker', name]} starts just that one.

    @returns: the number of processes that failed to start, suitable for
              use as an exit value (0 means everything started)
    """
    managers, workers = self._parseManagersWorkers('start', args)
    self.debug("Start managers %r and workers %r" % (managers, workers))
    flowsByManager = self.getManagers()

    # Managers go first -- presumably so workers find them when connecting.
    failedManagers = [name for name in managers
                      if not self.startManager(name, flowsByManager[name])]
    failedWorkers = [name for name in workers
                     if not self.startWorker(name)]
    return len(failedManagers) + len(failedWorkers)
169
def stop(self, args):
    """
    Stop the managers and/or workers selected by C{args}.

    Without arguments every configured manager and worker is stopped;
    C{['manager', name]} or C{['worker', name]} stops just that one.

    @returns: the number of processes that failed to stop, suitable for
              use as an exit value (0 means everything stopped)
    """
    managers, workers = self._parseManagersWorkers('stop', args)
    self.debug("Stop managers %r and workers %r" % (managers, workers))

    # Workers are taken down before managers.
    failures = sum(1 for name in workers if not self.stopWorker(name))
    failures += sum(1 for name in managers if not self.stopManager(name))
    return failures
194
def status(self, args):
    """
    Print the state of each manager/worker selected by C{args}.

    Each one is reported as not running (no pid), running with its pid,
    or dead with a stale pid.
    """
    managers, workers = self._parseManagersWorkers('status', args)
    self.debug("Status managers %r and workers %r" % (managers, workers))
    for kind, names in (('manager', managers), ('worker', workers)):
        for name in names:
            pid = getPid(kind, name)
            if not pid:
                message = "%s %s not running" % (kind, name)
            elif checkPidRunning(pid):
                message = "%s %s is running with pid %d" % (kind, name, pid)
            else:
                message = "%s %s dead (stale pid %d)" % (kind, name, pid)
            print(message)
211
def clean(self, args):
    """
    Clean up pid files of dead managers/workers selected by C{args}.

    When no pid can be read for a process, deleting its pid file is
    attempted (it may contain bogus data). When the pid no longer belongs
    to a running process, the stale pid file is deleted. Running processes
    are left alone. Failure to delete a pid file is reported and ignored.
    """
    managers, workers = self._parseManagersWorkers('clean', args)
    self.debug("Clean managers %r and workers %r" % (managers, workers))
    for kind, names in [('manager', managers), ('worker', workers)]:
        for name in names:
            pid = getPid(kind, name)
            if not pid:
                try:
                    deletePidFile(kind, name)
                    print("deleted bogus pid file for %s %s" % (kind, name))
                except OSError:
                    print("failed to delete pid file for %s %s "
                          "- ignoring" % (kind, name))
                continue
            if not checkPidRunning(pid):
                self.debug("Cleaning up stale pid %d for %s %s" % (
                    pid, kind, name))
                print("deleting stale pid file for %s %s" % (kind, name))
                # Handle a failed deletion like the branch above instead of
                # aborting the remaining clean-up with a traceback.
                try:
                    deletePidFile(kind, name)
                except OSError:
                    print("failed to delete pid file for %s %s "
                          "- ignoring" % (kind, name))
235
def condrestart(self, args):
    """
    Restart the managers/workers selected by C{args}, but only those that
    are currently running.

    Without arguments every configured manager and worker is considered;
    C{['manager', name]} or C{['worker', name]} restricts it to one.
    Processes without a pid are skipped; processes with a stale pid are
    reported and skipped.

    @returns: the number of processes that failed to stop or to start
              again, suitable for use as an exit value
    """
    managers, workers = self._parseManagersWorkers('condrestart', args)
    self.debug("condrestart managers %r and workers %r" % (
        managers, workers))
    flowsByManager = self.getManagers()

    # kind -> (stop, start) callables taking the process name
    restarters = {
        'manager': (lambda name: self.stopManager(name),
                    lambda name: self.startManager(name,
                                                   flowsByManager[name])),
        'worker': (lambda name: self.stopWorker(name),
                   lambda name: self.startWorker(name)),
    }

    failures = 0
    for kind, names in [('manager', managers), ('worker', workers)]:
        doStop, doStart = restarters[kind]
        for name in names:
            pid = getPid(kind, name)
            if not pid:
                continue
            if not checkPidRunning(pid):
                print("%s %s dead (stale pid %d)" % (kind, name, pid))
                continue
            # Short-circuit: start again only once stopping succeeded.
            if not doStop(name) or not doStart(name):
                failures += 1
    return failures
275
def create(self, args):
    """
    Create a default manager or worker config.

    C{args} is C{[kind, name]} or C{[kind, name, port]}, where C{kind} is
    'manager' or 'worker'. For a manager, C{port} is the port it listens
    on; for a worker, the manager port it connects to. The default is 7531.

    @raise errors.FatalError: on missing or invalid arguments.
    """
    if not args:
        raise errors.FatalError(
            "Please specify 'manager' or 'worker' to create.")
    kind = args[0]
    # Validate the kind before asking for a name, so that e.g. "create foo"
    # does not ask for "name of foo".
    if kind not in ('manager', 'worker'):
        raise errors.FatalError(
            "Please specify 'manager' or 'worker' to create.")
    if len(args) == 1:
        raise errors.FatalError(
            "Please specify name of %s to create." % kind)
    name = args[1]

    port = 7531
    # Honour a given port whenever present; previously it was silently
    # ignored unless exactly three arguments were passed.
    if len(args) > 2:
        try:
            port = int(args[2])
        except ValueError:
            raise errors.FatalError(
                "Invalid port %r, please specify a number." % args[2])

    if kind == 'manager':
        self.createManager(name, port)
    else:
        self.createWorker(name, managerPort=port, randomFeederports=True)
304
def createManager(self, name, port=7531):
    """
    Create a sample manager.

    Creates C{<managersDir>/<name>/planet.xml} describing an SSL manager on
    localhost:C{port} with an htpasswdcrypt bouncer. If
    C{<configdir>/default.pem} does not exist, generating it with the
    bundled make-dummy-cert script is attempted first.

    @param name: name of the manager (its directory under managersDir).
    @param port: port the manager listens on.
    @returns: whether or not the config was created.
    @raise errors.FatalError: if the manager directory already exists.
    """
    self.info("Creating manager %s" % name)
    managerDir = os.path.join(self.managersDir, name)
    if os.path.exists(managerDir):
        raise errors.FatalError(
            "Manager directory %s already exists" % managerDir)
    makedirs(managerDir)

    planetFile = os.path.join(managerDir, 'planet.xml')

    pemFile = os.path.join(configure.configdir, 'default.pem')
    if not os.path.exists(pemFile):
        # NOTE(review): paths are interpolated unquoted into a shell
        # command; this would break on paths containing spaces.
        retval = os.system("sh %s %s" % (
            os.path.join(configure.datadir, 'make-dummy-cert'), pemFile))
        # If generation failed, fall back to the bare file name; per the
        # planet comment, relative paths resolve against
        # $sysconfdir/flumotion.
        if retval != 0:
            pemFile = 'default.pem'

    # try/finally so the file is closed even if writing fails.
    handle = open(planetFile, 'w')
    try:
        handle.write("""<planet>
<manager>
<debug>4</debug>
<host>localhost</host>
<port>%(port)d</port>
<transport>ssl</transport>
<!-- certificate path can be relative to $sysconfdir/flumotion,
or absolute -->
<certificate>%(pemFile)s</certificate>
<component name="manager-bouncer" type="htpasswdcrypt-bouncer">
<property name="data"><![CDATA[
user:PSfNpHTkpTx1M
]]></property>
</component>
</manager>
</planet>
""" % {'port': port, 'pemFile': pemFile})
    finally:
        handle.close()

    return True
356
def createWorker(self, name, managerPort=7531, randomFeederports=False):
    """
    Create a sample worker.

    Creates C{<workersDir>/<name>.xml}, configured to connect to a manager
    on localhost:C{managerPort} with plaintext user/test credentials.

    @param name: name of the worker (its config file name without .xml).
    @param managerPort: port of the manager the worker connects to.
    @param randomFeederports: if true, emit a random feederports element;
                              otherwise emit a commented-out example range.
    @returns: whether or not the config was created.
    @raise errors.FatalError: if the worker file already exists.
    """
    makedirs(self.workersDir)
    self.info("Creating worker %s" % name)
    workerFile = os.path.join(self.workersDir, "%s.xml" % name)
    if os.path.exists(workerFile):
        raise errors.FatalError(
            "Worker file %s already exists." % workerFile)

    if randomFeederports:
        feederports = ' <feederports random="True" />'
    else:
        feederports = " <!-- <feederports>8600-8639</feederports> -->"

    # try/finally so the file is closed even if writing fails.
    handle = open(workerFile, 'w')
    try:
        handle.write("""<worker>

<debug>4</debug>

<manager>
<host>localhost</host>
<port>%(managerPort)s</port>
</manager>

<authentication type="plaintext">
<username>user</username>
<password>test</password>
</authentication>

%(feederports)s

</worker>
""" % {'managerPort': managerPort, 'feederports': feederports})
    finally:
        handle.close()

    return True
396
def startManager(self, name, flowNames):
    """
    Start the named manager as a daemon, loading its planet.xml together
    with the given flows.

    @param name:      name of the manager (its directory under managersDir).
    @param flowNames: names of flows in the manager's flows directory.
    @returns: whether or not the manager daemon started
    @raise errors.FatalError: if the planet or a flow file is missing, or a
                              pid is already recorded for this manager.
    """
    self.info("Starting manager %s" % name)
    self.debug("Starting manager with flows %r" % flowNames)
    managerDir = os.path.join(self.managersDir, name)

    planetFile = os.path.join(managerDir, 'planet.xml')
    if not os.path.exists(planetFile):
        raise errors.FatalError(
            "Planet file %s does not exist" % planetFile)
    self.info("Loading planet %s" % planetFile)

    flowsDir = os.path.join(managerDir, 'flows')
    flowFiles = []
    for flowName in flowNames:
        flowFile = os.path.join(flowsDir, "%s.xml" % flowName)
        if not os.path.exists(flowFile):
            raise errors.FatalError(
                "Flow file %s does not exist" % flowFile)
        flowFiles.append(flowFile)
        self.info("Loading flow %s" % flowFile)

    # A recorded pid blocks the start, whether its process is alive or not.
    existingPid = getPid('manager', name)
    if existingPid:
        if checkPidRunning(existingPid):
            template = "Manager %s is already running (with pid %d)"
        else:
            template = "Manager %s is dead (stale pid %d)"
        raise errors.FatalError(template % (name, existingPid))

    command = ("flumotion-manager %s -D --daemonize-to %s "
               "--service-name %s %s %s" % (
                   self._getDirOptions(), configure.daemondir, name,
                   planetFile, " ".join(flowFiles)))
    self.debug("starting process %s" % command)
    retval = self.startProcess(command)
    if retval != 0:
        self.warning("manager %s could not start (return value %d)" % (
            name, retval))
        return False

    self.debug("Waiting for pid for manager %s" % name)
    pid = waitPidFile('manager', name)
    if not pid:
        self.warning("manager %s could not start" % name)
        return False
    self.info("Started manager %s with pid %d" % (name, pid))
    return True
453
def startWorker(self, name):
    """
    Start the named worker as a daemon using its config file.

    @param name: name of the worker (config file C{<workersDir>/<name>.xml}).
    @returns: whether or not the worker daemon started
    @raise errors.FatalError: if the worker file is missing, or a pid is
                              already recorded for this worker.
    """
    self.info("Starting worker %s" % name)
    workerFile = os.path.join(self.workersDir, "%s.xml" % name)
    if not os.path.exists(workerFile):
        raise errors.FatalError(
            "Worker file %s does not exist" % workerFile)

    # A recorded pid blocks the start, whether its process is alive or not.
    existingPid = getPid('worker', name)
    if existingPid:
        if checkPidRunning(existingPid):
            template = "Worker %s is already running (with pid %d)"
        else:
            template = "Worker %s is dead (stale pid %d)"
        raise errors.FatalError(template % (name, existingPid))

    self.info("Loading worker %s" % workerFile)

    command = ("flumotion-worker %s -D --daemonize-to %s "
               "--service-name %s %s" % (
                   self._getDirOptions(), configure.daemondir, name,
                   workerFile))
    self.debug("Running %s" % command)
    retval = self.startProcess(command)
    if retval != 0:
        self.warning("worker %s could not start (return value %d)" % (
            name, retval))
        return False

    self.debug("Waiting for pid for worker %s" % name)
    pid = waitPidFile('worker', name)
    if not pid:
        self.warning("worker %s could not start" % name)
        return False
    self.info("Started worker %s with pid %d" % (name, pid))
    return True
499
def startProcess(self, command):
    """
    Run C{command} through the shell and block until it finishes.

    @param command: shell command line to execute.
    @returns: the exit status of the process, or -1 if it did not exit
              normally (e.g. it was terminated by a signal).
    @rtype: int
    """
    waitStatus = os.system(command)
    # os.system hands back a wait()-style status word; decode it.
    if not os.WIFEXITED(waitStatus):
        return -1
    return os.WEXITSTATUS(waitStatus)
512
def stopManager(self, name):
    """
    Stop the given manager if it is running.

    @param name: name of the manager.
    @returns: True if the manager was stopped or had no pid at all;
              False if only a stale pid was found or stopping failed.
    """
    self.info("Stopping manager %s" % name)
    pid = getPid('manager', name)
    if not pid:
        # Report this the same way stopWorker does instead of silently.
        self.info("manager %s was not running" % name)
        return True

    # A pid whose process is gone counts as a failure; the stale pid file
    # can be removed with the 'clean' command.
    if not checkPidRunning(pid):
        self.info("Manager %s is dead (stale pid %d)" % (name, pid))
        return False

    self.debug('Stopping manager %s with pid %d' % (name, pid))
    if not self.stopProcess(pid):
        return False

    self.info('Stopped manager %s with pid %d' % (name, pid))
    return True
533
def stopWorker(self, name):
    """
    Stop the given worker if it is running.

    @param name: name of the worker.
    @returns: True if the worker was stopped or had no pid at all;
              False if only a stale pid was found or stopping failed.
    """
    self.info("Stopping worker %s" % name)
    pid = getPid('worker', name)
    if not pid:
        self.info("worker %s was not running" % name)
        return True

    if checkPidRunning(pid):
        self.debug('Stopping worker %s with pid %d' % (name, pid))
        if self.stopProcess(pid):
            self.info('Stopped worker %s with pid %d' % (name, pid))
            return True
        return False

    # Only a stale pid remains; report it as a failure.
    self.info("Worker %s is dead (stale pid %d)" % (name, pid))
    return False
555
def stopProcess(self, pid):
    """
    Stop the process with the given pid and wait until it has disappeared.

    Sends TERM and waits up to C{configure.processTermWait} seconds for the
    process to go away. It then sends KILL and waits up to
    C{configure.processKillWait} more seconds.

    @param pid: pid of the process to stop.
    @returns: True once the process is gone; False if there was no process
              to signal or it outlived the KILL wait.
    """
    # Wall-clock deadlines: time.clock() measures this process's CPU time
    # on Unix, which is the wrong clock for a timeout (and it no longer
    # exists in Python 3.8+).
    termDeadline = time.time() + configure.processTermWait
    killDeadline = termDeadline + configure.processKillWait

    self.debug('stopping process with pid %d' % pid)
    if not termPid(pid):
        self.warning('No process with pid %d' % pid)
        return False

    killSent = False
    while checkPidRunning(pid):
        now = time.time()
        if not killSent and now > termDeadline:
            self.warning("Process with pid %d has not responded to TERM "
                         "for %d seconds, killing" % (
                             pid, configure.processTermWait))
            killPid(pid)
            killSent = True

        if now > killDeadline:
            self.warning("Process with pid %d has not responded to KILL "
                         "for %d seconds, stopping" % (
                             pid, configure.processKillWait))
            return False

        # Poll briefly instead of spinning at 100% CPU while waiting.
        time.sleep(0.1)

    return True
589
def list(self):
    """
    List all service parts managed: print every configured manager with
    its flows, followed by every configured worker.
    """
    managers = self.getManagers()
    # Sort manager names so the listing is stable; workers (getWorkers) and
    # _parseManagersWorkers already use sorted order.
    for name in sorted(managers):
        print("manager %s" % name)
        for flow in managers[name] or ():
            print(" flow %s" % flow)

    for worker in self.getWorkers():
        print("worker %s" % worker)
605