Merge lp:~abentley/launchpad/twistedjob into lp:launchpad

Proposed by Aaron Bentley
Status: Merged
Approved by: Aaron Bentley
Approved revision: not available
Merged at revision: not available
Proposed branch: lp:~abentley/launchpad/twistedjob
Merge into: lp:launchpad
Prerequisite: lp:~abentley/launchpad/twistedjob-enhancements
Diff against target: 228 lines (+122/-29)
3 files modified
cronscripts/update_preview_diffs.py (+11/-1)
lib/lp/code/scripts/tests/test_update_preview_diffs.py (+16/-0)
lib/lp/services/job/runner.py (+95/-28)
To merge this branch: bzr merge lp:~abentley/launchpad/twistedjob
Reviewer Review Type Date Requested Status
Paul Hummer (community) Approve
Review via email: mp+15980@code.launchpad.net
To post a comment you must log in.
Revision history for this message
Aaron Bentley (abentley) wrote :

= Summary =
Provide a way of running jobs through Twisted

== Proposed fix ==
Provide a TwistedJobRunner, and adjust related interfaces.

== Pre-implementation notes ==
The pre-implementation discussion was with thumper.

== Implementation details ==
Extract BaseJobRunner from JobRunner. Extract runJobHandleError from
runAll.

Implement TwistedJobRunner, derived from BaseJobRunner. Use the task support
classes to implement support for running jobs via Twisted. Specifically,
provide a PollingTaskSource that returns a lambda to call runJobHandleError.
Use the ParallelLimitedTaskConsumer to consume these tasks.

Implement runAll in terms of ParallelLimitedTaskConsumer. Note that, at this
time, this does not cause parallelism, because ParallelLimitedTaskConsumer is
parameterized to run 1 task at a time.

Implement a JobRunner.runFromSource method, because this can be polymorphic.
Adjust JobCronScript to take a runner_class, so that it can optionally use
Twisted.

Change update_preview_diffs to run under Twisted if the --twisted option is
supplied.

== Tests ==
bin/test -t update_preview_diffs

== Demo and Q/A ==
Have the LOSAs change the staging cron script to run update_preview_diffs with
--twisted. Cause a preview diff to be generated. Examine the log, and see
that it ran under Twisted.

