Merge lp:~jameinel/bzr/2.3-client-reconnect-819604 into lp:bzr/2.3

Proposed by John A Meinel
Status: Work in progress
Proposed branch: lp:~jameinel/bzr/2.3-client-reconnect-819604
Merge into: lp:bzr/2.3
Diff against target: 1387 lines (+810/-217)
11 files modified
bzrlib/help_topics/en/debug-flags.txt (+2/-0)
bzrlib/smart/client.py (+208/-86)
bzrlib/smart/medium.py (+22/-6)
bzrlib/smart/protocol.py (+5/-3)
bzrlib/smart/request.py (+144/-99)
bzrlib/tests/test_smart.py (+5/-2)
bzrlib/tests/test_smart_request.py (+10/-0)
bzrlib/tests/test_smart_transport.py (+399/-21)
doc/en/release-notes/bzr-2.1.txt (+5/-0)
doc/en/release-notes/bzr-2.2.txt (+5/-0)
doc/en/release-notes/bzr-2.3.txt (+5/-0)
To merge this branch: bzr merge lp:~jameinel/bzr/2.3-client-reconnect-819604
Reviewer Review Type Date Requested Status
bzr-core Pending
Review via email: mp+78843@code.launchpad.net

Commit message

Bug #819604, allow clients to reconnect if they lose a connection at particular times.

Description of the change

