Merge lp:~abentley/launchpad/ampoulejob into lp:launchpad
- ampoulejob
- Merge into devel
Proposed by
Aaron Bentley
Status: | Merged |
---|---|
Approved by: | Aaron Bentley |
Approved revision: | not available |
Merged at revision: | not available |
Proposed branch: | lp:~abentley/launchpad/ampoulejob |
Merge into: | lp:launchpad |
Prerequisite: | lp:~abentley/launchpad/twistedjob |
Diff against target: |
515 lines (+195/-48) 11 files modified
cronscripts/update_preview_diffs.py (+1/-7) cronscripts/upgrade_branches.py (+0/-6) lib/lp/code/configure.zcml (+1/-0) lib/lp/code/interfaces/branchjob.py (+3/-0) lib/lp/code/interfaces/branchmergeproposal.py (+3/-0) lib/lp/code/model/branchjob.py (+13/-2) lib/lp/code/model/branchmergeproposaljob.py (+22/-2) lib/lp/code/scripts/tests/test_update_preview_diffs.py (+1/-1) lib/lp/services/job/runner.py (+144/-30) setup.py (+1/-0) versions.cfg (+6/-0) |
To merge this branch: | bzr merge lp:~abentley/launchpad/ampoulejob |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Paul Hummer (community) | Approve | ||
Review via email: mp+15982@code.launchpad.net |
Commit message
Description of the change
To post a comment you must log in.
Revision history for this message
Aaron Bentley (abentley) wrote : | # |
Revision history for this message
Paul Hummer (rockstar) : | # |
review:
Approve
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === modified file 'cronscripts/update_preview_diffs.py' |
2 | --- cronscripts/update_preview_diffs.py 2009-12-18 16:03:15 +0000 |
3 | +++ cronscripts/update_preview_diffs.py 2009-12-18 16:03:17 +0000 |
4 | @@ -11,7 +11,6 @@ |
5 | |
6 | import _pythonpath |
7 | |
8 | -from lp.codehosting.vfs import get_scanner_server |
9 | from lp.services.job.runner import JobCronScript, JobRunner, TwistedJobRunner |
10 | from lp.code.interfaces.branchmergeproposal import ( |
11 | IUpdatePreviewDiffJobSource,) |
12 | @@ -24,7 +23,7 @@ |
13 | source_interface = IUpdatePreviewDiffJobSource |
14 | |
15 | def __init__(self): |
16 | - super(JobCronScript, self).__init__() |
17 | + super(RunUpdatePreviewDiffJobs, self).__init__() |
18 | if self.options.twisted: |
19 | self.runner_class = TwistedJobRunner |
20 | else: |
21 | @@ -33,11 +32,6 @@ |
22 | def add_my_options(self): |
23 | self.parser.add_option('--twisted', action='store_true') |
24 | |
25 | - def setUp(self): |
26 | - server = get_scanner_server() |
27 | - server.setUp() |
28 | - return [server.tearDown] |
29 | - |
30 | |
31 | if __name__ == '__main__': |
32 | script = RunUpdatePreviewDiffJobs() |
33 | |
34 | === modified file 'cronscripts/upgrade_branches.py' |
35 | --- cronscripts/upgrade_branches.py 2009-10-31 12:07:16 +0000 |
36 | +++ cronscripts/upgrade_branches.py 2009-12-18 16:03:17 +0000 |
37 | @@ -11,7 +11,6 @@ |
38 | |
39 | from lp.services.job.runner import JobCronScript |
40 | from lp.code.interfaces.branchjob import IBranchUpgradeJobSource |
41 | -from lp.codehosting.vfs import get_multi_server |
42 | |
43 | |
44 | class RunUpgradeBranches(JobCronScript): |
45 | @@ -20,11 +19,6 @@ |
46 | config_name = 'upgrade_branches' |
47 | source_interface = IBranchUpgradeJobSource |
48 | |
49 | - def setUp(self): |
50 | - server = get_multi_server(write_hosted=True) |
51 | - server.setUp() |
52 | - return [server.tearDown] |
53 | - |
54 | |
55 | if __name__ == '__main__': |
56 | script = RunUpgradeBranches() |
57 | |
58 | === modified file 'lib/lp/code/configure.zcml' |
59 | --- lib/lp/code/configure.zcml 2009-12-11 00:56:16 +0000 |
60 | +++ lib/lp/code/configure.zcml 2009-12-18 16:03:17 +0000 |
61 | @@ -889,6 +889,7 @@ |
62 | </securedutility> |
63 | <class class="lp.code.model.branchmergeproposaljob.UpdatePreviewDiffJob"> |
64 | <allow interface="lp.services.job.interfaces.job.IRunnableJob" /> |
65 | + <allow interface="lp.code.interfaces.branchmergeproposal.IBranchMergeProposalJob" /> |
66 | </class> |
67 | |
68 | <securedutility |
69 | |
70 | === modified file 'lib/lp/code/interfaces/branchjob.py' |
71 | --- lib/lp/code/interfaces/branchjob.py 2009-10-29 05:50:08 +0000 |
72 | +++ lib/lp/code/interfaces/branchjob.py 2009-12-18 16:03:17 +0000 |
73 | @@ -90,6 +90,9 @@ |
74 | def iterReady(): |
75 | """Iterate through all IBranchUpgradeJobs.""" |
76 | |
77 | + def contextManager(): |
78 | + """Get a context for running this kind of job in.""" |
79 | + |
80 | |
81 | class IRevisionMailJob(IRunnableJob): |
82 | """A Job to send email a revision change in a branch.""" |
83 | |
84 | === modified file 'lib/lp/code/interfaces/branchmergeproposal.py' |
85 | --- lib/lp/code/interfaces/branchmergeproposal.py 2009-12-18 16:03:15 +0000 |
86 | +++ lib/lp/code/interfaces/branchmergeproposal.py 2009-12-18 16:03:17 +0000 |
87 | @@ -597,6 +597,9 @@ |
88 | def iterReady(): |
89 | """Iterate through jobs ready to update preview diffs.""" |
90 | |
91 | + def contextManager(): |
92 | + """Get a context for running this kind of job in.""" |
93 | + |
94 | |
95 | def notify_modified(proposal, func, *args, **kwargs): |
96 | """Call func, then notify about the changes it made. |
97 | |
98 | === modified file 'lib/lp/code/model/branchjob.py' |
99 | --- lib/lp/code/model/branchjob.py 2009-10-29 17:07:18 +0000 |
100 | +++ lib/lp/code/model/branchjob.py 2009-12-18 16:03:17 +0000 |
101 | @@ -9,6 +9,7 @@ |
102 | 'RosettaUploadJob', |
103 | ] |
104 | |
105 | +import contextlib |
106 | import os |
107 | import shutil |
108 | from StringIO import StringIO |
109 | @@ -40,14 +41,14 @@ |
110 | from canonical.config import config |
111 | from canonical.database.enumcol import EnumCol |
112 | from canonical.database.sqlbase import SQLBase |
113 | -from canonical.launchpad.webapp import canonical_url |
114 | +from canonical.launchpad.webapp import canonical_url, errorlog |
115 | from lp.code.bzr import ( |
116 | BRANCH_FORMAT_UPGRADE_PATH, REPOSITORY_FORMAT_UPGRADE_PATH) |
117 | from lp.code.model.branch import Branch |
118 | from lp.code.model.branchmergeproposal import BranchMergeProposal |
119 | from lp.code.model.diff import StaticDiff |
120 | from lp.code.model.revision import RevisionSet |
121 | -from lp.codehosting.vfs import branch_id_to_path |
122 | +from lp.codehosting.vfs import branch_id_to_path, get_multi_server |
123 | from lp.services.job.model.job import Job |
124 | from lp.services.job.interfaces.job import JobStatus |
125 | from lp.services.job.runner import BaseRunnableJob |
126 | @@ -258,6 +259,16 @@ |
127 | branch_job = BranchJob(branch, BranchJobType.UPGRADE_BRANCH, {}) |
128 | return cls(branch_job) |
129 | |
130 | + @staticmethod |
131 | + @contextlib.contextmanager |
132 | + def contextManager(): |
133 | + """See `IBranchUpgradeJobSource`.""" |
134 | + errorlog.globalErrorUtility.configure('upgrade_branches') |
135 | + server = get_multi_server(write_hosted=True) |
136 | + server.setUp() |
137 | + yield |
138 | + server.tearDown() |
139 | + |
140 | def run(self): |
141 | """See `IBranchUpgradeJob`.""" |
142 | # Set up the new branch structure |
143 | |
144 | === modified file 'lib/lp/code/model/branchmergeproposaljob.py' |
145 | --- lib/lp/code/model/branchmergeproposaljob.py 2009-12-18 16:03:15 +0000 |
146 | +++ lib/lp/code/model/branchmergeproposaljob.py 2009-12-18 16:03:17 +0000 |
147 | @@ -14,6 +14,7 @@ |
148 | 'MergeProposalCreatedJob', |
149 | ] |
150 | |
151 | +import contextlib |
152 | from email.Utils import parseaddr |
153 | import transaction |
154 | |
155 | @@ -31,6 +32,7 @@ |
156 | from canonical.database.enumcol import EnumCol |
157 | from canonical.launchpad.database.message import MessageJob, MessageJobAction |
158 | from canonical.launchpad.interfaces.message import IMessageJob |
159 | +from canonical.launchpad.webapp import errorlog |
160 | from canonical.launchpad.webapp.interaction import setupInteraction |
161 | from canonical.launchpad.webapp.interfaces import ( |
162 | DEFAULT_FLAVOR, IPlacelessAuthUtility, IStoreSelector, MAIN_STORE, |
163 | @@ -44,10 +46,10 @@ |
164 | from lp.code.mail.branchmergeproposal import BMPMailer |
165 | from lp.code.model.branchmergeproposal import BranchMergeProposal |
166 | from lp.code.model.diff import PreviewDiff, StaticDiff |
167 | -from lp.codehosting.vfs import get_multi_server |
168 | +from lp.codehosting.vfs import get_multi_server, get_scanner_server |
169 | from lp.services.job.model.job import Job |
170 | from lp.services.job.interfaces.job import IRunnableJob |
171 | -from lp.services.job.runner import BaseRunnableJob |
172 | +from lp.services.job.runner import BaseRunnableJob, JobRunnerProcess |
173 | |
174 | |
175 | class BranchMergeProposalJobType(DBEnumeratedType): |
176 | @@ -287,6 +289,16 @@ |
177 | |
178 | class_job_type = BranchMergeProposalJobType.UPDATE_PREVIEW_DIFF |
179 | |
180 | + @staticmethod |
181 | + @contextlib.contextmanager |
182 | + def contextManager(): |
183 | + """See `IUpdatePreviewDiffJobSource`.""" |
184 | + errorlog.globalErrorUtility.configure('update_preview_diffs') |
185 | + server = get_scanner_server() |
186 | + server.setUp() |
187 | + yield |
188 | + server.tearDown() |
189 | + |
190 | def run(self): |
191 | """See `IRunnableJob`""" |
192 | preview = PreviewDiff.fromBranchMergeProposal( |
193 | @@ -294,6 +306,14 @@ |
194 | self.branch_merge_proposal.preview_diff = preview |
195 | |
196 | |
197 | +class UpdatePreviewDiffProcess(JobRunnerProcess): |
198 | + """A process that runs UpdatePreviewDiffJobs""" |
199 | + job_class = UpdatePreviewDiffJob |
200 | + |
201 | + |
202 | +UpdatePreviewDiffJob.amp = UpdatePreviewDiffProcess |
203 | + |
204 | + |
205 | class CreateMergeProposalJob(BaseRunnableJob): |
206 | """See `ICreateMergeProposalJob` and `ICreateMergeProposalJobSource`.""" |
207 | |
208 | |
209 | === modified file 'lib/lp/code/scripts/tests/test_update_preview_diffs.py' |
210 | --- lib/lp/code/scripts/tests/test_update_preview_diffs.py 2009-12-18 16:03:15 +0000 |
211 | +++ lib/lp/code/scripts/tests/test_update_preview_diffs.py 2009-12-18 16:03:17 +0000 |
212 | @@ -78,7 +78,7 @@ |
213 | error_utility.configure('update_preview_diffs') |
214 | old_id = error_utility.getLastOopsReport().id |
215 | retcode, stdout, stderr = run_script( |
216 | - 'cronscripts/update_preview_diffs.py', []) |
217 | + 'cronscripts/update_preview_diffs.py', ['--twisted']) |
218 | self.assertEqual(0, retcode) |
219 | self.assertIn( |
220 | 'INFO 1 IUpdatePreviewDiffJobSource jobs did not complete.\n', |
221 | |
222 | === modified file 'lib/lp/services/job/runner.py' |
223 | --- lib/lp/services/job/runner.py 2009-12-18 16:03:15 +0000 |
224 | +++ lib/lp/services/job/runner.py 2009-12-18 16:03:17 +0000 |
225 | @@ -11,10 +11,16 @@ |
226 | __all__ = ['JobRunner'] |
227 | |
228 | |
229 | +import os |
230 | +from signal import getsignal, SIGCHLD, signal |
231 | import sys |
232 | |
233 | +from ampoule import child, pool, main |
234 | + |
235 | from twisted.internet import reactor, defer |
236 | +from twisted.protocols import amp |
237 | from zope.component import getUtility |
238 | +from zope.security.proxy import removeSecurityProxy |
239 | |
240 | from canonical.config import config |
241 | from canonical.twistedsupport.task import ( |
242 | @@ -27,6 +33,27 @@ |
243 | from lp.services.mail.sendmail import MailController |
244 | from canonical.launchpad.webapp import errorlog |
245 | |
246 | +BOOTSTRAP = """\ |
247 | +import sys |
248 | + |
249 | +def main(reactor, ampChildPath): |
250 | + from twisted.application import reactors |
251 | + reactors.installReactor(reactor) |
252 | + |
253 | + from twisted.python import log |
254 | + log.startLogging(sys.stderr) |
255 | + |
256 | + from twisted.internet import reactor, stdio |
257 | + from twisted.python import reflect |
258 | + |
259 | + ampChild = reflect.namedAny(ampChildPath) |
260 | + stdio.StandardIO(ampChild(), 3, 4) |
261 | + from canonical.launchpad import scripts |
262 | + scripts.execute_zcml_for_scripts(use_web_security=False) |
263 | + reactor.run() |
264 | +main(sys.argv[-2], sys.argv[-1]) |
265 | +""" |
266 | + |
267 | |
268 | class BaseRunnableJob: |
269 | """Base class for jobs to be run via JobRunner. |
270 | @@ -102,10 +129,13 @@ |
271 | class BaseJobRunner(object): |
272 | """Runner of Jobs.""" |
273 | |
274 | - def __init__(self, logger=None): |
275 | + def __init__(self, logger=None, error_utility=None): |
276 | self.completed_jobs = [] |
277 | self.incomplete_jobs = [] |
278 | self.logger = logger |
279 | + self.error_utility = error_utility |
280 | + if self.error_utility is None: |
281 | + self.error_utility = errorlog.globalErrorUtility |
282 | |
283 | def runJob(self, job): |
284 | """Attempt to run a job, updating its status as appropriate.""" |
285 | @@ -139,7 +169,7 @@ |
286 | The list of complete or incomplete jobs will be updated. |
287 | """ |
288 | job = IRunnableJob(job) |
289 | - with errorlog.globalErrorUtility.oopsMessage( |
290 | + with self.error_utility.oopsMessage( |
291 | dict(job.getOopsVars())): |
292 | try: |
293 | self.runJob(job) |
294 | @@ -149,11 +179,24 @@ |
295 | job.notifyUserError(e) |
296 | except Exception: |
297 | info = sys.exc_info() |
298 | - errorlog.globalErrorUtility.raising(info) |
299 | - oops = errorlog.globalErrorUtility.getLastOopsReport() |
300 | - job.notifyOops(oops) |
301 | - if self.logger is not None: |
302 | - self.logger.info('Job resulted in OOPS: %s' % oops.id) |
303 | + return self._doOops(job, info) |
304 | + |
305 | + def _doOops(self, job, info): |
306 | + """Report an OOPS for the provided job and info. |
307 | + |
308 | + :param job: The IRunnableJob whose run failed. |
309 | + :param info: The standard sys.exc_info() value. |
310 | + :return: the Oops that was reported. |
311 | + """ |
312 | + self.error_utility.raising(info) |
313 | + oops = self.error_utility.getLastOopsReport() |
314 | + job.notifyOops(oops) |
315 | + return oops |
316 | + |
317 | + def _logOopsId(self, oops_id): |
318 | + """Report oopses by id to the log.""" |
319 | + if self.logger is not None: |
320 | + self.logger.info('Job resulted in OOPS: %s' % oops_id) |
321 | |
322 | |
323 | class JobRunner(BaseJobRunner): |
324 | @@ -170,30 +213,97 @@ |
325 | @classmethod |
326 | def runFromSource(cls, job_source, logger): |
327 | """Run all ready jobs provided by the specified source.""" |
328 | - logger.info("Running synchronously.") |
329 | - runner = cls.fromReady(job_source, logger) |
330 | - runner.runAll() |
331 | + with removeSecurityProxy(job_source.contextManager()): |
332 | + logger.info("Running synchronously.") |
333 | + runner = cls.fromReady(job_source, logger) |
334 | + runner.runAll() |
335 | return runner |
336 | |
337 | def runAll(self): |
338 | """Run all the Jobs for this JobRunner.""" |
339 | for job in self.jobs: |
340 | - self.runJobHandleError(job) |
341 | + oops = self.runJobHandleError(job) |
342 | + if oops is not None: |
343 | + self._logOopsId(oops.id) |
344 | + |
345 | + |
346 | +class RunJobCommand(amp.Command): |
347 | + |
348 | + arguments = [('job_id', amp.Integer())] |
349 | + response = [('success', amp.Integer()), ('oops_id', amp.String())] |
350 | + |
351 | + |
352 | +class JobRunnerProcess(child.AMPChild): |
353 | + """Base class for processes that run jobs.""" |
354 | + |
355 | + def __init__(self): |
356 | + child.AMPChild.__init__(self) |
357 | + self.context_manager = self.job_class.contextManager() |
358 | + |
359 | + def makeConnection(self, transport): |
360 | + """The Job context is entered on connect.""" |
361 | + child.AMPChild.makeConnection(self, transport) |
362 | + self.context_manager.__enter__() |
363 | + |
364 | + def connectionLost(self, reason): |
365 | + """The Job context is left on disconnect.""" |
366 | + self.context_manager.__exit__(None, None, None) |
367 | + child.AMPChild.connectionLost(self, reason) |
368 | + |
369 | + @RunJobCommand.responder |
370 | + def runJobCommand(self, job_id): |
371 | + """Run a job of this job_class according to its job id.""" |
372 | + runner = BaseJobRunner() |
373 | + job = self.job_class.get(job_id) |
374 | + oops = runner.runJobHandleError(job) |
375 | + if oops is None: |
376 | + oops_id = '' |
377 | + else: |
378 | + oops_id = oops.id |
379 | + return {'success': len(runner.completed_jobs), 'oops_id': oops_id} |
380 | |
381 | |
382 | class TwistedJobRunner(BaseJobRunner): |
383 | """Run Jobs via twisted.""" |
384 | |
385 | - def __init__(self, job_source, logger=None): |
386 | - super(TwistedJobRunner, self).__init__(logger=logger) |
387 | + def __init__(self, job_source, job_amp, logger=None, error_utility=None): |
388 | + starter = main.ProcessStarter( |
389 | + bootstrap=BOOTSTRAP, packages=('twisted', 'ampoule'), |
390 | + env={'PYTHONPATH': os.environ['PYTHONPATH'], |
391 | + 'PATH': os.environ['PATH'], |
392 | + 'LPCONFIG': os.environ['LPCONFIG']}) |
393 | + super(TwistedJobRunner, self).__init__(logger, error_utility) |
394 | self.job_source = job_source |
395 | + self.pool = pool.ProcessPool(job_amp, starter=starter, min=0) |
396 | + |
397 | + def runJobInSubprocess(self, job): |
398 | + """Run the job_class with the specified id in the process pool. |
399 | + |
400 | + :return: a Deferred that fires when the job has completed. |
401 | + """ |
402 | + job_id = job.id |
403 | + deferred = self.pool.doWork(RunJobCommand, job_id=job_id) |
404 | + def update(response): |
405 | + if response['success']: |
406 | + self.completed_jobs.append(job) |
407 | + else: |
408 | + self.incomplete_jobs.append(job) |
409 | + if response['oops_id'] != '': |
410 | + self._logOopsId(response['oops_id']) |
411 | + def job_raised(failure): |
412 | + self.incomplete_jobs.append(job) |
413 | + info = (failure.type, failure.value, failure.tb) |
414 | + oops = self._doOops(job, info) |
415 | + self._logOopsId(oops.id) |
416 | + deferred.addCallbacks(update, job_raised) |
417 | + return deferred |
418 | |
419 | def getTaskSource(self): |
420 | """Return a task source for all jobs in job_source.""" |
421 | def producer(): |
422 | while True: |
423 | for job in self.job_source.iterReady(): |
424 | - yield lambda: self.runJobHandleError(job) |
425 | + yield lambda: self.runJobInSubprocess(job) |
426 | yield None |
427 | return PollingTaskSource(5, producer().next) |
428 | |
429 | @@ -204,22 +314,32 @@ |
430 | |
431 | def runAll(self): |
432 | """Run all ready jobs, and any that become ready while running.""" |
433 | + self.pool.start() |
434 | d = defer.maybeDeferred(self.doConsumer) |
435 | - d.addCallbacks(lambda ignored: reactor.stop(), self.failed) |
436 | - |
437 | - @staticmethod |
438 | - def failed(failure): |
439 | + d.addCallbacks(self.terminated, self.failed) |
440 | + |
441 | + def terminated(self, ignored=None): |
442 | + """Callback to stop the processpool and reactor.""" |
443 | + deferred = self.pool.stop() |
444 | + deferred.addBoth(lambda ignored: reactor.stop()) |
445 | + |
446 | + def failed(self, failure): |
447 | """Callback for when the job fails.""" |
448 | failure.printTraceback() |
449 | - reactor.stop() |
450 | + self.terminated() |
451 | |
452 | @classmethod |
453 | - def runFromSource(cls, job_source, logger): |
454 | + def runFromSource(cls, job_source, logger, error_utility=None): |
455 | """Run all ready jobs provided by the specified source.""" |
456 | logger.info("Running through Twisted.") |
457 | - runner = cls(job_source, logger) |
458 | + runner = cls(job_source, removeSecurityProxy(job_source).amp, logger, |
459 | + error_utility) |
460 | reactor.callWhenRunning(runner.runAll) |
461 | - reactor.run() |
462 | + handler = getsignal(SIGCHLD) |
463 | + try: |
464 | + reactor.run() |
465 | + finally: |
466 | + signal(SIGCHLD, handler) |
467 | return runner |
468 | |
469 | |
470 | @@ -232,14 +352,8 @@ |
471 | self.runner_class = runner_class |
472 | |
473 | def main(self): |
474 | - errorlog.globalErrorUtility.configure(self.config_name) |
475 | - cleanups = self.setUp() |
476 | - try: |
477 | - job_source = getUtility(self.source_interface) |
478 | - runner = self.runner_class.runFromSource(job_source, self.logger) |
479 | - finally: |
480 | - for cleanup in reversed(cleanups): |
481 | - cleanup() |
482 | + job_source = getUtility(self.source_interface) |
483 | + runner = self.runner_class.runFromSource(job_source, self.logger) |
484 | self.logger.info( |
485 | 'Ran %d %s jobs.', |
486 | len(runner.completed_jobs), self.source_interface.__name__) |
487 | |
488 | === modified file 'setup.py' |
489 | --- setup.py 2009-11-25 10:59:02 +0000 |
490 | +++ setup.py 2009-12-18 16:03:17 +0000 |
491 | @@ -24,6 +24,7 @@ |
492 | # this list should only contain direct dependencies--things imported or |
493 | # used in zcml. |
494 | install_requires=[ |
495 | + 'ampoule', |
496 | 'bzr', |
497 | 'chameleon.core', |
498 | 'chameleon.zpt', |
499 | |
500 | === modified file 'versions.cfg' |
501 | --- versions.cfg 2009-12-16 14:14:41 +0000 |
502 | +++ versions.cfg 2009-12-18 16:03:17 +0000 |
503 | @@ -3,6 +3,12 @@ |
504 | |
505 | [versions] |
506 | # Alphabetical, case-insensitive, please! :-) |
507 | + |
508 | +# from -r 3:lp:~launchpad/ampoule/launchpad-tweaked |
509 | +# To reproduce: |
510 | +# bzr export ampoule-0.1.0-lp-1.tar.gz lp:~launchpad/ampoule/launchpad-tweaked\ |
511 | +# -r 3 |
512 | +ampoule = 0.1.0-lp-1 |
513 | bzr = 2.1b3 |
514 | chameleon.core = 1.0b35 |
515 | chameleon.zpt = 1.0b17 |
= Summary =
Modify the twisted job runner to use a subprocess.
== Proposed fix ==
Use the Ampoule process pool. Note that this implementation does not provide
parallelism, because the ParallelLimitedTaskConsumer is parameterized to run
1 parallel process.
== Pre-implementation notes ==
Preimplementation discussion with thumper, mwhudson.
== Implementation details ==
Make ampoule a dependency.
Because process set-up should happen in a subprocess, with Ampoule, remove the
setUp concept from JobCronScript, and instead provide a context manager.
Configure the globalErrorUtility there. Use the context manager in JobRunner.
Add an Amp command called RunJobCommand. Add a JobRunnerProcess class that can
provide RunJobCommand. Have it enter the context when a connection is made,
and exit it when the connection is lost. (These should happen only once in the
JobRunnerProcess's lifecycle.)
Subclass RunJobCommand as UpdatePreviewDiffProcess. Provide
UpdatePreviewDiffJob.amp so that TwistedJobRunner.runFromSource can find the
right process class to use.
Update runAll to start the process pool, using the specified job_amp kind.
Provide a customized BOOTSTRAP to invoke scripts.execute_zcml_for_scripts.
Change getTaskSource to return runJobInSubprocess, which uses the process pool
to run the job.
Update TwistedJobRunner.runFromSource to clean up SIGCHLD signal handlers after
reactor.run.
== Tests ==
bin/test test_update_preview_diffs
== Demo and Q/A ==
Have a LOSA update the staging config to use --twisted, and ensure the log
says it ran under Twisted.