Merge lp:~jameinel/bzr/2.2-client-reconnect-819604 into lp:bzr/2.2

Proposed by John A Meinel
Status: Work in progress
Proposed branch: lp:~jameinel/bzr/2.2-client-reconnect-819604
Merge into: lp:bzr/2.2
Diff against target: 1397 lines (+810/-219)
10 files modified
NEWS (+10/-0)
bzrlib/help_topics/en/debug-flags.txt (+2/-0)
bzrlib/osutils.py (+4/-1)
bzrlib/smart/client.py (+208/-86)
bzrlib/smart/medium.py (+22/-6)
bzrlib/smart/protocol.py (+5/-3)
bzrlib/smart/request.py (+144/-99)
bzrlib/tests/test_smart.py (+5/-2)
bzrlib/tests/test_smart_request.py (+10/-0)
bzrlib/tests/test_smart_transport.py (+400/-22)
To merge this branch: bzr merge lp:~jameinel/bzr/2.2-client-reconnect-819604
Reviewer Review Type Date Requested Status
bzr-core Pending
Review via email: mp+78842@code.launchpad.net

Commit message

Bug #819604, allow bzr-2.2 to gracefully reconnect to the server if we get a ConnectionReset at appropriate times.

Description of the change

This is just a rollup of all of my changes for the 2.1-client-reconnect (for bug #819604) but merged into bzr-2.2 and updated for the api changes, etc.

I suggest we review and approve the 2.1 series, since it is all nicely split out into logical steps, and then when ready any changes needed there can be brought into this branch, etc.

To post a comment you must log in.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'NEWS'
--- NEWS 2011-09-02 18:47:36 +0000
+++ NEWS 2011-10-10 13:44:32 +0000
@@ -20,6 +20,11 @@
20Bug Fixes20Bug Fixes
21*********21*********
2222
23* Teach the bzr client how to reconnect if we get ``ConnectionReset``
24 while making an RPC request. This doesn't handle all possible network
25 disconnects, but it should at least handle when the server is asked to
26 shutdown gracefully. (John Arbash Meinel, #819604)
27
23Improvements28Improvements
24************29************
2530
@@ -111,6 +116,11 @@
111116
112 (John Arbash Meinel, #609187, #812928)117 (John Arbash Meinel, #609187, #812928)
113118
119* Teach the bzr client how to reconnect if we get ``ConnectionReset``
120 while making an RPC request. This doesn't handle all possible network
121 disconnects, but it should at least handle when the server is asked to
122 shutdown gracefully. (John Arbash Meinel, #819604)
123
114124
115Improvements125Improvements
116************126************
117127
=== modified file 'bzrlib/help_topics/en/debug-flags.txt'
--- bzrlib/help_topics/en/debug-flags.txt 2010-01-05 04:30:07 +0000
+++ bzrlib/help_topics/en/debug-flags.txt 2011-10-10 13:44:32 +0000
@@ -24,6 +24,8 @@
24-Dindex Trace major index operations.24-Dindex Trace major index operations.
25-Dknit Trace knit operations.25-Dknit Trace knit operations.
26-Dlock Trace when lockdir locks are taken or released.26-Dlock Trace when lockdir locks are taken or released.
27-Dnoretry If a connection is reset, fail immediately rather than
28 retrying the request.
27-Dprogress Trace progress bar operations.29-Dprogress Trace progress bar operations.
28-Dmerge Emit information for debugging merges.30-Dmerge Emit information for debugging merges.
29-Dno_apport Don't use apport to report crashes.31-Dno_apport Don't use apport to report crashes.
3032
=== modified file 'bzrlib/osutils.py'
--- bzrlib/osutils.py 2010-07-09 16:16:11 +0000
+++ bzrlib/osutils.py 2011-10-10 13:44:32 +0000
@@ -1993,6 +1993,9 @@
1993# data at once.1993# data at once.
1994MAX_SOCKET_CHUNK = 64 * 10241994MAX_SOCKET_CHUNK = 64 * 1024
19951995
1996WSAECONNABORTED = 10053
1997WSAECONNRESET = 10054
1998
1996def read_bytes_from_socket(sock, report_activity=None,1999def read_bytes_from_socket(sock, report_activity=None,
1997 max_read_size=MAX_SOCKET_CHUNK):2000 max_read_size=MAX_SOCKET_CHUNK):
1998 """Read up to max_read_size of bytes from sock and notify of progress.2001 """Read up to max_read_size of bytes from sock and notify of progress.
@@ -2006,7 +2009,7 @@
2006 bytes = sock.recv(max_read_size)2009 bytes = sock.recv(max_read_size)
2007 except socket.error, e:2010 except socket.error, e:
2008 eno = e.args[0]2011 eno = e.args[0]
2009 if eno == getattr(errno, "WSAECONNRESET", errno.ECONNRESET):2012 if eno in (errno.ECONNRESET, WSAECONNABORTED, WSAECONNRESET):
2010 # The connection was closed by the other side. Callers expect2013 # The connection was closed by the other side. Callers expect
2011 # an empty string to signal end-of-stream.2014 # an empty string to signal end-of-stream.
2012 return ""2015 return ""
20132016
=== modified file 'bzrlib/smart/client.py'
--- bzrlib/smart/client.py 2010-02-17 17:11:16 +0000
+++ bzrlib/smart/client.py 2011-10-10 13:44:32 +0000
@@ -14,12 +14,18 @@
14# along with this program; if not, write to the Free Software14# along with this program; if not, write to the Free Software
15# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA15# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
1616
17from bzrlib import lazy_import
18lazy_import.lazy_import(globals(), """
19from bzrlib.smart import request as _mod_request
20""")
21
17import bzrlib22import bzrlib
18from bzrlib.smart import message, protocol23from bzrlib.smart import message, protocol
19from bzrlib.trace import warning
20from bzrlib import (24from bzrlib import (
25 debug,
21 errors,26 errors,
22 hooks,27 hooks,
28 trace,
23 )29 )
2430
2531
@@ -39,93 +45,12 @@
39 def __repr__(self):45 def __repr__(self):
40 return '%s(%r)' % (self.__class__.__name__, self._medium)46 return '%s(%r)' % (self.__class__.__name__, self._medium)
4147
42 def _send_request(self, protocol_version, method, args, body=None,
43 readv_body=None, body_stream=None):
44 encoder, response_handler = self._construct_protocol(
45 protocol_version)
46 encoder.set_headers(self._headers)
47 if body is not None:
48 if readv_body is not None:
49 raise AssertionError(
50 "body and readv_body are mutually exclusive.")
51 if body_stream is not None:
52 raise AssertionError(
53 "body and body_stream are mutually exclusive.")
54 encoder.call_with_body_bytes((method, ) + args, body)
55 elif readv_body is not None:
56 if body_stream is not None:
57 raise AssertionError(
58 "readv_body and body_stream are mutually exclusive.")
59 encoder.call_with_body_readv_array((method, ) + args, readv_body)
60 elif body_stream is not None:
61 encoder.call_with_body_stream((method, ) + args, body_stream)
62 else:
63 encoder.call(method, *args)
64 return response_handler
65
66 def _run_call_hooks(self, method, args, body, readv_body):
67 if not _SmartClient.hooks['call']:
68 return
69 params = CallHookParams(method, args, body, readv_body, self._medium)
70 for hook in _SmartClient.hooks['call']:
71 hook(params)
72
73 def _call_and_read_response(self, method, args, body=None, readv_body=None,48 def _call_and_read_response(self, method, args, body=None, readv_body=None,
74 body_stream=None, expect_response_body=True):49 body_stream=None, expect_response_body=True):
75 self._run_call_hooks(method, args, body, readv_body)50 request = _SmartClientRequest(self, method, args, body=body,
76 if self._medium._protocol_version is not None:51 readv_body=readv_body, body_stream=body_stream,
77 response_handler = self._send_request(52 expect_response_body=expect_response_body)
78 self._medium._protocol_version, method, args, body=body,53 return request.call_and_read_response()
79 readv_body=readv_body, body_stream=body_stream)
80 return (response_handler.read_response_tuple(
81 expect_body=expect_response_body),
82 response_handler)
83 else:
84 for protocol_version in [3, 2]:
85 if protocol_version == 2:
86 # If v3 doesn't work, the remote side is older than 1.6.
87 self._medium._remember_remote_is_before((1, 6))
88 response_handler = self._send_request(
89 protocol_version, method, args, body=body,
90 readv_body=readv_body, body_stream=body_stream)
91 try:
92 response_tuple = response_handler.read_response_tuple(
93 expect_body=expect_response_body)
94 except errors.UnexpectedProtocolVersionMarker, err:
95 # TODO: We could recover from this without disconnecting if
96 # we recognise the protocol version.
97 warning(
98 'Server does not understand Bazaar network protocol %d,'
99 ' reconnecting. (Upgrade the server to avoid this.)'
100 % (protocol_version,))
101 self._medium.disconnect()
102 continue
103 except errors.ErrorFromSmartServer:
104 # If we received an error reply from the server, then it
105 # must be ok with this protocol version.
106 self._medium._protocol_version = protocol_version
107 raise
108 else:
109 self._medium._protocol_version = protocol_version
110 return response_tuple, response_handler
111 raise errors.SmartProtocolError(
112 'Server is not a Bazaar server: ' + str(err))
113
114 def _construct_protocol(self, version):
115 request = self._medium.get_request()
116 if version == 3:
117 request_encoder = protocol.ProtocolThreeRequester(request)
118 response_handler = message.ConventionalResponseHandler()
119 response_proto = protocol.ProtocolThreeDecoder(
120 response_handler, expect_version_marker=True)
121 response_handler.setProtoAndMediumRequest(response_proto, request)
122 elif version == 2:
123 request_encoder = protocol.SmartClientRequestProtocolTwo(request)
124 response_handler = request_encoder
125 else:
126 request_encoder = protocol.SmartClientRequestProtocolOne(request)
127 response_handler = request_encoder
128 return request_encoder, response_handler
12954
130 def call(self, method, *args):55 def call(self, method, *args):
131 """Call a method on the remote server."""56 """Call a method on the remote server."""
@@ -191,6 +116,203 @@
191 return self._medium.remote_path_from_transport(transport)116 return self._medium.remote_path_from_transport(transport)
192117
193118
119class _SmartClientRequest(object):
120 """Encapsulate the logic for a single request.
121
122 This class handles things like reconnecting and sending the request a
123 second time when the connection is reset in the middle. It also handles the
124 multiple requests that get made if we don't know what protocol the server
125 supports yet.
126
127 Generally, you build up one of these objects, passing in the arguments that
128 you want to send to the server, and then use 'call_and_read_response' to
129 get the response from the server.
130 """
131
132 def __init__(self, client, method, args, body=None, readv_body=None,
133 body_stream=None, expect_response_body=True):
134 self.client = client
135 self.method = method
136 self.args = args
137 self.body = body
138 self.readv_body = readv_body
139 self.body_stream = body_stream
140 self.expect_response_body = expect_response_body
141
142 def call_and_read_response(self):
143 """Send the request to the server, and read the initial response.
144
145 This doesn't read all of the body content of the response, instead it
146 returns (response_tuple, response_handler). response_tuple is the 'ok',
147 or 'error' information, and 'response_handler' can be used to get the
148 content stream out.
149 """
150 self._run_call_hooks()
151 protocol_version = self.client._medium._protocol_version
152 if protocol_version is None:
153 return self._call_determining_protocol_version()
154 else:
155 return self._call(protocol_version)
156
157 def _is_safe_to_send_twice(self):
158 """Check if the current method is re-entrant safe."""
159 if self.body_stream is not None or 'noretry' in debug.debug_flags:
160 # We can't restart a body stream that has already been consumed.
161 return False
162 request_type = _mod_request.request_handlers.get_info(self.method)
163 if request_type in ('read', 'idem', 'semi'):
164 return True
165 # If we have gotten this far, 'stream' cannot be retried, because we
166 # already consumed the local stream.
167 if request_type in ('semivfs', 'mutate', 'stream'):
168 return False
169 trace.mutter('Unknown request type: %s for method %s'
170 % (request_type, self.method))
171 return False
172
173 def _run_call_hooks(self):
174 if not _SmartClient.hooks['call']:
175 return
176 params = CallHookParams(self.method, self.args, self.body,
177 self.readv_body, self.client._medium)
178 for hook in _SmartClient.hooks['call']:
179 hook(params)
180
181 def _call(self, protocol_version):
182 """We know the protocol version.
183
184 So this just sends the request, and then reads the response. This is
185 where the code will be to retry requests if the connection is closed.
186 """
187 response_handler = self._send(protocol_version)
188 try:
189 response_tuple = response_handler.read_response_tuple(
190 expect_body=self.expect_response_body)
191 except errors.ConnectionReset, e:
192 self.client._medium.reset()
193 if not self._is_safe_to_send_twice():
194 raise
195 trace.warning('ConnectionReset reading response for %r, retrying'
196 % (self.method,))
197 trace.log_exception_quietly()
198 encoder, response_handler = self._construct_protocol(
199 protocol_version)
200 self._send_no_retry(encoder)
201 response_tuple = response_handler.read_response_tuple(
202 expect_body=self.expect_response_body)
203 return (response_tuple, response_handler)
204
205 def _call_determining_protocol_version(self):
206 """Determine what protocol the remote server supports.
207
208 We do this by placing a request in the most recent protocol, and
209 handling the UnexpectedProtocolVersionMarker from the server.
210 """
211 for protocol_version in [3, 2]:
212 if protocol_version == 2:
213 # If v3 doesn't work, the remote side is older than 1.6.
214 self.client._medium._remember_remote_is_before((1, 6))
215 try:
216 response_tuple, response_handler = self._call(protocol_version)
217 except errors.UnexpectedProtocolVersionMarker, err:
218 # TODO: We could recover from this without disconnecting if
219 # we recognise the protocol version.
220 trace.warning(
221 'Server does not understand Bazaar network protocol %d,'
222 ' reconnecting. (Upgrade the server to avoid this.)'
223 % (protocol_version,))
224 self.client._medium.disconnect()
225 continue
226 except errors.ErrorFromSmartServer:
227 # If we received an error reply from the server, then it
228 # must be ok with this protocol version.
229 self.client._medium._protocol_version = protocol_version
230 raise
231 else:
232 self.client._medium._protocol_version = protocol_version
233 return response_tuple, response_handler
234 raise errors.SmartProtocolError(
235 'Server is not a Bazaar server: ' + str(err))
236
237 def _construct_protocol(self, version):
238 """Build the encoding stack for a given protocol version."""
239 request = self.client._medium.get_request()
240 if version == 3:
241 request_encoder = protocol.ProtocolThreeRequester(request)
242 response_handler = message.ConventionalResponseHandler()
243 response_proto = protocol.ProtocolThreeDecoder(
244 response_handler, expect_version_marker=True)
245 response_handler.setProtoAndMediumRequest(response_proto, request)
246 elif version == 2:
247 request_encoder = protocol.SmartClientRequestProtocolTwo(request)
248 response_handler = request_encoder
249 else:
250 request_encoder = protocol.SmartClientRequestProtocolOne(request)
251 response_handler = request_encoder
252 return request_encoder, response_handler
253
254 def _send(self, protocol_version):
255 """Encode the request, and send it to the server.
256
257 This will retry a request if we get a ConnectionReset while sending the
258 request to the server. (Unless we have a body_stream that we have
259 already started consuming, since we can't restart body_streams)
260
261 :return: response_handler as defined by _construct_protocol
262 """
263 encoder, response_handler = self._construct_protocol(protocol_version)
264 try:
265 self._send_no_retry(encoder)
266 except errors.ConnectionReset, e:
267 # If we fail during the _send_no_retry phase, then we can
268 # be confident that the server did not get our request, because we
269 # haven't started waiting for the reply yet. So try the request
270 # again. We only issue a single retry, because if the connection
271 # really is down, there is no reason to loop endlessly.
272
273 # Connection is dead, so close our end of it.
274 self.client._medium.reset()
275 if (('noretry' in debug.debug_flags)
276 or (self.body_stream is not None
277 and encoder.body_stream_started)):
278 # We can't restart a body_stream that has been partially
279 # consumed, so we don't retry.
280 # Note: We don't have to worry about
281 # SmartClientRequestProtocolOne or Two, because they don't
282 # support client-side body streams.
283 raise
284 trace.warning('ConnectionReset calling %r, retrying'
285 % (self.method,))
286 trace.log_exception_quietly()
287 encoder, response_handler = self._construct_protocol(
288 protocol_version)
289 self._send_no_retry(encoder)
290 return response_handler
291
292 def _send_no_retry(self, encoder):
293 """Just encode the request and try to send it."""
294 encoder.set_headers(self.client._headers)
295 if self.body is not None:
296 if self.readv_body is not None:
297 raise AssertionError(
298 "body and readv_body are mutually exclusive.")
299 if self.body_stream is not None:
300 raise AssertionError(
301 "body and body_stream are mutually exclusive.")
302 encoder.call_with_body_bytes((self.method, ) + self.args, self.body)
303 elif self.readv_body is not None:
304 if self.body_stream is not None:
305 raise AssertionError(
306 "readv_body and body_stream are mutually exclusive.")
307 encoder.call_with_body_readv_array((self.method, ) + self.args,
308 self.readv_body)
309 elif self.body_stream is not None:
310 encoder.call_with_body_stream((self.method, ) + self.args,
311 self.body_stream)
312 else:
313 encoder.call(self.method, *self.args)
314
315
194class SmartClientHooks(hooks.Hooks):316class SmartClientHooks(hooks.Hooks):
195317
196 def __init__(self):318 def __init__(self):
197319
=== modified file 'bzrlib/smart/medium.py'
--- bzrlib/smart/medium.py 2010-06-21 08:08:04 +0000
+++ bzrlib/smart/medium.py 2011-10-10 13:44:32 +0000
@@ -1,4 +1,4 @@
1# Copyright (C) 2006-2010 Canonical Ltd1# Copyright (C) 2006-2011 Canonical Ltd
2#2#
3# This program is free software; you can redistribute it and/or modify3# This program is free software; you can redistribute it and/or modify
4# it under the terms of the GNU General Public License as published by4# it under the terms of the GNU General Public License as published by
@@ -24,6 +24,7 @@
24bzrlib/transport/smart/__init__.py.24bzrlib/transport/smart/__init__.py.
25"""25"""
2626
27import errno
27import os28import os
28import sys29import sys
29import urllib30import urllib
@@ -710,6 +711,14 @@
710 """711 """
711 return SmartClientStreamMediumRequest(self)712 return SmartClientStreamMediumRequest(self)
712713
714 def reset(self):
715 """We have been disconnected, reset current state.
716
717 This resets things like _current_request and connected state.
718 """
719 self.disconnect()
720 self._current_request = None
721
713722
714class SmartSimplePipesClientMedium(SmartClientStreamMedium):723class SmartSimplePipesClientMedium(SmartClientStreamMedium):
715 """A client medium using simple pipes.724 """A client medium using simple pipes.
@@ -724,11 +733,20 @@
724733
725 def _accept_bytes(self, bytes):734 def _accept_bytes(self, bytes):
726 """See SmartClientStreamMedium.accept_bytes."""735 """See SmartClientStreamMedium.accept_bytes."""
727 self._writeable_pipe.write(bytes)736 try:
737 self._writeable_pipe.write(bytes)
738 except IOError, e:
739 if e.errno in (errno.EINVAL, errno.EPIPE):
740 raise errors.ConnectionReset(
741 "Error trying to write to subprocess:\n%s" % (e,))
742 raise
728 self._report_activity(len(bytes), 'write')743 self._report_activity(len(bytes), 'write')
729744
730 def _flush(self):745 def _flush(self):
731 """See SmartClientStreamMedium._flush()."""746 """See SmartClientStreamMedium._flush()."""
747 # Note: If flush were to fail, we'd like to raise ConnectionReset, etc.
748 # However, testing shows that even when the child process is
749 # gone, this doesn't error.
732 self._writeable_pipe.flush()750 self._writeable_pipe.flush()
733751
734 def _read_bytes(self, count):752 def _read_bytes(self, count):
@@ -753,8 +771,8 @@
753771
754class SmartSSHClientMedium(SmartClientStreamMedium):772class SmartSSHClientMedium(SmartClientStreamMedium):
755 """A client medium using SSH.773 """A client medium using SSH.
756 774
757 It delegates IO to a SmartClientSocketMedium or775 It delegates IO to a SmartSimplePipesClientMedium or
758 SmartClientAlreadyConnectedSocketMedium (depending on platform).776 SmartClientAlreadyConnectedSocketMedium (depending on platform).
759 """777 """
760778
@@ -993,5 +1011,3 @@
993 This invokes self._medium._flush to ensure all bytes are transmitted.1011 This invokes self._medium._flush to ensure all bytes are transmitted.
994 """1012 """
995 self._medium._flush()1013 self._medium._flush()
996
997
9981014
=== modified file 'bzrlib/smart/protocol.py'
--- bzrlib/smart/protocol.py 2010-06-11 05:57:09 +0000
+++ bzrlib/smart/protocol.py 2011-10-10 13:44:32 +0000
@@ -1081,9 +1081,6 @@
1081 self._real_write_func = write_func1081 self._real_write_func = write_func
10821082
1083 def _write_func(self, bytes):1083 def _write_func(self, bytes):
1084 # TODO: It is probably more appropriate to use sum(map(len, _buf))
1085 # for total number of bytes to write, rather than buffer based on
1086 # the number of write() calls
1087 # TODO: Another possibility would be to turn this into an async model.1084 # TODO: Another possibility would be to turn this into an async model.
1088 # Where we let another thread know that we have some bytes if1085 # Where we let another thread know that we have some bytes if
1089 # they want it, but we don't actually block for it1086 # they want it, but we don't actually block for it
@@ -1292,6 +1289,7 @@
1292 _ProtocolThreeEncoder.__init__(self, medium_request.accept_bytes)1289 _ProtocolThreeEncoder.__init__(self, medium_request.accept_bytes)
1293 self._medium_request = medium_request1290 self._medium_request = medium_request
1294 self._headers = {}1291 self._headers = {}
1292 self.body_stream_started = None
12951293
1296 def set_headers(self, headers):1294 def set_headers(self, headers):
1297 self._headers = headers.copy()1295 self._headers = headers.copy()
@@ -1357,6 +1355,7 @@
1357 if path is not None:1355 if path is not None:
1358 mutter(' (to %s)', path)1356 mutter(' (to %s)', path)
1359 self._request_start_time = osutils.timer_func()1357 self._request_start_time = osutils.timer_func()
1358 self.body_stream_started = False
1360 self._write_protocol_version()1359 self._write_protocol_version()
1361 self._write_headers(self._headers)1360 self._write_headers(self._headers)
1362 self._write_structure(args)1361 self._write_structure(args)
@@ -1364,6 +1363,9 @@
1364 # have finished sending the stream. We would notice at the end1363 # have finished sending the stream. We would notice at the end
1365 # anyway, but if the medium can deliver it early then it's good1364 # anyway, but if the medium can deliver it early then it's good
1366 # to short-circuit the whole request...1365 # to short-circuit the whole request...
1366 # Provoke any ConnectionReset failures before we start the body stream.
1367 self.flush()
1368 self.body_stream_started = True
1367 for exc_info, part in _iter_with_errors(stream):1369 for exc_info, part in _iter_with_errors(stream):
1368 if exc_info is not None:1370 if exc_info is not None:
1369 # Iterating the stream failed. Cleanly abort the request.1371 # Iterating the stream failed. Cleanly abort the request.
13701372
=== modified file 'bzrlib/smart/request.py'
--- bzrlib/smart/request.py 2010-05-13 16:17:54 +0000
+++ bzrlib/smart/request.py 2011-10-10 13:44:32 +0000
@@ -1,4 +1,4 @@
1# Copyright (C) 2006-2010 Canonical Ltd1# Copyright (C) 2006-2011 Canonical Ltd
2#2#
3# This program is free software; you can redistribute it and/or modify3# This program is free software; you can redistribute it and/or modify
4# it under the terms of the GNU General Public License as published by4# it under the terms of the GNU General Public License as published by
@@ -486,154 +486,199 @@
486 return SuccessfulSmartServerResponse((answer,))486 return SuccessfulSmartServerResponse((answer,))
487487
488488
489# In the 'info' attribute, we store whether this request is 'safe' to retry if
490# we get a disconnect while reading the response. It can have the values:
491# read This is purely a read request, so retrying it is perfectly ok.
492# idem An idempotent write request. Something like 'put' where if you put
493# the same bytes twice you end up with the same final bytes.
494# semi This is a request that isn't strictly idempotent, but doesn't
495# result in corruption if it is retried. This is for things like
496# 'lock' and 'unlock'. If you call lock, it updates the disk
497# structure. If you fail to read the response, you won't be able to
498# use the lock, because you don't have the lock token. Calling lock
499# again will fail, because the lock is already taken. However, we
500# can't tell if the server received our request or not. If it didn't,
501# then retrying the request is fine, as it will actually do what we
502# want. If it did, we will interrupt the current operation, but we
503# are no worse off than interrupting the current operation because of
504# a ConnectionReset.
505# semivfs Similar to semi, but specific to a Virtual FileSystem request.
506# stream This is a request that takes a stream that cannot be restarted if
507# consumed. This request is 'safe' in that if we determine the
508# connection is closed before we consume the stream, we can try
509# again.
510# mutate State is updated in a way that replaying that request results in a
511# different state. For example 'append' writes more bytes to a given
512# file. If append succeeds, it moves the file pointer.
489request_handlers = registry.Registry()513request_handlers = registry.Registry()
490request_handlers.register_lazy(514request_handlers.register_lazy(
491 'append', 'bzrlib.smart.vfs', 'AppendRequest')515 'append', 'bzrlib.smart.vfs', 'AppendRequest', info='mutate')
492request_handlers.register_lazy(516request_handlers.register_lazy(
493 'Branch.get_config_file', 'bzrlib.smart.branch',517 'Branch.get_config_file', 'bzrlib.smart.branch',
494 'SmartServerBranchGetConfigFile')518 'SmartServerBranchGetConfigFile', info='read')
495request_handlers.register_lazy(519request_handlers.register_lazy(
496 'Branch.get_parent', 'bzrlib.smart.branch', 'SmartServerBranchGetParent')520 'Branch.get_parent', 'bzrlib.smart.branch', 'SmartServerBranchGetParent',
521 info='read')
497request_handlers.register_lazy(522request_handlers.register_lazy(
498 'Branch.get_tags_bytes', 'bzrlib.smart.branch',523 'Branch.get_tags_bytes', 'bzrlib.smart.branch',
499 'SmartServerBranchGetTagsBytes')524 'SmartServerBranchGetTagsBytes', info='read')
500request_handlers.register_lazy(525request_handlers.register_lazy(
501 'Branch.set_tags_bytes', 'bzrlib.smart.branch',526 'Branch.set_tags_bytes', 'bzrlib.smart.branch',
502 'SmartServerBranchSetTagsBytes')527 'SmartServerBranchSetTagsBytes', info='idem')
503request_handlers.register_lazy(528request_handlers.register_lazy(
504 'Branch.get_stacked_on_url', 'bzrlib.smart.branch', 'SmartServerBranchRequestGetStackedOnURL')529 'Branch.get_stacked_on_url', 'bzrlib.smart.branch',
505request_handlers.register_lazy(530 'SmartServerBranchRequestGetStackedOnURL', info='read')
506 'Branch.last_revision_info', 'bzrlib.smart.branch', 'SmartServerBranchRequestLastRevisionInfo')531request_handlers.register_lazy(
507request_handlers.register_lazy(532 'Branch.last_revision_info', 'bzrlib.smart.branch',
508 'Branch.lock_write', 'bzrlib.smart.branch', 'SmartServerBranchRequestLockWrite')533 'SmartServerBranchRequestLastRevisionInfo', info='read')
509request_handlers.register_lazy( 'Branch.revision_history',534request_handlers.register_lazy(
510 'bzrlib.smart.branch', 'SmartServerRequestRevisionHistory')535 'Branch.lock_write', 'bzrlib.smart.branch',
511request_handlers.register_lazy( 'Branch.set_config_option',536 'SmartServerBranchRequestLockWrite', info='semi')
512 'bzrlib.smart.branch', 'SmartServerBranchRequestSetConfigOption')537request_handlers.register_lazy(
513request_handlers.register_lazy( 'Branch.set_config_option_dict',538 'Branch.revision_history', 'bzrlib.smart.branch',
514 'bzrlib.smart.branch', 'SmartServerBranchRequestSetConfigOptionDict')539 'SmartServerRequestRevisionHistory', info='read')
515request_handlers.register_lazy( 'Branch.set_last_revision',540request_handlers.register_lazy(
516 'bzrlib.smart.branch', 'SmartServerBranchRequestSetLastRevision')541 'Branch.set_config_option', 'bzrlib.smart.branch',
542 'SmartServerBranchRequestSetConfigOption', info='idem')
543request_handlers.register_lazy(
544 'Branch.set_config_option_dict', 'bzrlib.smart.branch',
545 'SmartServerBranchRequestSetConfigOptionDict', info='idem')
546request_handlers.register_lazy(
547 'Branch.set_last_revision', 'bzrlib.smart.branch',
548 'SmartServerBranchRequestSetLastRevision', info='idem')
517request_handlers.register_lazy(549request_handlers.register_lazy(
518 'Branch.set_last_revision_info', 'bzrlib.smart.branch',550 'Branch.set_last_revision_info', 'bzrlib.smart.branch',
519 'SmartServerBranchRequestSetLastRevisionInfo')551 'SmartServerBranchRequestSetLastRevisionInfo', info='idem')
520request_handlers.register_lazy(552request_handlers.register_lazy(
521 'Branch.set_last_revision_ex', 'bzrlib.smart.branch',553 'Branch.set_last_revision_ex', 'bzrlib.smart.branch',
522 'SmartServerBranchRequestSetLastRevisionEx')554 'SmartServerBranchRequestSetLastRevisionEx', info='idem')
523request_handlers.register_lazy(555request_handlers.register_lazy(
524 'Branch.set_parent_location', 'bzrlib.smart.branch',556 'Branch.set_parent_location', 'bzrlib.smart.branch',
525 'SmartServerBranchRequestSetParentLocation')557 'SmartServerBranchRequestSetParentLocation', info='idem')
526request_handlers.register_lazy(558request_handlers.register_lazy(
527 'Branch.unlock', 'bzrlib.smart.branch', 'SmartServerBranchRequestUnlock')559 'Branch.unlock', 'bzrlib.smart.branch', 'SmartServerBranchRequestUnlock',
560 info='semi')
528request_handlers.register_lazy(561request_handlers.register_lazy(
529 'BzrDir.cloning_metadir', 'bzrlib.smart.bzrdir',562 'BzrDir.cloning_metadir', 'bzrlib.smart.bzrdir',
530 'SmartServerBzrDirRequestCloningMetaDir')563 'SmartServerBzrDirRequestCloningMetaDir', info='read')
531request_handlers.register_lazy(564request_handlers.register_lazy(
532 'BzrDir.create_branch', 'bzrlib.smart.bzrdir',565 'BzrDir.create_branch', 'bzrlib.smart.bzrdir',
533 'SmartServerRequestCreateBranch')566 'SmartServerRequestCreateBranch', info='semi')
534request_handlers.register_lazy(567request_handlers.register_lazy(
535 'BzrDir.create_repository', 'bzrlib.smart.bzrdir',568 'BzrDir.create_repository', 'bzrlib.smart.bzrdir',
536 'SmartServerRequestCreateRepository')569 'SmartServerRequestCreateRepository', info='semi')
537request_handlers.register_lazy(570request_handlers.register_lazy(
538 'BzrDir.find_repository', 'bzrlib.smart.bzrdir',571 'BzrDir.find_repository', 'bzrlib.smart.bzrdir',
539 'SmartServerRequestFindRepositoryV1')572 'SmartServerRequestFindRepositoryV1', info='read')
540request_handlers.register_lazy(573request_handlers.register_lazy(
541 'BzrDir.find_repositoryV2', 'bzrlib.smart.bzrdir',574 'BzrDir.find_repositoryV2', 'bzrlib.smart.bzrdir',
542 'SmartServerRequestFindRepositoryV2')575 'SmartServerRequestFindRepositoryV2', info='read')
543request_handlers.register_lazy(576request_handlers.register_lazy(
544 'BzrDir.find_repositoryV3', 'bzrlib.smart.bzrdir',577 'BzrDir.find_repositoryV3', 'bzrlib.smart.bzrdir',
545 'SmartServerRequestFindRepositoryV3')578 'SmartServerRequestFindRepositoryV3', info='read')
546request_handlers.register_lazy(579request_handlers.register_lazy(
547 'BzrDir.get_config_file', 'bzrlib.smart.bzrdir',580 'BzrDir.get_config_file', 'bzrlib.smart.bzrdir',
548 'SmartServerBzrDirRequestConfigFile')581 'SmartServerBzrDirRequestConfigFile', info='read')
549request_handlers.register_lazy(582request_handlers.register_lazy(
550 'BzrDirFormat.initialize', 'bzrlib.smart.bzrdir',583 'BzrDirFormat.initialize', 'bzrlib.smart.bzrdir',
551 'SmartServerRequestInitializeBzrDir')584 'SmartServerRequestInitializeBzrDir', info='semi')
552request_handlers.register_lazy(585request_handlers.register_lazy(
553 'BzrDirFormat.initialize_ex_1.16', 'bzrlib.smart.bzrdir',586 'BzrDirFormat.initialize_ex_1.16', 'bzrlib.smart.bzrdir',
554 'SmartServerRequestBzrDirInitializeEx')587 'SmartServerRequestBzrDirInitializeEx', info='semi')
555request_handlers.register_lazy(588request_handlers.register_lazy(
556 'BzrDir.open', 'bzrlib.smart.bzrdir', 'SmartServerRequestOpenBzrDir')589 'BzrDir.open', 'bzrlib.smart.bzrdir', 'SmartServerRequestOpenBzrDir',
557request_handlers.register_lazy(590 info='read')
558 'BzrDir.open_2.1', 'bzrlib.smart.bzrdir', 'SmartServerRequestOpenBzrDir_2_1')591request_handlers.register_lazy(
592 'BzrDir.open_2.1', 'bzrlib.smart.bzrdir',
593 'SmartServerRequestOpenBzrDir_2_1', info='read')
559request_handlers.register_lazy(594request_handlers.register_lazy(
560 'BzrDir.open_branch', 'bzrlib.smart.bzrdir',595 'BzrDir.open_branch', 'bzrlib.smart.bzrdir',
561 'SmartServerRequestOpenBranch')596 'SmartServerRequestOpenBranch', info='read')
562request_handlers.register_lazy(597request_handlers.register_lazy(
563 'BzrDir.open_branchV2', 'bzrlib.smart.bzrdir',598 'BzrDir.open_branchV2', 'bzrlib.smart.bzrdir',
564 'SmartServerRequestOpenBranchV2')599 'SmartServerRequestOpenBranchV2', info='read')
565request_handlers.register_lazy(600request_handlers.register_lazy(
566 'BzrDir.open_branchV3', 'bzrlib.smart.bzrdir',601 'BzrDir.open_branchV3', 'bzrlib.smart.bzrdir',
567 'SmartServerRequestOpenBranchV3')602 'SmartServerRequestOpenBranchV3', info='read')
568request_handlers.register_lazy(603request_handlers.register_lazy(
569 'delete', 'bzrlib.smart.vfs', 'DeleteRequest')604 'delete', 'bzrlib.smart.vfs', 'DeleteRequest', info='semivfs')
570request_handlers.register_lazy(605request_handlers.register_lazy(
571 'get', 'bzrlib.smart.vfs', 'GetRequest')606 'get', 'bzrlib.smart.vfs', 'GetRequest', info='read')
572request_handlers.register_lazy(607request_handlers.register_lazy(
573 'get_bundle', 'bzrlib.smart.request', 'GetBundleRequest')608 'get_bundle', 'bzrlib.smart.request', 'GetBundleRequest', info='read')
574request_handlers.register_lazy(609request_handlers.register_lazy(
575 'has', 'bzrlib.smart.vfs', 'HasRequest')610 'has', 'bzrlib.smart.vfs', 'HasRequest', info='read')
576request_handlers.register_lazy(611request_handlers.register_lazy(
577 'hello', 'bzrlib.smart.request', 'HelloRequest')612 'hello', 'bzrlib.smart.request', 'HelloRequest', info='read')
578request_handlers.register_lazy(613request_handlers.register_lazy(
579 'iter_files_recursive', 'bzrlib.smart.vfs', 'IterFilesRecursiveRequest')614 'iter_files_recursive', 'bzrlib.smart.vfs', 'IterFilesRecursiveRequest',
580request_handlers.register_lazy(615 info='read')
581 'list_dir', 'bzrlib.smart.vfs', 'ListDirRequest')616request_handlers.register_lazy(
582request_handlers.register_lazy(617 'list_dir', 'bzrlib.smart.vfs', 'ListDirRequest', info='read')
583 'mkdir', 'bzrlib.smart.vfs', 'MkdirRequest')618request_handlers.register_lazy(
584request_handlers.register_lazy(619 'mkdir', 'bzrlib.smart.vfs', 'MkdirRequest', info='semivfs')
585 'move', 'bzrlib.smart.vfs', 'MoveRequest')620request_handlers.register_lazy(
586request_handlers.register_lazy(621 'move', 'bzrlib.smart.vfs', 'MoveRequest', info='semivfs')
587 'put', 'bzrlib.smart.vfs', 'PutRequest')622request_handlers.register_lazy(
588request_handlers.register_lazy(623 'put', 'bzrlib.smart.vfs', 'PutRequest', info='idem')
589 'put_non_atomic', 'bzrlib.smart.vfs', 'PutNonAtomicRequest')624request_handlers.register_lazy(
590request_handlers.register_lazy(625 'put_non_atomic', 'bzrlib.smart.vfs', 'PutNonAtomicRequest', info='idem')
591 'readv', 'bzrlib.smart.vfs', 'ReadvRequest')626request_handlers.register_lazy(
592request_handlers.register_lazy(627 'readv', 'bzrlib.smart.vfs', 'ReadvRequest', info='read')
593 'rename', 'bzrlib.smart.vfs', 'RenameRequest')628request_handlers.register_lazy(
629 'rename', 'bzrlib.smart.vfs', 'RenameRequest', info='semivfs')
594request_handlers.register_lazy(630request_handlers.register_lazy(
595 'PackRepository.autopack', 'bzrlib.smart.packrepository',631 'PackRepository.autopack', 'bzrlib.smart.packrepository',
596 'SmartServerPackRepositoryAutopack')632 'SmartServerPackRepositoryAutopack', info='idem')
597request_handlers.register_lazy('Repository.gather_stats',633request_handlers.register_lazy(
598 'bzrlib.smart.repository',634 'Repository.gather_stats', 'bzrlib.smart.repository',
599 'SmartServerRepositoryGatherStats')635 'SmartServerRepositoryGatherStats', info='read')
600request_handlers.register_lazy('Repository.get_parent_map',636request_handlers.register_lazy(
601 'bzrlib.smart.repository',637 'Repository.get_parent_map', 'bzrlib.smart.repository',
602 'SmartServerRepositoryGetParentMap')638 'SmartServerRepositoryGetParentMap', info='read')
603request_handlers.register_lazy(639request_handlers.register_lazy(
604 'Repository.get_revision_graph', 'bzrlib.smart.repository', 'SmartServerRepositoryGetRevisionGraph')640 'Repository.get_revision_graph', 'bzrlib.smart.repository',
605request_handlers.register_lazy(641 'SmartServerRepositoryGetRevisionGraph', info='read')
606 'Repository.has_revision', 'bzrlib.smart.repository', 'SmartServerRequestHasRevision')642request_handlers.register_lazy(
607request_handlers.register_lazy(643 'Repository.has_revision', 'bzrlib.smart.repository',
608 'Repository.insert_stream', 'bzrlib.smart.repository', 'SmartServerRepositoryInsertStream')644 'SmartServerRequestHasRevision', info='read')
609request_handlers.register_lazy(645request_handlers.register_lazy(
610 'Repository.insert_stream_1.19', 'bzrlib.smart.repository', 'SmartServerRepositoryInsertStream_1_19')646 'Repository.insert_stream', 'bzrlib.smart.repository',
611request_handlers.register_lazy(647 'SmartServerRepositoryInsertStream', info='stream')
612 'Repository.insert_stream_locked', 'bzrlib.smart.repository', 'SmartServerRepositoryInsertStreamLocked')648request_handlers.register_lazy(
613request_handlers.register_lazy(649 'Repository.insert_stream_1.19', 'bzrlib.smart.repository',
614 'Repository.is_shared', 'bzrlib.smart.repository', 'SmartServerRepositoryIsShared')650 'SmartServerRepositoryInsertStream_1_19', info='stream')
615request_handlers.register_lazy(651request_handlers.register_lazy(
616 'Repository.lock_write', 'bzrlib.smart.repository', 'SmartServerRepositoryLockWrite')652 'Repository.insert_stream_locked', 'bzrlib.smart.repository',
653 'SmartServerRepositoryInsertStreamLocked', info='stream')
654request_handlers.register_lazy(
655 'Repository.is_shared', 'bzrlib.smart.repository',
656 'SmartServerRepositoryIsShared', info='read')
657request_handlers.register_lazy(
658 'Repository.lock_write', 'bzrlib.smart.repository',
659 'SmartServerRepositoryLockWrite', info='semi')
617request_handlers.register_lazy(660request_handlers.register_lazy(
618 'Repository.set_make_working_trees', 'bzrlib.smart.repository',661 'Repository.set_make_working_trees', 'bzrlib.smart.repository',
619 'SmartServerRepositorySetMakeWorkingTrees')662 'SmartServerRepositorySetMakeWorkingTrees', info='idem')
620request_handlers.register_lazy(663request_handlers.register_lazy(
621 'Repository.unlock', 'bzrlib.smart.repository', 'SmartServerRepositoryUnlock')664 'Repository.unlock', 'bzrlib.smart.repository',
665 'SmartServerRepositoryUnlock', info='semi')
622request_handlers.register_lazy(666request_handlers.register_lazy(
623 'Repository.get_rev_id_for_revno', 'bzrlib.smart.repository',667 'Repository.get_rev_id_for_revno', 'bzrlib.smart.repository',
624 'SmartServerRepositoryGetRevIdForRevno')668 'SmartServerRepositoryGetRevIdForRevno', info='read')
625request_handlers.register_lazy(669request_handlers.register_lazy(
626 'Repository.get_stream', 'bzrlib.smart.repository',670 'Repository.get_stream', 'bzrlib.smart.repository',
627 'SmartServerRepositoryGetStream')671 'SmartServerRepositoryGetStream', info='read')
628request_handlers.register_lazy(672request_handlers.register_lazy(
629 'Repository.get_stream_1.19', 'bzrlib.smart.repository',673 'Repository.get_stream_1.19', 'bzrlib.smart.repository',
630 'SmartServerRepositoryGetStream_1_19')674 'SmartServerRepositoryGetStream_1_19', info='read')
631request_handlers.register_lazy(675request_handlers.register_lazy(
632 'Repository.tarball', 'bzrlib.smart.repository',676 'Repository.tarball', 'bzrlib.smart.repository',
633 'SmartServerRepositoryTarball')677 'SmartServerRepositoryTarball', info='read')
634request_handlers.register_lazy(678request_handlers.register_lazy(
635 'rmdir', 'bzrlib.smart.vfs', 'RmdirRequest')679 'rmdir', 'bzrlib.smart.vfs', 'RmdirRequest', info='semivfs')
636request_handlers.register_lazy(680request_handlers.register_lazy(
637 'stat', 'bzrlib.smart.vfs', 'StatRequest')681 'stat', 'bzrlib.smart.vfs', 'StatRequest', info='read')
638request_handlers.register_lazy(682request_handlers.register_lazy(
639 'Transport.is_readonly', 'bzrlib.smart.request', 'SmartServerIsReadonly')683 'Transport.is_readonly', 'bzrlib.smart.request', 'SmartServerIsReadonly',
684 info='read')
640685
=== modified file 'bzrlib/tests/test_smart.py'
--- bzrlib/tests/test_smart.py 2010-05-13 16:17:54 +0000
+++ bzrlib/tests/test_smart.py 2011-10-10 13:44:32 +0000
@@ -1849,8 +1849,11 @@
1849 """All registered request_handlers can be found."""1849 """All registered request_handlers can be found."""
1850 # If there's a typo in a register_lazy call, this loop will fail with1850 # If there's a typo in a register_lazy call, this loop will fail with
1851 # an AttributeError.1851 # an AttributeError.
1852 for key, item in smart_req.request_handlers.iteritems():1852 for key in smart_req.request_handlers.keys():
1853 pass1853 try:
1854 item = smart_req.request_handlers.get(key)
1855 except AttributeError, e:
1856 raise AttributeError('failed to get %s: %s' % (key, e))
18541857
1855 def assertHandlerEqual(self, verb, handler):1858 def assertHandlerEqual(self, verb, handler):
1856 self.assertEqual(smart_req.request_handlers.get(verb), handler)1859 self.assertEqual(smart_req.request_handlers.get(verb), handler)
18571860
=== modified file 'bzrlib/tests/test_smart_request.py'
--- bzrlib/tests/test_smart_request.py 2010-06-20 11:18:38 +0000
+++ bzrlib/tests/test_smart_request.py 2011-10-10 13:44:32 +0000
@@ -111,6 +111,16 @@
111 self.assertEqual(111 self.assertEqual(
112 [[transport]] * 3, handler._command.jail_transports_log)112 [[transport]] * 3, handler._command.jail_transports_log)
113113
114 def test_all_registered_requests_are_safety_qualified(self):
115 unclassified_requests = []
116 allowed_info = ('read', 'idem', 'mutate', 'semivfs', 'semi', 'stream')
117 for key in request.request_handlers.keys():
118 info = request.request_handlers.get_info(key)
119 if info is None or info not in allowed_info:
120 unclassified_requests.append(key)
121 if unclassified_requests:
122 self.fail('These requests were not categorized as safe/unsafe'
123 ' to retry: %s' % (unclassified_requests,))
114124
115125
116class TestSmartRequestHandlerErrorTranslation(TestCase):126class TestSmartRequestHandlerErrorTranslation(TestCase):
117127
=== modified file 'bzrlib/tests/test_smart_transport.py'
--- bzrlib/tests/test_smart_transport.py 2010-06-25 09:56:07 +0000
+++ bzrlib/tests/test_smart_transport.py 2011-10-10 13:44:32 +0000
@@ -1,4 +1,4 @@
1# Copyright (C) 2006-2010 Canonical Ltd1# Copyright (C) 2006-2011 Canonical Ltd
2#2#
3# This program is free software; you can redistribute it and/or modify3# This program is free software; you can redistribute it and/or modify
4# it under the terms of the GNU General Public License as published by4# it under the terms of the GNU General Public License as published by
@@ -18,13 +18,17 @@
1818
19# all of this deals with byte strings so this is safe19# all of this deals with byte strings so this is safe
20from cStringIO import StringIO20from cStringIO import StringIO
21import errno
21import os22import os
22import socket23import socket
24import subprocess
25import sys
23import threading26import threading
2427
25import bzrlib28import bzrlib
26from bzrlib import (29from bzrlib import (
27 bzrdir,30 bzrdir,
31 debug,
28 errors,32 errors,
29 osutils,33 osutils,
30 tests,34 tests,
@@ -50,6 +54,29 @@
50 )54 )
5155
5256
57def create_file_pipes():
58 r, w = os.pipe()
59 # These must be opened without buffering, or we get undefined results
60 rf = os.fdopen(r, 'rb', 0)
61 wf = os.fdopen(w, 'wb', 0)
62 return rf, wf
63
64
65def portable_socket_pair():
66 """Return a pair of TCP sockets connected to each other.
67
68 Unlike socket.socketpair, this should work on Windows.
69 """
70 listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
71 listen_sock.bind(('127.0.0.1', 0))
72 listen_sock.listen(1)
73 client_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
74 client_sock.connect(listen_sock.getsockname())
75 server_sock, addr = listen_sock.accept()
76 listen_sock.close()
77 return server_sock, client_sock
78
79
53class StringIOSSHVendor(object):80class StringIOSSHVendor(object):
54 """A SSH vendor that uses StringIO to buffer writes and answer reads."""81 """A SSH vendor that uses StringIO to buffer writes and answer reads."""
5582
@@ -64,6 +91,27 @@
64 return StringIOSSHConnection(self)91 return StringIOSSHConnection(self)
6592
6693
94class FirstRejectedStringIOSSHVendor(StringIOSSHVendor):
95 """The first connection will be considered closed.
96
97 The second connection will succeed normally.
98 """
99
100 def __init__(self, read_from, write_to, fail_at_write=True):
101 super(FirstRejectedStringIOSSHVendor, self).__init__(read_from,
102 write_to)
103 self.fail_at_write = fail_at_write
104 self._first = True
105
106 def connect_ssh(self, username, password, host, port, command):
107 self.calls.append(('connect_ssh', username, password, host, port,
108 command))
109 if self._first:
110 self._first = False
111 return ClosedSSHConnection(self)
112 return StringIOSSHConnection(self)
113
114
67class StringIOSSHConnection(ssh.SSHConnection):115class StringIOSSHConnection(ssh.SSHConnection):
68 """A SSH connection that uses StringIO to buffer writes and answer reads."""116 """A SSH connection that uses StringIO to buffer writes and answer reads."""
69117
@@ -79,6 +127,29 @@
79 return 'pipes', (self.vendor.read_from, self.vendor.write_to)127 return 'pipes', (self.vendor.read_from, self.vendor.write_to)
80128
81129
130class ClosedSSHConnection(ssh.SSHConnection):
131 """An SSH connection that just has closed channels."""
132
133 def __init__(self, vendor):
134 self.vendor = vendor
135
136 def close(self):
137 self.vendor.calls.append(('close', ))
138
139 def get_sock_or_pipes(self):
140 # We create matching pipes, and then close the ssh side
141 bzr_read, ssh_write = create_file_pipes()
142 # We always fail when bzr goes to read
143 ssh_write.close()
144 if self.vendor.fail_at_write:
145 # If set, we'll also fail when bzr goes to write
146 ssh_read, bzr_write = create_file_pipes()
147 ssh_read.close()
148 else:
149 bzr_write = self.vendor.write_to
150 return 'pipes', (bzr_read, bzr_write)
151
152
82class _InvalidHostnameFeature(tests.Feature):153class _InvalidHostnameFeature(tests.Feature):
83 """Does 'non_existent.invalid' fail to resolve?154 """Does 'non_existent.invalid' fail to resolve?
84155
@@ -174,6 +245,91 @@
174 client_medium._accept_bytes('abc')245 client_medium._accept_bytes('abc')
175 self.assertEqual('abc', output.getvalue())246 self.assertEqual('abc', output.getvalue())
176247
248 def test_simple_pipes__accept_bytes_subprocess_closed(self):
249 # It is unfortunate that we have to use Popen for this. However,
250 # os.pipe() does not behave the same as subprocess.Popen().
251 # On Windows, if you use os.pipe() and close the write side,
252 # read.read() hangs. On Linux, read.read() returns the empty string.
253 p = subprocess.Popen([sys.executable, '-c',
254 'import sys\n'
255 'sys.stdout.write(sys.stdin.read(4))\n'
256 'sys.stdout.close()\n'],
257 stdout=subprocess.PIPE, stdin=subprocess.PIPE)
258 client_medium = medium.SmartSimplePipesClientMedium(
259 p.stdout, p.stdin, 'base')
260 client_medium._accept_bytes('abc\n')
261 self.assertEqual('abc', client_medium._read_bytes(3))
262 p.wait()
263 # While writing to the underlying pipe,
264 # Windows py2.6.6 we get IOError(EINVAL)
265 # Lucid py2.6.5, we get IOError(EPIPE)
266 # In both cases, it should be wrapped to ConnectionReset
267 self.assertRaises(errors.ConnectionReset,
268 client_medium._accept_bytes, 'more')
269
270 def test_simple_pipes__accept_bytes_pipe_closed(self):
271 child_read, client_write = create_file_pipes()
272 client_medium = medium.SmartSimplePipesClientMedium(
273 None, client_write, 'base')
274 client_medium._accept_bytes('abc\n')
275 self.assertEqual('abc\n', child_read.read(4))
276 # While writing to the underlying pipe,
277 # Windows py2.6.6 we get IOError(EINVAL)
278 # Lucid py2.6.5, we get IOError(EPIPE)
279 # In both cases, it should be wrapped to ConnectionReset
280 child_read.close()
281 self.assertRaises(errors.ConnectionReset,
282 client_medium._accept_bytes, 'more')
283
284 def test_simple_pipes__flush_pipe_closed(self):
285 child_read, client_write = create_file_pipes()
286 client_medium = medium.SmartSimplePipesClientMedium(
287 None, client_write, 'base')
288 client_medium._accept_bytes('abc\n')
289 child_read.close()
290 # Even though the pipe is closed, flush on the write side seems to be a
291 # no-op, rather than a failure.
292 client_medium._flush()
293
294 def test_simple_pipes__flush_subprocess_closed(self):
295 p = subprocess.Popen([sys.executable, '-c',
296 'import sys\n'
297 'sys.stdout.write(sys.stdin.read(4))\n'
298 'sys.stdout.close()\n'],
299 stdout=subprocess.PIPE, stdin=subprocess.PIPE)
300 client_medium = medium.SmartSimplePipesClientMedium(
301 p.stdout, p.stdin, 'base')
302 client_medium._accept_bytes('abc\n')
303 p.wait()
304 # Even though the child process is dead, flush seems to be a no-op.
305 client_medium._flush()
306
307 def test_simple_pipes__read_bytes_pipe_closed(self):
308 child_read, client_write = create_file_pipes()
309 client_medium = medium.SmartSimplePipesClientMedium(
310 child_read, client_write, 'base')
311 client_medium._accept_bytes('abc\n')
312 client_write.close()
313 self.assertEqual('abc\n', client_medium._read_bytes(4))
314 self.assertEqual('', client_medium._read_bytes(4))
315
316 def test_simple_pipes__read_bytes_subprocess_closed(self):
317 p = subprocess.Popen([sys.executable, '-c',
318 'import sys\n'
319 'if sys.platform == "win32":\n'
320 ' import msvcrt, os\n'
321 ' msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)\n'
322 ' msvcrt.setmode(sys.stdin.fileno(), os.O_BINARY)\n'
323 'sys.stdout.write(sys.stdin.read(4))\n'
324 'sys.stdout.close()\n'],
325 stdout=subprocess.PIPE, stdin=subprocess.PIPE)
326 client_medium = medium.SmartSimplePipesClientMedium(
327 p.stdout, p.stdin, 'base')
328 client_medium._accept_bytes('abc\n')
329 p.wait()
330 self.assertEqual('abc\n', client_medium._read_bytes(4))
331 self.assertEqual('', client_medium._read_bytes(4))
332
177 def test_simple_pipes_client_disconnect_does_nothing(self):333 def test_simple_pipes_client_disconnect_does_nothing(self):
178 # calling disconnect does nothing.334 # calling disconnect does nothing.
179 input = StringIO()335 input = StringIO()
@@ -561,6 +717,28 @@
561 request.finished_reading()717 request.finished_reading()
562 self.assertRaises(errors.ReadingCompleted, request.read_bytes, None)718 self.assertRaises(errors.ReadingCompleted, request.read_bytes, None)
563719
720 def test_reset(self):
721 server_sock, client_sock = portable_socket_pair()
722 # TODO: Use SmartClientAlreadyConnectedSocketMedium for the versions of
723 # bzr where it exists.
724 client_medium = medium.SmartTCPClientMedium(None, None, None)
725 client_medium._socket = client_sock
726 client_medium._connected = True
727 req = client_medium.get_request()
728 self.assertRaises(errors.TooManyConcurrentRequests,
729 client_medium.get_request)
730 client_medium.reset()
731 # The stream should be reset, marked as disconnected, though ready for
732 # us to make a new request
733 self.assertFalse(client_medium._connected)
734 self.assertIs(None, client_medium._socket)
735 try:
736 self.assertEqual('', client_sock.recv(1))
737 except socket.error, e:
738 if e.errno not in (errno.EBADF,):
739 raise
740 req = client_medium.get_request()
741
564742
565class RemoteTransportTests(test_smart.TestCaseWithSmartMedium):743class RemoteTransportTests(test_smart.TestCaseWithSmartMedium):
566744
@@ -614,20 +792,6 @@
614 super(TestSmartServerStreamMedium, self).setUp()792 super(TestSmartServerStreamMedium, self).setUp()
615 self._captureVar('BZR_NO_SMART_VFS', None)793 self._captureVar('BZR_NO_SMART_VFS', None)
616794
617 def portable_socket_pair(self):
618 """Return a pair of TCP sockets connected to each other.
619
620 Unlike socket.socketpair, this should work on Windows.
621 """
622 listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
623 listen_sock.bind(('127.0.0.1', 0))
624 listen_sock.listen(1)
625 client_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
626 client_sock.connect(listen_sock.getsockname())
627 server_sock, addr = listen_sock.accept()
628 listen_sock.close()
629 return server_sock, client_sock
630
631 def test_smart_query_version(self):795 def test_smart_query_version(self):
632 """Feed a canned query version to a server"""796 """Feed a canned query version to a server"""
633 # wire-to-wire, using the whole stack797 # wire-to-wire, using the whole stack
@@ -692,7 +856,7 @@
692856
693 def test_socket_stream_with_bulk_data(self):857 def test_socket_stream_with_bulk_data(self):
694 sample_request_bytes = 'command\n9\nbulk datadone\n'858 sample_request_bytes = 'command\n9\nbulk datadone\n'
695 server_sock, client_sock = self.portable_socket_pair()859 server_sock, client_sock = portable_socket_pair()
696 server = medium.SmartServerSocketStreamMedium(860 server = medium.SmartServerSocketStreamMedium(
697 server_sock, None)861 server_sock, None)
698 sample_protocol = SampleRequest(expected_bytes=sample_request_bytes)862 sample_protocol = SampleRequest(expected_bytes=sample_request_bytes)
@@ -711,7 +875,7 @@
711 self.assertTrue(server.finished)875 self.assertTrue(server.finished)
712876
713 def test_socket_stream_shutdown_detection(self):877 def test_socket_stream_shutdown_detection(self):
714 server_sock, client_sock = self.portable_socket_pair()878 server_sock, client_sock = portable_socket_pair()
715 client_sock.close()879 client_sock.close()
716 server = medium.SmartServerSocketStreamMedium(880 server = medium.SmartServerSocketStreamMedium(
717 server_sock, None)881 server_sock, None)
@@ -731,7 +895,7 @@
731 rest_of_request_bytes = 'lo\n'895 rest_of_request_bytes = 'lo\n'
732 expected_response = (896 expected_response = (
733 protocol.RESPONSE_VERSION_TWO + 'success\nok\x012\n')897 protocol.RESPONSE_VERSION_TWO + 'success\nok\x012\n')
734 server_sock, client_sock = self.portable_socket_pair()898 server_sock, client_sock = portable_socket_pair()
735 server = medium.SmartServerSocketStreamMedium(899 server = medium.SmartServerSocketStreamMedium(
736 server_sock, None)900 server_sock, None)
737 client_sock.sendall(incomplete_request_bytes)901 client_sock.sendall(incomplete_request_bytes)
@@ -807,7 +971,7 @@
807 # _serve_one_request should still process both of them as if they had971 # _serve_one_request should still process both of them as if they had
808 # been received separately.972 # been received separately.
809 sample_request_bytes = 'command\n'973 sample_request_bytes = 'command\n'
810 server_sock, client_sock = self.portable_socket_pair()974 server_sock, client_sock = portable_socket_pair()
811 server = medium.SmartServerSocketStreamMedium(975 server = medium.SmartServerSocketStreamMedium(
812 server_sock, None)976 server_sock, None)
813 first_protocol = SampleRequest(expected_bytes=sample_request_bytes)977 first_protocol = SampleRequest(expected_bytes=sample_request_bytes)
@@ -844,7 +1008,7 @@
844 self.assertTrue(server.finished)1008 self.assertTrue(server.finished)
8451009
846 def test_socket_stream_error_handling(self):1010 def test_socket_stream_error_handling(self):
847 server_sock, client_sock = self.portable_socket_pair()1011 server_sock, client_sock = portable_socket_pair()
848 server = medium.SmartServerSocketStreamMedium(1012 server = medium.SmartServerSocketStreamMedium(
849 server_sock, None)1013 server_sock, None)
850 fake_protocol = ErrorRaisingProtocol(Exception('boom'))1014 fake_protocol = ErrorRaisingProtocol(Exception('boom'))
@@ -865,7 +1029,7 @@
865 self.assertEqual('', from_server.getvalue())1029 self.assertEqual('', from_server.getvalue())
8661030
867 def test_socket_stream_keyboard_interrupt_handling(self):1031 def test_socket_stream_keyboard_interrupt_handling(self):
868 server_sock, client_sock = self.portable_socket_pair()1032 server_sock, client_sock = portable_socket_pair()
869 server = medium.SmartServerSocketStreamMedium(1033 server = medium.SmartServerSocketStreamMedium(
870 server_sock, None)1034 server_sock, None)
871 fake_protocol = ErrorRaisingProtocol(KeyboardInterrupt('boom'))1035 fake_protocol = ErrorRaisingProtocol(KeyboardInterrupt('boom'))
@@ -882,7 +1046,7 @@
882 return server._build_protocol()1046 return server._build_protocol()
8831047
884 def build_protocol_socket(self, bytes):1048 def build_protocol_socket(self, bytes):
885 server_sock, client_sock = self.portable_socket_pair()1049 server_sock, client_sock = portable_socket_pair()
886 server = medium.SmartServerSocketStreamMedium(1050 server = medium.SmartServerSocketStreamMedium(
887 server_sock, None)1051 server_sock, None)
888 client_sock.sendall(bytes)1052 client_sock.sendall(bytes)
@@ -2785,6 +2949,33 @@
2785 'e', # end2949 'e', # end
2786 output.getvalue())2950 output.getvalue())
27872951
2952 def test_records_start_of_body_stream(self):
2953 requester, output = self.make_client_encoder_and_output()
2954 requester.set_headers({})
2955 in_stream = [False]
2956 def stream_checker():
2957 self.assertTrue(requester.body_stream_started)
2958 in_stream[0] = True
2959 yield 'content'
2960 flush_called = []
2961 orig_flush = requester.flush
2962 def tracked_flush():
2963 flush_called.append(in_stream[0])
2964 if in_stream[0]:
2965 self.assertTrue(requester.body_stream_started)
2966 else:
2967 self.assertFalse(requester.body_stream_started)
2968 return orig_flush()
2969 requester.flush = tracked_flush
2970 requester.call_with_body_stream(('one arg',), stream_checker())
2971 self.assertEqual(
2972 'bzr message 3 (bzr 1.6)\n' # protocol version
2973 '\x00\x00\x00\x02de' # headers
2974 's\x00\x00\x00\x0bl7:one arge' # args
2975 'b\x00\x00\x00\x07content' # body
2976 'e', output.getvalue())
2977 self.assertEqual([False, True, True], flush_called)
2978
27882979
2789class StubMediumRequest(object):2980class StubMediumRequest(object):
2790 """A stub medium request that tracks the number of times accept_bytes is2981 """A stub medium request that tracks the number of times accept_bytes is
@@ -3209,6 +3400,193 @@
3209 # encoder.3400 # encoder.
32103401
32113402
3403class Test_SmartClientRequest(tests.TestCase):
3404
3405 def make_client_with_failing_medium(self, fail_at_write=True, response=''):
3406 response_io = StringIO(response)
3407 output = StringIO()
3408 vendor = FirstRejectedStringIOSSHVendor(response_io, output,
3409 fail_at_write=fail_at_write)
3410 ssh_params = medium.SSHParams('a host', 'a port', 'a user', 'a pass')
3411 client_medium = medium.SmartSSHClientMedium('base', ssh_params, vendor)
3412 smart_client = client._SmartClient(client_medium, headers={})
3413 return output, vendor, smart_client
3414
3415 def make_response(self, args, body=None, body_stream=None):
3416 response_io = StringIO()
3417 response = _mod_request.SuccessfulSmartServerResponse(args, body=body,
3418 body_stream=body_stream)
3419 responder = protocol.ProtocolThreeResponder(response_io.write)
3420 responder.send_response(response)
3421 return response_io.getvalue()
3422
3423 def test__call_doesnt_retry_append(self):
3424 response = self.make_response(('appended', '8'))
3425 output, vendor, smart_client = self.make_client_with_failing_medium(
3426 fail_at_write=False, response=response)
3427 smart_request = client._SmartClientRequest(smart_client, 'append',
3428 ('foo', ''), body='content\n')
3429 self.assertRaises(errors.ConnectionReset, smart_request._call, 3)
3430
3431 def test__call_retries_get_bytes(self):
3432 response = self.make_response(('ok',), 'content\n')
3433 output, vendor, smart_client = self.make_client_with_failing_medium(
3434 fail_at_write=False, response=response)
3435 smart_request = client._SmartClientRequest(smart_client, 'get',
3436 ('foo',))
3437 response, response_handler = smart_request._call(3)
3438 self.assertEqual(('ok',), response)
3439 self.assertEqual('content\n', response_handler.read_body_bytes())
3440
3441 def test__call_noretry_get_bytes(self):
3442 debug.debug_flags.add('noretry')
3443 response = self.make_response(('ok',), 'content\n')
3444 output, vendor, smart_client = self.make_client_with_failing_medium(
3445 fail_at_write=False, response=response)
3446 smart_request = client._SmartClientRequest(smart_client, 'get',
3447 ('foo',))
3448 self.assertRaises(errors.ConnectionReset, smart_request._call, 3)
3449
3450 def test__send_no_retry_pipes(self):
3451 client_read, server_write = create_file_pipes()
3452 server_read, client_write = create_file_pipes()
3453 client_medium = medium.SmartSimplePipesClientMedium(client_read,
3454 client_write, base='/')
3455 smart_client = client._SmartClient(client_medium)
3456 smart_request = client._SmartClientRequest(smart_client,
3457 'hello', ())
3458 # Close the server side
3459 server_read.close()
3460 encoder, response_handler = smart_request._construct_protocol(3)
3461 self.assertRaises(errors.ConnectionReset,
3462 smart_request._send_no_retry, encoder)
3463
3464 def test__send_read_response_sockets(self):
3465 listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
3466 listen_sock.bind(('127.0.0.1', 0))
3467 listen_sock.listen(1)
3468 host, port = listen_sock.getsockname()
3469 client_medium = medium.SmartTCPClientMedium(host, port, '/')
3470 client_medium._ensure_connection()
3471 smart_client = client._SmartClient(client_medium)
3472 smart_request = client._SmartClientRequest(smart_client, 'hello', ())
3473 # Accept the connection, but don't actually talk to the client.
3474 server_sock, _ = listen_sock.accept()
3475 server_sock.close()
3476 # Sockets buffer and don't really notice that the server has closed the
3477 # connection until we try to read again.
3478 handler = smart_request._send(3)
3479 self.assertRaises(errors.ConnectionReset,
3480 handler.read_response_tuple, expect_body=False)
3481
3482 def test__send_retries_on_write(self):
3483 output, vendor, smart_client = self.make_client_with_failing_medium()
3484 smart_request = client._SmartClientRequest(smart_client, 'hello', ())
3485 handler = smart_request._send(3)
3486 self.assertEqual('bzr message 3 (bzr 1.6)\n' # protocol
3487 '\x00\x00\x00\x02de' # empty headers
3488 's\x00\x00\x00\tl5:helloee',
3489 output.getvalue())
3490 self.assertEqual(
3491 [('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
3492 ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
3493 ('close',),
3494 ('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
3495 ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
3496 ],
3497 vendor.calls)
3498
3499 def test__send_doesnt_retry_read_failure(self):
3500 output, vendor, smart_client = self.make_client_with_failing_medium(
3501 fail_at_write=False)
3502 smart_request = client._SmartClientRequest(smart_client, 'hello', ())
3503 handler = smart_request._send(3)
3504 self.assertEqual('bzr message 3 (bzr 1.6)\n' # protocol
3505 '\x00\x00\x00\x02de' # empty headers
3506 's\x00\x00\x00\tl5:helloee',
3507 output.getvalue())
3508 self.assertEqual(
3509 [('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
3510 ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
3511 ],
3512 vendor.calls)
3513 self.assertRaises(errors.ConnectionReset, handler.read_response_tuple)
3514
3515 def test__send_request_retries_body_stream_if_not_started(self):
3516 output, vendor, smart_client = self.make_client_with_failing_medium()
3517 smart_request = client._SmartClientRequest(smart_client, 'hello', (),
3518 body_stream=['a', 'b'])
3519 response_handler = smart_request._send(3)
3520 # We connect, get disconnected, and notice before consuming the stream,
3521 # so we try again one time and succeed.
3522 self.assertEqual(
3523 [('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
3524 ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
3525 ('close',),
3526 ('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
3527 ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
3528 ],
3529 vendor.calls)
3530 self.assertEqual('bzr message 3 (bzr 1.6)\n' # protocol
3531 '\x00\x00\x00\x02de' # empty headers
3532 's\x00\x00\x00\tl5:helloe'
3533 'b\x00\x00\x00\x01a'
3534 'b\x00\x00\x00\x01b'
3535 'e',
3536 output.getvalue())
3537
3538 def test__send_request_stops_if_body_started(self):
3539 # We intentionally use the python StringIO so that we can subclass it.
3540 from StringIO import StringIO
3541 response = StringIO()
3542
3543 class FailAfterFirstWrite(StringIO):
3544 """Allow one 'write' call to pass, fail the rest"""
3545 def __init__(self):
3546 StringIO.__init__(self)
3547 self._first = True
3548
3549 def write(self, s):
3550 if self._first:
3551 self._first = False
3552 return StringIO.write(self, s)
3553 raise IOError(errno.EINVAL, 'invalid file handle')
3554 output = FailAfterFirstWrite()
3555
3556 vendor = FirstRejectedStringIOSSHVendor(response, output,
3557 fail_at_write=False)
3558 ssh_params = medium.SSHParams('a host', 'a port', 'a user', 'a pass')
3559 client_medium = medium.SmartSSHClientMedium('base', ssh_params, vendor)
3560 smart_client = client._SmartClient(client_medium, headers={})
3561 smart_request = client._SmartClientRequest(smart_client, 'hello', (),
3562 body_stream=['a', 'b'])
3563 self.assertRaises(errors.ConnectionReset, smart_request._send, 3)
3564 # We connect, and manage to get to the point that we start consuming
3565 # the body stream. The next write fails, so we just stop.
3566 self.assertEqual(
3567 [('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
3568 ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
3569 ('close',),
3570 ],
3571 vendor.calls)
3572 self.assertEqual('bzr message 3 (bzr 1.6)\n' # protocol
3573 '\x00\x00\x00\x02de' # empty headers
3574 's\x00\x00\x00\tl5:helloe',
3575 output.getvalue())
3576
3577 def test__send_disabled_retry(self):
3578 debug.debug_flags.add('noretry')
3579 output, vendor, smart_client = self.make_client_with_failing_medium()
3580 smart_request = client._SmartClientRequest(smart_client, 'hello', ())
3581 self.assertRaises(errors.ConnectionReset, smart_request._send, 3)
3582 self.assertEqual(
3583 [('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
3584 ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
3585 ('close',),
3586 ],
3587 vendor.calls)
3588
3589
3212class LengthPrefixedBodyDecoder(tests.TestCase):3590class LengthPrefixedBodyDecoder(tests.TestCase):
32133591
3214 # XXX: TODO: make accept_reading_trailer invoke translate_response or3592 # XXX: TODO: make accept_reading_trailer invoke translate_response or

Subscribers

People subscribed via source and target branches