Merge lp:~allenap/launchpad/multithreaded-checkwatches into lp:launchpad

Proposed by Gavin Panella
Status: Merged
Approved by: Gavin Panella
Approved revision: not available
Merged at revision: not available
Proposed branch: lp:~allenap/launchpad/multithreaded-checkwatches
Merge into: lp:launchpad
Diff against target: 600 lines (+284/-122)
5 files modified
lib/lp/bugs/doc/checkwatches-cli-switches.txt (+3/-0)
lib/lp/bugs/doc/checkwatches.txt (+4/-4)
lib/lp/bugs/doc/externalbugtracker.txt (+97/-1)
lib/lp/bugs/scripts/checkwatches.py (+177/-114)
lib/lp/bugs/scripts/tests/test_bugimport.py (+3/-3)
To merge this branch: bzr merge lp:~allenap/launchpad/multithreaded-checkwatches
Reviewer                  Review Type  Date Requested  Status
Abel Deuring (community)  code                         Approve
Review via email: mp+15283@code.launchpad.net
Gavin Panella (allenap) wrote :

This branch makes checkwatches update bug trackers in multiple threads. The
watches for a specific bug tracker are updated sequentially in a single
thread, but other bug trackers may be updated concurrently in separate
threads.
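
The threading model described here can be sketched as a small work-queue
pool. This is only an illustration in Python 3 (the branch itself is
Python 2 and uses the `Queue` module); `run_jobs`, `make_job` and the tracker
names are hypothetical and do not appear in the branch.

```python
import queue
import threading


def run_jobs(jobs, num_threads=1):
    """Run each zero-argument callable in `jobs` on worker threads."""
    # Queue all the work up front, so that a worker never finds the
    # queue empty while jobs are still waiting to be added.
    work = queue.Queue()
    for job in jobs:
        work.put(job)

    def do_work():
        # Each worker pulls jobs until the queue is drained.
        while True:
            try:
                job = work.get(block=False)
            except queue.Empty:
                break
            else:
                job()

    threads = [threading.Thread(target=do_work) for _ in range(num_threads)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()


results = []
results_lock = threading.Lock()


def make_job(tracker_name):
    """Return a job that records one (hypothetical) tracker update."""
    def job():
        with results_lock:
            results.append(tracker_name)
    return job


run_jobs([make_job(name) for name in ("debian", "gnome", "mozilla")],
         num_threads=2)
print(sorted(results))
```

Each job runs start to finish on one thread, so work for a single tracker
stays sequential while different trackers can proceed in parallel.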

Most of the changes refactor existing BugWatchUpdater methods in
checkwatches.py or add new ones:

 * The existing updateBugTracker() method was renamed to _updateBugTracker().

 * The existing exception handling and oops reporting code in
   updateBugTrackers() was moved to a new updateBugTracker() method.

 * forceUpdateAll() had some identical exception handling code which was
   replaced by a call to the new updateBugTracker() method.

 * updateBugTrackers() was gutted and now only sets up and runs the worker
   threads. It calls a new _bugTrackerUpdaters() method which generates
   functions for the threads to run, each of which will update one bug
   tracker. These functions are put into a work queue from which the threads
   will pull.

 * A new _interactionDecorator() method was created to support running jobs in
   an interaction. Each of the functions that _bugTrackerUpdaters() yields is
   decorated with this. This is needed because interactions are specific to a
   thread.
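
As a rough sketch of why the decorator is needed, the following Python 3
example uses `threading.local` as a stand-in for the real interaction
machinery. The names `query_interaction`, `login`, `logout` and
`interaction_decorator` are hypothetical simplifications, not the Launchpad
API; only the control flow mirrors _interactionDecorator().

```python
import threading

# Hypothetical stand-in for per-thread interaction state.
_state = threading.local()


def query_interaction():
    return getattr(_state, "interaction", None)


def login():
    _state.interaction = "bugwatch"


def logout():
    _state.interaction = None


def interaction_decorator(func):
    """Run `func` inside an interaction, creating one only if needed."""
    def wrapper(*args, **kwargs):
        if query_interaction() is None:
            login()
            try:
                return func(*args, **kwargs)
            finally:
                logout()
        else:
            return func(*args, **kwargs)
    return wrapper


seen = {}


@interaction_decorator
def update(label):
    seen[label] = query_interaction()


def worker_body():
    update("worker")
    # The decorator set up the interaction, so it also tore it down.
    seen["worker_after"] = query_interaction()


login()
update("main")  # Reuses the interaction that already exists.
still_logged_in = query_interaction()

worker = threading.Thread(target=worker_body)
worker.start()
worker.join()
```

The worker thread starts with no interaction even though the main thread is
logged in, which is the situation the decorator handles.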

Other work:

 * Add a --jobs option to the checkwatches.py script.

 * Tests that called updateBugTracker() now call _updateBugTracker() to avoid
   the transaction handling (begin, commit, abort) that the new
   updateBugTracker() wrapper performs.

 * Add a doctest to externalbugtracker.txt that demonstrates
   updateBugTrackers() spawning multiple threads.
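
For reference, the option handling can be tried in isolation with a
standalone optparse parser. This is a minimal sketch rather than the script's
actual parser setup: it passes `type="int"` as a string and shortens the help
text, and the `parser` object here is an assumption for illustration.

```python
from optparse import OptionParser

parser = OptionParser()
parser.add_option(
    "-b", "--batch-size", action="store", type="int", dest="batch_size",
    help="Set the number of watches to be checked per bug tracker.")
parser.add_option(
    "--jobs", action="store", type="int", dest="jobs", default=1,
    help="The number of simultaneous jobs to run, %default by default.")

# With explicit values, optparse converts both options to integers.
options, args = parser.parse_args(["--jobs=4", "-b", "10"])

# With no arguments, jobs falls back to 1 and batch_size stays None.
default_options, _ = parser.parse_args([])
```

Letting the parser do the integer conversion is what allows main() to drop
its manual int(batch_size) call.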

Testing:

  bin/test -vvct 'checkwatch|externalbug'

Lint:

  lib/lp/bugs/scripts/checkwatches.py
      22: [F0401] Unable to import 'lazr.lifecycle.event' (No module named
          lifecycle)

Abel Deuring (adeuring) wrote :

Hi Gavin,

Great work! And thanks for answering all my somewhat paranoid questions on IRC.

review: Approve (code)

Preview Diff

=== modified file 'lib/lp/bugs/doc/checkwatches-cli-switches.txt'
--- lib/lp/bugs/doc/checkwatches-cli-switches.txt 2009-10-09 20:52:12 +0000
+++ lib/lp/bugs/doc/checkwatches-cli-switches.txt 2009-11-27 16:20:37 +0000
@@ -138,3 +138,6 @@
       --reset               Update all the watches on the bug tracker,
                             regardless of whether or not they need
                             checking.
+      --jobs=JOBS           The number of simulataneous jobs to run, 1
+                            by default.
+    <BLANKLINE>
=== modified file 'lib/lp/bugs/doc/checkwatches.txt'
--- lib/lp/bugs/doc/checkwatches.txt 2009-11-19 14:41:14 +0000
+++ lib/lp/bugs/doc/checkwatches.txt 2009-11-27 16:20:37 +0000
@@ -46,6 +46,7 @@
     >>> print err
     INFO creating lockfile
     DEBUG Using a global batch size of None
+    DEBUG Skipping updating Ubuntu Bugzilla watches.
     DEBUG No watches to update on http://bugs.debian.org
     DEBUG No watches to update on mailto:bugs@example.com
     WARNING ExternalBugtracker for BugTrackerType 'SAVANE' is not known.
@@ -53,7 +54,6 @@
     DEBUG No watches to update on http://sourceforge.net/
     DEBUG No watches to update on http://bugzilla.gnome.org/
     DEBUG No watches to update on https://bugzilla.mozilla.org/
-    DEBUG Skipping updating Ubuntu Bugzilla watches.
     INFO Time for this run: ... seconds.
     DEBUG Removing lock file:...
     <BLANKLINE>
@@ -201,7 +201,7 @@
     ...         broken_get_external_bugtracker)
     ...     updater = BugWatchUpdater(transaction)
     ...     updater._login()
-    ...     updater.updateBugTracker(example_bug_tracker)
+    ...     updater._updateBugTracker(example_bug_tracker)
     ... finally:
     ...     externalbugtracker.get_external_bugtracker = (
     ...         real_get_external_bugtracker)
@@ -223,7 +223,7 @@
 a given ExternalBugTracker in each checkwatches run: the batch size.

 We need to add some bug watches again since
-BugWatchUpdate.updateBugTracker() automatically rolls back the
+BugWatchUpdate._updateBugTracker() automatically rolls back the
 transaction if something goes wrong.

     >>> login('test@canonical.com')
@@ -353,7 +353,7 @@

     >>> class NonConnectingUpdater(BugWatchUpdater):
     ...
-    ...     def updateBugTracker(self, bug_tracker, batch_size):
+    ...     def _updateBugTracker(self, bug_tracker, batch_size):
     ...         # Update as many watches as the batch size says.
     ...         watches_to_update = (
     ...             bug_tracker.getBugWatchesNeedingUpdate(23)[:batch_size])
=== modified file 'lib/lp/bugs/doc/externalbugtracker.txt'
--- lib/lp/bugs/doc/externalbugtracker.txt 2009-09-22 15:22:53 +0000
+++ lib/lp/bugs/doc/externalbugtracker.txt 2009-11-27 16:20:37 +0000
@@ -1079,7 +1079,8 @@

     >>> bug_watch_updater = NonConnectingBugWatchUpdater(
     ...     transaction, QuietFakeLogger())
-    >>> bug_watch_updater.updateBugTracker(standard_bugzilla, batch_size=2)
+    >>> bug_watch_updater._updateBugTracker(
+    ...     standard_bugzilla, batch_size=2)
     initializeRemoteBugDB() called: [u'5', u'6']
     getRemoteStatus() called: u'5'
     getRemoteStatus() called: u'6'
@@ -1089,6 +1090,12 @@
 allows it to be passed as a command-line option when the checkwatches script
 is run.

+Before going further, we must abort the current transaction to avoid
+deadlock; updateBugTrackers() runs updateBugTracker() in a different
+thread.
+
+    >>> transaction.abort()
+
     >>> from canonical.launchpad.scripts.logger import FakeLogger
     >>> bug_watch_updater = NonConnectingBugWatchUpdater(
     ...     transaction, FakeLogger())
@@ -1099,3 +1106,92 @@
     initializeRemoteBugDB() called: [u'5', u'6']
     getRemoteStatus() called: u'5'
     getRemoteStatus() called: u'6'
+
+    >>> # We should log in again because updateBugTrackers() logs out.
+    >>> login('test@canonical.com')
+
+By default, the updateBugTrackers() only spawns one thread, but it can
+spawn as many as required.
+
+    >>> import threading
+
+    >>> class OutputFileForThreads:
+    ...     def __init__(self):
+    ...         self.output = {}
+    ...         self.lock = threading.Lock()
+    ...     def write(self, data):
+    ...         thread_id = id(threading.currentThread())
+    ...         self.lock.acquire()
+    ...         try:
+    ...             if thread_id in self.output:
+    ...                 self.output[thread_id].append(data)
+    ...             else:
+    ...                 self.output[thread_id] = [data]
+    ...         finally:
+    ...             self.lock.release()
+
+    >>> output_file = OutputFileForThreads()
+
+    >>> class ExternalBugTrackerForThreads(TestExternalBugTracker):
+    ...     def getModifiedRemoteBugs(self, remote_bug_ids, last_checked):
+    ...         print >> output_file, (
+    ...             "getModifiedRemoteBugs(\n"
+    ...             " remote_bug_ids=%r,\n"
+    ...             " last_checked=%r)" % (remote_bug_ids, last_checked))
+    ...         return [remote_bug_ids[0], remote_bug_ids[-1]]
+    ...     def getRemoteStatus(self, bug_id):
+    ...         print >> output_file, (
+    ...             "getRemoteStatus(bug_id=%r)" % bug_id)
+    ...         return 'UNKNOWN'
+    ...     def getCurrentDBTime(self):
+    ...         return None
+
+    >>> class BugWatchUpdaterForThreads(BugWatchUpdater):
+    ...     def _getExternalBugTrackersAndWatches(
+    ...         self, bug_trackers, bug_watches):
+    ...         return [(ExternalBugTrackerForThreads(), bug_watches)]
+
+    >>> threaded_bug_watch_updater = BugWatchUpdaterForThreads(
+    ...     transaction, FakeLogger(output_file))
+    >>> threaded_bug_watch_updater.updateBugTrackers(
+    ...     batch_size=5, num_threads=10)
+
+    >>> for output in sorted(output_file.output.itervalues()):
+    ...     print "".join(output),
+    ...     print '--'
+    DEBUG No watches to update on http://bugs.example.com
+    --
+    DEBUG No watches to update on http://bugzilla.gnome.org/
+    --
+    DEBUG No watches to update on http://savannah.gnu.org/
+    --
+    DEBUG No watches to update on http://sourceforge.net/
+    --
+    DEBUG No watches to update on mailto:bugs@example.com
+    --
+    DEBUG Using a global batch size of 5
+    DEBUG Skipping updating Ubuntu Bugzilla watches.
+    --
+    INFO Updating 2 watches for 2 bugs on http://example.com
+    getRemoteStatus(bug_id=u'304070')
+    getRemoteStatus(bug_id=u'3224')
+    --
+    INFO Updating 4 watches for 3 bugs on http://example.com
+    getRemoteStatus(bug_id=u'123543')
+    getRemoteStatus(bug_id=u'2000')
+    getRemoteStatus(bug_id=u'42')
+    --
+    INFO Updating 5 watches for 5 bugs on http://example.com
+    getRemoteStatus(bug_id=u'1')
+    getRemoteStatus(bug_id=u'101')
+    getRemoteStatus(bug_id=u'5')
+    getRemoteStatus(bug_id=u'6')
+    getRemoteStatus(bug_id=u'7')
+    --
+    INFO Updating 5 watches for 5 bugs on http://example.com
+    getRemoteStatus(bug_id=u'280883')
+    getRemoteStatus(bug_id=u'304014')
+    getRemoteStatus(bug_id=u'308994')
+    getRemoteStatus(bug_id=u'327452')
+    getRemoteStatus(bug_id=u'327549')
+    --
=== modified file 'lib/lp/bugs/scripts/checkwatches.py'
--- lib/lp/bugs/scripts/checkwatches.py 2009-11-19 12:49:04 +0000
+++ lib/lp/bugs/scripts/checkwatches.py 2009-11-27 16:20:37 +0000
@@ -6,8 +6,10 @@

 from copy import copy
 from datetime import datetime, timedelta
+import Queue as queue
 import socket
 import sys
+import threading
 import time

 import pytz
@@ -31,7 +33,7 @@
     ErrorReportingUtility, ScriptRequest)
 from canonical.launchpad.webapp.interfaces import IPlacelessAuthUtility
 from canonical.launchpad.webapp.interaction import (
-    setupInteraction, endInteraction)
+    setupInteraction, endInteraction, queryInteraction)
 from canonical.launchpad.webapp.publisher import canonical_url

 from lp.bugs import externalbugtracker
@@ -161,7 +163,21 @@

     ACCEPTABLE_TIME_SKEW = timedelta(minutes=10)

+    LOGIN = 'bugwatch@bugs.launchpad.net'
+
     def __init__(self, txn, log=default_log, syncable_gnome_products=None):
+        """Initialize a BugWatchUpdater.
+
+        :param txn: A transaction manager on which `begin()`,
+            `abort()` and `commit()` can be called. Additionally, it
+            should be safe for different threads to use its methods to
+            manage their own transactions (i.e. with thread-local
+            storage).
+
+        :param log: An instance of `logging.Logger`, or something that
+            provides a similar interface.
+
+        """
         self.txn = txn
         self.log = log

@@ -171,96 +187,176 @@
         else:
             self._syncable_gnome_products = list(SYNCABLE_GNOME_PRODUCTS)

+        self._principal = (
+            getUtility(IPlacelessAuthUtility).getPrincipalByLogin(
+                self.LOGIN, want_password=False))
+
     def _login(self):
         """Set up an interaction as the Bug Watch Updater"""
-        auth_utility = getUtility(IPlacelessAuthUtility)
-        setupInteraction(
-            auth_utility.getPrincipalByLogin(
-                'bugwatch@bugs.launchpad.net', want_password=False),
-            login='bugwatch@bugs.launchpad.net')
+        setupInteraction(self._principal, login=self.LOGIN)

     def _logout(self):
         """Tear down the Bug Watch Updater Interaction."""
         endInteraction()

-    def updateBugTrackers(self, bug_tracker_names=None, batch_size=None):
-        """Update all the bug trackers that have watches pending.
+    def _interactionDecorator(self, func):
+        """Wrap a function to ensure that it runs within an interaction.

-        If bug tracker names are specified in bug_tracker_names only
-        those bug trackers will be checked.
+        If an interaction is already set up, this simply calls the
+        function. If no interaction exists, it will set one up, call the
+        function, then end the interaction.
+
+        This is intended to make sure the right thing happens whether or not
+        the function is run in a different thread.
         """
-        self.txn.begin()
+        def wrapper(*args, **kwargs):
+            if queryInteraction() is None:
+                self._login()
+                try:
+                    return func(*args, **kwargs)
+                finally:
+                    self._logout()
+            else:
+                return func(*args, **kwargs)
+        return wrapper
+
+    def _bugTrackerUpdaters(self, bug_tracker_names=None):
+        """Yields functions that can be used to update each bug tracker."""
+        # Set up an interaction as the Bug Watch Updater since the
+        # notification code expects a logged in user.
+        self._login()
+
         ubuntu_bugzilla = getUtility(ILaunchpadCelebrities).ubuntu_bugzilla
         # Save the name, so we can use it in other transactions.
         ubuntu_bugzilla_name = ubuntu_bugzilla.name

-        # Set up an interaction as the Bug Watch Updater since the
-        # notification code expects a logged in user.
-        self._login()
-
-        self.log.debug("Using a global batch size of %s" % batch_size)
-
         if bug_tracker_names is None:
             bug_tracker_names = [
                 bugtracker.name for bugtracker in getUtility(IBugTrackerSet)]
-        self.txn.commit()
+
+        def make_updater(bug_tracker_id):
+            """Returns a function that can update the given bug tracker."""
+            def updater(batch_size=None):
+                run = self._interactionDecorator(self.updateBugTracker)
+                return run(bug_tracker_id, batch_size)
+            return updater
+
         for bug_tracker_name in bug_tracker_names:
-            self.txn.begin()
-            bug_tracker = getUtility(IBugTrackerSet).getByName(
-                bug_tracker_name)
-
-            if not bug_tracker.active:
+            if bug_tracker_name == ubuntu_bugzilla_name:
+                # XXX: 2007-09-11 Graham Binns
+                # We automatically ignore the Ubuntu Bugzilla
+                # here as all its bugs have been imported into
+                # Launchpad. Ideally we would have some means
+                # to identify all bug trackers like this so
+                # that hard-coding like this can be genericised
+                # (Bug 138949).
                 self.log.debug(
-                    "Updates are disabled for bug tracker at %s" %
-                    bug_tracker.baseurl)
-                self.txn.abort()
-                continue
-
-            # Save the url for later, since we might need it to report an
-            # error after a transaction has been aborted.
-            bug_tracker_url = bug_tracker.baseurl
-            try:
-                if bug_tracker_name == ubuntu_bugzilla_name:
-                    # XXX: 2007-09-11 Graham Binns
-                    # We automatically ignore the Ubuntu Bugzilla
-                    # here as all its bugs have been imported into
-                    # Launchpad. Ideally we would have some means
-                    # to identify all bug trackers like this so
-                    # that hard-coding like this can be genericised
-                    # (Bug 138949).
+                    "Skipping updating Ubuntu Bugzilla watches.")
+            else:
+                bug_tracker = getUtility(IBugTrackerSet).getByName(
+                    bug_tracker_name)
+                if bug_tracker.active:
+                    yield make_updater(bug_tracker.id)
+                else:
                     self.log.debug(
-                        "Skipping updating Ubuntu Bugzilla watches.")
-                else:
-                    self.updateBugTracker(bug_tracker, batch_size)
+                        "Updates are disabled for bug tracker at %s" %
+                        bug_tracker.baseurl)

-                self.txn.commit()
-            except (KeyboardInterrupt, SystemExit):
-                # We should never catch KeyboardInterrupt or SystemExit.
-                raise
-            except Exception, error:
-                # If something unexpected goes wrong, we log it and
-                # continue: a failure shouldn't break the updating of
-                # the other bug trackers.
-                info = sys.exc_info()
-                properties = [
-                    ('bugtracker', bug_tracker_name),
-                    ('baseurl', bug_tracker_url)]
-                if isinstance(error, BugWatchUpdateError):
-                    self.error(
-                        str(error), properties=properties, info=info)
-                elif isinstance(error, socket.timeout):
-                    self.error(
-                        "Connection timed out when updating %s" %
-                        bug_tracker_url,
-                        properties=properties, info=info)
-                else:
-                    self.error(
-                        "An exception was raised when updating %s" %
-                        bug_tracker_url,
-                        properties=properties, info=info)
-                self.txn.abort()
         self._logout()

+    def updateBugTrackers(
+        self, bug_tracker_names=None, batch_size=None, num_threads=1):
+        """Update all the bug trackers that have watches pending.
+
+        If bug tracker names are specified in bug_tracker_names only
+        those bug trackers will be checked.
+
+        The updates are run in threads, so that long running updates
+        don't block progress. However, by default the number of
+        threads is 1, to help with testing.
+        """
+        self.log.debug("Using a global batch size of %s" % batch_size)
+
+        # Put all the work on the queue. This is simpler than drip-feeding the
+        # queue, and avoids a situation where a worker thread exits because
+        # there's no work left and the feeding thread hasn't been scheduled to
+        # add work to the queue.
+        work = queue.Queue()
+        for updater in self._bugTrackerUpdaters(bug_tracker_names):
+            work.put(updater)
+
+        # This will be run once in each worker thread.
+        def do_work():
+            while True:
+                try:
+                    job = work.get(block=False)
+                except queue.Empty:
+                    break
+                else:
+                    job(batch_size)
+
+        # Start and join the worker threads.
+        threads = []
+        for run in xrange(num_threads):
+            thread = threading.Thread(target=do_work)
+            thread.start()
+            threads.append(thread)
+        for thread in threads:
+            thread.join()
+
+    def updateBugTracker(self, bug_tracker, batch_size):
+        """Updates the given bug trackers's bug watches.
+
+        If there is an error, logs are updated, and the transaction is
+        aborted.
+
+        :param bug_tracker: An IBugTracker or the ID of one, so that this
+            method can be called from a different interaction.
+
+        :return: A boolean indicating if the operation was successful.
+        """
+        # Get the bug tracker.
+        if isinstance(bug_tracker, (int, long)):
+            bug_tracker = getUtility(IBugTrackerSet).get(bug_tracker)
+
+        # Save the name and url for later, since we might need it to report an
+        # error after a transaction has been aborted.
+        bug_tracker_name = bug_tracker.name
+        bug_tracker_url = bug_tracker.baseurl
+
+        try:
+            self.txn.begin()
+            self._updateBugTracker(bug_tracker, batch_size)
+            self.txn.commit()
+        except (KeyboardInterrupt, SystemExit):
+            # We should never catch KeyboardInterrupt or SystemExit.
+            raise
+        except Exception, error:
+            # If something unexpected goes wrong, we log it and
+            # continue: a failure shouldn't break the updating of
+            # the other bug trackers.
+            info = sys.exc_info()
+            properties = [
+                ('bugtracker', bug_tracker_name),
+                ('baseurl', bug_tracker_url)]
+            if isinstance(error, BugWatchUpdateError):
+                self.error(
+                    str(error), properties=properties, info=info)
+            elif isinstance(error, socket.timeout):
+                self.error(
+                    "Connection timed out when updating %s" %
+                    bug_tracker_url,
+                    properties=properties, info=info)
+            else:
+                self.error(
+                    "An exception was raised when updating %s" %
+                    bug_tracker_url,
+                    properties=properties, info=info)
+            self.txn.abort()
+            return False
+        else:
+            return True
+
     def forceUpdateAll(self, bug_tracker_name, batch_size):
         """Update all the watches for `bug_tracker_name`.

@@ -290,50 +386,16 @@
         bug_tracker.resetWatches()
         self.txn.commit()

-        # Take a copy of the bug tracker URL. If the transaction fails
-        # later we can't refer to the baseurl attribute of the bug
-        # tracker.
-        bug_tracker_url = bug_tracker.baseurl
-
         # Loop over the bug watches in batches as specificed by
         # batch_size until there are none left to update.
         self.log.info(
             "Updating %s watches on bug tracker '%s'" %
             (bug_tracker.watches.count(), bug_tracker_name))
-        iteration = 0
         has_watches_to_update = True
         while has_watches_to_update:
             self.txn.begin()
-            try:
-                self.updateBugTracker(bug_tracker, batch_size)
-                self.txn.commit()
-            except (KeyboardInterrupt, SystemExit):
-                # We should never catch KeyboardInterrupt or SystemExit.
-                raise
-            except Exception, error:
-                # If something unexpected goes wrong, we log it and
-                # continue: a failure shouldn't break the updating of
-                # the other bug trackers.
-                info = sys.exc_info()
-                properties = [
-                    ('bugtracker', bug_tracker_name),
-                    ('baseurl', bug_tracker_url)]
-                if isinstance(error, BugWatchUpdateError):
-                    self.error(
-                        str(error), properties=properties, info=info)
-                elif isinstance(error, socket.timeout):
-                    self.error(
-                        "Connection timed out when updating %s" %
-                        bug_tracker_url,
-                        properties=properties, info=info)
-                else:
-                    self.error(
-                        "An exception was raised when updating %s" %
-                        bug_tracker_url,
-                        properties=properties, info=info)
-                self.txn.abort()
+            if not self.updateBugTracker(bug_tracker, batch_size):
                 break
-
             watches_left = bug_tracker.getBugWatchesNeedingUpdate(23).count()
             self.log.info(
                 "%s watches left to check on bug tracker '%s'" %
@@ -409,7 +471,7 @@

         return trackers_and_watches

-    def updateBugTracker(self, bug_tracker, batch_size=None):
+    def _updateBugTracker(self, bug_tracker, batch_size=None):
         """Updates the given bug trackers's bug watches."""
         # XXX 2007-01-18 gmb:
         # Once we start running checkwatches more frequently we need
@@ -1085,7 +1147,7 @@
             "one bugtracker using this option will check all the "
             "bugtrackers specified.")
         self.parser.add_option(
-            '-b', '--batch-size', action='store', dest='batch_size',
+            '-b', '--batch-size', action='store', type=int, dest='batch_size',
             help="Set the number of watches to be checked per bug "
             "tracker in this run. If BATCH_SIZE is 0, all watches "
             "on the bug tracker that are eligible for checking will "
@@ -1094,26 +1156,27 @@
             '--reset', action='store_true', dest='update_all',
             help="Update all the watches on the bug tracker, regardless of "
             "whether or not they need checking.")
+        self.parser.add_option(
+            '--jobs', action='store', type=int, dest='jobs', default=1,
+            help=("The number of simulataneous jobs to run, %default by "
+                  "default."))

     def main(self):
         start_time = time.time()

         updater = BugWatchUpdater(self.txn, self.logger)

-        # Make sure batch_size is an integer or None.
-        batch_size = self.options.batch_size
-        if batch_size is not None:
-            batch_size = int(batch_size)
-
         if self.options.update_all and len(self.options.bug_trackers) > 0:
             # The user has requested that we update *all* the watches
             # for these bugtrackers
             for bug_tracker in self.options.bug_trackers:
-                updater.forceUpdateAll(bug_tracker, batch_size)
+                updater.forceUpdateAll(bug_tracker, self.options.batch_size)
         else:
             # Otherwise we just update those watches that need updating,
             # and we let the BugWatchUpdater decide which those are.
-            updater.updateBugTrackers(self.options.bug_trackers, batch_size)
+            updater.updateBugTrackers(
+                self.options.bug_trackers, self.options.batch_size,
+                self.options.jobs)

         run_time = time.time() - start_time
         self.logger.info("Time for this run: %.3f seconds." % run_time)
=== modified file 'lib/lp/bugs/scripts/tests/test_bugimport.py'
--- lib/lp/bugs/scripts/tests/test_bugimport.py 2009-10-21 18:46:29 +0000
+++ lib/lp/bugs/scripts/tests/test_bugimport.py 2009-11-27 16:20:37 +0000
@@ -883,10 +883,10 @@
 class TestBugWatchUpdater(BugWatchUpdater):
     """A mock `BugWatchUpdater` object."""

-    def updateBugTracker(self, bug_tracker):
+    def _updateBugTracker(self, bug_tracker):
         # Save the current bug tracker, so _getBugWatch can reference it.
         self.bugtracker = bug_tracker
-        super(TestBugWatchUpdater, self).updateBugTracker(bug_tracker)
+        super(TestBugWatchUpdater, self)._updateBugTracker(bug_tracker)

     def _getExternalBugTrackersAndWatches(self, bug_tracker, bug_watches):
         """See `BugWatchUpdater`."""
@@ -928,7 +928,7 @@
         # trigger a DB error, the second updates successfully.
         bug_tracker = TestBugTracker(test_bug_one, test_bug_two)
         bug_watch_updater = TestBugWatchUpdater(self.layer.txn)
-        bug_watch_updater.updateBugTracker(bug_tracker)
+        bug_watch_updater._updateBugTracker(bug_tracker)
         # We verify that the first bug watch didn't update the status,
         # and the second did.
         for bugtask in test_bug_one.bugtasks: