Merge lp:~allenap/launchpad/twisted-threading-bug-491870 into lp:launchpad

Proposed by Gavin Panella
Status: Merged
Approved by: Gavin Panella
Approved revision: no longer in the source branch.
Merged at revision: not available
Proposed branch: lp:~allenap/launchpad/twisted-threading-bug-491870
Merge into: lp:launchpad
Prerequisite: lp:~allenap/launchpad/isolate-tests
Diff against target: 516 lines (+266/-143)
5 files modified
lib/lp/bugs/doc/checkwatches-cli-switches.txt (+1/-1)
lib/lp/bugs/doc/externalbugtracker.txt (+0/-107)
lib/lp/bugs/scripts/checkwatches.py (+88/-32)
lib/lp/bugs/scripts/tests/test_checkwatches.py (+176/-3)
lib/lp/scripts/utilities/importfascist.py (+1/-0)
To merge this branch: bzr merge lp:~allenap/launchpad/twisted-threading-bug-491870
Reviewer                     Review Type   Date Requested   Status
Eleanor Berger (community)   code                           Approve
Review via email: mp+21376@code.launchpad.net

Commit message

In checkwatches, use Twisted to manage threading instead of the threading module.

Description of the change

checkwatches can update bug watches for different remote bug trackers in parallel. Currently this is implemented with threads created directly through the threading module. This branch continues to use threads, but manages them with a Twisted ThreadPool.
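For readers less familiar with Twisted, the shape of the new parallel path can be sketched with the Python 3.7+ standard library. This is an analogue under stated assumptions, not the branch's code. asyncio's event loop stands in for the Twisted reactor, concurrent.futures.ThreadPoolExecutor for ThreadPool, loop.run_in_executor() for deferToThreadPool(), and asyncio.gather(..., return_exceptions=True) for DeferredList. The class name AsyncioThreadScheduler and the update_tracker() job are hypothetical.

```python
import asyncio
import functools
import threading
from concurrent.futures import ThreadPoolExecutor


class AsyncioThreadScheduler:
    """Hypothetical stdlib analogue of TwistedThreadScheduler."""

    def __init__(self, num_threads):
        self._num_threads = num_threads
        self._jobs = []

    def schedule(self, func, *args, **kwargs):
        # Only record the job here; nothing runs until run() is called.
        self._jobs.append((func, args, kwargs))

    def run(self):
        jobs, self._jobs = self._jobs[:], []

        async def run_all():
            loop = asyncio.get_running_loop()
            # At most num_threads jobs execute at the same time.
            with ThreadPoolExecutor(max_workers=self._num_threads) as pool:
                futures = [
                    # run_in_executor() takes no kwargs, hence partial().
                    loop.run_in_executor(
                        pool, functools.partial(func, *args, **kwargs))
                    for func, args, kwargs in jobs]
                # Like DeferredList, wait for every job and collect
                # exceptions as results instead of stopping at the first.
                return await asyncio.gather(
                    *futures, return_exceptions=True)

        # asyncio.run() starts the event loop and stops it when run_all()
        # returns, much as the Twisted version calls reactor.stop().
        return asyncio.run(run_all())


# Usage: two hypothetical "trackers" updated on a pool of up to 3 threads.
seen = []
seen_lock = threading.Lock()


def update_tracker(name, batch_size):
    with seen_lock:
        seen.append((name, batch_size))
    return "%s done" % name


scheduler = AsyncioThreadScheduler(num_threads=3)
for tracker_name in ["butterscotch", "strawberry"]:
    scheduler.schedule(update_tracker, tracker_name, batch_size=5)
results = scheduler.run()
```

One design difference is that the branch's run() returns nothing. This sketch instead returns each job's result (or exception) in scheduling order, which makes it straightforward to check in a test.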

To make testing easier and the design cleaner, a scheduler can now be passed to BugWatchUpdater.updateBugTrackers(). Previously the scheduling policy was baked into that method. If no scheduler is given, a SerialScheduler runs the jobs one at a time, in order.
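Injecting the policy is what makes updateBugTrackers() easy to test. The sketch below, in plain Python 3, follows the schedule(func, *args, **kwargs) / run() interface of BaseScheduler and SerialScheduler from the diff. It also introduces two hypothetical pieces. The first is update_bug_trackers(), a stripped-down stand-in for BugWatchUpdater.updateBugTrackers() that takes a list of per-tracker callables instead of calling _bugTrackerUpdaters(). The second is RecordingScheduler, a test double that records jobs without running them.

```python
class BaseScheduler:
    """Run jobs according to a policy (same interface as the branch)."""

    def schedule(self, func, *args, **kwargs):
        raise NotImplementedError(self.schedule)

    def run(self):
        raise NotImplementedError(self.run)


class SerialScheduler(BaseScheduler):
    """Run jobs in the order they were scheduled, one at a time."""

    def __init__(self):
        self._jobs = []

    def schedule(self, func, *args, **kwargs):
        self._jobs.append((func, args, kwargs))

    def run(self):
        # Clear the queue first so a second run() does not repeat jobs.
        jobs, self._jobs = self._jobs, []
        for func, args, kwargs in jobs:
            func(*args, **kwargs)


class RecordingScheduler(BaseScheduler):
    """Hypothetical test double: records jobs and never runs them."""

    def __init__(self):
        self.scheduled = []
        self.ran = False

    def schedule(self, func, *args, **kwargs):
        self.scheduled.append((func, args, kwargs))

    def run(self):
        self.ran = True


def update_bug_trackers(updaters, batch_size=None, scheduler=None):
    """Hypothetical simplification of updateBugTrackers()."""
    # Default to the simple serial policy, as the branch does.
    if scheduler is None:
        scheduler = SerialScheduler()
    for updater in updaters:
        scheduler.schedule(updater, batch_size)
    scheduler.run()


log = []


def make_updater(name):
    # Each hypothetical updater takes a batch size, like the real jobs.
    def updater(batch_size):
        log.append((name, batch_size))
    return updater


updaters = [make_updater("debbugs"), make_updater("sf")]

# Default policy: jobs run serially, in order.
update_bug_trackers(updaters, batch_size=5)

# Injected policy: the test inspects what was scheduled; nothing runs.
recorder = RecordingScheduler()
update_bug_trackers(updaters, batch_size=5, scheduler=recorder)
```

Because the policy is injected, a test can check what gets scheduled without starting threads or a reactor. The branch's own tests take the complementary approach of exercising each real scheduler directly.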

Bringing in Twisted also opens the door to using more asynchronous code in the future.

A previous review <https://code.edge.launchpad.net/~allenap/launchpad/twisted-threading-bug-491870/+merge/18843> of this branch was approved, but significant changes have been made since. In short, the test suite hung when some of the tests that start the Twisted reactor ran in the same process as certain job runner tests. The new prerequisite branch, lp:~allenap/launchpad/isolate-tests, contains the code needed to isolate these tests.

Eleanor Berger (intellectronica) :
review: Approve (code)

Preview Diff

=== modified file 'lib/lp/bugs/doc/checkwatches-cli-switches.txt'
--- lib/lp/bugs/doc/checkwatches-cli-switches.txt 2009-11-17 11:19:25 +0000
+++ lib/lp/bugs/doc/checkwatches-cli-switches.txt 2010-03-16 18:08:23 +0000
@@ -34,7 +34,7 @@
     >>> from lp.testing.factory import LaunchpadObjectFactory

     >>> LaunchpadZopelessLayer.switchDbUser('launchpad')
-    >>> login('foo.bar@canonical.com')
+    >>> login('foo.bar@canonical.com')
     >>> bug_tracker = LaunchpadObjectFactory().makeBugTracker(
     ...     'http://example.com')
     >>> bug_tracker.active = False

=== modified file 'lib/lp/bugs/doc/externalbugtracker.txt'
--- lib/lp/bugs/doc/externalbugtracker.txt 2010-02-24 03:49:45 +0000
+++ lib/lp/bugs/doc/externalbugtracker.txt 2010-03-16 18:08:23 +0000
@@ -1105,110 +1105,3 @@
 the updateBugTrackers() method. This, too, takes a batch_size parameter, which
 allows it to be passed as a command-line option when the checkwatches script
 is run.
-
-Before going further, we must abort the current transaction to avoid
-deadlock; updateBugTrackers() runs updateBugTracker() in a different
-thread.
-
-    >>> transaction.abort()
-
-    >>> from canonical.launchpad.scripts.logger import FakeLogger
-    >>> bug_watch_updater = NonConnectingBugWatchUpdater(
-    ...     transaction, FakeLogger())
-    >>> bug_watch_updater.updateBugTrackers(
-    ...     bug_tracker_names=[standard_bugzilla.name], batch_size=2)
-    DEBUG Using a global batch size of 2
-    INFO Updating 2 watches for 2 bugs on http://example.com
-    initializeRemoteBugDB() called: [u'5', u'6']
-    getRemoteStatus() called: u'5'
-    getRemoteStatus() called: u'6'
-
-    >>> # We should log in again because updateBugTrackers() logs out.
-    >>> login('test@canonical.com')
-
-By default, the updateBugTrackers() only spawns one thread, but it can
-spawn as many as required.
-
-    >>> import threading
-
-    >>> class OutputFileForThreads:
-    ...     def __init__(self):
-    ...         self.output = {}
-    ...         self.lock = threading.Lock()
-    ...     def write(self, data):
-    ...         thread_name = threading.currentThread().getName()
-    ...         self.lock.acquire()
-    ...         try:
-    ...             if thread_name in self.output:
-    ...                 self.output[thread_name].append(data)
-    ...             else:
-    ...                 self.output[thread_name] = [data]
-    ...         finally:
-    ...             self.lock.release()
-
-    >>> output_file = OutputFileForThreads()
-
-    >>> class ExternalBugTrackerForThreads(TestExternalBugTracker):
-    ...     def getModifiedRemoteBugs(self, remote_bug_ids, last_checked):
-    ...         print >> output_file, (
-    ...             "getModifiedRemoteBugs(\n"
-    ...             " remote_bug_ids=%r,\n"
-    ...             " last_checked=%r)" % (remote_bug_ids, last_checked))
-    ...         return [remote_bug_ids[0], remote_bug_ids[-1]]
-    ...     def getRemoteStatus(self, bug_id):
-    ...         print >> output_file, (
-    ...             "getRemoteStatus(bug_id=%r)" % bug_id)
-    ...         return 'UNKNOWN'
-    ...     def getCurrentDBTime(self):
-    ...         return None
-
-    >>> class BugWatchUpdaterForThreads(BugWatchUpdater):
-    ...     def _getExternalBugTrackersAndWatches(
-    ...             self, bug_trackers, bug_watches):
-    ...         return [(ExternalBugTrackerForThreads(), bug_watches)]
-
-    >>> threaded_bug_watch_updater = BugWatchUpdaterForThreads(
-    ...     transaction, FakeLogger(output_file))
-    >>> threaded_bug_watch_updater.updateBugTrackers(
-    ...     batch_size=5, num_threads=10)
-
-
-    >>> for thread_name in sorted(output_file.output):
-    ...     print '== %s ==' % thread_name
-    ...     print "".join(output_file.output[thread_name]),
-    == MainThread ==
-    DEBUG Using a global batch size of 5
-    DEBUG Skipping updating Ubuntu Bugzilla watches.
-    == auto-generic-string4.example.com ==
-    INFO Updating 5 watches for 5 bugs on http://example.com
-    getRemoteStatus(bug_id=u'1')
-    getRemoteStatus(bug_id=u'101')
-    getRemoteStatus(bug_id=u'5')
-    getRemoteStatus(bug_id=u'6')
-    getRemoteStatus(bug_id=u'7')
-    == debbugs ==
-    INFO Updating 5 watches for 5 bugs on http://example.com
-    getRemoteStatus(bug_id=u'280883')
-    getRemoteStatus(bug_id=u'304014')
-    getRemoteStatus(bug_id=u'308994')
-    getRemoteStatus(bug_id=u'327452')
-    getRemoteStatus(bug_id=u'327549')
-    == email ==
-    DEBUG No watches to update on mailto:bugs@example.com
-    == example-bugs ==
-    DEBUG No watches to update on http://bugs.example.com
-    == gnome-bugs ==
-    DEBUG No watches to update on http://bugzilla.gnome.org/
-    == gnome-bugzilla ==
-    INFO Updating 2 watches for 2 bugs on http://example.com
-    getRemoteStatus(bug_id=u'304070')
-    getRemoteStatus(bug_id=u'3224')
-    == mozilla.org ==
-    INFO Updating 4 watches for 3 bugs on http://example.com
-    getRemoteStatus(bug_id=u'123543')
-    getRemoteStatus(bug_id=u'2000')
-    getRemoteStatus(bug_id=u'42')
-    == savannah ==
-    DEBUG No watches to update on http://savannah.gnu.org/
-    == sf ==
-    DEBUG No watches to update on http://sourceforge.net/

=== modified file 'lib/lp/bugs/scripts/checkwatches.py'
--- lib/lp/bugs/scripts/checkwatches.py 2010-02-19 12:05:10 +0000
+++ lib/lp/bugs/scripts/checkwatches.py 2010-03-16 18:08:23 +0000
@@ -6,7 +6,6 @@

 from copy import copy
 from datetime import datetime, timedelta
-import Queue as queue
 import socket
 import sys
 import threading
@@ -14,6 +13,11 @@

 import pytz

+from twisted.internet import reactor
+from twisted.internet.defer import DeferredList
+from twisted.internet.threads import deferToThreadPool
+from twisted.python.threadpool import ThreadPool
+
 from zope.component import getUtility
 from zope.event import notify

@@ -271,44 +275,28 @@
             self._logout()

     def updateBugTrackers(
-            self, bug_tracker_names=None, batch_size=None, num_threads=1):
+            self, bug_tracker_names=None, batch_size=None, scheduler=None):
         """Update all the bug trackers that have watches pending.

         If bug tracker names are specified in bug_tracker_names only
         those bug trackers will be checked.

-        The updates are run in threads, so that long running updates
-        don't block progress. However, by default the number of
-        threads is 1, to help with testing.
+        A custom scheduler can be passed in. This should inherit from
+        `BaseScheduler`. If no scheduler is given, `SerialScheduler`
+        will be used, which simply runs the jobs in order.
         """
         self.log.debug("Using a global batch size of %s" % batch_size)

-        # Put all the work on the queue. This is simpler than drip-feeding the
-        # queue, and avoids a situation where a worker thread exits because
-        # there's no work left and the feeding thread hasn't been scheduled to
-        # add work to the queue.
-        work = queue.Queue()
+        # Default to using the very simple SerialScheduler.
+        if scheduler is None:
+            scheduler = SerialScheduler()
+
+        # Schedule all the jobs to run.
         for updater in self._bugTrackerUpdaters(bug_tracker_names):
-            work.put(updater)
-
-        # This will be run once in each worker thread.
-        def do_work():
-            while True:
-                try:
-                    job = work.get(block=False)
-                except queue.Empty:
-                    break
-                else:
-                    job(batch_size)
-
-        # Start and join the worker threads.
-        threads = []
-        for run in xrange(num_threads):
-            thread = threading.Thread(target=do_work)
-            thread.start()
-            threads.append(thread)
-        for thread in threads:
-            thread.join()
+            scheduler.schedule(updater, batch_size)
+
+        # Run all the jobs.
+        scheduler.run()

     def updateBugTracker(self, bug_tracker, batch_size):
         """Updates the given bug trackers's bug watches.
@@ -1165,6 +1153,67 @@
         self.log.error("%s (%s)" % (message, oops_info.oopsid))


+class BaseScheduler:
+    """Run jobs according to a policy."""
+
+    def schedule(self, func, *args, **kwargs):
+        """Add a job to be run."""
+        raise NotImplementedError(self.schedule)
+
+    def run(self):
+        """Run the jobs."""
+        raise NotImplementedError(self.run)
+
+
+class SerialScheduler(BaseScheduler):
+    """Run jobs in order, one at a time."""
+
+    def __init__(self):
+        self._jobs = []
+
+    def schedule(self, func, *args, **kwargs):
+        self._jobs.append((func, args, kwargs))
+
+    def run(self):
+        jobs, self._jobs = self._jobs[:], []
+        for (func, args, kwargs) in jobs:
+            func(*args, **kwargs)
+
+
+class TwistedThreadScheduler(BaseScheduler):
+    """Run jobs in threads, chaperoned by Twisted."""
+
+    def __init__(self, num_threads, install_signal_handlers=True):
+        """Create a new `TwistedThreadScheduler`.
+
+        :param num_threads: The number of threads to allocate to the
+            thread pool.
+        :type num_threads: int
+
+        :param install_signal_handlers: Whether the Twisted reactor
+            should install signal handlers or not. This is intended for
+            testing - set to False to avoid layer violations - but may
+            be useful in other situations.
+        :type install_signal_handlers: bool
+        """
+        self._thread_pool = ThreadPool(0, num_threads)
+        self._install_signal_handlers = install_signal_handlers
+        self._jobs = []
+
+    def schedule(self, func, *args, **kwargs):
+        self._jobs.append(
+            deferToThreadPool(
+                reactor, self._thread_pool, func, *args, **kwargs))
+
+    def run(self):
+        jobs, self._jobs = self._jobs[:], []
+        jobs_done = DeferredList(jobs)
+        jobs_done.addBoth(lambda ignore: self._thread_pool.stop())
+        jobs_done.addBoth(lambda ignore: reactor.stop())
+        reactor.callWhenRunning(self._thread_pool.start)
+        reactor.run(self._install_signal_handlers)
+
+
 class CheckWatchesCronScript(LaunchpadCronScript):

     def add_my_options(self):
@@ -1203,9 +1252,16 @@
         else:
             # Otherwise we just update those watches that need updating,
             # and we let the BugWatchUpdater decide which those are.
+            if self.options.jobs <= 1:
+                # Use the default scheduler.
+                scheduler = None
+            else:
+                # Run jobs in parallel.
+                scheduler = TwistedThreadScheduler(self.options.jobs)
             updater.updateBugTrackers(
-                self.options.bug_trackers, self.options.batch_size,
-                self.options.jobs)
+                self.options.bug_trackers,
+                self.options.batch_size,
+                scheduler)

         run_time = time.time() - start_time
         self.logger.info("Time for this run: %.3f seconds." % run_time)

=== modified file 'lib/lp/bugs/scripts/tests/test_checkwatches.py'
--- lib/lp/bugs/scripts/tests/test_checkwatches.py 2010-01-05 12:19:17 +0000
+++ lib/lp/bugs/scripts/tests/test_checkwatches.py 2010-03-16 18:08:23 +0000
@@ -2,9 +2,13 @@
 # GNU Affero General Public License version 3 (see the file LICENSE).
 """Checkwatches unit tests."""

+from __future__ import with_statement
+
 __metaclass__ = type

+import threading
 import unittest
+
 import transaction

 from zope.component import getUtility
@@ -19,11 +23,13 @@
 from canonical.testing import LaunchpadZopelessLayer

 from lp.bugs.externalbugtracker.bugzilla import BugzillaAPI
+from lp.bugs.interfaces.bugtracker import IBugTrackerSet
 from lp.bugs.scripts import checkwatches
-from lp.bugs.scripts.checkwatches import CheckWatchesErrorUtility
+from lp.bugs.scripts.checkwatches import (
+    BugWatchUpdater, CheckWatchesErrorUtility, TwistedThreadScheduler)
 from lp.bugs.tests.externalbugtracker import (
-    TestBugzillaAPIXMLRPCTransport, new_bugtracker)
-from lp.testing import TestCaseWithFactory
+    TestBugzillaAPIXMLRPCTransport, TestExternalBugTracker, new_bugtracker)
+from lp.testing import TestCaseWithFactory, ZopeTestInSubProcess


 def always_BugzillaAPI_get_external_bugtracker(bugtracker):
@@ -202,5 +208,172 @@
                 self.bugtask_with_question.status.title))


+class TestSchedulerBase:
+
+    def test_args_and_kwargs(self):
+        def func(name, aptitude):
+            self.failUnlessEqual("Robin Hood", name)
+            self.failUnlessEqual("Riding through the glen", aptitude)
+        # Positional args specified when adding a job are passed to
+        # the job function at run time.
+        self.scheduler.schedule(
+            func, "Robin Hood", "Riding through the glen")
+        # Keyword args specified when adding a job are passed to the
+        # job function at run time.
+        self.scheduler.schedule(
+            func, name="Robin Hood", aptitude="Riding through the glen")
+        # Positional and keyword args can both be specified.
+        self.scheduler.schedule(
+            func, "Robin Hood", aptitude="Riding through the glen")
+        # Run everything.
+        self.scheduler.run()
+
+
+class TestSerialScheduler(TestSchedulerBase, unittest.TestCase):
+    """Test SerialScheduler."""
+
+    def setUp(self):
+        self.scheduler = checkwatches.SerialScheduler()
+
+    def test_ordering(self):
+        # The numbers list will be emptied in the order we add jobs to
+        # the scheduler.
+        numbers = [1, 2, 3]
+        # Remove 3 and check.
+        self.scheduler.schedule(
+            list.remove, numbers, 3)
+        self.scheduler.schedule(
+            lambda: self.failUnlessEqual([1, 2], numbers))
+        # Remove 1 and check.
+        self.scheduler.schedule(
+            list.remove, numbers, 1)
+        self.scheduler.schedule(
+            lambda: self.failUnlessEqual([2], numbers))
+        # Remove 2 and check.
+        self.scheduler.schedule(
+            list.remove, numbers, 2)
+        self.scheduler.schedule(
+            lambda: self.failUnlessEqual([], numbers))
+        # Run the scheduler.
+        self.scheduler.run()
+
+
+class TestTwistedThreadScheduler(
+    TestSchedulerBase, ZopeTestInSubProcess, unittest.TestCase):
+    """Test TwistedThreadScheduler.
+
+    By default, updateBugTrackers() runs jobs serially, but a
+    different scheduling policy can be plugged in. One such policy,
+    for running several jobs in parallel, is TwistedThreadScheduler.
+    """
+
+    def setUp(self):
+        self.scheduler = checkwatches.TwistedThreadScheduler(
+            num_threads=5, install_signal_handlers=False)
+
+
+class OutputFileForThreads:
+    """Collates writes according to thread name."""
+
+    def __init__(self):
+        self.output = {}
+        self.lock = threading.Lock()
+
+    def write(self, data):
+        thread_name = threading.currentThread().getName()
+        with self.lock:
+            if thread_name in self.output:
+                self.output[thread_name].append(data)
+            else:
+                self.output[thread_name] = [data]
+
+
+class ExternalBugTrackerForThreads(TestExternalBugTracker):
+    """Fake which records interesting activity to a file."""
+
+    def __init__(self, output_file):
+        super(ExternalBugTrackerForThreads, self).__init__()
+        self.output_file = output_file
+
+    def getRemoteStatus(self, bug_id):
+        self.output_file.write("getRemoteStatus(bug_id=%r)" % bug_id)
+        return 'UNKNOWN'
+
+    def getCurrentDBTime(self):
+        return None
+
+
+class BugWatchUpdaterForThreads(BugWatchUpdater):
+    """Fake updater.
+
+    Plumbs an `ExternalBugTrackerForThreads` into a given output file,
+    which is expected to be an instance of `OutputFileForThreads`, and
+    suppresses normal log activity.
+    """
+
+    def __init__(self, output_file):
+        logger = QuietFakeLogger()
+        super(BugWatchUpdaterForThreads, self).__init__(transaction, logger)
+        self.output_file = output_file
+
+    def _getExternalBugTrackersAndWatches(self, bug_trackers, bug_watches):
+        return [(ExternalBugTrackerForThreads(self.output_file), bug_watches)]
+
+
+class TestTwistedThreadSchedulerInPlace(
+    ZopeTestInSubProcess, TestCaseWithFactory):
+    """Test TwistedThreadScheduler in place.
+
+    As in, driving as much of the bug watch machinery as is possible
+    without making external connections.
+    """
+
+    layer = LaunchpadZopelessLayer
+
+    def test(self):
+        # Prepare test data.
+        self.owner = self.factory.makePerson()
+        self.trackers = [
+            getUtility(IBugTrackerSet).ensureBugTracker(
+                "http://butterscotch.example.com", self.owner,
+                BugTrackerType.BUGZILLA, name="butterscotch"),
+            getUtility(IBugTrackerSet).ensureBugTracker(
+                "http://strawberry.example.com", self.owner,
+                BugTrackerType.BUGZILLA, name="strawberry"),
+            ]
+        self.bug = self.factory.makeBug(owner=self.owner)
+        for tracker in self.trackers:
+            for num in (1, 2, 3):
+                self.factory.makeBugWatch(
+                    "%s-%d" % (tracker.name, num),
+                    tracker, self.bug, self.owner)
+        # Commit so that threads all see the same database state.
+        transaction.commit()
+        # Prepare the updater with the Twisted scheduler.
+        output_file = OutputFileForThreads()
+        threaded_bug_watch_updater = BugWatchUpdaterForThreads(output_file)
+        threaded_bug_watch_scheduler = TwistedThreadScheduler(
+            num_threads=10, install_signal_handlers=False)
+        threaded_bug_watch_updater.updateBugTrackers(
+            bug_tracker_names=[tracker.name for tracker in self.trackers],
+            batch_size=5, scheduler=threaded_bug_watch_scheduler)
+        # The thread names should match the tracker names.
+        self.assertEqual(
+            ['butterscotch', 'strawberry'], sorted(output_file.output))
+        # Check that getRemoteStatus() was called.
+        self.assertEqual(
+            ["getRemoteStatus(bug_id=u'butterscotch-1')",
+             "getRemoteStatus(bug_id=u'butterscotch-2')",
+             "getRemoteStatus(bug_id=u'butterscotch-3')"],
+            output_file.output['butterscotch']
+            )
+        self.assertEqual(
+            ["getRemoteStatus(bug_id=u'strawberry-1')",
+             "getRemoteStatus(bug_id=u'strawberry-2')",
+             "getRemoteStatus(bug_id=u'strawberry-3')"],
+            output_file.output['strawberry']
+            )
+
+
 def test_suite():
     return unittest.TestLoader().loadTestsFromName(__name__)

=== modified file 'lib/lp/scripts/utilities/importfascist.py'
--- lib/lp/scripts/utilities/importfascist.py 2010-02-23 11:53:24 +0000
+++ lib/lp/scripts/utilities/importfascist.py 2010-03-16 18:08:23 +0000
@@ -62,6 +62,7 @@
     'openid.fetchers': set(['Urllib2Fetcher']),
     'storm.database': set(['STATE_DISCONNECTED']),
     'textwrap': set(['dedent']),
+    'twisted.internet.threads': set(['deferToThreadPool']),
     'zope.component': set(
         ['adapter',
          'ComponentLookupError',