Merge lp:~abentley/launchpad/twistedjob into lp:launchpad

Proposed by Aaron Bentley
Status: Merged
Approved by: Aaron Bentley
Approved revision: not available
Merged at revision: not available
Proposed branch: lp:~abentley/launchpad/twistedjob
Merge into: lp:launchpad
Prerequisite: lp:~abentley/launchpad/twistedjob-enhancements
Diff against target: 228 lines (+122/-29)
3 files modified
cronscripts/update_preview_diffs.py (+11/-1)
lib/lp/code/scripts/tests/test_update_preview_diffs.py (+16/-0)
lib/lp/services/job/runner.py (+95/-28)
To merge this branch: bzr merge lp:~abentley/launchpad/twistedjob
Reviewer Review Type Date Requested Status
Paul Hummer (community) Approve
Review via email: mp+15980@code.launchpad.net
To post a comment you must log in.
Revision history for this message
Aaron Bentley (abentley) wrote :

= Summary =
Provide a way of running jobs through Twisted

== Proposed fix ==
Provide a TwistedJobRunner, and adjust related interfaces.

== Pre-implementation notes ==
Preimplementation was with thumper.

== Implementation details ==
Extract BaseJobRunner from JobRunner. Extract runJobHandleError from
runAll.

Implement TwistedJobRunner, derived from BaseJobRunner. Use the task support
classes to implement support for running jobs via Twisted. Specifically,
Provide a PollingTaskSource that returns a lambda to call runJobHandleError.
Use the ParallelLimitedTaskConsumer to consume these tasks.

Implement runAll in terms of ParallelLimitedTaskConsumer. Note that, at this
time, this does not cause parallelism, because ParallelLimitedTaskConsumer is
parameterized to run 1 task at a time.

Implement JobRunner.runFromSource method, because this can be polymophic.
Adjust JobCronScript to take a runner_class, so that it can optionally use
Twisted.

Change update_prefiew_diffs to run under Twisted if the --twisted option is
supplied.

== Tests ==
bin/test -t update_preview_diffs

== Demo and Q/A ==
Have the LOSAs change the staging cron script to run update_preview_diffs with
--twisted. Cause a preview diff to be generated. Examine the log, and see
that it ran under Twisted.

