Merge lp:~abentley/launchpad/ampoulejob into lp:launchpad

Proposed by Aaron Bentley
Status: Merged
Approved by: Aaron Bentley
Approved revision: not available
Merged at revision: not available
Proposed branch: lp:~abentley/launchpad/ampoulejob
Merge into: lp:launchpad
Prerequisite: lp:~abentley/launchpad/twistedjob
Diff against target: 515 lines (+195/-48)
11 files modified
cronscripts/update_preview_diffs.py (+1/-7)
cronscripts/upgrade_branches.py (+0/-6)
lib/lp/code/configure.zcml (+1/-0)
lib/lp/code/interfaces/branchjob.py (+3/-0)
lib/lp/code/interfaces/branchmergeproposal.py (+3/-0)
lib/lp/code/model/branchjob.py (+13/-2)
lib/lp/code/model/branchmergeproposaljob.py (+22/-2)
lib/lp/code/scripts/tests/test_update_preview_diffs.py (+1/-1)
lib/lp/services/job/runner.py (+144/-30)
setup.py (+1/-0)
versions.cfg (+6/-0)
To merge this branch: bzr merge lp:~abentley/launchpad/ampoulejob
Reviewer Review Type Date Requested Status
Paul Hummer (community) Approve
Review via email: mp+15982@code.launchpad.net
To post a comment you must log in.
Revision history for this message
Aaron Bentley (abentley) wrote :

= Summary =
Modify the twisted job runner to use a subprocess.

== Proposed fix ==
Use the Ampoule process pool. Note that this implementation does not provide
parallelism, because the ParallelLimitedTaskConsumer is paramaterized to run
1 parallel process.

== Pre-implementation notes ==
Preimplementation discussion with thumper, mwhudson.

== Implementation details ==
Make ampoule a dependency.

Because process set-up should happen in a subprocess, with Ampoule, remove the
setUp concept from JobCronScript, and instead provide a context manager.
Configure the globalErrorUtility there. Use the context manager in JobRunner.

