Merge lp:~allenap/launchpad/twisted-threading-bug-491870 into lp:launchpad

Proposed by Gavin Panella
Status: Rejected
Rejected by: Gavin Panella
Proposed branch: lp:~allenap/launchpad/twisted-threading-bug-491870
Merge into: lp:launchpad
Diff against target: 822 lines (+469/-158)
9 files modified
lib/lp/bugs/doc/checkwatches-cli-switches.txt (+1/-1)
lib/lp/bugs/doc/externalbugtracker.txt (+0/-107)
lib/lp/bugs/scripts/checkwatches.py (+88/-32)
lib/lp/bugs/scripts/tests/test_checkwatches.py (+175/-3)
lib/lp/scripts/utilities/importfascist.py (+1/-0)
lib/lp/services/job/runner.py (+3/-2)
lib/lp/services/job/tests/test_runner.py (+15/-13)
lib/lp/testing/__init__.py (+55/-0)
lib/lp/testing/tests/test_zope_test_in_subprocess.py (+131/-0)
To merge this branch: bzr merge lp:~allenap/launchpad/twisted-threading-bug-491870
Reviewer                     Review Type   Date Requested   Status
Henning Eggers (community)   code                           Approve
Review via email: mp+18843@code.launchpad.net

Commit message

In checkwatches, use Twisted to manage threading instead of the threading module.

Revision history for this message
Gavin Panella (allenap) wrote :

checkwatches can update bug watches for different remote trackers in parallel. Currently that's implemented using threads via the threading module. This branch continues to use threads, but does so with a Twisted ThreadPool.

To make testing easier, and to make the design more elegant, it's possible to pass in a scheduler to BugWatchUpdater.updateBugTrackers(). Previously the scheduling policy was baked into this method.
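To illustrate what the pluggable scheduler buys, here is a minimal, self-contained sketch in plain Python. BaseScheduler and SerialScheduler follow the classes in the preview diff; RecordingScheduler and update_bug_trackers() are hypothetical simplifications (the real updateBugTrackers() also logs and looks up the tracker updaters itself). The point is that a test can inject a scheduler that only records jobs, without threads or a reactor.

```python
class BaseScheduler:
    """Run jobs according to a policy (same interface as in the diff)."""

    def schedule(self, func, *args, **kwargs):
        """Add a job to be run."""
        raise NotImplementedError(self.schedule)

    def run(self):
        """Run the jobs."""
        raise NotImplementedError(self.run)


class SerialScheduler(BaseScheduler):
    """Run jobs in the order they were scheduled, one at a time."""

    def __init__(self):
        self._jobs = []

    def schedule(self, func, *args, **kwargs):
        self._jobs.append((func, args, kwargs))

    def run(self):
        # Swap out the job list first so that run() can be called again.
        jobs, self._jobs = self._jobs, []
        for func, args, kwargs in jobs:
            func(*args, **kwargs)


class RecordingScheduler(BaseScheduler):
    """Hypothetical test double: records jobs but never runs them."""

    def __init__(self):
        self.scheduled = []
        self.ran = False

    def schedule(self, func, *args, **kwargs):
        self.scheduled.append((func, args, kwargs))

    def run(self):
        self.ran = True


def update_bug_trackers(updaters, batch_size=None, scheduler=None):
    """Hypothetical, simplified stand-in for updateBugTrackers()."""
    if scheduler is None:
        scheduler = SerialScheduler()
    for updater in updaters:
        scheduler.schedule(updater, batch_size)
    scheduler.run()


# Usage: two fake per-tracker updaters that log their batch size.
log = []


def update_tracker_a(batch_size):
    log.append(("tracker-a", batch_size))


def update_tracker_b(batch_size):
    log.append(("tracker-b", batch_size))


# Default policy: the jobs run immediately, in order.
update_bug_trackers([update_tracker_a, update_tracker_b], batch_size=5)

# Injected policy: the jobs are only recorded, so log is unchanged.
recorder = RecordingScheduler()
update_bug_trackers(
    [update_tracker_a, update_tracker_b], batch_size=2, scheduler=recorder)
```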

Getting Twisted in there also opens the door for using more async code in the future.

Henning Eggers (henninge) wrote :

Thanks for this improvement!

We talked about some things on IRC:
- Use of lambda is against LP coding style but common in Twisted. We will raise the issue at the reviewers meeting and you can leave it as it is for now. If the style change gets rejected, I ask you to fix this in a follow-up branch. Or wait until after the meeting before landing.
- The intended use of install_signal_handlers to avoid errors in the test should be documented.
- You agreed to add an explicit test for SerialScheduler.

Thanks for your patience with me ... ;-)

review: Approve (code)
Jonathan Lange (jml) wrote :

On Mon, Feb 8, 2010 at 5:38 PM, Henning Eggers
<email address hidden> wrote:
> Review: Approve code
> Thanks for this improvement!
>
> We talked about some things on IRC:
> - Use of lambda is against LP coding style but common in Twisted. We will raise the issue at the reviewers meeting and you can leave it as it is for now. If the style change gets rejected, I ask you to fix this in a follow-up branch. Or wait until after the meeting before landing.

FWIW, if the reviewers decree that lambda is too hard to read, then
it's probably worth defining a function like this:

  from twisted.python.util import mergeFunctionMetadata

  def kerchop(f):
    def callback(ignored, *args, **kwargs):
      return f(*args, **kwargs)
    return mergeFunctionMetadata(f, callback)

And then replacing:
  d.addCallback(lambda ignored: reactor.stop())

with:
  d.addCallback(kerchop(reactor.stop))

The function ought to go somewhere in lp.services, and have a better
name than 'kerchop'.

jml
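A minimal sketch of the helper jml describes, assuming the standard library's functools.wraps is an acceptable substitute for Twisted's mergeFunctionMetadata (both copy the wrapped function's name and docstring onto the wrapper). The name ignore_result is hypothetical, since jml asks for something better than 'kerchop'. The demonstration calls the adapted function directly, the way a Deferred would, so it runs without Twisted.

```python
import functools


def ignore_result(f):
    """Adapt f for use as a Deferred callback by dropping the result.

    Hypothetical name for jml's 'kerchop'; functools.wraps stands in for
    Twisted's mergeFunctionMetadata to copy f's name and docstring.
    """
    @functools.wraps(f)
    def callback(ignored, *args, **kwargs):
        return f(*args, **kwargs)
    return callback


# Twisted-free demonstration: call the adapted function the way a
# Deferred would, passing the previous callback's result first.
stopped = []


def stop_reactor():
    """Stand-in for reactor.stop()."""
    stopped.append(True)
    return "stopped"


callback = ignore_result(stop_reactor)
result = callback("result of the previous callback")
```

With Twisted in place, jml's example would then read d.addCallback(ignore_result(reactor.stop)), and tracebacks would still show the wrapped function's name rather than an anonymous lambda.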

Gavin Panella (allenap) wrote :

Hi Henning,

Thank you for the review. I added some more info about install_signal_handlers, and added some tests for SerialScheduler.

I also switched to using the default reactor thread pool instead of creating my own in TwistedThreadScheduler. I felt it was better to be explicit, but two things changed my mind:

 * We're calling reactor.run() ourselves. The reactor is ours and we're not sharing it, so there's no harm in just using its thread pool.

 * The import fascist was complaining about importing deferToThreadPool. Maybe this should be in its module's __all__, maybe not. I'll file a bug for that.

Incremental diff: http://paste.ubuntu.com/372542/

Thanks, Gavin.

Gavin Panella (allenap) wrote :

> The import fascist was complaining about importing
> deferToThreadPool. Maybe this should be in its module's
> __all__, maybe not. I'll file a bug for that.

http://twistedmatrix.com/trac/ticket/4264

Jonathan Lange (jml) wrote :

The fascist has a place where you can disable the warning: lib/lp/scripts/utilities/importfascist.py, "valid_imports_not_in_all".
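To make the allow-list idea concrete, here is a hypothetical sketch of the kind of check an __all__-based import guard performs. It is not the actual importfascist logic, only an assumption about its shape: an imported name passes if the module exports it in __all__, or if the allow-list maps that module to a set containing the name. The 'example.helpers' module and its __all__ are made up; the other dict entries are the ones visible in the preview diff.

```python
def flagged_imports(module_name, imported_names, module_all, allow_list):
    """Return the imported names an __all__-based check would flag.

    Hypothetical simplification: a name is accepted if the module lists
    it in __all__, or if allow_list maps module_name to a set containing
    it.
    """
    allowed = set(module_all) | allow_list.get(module_name, set())
    return sorted(name for name in imported_names if name not in allowed)


# Shaped like valid_imports_not_in_all. The 'example.helpers' entry is
# made up for illustration.
valid_imports_not_in_all = {
    'textwrap': set(['dedent']),
    'twisted.internet.threads': set(['deferToThreadPool']),
    'example.helpers': set(['semi_private_helper']),
}

example_all = ['public_helper']  # Hypothetical __all__ of example.helpers.
imported = ['public_helper', 'semi_private_helper']

# Without an allow-list entry, the non-exported name is flagged...
flagged_without = flagged_imports(
    'example.helpers', imported, example_all, {})
# ...and with one, nothing is flagged.
flagged_with = flagged_imports(
    'example.helpers', imported, example_all, valid_imports_not_in_all)
```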

Gavin Panella (allenap) wrote :

Okay, I'm flip-flopping on this. Back To The Threadpool.
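Since the branch ends up back on a dedicated pool (ThreadPool(0, num_threads) in TwistedThreadScheduler), here is a rough standard-library analogue of that design for readers without Twisted to hand. It is not the branch's code: concurrent.futures.ThreadPoolExecutor (Python 3) stands in for Twisted's ThreadPool plus deferToThreadPool, and waiting for the executor to shut down stands in for DeferredList. The class name ThreadPoolScheduler is hypothetical; it keeps the schedule()/run() interface from the diff.

```python
import concurrent.futures
import threading


class ThreadPoolScheduler:
    """Hypothetical stdlib analogue of TwistedThreadScheduler.

    Uses a dedicated, bounded pool per run instead of a shared default
    pool; schedule()/run() mirror the interface in the diff.
    """

    def __init__(self, num_threads):
        self._num_threads = num_threads
        self._jobs = []

    def schedule(self, func, *args, **kwargs):
        self._jobs.append((func, args, kwargs))

    def run(self):
        jobs, self._jobs = self._jobs, []
        # Leaving the with-block calls shutdown(wait=True), so every job
        # has finished before we go on (the role DeferredList plays in
        # the diff) and the pool's threads are released.
        with concurrent.futures.ThreadPoolExecutor(
                max_workers=self._num_threads) as pool:
            futures = [pool.submit(func, *args, **kwargs)
                       for func, args, kwargs in jobs]
        # Results come back in scheduling order. result() re-raises the
        # exception of the first failed job, unlike a plain DeferredList.
        return [future.result() for future in futures]


# Usage: record which trackers were updated, and on which threads.
seen = set()
thread_names = set()
lock = threading.Lock()


def update_tracker(name):
    with lock:
        seen.add(name)
        thread_names.add(threading.current_thread().name)
    return name.upper()


scheduler = ThreadPoolScheduler(num_threads=2)
for tracker_name in ("butterscotch", "strawberry"):
    scheduler.schedule(update_tracker, tracker_name)
results = scheduler.run()
```

Owning the pool keeps the thread count tied to the --jobs option and avoids depending on the reactor's shared default pool, which is the trade-off discussed above.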

Preview Diff

=== modified file 'lib/lp/bugs/doc/checkwatches-cli-switches.txt'
--- lib/lp/bugs/doc/checkwatches-cli-switches.txt 2009-11-17 11:19:25 +0000
+++ lib/lp/bugs/doc/checkwatches-cli-switches.txt 2010-03-15 14:09:31 +0000
@@ -34,7 +34,7 @@
     >>> from lp.testing.factory import LaunchpadObjectFactory

     >>> LaunchpadZopelessLayer.switchDbUser('launchpad')
-    >>> login('foo.bar@canonical.com')
+    >>> login('foo.bar@canonical.com')
     >>> bug_tracker = LaunchpadObjectFactory().makeBugTracker(
     ...     'http://example.com')
     >>> bug_tracker.active = False

=== modified file 'lib/lp/bugs/doc/externalbugtracker.txt'
--- lib/lp/bugs/doc/externalbugtracker.txt 2010-02-24 03:49:45 +0000
+++ lib/lp/bugs/doc/externalbugtracker.txt 2010-03-15 14:09:31 +0000
@@ -1105,110 +1105,3 @@
 the updateBugTrackers() method. This, too, takes a batch_size parameter, which
 allows it to be passed as a command-line option when the checkwatches script
 is run.
-
-Before going further, we must abort the current transaction to avoid
-deadlock; updateBugTrackers() runs updateBugTracker() in a different
-thread.
-
-    >>> transaction.abort()
-
-    >>> from canonical.launchpad.scripts.logger import FakeLogger
-    >>> bug_watch_updater = NonConnectingBugWatchUpdater(
-    ...     transaction, FakeLogger())
-    >>> bug_watch_updater.updateBugTrackers(
-    ...     bug_tracker_names=[standard_bugzilla.name], batch_size=2)
-    DEBUG Using a global batch size of 2
-    INFO Updating 2 watches for 2 bugs on http://example.com
-    initializeRemoteBugDB() called: [u'5', u'6']
-    getRemoteStatus() called: u'5'
-    getRemoteStatus() called: u'6'
-
-    >>> # We should log in again because updateBugTrackers() logs out.
-    >>> login('test@canonical.com')
-
-By default, the updateBugTrackers() only spawns one thread, but it can
-spawn as many as required.
-
-    >>> import threading
-
-    >>> class OutputFileForThreads:
-    ...     def __init__(self):
-    ...         self.output = {}
-    ...         self.lock = threading.Lock()
-    ...     def write(self, data):
-    ...         thread_name = threading.currentThread().getName()
-    ...         self.lock.acquire()
-    ...         try:
-    ...             if thread_name in self.output:
-    ...                 self.output[thread_name].append(data)
-    ...             else:
-    ...                 self.output[thread_name] = [data]
-    ...         finally:
-    ...             self.lock.release()
-
-    >>> output_file = OutputFileForThreads()
-
-    >>> class ExternalBugTrackerForThreads(TestExternalBugTracker):
-    ...     def getModifiedRemoteBugs(self, remote_bug_ids, last_checked):
-    ...         print >> output_file, (
-    ...             "getModifiedRemoteBugs(\n"
-    ...             "    remote_bug_ids=%r,\n"
-    ...             "    last_checked=%r)" % (remote_bug_ids, last_checked))
-    ...         return [remote_bug_ids[0], remote_bug_ids[-1]]
-    ...     def getRemoteStatus(self, bug_id):
-    ...         print >> output_file, (
-    ...             "getRemoteStatus(bug_id=%r)" % bug_id)
-    ...         return 'UNKNOWN'
-    ...     def getCurrentDBTime(self):
-    ...         return None
-
-    >>> class BugWatchUpdaterForThreads(BugWatchUpdater):
-    ...     def _getExternalBugTrackersAndWatches(
-    ...         self, bug_trackers, bug_watches):
-    ...         return [(ExternalBugTrackerForThreads(), bug_watches)]
-
-    >>> threaded_bug_watch_updater = BugWatchUpdaterForThreads(
-    ...     transaction, FakeLogger(output_file))
-    >>> threaded_bug_watch_updater.updateBugTrackers(
-    ...     batch_size=5, num_threads=10)
-
-
-    >>> for thread_name in sorted(output_file.output):
-    ...     print '== %s ==' % thread_name
-    ...     print "".join(output_file.output[thread_name]),
-    == MainThread ==
-    DEBUG Using a global batch size of 5
-    DEBUG Skipping updating Ubuntu Bugzilla watches.
-    == auto-generic-string4.example.com ==
-    INFO Updating 5 watches for 5 bugs on http://example.com
-    getRemoteStatus(bug_id=u'1')
-    getRemoteStatus(bug_id=u'101')
-    getRemoteStatus(bug_id=u'5')
-    getRemoteStatus(bug_id=u'6')
-    getRemoteStatus(bug_id=u'7')
-    == debbugs ==
-    INFO Updating 5 watches for 5 bugs on http://example.com
-    getRemoteStatus(bug_id=u'280883')
-    getRemoteStatus(bug_id=u'304014')
-    getRemoteStatus(bug_id=u'308994')
-    getRemoteStatus(bug_id=u'327452')
-    getRemoteStatus(bug_id=u'327549')
-    == email ==
-    DEBUG No watches to update on mailto:bugs@example.com
-    == example-bugs ==
-    DEBUG No watches to update on http://bugs.example.com
-    == gnome-bugs ==
-    DEBUG No watches to update on http://bugzilla.gnome.org/
-    == gnome-bugzilla ==
-    INFO Updating 2 watches for 2 bugs on http://example.com
-    getRemoteStatus(bug_id=u'304070')
-    getRemoteStatus(bug_id=u'3224')
-    == mozilla.org ==
-    INFO Updating 4 watches for 3 bugs on http://example.com
-    getRemoteStatus(bug_id=u'123543')
-    getRemoteStatus(bug_id=u'2000')
-    getRemoteStatus(bug_id=u'42')
-    == savannah ==
-    DEBUG No watches to update on http://savannah.gnu.org/
-    == sf ==
-    DEBUG No watches to update on http://sourceforge.net/

=== modified file 'lib/lp/bugs/scripts/checkwatches.py'
--- lib/lp/bugs/scripts/checkwatches.py 2010-02-19 12:05:10 +0000
+++ lib/lp/bugs/scripts/checkwatches.py 2010-03-15 14:09:31 +0000
@@ -6,7 +6,6 @@

 from copy import copy
 from datetime import datetime, timedelta
-import Queue as queue
 import socket
 import sys
 import threading
@@ -14,6 +13,11 @@

 import pytz

+from twisted.internet import reactor
+from twisted.internet.defer import DeferredList
+from twisted.internet.threads import deferToThreadPool
+from twisted.python.threadpool import ThreadPool
+
 from zope.component import getUtility
 from zope.event import notify

@@ -271,44 +275,28 @@
             self._logout()

     def updateBugTrackers(
-            self, bug_tracker_names=None, batch_size=None, num_threads=1):
+            self, bug_tracker_names=None, batch_size=None, scheduler=None):
         """Update all the bug trackers that have watches pending.

         If bug tracker names are specified in bug_tracker_names only
         those bug trackers will be checked.

-        The updates are run in threads, so that long running updates
-        don't block progress. However, by default the number of
-        threads is 1, to help with testing.
+        A custom scheduler can be passed in. This should inherit from
+        `BaseScheduler`. If no scheduler is given, `SerialScheduler`
+        will be used, which simply runs the jobs in order.
         """
         self.log.debug("Using a global batch size of %s" % batch_size)

-        # Put all the work on the queue. This is simpler than drip-feeding the
-        # queue, and avoids a situation where a worker thread exits because
-        # there's no work left and the feeding thread hasn't been scheduled to
-        # add work to the queue.
-        work = queue.Queue()
+        # Default to using the very simple SerialScheduler.
+        if scheduler is None:
+            scheduler = SerialScheduler()
+
+        # Schedule all the jobs to run.
         for updater in self._bugTrackerUpdaters(bug_tracker_names):
-            work.put(updater)
-
-        # This will be run once in each worker thread.
-        def do_work():
-            while True:
-                try:
-                    job = work.get(block=False)
-                except queue.Empty:
-                    break
-                else:
-                    job(batch_size)
-
-        # Start and join the worker threads.
-        threads = []
-        for run in xrange(num_threads):
-            thread = threading.Thread(target=do_work)
-            thread.start()
-            threads.append(thread)
-        for thread in threads:
-            thread.join()
+            scheduler.schedule(updater, batch_size)
+
+        # Run all the jobs.
+        scheduler.run()

     def updateBugTracker(self, bug_tracker, batch_size):
         """Updates the given bug trackers's bug watches.
@@ -1165,6 +1153,67 @@
             self.log.error("%s (%s)" % (message, oops_info.oopsid))


+class BaseScheduler:
+    """Run jobs according to a policy."""
+
+    def schedule(self, func, *args, **kwargs):
+        """Add a job to be run."""
+        raise NotImplementedError(self.schedule)
+
+    def run(self):
+        """Run the jobs."""
+        raise NotImplementedError(self.run)
+
+
+class SerialScheduler(BaseScheduler):
+    """Run jobs in order, one at a time."""
+
+    def __init__(self):
+        self._jobs = []
+
+    def schedule(self, func, *args, **kwargs):
+        self._jobs.append((func, args, kwargs))
+
+    def run(self):
+        jobs, self._jobs = self._jobs[:], []
+        for (func, args, kwargs) in jobs:
+            func(*args, **kwargs)
+
+
+class TwistedThreadScheduler(BaseScheduler):
+    """Run jobs in threads, chaperoned by Twisted."""
+
+    def __init__(self, num_threads, install_signal_handlers=True):
+        """Create a new `TwistedThreadScheduler`.
+
+        :param num_threads: The number of threads to allocate to the
+            thread pool.
+        :type num_threads: int
+
+        :param install_signal_handlers: Whether the Twisted reactor
+            should install signal handlers or not. This is intended for
+            testing - set to False to avoid layer violations - but may
+            be useful in other situations.
+        :type install_signal_handlers: bool
+        """
+        self._thread_pool = ThreadPool(0, num_threads)
+        self._install_signal_handlers = install_signal_handlers
+        self._jobs = []
+
+    def schedule(self, func, *args, **kwargs):
+        self._jobs.append(
+            deferToThreadPool(
+                reactor, self._thread_pool, func, *args, **kwargs))
+
+    def run(self):
+        jobs, self._jobs = self._jobs[:], []
+        jobs_done = DeferredList(jobs)
+        jobs_done.addBoth(lambda ignore: self._thread_pool.stop())
+        jobs_done.addBoth(lambda ignore: reactor.stop())
+        reactor.callWhenRunning(self._thread_pool.start)
+        reactor.run(self._install_signal_handlers)
+
+
 class CheckWatchesCronScript(LaunchpadCronScript):

     def add_my_options(self):
@@ -1203,9 +1252,16 @@
         else:
             # Otherwise we just update those watches that need updating,
             # and we let the BugWatchUpdater decide which those are.
+            if self.options.jobs <= 1:
+                # Use the default scheduler.
+                scheduler = None
+            else:
+                # Run jobs in parallel.
+                scheduler = TwistedThreadScheduler(self.options.jobs)
             updater.updateBugTrackers(
-                self.options.bug_trackers, self.options.batch_size,
-                self.options.jobs)
+                self.options.bug_trackers,
+                self.options.batch_size,
+                scheduler)

         run_time = time.time() - start_time
         self.logger.info("Time for this run: %.3f seconds." % run_time)

=== modified file 'lib/lp/bugs/scripts/tests/test_checkwatches.py'
--- lib/lp/bugs/scripts/tests/test_checkwatches.py 2010-01-05 12:19:17 +0000
+++ lib/lp/bugs/scripts/tests/test_checkwatches.py 2010-03-15 14:09:31 +0000
@@ -2,9 +2,13 @@
 # GNU Affero General Public License version 3 (see the file LICENSE).
 """Checkwatches unit tests."""

+from __future__ import with_statement
+
 __metaclass__ = type

+import threading
 import unittest
+
 import transaction

 from zope.component import getUtility
@@ -19,11 +23,13 @@
 from canonical.testing import LaunchpadZopelessLayer

 from lp.bugs.externalbugtracker.bugzilla import BugzillaAPI
+from lp.bugs.interfaces.bugtracker import IBugTrackerSet
 from lp.bugs.scripts import checkwatches
-from lp.bugs.scripts.checkwatches import CheckWatchesErrorUtility
+from lp.bugs.scripts.checkwatches import (
+    BugWatchUpdater, CheckWatchesErrorUtility, TwistedThreadScheduler)
 from lp.bugs.tests.externalbugtracker import (
-    TestBugzillaAPIXMLRPCTransport, new_bugtracker)
-from lp.testing import TestCaseWithFactory
+    TestBugzillaAPIXMLRPCTransport, TestExternalBugTracker, new_bugtracker)
+from lp.testing import TestCaseWithFactory, ZopeTestInSubProcess


 def always_BugzillaAPI_get_external_bugtracker(bugtracker):
@@ -202,5 +208,171 @@
                 self.bugtask_with_question.status.title))


+class TestSchedulerBase:
+
+    def test_args_and_kwargs(self):
+        def func(name, aptitude):
+            self.failUnlessEqual("Robin Hood", name)
+            self.failUnlessEqual("Riding through the glen", aptitude)
+        # Positional args specified when adding a job are passed to
+        # the job function at run time.
+        self.scheduler.schedule(
+            func, "Robin Hood", "Riding through the glen")
+        # Keyword args specified when adding a job are passed to the
+        # job function at run time.
+        self.scheduler.schedule(
+            func, name="Robin Hood", aptitude="Riding through the glen")
+        # Positional and keyword args can both be specified.
+        self.scheduler.schedule(
+            func, "Robin Hood", aptitude="Riding through the glen")
+        # Run everything.
+        self.scheduler.run()
+
+
+class TestSerialScheduler(TestSchedulerBase, unittest.TestCase):
+    """Test SerialScheduler."""
+
+    def setUp(self):
+        self.scheduler = checkwatches.SerialScheduler()
+
+    def test_ordering(self):
+        # The numbers list will be emptied in the order we add jobs to
+        # the scheduler.
+        numbers = [1, 2, 3]
+        # Remove 3 and check.
+        self.scheduler.schedule(
+            list.remove, numbers, 3)
+        self.scheduler.schedule(
+            lambda: self.failUnlessEqual([1, 2], numbers))
+        # Remove 1 and check.
+        self.scheduler.schedule(
+            list.remove, numbers, 1)
+        self.scheduler.schedule(
+            lambda: self.failUnlessEqual([2], numbers))
+        # Remove 2 and check.
+        self.scheduler.schedule(
+            list.remove, numbers, 2)
+        self.scheduler.schedule(
+            lambda: self.failUnlessEqual([], numbers))
+        # Run the scheduler.
+        self.scheduler.run()
+
+
+class TestTwistedThreadScheduler(TestSchedulerBase, unittest.TestCase):
+    """Test TwistedThreadScheduler.
+
+    By default, updateBugTrackers() runs jobs serially, but a
+    different scheduling policy can be plugged in. One such policy,
+    for running several jobs in parallel, is TwistedThreadScheduler.
+    """
+
+    def setUp(self):
+        self.scheduler = checkwatches.TwistedThreadScheduler(
+            num_threads=5, install_signal_handlers=False)
+
+
+class OutputFileForThreads:
+    """Collates writes according to thread name."""
+
+    def __init__(self):
+        self.output = {}
+        self.lock = threading.Lock()
+
+    def write(self, data):
+        thread_name = threading.currentThread().getName()
+        with self.lock:
+            if thread_name in self.output:
+                self.output[thread_name].append(data)
+            else:
+                self.output[thread_name] = [data]
+
+
+class ExternalBugTrackerForThreads(TestExternalBugTracker):
+    """Fake which records interesting activity to a file."""
+
+    def __init__(self, output_file):
+        super(ExternalBugTrackerForThreads, self).__init__()
+        self.output_file = output_file
+
+    def getRemoteStatus(self, bug_id):
+        self.output_file.write("getRemoteStatus(bug_id=%r)" % bug_id)
+        return 'UNKNOWN'
+
+    def getCurrentDBTime(self):
+        return None
+
+
+class BugWatchUpdaterForThreads(BugWatchUpdater):
+    """Fake updater.
+
+    Plumbs an `ExternalBugTrackerForThreads` into a given output file,
+    which is expected to be an instance of `OutputFileForThreads`, and
+    suppresses normal log activity.
+    """
+
+    def __init__(self, output_file):
+        logger = QuietFakeLogger()
+        super(BugWatchUpdaterForThreads, self).__init__(transaction, logger)
+        self.output_file = output_file
+
+    def _getExternalBugTrackersAndWatches(self, bug_trackers, bug_watches):
+        return [(ExternalBugTrackerForThreads(self.output_file), bug_watches)]
+
+
+class TestTwistedThreadSchedulerInPlace(
+    ZopeTestInSubProcess, TestCaseWithFactory):
+    """Test TwistedThreadScheduler in place.
+
+    As in, driving as much of the bug watch machinery as is possible
+    without making external connections.
+    """
+
+    layer = LaunchpadZopelessLayer
+
+    def test(self):
+        # Prepare test data.
+        self.owner = self.factory.makePerson()
+        self.trackers = [
+            getUtility(IBugTrackerSet).ensureBugTracker(
+                "http://butterscotch.example.com", self.owner,
+                BugTrackerType.BUGZILLA, name="butterscotch"),
+            getUtility(IBugTrackerSet).ensureBugTracker(
+                "http://strawberry.example.com", self.owner,
+                BugTrackerType.BUGZILLA, name="strawberry"),
+            ]
+        self.bug = self.factory.makeBug(owner=self.owner)
+        for tracker in self.trackers:
+            for num in (1, 2, 3):
+                self.factory.makeBugWatch(
+                    "%s-%d" % (tracker.name, num),
+                    tracker, self.bug, self.owner)
+        # Commit so that threads all see the same database state.
+        transaction.commit()
+        # Prepare the updater with the Twisted scheduler.
+        output_file = OutputFileForThreads()
+        threaded_bug_watch_updater = BugWatchUpdaterForThreads(output_file)
+        threaded_bug_watch_scheduler = TwistedThreadScheduler(
+            num_threads=10, install_signal_handlers=False)
+        threaded_bug_watch_updater.updateBugTrackers(
+            bug_tracker_names=[tracker.name for tracker in self.trackers],
+            batch_size=5, scheduler=threaded_bug_watch_scheduler)
+        # The thread names should match the tracker names.
+        self.assertEqual(
+            ['butterscotch', 'strawberry'], sorted(output_file.output))
+        # Check that getRemoteStatus() was called.
+        self.assertEqual(
+            ["getRemoteStatus(bug_id=u'butterscotch-1')",
+             "getRemoteStatus(bug_id=u'butterscotch-2')",
+             "getRemoteStatus(bug_id=u'butterscotch-3')"],
+            output_file.output['butterscotch']
+            )
+        self.assertEqual(
+            ["getRemoteStatus(bug_id=u'strawberry-1')",
+             "getRemoteStatus(bug_id=u'strawberry-2')",
+             "getRemoteStatus(bug_id=u'strawberry-3')"],
+            output_file.output['strawberry']
+            )
+
+
 def test_suite():
     return unittest.TestLoader().loadTestsFromName(__name__)

=== modified file 'lib/lp/scripts/utilities/importfascist.py'
--- lib/lp/scripts/utilities/importfascist.py 2010-02-23 11:53:24 +0000
+++ lib/lp/scripts/utilities/importfascist.py 2010-03-15 14:09:31 +0000
@@ -62,6 +62,7 @@
     'openid.fetchers': set(['Urllib2Fetcher']),
     'storm.database': set(['STATE_DISCONNECTED']),
     'textwrap': set(['dedent']),
+    'twisted.internet.threads': set(['deferToThreadPool']),
     'zope.component': set(
         ['adapter',
          'ComponentLookupError',

=== modified file 'lib/lp/services/job/runner.py'
--- lib/lp/services/job/runner.py 2010-02-25 12:07:15 +0000
+++ lib/lp/services/job/runner.py 2010-03-15 14:09:31 +0000
@@ -401,9 +401,10 @@
 class JobCronScript(LaunchpadCronScript):
     """Base class for scripts that run jobs."""

-    def __init__(self, runner_class=JobRunner):
+    def __init__(self, runner_class=JobRunner, test_args=None):
         self.dbuser = getattr(config, self.config_name).dbuser
-        super(JobCronScript, self).__init__(self.config_name, self.dbuser)
+        super(JobCronScript, self).__init__(
+            self.config_name, self.dbuser, test_args)
         self.runner_class = runner_class

     def main(self):

=== modified file 'lib/lp/services/job/tests/test_runner.py'
--- lib/lp/services/job/tests/test_runner.py 2010-02-24 19:52:01 +0000
+++ lib/lp/services/job/tests/test_runner.py 2010-03-15 14:09:31 +0000
@@ -11,23 +11,24 @@
 from unittest import TestLoader

 import transaction
-from canonical.testing import LaunchpadZopelessLayer
+
 from zope.component import getUtility
 from zope.error.interfaces import IErrorReportingUtility
 from zope.interface import implements

+from canonical.launchpad.webapp import errorlog
+from canonical.launchpad.webapp.interfaces import (
+    DEFAULT_FLAVOR, IStoreSelector, MAIN_STORE)
+from canonical.testing import LaunchpadZopelessLayer
+
 from lp.code.interfaces.branchmergeproposal import (
-    IUpdatePreviewDiffJobSource,)
-from lp.testing.mail_helpers import pop_notifications
-from lp.services.job.runner import (
-    JobCronScript, JobRunner, BaseRunnableJob, TwistedJobRunner
-)
+    IUpdatePreviewDiffJobSource)
 from lp.services.job.interfaces.job import JobStatus, IRunnableJob
 from lp.services.job.model.job import Job
-from lp.testing import TestCaseWithFactory
-from canonical.launchpad.webapp import errorlog
-from canonical.launchpad.webapp.interfaces import (
-    IStoreSelector, MAIN_STORE, DEFAULT_FLAVOR)
+from lp.services.job.runner import (
+    BaseRunnableJob, JobCronScript, JobRunner, TwistedJobRunner)
+from lp.testing import TestCaseWithFactory, ZopeTestInSubProcess
+from lp.testing.mail_helpers import pop_notifications


 class NullJob(BaseRunnableJob):
@@ -300,7 +301,7 @@
         self.entries.append(input)


-class TestTwistedJobRunner(TestCaseWithFactory):
+class TestTwistedJobRunner(ZopeTestInSubProcess, TestCaseWithFactory):

     layer = LaunchpadZopelessLayer

@@ -325,7 +326,7 @@
         self.assertIn('Job ran too long.', oops.value)


-class TestJobCronScript(TestCaseWithFactory):
+class TestJobCronScript(ZopeTestInSubProcess, TestCaseWithFactory):

     layer = LaunchpadZopelessLayer

@@ -351,7 +352,8 @@
             source_interface = IUpdatePreviewDiffJobSource

             def __init__(self):
-                super(JobCronScriptSubclass, self).__init__(DummyRunner)
+                super(JobCronScriptSubclass, self).__init__(
+                    DummyRunner, test_args=[])
                 self.logger = ListLogger()

         old_errorlog = errorlog.globalErrorUtility

=== modified file 'lib/lp/testing/__init__.py'
--- lib/lp/testing/__init__.py 2010-03-01 03:06:02 +0000
+++ lib/lp/testing/__init__.py 2010-03-15 14:09:31 +0000
@@ -29,6 +29,7 @@
     'validate_mock_class',
     'WindmillTestCase',
     'with_anonymous_login',
+    'ZopeTestInSubProcess',
     ]

 import copy
@@ -38,6 +39,8 @@
 from pprint import pformat
 import shutil
 import subprocess
+import subunit
+import sys
 import tempfile
 import time

@@ -62,6 +65,7 @@
 from zope.interface.verify import verifyClass, verifyObject
 from zope.security.proxy import (
     isinstance as zope_isinstance, removeSecurityProxy)
+from zope.testing.testrunner.runner import TestResult as ZopeTestResult

 from canonical.launchpad.webapp import errorlog
 from canonical.config import config
@@ -586,6 +590,57 @@
         self.client.open(url=u'http://launchpad.dev:8085')


+class ZopeTestInSubProcess:
+    """Run tests in a sub-process, respecting Zope idiosyncrasies.
+
+    Use this as a mixin with an interesting `TestCase` to isolate
+    tests with side-effects. Each and every test *method* in the test
+    case is run in a new, forked, sub-process. This will slow down
+    your tests, so use it sparingly. However, when you need to, for
+    example, start the Twisted reactor (which cannot currently be
+    safely stopped and restarted in process) it is invaluable.
+
+    This is basically a reimplementation of subunit's
+    `IsolatedTestCase` or `IsolatedTestSuite`, but adjusted to work
+    with Zope. In particular, Zope's TestResult object is responsible
+    for calling testSetUp() and testTearDown() on the selected layer.
+    """
+
+    def run(self, result):
+        assert isinstance(result, ZopeTestResult), (
+            "result must be a Zope result object, not %r." % (result,))
+        pread, pwrite = os.pipe()
+        pid = os.fork()
+        if pid == 0:
+            # Child.
+            os.close(pread)
+            fdwrite = os.fdopen(pwrite, 'w', 1)
+            # Send results to both the Zope result object (so that
+            # layer setup and teardown are done properly, etc.) and to
+            # the subunit stream client so that the parent process can
+            # obtain the result.
+            result = testtools.MultiTestResult(
+                result, subunit.TestProtocolClient(fdwrite))
+            super(ZopeTestInSubProcess, self).run(result)
+            fdwrite.flush()
+            sys.stdout.flush()
+            sys.stderr.flush()
+            # Exit hard.
+            os._exit(0)
+        else:
+            # Parent.
+            os.close(pwrite)
+            fdread = os.fdopen(pread, 'rU')
+            # Accept the result from the child process. Skip all the
+            # Zope-specific result stuff by passing a super() of the
+            # result.
+            result = super(ZopeTestResult, result)
+            protocol = subunit.TestProtocolServer(result)
+            protocol.readFrom(fdread)
+            fdread.close()
+            os.waitpid(pid, 0)
+
+
 def capture_events(callable_obj, *args, **kwargs):
     """Capture the events emitted by a callable.


=== added file 'lib/lp/testing/tests/test_zope_test_in_subprocess.py'
--- lib/lp/testing/tests/test_zope_test_in_subprocess.py 1970-01-01 00:00:00 +0000
+++ lib/lp/testing/tests/test_zope_test_in_subprocess.py 2010-03-15 14:09:31 +0000
@@ -0,0 +1,131 @@
+# Copyright 2010 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Test `lp.testing.ZopeTestInSubProcess`.
+
+How does it do this?
+
+A `TestCase`, mixed-in with `ZopeTestInSubProcess`, is run by the Zope
+test runner. This test case sets its own layer, to keep track of the
+PIDs when certain methods are called. It also records pids for its own
+methods. Assertions are made as these methods are called to ensure that
+they are running in the correct process - the parent or the child.
+
+Recording of the PIDs is handled using the `record_pid` decorator.
+"""
+
+__metaclass__ = type
+
+import functools
+import os
+import unittest
+
+from lp.testing import ZopeTestInSubProcess
+
+
+def record_pid(method):
+    """Decorator that records the pid at method invocation.
+
+    Will probably only DTRT with class methods or bound instance
+    methods.
+    """
+    @functools.wraps(method)
+    def wrapper(self, *args, **kwargs):
+        setattr(self, 'pid_in_%s' % method.__name__, os.getpid())
+        return method(self, *args, **kwargs)
+    return wrapper
+
+
+class TestZopeTestInSubProcessLayer:
+    """Helper to test `ZopeTestInSubProcess`.
+
+    Asserts that layers are set up and torn down in the expected way,
+    namely that setUp() and tearDown() are called in the parent
+    process, and testSetUp() and testTearDown() are called in the
+    child process.
+
+    The assertions for tearDown() and testTearDown() must be done here
+    because the test case runs before these methods are called. In the
+    interests of symmetry and clarity, the assertions for setUp() and
+    testSetUp() are done here too.
+
+    This layer expects to be *instantiated*, which is not the norm for
+    Zope layers. See `TestZopeTestInSubProcess` for its use.
+    """
+
+    @record_pid
+    def __init__(self):
+        # These are needed to satisfy the requirements of the
+        # byzantine Zope layer machinery.
+        self.__name__ = self.__class__.__name__
+        self.__bases__ = self.__class__.__bases__
+
+    @record_pid
+    def setUp(self):
+        # Runs in the parent process.
+        assert self.pid_in___init__ == self.pid_in_setUp, (
+            "layer.setUp() not called in parent process.")
+
+    @record_pid
+    def testSetUp(self):
+        # Runs in the child process.
+        assert self.pid_in___init__ != self.pid_in_testSetUp, (
+            "layer.testSetUp() called in parent process.")
+
+    @record_pid
+    def testTearDown(self):
+        # Runs in the child process.
+        assert self.pid_in_testSetUp == self.pid_in_testTearDown, (
+            "layer.testTearDown() not called in same process as testSetUp().")
+
+    @record_pid
+    def tearDown(self):
+        # Runs in the parent process.
+        assert self.pid_in___init__ == self.pid_in_tearDown, (
+            "layer.tearDown() not called in parent process.")
+
+
+class TestZopeTestInSubProcess(ZopeTestInSubProcess, unittest.TestCase):
+    """Test `ZopeTestInSubProcess`.
+
+    Assert that setUp(), test() and tearDown() are called in the child
+    process.
+
+    Sets its own layer attribute. This layer is then responsible for
+    recording the PID at interesting moments. Specifically,
+    layer.testSetUp() must be called in the same process as
+    test.setUp().
+    """
+
+    @record_pid
+    def __init__(self, method_name='runTest'):
+        # Runs in the parent process.
+        super(TestZopeTestInSubProcess, self).__init__(method_name)
+        self.layer = TestZopeTestInSubProcessLayer()
+
+    @record_pid
+    def setUp(self):
+        # Runs in the child process.
+        super(TestZopeTestInSubProcess, self).setUp()
+        self.failUnlessEqual(
+            self.layer.pid_in_testSetUp, self.pid_in_setUp,
+            "setUp() not called in same process as layer.testSetUp().")
+
+    @record_pid
+    def test(self):
+        # Runs in the child process.
+        self.failUnlessEqual(
+            self.pid_in_setUp, self.pid_in_test,
+            "test method not run in same process as setUp().")
+
+    @record_pid
+    def tearDown(self):
+        # Runs in the child process.
+        super(TestZopeTestInSubProcess, self).tearDown()
+        self.failUnlessEqual(
+            self.pid_in_setUp, self.pid_in_tearDown,
+            "tearDown() not run in same process as setUp().")
+
+
+def test_suite():
+    return unittest.TestLoader().loadTestsFromName(__name__)