Merge lp:~free.ekanayaka/txamqp/synchronous-testing-helpers into lp:txamqp

Proposed by Free Ekanayaka
Status: Merged
Merge reported by: Esteve Fernandez
Merged at revision: not available
Proposed branch: lp:~free.ekanayaka/txamqp/synchronous-testing-helpers
Merge into: lp:txamqp
Diff against target: 355 lines (+270/-12)
6 files modified
src/txamqp/factory.py (+3/-8)
src/txamqp/protocol.py (+4/-3)
src/txamqp/spec.py (+6/-1)
src/txamqp/test/test_factory.py (+8/-0)
src/txamqp/test/test_testing.py (+52/-0)
src/txamqp/testing.py (+197/-0)
To merge this branch: bzr merge lp:~free.ekanayaka/txamqp/synchronous-testing-helpers
Reviewer Review Type Date Requested Status
Esteve Fernandez Approve
Review via email: mp+296293@code.launchpad.net

Description of the change

This adds a new txamqp.testing.AMQPump class which can use as a fake AMQP transport for writing synchronous unit tests in a convenient way. Example:

client = AMQClient(...)
transport = AMQPump()
transport.connect(client)
channel = self.successResultOf(client.channel(1))
d = clientChannel.basic_consume(queue="test-queue")
transport.channel(1).basic_consume_ok(consumer_tag="consumer")
self.assertTrue(self.successResultOf(d))

The branch also fixes a small bug in txamqp.factory.AMQFactory where newly built protocols were not passed a fresh TwistedDelegate.

It also moves the DEFAULT_SPEC constant from txamqp.factory to txamqp.spec
which seems a better fit.

Finally it makes AMQClient pass its clock to callLater, for easier synchronous testing.

To post a comment you must log in.
Revision history for this message
Esteve Fernandez (esteve) wrote :

