Merge lp:~elachuni/txamqp/heartbeat-loopingcalls into lp:txamqp

Proposed by Anthony Lenton
Status: Merged
Merged at revision: not available
Proposed branch: lp:~elachuni/txamqp/heartbeat-loopingcalls
Merge into: lp:txamqp
Diff against target: None lines
To merge this branch: bzr merge lp:~elachuni/txamqp/heartbeat-loopingcalls
Reviewer | Review Type | Date Requested | Status
txAMQP Team | | | Pending
Review via email: mp+9503@code.launchpad.net
To post a comment you must log in.
Revision history for this message
Anthony Lenton (elachuni) wrote :

This is a branch that implements heartbeat frames using LoopingCall, per Esteve's suggestion.

Revision history for this message
Esteve Fernandez (esteve) wrote :

Hi Anthony, sorry for the delay. Thank you very much for your patch.

> This is a branch that implements heartbeat frames using LoopingCall, per
> Esteve's suggestion.

I modified it a bit (lp:~esteve/txamqp/heartbeat-loopingcalls):

- added configurable heartbeats and client classes to the test cases, so overriding connect and setUp is no longer needed, which makes HeartbeatTests a bit shorter
- re-added lastSent and lastReceived from your original patch. I didn't realize it at first, sorry, but now I think they are a great idea for accounting purposes.

What do you think of it?

Thanks for your continuing contributions!

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'src/txamqp/client.py'
2--- src/txamqp/client.py 2009-05-28 09:46:03 +0000
3+++ src/txamqp/client.py 2009-06-11 19:45:31 +0000
4@@ -29,7 +29,8 @@
5
6 def connection_tune(self, ch, msg):
7 self.client.MAX_LENGTH = msg.frame_max
8- ch.connection_tune_ok(*msg.fields)
9+ args = msg.channel_max, msg.frame_max, self.client.heartbeatInterval
10+ ch.connection_tune_ok(*args)
11 self.client.started.reset()
12
13 @defer.inlineCallbacks
14
15=== modified file 'src/txamqp/connection.py'
16--- src/txamqp/connection.py 2008-10-29 18:31:04 +0000
17+++ src/txamqp/connection.py 2009-04-27 17:00:08 +0000
18@@ -196,3 +196,15 @@
19
20 def __str__(self):
21 return "Body(%r)" % self.content
22+
23+class Heartbeat(Payload):
24+ type = Frame.HEARTBEAT
25+ def __str__(self):
26+ return "Heartbeat()"
27+
28+ def encode(self, enc):
29+ enc.encode_long(0)
30+
31+ def decode(spec, dec):
32+ dec.decode_long()
33+ return Heartbeat()
34
35=== modified file 'src/txamqp/protocol.py'
36--- src/txamqp/protocol.py 2009-06-18 08:13:49 +0000
37+++ src/txamqp/protocol.py 2009-07-31 11:43:51 +0000
38@@ -1,16 +1,18 @@
39 # coding: utf-8
40 from twisted.python import log
41-from twisted.internet import defer, protocol
42+from twisted.internet import defer, protocol, reactor
43+from twisted.internet.task import LoopingCall
44 from twisted.protocols import basic
45 from txamqp import spec
46 from txamqp.codec import Codec, EOF
47-from txamqp.connection import Header, Frame, Method, Body
48+from txamqp.connection import Header, Frame, Method, Body, Heartbeat
49 from txamqp.message import Message
50 from txamqp.content import Content
51 from txamqp.queue import TimeoutDeferredQueue, Empty, Closed as QueueClosed
52 from txamqp.client import TwistedEvent, TwistedDelegate, Closed
53 from cStringIO import StringIO
54 import struct
55+from time import time
56
57 class GarbageException(Exception):
58 pass
59@@ -27,10 +29,10 @@
60 self.responses = TimeoutDeferredQueue()
61
62 self.queue = None
63-
64+
65 self.closed = False
66 self.reason = None
67-
68+
69 def close(self, reason):
70 if self.closed:
71 return
72@@ -200,7 +202,10 @@
73
74 channelClass = AMQChannel
75
76- def __init__(self, delegate, vhost, *args, **kwargs):
77+ # Max unreceived heartbeat frames. The AMQP standard says it's 3.
78+ MAX_UNSEEN_HEARTBEAT = 3
79+
80+ def __init__(self, delegate, vhost, heartbeat=0, *args, **kwargs):
81 FrameReceiver.__init__(self, *args, **kwargs)
82 self.delegate = delegate
83
84@@ -225,6 +230,27 @@
85
86 self.outgoing.get().addCallback(self.writer)
87 self.work.get().addCallback(self.worker)
88+ self.heartbeatInterval = heartbeat
89+ self.checkHB = None
90+ self.sendHB = None
91+ if self.heartbeatInterval > 0:
92+ d = self.started.wait()
93+ d.addCallback(self.reschedule_sendHB)
94+ d.addCallback(self.reschedule_checkHB)
95+
96+ def reschedule_sendHB(self, dummy=None):
97+ if self.heartbeatInterval > 0:
98+ if self.sendHB is None:
99+ self.sendHB = LoopingCall(self.sendHeartbeat)
100+ elif self.sendHB.running:
101+ self.sendHB.stop()
102+ self.sendHB.start(self.heartbeatInterval, now=False)
103+
104+ def reschedule_checkHB(self, dummy=None):
105+ if self.checkHB is not None and self.checkHB.active():
106+ self.checkHB.cancel()
107+ self.checkHB = reactor.callLater(self.heartbeatInterval *
108+ self.MAX_UNSEEN_HEARTBEAT, self.checkHeartbeat)
109
110 def check_0_8(self):
111 return (self.spec.minor, self.spec.major) == (0, 8)
112@@ -254,7 +280,7 @@
113 finally:
114 self.queueLock.release()
115 defer.returnValue(q)
116-
117+
118 def close(self, reason):
119 for ch in self.channels.values():
120 ch.close(reason)
121@@ -294,10 +320,18 @@
122 def frameReceived(self, frame):
123 self.processFrame(frame)
124
125+ def sendFrame(self, frame):
126+ if frame.payload.type != Frame.HEARTBEAT:
127+ self.reschedule_sendHB()
128+ FrameReceiver.sendFrame(self, frame)
129+
130 @defer.inlineCallbacks
131 def processFrame(self, frame):
132 ch = yield self.channel(frame.channel)
133- ch.dispatch(frame, self.work)
134+ if frame.payload.type != Frame.HEARTBEAT:
135+ ch.dispatch(frame, self.work)
136+ if self.heartbeatInterval > 0:
137+ self.reschedule_checkHB()
138
139 @defer.inlineCallbacks
140 def authenticate(self, username, password, mechanism='AMQPLAIN', locale='en_US'):
141@@ -319,5 +353,19 @@
142 channel0 = yield self.channel(0)
143 yield channel0.connection_open(self.vhost)
144
145+ def sendHeartbeat(self):
146+ self.sendFrame(Frame(0, Heartbeat()))
147+
148+ def checkHeartbeat(self):
149+ if self.checkHB is not None and self.checkHB.active():
150+ self.checkHB.cancel()
151+ self.checkHB = None
152+ self.transport.loseConnection()
153+
154 def connectionLost(self, reason):
155+ if self.heartbeatInterval > 0 and self.sendHB.running:
156+ self.sendHB.stop()
157+ if self.checkHB is not None and self.checkHB.active():
158+ self.checkHB.cancel()
159 self.close(reason)
160+
161
162=== added file 'src/txamqp/test/test_heartbeat.py'
163--- src/txamqp/test/test_heartbeat.py 1970-01-01 00:00:00 +0000
164+++ src/txamqp/test/test_heartbeat.py 2009-07-31 11:43:51 +0000
165@@ -0,0 +1,59 @@
166+from time import time
167+import txamqp
168+from txamqp.testlib import TestBase
169+from txamqp.protocol import AMQClient, TwistedDelegate
170+from twisted.internet import reactor, protocol
171+from twisted.internet.defer import Deferred, inlineCallbacks, returnValue
172+
173+class SpyAMQClient(AMQClient):
174+ called_reschedule_check = 0
175+ called_send_hb = 0
176+ def reschedule_checkHB(self, dummy=None):
177+ AMQClient.reschedule_checkHB(self)
178+ self.called_reschedule_check += 1
179+ def sendHeartbeat(self):
180+ AMQClient.sendHeartbeat(self)
181+ self.called_send_hb += 1
182+
183+class HeartbeatTests(TestBase):
184+ """
185+ Tests handling of heartbeat frames
186+ """
187+
188+ @inlineCallbacks
189+ def connect(self):
190+ delegate = TwistedDelegate()
191+ onConn = Deferred()
192+ p = SpyAMQClient(delegate, self.vhost, heartbeat=1,
193+ spec=txamqp.spec.load(self.spec))
194+ f = protocol._InstanceFactory(reactor, p, onConn)
195+ c = reactor.connectTCP(self.host, self.port, f)
196+ self.connectors.append(c)
197+ client = yield onConn
198+
199+ yield client.authenticate(self.user, self.password)
200+ returnValue(client)
201+
202+
203+ @inlineCallbacks
204+ def setUp(self):
205+ """ Set up a heartbeat frame per second """
206+ self.client = yield self.connect()
207+
208+ self.channel = yield self.client.channel(1)
209+ yield self.channel.channel_open()
210+
211+ def test_heartbeat(self):
212+ """
213+ Test that heartbeat frames are sent and received
214+ """
215+ d = Deferred()
216+ def checkPulse(dummy):
217+ t = time()
218+ self.assertTrue(self.client.called_send_hb,
219+ "A heartbeat frame was recently sent")
220+ self.assertTrue(self.client.called_reschedule_check,
221+ "A heartbeat frame was recently received")
222+ d.addCallback(checkPulse)
223+ reactor.callLater(3, d.callback, None)
224+ return d
225
226=== modified file 'src/txamqp/testlib.py'
227--- src/txamqp/testlib.py 2009-06-17 16:01:31 +0000
228+++ src/txamqp/testlib.py 2009-07-09 13:26:53 +0000
229@@ -6,9 +6,9 @@
230 # to you under the Apache License, Version 2.0 (the
231 # "License"); you may not use this file except in compliance
232 # with the License. You may obtain a copy of the License at
233-#
234+#
235 # http://www.apache.org/licenses/LICENSE-2.0
236-#
237+#
238 # Unless required by applicable law or agreed to in writing,
239 # software distributed under the License is distributed on an
240 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
241@@ -78,29 +78,33 @@
242 self.user = 'guest'
243 self.password = 'guest'
244 self.vhost = 'localhost'
245+ self.heartbeat = 0
246 self.queues = []
247 self.exchanges = []
248 self.connectors = []
249
250 @inlineCallbacks
251- def connect(self, host=None, port=None, spec=None, user=None, password=None, vhost=None):
252+ def connect(self, host=None, port=None, spec=None, user=None,
253+ password=None, vhost=None, heartbeat=None):
254 host = host or self.host
255 port = port or self.port
256 spec = spec or self.spec
257 user = user or self.user
258 password = password or self.password
259 vhost = vhost or self.vhost
260+ heartbeat = heartbeat or self.heartbeat
261
262 delegate = TwistedDelegate()
263 onConn = Deferred()
264- f = protocol._InstanceFactory(reactor, AMQClient(delegate, vhost, txamqp.spec.load(spec)), onConn)
265+ p = AMQClient(delegate, vhost, heartbeat=heartbeat, spec=txamqp.spec.load(spec))
266+ f = protocol._InstanceFactory(reactor, p, onConn)
267 c = reactor.connectTCP(host, port, f)
268 self.connectors.append(c)
269 client = yield onConn
270
271 yield client.authenticate(user, password)
272 returnValue(client)
273-
274+
275 @inlineCallbacks
276 def setUp(self):
277 self.client = yield self.connect()

Subscribers

People subscribed via source and target branches

to status/vote changes: