Merge lp:~jameinel/bzr/2.1-client-reconnect-819604 into lp:bzr/2.1

Proposed by John A Meinel
Status: Work in progress
Proposed branch: lp:~jameinel/bzr/2.1-client-reconnect-819604
Merge into: lp:bzr/2.1
Diff against target: 1035 lines (+589/-143)
7 files modified
bzrlib/help_topics/en/debug-flags.txt (+2/-0)
bzrlib/osutils.py (+40/-9)
bzrlib/smart/client.py (+170/-86)
bzrlib/smart/medium.py (+43/-24)
bzrlib/smart/protocol.py (+0/-3)
bzrlib/tests/test_osutils.py (+39/-0)
bzrlib/tests/test_smart_transport.py (+295/-21)
To merge this branch: bzr merge lp:~jameinel/bzr/2.1-client-reconnect-819604
Reviewer Review Type Date Requested Status
bzr-core Pending
Review via email: mp+78597@code.launchpad.net

Commit message

Start of bug #819604, allow bzr clients to reconnect if the connection is gone when we go to write a new request.

Description of the change

This is some initial work for having clients reconnect when their bzr+ssh (or bzr://) connection gets closed underneath them.

To start with, the trickiest bit is that it is really quite hard to detect when things are closed. Mostly because of buffering, etc, at lots of different levels (internal memory buffering, socket/pipe buffering, 'ssh.exe' buffering, latency of close message from server back to client, etc.)

I'll also clarify that this isn't intended to handle all possible sorts of connection failures. What we really are trying to handle is allowing a server to be upgraded 'live'. Such that the server will gently disconnect us only between complete requests. Versus having requests terminated randomly in the middle of content. It may help with some of those cases, and we certainly don't want to cause corruption, etc if one of those happens. But it isn't the goal of *this* change.

This specific patch has a few aspects, and does help some cases in real-world testing.

1) Backport a minimum amount of the SmartSSHClientMedium proxies to _real_medium patch from Andrew. In bzr-2.2 Andrew updated the code so that when spawning an 'ssh' subprocess, we use a socketpair when possible instead of pipes. (That way we can read without blocking, allowing us to use large buffers, etc.)

It also meant that he shared the logic from SmartSimplePipeStreamMedium and SmartTCPClientMedium, which I wanted to do, to avoid having to re-implement the logic multiple times.

This is a pretty small part of the patch.

2) Update some of the lower level code so that we get ConnectionReset rather than various IOError and ValueError when writing to a closed connection.

3) Update the _SmartClient._send_request logic. If we get a ConnectionReset while we are trying to write the request, then we can safely retry the request. On the assumption that as long as the server doesn't see the final 'e' terminal byte, it will reject the request, because it is incomplete.

4) The one caveat is if there is a 'body_stream'. body_stream is an iterable, so we can't just rewind it and resume it.

On the plus side, there is only one caller RemoteStreamSink.insert_stream. In theory we could update that caller. In practice, it *also* takes a 'stream' object, which can't simply be rewound. Though we do a 'no-op' stream just before the real content to determine that the remote server actually supports the RPC we want to use.

That would allow us to close the gap a little bit. So we can detect ConnectionReset all the way to the point that we actually start streaming the content.

The other bit that would be possible, is to update ProtocolThreeRequester.call_with_body_stream to allow it to set a flag to indicate whether it has actually started consuming the stream yet. Then update it to flush just before it iterates the body stream to help force detecting closed connections. (otherwise we are likely to have buffered at least the first part of the body_stream in local memory, since the local buffer is 1MB and the first flush is only called after the first chunk of the body_stream.)

This just closes a window where we won't reconnect during 'bzr push'.

5) The next step is that (especially for sockets), we write out the request successfully, and then notice it is closed when we try to read back the response.

Here I'm a lot more concerned about non-idempotent requests. Because the server might have fully read the request, and sent back a response, but we didn't get to see the response.

However, our RPC design is meant to be stateless (all state is passed in the request, not assumed to be stored on the server side). Which means things *should* be idempotent. For something like 'put_bytes', we may write the same content twice, but we shouldn't corrupt anything.

The only one I can think of is 'append_bytes', and the associated open_write_stream() code. (RemoteTransport.open_write_stream uses AppendBasedFileStream as its implementation.)

And that code shouldn't really be used with up-to-date bzr clients and servers. So I'm much more ok with it just dying if it isn't safe to retry.

To post a comment you must log in.
Revision history for this message
John A Meinel (jameinel) wrote :

I should note, I think it would be prudent to land this into bzr.dev first, but I wanted to make sure it was written against 2.1 since that is the oldest client that we want to backport support for.

Also, I expect there will be a modest amount of conflicts bringing this up from bzr-2.1 into bzr.dev. There has been a modest amount of changes inbetween, so I'll try to produce branches for each series.

Revision history for this message
John A Meinel (jameinel) wrote :

Small update, in working on the read-reconnect code, I realized passing all these arguments around was very clumsy. So I refactored the code into a helper class _SmartClientRequest. That way, the 6 arguments being passed around just end up as attributes, and the function call interplay is easier to follow.

Revision history for this message
Vincent Ladeuil (vila) wrote :

> I should note, I think it would be prudent to land this into bzr.dev first,
> but I wanted to make sure it was written against 2.1 since that is the oldest
> client that we want to backport support for.

+4 (+1 for each of 2.1, 2.2, 2.3, 2.4)

No need to ask anymore, good, keep reading my mind ;)

I don't know exactly how we should proceed but as much as possible, all the modifications related to this topic should be done in a separate branch which should be merged into whatever series we target.

Backport is hard, let's avoid it as much as possible (but no more ;)

> Also, I expect there will be a modest amount of conflicts bringing this up
> from bzr-2.1 into bzr.dev. There has been a modest amount of changes
> inbetween, so I'll try to produce branches for each series.

Yup, that sounds a lot like a loom with a thread by targeted series. Or any arrangement of branches that makes our life easier.

Revision history for this message
Vincent Ladeuil (vila) wrote :

> Small update, in working on the read-reconnect code, I realized passing all
> these arguments around was very clumsy. So I refactored the code into a helper
> class _SmartClientRequest. That way, the 6 arguments being passed around just
> end up as attributes, and the function call interplay is easier to follow.

From the description alone this part got my approval. If some helpers need to move there too, clarifying the class features, even better ( no pre-conceptions here, just saying).

Revision history for this message
John A Meinel (jameinel) wrote :

Some small code cleanups, and add -Dnoretry as a debugging flag that will disable the retry-on-write.

Revision history for this message
Martin Pool (mbp) wrote :

I'm going to (reluctantly) bump these out of the merge queue until this is landed and well tested in 2.5 - probably until after the 2.5.0 final release is out. Then we can come back and look at landing to earlier series.

Revision history for this message
John A Meinel (jameinel) wrote :

-----BEGIN PGP SIGNED MESSAGE-----
Hash: SHA1