Sorry, I seem to have missed this one. Looks great, thanks!

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'src/txamqp/factory.py'
2--- src/txamqp/factory.py 2016-05-27 09:44:14 +0000
3+++ src/txamqp/factory.py 2016-06-02 08:18:42 +0000
4@@ -16,17 +16,12 @@
5 # specific language governing permissions and limitations
6 # under the License.
7 #
8-import os
9-
10 from twisted.internet.protocol import Factory
11
12 from txamqp.protocol import AMQClient
13-from txamqp.spec import load
14+from txamqp.spec import DEFAULT_SPEC, load
15 from txamqp.client import TwistedDelegate
16
17-DEFAULT_SPEC = os.path.join(
18- os.path.dirname(__file__), "../specs/standard/amqp0-9.stripped.xml")
19-
20
21 class AMQFactory(Factory):
22 """A factory building AMQClient instances."""
23@@ -42,7 +37,6 @@
24 spec = DEFAULT_SPEC
25 self._spec = load(spec)
26 self._clock = clock
27- self._delegate = TwistedDelegate()
28 self._vhost = "/"
29 self._heartbeat = 0
30
31@@ -55,7 +49,8 @@
32 self._heartbeat = heartbeat
33
34 def buildProtocol(self, addr):
35+ delegate = TwistedDelegate()
36 protocol = self.protocol(
37- self._delegate, vhost=self._vhost, spec=self._spec,
38+ delegate, vhost=self._vhost, spec=self._spec,
39 heartbeat=self._heartbeat, clock=self._clock)
40 return protocol
41
42=== modified file 'src/txamqp/protocol.py'
43--- src/txamqp/protocol.py 2012-07-11 11:12:55 +0000
44+++ src/txamqp/protocol.py 2016-06-02 08:18:42 +0000
45@@ -247,10 +247,11 @@
46 self.work.get().addCallback(self.worker)
47 self.heartbeatInterval = heartbeat
48 self.insist = insist
49+ if clock is None:
50+ from twisted.internet import reactor
51+ clock = reactor
52+ self.clock = clock
53 if self.heartbeatInterval > 0:
54- if clock is None:
55- from twisted.internet import reactor as clock
56- self.clock = clock
57 self.checkHB = self.clock.callLater(self.heartbeatInterval *
58 self.MAX_UNSEEN_HEARTBEAT, self.checkHeartbeat)
59 self.sendHB = LoopingCall(self.sendHeartbeat)
60
61=== modified file 'src/txamqp/spec.py'
62--- src/txamqp/spec.py 2010-01-24 00:32:32 +0000
63+++ src/txamqp/spec.py 2016-06-02 08:18:42 +0000
64@@ -29,10 +29,15 @@
65 situations.
66 """
67
68-import re, textwrap, new
69+import re, textwrap, new, os
70
71 from txamqp import xmlutil
72
73+
74+DEFAULT_SPEC = os.path.join(
75+ os.path.dirname(__file__), "../specs/standard/amqp0-9.stripped.xml")
76+
77+
78 class SpecContainer(object):
79
80 def __init__(self):
81
82=== modified file 'src/txamqp/test/test_factory.py'
83--- src/txamqp/test/test_factory.py 2016-05-27 09:44:14 +0000
84+++ src/txamqp/test/test_factory.py 2016-06-02 08:18:42 +0000
85@@ -49,3 +49,11 @@
86 self.assertEqual(spec, client.spec.file)
87 self.assertEqual(1, client.heartbeatInterval)
88 self.assertEqual(1, len(clock.getDelayedCalls()))
89+
90+ def test_build_protocol_different_delegates(self):
91+ """Test building AMQClient getting different delegates."""
92+ address = IPv4Address("TCP", "127.0.0.1", 5672)
93+ factory = AMQFactory()
94+ client1 = factory.buildProtocol(address)
95+ client2 = factory.buildProtocol(address)
96+ self.assertIsNot(client2.delegate, client1.delegate)
97
98=== added file 'src/txamqp/test/test_testing.py'
99--- src/txamqp/test/test_testing.py 1970-01-01 00:00:00 +0000
100+++ src/txamqp/test/test_testing.py 2016-06-02 08:18:42 +0000
101@@ -0,0 +1,52 @@
102+#
103+# Licensed to the Apache Software Foundation (ASF) under one
104+# or more contributor license agreements. See the NOTICE file
105+# distributed with this work for additional information
106+# regarding copyright ownership. The ASF licenses this file
107+# to you under the Apache License, Version 2.0 (the
108+# "License"); you may not use this file except in compliance
109+# with the License. You may obtain a copy of the License at
110+#
111+# http://www.apache.org/licenses/LICENSE-2.0
112+#
113+# Unless required by applicable law or agreed to in writing,
114+# software distributed under the License is distributed on an
115+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
116+# KIND, either express or implied. See the License for the
117+# specific language governing permissions and limitations
118+# under the License.
119+#
120+from twisted.trial.unittest import TestCase
121+from twisted.internet.task import Clock
122+
123+from txamqp.protocol import AMQClient
124+from txamqp.spec import load
125+from txamqp.factory import DEFAULT_SPEC
126+from txamqp.client import TwistedDelegate
127+from txamqp.testing import AMQPump
128+
129+
130+class AMQPumpTest(TestCase):
131+
132+ def setUp(self):
133+ super(AMQPumpTest, self).setUp()
134+ delegate = TwistedDelegate()
135+ spec = load(DEFAULT_SPEC)
136+ self.client = AMQClient(delegate, "/", spec, clock=Clock())
137+ self.transport = AMQPump()
138+ self.transport.connect(self.client)
139+
140+ def test_send_receive(self):
141+ """
142+ Test sending and receiving frames.
143+ """
144+ clientChannel = self.successResultOf(self.client.channel(1))
145+ serverChannel = self.transport.channel(1)
146+ d = clientChannel.basic_consume(queue="test-queue")
147+ serverChannel.basic_consume_ok(consumer_tag="consumer")
148+ reply = self.successResultOf(d)
149+ queue = self.successResultOf(self.client.queue(reply.consumer_tag))
150+ d = queue.get(timeout=1)
151+ serverChannel.deliver("hello", consumer_tag="consumer")
152+ message = self.successResultOf(d)
153+ self.assertEqual("hello", message.content.body)
154
155=== added file 'src/txamqp/testing.py'
156--- src/txamqp/testing.py 1970-01-01 00:00:00 +0000
157+++ src/txamqp/testing.py 2016-06-02 08:18:42 +0000
158@@ -0,0 +1,197 @@
159+#
160+# Licensed to the Apache Software Foundation (ASF) under one
161+# or more contributor license agreements. See the NOTICE file
162+# distributed with this work for additional information
163+# regarding copyright ownership. The ASF licenses this file
164+# to you under the Apache License, Version 2.0 (the
165+# "License"); you may not use this file except in compliance
166+# with the License. You may obtain a copy of the License at
167+#
168+# http://www.apache.org/licenses/LICENSE-2.0
169+#
170+# Unless required by applicable law or agreed to in writing,
171+# software distributed under the License is distributed on an
172+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
173+# KIND, either express or implied. See the License for the
174+# specific language governing permissions and limitations
175+# under the License.
176+#
177+"""Utilities for synchronous unit tests.
178+
179+Classes in this module work without any actual network connection.
180+"""
181+from functools import partial
182+
183+from twisted.python.failure import Failure
184+from twisted.internet.error import ConnectionDone, ConnectionLost
185+
186+from txamqp.connection import Method, Frame, Header, Body
187+
188+
189+class AMQPump(object):
190+ """Track outgoing frames from an AMQClient and pump incoming frames to it.
191+
192+ It implements the ITransport API so it can be used a transport for
193+ AMQClient protocols.
194+
195+ @ivar client: The AMQClient to pump frames to. Must be set with connect().
196+ @ivar incoming: A dict mapping channel IDs to a list of frames that were
197+ received by the client through it.
198+ @ivar outgoing: A dict mapping channel IDs to a list of frames that were
199+ sent by the client through it.
200+ @ivar initialized: Whether the AMQP init string was sent by the client yet.
201+ @ivar connected: Whether we're still connected.
202+ @ivar aborted: Whether the connection was aborted.
203+ """
204+
205+ def __init__(self, logger=None):
206+ """
207+ @param logger: Optional Twisted logger, to log incoming and outgoing
208+ frames (e.g. for debugging).
209+ """
210+ self.client = None
211+ self.incoming = {}
212+ self.outgoing = {}
213+ self.initialized = False
214+ self.connected = False
215+ self.aborted = False
216+ self.logger = logger
217+
218+ def connect(self, client):
219+ """Connect this transport to the given AMQClient."""
220+ self._log("Client connected: {client}", client=client)
221+ self.connected = True
222+ self.client = client
223+ self.clock = client.clock
224+ self.client.makeConnection(self)
225+
226+ def write(self, data):
227+ """Simulate sending data over the network.
228+
229+ The data will be unpacked back into a frame and appeneded to the
230+ outgoing list associated with its channel.
231+ """
232+ if not self.initialized:
233+ # Ignore the init string
234+ self.initialized = True
235+ return
236+ frame = self.client._unpackFrame(data)
237+ self.outgoing.setdefault(frame.channel, []).append(frame)
238+ self._log("Outgoing frame: {frame}", frame=frame)
239+
240+ def loseConnection(self):
241+ self.connected = False
242+ self.client.connectionLost(Failure(ConnectionDone()))
243+
244+ def abortConnection(self):
245+ self.connected = False
246+ self.aborted = True
247+ self.client.connectionLost(Failure(ConnectionLost()))
248+
249+ def channel(self, id):
250+ """Get an _AMPServerChannel, to be used as convenience to pump frames.
251+
252+ @param id: The ID of the channel frames will be sent to.
253+ """
254+ return _AMPServerChannel(self, id)
255+
256+ def pumpMethod(self, channel, klass, method, **fields):
257+ """Convenience for pumping a method frame.
258+
259+ @param channel: The channel ID of the frame.
260+ @param klass: The class name of the method.
261+ @param method: The name of the method.
262+ @param fields: Fields for the method. Missing fields will be filled
263+ with None.
264+ """
265+ klass = self._classFromPyName(klass)
266+ method = self._methodFromPyName(klass, method)
267+ args = [None] * len(method.fields.items)
268+ for key, value in fields.iteritems():
269+ field = method.fields.bypyname[key]
270+ args[method.fields.indexes[field]] = value
271+ self.pump(channel, Method(method, *args))
272+
273+ def pumpHeader(self, channel, klass, weight, size, **properties):
274+ """Convenience for pumping an header frame.
275+
276+ @param channel: The channel ID of the frame.
277+ @param klass: The class name of the header.
278+ @param weight: The weight of the header.
279+ @param size: Number of bytes in the follow-up body frame.
280+ @param properties: Properties of the header, if any.
281+ """
282+ klass = self._classFromPyName(klass)
283+ self.pump(channel, Header(klass, weight, size, **properties))
284+
285+ def pumpBody(self, channel, body):
286+ """Convenience for pumping a body frame.
287+
288+ @param channel: The channel ID of the frame.
289+ @param body: The data of the body frame.
290+ """
291+ self.pump(channel, Body(body))
292+
293+ def pump(self, channel, payload):
294+ """Pump a single frame.
295+
296+ @param channel: The channel ID of the frame.
297+ @param payload: The Payload object of the frame.
298+ """
299+ frame = Frame(channel, payload)
300+ self._log("Incoming frame: {frame}", frame=frame)
301+ self.incoming.setdefault(channel, []).append(frame)
302+ self.client.frameReceived(frame)
303+
304+ def _classFromPyName(self, name):
305+ """Return the spec class metadata object with given name."""
306+ return self.client.spec.classes.byname[name]
307+
308+ def _methodFromPyName(self, klass, name):
309+ """Return the spec method metadata object with given name."""
310+ return klass.methods.byname[name]
311+
312+ def _log(self, message, **params):
313+ """Log the given message if we were given a logger."""
314+ if self.logger:
315+ self.logger.debug(message, **params)
316+
317+
318+class _AMPServerChannel(object):
319+ """Convenience to pump frames into a connected AMQClient.
320+
321+ You can invoke any method defined by the connected AMQClient's spec, in
322+ a way similar you would for an AMQChannel.
323+
324+ For example:
325+
326+ transport = AMQPump()
327+ transport.connect(client)
328+ channel = transport.serverChannel(1)
329+ channel.basic_consume_ok(consumer_tag="foo")
330+ """
331+
332+ def __init__(self, pump, id):
333+ """
334+ @param pump: The AMQPump used to pump frames to a connected client.
335+ @param id: The channel ID frames will be pumped into.
336+ """
337+ self.pump = pump
338+ self.id = id
339+
340+ def deliver(self, body, **fields):
341+ """Convenience for pumping a basic-deliver method frame plus data.
342+
343+ This will send a frame for the basic-deliver method, one for the
344+ message header and one for the message body.
345+ """
346+ self.pump.pumpMethod(self.id, "basic", "deliver", **fields)
347+ self.pump.pumpHeader(self.id, "basic", 0, len(body))
348+ self.pump.pumpBody(self.id, body)
349+
350+ def __getattr__(self, name):
351+ """Get a callable that will send the associated method frame."""
352+ words = name.split("_")
353+ klass = words[0]
354+ method = "-".join(words[1:])
355+ return partial(self.pump.pumpMethod, self.id, klass, method)

Subscribers

People subscribed via source and target branches

to status/vote changes: