Merge lp:~mwhudson/launchpad/more-task-scheduled-bug-408638 into lp:launchpad

Proposed by Michael Hudson-Doyle
Status: Merged
Approved by: Jonathan Lange
Approved revision: no longer in the source branch.
Merged at revision: not available
Proposed branch: lp:~mwhudson/launchpad/more-task-scheduled-bug-408638
Merge into: lp:launchpad
To merge this branch: bzr merge lp:~mwhudson/launchpad/more-task-scheduled-bug-408638
Reviewer                          Review Type   Date Requested   Status
Jonathan Lange (community)                                       Approve
Canonical Launchpad Engineering                                  Pending
Review via email: mp+9749@code.launchpad.net
Michael Hudson-Doyle (mwhudson) wrote :

EPIGRAMS IN PROGRAMMING:

58. Fools ignore complexity. Pragmatists suffer it. Some can avoid it. Geniuses remove it.

I hope I'm being a pragmatist here.

Hi Jono,

I hope you can make time to have a look at the branch some time today. If you don't have time for a full review, I'll get Tim to look at it tomorrow. But you already know what a DeferredLock is :)

If the code and tests don't make sense, then I've failed in my mission, but there are two issues addressed here:

1) Some race conditions around the puller exiting while a request for a job is pending (basically, change ITaskSource.stop to return deferred)
2) Addressing the behaviour where only one job gets pulled per run of the puller (see bug 408638)

Cheers,
mwh
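
The first point above can be illustrated with a small sketch. It is an asyncio analogue, not the Twisted code in this branch: `PollingSource`, `produced` and `demo` are hypothetical names, and `asyncio.Lock` stands in for `DeferredLock`. Under those assumptions, it shows stop() not completing until the poll already in progress has finished.

```python
import asyncio


class PollingSource:
    """Toy asyncio analogue of a polling task source (hypothetical)."""

    def __init__(self, producer):
        self._producer = producer          # coroutine function returning a job
        self._running = False
        self._polling_lock = asyncio.Lock()
        self.produced = []

    def start(self):
        self._running = True

    async def poll(self):
        # Only one poll at a time; skip the poll if stop() got in first.
        async with self._polling_lock:
            if self._running:
                self.produced.append(await self._producer())

    async def stop(self):
        self._running = False
        # Waiting for the lock delays the result until any poll that was
        # in progress when stop() was called has finished.
        async with self._polling_lock:
            return not self._running


async def demo():
    release = asyncio.Event()

    async def producer():
        await release.wait()
        return "job"

    source = PollingSource(producer)
    source.start()
    poll = asyncio.ensure_future(source.poll())
    await asyncio.sleep(0)                 # let the poll take the lock
    stop = asyncio.ensure_future(source.stop())
    await asyncio.sleep(0)                 # stop() is now waiting on the lock
    pending_while_polling = not stop.done()
    release.set()                          # the outstanding poll completes
    await poll
    return pending_while_polling, await stop, source.produced


result = asyncio.run(demo())
```

Here `result` records that stop() was still pending during the poll, that it then resolved to True (still stopped), and that the job from the in-flight poll was still delivered.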

Jonathan Lange (jml) wrote :

On Thu, Aug 6, 2009 at 10:18 AM, Michael Hudson<email address hidden> wrote:
> You have been requested to review the proposed merge of lp:~mwhudson/launchpad/more-task-scheduled-bug-408638 into lp:launchpad/devel.
>
> EPIGRAMS IN PROGRAMMING:
>
> 58. Fools ignore complexity. Pragmatists suffer it. Some can avoid it. Geniuses remove it.
>
> I hope I'm being a pragmatist here.
>

I think so.

> Hi Jono,
>
> I hope you can make time to have a look at the branch some time today. If you don't have time for a full review, I'll get Tim to look at it tomorrow. But you already know what a DeferredLock is :)
>
> If the code and tests don't make sense, then I've failed in my mission, but there are two issues addressed here:
>
> 1) Some race conditions around the puller exiting while a request for a job is pending (basically, change ITaskSource.stop to return deferred)
> 2) Addressing the behaviour where only one job gets pulled per run of the puller (see bug 408638)
>

Wow, concurrency really is quite hard!

However, I really do think that this branch is as simple as possible, barring any paradigm breakthroughs. Thanks for doing such a good job with it.

I've got a few comments and questions that I'd like you to address before this lands.

> === modified file 'lib/canonical/twistedsupport/task.py'
> --- lib/canonical/twistedsupport/task.py 2009-07-17 00:26:05 +0000
> +++ lib/canonical/twistedsupport/task.py 2009-08-06 09:08:47 +0000
> @@ -37,6 +37,12 @@
> """Stop generating tasks.
>
> Any subsequent calls to `stop` are silently ignored.
> +
> + :return: A Deferred that will fire when the source is stopped. It is
> + possible that tasks may be produced until this deferred fires.
> + The deferred will fire with a boolean; True if the source is still
> + stopped, False if the source has been restarted since stop() was
> + called.

Do we know this for sure?

What I mean is, is it possible for the source to have been restarted but for
this to return False. It smells like a possible race condition.

> """
>
>
> @@ -100,10 +106,13 @@
> clock = reactor
> self._clock = clock
> self._looping_call = None
> + self._polling_lock = defer.DeferredLock()

I think it's worth adding a comment on how this lock is used & why.

> + self._started = False
>

And maybe for this one too.

> def start(self, task_consumer):
> """See `ITaskSource`."""
> self.stop()
> + self._started = True
> self._looping_call = LoopingCall(self._poll, task_consumer)
> self._looping_call.clock = self._clock
> self._looping_call.start(self._interval)
> @@ -122,15 +131,21 @@
> # If task production fails, we inform the consumer of this, but we
> # don't let any deferred it returns delay subsequent polls.
> task_consumer.taskProductionFailed(reason)
> - d = defer.maybeDeferred(self._task_producer)
> - d.addCallbacks(got_task, task_failed)
> - return d
> + def poll():
> + if self._started:
> + d = defer.maybeDeferred(...

review: Needs Fixing
Michael Hudson-Doyle (mwhudson) wrote :

Jonathan Lange wrote:
> Review: Needs Fixing
> On Thu, Aug 6, 2009 at 10:18 AM, Michael Hudson<email address hidden> wrote:
>> You have been requested to review the proposed merge of lp:~mwhudson/launchpad/more-task-scheduled-bug-408638 into lp:launchpad/devel.
>>
>> EPIGRAMS IN PROGRAMMING:
>>
>> 58. Fools ignore complexity. Pragmatists suffer it. Some can avoid it. Geniuses remove it.
>>
>> I hope I'm being a pragmatist here.
>>
>
> I think so.

Good :) Some genius can clean up my mess later.

>> Hi Jono,
>>
>> I hope you can make time to have a look at the branch some time today. If you don't have time for a full review, I'll get Tim to look at it tomorrow. But you already know what a DeferredLock is :)
>>
>> If the code and tests don't make sense, then I've failed in my mission, but there are two issues addressed here:
>>
>> 1) Some race conditions around the puller exiting while a request for a job is pending (basically, change ITaskSource.stop to return deferred)
>> 2) Addressing the behaviour where only one job gets pulled per run of the puller (see bug 408638)
>>
>
> Wow, concurrency really is quite hard!

Yeah, no kidding.

> However, I really do think that this branch is as simple as possible, barring any paradigm breakthroughs. Thanks for doing such a good job with it.
>
> I've got a few comments and questions that I'd like you to address before this lands.
>
>> === modified file 'lib/canonical/twistedsupport/task.py'
>> --- lib/canonical/twistedsupport/task.py 2009-07-17 00:26:05 +0000
>> +++ lib/canonical/twistedsupport/task.py 2009-08-06 09:08:47 +0000
>> @@ -37,6 +37,12 @@
>> """Stop generating tasks.
>>
>> Any subsequent calls to `stop` are silently ignored.
>> +
>> + :return: A Deferred that will fire when the source is stopped. It is
>> + possible that tasks may be produced until this deferred fires.
>> + The deferred will fire with a boolean; True if the source is still
>> + stopped, False if the source has been restarted since stop() was
>> + called.
>
> Do we know this for sure?

Well it's an interface docstring, so we can say so.

> What I mean is, is it possible for the source to have been restarted but for
> this to return False. It smells like a possible race condition.

I think in a twisted world, it makes sense to talk about the state of
the source at the moment the deferred is fired. I guess you have to be
careful about any callbacks you add that themselves return deferred.

But anyway, until I did it like this, I had no idea how I was going to
make this all work.
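
A rough sketch of what "the state of the source at the moment the deferred is fired" means, again as an asyncio analogue rather than the branch's Twisted code. `Source` and `demo` are hypothetical, and holding the lock by hand is only a stand-in for a poll in progress. If the source is restarted while stop() is waiting, the result reflects the restart.

```python
import asyncio


class Source:
    """Hypothetical stand-in for a polling source; not the branch's class."""

    def __init__(self):
        self.running = False
        self.lock = asyncio.Lock()

    def start(self):
        self.running = True

    async def stop(self):
        self.running = False
        async with self.lock:              # wait out any poll in progress
            return not self.running        # state now, not at call time


async def demo():
    source = Source()
    source.start()
    await source.lock.acquire()            # pretend a poll is in progress
    stop = asyncio.ensure_future(source.stop())
    await asyncio.sleep(0)                 # stop() is now waiting on the lock
    source.start()                         # restarted before the poll finished
    source.lock.release()                  # the pretend poll completes
    return await stop


still_stopped = asyncio.run(demo())
```

In this sketch `still_stopped` comes out False, because the flag is read only once the lock is obtained. Anything a caller awaits after that point can see the state change again, which is the caveat about chained callbacks.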

>> """
>>
>>
>> @@ -100,10 +106,13 @@
>> clock = reactor
>> self._clock = clock
>> self._looping_call = None
>> + self._polling_lock = defer.DeferredLock()
>
> I think it's worth adding a comment on how this lock is used & why.
>
>> + self._started = False
>>
>
> And maybe for this one too.

I realized that "_loopingcall is not None" was identical with "_started"
so I deleted _started.

>> def start(self, task_consumer):
>> """See `ITaskSource`."""
>> self.stop()
>> + self._sta...

=== modified file 'lib/canonical/twistedsupport/task.py'
--- lib/canonical/twistedsupport/task.py 2009-08-06 09:08:47 +0000
+++ lib/canonical/twistedsupport/task.py 2009-08-06 10:38:24 +0000
@@ -106,13 +106,14 @@
             clock = reactor
         self._clock = clock
         self._looping_call = None
+        # _polling_lock is used to prevent concurrent attempts to poll for
+        # work, and to delay the firing of the deferred returned from stop()
+        # until any poll in progress at the moment of the call is complete.
         self._polling_lock = defer.DeferredLock()
-        self._started = False
 
     def start(self, task_consumer):
         """See `ITaskSource`."""
         self.stop()
-        self._started = True
         self._looping_call = LoopingCall(self._poll, task_consumer)
         self._looping_call.clock = self._clock
         self._looping_call.start(self._interval)
@@ -132,7 +133,9 @@
             # don't let any deferred it returns delay subsequent polls.
             task_consumer.taskProductionFailed(reason)
         def poll():
-            if self._started:
+            # If stop() has been called before the lock was acquired, don't
+            # actually poll for more work.
+            if self._looping_call:
                 d = defer.maybeDeferred(self._task_producer)
                 return d.addCallbacks(got_task, task_failed)
         return self._polling_lock.run(poll)
@@ -142,9 +145,8 @@
         if self._looping_call is not None:
             self._looping_call.stop()
             self._looping_call = None
-        self._started = False
         def _return_still_stopped():
-            return not self._started
+            return self._looping_call is None
         return self._polling_lock.run(_return_still_stopped)
 
 
@@ -190,8 +192,11 @@
             else:
                 self._stopping_lock.release()
         def _call_stop(ignored):
-            return self._task_source.stop().addCallback(_release_or_stop)
-        return self._stopping_lock.acquire().addCallback(_call_stop)
+            return self._task_source.stop()
+        d = self._stopping_lock.acquire()
+        d.addCallback(_call_stop)
+        d.addCallback(_release_or_stop)
+        return d
 
     def consume(self, task_source):
         """Start consuming tasks from 'task_source'.
=== modified file 'lib/canonical/twistedsupport/tests/test_task.py'
--- lib/canonical/twistedsupport/tests/test_task.py 2009-08-06 09:08:47 +0000
+++ lib/canonical/twistedsupport/tests/test_task.py 2009-08-06 10:41:48 +0000
@@ -303,7 +303,9 @@
         self.assertEqual([False], stop_called)
 
     def test_stop_start_stop_when_polling_doesnt_poll_again(self):
-        # XXX
+        # If, while task acquisition is in progress, stop(), start() and
+        # stop() again are called in sequence, we shouldn't try to acquire
+        # another job when the first acquisition completes.
         produced_deferreds = []
         def producer():
             d = Deferred()
Jonathan Lange (jml) wrote :

Thanks Michael, this looks great.

review: Approve
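
For the second issue in the proposal (bug 408638, only one job pulled per run), a minimal synchronous sketch of the idea as it appears in the diff: after starting a task, the consumer restarts the source straight away unless it has reached its worker limit. `ImmediateSource`, `Consumer`, `task_started` and `no_tasks_found` are simplified, hypothetical stand-ins for the real classes and methods, with no deferreds or locking.

```python
class ImmediateSource:
    """Hypothetical source that polls once, synchronously, on every start()."""

    def __init__(self, jobs):
        self._jobs = list(jobs)

    def start(self, consumer):
        # Hand over one job per start(), or report that there is none.
        if self._jobs:
            consumer.task_started(self._jobs.pop(0))
        else:
            consumer.no_tasks_found()

    def stop(self):
        pass


class Consumer:
    """Hypothetical consumer with a limit on concurrently running jobs."""

    def __init__(self, worker_limit):
        self._worker_limit = worker_limit
        self._source = None
        self.running_jobs = []

    def consume(self, source):
        self._source = source
        source.start(self)

    def task_started(self, job):
        self.running_jobs.append(job)
        if len(self.running_jobs) >= self._worker_limit:
            self._source.stop()
        else:
            # Ask for more work now instead of waiting for the next
            # polling interval.
            self._source.start(self)

    def no_tasks_found(self):
        self._source.stop()


consumer = Consumer(worker_limit=3)
consumer.consume(ImmediateSource(["a", "b", "c", "d"]))
```

With four jobs pending and a limit of three, a single run picks up "a", "b" and "c" rather than just the first job.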

Preview Diff

=== modified file 'lib/canonical/twistedsupport/task.py'
--- lib/canonical/twistedsupport/task.py 2009-07-17 00:26:05 +0000
+++ lib/canonical/twistedsupport/task.py 2009-08-06 09:08:47 +0000
@@ -37,6 +37,12 @@
         """Stop generating tasks.
 
         Any subsequent calls to `stop` are silently ignored.
+
+        :return: A Deferred that will fire when the source is stopped. It is
+            possible that tasks may be produced until this deferred fires.
+            The deferred will fire with a boolean; True if the source is still
+            stopped, False if the source has been restarted since stop() was
+            called.
         """
 
 
@@ -100,10 +106,13 @@
             clock = reactor
         self._clock = clock
         self._looping_call = None
+        self._polling_lock = defer.DeferredLock()
+        self._started = False
 
     def start(self, task_consumer):
         """See `ITaskSource`."""
         self.stop()
+        self._started = True
         self._looping_call = LoopingCall(self._poll, task_consumer)
         self._looping_call.clock = self._clock
         self._looping_call.start(self._interval)
@@ -122,15 +131,21 @@
             # If task production fails, we inform the consumer of this, but we
             # don't let any deferred it returns delay subsequent polls.
             task_consumer.taskProductionFailed(reason)
-        d = defer.maybeDeferred(self._task_producer)
-        d.addCallbacks(got_task, task_failed)
-        return d
+        def poll():
+            if self._started:
+                d = defer.maybeDeferred(self._task_producer)
+                return d.addCallbacks(got_task, task_failed)
+        return self._polling_lock.run(poll)
 
     def stop(self):
         """See `ITaskSource`."""
         if self._looping_call is not None:
             self._looping_call.stop()
             self._looping_call = None
+        self._started = False
+        def _return_still_stopped():
+            return not self._started
+        return self._polling_lock.run(_return_still_stopped)
 
 
 class AlreadyRunningError(Exception):
@@ -164,6 +179,19 @@
         self._worker_limit = worker_limit
         self._worker_count = 0
         self._terminationDeferred = None
+        self._stopping_lock = None
+
+    def _stop(self):
+        def _release_or_stop(still_stopped):
+            if still_stopped and self._worker_count == 0:
+                self._terminationDeferred.callback(None)
+                # Note that in this case we don't release the lock: we don't
+                # want to try to fire the _terminationDeferred twice!
+            else:
+                self._stopping_lock.release()
+        def _call_stop(ignored):
+            return self._task_source.stop().addCallback(_release_or_stop)
+        return self._stopping_lock.acquire().addCallback(_call_stop)
 
     def consume(self, task_source):
         """Start consuming tasks from 'task_source'.
@@ -178,9 +206,7 @@
             raise AlreadyRunningError(self, self._task_source)
         self._task_source = task_source
         self._terminationDeferred = defer.Deferred()
-        # This merely begins polling. This means that we acquire our initial
-        # batch of work at the rate of one task per polling interval. As long
-        # as the polling interval is small, this is probably OK.
+        self._stopping_lock = defer.DeferredLock()
         task_source.start(self)
         return self._terminationDeferred
 
@@ -196,7 +222,9 @@
             raise NotRunningError(self)
         self._worker_count += 1
         if self._worker_count >= self._worker_limit:
-            self._task_source.stop()
+            self._stop()
+        else:
+            self._task_source.start(self)
         d = defer.maybeDeferred(task)
         # We don't expect these tasks to have interesting return values or
         # failure modes.
@@ -213,8 +241,7 @@
         find any jobs, if we actually start any jobs then the exit condition
         in _taskEnded will always be reached before this one.
         """
-        if self._worker_count == 0:
-            self._terminationDeferred.callback(None)
+        self._stop()
 
     def taskProductionFailed(self, reason):
         """See `ITaskConsumer`.
@@ -236,9 +263,7 @@
         """
         if self._task_source is None:
             raise NotRunningError(self)
-        self._task_source.stop()
-        if self._worker_count == 0:
-            self._terminationDeferred.callback(None)
+        self._stop()
 
     def _taskEnded(self, ignored):
         """Handle a task reaching completion.
@@ -252,8 +277,7 @@
         """
         self._worker_count -= 1
         if self._worker_count == 0:
-            self._task_source.stop()
-            self._terminationDeferred.callback(None)
+            self._stop()
         elif self._worker_count < self._worker_limit:
             self._task_source.start(self)
         else:
=== modified file 'lib/canonical/twistedsupport/tests/test_task.py'
--- lib/canonical/twistedsupport/tests/test_task.py 2009-07-17 00:26:05 +0000
+++ lib/canonical/twistedsupport/tests/test_task.py 2009-08-06 09:08:47 +0000
@@ -7,7 +7,7 @@
 
 import unittest
 
-from twisted.internet.defer import Deferred
+from twisted.internet.defer import Deferred, succeed
 from twisted.internet.task import Clock
 
 from zope.interface import implements
@@ -49,14 +49,19 @@
 
     implements(ITaskSource)
 
-    def __init__(self, log):
+    def __init__(self, log, stop_deferred=None):
         self._log = log
+        if stop_deferred is None:
+            self.stop_deferred = succeed(True)
+        else:
+            self.stop_deferred = stop_deferred
 
     def start(self, consumer):
         self._log.append(('start', consumer))
 
     def stop(self):
         self._log.append('stop')
+        return self.stop_deferred
 
 
 class TestPollingTaskSource(TestCase):
@@ -144,6 +149,16 @@
         # No more calls were made.
         self.assertEqual(0, self._num_task_producer_calls)
 
+    def test_stop_deferred_fires_immediately_if_no_polling(self):
+        # Calling stop when the source is not polling returns a deferred that
+        # fires immediately with True.
+        task_source = self.makeTaskSource()
+        task_source.start(NoopTaskConsumer())
+        stop_deferred = task_source.stop()
+        stop_calls = []
+        stop_deferred.addCallback(stop_calls.append)
+        self.assertEqual([True], stop_calls)
+
     def test_start_multiple_times_polls_immediately(self):
         # Starting a task source multiple times polls immediately.
         clock = Clock()
@@ -241,6 +256,74 @@
         clock.advance(interval)
         self.assertEqual(len(produced_deferreds), 2)
 
+    def test_stop_deferred_doesnt_fire_until_polling_finished(self):
+        # If there is a call to the task producer outstanding when stop() is
+        # called, stop() returns a deferred that fires when the poll finishes.
+        # The value fired with is True if the source is still stopped when the
+        # deferred fires.
+        produced_deferred = Deferred()
+        def producer():
+            return produced_deferred
+        task_source = self.makeTaskSource(task_producer=producer)
+        task_source.start(NoopTaskConsumer())
+        # The call to start calls producer. It returns produced_deferred
+        # which has not been fired, so stop returns a deferred that has not
+        # been fired.
+        stop_deferred = task_source.stop()
+        stop_called = []
+        stop_deferred.addCallback(stop_called.append)
+        self.assertEqual([], stop_called)
+        # When the task producing deferred fires, the stop deferred fires with
+        # 'True' to indicate that the source is still stopped.
+        produced_deferred.callback(None)
+        self.assertEqual([True], stop_called)
+
+    def test_stop_deferred_fires_with_false_if_source_restarted(self):
+        # If there is a call to the task producer outstanding when stop() is
+        # called, stop() returns a deferred that fires when the poll finishes.
+        # The value fired with is False if the source is no longer stopped
+        # when the deferred fires.
+        produced_deferred = Deferred()
+        def producer():
+            return produced_deferred
+        task_source = self.makeTaskSource(task_producer=producer)
+        task_source.start(NoopTaskConsumer())
+        # The call to start calls producer. It returns produced_deferred
+        # which has not been fired so stop returns a deferred that has not
+        # been fired.
+        stop_deferred = task_source.stop()
+        stop_called = []
+        stop_deferred.addCallback(stop_called.append)
+        # Now we restart the source.
+        task_source.start(NoopTaskConsumer())
+        self.assertEqual([], stop_called)
+        # When the task producing deferred fires, the stop deferred fires with
+        # 'False' to indicate that the source has been restarted.
+        produced_deferred.callback(None)
+        self.assertEqual([False], stop_called)
+
+    def test_stop_start_stop_when_polling_doesnt_poll_again(self):
+        # XXX
+        produced_deferreds = []
+        def producer():
+            d = Deferred()
+            produced_deferreds.append(d)
+            return d
+        task_source = self.makeTaskSource(task_producer=producer)
+        # Start the source. This calls the producer.
+        task_source.start(NoopTaskConsumer())
+        self.assertEqual(1, len(produced_deferreds))
+        task_source.stop()
+        # If we start it again, this does not call the producer because
+        # the above call is still in process.
+        task_source.start(NoopTaskConsumer())
+        self.assertEqual(1, len(produced_deferreds))
+        # If we now stop the source and the initial poll for a task completes,
+        # we don't poll again.
+        task_source.stop()
+        produced_deferreds[0].callback(None)
+        self.assertEqual(1, len(produced_deferreds))
+
     def test_taskStarted_deferred_doesnt_delay_polling(self):
         # If taskStarted returns a deferred, we don't wait for it to fire
         # before polling again.
@@ -354,7 +437,7 @@
         consumer.consume(source)
         self.assertRaises(AlreadyRunningError, consumer.consume, source)
 
-    def test_consume_returns_deferred_doesnt_fire_until_tasks(self):
+    def test_consumer_doesnt_finish_until_tasks_finish(self):
         # `consume` returns a Deferred that fires when no more tasks are
         # running, but only after we've actually done something.
         consumer = self.makeConsumer()
@@ -363,7 +446,7 @@
         d.addCallback(log.append)
         self.assertEqual([], log)
 
-    def test_consume_returns_deferred_fires_when_tasks_done(self):
+    def test_consumer_finishes_when_tasks_done(self):
         # `consume` returns a Deferred that fires when no more tasks are
         # running.
         consumer = self.makeConsumer()
@@ -373,7 +456,7 @@
         consumer.taskStarted(lambda: None)
         self.assertEqual([None], task_log)
 
-    def test_consume_returns_deferred_fires_if_no_tasks_found(self):
+    def test_consumer_finishes_if_no_tasks_found(self):
         # `consume` returns a Deferred that fires if no tasks are found when
         # no tasks are running.
         consumer = self.makeConsumer()
@@ -383,7 +466,37 @@
         consumer.noTasksFound()
         self.assertEqual([None], task_log)
 
-    def test_consume_deferred_no_fire_if_no_tasks_found_and_job_running(self):
+    def test_consumer_doesnt_finish_until_stop_deferred_fires(self):
+        # The Deferred returned by `consume` does not fire until the deferred
+        # returned by the source's stop() method fires with True to indicate
+        # that the source is still stopped.
+        consumer = self.makeConsumer()
+        consume_log = []
+        stop_deferred = Deferred()
+        source = LoggingSource([], stop_deferred)
+        d = consumer.consume(source)
+        d.addCallback(consume_log.append)
+        consumer.noTasksFound()
+        self.assertEqual([], consume_log)
+        stop_deferred.callback(True)
+        self.assertEqual([None], consume_log)
+
+    def test_consumer_doesnt_finish_if_stop_doesnt_stop(self):
+        # The Deferred returned by `consume` does not fire when the deferred
+        # returned by the source's stop() method fires with False to indicate
+        # that the source has been restarted.
+        consumer = self.makeConsumer()
+        consume_log = []
+        stop_deferred = Deferred()
+        source = LoggingSource([], stop_deferred)
+        d = consumer.consume(source)
+        d.addCallback(consume_log.append)
+        consumer.noTasksFound()
+        self.assertEqual([], consume_log)
+        stop_deferred.callback(False)
+        self.assertEqual([], consume_log)
+
+    def test_consumer_doesnt_finish_if_no_tasks_found_and_job_running(self):
         # If no tasks are found while a job is running, the Deferred returned
         # by `consume` is not fired.
         consumer = self.makeConsumer()
@@ -402,7 +515,7 @@
         del log[:]
         # Finishes immediately, all tasks are done.
         consumer.taskStarted(lambda: None)
-        self.assertEqual(['stop'], log)
+        self.assertEqual(1, log.count('stop'))
 
     def test_taskStarted_before_consume_raises_error(self):
         # taskStarted can only be called after we have started consuming. This
@@ -427,6 +540,18 @@
         consumer.taskStarted(lambda: log.append('task'))
         self.assertEqual(['task'], log)
 
+    def test_taskStarted_restarts_source(self):
+        # If, after the task passed to taskStarted has been started, the
+        # consumer is not yet at its worker_limit, it starts the source again
+        # in order consume as many pending jobs as we can as quickly as we
+        # can.
+        log = []
+        consumer = self.makeConsumer()
+        consumer.consume(LoggingSource(log))
+        del log[:]
+        consumer.taskStarted(self._neverEndingTask)
+        self.assertEqual([('start', consumer)], log)
+
     def test_reaching_working_limit_stops_source(self):
         # Each time taskStarted is called, we start a worker. When we reach
         # the worker limit, we tell the source to stop generating work.
@@ -437,10 +562,10 @@
         consumer.consume(source)
         del log[:]
         consumer.taskStarted(self._neverEndingTask)
-        self.assertEqual([], log)
+        self.assertEqual(0, log.count('stop'))
         for i in range(worker_limit - 1):
             consumer.taskStarted(self._neverEndingTask)
-        self.assertEqual(['stop'], log)
+        self.assertEqual(1, log.count('stop'))
 
     def test_passing_working_limit_stops_source(self):
         # If we have already reached the worker limit, and taskStarted is