Merge lp:~elachuni/txamqp/heartbeat into lp:txamqp

Proposed by Anthony Lenton
Status: Merged
Merged at revision: not available
Proposed branch: lp:~elachuni/txamqp/heartbeat
Merge into: lp:txamqp
Diff against target: None lines
To merge this branch: bzr merge lp:~elachuni/txamqp/heartbeat
Reviewer Review Type Date Requested Status
Thomas Herve Approve
Review via email: mp+7078@code.launchpad.net
To post a comment you must log in.
Revision history for this message
Anthony Lenton (elachuni) wrote :

Added handling for heartbeat frames.
The heartbeat interval is configurable by a AMQClient constructor parameter.
Added a brief test to check that it works.

It's only tested on RabbitMQ, but it should work on qpid too.

Revision history for this message
Thomas Herve (therve) wrote :

Thanks a lot for this code Anthony! It's going to be pretty useful. Here are some comments:

[1] There is a small conflict with current trunk.

[2] The connectionLostEvent doesn't seem to be used anywhere, it can probably be removed.

[3] The test is a bit bad: I understand you want to test it live, but a test running for 8 seconds is too long. Also, setTimeout is deprecated.

Thanks!

review: Needs Fixing
lp:~elachuni/txamqp/heartbeat updated
20. By Anthony Lenton

- Merged changes from trunk
- Removed unused connectionLostEvent
- Fixed test.

Revision history for this message
Anthony Lenton (elachuni) wrote :

Hi Thomas,
>
> [1] There is a small conflict with current trunk.
Merged and resolved.

>
> [2] The connectionLostEvent doesn't seem to be used anywhere, it can probably
> be removed.
Right, it was being used by client code here to easily access connectionLost events, but definitely shouldn't be part of the same patch. Removed.

>
> [3] The test is a bit bad: I understand you want to test it live, but a test
> running for 8 seconds is too long. Also, setTimeout is deprecated.
I've reduced it to 3 seconds. If that's still too long we'll need to test it some other way, as you can't ask for a heartbeat interval of less than one second. And it's not calling setTimeout now.

>
> Thanks!
Thank you! :)

Revision history for this message
Thomas Herve (therve) wrote :

OK, some last comments:

1) There is a conflict again :). Be careful here, because connectionLost is now defined in trunk, so you want to move your clean code.

2) The heartbeat test class is called TxTests, it should be named HeartbeatTests. The test file doesn't need the ASF license header too.

Thanks, +1!

review: Approve
lp:~elachuni/txamqp/heartbeat updated
21. By Anthony Lenton

Merged changes from trunk.

Revision history for this message
Esteve Fernandez (esteve) wrote :

Sorry for the slow response. I think adding heartbeat is a very useful feature, thanks for taking the time for implementing it. However, there's a couple of issues:

1 - The virtual host was changed to "/", instead of "localhost". We really need to implement "profiles" or whatever, so we can select some sane defaults for every broker. Until then, I'm not comfortable with introducing a change that's not intrinsic to the problem your branch solves.

2 - Although the code works and is clear, I'd rather use a LoopingCall (http://twistedmatrix.com/documents/8.2.0/api/twisted.internet.task.LoopingCall.html). It's already built into Twisted, so you don't need to implement things like lastSent, lastReceived, etc. I think it would be a bit easier to do something like this (it's just pseudo-python, can't guarantee that it works):

