Merge lp:~allenap/launchpad/twisted-threading-bug-491870 into lp:launchpad
Status: | Rejected |
---|---|
Rejected by: | Gavin Panella |
Proposed branch: | lp:~allenap/launchpad/twisted-threading-bug-491870 |
Merge into: | lp:launchpad |
Diff against target: |
822 lines (+469/-158) 9 files modified
lib/lp/bugs/doc/checkwatches-cli-switches.txt (+1/-1)
lib/lp/bugs/doc/externalbugtracker.txt (+0/-107)
lib/lp/bugs/scripts/checkwatches.py (+88/-32)
lib/lp/bugs/scripts/tests/test_checkwatches.py (+175/-3)
lib/lp/scripts/utilities/importfascist.py (+1/-0)
lib/lp/services/job/runner.py (+3/-2)
lib/lp/services/job/tests/test_runner.py (+15/-13)
lib/lp/testing/__init__.py (+55/-0)
lib/lp/testing/tests/test_zope_test_in_subprocess.py (+131/-0) |
To merge this branch: | bzr merge lp:~allenap/launchpad/twisted-threading-bug-491870 |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Henning Eggers (community) | code | | Approve |
Review via email: mp+18843@code.launchpad.net |
Commit message
In checkwatches, use Twisted to manage threading instead of the threading module.
Description of the change
Gavin Panella (allenap) wrote : | # |
Henning Eggers (henninge) wrote : | # |
Thanks for this improvement!
We talked about some things on IRC:
- Use of lambda is against LP coding style but common in Twisted. We will raise the issue at the reviewers meeting and you can leave it as it is for now. If the style change gets rejected, I ask you to fix this in a follow-up branch. Or wait until after the meeting before landing.
- The intended use of install_
- You agreed to add an explicit test for SerialScheduler.
Thanks for your patience with me ... ;-)
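An explicit SerialScheduler ordering test of the kind agreed above might look roughly like this. It is a minimal Python 3 sketch, assuming the scheduler keeps its jobs in a list and runs them in insertion order, as the class in the preview diff does; the `check_ordering` helper is hypothetical and not part of the branch.

```python
class SerialScheduler:
    """Run jobs in order, one at a time."""

    def __init__(self):
        self._jobs = []

    def schedule(self, func, *args, **kwargs):
        # Remember the job; nothing runs until run() is called.
        self._jobs.append((func, args, kwargs))

    def run(self):
        # Swap the job list out first so the scheduler can be reused.
        jobs, self._jobs = self._jobs[:], []
        for func, args, kwargs in jobs:
            func(*args, **kwargs)


def check_ordering():
    """Hypothetical helper: record the order in which jobs actually run."""
    seen = []
    scheduler = SerialScheduler()
    for number in (3, 1, 2):
        scheduler.schedule(seen.append, number)
    scheduler.run()
    return seen
```

Calling `check_ordering()` returns the numbers in the order the jobs ran, which a test can compare against the order in which they were scheduled.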
Jonathan Lange (jml) wrote : | # |
On Mon, Feb 8, 2010 at 5:38 PM, Henning Eggers
<email address hidden> wrote:
> Review: Approve code
> Thanks for this improvement!
>
> We talked about some things on IRC:
> - Use of lambda is against LP coding style but common in Twisted. We will raise the issue at the reviewers meeting and you can leave it as it is for now. If the style change gets rejected, I ask you to fix this in a follow-up branch. Or wait until after the meeting before landing.
FWIW, if the reviewers decree that lambda is too hard to read, then
it's probably worth defining a function like this:
def kerchop(f):
    def callback(ignored, *args, **kwargs):
        return f(*args, **kwargs)
    return mergeFunctionMe
And then replacing:
d.addCallback
with:
d.addCallback
The function ought to go somewhere in lp.services, and have a better
name than 'kerchop'.
jml
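The idea in this comment, a wrapper that drops the result a Deferred passes as the first argument, can be sketched in plain Python. This is an illustration only: the name `ignore_result` is hypothetical, `functools.wraps` is an assumption used here to carry over the wrapped function's name, and a plain call stands in for a real Deferred firing its callback.

```python
import functools


def ignore_result(f):
    """Hypothetical name: wrap f so its first positional argument is dropped."""
    @functools.wraps(f)
    def callback(ignored, *args, **kwargs):
        # The previous callback's result arrives as `ignored` and is discarded.
        return f(*args, **kwargs)
    return callback


def stop_pool():
    """Stand-in for a zero-argument cleanup function."""
    return "stopped"


# Simulate a callback chain invoking the wrapper with the previous result.
wrapped = ignore_result(stop_pool)
outcome = wrapped("previous result")
```

With such a helper, a call site can pass the wrapped function to a callback chain instead of writing `lambda ignore: ...` each time.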
Gavin Panella (allenap) wrote : | # |
Hi Henning,
Thank you for the review. I added some more info about install_
I also switched to using the default reactor thread pool instead of creating my own in TwistedThreadSc
* We're calling reactor.run(). It's ours, we're not sharing it, so no harm in just using its thread pool.
* The import fascist was complaining about importing deferToThreadPool. Maybe this should be in its module's __all__, maybe not. I'll file a bug for that.
Incremental diff: http://
Thanks, Gavin.
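The pattern under discussion (queue the jobs, run them on a bounded pool, wait for all of them to finish) can be sketched without Twisted. The following is a stand-in built on the standard library's `concurrent.futures.ThreadPoolExecutor`, not the branch's `deferToThreadPool`-based code; the class name `ThreadPoolScheduler` is hypothetical, and unlike the branch it submits work only when `run()` is called.

```python
from concurrent.futures import ThreadPoolExecutor, wait


class ThreadPoolScheduler:
    """Hypothetical stand-in: run scheduled jobs on a bounded thread pool."""

    def __init__(self, num_threads):
        self._num_threads = num_threads
        self._jobs = []

    def schedule(self, func, *args, **kwargs):
        # Queue the job; it is submitted to the pool in run().
        self._jobs.append((func, args, kwargs))

    def run(self):
        jobs, self._jobs = self._jobs[:], []
        with ThreadPoolExecutor(max_workers=self._num_threads) as pool:
            futures = [
                pool.submit(func, *args, **kwargs)
                for func, args, kwargs in jobs
            ]
            # Block until every job has finished or raised.
            wait(futures)
        # result() re-raises any exception raised inside a job.
        return [future.result() for future in futures]


scheduler = ThreadPoolScheduler(num_threads=2)
for n in (1, 2, 3):
    scheduler.schedule(pow, n, 2)
results = scheduler.run()
```

Results come back in scheduling order because they are read from the list of futures, regardless of which thread finished first.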
Gavin Panella (allenap) wrote : | # |
> The import fascist was complaining about importing
> deferToThreadPool. Maybe this should be in its module's
> __all__, maybe not. I'll file a bug for that.
Jonathan Lange (jml) wrote : | # |
The fascist has a place where you can disable the warning: lib/lp/
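As a rough illustration of how such an exemption works, the check below allows a name either because the module exports it or because it is explicitly allowlisted. This is a hypothetical sketch, not the import fascist's actual code: the names `ALLOWED_OUTSIDE_ALL` and `import_is_permitted` are invented here, and only the two dictionary entries are taken from the preview diff.

```python
# Hypothetical allowlist: names that may be imported even though the
# module does not list them in __all__. Both entries appear in the diff.
ALLOWED_OUTSIDE_ALL = {
    "textwrap": {"dedent"},
    "twisted.internet.threads": {"deferToThreadPool"},
}


def import_is_permitted(module_name, imported_name, module_all):
    """Return True if the name is exported or explicitly allowlisted."""
    if imported_name in module_all:
        return True
    return imported_name in ALLOWED_OUTSIDE_ALL.get(module_name, set())
```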
Gavin Panella (allenap) wrote : | # |
Okay, I'm flip-flopping on this. Back To The Threadpool.
Preview Diff
1 | === modified file 'lib/lp/bugs/doc/checkwatches-cli-switches.txt' |
2 | --- lib/lp/bugs/doc/checkwatches-cli-switches.txt 2009-11-17 11:19:25 +0000 |
3 | +++ lib/lp/bugs/doc/checkwatches-cli-switches.txt 2010-03-15 14:09:31 +0000 |
4 | @@ -34,7 +34,7 @@ |
5 | >>> from lp.testing.factory import LaunchpadObjectFactory |
6 | |
7 | >>> LaunchpadZopelessLayer.switchDbUser('launchpad') |
8 | - >>> login('foo.bar@canonical.com') |
9 | + >>> login('foo.bar@canonical.com') |
10 | >>> bug_tracker = LaunchpadObjectFactory().makeBugTracker( |
11 | ... 'http://example.com') |
12 | >>> bug_tracker.active = False |
13 | |
14 | === modified file 'lib/lp/bugs/doc/externalbugtracker.txt' |
15 | --- lib/lp/bugs/doc/externalbugtracker.txt 2010-02-24 03:49:45 +0000 |
16 | +++ lib/lp/bugs/doc/externalbugtracker.txt 2010-03-15 14:09:31 +0000 |
17 | @@ -1105,110 +1105,3 @@ |
18 | the updateBugTrackers() method. This, too, takes a batch_size parameter, which |
19 | allows it to be passed as a command-line option when the checkwatches script |
20 | is run. |
21 | - |
22 | -Before going further, we must abort the current transaction to avoid |
23 | -deadlock; updateBugTrackers() runs updateBugTracker() in a different |
24 | -thread. |
25 | - |
26 | - >>> transaction.abort() |
27 | - |
28 | - >>> from canonical.launchpad.scripts.logger import FakeLogger |
29 | - >>> bug_watch_updater = NonConnectingBugWatchUpdater( |
30 | - ... transaction, FakeLogger()) |
31 | - >>> bug_watch_updater.updateBugTrackers( |
32 | - ... bug_tracker_names=[standard_bugzilla.name], batch_size=2) |
33 | - DEBUG Using a global batch size of 2 |
34 | - INFO Updating 2 watches for 2 bugs on http://example.com |
35 | - initializeRemoteBugDB() called: [u'5', u'6'] |
36 | - getRemoteStatus() called: u'5' |
37 | - getRemoteStatus() called: u'6' |
38 | - |
39 | - >>> # We should log in again because updateBugTrackers() logs out. |
40 | - >>> login('test@canonical.com') |
41 | - |
42 | -By default, the updateBugTrackers() only spawns one thread, but it can |
43 | -spawn as many as required. |
44 | - |
45 | - >>> import threading |
46 | - |
47 | - >>> class OutputFileForThreads: |
48 | - ... def __init__(self): |
49 | - ... self.output = {} |
50 | - ... self.lock = threading.Lock() |
51 | - ... def write(self, data): |
52 | - ... thread_name = threading.currentThread().getName() |
53 | - ... self.lock.acquire() |
54 | - ... try: |
55 | - ... if thread_name in self.output: |
56 | - ... self.output[thread_name].append(data) |
57 | - ... else: |
58 | - ... self.output[thread_name] = [data] |
59 | - ... finally: |
60 | - ... self.lock.release() |
61 | - |
62 | - >>> output_file = OutputFileForThreads() |
63 | - |
64 | - >>> class ExternalBugTrackerForThreads(TestExternalBugTracker): |
65 | - ... def getModifiedRemoteBugs(self, remote_bug_ids, last_checked): |
66 | - ... print >> output_file, ( |
67 | - ... "getModifiedRemoteBugs(\n" |
68 | - ... " remote_bug_ids=%r,\n" |
69 | - ... " last_checked=%r)" % (remote_bug_ids, last_checked)) |
70 | - ... return [remote_bug_ids[0], remote_bug_ids[-1]] |
71 | - ... def getRemoteStatus(self, bug_id): |
72 | - ... print >> output_file, ( |
73 | - ... "getRemoteStatus(bug_id=%r)" % bug_id) |
74 | - ... return 'UNKNOWN' |
75 | - ... def getCurrentDBTime(self): |
76 | - ... return None |
77 | - |
78 | - >>> class BugWatchUpdaterForThreads(BugWatchUpdater): |
79 | - ... def _getExternalBugTrackersAndWatches( |
80 | - ... self, bug_trackers, bug_watches): |
81 | - ... return [(ExternalBugTrackerForThreads(), bug_watches)] |
82 | - |
83 | - >>> threaded_bug_watch_updater = BugWatchUpdaterForThreads( |
84 | - ... transaction, FakeLogger(output_file)) |
85 | - >>> threaded_bug_watch_updater.updateBugTrackers( |
86 | - ... batch_size=5, num_threads=10) |
87 | - |
88 | - |
89 | - >>> for thread_name in sorted(output_file.output): |
90 | - ... print '== %s ==' % thread_name |
91 | - ... print "".join(output_file.output[thread_name]), |
92 | - == MainThread == |
93 | - DEBUG Using a global batch size of 5 |
94 | - DEBUG Skipping updating Ubuntu Bugzilla watches. |
95 | - == auto-generic-string4.example.com == |
96 | - INFO Updating 5 watches for 5 bugs on http://example.com |
97 | - getRemoteStatus(bug_id=u'1') |
98 | - getRemoteStatus(bug_id=u'101') |
99 | - getRemoteStatus(bug_id=u'5') |
100 | - getRemoteStatus(bug_id=u'6') |
101 | - getRemoteStatus(bug_id=u'7') |
102 | - == debbugs == |
103 | - INFO Updating 5 watches for 5 bugs on http://example.com |
104 | - getRemoteStatus(bug_id=u'280883') |
105 | - getRemoteStatus(bug_id=u'304014') |
106 | - getRemoteStatus(bug_id=u'308994') |
107 | - getRemoteStatus(bug_id=u'327452') |
108 | - getRemoteStatus(bug_id=u'327549') |
109 | - == email == |
110 | - DEBUG No watches to update on mailto:bugs@example.com |
111 | - == example-bugs == |
112 | - DEBUG No watches to update on http://bugs.example.com |
113 | - == gnome-bugs == |
114 | - DEBUG No watches to update on http://bugzilla.gnome.org/ |
115 | - == gnome-bugzilla == |
116 | - INFO Updating 2 watches for 2 bugs on http://example.com |
117 | - getRemoteStatus(bug_id=u'304070') |
118 | - getRemoteStatus(bug_id=u'3224') |
119 | - == mozilla.org == |
120 | - INFO Updating 4 watches for 3 bugs on http://example.com |
121 | - getRemoteStatus(bug_id=u'123543') |
122 | - getRemoteStatus(bug_id=u'2000') |
123 | - getRemoteStatus(bug_id=u'42') |
124 | - == savannah == |
125 | - DEBUG No watches to update on http://savannah.gnu.org/ |
126 | - == sf == |
127 | - DEBUG No watches to update on http://sourceforge.net/ |
128 | |
129 | === modified file 'lib/lp/bugs/scripts/checkwatches.py' |
130 | --- lib/lp/bugs/scripts/checkwatches.py 2010-02-19 12:05:10 +0000 |
131 | +++ lib/lp/bugs/scripts/checkwatches.py 2010-03-15 14:09:31 +0000 |
132 | @@ -6,7 +6,6 @@ |
133 | |
134 | from copy import copy |
135 | from datetime import datetime, timedelta |
136 | -import Queue as queue |
137 | import socket |
138 | import sys |
139 | import threading |
140 | @@ -14,6 +13,11 @@ |
141 | |
142 | import pytz |
143 | |
144 | +from twisted.internet import reactor |
145 | +from twisted.internet.defer import DeferredList |
146 | +from twisted.internet.threads import deferToThreadPool |
147 | +from twisted.python.threadpool import ThreadPool |
148 | + |
149 | from zope.component import getUtility |
150 | from zope.event import notify |
151 | |
152 | @@ -271,44 +275,28 @@ |
153 | self._logout() |
154 | |
155 | def updateBugTrackers( |
156 | - self, bug_tracker_names=None, batch_size=None, num_threads=1): |
157 | + self, bug_tracker_names=None, batch_size=None, scheduler=None): |
158 | """Update all the bug trackers that have watches pending. |
159 | |
160 | If bug tracker names are specified in bug_tracker_names only |
161 | those bug trackers will be checked. |
162 | |
163 | - The updates are run in threads, so that long running updates |
164 | - don't block progress. However, by default the number of |
165 | - threads is 1, to help with testing. |
166 | + A custom scheduler can be passed in. This should inherit from |
167 | + `BaseScheduler`. If no scheduler is given, `SerialScheduler` |
168 | + will be used, which simply runs the jobs in order. |
169 | """ |
170 | self.log.debug("Using a global batch size of %s" % batch_size) |
171 | |
172 | - # Put all the work on the queue. This is simpler than drip-feeding the |
173 | - # queue, and avoids a situation where a worker thread exits because |
174 | - # there's no work left and the feeding thread hasn't been scheduled to |
175 | - # add work to the queue. |
176 | - work = queue.Queue() |
177 | + # Default to using the very simple SerialScheduler. |
178 | + if scheduler is None: |
179 | + scheduler = SerialScheduler() |
180 | + |
181 | + # Schedule all the jobs to run. |
182 | for updater in self._bugTrackerUpdaters(bug_tracker_names): |
183 | - work.put(updater) |
184 | - |
185 | - # This will be run once in each worker thread. |
186 | - def do_work(): |
187 | - while True: |
188 | - try: |
189 | - job = work.get(block=False) |
190 | - except queue.Empty: |
191 | - break |
192 | - else: |
193 | - job(batch_size) |
194 | - |
195 | - # Start and join the worker threads. |
196 | - threads = [] |
197 | - for run in xrange(num_threads): |
198 | - thread = threading.Thread(target=do_work) |
199 | - thread.start() |
200 | - threads.append(thread) |
201 | - for thread in threads: |
202 | - thread.join() |
203 | + scheduler.schedule(updater, batch_size) |
204 | + |
205 | + # Run all the jobs. |
206 | + scheduler.run() |
207 | |
208 | def updateBugTracker(self, bug_tracker, batch_size): |
209 | """Updates the given bug trackers's bug watches. |
210 | @@ -1165,6 +1153,67 @@ |
211 | self.log.error("%s (%s)" % (message, oops_info.oopsid)) |
212 | |
213 | |
214 | +class BaseScheduler: |
215 | + """Run jobs according to a policy.""" |
216 | + |
217 | + def schedule(self, func, *args, **kwargs): |
218 | + """Add a job to be run.""" |
219 | + raise NotImplementedError(self.schedule) |
220 | + |
221 | + def run(self): |
222 | + """Run the jobs.""" |
223 | + raise NotImplementedError(self.run) |
224 | + |
225 | + |
226 | +class SerialScheduler(BaseScheduler): |
227 | + """Run jobs in order, one at a time.""" |
228 | + |
229 | + def __init__(self): |
230 | + self._jobs = [] |
231 | + |
232 | + def schedule(self, func, *args, **kwargs): |
233 | + self._jobs.append((func, args, kwargs)) |
234 | + |
235 | + def run(self): |
236 | + jobs, self._jobs = self._jobs[:], [] |
237 | + for (func, args, kwargs) in jobs: |
238 | + func(*args, **kwargs) |
239 | + |
240 | + |
241 | +class TwistedThreadScheduler(BaseScheduler): |
242 | + """Run jobs in threads, chaperoned by Twisted.""" |
243 | + |
244 | + def __init__(self, num_threads, install_signal_handlers=True): |
245 | + """Create a new `TwistedThreadScheduler`. |
246 | + |
247 | + :param num_threads: The number of threads to allocate to the |
248 | + thread pool. |
249 | + :type num_threads: int |
250 | + |
251 | + :param install_signal_handlers: Whether the Twisted reactor |
252 | + should install signal handlers or not. This is intended for |
253 | + testing - set to False to avoid layer violations - but may |
254 | + be useful in other situations. |
255 | + :type install_signal_handlers: bool |
256 | + """ |
257 | + self._thread_pool = ThreadPool(0, num_threads) |
258 | + self._install_signal_handlers = install_signal_handlers |
259 | + self._jobs = [] |
260 | + |
261 | + def schedule(self, func, *args, **kwargs): |
262 | + self._jobs.append( |
263 | + deferToThreadPool( |
264 | + reactor, self._thread_pool, func, *args, **kwargs)) |
265 | + |
266 | + def run(self): |
267 | + jobs, self._jobs = self._jobs[:], [] |
268 | + jobs_done = DeferredList(jobs) |
269 | + jobs_done.addBoth(lambda ignore: self._thread_pool.stop()) |
270 | + jobs_done.addBoth(lambda ignore: reactor.stop()) |
271 | + reactor.callWhenRunning(self._thread_pool.start) |
272 | + reactor.run(self._install_signal_handlers) |
273 | + |
274 | + |
275 | class CheckWatchesCronScript(LaunchpadCronScript): |
276 | |
277 | def add_my_options(self): |
278 | @@ -1203,9 +1252,16 @@ |
279 | else: |
280 | # Otherwise we just update those watches that need updating, |
281 | # and we let the BugWatchUpdater decide which those are. |
282 | + if self.options.jobs <= 1: |
283 | + # Use the default scheduler. |
284 | + scheduler = None |
285 | + else: |
286 | + # Run jobs in parallel. |
287 | + scheduler = TwistedThreadScheduler(self.options.jobs) |
288 | updater.updateBugTrackers( |
289 | - self.options.bug_trackers, self.options.batch_size, |
290 | - self.options.jobs) |
291 | + self.options.bug_trackers, |
292 | + self.options.batch_size, |
293 | + scheduler) |
294 | |
295 | run_time = time.time() - start_time |
296 | self.logger.info("Time for this run: %.3f seconds." % run_time) |
297 | |
298 | === modified file 'lib/lp/bugs/scripts/tests/test_checkwatches.py' |
299 | --- lib/lp/bugs/scripts/tests/test_checkwatches.py 2010-01-05 12:19:17 +0000 |
300 | +++ lib/lp/bugs/scripts/tests/test_checkwatches.py 2010-03-15 14:09:31 +0000 |
301 | @@ -2,9 +2,13 @@ |
302 | # GNU Affero General Public License version 3 (see the file LICENSE). |
303 | """Checkwatches unit tests.""" |
304 | |
305 | +from __future__ import with_statement |
306 | + |
307 | __metaclass__ = type |
308 | |
309 | +import threading |
310 | import unittest |
311 | + |
312 | import transaction |
313 | |
314 | from zope.component import getUtility |
315 | @@ -19,11 +23,13 @@ |
316 | from canonical.testing import LaunchpadZopelessLayer |
317 | |
318 | from lp.bugs.externalbugtracker.bugzilla import BugzillaAPI |
319 | +from lp.bugs.interfaces.bugtracker import IBugTrackerSet |
320 | from lp.bugs.scripts import checkwatches |
321 | -from lp.bugs.scripts.checkwatches import CheckWatchesErrorUtility |
322 | +from lp.bugs.scripts.checkwatches import ( |
323 | + BugWatchUpdater, CheckWatchesErrorUtility, TwistedThreadScheduler) |
324 | from lp.bugs.tests.externalbugtracker import ( |
325 | - TestBugzillaAPIXMLRPCTransport, new_bugtracker) |
326 | -from lp.testing import TestCaseWithFactory |
327 | + TestBugzillaAPIXMLRPCTransport, TestExternalBugTracker, new_bugtracker) |
328 | +from lp.testing import TestCaseWithFactory, ZopeTestInSubProcess |
329 | |
330 | |
331 | def always_BugzillaAPI_get_external_bugtracker(bugtracker): |
332 | @@ -202,5 +208,171 @@ |
333 | self.bugtask_with_question.status.title)) |
334 | |
335 | |
336 | +class TestSchedulerBase: |
337 | + |
338 | + def test_args_and_kwargs(self): |
339 | + def func(name, aptitude): |
340 | + self.failUnlessEqual("Robin Hood", name) |
341 | + self.failUnlessEqual("Riding through the glen", aptitude) |
342 | + # Positional args specified when adding a job are passed to |
343 | + # the job function at run time. |
344 | + self.scheduler.schedule( |
345 | + func, "Robin Hood", "Riding through the glen") |
346 | + # Keyword args specified when adding a job are passed to the |
347 | + # job function at run time. |
348 | + self.scheduler.schedule( |
349 | + func, name="Robin Hood", aptitude="Riding through the glen") |
350 | + # Positional and keyword args can both be specified. |
351 | + self.scheduler.schedule( |
352 | + func, "Robin Hood", aptitude="Riding through the glen") |
353 | + # Run everything. |
354 | + self.scheduler.run() |
355 | + |
356 | + |
357 | +class TestSerialScheduler(TestSchedulerBase, unittest.TestCase): |
358 | + """Test SerialScheduler.""" |
359 | + |
360 | + def setUp(self): |
361 | + self.scheduler = checkwatches.SerialScheduler() |
362 | + |
363 | + def test_ordering(self): |
364 | + # The numbers list will be emptied in the order we add jobs to |
365 | + # the scheduler. |
366 | + numbers = [1, 2, 3] |
367 | + # Remove 3 and check. |
368 | + self.scheduler.schedule( |
369 | + list.remove, numbers, 3) |
370 | + self.scheduler.schedule( |
371 | + lambda: self.failUnlessEqual([1, 2], numbers)) |
372 | + # Remove 1 and check. |
373 | + self.scheduler.schedule( |
374 | + list.remove, numbers, 1) |
375 | + self.scheduler.schedule( |
376 | + lambda: self.failUnlessEqual([2], numbers)) |
377 | + # Remove 2 and check. |
378 | + self.scheduler.schedule( |
379 | + list.remove, numbers, 2) |
380 | + self.scheduler.schedule( |
381 | + lambda: self.failUnlessEqual([], numbers)) |
382 | + # Run the scheduler. |
383 | + self.scheduler.run() |
384 | + |
385 | + |
386 | +class TestTwistedThreadScheduler(TestSchedulerBase, unittest.TestCase): |
387 | + """Test TwistedThreadScheduler. |
388 | + |
389 | + By default, updateBugTrackers() runs jobs serially, but a |
390 | + different scheduling policy can be plugged in. One such policy, |
391 | + for running several jobs in parallel, is TwistedThreadScheduler. |
392 | + """ |
393 | + |
394 | + def setUp(self): |
395 | + self.scheduler = checkwatches.TwistedThreadScheduler( |
396 | + num_threads=5, install_signal_handlers=False) |
397 | + |
398 | + |
399 | +class OutputFileForThreads: |
400 | + """Collates writes according to thread name.""" |
401 | + |
402 | + def __init__(self): |
403 | + self.output = {} |
404 | + self.lock = threading.Lock() |
405 | + |
406 | + def write(self, data): |
407 | + thread_name = threading.currentThread().getName() |
408 | + with self.lock: |
409 | + if thread_name in self.output: |
410 | + self.output[thread_name].append(data) |
411 | + else: |
412 | + self.output[thread_name] = [data] |
413 | + |
414 | + |
415 | +class ExternalBugTrackerForThreads(TestExternalBugTracker): |
416 | + """Fake which records interesting activity to a file.""" |
417 | + |
418 | + def __init__(self, output_file): |
419 | + super(ExternalBugTrackerForThreads, self).__init__() |
420 | + self.output_file = output_file |
421 | + |
422 | + def getRemoteStatus(self, bug_id): |
423 | + self.output_file.write("getRemoteStatus(bug_id=%r)" % bug_id) |
424 | + return 'UNKNOWN' |
425 | + |
426 | + def getCurrentDBTime(self): |
427 | + return None |
428 | + |
429 | + |
430 | +class BugWatchUpdaterForThreads(BugWatchUpdater): |
431 | + """Fake updater. |
432 | + |
433 | + Plumbs an `ExternalBugTrackerForThreads` into a given output file, |
434 | + which is expected to be an instance of `OutputFileForThreads`, and |
435 | + suppresses normal log activity. |
436 | + """ |
437 | + |
438 | + def __init__(self, output_file): |
439 | + logger = QuietFakeLogger() |
440 | + super(BugWatchUpdaterForThreads, self).__init__(transaction, logger) |
441 | + self.output_file = output_file |
442 | + |
443 | + def _getExternalBugTrackersAndWatches(self, bug_trackers, bug_watches): |
444 | + return [(ExternalBugTrackerForThreads(self.output_file), bug_watches)] |
445 | + |
446 | + |
447 | +class TestTwistedThreadSchedulerInPlace( |
448 | + ZopeTestInSubProcess, TestCaseWithFactory): |
449 | + """Test TwistedThreadScheduler in place. |
450 | + |
451 | + As in, driving as much of the bug watch machinery as is possible |
452 | + without making external connections. |
453 | + """ |
454 | + |
455 | + layer = LaunchpadZopelessLayer |
456 | + |
457 | + def test(self): |
458 | + # Prepare test data. |
459 | + self.owner = self.factory.makePerson() |
460 | + self.trackers = [ |
461 | + getUtility(IBugTrackerSet).ensureBugTracker( |
462 | + "http://butterscotch.example.com", self.owner, |
463 | + BugTrackerType.BUGZILLA, name="butterscotch"), |
464 | + getUtility(IBugTrackerSet).ensureBugTracker( |
465 | + "http://strawberry.example.com", self.owner, |
466 | + BugTrackerType.BUGZILLA, name="strawberry"), |
467 | + ] |
468 | + self.bug = self.factory.makeBug(owner=self.owner) |
469 | + for tracker in self.trackers: |
470 | + for num in (1, 2, 3): |
471 | + self.factory.makeBugWatch( |
472 | + "%s-%d" % (tracker.name, num), |
473 | + tracker, self.bug, self.owner) |
474 | + # Commit so that threads all see the same database state. |
475 | + transaction.commit() |
476 | + # Prepare the updater with the Twisted scheduler. |
477 | + output_file = OutputFileForThreads() |
478 | + threaded_bug_watch_updater = BugWatchUpdaterForThreads(output_file) |
479 | + threaded_bug_watch_scheduler = TwistedThreadScheduler( |
480 | + num_threads=10, install_signal_handlers=False) |
481 | + threaded_bug_watch_updater.updateBugTrackers( |
482 | + bug_tracker_names=[tracker.name for tracker in self.trackers], |
483 | + batch_size=5, scheduler=threaded_bug_watch_scheduler) |
484 | + # The thread names should match the tracker names. |
485 | + self.assertEqual( |
486 | + ['butterscotch', 'strawberry'], sorted(output_file.output)) |
487 | + # Check that getRemoteStatus() was called. |
488 | + self.assertEqual( |
489 | + ["getRemoteStatus(bug_id=u'butterscotch-1')", |
490 | + "getRemoteStatus(bug_id=u'butterscotch-2')", |
491 | + "getRemoteStatus(bug_id=u'butterscotch-3')"], |
492 | + output_file.output['butterscotch'] |
493 | + ) |
494 | + self.assertEqual( |
495 | + ["getRemoteStatus(bug_id=u'strawberry-1')", |
496 | + "getRemoteStatus(bug_id=u'strawberry-2')", |
497 | + "getRemoteStatus(bug_id=u'strawberry-3')"], |
498 | + output_file.output['strawberry'] |
499 | + ) |
500 | + |
501 | + |
502 | def test_suite(): |
503 | return unittest.TestLoader().loadTestsFromName(__name__) |
504 | |
505 | === modified file 'lib/lp/scripts/utilities/importfascist.py' |
506 | --- lib/lp/scripts/utilities/importfascist.py 2010-02-23 11:53:24 +0000 |
507 | +++ lib/lp/scripts/utilities/importfascist.py 2010-03-15 14:09:31 +0000 |
508 | @@ -62,6 +62,7 @@ |
509 | 'openid.fetchers': set(['Urllib2Fetcher']), |
510 | 'storm.database': set(['STATE_DISCONNECTED']), |
511 | 'textwrap': set(['dedent']), |
512 | + 'twisted.internet.threads': set(['deferToThreadPool']), |
513 | 'zope.component': set( |
514 | ['adapter', |
515 | 'ComponentLookupError', |
516 | |
517 | === modified file 'lib/lp/services/job/runner.py' |
518 | --- lib/lp/services/job/runner.py 2010-02-25 12:07:15 +0000 |
519 | +++ lib/lp/services/job/runner.py 2010-03-15 14:09:31 +0000 |
520 | @@ -401,9 +401,10 @@ |
521 | class JobCronScript(LaunchpadCronScript): |
522 | """Base class for scripts that run jobs.""" |
523 | |
524 | - def __init__(self, runner_class=JobRunner): |
525 | + def __init__(self, runner_class=JobRunner, test_args=None): |
526 | self.dbuser = getattr(config, self.config_name).dbuser |
527 | - super(JobCronScript, self).__init__(self.config_name, self.dbuser) |
528 | + super(JobCronScript, self).__init__( |
529 | + self.config_name, self.dbuser, test_args) |
530 | self.runner_class = runner_class |
531 | |
532 | def main(self): |
533 | |
534 | === modified file 'lib/lp/services/job/tests/test_runner.py' |
535 | --- lib/lp/services/job/tests/test_runner.py 2010-02-24 19:52:01 +0000 |
536 | +++ lib/lp/services/job/tests/test_runner.py 2010-03-15 14:09:31 +0000 |
537 | @@ -11,23 +11,24 @@ |
538 | from unittest import TestLoader |
539 | |
540 | import transaction |
541 | -from canonical.testing import LaunchpadZopelessLayer |
542 | + |
543 | from zope.component import getUtility |
544 | from zope.error.interfaces import IErrorReportingUtility |
545 | from zope.interface import implements |
546 | |
547 | +from canonical.launchpad.webapp import errorlog |
548 | +from canonical.launchpad.webapp.interfaces import ( |
549 | + DEFAULT_FLAVOR, IStoreSelector, MAIN_STORE) |
550 | +from canonical.testing import LaunchpadZopelessLayer |
551 | + |
552 | from lp.code.interfaces.branchmergeproposal import ( |
553 | - IUpdatePreviewDiffJobSource,) |
554 | -from lp.testing.mail_helpers import pop_notifications |
555 | -from lp.services.job.runner import ( |
556 | - JobCronScript, JobRunner, BaseRunnableJob, TwistedJobRunner |
557 | -) |
558 | + IUpdatePreviewDiffJobSource) |
559 | from lp.services.job.interfaces.job import JobStatus, IRunnableJob |
560 | from lp.services.job.model.job import Job |
561 | -from lp.testing import TestCaseWithFactory |
562 | -from canonical.launchpad.webapp import errorlog |
563 | -from canonical.launchpad.webapp.interfaces import ( |
564 | - IStoreSelector, MAIN_STORE, DEFAULT_FLAVOR) |
565 | +from lp.services.job.runner import ( |
566 | + BaseRunnableJob, JobCronScript, JobRunner, TwistedJobRunner) |
567 | +from lp.testing import TestCaseWithFactory, ZopeTestInSubProcess |
568 | +from lp.testing.mail_helpers import pop_notifications |
569 | |
570 | |
571 | class NullJob(BaseRunnableJob): |
572 | @@ -300,7 +301,7 @@ |
573 | self.entries.append(input) |
574 | |
575 | |
576 | -class TestTwistedJobRunner(TestCaseWithFactory): |
577 | +class TestTwistedJobRunner(ZopeTestInSubProcess, TestCaseWithFactory): |
578 | |
579 | layer = LaunchpadZopelessLayer |
580 | |
581 | @@ -325,7 +326,7 @@ |
582 | self.assertIn('Job ran too long.', oops.value) |
583 | |
584 | |
585 | -class TestJobCronScript(TestCaseWithFactory): |
586 | +class TestJobCronScript(ZopeTestInSubProcess, TestCaseWithFactory): |
587 | |
588 | layer = LaunchpadZopelessLayer |
589 | |
590 | @@ -351,7 +352,8 @@ |
591 | source_interface = IUpdatePreviewDiffJobSource |
592 | |
593 | def __init__(self): |
594 | - super(JobCronScriptSubclass, self).__init__(DummyRunner) |
595 | + super(JobCronScriptSubclass, self).__init__( |
596 | + DummyRunner, test_args=[]) |
597 | self.logger = ListLogger() |
598 | |
599 | old_errorlog = errorlog.globalErrorUtility |
600 | |
601 | === modified file 'lib/lp/testing/__init__.py' |
602 | --- lib/lp/testing/__init__.py 2010-03-01 03:06:02 +0000 |
603 | +++ lib/lp/testing/__init__.py 2010-03-15 14:09:31 +0000 |
604 | @@ -29,6 +29,7 @@ |
605 | 'validate_mock_class', |
606 | 'WindmillTestCase', |
607 | 'with_anonymous_login', |
608 | + 'ZopeTestInSubProcess', |
609 | ] |
610 | |
611 | import copy |
612 | @@ -38,6 +39,8 @@ |
613 | from pprint import pformat |
614 | import shutil |
615 | import subprocess |
616 | +import subunit |
617 | +import sys |
618 | import tempfile |
619 | import time |
620 | |
621 | @@ -62,6 +65,7 @@ |
622 | from zope.interface.verify import verifyClass, verifyObject |
623 | from zope.security.proxy import ( |
624 | isinstance as zope_isinstance, removeSecurityProxy) |
625 | +from zope.testing.testrunner.runner import TestResult as ZopeTestResult |
626 | |
627 | from canonical.launchpad.webapp import errorlog |
628 | from canonical.config import config |
629 | @@ -586,6 +590,57 @@ |
630 | self.client.open(url=u'http://launchpad.dev:8085') |
631 | |
632 | |
633 | +class ZopeTestInSubProcess: |
634 | + """Run tests in a sub-process, respecting Zope idiosyncrasies. |
635 | + |
636 | + Use this as a mixin with an interesting `TestCase` to isolate |
637 | + tests with side-effects. Each and every test *method* in the test |
638 | + case is run in a new, forked, sub-process. This will slow down |
639 | + your tests, so use it sparingly. However, when you need to, for |
640 | + example, start the Twisted reactor (which cannot currently be |
641 | + safely stopped and restarted in process) it is invaluable. |
642 | + |
643 | + This is basically a reimplementation of subunit's |
644 | + `IsolatedTestCase` or `IsolatedTestSuite`, but adjusted to work |
645 | + with Zope. In particular, Zope's TestResult object is responsible |
646 | + for calling testSetUp() and testTearDown() on the selected layer. |
647 | + """ |
648 | + |
649 | + def run(self, result): |
650 | + assert isinstance(result, ZopeTestResult), ( |
651 | + "result must be a Zope result object, not %r." % (result,)) |
652 | + pread, pwrite = os.pipe() |
653 | + pid = os.fork() |
654 | + if pid == 0: |
655 | + # Child. |
656 | + os.close(pread) |
657 | + fdwrite = os.fdopen(pwrite, 'w', 1) |
658 | + # Send results to both the Zope result object (so that |
659 | + # layer setup and teardown are done properly, etc.) and to |
660 | + # the subunit stream client so that the parent process can |
661 | + # obtain the result. |
662 | + result = testtools.MultiTestResult( |
663 | + result, subunit.TestProtocolClient(fdwrite)) |
664 | + super(ZopeTestInSubProcess, self).run(result) |
665 | + fdwrite.flush() |
666 | + sys.stdout.flush() |
667 | + sys.stderr.flush() |
668 | + # Exit hard. |
669 | + os._exit(0) |
670 | + else: |
671 | + # Parent. |
672 | + os.close(pwrite) |
673 | + fdread = os.fdopen(pread, 'rU') |
674 | + # Accept the result from the child process. Skip all the |
675 | + # Zope-specific result stuff by passing a super() of the |
676 | + # result. |
677 | + result = super(ZopeTestResult, result) |
678 | + protocol = subunit.TestProtocolServer(result) |
679 | + protocol.readFrom(fdread) |
680 | + fdread.close() |
681 | + os.waitpid(pid, 0) |
682 | + |
683 | + |
684 | def capture_events(callable_obj, *args, **kwargs): |
685 | """Capture the events emitted by a callable. |
686 | |
687 | |
688 | === added file 'lib/lp/testing/tests/test_zope_test_in_subprocess.py' |
689 | --- lib/lp/testing/tests/test_zope_test_in_subprocess.py 1970-01-01 00:00:00 +0000 |
690 | +++ lib/lp/testing/tests/test_zope_test_in_subprocess.py 2010-03-15 14:09:31 +0000 |
691 | @@ -0,0 +1,131 @@ |
692 | +# Copyright 2010 Canonical Ltd. This software is licensed under the |
693 | +# GNU Affero General Public License version 3 (see the file LICENSE). |
694 | + |
695 | +"""Test `lp.testing.ZopeTestInSubProcess`. |
696 | + |
697 | +How does it do this? |
698 | + |
699 | +A `TestCase`, mixed-in with `ZopeTestInSubProcess`, is run by the Zope |
700 | +test runner. This test case sets its own layer, to keep track of the |
701 | +PIDs when certain methods are called. It also records pids for its own |
702 | +methods. Assertions are made as these methods are called to ensure that |
703 | +they are running in the correct process - the parent or the child. |
704 | + |
705 | +Recording of the PIDs is handled using the `record_pid` decorator. |
706 | +""" |
707 | + |
708 | +__metaclass__ = type |
709 | + |
710 | +import functools |
711 | +import os |
712 | +import unittest |
713 | + |
714 | +from lp.testing import ZopeTestInSubProcess |
715 | + |
716 | + |
717 | +def record_pid(method): |
718 | + """Decorator that records the pid at method invocation. |
719 | + |
720 | + Will probably only DTRT with class methods or bound instance |
721 | + methods. |
722 | + """ |
723 | + @functools.wraps(method) |
724 | + def wrapper(self, *args, **kwargs): |
725 | + setattr(self, 'pid_in_%s' % method.__name__, os.getpid()) |
726 | + return method(self, *args, **kwargs) |
727 | + return wrapper |
728 | + |
729 | + |
730 | +class TestZopeTestInSubProcessLayer: |
731 | + """Helper to test `ZopeTestInSubProcess`. |
732 | + |
733 | + Asserts that layers are set up and torn down in the expected way, |
734 | + namely that setUp() and tearDown() are called in the parent |
735 | + process, and testSetUp() and testTearDown() are called in the |
736 | + child process. |
737 | + |
738 | + The assertions for tearDown() and testTearDown() must be done here |
739 | + because the test case runs before these methods are called. In the |
740 | + interests of symmetry and clarity, the assertions for setUp() and |
741 | + testSetUp() are done here too. |
742 | + |
743 | + This layer expects to be *instantiated*, which is not the norm for |
744 | + Zope layers. See `TestZopeTestInSubProcess` for its use. |
745 | + """ |
746 | + |
747 | + @record_pid |
748 | + def __init__(self): |
749 | + # These are needed to satisfy the requirements of the |
750 | + # byzantine Zope layer machinery. |
751 | + self.__name__ = self.__class__.__name__ |
752 | + self.__bases__ = self.__class__.__bases__ |
753 | + |
754 | + @record_pid |
755 | + def setUp(self): |
756 | + # Runs in the parent process. |
757 | + assert self.pid_in___init__ == self.pid_in_setUp, ( |
758 | + "layer.setUp() not called in parent process.") |
759 | + |
760 | + @record_pid |
761 | + def testSetUp(self): |
762 | + # Runs in the child process. |
763 | + assert self.pid_in___init__ != self.pid_in_testSetUp, ( |
764 | + "layer.testSetUp() called in parent process.") |
765 | + |
766 | + @record_pid |
767 | + def testTearDown(self): |
768 | + # Runs in the child process. |
769 | + assert self.pid_in_testSetUp == self.pid_in_testTearDown, ( |
770 | + "layer.testTearDown() not called in same process as testSetUp().") |
771 | + |
772 | + @record_pid |
773 | + def tearDown(self): |
774 | + # Runs in the parent process. |
775 | + assert self.pid_in___init__ == self.pid_in_tearDown, ( |
776 | + "layer.tearDown() not called in parent process.") |
777 | + |
778 | + |
779 | +class TestZopeTestInSubProcess(ZopeTestInSubProcess, unittest.TestCase): |
780 | + """Test `ZopeTestInSubProcess`. |
781 | + |
782 | + Assert that setUp(), test() and tearDown() are called in the child |
783 | + process. |
784 | + |
785 | + Sets its own layer attribute. This layer is then responsible for |
786 | + recording the PID at interesting moments. Specifically, |
787 | + layer.testSetUp() must be called in the same process as |
788 | + test.setUp(). |
789 | + """ |
790 | + |
791 | + @record_pid |
792 | + def __init__(self, method_name='runTest'): |
793 | + # Runs in the parent process. |
794 | + super(TestZopeTestInSubProcess, self).__init__(method_name) |
795 | + self.layer = TestZopeTestInSubProcessLayer() |
796 | + |
797 | + @record_pid |
798 | + def setUp(self): |
799 | + # Runs in the child process. |
800 | + super(TestZopeTestInSubProcess, self).setUp() |
801 | + self.failUnlessEqual( |
802 | + self.layer.pid_in_testSetUp, self.pid_in_setUp, |
803 | + "setUp() not called in same process as layer.testSetUp().") |
804 | + |
805 | + @record_pid |
806 | + def test(self): |
807 | + # Runs in the child process. |
808 | + self.failUnlessEqual( |
809 | + self.pid_in_setUp, self.pid_in_test, |
810 | + "test method not run in same process as setUp().") |
811 | + |
812 | + @record_pid |
813 | + def tearDown(self): |
814 | + # Runs in the child process. |
815 | + super(TestZopeTestInSubProcess, self).tearDown() |
816 | + self.failUnlessEqual( |
817 | + self.pid_in_setUp, self.pid_in_tearDown, |
818 | + "tearDown() not run in same process as setUp().") |
819 | + |
820 | + |
821 | +def test_suite(): |
822 | + return unittest.TestLoader().loadTestsFromName(__name__) |
checkwatches can update bug watches for different remote trackers in parallel. Currently that is implemented with threads created directly via the threading module. This branch continues to use threads, but manages them through a Twisted ThreadPool.
To make testing easier, and to make the design more elegant, it's possible to pass in a scheduler to BugWatchUpdate.updateBugTrackers(). Previously the scheduling policy was baked into this method.
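
As a rough illustration of the pass-in-a-scheduler idea, here is a minimal sketch. It is not the code from this branch: the `schedule()`/`run()` interface, the `ThreadedScheduler` name and the `update_bug_trackers()` function are assumptions made for the example, and the standard library's `ThreadPoolExecutor` stands in for the Twisted ThreadPool so that the sketch runs without Twisted installed. Only the `SerialScheduler` name comes from the review discussion; its body here is hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor


class SerialScheduler:
    """Run scheduled jobs one at a time, in the calling thread.

    Hypothetical interface: schedule() queues a job, run() executes them.
    """

    def __init__(self):
        self._jobs = []

    def schedule(self, func, *args, **kwargs):
        self._jobs.append((func, args, kwargs))

    def run(self):
        jobs, self._jobs = self._jobs, []
        for func, args, kwargs in jobs:
            func(*args, **kwargs)


class ThreadedScheduler:
    """Run scheduled jobs in a bounded pool of worker threads.

    Assumption: a stdlib thread pool replaces the Twisted ThreadPool here.
    """

    def __init__(self, num_threads):
        self._num_threads = num_threads
        self._jobs = []

    def schedule(self, func, *args, **kwargs):
        self._jobs.append((func, args, kwargs))

    def run(self):
        jobs, self._jobs = self._jobs, []
        with ThreadPoolExecutor(max_workers=self._num_threads) as pool:
            futures = [
                pool.submit(func, *args, **kwargs)
                for func, args, kwargs in jobs]
        # Leaving the with-block waits for all jobs; surface any failure.
        for future in futures:
            future.result()


def update_bug_trackers(tracker_names, update_one, scheduler=None):
    """Hypothetical stand-in for a method that accepts a scheduler.

    The scheduling policy lives in the scheduler, not in this function.
    """
    if scheduler is None:
        scheduler = SerialScheduler()
    for name in tracker_names:
        scheduler.schedule(update_one, name)
    scheduler.run()
```

With this shape a test can pass a serial scheduler to get deterministic ordering, for example `update_bug_trackers(names, update_one, SerialScheduler())`, while a production caller passes a threaded one.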
Getting Twisted in there also opens the door for using more async code in the future.