Merge lp:~free.ekanayaka/txamqp/amq-endpoint into lp:txamqp

Proposed by Free Ekanayaka
Status: Merged
Merged at revision: 73
Proposed branch: lp:~free.ekanayaka/txamqp/amq-endpoint
Merge into: lp:txamqp
Diff against target: 364 lines (+345/-0)
4 files modified
src/txamqp/endpoint.py (+130/-0)
src/txamqp/factory.py (+61/-0)
src/txamqp/test/test_endpoint.py (+103/-0)
src/txamqp/test/test_factory.py (+51/-0)
To merge this branch: bzr merge lp:~free.ekanayaka/txamqp/amq-endpoint
Reviewer Review Type Date Requested Status
Esteve Fernandez Approve
Review via email: mp+295952@code.launchpad.net

Description of the change

This branch adds an AMQEndpoint class which has the same interface as a regular IStreamClientEndpoint, but with some more assumptions about the factory being passed and the transport to be used.

Example code:

from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks
from txamqp.factory import AMQFactory
from txamqp.endpoint import AMQEndpoint

@inlineCallbacks
def connect():
    endpoint = AMQEndpoint.fromURI(reactor, "amqp://guest:guest@localhost/")
    client = yield endpoint.connect(AMQFactory())
    # ... "client" is a connected and authenticated AMQClient

For now there is no support for TLS connections, but it would be easy
to add in a follow-up branch if there is demand for it.

To post a comment you must log in.
Revision history for this message
Esteve Fernandez (esteve) wrote :

Thanks, changes look great!

review: Approve
Revision history for this message
Free Ekanayaka (free.ekanayaka) wrote :

Thanks for the very quick feedback Esteve!

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== added file 'src/txamqp/endpoint.py'
2--- src/txamqp/endpoint.py 1970-01-01 00:00:00 +0000
3+++ src/txamqp/endpoint.py 2016-05-27 13:16:31 +0000
4@@ -0,0 +1,130 @@
5+#
6+# Licensed to the Apache Software Foundation (ASF) under one
7+# or more contributor license agreements. See the NOTICE file
8+# distributed with this work for additional information
9+# regarding copyright ownership. The ASF licenses this file
10+# to you under the Apache License, Version 2.0 (the
11+# "License"); you may not use this file except in compliance
12+# with the License. You may obtain a copy of the License at
13+#
14+# http://www.apache.org/licenses/LICENSE-2.0
15+#
16+# Unless required by applicable law or agreed to in writing,
17+# software distributed under the License is distributed on an
18+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
19+# KIND, either express or implied. See the License for the
20+# specific language governing permissions and limitations
21+# under the License.
22+#
23+from twisted.internet.defer import inlineCallbacks, returnValue
24+from twisted.internet.endpoints import clientFromString
25+from twisted.web.client import URI
26+from twisted.web.http import parse_qs
27+
28+
29+class AMQEndpoint(object):
30+ """An endpoint that knows how to connect to AMQP brokers.
31+
32+ The class implements the same API as IStreamClientEndpoint, however it
33+ requires the protocol factory to be able to speak AMQP for performing
34+ the authentication.
35+
36+ @note: Currently TLS connections are not supported.
37+ """
38+
39+ def __init__(self, reactor, host, port, username="", password="",
40+ vhost="/", heartbeat=0, auth_mechanism="AMQPLAIN",
41+ timeout=30):
42+ """
43+ @param reactor: An L{IReactorTCP} provider.
44+
45+ @param username: The username to use when authenticating.
46+ @type username: L{bytes}
47+
48+ @param password: The password to use when authenticating.
49+ @type password: L{bytes}
50+
51+ @param host: Host name or IP address of the AMQP broker.
52+ @type host: L{bytes}
53+
54+ @type port: L{int}
55+ @param port: Port number.
56+
57+ @param vhost: The vhost to open the connection against.
58+ @type vhost: L{bytes}
59+
60+ @param heartbeat: AMQP heartbeat in seconds.
61+ @type heartbeat: L{int}
62+
63+ @param auth_mechanism: Authentication mechanism. Currently only AMQPLAIN
64+ and PLAIN are supported.
65+ @type auth_mechanism: L{bytes}
66+
67+ @param timeout: Number of seconds to wait before assuming the
68+ connection has failed.
69+ @type timeout: int
70+ """
71+ self._reactor = reactor
72+ self._host = host
73+ self._port = port
74+ self._username = username
75+ self._password = password
76+ self._vhost = vhost
77+ self._heartbeat = heartbeat
78+ self._auth_mechanism = auth_mechanism
79+ self._timeout = timeout
80+
81+ @classmethod
82+ def fromURI(cls, reactor, uri):
83+ """Return an AMQEndpoint instance configured with the given AMQP uri.
84+
85+ @see: https://www.rabbitmq.com/uri-spec.html
86+ """
87+ uri = URI.fromBytes(uri, defaultPort=5672)
88+ kwargs = {}
89+ host = uri.host
90+ if "@" in host:
91+ auth, host = uri.netloc.split("@")
92+ username, password = auth.split(":")
93+ kwargs.update({"username": username, "password": password})
94+
95+ vhost = uri.path
96+ if len(vhost) > 1:
97+ vhost = vhost[1:] # Strip leading "/"
98+ kwargs["vhost"] = vhost
99+
100+ params = parse_qs(uri.query)
101+ kwargs.update({name: value[0] for name, value in params.items()})
102+
103+ if "heartbeat" in kwargs:
104+ kwargs["heartbeat"] = int(kwargs["heartbeat"])
105+ return cls(reactor, host, uri.port, **kwargs)
106+
107+ def connect(self, protocolFactory):
108+ """
109+ Connect the C{protocolFactory} to the AMQP broker specified by the
110+ URI of this endpoint.
111+
112+ @param protocolFactory: An L{AMQFactory} building L{AMQClient} objects.
113+ @return: A L{Deferred} that results in an L{AMQClient} upon successful
114+ connection otherwise a L{Failure} wrapping L{ConnectError} or
115+ L{NoProtocol <twisted.internet.error.NoProtocol>}.
116+ """
117+ # XXX Since AMQClient requires these parameters at __init__ time, we
118+ # need to override them in the provided factory.
119+ protocolFactory.setVHost(self._vhost)
120+ protocolFactory.setHeartbeat(self._heartbeat)
121+
122+ description = "tcp:{}:{}:timeout={}".format(
123+ self._host, self._port, self._timeout)
124+ endpoint = clientFromString(self._reactor, description)
125+
126+ deferred = endpoint.connect(protocolFactory)
127+ return deferred.addCallback(self._authenticate)
128+
129+ @inlineCallbacks
130+ def _authenticate(self, client):
131+ """Perform AMQP authentication."""
132+ yield client.authenticate(
133+ self._username, self._password, mechanism=self._auth_mechanism)
134+ returnValue(client)
135
136=== added file 'src/txamqp/factory.py'
137--- src/txamqp/factory.py 1970-01-01 00:00:00 +0000
138+++ src/txamqp/factory.py 2016-05-27 13:16:31 +0000
139@@ -0,0 +1,61 @@
140+#
141+# Licensed to the Apache Software Foundation (ASF) under one
142+# or more contributor license agreements. See the NOTICE file
143+# distributed with this work for additional information
144+# regarding copyright ownership. The ASF licenses this file
145+# to you under the Apache License, Version 2.0 (the
146+# "License"); you may not use this file except in compliance
147+# with the License. You may obtain a copy of the License at
148+#
149+# http://www.apache.org/licenses/LICENSE-2.0
150+#
151+# Unless required by applicable law or agreed to in writing,
152+# software distributed under the License is distributed on an
153+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
154+# KIND, either express or implied. See the License for the
155+# specific language governing permissions and limitations
156+# under the License.
157+#
158+import os
159+
160+from twisted.internet.protocol import Factory
161+
162+from txamqp.protocol import AMQClient
163+from txamqp.spec import load
164+from txamqp.client import TwistedDelegate
165+
166+DEFAULT_SPEC = os.path.join(
167+ os.path.dirname(__file__), "../specs/standard/amqp0-9.stripped.xml")
168+
169+
170+class AMQFactory(Factory):
171+ """A factory building AMQClient instances."""
172+
173+ protocol = AMQClient
174+
175+ def __init__(self, spec=None, clock=None):
176+ """
177+ @param spec: Path to the spec file. Defaults to the standard AMQP 0.9.
178+ @type spec: L{str} (native string)
179+ """
180+ if spec is None:
181+ spec = DEFAULT_SPEC
182+ self._spec = load(spec)
183+ self._clock = clock
184+ self._delegate = TwistedDelegate()
185+ self._vhost = "/"
186+ self._heartbeat = 0
187+
188+ def setVHost(self, vhost):
189+ """Set a custom vhost."""
190+ self._vhost = vhost
191+
192+ def setHeartbeat(self, heartbeat):
193+ """Set a custom heartbeat."""
194+ self._heartbeat = heartbeat
195+
196+ def buildProtocol(self, addr):
197+ protocol = self.protocol(
198+ self._delegate, vhost=self._vhost, spec=self._spec,
199+ heartbeat=self._heartbeat, clock=self._clock)
200+ return protocol
201
202=== added file 'src/txamqp/test/test_endpoint.py'
203--- src/txamqp/test/test_endpoint.py 1970-01-01 00:00:00 +0000
204+++ src/txamqp/test/test_endpoint.py 2016-05-27 13:16:31 +0000
205@@ -0,0 +1,103 @@
206+#
207+# Licensed to the Apache Software Foundation (ASF) under one
208+# or more contributor license agreements. See the NOTICE file
209+# distributed with this work for additional information
210+# regarding copyright ownership. The ASF licenses this file
211+# to you under the Apache License, Version 2.0 (the
212+# "License"); you may not use this file except in compliance
213+# with the License. You may obtain a copy of the License at
214+#
215+# http://www.apache.org/licenses/LICENSE-2.0
216+#
217+# Unless required by applicable law or agreed to in writing,
218+# software distributed under the License is distributed on an
219+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
220+# KIND, either express or implied. See the License for the
221+# specific language governing permissions and limitations
222+# under the License.
223+#
224+from twisted.trial.unittest import TestCase
225+from twisted.internet import reactor
226+from twisted.internet.defer import inlineCallbacks
227+from twisted.internet.task import Clock
228+from twisted.test.proto_helpers import MemoryReactorClock, StringTransport
229+
230+from txamqp.testlib import TestBase as IntegrationTest
231+from txamqp.factory import AMQFactory
232+from txamqp.endpoint import AMQEndpoint
233+
234+
235+class AMQEndpointTest(TestCase):
236+
237+ def setUp(self):
238+ super(AMQEndpointTest, self).setUp()
239+ self.reactor = MemoryReactorClock()
240+ self.factory = AMQFactory(clock=Clock())
241+
242+ def test_connect(self):
243+ """
244+ The endpoint connects to the broker and performs the AMQP
245+ authentication.
246+ """
247+ endpoint = AMQEndpoint(
248+ self.reactor, "1.2.3.4", "1234", username="me", password="pw")
249+ endpoint.connect(self.factory)
250+ self.assertEqual(("1.2.3.4", 1234), self.reactor.tcpClients[0][:2])
251+ # _WrappingFactory from twisted.internet.endpoints
252+ factory = self.reactor.tcpClients[0][2]
253+ protocol = factory.buildProtocol(None)
254+ protocol.makeConnection(StringTransport())
255+ client = protocol._wrappedProtocol
256+ self.assertEqual({"LOGIN": "me", "PASSWORD": "pw"}, client.response)
257+ self.assertEqual("AMQPLAIN", client.mechanism)
258+
259+ def test_connect_with_vhost_and_heartbeat(self):
260+ """
261+ It's possible to specify a custom vhost and a custom heartbeat.
262+ """
263+ endpoint = AMQEndpoint(
264+ self.reactor, "1.2.3.4", "1234", username="me", password="pw",
265+ vhost="foo", heartbeat=10)
266+ endpoint.connect(self.factory)
267+ # _WrappingFactory from twisted.internet.endpoints
268+ factory = self.reactor.tcpClients[0][2]
269+ protocol = factory.buildProtocol(None)
270+ protocol.makeConnection(StringTransport())
271+ client = protocol._wrappedProtocol
272+ self.assertEqual("foo", client.vhost)
273+ self.assertEqual(10, client.heartbeatInterval)
274+
275+ def test_from_uri(self):
276+ """
277+ It's possible to build an AMQEndpoint from an AMQP URI string.
278+ """
279+ endpoint = AMQEndpoint.fromURI(
280+ self.reactor, "amqp://me:pw@some.broker/foo?heartbeat=10")
281+ endpoint.connect(self.factory)
282+ self.assertEqual(("some.broker", 5672), self.reactor.tcpClients[0][:2])
283+ # _WrappingFactory from twisted.internet.endpoints
284+ factory = self.reactor.tcpClients[0][2]
285+ protocol = factory.buildProtocol(None)
286+ protocol.makeConnection(StringTransport())
287+ client = protocol._wrappedProtocol
288+ self.assertEqual("foo", client.vhost)
289+ self.assertEqual(10, client.heartbeatInterval)
290+ self.assertEqual({"LOGIN": "me", "PASSWORD": "pw"}, client.response)
291+ self.assertEqual("AMQPLAIN", client.mechanism)
292+
293+
294+class AMQEndpointIntegrationTest(IntegrationTest):
295+
296+ @inlineCallbacks
297+ def test_connect(self):
298+ """
299+ The endpoint returns a connected and authenticated client.
300+ """
301+ factory = AMQFactory(spec=self.spec)
302+ endpoint = AMQEndpoint(
303+ reactor, self.host, self.port, username=self.user,
304+ password=self.password, vhost=self.vhost)
305+ client = yield endpoint.connect(factory)
306+ channel = yield client.channel(1)
307+ yield channel.channel_open()
308+ client.close(None)
309
310=== added file 'src/txamqp/test/test_factory.py'
311--- src/txamqp/test/test_factory.py 1970-01-01 00:00:00 +0000
312+++ src/txamqp/test/test_factory.py 2016-05-27 13:16:31 +0000
313@@ -0,0 +1,51 @@
314+#
315+# Licensed to the Apache Software Foundation (ASF) under one
316+# or more contributor license agreements. See the NOTICE file
317+# distributed with this work for additional information
318+# regarding copyright ownership. The ASF licenses this file
319+# to you under the Apache License, Version 2.0 (the
320+# "License"); you may not use this file except in compliance
321+# with the License. You may obtain a copy of the License at
322+#
323+# http://www.apache.org/licenses/LICENSE-2.0
324+#
325+# Unless required by applicable law or agreed to in writing,
326+# software distributed under the License is distributed on an
327+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
328+# KIND, either express or implied. See the License for the
329+# specific language governing permissions and limitations
330+# under the License.
331+#
332+from twisted.trial.unittest import TestCase
333+from twisted.internet.address import IPv4Address
334+from twisted.internet.task import Clock
335+
336+from txamqp.protocol import AMQClient
337+from txamqp.factory import AMQFactory, DEFAULT_SPEC
338+
339+
340+class AMQFactoryTest(TestCase):
341+
342+ def test_build_protocol(self):
343+ """Test building AMQClient instances with default parameters."""
344+ address = IPv4Address("TCP", "127.0.0.1", 5672)
345+ factory = AMQFactory()
346+ client = factory.buildProtocol(address)
347+ self.assertIsInstance(client, AMQClient)
348+ self.assertEqual("/", client.vhost)
349+ self.assertEqual(DEFAULT_SPEC, client.spec.file)
350+ self.assertEqual(0, client.heartbeatInterval)
351+
352+ def test_build_protocol_custom_parameters(self):
353+ """Test building AMQClient instances with custom parameters."""
354+ address = IPv4Address("TCP", "127.0.0.1", 5672)
355+ spec = "../specs/rabbitmq/amqp0-8.stripped.rabbitmq.xml"
356+ clock = Clock()
357+ factory = AMQFactory(spec=spec, clock=clock)
358+ factory.setVHost("foo")
359+ factory.setHeartbeat(1)
360+ client = factory.buildProtocol(address)
361+ self.assertEqual("foo", client.vhost)
362+ self.assertEqual(spec, client.spec.file)
363+ self.assertEqual(1, client.heartbeatInterval)
364+ self.assertEqual(1, len(clock.getDelayedCalls()))

Subscribers

People subscribed via source and target branches

to status/vote changes: