Merge lp:~mwhudson/launchpad/more-task-scheduled-bug-408638 into lp:launchpad
Status: | Merged |
---|---|
Approved by: | Jonathan Lange |
Approved revision: | no longer in the source branch. |
Merged at revision: | not available |
Proposed branch: | lp:~mwhudson/launchpad/more-task-scheduled-bug-408638 |
Merge into: | lp:launchpad |
Diff against target: | None lines |
To merge this branch: | bzr merge lp:~mwhudson/launchpad/more-task-scheduled-bug-408638 |
Related bugs: | |

Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Jonathan Lange (community) | Approve | ||
Canonical Launchpad Engineering | Pending | ||

Review via email: mp+9749@code.launchpad.net
Commit message
Description of the change
Michael Hudson-Doyle (mwhudson) wrote:
Jonathan Lange (jml) wrote:
On Thu, Aug 6, 2009 at 10:18 AM, Michael Hudson <email address hidden> wrote:
> You have been requested to review the proposed merge of lp:~mwhudson/launchpad/more-task-scheduled-bug-408638 into lp:launchpad/devel.
>
> EPIGRAMS IN PROGRAMMING:
>
> 58. Fools ignore complexity. Pragmatists suffer it. Some can avoid it. Geniuses remove it.
>
> I hope I'm being a pragmatist here.
>
I think so.
> Hi Jono,
>
> I hope you can make time to have a look at the branch some time today. If you don't have time for a full review, I'll get Tim to look at it tomorrow. But you already know what a DeferredLock is :)
>
> If the code and tests don't make sense, then I've failed in my mission, but there are two issues addressed here:
>
> 1) Some race conditions around the puller exiting while a request for a job is pending (basically, change ITaskSource.stop to return a Deferred)
> 2) Addressing the behaviour where only one job gets pulled per run of the puller (see bug 408638)
>
Wow, concurrency really is quite hard!
However, I really do think that this branch is as simple as possible, barring any paradigm breakthroughs. Thanks for doing such a good job with it.
I've got a few comments and questions that I'd like you to address before this lands.
> === modified file 'lib/canonical/
> --- lib/canonical/
> +++ lib/canonical/
> @@ -37,6 +37,12 @@
> """Stop generating tasks.
>
> Any subsequent calls to `stop` are silently ignored.
> +
> + :return: A Deferred that will fire when the source is stopped. It is
> + possible that tasks may be produced until this deferred fires.
> + The deferred will fire with a boolean; True if the source is still
> + stopped, False if the source has been restarted since stop() was
> + called.
Do we know this for sure?
What I mean is, is it possible for the source to have been restarted but for
this to return False. It smells like a possible race condition.
> """
>
>
> @@ -100,10 +106,13 @@
> clock = reactor
> self._clock = clock
> self._looping_call = None
> + self._polling_lock = defer.DeferredL
I think it's worth adding a comment on how this lock is used & why.
> + self._started = False
>
And maybe for this one too.
> def start(self, task_consumer):
> """See `ITaskSource`."""
> self.stop()
> + self._started = True
> self._looping_call = LoopingCall(
> self._looping_
> self._looping_
> @@ -122,15 +131,21 @@
> # If task production fails, we inform the consumer of this, but we
> # don't let any deferred it returns delay subsequent polls.
> task_consumer.
> - d = defer.maybeDefe
> - d.addCallbacks(
> - return d
> + def poll():
> + if self._started:
> + d = defer.maybeDefe
Michael Hudson-Doyle (mwhudson) wrote:
Jonathan Lange wrote:
> Review: Needs Fixing
> On Thu, Aug 6, 2009 at 10:18 AM, Michael Hudson <email address hidden> wrote:
>> You have been requested to review the proposed merge of lp:~mwhudson/launchpad/more-task-scheduled-bug-408638 into lp:launchpad/devel.
>>
>> EPIGRAMS IN PROGRAMMING:
>>
>> 58. Fools ignore complexity. Pragmatists suffer it. Some can avoid it. Geniuses remove it.
>>
>> I hope I'm being a pragmatist here.
>>
>
> I think so.
Good :) Some genius can clean up my mess later.
>> Hi Jono,
>>
>> I hope you can make time to have a look at the branch some time today. If you don't have time for a full review, I'll get Tim to look at it tomorrow. But you already know what a DeferredLock is :)
>>
>> If the code and tests don't make sense, then I've failed in my mission, but there are two issues addressed here:
>>
>> 1) Some race conditions around the puller exiting while a request for a job is pending (basically, change ITaskSource.stop to return a Deferred)
>> 2) Addressing the behaviour where only one job gets pulled per run of the puller (see bug 408638)
>>
>
> Wow, concurrency really is quite hard!
Yeah, no kidding.
> However, I really do think that this branch is as simple as possible, barring any paradigm breakthroughs. Thanks for doing such a good job with it.
>
> I've got a few comments and questions that I'd like you to address before this lands.
>
>> === modified file 'lib/canonical/
>> --- lib/canonical/
>> +++ lib/canonical/
>> @@ -37,6 +37,12 @@
>> """Stop generating tasks.
>>
>> Any subsequent calls to `stop` are silently ignored.
>> +
>> + :return: A Deferred that will fire when the source is stopped. It is
>> + possible that tasks may be produced until this deferred fires.
>> + The deferred will fire with a boolean; True if the source is still
>> + stopped, False if the source has been restarted since stop() was
>> + called.
>
> Do we know this for sure?
Well, it's an interface docstring, so we can say so.
> What I mean is, is it possible for the source to have been restarted but for
> this to return False. It smells like a possible race condition.
I think in a Twisted world, it makes sense to talk about the state of
the source at the moment the deferred is fired. I guess you have to be
careful about any callbacks you add that themselves return deferreds.
But anyway, until I did it like this, I had no idea how I was going to
make this all work.
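The idea that the Deferred returned by `stop()` reports the source's state at the moment it fires can be modelled without a reactor. This is a stdlib-only approximation, not the branch's code. `ToyDeferred` and `ToyLock` are hypothetical, much-simplified stand-ins for Twisted's `Deferred` and `DeferredLock.run()`, with no errbacks and no result chaining. `ToyPollingSource.start()` (also hypothetical) takes no consumer and polls exactly once instead of starting a `LoopingCall`. Because `stop()` queues its check behind any in-flight poll on the same lock, the boolean it eventually fires with reflects whether `start()` was called in the meantime.

```python
# Stdlib-only sketch. ToyDeferred and ToyLock are hypothetical, simplified
# stand-ins for Twisted's Deferred and DeferredLock (no errbacks, no chaining).


class ToyDeferred:
    def __init__(self):
        self.called = False
        self.result = None
        self._callbacks = []

    def addCallback(self, fn):
        if self.called:
            self.result = fn(self.result)
        else:
            self._callbacks.append(fn)
        return self

    def callback(self, result):
        self.called = True
        self.result = result
        while self._callbacks:
            self.result = self._callbacks.pop(0)(self.result)


class ToyLock:
    """run(fn) calls fn under the lock; if fn returns a ToyDeferred, the lock
    stays held until it fires. Queued calls run in FIFO order."""

    def __init__(self):
        self.locked = False
        self._waiting = []

    def run(self, fn):
        outcome = ToyDeferred()

        def finish(value):
            self._release()  # release first, then report the result
            outcome.callback(value)
            return value

        def execute():
            self.locked = True
            value = fn()
            if isinstance(value, ToyDeferred):
                value.addCallback(finish)
            else:
                finish(value)

        if self.locked:
            self._waiting.append(execute)
        else:
            execute()
        return outcome

    def _release(self):
        if self._waiting:
            self._waiting.pop(0)()  # hand the lock straight to the next caller
        else:
            self.locked = False


class ToyPollingSource:
    """Loop-free model of PollingTaskSource: start() polls once."""

    def __init__(self, producer):
        self._producer = producer
        self._running = False  # plays the role of "_looping_call is not None"
        self._polling_lock = ToyLock()

    def start(self):
        self.stop()
        self._running = True
        self._polling_lock.run(self._poll)

    def _poll(self):
        # If stop() ran before we got the lock, don't poll for more work.
        if self._running:
            return self._producer()

    def stop(self):
        self._running = False
        # Evaluated only after any in-flight or queued poll releases the lock.
        return self._polling_lock.run(lambda: not self._running)
```

Under these assumptions, replaying the scenarios from the branch's new `PollingTaskSource` tests gives matching outcomes: a `stop()` issued during a pending poll fires with `True` once the poll completes, fires with `False` if `start()` was called in between, and a stop/start/stop sequence does not trigger a second poll.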
>> """
>>
>>
>> @@ -100,10 +106,13 @@
>> clock = reactor
>> self._clock = clock
>> self._looping_call = None
>> + self._polling_lock = defer.DeferredL
>
> I think it's worth adding a comment on how this lock is used & why.
>
>> + self._started = False
>>
>
> And maybe for this one too.
I realized that "_looping_call is not None" was equivalent to "_started",
so I deleted _started.
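Dropping `_started` in favour of a check on `_looping_call` removes a second piece of state that `start()` and `stop()` would otherwise have to keep in sync. A minimal sketch, assuming a hypothetical `ToyLoopingCall` in place of Twisted's `LoopingCall` and a hypothetical `started` property (the branch itself tests `self._looping_call` inline rather than defining a property):

```python
class ToyLoopingCall:
    """Hypothetical stand-in for twisted.internet.task.LoopingCall."""

    def __init__(self):
        self.running = True

    def stop(self):
        self.running = False


class ToySource:
    """Sketch: 'started' is derived from _looping_call, never stored."""

    def __init__(self):
        self._looping_call = None

    @property
    def started(self):
        # Same answer the removed _started flag gave, with no second
        # attribute to keep in sync.
        return self._looping_call is not None

    def start(self):
        self.stop()
        self._looping_call = ToyLoopingCall()

    def stop(self):
        if self._looping_call is not None:
            self._looping_call.stop()
            self._looping_call = None
```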
>> def start(self, task_consumer):
>> """See `ITaskSource`."""
>> self.stop()
>> + self._sta...
```diff
=== modified file 'lib/canonical/twistedsupport/task.py'
--- lib/canonical/twistedsupport/task.py 2009-08-06 09:08:47 +0000
+++ lib/canonical/twistedsupport/task.py 2009-08-06 10:38:24 +0000
@@ -106,13 +106,14 @@
             clock = reactor
         self._clock = clock
         self._looping_call = None
+        # _polling_lock is used to prevent concurrent attempts to poll for
+        # work, and to delay the firing of the deferred returned from stop()
+        # until any poll in progress at the moment of the call is complete.
         self._polling_lock = defer.DeferredLock()
-        self._started = False
 
     def start(self, task_consumer):
         """See `ITaskSource`."""
         self.stop()
-        self._started = True
         self._looping_call = LoopingCall(self._poll, task_consumer)
         self._looping_call.clock = self._clock
         self._looping_call.start(self._interval)
@@ -132,7 +133,9 @@
             # don't let any deferred it returns delay subsequent polls.
             task_consumer.taskProductionFailed(reason)
         def poll():
-            if self._started:
+            # If stop() has been called before the lock was acquired, don't
+            # actually poll for more work.
+            if self._looping_call:
                 d = defer.maybeDeferred(self._task_producer)
                 return d.addCallbacks(got_task, task_failed)
         return self._polling_lock.run(poll)
@@ -142,9 +145,8 @@
         if self._looping_call is not None:
             self._looping_call.stop()
             self._looping_call = None
-            self._started = False
         def _return_still_stopped():
-            return not self._started
+            return self._looping_call is None
         return self._polling_lock.run(_return_still_stopped)
 
 
@@ -190,8 +192,11 @@
             else:
                 self._stopping_lock.release()
         def _call_stop(ignored):
-            return self._task_source.stop().addCallback(_release_or_stop)
-        return self._stopping_lock.acquire().addCallback(_call_stop)
+            return self._task_source.stop()
+        d = self._stopping_lock.acquire()
+        d.addCallback(_call_stop)
+        d.addCallback(_release_or_stop)
+        return d
 
     def consume(self, task_source):
         """Start consuming tasks from 'task_source'.
 
=== modified file 'lib/canonical/twistedsupport/tests/test_task.py'
--- lib/canonical/twistedsupport/tests/test_task.py 2009-08-06 09:08:47 +0000
+++ lib/canonical/twistedsupport/tests/test_task.py 2009-08-06 10:41:48 +0000
@@ -303,7 +303,9 @@
         self.assertEqual([False], stop_called)
 
     def test_stop_start_stop_when_polling_doesnt_poll_again(self):
-        # XXX
+        # If, while task acquisition is in progress, stop(), start() and
+        # stop() again are called in sequence, we shouldn't try to acquire
+        # another job when the first acquisition completes.
         produced_deferreds = []
         def producer():
             d = Deferred()
```
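On the consumer side, the call sites changed in this branch go through `_stop()`, which serialises calls on `_stopping_lock` and deliberately never releases it once `_terminationDeferred` has fired. The stdlib-only sketch below models that protocol. `ToyDeferred`, `ToyLock`, `ToySource` and `ToyConsumer` are hypothetical simplifications, not Twisted's API: they have no errbacks, and a callback that returns a `ToyDeferred` is not chained. For that reason the sketch uses the nested `_call_stop` form from the earlier revision rather than the chained form above. The two forms should behave the same with real Twisted Deferreds, which do chain a Deferred returned from a callback.

```python
# Hypothetical stdlib-only toys, not Twisted's API: no errbacks, and a
# callback that returns a ToyDeferred is NOT chained (real Deferreds are).


class ToyDeferred:
    def __init__(self):
        self.called = False
        self.result = None
        self._callbacks = []

    def addCallback(self, fn):
        if self.called:
            self.result = fn(self.result)
        else:
            self._callbacks.append(fn)
        return self

    def callback(self, result):
        self.called = True
        self.result = result
        while self._callbacks:
            self.result = self._callbacks.pop(0)(self.result)


class ToyLock:
    """acquire() returns a ToyDeferred that fires once the caller holds it."""

    def __init__(self):
        self.locked = False
        self._waiting = []

    def acquire(self):
        d = ToyDeferred()
        if self.locked:
            self._waiting.append(d)
        else:
            self.locked = True
            d.callback(self)
        return d

    def release(self):
        if self._waiting:
            self._waiting.pop(0).callback(self)  # lock passes straight on
        else:
            self.locked = False


class ToySource:
    """Fake source: each stop() returns a ToyDeferred fired by hand."""

    def __init__(self):
        self.pending = []

    def stop(self):
        d = ToyDeferred()
        self.pending.append(d)
        return d


class ToyConsumer:
    def __init__(self, task_source):
        self._task_source = task_source
        self._worker_count = 0
        self._stopping_lock = ToyLock()
        self.termination = ToyDeferred()

    def _stop(self):
        def _release_or_stop(still_stopped):
            if still_stopped and self._worker_count == 0:
                self.termination.callback(None)
                # Keep the lock: termination must never fire twice.
            else:
                self._stopping_lock.release()

        def _call_stop(ignored):
            # Nested form from the earlier revision (needs no chaining).
            return self._task_source.stop().addCallback(_release_or_stop)

        return self._stopping_lock.acquire().addCallback(_call_stop)
```

In this model, a `False` from the first `stop()` (the source was restarted) releases the lock so a queued `_stop()` can re-check. A `True` with no workers fires termination once and leaves any later `_stop()` waiting on the lock for good.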
Jonathan Lange (jml) wrote:
Thanks Michael, this looks great.
Preview Diff
```diff
=== modified file 'lib/canonical/twistedsupport/task.py'
--- lib/canonical/twistedsupport/task.py 2009-07-17 00:26:05 +0000
+++ lib/canonical/twistedsupport/task.py 2009-08-06 09:08:47 +0000
@@ -37,6 +37,12 @@
         """Stop generating tasks.
 
         Any subsequent calls to `stop` are silently ignored.
+
+        :return: A Deferred that will fire when the source is stopped. It is
+            possible that tasks may be produced until this deferred fires.
+            The deferred will fire with a boolean; True if the source is still
+            stopped, False if the source has been restarted since stop() was
+            called.
         """
 
 
@@ -100,10 +106,13 @@
             clock = reactor
         self._clock = clock
         self._looping_call = None
+        self._polling_lock = defer.DeferredLock()
+        self._started = False
 
     def start(self, task_consumer):
         """See `ITaskSource`."""
         self.stop()
+        self._started = True
         self._looping_call = LoopingCall(self._poll, task_consumer)
         self._looping_call.clock = self._clock
         self._looping_call.start(self._interval)
@@ -122,15 +131,21 @@
             # If task production fails, we inform the consumer of this, but we
             # don't let any deferred it returns delay subsequent polls.
             task_consumer.taskProductionFailed(reason)
-        d = defer.maybeDeferred(self._task_producer)
-        d.addCallbacks(got_task, task_failed)
-        return d
+        def poll():
+            if self._started:
+                d = defer.maybeDeferred(self._task_producer)
+                return d.addCallbacks(got_task, task_failed)
+        return self._polling_lock.run(poll)
 
     def stop(self):
         """See `ITaskSource`."""
         if self._looping_call is not None:
             self._looping_call.stop()
             self._looping_call = None
+            self._started = False
+        def _return_still_stopped():
+            return not self._started
+        return self._polling_lock.run(_return_still_stopped)
 
 
 class AlreadyRunningError(Exception):
@@ -164,6 +179,19 @@
         self._worker_limit = worker_limit
         self._worker_count = 0
         self._terminationDeferred = None
+        self._stopping_lock = None
+
+    def _stop(self):
+        def _release_or_stop(still_stopped):
+            if still_stopped and self._worker_count == 0:
+                self._terminationDeferred.callback(None)
+                # Note that in this case we don't release the lock: we don't
+                # want to try to fire the _terminationDeferred twice!
+            else:
+                self._stopping_lock.release()
+        def _call_stop(ignored):
+            return self._task_source.stop().addCallback(_release_or_stop)
+        return self._stopping_lock.acquire().addCallback(_call_stop)
 
     def consume(self, task_source):
         """Start consuming tasks from 'task_source'.
@@ -178,9 +206,7 @@
             raise AlreadyRunningError(self, self._task_source)
         self._task_source = task_source
         self._terminationDeferred = defer.Deferred()
-        # This merely begins polling. This means that we acquire our initial
-        # batch of work at the rate of one task per polling interval. As long
-        # as the polling interval is small, this is probably OK.
+        self._stopping_lock = defer.DeferredLock()
         task_source.start(self)
         return self._terminationDeferred
 
@@ -196,7 +222,9 @@
             raise NotRunningError(self)
         self._worker_count += 1
         if self._worker_count >= self._worker_limit:
-            self._task_source.stop()
+            self._stop()
+        else:
+            self._task_source.start(self)
         d = defer.maybeDeferred(task)
         # We don't expect these tasks to have interesting return values or
         # failure modes.
@@ -213,8 +241,7 @@
         find any jobs, if we actually start any jobs then the exit condition
         in _taskEnded will always be reached before this one.
         """
-        if self._worker_count == 0:
-            self._terminationDeferred.callback(None)
+        self._stop()
 
     def taskProductionFailed(self, reason):
         """See `ITaskConsumer`.
@@ -236,9 +263,7 @@
         """
         if self._task_source is None:
             raise NotRunningError(self)
-        self._task_source.stop()
-        if self._worker_count == 0:
-            self._terminationDeferred.callback(None)
+        self._stop()
 
     def _taskEnded(self, ignored):
         """Handle a task reaching completion.
@@ -252,8 +277,7 @@
         """
         self._worker_count -= 1
         if self._worker_count == 0:
-            self._task_source.stop()
-            self._terminationDeferred.callback(None)
+            self._stop()
         elif self._worker_count < self._worker_limit:
             self._task_source.start(self)
         else:
```
```diff
=== modified file 'lib/canonical/twistedsupport/tests/test_task.py'
--- lib/canonical/twistedsupport/tests/test_task.py 2009-07-17 00:26:05 +0000
+++ lib/canonical/twistedsupport/tests/test_task.py 2009-08-06 09:08:47 +0000
@@ -7,7 +7,7 @@
 
 import unittest
 
-from twisted.internet.defer import Deferred
+from twisted.internet.defer import Deferred, succeed
 from twisted.internet.task import Clock
 
 from zope.interface import implements
@@ -49,14 +49,19 @@
 
     implements(ITaskSource)
 
-    def __init__(self, log):
+    def __init__(self, log, stop_deferred=None):
         self._log = log
+        if stop_deferred is None:
+            self.stop_deferred = succeed(True)
+        else:
+            self.stop_deferred = stop_deferred
 
     def start(self, consumer):
         self._log.append(('start', consumer))
 
     def stop(self):
         self._log.append('stop')
+        return self.stop_deferred
 
 
 class TestPollingTaskSource(TestCase):
@@ -144,6 +149,16 @@
         # No more calls were made.
         self.assertEqual(0, self._num_task_producer_calls)
 
+    def test_stop_deferred_fires_immediately_if_no_polling(self):
+        # Calling stop when the source is not polling returns a deferred that
+        # fires immediately with True.
+        task_source = self.makeTaskSource()
+        task_source.start(NoopTaskConsumer())
+        stop_deferred = task_source.stop()
+        stop_calls = []
+        stop_deferred.addCallback(stop_calls.append)
+        self.assertEqual([True], stop_calls)
+
     def test_start_multiple_times_polls_immediately(self):
         # Starting a task source multiple times polls immediately.
         clock = Clock()
@@ -241,6 +256,74 @@
         clock.advance(interval)
         self.assertEqual(len(produced_deferreds), 2)
 
+    def test_stop_deferred_doesnt_fire_until_polling_finished(self):
+        # If there is a call to the task producer outstanding when stop() is
+        # called, stop() returns a deferred that fires when the poll finishes.
+        # The value fired with is True if the source is still stopped when the
+        # deferred fires.
+        produced_deferred = Deferred()
+        def producer():
+            return produced_deferred
+        task_source = self.makeTaskSource(task_producer=producer)
+        task_source.start(NoopTaskConsumer())
+        # The call to start calls producer. It returns produced_deferred
+        # which has not been fired, so stop returns a deferred that has not
+        # been fired.
+        stop_deferred = task_source.stop()
+        stop_called = []
+        stop_deferred.addCallback(stop_called.append)
+        self.assertEqual([], stop_called)
+        # When the task producing deferred fires, the stop deferred fires with
+        # 'True' to indicate that the source is still stopped.
+        produced_deferred.callback(None)
+        self.assertEqual([True], stop_called)
+
+    def test_stop_deferred_fires_with_false_if_source_restarted(self):
+        # If there is a call to the task producer outstanding when stop() is
+        # called, stop() returns a deferred that fires when the poll finishes.
+        # The value fired with is False if the source is no longer stopped
+        # when the deferred fires.
+        produced_deferred = Deferred()
+        def producer():
+            return produced_deferred
+        task_source = self.makeTaskSource(task_producer=producer)
+        task_source.start(NoopTaskConsumer())
+        # The call to start calls producer. It returns produced_deferred
+        # which has not been fired so stop returns a deferred that has not
+        # been fired.
+        stop_deferred = task_source.stop()
+        stop_called = []
+        stop_deferred.addCallback(stop_called.append)
+        # Now we restart the source.
+        task_source.start(NoopTaskConsumer())
+        self.assertEqual([], stop_called)
+        # When the task producing deferred fires, the stop deferred fires with
+        # 'False' to indicate that the source has been restarted.
+        produced_deferred.callback(None)
+        self.assertEqual([False], stop_called)
+
+    def test_stop_start_stop_when_polling_doesnt_poll_again(self):
+        # XXX
+        produced_deferreds = []
+        def producer():
+            d = Deferred()
+            produced_deferreds.append(d)
+            return d
+        task_source = self.makeTaskSource(task_producer=producer)
+        # Start the source. This calls the producer.
+        task_source.start(NoopTaskConsumer())
+        self.assertEqual(1, len(produced_deferreds))
+        task_source.stop()
+        # If we start it again, this does not call the producer because
+        # the above call is still in process.
+        task_source.start(NoopTaskConsumer())
+        self.assertEqual(1, len(produced_deferreds))
+        # If we now stop the source and the initial poll for a task completes,
+        # we don't poll again.
+        task_source.stop()
+        produced_deferreds[0].callback(None)
+        self.assertEqual(1, len(produced_deferreds))
+
     def test_taskStarted_deferred_doesnt_delay_polling(self):
         # If taskStarted returns a deferred, we don't wait for it to fire
         # before polling again.
@@ -354,7 +437,7 @@
         consumer.consume(source)
         self.assertRaises(AlreadyRunningError, consumer.consume, source)
 
-    def test_consume_returns_deferred_doesnt_fire_until_tasks(self):
+    def test_consumer_doesnt_finish_until_tasks_finish(self):
         # `consume` returns a Deferred that fires when no more tasks are
         # running, but only after we've actually done something.
         consumer = self.makeConsumer()
@@ -363,7 +446,7 @@
         d.addCallback(log.append)
         self.assertEqual([], log)
 
-    def test_consume_returns_deferred_fires_when_tasks_done(self):
+    def test_consumer_finishes_when_tasks_done(self):
         # `consume` returns a Deferred that fires when no more tasks are
         # running.
         consumer = self.makeConsumer()
@@ -373,7 +456,7 @@
         consumer.taskStarted(lambda: None)
         self.assertEqual([None], task_log)
 
-    def test_consume_returns_deferred_fires_if_no_tasks_found(self):
+    def test_consumer_finishes_if_no_tasks_found(self):
         # `consume` returns a Deferred that fires if no tasks are found when
         # no tasks are running.
         consumer = self.makeConsumer()
@@ -383,7 +466,37 @@
         consumer.noTasksFound()
         self.assertEqual([None], task_log)
 
-    def test_consume_deferred_no_fire_if_no_tasks_found_and_job_running(self):
+    def test_consumer_doesnt_finish_until_stop_deferred_fires(self):
+        # The Deferred returned by `consume` does not fire until the deferred
+        # returned by the source's stop() method fires with True to indicate
+        # that the source is still stopped.
+        consumer = self.makeConsumer()
+        consume_log = []
+        stop_deferred = Deferred()
+        source = LoggingSource([], stop_deferred)
+        d = consumer.consume(source)
+        d.addCallback(consume_log.append)
+        consumer.noTasksFound()
+        self.assertEqual([], consume_log)
+        stop_deferred.callback(True)
+        self.assertEqual([None], consume_log)
+
+    def test_consumer_doesnt_finish_if_stop_doesnt_stop(self):
+        # The Deferred returned by `consume` does not fire when the deferred
+        # returned by the source's stop() method fires with False to indicate
+        # that the source has been restarted.
+        consumer = self.makeConsumer()
+        consume_log = []
+        stop_deferred = Deferred()
+        source = LoggingSource([], stop_deferred)
+        d = consumer.consume(source)
+        d.addCallback(consume_log.append)
+        consumer.noTasksFound()
+        self.assertEqual([], consume_log)
+        stop_deferred.callback(False)
+        self.assertEqual([], consume_log)
+
+    def test_consumer_doesnt_finish_if_no_tasks_found_and_job_running(self):
         # If no tasks are found while a job is running, the Deferred returned
         # by `consume` is not fired.
         consumer = self.makeConsumer()
@@ -402,7 +515,7 @@
         del log[:]
         # Finishes immediately, all tasks are done.
         consumer.taskStarted(lambda: None)
-        self.assertEqual(['stop'], log)
+        self.assertEqual(1, log.count('stop'))
 
     def test_taskStarted_before_consume_raises_error(self):
         # taskStarted can only be called after we have started consuming. This
@@ -427,6 +540,18 @@
         consumer.taskStarted(lambda: log.append('task'))
         self.assertEqual(['task'], log)
 
+    def test_taskStarted_restarts_source(self):
+        # If, after the task passed to taskStarted has been started, the
+        # consumer is not yet at its worker_limit, it starts the source again
+        # in order consume as many pending jobs as we can as quickly as we
+        # can.
+        log = []
+        consumer = self.makeConsumer()
+        consumer.consume(LoggingSource(log))
+        del log[:]
+        consumer.taskStarted(self._neverEndingTask)
+        self.assertEqual([('start', consumer)], log)
+
     def test_reaching_working_limit_stops_source(self):
         # Each time taskStarted is called, we start a worker. When we reach
         # the worker limit, we tell the source to stop generating work.
@@ -437,10 +562,10 @@
         consumer.consume(source)
         del log[:]
         consumer.taskStarted(self._neverEndingTask)
-        self.assertEqual([], log)
+        self.assertEqual(0, log.count('stop'))
         for i in range(worker_limit - 1):
             consumer.taskStarted(self._neverEndingTask)
-        self.assertEqual(['stop'], log)
+        self.assertEqual(1, log.count('stop'))
 
     def test_passing_working_limit_stops_source(self):
         # If we have already reached the worker limit, and taskStarted is
```
EPIGRAMS IN PROGRAMMING:
58. Fools ignore complexity. Pragmatists suffer it. Some can avoid it. Geniuses remove it.
I hope I'm being a pragmatist here.
Hi Jono,
I hope you can make time to have a look at the branch some time today. If you don't have time for a full review, I'll get Tim to look at it tomorrow. But you already know what a DeferredLock is :)
If the code and tests don't make sense, then I've failed in my mission, but there are two issues addressed here:
1) Some race conditions around the puller exiting while a request for a job is pending (basically, change ITaskSource.stop to return a Deferred)
2) Addressing the behaviour where only one job gets pulled per run of the puller (see bug 408638)
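Issue 2 shows up in the consumer's `taskStarted`: while it is below `worker_limit`, it now restarts the source (which, going by the existing test `test_start_multiple_times_polls_immediately`, polls immediately) instead of waiting for the next polling interval. The sketch below replays the start/stop decisions from the preview diff against a logging fake modelled on the tests' `LoggingSource`. It is a simplification, not the branch's code: `ToyConsumer` and its `taskEnded()` method are hypothetical, tasks are not actually run, and `stop()` returns a plain `True` instead of a Deferred, so `_stop()` needs no lock.

```python
class LoggingSource:
    """Fake source modelled on the tests' LoggingSource: records every call.

    Simplification (assumption): stop() returns True directly, where the
    branch's test double returns a Deferred that fires with True by default.
    """

    def __init__(self, log):
        self._log = log

    def start(self, consumer):
        self._log.append(('start', consumer))

    def stop(self):
        self._log.append('stop')
        return True


class ToyConsumer:
    """Hypothetical, synchronous model of the consumer's start/stop logic."""

    def __init__(self, worker_limit):
        self._worker_limit = worker_limit
        self._worker_count = 0
        self._task_source = None
        self.finished = False

    def consume(self, task_source):
        self._task_source = task_source
        task_source.start(self)

    def _stop(self):
        # Finish only if the source reports it is still stopped and no
        # workers remain (the real _stop() does this under a DeferredLock).
        if self._task_source.stop() and self._worker_count == 0:
            self.finished = True

    def taskStarted(self):
        self._worker_count += 1
        if self._worker_count >= self._worker_limit:
            self._stop()
        else:
            # New in this branch: ask for the next job straight away.
            self._task_source.start(self)

    def taskEnded(self):
        self._worker_count -= 1
        if self._worker_count == 0:
            self._stop()
        elif self._worker_count < self._worker_limit:
            self._task_source.start(self)

    def noTasksFound(self):
        self._stop()
```

Because the log now interleaves `('start', consumer)` entries with `'stop'`, this is presumably why the updated tests assert on `log.count('stop')` rather than comparing the whole log.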
Cheers,
mwh