class AMQClient(...):
    # Max unreceived heartbeat frames. The AMQP standard says it's 3.
    MAX_UNSEEN_HEARTBEAT = 3

    def __init__(self, ...):
        ...
        if self.heartbeatInterval > 0:
            self.sendHB = LoopingCall(self.sendHeartbeat)
            self.checkHB = LoopingCall(self.checkHeartbeat)
            d = self.started.wait()
            d.addCallback(lambda _: self.sendHB.start(self.heartbeatInterval, now=False))
            d.addCallback(lambda _: self.checkHB.start(
                self.heartbeatInterval * self.MAX_UNSEEN_HEARTBEAT, now=False))

    def sendHeartbeat(self):
        self.sendFrame(Frame(0, Heartbeat()))

    def checkHeartbeat(self):
        self.checkHB.stop()
        self.transport.loseConnection()

    def processFrame(self, frame):
        ...
        if frame.payload.type != Frame.HEARTBEAT:
            self.sendHB.stop()
            self.sendHB.start(self.heartbeatInterval, now=False)
            ch.dispatch(frame, self.work)
        self.checkHB.stop()
        self.checkHb.start(self.heartbeatInterval * self.MAX_UNSEEN_HEARTBEAT, now=False)

What do you think guys? The changes are minimal and I think they are a bit easier to read.

Revision history for this message
Esteve Fernandez (esteve) wrote :

I just realized that self.checkHB doesn't need to be a LoopingCall, reactor.callLater is enough.

lp:~elachuni/txamqp/heartbeat updated
22. By Anthony Lenton

Putting back 'localhost' vhost.

Revision history for this message
Anthony Lenton (elachuni) wrote :

> 1 - The virtual host was changed to "/", instead of "localhost".

Yikes, that was unintentionally left in from running the tests :-/
I've put the 'localhost' vhost back in rev.22, sorry for that.

> 2 - Although the code works and is clear, I'd rather use a LoopingCall (http:/
> /twistedmatrix.com/documents/8.2.0/api/twisted.internet.task.LoopingCall.html)
> . It's already built into Twisted, so you don't need to implement things like
> lastSent, lastReceived, etc. I think it would be a bit easier to do something
> like this (it's just pseudo-python, can't guarantee that it works):
>
> (...code...)
>
> What do you think guys? The changes are minimal and I think they are a bit
> easier to read.

I gave your code a try and it works well, with a couple of changes. However I don't really see much benefit in using LoopingCall in this case:
 * checkHB could use a regular callLater call, as you've pointed out.
 * The rescheduling of sendHB should actually be done in sendFrame, not in processFrame, as it's sending
frames (not receiving them) that postpones the need to send a HB frame.
 * After changing that it's practically the same to be using a regular callLater instead of a LoopingCall for sendHB; if in sendFrame you reschedule sendHB whatever the payload type (instead of filtering out HBs) it effectively behaves like a LoopingCall.
 * We do get rid of lastSent and lastReceived, but instead we'd be using two callbacks instead of one (checkHeartbeat and sendHeartbeat vs. heartbeatHandler), and two 'callback handles' (checkHB and sendHB vs. pendingHeartbeat).
 * The free bonus you get with lastSent and lastReceived is a bit of accountability about how long ago you sent and received frames (including heartbeat frames), which makes it easier to test.

I haven't pushed these changes into this branch, but if you want I can push a working version using LoopingCalls on some other branch so we can compare.

lp:~elachuni/txamqp/heartbeat updated
23. By Anthony Lenton

