Merge lp:~free.ekanayaka/txamqp/synchronous-testing-helpers into lp:txamqp

Proposed by Free Ekanayaka
Status: Merged
Merge reported by: Esteve Fernandez
Merged at revision: not available
Proposed branch: lp:~free.ekanayaka/txamqp/synchronous-testing-helpers
Merge into: lp:txamqp
Diff against target: 355 lines (+270/-12)
6 files modified
src/txamqp/factory.py (+3/-8)
src/txamqp/protocol.py (+4/-3)
src/txamqp/spec.py (+6/-1)
src/txamqp/test/test_factory.py (+8/-0)
src/txamqp/test/test_testing.py (+52/-0)
src/txamqp/testing.py (+197/-0)
To merge this branch: bzr merge lp:~free.ekanayaka/txamqp/synchronous-testing-helpers
Reviewer Review Type Date Requested Status
Esteve Fernandez Approve
Review via email: mp+296293@code.launchpad.net

Description of the change

This adds a new txamqp.testing.AMQPump class, which can be used as a fake AMQP transport for writing synchronous unit tests in a convenient way. Example:

client = AMQClient(...)
transport = AMQPump()
transport.connect(client)
clientChannel = self.successResultOf(client.channel(1))
d = clientChannel.basic_consume(queue="test-queue")
transport.channel(1).basic_consume_ok(consumer_tag="consumer")
self.assertTrue(self.successResultOf(d))

The branch also fixes a small bug in txamqp.factory.AMQFactory where newly built protocols were not passed a fresh TwistedDelegate.

It also moves the DEFAULT_SPEC constant from txamqp.factory to txamqp.spec,
which seems a better fit.

Finally it makes AMQClient pass its clock to callLater, for easier synchronous testing.

To post a comment you must log in.
Revision history for this message
Esteve Fernandez (esteve) wrote :

