Merge lp:~abentley/launchpad/ampoulejob-timeout into lp:launchpad

Proposed by Aaron Bentley
Status: Merged
Approved by: Aaron Bentley
Approved revision: not available
Merged at revision: not available
Proposed branch: lp:~abentley/launchpad/ampoulejob-timeout
Merge into: lp:launchpad
Prerequisite: lp:~abentley/launchpad/ampoulejob
Diff against target: 358 lines (+155/-40)
6 files modified
lib/lp/code/scripts/tests/test_update_preview_diffs.py (+5/-1)
lib/lp/services/job/runner.py (+67/-31)
lib/lp/services/job/tests/test_runner.py (+69/-2)
lib/lp/translations/scripts/po_import.py (+4/-1)
lib/lp/translations/scripts/tests/test_translations_import.py (+9/-0)
versions.cfg (+1/-5)
To merge this branch: bzr merge lp:~abentley/launchpad/ampoulejob-timeout
Reviewer Review Type Date Requested Status
Paul Hummer (community) Approve
Review via email: mp+15986@code.launchpad.net
To post a comment you must log in.
Revision history for this message
Aaron Bentley (abentley) wrote :

= Summary =
Enforce timeouts for jobs running under Twisted.

== Proposed fix ==
Use Ampoule's timeout handling.

== Pre-implementation notes ==
This was discussed a bit with mwhudson.

== Implementation details ==
Use Job.getTimeout to determine the job's timeout and run it with that timeout.

Ideally, a timeout should cause the subprocess to record an OOPS, so subclass
ProcessPool as HUPProcessPool and override _handleTimeout to send SIGHUP to the
child instead of using ProcessPool's default timeout handling. Install a SIGHUP
signal handler in the child process that raises TimeoutError.

The lease must be acquired before the job is run in order to determine the
timeout, so move acquireLease to JobRunner.runAll and
TwistedJobRunner.runJobInSubprocess.

Extract most of the bootstrap code from the BOOTSTRAP script string into
lp.services.job.runner.bootstrap().

== Tests ==
bin/test -t update_preview_diffs -t test_runner

== Demo and Q/A ==
None, really. We don't have any jobs that exceed timeouts at present.

Revision history for this message
Paul Hummer (rockstar) :
review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'lib/lp/code/scripts/tests/test_update_preview_diffs.py'
2--- lib/lp/code/scripts/tests/test_update_preview_diffs.py 2010-01-06 01:34:20 +0000
3+++ lib/lp/code/scripts/tests/test_update_preview_diffs.py 2010-01-06 01:34:23 +0000
4@@ -76,7 +76,11 @@
5 source_tree.bzrdir.root_transport.delete_tree('.bzr')
6 error_utility = errorlog.ErrorReportingUtility()
7 error_utility.configure('update_preview_diffs')
8- old_id = error_utility.getLastOopsReport().id
9+ old_report = error_utility.getLastOopsReport()
10+ if old_report is not None:
11+ old_id = old_report.id
12+ else:
13+ old_id = None
14 retcode, stdout, stderr = run_script(
15 'cronscripts/update_preview_diffs.py', ['--twisted'])
16 self.assertEqual(0, retcode)
17
18=== modified file 'lib/lp/services/job/runner.py'
19--- lib/lp/services/job/runner.py 2010-01-06 01:34:20 +0000
20+++ lib/lp/services/job/runner.py 2010-01-06 01:34:23 +0000
21@@ -11,14 +11,16 @@
22 __all__ = ['JobRunner']
23
24
25+import contextlib
26 import os
27-from signal import getsignal, SIGCHLD, signal
28+from signal import getsignal, SIGCHLD, SIGHUP, signal
29 import sys
30
31 from ampoule import child, pool, main
32-
33-from twisted.internet import reactor, defer
34+from twisted.internet import defer, error, reactor, stdio
35 from twisted.protocols import amp
36+from twisted.python import log, reflect
37+
38 from zope.component import getUtility
39 from zope.security.proxy import removeSecurityProxy
40
41@@ -31,29 +33,9 @@
42 from lp.services.scripts.base import LaunchpadCronScript
43 from lp.services.job.interfaces.job import LeaseHeld, IRunnableJob, IJob
44 from lp.services.mail.sendmail import MailController
45+from canonical.launchpad import scripts
46 from canonical.launchpad.webapp import errorlog
47
48-BOOTSTRAP = """\
49-import sys
50-
51-def main(reactor, ampChildPath):
52- from twisted.application import reactors
53- reactors.installReactor(reactor)
54-
55- from twisted.python import log
56- log.startLogging(sys.stderr)
57-
58- from twisted.internet import reactor, stdio
59- from twisted.python import reflect
60-
61- ampChild = reflect.namedAny(ampChildPath)
62- stdio.StandardIO(ampChild(), 3, 4)
63- from canonical.launchpad import scripts
64- scripts.execute_zcml_for_scripts(use_web_security=False)
65- reactor.run()
66-main(sys.argv[-2], sys.argv[-1])
67-"""
68-
69
70 class BaseRunnableJob:
71 """Base class for jobs to be run via JobRunner.
72@@ -125,6 +107,11 @@
73 return
74 ctrl.send()
75
76+ @staticmethod
77+ @contextlib.contextmanager
78+ def contextManager():
79+ yield
80+
81
82 class BaseJobRunner(object):
83 """Runner of Jobs."""
84@@ -140,9 +127,6 @@
85 def runJob(self, job):
86 """Attempt to run a job, updating its status as appropriate."""
87 job = IRunnableJob(job)
88- job.acquireLease()
89- # Commit transaction to clear the row lock.
90- transaction.commit()
91 try:
92 job.start()
93 transaction.commit()
94@@ -173,8 +157,6 @@
95 dict(job.getOopsVars())):
96 try:
97 self.runJob(job)
98- except LeaseHeld:
99- self.incomplete_jobs.append(job)
100 except job.user_error_types, e:
101 job.notifyUserError(e)
102 except Exception:
103@@ -222,6 +204,14 @@
104 def runAll(self):
105 """Run all the Jobs for this JobRunner."""
106 for job in self.jobs:
107+ job = IRunnableJob(job)
108+ try:
109+ job.acquireLease()
110+ except LeaseHeld:
111+ self.incomplete_jobs.append(job)
112+ continue
113+ # Commit transaction to clear the row lock.
114+ transaction.commit()
115 oops = self.runJobHandleError(job)
116 if oops is not None:
117 self._logOopsId(oops.id)
118@@ -263,6 +253,16 @@
119 return {'success': len(runner.completed_jobs), 'oops_id': oops_id}
120
121
122+class HUPProcessPool(pool.ProcessPool):
123+ """A ProcessPool that kills with HUP."""
124+
125+ def _handleTimeout(self, child):
126+ try:
127+ child.transport.signalProcess(SIGHUP)
128+ except error.ProcessExitedAlready:
129+ pass
130+
131+
132 class TwistedJobRunner(BaseJobRunner):
133 """Run Jobs via twisted."""
134
135@@ -274,15 +274,26 @@
136 'LPCONFIG': os.environ['LPCONFIG']})
137 super(TwistedJobRunner, self).__init__(logger, error_utility)
138 self.job_source = job_source
139- self.pool = pool.ProcessPool(job_amp, starter=starter, min=0)
140+ self.pool = HUPProcessPool(job_amp, starter=starter, min=0)
141
142 def runJobInSubprocess(self, job):
143 """Run the job_class with the specified id in the process pool.
144
145 :return: a Deferred that fires when the job has completed.
146 """
147+ job = IRunnableJob(job)
148+ try:
149+ job.acquireLease()
150+ except LeaseHeld:
151+ self.incomplete_jobs.append(job)
152+ return
153 job_id = job.id
154- deferred = self.pool.doWork(RunJobCommand, job_id=job_id)
155+ timeout = job.getTimeout()
156+ # work around ampoule bug
157+ if timeout == 0:
158+ timeout = 0.0000000000001
159+ deferred = self.pool.doWork(
160+ RunJobCommand, job_id = job_id, _timeout=timeout)
161 def update(response):
162 if response['success']:
163 self.completed_jobs.append(job)
164@@ -360,3 +371,28 @@
165 self.logger.info(
166 '%d %s jobs did not complete.',
167 len(runner.incomplete_jobs), self.source_interface.__name__)
168+
169+
170+class TimeoutError(Exception):
171+
172+ def __init__(self):
173+ Exception.__init__(self, "Job ran too long.")
174+
175+
176+BOOTSTRAP = """\
177+import sys
178+from twisted.application import reactors
179+reactors.installReactor(sys.argv[-2])
180+from lp.services.job.runner import bootstrap
181+bootstrap(sys.argv[-1])
182+"""
183+
184+def bootstrap(ampChildPath):
185+ def handler(signum, frame):
186+ raise TimeoutError
187+ signal(SIGHUP, handler)
188+ log.startLogging(sys.stderr)
189+ ampChild = reflect.namedAny(ampChildPath)
190+ stdio.StandardIO(ampChild(), 3, 4)
191+ scripts.execute_zcml_for_scripts(use_web_security=False)
192+ reactor.run()
193
194=== modified file 'lib/lp/services/job/tests/test_runner.py'
195--- lib/lp/services/job/tests/test_runner.py 2009-11-27 14:25:44 +0000
196+++ lib/lp/services/job/tests/test_runner.py 2010-01-06 01:34:23 +0000
197@@ -6,8 +6,8 @@
198
199 from __future__ import with_statement
200
201-import contextlib
202 import sys
203+from time import sleep
204 from unittest import TestLoader
205
206 import transaction
207@@ -17,7 +17,9 @@
208 from zope.interface import implements
209
210 from lp.testing.mail_helpers import pop_notifications
211-from lp.services.job.runner import JobRunner, BaseRunnableJob
212+from lp.services.job.runner import (
213+ JobRunner, BaseRunnableJob, JobRunnerProcess, TwistedJobRunner
214+)
215 from lp.services.job.interfaces.job import JobStatus, IRunnableJob
216 from lp.services.job.model.job import Job
217 from lp.testing import TestCaseWithFactory
218@@ -248,5 +250,70 @@
219 self.assertEqual(JobStatus.FAILED, job.job.status)
220
221
222+class StuckJob(BaseRunnableJob):
223+ """Simulation of a job that stalls."""
224+ implements(IRunnableJob)
225+
226+ done = False
227+
228+ @classmethod
229+ def iterReady(cls):
230+ if not cls.done:
231+ yield StuckJob()
232+ cls.done = True
233+
234+ @staticmethod
235+ def get(id):
236+ return StuckJob()
237+
238+ def __init__(self):
239+ self.id = 1
240+ self.job = Job()
241+
242+ def acquireLease(self):
243+ # Must be enough time for the setup to complete and runJobHandleError
244+ # to be called. 7 was the minimum that worked on my computer.
245+ # -- abentley
246+ return self.job.acquireLease(10)
247+
248+ def run(self):
249+ sleep(30)
250+
251+
252+class StuckJobProcess(JobRunnerProcess):
253+
254+ job_class = StuckJob
255+
256+
257+StuckJob.amp = StuckJobProcess
258+
259+
260+class ListLogger:
261+
262+ def __init__(self):
263+ self.entries = []
264+
265+ def info(self, input):
266+ self.entries.append(input)
267+
268+
269+class TestTwistedJobRunner(TestCaseWithFactory):
270+
271+ layer = LaunchpadZopelessLayer
272+
273+ def test_timeout(self):
274+ """When a job exceeds its lease, an exception is raised."""
275+ logger = ListLogger()
276+ runner = TwistedJobRunner.runFromSource(StuckJob, logger)
277+ self.assertEqual([], runner.completed_jobs)
278+ self.assertEqual(1, len(runner.incomplete_jobs))
279+ oops = errorlog.globalErrorUtility.getLastOopsReport()
280+ expected = [
281+ 'Running through Twisted.', 'Job resulted in OOPS: %s' % oops.id]
282+ self.assertEqual(expected, logger.entries)
283+ self.assertEqual('TimeoutError', oops.type)
284+ self.assertIn('Job ran too long.', oops.value)
285+
286+
287 def test_suite():
288 return TestLoader().loadTestsFromName(__name__)
289
290=== modified file 'lib/lp/translations/scripts/po_import.py'
291--- lib/lp/translations/scripts/po_import.py 2009-11-17 09:50:33 +0000
292+++ lib/lp/translations/scripts/po_import.py 2010-01-06 01:34:23 +0000
293@@ -135,11 +135,14 @@
294 text = MailWrapper().format(mail_body)
295 simple_sendmail(from_email, to_email, mail_subject, text)
296
297+ def run(self, *args, **kwargs):
298+ errorlog.globalErrorUtility.configure('poimport')
299+ LaunchpadCronScript.run(self, *args, **kwargs)
300+
301 def main(self):
302 """Import entries from the queue."""
303 self.logger.debug("Starting the import process.")
304
305- errorlog.globalErrorUtility.configure('poimport')
306 self.deadline = datetime.now(UTC) + self.time_to_run
307 translation_import_queue = getUtility(ITranslationImportQueue)
308
309
310=== modified file 'lib/lp/translations/scripts/tests/test_translations_import.py'
311--- lib/lp/translations/scripts/tests/test_translations_import.py 2009-10-20 05:17:01 +0000
312+++ lib/lp/translations/scripts/tests/test_translations_import.py 2010-01-06 01:34:23 +0000
313@@ -7,6 +7,7 @@
314 from unittest import TestLoader
315
316 from lp.testing import TestCaseWithFactory
317+from canonical.launchpad.webapp import errorlog
318 from canonical.testing.layers import LaunchpadScriptLayer
319
320 from lp.translations.interfaces.translationimportqueue import (
321@@ -158,6 +159,14 @@
322 self.assertEqual(RosettaImportStatus.FAILED, entry.status)
323 self.assertEqual(message, entry.error_output)
324
325+ def test_main_leaves_oops_handling_alone(self):
326+ """Ensure that script.main is not altering oops reporting."""
327+ self.script.main()
328+ default_reporting = errorlog.ErrorReportingUtility()
329+ default_reporting.configure('error_reports')
330+ self.assertEqual(default_reporting.oops_prefix,
331+ errorlog.globalErrorUtility.oops_prefix)
332+
333
334 def test_suite():
335 return TestLoader().loadTestsFromName(__name__)
336
337=== modified file 'versions.cfg'
338--- versions.cfg 2010-01-06 01:34:20 +0000
339+++ versions.cfg 2010-01-06 01:34:23 +0000
340@@ -3,17 +3,13 @@
341
342 [versions]
343 # Alphabetical, case-insensitive, please! :-)
344-<<<<<<< TREE
345-bzr = 2.1b4
346-=======
347
348 # from -r 3:lp:~launchpad/ampoule/launchpad-tweaked
349 # To reproduce:
350 # bzr export ampoule-0.1.0-lp-1.tar.gz lp:~launchpad/ampoule/launchpad-tweaked\
351 # -r 3
352 ampoule = 0.1.0-lp-1
353-bzr = 2.1b3
354->>>>>>> MERGE-SOURCE
355+bzr = 2.1b4
356 chameleon.core = 1.0b35
357 chameleon.zpt = 1.0b17
358 ClientForm = 0.2.10