Add an Amp command called RunJobCommand. Add a JobRunnerProcess class that can
provide RunJobCommand. Have it enter the context when a connection is made,
and exit it when the connection is lost. (These should happen only once in the
JobRunnerProcess's lifecycle.)

Subclass RunJobCommand as UpdatePreviewDiffProcess. Provide
UpdatePreviewDiffJob.amp so that TwistedJobRunner.runFromSource can find the
right process class to use.

Update runAll to start the process pool, using the specified job_amp kind.
Provide a customized BOOTSTRAP to invoke scripts.execute_zcml_for_scripts.

Change getTaskSource to return runJobInSubprocess, which uses the process pool
to run the job.

Update TwistedJobRunner.runFromSource to clean up SIGCHLD signal handlers after
reactor.run.

== Tests ==
bin/test test_update_preview_diffs

== Demo and Q/A ==
Have a LOSA update the staging config to use --twisted, and ensure the log
says it ran under Twisted.

Revision history for this message
Paul Hummer (rockstar) :
review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'cronscripts/update_preview_diffs.py'
--- cronscripts/update_preview_diffs.py 2009-12-18 16:03:15 +0000
+++ cronscripts/update_preview_diffs.py 2009-12-18 16:03:17 +0000
@@ -11,7 +11,6 @@
1111
12import _pythonpath12import _pythonpath
1313
14from lp.codehosting.vfs import get_scanner_server
15from lp.services.job.runner import JobCronScript, JobRunner, TwistedJobRunner14from lp.services.job.runner import JobCronScript, JobRunner, TwistedJobRunner
16from lp.code.interfaces.branchmergeproposal import (15from lp.code.interfaces.branchmergeproposal import (
17 IUpdatePreviewDiffJobSource,)16 IUpdatePreviewDiffJobSource,)
@@ -24,7 +23,7 @@
24 source_interface = IUpdatePreviewDiffJobSource23 source_interface = IUpdatePreviewDiffJobSource
2524
26 def __init__(self):25 def __init__(self):
27 super(JobCronScript, self).__init__()26 super(RunUpdatePreviewDiffJobs, self).__init__()
28 if self.options.twisted:27 if self.options.twisted:
29 self.runner_class = TwistedJobRunner28 self.runner_class = TwistedJobRunner
30 else:29 else:
@@ -33,11 +32,6 @@
33 def add_my_options(self):32 def add_my_options(self):
34 self.parser.add_option('--twisted', action='store_true')33 self.parser.add_option('--twisted', action='store_true')
3534
36 def setUp(self):
37 server = get_scanner_server()
38 server.setUp()
39 return [server.tearDown]
40
4135
42if __name__ == '__main__':36if __name__ == '__main__':
43 script = RunUpdatePreviewDiffJobs()37 script = RunUpdatePreviewDiffJobs()
4438
=== modified file 'cronscripts/upgrade_branches.py'
--- cronscripts/upgrade_branches.py 2009-10-31 12:07:16 +0000
+++ cronscripts/upgrade_branches.py 2009-12-18 16:03:17 +0000
@@ -11,7 +11,6 @@
1111
12from lp.services.job.runner import JobCronScript12from lp.services.job.runner import JobCronScript
13from lp.code.interfaces.branchjob import IBranchUpgradeJobSource13from lp.code.interfaces.branchjob import IBranchUpgradeJobSource
14from lp.codehosting.vfs import get_multi_server
1514
1615
17class RunUpgradeBranches(JobCronScript):16class RunUpgradeBranches(JobCronScript):
@@ -20,11 +19,6 @@
20 config_name = 'upgrade_branches'19 config_name = 'upgrade_branches'
21 source_interface = IBranchUpgradeJobSource20 source_interface = IBranchUpgradeJobSource
2221
23 def setUp(self):
24 server = get_multi_server(write_hosted=True)
25 server.setUp()
26 return [server.tearDown]
27
2822
29if __name__ == '__main__':23if __name__ == '__main__':
30 script = RunUpgradeBranches()24 script = RunUpgradeBranches()
3125
=== modified file 'lib/lp/code/configure.zcml'
--- lib/lp/code/configure.zcml 2009-12-11 00:56:16 +0000
+++ lib/lp/code/configure.zcml 2009-12-18 16:03:17 +0000
@@ -889,6 +889,7 @@
889 </securedutility>889 </securedutility>
890 <class class="lp.code.model.branchmergeproposaljob.UpdatePreviewDiffJob">890 <class class="lp.code.model.branchmergeproposaljob.UpdatePreviewDiffJob">
891 <allow interface="lp.services.job.interfaces.job.IRunnableJob" />891 <allow interface="lp.services.job.interfaces.job.IRunnableJob" />
892 <allow interface="lp.code.interfaces.branchmergeproposal.IBranchMergeProposalJob" />
892 </class>893 </class>
893894
894 <securedutility895 <securedutility
895896
=== modified file 'lib/lp/code/interfaces/branchjob.py'
--- lib/lp/code/interfaces/branchjob.py 2009-10-29 05:50:08 +0000
+++ lib/lp/code/interfaces/branchjob.py 2009-12-18 16:03:17 +0000
@@ -90,6 +90,9 @@
90 def iterReady():90 def iterReady():
91 """Iterate through all IBranchUpgradeJobs."""91 """Iterate through all IBranchUpgradeJobs."""
9292
93 def contextManager():
94 """Get a context for running this kind of job in."""
95
9396
94class IRevisionMailJob(IRunnableJob):97class IRevisionMailJob(IRunnableJob):
95 """A Job to send email a revision change in a branch."""98 """A Job to send email a revision change in a branch."""
9699
=== modified file 'lib/lp/code/interfaces/branchmergeproposal.py'
--- lib/lp/code/interfaces/branchmergeproposal.py 2009-12-18 16:03:15 +0000
+++ lib/lp/code/interfaces/branchmergeproposal.py 2009-12-18 16:03:17 +0000
@@ -597,6 +597,9 @@
597 def iterReady():597 def iterReady():
598 """Iterate through jobs ready to update preview diffs."""598 """Iterate through jobs ready to update preview diffs."""
599599
600 def contextManager():
601 """Get a context for running this kind of job in."""
602
600603
601def notify_modified(proposal, func, *args, **kwargs):604def notify_modified(proposal, func, *args, **kwargs):
602 """Call func, then notify about the changes it made.605 """Call func, then notify about the changes it made.
603606
=== modified file 'lib/lp/code/model/branchjob.py'
--- lib/lp/code/model/branchjob.py 2009-10-29 17:07:18 +0000
+++ lib/lp/code/model/branchjob.py 2009-12-18 16:03:17 +0000
@@ -9,6 +9,7 @@
9 'RosettaUploadJob',9 'RosettaUploadJob',
10]10]
1111
12import contextlib
12import os13import os
13import shutil14import shutil
14from StringIO import StringIO15from StringIO import StringIO
@@ -40,14 +41,14 @@
40from canonical.config import config41from canonical.config import config
41from canonical.database.enumcol import EnumCol42from canonical.database.enumcol import EnumCol
42from canonical.database.sqlbase import SQLBase43from canonical.database.sqlbase import SQLBase
43from canonical.launchpad.webapp import canonical_url44from canonical.launchpad.webapp import canonical_url, errorlog
44from lp.code.bzr import (45from lp.code.bzr import (
45 BRANCH_FORMAT_UPGRADE_PATH, REPOSITORY_FORMAT_UPGRADE_PATH)46 BRANCH_FORMAT_UPGRADE_PATH, REPOSITORY_FORMAT_UPGRADE_PATH)
46from lp.code.model.branch import Branch47from lp.code.model.branch import Branch
47from lp.code.model.branchmergeproposal import BranchMergeProposal48from lp.code.model.branchmergeproposal import BranchMergeProposal
48from lp.code.model.diff import StaticDiff49from lp.code.model.diff import StaticDiff
49from lp.code.model.revision import RevisionSet50from lp.code.model.revision import RevisionSet
50from lp.codehosting.vfs import branch_id_to_path51from lp.codehosting.vfs import branch_id_to_path, get_multi_server
51from lp.services.job.model.job import Job52from lp.services.job.model.job import Job
52from lp.services.job.interfaces.job import JobStatus53from lp.services.job.interfaces.job import JobStatus
53from lp.services.job.runner import BaseRunnableJob54from lp.services.job.runner import BaseRunnableJob
@@ -258,6 +259,16 @@
258 branch_job = BranchJob(branch, BranchJobType.UPGRADE_BRANCH, {})259 branch_job = BranchJob(branch, BranchJobType.UPGRADE_BRANCH, {})
259 return cls(branch_job)260 return cls(branch_job)
260261
262 @staticmethod
263 @contextlib.contextmanager
264 def contextManager():
265 """See `IBranchUpgradeJobSource`."""
266 errorlog.globalErrorUtility.configure('upgrade_branches')
267 server = get_multi_server(write_hosted=True)
268 server.setUp()
269 yield
270 server.tearDown()
271
261 def run(self):272 def run(self):
262 """See `IBranchUpgradeJob`."""273 """See `IBranchUpgradeJob`."""
263 # Set up the new branch structure274 # Set up the new branch structure
264275
=== modified file 'lib/lp/code/model/branchmergeproposaljob.py'
--- lib/lp/code/model/branchmergeproposaljob.py 2009-12-18 16:03:15 +0000
+++ lib/lp/code/model/branchmergeproposaljob.py 2009-12-18 16:03:17 +0000
@@ -14,6 +14,7 @@
14 'MergeProposalCreatedJob',14 'MergeProposalCreatedJob',
15 ]15 ]
1616
17import contextlib
17from email.Utils import parseaddr18from email.Utils import parseaddr
18import transaction19import transaction
1920
@@ -31,6 +32,7 @@
31from canonical.database.enumcol import EnumCol32from canonical.database.enumcol import EnumCol
32from canonical.launchpad.database.message import MessageJob, MessageJobAction33from canonical.launchpad.database.message import MessageJob, MessageJobAction
33from canonical.launchpad.interfaces.message import IMessageJob34from canonical.launchpad.interfaces.message import IMessageJob
35from canonical.launchpad.webapp import errorlog
34from canonical.launchpad.webapp.interaction import setupInteraction36from canonical.launchpad.webapp.interaction import setupInteraction
35from canonical.launchpad.webapp.interfaces import (37from canonical.launchpad.webapp.interfaces import (
36 DEFAULT_FLAVOR, IPlacelessAuthUtility, IStoreSelector, MAIN_STORE,38 DEFAULT_FLAVOR, IPlacelessAuthUtility, IStoreSelector, MAIN_STORE,
@@ -44,10 +46,10 @@
44from lp.code.mail.branchmergeproposal import BMPMailer46from lp.code.mail.branchmergeproposal import BMPMailer
45from lp.code.model.branchmergeproposal import BranchMergeProposal47from lp.code.model.branchmergeproposal import BranchMergeProposal
46from lp.code.model.diff import PreviewDiff, StaticDiff48from lp.code.model.diff import PreviewDiff, StaticDiff
47from lp.codehosting.vfs import get_multi_server49from lp.codehosting.vfs import get_multi_server, get_scanner_server
48from lp.services.job.model.job import Job50from lp.services.job.model.job import Job
49from lp.services.job.interfaces.job import IRunnableJob51from lp.services.job.interfaces.job import IRunnableJob
50from lp.services.job.runner import BaseRunnableJob52from lp.services.job.runner import BaseRunnableJob, JobRunnerProcess
5153
5254
53class BranchMergeProposalJobType(DBEnumeratedType):55class BranchMergeProposalJobType(DBEnumeratedType):
@@ -287,6 +289,16 @@
287289
288 class_job_type = BranchMergeProposalJobType.UPDATE_PREVIEW_DIFF290 class_job_type = BranchMergeProposalJobType.UPDATE_PREVIEW_DIFF
289291
292 @staticmethod
293 @contextlib.contextmanager
294 def contextManager():
295 """See `IUpdatePreviewDiffJobSource`."""
296 errorlog.globalErrorUtility.configure('update_preview_diffs')
297 server = get_scanner_server()
298 server.setUp()
299 yield
300 server.tearDown()
301
290 def run(self):302 def run(self):
291 """See `IRunnableJob`"""303 """See `IRunnableJob`"""
292 preview = PreviewDiff.fromBranchMergeProposal(304 preview = PreviewDiff.fromBranchMergeProposal(
@@ -294,6 +306,14 @@
294 self.branch_merge_proposal.preview_diff = preview306 self.branch_merge_proposal.preview_diff = preview
295307
296308
309class UpdatePreviewDiffProcess(JobRunnerProcess):
310 """A process that runs UpdatePreviewDiffJobs"""
311 job_class = UpdatePreviewDiffJob
312
313
314UpdatePreviewDiffJob.amp = UpdatePreviewDiffProcess
315
316
297class CreateMergeProposalJob(BaseRunnableJob):317class CreateMergeProposalJob(BaseRunnableJob):
298 """See `ICreateMergeProposalJob` and `ICreateMergeProposalJobSource`."""318 """See `ICreateMergeProposalJob` and `ICreateMergeProposalJobSource`."""
299319
300320
=== modified file 'lib/lp/code/scripts/tests/test_update_preview_diffs.py'
--- lib/lp/code/scripts/tests/test_update_preview_diffs.py 2009-12-18 16:03:15 +0000
+++ lib/lp/code/scripts/tests/test_update_preview_diffs.py 2009-12-18 16:03:17 +0000
@@ -78,7 +78,7 @@
78 error_utility.configure('update_preview_diffs')78 error_utility.configure('update_preview_diffs')
79 old_id = error_utility.getLastOopsReport().id79 old_id = error_utility.getLastOopsReport().id
80 retcode, stdout, stderr = run_script(80 retcode, stdout, stderr = run_script(
81 'cronscripts/update_preview_diffs.py', [])81 'cronscripts/update_preview_diffs.py', ['--twisted'])
82 self.assertEqual(0, retcode)82 self.assertEqual(0, retcode)
83 self.assertIn(83 self.assertIn(
84 'INFO 1 IUpdatePreviewDiffJobSource jobs did not complete.\n',84 'INFO 1 IUpdatePreviewDiffJobSource jobs did not complete.\n',
8585
=== modified file 'lib/lp/services/job/runner.py'
--- lib/lp/services/job/runner.py 2009-12-18 16:03:15 +0000
+++ lib/lp/services/job/runner.py 2009-12-18 16:03:17 +0000
@@ -11,10 +11,16 @@
11__all__ = ['JobRunner']11__all__ = ['JobRunner']
1212
1313
14import os
15from signal import getsignal, SIGCHLD, signal
14import sys16import sys
1517
18from ampoule import child, pool, main
19
16from twisted.internet import reactor, defer20from twisted.internet import reactor, defer
21from twisted.protocols import amp
17from zope.component import getUtility22from zope.component import getUtility
23from zope.security.proxy import removeSecurityProxy
1824
19from canonical.config import config25from canonical.config import config
20from canonical.twistedsupport.task import (26from canonical.twistedsupport.task import (
@@ -27,6 +33,27 @@
27from lp.services.mail.sendmail import MailController33from lp.services.mail.sendmail import MailController
28from canonical.launchpad.webapp import errorlog34from canonical.launchpad.webapp import errorlog
2935
36BOOTSTRAP = """\
37import sys
38
39def main(reactor, ampChildPath):
40 from twisted.application import reactors
41 reactors.installReactor(reactor)
42
43 from twisted.python import log
44 log.startLogging(sys.stderr)
45
46 from twisted.internet import reactor, stdio
47 from twisted.python import reflect
48
49 ampChild = reflect.namedAny(ampChildPath)
50 stdio.StandardIO(ampChild(), 3, 4)
51 from canonical.launchpad import scripts
52 scripts.execute_zcml_for_scripts(use_web_security=False)
53 reactor.run()
54main(sys.argv[-2], sys.argv[-1])
55"""
56
3057
31class BaseRunnableJob:58class BaseRunnableJob:
32 """Base class for jobs to be run via JobRunner.59 """Base class for jobs to be run via JobRunner.
@@ -102,10 +129,13 @@
102class BaseJobRunner(object):129class BaseJobRunner(object):
103 """Runner of Jobs."""130 """Runner of Jobs."""
104131
105 def __init__(self, logger=None):132 def __init__(self, logger=None, error_utility=None):
106 self.completed_jobs = []133 self.completed_jobs = []
107 self.incomplete_jobs = []134 self.incomplete_jobs = []
108 self.logger = logger135 self.logger = logger
136 self.error_utility = error_utility
137 if self.error_utility is None:
138 self.error_utility = errorlog.globalErrorUtility
109139
110 def runJob(self, job):140 def runJob(self, job):
111 """Attempt to run a job, updating its status as appropriate."""141 """Attempt to run a job, updating its status as appropriate."""
@@ -139,7 +169,7 @@
139 The list of complete or incomplete jobs will be updated.169 The list of complete or incomplete jobs will be updated.
140 """170 """
141 job = IRunnableJob(job)171 job = IRunnableJob(job)
142 with errorlog.globalErrorUtility.oopsMessage(172 with self.error_utility.oopsMessage(
143 dict(job.getOopsVars())):173 dict(job.getOopsVars())):
144 try:174 try:
145 self.runJob(job)175 self.runJob(job)
@@ -149,11 +179,24 @@
149 job.notifyUserError(e)179 job.notifyUserError(e)
150 except Exception:180 except Exception:
151 info = sys.exc_info()181 info = sys.exc_info()
152 errorlog.globalErrorUtility.raising(info)182 return self._doOops(job, info)
153 oops = errorlog.globalErrorUtility.getLastOopsReport()183
154 job.notifyOops(oops)184 def _doOops(self, job, info):
155 if self.logger is not None:185 """Report an OOPS for the provided job and info.
156 self.logger.info('Job resulted in OOPS: %s' % oops.id)186
187 :param job: The IRunnableJob whose run failed.
188 :param info: The standard sys.exc_info() value.
189 :return: the Oops that was reported.
190 """
191 self.error_utility.raising(info)
192 oops = self.error_utility.getLastOopsReport()
193 job.notifyOops(oops)
194 return oops
195
196 def _logOopsId(self, oops_id):
197 """Report oopses by id to the log."""
198 if self.logger is not None:
199 self.logger.info('Job resulted in OOPS: %s' % oops_id)
157200
158201
159class JobRunner(BaseJobRunner):202class JobRunner(BaseJobRunner):
@@ -170,30 +213,97 @@
170 @classmethod213 @classmethod
171 def runFromSource(cls, job_source, logger):214 def runFromSource(cls, job_source, logger):
172 """Run all ready jobs provided by the specified source."""215 """Run all ready jobs provided by the specified source."""
173 logger.info("Running synchronously.")216 with removeSecurityProxy(job_source.contextManager()):
174 runner = cls.fromReady(job_source, logger)217 logger.info("Running synchronously.")
175 runner.runAll()218 runner = cls.fromReady(job_source, logger)
219 runner.runAll()
176 return runner220 return runner
177221
178 def runAll(self):222 def runAll(self):
179 """Run all the Jobs for this JobRunner."""223 """Run all the Jobs for this JobRunner."""
180 for job in self.jobs:224 for job in self.jobs:
181 self.runJobHandleError(job)225 oops = self.runJobHandleError(job)
226 if oops is not None:
227 self._logOopsId(oops.id)
228
229
230class RunJobCommand(amp.Command):
231
232 arguments = [('job_id', amp.Integer())]
233 response = [('success', amp.Integer()), ('oops_id', amp.String())]
234
235
236class JobRunnerProcess(child.AMPChild):
237 """Base class for processes that run jobs."""
238
239 def __init__(self):
240 child.AMPChild.__init__(self)
241 self.context_manager = self.job_class.contextManager()
242
243 def makeConnection(self, transport):
244 """The Job context is entered on connect."""
245 child.AMPChild.makeConnection(self, transport)
246 self.context_manager.__enter__()
247
248 def connectionLost(self, reason):
249 """The Job context is left on disconnect."""
250 self.context_manager.__exit__(None, None, None)
251 child.AMPChild.connectionLost(self, reason)
252
253 @RunJobCommand.responder
254 def runJobCommand(self, job_id):
255 """Run a job of this job_class according to its job id."""
256 runner = BaseJobRunner()
257 job = self.job_class.get(job_id)
258 oops = runner.runJobHandleError(job)
259 if oops is None:
260 oops_id = ''
261 else:
262 oops_id = oops.id
263 return {'success': len(runner.completed_jobs), 'oops_id': oops_id}
182264
183265
184class TwistedJobRunner(BaseJobRunner):266class TwistedJobRunner(BaseJobRunner):
185 """Run Jobs via twisted."""267 """Run Jobs via twisted."""
186268
187 def __init__(self, job_source, logger=None):269 def __init__(self, job_source, job_amp, logger=None, error_utility=None):
188 super(TwistedJobRunner, self).__init__(logger=logger)270 starter = main.ProcessStarter(
271 bootstrap=BOOTSTRAP, packages=('twisted', 'ampoule'),
272 env={'PYTHONPATH': os.environ['PYTHONPATH'],
273 'PATH': os.environ['PATH'],
274 'LPCONFIG': os.environ['LPCONFIG']})
275 super(TwistedJobRunner, self).__init__(logger, error_utility)
189 self.job_source = job_source276 self.job_source = job_source
277 self.pool = pool.ProcessPool(job_amp, starter=starter, min=0)
278
279 def runJobInSubprocess(self, job):
280 """Run the job_class with the specified id in the process pool.
281
282 :return: a Deferred that fires when the job has completed.
283 """
284 job_id = job.id
285 deferred = self.pool.doWork(RunJobCommand, job_id=job_id)
286 def update(response):
287 if response['success']:
288 self.completed_jobs.append(job)
289 else:
290 self.incomplete_jobs.append(job)
291 if response['oops_id'] != '':
292 self._logOopsId(response['oops_id'])
293 def job_raised(failure):
294 self.incomplete_jobs.append(job)
295 info = (failure.type, failure.value, failure.tb)
296 oops = self._doOops(job, info)
297 self._logOopsId(oops.id)
298 deferred.addCallbacks(update, job_raised)
299 return deferred
190300
191 def getTaskSource(self):301 def getTaskSource(self):
192 """Return a task source for all jobs in job_source."""302 """Return a task source for all jobs in job_source."""
193 def producer():303 def producer():
194 while True:304 while True:
195 for job in self.job_source.iterReady():305 for job in self.job_source.iterReady():
196 yield lambda: self.runJobHandleError(job)306 yield lambda: self.runJobInSubprocess(job)
197 yield None307 yield None
198 return PollingTaskSource(5, producer().next)308 return PollingTaskSource(5, producer().next)
199309
@@ -204,22 +314,32 @@
204314
205 def runAll(self):315 def runAll(self):
206 """Run all ready jobs, and any that become ready while running."""316 """Run all ready jobs, and any that become ready while running."""
317 self.pool.start()
207 d = defer.maybeDeferred(self.doConsumer)318 d = defer.maybeDeferred(self.doConsumer)
208 d.addCallbacks(lambda ignored: reactor.stop(), self.failed)319 d.addCallbacks(self.terminated, self.failed)
209320
210 @staticmethod321 def terminated(self, ignored=None):
211 def failed(failure):322 """Callback to stop the processpool and reactor."""
323 deferred = self.pool.stop()
324 deferred.addBoth(lambda ignored: reactor.stop())
325
326 def failed(self, failure):
212 """Callback for when the job fails."""327 """Callback for when the job fails."""
213 failure.printTraceback()328 failure.printTraceback()
214 reactor.stop()329 self.terminated()
215330
216 @classmethod331 @classmethod
217 def runFromSource(cls, job_source, logger):332 def runFromSource(cls, job_source, logger, error_utility=None):
218 """Run all ready jobs provided by the specified source."""333 """Run all ready jobs provided by the specified source."""
219 logger.info("Running through Twisted.")334 logger.info("Running through Twisted.")
220 runner = cls(job_source, logger)335 runner = cls(job_source, removeSecurityProxy(job_source).amp, logger,
336 error_utility)
221 reactor.callWhenRunning(runner.runAll)337 reactor.callWhenRunning(runner.runAll)
222 reactor.run()338 handler = getsignal(SIGCHLD)
339 try:
340 reactor.run()
341 finally:
342 signal(SIGCHLD, handler)
223 return runner343 return runner
224344
225345
@@ -232,14 +352,8 @@
232 self.runner_class = runner_class352 self.runner_class = runner_class
233353
234 def main(self):354 def main(self):
235 errorlog.globalErrorUtility.configure(self.config_name)355 job_source = getUtility(self.source_interface)
236 cleanups = self.setUp()356 runner = self.runner_class.runFromSource(job_source, self.logger)
237 try:
238 job_source = getUtility(self.source_interface)
239 runner = self.runner_class.runFromSource(job_source, self.logger)
240 finally:
241 for cleanup in reversed(cleanups):
242 cleanup()
243 self.logger.info(357 self.logger.info(
244 'Ran %d %s jobs.',358 'Ran %d %s jobs.',
245 len(runner.completed_jobs), self.source_interface.__name__)359 len(runner.completed_jobs), self.source_interface.__name__)
246360
=== modified file 'setup.py'
--- setup.py 2009-11-25 10:59:02 +0000
+++ setup.py 2009-12-18 16:03:17 +0000
@@ -24,6 +24,7 @@
24 # this list should only contain direct dependencies--things imported or24 # this list should only contain direct dependencies--things imported or
25 # used in zcml.25 # used in zcml.
26 install_requires=[26 install_requires=[
27 'ampoule',
27 'bzr',28 'bzr',
28 'chameleon.core',29 'chameleon.core',
29 'chameleon.zpt',30 'chameleon.zpt',
3031
=== modified file 'versions.cfg'
--- versions.cfg 2009-12-16 14:14:41 +0000
+++ versions.cfg 2009-12-18 16:03:17 +0000
@@ -3,6 +3,12 @@
33
4[versions]4[versions]
5# Alphabetical, case-insensitive, please! :-)5# Alphabetical, case-insensitive, please! :-)
6
7# from -r 3:lp:~launchpad/ampoule/launchpad-tweaked
8# To reproduce:
9# bzr export ampoule-0.1.0-lp-1.tar.gz lp:~launchpad/ampoule/launchpad-tweaked\
10# -r 3
11ampoule = 0.1.0-lp-1
6bzr = 2.1b312bzr = 2.1b3
7chameleon.core = 1.0b3513chameleon.core = 1.0b35
8chameleon.zpt = 1.0b1714chameleon.zpt = 1.0b17