Merge lp:~jameinel/bzr/2.3-client-reconnect-819604 into lp:bzr/2.3

Proposed by John A Meinel
Status: Work in progress
Proposed branch: lp:~jameinel/bzr/2.3-client-reconnect-819604
Merge into: lp:bzr/2.3
Diff against target: 1387 lines (+810/-217)
11 files modified
bzrlib/help_topics/en/debug-flags.txt (+2/-0)
bzrlib/smart/client.py (+208/-86)
bzrlib/smart/medium.py (+22/-6)
bzrlib/smart/protocol.py (+5/-3)
bzrlib/smart/request.py (+144/-99)
bzrlib/tests/test_smart.py (+5/-2)
bzrlib/tests/test_smart_request.py (+10/-0)
bzrlib/tests/test_smart_transport.py (+399/-21)
doc/en/release-notes/bzr-2.1.txt (+5/-0)
doc/en/release-notes/bzr-2.2.txt (+5/-0)
doc/en/release-notes/bzr-2.3.txt (+5/-0)
To merge this branch: bzr merge lp:~jameinel/bzr/2.3-client-reconnect-819604
Reviewer Review Type Date Requested Status
bzr-core Pending
Review via email: mp+78843@code.launchpad.net

Commit message

Bug #819604, allow clients to reconnect if they lose a connection at particular times.

Description of the change