On 11/29/2011 7:32 AM, Martin Pool wrote:
> I'm going to (reluctantly) bump these out of the merge queue until
> this is landed and well tested in 2.5 - probably until after the
> 2.5.0 final release is out. Then we can come back and look at
> landing to earlier series.

I don't think it is something you should feel bad about :). That was
certainly the intent.

1) I wrote it against 2.1 so we would have a chance to land it there
   cleanly.

2) I tried to split it up into multiple patches to make it easier to
   review.

3) I merged it up through the stack because there are a fair number of
   changes to this code, and thus conflicts, etc.

4) It should certainly land on trunk and get a thorough shakeout there.
   I'm not sure how much time you think is enough. Certainly if enough
   time passes, 2.1 will be fully obsolete :).

Anyway, thanks for getting this landed in 2.5.

John
=:->

-----BEGIN PGP SIGNATURE-----
Version: GnuPG v1.4.9 (Cygwin)
Comment: Using GnuPG with Mozilla - http://enigmail.mozdev.org/

iEYEARECAAYFAk7UiEcACgkQJdeBCYSNAANSFwCgiwx+EeIpiYLJF84P0NPcLFd5
T90AoJDimVbLmeDiqn1FWFDmdWtMXaWq
=94bN
-----END PGP SIGNATURE-----

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'bzrlib/help_topics/en/debug-flags.txt'
--- bzrlib/help_topics/en/debug-flags.txt 2010-01-05 04:30:07 +0000
+++ bzrlib/help_topics/en/debug-flags.txt 2012-09-11 12:31:20 +0000
@@ -24,6 +24,8 @@
24-Dindex Trace major index operations.24-Dindex Trace major index operations.
25-Dknit Trace knit operations.25-Dknit Trace knit operations.
26-Dlock Trace when lockdir locks are taken or released.26-Dlock Trace when lockdir locks are taken or released.
27-Dnoretry If a connection is reset, fail immediately rather than
28 retrying the request.
27-Dprogress Trace progress bar operations.29-Dprogress Trace progress bar operations.
28-Dmerge Emit information for debugging merges.30-Dmerge Emit information for debugging merges.
29-Dno_apport Don't use apport to report crashes.31-Dno_apport Don't use apport to report crashes.
3032
=== modified file 'bzrlib/osutils.py'
--- bzrlib/osutils.py 2010-05-27 04:00:01 +0000
+++ bzrlib/osutils.py 2012-09-11 12:31:20 +0000
@@ -40,6 +40,7 @@
40 rmtree,40 rmtree,
41 )41 )
42import signal42import signal
43import socket
43import subprocess44import subprocess
44import tempfile45import tempfile
45from tempfile import (46from tempfile import (
@@ -1929,6 +1930,20 @@
1929 return socket.gethostname().decode(get_user_encoding())1930 return socket.gethostname().decode(get_user_encoding())
19301931
19311932
1933# We must not read/write any more than 64k at a time from/to a socket so we
1934# don't risk "no buffer space available" errors on some platforms. Windows in
1935# particular is likely to throw WSAECONNABORTED or WSAENOBUFS if given too much
1936# data at once.
1937MAX_SOCKET_CHUNK = 64 * 1024
1938
1939_end_of_stream_errors = [errno.ECONNRESET, errno.EPIPE, errno.EINVAL]
1940for _eno in ['WSAECONNRESET', 'WSAECONNABORTED']:
1941 _eno = getattr(errno, _eno, None)
1942 if _eno is not None:
1943 _end_of_stream_errors.append(_eno)
1944del _eno
1945
1946
1932def recv_all(socket, bytes):1947def recv_all(socket, bytes):
1933 """Receive an exact number of bytes.1948 """Receive an exact number of bytes.
19341949
@@ -1948,21 +1963,37 @@
1948 return b1963 return b
19491964
19501965
1951def send_all(socket, bytes, report_activity=None):1966def send_all(sock, bytes, report_activity=None):
1952 """Send all bytes on a socket.1967 """Send all bytes on a socket.
19531968
1954 Regular socket.sendall() can give socket error 10053 on Windows. This1969 Breaks large blocks in smaller chunks to avoid buffering limitations on
1955 implementation sends no more than 64k at a time, which avoids this problem.1970 some platforms, and catches EINTR which may be thrown if the send is
1971 interrupted by a signal.
1972
1973 This is preferred to socket.sendall(), because it avoids portability bugs
1974 and provides activity reporting.
19561975
1957 :param report_activity: Call this as bytes are read, see1976 :param report_activity: Call this as bytes are read, see
1958 Transport._report_activity1977 Transport._report_activity
1959 """1978 """
1960 chunk_size = 2**161979 sent_total = 0
1961 for pos in xrange(0, len(bytes), chunk_size):1980 byte_count = len(bytes)
1962 block = bytes[pos:pos+chunk_size]1981 while sent_total < byte_count:
1963 if report_activity is not None:1982 try:
1964 report_activity(len(block), 'write')1983 sent = sock.send(buffer(bytes, sent_total, MAX_SOCKET_CHUNK))
1965 until_no_eintr(socket.sendall, block)1984 except (socket.error, IOError), e:
1985 if e.args[0] in _end_of_stream_errors:
1986 raise errors.ConnectionReset(
1987 "Error trying to write to socket", e)
1988 if e.args[0] != errno.EINTR:
1989 raise
1990 else:
1991 if sent == 0:
1992 raise errors.ConnectionReset('Sending to %s returned 0 bytes'
1993 % (sock,))
1994 sent_total += sent
1995 if report_activity is not None:
1996 report_activity(sent, 'write')
19661997
19671998
1968def dereference_path(path):1999def dereference_path(path):
19692000
=== modified file 'bzrlib/smart/client.py'
--- bzrlib/smart/client.py 2010-02-17 17:11:16 +0000
+++ bzrlib/smart/client.py 2012-09-11 12:31:20 +0000
@@ -16,10 +16,11 @@
1616
17import bzrlib17import bzrlib
18from bzrlib.smart import message, protocol18from bzrlib.smart import message, protocol
19from bzrlib.trace import warning
20from bzrlib import (19from bzrlib import (
20 debug,
21 errors,21 errors,
22 hooks,22 hooks,
23 trace,
23 )24 )
2425
2526
@@ -39,93 +40,12 @@
39 def __repr__(self):40 def __repr__(self):
40 return '%s(%r)' % (self.__class__.__name__, self._medium)41 return '%s(%r)' % (self.__class__.__name__, self._medium)
4142
42 def _send_request(self, protocol_version, method, args, body=None,
43 readv_body=None, body_stream=None):
44 encoder, response_handler = self._construct_protocol(
45 protocol_version)
46 encoder.set_headers(self._headers)
47 if body is not None:
48 if readv_body is not None:
49 raise AssertionError(
50 "body and readv_body are mutually exclusive.")
51 if body_stream is not None:
52 raise AssertionError(
53 "body and body_stream are mutually exclusive.")
54 encoder.call_with_body_bytes((method, ) + args, body)
55 elif readv_body is not None:
56 if body_stream is not None:
57 raise AssertionError(
58 "readv_body and body_stream are mutually exclusive.")
59 encoder.call_with_body_readv_array((method, ) + args, readv_body)
60 elif body_stream is not None:
61 encoder.call_with_body_stream((method, ) + args, body_stream)
62 else:
63 encoder.call(method, *args)
64 return response_handler
65
66 def _run_call_hooks(self, method, args, body, readv_body):
67 if not _SmartClient.hooks['call']:
68 return
69 params = CallHookParams(method, args, body, readv_body, self._medium)
70 for hook in _SmartClient.hooks['call']:
71 hook(params)
72
73 def _call_and_read_response(self, method, args, body=None, readv_body=None,43 def _call_and_read_response(self, method, args, body=None, readv_body=None,
74 body_stream=None, expect_response_body=True):44 body_stream=None, expect_response_body=True):
75 self._run_call_hooks(method, args, body, readv_body)45 request = _SmartClientRequest(self, method, args, body=body,
76 if self._medium._protocol_version is not None:46 readv_body=readv_body, body_stream=body_stream,
77 response_handler = self._send_request(47 expect_response_body=expect_response_body)
78 self._medium._protocol_version, method, args, body=body,48 return request.call_and_read_response()
79 readv_body=readv_body, body_stream=body_stream)
80 return (response_handler.read_response_tuple(
81 expect_body=expect_response_body),
82 response_handler)
83 else:
84 for protocol_version in [3, 2]:
85 if protocol_version == 2:
86 # If v3 doesn't work, the remote side is older than 1.6.
87 self._medium._remember_remote_is_before((1, 6))
88 response_handler = self._send_request(
89 protocol_version, method, args, body=body,
90 readv_body=readv_body, body_stream=body_stream)
91 try:
92 response_tuple = response_handler.read_response_tuple(
93 expect_body=expect_response_body)
94 except errors.UnexpectedProtocolVersionMarker, err:
95 # TODO: We could recover from this without disconnecting if
96 # we recognise the protocol version.
97 warning(
98 'Server does not understand Bazaar network protocol %d,'
99 ' reconnecting. (Upgrade the server to avoid this.)'
100 % (protocol_version,))
101 self._medium.disconnect()
102 continue
103 except errors.ErrorFromSmartServer:
104 # If we received an error reply from the server, then it
105 # must be ok with this protocol version.
106 self._medium._protocol_version = protocol_version
107 raise
108 else:
109 self._medium._protocol_version = protocol_version
110 return response_tuple, response_handler
111 raise errors.SmartProtocolError(
112 'Server is not a Bazaar server: ' + str(err))
113
114 def _construct_protocol(self, version):
115 request = self._medium.get_request()
116 if version == 3:
117 request_encoder = protocol.ProtocolThreeRequester(request)
118 response_handler = message.ConventionalResponseHandler()
119 response_proto = protocol.ProtocolThreeDecoder(
120 response_handler, expect_version_marker=True)
121 response_handler.setProtoAndMediumRequest(response_proto, request)
122 elif version == 2:
123 request_encoder = protocol.SmartClientRequestProtocolTwo(request)
124 response_handler = request_encoder
125 else:
126 request_encoder = protocol.SmartClientRequestProtocolOne(request)
127 response_handler = request_encoder
128 return request_encoder, response_handler
12949
130 def call(self, method, *args):50 def call(self, method, *args):
131 """Call a method on the remote server."""51 """Call a method on the remote server."""
@@ -191,6 +111,170 @@
191 return self._medium.remote_path_from_transport(transport)111 return self._medium.remote_path_from_transport(transport)
192112
193113
114class _SmartClientRequest(object):
115 """Encapsulate the logic for a single request.
116
117 This class handles things like reconnecting and sending the request a
118 second time when the connection is reset in the middle. It also handles the
119 multiple requests that get made if we don't know what protocol the server
120 supports yet.
121
122 Generally, you build up one of these objects, passing in the arguments that
123 you want to send to the server, and then use 'call_and_read_response' to
124 get the response from the server.
125 """
126
127 def __init__(self, client, method, args, body=None, readv_body=None,
128 body_stream=None, expect_response_body=True):
129 self.client = client
130 self.method = method
131 self.args = args
132 self.body = body
133 self.readv_body = readv_body
134 self.body_stream = body_stream
135 self.expect_response_body = expect_response_body
136
137 def call_and_read_response(self):
138 """Send the request to the server, and read the initial response.
139
140 This doesn't read all of the body content of the response, instead it
141 returns (response_tuple, response_handler). response_tuple is the 'ok',
142 or 'error' information, and 'response_handler' can be used to get the
143 content stream out.
144 """
145 self._run_call_hooks()
146 protocol_version = self.client._medium._protocol_version
147 if protocol_version is None:
148 return self._call_determining_protocol_version()
149 else:
150 return self._call(protocol_version)
151
152 def _run_call_hooks(self):
153 if not _SmartClient.hooks['call']:
154 return
155 params = CallHookParams(self.method, self.args, self.body,
156 self.readv_body, self.client._medium)
157 for hook in _SmartClient.hooks['call']:
158 hook(params)
159
160 def _call(self, protocol_version):
161 """We know the protocol version.
162
163 So this just sends the request, and then reads the response. This is
164 where the code will be to retry requests if the connection is closed.
165 """
166 response_handler = self._send(protocol_version)
167 response_tuple = response_handler.read_response_tuple(
168 expect_body=self.expect_response_body)
169 return (response_tuple, response_handler)
170
171 def _call_determining_protocol_version(self):
172 """Determine what protocol the remote server supports.
173
174 We do this by placing a request in the most recent protocol, and
175 handling the UnexpectedProtocolVersionMarker from the server.
176 """
177 for protocol_version in [3, 2]:
178 if protocol_version == 2:
179 # If v3 doesn't work, the remote side is older than 1.6.
180 self.client._medium._remember_remote_is_before((1, 6))
181 try:
182 response_tuple, response_handler = self._call(protocol_version)
183 except errors.UnexpectedProtocolVersionMarker, err:
184 # TODO: We could recover from this without disconnecting if
185 # we recognise the protocol version.
186 trace.warning(
187 'Server does not understand Bazaar network protocol %d,'
188 ' reconnecting. (Upgrade the server to avoid this.)'
189 % (protocol_version,))
190 self.client._medium.disconnect()
191 continue
192 except errors.ErrorFromSmartServer:
193 # If we received an error reply from the server, then it
194 # must be ok with this protocol version.
195 self.client._medium._protocol_version = protocol_version
196 raise
197 else:
198 self.client._medium._protocol_version = protocol_version
199 return response_tuple, response_handler
200 raise errors.SmartProtocolError(
201 'Server is not a Bazaar server: ' + str(err))
202
203 def _construct_protocol(self, version):
204 """Build the encoding stack for a given protocol version."""
205 request = self.client._medium.get_request()
206 if version == 3:
207 request_encoder = protocol.ProtocolThreeRequester(request)
208 response_handler = message.ConventionalResponseHandler()
209 response_proto = protocol.ProtocolThreeDecoder(
210 response_handler, expect_version_marker=True)
211 response_handler.setProtoAndMediumRequest(response_proto, request)
212 elif version == 2:
213 request_encoder = protocol.SmartClientRequestProtocolTwo(request)
214 response_handler = request_encoder
215 else:
216 request_encoder = protocol.SmartClientRequestProtocolOne(request)
217 response_handler = request_encoder
218 return request_encoder, response_handler
219
220 def _send(self, protocol_version):
221 """Encode the request, and send it to the server.
222
223 This will retry a request if we get a ConnectionReset while sending the
224 request to the server. (Unless we have a body_stream that we have
225 already started consuming, since we can't restart body_streams)
226
227 :return: response_handler as defined by _construct_protocol
228 """
229 encoder, response_handler = self._construct_protocol(protocol_version)
230 try:
231 self._send_no_retry(encoder)
232 except errors.ConnectionReset, e:
233 # If we fail during the _send_no_retry phase, then we can
234 # be confident that the server did not get our request, because we
235 # haven't started waiting for the reply yet. So try the request
236 # again. We only issue a single retry, because if the connection
237 # really is down, there is no reason to loop endlessly.
238
239 # Connection is dead, so close our end of it.
240 self.client._medium.reset()
241 if (('noretry' in debug.debug_flags)
242 or self.body_stream is not None):
243 # We can't restart a body_stream that has been partially
244 # consumed, so we don't retry.
245 raise
246 trace.warning('ConnectionReset calling %r, retrying'
247 % (self.method,))
248 trace.log_exception_quietly()
249 encoder, response_handler = self._construct_protocol(
250 protocol_version)
251 self._send_no_retry(encoder)
252 return response_handler
253
254 def _send_no_retry(self, encoder):
255 """Just encode the request and try to send it."""
256 encoder.set_headers(self.client._headers)
257 if self.body is not None:
258 if self.readv_body is not None:
259 raise AssertionError(
260 "body and readv_body are mutually exclusive.")
261 if self.body_stream is not None:
262 raise AssertionError(
263 "body and body_stream are mutually exclusive.")
264 encoder.call_with_body_bytes((self.method, ) + self.args, self.body)
265 elif self.readv_body is not None:
266 if self.body_stream is not None:
267 raise AssertionError(
268 "readv_body and body_stream are mutually exclusive.")
269 encoder.call_with_body_readv_array((self.method, ) + self.args,
270 self.readv_body)
271 elif self.body_stream is not None:
272 encoder.call_with_body_stream((self.method, ) + self.args,
273 self.body_stream)
274 else:
275 encoder.call(self.method, *self.args)
276
277
194class SmartClientHooks(hooks.Hooks):278class SmartClientHooks(hooks.Hooks):
195279
196 def __init__(self):280 def __init__(self):
197281
=== modified file 'bzrlib/smart/medium.py'
--- bzrlib/smart/medium.py 2010-04-27 06:40:37 +0000
+++ bzrlib/smart/medium.py 2012-09-11 12:31:20 +0000
@@ -712,6 +712,14 @@
712 """712 """
713 return SmartClientStreamMediumRequest(self)713 return SmartClientStreamMediumRequest(self)
714714
715 def reset(self):
716 """We have been disconnected, reset current state.
717
718 This resets things like _current_request and connected state.
719 """
720 self.disconnect()
721 self._current_request = None
722
715723
716class SmartSimplePipesClientMedium(SmartClientStreamMedium):724class SmartSimplePipesClientMedium(SmartClientStreamMedium):
717 """A client medium using simple pipes.725 """A client medium using simple pipes.
@@ -726,11 +734,21 @@
726734
727 def _accept_bytes(self, bytes):735 def _accept_bytes(self, bytes):
728 """See SmartClientStreamMedium.accept_bytes."""736 """See SmartClientStreamMedium.accept_bytes."""
729 osutils.until_no_eintr(self._writeable_pipe.write, bytes)737 try:
738 osutils.until_no_eintr(self._writeable_pipe.write, bytes)
739 except IOError, e:
740 if e.errno in (errno.EINVAL, errno.EPIPE):
741 raise errors.ConnectionReset(
742 "Error trying to write to subprocess:\n%s"
743 % (e,))
744 raise
730 self._report_activity(len(bytes), 'write')745 self._report_activity(len(bytes), 'write')
731746
732 def _flush(self):747 def _flush(self):
733 """See SmartClientStreamMedium._flush()."""748 """See SmartClientStreamMedium._flush()."""
749 # Note: If flush were to fail, we'd like to raise ConnectionReset, etc.
750 # However, testing shows that even when the child process is
751 # gone, this doesn't error.
734 osutils.until_no_eintr(self._writeable_pipe.flush)752 osutils.until_no_eintr(self._writeable_pipe.flush)
735753
736 def _read_bytes(self, count):754 def _read_bytes(self, count):
@@ -741,7 +759,10 @@
741759
742760
743class SmartSSHClientMedium(SmartClientStreamMedium):761class SmartSSHClientMedium(SmartClientStreamMedium):
744 """A client medium using SSH."""762 """A client medium using SSH.
763
764 It delegates IO to a SmartSimplePipesClientMedium.
765 """
745766
746 def __init__(self, host, port=None, username=None, password=None,767 def __init__(self, host, port=None, username=None, password=None,
747 base=None, vendor=None, bzr_remote_path=None):768 base=None, vendor=None, bzr_remote_path=None):
@@ -750,11 +771,11 @@
750 :param vendor: An optional override for the ssh vendor to use. See771 :param vendor: An optional override for the ssh vendor to use. See
751 bzrlib.transport.ssh for details on ssh vendors.772 bzrlib.transport.ssh for details on ssh vendors.
752 """773 """
753 self._connected = False
754 self._host = host774 self._host = host
755 self._password = password775 self._password = password
756 self._port = port776 self._port = port
757 self._username = username777 self._username = username
778 self._real_medium = None
758 # for the benefit of progress making a short description of this779 # for the benefit of progress making a short description of this
759 # transport780 # transport
760 self._scheme = 'bzr+ssh'781 self._scheme = 'bzr+ssh'
@@ -762,10 +783,8 @@
762 # _DebugCounter so we have to store all the values used in our repr783 # _DebugCounter so we have to store all the values used in our repr
763 # method before calling the super init.784 # method before calling the super init.
764 SmartClientStreamMedium.__init__(self, base)785 SmartClientStreamMedium.__init__(self, base)
765 self._read_from = None
766 self._ssh_connection = None786 self._ssh_connection = None
767 self._vendor = vendor787 self._vendor = vendor
768 self._write_to = None
769 self._bzr_remote_path = bzr_remote_path788 self._bzr_remote_path = bzr_remote_path
770789
771 def __repr__(self):790 def __repr__(self):
@@ -783,21 +802,20 @@
783 def _accept_bytes(self, bytes):802 def _accept_bytes(self, bytes):
784 """See SmartClientStreamMedium.accept_bytes."""803 """See SmartClientStreamMedium.accept_bytes."""
785 self._ensure_connection()804 self._ensure_connection()
786 osutils.until_no_eintr(self._write_to.write, bytes)805 self._real_medium.accept_bytes(bytes)
787 self._report_activity(len(bytes), 'write')
788806
789 def disconnect(self):807 def disconnect(self):
790 """See SmartClientMedium.disconnect()."""808 """See SmartClientMedium.disconnect()."""
791 if not self._connected:809 if self._real_medium is not None:
792 return810 self._real_medium.disconnect()
793 osutils.until_no_eintr(self._read_from.close)811 self._real_medium = None
794 osutils.until_no_eintr(self._write_to.close)812 if self._ssh_connection is not None:
795 self._ssh_connection.close()813 self._ssh_connection.close()
796 self._connected = False814 self._ssh_connection = None
797815
798 def _ensure_connection(self):816 def _ensure_connection(self):
799 """Connect this medium if not already connected."""817 """Connect this medium if not already connected."""
800 if self._connected:818 if self._real_medium is not None:
801 return819 return
802 if self._vendor is None:820 if self._vendor is None:
803 vendor = ssh._get_ssh_vendor()821 vendor = ssh._get_ssh_vendor()
@@ -807,22 +825,19 @@
807 self._password, self._host, self._port,825 self._password, self._host, self._port,
808 command=[self._bzr_remote_path, 'serve', '--inet',826 command=[self._bzr_remote_path, 'serve', '--inet',
809 '--directory=/', '--allow-writes'])827 '--directory=/', '--allow-writes'])
810 self._read_from, self._write_to = \828 read_from, write_to = self._ssh_connection.get_filelike_channels()
811 self._ssh_connection.get_filelike_channels()829 self._real_medium = SmartSimplePipesClientMedium(
812 self._connected = True830 read_from, write_to, self.base)
813831
814 def _flush(self):832 def _flush(self):
815 """See SmartClientStreamMedium._flush()."""833 """See SmartClientStreamMedium._flush()."""
816 self._write_to.flush()834 self._real_medium._flush()
817835
818 def _read_bytes(self, count):836 def _read_bytes(self, count):
819 """See SmartClientStreamMedium.read_bytes."""837 """See SmartClientStreamMedium.read_bytes."""
820 if not self._connected:838 if self._real_medium is None:
821 raise errors.MediumNotConnected(self)839 raise errors.MediumNotConnected(self)
822 bytes_to_read = min(count, _MAX_READ_SIZE)840 return self._real_medium.read_bytes(count)
823 bytes = osutils.until_no_eintr(self._read_from.read, bytes_to_read)
824 self._report_activity(len(bytes), 'read')
825 return bytes
826841
827842
828# Port 4155 is the default port for bzr://, registered with IANA.843# Port 4155 is the default port for bzr://, registered with IANA.
@@ -948,13 +963,17 @@
948 self._medium._flush()963 self._medium._flush()
949964
950965
966WSAECONNABORTED = 10053
967WSAECONNRESET = 10054
968
951def _read_bytes_from_socket(sock, desired_count, report_activity):969def _read_bytes_from_socket(sock, desired_count, report_activity):
952 # We ignore the desired_count because on sockets it's more efficient to970 # We ignore the desired_count because on sockets it's more efficient to
953 # read large chunks (of _MAX_READ_SIZE bytes) at a time.971 # read large chunks (of _MAX_READ_SIZE bytes) at a time.
954 try:972 try:
955 bytes = osutils.until_no_eintr(sock, _MAX_READ_SIZE)973 bytes = osutils.until_no_eintr(sock, _MAX_READ_SIZE)
956 except socket.error, e:974 except socket.error, e:
957 if len(e.args) and e.args[0] in (errno.ECONNRESET, 10054):975 if len(e.args) and e.args[0] in (errno.ECONNRESET, WSAECONNABORTED,
976 WSAECONNRESET):
958 # The connection was closed by the other side. Callers expect an977 # The connection was closed by the other side. Callers expect an
959 # empty string to signal end-of-stream.978 # empty string to signal end-of-stream.
960 bytes = ''979 bytes = ''
961980
=== modified file 'bzrlib/smart/protocol.py'
--- bzrlib/smart/protocol.py 2010-02-17 17:11:16 +0000
+++ bzrlib/smart/protocol.py 2012-09-11 12:31:20 +0000
@@ -1075,9 +1075,6 @@
1075 self._real_write_func = write_func1075 self._real_write_func = write_func
10761076
1077 def _write_func(self, bytes):1077 def _write_func(self, bytes):
1078 # TODO: It is probably more appropriate to use sum(map(len, _buf))
1079 # for total number of bytes to write, rather than buffer based on
1080 # the number of write() calls
1081 # TODO: Another possibility would be to turn this into an async model.1078 # TODO: Another possibility would be to turn this into an async model.
1082 # Where we let another thread know that we have some bytes if1079 # Where we let another thread know that we have some bytes if
1083 # they want it, but we don't actually block for it1080 # they want it, but we don't actually block for it
10841081
=== modified file 'bzrlib/tests/test_osutils.py'
--- bzrlib/tests/test_osutils.py 2010-11-30 20:42:42 +0000
+++ bzrlib/tests/test_osutils.py 2012-09-11 12:31:20 +0000
@@ -801,6 +801,45 @@
801 self.assertEqual(None, osutils.safe_file_id(None))801 self.assertEqual(None, osutils.safe_file_id(None))
802802
803803
804class TestSendAll(tests.TestCase):
805
806 def test_send_with_disconnected_socket(self):
807 class DisconnectedSocket(object):
808 def __init__(self, err):
809 self.err = err
810 def send(self, content):
811 raise self.err
812 def close(self):
813 pass
814 # All of these should be treated as ConnectionReset
815 errs = []
816 for err_cls in (IOError, socket.error):
817 for errnum in osutils._end_of_stream_errors:
818 errs.append(err_cls(errnum))
819 for err in errs:
820 sock = DisconnectedSocket(err)
821 self.assertRaises(errors.ConnectionReset,
822 osutils.send_all, sock, 'some more content')
823
824 def test_send_with_no_progress(self):
825 # See https://bugs.launchpad.net/bzr/+bug/1047309
826 # It seems that paramiko can get into a state where it doesn't error,
827 # but it returns 0 bytes sent for requests over and over again.
828 class NoSendingSocket(object):
829 def __init__(self):
830 self.call_count = 0
831 def send(self, bytes):
832 self.call_count += 1
833 if self.call_count > 100:
834 # Prevent the test suite from hanging
835 raise RuntimeError('too many calls')
836 return 0
837 sock = NoSendingSocket()
838 self.assertRaises(errors.ConnectionReset,
839 osutils.send_all, sock, 'content')
840 self.assertEqual(1, sock.call_count)
841
842
804class TestWin32Funcs(tests.TestCase):843class TestWin32Funcs(tests.TestCase):
805 """Test that _win32 versions of os utilities return appropriate paths."""844 """Test that _win32 versions of os utilities return appropriate paths."""
806845
807846
=== modified file 'bzrlib/tests/test_smart_transport.py'
--- bzrlib/tests/test_smart_transport.py 2010-02-17 17:11:16 +0000
+++ bzrlib/tests/test_smart_transport.py 2012-09-11 12:31:20 +0000
@@ -18,13 +18,17 @@
1818
19# all of this deals with byte strings so this is safe19# all of this deals with byte strings so this is safe
20from cStringIO import StringIO20from cStringIO import StringIO
21import errno
21import os22import os
22import socket23import socket
24import subprocess
25import sys
23import threading26import threading
2427
25import bzrlib28import bzrlib
26from bzrlib import (29from bzrlib import (
27 bzrdir,30 bzrdir,
31 debug,
28 errors,32 errors,
29 osutils,33 osutils,
30 tests,34 tests,
@@ -49,6 +53,29 @@
49from bzrlib.transport.http import SmartClientHTTPMediumRequest53from bzrlib.transport.http import SmartClientHTTPMediumRequest
5054
5155
56def create_file_pipes():
57 r, w = os.pipe()
58 # These must be opened without buffering, or we get undefined results
59 rf = os.fdopen(r, 'rb', 0)
60 wf = os.fdopen(w, 'wb', 0)
61 return rf, wf
62
63
64def portable_socket_pair():
65 """Return a pair of TCP sockets connected to each other.
66
67 Unlike socket.socketpair, this should work on Windows.
68 """
69 listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
70 listen_sock.bind(('127.0.0.1', 0))
71 listen_sock.listen(1)
72 client_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
73 client_sock.connect(listen_sock.getsockname())
74 server_sock, addr = listen_sock.accept()
75 listen_sock.close()
76 return server_sock, client_sock
77
78
52class StringIOSSHVendor(object):79class StringIOSSHVendor(object):
53 """A SSH vendor that uses StringIO to buffer writes and answer reads."""80 """A SSH vendor that uses StringIO to buffer writes and answer reads."""
5481
@@ -63,6 +90,27 @@
63 return StringIOSSHConnection(self)90 return StringIOSSHConnection(self)
6491
6592
93class FirstRejectedStringIOSSHVendor(StringIOSSHVendor):
94 """The first connection will be considered closed.
95
96 The second connection will succeed normally.
97 """
98
99 def __init__(self, read_from, write_to, fail_at_write=True):
100 super(FirstRejectedStringIOSSHVendor, self).__init__(read_from,
101 write_to)
102 self.fail_at_write = fail_at_write
103 self._first = True
104
105 def connect_ssh(self, username, password, host, port, command):
106 self.calls.append(('connect_ssh', username, password, host, port,
107 command))
108 if self._first:
109 self._first = False
110 return ClosedSSHConnection(self)
111 return StringIOSSHConnection(self)
112
113
66class StringIOSSHConnection(object):114class StringIOSSHConnection(object):
67 """A SSH connection that uses StringIO to buffer writes and answer reads."""115 """A SSH connection that uses StringIO to buffer writes and answer reads."""
68116
@@ -71,11 +119,36 @@
71119
72 def close(self):120 def close(self):
73 self.vendor.calls.append(('close', ))121 self.vendor.calls.append(('close', ))
122 self.vendor.read_from.close()
123 self.vendor.write_to.close()
74124
75 def get_filelike_channels(self):125 def get_filelike_channels(self):
76 return self.vendor.read_from, self.vendor.write_to126 return self.vendor.read_from, self.vendor.write_to
77127
78128
129class ClosedSSHConnection(object):
130 """An SSH connection that just has closed channels."""
131
132 def __init__(self, vendor):
133 self.vendor = vendor
134
135 def close(self):
136 self.vendor.calls.append(('close', ))
137
138 def get_filelike_channels(self):
139 # We create matching pipes, and then close the ssh side
140 bzr_read, ssh_write = create_file_pipes()
141 # We always fail when bzr goes to read
142 ssh_write.close()
143 if self.vendor.fail_at_write:
144 # If set, we'll also fail when bzr goes to write
145 ssh_read, bzr_write = create_file_pipes()
146 ssh_read.close()
147 else:
148 bzr_write = self.vendor.write_to
149 return bzr_read, bzr_write
150
151
79class _InvalidHostnameFeature(tests.Feature):152class _InvalidHostnameFeature(tests.Feature):
80 """Does 'non_existent.invalid' fail to resolve?153 """Does 'non_existent.invalid' fail to resolve?
81154
@@ -171,6 +244,91 @@
171 client_medium._accept_bytes('abc')244 client_medium._accept_bytes('abc')
172 self.assertEqual('abc', output.getvalue())245 self.assertEqual('abc', output.getvalue())
173246
247 def test_simple_pipes__accept_bytes_subprocess_closed(self):
248 # It is unfortunate that we have to use Popen for this. However,
249 # os.pipe() does not behave the same as subprocess.Popen().
250 # On Windows, if you use os.pipe() and close the write side,
251 # read.read() hangs. On Linux, read.read() returns the empty string.
252 p = subprocess.Popen([sys.executable, '-c',
253 'import sys\n'
254 'sys.stdout.write(sys.stdin.read(4))\n'
255 'sys.stdout.close()\n'],
256 stdout=subprocess.PIPE, stdin=subprocess.PIPE)
257 client_medium = medium.SmartSimplePipesClientMedium(
258 p.stdout, p.stdin, 'base')
259 client_medium._accept_bytes('abc\n')
260 self.assertEqual('abc', client_medium._read_bytes(3))
261 p.wait()
262 # While writing to the underlying pipe,
263 # Windows py2.6.6 we get IOError(EINVAL)
264 # Lucid py2.6.5, we get IOError(EPIPE)
265 # In both cases, it should be wrapped to ConnectionReset
266 self.assertRaises(errors.ConnectionReset,
267 client_medium._accept_bytes, 'more')
268
269 def test_simple_pipes__accept_bytes_pipe_closed(self):
270 child_read, client_write = create_file_pipes()
271 client_medium = medium.SmartSimplePipesClientMedium(
272 None, client_write, 'base')
273 client_medium._accept_bytes('abc\n')
274 self.assertEqual('abc\n', child_read.read(4))
275 # While writing to the underlying pipe,
276 # Windows py2.6.6 we get IOError(EINVAL)
277 # Lucid py2.6.5, we get IOError(EPIPE)
278 # In both cases, it should be wrapped to ConnectionReset
279 child_read.close()
280 self.assertRaises(errors.ConnectionReset,
281 client_medium._accept_bytes, 'more')
282
283 def test_simple_pipes__flush_pipe_closed(self):
284 child_read, client_write = create_file_pipes()
285 client_medium = medium.SmartSimplePipesClientMedium(
286 None, client_write, 'base')
287 client_medium._accept_bytes('abc\n')
288 child_read.close()
289 # Even though the pipe is closed, flush on the write side seems to be a
290 # no-op, rather than a failure.
291 client_medium._flush()
292
293 def test_simple_pipes__flush_subprocess_closed(self):
294 p = subprocess.Popen([sys.executable, '-c',
295 'import sys\n'
296 'sys.stdout.write(sys.stdin.read(4))\n'
297 'sys.stdout.close()\n'],
298 stdout=subprocess.PIPE, stdin=subprocess.PIPE)
299 client_medium = medium.SmartSimplePipesClientMedium(
300 p.stdout, p.stdin, 'base')
301 client_medium._accept_bytes('abc\n')
302 p.wait()
303 # Even though the child process is dead, flush seems to be a no-op.
304 client_medium._flush()
305
306 def test_simple_pipes__read_bytes_pipe_closed(self):
307 child_read, client_write = create_file_pipes()
308 client_medium = medium.SmartSimplePipesClientMedium(
309 child_read, client_write, 'base')
310 client_medium._accept_bytes('abc\n')
311 client_write.close()
312 self.assertEqual('abc\n', client_medium._read_bytes(4))
313 self.assertEqual('', client_medium._read_bytes(4))
314
315 def test_simple_pipes__read_bytes_subprocess_closed(self):
316 p = subprocess.Popen([sys.executable, '-c',
317 'import sys\n'
318 'if sys.platform == "win32":\n'
319 ' import msvcrt, os\n'
320 ' msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)\n'
321 ' msvcrt.setmode(sys.stdin.fileno(), os.O_BINARY)\n'
322 'sys.stdout.write(sys.stdin.read(4))\n'
323 'sys.stdout.close()\n'],
324 stdout=subprocess.PIPE, stdin=subprocess.PIPE)
325 client_medium = medium.SmartSimplePipesClientMedium(
326 p.stdout, p.stdin, 'base')
327 client_medium._accept_bytes('abc\n')
328 p.wait()
329 self.assertEqual('abc\n', client_medium._read_bytes(4))
330 self.assertEqual('', client_medium._read_bytes(4))
331
174 def test_simple_pipes_client_disconnect_does_nothing(self):332 def test_simple_pipes_client_disconnect_does_nothing(self):
175 # calling disconnect does nothing.333 # calling disconnect does nothing.
176 input = StringIO()334 input = StringIO()
@@ -556,6 +714,28 @@
556 request.finished_reading()714 request.finished_reading()
557 self.assertRaises(errors.ReadingCompleted, request.read_bytes, None)715 self.assertRaises(errors.ReadingCompleted, request.read_bytes, None)
558716
717 def test_reset(self):
718 server_sock, client_sock = portable_socket_pair()
719 # TODO: Use SmartClientAlreadyConnectedSocketMedium for the versions of
720 # bzr where it exists.
721 client_medium = medium.SmartTCPClientMedium(None, None, None)
722 client_medium._socket = client_sock
723 client_medium._connected = True
724 req = client_medium.get_request()
725 self.assertRaises(errors.TooManyConcurrentRequests,
726 client_medium.get_request)
727 client_medium.reset()
728 # The stream should be reset, marked as disconnected, though ready for
729 # us to make a new request
730 self.assertFalse(client_medium._connected)
731 self.assertIs(None, client_medium._socket)
732 try:
733 self.assertEqual('', client_sock.recv(1))
734 except socket.error, e:
735 if e.errno not in (errno.EBADF,):
736 raise
737 req = client_medium.get_request()
738
559739
560class RemoteTransportTests(TestCaseWithSmartMedium):740class RemoteTransportTests(TestCaseWithSmartMedium):
561741
@@ -609,20 +789,6 @@
609 super(TestSmartServerStreamMedium, self).setUp()789 super(TestSmartServerStreamMedium, self).setUp()
610 self._captureVar('BZR_NO_SMART_VFS', None)790 self._captureVar('BZR_NO_SMART_VFS', None)
611791
612 def portable_socket_pair(self):
613 """Return a pair of TCP sockets connected to each other.
614
615 Unlike socket.socketpair, this should work on Windows.
616 """
617 listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
618 listen_sock.bind(('127.0.0.1', 0))
619 listen_sock.listen(1)
620 client_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
621 client_sock.connect(listen_sock.getsockname())
622 server_sock, addr = listen_sock.accept()
623 listen_sock.close()
624 return server_sock, client_sock
625
626 def test_smart_query_version(self):792 def test_smart_query_version(self):
627 """Feed a canned query version to a server"""793 """Feed a canned query version to a server"""
628 # wire-to-wire, using the whole stack794 # wire-to-wire, using the whole stack
@@ -687,7 +853,7 @@
687853
688 def test_socket_stream_with_bulk_data(self):854 def test_socket_stream_with_bulk_data(self):
689 sample_request_bytes = 'command\n9\nbulk datadone\n'855 sample_request_bytes = 'command\n9\nbulk datadone\n'
690 server_sock, client_sock = self.portable_socket_pair()856 server_sock, client_sock = portable_socket_pair()
691 server = medium.SmartServerSocketStreamMedium(857 server = medium.SmartServerSocketStreamMedium(
692 server_sock, None)858 server_sock, None)
693 sample_protocol = SampleRequest(expected_bytes=sample_request_bytes)859 sample_protocol = SampleRequest(expected_bytes=sample_request_bytes)
@@ -706,7 +872,7 @@
706 self.assertTrue(server.finished)872 self.assertTrue(server.finished)
707873
708 def test_socket_stream_shutdown_detection(self):874 def test_socket_stream_shutdown_detection(self):
709 server_sock, client_sock = self.portable_socket_pair()875 server_sock, client_sock = portable_socket_pair()
710 client_sock.close()876 client_sock.close()
711 server = medium.SmartServerSocketStreamMedium(877 server = medium.SmartServerSocketStreamMedium(
712 server_sock, None)878 server_sock, None)
@@ -726,7 +892,7 @@
726 rest_of_request_bytes = 'lo\n'892 rest_of_request_bytes = 'lo\n'
727 expected_response = (893 expected_response = (
728 protocol.RESPONSE_VERSION_TWO + 'success\nok\x012\n')894 protocol.RESPONSE_VERSION_TWO + 'success\nok\x012\n')
729 server_sock, client_sock = self.portable_socket_pair()895 server_sock, client_sock = portable_socket_pair()
730 server = medium.SmartServerSocketStreamMedium(896 server = medium.SmartServerSocketStreamMedium(
731 server_sock, None)897 server_sock, None)
732 client_sock.sendall(incomplete_request_bytes)898 client_sock.sendall(incomplete_request_bytes)
@@ -802,7 +968,7 @@
802 # _serve_one_request should still process both of them as if they had968 # _serve_one_request should still process both of them as if they had
803 # been received separately.969 # been received separately.
804 sample_request_bytes = 'command\n'970 sample_request_bytes = 'command\n'
805 server_sock, client_sock = self.portable_socket_pair()971 server_sock, client_sock = portable_socket_pair()
806 server = medium.SmartServerSocketStreamMedium(972 server = medium.SmartServerSocketStreamMedium(
807 server_sock, None)973 server_sock, None)
808 first_protocol = SampleRequest(expected_bytes=sample_request_bytes)974 first_protocol = SampleRequest(expected_bytes=sample_request_bytes)
@@ -839,7 +1005,7 @@
839 self.assertTrue(server.finished)1005 self.assertTrue(server.finished)
8401006
841 def test_socket_stream_error_handling(self):1007 def test_socket_stream_error_handling(self):
842 server_sock, client_sock = self.portable_socket_pair()1008 server_sock, client_sock = portable_socket_pair()
843 server = medium.SmartServerSocketStreamMedium(1009 server = medium.SmartServerSocketStreamMedium(
844 server_sock, None)1010 server_sock, None)
845 fake_protocol = ErrorRaisingProtocol(Exception('boom'))1011 fake_protocol = ErrorRaisingProtocol(Exception('boom'))
@@ -860,7 +1026,7 @@
860 self.assertEqual('', from_server.getvalue())1026 self.assertEqual('', from_server.getvalue())
8611027
862 def test_socket_stream_keyboard_interrupt_handling(self):1028 def test_socket_stream_keyboard_interrupt_handling(self):
863 server_sock, client_sock = self.portable_socket_pair()1029 server_sock, client_sock = portable_socket_pair()
864 server = medium.SmartServerSocketStreamMedium(1030 server = medium.SmartServerSocketStreamMedium(
865 server_sock, None)1031 server_sock, None)
866 fake_protocol = ErrorRaisingProtocol(KeyboardInterrupt('boom'))1032 fake_protocol = ErrorRaisingProtocol(KeyboardInterrupt('boom'))
@@ -877,7 +1043,7 @@
877 return server._build_protocol()1043 return server._build_protocol()
8781044
879 def build_protocol_socket(self, bytes):1045 def build_protocol_socket(self, bytes):
880 server_sock, client_sock = self.portable_socket_pair()1046 server_sock, client_sock = portable_socket_pair()
881 server = medium.SmartServerSocketStreamMedium(1047 server = medium.SmartServerSocketStreamMedium(
882 server_sock, None)1048 server_sock, None)
883 client_sock.sendall(bytes)1049 client_sock.sendall(bytes)
@@ -3214,6 +3380,114 @@
3214 # encoder.3380 # encoder.
32153381
32163382
3383class Test_SmartClientRequest(tests.TestCase):
3384
3385 def make_client_with_failing_medium(self, fail_at_write=True):
3386 response = StringIO()
3387 output = StringIO()
3388 vendor = FirstRejectedStringIOSSHVendor(response, output,
3389 fail_at_write=fail_at_write)
3390 client_medium = medium.SmartSSHClientMedium(
3391 'a host', 'a port', 'a user', 'a pass', 'base', vendor,
3392 'bzr')
3393 smart_client = client._SmartClient(client_medium, headers={})
3394 return output, vendor, smart_client
3395
3396 def test__send_no_retry_pipes(self):
3397 client_read, server_write = create_file_pipes()
3398 server_read, client_write = create_file_pipes()
3399 client_medium = medium.SmartSimplePipesClientMedium(client_read,
3400 client_write, base='/')
3401 smart_client = client._SmartClient(client_medium)
3402 smart_request = client._SmartClientRequest(smart_client,
3403 'hello', ())
3404 # Close the server side
3405 server_read.close()
3406 encoder, response_handler = smart_request._construct_protocol(3)
3407 self.assertRaises(errors.ConnectionReset,
3408 smart_request._send_no_retry, encoder)
3409
3410 def test__send_read_response_sockets(self):
3411 listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
3412 listen_sock.bind(('127.0.0.1', 0))
3413 listen_sock.listen(1)
3414 host, port = listen_sock.getsockname()
3415 client_medium = medium.SmartTCPClientMedium(host, port, '/')
3416 client_medium._ensure_connection()
3417 smart_client = client._SmartClient(client_medium)
3418 smart_request = client._SmartClientRequest(smart_client, 'hello', ())
3419 # Accept the connection, but don't actually talk to the client.
3420 server_sock, _ = listen_sock.accept()
3421 server_sock.close()
3422 # Sockets buffer and don't really notice that the server has closed the
3423 # connection until we try to read again.
3424 handler = smart_request._send(3)
3425 self.assertRaises(errors.ConnectionReset,
3426 handler.read_response_tuple, expect_body=False)
3427
3428 def test__send_retries_on_write(self):
3429 output, vendor, smart_client = self.make_client_with_failing_medium()
3430 smart_request = client._SmartClientRequest(smart_client, 'hello', ())
3431 handler = smart_request._send(3)
3432 self.assertEqual('bzr message 3 (bzr 1.6)\n' # protocol
3433 '\x00\x00\x00\x02de' # empty headers
3434 's\x00\x00\x00\tl5:helloee',
3435 output.getvalue())
3436 self.assertEqual(
3437 [('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
3438 ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
3439 ('close',),
3440 ('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
3441 ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
3442 ],
3443 vendor.calls)
3444
3445 def test__send_doesnt_retry_read_failure(self):
3446 output, vendor, smart_client = self.make_client_with_failing_medium(
3447 fail_at_write=False)
3448 smart_request = client._SmartClientRequest(smart_client, 'hello', ())
3449 handler = smart_request._send(3)
3450 self.assertEqual('bzr message 3 (bzr 1.6)\n' # protocol
3451 '\x00\x00\x00\x02de' # empty headers
3452 's\x00\x00\x00\tl5:helloee',
3453 output.getvalue())
3454 self.assertEqual(
3455 [('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
3456 ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
3457 ],
3458 vendor.calls)
3459 self.assertRaises(errors.ConnectionReset, handler.read_response_tuple)
3460
3461 def test__send_doesnt_retry_body_stream(self):
3462 # We don't know how much of body_stream would get iterated as part of
3463 # _send before it failed to actually send the request, so we
3464 # just always fail in this condition.
3465 output, vendor, smart_client = self.make_client_with_failing_medium()
3466 smart_request = client._SmartClientRequest(smart_client, 'hello', (),
3467 body_stream=['a', 'b'])
3468 self.assertRaises(errors.ConnectionReset, smart_request._send, 3)
3469 # We got one connect, but it fails, so we disconnect, but we don't
3470 # retry it
3471 self.assertEqual(
3472 [('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
3473 ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
3474 ('close',),
3475 ],
3476 vendor.calls)
3477
3478 def test__send_disabled_retry(self):
3479 debug.debug_flags.add('noretry')
3480 output, vendor, smart_client = self.make_client_with_failing_medium()
3481 smart_request = client._SmartClientRequest(smart_client, 'hello', ())
3482 self.assertRaises(errors.ConnectionReset, smart_request._send, 3)
3483 self.assertEqual(
3484 [('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
3485 ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
3486 ('close',),
3487 ],
3488 vendor.calls)
3489
3490
3217class LengthPrefixedBodyDecoder(tests.TestCase):3491class LengthPrefixedBodyDecoder(tests.TestCase):
32183492
3219 # XXX: TODO: make accept_reading_trailer invoke translate_response or3493 # XXX: TODO: make accept_reading_trailer invoke translate_response or

Subscribers

People subscribed via source and target branches