Merge lp:~allenap/launchpad/twisted-threading-bug-491870 into lp:launchpad

Proposed by Gavin Panella
Status: Rejected
Rejected by: Gavin Panella
Proposed branch: lp:~allenap/launchpad/twisted-threading-bug-491870
Merge into: lp:launchpad
Diff against target: 822 lines (+469/-158)
9 files modified
lib/lp/bugs/doc/checkwatches-cli-switches.txt (+1/-1)
lib/lp/bugs/doc/externalbugtracker.txt (+0/-107)
lib/lp/bugs/scripts/checkwatches.py (+88/-32)
lib/lp/bugs/scripts/tests/test_checkwatches.py (+175/-3)
lib/lp/scripts/utilities/importfascist.py (+1/-0)
lib/lp/services/job/runner.py (+3/-2)
lib/lp/services/job/tests/test_runner.py (+15/-13)
lib/lp/testing/__init__.py (+55/-0)
lib/lp/testing/tests/test_zope_test_in_subprocess.py (+131/-0)
To merge this branch: bzr merge lp:~allenap/launchpad/twisted-threading-bug-491870
Reviewer: Henning Eggers (community), review type: code, status: Approve
Review via email: mp+18843@code.launchpad.net

Commit message

In checkwatches, use Twisted to manage threading instead of the threading module.

Gavin Panella (allenap) wrote:

checkwatches can update bug watches for different remote trackers in parallel. Currently that's implemented using threads via the threading module. This branch continues to use threads, but does so with a Twisted ThreadPool.

To make testing easier, and to make the design more elegant, it is now possible to pass a scheduler to BugWatchUpdater.updateBugTrackers(); previously the scheduling policy was baked into the method itself.

Getting Twisted in there also opens the door for using more async code in the future.
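
The calling pattern ends up looking roughly like this (a sketch based on the preview diff below; updater stands for a BugWatchUpdater instance, and the tracker names and batch size are placeholders):

    from lp.bugs.scripts.checkwatches import TwistedThreadScheduler

    # Default: no scheduler passed, so SerialScheduler runs the
    # per-tracker update jobs one at a time, in order.
    updater.updateBugTrackers(bug_tracker_names, batch_size)

    # Parallel: let Twisted run up to ten tracker updates at once in a
    # thread pool driven by the reactor.
    scheduler = TwistedThreadScheduler(num_threads=10)
    updater.updateBugTrackers(bug_tracker_names, batch_size, scheduler)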

Henning Eggers (henninge) wrote:

Thanks for this improvement!

We talked about some things on IRC:
- Use of lambda is against LP coding style but common in Twisted. We will raise the issue at the reviewers meeting and you can leave it as it is for now. If the style change gets rejected, I ask you to fix this in a follow-up branch. Or wait until after the meeting before landing.
- The intended use of install_signal_handlers to avoid errors in the test should be documented.
- You agreed to add an explicit test for SerialScheduler.
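
For reference, the install_signal_handlers point looks like this in use (a sketch of the pattern from the branch's own tests, shown in full in the diff below):

    # In tests, keep the Twisted reactor from installing signal
    # handlers; the scheduler is otherwise used exactly as in
    # production code.
    scheduler = TwistedThreadScheduler(
        num_threads=5, install_signal_handlers=False)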

Thanks for your patience with me ... ;-)

review: Approve (code)
Jonathan Lange (jml) wrote:

On Mon, Feb 8, 2010 at 5:38 PM, Henning Eggers
<email address hidden> wrote:
> Review: Approve code
> Thanks for this improvement!
>
> We talked about some things on IRC:
> - Use of lambda is against LP coding style but common in Twisted. We will raise the issue at the reviewers meeting and you can leave it as it is for now. If the style change gets rejected, I ask you to fix this in a follow-up branch. Or wait until after the meeting before landing.

FWIW, if the reviewers decree that lambda is too hard to read, then
it's probably worth defining a function like this:

  def kerchop(f):
    def callback(ignored, *args, **kwargs):
      return f(*args, **kwargs)
    return mergeFunctionMetadata(f, callback)

And then replacing:
  d.addCallback(lambda ignored: reactor.stop())

with:
  d.addCallback(kerchop(reactor.stop))

The function ought to go somewhere in lp.services, and have a better
name than 'kerchop'.

jml

Gavin Panella (allenap) wrote:

Hi Henning,

Thank you for the review. I added some more info about install_signal_handlers, and added some tests for SerialScheduler.

I also switched to using the default reactor thread pool instead of creating my own in TwistedThreadScheduler. I felt it was better to be explicit, but two things changed my mind:

 * We're calling reactor.run(). It's ours, we're not sharing it, so no harm in just using its thread pool.

 * The import fascist was complaining about importing deferToThreadPool. Maybe this should be in its module's __all__, maybe not. I'll file a bug for that.
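
For context, the two approaches differ roughly like this (a sketch, not the branch's code; some_job is a placeholder for an updater callable):

    from twisted.internet import reactor
    from twisted.internet.threads import deferToThread, deferToThreadPool
    from twisted.python.threadpool import ThreadPool

    # Reactor's own pool: nothing to start or stop by hand.
    reactor.suggestThreadPoolSize(10)
    d1 = deferToThread(some_job)

    # Dedicated pool: explicit, but it must be started (and later
    # stopped) ourselves.
    pool = ThreadPool(minthreads=0, maxthreads=10)
    reactor.callWhenRunning(pool.start)
    d2 = deferToThreadPool(reactor, pool, some_job)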

Incremental diff: http://paste.ubuntu.com/372542/

Thanks, Gavin.

Gavin Panella (allenap) wrote:

> The import fascist was complaining about importing
> deferToThreadPool. Maybe this should be in its module's
> __all__, maybe not. I'll file a bug for that.

http://twistedmatrix.com/trac/ticket/4264

Jonathan Lange (jml) wrote:

The fascist has a place where you can disable the warning: lib/lp/scripts/utilities/importfascist.py, "valid_imports_not_in_all".
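
Concretely, that means adding the new name to the dict Jonathan mentions, which is what this branch ends up doing (see the importfascist.py hunk in the diff below):

    # lib/lp/scripts/utilities/importfascist.py
    valid_imports_not_in_all = {
        # ... existing entries ...
        'twisted.internet.threads': set(['deferToThreadPool']),
        }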

Gavin Panella (allenap) wrote:

Okay, I'm flip-flopping on this. Back To The Threadpool.

Preview Diff

=== modified file 'lib/lp/bugs/doc/checkwatches-cli-switches.txt'
--- lib/lp/bugs/doc/checkwatches-cli-switches.txt 2009-11-17 11:19:25 +0000
+++ lib/lp/bugs/doc/checkwatches-cli-switches.txt 2010-03-15 14:09:31 +0000
@@ -34,7 +34,7 @@
     >>> from lp.testing.factory import LaunchpadObjectFactory
 
     >>> LaunchpadZopelessLayer.switchDbUser('launchpad')
     >>> login('foo.bar@canonical.com')
     >>> bug_tracker = LaunchpadObjectFactory().makeBugTracker(
     ...     'http://example.com')
     >>> bug_tracker.active = False
 
=== modified file 'lib/lp/bugs/doc/externalbugtracker.txt'
--- lib/lp/bugs/doc/externalbugtracker.txt 2010-02-24 03:49:45 +0000
+++ lib/lp/bugs/doc/externalbugtracker.txt 2010-03-15 14:09:31 +0000
@@ -1105,110 +1105,3 @@
 the updateBugTrackers() method. This, too, takes a batch_size parameter, which
 allows it to be passed as a command-line option when the checkwatches script
 is run.
-
-Before going further, we must abort the current transaction to avoid
-deadlock; updateBugTrackers() runs updateBugTracker() in a different
-thread.
-
-    >>> transaction.abort()
-
-    >>> from canonical.launchpad.scripts.logger import FakeLogger
-    >>> bug_watch_updater = NonConnectingBugWatchUpdater(
-    ...     transaction, FakeLogger())
-    >>> bug_watch_updater.updateBugTrackers(
-    ...     bug_tracker_names=[standard_bugzilla.name], batch_size=2)
-    DEBUG Using a global batch size of 2
-    INFO Updating 2 watches for 2 bugs on http://example.com
-    initializeRemoteBugDB() called: [u'5', u'6']
-    getRemoteStatus() called: u'5'
-    getRemoteStatus() called: u'6'
-
-    >>> # We should log in again because updateBugTrackers() logs out.
-    >>> login('test@canonical.com')
-
-By default, the updateBugTrackers() only spawns one thread, but it can
-spawn as many as required.
-
-    >>> import threading
-
-    >>> class OutputFileForThreads:
-    ...     def __init__(self):
-    ...         self.output = {}
-    ...         self.lock = threading.Lock()
-    ...     def write(self, data):
-    ...         thread_name = threading.currentThread().getName()
-    ...         self.lock.acquire()
-    ...         try:
-    ...             if thread_name in self.output:
-    ...                 self.output[thread_name].append(data)
-    ...             else:
-    ...                 self.output[thread_name] = [data]
-    ...         finally:
-    ...             self.lock.release()
-
-    >>> output_file = OutputFileForThreads()
-
-    >>> class ExternalBugTrackerForThreads(TestExternalBugTracker):
-    ...     def getModifiedRemoteBugs(self, remote_bug_ids, last_checked):
-    ...         print >> output_file, (
-    ...             "getModifiedRemoteBugs(\n"
-    ...             " remote_bug_ids=%r,\n"
-    ...             " last_checked=%r)" % (remote_bug_ids, last_checked))
-    ...         return [remote_bug_ids[0], remote_bug_ids[-1]]
-    ...     def getRemoteStatus(self, bug_id):
-    ...         print >> output_file, (
-    ...             "getRemoteStatus(bug_id=%r)" % bug_id)
-    ...         return 'UNKNOWN'
-    ...     def getCurrentDBTime(self):
-    ...         return None
-
-    >>> class BugWatchUpdaterForThreads(BugWatchUpdater):
-    ...     def _getExternalBugTrackersAndWatches(
-    ...             self, bug_trackers, bug_watches):
-    ...         return [(ExternalBugTrackerForThreads(), bug_watches)]
-
-    >>> threaded_bug_watch_updater = BugWatchUpdaterForThreads(
-    ...     transaction, FakeLogger(output_file))
-    >>> threaded_bug_watch_updater.updateBugTrackers(
-    ...     batch_size=5, num_threads=10)
-
-
-    >>> for thread_name in sorted(output_file.output):
-    ...     print '== %s ==' % thread_name
-    ...     print "".join(output_file.output[thread_name]),
-    == MainThread ==
-    DEBUG Using a global batch size of 5
-    DEBUG Skipping updating Ubuntu Bugzilla watches.
-    == auto-generic-string4.example.com ==
-    INFO Updating 5 watches for 5 bugs on http://example.com
-    getRemoteStatus(bug_id=u'1')
-    getRemoteStatus(bug_id=u'101')
-    getRemoteStatus(bug_id=u'5')
-    getRemoteStatus(bug_id=u'6')
-    getRemoteStatus(bug_id=u'7')
-    == debbugs ==
-    INFO Updating 5 watches for 5 bugs on http://example.com
-    getRemoteStatus(bug_id=u'280883')
-    getRemoteStatus(bug_id=u'304014')
-    getRemoteStatus(bug_id=u'308994')
-    getRemoteStatus(bug_id=u'327452')
-    getRemoteStatus(bug_id=u'327549')
-    == email ==
-    DEBUG No watches to update on mailto:bugs@example.com
-    == example-bugs ==
-    DEBUG No watches to update on http://bugs.example.com
-    == gnome-bugs ==
-    DEBUG No watches to update on http://bugzilla.gnome.org/
-    == gnome-bugzilla ==
-    INFO Updating 2 watches for 2 bugs on http://example.com
-    getRemoteStatus(bug_id=u'304070')
-    getRemoteStatus(bug_id=u'3224')
-    == mozilla.org ==
-    INFO Updating 4 watches for 3 bugs on http://example.com
-    getRemoteStatus(bug_id=u'123543')
-    getRemoteStatus(bug_id=u'2000')
-    getRemoteStatus(bug_id=u'42')
-    == savannah ==
-    DEBUG No watches to update on http://savannah.gnu.org/
-    == sf ==
-    DEBUG No watches to update on http://sourceforge.net/
 
=== modified file 'lib/lp/bugs/scripts/checkwatches.py'
--- lib/lp/bugs/scripts/checkwatches.py 2010-02-19 12:05:10 +0000
+++ lib/lp/bugs/scripts/checkwatches.py 2010-03-15 14:09:31 +0000
@@ -6,7 +6,6 @@
 
 from copy import copy
 from datetime import datetime, timedelta
-import Queue as queue
 import socket
 import sys
 import threading
@@ -14,6 +13,11 @@
 
 import pytz
 
+from twisted.internet import reactor
+from twisted.internet.defer import DeferredList
+from twisted.internet.threads import deferToThreadPool
+from twisted.python.threadpool import ThreadPool
+
 from zope.component import getUtility
 from zope.event import notify
 
@@ -271,44 +275,28 @@
         self._logout()
 
     def updateBugTrackers(
-        self, bug_tracker_names=None, batch_size=None, num_threads=1):
+        self, bug_tracker_names=None, batch_size=None, scheduler=None):
         """Update all the bug trackers that have watches pending.
 
         If bug tracker names are specified in bug_tracker_names only
        those bug trackers will be checked.
 
-        The updates are run in threads, so that long running updates
-        don't block progress. However, by default the number of
-        threads is 1, to help with testing.
+        A custom scheduler can be passed in. This should inherit from
+        `BaseScheduler`. If no scheduler is given, `SerialScheduler`
+        will be used, which simply runs the jobs in order.
        """
         self.log.debug("Using a global batch size of %s" % batch_size)
 
-        # Put all the work on the queue. This is simpler than drip-feeding the
-        # queue, and avoids a situation where a worker thread exits because
-        # there's no work left and the feeding thread hasn't been scheduled to
-        # add work to the queue.
-        work = queue.Queue()
+        # Default to using the very simple SerialScheduler.
+        if scheduler is None:
+            scheduler = SerialScheduler()
+
+        # Schedule all the jobs to run.
         for updater in self._bugTrackerUpdaters(bug_tracker_names):
-            work.put(updater)
+            scheduler.schedule(updater, batch_size)
 
-        # This will be run once in each worker thread.
-        def do_work():
-            while True:
-                try:
-                    job = work.get(block=False)
-                except queue.Empty:
-                    break
-                else:
-                    job(batch_size)
-
-        # Start and join the worker threads.
-        threads = []
-        for run in xrange(num_threads):
-            thread = threading.Thread(target=do_work)
-            thread.start()
-            threads.append(thread)
-        for thread in threads:
-            thread.join()
+        # Run all the jobs.
+        scheduler.run()
 
     def updateBugTracker(self, bug_tracker, batch_size):
         """Updates the given bug trackers's bug watches.
@@ -1165,6 +1153,67 @@
         self.log.error("%s (%s)" % (message, oops_info.oopsid))
 
 
+class BaseScheduler:
+    """Run jobs according to a policy."""
+
+    def schedule(self, func, *args, **kwargs):
+        """Add a job to be run."""
+        raise NotImplementedError(self.schedule)
+
+    def run(self):
+        """Run the jobs."""
+        raise NotImplementedError(self.run)
+
+
+class SerialScheduler(BaseScheduler):
+    """Run jobs in order, one at a time."""
+
+    def __init__(self):
+        self._jobs = []
+
+    def schedule(self, func, *args, **kwargs):
+        self._jobs.append((func, args, kwargs))
+
+    def run(self):
+        jobs, self._jobs = self._jobs[:], []
+        for (func, args, kwargs) in jobs:
+            func(*args, **kwargs)
+
+
+class TwistedThreadScheduler(BaseScheduler):
+    """Run jobs in threads, chaperoned by Twisted."""
+
+    def __init__(self, num_threads, install_signal_handlers=True):
+        """Create a new `TwistedThreadScheduler`.
+
+        :param num_threads: The number of threads to allocate to the
+            thread pool.
+        :type num_threads: int
+
+        :param install_signal_handlers: Whether the Twisted reactor
+            should install signal handlers or not. This is intented for
+            testing - set to False to avoid layer violations - but may
+            be useful in other situations.
+        :type install_signal_handlers: bool
+        """
+        self._thread_pool = ThreadPool(0, num_threads)
+        self._install_signal_handlers = install_signal_handlers
+        self._jobs = []
+
+    def schedule(self, func, *args, **kwargs):
+        self._jobs.append(
+            deferToThreadPool(
                reactor, self._thread_pool, func, *args, **kwargs))
+
+    def run(self):
+        jobs, self._jobs = self._jobs[:], []
+        jobs_done = DeferredList(jobs)
+        jobs_done.addBoth(lambda ignore: self._thread_pool.stop())
+        jobs_done.addBoth(lambda ignore: reactor.stop())
+        reactor.callWhenRunning(self._thread_pool.start)
+        reactor.run(self._install_signal_handlers)
+
+
 class CheckWatchesCronScript(LaunchpadCronScript):
 
     def add_my_options(self):
@@ -1203,9 +1252,16 @@
         else:
             # Otherwise we just update those watches that need updating,
             # and we let the BugWatchUpdater decide which those are.
+            if self.options.jobs <= 1:
+                # Use the default scheduler.
+                scheduler = None
+            else:
+                # Run jobs in parallel.
+                scheduler = TwistedThreadScheduler(self.options.jobs)
             updater.updateBugTrackers(
-                self.options.bug_trackers, self.options.batch_size,
-                self.options.jobs)
+                self.options.bug_trackers,
+                self.options.batch_size,
+                scheduler)
 
         run_time = time.time() - start_time
         self.logger.info("Time for this run: %.3f seconds." % run_time)
 
=== modified file 'lib/lp/bugs/scripts/tests/test_checkwatches.py'
--- lib/lp/bugs/scripts/tests/test_checkwatches.py 2010-01-05 12:19:17 +0000
+++ lib/lp/bugs/scripts/tests/test_checkwatches.py 2010-03-15 14:09:31 +0000
@@ -2,9 +2,13 @@
 # GNU Affero General Public License version 3 (see the file LICENSE).
 """Checkwatches unit tests."""
 
+from __future__ import with_statement
+
 __metaclass__ = type
 
+import threading
 import unittest
+
 import transaction
 
 from zope.component import getUtility
@@ -19,11 +23,13 @@
 from canonical.testing import LaunchpadZopelessLayer
 
 from lp.bugs.externalbugtracker.bugzilla import BugzillaAPI
+from lp.bugs.interfaces.bugtracker import IBugTrackerSet
 from lp.bugs.scripts import checkwatches
-from lp.bugs.scripts.checkwatches import CheckWatchesErrorUtility
+from lp.bugs.scripts.checkwatches import (
+    BugWatchUpdater, CheckWatchesErrorUtility, TwistedThreadScheduler)
 from lp.bugs.tests.externalbugtracker import (
-    TestBugzillaAPIXMLRPCTransport, new_bugtracker)
-from lp.testing import TestCaseWithFactory
+    TestBugzillaAPIXMLRPCTransport, TestExternalBugTracker, new_bugtracker)
+from lp.testing import TestCaseWithFactory, ZopeTestInSubProcess
 
 
 def always_BugzillaAPI_get_external_bugtracker(bugtracker):
@@ -202,5 +208,171 @@
             self.bugtask_with_question.status.title))
 
 
+class TestSchedulerBase:
+
+    def test_args_and_kwargs(self):
+        def func(name, aptitude):
+            self.failUnlessEqual("Robin Hood", name)
+            self.failUnlessEqual("Riding through the glen", aptitude)
+        # Positional args specified when adding a job are passed to
+        # the job function at run time.
+        self.scheduler.schedule(
+            func, "Robin Hood", "Riding through the glen")
+        # Keyword args specified when adding a job are passed to the
+        # job function at run time.
+        self.scheduler.schedule(
+            func, name="Robin Hood", aptitude="Riding through the glen")
+        # Positional and keyword args can both be specified.
+        self.scheduler.schedule(
+            func, "Robin Hood", aptitude="Riding through the glen")
+        # Run everything.
+        self.scheduler.run()
+
+
+class TestSerialScheduler(TestSchedulerBase, unittest.TestCase):
+    """Test SerialScheduler."""
+
+    def setUp(self):
+        self.scheduler = checkwatches.SerialScheduler()
+
+    def test_ordering(self):
+        # The numbers list will be emptied in the order we add jobs to
+        # the scheduler.
+        numbers = [1, 2, 3]
+        # Remove 3 and check.
+        self.scheduler.schedule(
+            list.remove, numbers, 3)
+        self.scheduler.schedule(
+            lambda: self.failUnlessEqual([1, 2], numbers))
+        # Remove 1 and check.
+        self.scheduler.schedule(
+            list.remove, numbers, 1)
+        self.scheduler.schedule(
+            lambda: self.failUnlessEqual([2], numbers))
+        # Remove 2 and check.
+        self.scheduler.schedule(
+            list.remove, numbers, 2)
+        self.scheduler.schedule(
+            lambda: self.failUnlessEqual([], numbers))
+        # Run the scheduler.
+        self.scheduler.run()
+
+
+class TestTwistedThreadScheduler(TestSchedulerBase, unittest.TestCase):
+    """Test TwistedThreadScheduler.
+
+    By default, updateBugTrackers() runs jobs serially, but a
+    different scheduling policy can be plugged in. One such policy,
+    for running several jobs in parallel, is TwistedThreadScheduler.
+    """
+
+    def setUp(self):
+        self.scheduler = checkwatches.TwistedThreadScheduler(
+            num_threads=5, install_signal_handlers=False)
+
+
+class OutputFileForThreads:
+    """Collates writes according to thread name."""
+
+    def __init__(self):
+        self.output = {}
+        self.lock = threading.Lock()
+
+    def write(self, data):
+        thread_name = threading.currentThread().getName()
+        with self.lock:
+            if thread_name in self.output:
+                self.output[thread_name].append(data)
+            else:
+                self.output[thread_name] = [data]
+
+
+class ExternalBugTrackerForThreads(TestExternalBugTracker):
+    """Fake which records interesting activity to a file."""
+
+    def __init__(self, output_file):
+        super(ExternalBugTrackerForThreads, self).__init__()
+        self.output_file = output_file
+
+    def getRemoteStatus(self, bug_id):
+        self.output_file.write("getRemoteStatus(bug_id=%r)" % bug_id)
+        return 'UNKNOWN'
+
+    def getCurrentDBTime(self):
+        return None
+
+
+class BugWatchUpdaterForThreads(BugWatchUpdater):
+    """Fake updater.
+
+    Plumbs an `ExternalBugTrackerForThreads` into a given output file,
+    which is expected to be an instance of `OutputFileForThreads`, and
+    suppresses normal log activity.
+    """
+
+    def __init__(self, output_file):
+        logger = QuietFakeLogger()
+        super(BugWatchUpdaterForThreads, self).__init__(transaction, logger)
+        self.output_file = output_file
+
+    def _getExternalBugTrackersAndWatches(self, bug_trackers, bug_watches):
+        return [(ExternalBugTrackerForThreads(self.output_file), bug_watches)]
+
+
+class TestTwistedThreadSchedulerInPlace(
+    ZopeTestInSubProcess, TestCaseWithFactory):
+    """Test TwistedThreadScheduler in place.
+
+    As in, driving as much of the bug watch machinery as is possible
+    without making external connections.
+    """
+
+    layer = LaunchpadZopelessLayer
+
+    def test(self):
+        # Prepare test data.
+        self.owner = self.factory.makePerson()
+        self.trackers = [
+            getUtility(IBugTrackerSet).ensureBugTracker(
+                "http://butterscotch.example.com", self.owner,
+                BugTrackerType.BUGZILLA, name="butterscotch"),
+            getUtility(IBugTrackerSet).ensureBugTracker(
+                "http://strawberry.example.com", self.owner,
+                BugTrackerType.BUGZILLA, name="strawberry"),
+            ]
+        self.bug = self.factory.makeBug(owner=self.owner)
+        for tracker in self.trackers:
+            for num in (1, 2, 3):
+                self.factory.makeBugWatch(
+                    "%s-%d" % (tracker.name, num),
+                    tracker, self.bug, self.owner)
+        # Commit so that threads all see the same database state.
+        transaction.commit()
+        # Prepare the updater with the Twisted scheduler.
+        output_file = OutputFileForThreads()
+        threaded_bug_watch_updater = BugWatchUpdaterForThreads(output_file)
+        threaded_bug_watch_scheduler = TwistedThreadScheduler(
+            num_threads=10, install_signal_handlers=False)
+        threaded_bug_watch_updater.updateBugTrackers(
+            bug_tracker_names=[tracker.name for tracker in self.trackers],
+            batch_size=5, scheduler=threaded_bug_watch_scheduler)
+        # The thread names should match the tracker names.
+        self.assertEqual(
+            ['butterscotch', 'strawberry'], sorted(output_file.output))
+        # Check that getRemoteStatus() was called.
+        self.assertEqual(
+            ["getRemoteStatus(bug_id=u'butterscotch-1')",
+             "getRemoteStatus(bug_id=u'butterscotch-2')",
+             "getRemoteStatus(bug_id=u'butterscotch-3')"],
+            output_file.output['butterscotch']
+            )
+        self.assertEqual(
+            ["getRemoteStatus(bug_id=u'strawberry-1')",
+             "getRemoteStatus(bug_id=u'strawberry-2')",
+             "getRemoteStatus(bug_id=u'strawberry-3')"],
+            output_file.output['strawberry']
+            )
+
+
 def test_suite():
     return unittest.TestLoader().loadTestsFromName(__name__)
 
=== modified file 'lib/lp/scripts/utilities/importfascist.py'
--- lib/lp/scripts/utilities/importfascist.py 2010-02-23 11:53:24 +0000
+++ lib/lp/scripts/utilities/importfascist.py 2010-03-15 14:09:31 +0000
@@ -62,6 +62,7 @@
     'openid.fetchers': set(['Urllib2Fetcher']),
     'storm.database': set(['STATE_DISCONNECTED']),
     'textwrap': set(['dedent']),
+    'twisted.internet.threads': set(['deferToThreadPool']),
     'zope.component': set(
         ['adapter',
          'ComponentLookupError',
 
=== modified file 'lib/lp/services/job/runner.py'
--- lib/lp/services/job/runner.py 2010-02-25 12:07:15 +0000
+++ lib/lp/services/job/runner.py 2010-03-15 14:09:31 +0000
@@ -401,9 +401,10 @@
 class JobCronScript(LaunchpadCronScript):
     """Base class for scripts that run jobs."""
 
-    def __init__(self, runner_class=JobRunner):
+    def __init__(self, runner_class=JobRunner, test_args=None):
         self.dbuser = getattr(config, self.config_name).dbuser
-        super(JobCronScript, self).__init__(self.config_name, self.dbuser)
+        super(JobCronScript, self).__init__(
+            self.config_name, self.dbuser, test_args)
         self.runner_class = runner_class
 
     def main(self):
 
=== modified file 'lib/lp/services/job/tests/test_runner.py'
--- lib/lp/services/job/tests/test_runner.py 2010-02-24 19:52:01 +0000
+++ lib/lp/services/job/tests/test_runner.py 2010-03-15 14:09:31 +0000
@@ -11,23 +11,24 @@
 from unittest import TestLoader
 
 import transaction
-from canonical.testing import LaunchpadZopelessLayer
+
 from zope.component import getUtility
 from zope.error.interfaces import IErrorReportingUtility
 from zope.interface import implements
 
+from canonical.launchpad.webapp import errorlog
+from canonical.launchpad.webapp.interfaces import (
+    DEFAULT_FLAVOR, IStoreSelector, MAIN_STORE)
+from canonical.testing import LaunchpadZopelessLayer
+
 from lp.code.interfaces.branchmergeproposal import (
-    IUpdatePreviewDiffJobSource,)
-from lp.testing.mail_helpers import pop_notifications
-from lp.services.job.runner import (
-    JobCronScript, JobRunner, BaseRunnableJob, TwistedJobRunner
-)
+    IUpdatePreviewDiffJobSource)
 from lp.services.job.interfaces.job import JobStatus, IRunnableJob
 from lp.services.job.model.job import Job
-from lp.testing import TestCaseWithFactory
-from canonical.launchpad.webapp import errorlog
-from canonical.launchpad.webapp.interfaces import (
-    IStoreSelector, MAIN_STORE, DEFAULT_FLAVOR)
+from lp.services.job.runner import (
+    BaseRunnableJob, JobCronScript, JobRunner, TwistedJobRunner)
+from lp.testing import TestCaseWithFactory, ZopeTestInSubProcess
+from lp.testing.mail_helpers import pop_notifications
 
 
 class NullJob(BaseRunnableJob):
@@ -300,7 +301,7 @@
         self.entries.append(input)
 
 
-class TestTwistedJobRunner(TestCaseWithFactory):
+class TestTwistedJobRunner(ZopeTestInSubProcess, TestCaseWithFactory):
 
     layer = LaunchpadZopelessLayer
 
@@ -325,7 +326,7 @@
         self.assertIn('Job ran too long.', oops.value)
 
 
-class TestJobCronScript(TestCaseWithFactory):
+class TestJobCronScript(ZopeTestInSubProcess, TestCaseWithFactory):
 
     layer = LaunchpadZopelessLayer
 
@@ -351,7 +352,8 @@
             source_interface = IUpdatePreviewDiffJobSource
 
             def __init__(self):
-                super(JobCronScriptSubclass, self).__init__(DummyRunner)
+                super(JobCronScriptSubclass, self).__init__(
+                    DummyRunner, test_args=[])
                 self.logger = ListLogger()
 
         old_errorlog = errorlog.globalErrorUtility
 
=== modified file 'lib/lp/testing/__init__.py'
--- lib/lp/testing/__init__.py 2010-03-01 03:06:02 +0000
+++ lib/lp/testing/__init__.py 2010-03-15 14:09:31 +0000
@@ -29,6 +29,7 @@
     'validate_mock_class',
     'WindmillTestCase',
     'with_anonymous_login',
+    'ZopeTestInSubProcess',
     ]
 
 import copy
@@ -38,6 +39,8 @@
 from pprint import pformat
 import shutil
 import subprocess
+import subunit
+import sys
 import tempfile
 import time
 
@@ -62,6 +65,7 @@
 from zope.interface.verify import verifyClass, verifyObject
 from zope.security.proxy import (
     isinstance as zope_isinstance, removeSecurityProxy)
+from zope.testing.testrunner.runner import TestResult as ZopeTestResult
 
 from canonical.launchpad.webapp import errorlog
 from canonical.config import config
@@ -586,6 +590,57 @@
         self.client.open(url=u'http://launchpad.dev:8085')
 
 
+class ZopeTestInSubProcess:
+    """Run tests in a sub-process, respecting Zope idiosyncrasies.
+
+    Use this as a mixin with an interesting `TestCase` to isolate
+    tests with side-effects. Each and every test *method* in the test
+    case is run in a new, forked, sub-process. This will slow down
+    your tests, so use it sparingly. However, when you need to, for
+    example, start the Twisted reactor (which cannot currently be
+    safely stopped and restarted in process) it is invaluable.
+
+    This is basically a reimplementation of subunit's
+    `IsolatedTestCase` or `IsolatedTestSuite`, but adjusted to work
+    with Zope. In particular, Zope's TestResult object is responsible
+    for calling testSetUp() and testTearDown() on the selected layer.
+    """
+
+    def run(self, result):
+        assert isinstance(result, ZopeTestResult), (
+            "result must be a Zope result object, not %r." % (result,))
+        pread, pwrite = os.pipe()
+        pid = os.fork()
+        if pid == 0:
+            # Child.
+            os.close(pread)
+            fdwrite = os.fdopen(pwrite, 'w', 1)
+            # Send results to both the Zope result object (so that
+            # layer setup and teardown are done properly, etc.) and to
+            # the subunit stream client so that the parent process can
+            # obtain the result.
+            result = testtools.MultiTestResult(
+                result, subunit.TestProtocolClient(fdwrite))
+            super(ZopeTestInSubProcess, self).run(result)
+            fdwrite.flush()
+            sys.stdout.flush()
+            sys.stderr.flush()
+            # Exit hard.
+            os._exit(0)
+        else:
+            # Parent.
+            os.close(pwrite)
+            fdread = os.fdopen(pread, 'rU')
+            # Accept the result from the child process. Skip all the
+            # Zope-specific result stuff by passing a super() of the
+            # result.
+            result = super(ZopeTestResult, result)
+            protocol = subunit.TestProtocolServer(result)
+            protocol.readFrom(fdread)
+            fdread.close()
+            os.waitpid(pid, 0)
+
+
 def capture_events(callable_obj, *args, **kwargs):
     """Capture the events emitted by a callable.
 
 
=== added file 'lib/lp/testing/tests/test_zope_test_in_subprocess.py'
--- lib/lp/testing/tests/test_zope_test_in_subprocess.py 1970-01-01 00:00:00 +0000
+++ lib/lp/testing/tests/test_zope_test_in_subprocess.py 2010-03-15 14:09:31 +0000
@@ -0,0 +1,131 @@
+# Copyright 2010 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Test `lp.testing.ZopeTestInSubProcess`.
+
+How does it do this?
+
+A `TestCase`, mixed-in with `ZopeTestInSubProcess`, is run by the Zope
+test runner. This test case sets its own layer, to keep track of the
+PIDs when certain methods are called. It also records pids for its own
+methods. Assertions are made as these methods are called to ensure that
+they are running in the correct process - the parent or the child.
+
+Recording of the PIDs is handled using the `record_pid` decorator.
+"""
+
+__metaclass__ = type
+
+import functools
+import os
+import unittest
+
+from lp.testing import ZopeTestInSubProcess
+
+
+def record_pid(method):
+    """Decorator that records the pid at method invocation.
+
+    Will probably only DTRT with class methods or bound instance
+    methods.
+    """
+    @functools.wraps(method)
+    def wrapper(self, *args, **kwargs):
+        setattr(self, 'pid_in_%s' % method.__name__, os.getpid())
+        return method(self, *args, **kwargs)
+    return wrapper
+
+
+class TestZopeTestInSubProcessLayer:
+    """Helper to test `ZopeTestInSubProcess`.
+
+    Asserts that layers are set up and torn down in the expected way,
+    namely that setUp() and tearDown() are called in the parent
+    process, and testSetUp() and testTearDown() are called in the
+    child process.
+
+    The assertions for tearDown() and testTearDown() must be done here
+    because the test case runs before these methods are called. In the
+    interests of symmetry and clarity, the assertions for setUp() and
+    testSetUp() are done here too.
+
+    This layer expects to be *instantiated*, which is not the norm for
+    Zope layers. See `TestZopeTestInSubProcess` for its use.
+    """
+
+    @record_pid
+    def __init__(self):
+        # These are needed to satisfy the requirements of the
+        # byzantine Zope layer machinery.
+        self.__name__ = self.__class__.__name__
+        self.__bases__ = self.__class__.__bases__
+
+    @record_pid
+    def setUp(self):
+        # Runs in the parent process.
+        assert self.pid_in___init__ == self.pid_in_setUp, (
+            "layer.setUp() not called in parent process.")
+
+    @record_pid
+    def testSetUp(self):
+        # Runs in the child process.
+        assert self.pid_in___init__ != self.pid_in_testSetUp, (
+            "layer.testSetUp() called in parent process.")
+
+    @record_pid
+    def testTearDown(self):
+        # Runs in the child process.
+        assert self.pid_in_testSetUp == self.pid_in_testTearDown, (
+            "layer.testTearDown() not called in same process as testSetUp().")
+
+    @record_pid
+    def tearDown(self):
+        # Runs in the parent process.
+        assert self.pid_in___init__ == self.pid_in_tearDown, (
+            "layer.tearDown() not called in parent process.")
+
+
+class TestZopeTestInSubProcess(ZopeTestInSubProcess, unittest.TestCase):
+    """Test `ZopeTestInSubProcess`.
+
+    Assert that setUp(), test() and tearDown() are called in the child
+    process.
+
+    Sets its own layer attribute. This layer is then responsible for
+    recording the PID at interesting moments. Specifically,
+    layer.testSetUp() must be called in the same process as
+    test.setUp().
+    """
+
+    @record_pid
+    def __init__(self, method_name='runTest'):
+        # Runs in the parent process.
+        super(TestZopeTestInSubProcess, self).__init__(method_name)
+        self.layer = TestZopeTestInSubProcessLayer()
+
+    @record_pid
+    def setUp(self):
+        # Runs in the child process.
+        super(TestZopeTestInSubProcess, self).setUp()
+        self.failUnlessEqual(
+            self.layer.pid_in_testSetUp, self.pid_in_setUp,
+            "setUp() not called in same process as layer.testSetUp().")
+
+    @record_pid
+    def test(self):
+        # Runs in the child process.
+        self.failUnlessEqual(
+            self.pid_in_setUp, self.pid_in_test,
+            "test method not run in same process as setUp().")
+
+    @record_pid
+    def tearDown(self):
+        # Runs in the child process.
+        super(TestZopeTestInSubProcess, self).tearDown()
+        self.failUnlessEqual(
+            self.pid_in_setUp, self.pid_in_tearDown,
+            "tearDown() not run in same process as setUp().")
+
+
+def test_suite():
+    return unittest.TestLoader().loadTestsFromName(__name__)