Merge lp:~abentley/launchpad/ampoulejob-timeout into lp:launchpad

Proposed by Aaron Bentley
Status: Merged
Approved by: Aaron Bentley
Approved revision: not available
Merged at revision: not available
Proposed branch: lp:~abentley/launchpad/ampoulejob-timeout
Merge into: lp:launchpad
Prerequisite: lp:~abentley/launchpad/ampoulejob
Diff against target: 358 lines (+155/-40)
6 files modified
lib/lp/code/scripts/tests/test_update_preview_diffs.py (+5/-1)
lib/lp/services/job/runner.py (+67/-31)
lib/lp/services/job/tests/test_runner.py (+69/-2)
lib/lp/translations/scripts/po_import.py (+4/-1)
lib/lp/translations/scripts/tests/test_translations_import.py (+9/-0)
versions.cfg (+1/-5)
To merge this branch: bzr merge lp:~abentley/launchpad/ampoulejob-timeout
Reviewer Review Type Date Requested Status
Paul Hummer (community) Approve
Review via email: mp+15986@code.launchpad.net
To post a comment you must log in.
Revision history for this message
Aaron Bentley (abentley) wrote :

= Summary =
Enforce timeouts for jobs running under Twisted.

== Proposed fix ==
Use Ampoule's timeout handling.

== Pre-implementation notes ==
This was discussed a bit with mwhudson.

== Implementation details ==
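Use Job.getTimeout to determine each job's timeout, and pass it to Ampoule's ProcessPool.doWork as the _timeout argument. A timeout of 0 is replaced with a tiny positive value to work around an Ampoule bug.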

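Ideally, a timeout should cause the subprocess to report an OOPS rather than being killed outright. To achieve this, ProcessPool is subclassed as HUPProcessPool, whose _handleTimeout override sends SIGHUP to the child instead of the signal Ampoule uses by default. The child process installs a SIGHUP handler that raises TimeoutError, which is then reported as an OOPS.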

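The lease must be acquired before the job runs in order to determine the timeout. Therefore acquireLease moves out of BaseJobRunner.runJob and into JobRunner.runAll and TwistedJobRunner.runJobInSubprocess. A job whose lease is already held is recorded as incomplete and is not run.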

Extract most of the BOOTSTRAP script code into a new function, lp.services.job.runner.bootstrap(). The script itself now only installs the reactor and calls that function.

== Tests ==
bin/test -t update_preview_diffs -t test_runner

== Demo and Q/A ==
None, really. We don't have any jobs that exceed timeouts at present.

Revision history for this message
Paul Hummer (rockstar) :
review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'lib/lp/code/scripts/tests/test_update_preview_diffs.py'
--- lib/lp/code/scripts/tests/test_update_preview_diffs.py 2010-01-06 01:34:20 +0000
+++ lib/lp/code/scripts/tests/test_update_preview_diffs.py 2010-01-06 01:34:23 +0000
@@ -76,7 +76,11 @@
76 source_tree.bzrdir.root_transport.delete_tree('.bzr')76 source_tree.bzrdir.root_transport.delete_tree('.bzr')
77 error_utility = errorlog.ErrorReportingUtility()77 error_utility = errorlog.ErrorReportingUtility()
78 error_utility.configure('update_preview_diffs')78 error_utility.configure('update_preview_diffs')
79 old_id = error_utility.getLastOopsReport().id79 old_report = error_utility.getLastOopsReport()
80 if old_report is not None:
81 old_id = old_report.id
82 else:
83 old_id = None
80 retcode, stdout, stderr = run_script(84 retcode, stdout, stderr = run_script(
81 'cronscripts/update_preview_diffs.py', ['--twisted'])85 'cronscripts/update_preview_diffs.py', ['--twisted'])
82 self.assertEqual(0, retcode)86 self.assertEqual(0, retcode)
8387
=== modified file 'lib/lp/services/job/runner.py'
--- lib/lp/services/job/runner.py 2010-01-06 01:34:20 +0000
+++ lib/lp/services/job/runner.py 2010-01-06 01:34:23 +0000
@@ -11,14 +11,16 @@
11__all__ = ['JobRunner']11__all__ = ['JobRunner']
1212
1313
14import contextlib
14import os15import os
15from signal import getsignal, SIGCHLD, signal16from signal import getsignal, SIGCHLD, SIGHUP, signal
16import sys17import sys
1718
18from ampoule import child, pool, main19from ampoule import child, pool, main
1920from twisted.internet import defer, error, reactor, stdio
20from twisted.internet import reactor, defer
21from twisted.protocols import amp21from twisted.protocols import amp
22from twisted.python import log, reflect
23
22from zope.component import getUtility24from zope.component import getUtility
23from zope.security.proxy import removeSecurityProxy25from zope.security.proxy import removeSecurityProxy
2426
@@ -31,29 +33,9 @@
31from lp.services.scripts.base import LaunchpadCronScript33from lp.services.scripts.base import LaunchpadCronScript
32from lp.services.job.interfaces.job import LeaseHeld, IRunnableJob, IJob34from lp.services.job.interfaces.job import LeaseHeld, IRunnableJob, IJob
33from lp.services.mail.sendmail import MailController35from lp.services.mail.sendmail import MailController
36from canonical.launchpad import scripts
34from canonical.launchpad.webapp import errorlog37from canonical.launchpad.webapp import errorlog
3538
36BOOTSTRAP = """\
37import sys
38
39def main(reactor, ampChildPath):
40 from twisted.application import reactors
41 reactors.installReactor(reactor)
42
43 from twisted.python import log
44 log.startLogging(sys.stderr)
45
46 from twisted.internet import reactor, stdio
47 from twisted.python import reflect
48
49 ampChild = reflect.namedAny(ampChildPath)
50 stdio.StandardIO(ampChild(), 3, 4)
51 from canonical.launchpad import scripts
52 scripts.execute_zcml_for_scripts(use_web_security=False)
53 reactor.run()
54main(sys.argv[-2], sys.argv[-1])
55"""
56
5739
58class BaseRunnableJob:40class BaseRunnableJob:
59 """Base class for jobs to be run via JobRunner.41 """Base class for jobs to be run via JobRunner.
@@ -125,6 +107,11 @@
125 return107 return
126 ctrl.send()108 ctrl.send()
127109
110 @staticmethod
111 @contextlib.contextmanager
112 def contextManager():
113 yield
114
128115
129class BaseJobRunner(object):116class BaseJobRunner(object):
130 """Runner of Jobs."""117 """Runner of Jobs."""
@@ -140,9 +127,6 @@
140 def runJob(self, job):127 def runJob(self, job):
141 """Attempt to run a job, updating its status as appropriate."""128 """Attempt to run a job, updating its status as appropriate."""
142 job = IRunnableJob(job)129 job = IRunnableJob(job)
143 job.acquireLease()
144 # Commit transaction to clear the row lock.
145 transaction.commit()
146 try:130 try:
147 job.start()131 job.start()
148 transaction.commit()132 transaction.commit()
@@ -173,8 +157,6 @@
173 dict(job.getOopsVars())):157 dict(job.getOopsVars())):
174 try:158 try:
175 self.runJob(job)159 self.runJob(job)
176 except LeaseHeld:
177 self.incomplete_jobs.append(job)
178 except job.user_error_types, e:160 except job.user_error_types, e:
179 job.notifyUserError(e)161 job.notifyUserError(e)
180 except Exception:162 except Exception:
@@ -222,6 +204,14 @@
222 def runAll(self):204 def runAll(self):
223 """Run all the Jobs for this JobRunner."""205 """Run all the Jobs for this JobRunner."""
224 for job in self.jobs:206 for job in self.jobs:
207 job = IRunnableJob(job)
208 try:
209 job.acquireLease()
210 except LeaseHeld:
211 self.incomplete_jobs.append(job)
212 continue
213 # Commit transaction to clear the row lock.
214 transaction.commit()
225 oops = self.runJobHandleError(job)215 oops = self.runJobHandleError(job)
226 if oops is not None:216 if oops is not None:
227 self._logOopsId(oops.id)217 self._logOopsId(oops.id)
@@ -263,6 +253,16 @@
263 return {'success': len(runner.completed_jobs), 'oops_id': oops_id}253 return {'success': len(runner.completed_jobs), 'oops_id': oops_id}
264254
265255
256class HUPProcessPool(pool.ProcessPool):
257 """A ProcessPool that kills with HUP."""
258
259 def _handleTimeout(self, child):
260 try:
261 child.transport.signalProcess(SIGHUP)
262 except error.ProcessExitedAlready:
263 pass
264
265
266class TwistedJobRunner(BaseJobRunner):266class TwistedJobRunner(BaseJobRunner):
267 """Run Jobs via twisted."""267 """Run Jobs via twisted."""
268268
@@ -274,15 +274,26 @@
274 'LPCONFIG': os.environ['LPCONFIG']})274 'LPCONFIG': os.environ['LPCONFIG']})
275 super(TwistedJobRunner, self).__init__(logger, error_utility)275 super(TwistedJobRunner, self).__init__(logger, error_utility)
276 self.job_source = job_source276 self.job_source = job_source
277 self.pool = pool.ProcessPool(job_amp, starter=starter, min=0)277 self.pool = HUPProcessPool(job_amp, starter=starter, min=0)
278278
279 def runJobInSubprocess(self, job):279 def runJobInSubprocess(self, job):
280 """Run the job_class with the specified id in the process pool.280 """Run the job_class with the specified id in the process pool.
281281
282 :return: a Deferred that fires when the job has completed.282 :return: a Deferred that fires when the job has completed.
283 """283 """
284 job = IRunnableJob(job)
285 try:
286 job.acquireLease()
287 except LeaseHeld:
288 self.incomplete_jobs.append(job)
289 return
284 job_id = job.id290 job_id = job.id
285 deferred = self.pool.doWork(RunJobCommand, job_id=job_id)291 timeout = job.getTimeout()
292 # work around ampoule bug
293 if timeout == 0:
294 timeout = 0.0000000000001
295 deferred = self.pool.doWork(
296 RunJobCommand, job_id = job_id, _timeout=timeout)
286 def update(response):297 def update(response):
287 if response['success']:298 if response['success']:
288 self.completed_jobs.append(job)299 self.completed_jobs.append(job)
@@ -360,3 +371,28 @@
360 self.logger.info(371 self.logger.info(
361 '%d %s jobs did not complete.',372 '%d %s jobs did not complete.',
362 len(runner.incomplete_jobs), self.source_interface.__name__)373 len(runner.incomplete_jobs), self.source_interface.__name__)
374
375
376class TimeoutError(Exception):
377
378 def __init__(self):
379 Exception.__init__(self, "Job ran too long.")
380
381
382BOOTSTRAP = """\
383import sys
384from twisted.application import reactors
385reactors.installReactor(sys.argv[-2])
386from lp.services.job.runner import bootstrap
387bootstrap(sys.argv[-1])
388"""
389
390def bootstrap(ampChildPath):
391 def handler(signum, frame):
392 raise TimeoutError
393 signal(SIGHUP, handler)
394 log.startLogging(sys.stderr)
395 ampChild = reflect.namedAny(ampChildPath)
396 stdio.StandardIO(ampChild(), 3, 4)
397 scripts.execute_zcml_for_scripts(use_web_security=False)
398 reactor.run()
363399
=== modified file 'lib/lp/services/job/tests/test_runner.py'
--- lib/lp/services/job/tests/test_runner.py 2009-11-27 14:25:44 +0000
+++ lib/lp/services/job/tests/test_runner.py 2010-01-06 01:34:23 +0000
@@ -6,8 +6,8 @@
66
7from __future__ import with_statement7from __future__ import with_statement
88
9import contextlib
10import sys9import sys
10from time import sleep
11from unittest import TestLoader11from unittest import TestLoader
1212
13import transaction13import transaction
@@ -17,7 +17,9 @@
17from zope.interface import implements17from zope.interface import implements
1818
19from lp.testing.mail_helpers import pop_notifications19from lp.testing.mail_helpers import pop_notifications
20from lp.services.job.runner import JobRunner, BaseRunnableJob20from lp.services.job.runner import (
21 JobRunner, BaseRunnableJob, JobRunnerProcess, TwistedJobRunner
22)
21from lp.services.job.interfaces.job import JobStatus, IRunnableJob23from lp.services.job.interfaces.job import JobStatus, IRunnableJob
22from lp.services.job.model.job import Job24from lp.services.job.model.job import Job
23from lp.testing import TestCaseWithFactory25from lp.testing import TestCaseWithFactory
@@ -248,5 +250,70 @@
248 self.assertEqual(JobStatus.FAILED, job.job.status)250 self.assertEqual(JobStatus.FAILED, job.job.status)
249251
250252
253class StuckJob(BaseRunnableJob):
254 """Simulation of a job that stalls."""
255 implements(IRunnableJob)
256
257 done = False
258
259 @classmethod
260 def iterReady(cls):
261 if not cls.done:
262 yield StuckJob()
263 cls.done = True
264
265 @staticmethod
266 def get(id):
267 return StuckJob()
268
269 def __init__(self):
270 self.id = 1
271 self.job = Job()
272
273 def acquireLease(self):
274 # Must be enough time for the setup to complete and runJobHandleError
275 # to be called. 7 was the minimum that worked on my computer.
276 # -- abentley
277 return self.job.acquireLease(10)
278
279 def run(self):
280 sleep(30)
281
282
283class StuckJobProcess(JobRunnerProcess):
284
285 job_class = StuckJob
286
287
288StuckJob.amp = StuckJobProcess
289
290
291class ListLogger:
292
293 def __init__(self):
294 self.entries = []
295
296 def info(self, input):
297 self.entries.append(input)
298
299
300class TestTwistedJobRunner(TestCaseWithFactory):
301
302 layer = LaunchpadZopelessLayer
303
304 def test_timeout(self):
305 """When a job exceeds its lease, an exception is raised."""
306 logger = ListLogger()
307 runner = TwistedJobRunner.runFromSource(StuckJob, logger)
308 self.assertEqual([], runner.completed_jobs)
309 self.assertEqual(1, len(runner.incomplete_jobs))
310 oops = errorlog.globalErrorUtility.getLastOopsReport()
311 expected = [
312 'Running through Twisted.', 'Job resulted in OOPS: %s' % oops.id]
313 self.assertEqual(expected, logger.entries)
314 self.assertEqual('TimeoutError', oops.type)
315 self.assertIn('Job ran too long.', oops.value)
316
317
251def test_suite():318def test_suite():
252 return TestLoader().loadTestsFromName(__name__)319 return TestLoader().loadTestsFromName(__name__)
253320
=== modified file 'lib/lp/translations/scripts/po_import.py'
--- lib/lp/translations/scripts/po_import.py 2009-11-17 09:50:33 +0000
+++ lib/lp/translations/scripts/po_import.py 2010-01-06 01:34:23 +0000
@@ -135,11 +135,14 @@
135 text = MailWrapper().format(mail_body)135 text = MailWrapper().format(mail_body)
136 simple_sendmail(from_email, to_email, mail_subject, text)136 simple_sendmail(from_email, to_email, mail_subject, text)
137137
138 def run(self, *args, **kwargs):
139 errorlog.globalErrorUtility.configure('poimport')
140 LaunchpadCronScript.run(self, *args, **kwargs)
141
138 def main(self):142 def main(self):
139 """Import entries from the queue."""143 """Import entries from the queue."""
140 self.logger.debug("Starting the import process.")144 self.logger.debug("Starting the import process.")
141145
142 errorlog.globalErrorUtility.configure('poimport')
143 self.deadline = datetime.now(UTC) + self.time_to_run146 self.deadline = datetime.now(UTC) + self.time_to_run
144 translation_import_queue = getUtility(ITranslationImportQueue)147 translation_import_queue = getUtility(ITranslationImportQueue)
145148
146149
=== modified file 'lib/lp/translations/scripts/tests/test_translations_import.py'
--- lib/lp/translations/scripts/tests/test_translations_import.py 2009-10-20 05:17:01 +0000
+++ lib/lp/translations/scripts/tests/test_translations_import.py 2010-01-06 01:34:23 +0000
@@ -7,6 +7,7 @@
7from unittest import TestLoader7from unittest import TestLoader
88
9from lp.testing import TestCaseWithFactory9from lp.testing import TestCaseWithFactory
10from canonical.launchpad.webapp import errorlog
10from canonical.testing.layers import LaunchpadScriptLayer11from canonical.testing.layers import LaunchpadScriptLayer
1112
12from lp.translations.interfaces.translationimportqueue import (13from lp.translations.interfaces.translationimportqueue import (
@@ -158,6 +159,14 @@
158 self.assertEqual(RosettaImportStatus.FAILED, entry.status)159 self.assertEqual(RosettaImportStatus.FAILED, entry.status)
159 self.assertEqual(message, entry.error_output)160 self.assertEqual(message, entry.error_output)
160161
162 def test_main_leaves_oops_handling_alone(self):
163 """Ensure that script.main is not altering oops reporting."""
164 self.script.main()
165 default_reporting = errorlog.ErrorReportingUtility()
166 default_reporting.configure('error_reports')
167 self.assertEqual(default_reporting.oops_prefix,
168 errorlog.globalErrorUtility.oops_prefix)
169
161170
162def test_suite():171def test_suite():
163 return TestLoader().loadTestsFromName(__name__)172 return TestLoader().loadTestsFromName(__name__)
164173
=== modified file 'versions.cfg'
--- versions.cfg 2010-01-06 01:34:20 +0000
+++ versions.cfg 2010-01-06 01:34:23 +0000
@@ -3,17 +3,13 @@
33
4[versions]4[versions]
5# Alphabetical, case-insensitive, please! :-)5# Alphabetical, case-insensitive, please! :-)
6<<<<<<< TREE
7bzr = 2.1b4
8=======
96
10# from -r 3:lp:~launchpad/ampoule/launchpad-tweaked7# from -r 3:lp:~launchpad/ampoule/launchpad-tweaked
11# To reproduce:8# To reproduce:
12# bzr export ampoule-0.1.0-lp-1.tar.gz lp:~launchpad/ampoule/launchpad-tweaked\9# bzr export ampoule-0.1.0-lp-1.tar.gz lp:~launchpad/ampoule/launchpad-tweaked\
13# -r 310# -r 3
14ampoule = 0.1.0-lp-111ampoule = 0.1.0-lp-1
15bzr = 2.1b312bzr = 2.1b4
16>>>>>>> MERGE-SOURCE
17chameleon.core = 1.0b3513chameleon.core = 1.0b35
18chameleon.zpt = 1.0b1714chameleon.zpt = 1.0b17
19ClientForm = 0.2.1015ClientForm = 0.2.10