Merge lp:~free.ekanayaka/txamqp/amq-endpoint into lp:txamqp

Proposed by Free Ekanayaka
Status: Merged
Merged at revision: 73
Proposed branch: lp:~free.ekanayaka/txamqp/amq-endpoint
Merge into: lp:txamqp
Diff against target: 364 lines (+345/-0)
4 files modified
src/txamqp/endpoint.py (+130/-0)
src/txamqp/factory.py (+61/-0)
src/txamqp/test/test_endpoint.py (+103/-0)
src/txamqp/test/test_factory.py (+51/-0)
To merge this branch: bzr merge lp:~free.ekanayaka/txamqp/amq-endpoint
Reviewer Review Type Date Requested Status
Esteve Fernandez Approve
Review via email: mp+295952@code.launchpad.net

Description of the change

This branch adds an AMQEndpoint class which has the same interface as a regular IStreamClientEndpoint, but with some more assumptions about the factory being passed and the transport to be used.

Example code:

from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks
from txamqp.factory import AMQFactory
from txamqp.endpoint import AMQEndpoint

@inlineCallbacks
def connect():
    """Connect to the broker on localhost, authenticating as guest/guest."""
    # Credentials go in the userinfo part of the URI as "user:password@host".
    endpoint = AMQEndpoint.fromURI(reactor, "amqp://guest:guest@localhost/")
    client = yield endpoint.connect(AMQFactory())
    # ... "client" is a connected and authenticated AMQClient

For now no support for TLS connections is in place, but it'd be easy
to add as a follow-up branch, if there's desire for it.

To post a comment you must log in.
Revision history for this message
Esteve Fernandez (esteve) wrote :

Thanks, changes look great!

review: Approve
Revision history for this message
Free Ekanayaka (free.ekanayaka) wrote :

Thanks for the very quick feedback Esteve!

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== added file 'src/txamqp/endpoint.py'
--- src/txamqp/endpoint.py 1970-01-01 00:00:00 +0000
+++ src/txamqp/endpoint.py 2016-05-27 13:16:31 +0000
@@ -0,0 +1,130 @@
1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18#
19from twisted.internet.defer import inlineCallbacks, returnValue
20from twisted.internet.endpoints import clientFromString
21from twisted.web.client import URI
22from twisted.web.http import parse_qs
23
24
class AMQEndpoint(object):
    """An endpoint that knows how to connect to AMQP brokers.

    The class implements the same API as IStreamClientEndpoint, however it
    requires the protocol factory to be able to speak AMQP for performing
    the authentication.

    @note: Currently TLS connections are not supported.
    """

    def __init__(self, reactor, host, port, username="", password="",
                 vhost="/", heartbeat=0, auth_mechanism="AMQPLAIN",
                 timeout=30):
        """
        @param reactor: An L{IReactorTCP} provider.

        @param username: The username to use when authenticating.
        @type username: L{bytes}

        @param password: The password to use when authenticating.
        @type password: L{bytes}

        @param host: Host name or IP address of the AMQP broker.
        @type host: L{bytes}

        @param port: Port number.
        @type port: L{int}

        @param vhost: The vhost to open the connection against.
        @type vhost: L{bytes}

        @param heartbeat: AMQP heartbeat in seconds.
        @type heartbeat: L{int}

        @param auth_mechanism: Authentication mechanism. Currently only
            AMQPLAIN and PLAIN are supported.
        @type auth_mechanism: L{bytes}

        @param timeout: Number of seconds to wait before assuming the
            connection has failed.
        @type timeout: L{int}
        """
        self._reactor = reactor
        self._host = host
        self._port = port
        self._username = username
        self._password = password
        self._vhost = vhost
        self._heartbeat = heartbeat
        self._auth_mechanism = auth_mechanism
        self._timeout = timeout

    @classmethod
    def fromURI(cls, reactor, uri):
        """Return an AMQEndpoint instance configured with the given AMQP uri.

        The userinfo part of the URI provides the username and password, the
        path provides the vhost (without its leading "/") and every query
        parameter is handed to the constructor as the keyword argument of the
        same name, using the first value when a parameter is repeated.

        @param reactor: The reactor to hand to the new endpoint.

        @param uri: The AMQP URI, for example
            C{amqp://user:password@host:5672/vhost?heartbeat=10}.

        @return: A new instance of this class.

        @raise ValueError: If the C{heartbeat} or C{timeout} query parameter
            is not an integer.
        @raise TypeError: If the query contains a parameter name that the
            constructor does not accept.

        @see: https://www.rabbitmq.com/uri-spec.html
        """
        parsed = URI.fromBytes(uri, defaultPort=5672)
        kwargs = {}
        host = parsed.host
        if "@" in host:
            # Split the parsed host rather than the netloc: the netloc still
            # carries an explicit ":port" suffix, which would otherwise end
            # up inside the host name. Splitting on the last "@" and the
            # first ":" tolerates passwords containing either character,
            # and a user name given without any password.
            auth, host = host.rsplit("@", 1)
            username, _, password = auth.partition(":")
            kwargs.update({"username": username, "password": password})

        vhost = parsed.path
        if vhost:
            # An absent path keeps the constructor's default vhost.
            if len(vhost) > 1:
                vhost = vhost[1:]  # Strip leading "/"
            kwargs["vhost"] = vhost

        params = parse_qs(parsed.query)
        kwargs.update({name: value[0] for name, value in params.items()})

        # Query values are always strings; the constructor documents these
        # two as integers.
        for name in ("heartbeat", "timeout"):
            if name in kwargs:
                kwargs[name] = int(kwargs[name])
        return cls(reactor, host, parsed.port, **kwargs)

    def connect(self, protocolFactory):
        """
        Connect the C{protocolFactory} to the AMQP broker this endpoint was
        configured with, then authenticate the resulting client.

        @param protocolFactory: An L{AMQFactory} building L{AMQClient} objects.
        @return: A L{Deferred} that results in an L{AMQClient} upon successful
            connection otherwise a L{Failure} wrapping L{ConnectError} or
            L{NoProtocol <twisted.internet.error.NoProtocol>}.
        """
        # XXX Since AMQClient requires these parameters at __init__ time, we
        #     need to override them in the provided factory.
        protocolFactory.setVHost(self._vhost)
        protocolFactory.setHeartbeat(self._heartbeat)

        description = "tcp:{}:{}:timeout={}".format(
            self._host, self._port, self._timeout)
        endpoint = clientFromString(self._reactor, description)

        deferred = endpoint.connect(protocolFactory)
        return deferred.addCallback(self._authenticate)

    @inlineCallbacks
    def _authenticate(self, client):
        """Perform AMQP authentication and hand back the same client."""
        yield client.authenticate(
            self._username, self._password, mechanism=self._auth_mechanism)
        returnValue(client)
