Merge lp:~allenap/launchpad/multithreaded-checkwatches into lp:launchpad
Proposed by Gavin Panella
Status: | Merged |
---|---|
Approved by: | Gavin Panella |
Approved revision: | not available |
Merged at revision: | not available |
Proposed branch: | lp:~allenap/launchpad/multithreaded-checkwatches |
Merge into: | lp:launchpad |
Diff against target: | 600 lines (+284/-122), 5 files modified: lib/lp/bugs/doc/checkwatches-cli-switches.txt (+3/-0), lib/lp/bugs/doc/checkwatches.txt (+4/-4), lib/lp/bugs/doc/externalbugtracker.txt (+97/-1), lib/lp/bugs/scripts/checkwatches.py (+177/-114), lib/lp/bugs/scripts/tests/test_bugimport.py (+3/-3) |
To merge this branch: | bzr merge lp:~allenap/launchpad/multithreaded-checkwatches |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Abel Deuring (community) | code | | Approve |
Review via email: mp+15283@code.launchpad.net |
Commit message
Description of the change
Gavin Panella (allenap) wrote:
Abel Deuring (adeuring) wrote:
Hi Gavin,
Great work! And thanks for answering all my somewhat paranoid questions on IRC.
review: Approve (code)
Preview Diff
1 | === modified file 'lib/lp/bugs/doc/checkwatches-cli-switches.txt' | |||
2 | --- lib/lp/bugs/doc/checkwatches-cli-switches.txt 2009-10-09 20:52:12 +0000 | |||
3 | +++ lib/lp/bugs/doc/checkwatches-cli-switches.txt 2009-11-27 16:20:37 +0000 | |||
4 | @@ -138,3 +138,6 @@ | |||
5 | 138 | --reset Update all the watches on the bug tracker, | 138 | --reset Update all the watches on the bug tracker, |
6 | 139 | regardless of whether or not they need | 139 | regardless of whether or not they need |
7 | 140 | checking. | 140 | checking. |
8 | 141 | --jobs=JOBS The number of simulataneous jobs to run, 1 | ||
9 | 142 | by default. | ||
10 | 143 | <BLANKLINE> | ||
11 | 141 | 144 | ||
12 | === modified file 'lib/lp/bugs/doc/checkwatches.txt' | |||
13 | --- lib/lp/bugs/doc/checkwatches.txt 2009-11-19 14:41:14 +0000 | |||
14 | +++ lib/lp/bugs/doc/checkwatches.txt 2009-11-27 16:20:37 +0000 | |||
15 | @@ -46,6 +46,7 @@ | |||
16 | 46 | >>> print err | 46 | >>> print err |
17 | 47 | INFO creating lockfile | 47 | INFO creating lockfile |
18 | 48 | DEBUG Using a global batch size of None | 48 | DEBUG Using a global batch size of None |
19 | 49 | DEBUG Skipping updating Ubuntu Bugzilla watches. | ||
20 | 49 | DEBUG No watches to update on http://bugs.debian.org | 50 | DEBUG No watches to update on http://bugs.debian.org |
21 | 50 | DEBUG No watches to update on mailto:bugs@example.com | 51 | DEBUG No watches to update on mailto:bugs@example.com |
22 | 51 | WARNING ExternalBugtracker for BugTrackerType 'SAVANE' is not known. | 52 | WARNING ExternalBugtracker for BugTrackerType 'SAVANE' is not known. |
23 | @@ -53,7 +54,6 @@ | |||
24 | 53 | DEBUG No watches to update on http://sourceforge.net/ | 54 | DEBUG No watches to update on http://sourceforge.net/ |
25 | 54 | DEBUG No watches to update on http://bugzilla.gnome.org/ | 55 | DEBUG No watches to update on http://bugzilla.gnome.org/ |
26 | 55 | DEBUG No watches to update on https://bugzilla.mozilla.org/ | 56 | DEBUG No watches to update on https://bugzilla.mozilla.org/ |
27 | 56 | DEBUG Skipping updating Ubuntu Bugzilla watches. | ||
28 | 57 | INFO Time for this run: ... seconds. | 57 | INFO Time for this run: ... seconds. |
29 | 58 | DEBUG Removing lock file:... | 58 | DEBUG Removing lock file:... |
30 | 59 | <BLANKLINE> | 59 | <BLANKLINE> |
31 | @@ -201,7 +201,7 @@ | |||
32 | 201 | ... broken_get_external_bugtracker) | 201 | ... broken_get_external_bugtracker) |
33 | 202 | ... updater = BugWatchUpdater(transaction) | 202 | ... updater = BugWatchUpdater(transaction) |
34 | 203 | ... updater._login() | 203 | ... updater._login() |
36 | 204 | ... updater.updateBugTracker(example_bug_tracker) | 204 | ... updater._updateBugTracker(example_bug_tracker) |
37 | 205 | ... finally: | 205 | ... finally: |
38 | 206 | ... externalbugtracker.get_external_bugtracker = ( | 206 | ... externalbugtracker.get_external_bugtracker = ( |
39 | 207 | ... real_get_external_bugtracker) | 207 | ... real_get_external_bugtracker) |
40 | @@ -223,7 +223,7 @@ | |||
41 | 223 | a given ExternalBugTracker in each checkwatches run: the batch size. | 223 | a given ExternalBugTracker in each checkwatches run: the batch size. |
42 | 224 | 224 | ||
43 | 225 | We need to add some bug watches again since | 225 | We need to add some bug watches again since |
45 | 226 | BugWatchUpdate.updateBugTracker() automatically rolls back the | 226 | BugWatchUpdate._updateBugTracker() automatically rolls back the |
46 | 227 | transaction if something goes wrong. | 227 | transaction if something goes wrong. |
47 | 228 | 228 | ||
48 | 229 | >>> login('test@canonical.com') | 229 | >>> login('test@canonical.com') |
49 | @@ -353,7 +353,7 @@ | |||
50 | 353 | 353 | ||
51 | 354 | >>> class NonConnectingUpdater(BugWatchUpdater): | 354 | >>> class NonConnectingUpdater(BugWatchUpdater): |
52 | 355 | ... | 355 | ... |
54 | 356 | ... def updateBugTracker(self, bug_tracker, batch_size): | 356 | ... def _updateBugTracker(self, bug_tracker, batch_size): |
55 | 357 | ... # Update as many watches as the batch size says. | 357 | ... # Update as many watches as the batch size says. |
56 | 358 | ... watches_to_update = ( | 358 | ... watches_to_update = ( |
57 | 359 | ... bug_tracker.getBugWatchesNeedingUpdate(23)[:batch_size]) | 359 | ... bug_tracker.getBugWatchesNeedingUpdate(23)[:batch_size]) |
58 | 360 | 360 | ||
59 | === modified file 'lib/lp/bugs/doc/externalbugtracker.txt' | |||
60 | --- lib/lp/bugs/doc/externalbugtracker.txt 2009-09-22 15:22:53 +0000 | |||
61 | +++ lib/lp/bugs/doc/externalbugtracker.txt 2009-11-27 16:20:37 +0000 | |||
62 | @@ -1079,7 +1079,8 @@ | |||
63 | 1079 | 1079 | ||
64 | 1080 | >>> bug_watch_updater = NonConnectingBugWatchUpdater( | 1080 | >>> bug_watch_updater = NonConnectingBugWatchUpdater( |
65 | 1081 | ... transaction, QuietFakeLogger()) | 1081 | ... transaction, QuietFakeLogger()) |
67 | 1082 | >>> bug_watch_updater.updateBugTracker(standard_bugzilla, batch_size=2) | 1082 | >>> bug_watch_updater._updateBugTracker( |
68 | 1083 | ... standard_bugzilla, batch_size=2) | ||
69 | 1083 | initializeRemoteBugDB() called: [u'5', u'6'] | 1084 | initializeRemoteBugDB() called: [u'5', u'6'] |
70 | 1084 | getRemoteStatus() called: u'5' | 1085 | getRemoteStatus() called: u'5' |
71 | 1085 | getRemoteStatus() called: u'6' | 1086 | getRemoteStatus() called: u'6' |
72 | @@ -1089,6 +1090,12 @@ | |||
73 | 1089 | allows it to be passed as a command-line option when the checkwatches script | 1090 | allows it to be passed as a command-line option when the checkwatches script |
74 | 1090 | is run. | 1091 | is run. |
75 | 1091 | 1092 | ||
76 | 1093 | Before going further, we must abort the current transaction to avoid | ||
77 | 1094 | deadlock; updateBugTrackers() runs updateBugTracker() in a different | ||
78 | 1095 | thread. | ||
79 | 1096 | |||
80 | 1097 | >>> transaction.abort() | ||
81 | 1098 | |||
82 | 1092 | >>> from canonical.launchpad.scripts.logger import FakeLogger | 1099 | >>> from canonical.launchpad.scripts.logger import FakeLogger |
83 | 1093 | >>> bug_watch_updater = NonConnectingBugWatchUpdater( | 1100 | >>> bug_watch_updater = NonConnectingBugWatchUpdater( |
84 | 1094 | ... transaction, FakeLogger()) | 1101 | ... transaction, FakeLogger()) |
85 | @@ -1099,3 +1106,92 @@ | |||
86 | 1099 | initializeRemoteBugDB() called: [u'5', u'6'] | 1106 | initializeRemoteBugDB() called: [u'5', u'6'] |
87 | 1100 | getRemoteStatus() called: u'5' | 1107 | getRemoteStatus() called: u'5' |
88 | 1101 | getRemoteStatus() called: u'6' | 1108 | getRemoteStatus() called: u'6' |
89 | 1109 | |||
90 | 1110 | >>> # We should log in again because updateBugTrackers() logs out. | ||
91 | 1111 | >>> login('test@canonical.com') | ||
92 | 1112 | |||
93 | 1113 | By default, the updateBugTrackers() only spawns one thread, but it can | ||
94 | 1114 | spawn as many as required. | ||
95 | 1115 | |||
96 | 1116 | >>> import threading | ||
97 | 1117 | |||
98 | 1118 | >>> class OutputFileForThreads: | ||
99 | 1119 | ... def __init__(self): | ||
100 | 1120 | ... self.output = {} | ||
101 | 1121 | ... self.lock = threading.Lock() | ||
102 | 1122 | ... def write(self, data): | ||
103 | 1123 | ... thread_id = id(threading.currentThread()) | ||
104 | 1124 | ... self.lock.acquire() | ||
105 | 1125 | ... try: | ||
106 | 1126 | ... if thread_id in self.output: | ||
107 | 1127 | ... self.output[thread_id].append(data) | ||
108 | 1128 | ... else: | ||
109 | 1129 | ... self.output[thread_id] = [data] | ||
110 | 1130 | ... finally: | ||
111 | 1131 | ... self.lock.release() | ||
112 | 1132 | |||
113 | 1133 | >>> output_file = OutputFileForThreads() | ||
114 | 1134 | |||
115 | 1135 | >>> class ExternalBugTrackerForThreads(TestExternalBugTracker): | ||
116 | 1136 | ... def getModifiedRemoteBugs(self, remote_bug_ids, last_checked): | ||
117 | 1137 | ... print >> output_file, ( | ||
118 | 1138 | ... "getModifiedRemoteBugs(\n" | ||
119 | 1139 | ... " remote_bug_ids=%r,\n" | ||
120 | 1140 | ... " last_checked=%r)" % (remote_bug_ids, last_checked)) | ||
121 | 1141 | ... return [remote_bug_ids[0], remote_bug_ids[-1]] | ||
122 | 1142 | ... def getRemoteStatus(self, bug_id): | ||
123 | 1143 | ... print >> output_file, ( | ||
124 | 1144 | ... "getRemoteStatus(bug_id=%r)" % bug_id) | ||
125 | 1145 | ... return 'UNKNOWN' | ||
126 | 1146 | ... def getCurrentDBTime(self): | ||
127 | 1147 | ... return None | ||
128 | 1148 | |||
129 | 1149 | >>> class BugWatchUpdaterForThreads(BugWatchUpdater): | ||
130 | 1150 | ... def _getExternalBugTrackersAndWatches( | ||
131 | 1151 | ... self, bug_trackers, bug_watches): | ||
132 | 1152 | ... return [(ExternalBugTrackerForThreads(), bug_watches)] | ||
133 | 1153 | |||
134 | 1154 | >>> threaded_bug_watch_updater = BugWatchUpdaterForThreads( | ||
135 | 1155 | ... transaction, FakeLogger(output_file)) | ||
136 | 1156 | >>> threaded_bug_watch_updater.updateBugTrackers( | ||
137 | 1157 | ... batch_size=5, num_threads=10) | ||
138 | 1158 | |||
139 | 1159 | >>> for output in sorted(output_file.output.itervalues()): | ||
140 | 1160 | ... print "".join(output), | ||
141 | 1161 | ... print '--' | ||
142 | 1162 | DEBUG No watches to update on http://bugs.example.com | ||
143 | 1163 | -- | ||
144 | 1164 | DEBUG No watches to update on http://bugzilla.gnome.org/ | ||
145 | 1165 | -- | ||
146 | 1166 | DEBUG No watches to update on http://savannah.gnu.org/ | ||
147 | 1167 | -- | ||
148 | 1168 | DEBUG No watches to update on http://sourceforge.net/ | ||
149 | 1169 | -- | ||
150 | 1170 | DEBUG No watches to update on mailto:bugs@example.com | ||
151 | 1171 | -- | ||
152 | 1172 | DEBUG Using a global batch size of 5 | ||
153 | 1173 | DEBUG Skipping updating Ubuntu Bugzilla watches. | ||
154 | 1174 | -- | ||
155 | 1175 | INFO Updating 2 watches for 2 bugs on http://example.com | ||
156 | 1176 | getRemoteStatus(bug_id=u'304070') | ||
157 | 1177 | getRemoteStatus(bug_id=u'3224') | ||
158 | 1178 | -- | ||
159 | 1179 | INFO Updating 4 watches for 3 bugs on http://example.com | ||
160 | 1180 | getRemoteStatus(bug_id=u'123543') | ||
161 | 1181 | getRemoteStatus(bug_id=u'2000') | ||
162 | 1182 | getRemoteStatus(bug_id=u'42') | ||
163 | 1183 | -- | ||
164 | 1184 | INFO Updating 5 watches for 5 bugs on http://example.com | ||
165 | 1185 | getRemoteStatus(bug_id=u'1') | ||
166 | 1186 | getRemoteStatus(bug_id=u'101') | ||
167 | 1187 | getRemoteStatus(bug_id=u'5') | ||
168 | 1188 | getRemoteStatus(bug_id=u'6') | ||
169 | 1189 | getRemoteStatus(bug_id=u'7') | ||
170 | 1190 | -- | ||
171 | 1191 | INFO Updating 5 watches for 5 bugs on http://example.com | ||
172 | 1192 | getRemoteStatus(bug_id=u'280883') | ||
173 | 1193 | getRemoteStatus(bug_id=u'304014') | ||
174 | 1194 | getRemoteStatus(bug_id=u'308994') | ||
175 | 1195 | getRemoteStatus(bug_id=u'327452') | ||
176 | 1196 | getRemoteStatus(bug_id=u'327549') | ||
177 | 1197 | -- | ||
178 | 1102 | 1198 | ||
179 | === modified file 'lib/lp/bugs/scripts/checkwatches.py' | |||
180 | --- lib/lp/bugs/scripts/checkwatches.py 2009-11-19 12:49:04 +0000 | |||
181 | +++ lib/lp/bugs/scripts/checkwatches.py 2009-11-27 16:20:37 +0000 | |||
182 | @@ -6,8 +6,10 @@ | |||
183 | 6 | 6 | ||
184 | 7 | from copy import copy | 7 | from copy import copy |
185 | 8 | from datetime import datetime, timedelta | 8 | from datetime import datetime, timedelta |
186 | 9 | import Queue as queue | ||
187 | 9 | import socket | 10 | import socket |
188 | 10 | import sys | 11 | import sys |
189 | 12 | import threading | ||
190 | 11 | import time | 13 | import time |
191 | 12 | 14 | ||
192 | 13 | import pytz | 15 | import pytz |
193 | @@ -31,7 +33,7 @@ | |||
194 | 31 | ErrorReportingUtility, ScriptRequest) | 33 | ErrorReportingUtility, ScriptRequest) |
195 | 32 | from canonical.launchpad.webapp.interfaces import IPlacelessAuthUtility | 34 | from canonical.launchpad.webapp.interfaces import IPlacelessAuthUtility |
196 | 33 | from canonical.launchpad.webapp.interaction import ( | 35 | from canonical.launchpad.webapp.interaction import ( |
198 | 34 | setupInteraction, endInteraction) | 36 | setupInteraction, endInteraction, queryInteraction) |
199 | 35 | from canonical.launchpad.webapp.publisher import canonical_url | 37 | from canonical.launchpad.webapp.publisher import canonical_url |
200 | 36 | 38 | ||
201 | 37 | from lp.bugs import externalbugtracker | 39 | from lp.bugs import externalbugtracker |
202 | @@ -161,7 +163,21 @@ | |||
203 | 161 | 163 | ||
204 | 162 | ACCEPTABLE_TIME_SKEW = timedelta(minutes=10) | 164 | ACCEPTABLE_TIME_SKEW = timedelta(minutes=10) |
205 | 163 | 165 | ||
206 | 166 | LOGIN = 'bugwatch@bugs.launchpad.net' | ||
207 | 167 | |||
208 | 164 | def __init__(self, txn, log=default_log, syncable_gnome_products=None): | 168 | def __init__(self, txn, log=default_log, syncable_gnome_products=None): |
209 | 169 | """Initialize a BugWatchUpdater. | ||
210 | 170 | |||
211 | 171 | :param txn: A transaction manager on which `begin()`, | ||
212 | 172 | `abort()` and `commit()` can be called. Additionally, it | ||
213 | 173 | should be safe for different threads to use its methods to | ||
214 | 174 | manage their own transactions (i.e. with thread-local | ||
215 | 175 | storage). | ||
216 | 176 | |||
217 | 177 | :param log: An instance of `logging.Logger`, or something that | ||
218 | 178 | provides a similar interface. | ||
219 | 179 | |||
220 | 180 | """ | ||
221 | 165 | self.txn = txn | 181 | self.txn = txn |
222 | 166 | self.log = log | 182 | self.log = log |
223 | 167 | 183 | ||
224 | @@ -171,96 +187,176 @@ | |||
225 | 171 | else: | 187 | else: |
226 | 172 | self._syncable_gnome_products = list(SYNCABLE_GNOME_PRODUCTS) | 188 | self._syncable_gnome_products = list(SYNCABLE_GNOME_PRODUCTS) |
227 | 173 | 189 | ||
228 | 190 | self._principal = ( | ||
229 | 191 | getUtility(IPlacelessAuthUtility).getPrincipalByLogin( | ||
230 | 192 | self.LOGIN, want_password=False)) | ||
231 | 193 | |||
232 | 174 | def _login(self): | 194 | def _login(self): |
233 | 175 | """Set up an interaction as the Bug Watch Updater""" | 195 | """Set up an interaction as the Bug Watch Updater""" |
239 | 176 | auth_utility = getUtility(IPlacelessAuthUtility) | 196 | setupInteraction(self._principal, login=self.LOGIN) |
235 | 177 | setupInteraction( | ||
236 | 178 | auth_utility.getPrincipalByLogin( | ||
237 | 179 | 'bugwatch@bugs.launchpad.net', want_password=False), | ||
238 | 180 | login='bugwatch@bugs.launchpad.net') | ||
240 | 181 | 197 | ||
241 | 182 | def _logout(self): | 198 | def _logout(self): |
242 | 183 | """Tear down the Bug Watch Updater Interaction.""" | 199 | """Tear down the Bug Watch Updater Interaction.""" |
243 | 184 | endInteraction() | 200 | endInteraction() |
244 | 185 | 201 | ||
250 | 186 | def updateBugTrackers(self, bug_tracker_names=None, batch_size=None): | 202 | def _interactionDecorator(self, func): |
251 | 187 | """Update all the bug trackers that have watches pending. | 203 | """Wrap a function to ensure that it runs within an interaction. |
252 | 188 | 204 | ||
253 | 189 | If bug tracker names are specified in bug_tracker_names only | 205 | If an interaction is already set up, this simply calls the |
254 | 190 | those bug trackers will be checked. | 206 | function. If no interaction exists, it will set one up, call the |
255 | 207 | function, then end the interaction. | ||
256 | 208 | |||
257 | 209 | This is intended to make sure the right thing happens whether or not | ||
258 | 210 | the function is run in a different thread. | ||
259 | 191 | """ | 211 | """ |
261 | 192 | self.txn.begin() | 212 | def wrapper(*args, **kwargs): |
262 | 213 | if queryInteraction() is None: | ||
263 | 214 | self._login() | ||
264 | 215 | try: | ||
265 | 216 | return func(*args, **kwargs) | ||
266 | 217 | finally: | ||
267 | 218 | self._logout() | ||
268 | 219 | else: | ||
269 | 220 | return func(*args, **kwargs) | ||
270 | 221 | return wrapper | ||
271 | 222 | |||
272 | 223 | def _bugTrackerUpdaters(self, bug_tracker_names=None): | ||
273 | 224 | """Yields functions that can be used to update each bug tracker.""" | ||
274 | 225 | # Set up an interaction as the Bug Watch Updater since the | ||
275 | 226 | # notification code expects a logged in user. | ||
276 | 227 | self._login() | ||
277 | 228 | |||
278 | 193 | ubuntu_bugzilla = getUtility(ILaunchpadCelebrities).ubuntu_bugzilla | 229 | ubuntu_bugzilla = getUtility(ILaunchpadCelebrities).ubuntu_bugzilla |
279 | 194 | # Save the name, so we can use it in other transactions. | 230 | # Save the name, so we can use it in other transactions. |
280 | 195 | ubuntu_bugzilla_name = ubuntu_bugzilla.name | 231 | ubuntu_bugzilla_name = ubuntu_bugzilla.name |
281 | 196 | 232 | ||
282 | 197 | # Set up an interaction as the Bug Watch Updater since the | ||
283 | 198 | # notification code expects a logged in user. | ||
284 | 199 | self._login() | ||
285 | 200 | |||
286 | 201 | self.log.debug("Using a global batch size of %s" % batch_size) | ||
287 | 202 | |||
288 | 203 | if bug_tracker_names is None: | 233 | if bug_tracker_names is None: |
289 | 204 | bug_tracker_names = [ | 234 | bug_tracker_names = [ |
290 | 205 | bugtracker.name for bugtracker in getUtility(IBugTrackerSet)] | 235 | bugtracker.name for bugtracker in getUtility(IBugTrackerSet)] |
292 | 206 | self.txn.commit() | 236 | |
293 | 237 | def make_updater(bug_tracker_id): | ||
294 | 238 | """Returns a function that can update the given bug tracker.""" | ||
295 | 239 | def updater(batch_size=None): | ||
296 | 240 | run = self._interactionDecorator(self.updateBugTracker) | ||
297 | 241 | return run(bug_tracker_id, batch_size) | ||
298 | 242 | return updater | ||
299 | 243 | |||
300 | 207 | for bug_tracker_name in bug_tracker_names: | 244 | for bug_tracker_name in bug_tracker_names: |
306 | 208 | self.txn.begin() | 245 | if bug_tracker_name == ubuntu_bugzilla_name: |
307 | 209 | bug_tracker = getUtility(IBugTrackerSet).getByName( | 246 | # XXX: 2007-09-11 Graham Binns |
308 | 210 | bug_tracker_name) | 247 | # We automatically ignore the Ubuntu Bugzilla |
309 | 211 | 248 | # here as all its bugs have been imported into | |
310 | 212 | if not bug_tracker.active: | 249 | # Launchpad. Ideally we would have some means |
311 | 250 | # to identify all bug trackers like this so | ||
312 | 251 | # that hard-coding like this can be genericised | ||
313 | 252 | # (Bug 138949). | ||
314 | 213 | self.log.debug( | 253 | self.log.debug( |
332 | 214 | "Updates are disabled for bug tracker at %s" % | 254 | "Skipping updating Ubuntu Bugzilla watches.") |
333 | 215 | bug_tracker.baseurl) | 255 | else: |
334 | 216 | self.txn.abort() | 256 | bug_tracker = getUtility(IBugTrackerSet).getByName( |
335 | 217 | continue | 257 | bug_tracker_name) |
336 | 218 | 258 | if bug_tracker.active: | |
337 | 219 | # Save the url for later, since we might need it to report an | 259 | yield make_updater(bug_tracker.id) |
338 | 220 | # error after a transaction has been aborted. | 260 | else: |
322 | 221 | bug_tracker_url = bug_tracker.baseurl | ||
323 | 222 | try: | ||
324 | 223 | if bug_tracker_name == ubuntu_bugzilla_name: | ||
325 | 224 | # XXX: 2007-09-11 Graham Binns | ||
326 | 225 | # We automatically ignore the Ubuntu Bugzilla | ||
327 | 226 | # here as all its bugs have been imported into | ||
328 | 227 | # Launchpad. Ideally we would have some means | ||
329 | 228 | # to identify all bug trackers like this so | ||
330 | 229 | # that hard-coding like this can be genericised | ||
331 | 230 | # (Bug 138949). | ||
339 | 231 | self.log.debug( | 261 | self.log.debug( |
343 | 232 | "Skipping updating Ubuntu Bugzilla watches.") | 262 | "Updates are disabled for bug tracker at %s" % |
344 | 233 | else: | 263 | bug_tracker.baseurl) |
342 | 234 | self.updateBugTracker(bug_tracker, batch_size) | ||
345 | 235 | 264 | ||
346 | 236 | self.txn.commit() | ||
347 | 237 | except (KeyboardInterrupt, SystemExit): | ||
348 | 238 | # We should never catch KeyboardInterrupt or SystemExit. | ||
349 | 239 | raise | ||
350 | 240 | except Exception, error: | ||
351 | 241 | # If something unexpected goes wrong, we log it and | ||
352 | 242 | # continue: a failure shouldn't break the updating of | ||
353 | 243 | # the other bug trackers. | ||
354 | 244 | info = sys.exc_info() | ||
355 | 245 | properties = [ | ||
356 | 246 | ('bugtracker', bug_tracker_name), | ||
357 | 247 | ('baseurl', bug_tracker_url)] | ||
358 | 248 | if isinstance(error, BugWatchUpdateError): | ||
359 | 249 | self.error( | ||
360 | 250 | str(error), properties=properties, info=info) | ||
361 | 251 | elif isinstance(error, socket.timeout): | ||
362 | 252 | self.error( | ||
363 | 253 | "Connection timed out when updating %s" % | ||
364 | 254 | bug_tracker_url, | ||
365 | 255 | properties=properties, info=info) | ||
366 | 256 | else: | ||
367 | 257 | self.error( | ||
368 | 258 | "An exception was raised when updating %s" % | ||
369 | 259 | bug_tracker_url, | ||
370 | 260 | properties=properties, info=info) | ||
371 | 261 | self.txn.abort() | ||
372 | 262 | self._logout() | 265 | self._logout() |
373 | 263 | 266 | ||
374 | 267 | def updateBugTrackers( | ||
375 | 268 | self, bug_tracker_names=None, batch_size=None, num_threads=1): | ||
376 | 269 | """Update all the bug trackers that have watches pending. | ||
377 | 270 | |||
378 | 271 | If bug tracker names are specified in bug_tracker_names only | ||
379 | 272 | those bug trackers will be checked. | ||
380 | 273 | |||
381 | 274 | The updates are run in threads, so that long running updates | ||
382 | 275 | don't block progress. However, by default the number of | ||
383 | 276 | threads is 1, to help with testing. | ||
384 | 277 | """ | ||
385 | 278 | self.log.debug("Using a global batch size of %s" % batch_size) | ||
386 | 279 | |||
387 | 280 | # Put all the work on the queue. This is simpler than drip-feeding the | ||
388 | 281 | # queue, and avoids a situation where a worker thread exits because | ||
389 | 282 | # there's no work left and the feeding thread hasn't been scheduled to | ||
390 | 283 | # add work to the queue. | ||
391 | 284 | work = queue.Queue() | ||
392 | 285 | for updater in self._bugTrackerUpdaters(bug_tracker_names): | ||
393 | 286 | work.put(updater) | ||
394 | 287 | |||
395 | 288 | # This will be run once in each worker thread. | ||
396 | 289 | def do_work(): | ||
397 | 290 | while True: | ||
398 | 291 | try: | ||
399 | 292 | job = work.get(block=False) | ||
400 | 293 | except queue.Empty: | ||
401 | 294 | break | ||
402 | 295 | else: | ||
403 | 296 | job(batch_size) | ||
404 | 297 | |||
405 | 298 | # Start and join the worker threads. | ||
406 | 299 | threads = [] | ||
407 | 300 | for run in xrange(num_threads): | ||
408 | 301 | thread = threading.Thread(target=do_work) | ||
409 | 302 | thread.start() | ||
410 | 303 | threads.append(thread) | ||
411 | 304 | for thread in threads: | ||
412 | 305 | thread.join() | ||
413 | 306 | |||
414 | 307 | def updateBugTracker(self, bug_tracker, batch_size): | ||
415 | 308 | """Updates the given bug tracker's bug watches. | ||
416 | 309 | |||
417 | 310 | If there is an error, logs are updated, and the transaction is | ||
418 | 311 | aborted. | ||
419 | 312 | |||
420 | 313 | :param bug_tracker: An IBugTracker or the ID of one, so that this | ||
421 | 314 | method can be called from a different interaction. | ||
422 | 315 | |||
423 | 316 | :return: A boolean indicating if the operation was successful. | ||
424 | 317 | """ | ||
425 | 318 | # Get the bug tracker. | ||
426 | 319 | if isinstance(bug_tracker, (int, long)): | ||
427 | 320 | bug_tracker = getUtility(IBugTrackerSet).get(bug_tracker) | ||
428 | 321 | |||
429 | 322 | # Save the name and url for later, since we might need it to report an | ||
430 | 323 | # error after a transaction has been aborted. | ||
431 | 324 | bug_tracker_name = bug_tracker.name | ||
432 | 325 | bug_tracker_url = bug_tracker.baseurl | ||
433 | 326 | |||
434 | 327 | try: | ||
435 | 328 | self.txn.begin() | ||
436 | 329 | self._updateBugTracker(bug_tracker, batch_size) | ||
437 | 330 | self.txn.commit() | ||
438 | 331 | except (KeyboardInterrupt, SystemExit): | ||
439 | 332 | # We should never catch KeyboardInterrupt or SystemExit. | ||
440 | 333 | raise | ||
441 | 334 | except Exception, error: | ||
442 | 335 | # If something unexpected goes wrong, we log it and | ||
443 | 336 | # continue: a failure shouldn't break the updating of | ||
444 | 337 | # the other bug trackers. | ||
445 | 338 | info = sys.exc_info() | ||
446 | 339 | properties = [ | ||
447 | 340 | ('bugtracker', bug_tracker_name), | ||
448 | 341 | ('baseurl', bug_tracker_url)] | ||
449 | 342 | if isinstance(error, BugWatchUpdateError): | ||
450 | 343 | self.error( | ||
451 | 344 | str(error), properties=properties, info=info) | ||
452 | 345 | elif isinstance(error, socket.timeout): | ||
453 | 346 | self.error( | ||
454 | 347 | "Connection timed out when updating %s" % | ||
455 | 348 | bug_tracker_url, | ||
456 | 349 | properties=properties, info=info) | ||
457 | 350 | else: | ||
458 | 351 | self.error( | ||
459 | 352 | "An exception was raised when updating %s" % | ||
460 | 353 | bug_tracker_url, | ||
461 | 354 | properties=properties, info=info) | ||
462 | 355 | self.txn.abort() | ||
463 | 356 | return False | ||
464 | 357 | else: | ||
465 | 358 | return True | ||
466 | 359 | |||
467 | 264 | def forceUpdateAll(self, bug_tracker_name, batch_size): | 360 | def forceUpdateAll(self, bug_tracker_name, batch_size): |
468 | 265 | """Update all the watches for `bug_tracker_name`. | 361 | """Update all the watches for `bug_tracker_name`. |
469 | 266 | 362 | ||
470 | @@ -290,50 +386,16 @@ | |||
471 | 290 | bug_tracker.resetWatches() | 386 | bug_tracker.resetWatches() |
472 | 291 | self.txn.commit() | 387 | self.txn.commit() |
473 | 292 | 388 | ||
474 | 293 | # Take a copy of the bug tracker URL. If the transaction fails | ||
475 | 294 | # later we can't refer to the baseurl attribute of the bug | ||
476 | 295 | # tracker. | ||
477 | 296 | bug_tracker_url = bug_tracker.baseurl | ||
478 | 297 | |||
479 | 298 | # Loop over the bug watches in batches as specified by | 389 | # Loop over the bug watches in batches as specified by |
480 | 299 | # batch_size until there are none left to update. | 390 | # batch_size until there are none left to update. |
481 | 300 | self.log.info( | 391 | self.log.info( |
482 | 301 | "Updating %s watches on bug tracker '%s'" % | 392 | "Updating %s watches on bug tracker '%s'" % |
483 | 302 | (bug_tracker.watches.count(), bug_tracker_name)) | 393 | (bug_tracker.watches.count(), bug_tracker_name)) |
484 | 303 | iteration = 0 | ||
485 | 304 | has_watches_to_update = True | 394 | has_watches_to_update = True |
486 | 305 | while has_watches_to_update: | 395 | while has_watches_to_update: |
487 | 306 | self.txn.begin() | 396 | self.txn.begin() |
516 | 307 | try: | 397 | if not self.updateBugTracker(bug_tracker, batch_size): |
489 | 308 | self.updateBugTracker(bug_tracker, batch_size) | ||
490 | 309 | self.txn.commit() | ||
491 | 310 | except (KeyboardInterrupt, SystemExit): | ||
492 | 311 | # We should never catch KeyboardInterrupt or SystemExit. | ||
493 | 312 | raise | ||
494 | 313 | except Exception, error: | ||
495 | 314 | # If something unexpected goes wrong, we log it and | ||
496 | 315 | # continue: a failure shouldn't break the updating of | ||
497 | 316 | # the other bug trackers. | ||
498 | 317 | info = sys.exc_info() | ||
499 | 318 | properties = [ | ||
500 | 319 | ('bugtracker', bug_tracker_name), | ||
501 | 320 | ('baseurl', bug_tracker_url)] | ||
502 | 321 | if isinstance(error, BugWatchUpdateError): | ||
503 | 322 | self.error( | ||
504 | 323 | str(error), properties=properties, info=info) | ||
505 | 324 | elif isinstance(error, socket.timeout): | ||
506 | 325 | self.error( | ||
507 | 326 | "Connection timed out when updating %s" % | ||
508 | 327 | bug_tracker_url, | ||
509 | 328 | properties=properties, info=info) | ||
510 | 329 | else: | ||
511 | 330 | self.error( | ||
512 | 331 | "An exception was raised when updating %s" % | ||
513 | 332 | bug_tracker_url, | ||
514 | 333 | properties=properties, info=info) | ||
515 | 334 | self.txn.abort() | ||
517 | 335 | break | 398 | break |
518 | 336 | |||
519 | 337 | watches_left = bug_tracker.getBugWatchesNeedingUpdate(23).count() | 399 | watches_left = bug_tracker.getBugWatchesNeedingUpdate(23).count() |
520 | 338 | self.log.info( | 400 | self.log.info( |
521 | 339 | "%s watches left to check on bug tracker '%s'" % | 401 | "%s watches left to check on bug tracker '%s'" % |
522 | @@ -409,7 +471,7 @@ | |||
523 | 409 | 471 | ||
524 | 410 | return trackers_and_watches | 472 | return trackers_and_watches |
525 | 411 | 473 | ||
527 | 412 | def updateBugTracker(self, bug_tracker, batch_size=None): | 474 | def _updateBugTracker(self, bug_tracker, batch_size=None): |
528 | 413 | """Updates the given bug tracker's bug watches.""" | 475 | """Updates the given bug tracker's bug watches.""" |
529 | 414 | # XXX 2007-01-18 gmb: | 476 | # XXX 2007-01-18 gmb: |
530 | 415 | # Once we start running checkwatches more frequently we need | 477 | # Once we start running checkwatches more frequently we need |
531 | @@ -1085,7 +1147,7 @@ | |||
532 | 1085 | "one bugtracker using this option will check all the " | 1147 | "one bugtracker using this option will check all the " |
533 | 1086 | "bugtrackers specified.") | 1148 | "bugtrackers specified.") |
534 | 1087 | self.parser.add_option( | 1149 | self.parser.add_option( |
536 | 1088 | '-b', '--batch-size', action='store', dest='batch_size', | 1150 | '-b', '--batch-size', action='store', type=int, dest='batch_size', |
537 | 1089 | help="Set the number of watches to be checked per bug " | 1151 | help="Set the number of watches to be checked per bug " |
538 | 1090 | "tracker in this run. If BATCH_SIZE is 0, all watches " | 1152 | "tracker in this run. If BATCH_SIZE is 0, all watches " |
539 | 1091 | "on the bug tracker that are eligible for checking will " | 1153 | "on the bug tracker that are eligible for checking will " |
540 | @@ -1094,26 +1156,27 @@ | |||
541 | 1094 | '--reset', action='store_true', dest='update_all', | 1156 | '--reset', action='store_true', dest='update_all', |
542 | 1095 | help="Update all the watches on the bug tracker, regardless of " | 1157 | help="Update all the watches on the bug tracker, regardless of " |
543 | 1096 | "whether or not they need checking.") | 1158 | "whether or not they need checking.") |
544 | 1159 | self.parser.add_option( | ||
545 | 1160 | '--jobs', action='store', type=int, dest='jobs', default=1, | ||
546 | 1161 | help=("The number of simulataneous jobs to run, %default by " | ||
547 | 1162 | "default.")) | ||
548 | 1097 | 1163 | ||
549 | 1098 | def main(self): | 1164 | def main(self): |
550 | 1099 | start_time = time.time() | 1165 | start_time = time.time() |
551 | 1100 | 1166 | ||
552 | 1101 | updater = BugWatchUpdater(self.txn, self.logger) | 1167 | updater = BugWatchUpdater(self.txn, self.logger) |
553 | 1102 | 1168 | ||
554 | 1103 | # Make sure batch_size is an integer or None. | ||
555 | 1104 | batch_size = self.options.batch_size | ||
556 | 1105 | if batch_size is not None: | ||
557 | 1106 | batch_size = int(batch_size) | ||
558 | 1107 | |||
559 | 1108 | if self.options.update_all and len(self.options.bug_trackers) > 0: | 1169 | if self.options.update_all and len(self.options.bug_trackers) > 0: |
560 | 1109 | # The user has requested that we update *all* the watches | 1170 | # The user has requested that we update *all* the watches |
561 | 1110 | # for these bugtrackers | 1171 | # for these bugtrackers |
562 | 1111 | for bug_tracker in self.options.bug_trackers: | 1172 | for bug_tracker in self.options.bug_trackers: |
564 | 1112 | updater.forceUpdateAll(bug_tracker, batch_size) | 1173 | updater.forceUpdateAll(bug_tracker, self.options.batch_size) |
565 | 1113 | else: | 1174 | else: |
566 | 1114 | # Otherwise we just update those watches that need updating, | 1175 | # Otherwise we just update those watches that need updating, |
567 | 1115 | # and we let the BugWatchUpdater decide which those are. | 1176 | # and we let the BugWatchUpdater decide which those are. |
569 | 1116 | updater.updateBugTrackers(self.options.bug_trackers, batch_size) | 1177 | updater.updateBugTrackers( |
570 | 1178 | self.options.bug_trackers, self.options.batch_size, | ||
571 | 1179 | self.options.jobs) | ||
572 | 1117 | 1180 | ||
573 | 1118 | run_time = time.time() - start_time | 1181 | run_time = time.time() - start_time |
574 | 1119 | self.logger.info("Time for this run: %.3f seconds." % run_time) | 1182 | self.logger.info("Time for this run: %.3f seconds." % run_time) |
575 | 1120 | 1183 | ||
576 | === modified file 'lib/lp/bugs/scripts/tests/test_bugimport.py' | |||
577 | --- lib/lp/bugs/scripts/tests/test_bugimport.py 2009-10-21 18:46:29 +0000 | |||
578 | +++ lib/lp/bugs/scripts/tests/test_bugimport.py 2009-11-27 16:20:37 +0000 | |||
579 | @@ -883,10 +883,10 @@ | |||
580 | 883 | class TestBugWatchUpdater(BugWatchUpdater): | 883 | class TestBugWatchUpdater(BugWatchUpdater): |
581 | 884 | """A mock `BugWatchUpdater` object.""" | 884 | """A mock `BugWatchUpdater` object.""" |
582 | 885 | 885 | ||
584 | 886 | def updateBugTracker(self, bug_tracker): | 886 | def _updateBugTracker(self, bug_tracker): |
585 | 887 | # Save the current bug tracker, so _getBugWatch can reference it. | 887 | # Save the current bug tracker, so _getBugWatch can reference it. |
586 | 888 | self.bugtracker = bug_tracker | 888 | self.bugtracker = bug_tracker |
588 | 889 | super(TestBugWatchUpdater, self).updateBugTracker(bug_tracker) | 889 | super(TestBugWatchUpdater, self)._updateBugTracker(bug_tracker) |
589 | 890 | 890 | ||
590 | 891 | def _getExternalBugTrackersAndWatches(self, bug_tracker, bug_watches): | 891 | def _getExternalBugTrackersAndWatches(self, bug_tracker, bug_watches): |
591 | 892 | """See `BugWatchUpdater`.""" | 892 | """See `BugWatchUpdater`.""" |
592 | @@ -928,7 +928,7 @@ | |||
593 | 928 | # trigger a DB error, the second updates successfully. | 928 | # trigger a DB error, the second updates successfully. |
594 | 929 | bug_tracker = TestBugTracker(test_bug_one, test_bug_two) | 929 | bug_tracker = TestBugTracker(test_bug_one, test_bug_two) |
595 | 930 | bug_watch_updater = TestBugWatchUpdater(self.layer.txn) | 930 | bug_watch_updater = TestBugWatchUpdater(self.layer.txn) |
597 | 931 | bug_watch_updater.updateBugTracker(bug_tracker) | 931 | bug_watch_updater._updateBugTracker(bug_tracker) |
598 | 932 | # We verify that the first bug watch didn't update the status, | 932 | # We verify that the first bug watch didn't update the status, |
599 | 933 | # and the second did. | 933 | # and the second did. |
600 | 934 | for bugtask in test_bug_one.bugtasks: | 934 | for bugtask in test_bug_one.bugtasks: |
This branch makes checkwatches update bug trackers in multiple threads. The
watches for a specific bug tracker are updated sequentially in a single
thread, but other bug trackers may be updated concurrently in separate
threads.
Most of the changes are refactoring of or new methods for BugWatchUpdater in
checkwatches.py:
* The existing updateBugTracker() method was renamed to _updateBugTracker().
* The existing exception handling and oops reporting code in
updateBugTrackers() was moved to a new updateBugTracker() method.
* forceUpdateAll() had some identical exception handling code which was
replaced by a call to the new updateBugTracker() method.
* updateBugTrackers() was eviscerated and replaced with the thread setup and
run. It calls a new _bugTrackerUpdaters() method which generates functions
for the threads to run, each of which will update one bug tracker. These
functions are put into a work queue from which the threads will pull.
* A new _interactionDecorator() method was created to support running jobs in
an interaction. Each of the functions that _bugTrackerUpdaters() yields is
decorated with this. This is needed because interactions are specific to a
thread.
Other work:
* Add a --jobs option to the checkwatches.py script.
* Tests that called updateBugTracker() now call _updateBugTracker() to avoid
the transaction stuff.
* Demonstrate updateBugTrackers() spawning multiple threads.
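As a rough illustration of the option handling, the following standalone sketch mirrors the two add_option() calls in the diff using the standard optparse module. The build_parser() helper is hypothetical and the help strings are abbreviated; the real script registers its options on self.parser.

```python
from optparse import OptionParser


def build_parser():
    """Build a parser with the two integer options (hypothetical helper)."""
    parser = OptionParser()
    # type=int makes optparse do the conversion, so main() no longer
    # needs to call int() on batch_size itself.
    parser.add_option(
        '-b', '--batch-size', action='store', type=int, dest='batch_size',
        help="Set the number of watches to be checked per bug tracker.")
    parser.add_option(
        '--jobs', action='store', type=int, dest='jobs', default=1,
        help="The number of simultaneous jobs to run, %default by default.")
    return parser


options, args = build_parser().parse_args(['--jobs', '4', '-b', '10'])
```

With no arguments, jobs falls back to 1 and batch_size stays None, which matches the "integer or None" contract that the removed conversion code in main() used to enforce by hand.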
Testing:
bin/test -vvct 'checkwatch|externalbug'
Lint:
lib/lp/bugs/scripts/checkwatches.py
22: [F0401] Unable to import 'lazr.lifecycle.event' (No module named
lifecycle)