This is the rollup of the client reconnect code (bug #819604) for bzr-2.3. The only significant differences from the 2.2 version are that 'osutils.read_bytes_from_socket' already handled WSAECONNABORTED and that NEWS is now split up into multiple files, rather than being a single NEWS file.

Again, reviews should be done on 2.1 to start with; this proposal was created to handle any merge conflicts, etc., that result from up-porting the code.

To post a comment you must log in.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'bzrlib/help_topics/en/debug-flags.txt'
2--- bzrlib/help_topics/en/debug-flags.txt 2010-10-06 03:20:28 +0000
3+++ bzrlib/help_topics/en/debug-flags.txt 2011-10-10 13:55:27 +0000
4@@ -24,6 +24,8 @@
5 -Dindex Trace major index operations.
6 -Dknit Trace knit operations.
7 -Dlock Trace when lockdir locks are taken or released.
8+-Dnoretry If a connection is reset, fail immediately rather than
9+ retrying the request.
10 -Dprogress Trace progress bar operations.
11 -Dmem_dump Dump memory to a file upon an out of memory error.
12 -Dmerge Emit information for debugging merges.
13
14=== modified file 'bzrlib/smart/client.py'
15--- bzrlib/smart/client.py 2010-02-17 17:11:16 +0000
16+++ bzrlib/smart/client.py 2011-10-10 13:55:27 +0000
17@@ -14,12 +14,18 @@
18 # along with this program; if not, write to the Free Software
19 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20
21+from bzrlib import lazy_import
22+lazy_import.lazy_import(globals(), """
23+from bzrlib.smart import request as _mod_request
24+""")
25+
26 import bzrlib
27 from bzrlib.smart import message, protocol
28-from bzrlib.trace import warning
29 from bzrlib import (
30+ debug,
31 errors,
32 hooks,
33+ trace,
34 )
35
36
37@@ -39,93 +45,12 @@
38 def __repr__(self):
39 return '%s(%r)' % (self.__class__.__name__, self._medium)
40
41- def _send_request(self, protocol_version, method, args, body=None,
42- readv_body=None, body_stream=None):
43- encoder, response_handler = self._construct_protocol(
44- protocol_version)
45- encoder.set_headers(self._headers)
46- if body is not None:
47- if readv_body is not None:
48- raise AssertionError(
49- "body and readv_body are mutually exclusive.")
50- if body_stream is not None:
51- raise AssertionError(
52- "body and body_stream are mutually exclusive.")
53- encoder.call_with_body_bytes((method, ) + args, body)
54- elif readv_body is not None:
55- if body_stream is not None:
56- raise AssertionError(
57- "readv_body and body_stream are mutually exclusive.")
58- encoder.call_with_body_readv_array((method, ) + args, readv_body)
59- elif body_stream is not None:
60- encoder.call_with_body_stream((method, ) + args, body_stream)
61- else:
62- encoder.call(method, *args)
63- return response_handler
64-
65- def _run_call_hooks(self, method, args, body, readv_body):
66- if not _SmartClient.hooks['call']:
67- return
68- params = CallHookParams(method, args, body, readv_body, self._medium)
69- for hook in _SmartClient.hooks['call']:
70- hook(params)
71-
72 def _call_and_read_response(self, method, args, body=None, readv_body=None,
73 body_stream=None, expect_response_body=True):
74- self._run_call_hooks(method, args, body, readv_body)
75- if self._medium._protocol_version is not None:
76- response_handler = self._send_request(
77- self._medium._protocol_version, method, args, body=body,
78- readv_body=readv_body, body_stream=body_stream)
79- return (response_handler.read_response_tuple(
80- expect_body=expect_response_body),
81- response_handler)
82- else:
83- for protocol_version in [3, 2]:
84- if protocol_version == 2:
85- # If v3 doesn't work, the remote side is older than 1.6.
86- self._medium._remember_remote_is_before((1, 6))
87- response_handler = self._send_request(
88- protocol_version, method, args, body=body,
89- readv_body=readv_body, body_stream=body_stream)
90- try:
91- response_tuple = response_handler.read_response_tuple(
92- expect_body=expect_response_body)
93- except errors.UnexpectedProtocolVersionMarker, err:
94- # TODO: We could recover from this without disconnecting if
95- # we recognise the protocol version.
96- warning(
97- 'Server does not understand Bazaar network protocol %d,'
98- ' reconnecting. (Upgrade the server to avoid this.)'
99- % (protocol_version,))
100- self._medium.disconnect()
101- continue
102- except errors.ErrorFromSmartServer:
103- # If we received an error reply from the server, then it
104- # must be ok with this protocol version.
105- self._medium._protocol_version = protocol_version
106- raise
107- else:
108- self._medium._protocol_version = protocol_version
109- return response_tuple, response_handler
110- raise errors.SmartProtocolError(
111- 'Server is not a Bazaar server: ' + str(err))
112-
113- def _construct_protocol(self, version):
114- request = self._medium.get_request()
115- if version == 3:
116- request_encoder = protocol.ProtocolThreeRequester(request)
117- response_handler = message.ConventionalResponseHandler()
118- response_proto = protocol.ProtocolThreeDecoder(
119- response_handler, expect_version_marker=True)
120- response_handler.setProtoAndMediumRequest(response_proto, request)
121- elif version == 2:
122- request_encoder = protocol.SmartClientRequestProtocolTwo(request)
123- response_handler = request_encoder
124- else:
125- request_encoder = protocol.SmartClientRequestProtocolOne(request)
126- response_handler = request_encoder
127- return request_encoder, response_handler
128+ request = _SmartClientRequest(self, method, args, body=body,
129+ readv_body=readv_body, body_stream=body_stream,
130+ expect_response_body=expect_response_body)
131+ return request.call_and_read_response()
132
133 def call(self, method, *args):
134 """Call a method on the remote server."""
135@@ -191,6 +116,203 @@
136 return self._medium.remote_path_from_transport(transport)
137
138
139+class _SmartClientRequest(object):
140+ """Encapsulate the logic for a single request.
141+
142+ This class handles things like reconnecting and sending the request a
143+ second time when the connection is reset in the middle. It also handles the
144+ multiple requests that get made if we don't know what protocol the server
145+ supports yet.
146+
147+ Generally, you build up one of these objects, passing in the arguments that
148+ you want to send to the server, and then use 'call_and_read_response' to
149+ get the response from the server.
150+ """
151+
152+ def __init__(self, client, method, args, body=None, readv_body=None,
153+ body_stream=None, expect_response_body=True):
154+ self.client = client
155+ self.method = method
156+ self.args = args
157+ self.body = body
158+ self.readv_body = readv_body
159+ self.body_stream = body_stream
160+ self.expect_response_body = expect_response_body
161+
162+ def call_and_read_response(self):
163+ """Send the request to the server, and read the initial response.
164+
165+ This doesn't read all of the body content of the response, instead it
166+ returns (response_tuple, response_handler). response_tuple is the 'ok',
167+ or 'error' information, and 'response_handler' can be used to get the
168+ content stream out.
169+ """
170+ self._run_call_hooks()
171+ protocol_version = self.client._medium._protocol_version
172+ if protocol_version is None:
173+ return self._call_determining_protocol_version()
174+ else:
175+ return self._call(protocol_version)
176+
177+ def _is_safe_to_send_twice(self):
178+ """Check if the current method is re-entrant safe."""
179+ if self.body_stream is not None or 'noretry' in debug.debug_flags:
180+ # We can't restart a body stream that has already been consumed.
181+ return False
182+ request_type = _mod_request.request_handlers.get_info(self.method)
183+ if request_type in ('read', 'idem', 'semi'):
184+ return True
185+ # If we have gotten this far, 'stream' cannot be retried, because we
186+ # already consumed the local stream.
187+ if request_type in ('semivfs', 'mutate', 'stream'):
188+ return False
189+ trace.mutter('Unknown request type: %s for method %s'
190+ % (request_type, self.method))
191+ return False
192+
193+ def _run_call_hooks(self):
194+ if not _SmartClient.hooks['call']:
195+ return
196+ params = CallHookParams(self.method, self.args, self.body,
197+ self.readv_body, self.client._medium)
198+ for hook in _SmartClient.hooks['call']:
199+ hook(params)
200+
201+ def _call(self, protocol_version):
202+ """We know the protocol version.
203+
204+ So this just sends the request, and then reads the response. This is
205+ where the code will be to retry requests if the connection is closed.
206+ """
207+ response_handler = self._send(protocol_version)
208+ try:
209+ response_tuple = response_handler.read_response_tuple(
210+ expect_body=self.expect_response_body)
211+ except errors.ConnectionReset, e:
212+ self.client._medium.reset()
213+ if not self._is_safe_to_send_twice():
214+ raise
215+ trace.warning('ConnectionReset reading response for %r, retrying'
216+ % (self.method,))
217+ trace.log_exception_quietly()
218+ encoder, response_handler = self._construct_protocol(
219+ protocol_version)
220+ self._send_no_retry(encoder)
221+ response_tuple = response_handler.read_response_tuple(
222+ expect_body=self.expect_response_body)
223+ return (response_tuple, response_handler)
224+
225+ def _call_determining_protocol_version(self):
226+ """Determine what protocol the remote server supports.
227+
228+ We do this by placing a request in the most recent protocol, and
229+ handling the UnexpectedProtocolVersionMarker from the server.
230+ """
231+ for protocol_version in [3, 2]:
232+ if protocol_version == 2:
233+ # If v3 doesn't work, the remote side is older than 1.6.
234+ self.client._medium._remember_remote_is_before((1, 6))
235+ try:
236+ response_tuple, response_handler = self._call(protocol_version)
237+ except errors.UnexpectedProtocolVersionMarker, err:
238+ # TODO: We could recover from this without disconnecting if
239+ # we recognise the protocol version.
240+ trace.warning(
241+ 'Server does not understand Bazaar network protocol %d,'
242+ ' reconnecting. (Upgrade the server to avoid this.)'
243+ % (protocol_version,))
244+ self.client._medium.disconnect()
245+ continue
246+ except errors.ErrorFromSmartServer:
247+ # If we received an error reply from the server, then it
248+ # must be ok with this protocol version.
249+ self.client._medium._protocol_version = protocol_version
250+ raise
251+ else:
252+ self.client._medium._protocol_version = protocol_version
253+ return response_tuple, response_handler
254+ raise errors.SmartProtocolError(
255+ 'Server is not a Bazaar server: ' + str(err))
256+
257+ def _construct_protocol(self, version):
258+ """Build the encoding stack for a given protocol version."""
259+ request = self.client._medium.get_request()
260+ if version == 3:
261+ request_encoder = protocol.ProtocolThreeRequester(request)
262+ response_handler = message.ConventionalResponseHandler()
263+ response_proto = protocol.ProtocolThreeDecoder(
264+ response_handler, expect_version_marker=True)
265+ response_handler.setProtoAndMediumRequest(response_proto, request)
266+ elif version == 2:
267+ request_encoder = protocol.SmartClientRequestProtocolTwo(request)
268+ response_handler = request_encoder
269+ else:
270+ request_encoder = protocol.SmartClientRequestProtocolOne(request)
271+ response_handler = request_encoder
272+ return request_encoder, response_handler
273+
274+ def _send(self, protocol_version):
275+ """Encode the request, and send it to the server.
276+
277+ This will retry a request if we get a ConnectionReset while sending the
278+ request to the server. (Unless we have a body_stream that we have
279+ already started consuming, since we can't restart body_streams)
280+
281+ :return: response_handler as defined by _construct_protocol
282+ """
283+ encoder, response_handler = self._construct_protocol(protocol_version)
284+ try:
285+ self._send_no_retry(encoder)
286+ except errors.ConnectionReset, e:
287+ # If we fail during the _send_no_retry phase, then we can
288+ # be confident that the server did not get our request, because we
289+ # haven't started waiting for the reply yet. So try the request
290+ # again. We only issue a single retry, because if the connection
291+ # really is down, there is no reason to loop endlessly.
292+
293+ # Connection is dead, so close our end of it.
294+ self.client._medium.reset()
295+ if (('noretry' in debug.debug_flags)
296+ or (self.body_stream is not None
297+ and encoder.body_stream_started)):
298+ # We can't restart a body_stream that has been partially
299+ # consumed, so we don't retry.
300+ # Note: We don't have to worry about
301+ # SmartClientRequestProtocolOne or Two, because they don't
302+ # support client-side body streams.
303+ raise
304+ trace.warning('ConnectionReset calling %r, retrying'
305+ % (self.method,))
306+ trace.log_exception_quietly()
307+ encoder, response_handler = self._construct_protocol(
308+ protocol_version)
309+ self._send_no_retry(encoder)
310+ return response_handler
311+
312+ def _send_no_retry(self, encoder):
313+ """Just encode the request and try to send it."""
314+ encoder.set_headers(self.client._headers)
315+ if self.body is not None:
316+ if self.readv_body is not None:
317+ raise AssertionError(
318+ "body and readv_body are mutually exclusive.")
319+ if self.body_stream is not None:
320+ raise AssertionError(
321+ "body and body_stream are mutually exclusive.")
322+ encoder.call_with_body_bytes((self.method, ) + self.args, self.body)
323+ elif self.readv_body is not None:
324+ if self.body_stream is not None:
325+ raise AssertionError(
326+ "readv_body and body_stream are mutually exclusive.")
327+ encoder.call_with_body_readv_array((self.method, ) + self.args,
328+ self.readv_body)
329+ elif self.body_stream is not None:
330+ encoder.call_with_body_stream((self.method, ) + self.args,
331+ self.body_stream)
332+ else:
333+ encoder.call(self.method, *self.args)
334+
335+
336 class SmartClientHooks(hooks.Hooks):
337
338 def __init__(self):
339
340=== modified file 'bzrlib/smart/medium.py'
341--- bzrlib/smart/medium.py 2010-06-21 08:08:04 +0000
342+++ bzrlib/smart/medium.py 2011-10-10 13:55:27 +0000
343@@ -1,4 +1,4 @@
344-# Copyright (C) 2006-2010 Canonical Ltd
345+# Copyright (C) 2006-2011 Canonical Ltd
346 #
347 # This program is free software; you can redistribute it and/or modify
348 # it under the terms of the GNU General Public License as published by
349@@ -24,6 +24,7 @@
350 bzrlib/transport/smart/__init__.py.
351 """
352
353+import errno
354 import os
355 import sys
356 import urllib
357@@ -710,6 +711,14 @@
358 """
359 return SmartClientStreamMediumRequest(self)
360
361+ def reset(self):
362+ """We have been disconnected, reset current state.
363+
364+ This resets things like _current_request and connected state.
365+ """
366+ self.disconnect()
367+ self._current_request = None
368+
369
370 class SmartSimplePipesClientMedium(SmartClientStreamMedium):
371 """A client medium using simple pipes.
372@@ -724,11 +733,20 @@
373
374 def _accept_bytes(self, bytes):
375 """See SmartClientStreamMedium.accept_bytes."""
376- self._writeable_pipe.write(bytes)
377+ try:
378+ self._writeable_pipe.write(bytes)
379+ except IOError, e:
380+ if e.errno in (errno.EINVAL, errno.EPIPE):
381+ raise errors.ConnectionReset(
382+ "Error trying to write to subprocess:\n%s" % (e,))
383+ raise
384 self._report_activity(len(bytes), 'write')
385
386 def _flush(self):
387 """See SmartClientStreamMedium._flush()."""
388+ # Note: If flush were to fail, we'd like to raise ConnectionReset, etc.
389+ # However, testing shows that even when the child process is
390+ # gone, this doesn't error.
391 self._writeable_pipe.flush()
392
393 def _read_bytes(self, count):
394@@ -753,8 +771,8 @@
395
396 class SmartSSHClientMedium(SmartClientStreamMedium):
397 """A client medium using SSH.
398-
399- It delegates IO to a SmartClientSocketMedium or
400+
401+ It delegates IO to a SmartSimplePipesClientMedium or
402 SmartClientAlreadyConnectedSocketMedium (depending on platform).
403 """
404
405@@ -993,5 +1011,3 @@
406 This invokes self._medium._flush to ensure all bytes are transmitted.
407 """
408 self._medium._flush()
409-
410-
411
412=== modified file 'bzrlib/smart/protocol.py'
413--- bzrlib/smart/protocol.py 2010-06-11 05:57:09 +0000
414+++ bzrlib/smart/protocol.py 2011-10-10 13:55:27 +0000
415@@ -1081,9 +1081,6 @@
416 self._real_write_func = write_func
417
418 def _write_func(self, bytes):
419- # TODO: It is probably more appropriate to use sum(map(len, _buf))
420- # for total number of bytes to write, rather than buffer based on
421- # the number of write() calls
422 # TODO: Another possibility would be to turn this into an async model.
423 # Where we let another thread know that we have some bytes if
424 # they want it, but we don't actually block for it
425@@ -1292,6 +1289,7 @@
426 _ProtocolThreeEncoder.__init__(self, medium_request.accept_bytes)
427 self._medium_request = medium_request
428 self._headers = {}
429+ self.body_stream_started = None
430
431 def set_headers(self, headers):
432 self._headers = headers.copy()
433@@ -1357,6 +1355,7 @@
434 if path is not None:
435 mutter(' (to %s)', path)
436 self._request_start_time = osutils.timer_func()
437+ self.body_stream_started = False
438 self._write_protocol_version()
439 self._write_headers(self._headers)
440 self._write_structure(args)
441@@ -1364,6 +1363,9 @@
442 # have finished sending the stream. We would notice at the end
443 # anyway, but if the medium can deliver it early then it's good
444 # to short-circuit the whole request...
445+ # Provoke any ConnectionReset failures before we start the body stream.
446+ self.flush()
447+ self.body_stream_started = True
448 for exc_info, part in _iter_with_errors(stream):
449 if exc_info is not None:
450 # Iterating the stream failed. Cleanly abort the request.
451
452=== modified file 'bzrlib/smart/request.py'
453--- bzrlib/smart/request.py 2010-05-13 16:17:54 +0000
454+++ bzrlib/smart/request.py 2011-10-10 13:55:27 +0000
455@@ -1,4 +1,4 @@
456-# Copyright (C) 2006-2010 Canonical Ltd
457+# Copyright (C) 2006-2011 Canonical Ltd
458 #
459 # This program is free software; you can redistribute it and/or modify
460 # it under the terms of the GNU General Public License as published by
461@@ -486,154 +486,199 @@
462 return SuccessfulSmartServerResponse((answer,))
463
464
465+# In the 'info' attribute, we store whether this request is 'safe' to retry if
466+# we get a disconnect while reading the response. It can have the values:
467+# read This is purely a read request, so retrying it is perfectly ok.
468+# idem An idempotent write request. Something like 'put' where if you put
469+# the same bytes twice you end up with the same final bytes.
470+# semi This is a request that isn't strictly idempotent, but doesn't
471+# result in corruption if it is retried. This is for things like
472+# 'lock' and 'unlock'. If you call lock, it updates the disk
473+# structure. If you fail to read the response, you won't be able to
474+# use the lock, because you don't have the lock token. Calling lock
475+# again will fail, because the lock is already taken. However, we
476+# can't tell if the server received our request or not. If it didn't,
477+# then retrying the request is fine, as it will actually do what we
478+# want. If it did, we will interrupt the current operation, but we
479+# are no worse off than interrupting the current operation because of
480+# a ConnectionReset.
481+# semivfs Similar to semi, but specific to a Virtual FileSystem request.
482+# stream This is a request that takes a stream that cannot be restarted if
483+# consumed. This request is 'safe' in that if we determine the
484+# connection is closed before we consume the stream, we can try
485+# again.
486+# mutate State is updated in a way that replaying that request results in a
487+# different state. For example 'append' writes more bytes to a given
488+# file. If append succeeds, it moves the file pointer.
489 request_handlers = registry.Registry()
490 request_handlers.register_lazy(
491- 'append', 'bzrlib.smart.vfs', 'AppendRequest')
492+ 'append', 'bzrlib.smart.vfs', 'AppendRequest', info='mutate')
493 request_handlers.register_lazy(
494 'Branch.get_config_file', 'bzrlib.smart.branch',
495- 'SmartServerBranchGetConfigFile')
496+ 'SmartServerBranchGetConfigFile', info='read')
497 request_handlers.register_lazy(
498- 'Branch.get_parent', 'bzrlib.smart.branch', 'SmartServerBranchGetParent')
499+ 'Branch.get_parent', 'bzrlib.smart.branch', 'SmartServerBranchGetParent',
500+ info='read')
501 request_handlers.register_lazy(
502 'Branch.get_tags_bytes', 'bzrlib.smart.branch',
503- 'SmartServerBranchGetTagsBytes')
504+ 'SmartServerBranchGetTagsBytes', info='read')
505 request_handlers.register_lazy(
506 'Branch.set_tags_bytes', 'bzrlib.smart.branch',
507- 'SmartServerBranchSetTagsBytes')
508-request_handlers.register_lazy(
509- 'Branch.get_stacked_on_url', 'bzrlib.smart.branch', 'SmartServerBranchRequestGetStackedOnURL')
510-request_handlers.register_lazy(
511- 'Branch.last_revision_info', 'bzrlib.smart.branch', 'SmartServerBranchRequestLastRevisionInfo')
512-request_handlers.register_lazy(
513- 'Branch.lock_write', 'bzrlib.smart.branch', 'SmartServerBranchRequestLockWrite')
514-request_handlers.register_lazy( 'Branch.revision_history',
515- 'bzrlib.smart.branch', 'SmartServerRequestRevisionHistory')
516-request_handlers.register_lazy( 'Branch.set_config_option',
517- 'bzrlib.smart.branch', 'SmartServerBranchRequestSetConfigOption')
518-request_handlers.register_lazy( 'Branch.set_config_option_dict',
519- 'bzrlib.smart.branch', 'SmartServerBranchRequestSetConfigOptionDict')
520-request_handlers.register_lazy( 'Branch.set_last_revision',
521- 'bzrlib.smart.branch', 'SmartServerBranchRequestSetLastRevision')
522+ 'SmartServerBranchSetTagsBytes', info='idem')
523+request_handlers.register_lazy(
524+ 'Branch.get_stacked_on_url', 'bzrlib.smart.branch',
525+ 'SmartServerBranchRequestGetStackedOnURL', info='read')
526+request_handlers.register_lazy(
527+ 'Branch.last_revision_info', 'bzrlib.smart.branch',
528+ 'SmartServerBranchRequestLastRevisionInfo', info='read')
529+request_handlers.register_lazy(
530+ 'Branch.lock_write', 'bzrlib.smart.branch',
531+ 'SmartServerBranchRequestLockWrite', info='semi')
532+request_handlers.register_lazy(
533+ 'Branch.revision_history', 'bzrlib.smart.branch',
534+ 'SmartServerRequestRevisionHistory', info='read')
535+request_handlers.register_lazy(
536+ 'Branch.set_config_option', 'bzrlib.smart.branch',
537+ 'SmartServerBranchRequestSetConfigOption', info='idem')
538+request_handlers.register_lazy(
539+ 'Branch.set_config_option_dict', 'bzrlib.smart.branch',
540+ 'SmartServerBranchRequestSetConfigOptionDict', info='idem')
541+request_handlers.register_lazy(
542+ 'Branch.set_last_revision', 'bzrlib.smart.branch',
543+ 'SmartServerBranchRequestSetLastRevision', info='idem')
544 request_handlers.register_lazy(
545 'Branch.set_last_revision_info', 'bzrlib.smart.branch',
546- 'SmartServerBranchRequestSetLastRevisionInfo')
547+ 'SmartServerBranchRequestSetLastRevisionInfo', info='idem')
548 request_handlers.register_lazy(
549 'Branch.set_last_revision_ex', 'bzrlib.smart.branch',
550- 'SmartServerBranchRequestSetLastRevisionEx')
551+ 'SmartServerBranchRequestSetLastRevisionEx', info='idem')
552 request_handlers.register_lazy(
553 'Branch.set_parent_location', 'bzrlib.smart.branch',
554- 'SmartServerBranchRequestSetParentLocation')
555+ 'SmartServerBranchRequestSetParentLocation', info='idem')
556 request_handlers.register_lazy(
557- 'Branch.unlock', 'bzrlib.smart.branch', 'SmartServerBranchRequestUnlock')
558+ 'Branch.unlock', 'bzrlib.smart.branch', 'SmartServerBranchRequestUnlock',
559+ info='semi')
560 request_handlers.register_lazy(
561 'BzrDir.cloning_metadir', 'bzrlib.smart.bzrdir',
562- 'SmartServerBzrDirRequestCloningMetaDir')
563+ 'SmartServerBzrDirRequestCloningMetaDir', info='read')
564 request_handlers.register_lazy(
565 'BzrDir.create_branch', 'bzrlib.smart.bzrdir',
566- 'SmartServerRequestCreateBranch')
567+ 'SmartServerRequestCreateBranch', info='semi')
568 request_handlers.register_lazy(
569 'BzrDir.create_repository', 'bzrlib.smart.bzrdir',
570- 'SmartServerRequestCreateRepository')
571+ 'SmartServerRequestCreateRepository', info='semi')
572 request_handlers.register_lazy(
573 'BzrDir.find_repository', 'bzrlib.smart.bzrdir',
574- 'SmartServerRequestFindRepositoryV1')
575+ 'SmartServerRequestFindRepositoryV1', info='read')
576 request_handlers.register_lazy(
577 'BzrDir.find_repositoryV2', 'bzrlib.smart.bzrdir',
578- 'SmartServerRequestFindRepositoryV2')
579+ 'SmartServerRequestFindRepositoryV2', info='read')
580 request_handlers.register_lazy(
581 'BzrDir.find_repositoryV3', 'bzrlib.smart.bzrdir',
582- 'SmartServerRequestFindRepositoryV3')
583+ 'SmartServerRequestFindRepositoryV3', info='read')
584 request_handlers.register_lazy(
585 'BzrDir.get_config_file', 'bzrlib.smart.bzrdir',
586- 'SmartServerBzrDirRequestConfigFile')
587+ 'SmartServerBzrDirRequestConfigFile', info='read')
588 request_handlers.register_lazy(
589 'BzrDirFormat.initialize', 'bzrlib.smart.bzrdir',
590- 'SmartServerRequestInitializeBzrDir')
591+ 'SmartServerRequestInitializeBzrDir', info='semi')
592 request_handlers.register_lazy(
593 'BzrDirFormat.initialize_ex_1.16', 'bzrlib.smart.bzrdir',
594- 'SmartServerRequestBzrDirInitializeEx')
595-request_handlers.register_lazy(
596- 'BzrDir.open', 'bzrlib.smart.bzrdir', 'SmartServerRequestOpenBzrDir')
597-request_handlers.register_lazy(
598- 'BzrDir.open_2.1', 'bzrlib.smart.bzrdir', 'SmartServerRequestOpenBzrDir_2_1')
599+ 'SmartServerRequestBzrDirInitializeEx', info='semi')
600+request_handlers.register_lazy(
601+ 'BzrDir.open', 'bzrlib.smart.bzrdir', 'SmartServerRequestOpenBzrDir',
602+ info='read')
603+request_handlers.register_lazy(
604+ 'BzrDir.open_2.1', 'bzrlib.smart.bzrdir',
605+ 'SmartServerRequestOpenBzrDir_2_1', info='read')
606 request_handlers.register_lazy(
607 'BzrDir.open_branch', 'bzrlib.smart.bzrdir',
608- 'SmartServerRequestOpenBranch')
609+ 'SmartServerRequestOpenBranch', info='read')
610 request_handlers.register_lazy(
611 'BzrDir.open_branchV2', 'bzrlib.smart.bzrdir',
612- 'SmartServerRequestOpenBranchV2')
613+ 'SmartServerRequestOpenBranchV2', info='read')
614 request_handlers.register_lazy(
615 'BzrDir.open_branchV3', 'bzrlib.smart.bzrdir',
616- 'SmartServerRequestOpenBranchV3')
617-request_handlers.register_lazy(
618- 'delete', 'bzrlib.smart.vfs', 'DeleteRequest')
619-request_handlers.register_lazy(
620- 'get', 'bzrlib.smart.vfs', 'GetRequest')
621-request_handlers.register_lazy(
622- 'get_bundle', 'bzrlib.smart.request', 'GetBundleRequest')
623-request_handlers.register_lazy(
624- 'has', 'bzrlib.smart.vfs', 'HasRequest')
625-request_handlers.register_lazy(
626- 'hello', 'bzrlib.smart.request', 'HelloRequest')
627-request_handlers.register_lazy(
628- 'iter_files_recursive', 'bzrlib.smart.vfs', 'IterFilesRecursiveRequest')
629-request_handlers.register_lazy(
630- 'list_dir', 'bzrlib.smart.vfs', 'ListDirRequest')
631-request_handlers.register_lazy(
632- 'mkdir', 'bzrlib.smart.vfs', 'MkdirRequest')
633-request_handlers.register_lazy(
634- 'move', 'bzrlib.smart.vfs', 'MoveRequest')
635-request_handlers.register_lazy(
636- 'put', 'bzrlib.smart.vfs', 'PutRequest')
637-request_handlers.register_lazy(
638- 'put_non_atomic', 'bzrlib.smart.vfs', 'PutNonAtomicRequest')
639-request_handlers.register_lazy(
640- 'readv', 'bzrlib.smart.vfs', 'ReadvRequest')
641-request_handlers.register_lazy(
642- 'rename', 'bzrlib.smart.vfs', 'RenameRequest')
643+ 'SmartServerRequestOpenBranchV3', info='read')
644+request_handlers.register_lazy(
645+ 'delete', 'bzrlib.smart.vfs', 'DeleteRequest', info='semivfs')
646+request_handlers.register_lazy(
647+ 'get', 'bzrlib.smart.vfs', 'GetRequest', info='read')
648+request_handlers.register_lazy(
649+ 'get_bundle', 'bzrlib.smart.request', 'GetBundleRequest', info='read')
650+request_handlers.register_lazy(
651+ 'has', 'bzrlib.smart.vfs', 'HasRequest', info='read')
652+request_handlers.register_lazy(
653+ 'hello', 'bzrlib.smart.request', 'HelloRequest', info='read')
654+request_handlers.register_lazy(
655+ 'iter_files_recursive', 'bzrlib.smart.vfs', 'IterFilesRecursiveRequest',
656+ info='read')
657+request_handlers.register_lazy(
658+ 'list_dir', 'bzrlib.smart.vfs', 'ListDirRequest', info='read')
659+request_handlers.register_lazy(
660+ 'mkdir', 'bzrlib.smart.vfs', 'MkdirRequest', info='semivfs')
661+request_handlers.register_lazy(
662+ 'move', 'bzrlib.smart.vfs', 'MoveRequest', info='semivfs')
663+request_handlers.register_lazy(
664+ 'put', 'bzrlib.smart.vfs', 'PutRequest', info='idem')
665+request_handlers.register_lazy(
666+ 'put_non_atomic', 'bzrlib.smart.vfs', 'PutNonAtomicRequest', info='idem')
667+request_handlers.register_lazy(
668+ 'readv', 'bzrlib.smart.vfs', 'ReadvRequest', info='read')
669+request_handlers.register_lazy(
670+ 'rename', 'bzrlib.smart.vfs', 'RenameRequest', info='semivfs')
671 request_handlers.register_lazy(
672 'PackRepository.autopack', 'bzrlib.smart.packrepository',
673- 'SmartServerPackRepositoryAutopack')
674-request_handlers.register_lazy('Repository.gather_stats',
675- 'bzrlib.smart.repository',
676- 'SmartServerRepositoryGatherStats')
677-request_handlers.register_lazy('Repository.get_parent_map',
678- 'bzrlib.smart.repository',
679- 'SmartServerRepositoryGetParentMap')
680-request_handlers.register_lazy(
681- 'Repository.get_revision_graph', 'bzrlib.smart.repository', 'SmartServerRepositoryGetRevisionGraph')
682-request_handlers.register_lazy(
683- 'Repository.has_revision', 'bzrlib.smart.repository', 'SmartServerRequestHasRevision')
684-request_handlers.register_lazy(
685- 'Repository.insert_stream', 'bzrlib.smart.repository', 'SmartServerRepositoryInsertStream')
686-request_handlers.register_lazy(
687- 'Repository.insert_stream_1.19', 'bzrlib.smart.repository', 'SmartServerRepositoryInsertStream_1_19')
688-request_handlers.register_lazy(
689- 'Repository.insert_stream_locked', 'bzrlib.smart.repository', 'SmartServerRepositoryInsertStreamLocked')
690-request_handlers.register_lazy(
691- 'Repository.is_shared', 'bzrlib.smart.repository', 'SmartServerRepositoryIsShared')
692-request_handlers.register_lazy(
693- 'Repository.lock_write', 'bzrlib.smart.repository', 'SmartServerRepositoryLockWrite')
694+ 'SmartServerPackRepositoryAutopack', info='idem')
695+request_handlers.register_lazy(
696+ 'Repository.gather_stats', 'bzrlib.smart.repository',
697+ 'SmartServerRepositoryGatherStats', info='read')
698+request_handlers.register_lazy(
699+ 'Repository.get_parent_map', 'bzrlib.smart.repository',
700+ 'SmartServerRepositoryGetParentMap', info='read')
701+request_handlers.register_lazy(
702+ 'Repository.get_revision_graph', 'bzrlib.smart.repository',
703+ 'SmartServerRepositoryGetRevisionGraph', info='read')
704+request_handlers.register_lazy(
705+ 'Repository.has_revision', 'bzrlib.smart.repository',
706+ 'SmartServerRequestHasRevision', info='read')
707+request_handlers.register_lazy(
708+ 'Repository.insert_stream', 'bzrlib.smart.repository',
709+ 'SmartServerRepositoryInsertStream', info='stream')
710+request_handlers.register_lazy(
711+ 'Repository.insert_stream_1.19', 'bzrlib.smart.repository',
712+ 'SmartServerRepositoryInsertStream_1_19', info='stream')
713+request_handlers.register_lazy(
714+ 'Repository.insert_stream_locked', 'bzrlib.smart.repository',
715+ 'SmartServerRepositoryInsertStreamLocked', info='stream')
716+request_handlers.register_lazy(
717+ 'Repository.is_shared', 'bzrlib.smart.repository',
718+ 'SmartServerRepositoryIsShared', info='read')
719+request_handlers.register_lazy(
720+ 'Repository.lock_write', 'bzrlib.smart.repository',
721+ 'SmartServerRepositoryLockWrite', info='semi')
722 request_handlers.register_lazy(
723 'Repository.set_make_working_trees', 'bzrlib.smart.repository',
724- 'SmartServerRepositorySetMakeWorkingTrees')
725+ 'SmartServerRepositorySetMakeWorkingTrees', info='idem')
726 request_handlers.register_lazy(
727- 'Repository.unlock', 'bzrlib.smart.repository', 'SmartServerRepositoryUnlock')
728+ 'Repository.unlock', 'bzrlib.smart.repository',
729+ 'SmartServerRepositoryUnlock', info='semi')
730 request_handlers.register_lazy(
731 'Repository.get_rev_id_for_revno', 'bzrlib.smart.repository',
732- 'SmartServerRepositoryGetRevIdForRevno')
733+ 'SmartServerRepositoryGetRevIdForRevno', info='read')
734 request_handlers.register_lazy(
735 'Repository.get_stream', 'bzrlib.smart.repository',
736- 'SmartServerRepositoryGetStream')
737+ 'SmartServerRepositoryGetStream', info='read')
738 request_handlers.register_lazy(
739 'Repository.get_stream_1.19', 'bzrlib.smart.repository',
740- 'SmartServerRepositoryGetStream_1_19')
741+ 'SmartServerRepositoryGetStream_1_19', info='read')
742 request_handlers.register_lazy(
743 'Repository.tarball', 'bzrlib.smart.repository',
744- 'SmartServerRepositoryTarball')
745-request_handlers.register_lazy(
746- 'rmdir', 'bzrlib.smart.vfs', 'RmdirRequest')
747-request_handlers.register_lazy(
748- 'stat', 'bzrlib.smart.vfs', 'StatRequest')
749-request_handlers.register_lazy(
750- 'Transport.is_readonly', 'bzrlib.smart.request', 'SmartServerIsReadonly')
751+ 'SmartServerRepositoryTarball', info='read')
752+request_handlers.register_lazy(
753+ 'rmdir', 'bzrlib.smart.vfs', 'RmdirRequest', info='semivfs')
754+request_handlers.register_lazy(
755+ 'stat', 'bzrlib.smart.vfs', 'StatRequest', info='read')
756+request_handlers.register_lazy(
757+ 'Transport.is_readonly', 'bzrlib.smart.request', 'SmartServerIsReadonly',
758+ info='read')
759
760=== modified file 'bzrlib/tests/test_smart.py'
761--- bzrlib/tests/test_smart.py 2011-01-12 01:01:53 +0000
762+++ bzrlib/tests/test_smart.py 2011-10-10 13:55:27 +0000
763@@ -1844,8 +1844,11 @@
764 """All registered request_handlers can be found."""
765 # If there's a typo in a register_lazy call, this loop will fail with
766 # an AttributeError.
767- for key, item in smart_req.request_handlers.iteritems():
768- pass
769+ for key in smart_req.request_handlers.keys():
770+ try:
771+ item = smart_req.request_handlers.get(key)
772+ except AttributeError, e:
773+ raise AttributeError('failed to get %s: %s' % (key, e))
774
775 def assertHandlerEqual(self, verb, handler):
776 self.assertEqual(smart_req.request_handlers.get(verb), handler)
777
778=== modified file 'bzrlib/tests/test_smart_request.py'
779--- bzrlib/tests/test_smart_request.py 2010-07-13 19:02:12 +0000
780+++ bzrlib/tests/test_smart_request.py 2011-10-10 13:55:27 +0000
781@@ -111,6 +111,16 @@
782 self.assertEqual(
783 [[transport]] * 3, handler._command.jail_transports_log)
784
785+ def test_all_registered_requests_are_safety_qualified(self):
786+ unclassified_requests = []
787+ allowed_info = ('read', 'idem', 'mutate', 'semivfs', 'semi', 'stream')
788+ for key in request.request_handlers.keys():
789+ info = request.request_handlers.get_info(key)
790+ if info is None or info not in allowed_info:
791+ unclassified_requests.append(key)
792+ if unclassified_requests:
793+ self.fail('These requests were not categorized as safe/unsafe'
794+ ' to retry: %s' % (unclassified_requests,))
795
796
797 class TestSmartRequestHandlerErrorTranslation(TestCase):
798
799=== modified file 'bzrlib/tests/test_smart_transport.py'
800--- bzrlib/tests/test_smart_transport.py 2011-01-10 22:20:12 +0000
801+++ bzrlib/tests/test_smart_transport.py 2011-10-10 13:55:27 +0000
802@@ -18,13 +18,17 @@
803
804 # all of this deals with byte strings so this is safe
805 from cStringIO import StringIO
806+import errno
807 import os
808 import socket
809+import subprocess
810+import sys
811 import threading
812
813 import bzrlib
814 from bzrlib import (
815 bzrdir,
816+ debug,
817 errors,
818 osutils,
819 tests,
820@@ -53,6 +57,29 @@
821 )
822
823
824+def create_file_pipes():
825+ r, w = os.pipe()
826+ # These must be opened without buffering, or we get undefined results
827+ rf = os.fdopen(r, 'rb', 0)
828+ wf = os.fdopen(w, 'wb', 0)
829+ return rf, wf
830+
831+
832+def portable_socket_pair():
833+ """Return a pair of TCP sockets connected to each other.
834+
835+ Unlike socket.socketpair, this should work on Windows.
836+ """
837+ listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
838+ listen_sock.bind(('127.0.0.1', 0))
839+ listen_sock.listen(1)
840+ client_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
841+ client_sock.connect(listen_sock.getsockname())
842+ server_sock, addr = listen_sock.accept()
843+ listen_sock.close()
844+ return server_sock, client_sock
845+
846+
847 class StringIOSSHVendor(object):
848 """A SSH vendor that uses StringIO to buffer writes and answer reads."""
849
850@@ -67,6 +94,27 @@
851 return StringIOSSHConnection(self)
852
853
854+class FirstRejectedStringIOSSHVendor(StringIOSSHVendor):
855+ """The first connection will be considered closed.
856+
857+ The second connection will succeed normally.
858+ """
859+
860+ def __init__(self, read_from, write_to, fail_at_write=True):
861+ super(FirstRejectedStringIOSSHVendor, self).__init__(read_from,
862+ write_to)
863+ self.fail_at_write = fail_at_write
864+ self._first = True
865+
866+ def connect_ssh(self, username, password, host, port, command):
867+ self.calls.append(('connect_ssh', username, password, host, port,
868+ command))
869+ if self._first:
870+ self._first = False
871+ return ClosedSSHConnection(self)
872+ return StringIOSSHConnection(self)
873+
874+
875 class StringIOSSHConnection(ssh.SSHConnection):
876 """A SSH connection that uses StringIO to buffer writes and answer reads."""
877
878@@ -82,6 +130,29 @@
879 return 'pipes', (self.vendor.read_from, self.vendor.write_to)
880
881
882+class ClosedSSHConnection(ssh.SSHConnection):
883+ """An SSH connection that just has closed channels."""
884+
885+ def __init__(self, vendor):
886+ self.vendor = vendor
887+
888+ def close(self):
889+ self.vendor.calls.append(('close', ))
890+
891+ def get_sock_or_pipes(self):
892+ # We create matching pipes, and then close the ssh side
893+ bzr_read, ssh_write = create_file_pipes()
894+ # We always fail when bzr goes to read
895+ ssh_write.close()
896+ if self.vendor.fail_at_write:
897+ # If set, we'll also fail when bzr goes to write
898+ ssh_read, bzr_write = create_file_pipes()
899+ ssh_read.close()
900+ else:
901+ bzr_write = self.vendor.write_to
902+ return 'pipes', (bzr_read, bzr_write)
903+
904+
905 class _InvalidHostnameFeature(tests.Feature):
906 """Does 'non_existent.invalid' fail to resolve?
907
908@@ -177,6 +248,91 @@
909 client_medium._accept_bytes('abc')
910 self.assertEqual('abc', output.getvalue())
911
912+ def test_simple_pipes__accept_bytes_subprocess_closed(self):
913+ # It is unfortunate that we have to use Popen for this. However,
914+ # os.pipe() does not behave the same as subprocess.Popen().
915+ # On Windows, if you use os.pipe() and close the write side,
916+ # read.read() hangs. On Linux, read.read() returns the empty string.
917+ p = subprocess.Popen([sys.executable, '-c',
918+ 'import sys\n'
919+ 'sys.stdout.write(sys.stdin.read(4))\n'
920+ 'sys.stdout.close()\n'],
921+ stdout=subprocess.PIPE, stdin=subprocess.PIPE)
922+ client_medium = medium.SmartSimplePipesClientMedium(
923+ p.stdout, p.stdin, 'base')
924+ client_medium._accept_bytes('abc\n')
925+ self.assertEqual('abc', client_medium._read_bytes(3))
926+ p.wait()
927+ # While writing to the underlying pipe,
928+ # Windows py2.6.6 we get IOError(EINVAL)
929+ # Lucid py2.6.5, we get IOError(EPIPE)
930+ # In both cases, it should be wrapped to ConnectionReset
931+ self.assertRaises(errors.ConnectionReset,
932+ client_medium._accept_bytes, 'more')
933+
934+ def test_simple_pipes__accept_bytes_pipe_closed(self):
935+ child_read, client_write = create_file_pipes()
936+ client_medium = medium.SmartSimplePipesClientMedium(
937+ None, client_write, 'base')
938+ client_medium._accept_bytes('abc\n')
939+ self.assertEqual('abc\n', child_read.read(4))
940+ # While writing to the underlying pipe,
941+ # Windows py2.6.6 we get IOError(EINVAL)
942+ # Lucid py2.6.5, we get IOError(EPIPE)
943+ # In both cases, it should be wrapped to ConnectionReset
944+ child_read.close()
945+ self.assertRaises(errors.ConnectionReset,
946+ client_medium._accept_bytes, 'more')
947+
948+ def test_simple_pipes__flush_pipe_closed(self):
949+ child_read, client_write = create_file_pipes()
950+ client_medium = medium.SmartSimplePipesClientMedium(
951+ None, client_write, 'base')
952+ client_medium._accept_bytes('abc\n')
953+ child_read.close()
954+ # Even though the pipe is closed, flush on the write side seems to be a
955+ # no-op, rather than a failure.
956+ client_medium._flush()
957+
958+ def test_simple_pipes__flush_subprocess_closed(self):
959+ p = subprocess.Popen([sys.executable, '-c',
960+ 'import sys\n'
961+ 'sys.stdout.write(sys.stdin.read(4))\n'
962+ 'sys.stdout.close()\n'],
963+ stdout=subprocess.PIPE, stdin=subprocess.PIPE)
964+ client_medium = medium.SmartSimplePipesClientMedium(
965+ p.stdout, p.stdin, 'base')
966+ client_medium._accept_bytes('abc\n')
967+ p.wait()
968+ # Even though the child process is dead, flush seems to be a no-op.
969+ client_medium._flush()
970+
971+ def test_simple_pipes__read_bytes_pipe_closed(self):
972+ child_read, client_write = create_file_pipes()
973+ client_medium = medium.SmartSimplePipesClientMedium(
974+ child_read, client_write, 'base')
975+ client_medium._accept_bytes('abc\n')
976+ client_write.close()
977+ self.assertEqual('abc\n', client_medium._read_bytes(4))
978+ self.assertEqual('', client_medium._read_bytes(4))
979+
980+ def test_simple_pipes__read_bytes_subprocess_closed(self):
981+ p = subprocess.Popen([sys.executable, '-c',
982+ 'import sys\n'
983+ 'if sys.platform == "win32":\n'
984+ ' import msvcrt, os\n'
985+ ' msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)\n'
986+ ' msvcrt.setmode(sys.stdin.fileno(), os.O_BINARY)\n'
987+ 'sys.stdout.write(sys.stdin.read(4))\n'
988+ 'sys.stdout.close()\n'],
989+ stdout=subprocess.PIPE, stdin=subprocess.PIPE)
990+ client_medium = medium.SmartSimplePipesClientMedium(
991+ p.stdout, p.stdin, 'base')
992+ client_medium._accept_bytes('abc\n')
993+ p.wait()
994+ self.assertEqual('abc\n', client_medium._read_bytes(4))
995+ self.assertEqual('', client_medium._read_bytes(4))
996+
997 def test_simple_pipes_client_disconnect_does_nothing(self):
998 # calling disconnect does nothing.
999 input = StringIO()
1000@@ -564,6 +720,28 @@
1001 request.finished_reading()
1002 self.assertRaises(errors.ReadingCompleted, request.read_bytes, None)
1003
1004+ def test_reset(self):
1005+ server_sock, client_sock = portable_socket_pair()
1006+ # TODO: Use SmartClientAlreadyConnectedSocketMedium for the versions of
1007+ # bzr where it exists.
1008+ client_medium = medium.SmartTCPClientMedium(None, None, None)
1009+ client_medium._socket = client_sock
1010+ client_medium._connected = True
1011+ req = client_medium.get_request()
1012+ self.assertRaises(errors.TooManyConcurrentRequests,
1013+ client_medium.get_request)
1014+ client_medium.reset()
1015+ # The stream should be reset, marked as disconnected, though ready for
1016+ # us to make a new request
1017+ self.assertFalse(client_medium._connected)
1018+ self.assertIs(None, client_medium._socket)
1019+ try:
1020+ self.assertEqual('', client_sock.recv(1))
1021+ except socket.error, e:
1022+ if e.errno not in (errno.EBADF,):
1023+ raise
1024+ req = client_medium.get_request()
1025+
1026
1027 class RemoteTransportTests(test_smart.TestCaseWithSmartMedium):
1028
1029@@ -617,20 +795,6 @@
1030 super(TestSmartServerStreamMedium, self).setUp()
1031 self.overrideEnv('BZR_NO_SMART_VFS', None)
1032
1033- def portable_socket_pair(self):
1034- """Return a pair of TCP sockets connected to each other.
1035-
1036- Unlike socket.socketpair, this should work on Windows.
1037- """
1038- listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1039- listen_sock.bind(('127.0.0.1', 0))
1040- listen_sock.listen(1)
1041- client_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1042- client_sock.connect(listen_sock.getsockname())
1043- server_sock, addr = listen_sock.accept()
1044- listen_sock.close()
1045- return server_sock, client_sock
1046-
1047 def test_smart_query_version(self):
1048 """Feed a canned query version to a server"""
1049 # wire-to-wire, using the whole stack
1050@@ -695,7 +859,7 @@
1051
1052 def test_socket_stream_with_bulk_data(self):
1053 sample_request_bytes = 'command\n9\nbulk datadone\n'
1054- server_sock, client_sock = self.portable_socket_pair()
1055+ server_sock, client_sock = portable_socket_pair()
1056 server = medium.SmartServerSocketStreamMedium(
1057 server_sock, None)
1058 sample_protocol = SampleRequest(expected_bytes=sample_request_bytes)
1059@@ -714,7 +878,7 @@
1060 self.assertTrue(server.finished)
1061
1062 def test_socket_stream_shutdown_detection(self):
1063- server_sock, client_sock = self.portable_socket_pair()
1064+ server_sock, client_sock = portable_socket_pair()
1065 client_sock.close()
1066 server = medium.SmartServerSocketStreamMedium(
1067 server_sock, None)
1068@@ -734,7 +898,7 @@
1069 rest_of_request_bytes = 'lo\n'
1070 expected_response = (
1071 protocol.RESPONSE_VERSION_TWO + 'success\nok\x012\n')
1072- server_sock, client_sock = self.portable_socket_pair()
1073+ server_sock, client_sock = portable_socket_pair()
1074 server = medium.SmartServerSocketStreamMedium(
1075 server_sock, None)
1076 client_sock.sendall(incomplete_request_bytes)
1077@@ -810,7 +974,7 @@
1078 # _serve_one_request should still process both of them as if they had
1079 # been received separately.
1080 sample_request_bytes = 'command\n'
1081- server_sock, client_sock = self.portable_socket_pair()
1082+ server_sock, client_sock = portable_socket_pair()
1083 server = medium.SmartServerSocketStreamMedium(
1084 server_sock, None)
1085 first_protocol = SampleRequest(expected_bytes=sample_request_bytes)
1086@@ -847,7 +1011,7 @@
1087 self.assertTrue(server.finished)
1088
1089 def test_socket_stream_error_handling(self):
1090- server_sock, client_sock = self.portable_socket_pair()
1091+ server_sock, client_sock = portable_socket_pair()
1092 server = medium.SmartServerSocketStreamMedium(
1093 server_sock, None)
1094 fake_protocol = ErrorRaisingProtocol(Exception('boom'))
1095@@ -868,7 +1032,7 @@
1096 self.assertEqual('', from_server.getvalue())
1097
1098 def test_socket_stream_keyboard_interrupt_handling(self):
1099- server_sock, client_sock = self.portable_socket_pair()
1100+ server_sock, client_sock = portable_socket_pair()
1101 server = medium.SmartServerSocketStreamMedium(
1102 server_sock, None)
1103 fake_protocol = ErrorRaisingProtocol(KeyboardInterrupt('boom'))
1104@@ -885,7 +1049,7 @@
1105 return server._build_protocol()
1106
1107 def build_protocol_socket(self, bytes):
1108- server_sock, client_sock = self.portable_socket_pair()
1109+ server_sock, client_sock = portable_socket_pair()
1110 server = medium.SmartServerSocketStreamMedium(
1111 server_sock, None)
1112 client_sock.sendall(bytes)
1113@@ -2792,6 +2956,33 @@
1114 'e', # end
1115 output.getvalue())
1116
1117+ def test_records_start_of_body_stream(self):
1118+ requester, output = self.make_client_encoder_and_output()
1119+ requester.set_headers({})
1120+ in_stream = [False]
1121+ def stream_checker():
1122+ self.assertTrue(requester.body_stream_started)
1123+ in_stream[0] = True
1124+ yield 'content'
1125+ flush_called = []
1126+ orig_flush = requester.flush
1127+ def tracked_flush():
1128+ flush_called.append(in_stream[0])
1129+ if in_stream[0]:
1130+ self.assertTrue(requester.body_stream_started)
1131+ else:
1132+ self.assertFalse(requester.body_stream_started)
1133+ return orig_flush()
1134+ requester.flush = tracked_flush
1135+ requester.call_with_body_stream(('one arg',), stream_checker())
1136+ self.assertEqual(
1137+ 'bzr message 3 (bzr 1.6)\n' # protocol version
1138+ '\x00\x00\x00\x02de' # headers
1139+ 's\x00\x00\x00\x0bl7:one arge' # args
1140+ 'b\x00\x00\x00\x07content' # body
1141+ 'e', output.getvalue())
1142+ self.assertEqual([False, True, True], flush_called)
1143+
1144
1145 class StubMediumRequest(object):
1146 """A stub medium request that tracks the number of times accept_bytes is
1147@@ -3216,6 +3407,193 @@
1148 # encoder.
1149
1150
1151+class Test_SmartClientRequest(tests.TestCase):
1152+
1153+ def make_client_with_failing_medium(self, fail_at_write=True, response=''):
1154+ response_io = StringIO(response)
1155+ output = StringIO()
1156+ vendor = FirstRejectedStringIOSSHVendor(response_io, output,
1157+ fail_at_write=fail_at_write)
1158+ ssh_params = medium.SSHParams('a host', 'a port', 'a user', 'a pass')
1159+ client_medium = medium.SmartSSHClientMedium('base', ssh_params, vendor)
1160+ smart_client = client._SmartClient(client_medium, headers={})
1161+ return output, vendor, smart_client
1162+
1163+ def make_response(self, args, body=None, body_stream=None):
1164+ response_io = StringIO()
1165+ response = _mod_request.SuccessfulSmartServerResponse(args, body=body,
1166+ body_stream=body_stream)
1167+ responder = protocol.ProtocolThreeResponder(response_io.write)
1168+ responder.send_response(response)
1169+ return response_io.getvalue()
1170+
1171+ def test__call_doesnt_retry_append(self):
1172+ response = self.make_response(('appended', '8'))
1173+ output, vendor, smart_client = self.make_client_with_failing_medium(
1174+ fail_at_write=False, response=response)
1175+ smart_request = client._SmartClientRequest(smart_client, 'append',
1176+ ('foo', ''), body='content\n')
1177+ self.assertRaises(errors.ConnectionReset, smart_request._call, 3)
1178+
1179+ def test__call_retries_get_bytes(self):
1180+ response = self.make_response(('ok',), 'content\n')
1181+ output, vendor, smart_client = self.make_client_with_failing_medium(
1182+ fail_at_write=False, response=response)
1183+ smart_request = client._SmartClientRequest(smart_client, 'get',
1184+ ('foo',))
1185+ response, response_handler = smart_request._call(3)
1186+ self.assertEqual(('ok',), response)
1187+ self.assertEqual('content\n', response_handler.read_body_bytes())
1188+
1189+ def test__call_noretry_get_bytes(self):
1190+ debug.debug_flags.add('noretry')
1191+ response = self.make_response(('ok',), 'content\n')
1192+ output, vendor, smart_client = self.make_client_with_failing_medium(
1193+ fail_at_write=False, response=response)
1194+ smart_request = client._SmartClientRequest(smart_client, 'get',
1195+ ('foo',))
1196+ self.assertRaises(errors.ConnectionReset, smart_request._call, 3)
1197+
1198+ def test__send_no_retry_pipes(self):
1199+ client_read, server_write = create_file_pipes()
1200+ server_read, client_write = create_file_pipes()
1201+ client_medium = medium.SmartSimplePipesClientMedium(client_read,
1202+ client_write, base='/')
1203+ smart_client = client._SmartClient(client_medium)
1204+ smart_request = client._SmartClientRequest(smart_client,
1205+ 'hello', ())
1206+ # Close the server side
1207+ server_read.close()
1208+ encoder, response_handler = smart_request._construct_protocol(3)
1209+ self.assertRaises(errors.ConnectionReset,
1210+ smart_request._send_no_retry, encoder)
1211+
1212+ def test__send_read_response_sockets(self):
1213+ listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1214+ listen_sock.bind(('127.0.0.1', 0))
1215+ listen_sock.listen(1)
1216+ host, port = listen_sock.getsockname()
1217+ client_medium = medium.SmartTCPClientMedium(host, port, '/')
1218+ client_medium._ensure_connection()
1219+ smart_client = client._SmartClient(client_medium)
1220+ smart_request = client._SmartClientRequest(smart_client, 'hello', ())
1221+ # Accept the connection, but don't actually talk to the client.
1222+ server_sock, _ = listen_sock.accept()
1223+ server_sock.close()
1224+ # Sockets buffer and don't really notice that the server has closed the
1225+ # connection until we try to read again.
1226+ handler = smart_request._send(3)
1227+ self.assertRaises(errors.ConnectionReset,
1228+ handler.read_response_tuple, expect_body=False)
1229+
1230+ def test__send_retries_on_write(self):
1231+ output, vendor, smart_client = self.make_client_with_failing_medium()
1232+ smart_request = client._SmartClientRequest(smart_client, 'hello', ())
1233+ handler = smart_request._send(3)
1234+ self.assertEqual('bzr message 3 (bzr 1.6)\n' # protocol
1235+ '\x00\x00\x00\x02de' # empty headers
1236+ 's\x00\x00\x00\tl5:helloee',
1237+ output.getvalue())
1238+ self.assertEqual(
1239+ [('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
1240+ ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
1241+ ('close',),
1242+ ('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
1243+ ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
1244+ ],
1245+ vendor.calls)
1246+
1247+ def test__send_doesnt_retry_read_failure(self):
1248+ output, vendor, smart_client = self.make_client_with_failing_medium(
1249+ fail_at_write=False)
1250+ smart_request = client._SmartClientRequest(smart_client, 'hello', ())
1251+ handler = smart_request._send(3)
1252+ self.assertEqual('bzr message 3 (bzr 1.6)\n' # protocol
1253+ '\x00\x00\x00\x02de' # empty headers
1254+ 's\x00\x00\x00\tl5:helloee',
1255+ output.getvalue())
1256+ self.assertEqual(
1257+ [('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
1258+ ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
1259+ ],
1260+ vendor.calls)
1261+ self.assertRaises(errors.ConnectionReset, handler.read_response_tuple)
1262+
1263+ def test__send_request_retries_body_stream_if_not_started(self):
1264+ output, vendor, smart_client = self.make_client_with_failing_medium()
1265+ smart_request = client._SmartClientRequest(smart_client, 'hello', (),
1266+ body_stream=['a', 'b'])
1267+ response_handler = smart_request._send(3)
1268+ # We connect, get disconnected, and notice before consuming the stream,
1269+ # so we try again one time and succeed.
1270+ self.assertEqual(
1271+ [('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
1272+ ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
1273+ ('close',),
1274+ ('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
1275+ ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
1276+ ],
1277+ vendor.calls)
1278+ self.assertEqual('bzr message 3 (bzr 1.6)\n' # protocol
1279+ '\x00\x00\x00\x02de' # empty headers
1280+ 's\x00\x00\x00\tl5:helloe'
1281+ 'b\x00\x00\x00\x01a'
1282+ 'b\x00\x00\x00\x01b'
1283+ 'e',
1284+ output.getvalue())
1285+
1286+ def test__send_request_stops_if_body_started(self):
1287+ # We intentionally use the python StringIO so that we can subclass it.
1288+ from StringIO import StringIO
1289+ response = StringIO()
1290+
1291+ class FailAfterFirstWrite(StringIO):
1292+ """Allow one 'write' call to pass, fail the rest"""
1293+ def __init__(self):
1294+ StringIO.__init__(self)
1295+ self._first = True
1296+
1297+ def write(self, s):
1298+ if self._first:
1299+ self._first = False
1300+ return StringIO.write(self, s)
1301+ raise IOError(errno.EINVAL, 'invalid file handle')
1302+ output = FailAfterFirstWrite()
1303+
1304+ vendor = FirstRejectedStringIOSSHVendor(response, output,
1305+ fail_at_write=False)
1306+ ssh_params = medium.SSHParams('a host', 'a port', 'a user', 'a pass')
1307+ client_medium = medium.SmartSSHClientMedium('base', ssh_params, vendor)
1308+ smart_client = client._SmartClient(client_medium, headers={})
1309+ smart_request = client._SmartClientRequest(smart_client, 'hello', (),
1310+ body_stream=['a', 'b'])
1311+ self.assertRaises(errors.ConnectionReset, smart_request._send, 3)
1312+ # We connect, and manage to get to the point that we start consuming
1313+ # the body stream. The next write fails, so we just stop.
1314+ self.assertEqual(
1315+ [('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
1316+ ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
1317+ ('close',),
1318+ ],
1319+ vendor.calls)
1320+ self.assertEqual('bzr message 3 (bzr 1.6)\n' # protocol
1321+ '\x00\x00\x00\x02de' # empty headers
1322+ 's\x00\x00\x00\tl5:helloe',
1323+ output.getvalue())
1324+
1325+ def test__send_disabled_retry(self):
1326+ debug.debug_flags.add('noretry')
1327+ output, vendor, smart_client = self.make_client_with_failing_medium()
1328+ smart_request = client._SmartClientRequest(smart_client, 'hello', ())
1329+ self.assertRaises(errors.ConnectionReset, smart_request._send, 3)
1330+ self.assertEqual(
1331+ [('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
1332+ ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
1333+ ('close',),
1334+ ],
1335+ vendor.calls)
1336+
1337+
1338 class LengthPrefixedBodyDecoder(tests.TestCase):
1339
1340 # XXX: TODO: make accept_reading_trailer invoke translate_response or
1341
1342=== modified file 'doc/en/release-notes/bzr-2.1.txt'
1343--- doc/en/release-notes/bzr-2.1.txt 2011-08-20 09:28:27 +0000
1344+++ doc/en/release-notes/bzr-2.1.txt 2011-10-10 13:55:27 +0000
1345@@ -43,6 +43,11 @@
1346
1347 (John Arbash Meinel, #609187, #812928)
1348
1349+* Teach the bzr client how to reconnect if we get ``ConnectionReset``
1350+ while making an RPC request. This doesn't handle all possible network
1351+ disconnects, but it should at least handle when the server is asked to
1352+ shut down gracefully. (John Arbash Meinel, #819604)
1353+
1354 Improvements
1355 ************
1356
1357
1358=== modified file 'doc/en/release-notes/bzr-2.2.txt'
1359--- doc/en/release-notes/bzr-2.2.txt 2011-09-09 12:32:08 +0000
1360+++ doc/en/release-notes/bzr-2.2.txt 2011-10-10 13:55:27 +0000
1361@@ -19,6 +19,11 @@
1362 Bug Fixes
1363 *********
1364
1365+* Teach the bzr client how to reconnect if we get ``ConnectionReset``
1366+ while making an RPC request. This doesn't handle all possible network
1367+ disconnects, but it should at least handle when the server is asked to
1368+ shut down gracefully. (John Arbash Meinel, #819604)
1369+
1370 Improvements
1371 ************
1372
1373
1374=== modified file 'doc/en/release-notes/bzr-2.3.txt'
1375--- doc/en/release-notes/bzr-2.3.txt 2011-09-09 12:32:08 +0000
1376+++ doc/en/release-notes/bzr-2.3.txt 2011-10-10 13:55:27 +0000
1377@@ -35,6 +35,11 @@
1378 * Cope cleanly with buggy HTTP proxies that close the socket in the middle
1379 of a multipart response. (Martin Pool, #198646).
1380
1381+* Teach the bzr client how to reconnect if we get ``ConnectionReset``
1382+ while making an RPC request. This doesn't handle all possible network
1383+ disconnects, but it should at least handle when the server is asked to
1384+ shut down gracefully. (John Arbash Meinel, #819604)
1385+
1386 Documentation
1387 *************
1388

Subscribers

People subscribed via source and target branches