0131
=== added file 'src/txamqp/factory.py'
--- src/txamqp/factory.py 1970-01-01 00:00:00 +0000
+++ src/txamqp/factory.py 2016-05-27 13:16:31 +0000
@@ -0,0 +1,61 @@
1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18#
19import os
20
21from twisted.internet.protocol import Factory
22
23from txamqp.protocol import AMQClient
24from txamqp.spec import load
25from txamqp.client import TwistedDelegate
26
# Path of the stripped AMQP 0-9 spec file, expressed relative to the
# directory that contains this module.
DEFAULT_SPEC = os.path.join(
    os.path.dirname(__file__),
    "../specs/standard/amqp0-9.stripped.xml",
)
29
30
class AMQFactory(Factory):
    """Protocol factory that produces L{AMQClient} instances."""

    protocol = AMQClient

    def __init__(self, spec=None, clock=None):
        """
        @param spec: Path to the spec file. When C{None}, the module-level
            C{DEFAULT_SPEC} path is loaded instead.
        @type spec: L{str} (native string)

        @param clock: Passed as the C{clock} argument of every protocol
            built by this factory.
        """
        self._spec = load(DEFAULT_SPEC if spec is None else spec)
        self._clock = clock
        self._delegate = TwistedDelegate()
        self._vhost = "/"
        self._heartbeat = 0

    def setVHost(self, vhost):
        """Use C{vhost} for protocols built from now on."""
        self._vhost = vhost

    def setHeartbeat(self, heartbeat):
        """Use C{heartbeat} for protocols built from now on."""
        self._heartbeat = heartbeat

    def buildProtocol(self, addr):
        """Build a protocol configured with this factory's current settings.

        @param addr: Not used by this implementation.
        """
        settings = {
            "vhost": self._vhost,
            "spec": self._spec,
            "heartbeat": self._heartbeat,
            "clock": self._clock,
        }
        return self.protocol(self._delegate, **settings)
