Merge lp:~mwhudson/launchpad/more-task-scheduled-bug-408638 into lp:launchpad

Proposed by Michael Hudson-Doyle
Status: Merged
Approved by: Jonathan Lange
Approved revision: no longer in the source branch.
Merged at revision: not available
Proposed branch: lp:~mwhudson/launchpad/more-task-scheduled-bug-408638
Merge into: lp:launchpad
Diff against target: None lines
To merge this branch: bzr merge lp:~mwhudson/launchpad/more-task-scheduled-bug-408638
Reviewer                          Status
Jonathan Lange (community)        Approve
Canonical Launchpad Engineering   Pending
Review via email: mp+9749@code.launchpad.net
Michael Hudson-Doyle (mwhudson) wrote :

EPIGRAMS IN PROGRAMMING:

58. Fools ignore complexity. Pragmatists suffer it. Some can avoid it. Geniuses remove it.

I hope I'm being a pragmatist here.

Hi Jono,

I hope you can make time to have a look at the branch some time today. If you don't have time for a full review, I'll get Tim to look at it tomorrow. But you already know what a DeferredLock is :)

If the code and tests don't make sense, then I've failed in my mission, but there are two issues addressed here:

1) Some race conditions around the puller exiting while a request for a job is pending (basically, change ITaskSource.stop to return deferred)
2) Addressing the behaviour where only one job gets pulled per run of the puller (see bug 408638)

Cheers,
mwh
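
As a rough illustration of point 1, here is a minimal sketch of a source whose stop() result is delayed until any poll in progress has finished. It is not the code in this branch: it uses asyncio.Lock from the standard library as a stand-in for Twisted's DeferredLock, and SketchSource, poll and demo are hypothetical names chosen for the example.

```python
import asyncio


class SketchSource:
    """Hypothetical stand-in for a polling task source (not the branch's class)."""

    def __init__(self, producer):
        self._producer = producer
        self._running = False
        # Serialises polls, and delays stop()'s result until a poll in
        # progress has completed.
        self._polling_lock = asyncio.Lock()

    def start(self):
        self._running = True

    async def poll(self):
        async with self._polling_lock:
            # If stop() was called before the lock was acquired, don't poll.
            if self._running:
                return await self._producer()
            return None

    async def stop(self):
        self._running = False
        async with self._polling_lock:
            # True if still stopped, False if restarted in the meantime.
            return not self._running


async def demo(restart):
    release = asyncio.Event()

    async def producer():
        await release.wait()
        return "job"

    source = SketchSource(producer)
    source.start()
    poll_task = asyncio.ensure_future(source.poll())
    await asyncio.sleep(0)  # let the poll take the lock and call producer
    stop_task = asyncio.ensure_future(source.stop())
    await asyncio.sleep(0)  # stop() is now waiting for the lock
    pending = not stop_task.done()
    if restart:
        source.start()
    release.set()  # the outstanding poll can now finish
    still_stopped = await stop_task
    await poll_task
    return pending, still_stopped
```

In both cases the stop result is withheld while the poll is outstanding; it then reports whether the source was restarted in the meantime.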

Jonathan Lange (jml) wrote :

On Thu, Aug 6, 2009 at 10:18 AM, Michael Hudson<email address hidden> wrote:
> You have been requested to review the proposed merge of lp:~mwhudson/launchpad/more-task-scheduled-bug-408638 into lp:launchpad/devel.
>
> EPIGRAMS IN PROGRAMMING:
>
> 58. Fools ignore complexity. Pragmatists suffer it. Some can avoid it. Geniuses remove it.
>
> I hope I'm being a pragmatist here.
>

I think so.

> Hi Jono,
>
> I hope you can make time to have a look at the branch some time today. If you don't have time for a full review, I'll get Tim to look at it tomorrow. But you already know what a DeferredLock is :)
>
> If the code and tests don't make sense, then I've failed in my mission, but there are two issues addressed here:
>
> 1) Some race conditions around the puller exiting while a request for a job is pending (basically, change ITaskSource.stop to return deferred)
> 2) Addressing the behaviour where only one job gets pulled per run of the puller (see bug 408638)
>

Wow, concurrency really is quite hard!

However, I really do think that this branch is as simple as possible, barring any paradigm breakthroughs. Thanks for doing such a good job with it.

I've got a few comments and questions that I'd like you to address before this lands.

> === modified file 'lib/canonical/twistedsupport/task.py'
> --- lib/canonical/twistedsupport/task.py 2009-07-17 00:26:05 +0000
> +++ lib/canonical/twistedsupport/task.py 2009-08-06 09:08:47 +0000
> @@ -37,6 +37,12 @@
> """Stop generating tasks.
>
> Any subsequent calls to `stop` are silently ignored.
> +
> + :return: A Deferred that will fire when the source is stopped. It is
> + possible that tasks may be produced until this deferred fires.
> + The deferred will fire with a boolean; True if the source is still
> + stopped, False if the source has been restarted since stop() was
> + called.

Do we know this for sure?

What I mean is, is it possible for the source to have been restarted but for
this to return False. It smells like a possible race condition.

> """
>
>
> @@ -100,10 +106,13 @@
> clock = reactor
> self._clock = clock
> self._looping_call = None
> + self._polling_lock = defer.DeferredLock()

I think it's worth adding a comment on how this lock is used & why.

> + self._started = False
>

And maybe for this one too.

> def start(self, task_consumer):
> """See `ITaskSource`."""
> self.stop()
> + self._started = True
> self._looping_call = LoopingCall(self._poll, task_consumer)
> self._looping_call.clock = self._clock
> self._looping_call.start(self._interval)
> @@ -122,15 +131,21 @@
> # If task production fails, we inform the consumer of this, but we
> # don't let any deferred it returns delay subsequent polls.
> task_consumer.taskProductionFailed(reason)
> - d = defer.maybeDeferred(self._task_producer)
> - d.addCallbacks(got_task, task_failed)
> - return d
> + def poll():
> + if self._started:
> + d = defer.maybeDeferred(...

review: Needs Fixing
Michael Hudson-Doyle (mwhudson) wrote :

Jonathan Lange wrote:
> Review: Needs Fixing
> On Thu, Aug 6, 2009 at 10:18 AM, Michael Hudson<email address hidden> wrote:
>> You have been requested to review the proposed merge of lp:~mwhudson/launchpad/more-task-scheduled-bug-408638 into lp:launchpad/devel.
>>
>> EPIGRAMS IN PROGRAMMING:
>>
>> 58. Fools ignore complexity. Pragmatists suffer it. Some can avoid it. Geniuses remove it.
>>
>> I hope I'm being a pragmatist here.
>>
>
> I think so.

Good :) Some genius can clean up my mess later.

>> Hi Jono,
>>
>> I hope you can make time to have a look at the branch some time today. If you don't have time for a full review, I'll get Tim to look at it tomorrow. But you already know what a DeferredLock is :)
>>
>> If the code and tests don't make sense, then I've failed in my mission, but there are two issues addressed here:
>>
>> 1) Some race conditions around the puller exiting while a request for a job is pending (basically, change ITaskSource.stop to return deferred)
>> 2) Addressing the behaviour where only one job gets pulled per run of the puller (see bug 408638)
>>
>
> Wow, concurrency really is quite hard!

Yeah, no kidding.

> However, I really do think that this branch is as simple as possible, barring any paradigm breakthroughs. Thanks for doing such a good job with it.
>
> I've got a few comments and questions that I'd like you to address before this lands.
>
>> === modified file 'lib/canonical/twistedsupport/task.py'
>> --- lib/canonical/twistedsupport/task.py 2009-07-17 00:26:05 +0000
>> +++ lib/canonical/twistedsupport/task.py 2009-08-06 09:08:47 +0000
>> @@ -37,6 +37,12 @@
>> """Stop generating tasks.
>>
>> Any subsequent calls to `stop` are silently ignored.
>> +
>> + :return: A Deferred that will fire when the source is stopped. It is
>> + possible that tasks may be produced until this deferred fires.
>> + The deferred will fire with a boolean; True if the source is still
>> + stopped, False if the source has been restarted since stop() was
>> + called.
>
> Do we know this for sure?

Well it's an interface docstring, so we can say so.

> What I mean is, is it possible for the source to have been restarted but for
> this to return False. It smells like a possible race condition.

I think in a Twisted world, it makes sense to talk about the state of
the source at the moment the deferred is fired. I guess you have to be
careful about any callbacks you add that themselves return deferreds.

But anyway, until I did it like this, I had no idea how I was going to
make this all work.
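
A small sketch of that caveat: the boolean describes the source at the moment it is delivered, so it can go stale if the code handling it yields before acting. This uses asyncio from the standard library rather than Twisted, and Source and demo are hypothetical names, not anything in the branch.

```python
import asyncio


class Source:
    """Hypothetical source; stop() reports a snapshot of the stopped state."""

    def __init__(self):
        self.running = False

    def start(self):
        self.running = True

    async def stop(self):
        self.running = False
        return not self.running


async def demo():
    source = Source()
    source.start()
    still_stopped = await source.stop()
    # Acting immediately: the snapshot agrees with the source's state.
    fresh = (still_stopped, source.running)
    # Something else restarts the source while we yield to the event loop.
    asyncio.get_running_loop().call_soon(source.start)
    await asyncio.sleep(0)
    # The snapshot still says "stopped", but the source is running again.
    stale = (still_stopped, source.running)
    return fresh, stale
```

The first pair is consistent; the second shows the value going stale once control has been handed back to the loop.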

>> """
>>
>>
>> @@ -100,10 +106,13 @@
>> clock = reactor
>> self._clock = clock
>> self._looping_call = None
>> + self._polling_lock = defer.DeferredLock()
>
> I think it's worth adding a comment on how this lock is used & why.
>
>> + self._started = False
>>
>
> And maybe for this one too.

I realized that "_looping_call is not None" was equivalent to "_started",
so I deleted _started.

>> def start(self, task_consumer):
>> """See `ITaskSource`."""
>> self.stop()
>> + self._sta...

=== modified file 'lib/canonical/twistedsupport/task.py'
--- lib/canonical/twistedsupport/task.py 2009-08-06 09:08:47 +0000
+++ lib/canonical/twistedsupport/task.py 2009-08-06 10:38:24 +0000
@@ -106,13 +106,14 @@
             clock = reactor
         self._clock = clock
         self._looping_call = None
+        # _polling_lock is used to prevent concurrent attempts to poll for
+        # work, and to delay the firing of the deferred returned from stop()
+        # until any poll in progress at the moment of the call is complete.
         self._polling_lock = defer.DeferredLock()
-        self._started = False

     def start(self, task_consumer):
         """See `ITaskSource`."""
         self.stop()
-        self._started = True
         self._looping_call = LoopingCall(self._poll, task_consumer)
         self._looping_call.clock = self._clock
         self._looping_call.start(self._interval)
@@ -132,7 +133,9 @@
             # don't let any deferred it returns delay subsequent polls.
             task_consumer.taskProductionFailed(reason)
         def poll():
-            if self._started:
+            # If stop() has been called before the lock was acquired, don't
+            # actually poll for more work.
+            if self._looping_call:
                 d = defer.maybeDeferred(self._task_producer)
                 return d.addCallbacks(got_task, task_failed)
         return self._polling_lock.run(poll)
@@ -142,9 +145,8 @@
         if self._looping_call is not None:
             self._looping_call.stop()
             self._looping_call = None
-        self._started = False
         def _return_still_stopped():
-            return not self._started
+            return self._looping_call is None
         return self._polling_lock.run(_return_still_stopped)


@@ -190,8 +192,11 @@
             else:
                 self._stopping_lock.release()
         def _call_stop(ignored):
-            return self._task_source.stop().addCallback(_release_or_stop)
-        return self._stopping_lock.acquire().addCallback(_call_stop)
+            return self._task_source.stop()
+        d = self._stopping_lock.acquire()
+        d.addCallback(_call_stop)
+        d.addCallback(_release_or_stop)
+        return d

     def consume(self, task_source):
         """Start consuming tasks from 'task_source'.

=== modified file 'lib/canonical/twistedsupport/tests/test_task.py'
--- lib/canonical/twistedsupport/tests/test_task.py 2009-08-06 09:08:47 +0000
+++ lib/canonical/twistedsupport/tests/test_task.py 2009-08-06 10:41:48 +0000
@@ -303,7 +303,9 @@
         self.assertEqual([False], stop_called)

     def test_stop_start_stop_when_polling_doesnt_poll_again(self):
-        # XXX
+        # If, while task acquisition is in progress, stop(), start() and
+        # stop() again are called in sequence, we shouldn't try to acquire
+        # another job when the first acquisition completes.
         produced_deferreds = []
         def producer():
             d = Deferred()
Jonathan Lange (jml) wrote :

Thanks Michael, this looks great.

review: Approve

Preview Diff

=== modified file 'lib/canonical/twistedsupport/task.py'
--- lib/canonical/twistedsupport/task.py 2009-07-17 00:26:05 +0000
+++ lib/canonical/twistedsupport/task.py 2009-08-06 09:08:47 +0000
@@ -37,6 +37,12 @@
         """Stop generating tasks.

         Any subsequent calls to `stop` are silently ignored.
+
+        :return: A Deferred that will fire when the source is stopped. It is
+            possible that tasks may be produced until this deferred fires.
+            The deferred will fire with a boolean; True if the source is still
+            stopped, False if the source has been restarted since stop() was
+            called.
         """


@@ -100,10 +106,13 @@
             clock = reactor
         self._clock = clock
         self._looping_call = None
+        self._polling_lock = defer.DeferredLock()
+        self._started = False

     def start(self, task_consumer):
         """See `ITaskSource`."""
         self.stop()
+        self._started = True
         self._looping_call = LoopingCall(self._poll, task_consumer)
         self._looping_call.clock = self._clock
         self._looping_call.start(self._interval)
@@ -122,15 +131,21 @@
             # If task production fails, we inform the consumer of this, but we
             # don't let any deferred it returns delay subsequent polls.
             task_consumer.taskProductionFailed(reason)
-        d = defer.maybeDeferred(self._task_producer)
-        d.addCallbacks(got_task, task_failed)
-        return d
+        def poll():
+            if self._started:
+                d = defer.maybeDeferred(self._task_producer)
+                return d.addCallbacks(got_task, task_failed)
+        return self._polling_lock.run(poll)

     def stop(self):
         """See `ITaskSource`."""
         if self._looping_call is not None:
             self._looping_call.stop()
             self._looping_call = None
+        self._started = False
+        def _return_still_stopped():
+            return not self._started
+        return self._polling_lock.run(_return_still_stopped)


 class AlreadyRunningError(Exception):
@@ -164,6 +179,19 @@
         self._worker_limit = worker_limit
         self._worker_count = 0
         self._terminationDeferred = None
+        self._stopping_lock = None
+
+    def _stop(self):
+        def _release_or_stop(still_stopped):
+            if still_stopped and self._worker_count == 0:
+                self._terminationDeferred.callback(None)
+                # Note that in this case we don't release the lock: we don't
+                # want to try to fire the _terminationDeferred twice!
+            else:
+                self._stopping_lock.release()
+        def _call_stop(ignored):
+            return self._task_source.stop().addCallback(_release_or_stop)
+        return self._stopping_lock.acquire().addCallback(_call_stop)

     def consume(self, task_source):
         """Start consuming tasks from 'task_source'.
@@ -178,9 +206,7 @@
             raise AlreadyRunningError(self, self._task_source)
         self._task_source = task_source
         self._terminationDeferred = defer.Deferred()
-        # This merely begins polling. This means that we acquire our initial
-        # batch of work at the rate of one task per polling interval. As long
-        # as the polling interval is small, this is probably OK.
+        self._stopping_lock = defer.DeferredLock()
         task_source.start(self)
         return self._terminationDeferred

@@ -196,7 +222,9 @@
             raise NotRunningError(self)
         self._worker_count += 1
         if self._worker_count >= self._worker_limit:
-            self._task_source.stop()
+            self._stop()
+        else:
+            self._task_source.start(self)
         d = defer.maybeDeferred(task)
         # We don't expect these tasks to have interesting return values or
         # failure modes.
@@ -213,8 +241,7 @@
         find any jobs, if we actually start any jobs then the exit condition
         in _taskEnded will always be reached before this one.
         """
-        if self._worker_count == 0:
-            self._terminationDeferred.callback(None)
+        self._stop()

     def taskProductionFailed(self, reason):
         """See `ITaskConsumer`.
@@ -236,9 +263,7 @@
         """
         if self._task_source is None:
             raise NotRunningError(self)
-        self._task_source.stop()
-        if self._worker_count == 0:
-            self._terminationDeferred.callback(None)
+        self._stop()

     def _taskEnded(self, ignored):
         """Handle a task reaching completion.
@@ -252,8 +277,7 @@
         """
         self._worker_count -= 1
         if self._worker_count == 0:
-            self._task_source.stop()
-            self._terminationDeferred.callback(None)
+            self._stop()
         elif self._worker_count < self._worker_limit:
             self._task_source.start(self)
         else:

=== modified file 'lib/canonical/twistedsupport/tests/test_task.py'
--- lib/canonical/twistedsupport/tests/test_task.py 2009-07-17 00:26:05 +0000
+++ lib/canonical/twistedsupport/tests/test_task.py 2009-08-06 09:08:47 +0000
@@ -7,7 +7,7 @@

 import unittest

-from twisted.internet.defer import Deferred
+from twisted.internet.defer import Deferred, succeed
 from twisted.internet.task import Clock

 from zope.interface import implements
@@ -49,14 +49,19 @@

     implements(ITaskSource)

-    def __init__(self, log):
+    def __init__(self, log, stop_deferred=None):
         self._log = log
+        if stop_deferred is None:
+            self.stop_deferred = succeed(True)
+        else:
+            self.stop_deferred = stop_deferred

     def start(self, consumer):
         self._log.append(('start', consumer))

     def stop(self):
         self._log.append('stop')
+        return self.stop_deferred


 class TestPollingTaskSource(TestCase):
@@ -144,6 +149,16 @@
         # No more calls were made.
         self.assertEqual(0, self._num_task_producer_calls)

+    def test_stop_deferred_fires_immediately_if_no_polling(self):
+        # Calling stop when the source is not polling returns a deferred that
+        # fires immediately with True.
+        task_source = self.makeTaskSource()
+        task_source.start(NoopTaskConsumer())
+        stop_deferred = task_source.stop()
+        stop_calls = []
+        stop_deferred.addCallback(stop_calls.append)
+        self.assertEqual([True], stop_calls)
+
     def test_start_multiple_times_polls_immediately(self):
         # Starting a task source multiple times polls immediately.
         clock = Clock()
@@ -241,6 +256,74 @@
         clock.advance(interval)
         self.assertEqual(len(produced_deferreds), 2)

+    def test_stop_deferred_doesnt_fire_until_polling_finished(self):
+        # If there is a call to the task producer outstanding when stop() is
+        # called, stop() returns a deferred that fires when the poll finishes.
+        # The value fired with is True if the source is still stopped when the
+        # deferred fires.
+        produced_deferred = Deferred()
+        def producer():
+            return produced_deferred
+        task_source = self.makeTaskSource(task_producer=producer)
+        task_source.start(NoopTaskConsumer())
+        # The call to start calls producer. It returns produced_deferred
+        # which has not been fired, so stop returns a deferred that has not
+        # been fired.
+        stop_deferred = task_source.stop()
+        stop_called = []
+        stop_deferred.addCallback(stop_called.append)
+        self.assertEqual([], stop_called)
+        # When the task producing deferred fires, the stop deferred fires with
+        # 'True' to indicate that the source is still stopped.
+        produced_deferred.callback(None)
+        self.assertEqual([True], stop_called)
+
+    def test_stop_deferred_fires_with_false_if_source_restarted(self):
+        # If there is a call to the task producer outstanding when stop() is
+        # called, stop() returns a deferred that fires when the poll finishes.
+        # The value fired with is False if the source is no longer stopped
+        # when the deferred fires.
+        produced_deferred = Deferred()
+        def producer():
+            return produced_deferred
+        task_source = self.makeTaskSource(task_producer=producer)
+        task_source.start(NoopTaskConsumer())
+        # The call to start calls producer. It returns produced_deferred
+        # which has not been fired so stop returns a deferred that has not
+        # been fired.
+        stop_deferred = task_source.stop()
+        stop_called = []
+        stop_deferred.addCallback(stop_called.append)
+        # Now we restart the source.
+        task_source.start(NoopTaskConsumer())
+        self.assertEqual([], stop_called)
+        # When the task producing deferred fires, the stop deferred fires with
+        # 'False' to indicate that the source has been restarted.
+        produced_deferred.callback(None)
+        self.assertEqual([False], stop_called)
+
+    def test_stop_start_stop_when_polling_doesnt_poll_again(self):
+        # XXX
+        produced_deferreds = []
+        def producer():
+            d = Deferred()
+            produced_deferreds.append(d)
+            return d
+        task_source = self.makeTaskSource(task_producer=producer)
+        # Start the source. This calls the producer.
+        task_source.start(NoopTaskConsumer())
+        self.assertEqual(1, len(produced_deferreds))
+        task_source.stop()
+        # If we start it again, this does not call the producer because
+        # the above call is still in process.
+        task_source.start(NoopTaskConsumer())
+        self.assertEqual(1, len(produced_deferreds))
+        # If we now stop the source and the initial poll for a task completes,
+        # we don't poll again.
+        task_source.stop()
+        produced_deferreds[0].callback(None)
+        self.assertEqual(1, len(produced_deferreds))
+
     def test_taskStarted_deferred_doesnt_delay_polling(self):
         # If taskStarted returns a deferred, we don't wait for it to fire
         # before polling again.
@@ -354,7 +437,7 @@
         consumer.consume(source)
         self.assertRaises(AlreadyRunningError, consumer.consume, source)

-    def test_consume_returns_deferred_doesnt_fire_until_tasks(self):
+    def test_consumer_doesnt_finish_until_tasks_finish(self):
         # `consume` returns a Deferred that fires when no more tasks are
         # running, but only after we've actually done something.
         consumer = self.makeConsumer()
@@ -363,7 +446,7 @@
         d.addCallback(log.append)
         self.assertEqual([], log)

-    def test_consume_returns_deferred_fires_when_tasks_done(self):
+    def test_consumer_finishes_when_tasks_done(self):
         # `consume` returns a Deferred that fires when no more tasks are
         # running.
         consumer = self.makeConsumer()
@@ -373,7 +456,7 @@
         consumer.taskStarted(lambda: None)
         self.assertEqual([None], task_log)

-    def test_consume_returns_deferred_fires_if_no_tasks_found(self):
+    def test_consumer_finishes_if_no_tasks_found(self):
         # `consume` returns a Deferred that fires if no tasks are found when
         # no tasks are running.
         consumer = self.makeConsumer()
@@ -383,7 +466,37 @@
         consumer.noTasksFound()
         self.assertEqual([None], task_log)

-    def test_consume_deferred_no_fire_if_no_tasks_found_and_job_running(self):
+    def test_consumer_doesnt_finish_until_stop_deferred_fires(self):
+        # The Deferred returned by `consume` does not fire until the deferred
+        # returned by the source's stop() method fires with True to indicate
+        # that the source is still stopped.
+        consumer = self.makeConsumer()
+        consume_log = []
+        stop_deferred = Deferred()
+        source = LoggingSource([], stop_deferred)
+        d = consumer.consume(source)
+        d.addCallback(consume_log.append)
+        consumer.noTasksFound()
+        self.assertEqual([], consume_log)
+        stop_deferred.callback(True)
+        self.assertEqual([None], consume_log)
+
+    def test_consumer_doesnt_finish_if_stop_doesnt_stop(self):
+        # The Deferred returned by `consume` does not fire when the deferred
+        # returned by the source's stop() method fires with False to indicate
+        # that the source has been restarted.
+        consumer = self.makeConsumer()
+        consume_log = []
+        stop_deferred = Deferred()
+        source = LoggingSource([], stop_deferred)
+        d = consumer.consume(source)
+        d.addCallback(consume_log.append)
+        consumer.noTasksFound()
+        self.assertEqual([], consume_log)
+        stop_deferred.callback(False)
+        self.assertEqual([], consume_log)
+
+    def test_consumer_doesnt_finish_if_no_tasks_found_and_job_running(self):
         # If no tasks are found while a job is running, the Deferred returned
         # by `consume` is not fired.
         consumer = self.makeConsumer()
@@ -402,7 +515,7 @@
         del log[:]
         # Finishes immediately, all tasks are done.
         consumer.taskStarted(lambda: None)
-        self.assertEqual(['stop'], log)
+        self.assertEqual(1, log.count('stop'))

     def test_taskStarted_before_consume_raises_error(self):
         # taskStarted can only be called after we have started consuming. This
@@ -427,6 +540,18 @@
         consumer.taskStarted(lambda: log.append('task'))
         self.assertEqual(['task'], log)

+    def test_taskStarted_restarts_source(self):
+        # If, after the task passed to taskStarted has been started, the
+        # consumer is not yet at its worker_limit, it starts the source again
+        # in order to consume as many pending jobs as we can as quickly as we
+        # can.
+        log = []
+        consumer = self.makeConsumer()
+        consumer.consume(LoggingSource(log))
+        del log[:]
+        consumer.taskStarted(self._neverEndingTask)
+        self.assertEqual([('start', consumer)], log)
+
     def test_reaching_working_limit_stops_source(self):
         # Each time taskStarted is called, we start a worker. When we reach
         # the worker limit, we tell the source to stop generating work.
@@ -437,10 +562,10 @@
         consumer.consume(source)
         del log[:]
         consumer.taskStarted(self._neverEndingTask)
-        self.assertEqual([], log)
+        self.assertEqual(0, log.count('stop'))
         for i in range(worker_limit - 1):
             consumer.taskStarted(self._neverEndingTask)
-        self.assertEqual(['stop'], log)
+        self.assertEqual(1, log.count('stop'))

     def test_passing_working_limit_stops_source(self):
         # If we have already reached the worker limit, and taskStarted is