Merge lp:~mwhudson/launchpad/more-task-scheduled-bug-408638 into lp:launchpad
Status: | Merged |
---|---|
Approved by: | Jonathan Lange |
Approved revision: | no longer in the source branch. |
Merged at revision: | not available |
Proposed branch: | lp:~mwhudson/launchpad/more-task-scheduled-bug-408638 |
Merge into: | lp:launchpad |
Diff against target: | None lines |
To merge this branch: | bzr merge lp:~mwhudson/launchpad/more-task-scheduled-bug-408638 |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Jonathan Lange (community) | | | Approve |
Canonical Launchpad Engineering | | | Pending |
Review via email: mp+9749@code.launchpad.net |
Commit message
Description of the change
Michael Hudson-Doyle (mwhudson) wrote:
Jonathan Lange (jml) wrote:
On Thu, Aug 6, 2009 at 10:18 AM, Michael Hudson <email address hidden> wrote:
> You have been requested to review the proposed merge of lp:~mwhudson/launchpad/more-task-scheduled-bug-408638 into lp:launchpad/devel.
>
> EPIGRAMS IN PROGRAMMING:
>
> 58. Fools ignore complexity. Pragmatists suffer it. Some can avoid it. Geniuses remove it.
>
> I hope I'm being a pragmatist here.
>
I think so.
> Hi Jono,
>
> I hope you can make time to have a look at the branch some time today. If you don't have time for a full review, I'll get Tim to look at it tomorrow. But you already know what a DeferredLock is :)
>
> If the code and tests don't make sense, then I've failed in my mission, but there are two issues addressed here:
>
> 1) Some race conditions around the puller exiting while a request for a job is pending (basically, change ITaskSource.stop to return a deferred)
> 2) Addressing the behaviour where only one job gets pulled per run of the puller (see bug 408638)
>
Wow, concurrency really is quite hard!
However, I really do think that this branch is as simple as possible, barring any paradigm breakthroughs. Thanks for doing such a good job with it.
I've got a few comments and questions that I'd like you to address before this lands.
> === modified file 'lib/canonical/
> --- lib/canonical/
> +++ lib/canonical/
> @@ -37,6 +37,12 @@
> """Stop generating tasks.
>
> Any subsequent calls to `stop` are silently ignored.
> +
> + :return: A Deferred that will fire when the source is stopped. It is
> + possible that tasks may be produced until this deferred fires.
> + The deferred will fire with a boolean; True if the source is still
> + stopped, False if the source has been restarted since stop() was
> + called.
Do we know this for sure?
What I mean is: is it possible for the source to have been restarted but for
this to return False? It smells like a possible race condition.
> """
>
>
> @@ -100,10 +106,13 @@
> clock = reactor
> self._clock = clock
> self._looping_call = None
> + self._polling_lock = defer.DeferredL
I think it's worth adding a comment on how this lock is used & why.
> + self._started = False
>
And maybe for this one too.
> def start(self, task_consumer):
> """See `ITaskSource`."""
> self.stop()
> + self._started = True
> self._looping_call = LoopingCall(
> self._looping_
> self._looping_
> @@ -122,15 +131,21 @@
> # If task production fails, we inform the consumer of this, but we
> # don't let any deferred it returns delay subsequent polls.
> task_consumer.
> - d = defer.maybeDefe
> - d.addCallbacks(
> - return d
> + def poll():
> + if self._started:
> + d = defer.maybeDefe
Michael Hudson-Doyle (mwhudson) wrote:
Jonathan Lange wrote:
> Review: Needs Fixing
> On Thu, Aug 6, 2009 at 10:18 AM, Michael Hudson <email address hidden> wrote:
>> You have been requested to review the proposed merge of lp:~mwhudson/launchpad/more-task-scheduled-bug-408638 into lp:launchpad/devel.
>>
>> EPIGRAMS IN PROGRAMMING:
>>
>> 58. Fools ignore complexity. Pragmatists suffer it. Some can avoid it. Geniuses remove it.
>>
>> I hope I'm being a pragmatist here.
>>
>
> I think so.
Good :) Some genius can clean up my mess later.
>> Hi Jono,
>>
>> I hope you can make time to have a look at the branch some time today. If you don't have time for a full review, I'll get Tim to look at it tomorrow. But you already know what a DeferredLock is :)
>>
>> If the code and tests don't make sense, then I've failed in my mission, but there are two issues addressed here:
>>
>> 1) Some race conditions around the puller exiting while a request for a job is pending (basically, change ITaskSource.stop to return a deferred)
>> 2) Addressing the behaviour where only one job gets pulled per run of the puller (see bug 408638)
>>
>
> Wow, concurrency really is quite hard!
Yeah, no kidding.
> However, I really do think that this branch is as simple as possible, barring any paradigm breakthroughs. Thanks for doing such a good job with it.
>
> I've got a few comments and questions that I'd like you to address before this lands.
>
>> === modified file 'lib/canonical/
>> --- lib/canonical/
>> +++ lib/canonical/
>> @@ -37,6 +37,12 @@
>> """Stop generating tasks.
>>
>> Any subsequent calls to `stop` are silently ignored.
>> +
>> + :return: A Deferred that will fire when the source is stopped. It is
>> + possible that tasks may be produced until this deferred fires.
>> + The deferred will fire with a boolean; True if the source is still
>> + stopped, False if the source has been restarted since stop() was
>> + called.
>
> Do we know this for sure?
Well it's an interface docstring, so we can say so.
> What I mean is: is it possible for the source to have been restarted but for
> this to return False? It smells like a possible race condition.
I think in a Twisted world, it makes sense to talk about the state of
the source at the moment the deferred is fired. I guess you have to be
careful about any callbacks you add that themselves return deferreds.
But anyway, until I did it like this, I had no idea how I was going to
make this all work.
>> """
>>
>>
>> @@ -100,10 +106,13 @@
>> clock = reactor
>> self._clock = clock
>> self._looping_call = None
>> + self._polling_lock = defer.DeferredL
>
> I think it's worth adding a comment on how this lock is used & why.
>
>> + self._started = False
>>
>
> And maybe for this one too.
I realized that "_looping_call is not None" was equivalent to "_started",
so I deleted _started.
>> def start(self, task_consumer):
>> """See `ITaskSource`."""
>> self.stop()
>> + self._sta...
=== modified file 'lib/canonical/twistedsupport/task.py'
--- lib/canonical/twistedsupport/task.py 2009-08-06 09:08:47 +0000
+++ lib/canonical/twistedsupport/task.py 2009-08-06 10:38:24 +0000
@@ -106,13 +106,14 @@
             clock = reactor
         self._clock = clock
         self._looping_call = None
+        # _polling_lock is used to prevent concurrent attempts to poll for
+        # work, and to delay the firing of the deferred returned from stop()
+        # until any poll in progress at the moment of the call is complete.
         self._polling_lock = defer.DeferredLock()
-        self._started = False

     def start(self, task_consumer):
         """See `ITaskSource`."""
         self.stop()
-        self._started = True
         self._looping_call = LoopingCall(self._poll, task_consumer)
         self._looping_call.clock = self._clock
         self._looping_call.start(self._interval)
@@ -132,7 +133,9 @@
             # don't let any deferred it returns delay subsequent polls.
             task_consumer.taskProductionFailed(reason)
         def poll():
-            if self._started:
+            # If stop() has been called before the lock was acquired, don't
+            # actually poll for more work.
+            if self._looping_call:
                 d = defer.maybeDeferred(self._task_producer)
                 return d.addCallbacks(got_task, task_failed)
         return self._polling_lock.run(poll)
@@ -142,9 +145,8 @@
         if self._looping_call is not None:
             self._looping_call.stop()
             self._looping_call = None
-        self._started = False
         def _return_still_stopped():
-            return not self._started
+            return self._looping_call is None
         return self._polling_lock.run(_return_still_stopped)


@@ -190,8 +192,11 @@
             else:
                 self._stopping_lock.release()
         def _call_stop(ignored):
-            return self._task_source.stop().addCallback(_release_or_stop)
-        return self._stopping_lock.acquire().addCallback(_call_stop)
+            return self._task_source.stop()
+        d = self._stopping_lock.acquire()
+        d.addCallback(_call_stop)
+        d.addCallback(_release_or_stop)
+        return d

     def consume(self, task_source):
         """Start consuming tasks from 'task_source'.

=== modified file 'lib/canonical/twistedsupport/tests/test_task.py'
--- lib/canonical/twistedsupport/tests/test_task.py 2009-08-06 09:08:47 +0000
+++ lib/canonical/twistedsupport/tests/test_task.py 2009-08-06 10:41:48 +0000
@@ -303,7 +303,9 @@
         self.assertEqual([False], stop_called)

     def test_stop_start_stop_when_polling_doesnt_poll_again(self):
-        # XXX
+        # If, while task acquisition is in progress, stop(), start() and
+        # stop() again are called in sequence, we shouldn't try to acquire
+        # another job when the first acquisition completes.
         produced_deferreds = []
         def producer():
             d = Deferred()
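The stop() contract discussed in this thread (the result is delayed until any in-flight poll completes, and is True only if the source is still stopped at that moment) can be illustrated with a rough asyncio analogue. This is a sketch under stated assumptions, not the Twisted code from the branch: `asyncio.Lock` stands in for `DeferredLock`, a plain `_running` flag stands in for `_looping_call is not None`, and `PollingSource`, `poll` and `demo` are hypothetical names. Unlike the real source, `start()` here does not schedule polling itself.

```python
import asyncio


class PollingSource:
    """Rough asyncio analogue of the polling source; not the Twisted code itself."""

    def __init__(self, producer):
        self._producer = producer            # coroutine function fetching one task
        self._running = False                # stands in for `_looping_call is not None`
        self._polling_lock = asyncio.Lock()  # stands in for the DeferredLock

    def start(self):
        self._running = True

    async def poll(self):
        # Only one poll may be in flight at a time.
        async with self._polling_lock:
            if not self._running:
                # stop() was called before we got the lock: do not poll.
                return None
            return await self._producer()

    async def stop(self):
        self._running = False
        # Taking the lock delays the answer until any in-flight poll is done.
        async with self._polling_lock:
            return not self._running         # True only if still stopped


async def demo():
    results = []
    for restart in (False, True):
        gate = asyncio.Event()

        async def producer():
            await gate.wait()
            return "job"

        source = PollingSource(producer)
        source.start()
        poll = asyncio.ensure_future(source.poll())
        await asyncio.sleep(0)               # let the poll take the lock
        stop = asyncio.ensure_future(source.stop())
        await asyncio.sleep(0)               # stop() now waits on the lock
        assert not stop.done()
        if restart:
            source.start()                   # restarted while the poll is pending
        gate.set()                           # the in-flight poll completes
        results.append(await stop)
        await poll
    return results
```

Running `asyncio.run(demo())` exercises both cases covered by the new tests: stop with no restart, and stop followed by a restart while the poll is still outstanding.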
Jonathan Lange (jml) wrote:
Thanks Michael, this looks great.
Preview Diff
=== modified file 'lib/canonical/twistedsupport/task.py'
--- lib/canonical/twistedsupport/task.py 2009-07-17 00:26:05 +0000
+++ lib/canonical/twistedsupport/task.py 2009-08-06 09:08:47 +0000
@@ -37,6 +37,12 @@
         """Stop generating tasks.

         Any subsequent calls to `stop` are silently ignored.
+
+        :return: A Deferred that will fire when the source is stopped. It is
+            possible that tasks may be produced until this deferred fires.
+            The deferred will fire with a boolean; True if the source is still
+            stopped, False if the source has been restarted since stop() was
+            called.
         """


@@ -100,10 +106,13 @@
             clock = reactor
         self._clock = clock
         self._looping_call = None
+        self._polling_lock = defer.DeferredLock()
+        self._started = False

     def start(self, task_consumer):
         """See `ITaskSource`."""
         self.stop()
+        self._started = True
         self._looping_call = LoopingCall(self._poll, task_consumer)
         self._looping_call.clock = self._clock
         self._looping_call.start(self._interval)
@@ -122,15 +131,21 @@
             # If task production fails, we inform the consumer of this, but we
             # don't let any deferred it returns delay subsequent polls.
             task_consumer.taskProductionFailed(reason)
-        d = defer.maybeDeferred(self._task_producer)
-        d.addCallbacks(got_task, task_failed)
-        return d
+        def poll():
+            if self._started:
+                d = defer.maybeDeferred(self._task_producer)
+                return d.addCallbacks(got_task, task_failed)
+        return self._polling_lock.run(poll)

     def stop(self):
         """See `ITaskSource`."""
         if self._looping_call is not None:
             self._looping_call.stop()
             self._looping_call = None
+        self._started = False
+        def _return_still_stopped():
+            return not self._started
+        return self._polling_lock.run(_return_still_stopped)


 class AlreadyRunningError(Exception):
@@ -164,6 +179,19 @@
         self._worker_limit = worker_limit
         self._worker_count = 0
         self._terminationDeferred = None
+        self._stopping_lock = None
+
+    def _stop(self):
+        def _release_or_stop(still_stopped):
+            if still_stopped and self._worker_count == 0:
+                self._terminationDeferred.callback(None)
+                # Note that in this case we don't release the lock: we don't
+                # want to try to fire the _terminationDeferred twice!
+            else:
+                self._stopping_lock.release()
+        def _call_stop(ignored):
+            return self._task_source.stop().addCallback(_release_or_stop)
+        return self._stopping_lock.acquire().addCallback(_call_stop)

     def consume(self, task_source):
         """Start consuming tasks from 'task_source'.
@@ -178,9 +206,7 @@
             raise AlreadyRunningError(self, self._task_source)
         self._task_source = task_source
         self._terminationDeferred = defer.Deferred()
-        # This merely begins polling. This means that we acquire our initial
-        # batch of work at the rate of one task per polling interval. As long
-        # as the polling interval is small, this is probably OK.
+        self._stopping_lock = defer.DeferredLock()
         task_source.start(self)
         return self._terminationDeferred

@@ -196,7 +222,9 @@
             raise NotRunningError(self)
         self._worker_count += 1
         if self._worker_count >= self._worker_limit:
-            self._task_source.stop()
+            self._stop()
+        else:
+            self._task_source.start(self)
         d = defer.maybeDeferred(task)
         # We don't expect these tasks to have interesting return values or
         # failure modes.
@@ -213,8 +241,7 @@
         find any jobs, if we actually start any jobs then the exit condition
         in _taskEnded will always be reached before this one.
         """
-        if self._worker_count == 0:
-            self._terminationDeferred.callback(None)
+        self._stop()

     def taskProductionFailed(self, reason):
         """See `ITaskConsumer`.
@@ -236,9 +263,7 @@
         """
         if self._task_source is None:
             raise NotRunningError(self)
-        self._task_source.stop()
-        if self._worker_count == 0:
-            self._terminationDeferred.callback(None)
+        self._stop()

     def _taskEnded(self, ignored):
         """Handle a task reaching completion.
@@ -252,8 +277,7 @@
         """
         self._worker_count -= 1
         if self._worker_count == 0:
-            self._task_source.stop()
-            self._terminationDeferred.callback(None)
+            self._stop()
         elif self._worker_count < self._worker_limit:
             self._task_source.start(self)
         else:

=== modified file 'lib/canonical/twistedsupport/tests/test_task.py'
--- lib/canonical/twistedsupport/tests/test_task.py 2009-07-17 00:26:05 +0000
+++ lib/canonical/twistedsupport/tests/test_task.py 2009-08-06 09:08:47 +0000
@@ -7,7 +7,7 @@

 import unittest

-from twisted.internet.defer import Deferred
+from twisted.internet.defer import Deferred, succeed
 from twisted.internet.task import Clock

 from zope.interface import implements
@@ -49,14 +49,19 @@

     implements(ITaskSource)

-    def __init__(self, log):
+    def __init__(self, log, stop_deferred=None):
         self._log = log
+        if stop_deferred is None:
+            self.stop_deferred = succeed(True)
+        else:
+            self.stop_deferred = stop_deferred

     def start(self, consumer):
         self._log.append(('start', consumer))

     def stop(self):
         self._log.append('stop')
+        return self.stop_deferred


 class TestPollingTaskSource(TestCase):
@@ -144,6 +149,16 @@
         # No more calls were made.
         self.assertEqual(0, self._num_task_producer_calls)

+    def test_stop_deferred_fires_immediately_if_no_polling(self):
+        # Calling stop when the source is not polling returns a deferred that
+        # fires immediately with True.
+        task_source = self.makeTaskSource()
+        task_source.start(NoopTaskConsumer())
+        stop_deferred = task_source.stop()
+        stop_calls = []
+        stop_deferred.addCallback(stop_calls.append)
+        self.assertEqual([True], stop_calls)
+
     def test_start_multiple_times_polls_immediately(self):
         # Starting a task source multiple times polls immediately.
         clock = Clock()
@@ -241,6 +256,74 @@
         clock.advance(interval)
         self.assertEqual(len(produced_deferreds), 2)

+    def test_stop_deferred_doesnt_fire_until_polling_finished(self):
+        # If there is a call to the task producer outstanding when stop() is
+        # called, stop() returns a deferred that fires when the poll finishes.
+        # The value fired with is True if the source is still stopped when the
+        # deferred fires.
+        produced_deferred = Deferred()
+        def producer():
+            return produced_deferred
+        task_source = self.makeTaskSource(task_producer=producer)
+        task_source.start(NoopTaskConsumer())
+        # The call to start calls producer. It returns produced_deferred
+        # which has not been fired, so stop returns a deferred that has not
+        # been fired.
+        stop_deferred = task_source.stop()
+        stop_called = []
+        stop_deferred.addCallback(stop_called.append)
+        self.assertEqual([], stop_called)
+        # When the task producing deferred fires, the stop deferred fires with
+        # 'True' to indicate that the source is still stopped.
+        produced_deferred.callback(None)
+        self.assertEqual([True], stop_called)
+
+    def test_stop_deferred_fires_with_false_if_source_restarted(self):
+        # If there is a call to the task producer outstanding when stop() is
+        # called, stop() returns a deferred that fires when the poll finishes.
+        # The value fired with is False if the source is no longer stopped
+        # when the deferred fires.
+        produced_deferred = Deferred()
+        def producer():
+            return produced_deferred
+        task_source = self.makeTaskSource(task_producer=producer)
+        task_source.start(NoopTaskConsumer())
+        # The call to start calls producer. It returns produced_deferred
+        # which has not been fired so stop returns a deferred that has not
+        # been fired.
+        stop_deferred = task_source.stop()
+        stop_called = []
+        stop_deferred.addCallback(stop_called.append)
+        # Now we restart the source.
+        task_source.start(NoopTaskConsumer())
+        self.assertEqual([], stop_called)
+        # When the task producing deferred fires, the stop deferred fires with
+        # 'False' to indicate that the source has been restarted.
+        produced_deferred.callback(None)
+        self.assertEqual([False], stop_called)
+
+    def test_stop_start_stop_when_polling_doesnt_poll_again(self):
+        # XXX
+        produced_deferreds = []
+        def producer():
+            d = Deferred()
+            produced_deferreds.append(d)
+            return d
+        task_source = self.makeTaskSource(task_producer=producer)
+        # Start the source. This calls the producer.
+        task_source.start(NoopTaskConsumer())
+        self.assertEqual(1, len(produced_deferreds))
+        task_source.stop()
+        # If we start it again, this does not call the producer because
+        # the above call is still in process.
+        task_source.start(NoopTaskConsumer())
+        self.assertEqual(1, len(produced_deferreds))
+        # If we now stop the source and the initial poll for a task completes,
+        # we don't poll again.
+        task_source.stop()
+        produced_deferreds[0].callback(None)
+        self.assertEqual(1, len(produced_deferreds))
+
     def test_taskStarted_deferred_doesnt_delay_polling(self):
         # If taskStarted returns a deferred, we don't wait for it to fire
         # before polling again.
@@ -354,7 +437,7 @@
         consumer.consume(source)
         self.assertRaises(AlreadyRunningError, consumer.consume, source)

-    def test_consume_returns_deferred_doesnt_fire_until_tasks(self):
+    def test_consumer_doesnt_finish_until_tasks_finish(self):
         # `consume` returns a Deferred that fires when no more tasks are
         # running, but only after we've actually done something.
         consumer = self.makeConsumer()
@@ -363,7 +446,7 @@
         d.addCallback(log.append)
         self.assertEqual([], log)

-    def test_consume_returns_deferred_fires_when_tasks_done(self):
+    def test_consumer_finishes_when_tasks_done(self):
         # `consume` returns a Deferred that fires when no more tasks are
         # running.
         consumer = self.makeConsumer()
@@ -373,7 +456,7 @@
         consumer.taskStarted(lambda: None)
         self.assertEqual([None], task_log)

-    def test_consume_returns_deferred_fires_if_no_tasks_found(self):
+    def test_consumer_finishes_if_no_tasks_found(self):
         # `consume` returns a Deferred that fires if no tasks are found when
         # no tasks are running.
         consumer = self.makeConsumer()
@@ -383,7 +466,37 @@
         consumer.noTasksFound()
         self.assertEqual([None], task_log)

-    def test_consume_deferred_no_fire_if_no_tasks_found_and_job_running(self):
+    def test_consumer_doesnt_finish_until_stop_deferred_fires(self):
+        # The Deferred returned by `consume` does not fire until the deferred
+        # returned by the source's stop() method fires with True to indicate
+        # that the source is still stopped.
+        consumer = self.makeConsumer()
+        consume_log = []
+        stop_deferred = Deferred()
+        source = LoggingSource([], stop_deferred)
+        d = consumer.consume(source)
+        d.addCallback(consume_log.append)
+        consumer.noTasksFound()
+        self.assertEqual([], consume_log)
+        stop_deferred.callback(True)
+        self.assertEqual([None], consume_log)
+
+    def test_consumer_doesnt_finish_if_stop_doesnt_stop(self):
+        # The Deferred returned by `consume` does not fire when the deferred
+        # returned by the source's stop() method fires with False to indicate
+        # that the source has been restarted.
+        consumer = self.makeConsumer()
+        consume_log = []
+        stop_deferred = Deferred()
+        source = LoggingSource([], stop_deferred)
+        d = consumer.consume(source)
+        d.addCallback(consume_log.append)
+        consumer.noTasksFound()
+        self.assertEqual([], consume_log)
+        stop_deferred.callback(False)
+        self.assertEqual([], consume_log)
+
+    def test_consumer_doesnt_finish_if_no_tasks_found_and_job_running(self):
         # If no tasks are found while a job is running, the Deferred returned
         # by `consume` is not fired.
         consumer = self.makeConsumer()
@@ -402,7 +515,7 @@
         del log[:]
         # Finishes immediately, all tasks are done.
         consumer.taskStarted(lambda: None)
-        self.assertEqual(['stop'], log)
+        self.assertEqual(1, log.count('stop'))

     def test_taskStarted_before_consume_raises_error(self):
         # taskStarted can only be called after we have started consuming. This
@@ -427,6 +540,18 @@
         consumer.taskStarted(lambda: log.append('task'))
         self.assertEqual(['task'], log)

+    def test_taskStarted_restarts_source(self):
+        # If, after the task passed to taskStarted has been started, the
+        # consumer is not yet at its worker_limit, it starts the source again
+        # in order consume as many pending jobs as we can as quickly as we
+        # can.
+        log = []
+        consumer = self.makeConsumer()
+        consumer.consume(LoggingSource(log))
+        del log[:]
+        consumer.taskStarted(self._neverEndingTask)
+        self.assertEqual([('start', consumer)], log)
+
     def test_reaching_working_limit_stops_source(self):
         # Each time taskStarted is called, we start a worker. When we reach
         # the worker limit, we tell the source to stop generating work.
@@ -437,10 +562,10 @@
         consumer.consume(source)
         del log[:]
         consumer.taskStarted(self._neverEndingTask)
-        self.assertEqual([], log)
+        self.assertEqual(0, log.count('stop'))
         for i in range(worker_limit - 1):
             consumer.taskStarted(self._neverEndingTask)
-        self.assertEqual(['stop'], log)
+        self.assertEqual(1, log.count('stop'))

     def test_passing_working_limit_stops_source(self):
         # If we have already reached the worker limit, and taskStarted is
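The worker-limit policy exercised by `test_taskStarted_restarts_source` and `test_reaching_working_limit_stops_source` can be modelled without Twisted at all. The following is a simplified, synchronous sketch and only an illustration: `Consumer.task_started` is a hypothetical stand-in for `taskStarted`, tasks are never actually run, and `stop()` returns nothing here, whereas in the branch it returns a Deferred.

```python
class LoggingSource:
    """Records start/stop calls, loosely modelled on the test double in test_task.py."""

    def __init__(self):
        self.log = []

    def start(self, consumer):
        self.log.append("start")

    def stop(self):
        self.log.append("stop")


class Consumer:
    """Simplified, synchronous model of the worker-limit policy only."""

    def __init__(self, worker_limit):
        self._worker_limit = worker_limit
        self._worker_count = 0
        self._source = None

    def consume(self, source):
        self._source = source
        source.start(self)

    def task_started(self):
        self._worker_count += 1
        if self._worker_count >= self._worker_limit:
            # At capacity: ask the source to stop producing work.
            self._source.stop()
        else:
            # Spare capacity: restart the source so it polls again right away
            # instead of waiting for the next polling interval.
            self._source.start(self)


source = LoggingSource()
consumer = Consumer(worker_limit=3)
consumer.consume(source)
for _ in range(3):
    consumer.task_started()
print(source.log)  # ['start', 'start', 'start', 'stop']
```

Restarting rather than waiting relies on the behaviour described in `test_start_multiple_times_polls_immediately`: starting an already started source polls immediately, which is what lets the consumer fill its worker slots quickly.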
EPIGRAMS IN PROGRAMMING:
58. Fools ignore complexity. Pragmatists suffer it. Some can avoid it. Geniuses remove it.
I hope I'm being a pragmatist here.
Hi Jono,
I hope you can make time to have a look at the branch some time today. If you don't have time for a full review, I'll get Tim to look at it tomorrow. But you already know what a DeferredLock is :)
If the code and tests don't make sense, then I've failed in my mission, but there are two issues addressed here:
1) Some race conditions around the puller exiting while a request for a job is pending (basically, change ITaskSource.stop to return a deferred)
2) Addressing the behaviour where only one job gets pulled per run of the puller (see bug 408638)
Cheers,
mwh
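The first issue (the consumer must not finish while a request for a job is still pending) comes down to waiting for the result of the source's stop() before declaring the run over. A minimal asyncio sketch of that shutdown path follows, assuming hypothetical names throughout: `StubSource`, `no_tasks_found` and `finished` are illustrative, and `asyncio.Lock` replaces the `DeferredLock`. One deliberate difference from the branch: the real `_stop` avoids firing the termination deferred twice by never releasing the stopping lock, while this sketch checks `finished.done()` instead.

```python
import asyncio


class StubSource:
    """Hypothetical test double: stop() resolves to whatever the caller supplies."""

    def __init__(self, stop_result):
        self._stop_result = stop_result      # an asyncio.Future set by the caller

    def start(self, consumer):
        pass

    async def stop(self):
        return await self._stop_result


class Consumer:
    """asyncio analogue of the consumer's shutdown path; not the branch's code."""

    def __init__(self):
        self._worker_count = 0
        self._stopping_lock = asyncio.Lock()
        self._source = None
        self.finished = None

    def consume(self, source):
        self._source = source
        self.finished = asyncio.get_running_loop().create_future()
        source.start(self)
        return self.finished

    async def no_tasks_found(self):
        # Serialise shutdown attempts, then ask the source whether it really
        # stayed stopped before declaring the whole run finished.
        async with self._stopping_lock:
            still_stopped = await self._source.stop()
            idle = self._worker_count == 0
            if still_stopped and idle and not self.finished.done():
                self.finished.set_result(None)


async def demo():
    outcomes = []
    for answer in (True, False):
        stop_result = asyncio.get_running_loop().create_future()
        consumer = Consumer()
        finished = consumer.consume(StubSource(stop_result))
        attempt = asyncio.ensure_future(consumer.no_tasks_found())
        await asyncio.sleep(0)
        assert not finished.done()           # still waiting on the source
        stop_result.set_result(answer)
        await attempt
        outcomes.append(finished.done())
    return outcomes
```

`asyncio.run(demo())` covers the same two cases as `test_consumer_doesnt_finish_until_stop_deferred_fires` and `test_consumer_doesnt_finish_if_stop_doesnt_stop`: the run finishes when stop() reports True and stays open when it reports False.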