Removed License header and fixed class name, per therve's review.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'src/txamqp/client.py'
2--- src/txamqp/client.py 2009-05-04 11:12:36 +0000
3+++ src/txamqp/client.py 2009-05-05 15:41:08 +0000
4@@ -29,7 +29,8 @@
5
6 def connection_tune(self, ch, msg):
7 self.client.MAX_LENGTH = msg.frame_max
8- ch.connection_tune_ok(*msg.fields)
9+ args = msg.channel_max, msg.frame_max, self.client.heartbeatInterval
10+ ch.connection_tune_ok(*args)
11 self.client.started.set()
12
13 @defer.inlineCallbacks
14
15=== modified file 'src/txamqp/connection.py'
16--- src/txamqp/connection.py 2008-10-29 18:31:04 +0000
17+++ src/txamqp/connection.py 2009-04-27 17:00:08 +0000
18@@ -196,3 +196,15 @@
19
20 def __str__(self):
21 return "Body(%r)" % self.content
22+
23+class Heartbeat(Payload):
24+ type = Frame.HEARTBEAT
25+ def __str__(self):
26+ return "Heartbeat()"
27+
28+ def encode(self, enc):
29+ enc.encode_long(0)
30+
31+ def decode(spec, dec):
32+ dec.decode_long()
33+ return Heartbeat()
34
35=== modified file 'src/txamqp/protocol.py'
36--- src/txamqp/protocol.py 2009-05-04 11:12:36 +0000
37+++ src/txamqp/protocol.py 2009-06-04 14:42:57 +0000
38@@ -1,16 +1,17 @@
39 # coding: utf-8
40 from twisted.python import log
41-from twisted.internet import defer, protocol
42+from twisted.internet import defer, protocol, reactor
43 from twisted.protocols import basic
44 from txamqp import spec
45 from txamqp.codec import Codec, EOF
46-from txamqp.connection import Header, Frame, Method, Body
47+from txamqp.connection import Header, Frame, Method, Body, Heartbeat
48 from txamqp.message import Message
49 from txamqp.content import Content
50 from txamqp.queue import TimeoutDeferredQueue, Empty, Closed as QueueClosed
51 from txamqp.client import TwistedEvent, TwistedDelegate, Closed
52 from cStringIO import StringIO
53 import struct
54+from time import time
55
56 class GarbageException(Exception):
57 pass
58@@ -200,7 +201,7 @@
59
60 channelClass = AMQChannel
61
62- def __init__(self, delegate, vhost, *args, **kwargs):
63+ def __init__(self, delegate, vhost, heartbeat=0, *args, **kwargs):
64 FrameReceiver.__init__(self, *args, **kwargs)
65 self.delegate = delegate
66
67@@ -218,6 +219,9 @@
68 self.work = defer.DeferredQueue()
69
70 self.started = TwistedEvent()
71+ self.connectionLostEvent = TwistedEvent()
72+ self.lastSent = time()
73+ self.lastReceived = time()
74
75 self.queueLock = defer.DeferredLock()
76
77@@ -225,6 +229,10 @@
78
79 self.outgoing.get().addCallback(self.writer)
80 self.work.get().addCallback(self.worker)
81+ self.heartbeatInterval = heartbeat
82+ self.pendingHeartbeat = None
83+ if self.heartbeatInterval > 0:
84+ self.started.wait().addCallback(self.heartbeatHandler)
85
86 def check_0_8(self):
87 return (self.spec.minor, self.spec.major) == (0, 8)
88@@ -293,13 +301,25 @@
89 self.sendInitString()
90 self.setFrameMode()
91
92+ def connectionLost(self, reason):
93+ if self.pendingHeartbeat is not None and self.pendingHeartbeat.active():
94+ self.pendingHeartbeat.cancel()
95+ self.pendingHeartbeat = None
96+ self.connectionLostEvent.set()
97+
98 def frameReceived(self, frame):
99 self.processFrame(frame)
100
101+ def sendFrame(self, frame):
102+ self.lastSent = time()
103+ FrameReceiver.sendFrame(self, frame)
104+
105 @defer.inlineCallbacks
106 def processFrame(self, frame):
107+ self.lastReceived = time()
108 ch = yield self.channel(frame.channel)
109- ch.dispatch(frame, self.work)
110+ if frame.payload.type != Frame.HEARTBEAT:
111+ ch.dispatch(frame, self.work)
112
113 @defer.inlineCallbacks
114 def authenticate(self, username, password, mechanism='AMQPLAIN', locale='en_US'):
115@@ -320,3 +340,15 @@
116
117 channel0 = yield self.channel(0)
118 yield channel0.connection_open(self.vhost)
119+
120+ def heartbeatHandler (self, dummy=None):
121+ now = time()
122+ if self.lastSent + self.heartbeatInterval < now:
123+ self.sendFrame(Frame(0, Heartbeat()))
124+ if self.lastReceived + self.heartbeatInterval * 3 < now:
125+ self.transport.loseConnection()
126+ tple = None
127+ if self.transport.connected:
128+ tple = reactor.callLater(self.heartbeatInterval, self.heartbeatHandler)
129+ self.pendingHeartbeat = tple
130+
131
132=== added file 'src/txamqp/test/test_heartbeat.py'
133--- src/txamqp/test/test_heartbeat.py 1970-01-01 00:00:00 +0000
134+++ src/txamqp/test/test_heartbeat.py 2009-06-04 16:26:58 +0000
135@@ -0,0 +1,57 @@
136+#
137+# Licensed to the Apache Software Foundation (ASF) under one
138+# or more contributor license agreements. See the NOTICE file
139+# distributed with this work for additional information
140+# regarding copyright ownership. The ASF licenses this file
141+# to you under the Apache License, Version 2.0 (the
142+# "License"); you may not use this file except in compliance
143+# with the License. You may obtain a copy of the License at
144+#
145+# http://www.apache.org/licenses/LICENSE-2.0
146+#
147+# Unless required by applicable law or agreed to in writing,
148+# software distributed under the License is distributed on an
149+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
150+# KIND, either express or implied. See the License for the
151+# specific language governing permissions and limitations
152+# under the License.
153+#
154+
155+from time import time
156+#from txamqp.client import Closed
157+#from txamqp.queue import Empty
158+#from txamqp.content import Content
159+from txamqp.testlib import TestBase
160+
161+#from twisted.internet.defer import, returnValue,
162+from twisted.internet.defer import Deferred, inlineCallbacks
163+
164+class TxTests(TestBase):
165+ """
166+ Tests handling of heartbeat frames
167+ """
168+
169+
170+ @inlineCallbacks
171+ def setUp(self):
172+ """ Set up a heartbeat frame per second """
173+ self.client = yield self.connect(heartbeat=1)
174+
175+ self.channel = yield self.client.channel(1)
176+ yield self.channel.channel_open()
177+
178+
179+ @inlineCallbacks
180+ def test_heartbeat(self):
181+ """
182+ Test that heartbeat frames are sent and received
183+ """
184+ d = Deferred()
185+ d.setTimeout(8)
186+ d.addBoth(lambda x:True)
187+ yield d
188+ t = time()
189+ self.assertTrue(self.client.lastSent > t - 3,
190+ "A heartbeat frame was recently sent")
191+ self.assertTrue(self.client.lastReceived > t - 3,
192+ "A heartbeat frame was recently received")
193
194=== modified file 'src/txamqp/testlib.py'
195--- src/txamqp/testlib.py 2009-02-11 22:28:45 +0000
196+++ src/txamqp/testlib.py 2009-06-04 16:26:58 +0000
197@@ -39,22 +39,26 @@
198 self.user = 'guest'
199 self.password = 'guest'
200 self.vhost = 'localhost'
201+ self.heartbeat = 0
202 self.queues = []
203 self.exchanges = []
204 self.connectors = []
205
206 @inlineCallbacks
207- def connect(self, host=None, port=None, spec=None, user=None, password=None, vhost=None):
208+ def connect(self, host=None, port=None, spec=None, user=None,
209+ password=None, vhost=None, heartbeat=None):
210 host = host or self.host
211 port = port or self.port
212 spec = spec or self.spec
213 user = user or self.user
214 password = password or self.password
215 vhost = vhost or self.vhost
216+ heartbeat = heartbeat or self.heartbeat
217
218 delegate = TwistedDelegate()
219 onConn = Deferred()
220- f = protocol._InstanceFactory(reactor, AMQClient(delegate, vhost, txamqp.spec.load(spec)), onConn)
221+ p = AMQClient(delegate, vhost, heartbeat=heartbeat, spec=txamqp.spec.load(spec))
222+ f = protocol._InstanceFactory(reactor, p, onConn)
223 c = reactor.connectTCP(host, port, f)
224 self.connectors.append(c)
225 client = yield onConn

Subscribers

People subscribed via source and target branches

to status/vote changes: