Merge lp:~free.ekanayaka/txamqp/closing-logic-cleanup into lp:txamqp

Proposed by Free Ekanayaka
Status: Merged
Merged at revision: 76
Proposed branch: lp:~free.ekanayaka/txamqp/closing-logic-cleanup
Merge into: lp:txamqp
Diff against target: 304 lines (+183/-7)
6 files modified
src/txamqp/client.py (+6/-1)
src/txamqp/protocol.py (+50/-5)
src/txamqp/test/test_broker.py (+9/-0)
src/txamqp/test/test_endpoint.py (+1/-1)
src/txamqp/test/test_protocol.py (+113/-0)
src/txamqp/testing.py (+4/-0)
To merge this branch: bzr merge lp:~free.ekanayaka/txamqp/closing-logic-cleanup
Reviewer Review Type Date Requested Status
Adam Collard (community) Approve
Alberto Donato (community) Approve
txAMQP Team Pending
Review via email: mp+296665@code.launchpad.net

Description of the change

This branch implements the following changes:

- TwistedDelegate.connection_close now sends a 'close-ok' method frame before calling AMQClient.doClose. There's no response for this type of method, but it will be confident that it will be written to the transport before closing it, since AMQClient.doClose eventually calls transport.loseConnection(), which does flush buffers before shutting down the socket.

- AMQClient now has a new "disconnected" instance attribute, which is a TwistedEvent that consumer code can wait on in order to know when the transport has actually shut down. This is useful for integrating txamqp with things like twisted.application.internet.ClientService (that would start to connect again as soon as the connection is lost).

- AMQClient.closed is now properly initialized in the __init__ (it was set dynamically before), so it's always safe to inspect it.

- The AMQClient.sendHB LoopingCall now gets the same clock as AMQClient, instead of the global reactor. This makes unit-testing easier.

- AMQClient.queues (of type TimeoutDeferredQueue) also get same clock as AMQClient.

- The "reason" parameter of AMQClient.close is now optional and defaults to ConnectionDone.

- AMQClient.close has a new "within" parameter, which tells the client to possibly try to perform a clean connection shutdown with the 'close'/'close-ok' frames handshake. If the handshake doesn't succeed in "within" seconds, the transport connection gets terminated without handshake. The default of the new parameter is 0, which means "do close immediately without handshake", so there's should be no behavior change at all for existing code, expect that close() now returns a Deferred, but it seems minor, since the non-handshake code is still all synchronous.

- The old AMQClient.close logic has been moved to AMQClient.doClose, which is basically the same pattern as AMQChannel.close/doClose.

- AMQClient.checkHeartbeat now calls abortConnection rather than loseConnection: since the peer is unresponsive it most probably won't perform the TCP closing handshake anyways.

- Unit tests and integration tests where added for all the behavior above.

Phew, that was quite a bit of narrative, really hope the diff isn't controversial :)

After this branch I have a final small one that allows application code to distinguish between connection close errors and channel close errors (with two new subclasses of Close). Other than that I have no immediate pending work on txamqp, but I'd like to resume:

https://code.launchpad.net/~ei-grad/txamqp/publish-confirms

and have support for confirms.

Thanks!

To post a comment you must log in.
83. By Free Ekanayaka

Merge from travis-integration

Revision history for this message
Free Ekanayaka (free.ekanayaka) wrote :

Hi Esteve, any take on this change? Or you didn't have time to look at it yet? Cheers

Revision history for this message
Alberto Donato (ack) wrote :

LGTM. +1

review: Approve
84. By Free Ekanayaka

Fix typo

Revision history for this message
Free Ekanayaka (free.ekanayaka) :
85. By Free Ekanayaka

Merge from trunk

Revision history for this message
Adam Collard (adam-collard) wrote :

We'll need to broadcast the change to the interface as well.

Broadly looks ok - few nit picks inline below.

review: Approve
Revision history for this message
Free Ekanayaka (free.ekanayaka) :

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'src/txamqp/client.py'
2--- src/txamqp/client.py 2012-04-16 10:32:49 +0000
3+++ src/txamqp/client.py 2016-11-22 13:47:23 +0000
4@@ -102,7 +102,12 @@
5 ch.doClose(msg)
6
7 def connection_close(self, ch, msg):
8- self.client.close(msg)
9+ # The server wants to close the connection, so let's send
10+ # a connection-close-ok and start shutting down the connection. The
11+ # close-ok frame will be buffered by the transport and sent to
12+ # the server before actually closing the socket.
13+ ch.connection_close_ok()
14+ self.client.doClose(msg)
15
16 def close(self, reason):
17 self.client.closed = True
18
19=== modified file 'src/txamqp/protocol.py'
20--- src/txamqp/protocol.py 2016-06-01 14:18:06 +0000
21+++ src/txamqp/protocol.py 2016-11-22 13:47:23 +0000
22@@ -1,6 +1,7 @@
23 # coding: utf-8
24 from twisted.internet import defer, protocol
25 from twisted.internet.task import LoopingCall
26+from twisted.internet.error import ConnectionDone
27 from twisted.protocols import basic
28 from twisted.python.failure import Failure
29 from txamqp import spec
30@@ -36,7 +37,7 @@
31 self.reason = None
32
33 def close(self, reason):
34- """Explicitely close a channel"""
35+ """Explicitly close a channel"""
36 self._closing = True
37 self.doClose(reason)
38 self._closing = False
39@@ -237,6 +238,8 @@
40 self.work = defer.DeferredQueue()
41
42 self.started = TwistedEvent()
43+ self.disconnected = TwistedEvent() # Fired upon connection shutdown
44+ self.closed = False
45
46 self.queueLock = defer.DeferredLock()
47 self.basic_return_queue = TimeoutDeferredQueue()
48@@ -255,9 +258,12 @@
49 self.checkHB = self.clock.callLater(self.heartbeatInterval *
50 self.MAX_UNSEEN_HEARTBEAT, self.checkHeartbeat)
51 self.sendHB = LoopingCall(self.sendHeartbeat)
52+ self.sendHB.clock = self.clock
53 d = self.started.wait()
54 d.addCallback(lambda _: self.reschedule_sendHB())
55 d.addCallback(lambda _: self.reschedule_checkHB())
56+ # If self.started fails, don't start the heartbeat.
57+ d.addErrback(lambda _: None)
58
59 def reschedule_sendHB(self):
60 if self.heartbeatInterval > 0:
61@@ -294,13 +300,48 @@
62 try:
63 q = self.queues[key]
64 except KeyError:
65- q = TimeoutDeferredQueue()
66+ q = TimeoutDeferredQueue(clock=self.clock)
67 self.queues[key] = q
68 finally:
69 self.queueLock.release()
70 defer.returnValue(q)
71
72- def close(self, reason):
73+ @defer.inlineCallbacks
74+ def close(self, reason=None, within=0):
75+ """Explicitely close the connection.
76+
77+ @param reason: Optional closing reason. If not given, ConnectionDone
78+ will be used.
79+ @param within: Shutdown the client within this amount of seconds. If
80+ zero (the default), all channels and queues will be closed
81+ immediately. If greater than 0, try to close the AMQP connection
82+ cleanly, by sending a "close" method and waiting for "close-ok". If
83+ no reply is received within the given amount of seconds, the
84+ transport will be forcely shutdown.
85+ """
86+ if self.closed:
87+ return
88+
89+ if reason is None:
90+ reason = ConnectionDone()
91+
92+ if within > 0:
93+ channel0 = yield self.channel(0)
94+ deferred = channel0.connection_close()
95+ call = self.clock.callLater(within, deferred.cancel)
96+ try:
97+ yield deferred
98+ except defer.CancelledError:
99+ pass
100+ else:
101+ call.cancel()
102+
103+ self.doClose(reason)
104+
105+ def doClose(self, reason):
106+ """Called when connection_close() is received"""
107+ # Let's close all channels and queues, since we don't want to write
108+ # any more data and no further read will happen.
109 for ch in self.channels.values():
110 ch.close(reason)
111 for q in self.queues.values():
112@@ -351,7 +392,7 @@
113 self.lastHBReceived = time()
114 else:
115 ch.dispatch(frame, self.work)
116- if self.heartbeatInterval > 0:
117+ if self.heartbeatInterval > 0 and not self.closed:
118 self.reschedule_checkHB()
119
120 @defer.inlineCallbacks
121@@ -387,7 +428,10 @@
122 def checkHeartbeat(self):
123 if self.checkHB.active():
124 self.checkHB.cancel()
125- self.transport.loseConnection()
126+ # Abort the connection, since the other pear is unresponsive and
127+ # there's no point in shutting down cleanly and trying to flush
128+ # pending data.
129+ self.transport.abortConnection()
130
131 def connectionLost(self, reason):
132 if self.heartbeatInterval > 0:
133@@ -396,6 +440,7 @@
134 if self.checkHB.active():
135 self.checkHB.cancel()
136 self.close(reason)
137+ self.disconnected.fire()
138
139 def channelFailed(self, channel, reason):
140 """Unexpected channel close"""
141
142=== modified file 'src/txamqp/test/test_broker.py'
143--- src/txamqp/test/test_broker.py 2016-05-23 14:41:22 +0000
144+++ src/txamqp/test/test_broker.py 2016-11-22 13:47:23 +0000
145@@ -165,3 +165,12 @@
146 msg = yield incoming.get(timeout=1)
147 self.assertEqual("abcdefghijklmnopqrstuvwxyz", msg.content.body)
148
149+ @inlineCallbacks
150+ def test_close_cleanly(self):
151+ """
152+ Test closing a client cleanly, by sending 'close' and waiting for
153+ 'close-ok'.
154+ """
155+ yield self.client.close(within=5)
156+ yield self.client.disconnected.wait()
157+ self.assertTrue(self.client.closed)
158
159=== modified file 'src/txamqp/test/test_endpoint.py'
160--- src/txamqp/test/test_endpoint.py 2016-05-27 09:44:14 +0000
161+++ src/txamqp/test/test_endpoint.py 2016-11-22 13:47:23 +0000
162@@ -100,4 +100,4 @@
163 client = yield endpoint.connect(factory)
164 channel = yield client.channel(1)
165 yield channel.channel_open()
166- client.close(None)
167+ yield client.close()
168
169=== added file 'src/txamqp/test/test_protocol.py'
170--- src/txamqp/test/test_protocol.py 1970-01-01 00:00:00 +0000
171+++ src/txamqp/test/test_protocol.py 2016-11-22 13:47:23 +0000
172@@ -0,0 +1,113 @@
173+from twisted.trial.unittest import TestCase
174+from twisted.internet.task import Clock
175+from twisted.internet.error import ConnectionLost
176+from twisted.logger import Logger
177+
178+from txamqp.protocol import AMQClient
179+from txamqp.client import TwistedDelegate, Closed
180+from txamqp.testing import AMQPump
181+from txamqp.spec import DEFAULT_SPEC, load
182+from txamqp.queue import Closed as QueueClosed
183+
184+
185+class AMQClientTest(TestCase):
186+ """Unit tests for the AMQClient protocol."""
187+
188+ def setUp(self):
189+ super(AMQClientTest, self).setUp()
190+ self.delegate = TwistedDelegate()
191+ self.clock = Clock()
192+ self.heartbeat = 1
193+ self.protocol = AMQClient(
194+ self.delegate, "/", load(DEFAULT_SPEC), clock=self.clock,
195+ heartbeat=self.heartbeat)
196+ self.transport = AMQPump(Logger())
197+ self.transport.connect(self.protocol)
198+
199+ def test_connection_close(self):
200+ """Test handling a connection-close method sent by the broker."""
201+ self.transport.channel(0).connection_close()
202+ # We send close-ok before shutting down the connection
203+ [frame] = self.transport.outgoing[0]
204+ self.assertEqual("close-ok", frame.payload.method.name)
205+ self.assertTrue(self.protocol.closed)
206+ channel0 = self.successResultOf(self.protocol.channel(0))
207+ self.assertTrue(channel0.closed)
208+
209+ def test_close(self):
210+ """Test explicitely closing a client."""
211+ d = self.protocol.close()
212+
213+ # Since 'within' defaults to 0, no outgoing 'close' frame is there.
214+ self.assertEqual({}, self.transport.outgoing)
215+
216+ self.assertIsNone(self.successResultOf(d))
217+ self.assertTrue(self.protocol.closed)
218+
219+ def test_close_within(self):
220+ """Test closing a client cleanly."""
221+ d = self.protocol.close(within=1)
222+
223+ # Since we passed within=1, we have an outgoing 'close' frame.
224+ [frame] = self.transport.outgoing[0]
225+ self.assertEqual("close", frame.payload.method.name)
226+
227+ # At this point the client is not yet closed, since we're waiting for
228+ # the 'close-ok' acknowledgement from the broker.
229+ self.assertFalse(self.protocol.closed)
230+
231+ self.transport.channel(0).connection_close_ok()
232+ self.assertIsNone(self.successResultOf(d))
233+ self.assertTrue(self.protocol.closed)
234+ self.assertEqual([], self.clock.calls)
235+
236+ def test_close_within_hits_timeout(self):
237+ """Test trying to close a client cleanly but hitting the timeout."""
238+ d = self.protocol.close(within=1)
239+
240+ self.clock.advance(1)
241+ self.assertIsNone(self.successResultOf(d))
242+ self.assertTrue(self.protocol.closed)
243+
244+ def test_close_closes_channels(self):
245+ """Test closing a client also closes channels."""
246+ channel = self.successResultOf(self.protocol.channel(0))
247+ self.protocol.close()
248+ d = channel.basic_consume(queue="test-queue")
249+ failure = self.failureResultOf(d)
250+ self.assertIsInstance(failure.value, Closed)
251+
252+ def test_close_closes_queues(self):
253+ """Test closing a client also closes queues."""
254+ queue = self.successResultOf(self.protocol.queue("tag"))
255+ d = queue.get()
256+ self.protocol.close()
257+ failure = self.failureResultOf(d)
258+ self.assertIsInstance(failure.value, QueueClosed)
259+
260+ def test_hearbeat_check_failure(self):
261+ """Test closing a client after a heartbeat check failure."""
262+ self.protocol.started.fire()
263+ channel = self.successResultOf(self.protocol.channel(0))
264+ d = channel.basic_consume(queue="test-queue")
265+ self.clock.advance(self.heartbeat * AMQClient.MAX_UNSEEN_HEARTBEAT)
266+ self.assertTrue(self.protocol.closed)
267+ failure = self.failureResultOf(d)
268+ self.assertIsInstance(failure.value, Closed)
269+ self.assertTrue(self.transport.aborted)
270+
271+ def test_connection_lost(self):
272+ """Test closing a client after the connection is lost."""
273+ channel = self.successResultOf(self.protocol.channel(0))
274+ d = channel.basic_consume(queue="test-queue")
275+ self.transport.abortConnection()
276+ self.assertTrue(self.protocol.closed)
277+ failure = self.failureResultOf(d)
278+ self.assertIsInstance(failure.value, Closed)
279+ self.assertIsInstance(failure.value.args[0].value, ConnectionLost)
280+
281+ def test_disconnected_event(self):
282+ """Test disconnected event fired after the connection is lost."""
283+ deferred = self.protocol.disconnected.wait()
284+ self.protocol.close()
285+ self.assertTrue(self.successResultOf(deferred))
286
287=== modified file 'src/txamqp/testing.py'
288--- src/txamqp/testing.py 2016-06-01 15:56:53 +0000
289+++ src/txamqp/testing.py 2016-11-22 13:47:23 +0000
290@@ -80,10 +80,14 @@
291 self._log("Outgoing frame: {frame}", frame=frame)
292
293 def loseConnection(self):
294+ if not self.connected:
295+ return
296 self.connected = False
297 self.client.connectionLost(Failure(ConnectionDone()))
298
299 def abortConnection(self):
300+ if not self.connected:
301+ return
302 self.connected = False
303 self.aborted = True
304 self.client.connectionLost(Failure(ConnectionLost()))

Subscribers

People subscribed via source and target branches

to status/vote changes: