Merge lp:~abentley/launchpad/ampoulejob into lp:launchpad

Proposed by Aaron Bentley
Status: Merged
Approved by: Aaron Bentley
Approved revision: not available
Merged at revision: not available
Proposed branch: lp:~abentley/launchpad/ampoulejob
Merge into: lp:launchpad
Prerequisite: lp:~abentley/launchpad/twistedjob
Diff against target: 515 lines (+195/-48)
11 files modified
cronscripts/update_preview_diffs.py (+1/-7)
cronscripts/upgrade_branches.py (+0/-6)
lib/lp/code/configure.zcml (+1/-0)
lib/lp/code/interfaces/branchjob.py (+3/-0)
lib/lp/code/interfaces/branchmergeproposal.py (+3/-0)
lib/lp/code/model/branchjob.py (+13/-2)
lib/lp/code/model/branchmergeproposaljob.py (+22/-2)
lib/lp/code/scripts/tests/test_update_preview_diffs.py (+1/-1)
lib/lp/services/job/runner.py (+144/-30)
setup.py (+1/-0)
versions.cfg (+6/-0)
To merge this branch: bzr merge lp:~abentley/launchpad/ampoulejob
Reviewer Review Type Date Requested Status
Paul Hummer (community) Approve
Review via email: mp+15982@code.launchpad.net
To post a comment you must log in.
Revision history for this message
Aaron Bentley (abentley) wrote :

= Summary =
Modify the twisted job runner to use a subprocess.

== Proposed fix ==
Use the Ampoule process pool. Note that this implementation does not provide
parallelism, because the ParallelLimitedTaskConsumer is parameterized to run
only 1 process at a time.

== Pre-implementation notes ==
Pre-implementation discussion with thumper and mwhudson.

== Implementation details ==
Make ampoule a dependency.

Because, with Ampoule, process set-up should happen in the subprocess, remove
the setUp concept from JobCronScript and instead provide a context manager
(contextManager) on the job source. Configure the globalErrorUtility there.
Use the context manager in JobRunner.

Add an AMP command called RunJobCommand. Add a JobRunnerProcess class that
responds to RunJobCommand. Have it enter the context when a connection is made,
and exit it when the connection is lost. (These should happen only once in the
JobRunnerProcess's lifecycle.)

Subclass JobRunnerProcess as UpdatePreviewDiffProcess. Provide
UpdatePreviewDiffJob.amp so that TwistedJobRunner.runFromSource can find the
right process class to use.

Update TwistedJobRunner.runAll to start the process pool, which is created
with the specified job_amp class. Provide a customized BOOTSTRAP so that the
child process invokes scripts.execute_zcml_for_scripts.

Change getTaskSource to produce tasks that call runJobInSubprocess, which uses
the process pool to run the job.

Update TwistedJobRunner.runFromSource to restore the previous SIGCHLD signal
handler after reactor.run returns.

== Tests ==
bin/test test_update_preview_diffs

== Demo and Q/A ==
Have a LOSA update the staging config to use --twisted, and ensure the log
says it ran under Twisted.

Revision history for this message
Paul Hummer (rockstar) :
review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'cronscripts/update_preview_diffs.py'
2--- cronscripts/update_preview_diffs.py 2009-12-18 16:03:15 +0000
3+++ cronscripts/update_preview_diffs.py 2009-12-18 16:03:17 +0000
4@@ -11,7 +11,6 @@
5
6 import _pythonpath
7
8-from lp.codehosting.vfs import get_scanner_server
9 from lp.services.job.runner import JobCronScript, JobRunner, TwistedJobRunner
10 from lp.code.interfaces.branchmergeproposal import (
11 IUpdatePreviewDiffJobSource,)
12@@ -24,7 +23,7 @@
13 source_interface = IUpdatePreviewDiffJobSource
14
15 def __init__(self):
16- super(JobCronScript, self).__init__()
17+ super(RunUpdatePreviewDiffJobs, self).__init__()
18 if self.options.twisted:
19 self.runner_class = TwistedJobRunner
20 else:
21@@ -33,11 +32,6 @@
22 def add_my_options(self):
23 self.parser.add_option('--twisted', action='store_true')
24
25- def setUp(self):
26- server = get_scanner_server()
27- server.setUp()
28- return [server.tearDown]
29-
30
31 if __name__ == '__main__':
32 script = RunUpdatePreviewDiffJobs()
33
34=== modified file 'cronscripts/upgrade_branches.py'
35--- cronscripts/upgrade_branches.py 2009-10-31 12:07:16 +0000
36+++ cronscripts/upgrade_branches.py 2009-12-18 16:03:17 +0000
37@@ -11,7 +11,6 @@
38
39 from lp.services.job.runner import JobCronScript
40 from lp.code.interfaces.branchjob import IBranchUpgradeJobSource
41-from lp.codehosting.vfs import get_multi_server
42
43
44 class RunUpgradeBranches(JobCronScript):
45@@ -20,11 +19,6 @@
46 config_name = 'upgrade_branches'
47 source_interface = IBranchUpgradeJobSource
48
49- def setUp(self):
50- server = get_multi_server(write_hosted=True)
51- server.setUp()
52- return [server.tearDown]
53-
54
55 if __name__ == '__main__':
56 script = RunUpgradeBranches()
57
58=== modified file 'lib/lp/code/configure.zcml'
59--- lib/lp/code/configure.zcml 2009-12-11 00:56:16 +0000
60+++ lib/lp/code/configure.zcml 2009-12-18 16:03:17 +0000
61@@ -889,6 +889,7 @@
62 </securedutility>
63 <class class="lp.code.model.branchmergeproposaljob.UpdatePreviewDiffJob">
64 <allow interface="lp.services.job.interfaces.job.IRunnableJob" />
65+ <allow interface="lp.code.interfaces.branchmergeproposal.IBranchMergeProposalJob" />
66 </class>
67
68 <securedutility
69
70=== modified file 'lib/lp/code/interfaces/branchjob.py'
71--- lib/lp/code/interfaces/branchjob.py 2009-10-29 05:50:08 +0000
72+++ lib/lp/code/interfaces/branchjob.py 2009-12-18 16:03:17 +0000
73@@ -90,6 +90,9 @@
74 def iterReady():
75 """Iterate through all IBranchUpgradeJobs."""
76
77+ def contextManager():
78+ """Get a context for running this kind of job in."""
79+
80
81 class IRevisionMailJob(IRunnableJob):
82 """A Job to send email a revision change in a branch."""
83
84=== modified file 'lib/lp/code/interfaces/branchmergeproposal.py'
85--- lib/lp/code/interfaces/branchmergeproposal.py 2009-12-18 16:03:15 +0000
86+++ lib/lp/code/interfaces/branchmergeproposal.py 2009-12-18 16:03:17 +0000
87@@ -597,6 +597,9 @@
88 def iterReady():
89 """Iterate through jobs ready to update preview diffs."""
90
91+ def contextManager():
92+ """Get a context for running this kind of job in."""
93+
94
95 def notify_modified(proposal, func, *args, **kwargs):
96 """Call func, then notify about the changes it made.
97
98=== modified file 'lib/lp/code/model/branchjob.py'
99--- lib/lp/code/model/branchjob.py 2009-10-29 17:07:18 +0000
100+++ lib/lp/code/model/branchjob.py 2009-12-18 16:03:17 +0000
101@@ -9,6 +9,7 @@
102 'RosettaUploadJob',
103 ]
104
105+import contextlib
106 import os
107 import shutil
108 from StringIO import StringIO
109@@ -40,14 +41,14 @@
110 from canonical.config import config
111 from canonical.database.enumcol import EnumCol
112 from canonical.database.sqlbase import SQLBase
113-from canonical.launchpad.webapp import canonical_url
114+from canonical.launchpad.webapp import canonical_url, errorlog
115 from lp.code.bzr import (
116 BRANCH_FORMAT_UPGRADE_PATH, REPOSITORY_FORMAT_UPGRADE_PATH)
117 from lp.code.model.branch import Branch
118 from lp.code.model.branchmergeproposal import BranchMergeProposal
119 from lp.code.model.diff import StaticDiff
120 from lp.code.model.revision import RevisionSet
121-from lp.codehosting.vfs import branch_id_to_path
122+from lp.codehosting.vfs import branch_id_to_path, get_multi_server
123 from lp.services.job.model.job import Job
124 from lp.services.job.interfaces.job import JobStatus
125 from lp.services.job.runner import BaseRunnableJob
126@@ -258,6 +259,16 @@
127 branch_job = BranchJob(branch, BranchJobType.UPGRADE_BRANCH, {})
128 return cls(branch_job)
129
130+ @staticmethod
131+ @contextlib.contextmanager
132+ def contextManager():
133+ """See `IBranchUpgradeJobSource`."""
134+ errorlog.globalErrorUtility.configure('upgrade_branches')
135+ server = get_multi_server(write_hosted=True)
136+ server.setUp()
137+ yield
138+ server.tearDown()
139+
140 def run(self):
141 """See `IBranchUpgradeJob`."""
142 # Set up the new branch structure
143
144=== modified file 'lib/lp/code/model/branchmergeproposaljob.py'
145--- lib/lp/code/model/branchmergeproposaljob.py 2009-12-18 16:03:15 +0000
146+++ lib/lp/code/model/branchmergeproposaljob.py 2009-12-18 16:03:17 +0000
147@@ -14,6 +14,7 @@
148 'MergeProposalCreatedJob',
149 ]
150
151+import contextlib
152 from email.Utils import parseaddr
153 import transaction
154
155@@ -31,6 +32,7 @@
156 from canonical.database.enumcol import EnumCol
157 from canonical.launchpad.database.message import MessageJob, MessageJobAction
158 from canonical.launchpad.interfaces.message import IMessageJob
159+from canonical.launchpad.webapp import errorlog
160 from canonical.launchpad.webapp.interaction import setupInteraction
161 from canonical.launchpad.webapp.interfaces import (
162 DEFAULT_FLAVOR, IPlacelessAuthUtility, IStoreSelector, MAIN_STORE,
163@@ -44,10 +46,10 @@
164 from lp.code.mail.branchmergeproposal import BMPMailer
165 from lp.code.model.branchmergeproposal import BranchMergeProposal
166 from lp.code.model.diff import PreviewDiff, StaticDiff
167-from lp.codehosting.vfs import get_multi_server
168+from lp.codehosting.vfs import get_multi_server, get_scanner_server
169 from lp.services.job.model.job import Job
170 from lp.services.job.interfaces.job import IRunnableJob
171-from lp.services.job.runner import BaseRunnableJob
172+from lp.services.job.runner import BaseRunnableJob, JobRunnerProcess
173
174
175 class BranchMergeProposalJobType(DBEnumeratedType):
176@@ -287,6 +289,16 @@
177
178 class_job_type = BranchMergeProposalJobType.UPDATE_PREVIEW_DIFF
179
180+ @staticmethod
181+ @contextlib.contextmanager
182+ def contextManager():
183+ """See `IUpdatePreviewDiffJobSource`."""
184+ errorlog.globalErrorUtility.configure('update_preview_diffs')
185+ server = get_scanner_server()
186+ server.setUp()
187+ yield
188+ server.tearDown()
189+
190 def run(self):
191 """See `IRunnableJob`"""
192 preview = PreviewDiff.fromBranchMergeProposal(
193@@ -294,6 +306,14 @@
194 self.branch_merge_proposal.preview_diff = preview
195
196
197+class UpdatePreviewDiffProcess(JobRunnerProcess):
198+ """A process that runs UpdatePreviewDiffJobs"""
199+ job_class = UpdatePreviewDiffJob
200+
201+
202+UpdatePreviewDiffJob.amp = UpdatePreviewDiffProcess
203+
204+
205 class CreateMergeProposalJob(BaseRunnableJob):
206 """See `ICreateMergeProposalJob` and `ICreateMergeProposalJobSource`."""
207
208
209=== modified file 'lib/lp/code/scripts/tests/test_update_preview_diffs.py'
210--- lib/lp/code/scripts/tests/test_update_preview_diffs.py 2009-12-18 16:03:15 +0000
211+++ lib/lp/code/scripts/tests/test_update_preview_diffs.py 2009-12-18 16:03:17 +0000
212@@ -78,7 +78,7 @@
213 error_utility.configure('update_preview_diffs')
214 old_id = error_utility.getLastOopsReport().id
215 retcode, stdout, stderr = run_script(
216- 'cronscripts/update_preview_diffs.py', [])
217+ 'cronscripts/update_preview_diffs.py', ['--twisted'])
218 self.assertEqual(0, retcode)
219 self.assertIn(
220 'INFO 1 IUpdatePreviewDiffJobSource jobs did not complete.\n',
221
222=== modified file 'lib/lp/services/job/runner.py'
223--- lib/lp/services/job/runner.py 2009-12-18 16:03:15 +0000
224+++ lib/lp/services/job/runner.py 2009-12-18 16:03:17 +0000
225@@ -11,10 +11,16 @@
226 __all__ = ['JobRunner']
227
228
229+import os
230+from signal import getsignal, SIGCHLD, signal
231 import sys
232
233+from ampoule import child, pool, main
234+
235 from twisted.internet import reactor, defer
236+from twisted.protocols import amp
237 from zope.component import getUtility
238+from zope.security.proxy import removeSecurityProxy
239
240 from canonical.config import config
241 from canonical.twistedsupport.task import (
242@@ -27,6 +33,27 @@
243 from lp.services.mail.sendmail import MailController
244 from canonical.launchpad.webapp import errorlog
245
246+BOOTSTRAP = """\
247+import sys
248+
249+def main(reactor, ampChildPath):
250+ from twisted.application import reactors
251+ reactors.installReactor(reactor)
252+
253+ from twisted.python import log
254+ log.startLogging(sys.stderr)
255+
256+ from twisted.internet import reactor, stdio
257+ from twisted.python import reflect
258+
259+ ampChild = reflect.namedAny(ampChildPath)
260+ stdio.StandardIO(ampChild(), 3, 4)
261+ from canonical.launchpad import scripts
262+ scripts.execute_zcml_for_scripts(use_web_security=False)
263+ reactor.run()
264+main(sys.argv[-2], sys.argv[-1])
265+"""
266+
267
268 class BaseRunnableJob:
269 """Base class for jobs to be run via JobRunner.
270@@ -102,10 +129,13 @@
271 class BaseJobRunner(object):
272 """Runner of Jobs."""
273
274- def __init__(self, logger=None):
275+ def __init__(self, logger=None, error_utility=None):
276 self.completed_jobs = []
277 self.incomplete_jobs = []
278 self.logger = logger
279+ self.error_utility = error_utility
280+ if self.error_utility is None:
281+ self.error_utility = errorlog.globalErrorUtility
282
283 def runJob(self, job):
284 """Attempt to run a job, updating its status as appropriate."""
285@@ -139,7 +169,7 @@
286 The list of complete or incomplete jobs will be updated.
287 """
288 job = IRunnableJob(job)
289- with errorlog.globalErrorUtility.oopsMessage(
290+ with self.error_utility.oopsMessage(
291 dict(job.getOopsVars())):
292 try:
293 self.runJob(job)
294@@ -149,11 +179,24 @@
295 job.notifyUserError(e)
296 except Exception:
297 info = sys.exc_info()
298- errorlog.globalErrorUtility.raising(info)
299- oops = errorlog.globalErrorUtility.getLastOopsReport()
300- job.notifyOops(oops)
301- if self.logger is not None:
302- self.logger.info('Job resulted in OOPS: %s' % oops.id)
303+ return self._doOops(job, info)
304+
305+ def _doOops(self, job, info):
306+ """Report an OOPS for the provided job and info.
307+
308+ :param job: The IRunnableJob whose run failed.
309+ :param info: The standard sys.exc_info() value.
310+ :return: the Oops that was reported.
311+ """
312+ self.error_utility.raising(info)
313+ oops = self.error_utility.getLastOopsReport()
314+ job.notifyOops(oops)
315+ return oops
316+
317+ def _logOopsId(self, oops_id):
318+ """Report oopses by id to the log."""
319+ if self.logger is not None:
320+ self.logger.info('Job resulted in OOPS: %s' % oops_id)
321
322
323 class JobRunner(BaseJobRunner):
324@@ -170,30 +213,97 @@
325 @classmethod
326 def runFromSource(cls, job_source, logger):
327 """Run all ready jobs provided by the specified source."""
328- logger.info("Running synchronously.")
329- runner = cls.fromReady(job_source, logger)
330- runner.runAll()
331+ with removeSecurityProxy(job_source.contextManager()):
332+ logger.info("Running synchronously.")
333+ runner = cls.fromReady(job_source, logger)
334+ runner.runAll()
335 return runner
336
337 def runAll(self):
338 """Run all the Jobs for this JobRunner."""
339 for job in self.jobs:
340- self.runJobHandleError(job)
341+ oops = self.runJobHandleError(job)
342+ if oops is not None:
343+ self._logOopsId(oops.id)
344+
345+
346+class RunJobCommand(amp.Command):
347+
348+ arguments = [('job_id', amp.Integer())]
349+ response = [('success', amp.Integer()), ('oops_id', amp.String())]
350+
351+
352+class JobRunnerProcess(child.AMPChild):
353+ """Base class for processes that run jobs."""
354+
355+ def __init__(self):
356+ child.AMPChild.__init__(self)
357+ self.context_manager = self.job_class.contextManager()
358+
359+ def makeConnection(self, transport):
360+ """The Job context is entered on connect."""
361+ child.AMPChild.makeConnection(self, transport)
362+ self.context_manager.__enter__()
363+
364+ def connectionLost(self, reason):
365+ """The Job context is left on disconnect."""
366+ self.context_manager.__exit__(None, None, None)
367+ child.AMPChild.connectionLost(self, reason)
368+
369+ @RunJobCommand.responder
370+ def runJobCommand(self, job_id):
371+ """Run a job of this job_class according to its job id."""
372+ runner = BaseJobRunner()
373+ job = self.job_class.get(job_id)
374+ oops = runner.runJobHandleError(job)
375+ if oops is None:
376+ oops_id = ''
377+ else:
378+ oops_id = oops.id
379+ return {'success': len(runner.completed_jobs), 'oops_id': oops_id}
380
381
382 class TwistedJobRunner(BaseJobRunner):
383 """Run Jobs via twisted."""
384
385- def __init__(self, job_source, logger=None):
386- super(TwistedJobRunner, self).__init__(logger=logger)
387+ def __init__(self, job_source, job_amp, logger=None, error_utility=None):
388+ starter = main.ProcessStarter(
389+ bootstrap=BOOTSTRAP, packages=('twisted', 'ampoule'),
390+ env={'PYTHONPATH': os.environ['PYTHONPATH'],
391+ 'PATH': os.environ['PATH'],
392+ 'LPCONFIG': os.environ['LPCONFIG']})
393+ super(TwistedJobRunner, self).__init__(logger, error_utility)
394 self.job_source = job_source
395+ self.pool = pool.ProcessPool(job_amp, starter=starter, min=0)
396+
397+ def runJobInSubprocess(self, job):
398+ """Run the job_class with the specified id in the process pool.
399+
400+ :return: a Deferred that fires when the job has completed.
401+ """
402+ job_id = job.id
403+ deferred = self.pool.doWork(RunJobCommand, job_id=job_id)
404+ def update(response):
405+ if response['success']:
406+ self.completed_jobs.append(job)
407+ else:
408+ self.incomplete_jobs.append(job)
409+ if response['oops_id'] != '':
410+ self._logOopsId(response['oops_id'])
411+ def job_raised(failure):
412+ self.incomplete_jobs.append(job)
413+ info = (failure.type, failure.value, failure.tb)
414+ oops = self._doOops(job, info)
415+ self._logOopsId(oops.id)
416+ deferred.addCallbacks(update, job_raised)
417+ return deferred
418
419 def getTaskSource(self):
420 """Return a task source for all jobs in job_source."""
421 def producer():
422 while True:
423 for job in self.job_source.iterReady():
424- yield lambda: self.runJobHandleError(job)
425+ yield lambda: self.runJobInSubprocess(job)
426 yield None
427 return PollingTaskSource(5, producer().next)
428
429@@ -204,22 +314,32 @@
430
431 def runAll(self):
432 """Run all ready jobs, and any that become ready while running."""
433+ self.pool.start()
434 d = defer.maybeDeferred(self.doConsumer)
435- d.addCallbacks(lambda ignored: reactor.stop(), self.failed)
436-
437- @staticmethod
438- def failed(failure):
439+ d.addCallbacks(self.terminated, self.failed)
440+
441+ def terminated(self, ignored=None):
442+ """Callback to stop the processpool and reactor."""
443+ deferred = self.pool.stop()
444+ deferred.addBoth(lambda ignored: reactor.stop())
445+
446+ def failed(self, failure):
447 """Callback for when the job fails."""
448 failure.printTraceback()
449- reactor.stop()
450+ self.terminated()
451
452 @classmethod
453- def runFromSource(cls, job_source, logger):
454+ def runFromSource(cls, job_source, logger, error_utility=None):
455 """Run all ready jobs provided by the specified source."""
456 logger.info("Running through Twisted.")
457- runner = cls(job_source, logger)
458+ runner = cls(job_source, removeSecurityProxy(job_source).amp, logger,
459+ error_utility)
460 reactor.callWhenRunning(runner.runAll)
461- reactor.run()
462+ handler = getsignal(SIGCHLD)
463+ try:
464+ reactor.run()
465+ finally:
466+ signal(SIGCHLD, handler)
467 return runner
468
469
470@@ -232,14 +352,8 @@
471 self.runner_class = runner_class
472
473 def main(self):
474- errorlog.globalErrorUtility.configure(self.config_name)
475- cleanups = self.setUp()
476- try:
477- job_source = getUtility(self.source_interface)
478- runner = self.runner_class.runFromSource(job_source, self.logger)
479- finally:
480- for cleanup in reversed(cleanups):
481- cleanup()
482+ job_source = getUtility(self.source_interface)
483+ runner = self.runner_class.runFromSource(job_source, self.logger)
484 self.logger.info(
485 'Ran %d %s jobs.',
486 len(runner.completed_jobs), self.source_interface.__name__)
487
488=== modified file 'setup.py'
489--- setup.py 2009-11-25 10:59:02 +0000
490+++ setup.py 2009-12-18 16:03:17 +0000
491@@ -24,6 +24,7 @@
492 # this list should only contain direct dependencies--things imported or
493 # used in zcml.
494 install_requires=[
495+ 'ampoule',
496 'bzr',
497 'chameleon.core',
498 'chameleon.zpt',
499
500=== modified file 'versions.cfg'
501--- versions.cfg 2009-12-16 14:14:41 +0000
502+++ versions.cfg 2009-12-18 16:03:17 +0000
503@@ -3,6 +3,12 @@
504
505 [versions]
506 # Alphabetical, case-insensitive, please! :-)
507+
508+# from -r 3:lp:~launchpad/ampoule/launchpad-tweaked
509+# To reproduce:
510+# bzr export ampoule-0.1.0-lp-1.tar.gz lp:~launchpad/ampoule/launchpad-tweaked\
511+# -r 3
512+ampoule = 0.1.0-lp-1
513 bzr = 2.1b3
514 chameleon.core = 1.0b35
515 chameleon.zpt = 1.0b17