This is the rollup of the client reconnect code (bug #819604) for bzr-2.3. The only significant tweaks vs the 2.2 version is that 'osutils.read_bytes_from_socket' already handled WSAECONNABORTED and that NEWS is now split up into multiple files, rather than just being NEWS.

Again, reviews should be done on 2.1 to start with, this is created to handle any merge conflicts, etc that result in up-porting the code.

To post a comment you must log in.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'bzrlib/help_topics/en/debug-flags.txt'
--- bzrlib/help_topics/en/debug-flags.txt 2010-10-06 03:20:28 +0000
+++ bzrlib/help_topics/en/debug-flags.txt 2011-10-10 13:55:27 +0000
@@ -24,6 +24,8 @@
24-Dindex Trace major index operations.24-Dindex Trace major index operations.
25-Dknit Trace knit operations.25-Dknit Trace knit operations.
26-Dlock Trace when lockdir locks are taken or released.26-Dlock Trace when lockdir locks are taken or released.
27-Dnoretry If a connection is reset, fail immediately rather than
28 retrying the request.
27-Dprogress Trace progress bar operations.29-Dprogress Trace progress bar operations.
28-Dmem_dump Dump memory to a file upon an out of memory error.30-Dmem_dump Dump memory to a file upon an out of memory error.
29-Dmerge Emit information for debugging merges.31-Dmerge Emit information for debugging merges.
3032
=== modified file 'bzrlib/smart/client.py'
--- bzrlib/smart/client.py 2010-02-17 17:11:16 +0000
+++ bzrlib/smart/client.py 2011-10-10 13:55:27 +0000
@@ -14,12 +14,18 @@
14# along with this program; if not, write to the Free Software14# along with this program; if not, write to the Free Software
15# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA15# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
1616
17from bzrlib import lazy_import
18lazy_import.lazy_import(globals(), """
19from bzrlib.smart import request as _mod_request
20""")
21
17import bzrlib22import bzrlib
18from bzrlib.smart import message, protocol23from bzrlib.smart import message, protocol
19from bzrlib.trace import warning
20from bzrlib import (24from bzrlib import (
25 debug,
21 errors,26 errors,
22 hooks,27 hooks,
28 trace,
23 )29 )
2430
2531
@@ -39,93 +45,12 @@
39 def __repr__(self):45 def __repr__(self):
40 return '%s(%r)' % (self.__class__.__name__, self._medium)46 return '%s(%r)' % (self.__class__.__name__, self._medium)
4147
42 def _send_request(self, protocol_version, method, args, body=None,
43 readv_body=None, body_stream=None):
44 encoder, response_handler = self._construct_protocol(
45 protocol_version)
46 encoder.set_headers(self._headers)
47 if body is not None:
48 if readv_body is not None:
49 raise AssertionError(
50 "body and readv_body are mutually exclusive.")
51 if body_stream is not None:
52 raise AssertionError(
53 "body and body_stream are mutually exclusive.")
54 encoder.call_with_body_bytes((method, ) + args, body)
55 elif readv_body is not None:
56 if body_stream is not None:
57 raise AssertionError(
58 "readv_body and body_stream are mutually exclusive.")
59 encoder.call_with_body_readv_array((method, ) + args, readv_body)
60 elif body_stream is not None:
61 encoder.call_with_body_stream((method, ) + args, body_stream)
62 else:
63 encoder.call(method, *args)
64 return response_handler
65
66 def _run_call_hooks(self, method, args, body, readv_body):
67 if not _SmartClient.hooks['call']:
68 return
69 params = CallHookParams(method, args, body, readv_body, self._medium)
70 for hook in _SmartClient.hooks['call']:
71 hook(params)
72
73 def _call_and_read_response(self, method, args, body=None, readv_body=None,48 def _call_and_read_response(self, method, args, body=None, readv_body=None,
74 body_stream=None, expect_response_body=True):49 body_stream=None, expect_response_body=True):
75 self._run_call_hooks(method, args, body, readv_body)50 request = _SmartClientRequest(self, method, args, body=body,
76 if self._medium._protocol_version is not None:51 readv_body=readv_body, body_stream=body_stream,
77 response_handler = self._send_request(52 expect_response_body=expect_response_body)
78 self._medium._protocol_version, method, args, body=body,53 return request.call_and_read_response()
79 readv_body=readv_body, body_stream=body_stream)
80 return (response_handler.read_response_tuple(
81 expect_body=expect_response_body),
82 response_handler)
83 else:
84 for protocol_version in [3, 2]:
85 if protocol_version == 2:
86 # If v3 doesn't work, the remote side is older than 1.6.
87 self._medium._remember_remote_is_before((1, 6))
88 response_handler = self._send_request(
89 protocol_version, method, args, body=body,
90 readv_body=readv_body, body_stream=body_stream)
91 try:
92 response_tuple = response_handler.read_response_tuple(
93 expect_body=expect_response_body)
94 except errors.UnexpectedProtocolVersionMarker, err:
95 # TODO: We could recover from this without disconnecting if
96 # we recognise the protocol version.
97 warning(
98 'Server does not understand Bazaar network protocol %d,'
99 ' reconnecting. (Upgrade the server to avoid this.)'
100 % (protocol_version,))
101 self._medium.disconnect()
102 continue
103 except errors.ErrorFromSmartServer:
104 # If we received an error reply from the server, then it
105 # must be ok with this protocol version.
106 self._medium._protocol_version = protocol_version
107 raise
108 else:
109 self._medium._protocol_version = protocol_version
110 return response_tuple, response_handler
111 raise errors.SmartProtocolError(
112 'Server is not a Bazaar server: ' + str(err))
113
114 def _construct_protocol(self, version):
115 request = self._medium.get_request()
116 if version == 3:
117 request_encoder = protocol.ProtocolThreeRequester(request)
118 response_handler = message.ConventionalResponseHandler()
119 response_proto = protocol.ProtocolThreeDecoder(
120 response_handler, expect_version_marker=True)
121 response_handler.setProtoAndMediumRequest(response_proto, request)
122 elif version == 2:
123 request_encoder = protocol.SmartClientRequestProtocolTwo(request)
124 response_handler = request_encoder
125 else:
126 request_encoder = protocol.SmartClientRequestProtocolOne(request)
127 response_handler = request_encoder
128 return request_encoder, response_handler
12954
130 def call(self, method, *args):55 def call(self, method, *args):
131 """Call a method on the remote server."""56 """Call a method on the remote server."""
@@ -191,6 +116,203 @@
191 return self._medium.remote_path_from_transport(transport)116 return self._medium.remote_path_from_transport(transport)
192117
193118
119class _SmartClientRequest(object):
120 """Encapsulate the logic for a single request.
121
122 This class handles things like reconnecting and sending the request a
123 second time when the connection is reset in the middle. It also handles the
124 multiple requests that get made if we don't know what protocol the server
125 supports yet.
126
127 Generally, you build up one of these objects, passing in the arguments that
128 you want to send to the server, and then use 'call_and_read_response' to
129 get the response from the server.
130 """
131
132 def __init__(self, client, method, args, body=None, readv_body=None,
133 body_stream=None, expect_response_body=True):
134 self.client = client
135 self.method = method
136 self.args = args
137 self.body = body
138 self.readv_body = readv_body
139 self.body_stream = body_stream
140 self.expect_response_body = expect_response_body
141
142 def call_and_read_response(self):
143 """Send the request to the server, and read the initial response.
144
145 This doesn't read all of the body content of the response, instead it
146 returns (response_tuple, response_handler). response_tuple is the 'ok',
147 or 'error' information, and 'response_handler' can be used to get the
148 content stream out.
149 """
150 self._run_call_hooks()
151 protocol_version = self.client._medium._protocol_version
152 if protocol_version is None:
153 return self._call_determining_protocol_version()
154 else:
155 return self._call(protocol_version)
156
157 def _is_safe_to_send_twice(self):
158 """Check if the current method is re-entrant safe."""
159 if self.body_stream is not None or 'noretry' in debug.debug_flags:
160 # We can't restart a body stream that has already been consumed.
161 return False
162 request_type = _mod_request.request_handlers.get_info(self.method)
163 if request_type in ('read', 'idem', 'semi'):
164 return True
165 # If we have gotten this far, 'stream' cannot be retried, because we
166 # already consumed the local stream.
167 if request_type in ('semivfs', 'mutate', 'stream'):
168 return False
169 trace.mutter('Unknown request type: %s for method %s'
170 % (request_type, self.method))
171 return False
172
173 def _run_call_hooks(self):
174 if not _SmartClient.hooks['call']:
175 return
176 params = CallHookParams(self.method, self.args, self.body,
177 self.readv_body, self.client._medium)
178 for hook in _SmartClient.hooks['call']:
179 hook(params)
180
181 def _call(self, protocol_version):
182 """We know the protocol version.
183
184 So this just sends the request, and then reads the response. This is
185 where the code will be to retry requests if the connection is closed.
186 """
187 response_handler = self._send(protocol_version)
188 try:
189 response_tuple = response_handler.read_response_tuple(
190 expect_body=self.expect_response_body)
191 except errors.ConnectionReset, e:
192 self.client._medium.reset()
193 if not self._is_safe_to_send_twice():
194 raise
195 trace.warning('ConnectionReset reading response for %r, retrying'
196 % (self.method,))
197 trace.log_exception_quietly()
198 encoder, response_handler = self._construct_protocol(
199 protocol_version)
200 self._send_no_retry(encoder)
201 response_tuple = response_handler.read_response_tuple(
202 expect_body=self.expect_response_body)
203 return (response_tuple, response_handler)
204
205 def _call_determining_protocol_version(self):
206 """Determine what protocol the remote server supports.
207
208 We do this by placing a request in the most recent protocol, and
209 handling the UnexpectedProtocolVersionMarker from the server.
210 """
211 for protocol_version in [3, 2]:
212 if protocol_version == 2:
213 # If v3 doesn't work, the remote side is older than 1.6.
214 self.client._medium._remember_remote_is_before((1, 6))
215 try:
216 response_tuple, response_handler = self._call(protocol_version)
217 except errors.UnexpectedProtocolVersionMarker, err:
218 # TODO: We could recover from this without disconnecting if
219 # we recognise the protocol version.
220 trace.warning(
221 'Server does not understand Bazaar network protocol %d,'
222 ' reconnecting. (Upgrade the server to avoid this.)'
223 % (protocol_version,))
224 self.client._medium.disconnect()
225 continue
226 except errors.ErrorFromSmartServer:
227 # If we received an error reply from the server, then it
228 # must be ok with this protocol version.
229 self.client._medium._protocol_version = protocol_version
230 raise
231 else:
232 self.client._medium._protocol_version = protocol_version
233 return response_tuple, response_handler
234 raise errors.SmartProtocolError(
235 'Server is not a Bazaar server: ' + str(err))
236
237 def _construct_protocol(self, version):
238 """Build the encoding stack for a given protocol version."""
239 request = self.client._medium.get_request()
240 if version == 3:
241 request_encoder = protocol.ProtocolThreeRequester(request)
242 response_handler = message.ConventionalResponseHandler()
243 response_proto = protocol.ProtocolThreeDecoder(
244 response_handler, expect_version_marker=True)
245 response_handler.setProtoAndMediumRequest(response_proto, request)
246 elif version == 2:
247 request_encoder = protocol.SmartClientRequestProtocolTwo(request)
248 response_handler = request_encoder
249 else:
250 request_encoder = protocol.SmartClientRequestProtocolOne(request)
251 response_handler = request_encoder
252 return request_encoder, response_handler
253
254 def _send(self, protocol_version):
255 """Encode the request, and send it to the server.
256
257 This will retry a request if we get a ConnectionReset while sending the
258 request to the server. (Unless we have a body_stream that we have
259 already started consuming, since we can't restart body_streams)
260
261 :return: response_handler as defined by _construct_protocol
262 """
263 encoder, response_handler = self._construct_protocol(protocol_version)
264 try:
265 self._send_no_retry(encoder)
266 except errors.ConnectionReset, e:
267 # If we fail during the _send_no_retry phase, then we can
268 # be confident that the server did not get our request, because we
269 # haven't started waiting for the reply yet. So try the request
270 # again. We only issue a single retry, because if the connection
271 # really is down, there is no reason to loop endlessly.
272
273 # Connection is dead, so close our end of it.
274 self.client._medium.reset()
275 if (('noretry' in debug.debug_flags)
276 or (self.body_stream is not None
277 and encoder.body_stream_started)):
278 # We can't restart a body_stream that has been partially
279 # consumed, so we don't retry.
280 # Note: We don't have to worry about
281 # SmartClientRequestProtocolOne or Two, because they don't
282 # support client-side body streams.
283 raise
284 trace.warning('ConnectionReset calling %r, retrying'
285 % (self.method,))
286 trace.log_exception_quietly()
287 encoder, response_handler = self._construct_protocol(
288 protocol_version)
289 self._send_no_retry(encoder)
290 return response_handler
291
292 def _send_no_retry(self, encoder):
293 """Just encode the request and try to send it."""
294 encoder.set_headers(self.client._headers)
295 if self.body is not None:
296 if self.readv_body is not None:
297 raise AssertionError(
298 "body and readv_body are mutually exclusive.")
299 if self.body_stream is not None:
300 raise AssertionError(
301 "body and body_stream are mutually exclusive.")
302 encoder.call_with_body_bytes((self.method, ) + self.args, self.body)
303 elif self.readv_body is not None:
304 if self.body_stream is not None:
305 raise AssertionError(
306 "readv_body and body_stream are mutually exclusive.")
307 encoder.call_with_body_readv_array((self.method, ) + self.args,
308 self.readv_body)
309 elif self.body_stream is not None:
310 encoder.call_with_body_stream((self.method, ) + self.args,
311 self.body_stream)
312 else:
313 encoder.call(self.method, *self.args)
314
315
194class SmartClientHooks(hooks.Hooks):316class SmartClientHooks(hooks.Hooks):
195317
196 def __init__(self):318 def __init__(self):
197319
=== modified file 'bzrlib/smart/medium.py'
--- bzrlib/smart/medium.py 2010-06-21 08:08:04 +0000
+++ bzrlib/smart/medium.py 2011-10-10 13:55:27 +0000
@@ -1,4 +1,4 @@
1# Copyright (C) 2006-2010 Canonical Ltd1# Copyright (C) 2006-2011 Canonical Ltd
2#2#
3# This program is free software; you can redistribute it and/or modify3# This program is free software; you can redistribute it and/or modify
4# it under the terms of the GNU General Public License as published by4# it under the terms of the GNU General Public License as published by
@@ -24,6 +24,7 @@
24bzrlib/transport/smart/__init__.py.24bzrlib/transport/smart/__init__.py.
25"""25"""
2626
27import errno
27import os28import os
28import sys29import sys
29import urllib30import urllib
@@ -710,6 +711,14 @@
710 """711 """
711 return SmartClientStreamMediumRequest(self)712 return SmartClientStreamMediumRequest(self)
712713
714 def reset(self):
715 """We have been disconnected, reset current state.
716
717 This resets things like _current_request and connected state.
718 """
719 self.disconnect()
720 self._current_request = None
721
713722
714class SmartSimplePipesClientMedium(SmartClientStreamMedium):723class SmartSimplePipesClientMedium(SmartClientStreamMedium):
715 """A client medium using simple pipes.724 """A client medium using simple pipes.
@@ -724,11 +733,20 @@
724733
725 def _accept_bytes(self, bytes):734 def _accept_bytes(self, bytes):
726 """See SmartClientStreamMedium.accept_bytes."""735 """See SmartClientStreamMedium.accept_bytes."""
727 self._writeable_pipe.write(bytes)736 try:
737 self._writeable_pipe.write(bytes)
738 except IOError, e:
739 if e.errno in (errno.EINVAL, errno.EPIPE):
740 raise errors.ConnectionReset(
741 "Error trying to write to subprocess:\n%s" % (e,))
742 raise
728 self._report_activity(len(bytes), 'write')743 self._report_activity(len(bytes), 'write')
729744
730 def _flush(self):745 def _flush(self):
731 """See SmartClientStreamMedium._flush()."""746 """See SmartClientStreamMedium._flush()."""
747 # Note: If flush were to fail, we'd like to raise ConnectionReset, etc.
748 # However, testing shows that even when the child process is
749 # gone, this doesn't error.
732 self._writeable_pipe.flush()750 self._writeable_pipe.flush()
733751
734 def _read_bytes(self, count):752 def _read_bytes(self, count):
@@ -753,8 +771,8 @@
753771
754class SmartSSHClientMedium(SmartClientStreamMedium):772class SmartSSHClientMedium(SmartClientStreamMedium):
755 """A client medium using SSH.773 """A client medium using SSH.
756 774
757 It delegates IO to a SmartClientSocketMedium or775 It delegates IO to a SmartSimplePipesClientMedium or
758 SmartClientAlreadyConnectedSocketMedium (depending on platform).776 SmartClientAlreadyConnectedSocketMedium (depending on platform).
759 """777 """
760778
@@ -993,5 +1011,3 @@
993 This invokes self._medium._flush to ensure all bytes are transmitted.1011 This invokes self._medium._flush to ensure all bytes are transmitted.
994 """1012 """
995 self._medium._flush()1013 self._medium._flush()
996
997
9981014
=== modified file 'bzrlib/smart/protocol.py'
--- bzrlib/smart/protocol.py 2010-06-11 05:57:09 +0000
+++ bzrlib/smart/protocol.py 2011-10-10 13:55:27 +0000
@@ -1081,9 +1081,6 @@
1081 self._real_write_func = write_func1081 self._real_write_func = write_func
10821082
1083 def _write_func(self, bytes):1083 def _write_func(self, bytes):
1084 # TODO: It is probably more appropriate to use sum(map(len, _buf))
1085 # for total number of bytes to write, rather than buffer based on
1086 # the number of write() calls
1087 # TODO: Another possibility would be to turn this into an async model.1084 # TODO: Another possibility would be to turn this into an async model.
1088 # Where we let another thread know that we have some bytes if1085 # Where we let another thread know that we have some bytes if
1089 # they want it, but we don't actually block for it1086 # they want it, but we don't actually block for it
@@ -1292,6 +1289,7 @@
1292 _ProtocolThreeEncoder.__init__(self, medium_request.accept_bytes)1289 _ProtocolThreeEncoder.__init__(self, medium_request.accept_bytes)
1293 self._medium_request = medium_request1290 self._medium_request = medium_request
1294 self._headers = {}1291 self._headers = {}
1292 self.body_stream_started = None
12951293
1296 def set_headers(self, headers):1294 def set_headers(self, headers):
1297 self._headers = headers.copy()1295 self._headers = headers.copy()
@@ -1357,6 +1355,7 @@
1357 if path is not None:1355 if path is not None:
1358 mutter(' (to %s)', path)1356 mutter(' (to %s)', path)
1359 self._request_start_time = osutils.timer_func()1357 self._request_start_time = osutils.timer_func()
1358 self.body_stream_started = False
1360 self._write_protocol_version()1359 self._write_protocol_version()
1361 self._write_headers(self._headers)1360 self._write_headers(self._headers)
1362 self._write_structure(args)1361 self._write_structure(args)
@@ -1364,6 +1363,9 @@
1364 # have finished sending the stream. We would notice at the end1363 # have finished sending the stream. We would notice at the end
1365 # anyway, but if the medium can deliver it early then it's good1364 # anyway, but if the medium can deliver it early then it's good
1366 # to short-circuit the whole request...1365 # to short-circuit the whole request...
1366 # Provoke any ConnectionReset failures before we start the body stream.
1367 self.flush()
1368 self.body_stream_started = True
1367 for exc_info, part in _iter_with_errors(stream):1369 for exc_info, part in _iter_with_errors(stream):
1368 if exc_info is not None:1370 if exc_info is not None:
1369 # Iterating the stream failed. Cleanly abort the request.1371 # Iterating the stream failed. Cleanly abort the request.
13701372
=== modified file 'bzrlib/smart/request.py'
--- bzrlib/smart/request.py 2010-05-13 16:17:54 +0000
+++ bzrlib/smart/request.py 2011-10-10 13:55:27 +0000
@@ -1,4 +1,4 @@
1# Copyright (C) 2006-2010 Canonical Ltd1# Copyright (C) 2006-2011 Canonical Ltd
2#2#
3# This program is free software; you can redistribute it and/or modify3# This program is free software; you can redistribute it and/or modify
4# it under the terms of the GNU General Public License as published by4# it under the terms of the GNU General Public License as published by
@@ -486,154 +486,199 @@
486 return SuccessfulSmartServerResponse((answer,))486 return SuccessfulSmartServerResponse((answer,))
487487
488488
489# In the 'info' attribute, we store whether this request is 'safe' to retry if
490# we get a disconnect while reading the response. It can have the values:
491# read This is purely a read request, so retrying it is perfectly ok.
492# idem An idempotent write request. Something like 'put' where if you put
493# the same bytes twice you end up with the same final bytes.
494# semi This is a request that isn't strictly idempotent, but doesn't
495# result in corruption if it is retried. This is for things like
496# 'lock' and 'unlock'. If you call lock, it updates the disk
497# structure. If you fail to read the response, you won't be able to
498# use the lock, because you don't have the lock token. Calling lock
499# again will fail, because the lock is already taken. However, we
500# can't tell if the server received our request or not. If it didn't,
501# then retrying the request is fine, as it will actually do what we
502# want. If it did, we will interrupt the current operation, but we
503# are no worse off than interrupting the current operation because of
504# a ConnectionReset.
505# semivfs Similar to semi, but specific to a Virtual FileSystem request.
506# stream This is a request that takes a stream that cannot be restarted if
507# consumed. This request is 'safe' in that if we determine the
508# connection is closed before we consume the stream, we can try
509# again.
510# mutate State is updated in a way that replaying that request results in a
511# different state. For example 'append' writes more bytes to a given
512# file. If append succeeds, it moves the file pointer.
489request_handlers = registry.Registry()513request_handlers = registry.Registry()
490request_handlers.register_lazy(514request_handlers.register_lazy(
491 'append', 'bzrlib.smart.vfs', 'AppendRequest')515 'append', 'bzrlib.smart.vfs', 'AppendRequest', info='mutate')
492request_handlers.register_lazy(516request_handlers.register_lazy(
493 'Branch.get_config_file', 'bzrlib.smart.branch',517 'Branch.get_config_file', 'bzrlib.smart.branch',
494 'SmartServerBranchGetConfigFile')518 'SmartServerBranchGetConfigFile', info='read')
495request_handlers.register_lazy(519request_handlers.register_lazy(
496 'Branch.get_parent', 'bzrlib.smart.branch', 'SmartServerBranchGetParent')520 'Branch.get_parent', 'bzrlib.smart.branch', 'SmartServerBranchGetParent',
521 info='read')
497request_handlers.register_lazy(522request_handlers.register_lazy(
498 'Branch.get_tags_bytes', 'bzrlib.smart.branch',523 'Branch.get_tags_bytes', 'bzrlib.smart.branch',
499 'SmartServerBranchGetTagsBytes')524 'SmartServerBranchGetTagsBytes', info='read')
500request_handlers.register_lazy(525request_handlers.register_lazy(
501 'Branch.set_tags_bytes', 'bzrlib.smart.branch',526 'Branch.set_tags_bytes', 'bzrlib.smart.branch',
502 'SmartServerBranchSetTagsBytes')527 'SmartServerBranchSetTagsBytes', info='idem')
503request_handlers.register_lazy(528request_handlers.register_lazy(
504 'Branch.get_stacked_on_url', 'bzrlib.smart.branch', 'SmartServerBranchRequestGetStackedOnURL')529 'Branch.get_stacked_on_url', 'bzrlib.smart.branch',
505request_handlers.register_lazy(530 'SmartServerBranchRequestGetStackedOnURL', info='read')
506 'Branch.last_revision_info', 'bzrlib.smart.branch', 'SmartServerBranchRequestLastRevisionInfo')531request_handlers.register_lazy(
507request_handlers.register_lazy(532 'Branch.last_revision_info', 'bzrlib.smart.branch',
508 'Branch.lock_write', 'bzrlib.smart.branch', 'SmartServerBranchRequestLockWrite')533 'SmartServerBranchRequestLastRevisionInfo', info='read')
509request_handlers.register_lazy( 'Branch.revision_history',534request_handlers.register_lazy(
510 'bzrlib.smart.branch', 'SmartServerRequestRevisionHistory')535 'Branch.lock_write', 'bzrlib.smart.branch',
511request_handlers.register_lazy( 'Branch.set_config_option',536 'SmartServerBranchRequestLockWrite', info='semi')
512 'bzrlib.smart.branch', 'SmartServerBranchRequestSetConfigOption')537request_handlers.register_lazy(
513request_handlers.register_lazy( 'Branch.set_config_option_dict',538 'Branch.revision_history', 'bzrlib.smart.branch',
514 'bzrlib.smart.branch', 'SmartServerBranchRequestSetConfigOptionDict')539 'SmartServerRequestRevisionHistory', info='read')
515request_handlers.register_lazy( 'Branch.set_last_revision',540request_handlers.register_lazy(
516 'bzrlib.smart.branch', 'SmartServerBranchRequestSetLastRevision')541 'Branch.set_config_option', 'bzrlib.smart.branch',
542 'SmartServerBranchRequestSetConfigOption', info='idem')
543request_handlers.register_lazy(
544 'Branch.set_config_option_dict', 'bzrlib.smart.branch',
545 'SmartServerBranchRequestSetConfigOptionDict', info='idem')
546request_handlers.register_lazy(
547 'Branch.set_last_revision', 'bzrlib.smart.branch',
548 'SmartServerBranchRequestSetLastRevision', info='idem')
517request_handlers.register_lazy(549request_handlers.register_lazy(
518 'Branch.set_last_revision_info', 'bzrlib.smart.branch',550 'Branch.set_last_revision_info', 'bzrlib.smart.branch',
519 'SmartServerBranchRequestSetLastRevisionInfo')551 'SmartServerBranchRequestSetLastRevisionInfo', info='idem')
520request_handlers.register_lazy(552request_handlers.register_lazy(
521 'Branch.set_last_revision_ex', 'bzrlib.smart.branch',553 'Branch.set_last_revision_ex', 'bzrlib.smart.branch',
522 'SmartServerBranchRequestSetLastRevisionEx')554 'SmartServerBranchRequestSetLastRevisionEx', info='idem')
523request_handlers.register_lazy(555request_handlers.register_lazy(
524 'Branch.set_parent_location', 'bzrlib.smart.branch',556 'Branch.set_parent_location', 'bzrlib.smart.branch',
525 'SmartServerBranchRequestSetParentLocation')557 'SmartServerBranchRequestSetParentLocation', info='idem')
526request_handlers.register_lazy(558request_handlers.register_lazy(
527 'Branch.unlock', 'bzrlib.smart.branch', 'SmartServerBranchRequestUnlock')559 'Branch.unlock', 'bzrlib.smart.branch', 'SmartServerBranchRequestUnlock',
560 info='semi')
528request_handlers.register_lazy(561request_handlers.register_lazy(
529 'BzrDir.cloning_metadir', 'bzrlib.smart.bzrdir',562 'BzrDir.cloning_metadir', 'bzrlib.smart.bzrdir',
530 'SmartServerBzrDirRequestCloningMetaDir')563 'SmartServerBzrDirRequestCloningMetaDir', info='read')
531request_handlers.register_lazy(564request_handlers.register_lazy(
532 'BzrDir.create_branch', 'bzrlib.smart.bzrdir',565 'BzrDir.create_branch', 'bzrlib.smart.bzrdir',
533 'SmartServerRequestCreateBranch')566 'SmartServerRequestCreateBranch', info='semi')
534request_handlers.register_lazy(567request_handlers.register_lazy(
535 'BzrDir.create_repository', 'bzrlib.smart.bzrdir',568 'BzrDir.create_repository', 'bzrlib.smart.bzrdir',
536 'SmartServerRequestCreateRepository')569 'SmartServerRequestCreateRepository', info='semi')
537request_handlers.register_lazy(570request_handlers.register_lazy(
538 'BzrDir.find_repository', 'bzrlib.smart.bzrdir',571 'BzrDir.find_repository', 'bzrlib.smart.bzrdir',
539 'SmartServerRequestFindRepositoryV1')572 'SmartServerRequestFindRepositoryV1', info='read')
540request_handlers.register_lazy(573request_handlers.register_lazy(
541 'BzrDir.find_repositoryV2', 'bzrlib.smart.bzrdir',574 'BzrDir.find_repositoryV2', 'bzrlib.smart.bzrdir',
542 'SmartServerRequestFindRepositoryV2')575 'SmartServerRequestFindRepositoryV2', info='read')
543request_handlers.register_lazy(576request_handlers.register_lazy(
544 'BzrDir.find_repositoryV3', 'bzrlib.smart.bzrdir',577 'BzrDir.find_repositoryV3', 'bzrlib.smart.bzrdir',
545 'SmartServerRequestFindRepositoryV3')578 'SmartServerRequestFindRepositoryV3', info='read')
546request_handlers.register_lazy(579request_handlers.register_lazy(
547 'BzrDir.get_config_file', 'bzrlib.smart.bzrdir',580 'BzrDir.get_config_file', 'bzrlib.smart.bzrdir',
548 'SmartServerBzrDirRequestConfigFile')581 'SmartServerBzrDirRequestConfigFile', info='read')
549request_handlers.register_lazy(582request_handlers.register_lazy(
550 'BzrDirFormat.initialize', 'bzrlib.smart.bzrdir',583 'BzrDirFormat.initialize', 'bzrlib.smart.bzrdir',
551 'SmartServerRequestInitializeBzrDir')584 'SmartServerRequestInitializeBzrDir', info='semi')
552request_handlers.register_lazy(585request_handlers.register_lazy(
553 'BzrDirFormat.initialize_ex_1.16', 'bzrlib.smart.bzrdir',586 'BzrDirFormat.initialize_ex_1.16', 'bzrlib.smart.bzrdir',
554 'SmartServerRequestBzrDirInitializeEx')587 'SmartServerRequestBzrDirInitializeEx', info='semi')
555request_handlers.register_lazy(588request_handlers.register_lazy(
556 'BzrDir.open', 'bzrlib.smart.bzrdir', 'SmartServerRequestOpenBzrDir')589 'BzrDir.open', 'bzrlib.smart.bzrdir', 'SmartServerRequestOpenBzrDir',
557request_handlers.register_lazy(590 info='read')
558 'BzrDir.open_2.1', 'bzrlib.smart.bzrdir', 'SmartServerRequestOpenBzrDir_2_1')591request_handlers.register_lazy(
592 'BzrDir.open_2.1', 'bzrlib.smart.bzrdir',
593 'SmartServerRequestOpenBzrDir_2_1', info='read')
559request_handlers.register_lazy(594request_handlers.register_lazy(
560 'BzrDir.open_branch', 'bzrlib.smart.bzrdir',595 'BzrDir.open_branch', 'bzrlib.smart.bzrdir',
561 'SmartServerRequestOpenBranch')596 'SmartServerRequestOpenBranch', info='read')
562request_handlers.register_lazy(597request_handlers.register_lazy(
563 'BzrDir.open_branchV2', 'bzrlib.smart.bzrdir',598 'BzrDir.open_branchV2', 'bzrlib.smart.bzrdir',
564 'SmartServerRequestOpenBranchV2')599 'SmartServerRequestOpenBranchV2', info='read')
565request_handlers.register_lazy(600request_handlers.register_lazy(
566 'BzrDir.open_branchV3', 'bzrlib.smart.bzrdir',601 'BzrDir.open_branchV3', 'bzrlib.smart.bzrdir',
567 'SmartServerRequestOpenBranchV3')602 'SmartServerRequestOpenBranchV3', info='read')
568request_handlers.register_lazy(603request_handlers.register_lazy(
569 'delete', 'bzrlib.smart.vfs', 'DeleteRequest')604 'delete', 'bzrlib.smart.vfs', 'DeleteRequest', info='semivfs')
570request_handlers.register_lazy(605request_handlers.register_lazy(
571 'get', 'bzrlib.smart.vfs', 'GetRequest')606 'get', 'bzrlib.smart.vfs', 'GetRequest', info='read')
572request_handlers.register_lazy(607request_handlers.register_lazy(
573 'get_bundle', 'bzrlib.smart.request', 'GetBundleRequest')608 'get_bundle', 'bzrlib.smart.request', 'GetBundleRequest', info='read')
574request_handlers.register_lazy(609request_handlers.register_lazy(
575 'has', 'bzrlib.smart.vfs', 'HasRequest')610 'has', 'bzrlib.smart.vfs', 'HasRequest', info='read')
576request_handlers.register_lazy(611request_handlers.register_lazy(
577 'hello', 'bzrlib.smart.request', 'HelloRequest')612 'hello', 'bzrlib.smart.request', 'HelloRequest', info='read')
578request_handlers.register_lazy(613request_handlers.register_lazy(
579 'iter_files_recursive', 'bzrlib.smart.vfs', 'IterFilesRecursiveRequest')614 'iter_files_recursive', 'bzrlib.smart.vfs', 'IterFilesRecursiveRequest',
580request_handlers.register_lazy(615 info='read')
581 'list_dir', 'bzrlib.smart.vfs', 'ListDirRequest')616request_handlers.register_lazy(
582request_handlers.register_lazy(617 'list_dir', 'bzrlib.smart.vfs', 'ListDirRequest', info='read')
583 'mkdir', 'bzrlib.smart.vfs', 'MkdirRequest')618request_handlers.register_lazy(
584request_handlers.register_lazy(619 'mkdir', 'bzrlib.smart.vfs', 'MkdirRequest', info='semivfs')
585 'move', 'bzrlib.smart.vfs', 'MoveRequest')620request_handlers.register_lazy(
586request_handlers.register_lazy(621 'move', 'bzrlib.smart.vfs', 'MoveRequest', info='semivfs')
587 'put', 'bzrlib.smart.vfs', 'PutRequest')622request_handlers.register_lazy(
588request_handlers.register_lazy(623 'put', 'bzrlib.smart.vfs', 'PutRequest', info='idem')
589 'put_non_atomic', 'bzrlib.smart.vfs', 'PutNonAtomicRequest')624request_handlers.register_lazy(
590request_handlers.register_lazy(625 'put_non_atomic', 'bzrlib.smart.vfs', 'PutNonAtomicRequest', info='idem')
591 'readv', 'bzrlib.smart.vfs', 'ReadvRequest')626request_handlers.register_lazy(
592request_handlers.register_lazy(627 'readv', 'bzrlib.smart.vfs', 'ReadvRequest', info='read')
593 'rename', 'bzrlib.smart.vfs', 'RenameRequest')628request_handlers.register_lazy(
629 'rename', 'bzrlib.smart.vfs', 'RenameRequest', info='semivfs')
594request_handlers.register_lazy(630request_handlers.register_lazy(
595 'PackRepository.autopack', 'bzrlib.smart.packrepository',631 'PackRepository.autopack', 'bzrlib.smart.packrepository',
596 'SmartServerPackRepositoryAutopack')632 'SmartServerPackRepositoryAutopack', info='idem')
597request_handlers.register_lazy('Repository.gather_stats',633request_handlers.register_lazy(
598 'bzrlib.smart.repository',634 'Repository.gather_stats', 'bzrlib.smart.repository',
599 'SmartServerRepositoryGatherStats')635 'SmartServerRepositoryGatherStats', info='read')
600request_handlers.register_lazy('Repository.get_parent_map',636request_handlers.register_lazy(
601 'bzrlib.smart.repository',637 'Repository.get_parent_map', 'bzrlib.smart.repository',
602 'SmartServerRepositoryGetParentMap')638 'SmartServerRepositoryGetParentMap', info='read')
603request_handlers.register_lazy(639request_handlers.register_lazy(
604 'Repository.get_revision_graph', 'bzrlib.smart.repository', 'SmartServerRepositoryGetRevisionGraph')640 'Repository.get_revision_graph', 'bzrlib.smart.repository',
605request_handlers.register_lazy(641 'SmartServerRepositoryGetRevisionGraph', info='read')
606 'Repository.has_revision', 'bzrlib.smart.repository', 'SmartServerRequestHasRevision')642request_handlers.register_lazy(
607request_handlers.register_lazy(643 'Repository.has_revision', 'bzrlib.smart.repository',
608 'Repository.insert_stream', 'bzrlib.smart.repository', 'SmartServerRepositoryInsertStream')644 'SmartServerRequestHasRevision', info='read')
609request_handlers.register_lazy(645request_handlers.register_lazy(
610 'Repository.insert_stream_1.19', 'bzrlib.smart.repository', 'SmartServerRepositoryInsertStream_1_19')646 'Repository.insert_stream', 'bzrlib.smart.repository',
611request_handlers.register_lazy(647 'SmartServerRepositoryInsertStream', info='stream')
612 'Repository.insert_stream_locked', 'bzrlib.smart.repository', 'SmartServerRepositoryInsertStreamLocked')648request_handlers.register_lazy(
613request_handlers.register_lazy(649 'Repository.insert_stream_1.19', 'bzrlib.smart.repository',
614 'Repository.is_shared', 'bzrlib.smart.repository', 'SmartServerRepositoryIsShared')650 'SmartServerRepositoryInsertStream_1_19', info='stream')
615request_handlers.register_lazy(651request_handlers.register_lazy(
616 'Repository.lock_write', 'bzrlib.smart.repository', 'SmartServerRepositoryLockWrite')652 'Repository.insert_stream_locked', 'bzrlib.smart.repository',
653 'SmartServerRepositoryInsertStreamLocked', info='stream')
654request_handlers.register_lazy(
655 'Repository.is_shared', 'bzrlib.smart.repository',
656 'SmartServerRepositoryIsShared', info='read')
657request_handlers.register_lazy(
658 'Repository.lock_write', 'bzrlib.smart.repository',
659 'SmartServerRepositoryLockWrite', info='semi')
617request_handlers.register_lazy(660request_handlers.register_lazy(
618 'Repository.set_make_working_trees', 'bzrlib.smart.repository',661 'Repository.set_make_working_trees', 'bzrlib.smart.repository',
619 'SmartServerRepositorySetMakeWorkingTrees')662 'SmartServerRepositorySetMakeWorkingTrees', info='idem')
620request_handlers.register_lazy(663request_handlers.register_lazy(
621 'Repository.unlock', 'bzrlib.smart.repository', 'SmartServerRepositoryUnlock')664 'Repository.unlock', 'bzrlib.smart.repository',
665 'SmartServerRepositoryUnlock', info='semi')
622request_handlers.register_lazy(666request_handlers.register_lazy(
623 'Repository.get_rev_id_for_revno', 'bzrlib.smart.repository',667 'Repository.get_rev_id_for_revno', 'bzrlib.smart.repository',
624 'SmartServerRepositoryGetRevIdForRevno')668 'SmartServerRepositoryGetRevIdForRevno', info='read')
625request_handlers.register_lazy(669request_handlers.register_lazy(
626 'Repository.get_stream', 'bzrlib.smart.repository',670 'Repository.get_stream', 'bzrlib.smart.repository',
627 'SmartServerRepositoryGetStream')671 'SmartServerRepositoryGetStream', info='read')
628request_handlers.register_lazy(672request_handlers.register_lazy(
629 'Repository.get_stream_1.19', 'bzrlib.smart.repository',673 'Repository.get_stream_1.19', 'bzrlib.smart.repository',
630 'SmartServerRepositoryGetStream_1_19')674 'SmartServerRepositoryGetStream_1_19', info='read')
631request_handlers.register_lazy(675request_handlers.register_lazy(
632 'Repository.tarball', 'bzrlib.smart.repository',676 'Repository.tarball', 'bzrlib.smart.repository',
633 'SmartServerRepositoryTarball')677 'SmartServerRepositoryTarball', info='read')
634request_handlers.register_lazy(678request_handlers.register_lazy(
635 'rmdir', 'bzrlib.smart.vfs', 'RmdirRequest')679 'rmdir', 'bzrlib.smart.vfs', 'RmdirRequest', info='semivfs')
636request_handlers.register_lazy(680request_handlers.register_lazy(
637 'stat', 'bzrlib.smart.vfs', 'StatRequest')681 'stat', 'bzrlib.smart.vfs', 'StatRequest', info='read')
638request_handlers.register_lazy(682request_handlers.register_lazy(
639 'Transport.is_readonly', 'bzrlib.smart.request', 'SmartServerIsReadonly')683 'Transport.is_readonly', 'bzrlib.smart.request', 'SmartServerIsReadonly',
684 info='read')
640685
=== modified file 'bzrlib/tests/test_smart.py'
--- bzrlib/tests/test_smart.py 2011-01-12 01:01:53 +0000
+++ bzrlib/tests/test_smart.py 2011-10-10 13:55:27 +0000
@@ -1844,8 +1844,11 @@
1844 """All registered request_handlers can be found."""1844 """All registered request_handlers can be found."""
1845 # If there's a typo in a register_lazy call, this loop will fail with1845 # If there's a typo in a register_lazy call, this loop will fail with
1846 # an AttributeError.1846 # an AttributeError.
1847 for key, item in smart_req.request_handlers.iteritems():1847 for key in smart_req.request_handlers.keys():
1848 pass1848 try:
1849 item = smart_req.request_handlers.get(key)
1850 except AttributeError, e:
1851 raise AttributeError('failed to get %s: %s' % (key, e))
18491852
1850 def assertHandlerEqual(self, verb, handler):1853 def assertHandlerEqual(self, verb, handler):
1851 self.assertEqual(smart_req.request_handlers.get(verb), handler)1854 self.assertEqual(smart_req.request_handlers.get(verb), handler)
18521855
=== modified file 'bzrlib/tests/test_smart_request.py'
--- bzrlib/tests/test_smart_request.py 2010-07-13 19:02:12 +0000
+++ bzrlib/tests/test_smart_request.py 2011-10-10 13:55:27 +0000
@@ -111,6 +111,16 @@
111 self.assertEqual(111 self.assertEqual(
112 [[transport]] * 3, handler._command.jail_transports_log)112 [[transport]] * 3, handler._command.jail_transports_log)
113113
114 def test_all_registered_requests_are_safety_qualified(self):
115 unclassified_requests = []
116 allowed_info = ('read', 'idem', 'mutate', 'semivfs', 'semi', 'stream')
117 for key in request.request_handlers.keys():
118 info = request.request_handlers.get_info(key)
119 if info is None or info not in allowed_info:
120 unclassified_requests.append(key)
121 if unclassified_requests:
122 self.fail('These requests were not categorized as safe/unsafe'
123 ' to retry: %s' % (unclassified_requests,))
114124
115125
116class TestSmartRequestHandlerErrorTranslation(TestCase):126class TestSmartRequestHandlerErrorTranslation(TestCase):
117127
=== modified file 'bzrlib/tests/test_smart_transport.py'
--- bzrlib/tests/test_smart_transport.py 2011-01-10 22:20:12 +0000
+++ bzrlib/tests/test_smart_transport.py 2011-10-10 13:55:27 +0000
@@ -18,13 +18,17 @@
1818
19# all of this deals with byte strings so this is safe19# all of this deals with byte strings so this is safe
20from cStringIO import StringIO20from cStringIO import StringIO
21import errno
21import os22import os
22import socket23import socket
24import subprocess
25import sys
23import threading26import threading
2427
25import bzrlib28import bzrlib
26from bzrlib import (29from bzrlib import (
27 bzrdir,30 bzrdir,
31 debug,
28 errors,32 errors,
29 osutils,33 osutils,
30 tests,34 tests,
@@ -53,6 +57,29 @@
53 )57 )
5458
5559
60def create_file_pipes():
61 r, w = os.pipe()
62 # These must be opened without buffering, or we get undefined results
63 rf = os.fdopen(r, 'rb', 0)
64 wf = os.fdopen(w, 'wb', 0)
65 return rf, wf
66
67
68def portable_socket_pair():
69 """Return a pair of TCP sockets connected to each other.
70
71 Unlike socket.socketpair, this should work on Windows.
72 """
73 listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
74 listen_sock.bind(('127.0.0.1', 0))
75 listen_sock.listen(1)
76 client_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
77 client_sock.connect(listen_sock.getsockname())
78 server_sock, addr = listen_sock.accept()
79 listen_sock.close()
80 return server_sock, client_sock
81
82
56class StringIOSSHVendor(object):83class StringIOSSHVendor(object):
57 """A SSH vendor that uses StringIO to buffer writes and answer reads."""84 """A SSH vendor that uses StringIO to buffer writes and answer reads."""
5885
@@ -67,6 +94,27 @@
67 return StringIOSSHConnection(self)94 return StringIOSSHConnection(self)
6895
6996
97class FirstRejectedStringIOSSHVendor(StringIOSSHVendor):
98 """The first connection will be considered closed.
99
100 The second connection will succeed normally.
101 """
102
103 def __init__(self, read_from, write_to, fail_at_write=True):
104 super(FirstRejectedStringIOSSHVendor, self).__init__(read_from,
105 write_to)
106 self.fail_at_write = fail_at_write
107 self._first = True
108
109 def connect_ssh(self, username, password, host, port, command):
110 self.calls.append(('connect_ssh', username, password, host, port,
111 command))
112 if self._first:
113 self._first = False
114 return ClosedSSHConnection(self)
115 return StringIOSSHConnection(self)
116
117
70class StringIOSSHConnection(ssh.SSHConnection):118class StringIOSSHConnection(ssh.SSHConnection):
71 """A SSH connection that uses StringIO to buffer writes and answer reads."""119 """A SSH connection that uses StringIO to buffer writes and answer reads."""
72120
@@ -82,6 +130,29 @@
82 return 'pipes', (self.vendor.read_from, self.vendor.write_to)130 return 'pipes', (self.vendor.read_from, self.vendor.write_to)
83131
84132
133class ClosedSSHConnection(ssh.SSHConnection):
134 """An SSH connection that just has closed channels."""
135
136 def __init__(self, vendor):
137 self.vendor = vendor
138
139 def close(self):
140 self.vendor.calls.append(('close', ))
141
142 def get_sock_or_pipes(self):
143 # We create matching pipes, and then close the ssh side
144 bzr_read, ssh_write = create_file_pipes()
145 # We always fail when bzr goes to read
146 ssh_write.close()
147 if self.vendor.fail_at_write:
148 # If set, we'll also fail when bzr goes to write
149 ssh_read, bzr_write = create_file_pipes()
150 ssh_read.close()
151 else:
152 bzr_write = self.vendor.write_to
153 return 'pipes', (bzr_read, bzr_write)
154
155
85class _InvalidHostnameFeature(tests.Feature):156class _InvalidHostnameFeature(tests.Feature):
86 """Does 'non_existent.invalid' fail to resolve?157 """Does 'non_existent.invalid' fail to resolve?
87158
@@ -177,6 +248,91 @@
177 client_medium._accept_bytes('abc')248 client_medium._accept_bytes('abc')
178 self.assertEqual('abc', output.getvalue())249 self.assertEqual('abc', output.getvalue())
179250
251 def test_simple_pipes__accept_bytes_subprocess_closed(self):
252 # It is unfortunate that we have to use Popen for this. However,
253 # os.pipe() does not behave the same as subprocess.Popen().
254 # On Windows, if you use os.pipe() and close the write side,
255 # read.read() hangs. On Linux, read.read() returns the empty string.
256 p = subprocess.Popen([sys.executable, '-c',
257 'import sys\n'
258 'sys.stdout.write(sys.stdin.read(4))\n'
259 'sys.stdout.close()\n'],
260 stdout=subprocess.PIPE, stdin=subprocess.PIPE)
261 client_medium = medium.SmartSimplePipesClientMedium(
262 p.stdout, p.stdin, 'base')
263 client_medium._accept_bytes('abc\n')
264 self.assertEqual('abc', client_medium._read_bytes(3))
265 p.wait()
266 # While writing to the underlying pipe,
267 # Windows py2.6.6 we get IOError(EINVAL)
268 # Lucid py2.6.5, we get IOError(EPIPE)
269 # In both cases, it should be wrapped to ConnectionReset
270 self.assertRaises(errors.ConnectionReset,
271 client_medium._accept_bytes, 'more')
272
273 def test_simple_pipes__accept_bytes_pipe_closed(self):
274 child_read, client_write = create_file_pipes()
275 client_medium = medium.SmartSimplePipesClientMedium(
276 None, client_write, 'base')
277 client_medium._accept_bytes('abc\n')
278 self.assertEqual('abc\n', child_read.read(4))
279 # While writing to the underlying pipe,
280 # Windows py2.6.6 we get IOError(EINVAL)
281 # Lucid py2.6.5, we get IOError(EPIPE)
282 # In both cases, it should be wrapped to ConnectionReset
283 child_read.close()
284 self.assertRaises(errors.ConnectionReset,
285 client_medium._accept_bytes, 'more')
286
287 def test_simple_pipes__flush_pipe_closed(self):
288 child_read, client_write = create_file_pipes()
289 client_medium = medium.SmartSimplePipesClientMedium(
290 None, client_write, 'base')
291 client_medium._accept_bytes('abc\n')
292 child_read.close()
293 # Even though the pipe is closed, flush on the write side seems to be a
294 # no-op, rather than a failure.
295 client_medium._flush()
296
297 def test_simple_pipes__flush_subprocess_closed(self):
298 p = subprocess.Popen([sys.executable, '-c',
299 'import sys\n'
300 'sys.stdout.write(sys.stdin.read(4))\n'
301 'sys.stdout.close()\n'],
302 stdout=subprocess.PIPE, stdin=subprocess.PIPE)
303 client_medium = medium.SmartSimplePipesClientMedium(
304 p.stdout, p.stdin, 'base')
305 client_medium._accept_bytes('abc\n')
306 p.wait()
307 # Even though the child process is dead, flush seems to be a no-op.
308 client_medium._flush()
309
310 def test_simple_pipes__read_bytes_pipe_closed(self):
311 child_read, client_write = create_file_pipes()
312 client_medium = medium.SmartSimplePipesClientMedium(
313 child_read, client_write, 'base')
314 client_medium._accept_bytes('abc\n')
315 client_write.close()
316 self.assertEqual('abc\n', client_medium._read_bytes(4))
317 self.assertEqual('', client_medium._read_bytes(4))
318
319 def test_simple_pipes__read_bytes_subprocess_closed(self):
320 p = subprocess.Popen([sys.executable, '-c',
321 'import sys\n'
322 'if sys.platform == "win32":\n'
323 ' import msvcrt, os\n'
324 ' msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)\n'
325 ' msvcrt.setmode(sys.stdin.fileno(), os.O_BINARY)\n'
326 'sys.stdout.write(sys.stdin.read(4))\n'
327 'sys.stdout.close()\n'],
328 stdout=subprocess.PIPE, stdin=subprocess.PIPE)
329 client_medium = medium.SmartSimplePipesClientMedium(
330 p.stdout, p.stdin, 'base')
331 client_medium._accept_bytes('abc\n')
332 p.wait()
333 self.assertEqual('abc\n', client_medium._read_bytes(4))
334 self.assertEqual('', client_medium._read_bytes(4))
335
180 def test_simple_pipes_client_disconnect_does_nothing(self):336 def test_simple_pipes_client_disconnect_does_nothing(self):
181 # calling disconnect does nothing.337 # calling disconnect does nothing.
182 input = StringIO()338 input = StringIO()
@@ -564,6 +720,28 @@
564 request.finished_reading()720 request.finished_reading()
565 self.assertRaises(errors.ReadingCompleted, request.read_bytes, None)721 self.assertRaises(errors.ReadingCompleted, request.read_bytes, None)
566722
723 def test_reset(self):
724 server_sock, client_sock = portable_socket_pair()
725 # TODO: Use SmartClientAlreadyConnectedSocketMedium for the versions of
726 # bzr where it exists.
727 client_medium = medium.SmartTCPClientMedium(None, None, None)
728 client_medium._socket = client_sock
729 client_medium._connected = True
730 req = client_medium.get_request()
731 self.assertRaises(errors.TooManyConcurrentRequests,
732 client_medium.get_request)
733 client_medium.reset()
734 # The stream should be reset, marked as disconnected, though ready for
735 # us to make a new request
736 self.assertFalse(client_medium._connected)
737 self.assertIs(None, client_medium._socket)
738 try:
739 self.assertEqual('', client_sock.recv(1))
740 except socket.error, e:
741 if e.errno not in (errno.EBADF,):
742 raise
743 req = client_medium.get_request()
744
567745
568class RemoteTransportTests(test_smart.TestCaseWithSmartMedium):746class RemoteTransportTests(test_smart.TestCaseWithSmartMedium):
569747
@@ -617,20 +795,6 @@
617 super(TestSmartServerStreamMedium, self).setUp()795 super(TestSmartServerStreamMedium, self).setUp()
618 self.overrideEnv('BZR_NO_SMART_VFS', None)796 self.overrideEnv('BZR_NO_SMART_VFS', None)
619797
620 def portable_socket_pair(self):
621 """Return a pair of TCP sockets connected to each other.
622
623 Unlike socket.socketpair, this should work on Windows.
624 """
625 listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
626 listen_sock.bind(('127.0.0.1', 0))
627 listen_sock.listen(1)
628 client_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
629 client_sock.connect(listen_sock.getsockname())
630 server_sock, addr = listen_sock.accept()
631 listen_sock.close()
632 return server_sock, client_sock
633
634 def test_smart_query_version(self):798 def test_smart_query_version(self):
635 """Feed a canned query version to a server"""799 """Feed a canned query version to a server"""
636 # wire-to-wire, using the whole stack800 # wire-to-wire, using the whole stack
@@ -695,7 +859,7 @@
695859
696 def test_socket_stream_with_bulk_data(self):860 def test_socket_stream_with_bulk_data(self):
697 sample_request_bytes = 'command\n9\nbulk datadone\n'861 sample_request_bytes = 'command\n9\nbulk datadone\n'
698 server_sock, client_sock = self.portable_socket_pair()862 server_sock, client_sock = portable_socket_pair()
699 server = medium.SmartServerSocketStreamMedium(863 server = medium.SmartServerSocketStreamMedium(
700 server_sock, None)864 server_sock, None)
701 sample_protocol = SampleRequest(expected_bytes=sample_request_bytes)865 sample_protocol = SampleRequest(expected_bytes=sample_request_bytes)
@@ -714,7 +878,7 @@
714 self.assertTrue(server.finished)878 self.assertTrue(server.finished)
715879
716 def test_socket_stream_shutdown_detection(self):880 def test_socket_stream_shutdown_detection(self):
717 server_sock, client_sock = self.portable_socket_pair()881 server_sock, client_sock = portable_socket_pair()
718 client_sock.close()882 client_sock.close()
719 server = medium.SmartServerSocketStreamMedium(883 server = medium.SmartServerSocketStreamMedium(
720 server_sock, None)884 server_sock, None)
@@ -734,7 +898,7 @@
734 rest_of_request_bytes = 'lo\n'898 rest_of_request_bytes = 'lo\n'
735 expected_response = (899 expected_response = (
736 protocol.RESPONSE_VERSION_TWO + 'success\nok\x012\n')900 protocol.RESPONSE_VERSION_TWO + 'success\nok\x012\n')
737 server_sock, client_sock = self.portable_socket_pair()901 server_sock, client_sock = portable_socket_pair()
738 server = medium.SmartServerSocketStreamMedium(902 server = medium.SmartServerSocketStreamMedium(
739 server_sock, None)903 server_sock, None)
740 client_sock.sendall(incomplete_request_bytes)904 client_sock.sendall(incomplete_request_bytes)
@@ -810,7 +974,7 @@
810 # _serve_one_request should still process both of them as if they had974 # _serve_one_request should still process both of them as if they had
811 # been received separately.975 # been received separately.
812 sample_request_bytes = 'command\n'976 sample_request_bytes = 'command\n'
813 server_sock, client_sock = self.portable_socket_pair()977 server_sock, client_sock = portable_socket_pair()
814 server = medium.SmartServerSocketStreamMedium(978 server = medium.SmartServerSocketStreamMedium(
815 server_sock, None)979 server_sock, None)
816 first_protocol = SampleRequest(expected_bytes=sample_request_bytes)980 first_protocol = SampleRequest(expected_bytes=sample_request_bytes)
@@ -847,7 +1011,7 @@
847 self.assertTrue(server.finished)1011 self.assertTrue(server.finished)
8481012
849 def test_socket_stream_error_handling(self):1013 def test_socket_stream_error_handling(self):
850 server_sock, client_sock = self.portable_socket_pair()1014 server_sock, client_sock = portable_socket_pair()
851 server = medium.SmartServerSocketStreamMedium(1015 server = medium.SmartServerSocketStreamMedium(
852 server_sock, None)1016 server_sock, None)
853 fake_protocol = ErrorRaisingProtocol(Exception('boom'))1017 fake_protocol = ErrorRaisingProtocol(Exception('boom'))
@@ -868,7 +1032,7 @@
868 self.assertEqual('', from_server.getvalue())1032 self.assertEqual('', from_server.getvalue())
8691033
870 def test_socket_stream_keyboard_interrupt_handling(self):1034 def test_socket_stream_keyboard_interrupt_handling(self):
871 server_sock, client_sock = self.portable_socket_pair()1035 server_sock, client_sock = portable_socket_pair()
872 server = medium.SmartServerSocketStreamMedium(1036 server = medium.SmartServerSocketStreamMedium(
873 server_sock, None)1037 server_sock, None)
874 fake_protocol = ErrorRaisingProtocol(KeyboardInterrupt('boom'))1038 fake_protocol = ErrorRaisingProtocol(KeyboardInterrupt('boom'))
@@ -885,7 +1049,7 @@
885 return server._build_protocol()1049 return server._build_protocol()
8861050
887 def build_protocol_socket(self, bytes):1051 def build_protocol_socket(self, bytes):
888 server_sock, client_sock = self.portable_socket_pair()1052 server_sock, client_sock = portable_socket_pair()
889 server = medium.SmartServerSocketStreamMedium(1053 server = medium.SmartServerSocketStreamMedium(
890 server_sock, None)1054 server_sock, None)
891 client_sock.sendall(bytes)1055 client_sock.sendall(bytes)
@@ -2792,6 +2956,33 @@
2792 'e', # end2956 'e', # end
2793 output.getvalue())2957 output.getvalue())
27942958
2959 def test_records_start_of_body_stream(self):
2960 requester, output = self.make_client_encoder_and_output()
2961 requester.set_headers({})
2962 in_stream = [False]
2963 def stream_checker():
2964 self.assertTrue(requester.body_stream_started)
2965 in_stream[0] = True
2966 yield 'content'
2967 flush_called = []
2968 orig_flush = requester.flush
2969 def tracked_flush():
2970 flush_called.append(in_stream[0])
2971 if in_stream[0]:
2972 self.assertTrue(requester.body_stream_started)
2973 else:
2974 self.assertFalse(requester.body_stream_started)
2975 return orig_flush()
2976 requester.flush = tracked_flush
2977 requester.call_with_body_stream(('one arg',), stream_checker())
2978 self.assertEqual(
2979 'bzr message 3 (bzr 1.6)\n' # protocol version
2980 '\x00\x00\x00\x02de' # headers
2981 's\x00\x00\x00\x0bl7:one arge' # args
2982 'b\x00\x00\x00\x07content' # body
2983 'e', output.getvalue())
2984 self.assertEqual([False, True, True], flush_called)
2985
27952986
2796class StubMediumRequest(object):2987class StubMediumRequest(object):
2797 """A stub medium request that tracks the number of times accept_bytes is2988 """A stub medium request that tracks the number of times accept_bytes is
@@ -3216,6 +3407,193 @@
3216 # encoder.3407 # encoder.
32173408
32183409
3410class Test_SmartClientRequest(tests.TestCase):
3411
3412 def make_client_with_failing_medium(self, fail_at_write=True, response=''):
3413 response_io = StringIO(response)
3414 output = StringIO()
3415 vendor = FirstRejectedStringIOSSHVendor(response_io, output,
3416 fail_at_write=fail_at_write)
3417 ssh_params = medium.SSHParams('a host', 'a port', 'a user', 'a pass')
3418 client_medium = medium.SmartSSHClientMedium('base', ssh_params, vendor)
3419 smart_client = client._SmartClient(client_medium, headers={})
3420 return output, vendor, smart_client
3421
3422 def make_response(self, args, body=None, body_stream=None):
3423 response_io = StringIO()
3424 response = _mod_request.SuccessfulSmartServerResponse(args, body=body,
3425 body_stream=body_stream)
3426 responder = protocol.ProtocolThreeResponder(response_io.write)
3427 responder.send_response(response)
3428 return response_io.getvalue()
3429
3430 def test__call_doesnt_retry_append(self):
3431 response = self.make_response(('appended', '8'))
3432 output, vendor, smart_client = self.make_client_with_failing_medium(
3433 fail_at_write=False, response=response)
3434 smart_request = client._SmartClientRequest(smart_client, 'append',
3435 ('foo', ''), body='content\n')
3436 self.assertRaises(errors.ConnectionReset, smart_request._call, 3)
3437
3438 def test__call_retries_get_bytes(self):
3439 response = self.make_response(('ok',), 'content\n')
3440 output, vendor, smart_client = self.make_client_with_failing_medium(
3441 fail_at_write=False, response=response)
3442 smart_request = client._SmartClientRequest(smart_client, 'get',
3443 ('foo',))
3444 response, response_handler = smart_request._call(3)
3445 self.assertEqual(('ok',), response)
3446 self.assertEqual('content\n', response_handler.read_body_bytes())
3447
3448 def test__call_noretry_get_bytes(self):
3449 debug.debug_flags.add('noretry')
3450 response = self.make_response(('ok',), 'content\n')
3451 output, vendor, smart_client = self.make_client_with_failing_medium(
3452 fail_at_write=False, response=response)
3453 smart_request = client._SmartClientRequest(smart_client, 'get',
3454 ('foo',))
3455 self.assertRaises(errors.ConnectionReset, smart_request._call, 3)
3456
3457 def test__send_no_retry_pipes(self):
3458 client_read, server_write = create_file_pipes()
3459 server_read, client_write = create_file_pipes()
3460 client_medium = medium.SmartSimplePipesClientMedium(client_read,
3461 client_write, base='/')
3462 smart_client = client._SmartClient(client_medium)
3463 smart_request = client._SmartClientRequest(smart_client,
3464 'hello', ())
3465 # Close the server side
3466 server_read.close()
3467 encoder, response_handler = smart_request._construct_protocol(3)
3468 self.assertRaises(errors.ConnectionReset,
3469 smart_request._send_no_retry, encoder)
3470
3471 def test__send_read_response_sockets(self):
3472 listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
3473 listen_sock.bind(('127.0.0.1', 0))
3474 listen_sock.listen(1)
3475 host, port = listen_sock.getsockname()
3476 client_medium = medium.SmartTCPClientMedium(host, port, '/')
3477 client_medium._ensure_connection()
3478 smart_client = client._SmartClient(client_medium)
3479 smart_request = client._SmartClientRequest(smart_client, 'hello', ())
3480 # Accept the connection, but don't actually talk to the client.
3481 server_sock, _ = listen_sock.accept()
3482 server_sock.close()
3483 # Sockets buffer and don't really notice that the server has closed the
3484 # connection until we try to read again.
3485 handler = smart_request._send(3)
3486 self.assertRaises(errors.ConnectionReset,
3487 handler.read_response_tuple, expect_body=False)
3488
3489 def test__send_retries_on_write(self):
3490 output, vendor, smart_client = self.make_client_with_failing_medium()
3491 smart_request = client._SmartClientRequest(smart_client, 'hello', ())
3492 handler = smart_request._send(3)
3493 self.assertEqual('bzr message 3 (bzr 1.6)\n' # protocol
3494 '\x00\x00\x00\x02de' # empty headers
3495 's\x00\x00\x00\tl5:helloee',
3496 output.getvalue())
3497 self.assertEqual(
3498 [('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
3499 ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
3500 ('close',),
3501 ('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
3502 ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
3503 ],
3504 vendor.calls)
3505
3506 def test__send_doesnt_retry_read_failure(self):
3507 output, vendor, smart_client = self.make_client_with_failing_medium(
3508 fail_at_write=False)
3509 smart_request = client._SmartClientRequest(smart_client, 'hello', ())
3510 handler = smart_request._send(3)
3511 self.assertEqual('bzr message 3 (bzr 1.6)\n' # protocol
3512 '\x00\x00\x00\x02de' # empty headers
3513 's\x00\x00\x00\tl5:helloee',
3514 output.getvalue())
3515 self.assertEqual(
3516 [('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
3517 ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
3518 ],
3519 vendor.calls)
3520 self.assertRaises(errors.ConnectionReset, handler.read_response_tuple)
3521
3522 def test__send_request_retries_body_stream_if_not_started(self):
3523 output, vendor, smart_client = self.make_client_with_failing_medium()
3524 smart_request = client._SmartClientRequest(smart_client, 'hello', (),
3525 body_stream=['a', 'b'])
3526 response_handler = smart_request._send(3)
3527 # We connect, get disconnected, and notice before consuming the stream,
3528 # so we try again one time and succeed.
3529 self.assertEqual(
3530 [('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
3531 ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
3532 ('close',),
3533 ('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
3534 ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
3535 ],
3536 vendor.calls)
3537 self.assertEqual('bzr message 3 (bzr 1.6)\n' # protocol
3538 '\x00\x00\x00\x02de' # empty headers
3539 's\x00\x00\x00\tl5:helloe'
3540 'b\x00\x00\x00\x01a'
3541 'b\x00\x00\x00\x01b'
3542 'e',
3543 output.getvalue())
3544
3545 def test__send_request_stops_if_body_started(self):
3546 # We intentionally use the python StringIO so that we can subclass it.
3547 from StringIO import StringIO
3548 response = StringIO()
3549
3550 class FailAfterFirstWrite(StringIO):
3551 """Allow one 'write' call to pass, fail the rest"""
3552 def __init__(self):
3553 StringIO.__init__(self)
3554 self._first = True
3555
3556 def write(self, s):
3557 if self._first:
3558 self._first = False
3559 return StringIO.write(self, s)
3560 raise IOError(errno.EINVAL, 'invalid file handle')
3561 output = FailAfterFirstWrite()
3562
3563 vendor = FirstRejectedStringIOSSHVendor(response, output,
3564 fail_at_write=False)
3565 ssh_params = medium.SSHParams('a host', 'a port', 'a user', 'a pass')
3566 client_medium = medium.SmartSSHClientMedium('base', ssh_params, vendor)
3567 smart_client = client._SmartClient(client_medium, headers={})
3568 smart_request = client._SmartClientRequest(smart_client, 'hello', (),
3569 body_stream=['a', 'b'])
3570 self.assertRaises(errors.ConnectionReset, smart_request._send, 3)
3571 # We connect, and manage to get to the point that we start consuming
3572 # the body stream. The next write fails, so we just stop.
3573 self.assertEqual(
3574 [('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
3575 ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
3576 ('close',),
3577 ],
3578 vendor.calls)
3579 self.assertEqual('bzr message 3 (bzr 1.6)\n' # protocol
3580 '\x00\x00\x00\x02de' # empty headers
3581 's\x00\x00\x00\tl5:helloe',
3582 output.getvalue())
3583
3584 def test__send_disabled_retry(self):
3585 debug.debug_flags.add('noretry')
3586 output, vendor, smart_client = self.make_client_with_failing_medium()
3587 smart_request = client._SmartClientRequest(smart_client, 'hello', ())
3588 self.assertRaises(errors.ConnectionReset, smart_request._send, 3)
3589 self.assertEqual(
3590 [('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
3591 ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
3592 ('close',),
3593 ],
3594 vendor.calls)
3595
3596
3219class LengthPrefixedBodyDecoder(tests.TestCase):3597class LengthPrefixedBodyDecoder(tests.TestCase):
32203598
3221 # XXX: TODO: make accept_reading_trailer invoke translate_response or3599 # XXX: TODO: make accept_reading_trailer invoke translate_response or
32223600
=== modified file 'doc/en/release-notes/bzr-2.1.txt'
--- doc/en/release-notes/bzr-2.1.txt 2011-08-20 09:28:27 +0000
+++ doc/en/release-notes/bzr-2.1.txt 2011-10-10 13:55:27 +0000
@@ -43,6 +43,11 @@
4343
44 (John Arbash Meinel, #609187, #812928)44 (John Arbash Meinel, #609187, #812928)
4545
46* Teach the bzr client how to reconnect if we get ``ConnectionReset``
47 while making an RPC request. This doesn't handle all possible network
48 disconnects, but it should at least handle when the server is asked to
49 shutdown gracefully. (John Arbash Meinel, #819604)
50
46Improvements51Improvements
47************52************
4853
4954
=== modified file 'doc/en/release-notes/bzr-2.2.txt'
--- doc/en/release-notes/bzr-2.2.txt 2011-09-09 12:32:08 +0000
+++ doc/en/release-notes/bzr-2.2.txt 2011-10-10 13:55:27 +0000
@@ -19,6 +19,11 @@
19Bug Fixes19Bug Fixes
20*********20*********
2121
22* Teach the bzr client how to reconnect if we get ``ConnectionReset``
23 while making an RPC request. This doesn't handle all possible network
24 disconnects, but it should at least handle when the server is asked to
25 shutdown gracefully. (John Arbash Meinel, #819604)
26
22Improvements27Improvements
23************28************
2429
2530
=== modified file 'doc/en/release-notes/bzr-2.3.txt'
--- doc/en/release-notes/bzr-2.3.txt 2011-09-09 12:32:08 +0000
+++ doc/en/release-notes/bzr-2.3.txt 2011-10-10 13:55:27 +0000
@@ -35,6 +35,11 @@
35* Cope cleanly with buggy HTTP proxies that close the socket in the middle35* Cope cleanly with buggy HTTP proxies that close the socket in the middle
36 of a multipart response. (Martin Pool, #198646).36 of a multipart response. (Martin Pool, #198646).
3737
38* Teach the bzr client how to reconnect if we get ``ConnectionReset``
39 while making an RPC request. This doesn't handle all possible network
40 disconnects, but it should at least handle when the server is asked to
41 shutdown gracefully. (John Arbash Meinel, #819604)
42
38Documentation43Documentation
39*************44*************
4045

Subscribers

People subscribed via source and target branches