Revision history for this message
Paul Hummer (rockstar) :
review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'cronscripts/update_preview_diffs.py'
--- cronscripts/update_preview_diffs.py 2009-10-14 13:13:47 +0000
+++ cronscripts/update_preview_diffs.py 2009-12-18 16:02:18 +0000
@@ -12,7 +12,7 @@
12import _pythonpath12import _pythonpath
1313
14from lp.codehosting.vfs import get_scanner_server14from lp.codehosting.vfs import get_scanner_server
15from lp.services.job.runner import JobCronScript15from lp.services.job.runner import JobCronScript, JobRunner, TwistedJobRunner
16from lp.code.interfaces.branchmergeproposal import (16from lp.code.interfaces.branchmergeproposal import (
17 IUpdatePreviewDiffJobSource,)17 IUpdatePreviewDiffJobSource,)
1818
@@ -23,6 +23,16 @@
23 config_name = 'update_preview_diffs'23 config_name = 'update_preview_diffs'
24 source_interface = IUpdatePreviewDiffJobSource24 source_interface = IUpdatePreviewDiffJobSource
2525
26 def __init__(self):
27 super(JobCronScript, self).__init__()
28 if self.options.twisted:
29 self.runner_class = TwistedJobRunner
30 else:
31 self.runner_class = JobRunner
32
33 def add_my_options(self):
34 self.parser.add_option('--twisted', action='store_true')
35
26 def setUp(self):36 def setUp(self):
27 server = get_scanner_server()37 server = get_scanner_server()
28 server.setUp()38 server.setUp()
2939
=== modified file 'lib/lp/code/scripts/tests/test_update_preview_diffs.py'
--- lib/lp/code/scripts/tests/test_update_preview_diffs.py 2009-12-18 16:02:17 +0000
+++ lib/lp/code/scripts/tests/test_update_preview_diffs.py 2009-12-18 16:02:18 +0000
@@ -49,6 +49,22 @@
49 self.assertEqual('', stdout)49 self.assertEqual('', stdout)
50 self.assertEqual(50 self.assertEqual(
51 'INFO creating lockfile\n'51 'INFO creating lockfile\n'
52 'INFO Running synchronously.\n'
53 'INFO Ran 1 IUpdatePreviewDiffJobSource jobs.\n'
54 'INFO 0 IUpdatePreviewDiffJobSource jobs did not complete.\n',
55 stderr)
56 self.assertIsNot(None, bmp.preview_diff)
57
58 def test_update_preview_diffs_twisted(self):
59 """Ensure update_preview_diffs runs and generates diffs."""
60 job, bmp, source_tree = self.create_preview_diff_job()
61 retcode, stdout, stderr = run_script(
62 'cronscripts/update_preview_diffs.py', ['--twisted'])
63 self.assertEqual(0, retcode)
64 self.assertEqual('', stdout)
65 self.assertEqual(
66 'INFO creating lockfile\n'
67 'INFO Running through Twisted.\n'
52 'INFO Ran 1 IUpdatePreviewDiffJobSource jobs.\n'68 'INFO Ran 1 IUpdatePreviewDiffJobSource jobs.\n'
53 'INFO 0 IUpdatePreviewDiffJobSource jobs did not complete.\n',69 'INFO 0 IUpdatePreviewDiffJobSource jobs did not complete.\n',
54 stderr)70 stderr)
5571
=== modified file 'lib/lp/services/job/runner.py'
--- lib/lp/services/job/runner.py 2009-12-18 16:02:17 +0000
+++ lib/lp/services/job/runner.py 2009-12-18 16:02:18 +0000
@@ -13,9 +13,12 @@
1313
14import sys14import sys
1515
16from twisted.internet import reactor, defer
16from zope.component import getUtility17from zope.component import getUtility
1718
18from canonical.config import config19from canonical.config import config
20from canonical.twistedsupport.task import (
21 ParallelLimitedTaskConsumer, PollingTaskSource)
19from lazr.delegates import delegates22from lazr.delegates import delegates
20import transaction23import transaction
2124
@@ -96,20 +99,14 @@
96 ctrl.send()99 ctrl.send()
97100
98101
99class JobRunner(object):102class BaseJobRunner(object):
100 """Runner of Jobs."""103 """Runner of Jobs."""
101104
102 def __init__(self, jobs, logger=None):105 def __init__(self, logger=None):
103 self.jobs = jobs
104 self.completed_jobs = []106 self.completed_jobs = []
105 self.incomplete_jobs = []107 self.incomplete_jobs = []
106 self.logger = logger108 self.logger = logger
107109
108 @classmethod
109 def fromReady(cls, job_class, logger=None):
110 """Return a job runner for all ready jobs of a given class."""
111 return cls(job_class.iterReady(), logger)
112
113 def runJob(self, job):110 def runJob(self, job):
114 """Attempt to run a job, updating its status as appropriate."""111 """Attempt to run a job, updating its status as appropriate."""
115 job = IRunnableJob(job)112 job = IRunnableJob(job)
@@ -135,41 +132,111 @@
135 # Commit transaction to update job status.132 # Commit transaction to update job status.
136 transaction.commit()133 transaction.commit()
137134
135 def runJobHandleError(self, job):
136 """Run the specified job, handling errors.
137
138 Most errors will be logged as Oopses. Jobs in user_error_types won't.
139 The list of complete or incomplete jobs will be updated.
140 """
141 job = IRunnableJob(job)
142 with errorlog.globalErrorUtility.oopsMessage(
143 dict(job.getOopsVars())):
144 try:
145 self.runJob(job)
146 except LeaseHeld:
147 self.incomplete_jobs.append(job)
148 except job.user_error_types, e:
149 job.notifyUserError(e)
150 except Exception:
151 info = sys.exc_info()
152 errorlog.globalErrorUtility.raising(info)
153 oops = errorlog.globalErrorUtility.getLastOopsReport()
154 job.notifyOops(oops)
155 if self.logger is not None:
156 self.logger.info('Job resulted in OOPS: %s' % oops.id)
157
158
159class JobRunner(BaseJobRunner):
160
161 def __init__(self, jobs, logger=None):
162 BaseJobRunner.__init__(self, logger=logger)
163 self.jobs = jobs
164
165 @classmethod
166 def fromReady(cls, job_class, logger=None):
167 """Return a job runner for all ready jobs of a given class."""
168 return cls(job_class.iterReady(), logger)
169
170 @classmethod
171 def runFromSource(cls, job_source, logger):
172 """Run all ready jobs provided by the specified source."""
173 logger.info("Running synchronously.")
174 runner = cls.fromReady(job_source, logger)
175 runner.runAll()
176 return runner
177
138 def runAll(self):178 def runAll(self):
139 """Run all the Jobs for this JobRunner."""179 """Run all the Jobs for this JobRunner."""
140 for job in self.jobs:180 for job in self.jobs:
141 job = IRunnableJob(job)181 self.runJobHandleError(job)
142 with errorlog.globalErrorUtility.oopsMessage(182
143 dict(job.getOopsVars())):183
144 try:184class TwistedJobRunner(BaseJobRunner):
145 self.runJob(job)185 """Run Jobs via twisted."""
146 except LeaseHeld:186
147 self.incomplete_jobs.append(job)187 def __init__(self, job_source, logger=None):
148 except job.user_error_types, e:188 super(TwistedJobRunner, self).__init__(logger=logger)
149 job.notifyUserError(e)189 self.job_source = job_source
150 except Exception:190
151 info = sys.exc_info()191 def getTaskSource(self):
152 errorlog.globalErrorUtility.raising(info)192 """Return a task source for all jobs in job_source."""
153 oops = errorlog.globalErrorUtility.getLastOopsReport()193 def producer():
154 job.notifyOops(oops)194 while True:
155 if self.logger is not None:195 for job in self.job_source.iterReady():
156 self.logger.info('Job resulted in OOPS: %s' % oops.id)196 yield lambda: self.runJobHandleError(job)
197 yield None
198 return PollingTaskSource(5, producer().next)
199
200 def doConsumer(self):
201 """Create a ParallelLimitedTaskConsumer for this job type."""
202 consumer = ParallelLimitedTaskConsumer(1)
203 return consumer.consume(self.getTaskSource())
204
205 def runAll(self):
206 """Run all ready jobs, and any that become ready while running."""
207 d = defer.maybeDeferred(self.doConsumer)
208 d.addCallbacks(lambda ignored: reactor.stop(), self.failed)
209
210 @staticmethod
211 def failed(failure):
212 """Callback for when the job fails."""
213 failure.printTraceback()
214 reactor.stop()
215
216 @classmethod
217 def runFromSource(cls, job_source, logger):
218 """Run all ready jobs provided by the specified source."""
219 logger.info("Running through Twisted.")
220 runner = cls(job_source, logger)
221 reactor.callWhenRunning(runner.runAll)
222 reactor.run()
223 return runner
157224
158225
159class JobCronScript(LaunchpadCronScript):226class JobCronScript(LaunchpadCronScript):
160 """Base class for scripts that run jobs."""227 """Base class for scripts that run jobs."""
161228
162 def __init__(self):229 def __init__(self, runner_class=JobRunner):
163 dbuser = getattr(config, self.config_name).dbuser230 dbuser = getattr(config, self.config_name).dbuser
164 super(JobCronScript, self).__init__(self.config_name, dbuser)231 super(JobCronScript, self).__init__(self.config_name, dbuser)
232 self.runner_class = runner_class
165233
166 def main(self):234 def main(self):
167 errorlog.globalErrorUtility.configure(self.config_name)235 errorlog.globalErrorUtility.configure(self.config_name)
168 runner = JobRunner.fromReady(
169 getUtility(self.source_interface), self.logger)
170 cleanups = self.setUp()236 cleanups = self.setUp()
171 try:237 try:
172 runner.runAll()238 job_source = getUtility(self.source_interface)
239 runner = self.runner_class.runFromSource(job_source, self.logger)
173 finally:240 finally:
174 for cleanup in reversed(cleanups):241 for cleanup in reversed(cleanups):
175 cleanup()242 cleanup()