Merge lp:~jameinel/bzr/2.2-client-reconnect-819604 into lp:bzr/2.2

Proposed by John A Meinel
Status: Work in progress
Proposed branch: lp:~jameinel/bzr/2.2-client-reconnect-819604
Merge into: lp:bzr/2.2
Diff against target: 1397 lines (+810/-219)
10 files modified
NEWS (+10/-0)
bzrlib/help_topics/en/debug-flags.txt (+2/-0)
bzrlib/osutils.py (+4/-1)
bzrlib/smart/client.py (+208/-86)
bzrlib/smart/medium.py (+22/-6)
bzrlib/smart/protocol.py (+5/-3)
bzrlib/smart/request.py (+144/-99)
bzrlib/tests/test_smart.py (+5/-2)
bzrlib/tests/test_smart_request.py (+10/-0)
bzrlib/tests/test_smart_transport.py (+400/-22)
To merge this branch: bzr merge lp:~jameinel/bzr/2.2-client-reconnect-819604
Reviewer Review Type Date Requested Status
bzr-core Pending
Review via email: mp+78842@code.launchpad.net

Commit message

Bug #819604, allow bzr-2.2 to gracefully reconnect to the server if we get a ConnectionReset at appropriate times.

Description of the change

This is just a rollup of all of my changes for the 2.1-client-reconnect branch (for bug #819604), merged into bzr-2.2 and updated for the API changes, etc.

I suggest we review and approve the 2.1 series first, since it is all nicely split out into logical steps; then, when it is ready, any changes needed there can be brought into this branch, etc.

To post a comment you must log in.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'NEWS'
2--- NEWS 2011-09-02 18:47:36 +0000
3+++ NEWS 2011-10-10 13:44:32 +0000
4@@ -20,6 +20,11 @@
5 Bug Fixes
6 *********
7
8+* Teach the bzr client how to reconnect if we get ``ConnectionReset``
9+ while making an RPC request. This doesn't handle all possible network
10+ disconnects, but it should at least handle when the server is asked to
11+ shutdown gracefully. (John Arbash Meinel, #819604)
12+
13 Improvements
14 ************
15
16@@ -111,6 +116,11 @@
17
18 (John Arbash Meinel, #609187, #812928)
19
20+* Teach the bzr client how to reconnect if we get ``ConnectionReset``
21+ while making an RPC request. This doesn't handle all possible network
22+ disconnects, but it should at least handle when the server is asked to
23+ shutdown gracefully. (John Arbash Meinel, #819604)
24+
25
26 Improvements
27 ************
28
29=== modified file 'bzrlib/help_topics/en/debug-flags.txt'
30--- bzrlib/help_topics/en/debug-flags.txt 2010-01-05 04:30:07 +0000
31+++ bzrlib/help_topics/en/debug-flags.txt 2011-10-10 13:44:32 +0000
32@@ -24,6 +24,8 @@
33 -Dindex Trace major index operations.
34 -Dknit Trace knit operations.
35 -Dlock Trace when lockdir locks are taken or released.
36+-Dnoretry If a connection is reset, fail immediately rather than
37+ retrying the request.
38 -Dprogress Trace progress bar operations.
39 -Dmerge Emit information for debugging merges.
40 -Dno_apport Don't use apport to report crashes.
41
42=== modified file 'bzrlib/osutils.py'
43--- bzrlib/osutils.py 2010-07-09 16:16:11 +0000
44+++ bzrlib/osutils.py 2011-10-10 13:44:32 +0000
45@@ -1993,6 +1993,9 @@
46 # data at once.
47 MAX_SOCKET_CHUNK = 64 * 1024
48
49+WSAECONNABORTED = 10053
50+WSAECONNRESET = 10054
51+
52 def read_bytes_from_socket(sock, report_activity=None,
53 max_read_size=MAX_SOCKET_CHUNK):
54 """Read up to max_read_size of bytes from sock and notify of progress.
55@@ -2006,7 +2009,7 @@
56 bytes = sock.recv(max_read_size)
57 except socket.error, e:
58 eno = e.args[0]
59- if eno == getattr(errno, "WSAECONNRESET", errno.ECONNRESET):
60+ if eno in (errno.ECONNRESET, WSAECONNABORTED, WSAECONNRESET):
61 # The connection was closed by the other side. Callers expect
62 # an empty string to signal end-of-stream.
63 return ""
64
65=== modified file 'bzrlib/smart/client.py'
66--- bzrlib/smart/client.py 2010-02-17 17:11:16 +0000
67+++ bzrlib/smart/client.py 2011-10-10 13:44:32 +0000
68@@ -14,12 +14,18 @@
69 # along with this program; if not, write to the Free Software
70 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
71
72+from bzrlib import lazy_import
73+lazy_import.lazy_import(globals(), """
74+from bzrlib.smart import request as _mod_request
75+""")
76+
77 import bzrlib
78 from bzrlib.smart import message, protocol
79-from bzrlib.trace import warning
80 from bzrlib import (
81+ debug,
82 errors,
83 hooks,
84+ trace,
85 )
86
87
88@@ -39,93 +45,12 @@
89 def __repr__(self):
90 return '%s(%r)' % (self.__class__.__name__, self._medium)
91
92- def _send_request(self, protocol_version, method, args, body=None,
93- readv_body=None, body_stream=None):
94- encoder, response_handler = self._construct_protocol(
95- protocol_version)
96- encoder.set_headers(self._headers)
97- if body is not None:
98- if readv_body is not None:
99- raise AssertionError(
100- "body and readv_body are mutually exclusive.")
101- if body_stream is not None:
102- raise AssertionError(
103- "body and body_stream are mutually exclusive.")
104- encoder.call_with_body_bytes((method, ) + args, body)
105- elif readv_body is not None:
106- if body_stream is not None:
107- raise AssertionError(
108- "readv_body and body_stream are mutually exclusive.")
109- encoder.call_with_body_readv_array((method, ) + args, readv_body)
110- elif body_stream is not None:
111- encoder.call_with_body_stream((method, ) + args, body_stream)
112- else:
113- encoder.call(method, *args)
114- return response_handler
115-
116- def _run_call_hooks(self, method, args, body, readv_body):
117- if not _SmartClient.hooks['call']:
118- return
119- params = CallHookParams(method, args, body, readv_body, self._medium)
120- for hook in _SmartClient.hooks['call']:
121- hook(params)
122-
123 def _call_and_read_response(self, method, args, body=None, readv_body=None,
124 body_stream=None, expect_response_body=True):
125- self._run_call_hooks(method, args, body, readv_body)
126- if self._medium._protocol_version is not None:
127- response_handler = self._send_request(
128- self._medium._protocol_version, method, args, body=body,
129- readv_body=readv_body, body_stream=body_stream)
130- return (response_handler.read_response_tuple(
131- expect_body=expect_response_body),
132- response_handler)
133- else:
134- for protocol_version in [3, 2]:
135- if protocol_version == 2:
136- # If v3 doesn't work, the remote side is older than 1.6.
137- self._medium._remember_remote_is_before((1, 6))
138- response_handler = self._send_request(
139- protocol_version, method, args, body=body,
140- readv_body=readv_body, body_stream=body_stream)
141- try:
142- response_tuple = response_handler.read_response_tuple(
143- expect_body=expect_response_body)
144- except errors.UnexpectedProtocolVersionMarker, err:
145- # TODO: We could recover from this without disconnecting if
146- # we recognise the protocol version.
147- warning(
148- 'Server does not understand Bazaar network protocol %d,'
149- ' reconnecting. (Upgrade the server to avoid this.)'
150- % (protocol_version,))
151- self._medium.disconnect()
152- continue
153- except errors.ErrorFromSmartServer:
154- # If we received an error reply from the server, then it
155- # must be ok with this protocol version.
156- self._medium._protocol_version = protocol_version
157- raise
158- else:
159- self._medium._protocol_version = protocol_version
160- return response_tuple, response_handler
161- raise errors.SmartProtocolError(
162- 'Server is not a Bazaar server: ' + str(err))
163-
164- def _construct_protocol(self, version):
165- request = self._medium.get_request()
166- if version == 3:
167- request_encoder = protocol.ProtocolThreeRequester(request)
168- response_handler = message.ConventionalResponseHandler()
169- response_proto = protocol.ProtocolThreeDecoder(
170- response_handler, expect_version_marker=True)
171- response_handler.setProtoAndMediumRequest(response_proto, request)
172- elif version == 2:
173- request_encoder = protocol.SmartClientRequestProtocolTwo(request)
174- response_handler = request_encoder
175- else:
176- request_encoder = protocol.SmartClientRequestProtocolOne(request)
177- response_handler = request_encoder
178- return request_encoder, response_handler
179+ request = _SmartClientRequest(self, method, args, body=body,
180+ readv_body=readv_body, body_stream=body_stream,
181+ expect_response_body=expect_response_body)
182+ return request.call_and_read_response()
183
184 def call(self, method, *args):
185 """Call a method on the remote server."""
186@@ -191,6 +116,203 @@
187 return self._medium.remote_path_from_transport(transport)
188
189
190+class _SmartClientRequest(object):
191+ """Encapsulate the logic for a single request.
192+
193+ This class handles things like reconnecting and sending the request a
194+ second time when the connection is reset in the middle. It also handles the
195+ multiple requests that get made if we don't know what protocol the server
196+ supports yet.
197+
198+ Generally, you build up one of these objects, passing in the arguments that
199+ you want to send to the server, and then use 'call_and_read_response' to
200+ get the response from the server.
201+ """
202+
203+ def __init__(self, client, method, args, body=None, readv_body=None,
204+ body_stream=None, expect_response_body=True):
205+ self.client = client
206+ self.method = method
207+ self.args = args
208+ self.body = body
209+ self.readv_body = readv_body
210+ self.body_stream = body_stream
211+ self.expect_response_body = expect_response_body
212+
213+ def call_and_read_response(self):
214+ """Send the request to the server, and read the initial response.
215+
216+ This doesn't read all of the body content of the response, instead it
217+ returns (response_tuple, response_handler). response_tuple is the 'ok',
218+ or 'error' information, and 'response_handler' can be used to get the
219+ content stream out.
220+ """
221+ self._run_call_hooks()
222+ protocol_version = self.client._medium._protocol_version
223+ if protocol_version is None:
224+ return self._call_determining_protocol_version()
225+ else:
226+ return self._call(protocol_version)
227+
228+ def _is_safe_to_send_twice(self):
229+ """Check if the current method is re-entrant safe."""
230+ if self.body_stream is not None or 'noretry' in debug.debug_flags:
231+ # We can't restart a body stream that has already been consumed.
232+ return False
233+ request_type = _mod_request.request_handlers.get_info(self.method)
234+ if request_type in ('read', 'idem', 'semi'):
235+ return True
236+ # If we have gotten this far, 'stream' cannot be retried, because we
237+ # already consumed the local stream.
238+ if request_type in ('semivfs', 'mutate', 'stream'):
239+ return False
240+ trace.mutter('Unknown request type: %s for method %s'
241+ % (request_type, self.method))
242+ return False
243+
244+ def _run_call_hooks(self):
245+ if not _SmartClient.hooks['call']:
246+ return
247+ params = CallHookParams(self.method, self.args, self.body,
248+ self.readv_body, self.client._medium)
249+ for hook in _SmartClient.hooks['call']:
250+ hook(params)
251+
252+ def _call(self, protocol_version):
253+ """We know the protocol version.
254+
255+ So this just sends the request, and then reads the response. This is
256+ where the code will be to retry requests if the connection is closed.
257+ """
258+ response_handler = self._send(protocol_version)
259+ try:
260+ response_tuple = response_handler.read_response_tuple(
261+ expect_body=self.expect_response_body)
262+ except errors.ConnectionReset, e:
263+ self.client._medium.reset()
264+ if not self._is_safe_to_send_twice():
265+ raise
266+ trace.warning('ConnectionReset reading response for %r, retrying'
267+ % (self.method,))
268+ trace.log_exception_quietly()
269+ encoder, response_handler = self._construct_protocol(
270+ protocol_version)
271+ self._send_no_retry(encoder)
272+ response_tuple = response_handler.read_response_tuple(
273+ expect_body=self.expect_response_body)
274+ return (response_tuple, response_handler)
275+
276+ def _call_determining_protocol_version(self):
277+ """Determine what protocol the remote server supports.
278+
279+ We do this by placing a request in the most recent protocol, and
280+ handling the UnexpectedProtocolVersionMarker from the server.
281+ """
282+ for protocol_version in [3, 2]:
283+ if protocol_version == 2:
284+ # If v3 doesn't work, the remote side is older than 1.6.
285+ self.client._medium._remember_remote_is_before((1, 6))
286+ try:
287+ response_tuple, response_handler = self._call(protocol_version)
288+ except errors.UnexpectedProtocolVersionMarker, err:
289+ # TODO: We could recover from this without disconnecting if
290+ # we recognise the protocol version.
291+ trace.warning(
292+ 'Server does not understand Bazaar network protocol %d,'
293+ ' reconnecting. (Upgrade the server to avoid this.)'
294+ % (protocol_version,))
295+ self.client._medium.disconnect()
296+ continue
297+ except errors.ErrorFromSmartServer:
298+ # If we received an error reply from the server, then it
299+ # must be ok with this protocol version.
300+ self.client._medium._protocol_version = protocol_version
301+ raise
302+ else:
303+ self.client._medium._protocol_version = protocol_version
304+ return response_tuple, response_handler
305+ raise errors.SmartProtocolError(
306+ 'Server is not a Bazaar server: ' + str(err))
307+
308+ def _construct_protocol(self, version):
309+ """Build the encoding stack for a given protocol version."""
310+ request = self.client._medium.get_request()
311+ if version == 3:
312+ request_encoder = protocol.ProtocolThreeRequester(request)
313+ response_handler = message.ConventionalResponseHandler()
314+ response_proto = protocol.ProtocolThreeDecoder(
315+ response_handler, expect_version_marker=True)
316+ response_handler.setProtoAndMediumRequest(response_proto, request)
317+ elif version == 2:
318+ request_encoder = protocol.SmartClientRequestProtocolTwo(request)
319+ response_handler = request_encoder
320+ else:
321+ request_encoder = protocol.SmartClientRequestProtocolOne(request)
322+ response_handler = request_encoder
323+ return request_encoder, response_handler
324+
325+ def _send(self, protocol_version):
326+ """Encode the request, and send it to the server.
327+
328+ This will retry a request if we get a ConnectionReset while sending the
329+ request to the server. (Unless we have a body_stream that we have
330+ already started consuming, since we can't restart body_streams)
331+
332+ :return: response_handler as defined by _construct_protocol
333+ """
334+ encoder, response_handler = self._construct_protocol(protocol_version)
335+ try:
336+ self._send_no_retry(encoder)
337+ except errors.ConnectionReset, e:
338+ # If we fail during the _send_no_retry phase, then we can
339+ # be confident that the server did not get our request, because we
340+ # haven't started waiting for the reply yet. So try the request
341+ # again. We only issue a single retry, because if the connection
342+ # really is down, there is no reason to loop endlessly.
343+
344+ # Connection is dead, so close our end of it.
345+ self.client._medium.reset()
346+ if (('noretry' in debug.debug_flags)
347+ or (self.body_stream is not None
348+ and encoder.body_stream_started)):
349+ # We can't restart a body_stream that has been partially
350+ # consumed, so we don't retry.
351+ # Note: We don't have to worry about
352+ # SmartClientRequestProtocolOne or Two, because they don't
353+ # support client-side body streams.
354+ raise
355+ trace.warning('ConnectionReset calling %r, retrying'
356+ % (self.method,))
357+ trace.log_exception_quietly()
358+ encoder, response_handler = self._construct_protocol(
359+ protocol_version)
360+ self._send_no_retry(encoder)
361+ return response_handler
362+
363+ def _send_no_retry(self, encoder):
364+ """Just encode the request and try to send it."""
365+ encoder.set_headers(self.client._headers)
366+ if self.body is not None:
367+ if self.readv_body is not None:
368+ raise AssertionError(
369+ "body and readv_body are mutually exclusive.")
370+ if self.body_stream is not None:
371+ raise AssertionError(
372+ "body and body_stream are mutually exclusive.")
373+ encoder.call_with_body_bytes((self.method, ) + self.args, self.body)
374+ elif self.readv_body is not None:
375+ if self.body_stream is not None:
376+ raise AssertionError(
377+ "readv_body and body_stream are mutually exclusive.")
378+ encoder.call_with_body_readv_array((self.method, ) + self.args,
379+ self.readv_body)
380+ elif self.body_stream is not None:
381+ encoder.call_with_body_stream((self.method, ) + self.args,
382+ self.body_stream)
383+ else:
384+ encoder.call(self.method, *self.args)
385+
386+
387 class SmartClientHooks(hooks.Hooks):
388
389 def __init__(self):
390
391=== modified file 'bzrlib/smart/medium.py'
392--- bzrlib/smart/medium.py 2010-06-21 08:08:04 +0000
393+++ bzrlib/smart/medium.py 2011-10-10 13:44:32 +0000
394@@ -1,4 +1,4 @@
395-# Copyright (C) 2006-2010 Canonical Ltd
396+# Copyright (C) 2006-2011 Canonical Ltd
397 #
398 # This program is free software; you can redistribute it and/or modify
399 # it under the terms of the GNU General Public License as published by
400@@ -24,6 +24,7 @@
401 bzrlib/transport/smart/__init__.py.
402 """
403
404+import errno
405 import os
406 import sys
407 import urllib
408@@ -710,6 +711,14 @@
409 """
410 return SmartClientStreamMediumRequest(self)
411
412+ def reset(self):
413+ """We have been disconnected, reset current state.
414+
415+ This resets things like _current_request and connected state.
416+ """
417+ self.disconnect()
418+ self._current_request = None
419+
420
421 class SmartSimplePipesClientMedium(SmartClientStreamMedium):
422 """A client medium using simple pipes.
423@@ -724,11 +733,20 @@
424
425 def _accept_bytes(self, bytes):
426 """See SmartClientStreamMedium.accept_bytes."""
427- self._writeable_pipe.write(bytes)
428+ try:
429+ self._writeable_pipe.write(bytes)
430+ except IOError, e:
431+ if e.errno in (errno.EINVAL, errno.EPIPE):
432+ raise errors.ConnectionReset(
433+ "Error trying to write to subprocess:\n%s" % (e,))
434+ raise
435 self._report_activity(len(bytes), 'write')
436
437 def _flush(self):
438 """See SmartClientStreamMedium._flush()."""
439+ # Note: If flush were to fail, we'd like to raise ConnectionReset, etc.
440+ # However, testing shows that even when the child process is
441+ # gone, this doesn't error.
442 self._writeable_pipe.flush()
443
444 def _read_bytes(self, count):
445@@ -753,8 +771,8 @@
446
447 class SmartSSHClientMedium(SmartClientStreamMedium):
448 """A client medium using SSH.
449-
450- It delegates IO to a SmartClientSocketMedium or
451+
452+ It delegates IO to a SmartSimplePipesClientMedium or
453 SmartClientAlreadyConnectedSocketMedium (depending on platform).
454 """
455
456@@ -993,5 +1011,3 @@
457 This invokes self._medium._flush to ensure all bytes are transmitted.
458 """
459 self._medium._flush()
460-
461-
462
463=== modified file 'bzrlib/smart/protocol.py'
464--- bzrlib/smart/protocol.py 2010-06-11 05:57:09 +0000
465+++ bzrlib/smart/protocol.py 2011-10-10 13:44:32 +0000
466@@ -1081,9 +1081,6 @@
467 self._real_write_func = write_func
468
469 def _write_func(self, bytes):
470- # TODO: It is probably more appropriate to use sum(map(len, _buf))
471- # for total number of bytes to write, rather than buffer based on
472- # the number of write() calls
473 # TODO: Another possibility would be to turn this into an async model.
474 # Where we let another thread know that we have some bytes if
475 # they want it, but we don't actually block for it
476@@ -1292,6 +1289,7 @@
477 _ProtocolThreeEncoder.__init__(self, medium_request.accept_bytes)
478 self._medium_request = medium_request
479 self._headers = {}
480+ self.body_stream_started = None
481
482 def set_headers(self, headers):
483 self._headers = headers.copy()
484@@ -1357,6 +1355,7 @@
485 if path is not None:
486 mutter(' (to %s)', path)
487 self._request_start_time = osutils.timer_func()
488+ self.body_stream_started = False
489 self._write_protocol_version()
490 self._write_headers(self._headers)
491 self._write_structure(args)
492@@ -1364,6 +1363,9 @@
493 # have finished sending the stream. We would notice at the end
494 # anyway, but if the medium can deliver it early then it's good
495 # to short-circuit the whole request...
496+ # Provoke any ConnectionReset failures before we start the body stream.
497+ self.flush()
498+ self.body_stream_started = True
499 for exc_info, part in _iter_with_errors(stream):
500 if exc_info is not None:
501 # Iterating the stream failed. Cleanly abort the request.
502
503=== modified file 'bzrlib/smart/request.py'
504--- bzrlib/smart/request.py 2010-05-13 16:17:54 +0000
505+++ bzrlib/smart/request.py 2011-10-10 13:44:32 +0000
506@@ -1,4 +1,4 @@
507-# Copyright (C) 2006-2010 Canonical Ltd
508+# Copyright (C) 2006-2011 Canonical Ltd
509 #
510 # This program is free software; you can redistribute it and/or modify
511 # it under the terms of the GNU General Public License as published by
512@@ -486,154 +486,199 @@
513 return SuccessfulSmartServerResponse((answer,))
514
515
516+# In the 'info' attribute, we store whether this request is 'safe' to retry if
517+# we get a disconnect while reading the response. It can have the values:
518+# read This is purely a read request, so retrying it is perfectly ok.
519+# idem An idempotent write request. Something like 'put' where if you put
520+# the same bytes twice you end up with the same final bytes.
521+# semi This is a request that isn't strictly idempotent, but doesn't
522+# result in corruption if it is retried. This is for things like
523+# 'lock' and 'unlock'. If you call lock, it updates the disk
524+# structure. If you fail to read the response, you won't be able to
525+# use the lock, because you don't have the lock token. Calling lock
526+# again will fail, because the lock is already taken. However, we
527+# can't tell if the server received our request or not. If it didn't,
528+# then retrying the request is fine, as it will actually do what we
529+# want. If it did, we will interrupt the current operation, but we
530+# are no worse off than interrupting the current operation because of
531+# a ConnectionReset.
532+# semivfs Similar to semi, but specific to a Virtual FileSystem request.
533+# stream This is a request that takes a stream that cannot be restarted if
534+# consumed. This request is 'safe' in that if we determine the
535+# connection is closed before we consume the stream, we can try
536+# again.
537+# mutate State is updated in a way that replaying that request results in a
538+# different state. For example 'append' writes more bytes to a given
539+# file. If append succeeds, it moves the file pointer.
540 request_handlers = registry.Registry()
541 request_handlers.register_lazy(
542- 'append', 'bzrlib.smart.vfs', 'AppendRequest')
543+ 'append', 'bzrlib.smart.vfs', 'AppendRequest', info='mutate')
544 request_handlers.register_lazy(
545 'Branch.get_config_file', 'bzrlib.smart.branch',
546- 'SmartServerBranchGetConfigFile')
547+ 'SmartServerBranchGetConfigFile', info='read')
548 request_handlers.register_lazy(
549- 'Branch.get_parent', 'bzrlib.smart.branch', 'SmartServerBranchGetParent')
550+ 'Branch.get_parent', 'bzrlib.smart.branch', 'SmartServerBranchGetParent',
551+ info='read')
552 request_handlers.register_lazy(
553 'Branch.get_tags_bytes', 'bzrlib.smart.branch',
554- 'SmartServerBranchGetTagsBytes')
555+ 'SmartServerBranchGetTagsBytes', info='read')
556 request_handlers.register_lazy(
557 'Branch.set_tags_bytes', 'bzrlib.smart.branch',
558- 'SmartServerBranchSetTagsBytes')
559-request_handlers.register_lazy(
560- 'Branch.get_stacked_on_url', 'bzrlib.smart.branch', 'SmartServerBranchRequestGetStackedOnURL')
561-request_handlers.register_lazy(
562- 'Branch.last_revision_info', 'bzrlib.smart.branch', 'SmartServerBranchRequestLastRevisionInfo')
563-request_handlers.register_lazy(
564- 'Branch.lock_write', 'bzrlib.smart.branch', 'SmartServerBranchRequestLockWrite')
565-request_handlers.register_lazy( 'Branch.revision_history',
566- 'bzrlib.smart.branch', 'SmartServerRequestRevisionHistory')
567-request_handlers.register_lazy( 'Branch.set_config_option',
568- 'bzrlib.smart.branch', 'SmartServerBranchRequestSetConfigOption')
569-request_handlers.register_lazy( 'Branch.set_config_option_dict',
570- 'bzrlib.smart.branch', 'SmartServerBranchRequestSetConfigOptionDict')
571-request_handlers.register_lazy( 'Branch.set_last_revision',
572- 'bzrlib.smart.branch', 'SmartServerBranchRequestSetLastRevision')
573+ 'SmartServerBranchSetTagsBytes', info='idem')
574+request_handlers.register_lazy(
575+ 'Branch.get_stacked_on_url', 'bzrlib.smart.branch',
576+ 'SmartServerBranchRequestGetStackedOnURL', info='read')
577+request_handlers.register_lazy(
578+ 'Branch.last_revision_info', 'bzrlib.smart.branch',
579+ 'SmartServerBranchRequestLastRevisionInfo', info='read')
580+request_handlers.register_lazy(
581+ 'Branch.lock_write', 'bzrlib.smart.branch',
582+ 'SmartServerBranchRequestLockWrite', info='semi')
583+request_handlers.register_lazy(
584+ 'Branch.revision_history', 'bzrlib.smart.branch',
585+ 'SmartServerRequestRevisionHistory', info='read')
586+request_handlers.register_lazy(
587+ 'Branch.set_config_option', 'bzrlib.smart.branch',
588+ 'SmartServerBranchRequestSetConfigOption', info='idem')
589+request_handlers.register_lazy(
590+ 'Branch.set_config_option_dict', 'bzrlib.smart.branch',
591+ 'SmartServerBranchRequestSetConfigOptionDict', info='idem')
592+request_handlers.register_lazy(
593+ 'Branch.set_last_revision', 'bzrlib.smart.branch',
594+ 'SmartServerBranchRequestSetLastRevision', info='idem')
595 request_handlers.register_lazy(
596 'Branch.set_last_revision_info', 'bzrlib.smart.branch',
597- 'SmartServerBranchRequestSetLastRevisionInfo')
598+ 'SmartServerBranchRequestSetLastRevisionInfo', info='idem')
599 request_handlers.register_lazy(
600 'Branch.set_last_revision_ex', 'bzrlib.smart.branch',
601- 'SmartServerBranchRequestSetLastRevisionEx')
602+ 'SmartServerBranchRequestSetLastRevisionEx', info='idem')
603 request_handlers.register_lazy(
604 'Branch.set_parent_location', 'bzrlib.smart.branch',
605- 'SmartServerBranchRequestSetParentLocation')
606+ 'SmartServerBranchRequestSetParentLocation', info='idem')
607 request_handlers.register_lazy(
608- 'Branch.unlock', 'bzrlib.smart.branch', 'SmartServerBranchRequestUnlock')
609+ 'Branch.unlock', 'bzrlib.smart.branch', 'SmartServerBranchRequestUnlock',
610+ info='semi')
611 request_handlers.register_lazy(
612 'BzrDir.cloning_metadir', 'bzrlib.smart.bzrdir',
613- 'SmartServerBzrDirRequestCloningMetaDir')
614+ 'SmartServerBzrDirRequestCloningMetaDir', info='read')
615 request_handlers.register_lazy(
616 'BzrDir.create_branch', 'bzrlib.smart.bzrdir',
617- 'SmartServerRequestCreateBranch')
618+ 'SmartServerRequestCreateBranch', info='semi')
619 request_handlers.register_lazy(
620 'BzrDir.create_repository', 'bzrlib.smart.bzrdir',
621- 'SmartServerRequestCreateRepository')
622+ 'SmartServerRequestCreateRepository', info='semi')
623 request_handlers.register_lazy(
624 'BzrDir.find_repository', 'bzrlib.smart.bzrdir',
625- 'SmartServerRequestFindRepositoryV1')
626+ 'SmartServerRequestFindRepositoryV1', info='read')
627 request_handlers.register_lazy(
628 'BzrDir.find_repositoryV2', 'bzrlib.smart.bzrdir',
629- 'SmartServerRequestFindRepositoryV2')
630+ 'SmartServerRequestFindRepositoryV2', info='read')
631 request_handlers.register_lazy(
632 'BzrDir.find_repositoryV3', 'bzrlib.smart.bzrdir',
633- 'SmartServerRequestFindRepositoryV3')
634+ 'SmartServerRequestFindRepositoryV3', info='read')
635 request_handlers.register_lazy(
636 'BzrDir.get_config_file', 'bzrlib.smart.bzrdir',
637- 'SmartServerBzrDirRequestConfigFile')
638+ 'SmartServerBzrDirRequestConfigFile', info='read')
639 request_handlers.register_lazy(
640 'BzrDirFormat.initialize', 'bzrlib.smart.bzrdir',
641- 'SmartServerRequestInitializeBzrDir')
642+ 'SmartServerRequestInitializeBzrDir', info='semi')
643 request_handlers.register_lazy(
644 'BzrDirFormat.initialize_ex_1.16', 'bzrlib.smart.bzrdir',
645- 'SmartServerRequestBzrDirInitializeEx')
646-request_handlers.register_lazy(
647- 'BzrDir.open', 'bzrlib.smart.bzrdir', 'SmartServerRequestOpenBzrDir')
648-request_handlers.register_lazy(
649- 'BzrDir.open_2.1', 'bzrlib.smart.bzrdir', 'SmartServerRequestOpenBzrDir_2_1')
650+ 'SmartServerRequestBzrDirInitializeEx', info='semi')
651+request_handlers.register_lazy(
652+ 'BzrDir.open', 'bzrlib.smart.bzrdir', 'SmartServerRequestOpenBzrDir',
653+ info='read')
654+request_handlers.register_lazy(
655+ 'BzrDir.open_2.1', 'bzrlib.smart.bzrdir',
656+ 'SmartServerRequestOpenBzrDir_2_1', info='read')
657 request_handlers.register_lazy(
658 'BzrDir.open_branch', 'bzrlib.smart.bzrdir',
659- 'SmartServerRequestOpenBranch')
660+ 'SmartServerRequestOpenBranch', info='read')
661 request_handlers.register_lazy(
662 'BzrDir.open_branchV2', 'bzrlib.smart.bzrdir',
663- 'SmartServerRequestOpenBranchV2')
664+ 'SmartServerRequestOpenBranchV2', info='read')
665 request_handlers.register_lazy(
666 'BzrDir.open_branchV3', 'bzrlib.smart.bzrdir',
667- 'SmartServerRequestOpenBranchV3')
668-request_handlers.register_lazy(
669- 'delete', 'bzrlib.smart.vfs', 'DeleteRequest')
670-request_handlers.register_lazy(
671- 'get', 'bzrlib.smart.vfs', 'GetRequest')
672-request_handlers.register_lazy(
673- 'get_bundle', 'bzrlib.smart.request', 'GetBundleRequest')
674-request_handlers.register_lazy(
675- 'has', 'bzrlib.smart.vfs', 'HasRequest')
676-request_handlers.register_lazy(
677- 'hello', 'bzrlib.smart.request', 'HelloRequest')
678-request_handlers.register_lazy(
679- 'iter_files_recursive', 'bzrlib.smart.vfs', 'IterFilesRecursiveRequest')
680-request_handlers.register_lazy(
681- 'list_dir', 'bzrlib.smart.vfs', 'ListDirRequest')
682-request_handlers.register_lazy(
683- 'mkdir', 'bzrlib.smart.vfs', 'MkdirRequest')
684-request_handlers.register_lazy(
685- 'move', 'bzrlib.smart.vfs', 'MoveRequest')
686-request_handlers.register_lazy(
687- 'put', 'bzrlib.smart.vfs', 'PutRequest')
688-request_handlers.register_lazy(
689- 'put_non_atomic', 'bzrlib.smart.vfs', 'PutNonAtomicRequest')
690-request_handlers.register_lazy(
691- 'readv', 'bzrlib.smart.vfs', 'ReadvRequest')
692-request_handlers.register_lazy(
693- 'rename', 'bzrlib.smart.vfs', 'RenameRequest')
694+ 'SmartServerRequestOpenBranchV3', info='read')
695+request_handlers.register_lazy(
696+ 'delete', 'bzrlib.smart.vfs', 'DeleteRequest', info='semivfs')
697+request_handlers.register_lazy(
698+ 'get', 'bzrlib.smart.vfs', 'GetRequest', info='read')
699+request_handlers.register_lazy(
700+ 'get_bundle', 'bzrlib.smart.request', 'GetBundleRequest', info='read')
701+request_handlers.register_lazy(
702+ 'has', 'bzrlib.smart.vfs', 'HasRequest', info='read')
703+request_handlers.register_lazy(
704+ 'hello', 'bzrlib.smart.request', 'HelloRequest', info='read')
705+request_handlers.register_lazy(
706+ 'iter_files_recursive', 'bzrlib.smart.vfs', 'IterFilesRecursiveRequest',
707+ info='read')
708+request_handlers.register_lazy(
709+ 'list_dir', 'bzrlib.smart.vfs', 'ListDirRequest', info='read')
710+request_handlers.register_lazy(
711+ 'mkdir', 'bzrlib.smart.vfs', 'MkdirRequest', info='semivfs')
712+request_handlers.register_lazy(
713+ 'move', 'bzrlib.smart.vfs', 'MoveRequest', info='semivfs')
714+request_handlers.register_lazy(
715+ 'put', 'bzrlib.smart.vfs', 'PutRequest', info='idem')
716+request_handlers.register_lazy(
717+ 'put_non_atomic', 'bzrlib.smart.vfs', 'PutNonAtomicRequest', info='idem')
718+request_handlers.register_lazy(
719+ 'readv', 'bzrlib.smart.vfs', 'ReadvRequest', info='read')
720+request_handlers.register_lazy(
721+ 'rename', 'bzrlib.smart.vfs', 'RenameRequest', info='semivfs')
722 request_handlers.register_lazy(
723 'PackRepository.autopack', 'bzrlib.smart.packrepository',
724- 'SmartServerPackRepositoryAutopack')
725-request_handlers.register_lazy('Repository.gather_stats',
726- 'bzrlib.smart.repository',
727- 'SmartServerRepositoryGatherStats')
728-request_handlers.register_lazy('Repository.get_parent_map',
729- 'bzrlib.smart.repository',
730- 'SmartServerRepositoryGetParentMap')
731-request_handlers.register_lazy(
732- 'Repository.get_revision_graph', 'bzrlib.smart.repository', 'SmartServerRepositoryGetRevisionGraph')
733-request_handlers.register_lazy(
734- 'Repository.has_revision', 'bzrlib.smart.repository', 'SmartServerRequestHasRevision')
735-request_handlers.register_lazy(
736- 'Repository.insert_stream', 'bzrlib.smart.repository', 'SmartServerRepositoryInsertStream')
737-request_handlers.register_lazy(
738- 'Repository.insert_stream_1.19', 'bzrlib.smart.repository', 'SmartServerRepositoryInsertStream_1_19')
739-request_handlers.register_lazy(
740- 'Repository.insert_stream_locked', 'bzrlib.smart.repository', 'SmartServerRepositoryInsertStreamLocked')
741-request_handlers.register_lazy(
742- 'Repository.is_shared', 'bzrlib.smart.repository', 'SmartServerRepositoryIsShared')
743-request_handlers.register_lazy(
744- 'Repository.lock_write', 'bzrlib.smart.repository', 'SmartServerRepositoryLockWrite')
745+ 'SmartServerPackRepositoryAutopack', info='idem')
746+request_handlers.register_lazy(
747+ 'Repository.gather_stats', 'bzrlib.smart.repository',
748+ 'SmartServerRepositoryGatherStats', info='read')
749+request_handlers.register_lazy(
750+ 'Repository.get_parent_map', 'bzrlib.smart.repository',
751+ 'SmartServerRepositoryGetParentMap', info='read')
752+request_handlers.register_lazy(
753+ 'Repository.get_revision_graph', 'bzrlib.smart.repository',
754+ 'SmartServerRepositoryGetRevisionGraph', info='read')
755+request_handlers.register_lazy(
756+ 'Repository.has_revision', 'bzrlib.smart.repository',
757+ 'SmartServerRequestHasRevision', info='read')
758+request_handlers.register_lazy(
759+ 'Repository.insert_stream', 'bzrlib.smart.repository',
760+ 'SmartServerRepositoryInsertStream', info='stream')
761+request_handlers.register_lazy(
762+ 'Repository.insert_stream_1.19', 'bzrlib.smart.repository',
763+ 'SmartServerRepositoryInsertStream_1_19', info='stream')
764+request_handlers.register_lazy(
765+ 'Repository.insert_stream_locked', 'bzrlib.smart.repository',
766+ 'SmartServerRepositoryInsertStreamLocked', info='stream')
767+request_handlers.register_lazy(
768+ 'Repository.is_shared', 'bzrlib.smart.repository',
769+ 'SmartServerRepositoryIsShared', info='read')
770+request_handlers.register_lazy(
771+ 'Repository.lock_write', 'bzrlib.smart.repository',
772+ 'SmartServerRepositoryLockWrite', info='semi')
773 request_handlers.register_lazy(
774 'Repository.set_make_working_trees', 'bzrlib.smart.repository',
775- 'SmartServerRepositorySetMakeWorkingTrees')
776+ 'SmartServerRepositorySetMakeWorkingTrees', info='idem')
777 request_handlers.register_lazy(
778- 'Repository.unlock', 'bzrlib.smart.repository', 'SmartServerRepositoryUnlock')
779+ 'Repository.unlock', 'bzrlib.smart.repository',
780+ 'SmartServerRepositoryUnlock', info='semi')
781 request_handlers.register_lazy(
782 'Repository.get_rev_id_for_revno', 'bzrlib.smart.repository',
783- 'SmartServerRepositoryGetRevIdForRevno')
784+ 'SmartServerRepositoryGetRevIdForRevno', info='read')
785 request_handlers.register_lazy(
786 'Repository.get_stream', 'bzrlib.smart.repository',
787- 'SmartServerRepositoryGetStream')
788+ 'SmartServerRepositoryGetStream', info='read')
789 request_handlers.register_lazy(
790 'Repository.get_stream_1.19', 'bzrlib.smart.repository',
791- 'SmartServerRepositoryGetStream_1_19')
792+ 'SmartServerRepositoryGetStream_1_19', info='read')
793 request_handlers.register_lazy(
794 'Repository.tarball', 'bzrlib.smart.repository',
795- 'SmartServerRepositoryTarball')
796-request_handlers.register_lazy(
797- 'rmdir', 'bzrlib.smart.vfs', 'RmdirRequest')
798-request_handlers.register_lazy(
799- 'stat', 'bzrlib.smart.vfs', 'StatRequest')
800-request_handlers.register_lazy(
801- 'Transport.is_readonly', 'bzrlib.smart.request', 'SmartServerIsReadonly')
802+ 'SmartServerRepositoryTarball', info='read')
803+request_handlers.register_lazy(
804+ 'rmdir', 'bzrlib.smart.vfs', 'RmdirRequest', info='semivfs')
805+request_handlers.register_lazy(
806+ 'stat', 'bzrlib.smart.vfs', 'StatRequest', info='read')
807+request_handlers.register_lazy(
808+ 'Transport.is_readonly', 'bzrlib.smart.request', 'SmartServerIsReadonly',
809+ info='read')
810
811=== modified file 'bzrlib/tests/test_smart.py'
812--- bzrlib/tests/test_smart.py 2010-05-13 16:17:54 +0000
813+++ bzrlib/tests/test_smart.py 2011-10-10 13:44:32 +0000
814@@ -1849,8 +1849,11 @@
815 """All registered request_handlers can be found."""
816 # If there's a typo in a register_lazy call, this loop will fail with
817 # an AttributeError.
818- for key, item in smart_req.request_handlers.iteritems():
819- pass
820+ for key in smart_req.request_handlers.keys():
821+ try:
822+ item = smart_req.request_handlers.get(key)
823+ except AttributeError, e:
824+ raise AttributeError('failed to get %s: %s' % (key, e))
825
826 def assertHandlerEqual(self, verb, handler):
827 self.assertEqual(smart_req.request_handlers.get(verb), handler)
828
829=== modified file 'bzrlib/tests/test_smart_request.py'
830--- bzrlib/tests/test_smart_request.py 2010-06-20 11:18:38 +0000
831+++ bzrlib/tests/test_smart_request.py 2011-10-10 13:44:32 +0000
832@@ -111,6 +111,16 @@
833 self.assertEqual(
834 [[transport]] * 3, handler._command.jail_transports_log)
835
836+ def test_all_registered_requests_are_safety_qualified(self):
837+ unclassified_requests = []
838+ allowed_info = ('read', 'idem', 'mutate', 'semivfs', 'semi', 'stream')
839+ for key in request.request_handlers.keys():
840+ info = request.request_handlers.get_info(key)
841+ if info is None or info not in allowed_info:
842+ unclassified_requests.append(key)
843+ if unclassified_requests:
844+ self.fail('These requests were not categorized as safe/unsafe'
845+ ' to retry: %s' % (unclassified_requests,))
846
847
848 class TestSmartRequestHandlerErrorTranslation(TestCase):
849
850=== modified file 'bzrlib/tests/test_smart_transport.py'
851--- bzrlib/tests/test_smart_transport.py 2010-06-25 09:56:07 +0000
852+++ bzrlib/tests/test_smart_transport.py 2011-10-10 13:44:32 +0000
853@@ -1,4 +1,4 @@
854-# Copyright (C) 2006-2010 Canonical Ltd
855+# Copyright (C) 2006-2011 Canonical Ltd
856 #
857 # This program is free software; you can redistribute it and/or modify
858 # it under the terms of the GNU General Public License as published by
859@@ -18,13 +18,17 @@
860
861 # all of this deals with byte strings so this is safe
862 from cStringIO import StringIO
863+import errno
864 import os
865 import socket
866+import subprocess
867+import sys
868 import threading
869
870 import bzrlib
871 from bzrlib import (
872 bzrdir,
873+ debug,
874 errors,
875 osutils,
876 tests,
877@@ -50,6 +54,29 @@
878 )
879
880
881+def create_file_pipes():
882+ r, w = os.pipe()
883+ # These must be opened without buffering, or we get undefined results
884+ rf = os.fdopen(r, 'rb', 0)
885+ wf = os.fdopen(w, 'wb', 0)
886+ return rf, wf
887+
888+
889+def portable_socket_pair():
890+ """Return a pair of TCP sockets connected to each other.
891+
892+ Unlike socket.socketpair, this should work on Windows.
893+ """
894+ listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
895+ listen_sock.bind(('127.0.0.1', 0))
896+ listen_sock.listen(1)
897+ client_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
898+ client_sock.connect(listen_sock.getsockname())
899+ server_sock, addr = listen_sock.accept()
900+ listen_sock.close()
901+ return server_sock, client_sock
902+
903+
904 class StringIOSSHVendor(object):
905 """A SSH vendor that uses StringIO to buffer writes and answer reads."""
906
907@@ -64,6 +91,27 @@
908 return StringIOSSHConnection(self)
909
910
911+class FirstRejectedStringIOSSHVendor(StringIOSSHVendor):
912+ """The first connection will be considered closed.
913+
914+ The second connection will succeed normally.
915+ """
916+
917+ def __init__(self, read_from, write_to, fail_at_write=True):
918+ super(FirstRejectedStringIOSSHVendor, self).__init__(read_from,
919+ write_to)
920+ self.fail_at_write = fail_at_write
921+ self._first = True
922+
923+ def connect_ssh(self, username, password, host, port, command):
924+ self.calls.append(('connect_ssh', username, password, host, port,
925+ command))
926+ if self._first:
927+ self._first = False
928+ return ClosedSSHConnection(self)
929+ return StringIOSSHConnection(self)
930+
931+
932 class StringIOSSHConnection(ssh.SSHConnection):
933 """A SSH connection that uses StringIO to buffer writes and answer reads."""
934
935@@ -79,6 +127,29 @@
936 return 'pipes', (self.vendor.read_from, self.vendor.write_to)
937
938
939+class ClosedSSHConnection(ssh.SSHConnection):
940+ """An SSH connection that just has closed channels."""
941+
942+ def __init__(self, vendor):
943+ self.vendor = vendor
944+
945+ def close(self):
946+ self.vendor.calls.append(('close', ))
947+
948+ def get_sock_or_pipes(self):
949+ # We create matching pipes, and then close the ssh side
950+ bzr_read, ssh_write = create_file_pipes()
951+ # We always fail when bzr goes to read
952+ ssh_write.close()
953+ if self.vendor.fail_at_write:
954+ # If set, we'll also fail when bzr goes to write
955+ ssh_read, bzr_write = create_file_pipes()
956+ ssh_read.close()
957+ else:
958+ bzr_write = self.vendor.write_to
959+ return 'pipes', (bzr_read, bzr_write)
960+
961+
962 class _InvalidHostnameFeature(tests.Feature):
963 """Does 'non_existent.invalid' fail to resolve?
964
965@@ -174,6 +245,91 @@
966 client_medium._accept_bytes('abc')
967 self.assertEqual('abc', output.getvalue())
968
969+ def test_simple_pipes__accept_bytes_subprocess_closed(self):
970+ # It is unfortunate that we have to use Popen for this. However,
971+ # os.pipe() does not behave the same as subprocess.Popen().
972+ # On Windows, if you use os.pipe() and close the write side,
973+ # read.read() hangs. On Linux, read.read() returns the empty string.
974+ p = subprocess.Popen([sys.executable, '-c',
975+ 'import sys\n'
976+ 'sys.stdout.write(sys.stdin.read(4))\n'
977+ 'sys.stdout.close()\n'],
978+ stdout=subprocess.PIPE, stdin=subprocess.PIPE)
979+ client_medium = medium.SmartSimplePipesClientMedium(
980+ p.stdout, p.stdin, 'base')
981+ client_medium._accept_bytes('abc\n')
982+ self.assertEqual('abc', client_medium._read_bytes(3))
983+ p.wait()
984+ # While writing to the underlying pipe,
985+ # on Windows py2.6.6 we get IOError(EINVAL);
986+ # on Lucid py2.6.5 we get IOError(EPIPE).
987+ # In both cases, it should be wrapped to ConnectionReset
988+ self.assertRaises(errors.ConnectionReset,
989+ client_medium._accept_bytes, 'more')
990+
991+ def test_simple_pipes__accept_bytes_pipe_closed(self):
992+ child_read, client_write = create_file_pipes()
993+ client_medium = medium.SmartSimplePipesClientMedium(
994+ None, client_write, 'base')
995+ client_medium._accept_bytes('abc\n')
996+ self.assertEqual('abc\n', child_read.read(4))
997+ # While writing to the underlying pipe,
998+ # on Windows py2.6.6 we get IOError(EINVAL);
999+ # on Lucid py2.6.5 we get IOError(EPIPE).
1000+ # In both cases, it should be wrapped to ConnectionReset
1001+ child_read.close()
1002+ self.assertRaises(errors.ConnectionReset,
1003+ client_medium._accept_bytes, 'more')
1004+
1005+ def test_simple_pipes__flush_pipe_closed(self):
1006+ child_read, client_write = create_file_pipes()
1007+ client_medium = medium.SmartSimplePipesClientMedium(
1008+ None, client_write, 'base')
1009+ client_medium._accept_bytes('abc\n')
1010+ child_read.close()
1011+ # Even though the pipe is closed, flush on the write side seems to be a
1012+ # no-op, rather than a failure.
1013+ client_medium._flush()
1014+
1015+ def test_simple_pipes__flush_subprocess_closed(self):
1016+ p = subprocess.Popen([sys.executable, '-c',
1017+ 'import sys\n'
1018+ 'sys.stdout.write(sys.stdin.read(4))\n'
1019+ 'sys.stdout.close()\n'],
1020+ stdout=subprocess.PIPE, stdin=subprocess.PIPE)
1021+ client_medium = medium.SmartSimplePipesClientMedium(
1022+ p.stdout, p.stdin, 'base')
1023+ client_medium._accept_bytes('abc\n')
1024+ p.wait()
1025+ # Even though the child process is dead, flush seems to be a no-op.
1026+ client_medium._flush()
1027+
1028+ def test_simple_pipes__read_bytes_pipe_closed(self):
1029+ child_read, client_write = create_file_pipes()
1030+ client_medium = medium.SmartSimplePipesClientMedium(
1031+ child_read, client_write, 'base')
1032+ client_medium._accept_bytes('abc\n')
1033+ client_write.close()
1034+ self.assertEqual('abc\n', client_medium._read_bytes(4))
1035+ self.assertEqual('', client_medium._read_bytes(4))
1036+
1037+ def test_simple_pipes__read_bytes_subprocess_closed(self):
1038+ p = subprocess.Popen([sys.executable, '-c',
1039+ 'import sys\n'
1040+ 'if sys.platform == "win32":\n'
1041+ ' import msvcrt, os\n'
1042+ ' msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)\n'
1043+ ' msvcrt.setmode(sys.stdin.fileno(), os.O_BINARY)\n'
1044+ 'sys.stdout.write(sys.stdin.read(4))\n'
1045+ 'sys.stdout.close()\n'],
1046+ stdout=subprocess.PIPE, stdin=subprocess.PIPE)
1047+ client_medium = medium.SmartSimplePipesClientMedium(
1048+ p.stdout, p.stdin, 'base')
1049+ client_medium._accept_bytes('abc\n')
1050+ p.wait()
1051+ self.assertEqual('abc\n', client_medium._read_bytes(4))
1052+ self.assertEqual('', client_medium._read_bytes(4))
1053+
1054 def test_simple_pipes_client_disconnect_does_nothing(self):
1055 # calling disconnect does nothing.
1056 input = StringIO()
1057@@ -561,6 +717,28 @@
1058 request.finished_reading()
1059 self.assertRaises(errors.ReadingCompleted, request.read_bytes, None)
1060
1061+ def test_reset(self):
1062+ server_sock, client_sock = portable_socket_pair()
1063+ # TODO: Use SmartClientAlreadyConnectedSocketMedium for the versions of
1064+ # bzr where it exists.
1065+ client_medium = medium.SmartTCPClientMedium(None, None, None)
1066+ client_medium._socket = client_sock
1067+ client_medium._connected = True
1068+ req = client_medium.get_request()
1069+ self.assertRaises(errors.TooManyConcurrentRequests,
1070+ client_medium.get_request)
1071+ client_medium.reset()
1072+ # The stream should be reset, marked as disconnected, though ready for
1073+ # us to make a new request
1074+ self.assertFalse(client_medium._connected)
1075+ self.assertIs(None, client_medium._socket)
1076+ try:
1077+ self.assertEqual('', client_sock.recv(1))
1078+ except socket.error, e:
1079+ if e.errno not in (errno.EBADF,):
1080+ raise
1081+ req = client_medium.get_request()
1082+
1083
1084 class RemoteTransportTests(test_smart.TestCaseWithSmartMedium):
1085
1086@@ -614,20 +792,6 @@
1087 super(TestSmartServerStreamMedium, self).setUp()
1088 self._captureVar('BZR_NO_SMART_VFS', None)
1089
1090- def portable_socket_pair(self):
1091- """Return a pair of TCP sockets connected to each other.
1092-
1093- Unlike socket.socketpair, this should work on Windows.
1094- """
1095- listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1096- listen_sock.bind(('127.0.0.1', 0))
1097- listen_sock.listen(1)
1098- client_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1099- client_sock.connect(listen_sock.getsockname())
1100- server_sock, addr = listen_sock.accept()
1101- listen_sock.close()
1102- return server_sock, client_sock
1103-
1104 def test_smart_query_version(self):
1105 """Feed a canned query version to a server"""
1106 # wire-to-wire, using the whole stack
1107@@ -692,7 +856,7 @@
1108
1109 def test_socket_stream_with_bulk_data(self):
1110 sample_request_bytes = 'command\n9\nbulk datadone\n'
1111- server_sock, client_sock = self.portable_socket_pair()
1112+ server_sock, client_sock = portable_socket_pair()
1113 server = medium.SmartServerSocketStreamMedium(
1114 server_sock, None)
1115 sample_protocol = SampleRequest(expected_bytes=sample_request_bytes)
1116@@ -711,7 +875,7 @@
1117 self.assertTrue(server.finished)
1118
1119 def test_socket_stream_shutdown_detection(self):
1120- server_sock, client_sock = self.portable_socket_pair()
1121+ server_sock, client_sock = portable_socket_pair()
1122 client_sock.close()
1123 server = medium.SmartServerSocketStreamMedium(
1124 server_sock, None)
1125@@ -731,7 +895,7 @@
1126 rest_of_request_bytes = 'lo\n'
1127 expected_response = (
1128 protocol.RESPONSE_VERSION_TWO + 'success\nok\x012\n')
1129- server_sock, client_sock = self.portable_socket_pair()
1130+ server_sock, client_sock = portable_socket_pair()
1131 server = medium.SmartServerSocketStreamMedium(
1132 server_sock, None)
1133 client_sock.sendall(incomplete_request_bytes)
1134@@ -807,7 +971,7 @@
1135 # _serve_one_request should still process both of them as if they had
1136 # been received separately.
1137 sample_request_bytes = 'command\n'
1138- server_sock, client_sock = self.portable_socket_pair()
1139+ server_sock, client_sock = portable_socket_pair()
1140 server = medium.SmartServerSocketStreamMedium(
1141 server_sock, None)
1142 first_protocol = SampleRequest(expected_bytes=sample_request_bytes)
1143@@ -844,7 +1008,7 @@
1144 self.assertTrue(server.finished)
1145
1146 def test_socket_stream_error_handling(self):
1147- server_sock, client_sock = self.portable_socket_pair()
1148+ server_sock, client_sock = portable_socket_pair()
1149 server = medium.SmartServerSocketStreamMedium(
1150 server_sock, None)
1151 fake_protocol = ErrorRaisingProtocol(Exception('boom'))
1152@@ -865,7 +1029,7 @@
1153 self.assertEqual('', from_server.getvalue())
1154
1155 def test_socket_stream_keyboard_interrupt_handling(self):
1156- server_sock, client_sock = self.portable_socket_pair()
1157+ server_sock, client_sock = portable_socket_pair()
1158 server = medium.SmartServerSocketStreamMedium(
1159 server_sock, None)
1160 fake_protocol = ErrorRaisingProtocol(KeyboardInterrupt('boom'))
1161@@ -882,7 +1046,7 @@
1162 return server._build_protocol()
1163
1164 def build_protocol_socket(self, bytes):
1165- server_sock, client_sock = self.portable_socket_pair()
1166+ server_sock, client_sock = portable_socket_pair()
1167 server = medium.SmartServerSocketStreamMedium(
1168 server_sock, None)
1169 client_sock.sendall(bytes)
1170@@ -2785,6 +2949,33 @@
1171 'e', # end
1172 output.getvalue())
1173
1174+ def test_records_start_of_body_stream(self):
1175+ requester, output = self.make_client_encoder_and_output()
1176+ requester.set_headers({})
1177+ in_stream = [False]
1178+ def stream_checker():
1179+ self.assertTrue(requester.body_stream_started)
1180+ in_stream[0] = True
1181+ yield 'content'
1182+ flush_called = []
1183+ orig_flush = requester.flush
1184+ def tracked_flush():
1185+ flush_called.append(in_stream[0])
1186+ if in_stream[0]:
1187+ self.assertTrue(requester.body_stream_started)
1188+ else:
1189+ self.assertFalse(requester.body_stream_started)
1190+ return orig_flush()
1191+ requester.flush = tracked_flush
1192+ requester.call_with_body_stream(('one arg',), stream_checker())
1193+ self.assertEqual(
1194+ 'bzr message 3 (bzr 1.6)\n' # protocol version
1195+ '\x00\x00\x00\x02de' # headers
1196+ 's\x00\x00\x00\x0bl7:one arge' # args
1197+ 'b\x00\x00\x00\x07content' # body
1198+ 'e', output.getvalue())
1199+ self.assertEqual([False, True, True], flush_called)
1200+
1201
1202 class StubMediumRequest(object):
1203 """A stub medium request that tracks the number of times accept_bytes is
1204@@ -3209,6 +3400,193 @@
1205 # encoder.
1206
1207
1208+class Test_SmartClientRequest(tests.TestCase):
1209+
1210+ def make_client_with_failing_medium(self, fail_at_write=True, response=''):
1211+ response_io = StringIO(response)
1212+ output = StringIO()
1213+ vendor = FirstRejectedStringIOSSHVendor(response_io, output,
1214+ fail_at_write=fail_at_write)
1215+ ssh_params = medium.SSHParams('a host', 'a port', 'a user', 'a pass')
1216+ client_medium = medium.SmartSSHClientMedium('base', ssh_params, vendor)
1217+ smart_client = client._SmartClient(client_medium, headers={})
1218+ return output, vendor, smart_client
1219+
1220+ def make_response(self, args, body=None, body_stream=None):
1221+ response_io = StringIO()
1222+ response = _mod_request.SuccessfulSmartServerResponse(args, body=body,
1223+ body_stream=body_stream)
1224+ responder = protocol.ProtocolThreeResponder(response_io.write)
1225+ responder.send_response(response)
1226+ return response_io.getvalue()
1227+
1228+ def test__call_doesnt_retry_append(self):
1229+ response = self.make_response(('appended', '8'))
1230+ output, vendor, smart_client = self.make_client_with_failing_medium(
1231+ fail_at_write=False, response=response)
1232+ smart_request = client._SmartClientRequest(smart_client, 'append',
1233+ ('foo', ''), body='content\n')
1234+ self.assertRaises(errors.ConnectionReset, smart_request._call, 3)
1235+
1236+ def test__call_retries_get_bytes(self):
1237+ response = self.make_response(('ok',), 'content\n')
1238+ output, vendor, smart_client = self.make_client_with_failing_medium(
1239+ fail_at_write=False, response=response)
1240+ smart_request = client._SmartClientRequest(smart_client, 'get',
1241+ ('foo',))
1242+ response, response_handler = smart_request._call(3)
1243+ self.assertEqual(('ok',), response)
1244+ self.assertEqual('content\n', response_handler.read_body_bytes())
1245+
1246+ def test__call_noretry_get_bytes(self):
1247+ debug.debug_flags.add('noretry')
1248+ response = self.make_response(('ok',), 'content\n')
1249+ output, vendor, smart_client = self.make_client_with_failing_medium(
1250+ fail_at_write=False, response=response)
1251+ smart_request = client._SmartClientRequest(smart_client, 'get',
1252+ ('foo',))
1253+ self.assertRaises(errors.ConnectionReset, smart_request._call, 3)
1254+
1255+ def test__send_no_retry_pipes(self):
1256+ client_read, server_write = create_file_pipes()
1257+ server_read, client_write = create_file_pipes()
1258+ client_medium = medium.SmartSimplePipesClientMedium(client_read,
1259+ client_write, base='/')
1260+ smart_client = client._SmartClient(client_medium)
1261+ smart_request = client._SmartClientRequest(smart_client,
1262+ 'hello', ())
1263+ # Close the server side
1264+ server_read.close()
1265+ encoder, response_handler = smart_request._construct_protocol(3)
1266+ self.assertRaises(errors.ConnectionReset,
1267+ smart_request._send_no_retry, encoder)
1268+
1269+ def test__send_read_response_sockets(self):
1270+ listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1271+ listen_sock.bind(('127.0.0.1', 0))
1272+ listen_sock.listen(1)
1273+ host, port = listen_sock.getsockname()
1274+ client_medium = medium.SmartTCPClientMedium(host, port, '/')
1275+ client_medium._ensure_connection()
1276+ smart_client = client._SmartClient(client_medium)
1277+ smart_request = client._SmartClientRequest(smart_client, 'hello', ())
1278+ # Accept the connection, but don't actually talk to the client.
1279+ server_sock, _ = listen_sock.accept()
1280+ server_sock.close()
1281+ # Sockets buffer and don't really notice that the server has closed the
1282+ # connection until we try to read again.
1283+ handler = smart_request._send(3)
1284+ self.assertRaises(errors.ConnectionReset,
1285+ handler.read_response_tuple, expect_body=False)
1286+
1287+ def test__send_retries_on_write(self):
1288+ output, vendor, smart_client = self.make_client_with_failing_medium()
1289+ smart_request = client._SmartClientRequest(smart_client, 'hello', ())
1290+ handler = smart_request._send(3)
1291+ self.assertEqual('bzr message 3 (bzr 1.6)\n' # protocol
1292+ '\x00\x00\x00\x02de' # empty headers
1293+ 's\x00\x00\x00\tl5:helloee',
1294+ output.getvalue())
1295+ self.assertEqual(
1296+ [('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
1297+ ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
1298+ ('close',),
1299+ ('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
1300+ ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
1301+ ],
1302+ vendor.calls)
1303+
1304+ def test__send_doesnt_retry_read_failure(self):
1305+ output, vendor, smart_client = self.make_client_with_failing_medium(
1306+ fail_at_write=False)
1307+ smart_request = client._SmartClientRequest(smart_client, 'hello', ())
1308+ handler = smart_request._send(3)
1309+ self.assertEqual('bzr message 3 (bzr 1.6)\n' # protocol
1310+ '\x00\x00\x00\x02de' # empty headers
1311+ 's\x00\x00\x00\tl5:helloee',
1312+ output.getvalue())
1313+ self.assertEqual(
1314+ [('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
1315+ ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
1316+ ],
1317+ vendor.calls)
1318+ self.assertRaises(errors.ConnectionReset, handler.read_response_tuple)
1319+
1320+ def test__send_request_retries_body_stream_if_not_started(self):
1321+ output, vendor, smart_client = self.make_client_with_failing_medium()
1322+ smart_request = client._SmartClientRequest(smart_client, 'hello', (),
1323+ body_stream=['a', 'b'])
1324+ response_handler = smart_request._send(3)
1325+ # We connect, get disconnected, and notice before consuming the stream,
1326+ # so we try again one time and succeed.
1327+ self.assertEqual(
1328+ [('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
1329+ ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
1330+ ('close',),
1331+ ('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
1332+ ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
1333+ ],
1334+ vendor.calls)
1335+ self.assertEqual('bzr message 3 (bzr 1.6)\n' # protocol
1336+ '\x00\x00\x00\x02de' # empty headers
1337+ 's\x00\x00\x00\tl5:helloe'
1338+ 'b\x00\x00\x00\x01a'
1339+ 'b\x00\x00\x00\x01b'
1340+ 'e',
1341+ output.getvalue())
1342+
1343+ def test__send_request_stops_if_body_started(self):
1344+ # We intentionally use the python StringIO so that we can subclass it.
1345+ from StringIO import StringIO
1346+ response = StringIO()
1347+
1348+ class FailAfterFirstWrite(StringIO):
1349+ """Allow one 'write' call to pass, fail the rest"""
1350+ def __init__(self):
1351+ StringIO.__init__(self)
1352+ self._first = True
1353+
1354+ def write(self, s):
1355+ if self._first:
1356+ self._first = False
1357+ return StringIO.write(self, s)
1358+ raise IOError(errno.EINVAL, 'invalid file handle')
1359+ output = FailAfterFirstWrite()
1360+
1361+ vendor = FirstRejectedStringIOSSHVendor(response, output,
1362+ fail_at_write=False)
1363+ ssh_params = medium.SSHParams('a host', 'a port', 'a user', 'a pass')
1364+ client_medium = medium.SmartSSHClientMedium('base', ssh_params, vendor)
1365+ smart_client = client._SmartClient(client_medium, headers={})
1366+ smart_request = client._SmartClientRequest(smart_client, 'hello', (),
1367+ body_stream=['a', 'b'])
1368+ self.assertRaises(errors.ConnectionReset, smart_request._send, 3)
1369+ # We connect, and manage to get to the point that we start consuming
1370+ # the body stream. The next write fails, so we just stop.
1371+ self.assertEqual(
1372+ [('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
1373+ ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
1374+ ('close',),
1375+ ],
1376+ vendor.calls)
1377+ self.assertEqual('bzr message 3 (bzr 1.6)\n' # protocol
1378+ '\x00\x00\x00\x02de' # empty headers
1379+ 's\x00\x00\x00\tl5:helloe',
1380+ output.getvalue())
1381+
1382+ def test__send_disabled_retry(self):
1383+ debug.debug_flags.add('noretry')
1384+ output, vendor, smart_client = self.make_client_with_failing_medium()
1385+ smart_request = client._SmartClientRequest(smart_client, 'hello', ())
1386+ self.assertRaises(errors.ConnectionReset, smart_request._send, 3)
1387+ self.assertEqual(
1388+ [('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
1389+ ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
1390+ ('close',),
1391+ ],
1392+ vendor.calls)
1393+
1394+
1395 class LengthPrefixedBodyDecoder(tests.TestCase):
1396
1397 # XXX: TODO: make accept_reading_trailer invoke translate_response or

Subscribers

People subscribed via source and target branches