Merge lp:~allenap/launchpad/twisted-threading-bug-491870 into lp:launchpad
- twisted-threading-bug-491870
- Merge into devel
Status: | Merged |
---|---|
Approved by: | Gavin Panella |
Approved revision: | no longer in the source branch. |
Merged at revision: | not available |
Proposed branch: | lp:~allenap/launchpad/twisted-threading-bug-491870 |
Merge into: | lp:launchpad |
Prerequisite: | lp:~allenap/launchpad/isolate-tests |
Diff against target: |
516 lines (+266/-143) 5 files modified
lib/lp/bugs/doc/checkwatches-cli-switches.txt (+1/-1) lib/lp/bugs/doc/externalbugtracker.txt (+0/-107) lib/lp/bugs/scripts/checkwatches.py (+88/-32) lib/lp/bugs/scripts/tests/test_checkwatches.py (+176/-3) lib/lp/scripts/utilities/importfascist.py (+1/-0) |
To merge this branch: | bzr merge lp:~allenap/launchpad/twisted-threading-bug-491870 |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Eleanor Berger (community) | code | Approve | |
Review via email: mp+21376@code.launchpad.net |
Commit message
In checkwatches, use Twisted to manage threading instead of the threading module.
Description of the change
checkwatches can update bug watches for different remote trackers in parallel. Currently that's implemented using threads via the threading module. This branch continues to use threads, but does so with a Twisted ThreadPool.
To make testing easier, and to make the design more elegant, it's possible to pass in a scheduler to BugWatchUpdater.updateBugTrackers().
Getting Twisted in there also opens the door for using more async code in the future.
A previous review <https:/
Eleanor Berger (intellectronica) : | # |
Preview Diff
1 | === modified file 'lib/lp/bugs/doc/checkwatches-cli-switches.txt' | |||
2 | --- lib/lp/bugs/doc/checkwatches-cli-switches.txt 2009-11-17 11:19:25 +0000 | |||
3 | +++ lib/lp/bugs/doc/checkwatches-cli-switches.txt 2010-03-16 18:08:23 +0000 | |||
4 | @@ -34,7 +34,7 @@ | |||
5 | 34 | >>> from lp.testing.factory import LaunchpadObjectFactory | 34 | >>> from lp.testing.factory import LaunchpadObjectFactory |
6 | 35 | 35 | ||
7 | 36 | >>> LaunchpadZopelessLayer.switchDbUser('launchpad') | 36 | >>> LaunchpadZopelessLayer.switchDbUser('launchpad') |
9 | 37 | >>> login('foo.bar@canonical.com') | 37 | >>> login('foo.bar@canonical.com') |
10 | 38 | >>> bug_tracker = LaunchpadObjectFactory().makeBugTracker( | 38 | >>> bug_tracker = LaunchpadObjectFactory().makeBugTracker( |
11 | 39 | ... 'http://example.com') | 39 | ... 'http://example.com') |
12 | 40 | >>> bug_tracker.active = False | 40 | >>> bug_tracker.active = False |
13 | 41 | 41 | ||
14 | === modified file 'lib/lp/bugs/doc/externalbugtracker.txt' | |||
15 | --- lib/lp/bugs/doc/externalbugtracker.txt 2010-02-24 03:49:45 +0000 | |||
16 | +++ lib/lp/bugs/doc/externalbugtracker.txt 2010-03-16 18:08:23 +0000 | |||
17 | @@ -1105,110 +1105,3 @@ | |||
18 | 1105 | the updateBugTrackers() method. This, too, takes a batch_size parameter, which | 1105 | the updateBugTrackers() method. This, too, takes a batch_size parameter, which |
19 | 1106 | allows it to be passed as a command-line option when the checkwatches script | 1106 | allows it to be passed as a command-line option when the checkwatches script |
20 | 1107 | is run. | 1107 | is run. |
21 | 1108 | |||
22 | 1109 | Before going further, we must abort the current transaction to avoid | ||
23 | 1110 | deadlock; updateBugTrackers() runs updateBugTracker() in a different | ||
24 | 1111 | thread. | ||
25 | 1112 | |||
26 | 1113 | >>> transaction.abort() | ||
27 | 1114 | |||
28 | 1115 | >>> from canonical.launchpad.scripts.logger import FakeLogger | ||
29 | 1116 | >>> bug_watch_updater = NonConnectingBugWatchUpdater( | ||
30 | 1117 | ... transaction, FakeLogger()) | ||
31 | 1118 | >>> bug_watch_updater.updateBugTrackers( | ||
32 | 1119 | ... bug_tracker_names=[standard_bugzilla.name], batch_size=2) | ||
33 | 1120 | DEBUG Using a global batch size of 2 | ||
34 | 1121 | INFO Updating 2 watches for 2 bugs on http://example.com | ||
35 | 1122 | initializeRemoteBugDB() called: [u'5', u'6'] | ||
36 | 1123 | getRemoteStatus() called: u'5' | ||
37 | 1124 | getRemoteStatus() called: u'6' | ||
38 | 1125 | |||
39 | 1126 | >>> # We should log in again because updateBugTrackers() logs out. | ||
40 | 1127 | >>> login('test@canonical.com') | ||
41 | 1128 | |||
42 | 1129 | By default, the updateBugTrackers() only spawns one thread, but it can | ||
43 | 1130 | spawn as many as required. | ||
44 | 1131 | |||
45 | 1132 | >>> import threading | ||
46 | 1133 | |||
47 | 1134 | >>> class OutputFileForThreads: | ||
48 | 1135 | ... def __init__(self): | ||
49 | 1136 | ... self.output = {} | ||
50 | 1137 | ... self.lock = threading.Lock() | ||
51 | 1138 | ... def write(self, data): | ||
52 | 1139 | ... thread_name = threading.currentThread().getName() | ||
53 | 1140 | ... self.lock.acquire() | ||
54 | 1141 | ... try: | ||
55 | 1142 | ... if thread_name in self.output: | ||
56 | 1143 | ... self.output[thread_name].append(data) | ||
57 | 1144 | ... else: | ||
58 | 1145 | ... self.output[thread_name] = [data] | ||
59 | 1146 | ... finally: | ||
60 | 1147 | ... self.lock.release() | ||
61 | 1148 | |||
62 | 1149 | >>> output_file = OutputFileForThreads() | ||
63 | 1150 | |||
64 | 1151 | >>> class ExternalBugTrackerForThreads(TestExternalBugTracker): | ||
65 | 1152 | ... def getModifiedRemoteBugs(self, remote_bug_ids, last_checked): | ||
66 | 1153 | ... print >> output_file, ( | ||
67 | 1154 | ... "getModifiedRemoteBugs(\n" | ||
68 | 1155 | ... " remote_bug_ids=%r,\n" | ||
69 | 1156 | ... " last_checked=%r)" % (remote_bug_ids, last_checked)) | ||
70 | 1157 | ... return [remote_bug_ids[0], remote_bug_ids[-1]] | ||
71 | 1158 | ... def getRemoteStatus(self, bug_id): | ||
72 | 1159 | ... print >> output_file, ( | ||
73 | 1160 | ... "getRemoteStatus(bug_id=%r)" % bug_id) | ||
74 | 1161 | ... return 'UNKNOWN' | ||
75 | 1162 | ... def getCurrentDBTime(self): | ||
76 | 1163 | ... return None | ||
77 | 1164 | |||
78 | 1165 | >>> class BugWatchUpdaterForThreads(BugWatchUpdater): | ||
79 | 1166 | ... def _getExternalBugTrackersAndWatches( | ||
80 | 1167 | ... self, bug_trackers, bug_watches): | ||
81 | 1168 | ... return [(ExternalBugTrackerForThreads(), bug_watches)] | ||
82 | 1169 | |||
83 | 1170 | >>> threaded_bug_watch_updater = BugWatchUpdaterForThreads( | ||
84 | 1171 | ... transaction, FakeLogger(output_file)) | ||
85 | 1172 | >>> threaded_bug_watch_updater.updateBugTrackers( | ||
86 | 1173 | ... batch_size=5, num_threads=10) | ||
87 | 1174 | |||
88 | 1175 | |||
89 | 1176 | >>> for thread_name in sorted(output_file.output): | ||
90 | 1177 | ... print '== %s ==' % thread_name | ||
91 | 1178 | ... print "".join(output_file.output[thread_name]), | ||
92 | 1179 | == MainThread == | ||
93 | 1180 | DEBUG Using a global batch size of 5 | ||
94 | 1181 | DEBUG Skipping updating Ubuntu Bugzilla watches. | ||
95 | 1182 | == auto-generic-string4.example.com == | ||
96 | 1183 | INFO Updating 5 watches for 5 bugs on http://example.com | ||
97 | 1184 | getRemoteStatus(bug_id=u'1') | ||
98 | 1185 | getRemoteStatus(bug_id=u'101') | ||
99 | 1186 | getRemoteStatus(bug_id=u'5') | ||
100 | 1187 | getRemoteStatus(bug_id=u'6') | ||
101 | 1188 | getRemoteStatus(bug_id=u'7') | ||
102 | 1189 | == debbugs == | ||
103 | 1190 | INFO Updating 5 watches for 5 bugs on http://example.com | ||
104 | 1191 | getRemoteStatus(bug_id=u'280883') | ||
105 | 1192 | getRemoteStatus(bug_id=u'304014') | ||
106 | 1193 | getRemoteStatus(bug_id=u'308994') | ||
107 | 1194 | getRemoteStatus(bug_id=u'327452') | ||
108 | 1195 | getRemoteStatus(bug_id=u'327549') | ||
109 | 1196 | == email == | ||
110 | 1197 | DEBUG No watches to update on mailto:bugs@example.com | ||
111 | 1198 | == example-bugs == | ||
112 | 1199 | DEBUG No watches to update on http://bugs.example.com | ||
113 | 1200 | == gnome-bugs == | ||
114 | 1201 | DEBUG No watches to update on http://bugzilla.gnome.org/ | ||
115 | 1202 | == gnome-bugzilla == | ||
116 | 1203 | INFO Updating 2 watches for 2 bugs on http://example.com | ||
117 | 1204 | getRemoteStatus(bug_id=u'304070') | ||
118 | 1205 | getRemoteStatus(bug_id=u'3224') | ||
119 | 1206 | == mozilla.org == | ||
120 | 1207 | INFO Updating 4 watches for 3 bugs on http://example.com | ||
121 | 1208 | getRemoteStatus(bug_id=u'123543') | ||
122 | 1209 | getRemoteStatus(bug_id=u'2000') | ||
123 | 1210 | getRemoteStatus(bug_id=u'42') | ||
124 | 1211 | == savannah == | ||
125 | 1212 | DEBUG No watches to update on http://savannah.gnu.org/ | ||
126 | 1213 | == sf == | ||
127 | 1214 | DEBUG No watches to update on http://sourceforge.net/ | ||
128 | 1215 | 1108 | ||
129 | === modified file 'lib/lp/bugs/scripts/checkwatches.py' | |||
130 | --- lib/lp/bugs/scripts/checkwatches.py 2010-02-19 12:05:10 +0000 | |||
131 | +++ lib/lp/bugs/scripts/checkwatches.py 2010-03-16 18:08:23 +0000 | |||
132 | @@ -6,7 +6,6 @@ | |||
133 | 6 | 6 | ||
134 | 7 | from copy import copy | 7 | from copy import copy |
135 | 8 | from datetime import datetime, timedelta | 8 | from datetime import datetime, timedelta |
136 | 9 | import Queue as queue | ||
137 | 10 | import socket | 9 | import socket |
138 | 11 | import sys | 10 | import sys |
139 | 12 | import threading | 11 | import threading |
140 | @@ -14,6 +13,11 @@ | |||
141 | 14 | 13 | ||
142 | 15 | import pytz | 14 | import pytz |
143 | 16 | 15 | ||
144 | 16 | from twisted.internet import reactor | ||
145 | 17 | from twisted.internet.defer import DeferredList | ||
146 | 18 | from twisted.internet.threads import deferToThreadPool | ||
147 | 19 | from twisted.python.threadpool import ThreadPool | ||
148 | 20 | |||
149 | 17 | from zope.component import getUtility | 21 | from zope.component import getUtility |
150 | 18 | from zope.event import notify | 22 | from zope.event import notify |
151 | 19 | 23 | ||
152 | @@ -271,44 +275,28 @@ | |||
153 | 271 | self._logout() | 275 | self._logout() |
154 | 272 | 276 | ||
155 | 273 | def updateBugTrackers( | 277 | def updateBugTrackers( |
157 | 274 | self, bug_tracker_names=None, batch_size=None, num_threads=1): | 278 | self, bug_tracker_names=None, batch_size=None, scheduler=None): |
158 | 275 | """Update all the bug trackers that have watches pending. | 279 | """Update all the bug trackers that have watches pending. |
159 | 276 | 280 | ||
160 | 277 | If bug tracker names are specified in bug_tracker_names only | 281 | If bug tracker names are specified in bug_tracker_names only |
161 | 278 | those bug trackers will be checked. | 282 | those bug trackers will be checked. |
162 | 279 | 283 | ||
166 | 280 | The updates are run in threads, so that long running updates | 284 | A custom scheduler can be passed in. This should inherit from |
167 | 281 | don't block progress. However, by default the number of | 285 | `BaseScheduler`. If no scheduler is given, `SerialScheduler` |
168 | 282 | threads is 1, to help with testing. | 286 | will be used, which simply runs the jobs in order. |
169 | 283 | """ | 287 | """ |
170 | 284 | self.log.debug("Using a global batch size of %s" % batch_size) | 288 | self.log.debug("Using a global batch size of %s" % batch_size) |
171 | 285 | 289 | ||
177 | 286 | # Put all the work on the queue. This is simpler than drip-feeding the | 290 | # Default to using the very simple SerialScheduler. |
178 | 287 | # queue, and avoids a situation where a worker thread exits because | 291 | if scheduler is None: |
179 | 288 | # there's no work left and the feeding thread hasn't been scheduled to | 292 | scheduler = SerialScheduler() |
180 | 289 | # add work to the queue. | 293 | |
181 | 290 | work = queue.Queue() | 294 | # Schedule all the jobs to run. |
182 | 291 | for updater in self._bugTrackerUpdaters(bug_tracker_names): | 295 | for updater in self._bugTrackerUpdaters(bug_tracker_names): |
203 | 292 | work.put(updater) | 296 | scheduler.schedule(updater, batch_size) |
204 | 293 | 297 | ||
205 | 294 | # This will be run once in each worker thread. | 298 | # Run all the jobs. |
206 | 295 | def do_work(): | 299 | scheduler.run() |
187 | 296 | while True: | ||
188 | 297 | try: | ||
189 | 298 | job = work.get(block=False) | ||
190 | 299 | except queue.Empty: | ||
191 | 300 | break | ||
192 | 301 | else: | ||
193 | 302 | job(batch_size) | ||
194 | 303 | |||
195 | 304 | # Start and join the worker threads. | ||
196 | 305 | threads = [] | ||
197 | 306 | for run in xrange(num_threads): | ||
198 | 307 | thread = threading.Thread(target=do_work) | ||
199 | 308 | thread.start() | ||
200 | 309 | threads.append(thread) | ||
201 | 310 | for thread in threads: | ||
202 | 311 | thread.join() | ||
207 | 312 | 300 | ||
208 | 313 | def updateBugTracker(self, bug_tracker, batch_size): | 301 | def updateBugTracker(self, bug_tracker, batch_size): |
209 | 314 | """Updates the given bug tracker's bug watches. | 302 | """Updates the given bug tracker's bug watches. |
210 | @@ -1165,6 +1153,67 @@ | |||
211 | 1165 | self.log.error("%s (%s)" % (message, oops_info.oopsid)) | 1153 | self.log.error("%s (%s)" % (message, oops_info.oopsid)) |
212 | 1166 | 1154 | ||
213 | 1167 | 1155 | ||
214 | 1156 | class BaseScheduler: | ||
215 | 1157 | """Run jobs according to a policy.""" | ||
216 | 1158 | |||
217 | 1159 | def schedule(self, func, *args, **kwargs): | ||
218 | 1160 | """Add a job to be run.""" | ||
219 | 1161 | raise NotImplementedError(self.schedule) | ||
220 | 1162 | |||
221 | 1163 | def run(self): | ||
222 | 1164 | """Run the jobs.""" | ||
223 | 1165 | raise NotImplementedError(self.run) | ||
224 | 1166 | |||
225 | 1167 | |||
226 | 1168 | class SerialScheduler(BaseScheduler): | ||
227 | 1169 | """Run jobs in order, one at a time.""" | ||
228 | 1170 | |||
229 | 1171 | def __init__(self): | ||
230 | 1172 | self._jobs = [] | ||
231 | 1173 | |||
232 | 1174 | def schedule(self, func, *args, **kwargs): | ||
233 | 1175 | self._jobs.append((func, args, kwargs)) | ||
234 | 1176 | |||
235 | 1177 | def run(self): | ||
236 | 1178 | jobs, self._jobs = self._jobs[:], [] | ||
237 | 1179 | for (func, args, kwargs) in jobs: | ||
238 | 1180 | func(*args, **kwargs) | ||
239 | 1181 | |||
240 | 1182 | |||
241 | 1183 | class TwistedThreadScheduler(BaseScheduler): | ||
242 | 1184 | """Run jobs in threads, chaperoned by Twisted.""" | ||
243 | 1185 | |||
244 | 1186 | def __init__(self, num_threads, install_signal_handlers=True): | ||
245 | 1187 | """Create a new `TwistedThreadScheduler`. | ||
246 | 1188 | |||
247 | 1189 | :param num_threads: The number of threads to allocate to the | ||
248 | 1190 | thread pool. | ||
249 | 1191 | :type num_threads: int | ||
250 | 1192 | |||
251 | 1193 | :param install_signal_handlers: Whether the Twisted reactor | ||
252 | 1194 | should install signal handlers or not. This is intended for | |
253 | 1195 | testing - set to False to avoid layer violations - but may | ||
254 | 1196 | be useful in other situations. | ||
255 | 1197 | :type install_signal_handlers: bool | ||
256 | 1198 | """ | ||
257 | 1199 | self._thread_pool = ThreadPool(0, num_threads) | ||
258 | 1200 | self._install_signal_handlers = install_signal_handlers | ||
259 | 1201 | self._jobs = [] | ||
260 | 1202 | |||
261 | 1203 | def schedule(self, func, *args, **kwargs): | ||
262 | 1204 | self._jobs.append( | ||
263 | 1205 | deferToThreadPool( | ||
264 | 1206 | reactor, self._thread_pool, func, *args, **kwargs)) | ||
265 | 1207 | |||
266 | 1208 | def run(self): | ||
267 | 1209 | jobs, self._jobs = self._jobs[:], [] | ||
268 | 1210 | jobs_done = DeferredList(jobs) | ||
269 | 1211 | jobs_done.addBoth(lambda ignore: self._thread_pool.stop()) | ||
270 | 1212 | jobs_done.addBoth(lambda ignore: reactor.stop()) | ||
271 | 1213 | reactor.callWhenRunning(self._thread_pool.start) | ||
272 | 1214 | reactor.run(self._install_signal_handlers) | ||
273 | 1215 | |||
274 | 1216 | |||
275 | 1168 | class CheckWatchesCronScript(LaunchpadCronScript): | 1217 | class CheckWatchesCronScript(LaunchpadCronScript): |
276 | 1169 | 1218 | ||
277 | 1170 | def add_my_options(self): | 1219 | def add_my_options(self): |
278 | @@ -1203,9 +1252,16 @@ | |||
279 | 1203 | else: | 1252 | else: |
280 | 1204 | # Otherwise we just update those watches that need updating, | 1253 | # Otherwise we just update those watches that need updating, |
281 | 1205 | # and we let the BugWatchUpdater decide which those are. | 1254 | # and we let the BugWatchUpdater decide which those are. |
282 | 1255 | if self.options.jobs <= 1: | ||
283 | 1256 | # Use the default scheduler. | ||
284 | 1257 | scheduler = None | ||
285 | 1258 | else: | ||
286 | 1259 | # Run jobs in parallel. | ||
287 | 1260 | scheduler = TwistedThreadScheduler(self.options.jobs) | ||
288 | 1206 | updater.updateBugTrackers( | 1261 | updater.updateBugTrackers( |
291 | 1207 | self.options.bug_trackers, self.options.batch_size, | 1262 | self.options.bug_trackers, |
292 | 1208 | self.options.jobs) | 1263 | self.options.batch_size, |
293 | 1264 | scheduler) | ||
294 | 1209 | 1265 | ||
295 | 1210 | run_time = time.time() - start_time | 1266 | run_time = time.time() - start_time |
296 | 1211 | self.logger.info("Time for this run: %.3f seconds." % run_time) | 1267 | self.logger.info("Time for this run: %.3f seconds." % run_time) |
297 | 1212 | 1268 | ||
298 | === modified file 'lib/lp/bugs/scripts/tests/test_checkwatches.py' | |||
299 | --- lib/lp/bugs/scripts/tests/test_checkwatches.py 2010-01-05 12:19:17 +0000 | |||
300 | +++ lib/lp/bugs/scripts/tests/test_checkwatches.py 2010-03-16 18:08:23 +0000 | |||
301 | @@ -2,9 +2,13 @@ | |||
302 | 2 | # GNU Affero General Public License version 3 (see the file LICENSE). | 2 | # GNU Affero General Public License version 3 (see the file LICENSE). |
303 | 3 | """Checkwatches unit tests.""" | 3 | """Checkwatches unit tests.""" |
304 | 4 | 4 | ||
305 | 5 | from __future__ import with_statement | ||
306 | 6 | |||
307 | 5 | __metaclass__ = type | 7 | __metaclass__ = type |
308 | 6 | 8 | ||
309 | 9 | import threading | ||
310 | 7 | import unittest | 10 | import unittest |
311 | 11 | |||
312 | 8 | import transaction | 12 | import transaction |
313 | 9 | 13 | ||
314 | 10 | from zope.component import getUtility | 14 | from zope.component import getUtility |
315 | @@ -19,11 +23,13 @@ | |||
316 | 19 | from canonical.testing import LaunchpadZopelessLayer | 23 | from canonical.testing import LaunchpadZopelessLayer |
317 | 20 | 24 | ||
318 | 21 | from lp.bugs.externalbugtracker.bugzilla import BugzillaAPI | 25 | from lp.bugs.externalbugtracker.bugzilla import BugzillaAPI |
319 | 26 | from lp.bugs.interfaces.bugtracker import IBugTrackerSet | ||
320 | 22 | from lp.bugs.scripts import checkwatches | 27 | from lp.bugs.scripts import checkwatches |
322 | 23 | from lp.bugs.scripts.checkwatches import CheckWatchesErrorUtility | 28 | from lp.bugs.scripts.checkwatches import ( |
323 | 29 | BugWatchUpdater, CheckWatchesErrorUtility, TwistedThreadScheduler) | ||
324 | 24 | from lp.bugs.tests.externalbugtracker import ( | 30 | from lp.bugs.tests.externalbugtracker import ( |
327 | 25 | TestBugzillaAPIXMLRPCTransport, new_bugtracker) | 31 | TestBugzillaAPIXMLRPCTransport, TestExternalBugTracker, new_bugtracker) |
328 | 26 | from lp.testing import TestCaseWithFactory | 32 | from lp.testing import TestCaseWithFactory, ZopeTestInSubProcess |
329 | 27 | 33 | ||
330 | 28 | 34 | ||
331 | 29 | def always_BugzillaAPI_get_external_bugtracker(bugtracker): | 35 | def always_BugzillaAPI_get_external_bugtracker(bugtracker): |
332 | @@ -202,5 +208,172 @@ | |||
333 | 202 | self.bugtask_with_question.status.title)) | 208 | self.bugtask_with_question.status.title)) |
334 | 203 | 209 | ||
335 | 204 | 210 | ||
336 | 211 | class TestSchedulerBase: | ||
337 | 212 | |||
338 | 213 | def test_args_and_kwargs(self): | ||
339 | 214 | def func(name, aptitude): | ||
340 | 215 | self.failUnlessEqual("Robin Hood", name) | ||
341 | 216 | self.failUnlessEqual("Riding through the glen", aptitude) | ||
342 | 217 | # Positional args specified when adding a job are passed to | ||
343 | 218 | # the job function at run time. | ||
344 | 219 | self.scheduler.schedule( | ||
345 | 220 | func, "Robin Hood", "Riding through the glen") | ||
346 | 221 | # Keyword args specified when adding a job are passed to the | ||
347 | 222 | # job function at run time. | ||
348 | 223 | self.scheduler.schedule( | ||
349 | 224 | func, name="Robin Hood", aptitude="Riding through the glen") | ||
350 | 225 | # Positional and keyword args can both be specified. | ||
351 | 226 | self.scheduler.schedule( | ||
352 | 227 | func, "Robin Hood", aptitude="Riding through the glen") | ||
353 | 228 | # Run everything. | ||
354 | 229 | self.scheduler.run() | ||
355 | 230 | |||
356 | 231 | |||
357 | 232 | class TestSerialScheduler(TestSchedulerBase, unittest.TestCase): | ||
358 | 233 | """Test SerialScheduler.""" | ||
359 | 234 | |||
360 | 235 | def setUp(self): | ||
361 | 236 | self.scheduler = checkwatches.SerialScheduler() | ||
362 | 237 | |||
363 | 238 | def test_ordering(self): | ||
364 | 239 | # The numbers list will be emptied in the order we add jobs to | ||
365 | 240 | # the scheduler. | ||
366 | 241 | numbers = [1, 2, 3] | ||
367 | 242 | # Remove 3 and check. | ||
368 | 243 | self.scheduler.schedule( | ||
369 | 244 | list.remove, numbers, 3) | ||
370 | 245 | self.scheduler.schedule( | ||
371 | 246 | lambda: self.failUnlessEqual([1, 2], numbers)) | ||
372 | 247 | # Remove 1 and check. | ||
373 | 248 | self.scheduler.schedule( | ||
374 | 249 | list.remove, numbers, 1) | ||
375 | 250 | self.scheduler.schedule( | ||
376 | 251 | lambda: self.failUnlessEqual([2], numbers)) | ||
377 | 252 | # Remove 2 and check. | ||
378 | 253 | self.scheduler.schedule( | ||
379 | 254 | list.remove, numbers, 2) | ||
380 | 255 | self.scheduler.schedule( | ||
381 | 256 | lambda: self.failUnlessEqual([], numbers)) | ||
382 | 257 | # Run the scheduler. | ||
383 | 258 | self.scheduler.run() | ||
384 | 259 | |||
385 | 260 | |||
386 | 261 | class TestTwistedThreadScheduler( | ||
387 | 262 | TestSchedulerBase, ZopeTestInSubProcess, unittest.TestCase): | ||
388 | 263 | """Test TwistedThreadScheduler. | ||
389 | 264 | |||
390 | 265 | By default, updateBugTrackers() runs jobs serially, but a | ||
391 | 266 | different scheduling policy can be plugged in. One such policy, | ||
392 | 267 | for running several jobs in parallel, is TwistedThreadScheduler. | ||
393 | 268 | """ | ||
394 | 269 | |||
395 | 270 | def setUp(self): | ||
396 | 271 | self.scheduler = checkwatches.TwistedThreadScheduler( | ||
397 | 272 | num_threads=5, install_signal_handlers=False) | ||
398 | 273 | |||
399 | 274 | |||
400 | 275 | class OutputFileForThreads: | ||
401 | 276 | """Collates writes according to thread name.""" | ||
402 | 277 | |||
403 | 278 | def __init__(self): | ||
404 | 279 | self.output = {} | ||
405 | 280 | self.lock = threading.Lock() | ||
406 | 281 | |||
407 | 282 | def write(self, data): | ||
408 | 283 | thread_name = threading.currentThread().getName() | ||
409 | 284 | with self.lock: | ||
410 | 285 | if thread_name in self.output: | ||
411 | 286 | self.output[thread_name].append(data) | ||
412 | 287 | else: | ||
413 | 288 | self.output[thread_name] = [data] | ||
414 | 289 | |||
415 | 290 | |||
416 | 291 | class ExternalBugTrackerForThreads(TestExternalBugTracker): | ||
417 | 292 | """Fake which records interesting activity to a file.""" | ||
418 | 293 | |||
419 | 294 | def __init__(self, output_file): | ||
420 | 295 | super(ExternalBugTrackerForThreads, self).__init__() | ||
421 | 296 | self.output_file = output_file | ||
422 | 297 | |||
423 | 298 | def getRemoteStatus(self, bug_id): | ||
424 | 299 | self.output_file.write("getRemoteStatus(bug_id=%r)" % bug_id) | ||
425 | 300 | return 'UNKNOWN' | ||
426 | 301 | |||
427 | 302 | def getCurrentDBTime(self): | ||
428 | 303 | return None | ||
429 | 304 | |||
430 | 305 | |||
431 | 306 | class BugWatchUpdaterForThreads(BugWatchUpdater): | ||
432 | 307 | """Fake updater. | ||
433 | 308 | |||
434 | 309 | Plumbs an `ExternalBugTrackerForThreads` into a given output file, | ||
435 | 310 | which is expected to be an instance of `OutputFileForThreads`, and | ||
436 | 311 | suppresses normal log activity. | ||
437 | 312 | """ | ||
438 | 313 | |||
439 | 314 | def __init__(self, output_file): | ||
440 | 315 | logger = QuietFakeLogger() | ||
441 | 316 | super(BugWatchUpdaterForThreads, self).__init__(transaction, logger) | ||
442 | 317 | self.output_file = output_file | ||
443 | 318 | |||
444 | 319 | def _getExternalBugTrackersAndWatches(self, bug_trackers, bug_watches): | ||
445 | 320 | return [(ExternalBugTrackerForThreads(self.output_file), bug_watches)] | ||
446 | 321 | |||
447 | 322 | |||
448 | 323 | class TestTwistedThreadSchedulerInPlace( | ||
449 | 324 | ZopeTestInSubProcess, TestCaseWithFactory): | ||
450 | 325 | """Test TwistedThreadScheduler in place. | ||
451 | 326 | |||
452 | 327 | As in, driving as much of the bug watch machinery as is possible | ||
453 | 328 | without making external connections. | ||
454 | 329 | """ | ||
455 | 330 | |||
456 | 331 | layer = LaunchpadZopelessLayer | ||
457 | 332 | |||
458 | 333 | def test(self): | ||
459 | 334 | # Prepare test data. | ||
460 | 335 | self.owner = self.factory.makePerson() | ||
461 | 336 | self.trackers = [ | ||
462 | 337 | getUtility(IBugTrackerSet).ensureBugTracker( | ||
463 | 338 | "http://butterscotch.example.com", self.owner, | ||
464 | 339 | BugTrackerType.BUGZILLA, name="butterscotch"), | ||
465 | 340 | getUtility(IBugTrackerSet).ensureBugTracker( | ||
466 | 341 | "http://strawberry.example.com", self.owner, | ||
467 | 342 | BugTrackerType.BUGZILLA, name="strawberry"), | ||
468 | 343 | ] | ||
469 | 344 | self.bug = self.factory.makeBug(owner=self.owner) | ||
470 | 345 | for tracker in self.trackers: | ||
471 | 346 | for num in (1, 2, 3): | ||
472 | 347 | self.factory.makeBugWatch( | ||
473 | 348 | "%s-%d" % (tracker.name, num), | ||
474 | 349 | tracker, self.bug, self.owner) | ||
475 | 350 | # Commit so that threads all see the same database state. | ||
476 | 351 | transaction.commit() | ||
477 | 352 | # Prepare the updater with the Twisted scheduler. | ||
478 | 353 | output_file = OutputFileForThreads() | ||
479 | 354 | threaded_bug_watch_updater = BugWatchUpdaterForThreads(output_file) | ||
480 | 355 | threaded_bug_watch_scheduler = TwistedThreadScheduler( | ||
481 | 356 | num_threads=10, install_signal_handlers=False) | ||
482 | 357 | threaded_bug_watch_updater.updateBugTrackers( | ||
483 | 358 | bug_tracker_names=[tracker.name for tracker in self.trackers], | ||
484 | 359 | batch_size=5, scheduler=threaded_bug_watch_scheduler) | ||
485 | 360 | # The thread names should match the tracker names. | ||
486 | 361 | self.assertEqual( | ||
487 | 362 | ['butterscotch', 'strawberry'], sorted(output_file.output)) | ||
488 | 363 | # Check that getRemoteStatus() was called. | ||
489 | 364 | self.assertEqual( | ||
490 | 365 | ["getRemoteStatus(bug_id=u'butterscotch-1')", | ||
491 | 366 | "getRemoteStatus(bug_id=u'butterscotch-2')", | ||
492 | 367 | "getRemoteStatus(bug_id=u'butterscotch-3')"], | ||
493 | 368 | output_file.output['butterscotch'] | ||
494 | 369 | ) | ||
495 | 370 | self.assertEqual( | ||
496 | 371 | ["getRemoteStatus(bug_id=u'strawberry-1')", | ||
497 | 372 | "getRemoteStatus(bug_id=u'strawberry-2')", | ||
498 | 373 | "getRemoteStatus(bug_id=u'strawberry-3')"], | ||
499 | 374 | output_file.output['strawberry'] | ||
500 | 375 | ) | ||
501 | 376 | |||
502 | 377 | |||
503 | 205 | def test_suite(): | 378 | def test_suite(): |
504 | 206 | return unittest.TestLoader().loadTestsFromName(__name__) | 379 | return unittest.TestLoader().loadTestsFromName(__name__) |
505 | 207 | 380 | ||
506 | === modified file 'lib/lp/scripts/utilities/importfascist.py' | |||
507 | --- lib/lp/scripts/utilities/importfascist.py 2010-02-23 11:53:24 +0000 | |||
508 | +++ lib/lp/scripts/utilities/importfascist.py 2010-03-16 18:08:23 +0000 | |||
509 | @@ -62,6 +62,7 @@ | |||
510 | 62 | 'openid.fetchers': set(['Urllib2Fetcher']), | 62 | 'openid.fetchers': set(['Urllib2Fetcher']), |
511 | 63 | 'storm.database': set(['STATE_DISCONNECTED']), | 63 | 'storm.database': set(['STATE_DISCONNECTED']), |
512 | 64 | 'textwrap': set(['dedent']), | 64 | 'textwrap': set(['dedent']), |
513 | 65 | 'twisted.internet.threads': set(['deferToThreadPool']), | ||
514 | 65 | 'zope.component': set( | 66 | 'zope.component': set( |
515 | 66 | ['adapter', | 67 | ['adapter', |
516 | 67 | 'ComponentLookupError', | 68 | 'ComponentLookupError', |