Sorry, I seem to have missed this one. Looks great, thanks!

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'src/txamqp/factory.py'
--- src/txamqp/factory.py 2016-05-27 09:44:14 +0000
+++ src/txamqp/factory.py 2016-06-02 08:18:42 +0000
@@ -16,17 +16,12 @@
16# specific language governing permissions and limitations16# specific language governing permissions and limitations
17# under the License.17# under the License.
18#18#
19import os
20
21from twisted.internet.protocol import Factory19from twisted.internet.protocol import Factory
2220
23from txamqp.protocol import AMQClient21from txamqp.protocol import AMQClient
24from txamqp.spec import load22from txamqp.spec import DEFAULT_SPEC, load
25from txamqp.client import TwistedDelegate23from txamqp.client import TwistedDelegate
2624
27DEFAULT_SPEC = os.path.join(
28 os.path.dirname(__file__), "../specs/standard/amqp0-9.stripped.xml")
29
3025
31class AMQFactory(Factory):26class AMQFactory(Factory):
32 """A factory building AMQClient instances."""27 """A factory building AMQClient instances."""
@@ -42,7 +37,6 @@
42 spec = DEFAULT_SPEC37 spec = DEFAULT_SPEC
43 self._spec = load(spec)38 self._spec = load(spec)
44 self._clock = clock39 self._clock = clock
45 self._delegate = TwistedDelegate()
46 self._vhost = "/"40 self._vhost = "/"
47 self._heartbeat = 041 self._heartbeat = 0
4842
@@ -55,7 +49,8 @@
55 self._heartbeat = heartbeat49 self._heartbeat = heartbeat
5650
57 def buildProtocol(self, addr):51 def buildProtocol(self, addr):
52 delegate = TwistedDelegate()
58 protocol = self.protocol(53 protocol = self.protocol(
59 self._delegate, vhost=self._vhost, spec=self._spec,54 delegate, vhost=self._vhost, spec=self._spec,
60 heartbeat=self._heartbeat, clock=self._clock)55 heartbeat=self._heartbeat, clock=self._clock)
61 return protocol56 return protocol
6257
=== modified file 'src/txamqp/protocol.py'
--- src/txamqp/protocol.py 2012-07-11 11:12:55 +0000
+++ src/txamqp/protocol.py 2016-06-02 08:18:42 +0000
@@ -247,10 +247,11 @@
247 self.work.get().addCallback(self.worker)247 self.work.get().addCallback(self.worker)
248 self.heartbeatInterval = heartbeat248 self.heartbeatInterval = heartbeat
249 self.insist = insist249 self.insist = insist
250 if clock is None:
251 from twisted.internet import reactor
252 clock = reactor
253 self.clock = clock
250 if self.heartbeatInterval > 0:254 if self.heartbeatInterval > 0:
251 if clock is None:
252 from twisted.internet import reactor as clock
253 self.clock = clock
254 self.checkHB = self.clock.callLater(self.heartbeatInterval *255 self.checkHB = self.clock.callLater(self.heartbeatInterval *
255 self.MAX_UNSEEN_HEARTBEAT, self.checkHeartbeat)256 self.MAX_UNSEEN_HEARTBEAT, self.checkHeartbeat)
256 self.sendHB = LoopingCall(self.sendHeartbeat)257 self.sendHB = LoopingCall(self.sendHeartbeat)
257258
=== modified file 'src/txamqp/spec.py'
--- src/txamqp/spec.py 2010-01-24 00:32:32 +0000
+++ src/txamqp/spec.py 2016-06-02 08:18:42 +0000
@@ -29,10 +29,15 @@
29situations.29situations.
30"""30"""
3131
32import re, textwrap, new32import re, textwrap, new, os
3333
34from txamqp import xmlutil34from txamqp import xmlutil
3535
36
37DEFAULT_SPEC = os.path.join(
38 os.path.dirname(__file__), "../specs/standard/amqp0-9.stripped.xml")
39
40
36class SpecContainer(object):41class SpecContainer(object):
3742
38 def __init__(self):43 def __init__(self):
3944
=== modified file 'src/txamqp/test/test_factory.py'
--- src/txamqp/test/test_factory.py 2016-05-27 09:44:14 +0000
+++ src/txamqp/test/test_factory.py 2016-06-02 08:18:42 +0000
@@ -49,3 +49,11 @@
49 self.assertEqual(spec, client.spec.file)49 self.assertEqual(spec, client.spec.file)
50 self.assertEqual(1, client.heartbeatInterval)50 self.assertEqual(1, client.heartbeatInterval)
51 self.assertEqual(1, len(clock.getDelayedCalls()))51 self.assertEqual(1, len(clock.getDelayedCalls()))
52
53 def test_build_protocol_different_delegates(self):
54 """Test building AMQClient getting different delegates."""
55 address = IPv4Address("TCP", "127.0.0.1", 5672)
56 factory = AMQFactory()
57 client1 = factory.buildProtocol(address)
58 client2 = factory.buildProtocol(address)
59 self.assertIsNot(client2.delegate, client1.delegate)
5260
=== added file 'src/txamqp/test/test_testing.py'
--- src/txamqp/test/test_testing.py 1970-01-01 00:00:00 +0000
+++ src/txamqp/test/test_testing.py 2016-06-02 08:18:42 +0000
@@ -0,0 +1,52 @@
1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18#
19from twisted.trial.unittest import TestCase
20from twisted.internet.task import Clock
21
22from txamqp.protocol import AMQClient
23from txamqp.spec import load
24from txamqp.factory import DEFAULT_SPEC
25from txamqp.client import TwistedDelegate
26from txamqp.testing import AMQPump
27
28
29class AMQPumpTest(TestCase):
30
31 def setUp(self):
32 super(AMQPumpTest, self).setUp()
33 delegate = TwistedDelegate()
34 spec = load(DEFAULT_SPEC)
35 self.client = AMQClient(delegate, "/", spec, clock=Clock())
36 self.transport = AMQPump()
37 self.transport.connect(self.client)
38
39 def test_send_receive(self):
40 """
41 Test sending and receiving frames.
42 """
43 clientChannel = self.successResultOf(self.client.channel(1))
44 serverChannel = self.transport.channel(1)
45 d = clientChannel.basic_consume(queue="test-queue")
46 serverChannel.basic_consume_ok(consumer_tag="consumer")
47 reply = self.successResultOf(d)
48 queue = self.successResultOf(self.client.queue(reply.consumer_tag))
49 d = queue.get(timeout=1)
50 serverChannel.deliver("hello", consumer_tag="consumer")
51 message = self.successResultOf(d)
52 self.assertEqual("hello", message.content.body)
053
=== added file 'src/txamqp/testing.py'
--- src/txamqp/testing.py 1970-01-01 00:00:00 +0000
+++ src/txamqp/testing.py 2016-06-02 08:18:42 +0000
@@ -0,0 +1,197 @@
1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18#
19"""Utilities for synchronous unit tests.
20
21Classes in this module work without any actual network connection.
22"""
23from functools import partial
24
25from twisted.python.failure import Failure
26from twisted.internet.error import ConnectionDone, ConnectionLost
27
28from txamqp.connection import Method, Frame, Header, Body
29
30
31class AMQPump(object):
32 """Track outgoing frames from an AMQClient and pump incoming frames to it.
33
34 It implements the ITransport API so it can be used a transport for
35 AMQClient protocols.
36
37 @ivar client: The AMQClient to pump frames to. Must be set with connect().
38 @ivar incoming: A dict mapping channel IDs to a list of frames that were
39 received by the client through it.
40 @ivar outgoing: A dict mapping channel IDs to a list of frames that were
41 sent by the client through it.
42 @ivar initialized: Whether the AMQP init string was sent by the client yet.
43 @ivar connected: Whether we're still connected.
44 @ivar aborted: Whether the connection was aborted.
45 """
46
47 def __init__(self, logger=None):
48 """
49 @param logger: Optional Twisted logger, to log incoming and outgoing
50 frames (e.g. for debugging).
51 """
52 self.client = None
53 self.incoming = {}
54 self.outgoing = {}
55 self.initialized = False
56 self.connected = False
57 self.aborted = False
58 self.logger = logger
59
60 def connect(self, client):
61 """Connect this transport to the given AMQClient."""
62 self._log("Client connected: {client}", client=client)
63 self.connected = True
64 self.client = client
65 self.clock = client.clock
66 self.client.makeConnection(self)
67
68 def write(self, data):
69 """Simulate sending data over the network.
70
71 The data will be unpacked back into a frame and appeneded to the
72 outgoing list associated with its channel.
73 """
74 if not self.initialized:
75 # Ignore the init string
76 self.initialized = True
77 return
78 frame = self.client._unpackFrame(data)
79 self.outgoing.setdefault(frame.channel, []).append(frame)
80 self._log("Outgoing frame: {frame}", frame=frame)
81
82 def loseConnection(self):
83 self.connected = False
84 self.client.connectionLost(Failure(ConnectionDone()))
85
86 def abortConnection(self):
87 self.connected = False
88 self.aborted = True
89 self.client.connectionLost(Failure(ConnectionLost()))
90
91 def channel(self, id):
92 """Get an _AMPServerChannel, to be used as convenience to pump frames.
93
94 @param id: The ID of the channel frames will be sent to.
95 """
96 return _AMPServerChannel(self, id)
97
98 def pumpMethod(self, channel, klass, method, **fields):
99 """Convenience for pumping a method frame.
100
101 @param channel: The channel ID of the frame.
102 @param klass: The class name of the method.
103 @param method: The name of the method.
104 @param fields: Fields for the method. Missing fields will be filled
105 with None.
106 """
107 klass = self._classFromPyName(klass)
108 method = self._methodFromPyName(klass, method)
109 args = [None] * len(method.fields.items)
110 for key, value in fields.iteritems():
111 field = method.fields.bypyname[key]
112 args[method.fields.indexes[field]] = value
113 self.pump(channel, Method(method, *args))
114
115 def pumpHeader(self, channel, klass, weight, size, **properties):
116 """Convenience for pumping an header frame.
117
118 @param channel: The channel ID of the frame.
119 @param klass: The class name of the header.
120 @param weight: The weight of the header.
121 @param size: Number of bytes in the follow-up body frame.
122 @param properties: Properties of the header, if any.
123 """
124 klass = self._classFromPyName(klass)
125 self.pump(channel, Header(klass, weight, size, **properties))
126
127 def pumpBody(self, channel, body):
128 """Convenience for pumping a body frame.
129
130 @param channel: The channel ID of the frame.
131 @param body: The data of the body frame.
132 """
133 self.pump(channel, Body(body))
134
135 def pump(self, channel, payload):
136 """Pump a single frame.
137
138 @param channel: The channel ID of the frame.
139 @param payload: The Payload object of the frame.
140 """
141 frame = Frame(channel, payload)
142 self._log("Incoming frame: {frame}", frame=frame)
143 self.incoming.setdefault(channel, []).append(frame)
144 self.client.frameReceived(frame)
145
146 def _classFromPyName(self, name):
147 """Return the spec class metadata object with given name."""
148 return self.client.spec.classes.byname[name]
149
150 def _methodFromPyName(self, klass, name):
151 """Return the spec method metadata object with given name."""
152 return klass.methods.byname[name]
153
154 def _log(self, message, **params):
155 """Log the given message if we were given a logger."""
156 if self.logger:
157 self.logger.debug(message, **params)
158
159
160class _AMPServerChannel(object):
161 """Convenience to pump frames into a connected AMQClient.
162
163 You can invoke any method defined by the connected AMQClient's spec, in
164 a way similar you would for an AMQChannel.
165
166 For example:
167
168 transport = AMQPump()
169 transport.connect(client)
170 channel = transport.serverChannel(1)
171 channel.basic_consume_ok(consumer_tag="foo")
172 """
173
174 def __init__(self, pump, id):
175 """
176 @param pump: The AMQPump used to pump frames to a connected client.
177 @param id: The channel ID frames will be pumped into.
178 """
179 self.pump = pump
180 self.id = id
181
182 def deliver(self, body, **fields):
183 """Convenience for pumping a basic-deliver method frame plus data.
184
185 This will send a frame for the basic-deliver method, one for the
186 message header and one for the message body.
187 """
188 self.pump.pumpMethod(self.id, "basic", "deliver", **fields)
189 self.pump.pumpHeader(self.id, "basic", 0, len(body))
190 self.pump.pumpBody(self.id, body)
191
192 def __getattr__(self, name):
193 """Get a callable that will send the associated method frame."""
194 words = name.split("_")
195 klass = words[0]
196 method = "-".join(words[1:])
197 return partial(self.pump.pumpMethod, self.id, klass, method)

Subscribers

People subscribed via source and target branches

to status/vote changes: