Merge lp:~allenap/launchpad/twisted-threading-bug-491870 into lp:launchpad

Proposed by Gavin Panella
Status: Merged
Approved by: Gavin Panella
Approved revision: no longer in the source branch.
Merged at revision: not available
Proposed branch: lp:~allenap/launchpad/twisted-threading-bug-491870
Merge into: lp:launchpad
Prerequisite: lp:~allenap/launchpad/isolate-tests
Diff against target: 516 lines (+266/-143)
5 files modified
lib/lp/bugs/doc/checkwatches-cli-switches.txt (+1/-1)
lib/lp/bugs/doc/externalbugtracker.txt (+0/-107)
lib/lp/bugs/scripts/checkwatches.py (+88/-32)
lib/lp/bugs/scripts/tests/test_checkwatches.py (+176/-3)
lib/lp/scripts/utilities/importfascist.py (+1/-0)
To merge this branch: bzr merge lp:~allenap/launchpad/twisted-threading-bug-491870
Reviewer                    Review Type  Date Requested  Status
Eleanor Berger (community)  code                         Approve
Review via email: mp+21376@code.launchpad.net

Commit message

In checkwatches, use Twisted to manage threading instead of the threading module.

Description of the change

checkwatches can update bug watches for different remote bug trackers in parallel. Until now this has been implemented directly with the threading module: a queue of jobs drained by a fixed number of worker threads. This branch still uses threads, but manages them through a Twisted ThreadPool.
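
For readers unfamiliar with the pattern, here is a rough stdlib analogue of what the branch does with Twisted: submit each job to a bounded pool of worker threads, then block until every job has finished. It uses `concurrent.futures.ThreadPoolExecutor` as a stand-in for Twisted's `ThreadPool`, `deferToThreadPool` and `DeferredList`; `run_in_pool` and `update_tracker` are hypothetical names, not Launchpad code.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def run_in_pool(jobs, num_threads):
    """Run (func, args) jobs on at most num_threads worker threads.

    Blocks until every job has finished and returns the results in
    submission order. A stand-in for Twisted's ThreadPool + DeferredList.
    """
    with ThreadPoolExecutor(max_workers=num_threads) as pool:
        futures = [pool.submit(func, *args) for func, args in jobs]
    # Leaving the with-block waits for every job, much as a DeferredList
    # fires only once all of its Deferreds have fired.
    return [future.result() for future in futures]


def update_tracker(name, batch_size):
    # Hypothetical job: report which worker thread handled which tracker.
    return (name, batch_size, threading.current_thread().name)


if __name__ == "__main__":
    trackers = ("debbugs", "gnome-bugs", "sf")
    jobs = [(update_tracker, (name, 5)) for name in trackers]
    for result in run_in_pool(jobs, num_threads=2):
        print(result)
```

One difference worth noting: `future.result()` re-raises a job's exception, whereas a plain `DeferredList`, as used in this branch, collects failures without raising them.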

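To make testing easier and the design cleaner, a scheduler can now be passed to BugWatchUpdater.updateBugTrackers(); previously the scheduling policy was hard-coded in that method. If no scheduler is given, a SerialScheduler runs the jobs one at a time, in order. The checkwatches script uses a TwistedThreadScheduler only when more than one job is requested.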
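
The sketch below reduces the pluggable-scheduler idea to plain Python so it can run outside Launchpad. `SerialScheduler` follows the class added in this branch; `update_bug_trackers()` is a simplified, hypothetical stand-in for `BugWatchUpdater.updateBugTrackers()`, and `RecordingScheduler` is a hypothetical test double. It illustrates the testing benefit: a test can substitute a scheduler that captures jobs instead of running them.

```python
class SerialScheduler:
    """Run jobs in the order they were scheduled, one at a time."""

    def __init__(self):
        self._jobs = []

    def schedule(self, func, *args, **kwargs):
        self._jobs.append((func, args, kwargs))

    def run(self):
        # Take the pending jobs and reset, so run() can be called again.
        jobs, self._jobs = self._jobs[:], []
        for func, args, kwargs in jobs:
            func(*args, **kwargs)


class RecordingScheduler:
    """Hypothetical test double: records jobs but never runs them."""

    def __init__(self):
        self.scheduled = []
        self.run_called = False

    def schedule(self, func, *args, **kwargs):
        self.scheduled.append((func, args, kwargs))

    def run(self):
        self.run_called = True


def update_bug_trackers(updaters, batch_size, scheduler=None):
    """Hypothetical, simplified shape of updateBugTrackers()."""
    # Default to the very simple serial policy.
    if scheduler is None:
        scheduler = SerialScheduler()
    # The caller decides only *what* to run; the scheduler decides how.
    for updater in updaters:
        scheduler.schedule(updater, batch_size)
    scheduler.run()


if __name__ == "__main__":
    log = []
    updaters = [lambda size, n=name: log.append((n, size))
                for name in ("debbugs", "sf")]
    update_bug_trackers(updaters, 5)
    print(log)
```

Because the method only schedules and then calls `run()`, parallelism becomes a property of the object passed in rather than of `updateBugTrackers()` itself.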

Bringing Twisted in also opens the door to using more asynchronous code in future.

A previous review <https://code.edge.launchpad.net/~allenap/launchpad/twisted-threading-bug-491870/+merge/18843> of this branch was approved, but significant changes have been made since. In short, the test suite hung when some tests that start the Twisted reactor ran in the same process as certain job runner tests. The new prerequisite branch (lp:~allenap/launchpad/isolate-tests) contains the code needed to isolate these tests.
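
The tests in this branch mix in `ZopeTestInSubProcess`, presumably supplied by the prerequisite branch; its implementation is not part of this diff. One general reason to isolate reactor-driven tests is that a Twisted reactor, once stopped, cannot be run again in the same process. The following is only a minimal sketch of the isolation idea, assuming a POSIX system: run a test body in a forked child so any state it leaves behind dies with the child. `run_in_forked_child` is a hypothetical helper.

```python
import os
import sys
import traceback


def run_in_forked_child(func):
    """Run func() in a forked child process; return True if it passed.

    Whatever func() leaves behind (for example, a reactor that has been
    started and stopped) is discarded with the child. POSIX only.
    """
    pid = os.fork()
    if pid == 0:
        # Child: assume failure until func() returns normally.
        status = 1
        try:
            func()
            status = 0
        except BaseException:
            traceback.print_exc()
        finally:
            sys.stdout.flush()
            sys.stderr.flush()
            # os._exit() skips cleanup handlers and test-runner machinery
            # that fork() copied from the parent.
            os._exit(status)
    # Parent: wait for the child and translate its exit status.
    _, wait_status = os.waitpid(pid, 0)
    return os.WIFEXITED(wait_status) and os.WEXITSTATUS(wait_status) == 0
```

Reporting through the exit status keeps the sketch small; a real test runner would also need to pass the child's detailed results back to the parent.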

Eleanor Berger (intellectronica):
review: Approve (code)

Preview Diff

=== modified file 'lib/lp/bugs/doc/checkwatches-cli-switches.txt'
--- lib/lp/bugs/doc/checkwatches-cli-switches.txt 2009-11-17 11:19:25 +0000
+++ lib/lp/bugs/doc/checkwatches-cli-switches.txt 2010-03-16 18:08:23 +0000
@@ -34,7 +34,7 @@
     >>> from lp.testing.factory import LaunchpadObjectFactory
 
     >>> LaunchpadZopelessLayer.switchDbUser('launchpad')
     >>> login('foo.bar@canonical.com')
     >>> bug_tracker = LaunchpadObjectFactory().makeBugTracker(
     ...     'http://example.com')
     >>> bug_tracker.active = False
 
=== modified file 'lib/lp/bugs/doc/externalbugtracker.txt'
--- lib/lp/bugs/doc/externalbugtracker.txt 2010-02-24 03:49:45 +0000
+++ lib/lp/bugs/doc/externalbugtracker.txt 2010-03-16 18:08:23 +0000
@@ -1105,110 +1105,3 @@
 the updateBugTrackers() method. This, too, takes a batch_size parameter, which
 allows it to be passed as a command-line option when the checkwatches script
 is run.
-
-Before going further, we must abort the current transaction to avoid
-deadlock; updateBugTrackers() runs updateBugTracker() in a different
-thread.
-
-    >>> transaction.abort()
-
-    >>> from canonical.launchpad.scripts.logger import FakeLogger
-    >>> bug_watch_updater = NonConnectingBugWatchUpdater(
-    ...     transaction, FakeLogger())
-    >>> bug_watch_updater.updateBugTrackers(
-    ...     bug_tracker_names=[standard_bugzilla.name], batch_size=2)
-    DEBUG Using a global batch size of 2
-    INFO Updating 2 watches for 2 bugs on http://example.com
-    initializeRemoteBugDB() called: [u'5', u'6']
-    getRemoteStatus() called: u'5'
-    getRemoteStatus() called: u'6'
-
-    >>> # We should log in again because updateBugTrackers() logs out.
-    >>> login('test@canonical.com')
-
-By default, the updateBugTrackers() only spawns one thread, but it can
-spawn as many as required.
-
-    >>> import threading
-
-    >>> class OutputFileForThreads:
-    ...     def __init__(self):
-    ...         self.output = {}
-    ...         self.lock = threading.Lock()
-    ...     def write(self, data):
-    ...         thread_name = threading.currentThread().getName()
-    ...         self.lock.acquire()
-    ...         try:
-    ...             if thread_name in self.output:
-    ...                 self.output[thread_name].append(data)
-    ...             else:
-    ...                 self.output[thread_name] = [data]
-    ...         finally:
-    ...             self.lock.release()
-
-    >>> output_file = OutputFileForThreads()
-
-    >>> class ExternalBugTrackerForThreads(TestExternalBugTracker):
-    ...     def getModifiedRemoteBugs(self, remote_bug_ids, last_checked):
-    ...         print >> output_file, (
-    ...             "getModifiedRemoteBugs(\n"
-    ...             " remote_bug_ids=%r,\n"
-    ...             " last_checked=%r)" % (remote_bug_ids, last_checked))
-    ...         return [remote_bug_ids[0], remote_bug_ids[-1]]
-    ...     def getRemoteStatus(self, bug_id):
-    ...         print >> output_file, (
-    ...             "getRemoteStatus(bug_id=%r)" % bug_id)
-    ...         return 'UNKNOWN'
-    ...     def getCurrentDBTime(self):
-    ...         return None
-
-    >>> class BugWatchUpdaterForThreads(BugWatchUpdater):
-    ...     def _getExternalBugTrackersAndWatches(
-    ...         self, bug_trackers, bug_watches):
-    ...         return [(ExternalBugTrackerForThreads(), bug_watches)]
-
-    >>> threaded_bug_watch_updater = BugWatchUpdaterForThreads(
-    ...     transaction, FakeLogger(output_file))
-    >>> threaded_bug_watch_updater.updateBugTrackers(
-    ...     batch_size=5, num_threads=10)
-
-
-    >>> for thread_name in sorted(output_file.output):
-    ...     print '== %s ==' % thread_name
-    ...     print "".join(output_file.output[thread_name]),
-    == MainThread ==
-    DEBUG Using a global batch size of 5
-    DEBUG Skipping updating Ubuntu Bugzilla watches.
-    == auto-generic-string4.example.com ==
-    INFO Updating 5 watches for 5 bugs on http://example.com
-    getRemoteStatus(bug_id=u'1')
-    getRemoteStatus(bug_id=u'101')
-    getRemoteStatus(bug_id=u'5')
-    getRemoteStatus(bug_id=u'6')
-    getRemoteStatus(bug_id=u'7')
-    == debbugs ==
-    INFO Updating 5 watches for 5 bugs on http://example.com
-    getRemoteStatus(bug_id=u'280883')
-    getRemoteStatus(bug_id=u'304014')
-    getRemoteStatus(bug_id=u'308994')
-    getRemoteStatus(bug_id=u'327452')
-    getRemoteStatus(bug_id=u'327549')
-    == email ==
-    DEBUG No watches to update on mailto:bugs@example.com
-    == example-bugs ==
-    DEBUG No watches to update on http://bugs.example.com
-    == gnome-bugs ==
-    DEBUG No watches to update on http://bugzilla.gnome.org/
-    == gnome-bugzilla ==
-    INFO Updating 2 watches for 2 bugs on http://example.com
-    getRemoteStatus(bug_id=u'304070')
-    getRemoteStatus(bug_id=u'3224')
-    == mozilla.org ==
-    INFO Updating 4 watches for 3 bugs on http://example.com
-    getRemoteStatus(bug_id=u'123543')
-    getRemoteStatus(bug_id=u'2000')
-    getRemoteStatus(bug_id=u'42')
-    == savannah ==
-    DEBUG No watches to update on http://savannah.gnu.org/
-    == sf ==
-    DEBUG No watches to update on http://sourceforge.net/
 
=== modified file 'lib/lp/bugs/scripts/checkwatches.py'
--- lib/lp/bugs/scripts/checkwatches.py 2010-02-19 12:05:10 +0000
+++ lib/lp/bugs/scripts/checkwatches.py 2010-03-16 18:08:23 +0000
@@ -6,7 +6,6 @@
 
 from copy import copy
 from datetime import datetime, timedelta
-import Queue as queue
 import socket
 import sys
 import threading
@@ -14,6 +13,11 @@
 
 import pytz
 
+from twisted.internet import reactor
+from twisted.internet.defer import DeferredList
+from twisted.internet.threads import deferToThreadPool
+from twisted.python.threadpool import ThreadPool
+
 from zope.component import getUtility
 from zope.event import notify
 
@@ -271,44 +275,28 @@
         self._logout()
 
     def updateBugTrackers(
-        self, bug_tracker_names=None, batch_size=None, num_threads=1):
+        self, bug_tracker_names=None, batch_size=None, scheduler=None):
         """Update all the bug trackers that have watches pending.
 
         If bug tracker names are specified in bug_tracker_names only
         those bug trackers will be checked.
 
-        The updates are run in threads, so that long running updates
-        don't block progress. However, by default the number of
-        threads is 1, to help with testing.
+        A custom scheduler can be passed in. This should inherit from
+        `BaseScheduler`. If no scheduler is given, `SerialScheduler`
+        will be used, which simply runs the jobs in order.
         """
         self.log.debug("Using a global batch size of %s" % batch_size)
 
-        # Put all the work on the queue. This is simpler than drip-feeding the
-        # queue, and avoids a situation where a worker thread exits because
-        # there's no work left and the feeding thread hasn't been scheduled to
-        # add work to the queue.
-        work = queue.Queue()
+        # Default to using the very simple SerialScheduler.
+        if scheduler is None:
+            scheduler = SerialScheduler()
+
+        # Schedule all the jobs to run.
         for updater in self._bugTrackerUpdaters(bug_tracker_names):
-            work.put(updater)
+            scheduler.schedule(updater, batch_size)
 
-        # This will be run once in each worker thread.
-        def do_work():
-            while True:
-                try:
-                    job = work.get(block=False)
-                except queue.Empty:
-                    break
-                else:
-                    job(batch_size)
-
-        # Start and join the worker threads.
-        threads = []
-        for run in xrange(num_threads):
-            thread = threading.Thread(target=do_work)
-            thread.start()
-            threads.append(thread)
-        for thread in threads:
-            thread.join()
+        # Run all the jobs.
+        scheduler.run()
 
     def updateBugTracker(self, bug_tracker, batch_size):
         """Updates the given bug trackers's bug watches.
@@ -1165,6 +1153,67 @@
         self.log.error("%s (%s)" % (message, oops_info.oopsid))
 
 
+class BaseScheduler:
+    """Run jobs according to a policy."""
+
+    def schedule(self, func, *args, **kwargs):
+        """Add a job to be run."""
+        raise NotImplementedError(self.schedule)
+
+    def run(self):
+        """Run the jobs."""
+        raise NotImplementedError(self.run)
+
+
+class SerialScheduler(BaseScheduler):
+    """Run jobs in order, one at a time."""
+
+    def __init__(self):
+        self._jobs = []
+
+    def schedule(self, func, *args, **kwargs):
+        self._jobs.append((func, args, kwargs))
+
+    def run(self):
+        jobs, self._jobs = self._jobs[:], []
+        for (func, args, kwargs) in jobs:
+            func(*args, **kwargs)
+
+
+class TwistedThreadScheduler(BaseScheduler):
+    """Run jobs in threads, chaperoned by Twisted."""
+
+    def __init__(self, num_threads, install_signal_handlers=True):
+        """Create a new `TwistedThreadScheduler`.
+
+        :param num_threads: The number of threads to allocate to the
+            thread pool.
+        :type num_threads: int
+
+        :param install_signal_handlers: Whether the Twisted reactor
+            should install signal handlers or not. This is intended for
+            testing - set to False to avoid layer violations - but may
+            be useful in other situations.
+        :type install_signal_handlers: bool
+        """
+        self._thread_pool = ThreadPool(0, num_threads)
+        self._install_signal_handlers = install_signal_handlers
+        self._jobs = []
+
+    def schedule(self, func, *args, **kwargs):
+        self._jobs.append(
+            deferToThreadPool(
+                reactor, self._thread_pool, func, *args, **kwargs))
+
+    def run(self):
+        jobs, self._jobs = self._jobs[:], []
+        jobs_done = DeferredList(jobs)
+        jobs_done.addBoth(lambda ignore: self._thread_pool.stop())
+        jobs_done.addBoth(lambda ignore: reactor.stop())
+        reactor.callWhenRunning(self._thread_pool.start)
+        reactor.run(self._install_signal_handlers)
+
+
 class CheckWatchesCronScript(LaunchpadCronScript):
 
     def add_my_options(self):
@@ -1203,9 +1252,16 @@
         else:
             # Otherwise we just update those watches that need updating,
             # and we let the BugWatchUpdater decide which those are.
+            if self.options.jobs <= 1:
+                # Use the default scheduler.
+                scheduler = None
+            else:
+                # Run jobs in parallel.
+                scheduler = TwistedThreadScheduler(self.options.jobs)
             updater.updateBugTrackers(
-                self.options.bug_trackers, self.options.batch_size,
-                self.options.jobs)
+                self.options.bug_trackers,
+                self.options.batch_size,
+                scheduler)
 
         run_time = time.time() - start_time
         self.logger.info("Time for this run: %.3f seconds." % run_time)
 
=== modified file 'lib/lp/bugs/scripts/tests/test_checkwatches.py'
--- lib/lp/bugs/scripts/tests/test_checkwatches.py 2010-01-05 12:19:17 +0000
+++ lib/lp/bugs/scripts/tests/test_checkwatches.py 2010-03-16 18:08:23 +0000
@@ -2,9 +2,13 @@
 # GNU Affero General Public License version 3 (see the file LICENSE).
 """Checkwatches unit tests."""
 
+from __future__ import with_statement
+
 __metaclass__ = type
 
+import threading
 import unittest
+
 import transaction
 
 from zope.component import getUtility
@@ -19,11 +23,13 @@
 from canonical.testing import LaunchpadZopelessLayer
 
 from lp.bugs.externalbugtracker.bugzilla import BugzillaAPI
+from lp.bugs.interfaces.bugtracker import IBugTrackerSet
 from lp.bugs.scripts import checkwatches
-from lp.bugs.scripts.checkwatches import CheckWatchesErrorUtility
+from lp.bugs.scripts.checkwatches import (
+    BugWatchUpdater, CheckWatchesErrorUtility, TwistedThreadScheduler)
 from lp.bugs.tests.externalbugtracker import (
-    TestBugzillaAPIXMLRPCTransport, new_bugtracker)
-from lp.testing import TestCaseWithFactory
+    TestBugzillaAPIXMLRPCTransport, TestExternalBugTracker, new_bugtracker)
+from lp.testing import TestCaseWithFactory, ZopeTestInSubProcess
 
 
 def always_BugzillaAPI_get_external_bugtracker(bugtracker):
@@ -202,5 +208,172 @@
             self.bugtask_with_question.status.title))
 
 
+class TestSchedulerBase:
+
+    def test_args_and_kwargs(self):
+        def func(name, aptitude):
+            self.failUnlessEqual("Robin Hood", name)
+            self.failUnlessEqual("Riding through the glen", aptitude)
+        # Positional args specified when adding a job are passed to
+        # the job function at run time.
+        self.scheduler.schedule(
+            func, "Robin Hood", "Riding through the glen")
+        # Keyword args specified when adding a job are passed to the
+        # job function at run time.
+        self.scheduler.schedule(
+            func, name="Robin Hood", aptitude="Riding through the glen")
+        # Positional and keyword args can both be specified.
+        self.scheduler.schedule(
+            func, "Robin Hood", aptitude="Riding through the glen")
+        # Run everything.
+        self.scheduler.run()
+
+
+class TestSerialScheduler(TestSchedulerBase, unittest.TestCase):
+    """Test SerialScheduler."""
+
+    def setUp(self):
+        self.scheduler = checkwatches.SerialScheduler()
+
+    def test_ordering(self):
+        # The numbers list will be emptied in the order we add jobs to
+        # the scheduler.
+        numbers = [1, 2, 3]
+        # Remove 3 and check.
+        self.scheduler.schedule(
+            list.remove, numbers, 3)
+        self.scheduler.schedule(
+            lambda: self.failUnlessEqual([1, 2], numbers))
+        # Remove 1 and check.
+        self.scheduler.schedule(
+            list.remove, numbers, 1)
+        self.scheduler.schedule(
+            lambda: self.failUnlessEqual([2], numbers))
+        # Remove 2 and check.
+        self.scheduler.schedule(
+            list.remove, numbers, 2)
+        self.scheduler.schedule(
+            lambda: self.failUnlessEqual([], numbers))
+        # Run the scheduler.
+        self.scheduler.run()
+
+
+class TestTwistedThreadScheduler(
+    TestSchedulerBase, ZopeTestInSubProcess, unittest.TestCase):
+    """Test TwistedThreadScheduler.
+
+    By default, updateBugTrackers() runs jobs serially, but a
+    different scheduling policy can be plugged in. One such policy,
+    for running several jobs in parallel, is TwistedThreadScheduler.
+    """
+
+    def setUp(self):
+        self.scheduler = checkwatches.TwistedThreadScheduler(
+            num_threads=5, install_signal_handlers=False)
+
+
+class OutputFileForThreads:
+    """Collates writes according to thread name."""
+
+    def __init__(self):
+        self.output = {}
+        self.lock = threading.Lock()
+
+    def write(self, data):
+        thread_name = threading.currentThread().getName()
+        with self.lock:
+            if thread_name in self.output:
+                self.output[thread_name].append(data)
+            else:
+                self.output[thread_name] = [data]
+
+
+class ExternalBugTrackerForThreads(TestExternalBugTracker):
+    """Fake which records interesting activity to a file."""
+
+    def __init__(self, output_file):
+        super(ExternalBugTrackerForThreads, self).__init__()
+        self.output_file = output_file
+
+    def getRemoteStatus(self, bug_id):
+        self.output_file.write("getRemoteStatus(bug_id=%r)" % bug_id)
+        return 'UNKNOWN'
+
+    def getCurrentDBTime(self):
+        return None
+
+
+class BugWatchUpdaterForThreads(BugWatchUpdater):
+    """Fake updater.
+
+    Plumbs an `ExternalBugTrackerForThreads` into a given output file,
+    which is expected to be an instance of `OutputFileForThreads`, and
+    suppresses normal log activity.
+    """
+
+    def __init__(self, output_file):
+        logger = QuietFakeLogger()
+        super(BugWatchUpdaterForThreads, self).__init__(transaction, logger)
+        self.output_file = output_file
+
+    def _getExternalBugTrackersAndWatches(self, bug_trackers, bug_watches):
+        return [(ExternalBugTrackerForThreads(self.output_file), bug_watches)]
+
+
+class TestTwistedThreadSchedulerInPlace(
+    ZopeTestInSubProcess, TestCaseWithFactory):
+    """Test TwistedThreadScheduler in place.
+
+    As in, driving as much of the bug watch machinery as is possible
+    without making external connections.
+    """
+
+    layer = LaunchpadZopelessLayer
+
+    def test(self):
+        # Prepare test data.
+        self.owner = self.factory.makePerson()
+        self.trackers = [
+            getUtility(IBugTrackerSet).ensureBugTracker(
+                "http://butterscotch.example.com", self.owner,
+                BugTrackerType.BUGZILLA, name="butterscotch"),
+            getUtility(IBugTrackerSet).ensureBugTracker(
+                "http://strawberry.example.com", self.owner,
+                BugTrackerType.BUGZILLA, name="strawberry"),
+            ]
+        self.bug = self.factory.makeBug(owner=self.owner)
+        for tracker in self.trackers:
+            for num in (1, 2, 3):
+                self.factory.makeBugWatch(
+                    "%s-%d" % (tracker.name, num),
+                    tracker, self.bug, self.owner)
+        # Commit so that threads all see the same database state.
+        transaction.commit()
+        # Prepare the updater with the Twisted scheduler.
+        output_file = OutputFileForThreads()
+        threaded_bug_watch_updater = BugWatchUpdaterForThreads(output_file)
+        threaded_bug_watch_scheduler = TwistedThreadScheduler(
+            num_threads=10, install_signal_handlers=False)
+        threaded_bug_watch_updater.updateBugTrackers(
+            bug_tracker_names=[tracker.name for tracker in self.trackers],
+            batch_size=5, scheduler=threaded_bug_watch_scheduler)
+        # The thread names should match the tracker names.
+        self.assertEqual(
+            ['butterscotch', 'strawberry'], sorted(output_file.output))
+        # Check that getRemoteStatus() was called.
+        self.assertEqual(
+            ["getRemoteStatus(bug_id=u'butterscotch-1')",
+             "getRemoteStatus(bug_id=u'butterscotch-2')",
+             "getRemoteStatus(bug_id=u'butterscotch-3')"],
+            output_file.output['butterscotch']
+            )
+        self.assertEqual(
+            ["getRemoteStatus(bug_id=u'strawberry-1')",
+             "getRemoteStatus(bug_id=u'strawberry-2')",
+             "getRemoteStatus(bug_id=u'strawberry-3')"],
+            output_file.output['strawberry']
+            )
+
+
 def test_suite():
     return unittest.TestLoader().loadTestsFromName(__name__)
 
=== modified file 'lib/lp/scripts/utilities/importfascist.py'
--- lib/lp/scripts/utilities/importfascist.py 2010-02-23 11:53:24 +0000
+++ lib/lp/scripts/utilities/importfascist.py 2010-03-16 18:08:23 +0000
@@ -62,6 +62,7 @@
     'openid.fetchers': set(['Urllib2Fetcher']),
     'storm.database': set(['STATE_DISCONNECTED']),
     'textwrap': set(['dedent']),
+    'twisted.internet.threads': set(['deferToThreadPool']),
     'zope.component': set(
         ['adapter',
          'ComponentLookupError',