Revision history for this message
Paul Hummer (rockstar) :
review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'cronscripts/update_preview_diffs.py'
2--- cronscripts/update_preview_diffs.py 2009-10-14 13:13:47 +0000
3+++ cronscripts/update_preview_diffs.py 2009-12-18 16:02:18 +0000
4@@ -12,7 +12,7 @@
5 import _pythonpath
6
7 from lp.codehosting.vfs import get_scanner_server
8-from lp.services.job.runner import JobCronScript
9+from lp.services.job.runner import JobCronScript, JobRunner, TwistedJobRunner
10 from lp.code.interfaces.branchmergeproposal import (
11 IUpdatePreviewDiffJobSource,)
12
13@@ -23,6 +23,16 @@
14 config_name = 'update_preview_diffs'
15 source_interface = IUpdatePreviewDiffJobSource
16
17+ def __init__(self):
18+ super(JobCronScript, self).__init__()
19+ if self.options.twisted:
20+ self.runner_class = TwistedJobRunner
21+ else:
22+ self.runner_class = JobRunner
23+
24+ def add_my_options(self):
25+ self.parser.add_option('--twisted', action='store_true')
26+
27 def setUp(self):
28 server = get_scanner_server()
29 server.setUp()
30
31=== modified file 'lib/lp/code/scripts/tests/test_update_preview_diffs.py'
32--- lib/lp/code/scripts/tests/test_update_preview_diffs.py 2009-12-18 16:02:17 +0000
33+++ lib/lp/code/scripts/tests/test_update_preview_diffs.py 2009-12-18 16:02:18 +0000
34@@ -49,6 +49,22 @@
35 self.assertEqual('', stdout)
36 self.assertEqual(
37 'INFO creating lockfile\n'
38+ 'INFO Running synchronously.\n'
39+ 'INFO Ran 1 IUpdatePreviewDiffJobSource jobs.\n'
40+ 'INFO 0 IUpdatePreviewDiffJobSource jobs did not complete.\n',
41+ stderr)
42+ self.assertIsNot(None, bmp.preview_diff)
43+
44+ def test_update_preview_diffs_twisted(self):
45+ """Ensure update_preview_diffs runs and generates diffs."""
46+ job, bmp, source_tree = self.create_preview_diff_job()
47+ retcode, stdout, stderr = run_script(
48+ 'cronscripts/update_preview_diffs.py', ['--twisted'])
49+ self.assertEqual(0, retcode)
50+ self.assertEqual('', stdout)
51+ self.assertEqual(
52+ 'INFO creating lockfile\n'
53+ 'INFO Running through Twisted.\n'
54 'INFO Ran 1 IUpdatePreviewDiffJobSource jobs.\n'
55 'INFO 0 IUpdatePreviewDiffJobSource jobs did not complete.\n',
56 stderr)
57
58=== modified file 'lib/lp/services/job/runner.py'
59--- lib/lp/services/job/runner.py 2009-12-18 16:02:17 +0000
60+++ lib/lp/services/job/runner.py 2009-12-18 16:02:18 +0000
61@@ -13,9 +13,12 @@
62
63 import sys
64
65+from twisted.internet import reactor, defer
66 from zope.component import getUtility
67
68 from canonical.config import config
69+from canonical.twistedsupport.task import (
70+ ParallelLimitedTaskConsumer, PollingTaskSource)
71 from lazr.delegates import delegates
72 import transaction
73
74@@ -96,20 +99,14 @@
75 ctrl.send()
76
77
78-class JobRunner(object):
79+class BaseJobRunner(object):
80 """Runner of Jobs."""
81
82- def __init__(self, jobs, logger=None):
83- self.jobs = jobs
84+ def __init__(self, logger=None):
85 self.completed_jobs = []
86 self.incomplete_jobs = []
87 self.logger = logger
88
89- @classmethod
90- def fromReady(cls, job_class, logger=None):
91- """Return a job runner for all ready jobs of a given class."""
92- return cls(job_class.iterReady(), logger)
93-
94 def runJob(self, job):
95 """Attempt to run a job, updating its status as appropriate."""
96 job = IRunnableJob(job)
97@@ -135,41 +132,111 @@
98 # Commit transaction to update job status.
99 transaction.commit()
100
101+ def runJobHandleError(self, job):
102+ """Run the specified job, handling errors.
103+
104+ Most errors will be logged as Oopses. Jobs in user_error_types won't.
105+ The list of complete or incomplete jobs will be updated.
106+ """
107+ job = IRunnableJob(job)
108+ with errorlog.globalErrorUtility.oopsMessage(
109+ dict(job.getOopsVars())):
110+ try:
111+ self.runJob(job)
112+ except LeaseHeld:
113+ self.incomplete_jobs.append(job)
114+ except job.user_error_types, e:
115+ job.notifyUserError(e)
116+ except Exception:
117+ info = sys.exc_info()
118+ errorlog.globalErrorUtility.raising(info)
119+ oops = errorlog.globalErrorUtility.getLastOopsReport()
120+ job.notifyOops(oops)
121+ if self.logger is not None:
122+ self.logger.info('Job resulted in OOPS: %s' % oops.id)
123+
124+
125+class JobRunner(BaseJobRunner):
126+
127+ def __init__(self, jobs, logger=None):
128+ BaseJobRunner.__init__(self, logger=logger)
129+ self.jobs = jobs
130+
131+ @classmethod
132+ def fromReady(cls, job_class, logger=None):
133+ """Return a job runner for all ready jobs of a given class."""
134+ return cls(job_class.iterReady(), logger)
135+
136+ @classmethod
137+ def runFromSource(cls, job_source, logger):
138+ """Run all ready jobs provided by the specified source."""
139+ logger.info("Running synchronously.")
140+ runner = cls.fromReady(job_source, logger)
141+ runner.runAll()
142+ return runner
143+
144 def runAll(self):
145 """Run all the Jobs for this JobRunner."""
146 for job in self.jobs:
147- job = IRunnableJob(job)
148- with errorlog.globalErrorUtility.oopsMessage(
149- dict(job.getOopsVars())):
150- try:
151- self.runJob(job)
152- except LeaseHeld:
153- self.incomplete_jobs.append(job)
154- except job.user_error_types, e:
155- job.notifyUserError(e)
156- except Exception:
157- info = sys.exc_info()
158- errorlog.globalErrorUtility.raising(info)
159- oops = errorlog.globalErrorUtility.getLastOopsReport()
160- job.notifyOops(oops)
161- if self.logger is not None:
162- self.logger.info('Job resulted in OOPS: %s' % oops.id)
163+ self.runJobHandleError(job)
164+
165+
166+class TwistedJobRunner(BaseJobRunner):
167+ """Run Jobs via twisted."""
168+
169+ def __init__(self, job_source, logger=None):
170+ super(TwistedJobRunner, self).__init__(logger=logger)
171+ self.job_source = job_source
172+
173+ def getTaskSource(self):
174+ """Return a task source for all jobs in job_source."""
175+ def producer():
176+ while True:
177+ for job in self.job_source.iterReady():
178+ yield lambda: self.runJobHandleError(job)
179+ yield None
180+ return PollingTaskSource(5, producer().next)
181+
182+ def doConsumer(self):
183+ """Create a ParallelLimitedTaskConsumer for this job type."""
184+ consumer = ParallelLimitedTaskConsumer(1)
185+ return consumer.consume(self.getTaskSource())
186+
187+ def runAll(self):
188+ """Run all ready jobs, and any that become ready while running."""
189+ d = defer.maybeDeferred(self.doConsumer)
190+ d.addCallbacks(lambda ignored: reactor.stop(), self.failed)
191+
192+ @staticmethod
193+ def failed(failure):
194+ """Callback for when the job fails."""
195+ failure.printTraceback()
196+ reactor.stop()
197+
198+ @classmethod
199+ def runFromSource(cls, job_source, logger):
200+ """Run all ready jobs provided by the specified source."""
201+ logger.info("Running through Twisted.")
202+ runner = cls(job_source, logger)
203+ reactor.callWhenRunning(runner.runAll)
204+ reactor.run()
205+ return runner
206
207
208 class JobCronScript(LaunchpadCronScript):
209 """Base class for scripts that run jobs."""
210
211- def __init__(self):
212+ def __init__(self, runner_class=JobRunner):
213 dbuser = getattr(config, self.config_name).dbuser
214 super(JobCronScript, self).__init__(self.config_name, dbuser)
215+ self.runner_class = runner_class
216
217 def main(self):
218 errorlog.globalErrorUtility.configure(self.config_name)
219- runner = JobRunner.fromReady(
220- getUtility(self.source_interface), self.logger)
221 cleanups = self.setUp()
222 try:
223- runner.runAll()
224+ job_source = getUtility(self.source_interface)
225+ runner = self.runner_class.runFromSource(job_source, self.logger)
226 finally:
227 for cleanup in reversed(cleanups):
228 cleanup()