062
=== added file 'src/txamqp/test/test_endpoint.py'
--- src/txamqp/test/test_endpoint.py 1970-01-01 00:00:00 +0000
+++ src/txamqp/test/test_endpoint.py 2016-05-27 13:16:31 +0000
@@ -0,0 +1,103 @@
1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18#
19from twisted.trial.unittest import TestCase
20from twisted.internet import reactor
21from twisted.internet.defer import inlineCallbacks
22from twisted.internet.task import Clock
23from twisted.test.proto_helpers import MemoryReactorClock, StringTransport
24
25from txamqp.testlib import TestBase as IntegrationTest
26from txamqp.factory import AMQFactory
27from txamqp.endpoint import AMQEndpoint
28
29
class AMQEndpointTest(TestCase):
    """Unit tests running L{AMQEndpoint} against an in-memory reactor."""

    def setUp(self):
        super(AMQEndpointTest, self).setUp()
        self.reactor = MemoryReactorClock()
        self.factory = AMQFactory(clock=Clock())

    def _buildClient(self):
        """
        Build a protocol from the first factory registered with the reactor,
        attach it to a string transport and return the wrapped client.
        """
        # _WrappingFactory from twisted.internet.endpoints
        wrapper = self.reactor.tcpClients[0][2]
        wrapped = wrapper.buildProtocol(None)
        wrapped.makeConnection(StringTransport())
        return wrapped._wrappedProtocol

    def test_connect(self):
        """
        Connecting the endpoint reaches the configured address and starts
        the AMQP authentication with the given credentials.
        """
        endpoint = AMQEndpoint(
            self.reactor, "1.2.3.4", "1234", username="me", password="pw")
        endpoint.connect(self.factory)
        address = self.reactor.tcpClients[0][:2]
        self.assertEqual(("1.2.3.4", 1234), address)
        client = self._buildClient()
        self.assertEqual({"LOGIN": "me", "PASSWORD": "pw"}, client.response)
        self.assertEqual("AMQPLAIN", client.mechanism)

    def test_connect_with_vhost_and_heartbeat(self):
        """
        A custom vhost and a custom heartbeat are applied to the client.
        """
        endpoint = AMQEndpoint(
            self.reactor, "1.2.3.4", "1234", username="me", password="pw",
            vhost="foo", heartbeat=10)
        endpoint.connect(self.factory)
        client = self._buildClient()
        self.assertEqual("foo", client.vhost)
        self.assertEqual(10, client.heartbeatInterval)

    def test_from_uri(self):
        """
        An AMQEndpoint can be built from an AMQP URI string.
        """
        endpoint = AMQEndpoint.fromURI(
            self.reactor, "amqp://me:pw@some.broker/foo?heartbeat=10")
        endpoint.connect(self.factory)
        address = self.reactor.tcpClients[0][:2]
        self.assertEqual(("some.broker", 5672), address)
        client = self._buildClient()
        self.assertEqual("foo", client.vhost)
        self.assertEqual(10, client.heartbeatInterval)
        self.assertEqual({"LOGIN": "me", "PASSWORD": "pw"}, client.response)
        self.assertEqual("AMQPLAIN", client.mechanism)
87
88
class AMQEndpointIntegrationTest(IntegrationTest):
    """Tests for L{AMQEndpoint} that use the real reactor."""

    @inlineCallbacks
    def test_connect(self):
        """
        Connecting yields a client on which a channel can be opened.
        """
        factory = AMQFactory(spec=self.spec)
        credentials = {
            "username": self.user,
            "password": self.password,
            "vhost": self.vhost,
        }
        endpoint = AMQEndpoint(reactor, self.host, self.port, **credentials)
        client = yield endpoint.connect(factory)
        channel = yield client.channel(1)
        yield channel.channel_open()
        client.close(None)
0104
=== added file 'src/txamqp/test/test_factory.py'
--- src/txamqp/test/test_factory.py 1970-01-01 00:00:00 +0000
+++ src/txamqp/test/test_factory.py 2016-05-27 13:16:31 +0000
@@ -0,0 +1,51 @@
1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18#
19from twisted.trial.unittest import TestCase
20from twisted.internet.address import IPv4Address
21from twisted.internet.task import Clock
22
23from txamqp.protocol import AMQClient
24from txamqp.factory import AMQFactory, DEFAULT_SPEC
25
26
class AMQFactoryTest(TestCase):
    """Unit tests for L{AMQFactory}."""

    def test_build_protocol(self):
        """A default factory builds an AMQClient with default settings."""
        address = IPv4Address("TCP", "127.0.0.1", 5672)
        client = AMQFactory().buildProtocol(address)
        self.assertIsInstance(client, AMQClient)
        self.assertEqual("/", client.vhost)
        self.assertEqual(DEFAULT_SPEC, client.spec.file)
        self.assertEqual(0, client.heartbeatInterval)

    def test_build_protocol_custom_parameters(self):
        """Custom spec, clock, vhost and heartbeat reach the built client."""
        address = IPv4Address("TCP", "127.0.0.1", 5672)
        spec_path = "../specs/rabbitmq/amqp0-8.stripped.rabbitmq.xml"
        clock = Clock()
        factory = AMQFactory(spec=spec_path, clock=clock)
        factory.setVHost("foo")
        factory.setHeartbeat(1)
        client = factory.buildProtocol(address)
        self.assertEqual("foo", client.vhost)
        self.assertEqual(spec_path, client.spec.file)
        self.assertEqual(1, client.heartbeatInterval)
        pending = clock.getDelayedCalls()
        self.assertEqual(1, len(pending))

Subscribers

People subscribed via source and target branches

to status/vote changes: