Merge lp:~jameinel/bzr/2.1-client-reconnect-819604 into lp:bzr/2.1
- 2.1-client-reconnect-819604
- Merge into 2.1
Status: | Work in progress |
---|---|
Proposed branch: | lp:~jameinel/bzr/2.1-client-reconnect-819604 |
Merge into: | lp:bzr/2.1 |
Diff against target: |
1035 lines (+589/-143) 7 files modified
bzrlib/help_topics/en/debug-flags.txt (+2/-0) bzrlib/osutils.py (+40/-9) bzrlib/smart/client.py (+170/-86) bzrlib/smart/medium.py (+43/-24) bzrlib/smart/protocol.py (+0/-3) bzrlib/tests/test_osutils.py (+39/-0) bzrlib/tests/test_smart_transport.py (+295/-21) |
To merge this branch: | bzr merge lp:~jameinel/bzr/2.1-client-reconnect-819604 |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
bzr-core | Pending | ||
Review via email: mp+78597@code.launchpad.net |
Commit message
Start of bug #819604, allow bzr clients to reconnect if the connection is gone when we go to write a new request.
Description of the change
This is some initial work for having clients reconnect when their bzr+ssh (or bzr://) connection gets closed underneath them.
To start with, the trickiest bit is that it is really quite hard to detect when things are closed. Mostly because of buffering, etc, at lots of different levels (internal memory buffering, socket/pipe buffering, 'ssh.exe' buffering, latency of close message from server back to client, etc.)
I'll also clarify that this isn't intended to handle all possible sorts of connection failures. What we really are trying to handle is allowing a server to be upgraded 'live'. Such that the server will gently disconnect us only between complete requests. Versus having requests terminated randomly in the middle of content. It may help with some of those cases, and we certainly don't want to cause corruption, etc if one of those happens. But it isn't the goal of *this* change.
This specific patch has a few aspects, and does help some cases in real-world testing.
1) Backport a minimum amount of the SmartSSHClientM
It also meant that he shared the logic from SmartSimplePipe
This is a pretty small part of the patch.
2) Update some of the lower level code so that we get ConnectionReset rather than various IOError and ValueError when writing to a closed connection.
3) Update the _SmartClient.
4) The one caveat is if there is a 'body_stream'. body_stream is an iterable, so we can't just rewind it and resume it.
On the plus side, there is only one caller RemoteStreamSin
That would allow us to close the gap a little bit. So we can detect ConnectionReset all the way to the point that we actually start streaming the content.
The other bit that would be possible, is to update ProtocolThreeRe
This just closes a window where we won't reconnect during 'bzr push'.
5) The next step is that (especially for sockets), we write out the request successfully, and then notice it is closed when we try to read back the response.
Here I'm a lot more concerned about non-idempotent requests. Because the server might have fully read the request, and sent back a response, but we didn't get to see the response.
However, our RPC design is meant to be stateless (all state is passed in the request, not assumed to be stored on the server side). Which means things *should* be idempotent. For something like 'put_bytes', we may write the same content twice, but we shouldn't corrupt anything.
The only one I can think of is 'append_bytes', and the associated open_write_stream() code. (RemoteTranspor
And that code shouldn't really be used with up-to-date bzr clients and servers. So I'm much more ok with it just dying if it isn't safe to retry.
John A Meinel (jameinel) wrote : | # |
Small update, in working on the read-reconnect code, I realized passing all these arguments around was very clumsy. So I refactored the code into a helper class _SmartClientReq
Vincent Ladeuil (vila) wrote : | # |
> I should note, I think it would be prudent to land this into bzr.dev first,
> but I wanted to make sure it was written against 2.1 since that is the oldest
> client that we want to backport support for.
+4 (+1 for each of 2.1, 2.2, 2.3, 2.4)
No need to ask anymore, good, keep reading my mind ;)
I don't know exactly how we should proceed but as much as possible, all the modifications related to this topic should be done in a separate branch which should be merged into whatever series we target.
Backport is hard, let's avoid it as much as possible (but no more ;)
> Also, I expect there will be a modest amount of conflicts bringing this up
> from bzr-2.1 into bzr.dev. There has been a modest amount of changes
> inbetween, so I'll try to produce branches for each series.
Yup, that sounds a lot like a loom with a thread by targeted series. Or any arrangement of branches that makes our life easier.
Vincent Ladeuil (vila) wrote : | # |
> Small update, in working on the read-reconnect code, I realized passing all
> these arguments around was very clumsy. So I refactored the code into a helper
> class _SmartClientReq
> end up as attributes, and the function call interplay is easier to follow.
From the description alone this part got my approval. If some helpers need to move there too, clarifying the class features, even better ( no pre-conceptions here, just saying).
John A Meinel (jameinel) wrote : | # |
Some small code cleanups, and add -Dnoretry as a debugging flag that will disable the retry-on-write.
Martin Pool (mbp) wrote : | # |
I'm going to (reluctantly) bump these out of the merge queue until this is landed and well tested in 2.5 - probably until after the 2.5.0 final release is out. Then we can come back and look at landing to earlier series.
John A Meinel (jameinel) wrote : | # |
-----BEGIN PGP SIGNED MESSAGE-----
Hash: SHA1
On 11/29/2011 7:32 AM, Martin Pool wrote:
> I'm going to (reluctantly) bump these out of the merge queue until
> this is landed and well tested in 2.5 - probably until after the
> 2.5.0 final release is out. Then we can come back and look at
> landing to earlier series.
I don't think it is something you should feel bad about :). That was
certainly the intent.
1) I wrote it against 2.1 so we would have a chance to land it there
cleanly.
2) I tried to split it up into multiple patches to make it easier to
review.
3) I merged it up through the stack because there are a fair number of
changes to this code, and thus conflicts, etc.
4) It should certainly land on trunk and get a thorough shakeout there.
I'm not sure how much time you think is enough. Certainly if enough
time passes, 2.1 will be fully obsolete :).
Anyway, thanks for getting this landed in 2.5.
John
=:->
-----BEGIN PGP SIGNATURE-----
Version: GnuPG v1.4.9 (Cygwin)
Comment: Using GnuPG with Mozilla - http://
iEYEARECAAYFAk7
T90AoJDimVbLmeD
=94bN
-----END PGP SIGNATURE-----
Preview Diff
1 | === modified file 'bzrlib/help_topics/en/debug-flags.txt' |
2 | --- bzrlib/help_topics/en/debug-flags.txt 2010-01-05 04:30:07 +0000 |
3 | +++ bzrlib/help_topics/en/debug-flags.txt 2012-09-11 12:31:20 +0000 |
4 | @@ -24,6 +24,8 @@ |
5 | -Dindex Trace major index operations. |
6 | -Dknit Trace knit operations. |
7 | -Dlock Trace when lockdir locks are taken or released. |
8 | +-Dnoretry If a connection is reset, fail immediately rather than |
9 | + retrying the request. |
10 | -Dprogress Trace progress bar operations. |
11 | -Dmerge Emit information for debugging merges. |
12 | -Dno_apport Don't use apport to report crashes. |
13 | |
14 | === modified file 'bzrlib/osutils.py' |
15 | --- bzrlib/osutils.py 2010-05-27 04:00:01 +0000 |
16 | +++ bzrlib/osutils.py 2012-09-11 12:31:20 +0000 |
17 | @@ -40,6 +40,7 @@ |
18 | rmtree, |
19 | ) |
20 | import signal |
21 | +import socket |
22 | import subprocess |
23 | import tempfile |
24 | from tempfile import ( |
25 | @@ -1929,6 +1930,20 @@ |
26 | return socket.gethostname().decode(get_user_encoding()) |
27 | |
28 | |
29 | +# We must not read/write any more than 64k at a time from/to a socket so we |
30 | +# don't risk "no buffer space available" errors on some platforms. Windows in |
31 | +# particular is likely to throw WSAECONNABORTED or WSAENOBUFS if given too much |
32 | +# data at once. |
33 | +MAX_SOCKET_CHUNK = 64 * 1024 |
34 | + |
35 | +_end_of_stream_errors = [errno.ECONNRESET, errno.EPIPE, errno.EINVAL] |
36 | +for _eno in ['WSAECONNRESET', 'WSAECONNABORTED']: |
37 | + _eno = getattr(errno, _eno, None) |
38 | + if _eno is not None: |
39 | + _end_of_stream_errors.append(_eno) |
40 | +del _eno |
41 | + |
42 | + |
43 | def recv_all(socket, bytes): |
44 | """Receive an exact number of bytes. |
45 | |
46 | @@ -1948,21 +1963,37 @@ |
47 | return b |
48 | |
49 | |
50 | -def send_all(socket, bytes, report_activity=None): |
51 | +def send_all(sock, bytes, report_activity=None): |
52 | """Send all bytes on a socket. |
53 | |
54 | - Regular socket.sendall() can give socket error 10053 on Windows. This |
55 | - implementation sends no more than 64k at a time, which avoids this problem. |
56 | + Breaks large blocks in smaller chunks to avoid buffering limitations on |
57 | + some platforms, and catches EINTR which may be thrown if the send is |
58 | + interrupted by a signal. |
59 | + |
60 | + This is preferred to socket.sendall(), because it avoids portability bugs |
61 | + and provides activity reporting. |
62 | |
63 | :param report_activity: Call this as bytes are read, see |
64 | Transport._report_activity |
65 | """ |
66 | - chunk_size = 2**16 |
67 | - for pos in xrange(0, len(bytes), chunk_size): |
68 | - block = bytes[pos:pos+chunk_size] |
69 | - if report_activity is not None: |
70 | - report_activity(len(block), 'write') |
71 | - until_no_eintr(socket.sendall, block) |
72 | + sent_total = 0 |
73 | + byte_count = len(bytes) |
74 | + while sent_total < byte_count: |
75 | + try: |
76 | + sent = sock.send(buffer(bytes, sent_total, MAX_SOCKET_CHUNK)) |
77 | + except (socket.error, IOError), e: |
78 | + if e.args[0] in _end_of_stream_errors: |
79 | + raise errors.ConnectionReset( |
80 | + "Error trying to write to socket", e) |
81 | + if e.args[0] != errno.EINTR: |
82 | + raise |
83 | + else: |
84 | + if sent == 0: |
85 | + raise errors.ConnectionReset('Sending to %s returned 0 bytes' |
86 | + % (sock,)) |
87 | + sent_total += sent |
88 | + if report_activity is not None: |
89 | + report_activity(sent, 'write') |
90 | |
91 | |
92 | def dereference_path(path): |
93 | |
94 | === modified file 'bzrlib/smart/client.py' |
95 | --- bzrlib/smart/client.py 2010-02-17 17:11:16 +0000 |
96 | +++ bzrlib/smart/client.py 2012-09-11 12:31:20 +0000 |
97 | @@ -16,10 +16,11 @@ |
98 | |
99 | import bzrlib |
100 | from bzrlib.smart import message, protocol |
101 | -from bzrlib.trace import warning |
102 | from bzrlib import ( |
103 | + debug, |
104 | errors, |
105 | hooks, |
106 | + trace, |
107 | ) |
108 | |
109 | |
110 | @@ -39,93 +40,12 @@ |
111 | def __repr__(self): |
112 | return '%s(%r)' % (self.__class__.__name__, self._medium) |
113 | |
114 | - def _send_request(self, protocol_version, method, args, body=None, |
115 | - readv_body=None, body_stream=None): |
116 | - encoder, response_handler = self._construct_protocol( |
117 | - protocol_version) |
118 | - encoder.set_headers(self._headers) |
119 | - if body is not None: |
120 | - if readv_body is not None: |
121 | - raise AssertionError( |
122 | - "body and readv_body are mutually exclusive.") |
123 | - if body_stream is not None: |
124 | - raise AssertionError( |
125 | - "body and body_stream are mutually exclusive.") |
126 | - encoder.call_with_body_bytes((method, ) + args, body) |
127 | - elif readv_body is not None: |
128 | - if body_stream is not None: |
129 | - raise AssertionError( |
130 | - "readv_body and body_stream are mutually exclusive.") |
131 | - encoder.call_with_body_readv_array((method, ) + args, readv_body) |
132 | - elif body_stream is not None: |
133 | - encoder.call_with_body_stream((method, ) + args, body_stream) |
134 | - else: |
135 | - encoder.call(method, *args) |
136 | - return response_handler |
137 | - |
138 | - def _run_call_hooks(self, method, args, body, readv_body): |
139 | - if not _SmartClient.hooks['call']: |
140 | - return |
141 | - params = CallHookParams(method, args, body, readv_body, self._medium) |
142 | - for hook in _SmartClient.hooks['call']: |
143 | - hook(params) |
144 | - |
145 | def _call_and_read_response(self, method, args, body=None, readv_body=None, |
146 | body_stream=None, expect_response_body=True): |
147 | - self._run_call_hooks(method, args, body, readv_body) |
148 | - if self._medium._protocol_version is not None: |
149 | - response_handler = self._send_request( |
150 | - self._medium._protocol_version, method, args, body=body, |
151 | - readv_body=readv_body, body_stream=body_stream) |
152 | - return (response_handler.read_response_tuple( |
153 | - expect_body=expect_response_body), |
154 | - response_handler) |
155 | - else: |
156 | - for protocol_version in [3, 2]: |
157 | - if protocol_version == 2: |
158 | - # If v3 doesn't work, the remote side is older than 1.6. |
159 | - self._medium._remember_remote_is_before((1, 6)) |
160 | - response_handler = self._send_request( |
161 | - protocol_version, method, args, body=body, |
162 | - readv_body=readv_body, body_stream=body_stream) |
163 | - try: |
164 | - response_tuple = response_handler.read_response_tuple( |
165 | - expect_body=expect_response_body) |
166 | - except errors.UnexpectedProtocolVersionMarker, err: |
167 | - # TODO: We could recover from this without disconnecting if |
168 | - # we recognise the protocol version. |
169 | - warning( |
170 | - 'Server does not understand Bazaar network protocol %d,' |
171 | - ' reconnecting. (Upgrade the server to avoid this.)' |
172 | - % (protocol_version,)) |
173 | - self._medium.disconnect() |
174 | - continue |
175 | - except errors.ErrorFromSmartServer: |
176 | - # If we received an error reply from the server, then it |
177 | - # must be ok with this protocol version. |
178 | - self._medium._protocol_version = protocol_version |
179 | - raise |
180 | - else: |
181 | - self._medium._protocol_version = protocol_version |
182 | - return response_tuple, response_handler |
183 | - raise errors.SmartProtocolError( |
184 | - 'Server is not a Bazaar server: ' + str(err)) |
185 | - |
186 | - def _construct_protocol(self, version): |
187 | - request = self._medium.get_request() |
188 | - if version == 3: |
189 | - request_encoder = protocol.ProtocolThreeRequester(request) |
190 | - response_handler = message.ConventionalResponseHandler() |
191 | - response_proto = protocol.ProtocolThreeDecoder( |
192 | - response_handler, expect_version_marker=True) |
193 | - response_handler.setProtoAndMediumRequest(response_proto, request) |
194 | - elif version == 2: |
195 | - request_encoder = protocol.SmartClientRequestProtocolTwo(request) |
196 | - response_handler = request_encoder |
197 | - else: |
198 | - request_encoder = protocol.SmartClientRequestProtocolOne(request) |
199 | - response_handler = request_encoder |
200 | - return request_encoder, response_handler |
201 | + request = _SmartClientRequest(self, method, args, body=body, |
202 | + readv_body=readv_body, body_stream=body_stream, |
203 | + expect_response_body=expect_response_body) |
204 | + return request.call_and_read_response() |
205 | |
206 | def call(self, method, *args): |
207 | """Call a method on the remote server.""" |
208 | @@ -191,6 +111,170 @@ |
209 | return self._medium.remote_path_from_transport(transport) |
210 | |
211 | |
212 | +class _SmartClientRequest(object): |
213 | + """Encapsulate the logic for a single request. |
214 | + |
215 | + This class handles things like reconnecting and sending the request a |
216 | + second time when the connection is reset in the middle. It also handles the |
217 | + multiple requests that get made if we don't know what protocol the server |
218 | + supports yet. |
219 | + |
220 | + Generally, you build up one of these objects, passing in the arguments that |
221 | + you want to send to the server, and then use 'call_and_read_response' to |
222 | + get the response from the server. |
223 | + """ |
224 | + |
225 | + def __init__(self, client, method, args, body=None, readv_body=None, |
226 | + body_stream=None, expect_response_body=True): |
227 | + self.client = client |
228 | + self.method = method |
229 | + self.args = args |
230 | + self.body = body |
231 | + self.readv_body = readv_body |
232 | + self.body_stream = body_stream |
233 | + self.expect_response_body = expect_response_body |
234 | + |
235 | + def call_and_read_response(self): |
236 | + """Send the request to the server, and read the initial response. |
237 | + |
238 | + This doesn't read all of the body content of the response, instead it |
239 | + returns (response_tuple, response_handler). response_tuple is the 'ok', |
240 | + or 'error' information, and 'response_handler' can be used to get the |
241 | + content stream out. |
242 | + """ |
243 | + self._run_call_hooks() |
244 | + protocol_version = self.client._medium._protocol_version |
245 | + if protocol_version is None: |
246 | + return self._call_determining_protocol_version() |
247 | + else: |
248 | + return self._call(protocol_version) |
249 | + |
250 | + def _run_call_hooks(self): |
251 | + if not _SmartClient.hooks['call']: |
252 | + return |
253 | + params = CallHookParams(self.method, self.args, self.body, |
254 | + self.readv_body, self.client._medium) |
255 | + for hook in _SmartClient.hooks['call']: |
256 | + hook(params) |
257 | + |
258 | + def _call(self, protocol_version): |
259 | + """We know the protocol version. |
260 | + |
261 | + So this just sends the request, and then reads the response. This is |
262 | + where the code will be to retry requests if the connection is closed. |
263 | + """ |
264 | + response_handler = self._send(protocol_version) |
265 | + response_tuple = response_handler.read_response_tuple( |
266 | + expect_body=self.expect_response_body) |
267 | + return (response_tuple, response_handler) |
268 | + |
269 | + def _call_determining_protocol_version(self): |
270 | + """Determine what protocol the remote server supports. |
271 | + |
272 | + We do this by placing a request in the most recent protocol, and |
273 | + handling the UnexpectedProtocolVersionMarker from the server. |
274 | + """ |
275 | + for protocol_version in [3, 2]: |
276 | + if protocol_version == 2: |
277 | + # If v3 doesn't work, the remote side is older than 1.6. |
278 | + self.client._medium._remember_remote_is_before((1, 6)) |
279 | + try: |
280 | + response_tuple, response_handler = self._call(protocol_version) |
281 | + except errors.UnexpectedProtocolVersionMarker, err: |
282 | + # TODO: We could recover from this without disconnecting if |
283 | + # we recognise the protocol version. |
284 | + trace.warning( |
285 | + 'Server does not understand Bazaar network protocol %d,' |
286 | + ' reconnecting. (Upgrade the server to avoid this.)' |
287 | + % (protocol_version,)) |
288 | + self.client._medium.disconnect() |
289 | + continue |
290 | + except errors.ErrorFromSmartServer: |
291 | + # If we received an error reply from the server, then it |
292 | + # must be ok with this protocol version. |
293 | + self.client._medium._protocol_version = protocol_version |
294 | + raise |
295 | + else: |
296 | + self.client._medium._protocol_version = protocol_version |
297 | + return response_tuple, response_handler |
298 | + raise errors.SmartProtocolError( |
299 | + 'Server is not a Bazaar server: ' + str(err)) |
300 | + |
301 | + def _construct_protocol(self, version): |
302 | + """Build the encoding stack for a given protocol version.""" |
303 | + request = self.client._medium.get_request() |
304 | + if version == 3: |
305 | + request_encoder = protocol.ProtocolThreeRequester(request) |
306 | + response_handler = message.ConventionalResponseHandler() |
307 | + response_proto = protocol.ProtocolThreeDecoder( |
308 | + response_handler, expect_version_marker=True) |
309 | + response_handler.setProtoAndMediumRequest(response_proto, request) |
310 | + elif version == 2: |
311 | + request_encoder = protocol.SmartClientRequestProtocolTwo(request) |
312 | + response_handler = request_encoder |
313 | + else: |
314 | + request_encoder = protocol.SmartClientRequestProtocolOne(request) |
315 | + response_handler = request_encoder |
316 | + return request_encoder, response_handler |
317 | + |
318 | + def _send(self, protocol_version): |
319 | + """Encode the request, and send it to the server. |
320 | + |
321 | + This will retry a request if we get a ConnectionReset while sending the |
322 | + request to the server. (Unless we have a body_stream that we have |
323 | + already started consuming, since we can't restart body_streams) |
324 | + |
325 | + :return: response_handler as defined by _construct_protocol |
326 | + """ |
327 | + encoder, response_handler = self._construct_protocol(protocol_version) |
328 | + try: |
329 | + self._send_no_retry(encoder) |
330 | + except errors.ConnectionReset, e: |
331 | + # If we fail during the _send_no_retry phase, then we can |
332 | + # be confident that the server did not get our request, because we |
333 | + # haven't started waiting for the reply yet. So try the request |
334 | + # again. We only issue a single retry, because if the connection |
335 | + # really is down, there is no reason to loop endlessly. |
336 | + |
337 | + # Connection is dead, so close our end of it. |
338 | + self.client._medium.reset() |
339 | + if (('noretry' in debug.debug_flags) |
340 | + or self.body_stream is not None): |
341 | + # We can't restart a body_stream that has been partially |
342 | + # consumed, so we don't retry. |
343 | + raise |
344 | + trace.warning('ConnectionReset calling %r, retrying' |
345 | + % (self.method,)) |
346 | + trace.log_exception_quietly() |
347 | + encoder, response_handler = self._construct_protocol( |
348 | + protocol_version) |
349 | + self._send_no_retry(encoder) |
350 | + return response_handler |
351 | + |
352 | + def _send_no_retry(self, encoder): |
353 | + """Just encode the request and try to send it.""" |
354 | + encoder.set_headers(self.client._headers) |
355 | + if self.body is not None: |
356 | + if self.readv_body is not None: |
357 | + raise AssertionError( |
358 | + "body and readv_body are mutually exclusive.") |
359 | + if self.body_stream is not None: |
360 | + raise AssertionError( |
361 | + "body and body_stream are mutually exclusive.") |
362 | + encoder.call_with_body_bytes((self.method, ) + self.args, self.body) |
363 | + elif self.readv_body is not None: |
364 | + if self.body_stream is not None: |
365 | + raise AssertionError( |
366 | + "readv_body and body_stream are mutually exclusive.") |
367 | + encoder.call_with_body_readv_array((self.method, ) + self.args, |
368 | + self.readv_body) |
369 | + elif self.body_stream is not None: |
370 | + encoder.call_with_body_stream((self.method, ) + self.args, |
371 | + self.body_stream) |
372 | + else: |
373 | + encoder.call(self.method, *self.args) |
374 | + |
375 | + |
376 | class SmartClientHooks(hooks.Hooks): |
377 | |
378 | def __init__(self): |
379 | |
380 | === modified file 'bzrlib/smart/medium.py' |
381 | --- bzrlib/smart/medium.py 2010-04-27 06:40:37 +0000 |
382 | +++ bzrlib/smart/medium.py 2012-09-11 12:31:20 +0000 |
383 | @@ -712,6 +712,14 @@ |
384 | """ |
385 | return SmartClientStreamMediumRequest(self) |
386 | |
387 | + def reset(self): |
388 | + """We have been disconnected, reset current state. |
389 | + |
390 | + This resets things like _current_request and connected state. |
391 | + """ |
392 | + self.disconnect() |
393 | + self._current_request = None |
394 | + |
395 | |
396 | class SmartSimplePipesClientMedium(SmartClientStreamMedium): |
397 | """A client medium using simple pipes. |
398 | @@ -726,11 +734,21 @@ |
399 | |
400 | def _accept_bytes(self, bytes): |
401 | """See SmartClientStreamMedium.accept_bytes.""" |
402 | - osutils.until_no_eintr(self._writeable_pipe.write, bytes) |
403 | + try: |
404 | + osutils.until_no_eintr(self._writeable_pipe.write, bytes) |
405 | + except IOError, e: |
406 | + if e.errno in (errno.EINVAL, errno.EPIPE): |
407 | + raise errors.ConnectionReset( |
408 | + "Error trying to write to subprocess:\n%s" |
409 | + % (e,)) |
410 | + raise |
411 | self._report_activity(len(bytes), 'write') |
412 | |
413 | def _flush(self): |
414 | """See SmartClientStreamMedium._flush().""" |
415 | + # Note: If flush were to fail, we'd like to raise ConnectionReset, etc. |
416 | + # However, testing shows that even when the child process is |
417 | + # gone, this doesn't error. |
418 | osutils.until_no_eintr(self._writeable_pipe.flush) |
419 | |
420 | def _read_bytes(self, count): |
421 | @@ -741,7 +759,10 @@ |
422 | |
423 | |
424 | class SmartSSHClientMedium(SmartClientStreamMedium): |
425 | - """A client medium using SSH.""" |
426 | + """A client medium using SSH. |
427 | + |
428 | + It delegates IO to a SmartSimplePipesClientMedium. |
429 | + """ |
430 | |
431 | def __init__(self, host, port=None, username=None, password=None, |
432 | base=None, vendor=None, bzr_remote_path=None): |
433 | @@ -750,11 +771,11 @@ |
434 | :param vendor: An optional override for the ssh vendor to use. See |
435 | bzrlib.transport.ssh for details on ssh vendors. |
436 | """ |
437 | - self._connected = False |
438 | self._host = host |
439 | self._password = password |
440 | self._port = port |
441 | self._username = username |
442 | + self._real_medium = None |
443 | # for the benefit of progress making a short description of this |
444 | # transport |
445 | self._scheme = 'bzr+ssh' |
446 | @@ -762,10 +783,8 @@ |
447 | # _DebugCounter so we have to store all the values used in our repr |
448 | # method before calling the super init. |
449 | SmartClientStreamMedium.__init__(self, base) |
450 | - self._read_from = None |
451 | self._ssh_connection = None |
452 | self._vendor = vendor |
453 | - self._write_to = None |
454 | self._bzr_remote_path = bzr_remote_path |
455 | |
456 | def __repr__(self): |
457 | @@ -783,21 +802,20 @@ |
458 | def _accept_bytes(self, bytes): |
459 | """See SmartClientStreamMedium.accept_bytes.""" |
460 | self._ensure_connection() |
461 | - osutils.until_no_eintr(self._write_to.write, bytes) |
462 | - self._report_activity(len(bytes), 'write') |
463 | + self._real_medium.accept_bytes(bytes) |
464 | |
465 | def disconnect(self): |
466 | """See SmartClientMedium.disconnect().""" |
467 | - if not self._connected: |
468 | - return |
469 | - osutils.until_no_eintr(self._read_from.close) |
470 | - osutils.until_no_eintr(self._write_to.close) |
471 | - self._ssh_connection.close() |
472 | - self._connected = False |
473 | + if self._real_medium is not None: |
474 | + self._real_medium.disconnect() |
475 | + self._real_medium = None |
476 | + if self._ssh_connection is not None: |
477 | + self._ssh_connection.close() |
478 | + self._ssh_connection = None |
479 | |
480 | def _ensure_connection(self): |
481 | """Connect this medium if not already connected.""" |
482 | - if self._connected: |
483 | + if self._real_medium is not None: |
484 | return |
485 | if self._vendor is None: |
486 | vendor = ssh._get_ssh_vendor() |
487 | @@ -807,22 +825,19 @@ |
488 | self._password, self._host, self._port, |
489 | command=[self._bzr_remote_path, 'serve', '--inet', |
490 | '--directory=/', '--allow-writes']) |
491 | - self._read_from, self._write_to = \ |
492 | - self._ssh_connection.get_filelike_channels() |
493 | - self._connected = True |
494 | + read_from, write_to = self._ssh_connection.get_filelike_channels() |
495 | + self._real_medium = SmartSimplePipesClientMedium( |
496 | + read_from, write_to, self.base) |
497 | |
498 | def _flush(self): |
499 | """See SmartClientStreamMedium._flush().""" |
500 | - self._write_to.flush() |
501 | + self._real_medium._flush() |
502 | |
503 | def _read_bytes(self, count): |
504 | """See SmartClientStreamMedium.read_bytes.""" |
505 | - if not self._connected: |
506 | + if self._real_medium is None: |
507 | raise errors.MediumNotConnected(self) |
508 | - bytes_to_read = min(count, _MAX_READ_SIZE) |
509 | - bytes = osutils.until_no_eintr(self._read_from.read, bytes_to_read) |
510 | - self._report_activity(len(bytes), 'read') |
511 | - return bytes |
512 | + return self._real_medium.read_bytes(count) |
513 | |
514 | |
515 | # Port 4155 is the default port for bzr://, registered with IANA. |
516 | @@ -948,13 +963,17 @@ |
517 | self._medium._flush() |
518 | |
519 | |
520 | +WSAECONNABORTED = 10053 |
521 | +WSAECONNRESET = 10054 |
522 | + |
523 | def _read_bytes_from_socket(sock, desired_count, report_activity): |
524 | # We ignore the desired_count because on sockets it's more efficient to |
525 | # read large chunks (of _MAX_READ_SIZE bytes) at a time. |
526 | try: |
527 | bytes = osutils.until_no_eintr(sock, _MAX_READ_SIZE) |
528 | except socket.error, e: |
529 | - if len(e.args) and e.args[0] in (errno.ECONNRESET, 10054): |
530 | + if len(e.args) and e.args[0] in (errno.ECONNRESET, WSAECONNABORTED, |
531 | + WSAECONNRESET): |
532 | # The connection was closed by the other side. Callers expect an |
533 | # empty string to signal end-of-stream. |
534 | bytes = '' |
535 | |
536 | === modified file 'bzrlib/smart/protocol.py' |
537 | --- bzrlib/smart/protocol.py 2010-02-17 17:11:16 +0000 |
538 | +++ bzrlib/smart/protocol.py 2012-09-11 12:31:20 +0000 |
539 | @@ -1075,9 +1075,6 @@ |
540 | self._real_write_func = write_func |
541 | |
542 | def _write_func(self, bytes): |
543 | - # TODO: It is probably more appropriate to use sum(map(len, _buf)) |
544 | - # for total number of bytes to write, rather than buffer based on |
545 | - # the number of write() calls |
546 | # TODO: Another possibility would be to turn this into an async model. |
547 | # Where we let another thread know that we have some bytes if |
548 | # they want it, but we don't actually block for it |
549 | |
550 | === modified file 'bzrlib/tests/test_osutils.py' |
551 | --- bzrlib/tests/test_osutils.py 2010-11-30 20:42:42 +0000 |
552 | +++ bzrlib/tests/test_osutils.py 2012-09-11 12:31:20 +0000 |
553 | @@ -801,6 +801,45 @@ |
554 | self.assertEqual(None, osutils.safe_file_id(None)) |
555 | |
556 | |
557 | +class TestSendAll(tests.TestCase): |
558 | + |
559 | + def test_send_with_disconnected_socket(self): |
560 | + class DisconnectedSocket(object): |
561 | + def __init__(self, err): |
562 | + self.err = err |
563 | + def send(self, content): |
564 | + raise self.err |
565 | + def close(self): |
566 | + pass |
567 | + # All of these should be treated as ConnectionReset |
568 | + errs = [] |
569 | + for err_cls in (IOError, socket.error): |
570 | + for errnum in osutils._end_of_stream_errors: |
571 | + errs.append(err_cls(errnum)) |
572 | + for err in errs: |
573 | + sock = DisconnectedSocket(err) |
574 | + self.assertRaises(errors.ConnectionReset, |
575 | + osutils.send_all, sock, 'some more content') |
576 | + |
577 | + def test_send_with_no_progress(self): |
578 | + # See https://bugs.launchpad.net/bzr/+bug/1047309 |
579 | + # It seems that paramiko can get into a state where it doesn't error, |
580 | + # but it returns 0 bytes sent for requests over and over again. |
581 | + class NoSendingSocket(object): |
582 | + def __init__(self): |
583 | + self.call_count = 0 |
584 | + def send(self, bytes): |
585 | + self.call_count += 1 |
586 | + if self.call_count > 100: |
587 | + # Prevent the test suite from hanging |
588 | + raise RuntimeError('too many calls') |
589 | + return 0 |
590 | + sock = NoSendingSocket() |
591 | + self.assertRaises(errors.ConnectionReset, |
592 | + osutils.send_all, sock, 'content') |
593 | + self.assertEqual(1, sock.call_count) |
594 | + |
595 | + |
596 | class TestWin32Funcs(tests.TestCase): |
597 | """Test that _win32 versions of os utilities return appropriate paths.""" |
598 | |
599 | |
600 | === modified file 'bzrlib/tests/test_smart_transport.py' |
601 | --- bzrlib/tests/test_smart_transport.py 2010-02-17 17:11:16 +0000 |
602 | +++ bzrlib/tests/test_smart_transport.py 2012-09-11 12:31:20 +0000 |
603 | @@ -18,13 +18,17 @@ |
604 | |
605 | # all of this deals with byte strings so this is safe |
606 | from cStringIO import StringIO |
607 | +import errno |
608 | import os |
609 | import socket |
610 | +import subprocess |
611 | +import sys |
612 | import threading |
613 | |
614 | import bzrlib |
615 | from bzrlib import ( |
616 | bzrdir, |
617 | + debug, |
618 | errors, |
619 | osutils, |
620 | tests, |
621 | @@ -49,6 +53,29 @@ |
622 | from bzrlib.transport.http import SmartClientHTTPMediumRequest |
623 | |
624 | |
625 | +def create_file_pipes(): |
626 | + r, w = os.pipe() |
627 | + # These must be opened without buffering, or we get undefined results |
628 | + rf = os.fdopen(r, 'rb', 0) |
629 | + wf = os.fdopen(w, 'wb', 0) |
630 | + return rf, wf |
631 | + |
632 | + |
633 | +def portable_socket_pair(): |
634 | + """Return a pair of TCP sockets connected to each other. |
635 | + |
636 | + Unlike socket.socketpair, this should work on Windows. |
637 | + """ |
638 | + listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
639 | + listen_sock.bind(('127.0.0.1', 0)) |
640 | + listen_sock.listen(1) |
641 | + client_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
642 | + client_sock.connect(listen_sock.getsockname()) |
643 | + server_sock, addr = listen_sock.accept() |
644 | + listen_sock.close() |
645 | + return server_sock, client_sock |
646 | + |
647 | + |
648 | class StringIOSSHVendor(object): |
649 | """A SSH vendor that uses StringIO to buffer writes and answer reads.""" |
650 | |
651 | @@ -63,6 +90,27 @@ |
652 | return StringIOSSHConnection(self) |
653 | |
654 | |
655 | +class FirstRejectedStringIOSSHVendor(StringIOSSHVendor): |
656 | + """The first connection will be considered closed. |
657 | + |
658 | + The second connection will succeed normally. |
659 | + """ |
660 | + |
661 | + def __init__(self, read_from, write_to, fail_at_write=True): |
662 | + super(FirstRejectedStringIOSSHVendor, self).__init__(read_from, |
663 | + write_to) |
664 | + self.fail_at_write = fail_at_write |
665 | + self._first = True |
666 | + |
667 | + def connect_ssh(self, username, password, host, port, command): |
668 | + self.calls.append(('connect_ssh', username, password, host, port, |
669 | + command)) |
670 | + if self._first: |
671 | + self._first = False |
672 | + return ClosedSSHConnection(self) |
673 | + return StringIOSSHConnection(self) |
674 | + |
675 | + |
676 | class StringIOSSHConnection(object): |
677 | """A SSH connection that uses StringIO to buffer writes and answer reads.""" |
678 | |
679 | @@ -71,11 +119,36 @@ |
680 | |
681 | def close(self): |
682 | self.vendor.calls.append(('close', )) |
683 | + self.vendor.read_from.close() |
684 | + self.vendor.write_to.close() |
685 | |
686 | def get_filelike_channels(self): |
687 | return self.vendor.read_from, self.vendor.write_to |
688 | |
689 | |
690 | +class ClosedSSHConnection(object): |
691 | + """An SSH connection that just has closed channels.""" |
692 | + |
693 | + def __init__(self, vendor): |
694 | + self.vendor = vendor |
695 | + |
696 | + def close(self): |
697 | + self.vendor.calls.append(('close', )) |
698 | + |
699 | + def get_filelike_channels(self): |
700 | + # We create matching pipes, and then close the ssh side |
701 | + bzr_read, ssh_write = create_file_pipes() |
702 | + # We always fail when bzr goes to read |
703 | + ssh_write.close() |
704 | + if self.vendor.fail_at_write: |
705 | + # If set, we'll also fail when bzr goes to write |
706 | + ssh_read, bzr_write = create_file_pipes() |
707 | + ssh_read.close() |
708 | + else: |
709 | + bzr_write = self.vendor.write_to |
710 | + return bzr_read, bzr_write |
711 | + |
712 | + |
713 | class _InvalidHostnameFeature(tests.Feature): |
714 | """Does 'non_existent.invalid' fail to resolve? |
715 | |
716 | @@ -171,6 +244,91 @@ |
717 | client_medium._accept_bytes('abc') |
718 | self.assertEqual('abc', output.getvalue()) |
719 | |
720 | + def test_simple_pipes__accept_bytes_subprocess_closed(self): |
721 | + # It is unfortunate that we have to use Popen for this. However, |
722 | + # os.pipe() does not behave the same as subprocess.Popen(). |
723 | + # On Windows, if you use os.pipe() and close the write side, |
724 | + # read.read() hangs. On Linux, read.read() returns the empty string. |
725 | + p = subprocess.Popen([sys.executable, '-c', |
726 | + 'import sys\n' |
727 | + 'sys.stdout.write(sys.stdin.read(4))\n' |
728 | + 'sys.stdout.close()\n'], |
729 | + stdout=subprocess.PIPE, stdin=subprocess.PIPE) |
730 | + client_medium = medium.SmartSimplePipesClientMedium( |
731 | + p.stdout, p.stdin, 'base') |
732 | + client_medium._accept_bytes('abc\n') |
733 | + self.assertEqual('abc', client_medium._read_bytes(3)) |
734 | + p.wait() |
735 | + # While writing to the underlying pipe, |
736 | + # Windows py2.6.6 we get IOError(EINVAL) |
737 | + # Lucid py2.6.5, we get IOError(EPIPE) |
738 | + # In both cases, it should be wrapped to ConnectionReset |
739 | + self.assertRaises(errors.ConnectionReset, |
740 | + client_medium._accept_bytes, 'more') |
741 | + |
742 | + def test_simple_pipes__accept_bytes_pipe_closed(self): |
743 | + child_read, client_write = create_file_pipes() |
744 | + client_medium = medium.SmartSimplePipesClientMedium( |
745 | + None, client_write, 'base') |
746 | + client_medium._accept_bytes('abc\n') |
747 | + self.assertEqual('abc\n', child_read.read(4)) |
748 | + # While writing to the underlying pipe, |
749 | + # Windows py2.6.6 we get IOError(EINVAL) |
750 | + # Lucid py2.6.5, we get IOError(EPIPE) |
751 | + # In both cases, it should be wrapped to ConnectionReset |
752 | + child_read.close() |
753 | + self.assertRaises(errors.ConnectionReset, |
754 | + client_medium._accept_bytes, 'more') |
755 | + |
756 | + def test_simple_pipes__flush_pipe_closed(self): |
757 | + child_read, client_write = create_file_pipes() |
758 | + client_medium = medium.SmartSimplePipesClientMedium( |
759 | + None, client_write, 'base') |
760 | + client_medium._accept_bytes('abc\n') |
761 | + child_read.close() |
762 | + # Even though the pipe is closed, flush on the write side seems to be a |
763 | + # no-op, rather than a failure. |
764 | + client_medium._flush() |
765 | + |
766 | + def test_simple_pipes__flush_subprocess_closed(self): |
767 | + p = subprocess.Popen([sys.executable, '-c', |
768 | + 'import sys\n' |
769 | + 'sys.stdout.write(sys.stdin.read(4))\n' |
770 | + 'sys.stdout.close()\n'], |
771 | + stdout=subprocess.PIPE, stdin=subprocess.PIPE) |
772 | + client_medium = medium.SmartSimplePipesClientMedium( |
773 | + p.stdout, p.stdin, 'base') |
774 | + client_medium._accept_bytes('abc\n') |
775 | + p.wait() |
776 | + # Even though the child process is dead, flush seems to be a no-op. |
777 | + client_medium._flush() |
778 | + |
779 | + def test_simple_pipes__read_bytes_pipe_closed(self): |
780 | + child_read, client_write = create_file_pipes() |
781 | + client_medium = medium.SmartSimplePipesClientMedium( |
782 | + child_read, client_write, 'base') |
783 | + client_medium._accept_bytes('abc\n') |
784 | + client_write.close() |
785 | + self.assertEqual('abc\n', client_medium._read_bytes(4)) |
786 | + self.assertEqual('', client_medium._read_bytes(4)) |
787 | + |
788 | + def test_simple_pipes__read_bytes_subprocess_closed(self): |
789 | + p = subprocess.Popen([sys.executable, '-c', |
790 | + 'import sys\n' |
791 | + 'if sys.platform == "win32":\n' |
792 | + ' import msvcrt, os\n' |
793 | + ' msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)\n' |
794 | + ' msvcrt.setmode(sys.stdin.fileno(), os.O_BINARY)\n' |
795 | + 'sys.stdout.write(sys.stdin.read(4))\n' |
796 | + 'sys.stdout.close()\n'], |
797 | + stdout=subprocess.PIPE, stdin=subprocess.PIPE) |
798 | + client_medium = medium.SmartSimplePipesClientMedium( |
799 | + p.stdout, p.stdin, 'base') |
800 | + client_medium._accept_bytes('abc\n') |
801 | + p.wait() |
802 | + self.assertEqual('abc\n', client_medium._read_bytes(4)) |
803 | + self.assertEqual('', client_medium._read_bytes(4)) |
804 | + |
805 | def test_simple_pipes_client_disconnect_does_nothing(self): |
806 | # calling disconnect does nothing. |
807 | input = StringIO() |
808 | @@ -556,6 +714,28 @@ |
809 | request.finished_reading() |
810 | self.assertRaises(errors.ReadingCompleted, request.read_bytes, None) |
811 | |
812 | + def test_reset(self): |
813 | + server_sock, client_sock = portable_socket_pair() |
814 | + # TODO: Use SmartClientAlreadyConnectedSocketMedium for the versions of |
815 | + # bzr where it exists. |
816 | + client_medium = medium.SmartTCPClientMedium(None, None, None) |
817 | + client_medium._socket = client_sock |
818 | + client_medium._connected = True |
819 | + req = client_medium.get_request() |
820 | + self.assertRaises(errors.TooManyConcurrentRequests, |
821 | + client_medium.get_request) |
822 | + client_medium.reset() |
823 | + # The stream should be reset, marked as disconnected, though ready for |
824 | + # us to make a new request |
825 | + self.assertFalse(client_medium._connected) |
826 | + self.assertIs(None, client_medium._socket) |
827 | + try: |
828 | + self.assertEqual('', client_sock.recv(1)) |
829 | + except socket.error, e: |
830 | + if e.errno not in (errno.EBADF,): |
831 | + raise |
832 | + req = client_medium.get_request() |
833 | + |
834 | |
835 | class RemoteTransportTests(TestCaseWithSmartMedium): |
836 | |
837 | @@ -609,20 +789,6 @@ |
838 | super(TestSmartServerStreamMedium, self).setUp() |
839 | self._captureVar('BZR_NO_SMART_VFS', None) |
840 | |
841 | - def portable_socket_pair(self): |
842 | - """Return a pair of TCP sockets connected to each other. |
843 | - |
844 | - Unlike socket.socketpair, this should work on Windows. |
845 | - """ |
846 | - listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
847 | - listen_sock.bind(('127.0.0.1', 0)) |
848 | - listen_sock.listen(1) |
849 | - client_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
850 | - client_sock.connect(listen_sock.getsockname()) |
851 | - server_sock, addr = listen_sock.accept() |
852 | - listen_sock.close() |
853 | - return server_sock, client_sock |
854 | - |
855 | def test_smart_query_version(self): |
856 | """Feed a canned query version to a server""" |
857 | # wire-to-wire, using the whole stack |
858 | @@ -687,7 +853,7 @@ |
859 | |
860 | def test_socket_stream_with_bulk_data(self): |
861 | sample_request_bytes = 'command\n9\nbulk datadone\n' |
862 | - server_sock, client_sock = self.portable_socket_pair() |
863 | + server_sock, client_sock = portable_socket_pair() |
864 | server = medium.SmartServerSocketStreamMedium( |
865 | server_sock, None) |
866 | sample_protocol = SampleRequest(expected_bytes=sample_request_bytes) |
867 | @@ -706,7 +872,7 @@ |
868 | self.assertTrue(server.finished) |
869 | |
870 | def test_socket_stream_shutdown_detection(self): |
871 | - server_sock, client_sock = self.portable_socket_pair() |
872 | + server_sock, client_sock = portable_socket_pair() |
873 | client_sock.close() |
874 | server = medium.SmartServerSocketStreamMedium( |
875 | server_sock, None) |
876 | @@ -726,7 +892,7 @@ |
877 | rest_of_request_bytes = 'lo\n' |
878 | expected_response = ( |
879 | protocol.RESPONSE_VERSION_TWO + 'success\nok\x012\n') |
880 | - server_sock, client_sock = self.portable_socket_pair() |
881 | + server_sock, client_sock = portable_socket_pair() |
882 | server = medium.SmartServerSocketStreamMedium( |
883 | server_sock, None) |
884 | client_sock.sendall(incomplete_request_bytes) |
885 | @@ -802,7 +968,7 @@ |
886 | # _serve_one_request should still process both of them as if they had |
887 | # been received separately. |
888 | sample_request_bytes = 'command\n' |
889 | - server_sock, client_sock = self.portable_socket_pair() |
890 | + server_sock, client_sock = portable_socket_pair() |
891 | server = medium.SmartServerSocketStreamMedium( |
892 | server_sock, None) |
893 | first_protocol = SampleRequest(expected_bytes=sample_request_bytes) |
894 | @@ -839,7 +1005,7 @@ |
895 | self.assertTrue(server.finished) |
896 | |
897 | def test_socket_stream_error_handling(self): |
898 | - server_sock, client_sock = self.portable_socket_pair() |
899 | + server_sock, client_sock = portable_socket_pair() |
900 | server = medium.SmartServerSocketStreamMedium( |
901 | server_sock, None) |
902 | fake_protocol = ErrorRaisingProtocol(Exception('boom')) |
903 | @@ -860,7 +1026,7 @@ |
904 | self.assertEqual('', from_server.getvalue()) |
905 | |
906 | def test_socket_stream_keyboard_interrupt_handling(self): |
907 | - server_sock, client_sock = self.portable_socket_pair() |
908 | + server_sock, client_sock = portable_socket_pair() |
909 | server = medium.SmartServerSocketStreamMedium( |
910 | server_sock, None) |
911 | fake_protocol = ErrorRaisingProtocol(KeyboardInterrupt('boom')) |
912 | @@ -877,7 +1043,7 @@ |
913 | return server._build_protocol() |
914 | |
915 | def build_protocol_socket(self, bytes): |
916 | - server_sock, client_sock = self.portable_socket_pair() |
917 | + server_sock, client_sock = portable_socket_pair() |
918 | server = medium.SmartServerSocketStreamMedium( |
919 | server_sock, None) |
920 | client_sock.sendall(bytes) |
921 | @@ -3214,6 +3380,114 @@ |
922 | # encoder. |
923 | |
924 | |
925 | +class Test_SmartClientRequest(tests.TestCase): |
926 | + |
927 | + def make_client_with_failing_medium(self, fail_at_write=True): |
928 | + response = StringIO() |
929 | + output = StringIO() |
930 | + vendor = FirstRejectedStringIOSSHVendor(response, output, |
931 | + fail_at_write=fail_at_write) |
932 | + client_medium = medium.SmartSSHClientMedium( |
933 | + 'a host', 'a port', 'a user', 'a pass', 'base', vendor, |
934 | + 'bzr') |
935 | + smart_client = client._SmartClient(client_medium, headers={}) |
936 | + return output, vendor, smart_client |
937 | + |
938 | + def test__send_no_retry_pipes(self): |
939 | + client_read, server_write = create_file_pipes() |
940 | + server_read, client_write = create_file_pipes() |
941 | + client_medium = medium.SmartSimplePipesClientMedium(client_read, |
942 | + client_write, base='/') |
943 | + smart_client = client._SmartClient(client_medium) |
944 | + smart_request = client._SmartClientRequest(smart_client, |
945 | + 'hello', ()) |
946 | + # Close the server side |
947 | + server_read.close() |
948 | + encoder, response_handler = smart_request._construct_protocol(3) |
949 | + self.assertRaises(errors.ConnectionReset, |
950 | + smart_request._send_no_retry, encoder) |
951 | + |
952 | + def test__send_read_response_sockets(self): |
953 | + listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
954 | + listen_sock.bind(('127.0.0.1', 0)) |
955 | + listen_sock.listen(1) |
956 | + host, port = listen_sock.getsockname() |
957 | + client_medium = medium.SmartTCPClientMedium(host, port, '/') |
958 | + client_medium._ensure_connection() |
959 | + smart_client = client._SmartClient(client_medium) |
960 | + smart_request = client._SmartClientRequest(smart_client, 'hello', ()) |
961 | + # Accept the connection, but don't actually talk to the client. |
962 | + server_sock, _ = listen_sock.accept() |
963 | + server_sock.close() |
964 | + # Sockets buffer and don't really notice that the server has closed the |
965 | + # connection until we try to read again. |
966 | + handler = smart_request._send(3) |
967 | + self.assertRaises(errors.ConnectionReset, |
968 | + handler.read_response_tuple, expect_body=False) |
969 | + |
970 | + def test__send_retries_on_write(self): |
971 | + output, vendor, smart_client = self.make_client_with_failing_medium() |
972 | + smart_request = client._SmartClientRequest(smart_client, 'hello', ()) |
973 | + handler = smart_request._send(3) |
974 | + self.assertEqual('bzr message 3 (bzr 1.6)\n' # protocol |
975 | + '\x00\x00\x00\x02de' # empty headers |
976 | + 's\x00\x00\x00\tl5:helloee', |
977 | + output.getvalue()) |
978 | + self.assertEqual( |
979 | + [('connect_ssh', 'a user', 'a pass', 'a host', 'a port', |
980 | + ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']), |
981 | + ('close',), |
982 | + ('connect_ssh', 'a user', 'a pass', 'a host', 'a port', |
983 | + ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']), |
984 | + ], |
985 | + vendor.calls) |
986 | + |
987 | + def test__send_doesnt_retry_read_failure(self): |
988 | + output, vendor, smart_client = self.make_client_with_failing_medium( |
989 | + fail_at_write=False) |
990 | + smart_request = client._SmartClientRequest(smart_client, 'hello', ()) |
991 | + handler = smart_request._send(3) |
992 | + self.assertEqual('bzr message 3 (bzr 1.6)\n' # protocol |
993 | + '\x00\x00\x00\x02de' # empty headers |
994 | + 's\x00\x00\x00\tl5:helloee', |
995 | + output.getvalue()) |
996 | + self.assertEqual( |
997 | + [('connect_ssh', 'a user', 'a pass', 'a host', 'a port', |
998 | + ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']), |
999 | + ], |
1000 | + vendor.calls) |
1001 | + self.assertRaises(errors.ConnectionReset, handler.read_response_tuple) |
1002 | + |
1003 | + def test__send_doesnt_retry_body_stream(self): |
1004 | + # We don't know how much of body_stream would get iterated as part of |
1005 | + # _send before it failed to actually send the request, so we |
1006 | + # just always fail in this condition. |
1007 | + output, vendor, smart_client = self.make_client_with_failing_medium() |
1008 | + smart_request = client._SmartClientRequest(smart_client, 'hello', (), |
1009 | + body_stream=['a', 'b']) |
1010 | + self.assertRaises(errors.ConnectionReset, smart_request._send, 3) |
1011 | + # We got one connect, but it fails, so we disconnect, but we don't |
1012 | + # retry it |
1013 | + self.assertEqual( |
1014 | + [('connect_ssh', 'a user', 'a pass', 'a host', 'a port', |
1015 | + ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']), |
1016 | + ('close',), |
1017 | + ], |
1018 | + vendor.calls) |
1019 | + |
1020 | + def test__send_disabled_retry(self): |
1021 | + debug.debug_flags.add('noretry') |
1022 | + output, vendor, smart_client = self.make_client_with_failing_medium() |
1023 | + smart_request = client._SmartClientRequest(smart_client, 'hello', ()) |
1024 | + self.assertRaises(errors.ConnectionReset, smart_request._send, 3) |
1025 | + self.assertEqual( |
1026 | + [('connect_ssh', 'a user', 'a pass', 'a host', 'a port', |
1027 | + ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']), |
1028 | + ('close',), |
1029 | + ], |
1030 | + vendor.calls) |
1031 | + |
1032 | + |
1033 | class LengthPrefixedBodyDecoder(tests.TestCase): |
1034 | |
1035 | # XXX: TODO: make accept_reading_trailer invoke translate_response or |
I should note, I think it would be prudent to land this into bzr.dev first, but I wanted to make sure it was written against 2.1 since that is the oldest client that we want to backport support for.
Also, I expect there will be a modest amount of conflicts bringing this up from bzr-2.1 into bzr.dev. There has been a modest amount of changes inbetween, so I'll try to produce branches for each series.