Merge lp:~terrycojones/txamqp/lots-of-comments into lp:txamqp

Proposed by Terry Jones
Status: Approved
Approved by: Thomas Herve
Approved revision: 43
Proposed branch: lp:~terrycojones/txamqp/lots-of-comments
Merge into: lp:txamqp
Diff against target: 830 lines (+305/-101)
17 files modified
doc/README (+29/-20)
setup.py (+3/-1)
src/examples/simple/README (+5/-1)
src/examples/simple/txconsumer.py (+31/-15)
src/examples/simple/txpublisher.py (+40/-26)
src/examples/thrift/README (+25/-19)
src/txamqp/client.py (+19/-1)
src/txamqp/codec.py (+2/-0)
src/txamqp/connection.py (+2/-0)
src/txamqp/content.py (+2/-0)
src/txamqp/delegate.py (+22/-1)
src/txamqp/message.py (+2/-0)
src/txamqp/protocol.py (+115/-16)
src/txamqp/queue.py (+2/-1)
src/txamqp/spec.py (+2/-0)
src/txamqp/testlib.py (+2/-0)
src/txamqp/xmlutil.py (+2/-0)
To merge this branch: bzr merge lp:~terrycojones/txamqp/lots-of-comments
Reviewer: Thomas Herve
Review status: Approve
Review via email: mp+21048@code.launchpad.net

Description of the change

This branch is aimed at making the txAMQP code base easier for people to understand. The changes are almost all docstrings, comments, README clarifications, and some cleaning / simplification of the src/examples/simple code.

Thomas Herve (therve) wrote :

[1] The link to the blog post is broken; you should probably remove it.

[2]
+ $ bzr branch lp:txamqp txamqp

"bzr branch lp:txamqp" is enough to work.

[3]
+ - path_to_spec: The path to the AMQP spec that you want to use. Note
+ that depending on the broker you use, you will need a
+ different spec:
+ RabbitMQ 1.5.3: $TXAMQP_PATH/src/specs/standard/amqp0-8.xml

I think this path changed.

[4]
-# coding: utf-8
+# -*- coding: utf-8; -*-

We don't really need those as the files are pure ASCII afaict.

[5]
+ """A class that acts like an event that can be fired (via set). This is

+ """Call the init method (if any) in all superclasses of the subclass of

Can you break the line at the beginning?

[6]
+ the delegate (see TwistedDelegate in client.py) to make the appropriate
+ method call."""

And at the end? :)

[7] There are a bunch of conflicts, mostly trivial.

Thanks, +1!

review: Approve
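
Points [5] and [6] ask for the docstring text to start on the line after the opening quotes and for the closing quotes to sit on a line of their own. A minimal sketch of that layout, written for Python 3; the `ExampleEvent` class is hypothetical and is not part of the branch:

```python
import inspect


class ExampleEvent(object):
    """
    A hypothetical class used only to show the docstring layout.

    The text starts on the line after the opening quotes, and the closing
    quotes sit on a line of their own.
    """


# inspect.getdoc drops the leading blank line and the common indentation,
# so tools that display the docstring are unaffected by the extra line breaks.
summary = inspect.getdoc(ExampleEvent).splitlines()[0]
print(summary)
```

The layout only changes the source text; the cleaned docstring is the same as with the text starting right after the opening quotes.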

Unmerged revisions

43. By Terry Jones

Make sure the reactor is running when we go to stop it.

42. By Terry Jones

Added code to print a friendly message on failure to authenticate with the broker.

41. By Terry Jones

Simplified the simple example publisher code.

40. By Terry Jones

Added a bunch of comments to try to help people (mainly me, to start with) understand what's going on.

Preview Diff

=== modified file 'doc/README'
--- doc/README 2009-08-15 10:27:56 +0000
+++ doc/README 2010-03-11 10:38:21 +0000
@@ -1,51 +1,60 @@
-This is a quick post on getting started using the txAMQP library with RabbitMQ,
-based on Dan Reverri's article "Getting started with RabbitMQ, Python, Twisted,
-and txAMQP on Ubuntu 9.04" [http://www.apparatusproject.org/blog/?p=38]
+Here is some information you may need for getting started using the txAMQP
+library with RabbitMQ, based on Dan Reverri's article "Getting started with
+RabbitMQ, Python, Twisted, and txAMQP on Ubuntu 9.04"
+[http://www.apparatusproject.org/blog/?p=38]
 
 Install RabbitMQ
+----------------
 
 If you're using Debian (or a derivative, like Ubuntu) and RabbitMQ is not
 included in your distribution, you can add RabbitMQ's APT repository:
 
 http://www.rabbitmq.com/debian.html#apt
 
-Running RabbitMQ
+Run RabbitMQ
+------------
 
 If you installed RabbitMQ using the Debian package, it will be automatically
 started. If not, you'll find an init.d file in /etc/init.d/rabbitmq-server
 
 Install Twisted
+---------------
 
-As of 2009-07-09, txAMQP is being developed using Twisted 8.2.0, it may
-work with older versions, but it's not guaranteed. In case you are using
-Ubuntu, and the version available is not recent enough, you can install
-Twisted through its PPA:
+txAMQP was developed under Twisted 8.2.0, it may work with older versions,
+but there is no guarantee. txAMQP is known to run successfully on Twisted
+9.0 and 10.0. If you are using Ubuntu, and your version of Twisted is not
+recent enough, you can install Twisted from its PPA:
 
 https://launchpad.net/~twisted-dev/+archive/ppa
 
 Install txAMQP
+--------------
 
 The instructions below install txAQMP from trunk. There are Ubuntu packages
-available if you prefer, refer to the following page for more details:
+available if you prefer: refer to the following page for more details:
 
 https://launchpad.net/~txamqpteam/+archive/ppa
 
 a. Install Bazaar
 
-$ sudo apt-get install bzr
+    $ sudo apt-get install bzr
 
 b. Fetch txAQMP
 
-$ cd ~
-$ bzr branch lp:txamqp txamqp
-
-If you get the error "bzr: ERROR: Unknown repository format: 'Bazaar
-RepositoryFormatKnitPack6 (bzr 1.9)", you'll need to upgrade Bazaar. You may
-do so following the instructions at:
-
-http://bazaar-vcs.org/Download
+    $ bzr branch lp:txamqp txamqp
+
+    If you get the error "bzr: ERROR: Unknown repository format: 'Bazaar
+    RepositoryFormatKnitPack6 (bzr 1.9)" or similar, you'll need to upgrade
+    Bazaar. To do so, follow the instructions at: http://bazaar-vcs.org/Download
 
 c. Install txAMQP
 
-$ cd txamqp
-$ sudo python setup.py install
+    $ cd txamqp
+    $ sudo python setup.py install
+
+Run a txAMQP example
+--------------------
+
+You can confirm that txAMQP is properly installed and working correctly by
+running the scripts in the src/examples/simple directory. See the README
+file in that directory for detailed instructions.

=== modified file 'setup.py'
--- setup.py 2009-08-16 11:30:01 +0000
+++ setup.py 2010-03-11 10:38:21 +0000
@@ -10,7 +10,9 @@
     from setuptools import setup, find_packages
 except ImportError:
     from distutils.core import setup
-    setupdict['packages'] = ['txamqp', 'txamqp.contrib', 'txamqp.contrib.thrift']
+    setupdict['packages'] = ['txamqp',
+                             'txamqp.contrib',
+                             'txamqp.contrib.thrift']
     setupdict['package_dir'] = {
         'txamqp': 'src/txamqp',
         'txamqp.contrib': 'src/txamqp/contrib',

=== modified file 'src/examples/simple/README'
--- src/examples/simple/README 2009-08-15 10:31:11 +0000
+++ src/examples/simple/README 2010-03-11 10:38:21 +0000
@@ -1,6 +1,10 @@
 Instructions
 ============
 
+This directory contains a simple example of code that uses txAMQP to
+connect to an AMQP broker, create an exchange, and then publish and receive
+a number of messages.
+
 Requirements
 ------------
 
@@ -14,7 +18,7 @@
    a) In one of them run the txconsumer.py script:
       $ python txconsumer.py host port vhost username password path_to_spec
       e.g. txconsumer.py localhost 5672 / guest guest ../../specs/standard/amqp0-8.xml
-   b) In the other one run the txpublisher.py script:
+   b) In the other, run the txpublisher.py script:
       $ python txpublisher.py host port vhost username password path_to_spec content [count]
       e.g. txpublisher.py localhost 5672 / guest guest ../../specs/standard/amqp0-8.xml hello 1000
 

=== modified file 'src/examples/simple/txconsumer.py'
--- src/examples/simple/txconsumer.py 2009-08-15 10:31:11 +0000
+++ src/examples/simple/txconsumer.py 2010-03-11 10:38:21 +0000
@@ -1,50 +1,64 @@
 from twisted.internet.defer import inlineCallbacks
 from twisted.internet import reactor
 from twisted.internet.protocol import ClientCreator
+from twisted.python import log
 from txamqp.protocol import AMQClient
-from txamqp.client import TwistedDelegate
+from txamqp.client import TwistedDelegate, Closed
 import txamqp.spec
 
 @inlineCallbacks
 def gotConnection(conn, username, password):
     print "Connected to broker."
-    yield conn.authenticate(username, password)
+
+    try:
+        yield conn.authenticate(username, password)
+    except Closed:
+        print "Failed to authenticate with AMQP broker. Check that " \
+            "the username and password you're supplying are correct."
+        return
 
     print "Authenticated. Ready to receive messages"
     chan = yield conn.channel(1)
     yield chan.channel_open()
 
-    yield chan.queue_declare(queue="chatrooms", durable=True, exclusive=False, auto_delete=False)
-    yield chan.exchange_declare(exchange="chatservice", type="direct", durable=True, auto_delete=False)
-
-    yield chan.queue_bind(queue="chatrooms", exchange="chatservice", routing_key="txamqp_chatroom")
-
-    yield chan.basic_consume(queue='chatrooms', no_ack=True, consumer_tag="testtag")
+    # Create an exchange (the producer can do this too, with no ill effect).
+    yield chan.exchange_declare(exchange="chatservice", type="direct",
+                                durable=True, auto_delete=False)
+
+    yield chan.queue_declare(queue="chatrooms", durable=True,
+                             exclusive=False, auto_delete=False)
+
+    yield chan.queue_bind(queue="chatrooms", exchange="chatservice",
+                          routing_key="txamqp_chatroom")
+
+    yield chan.basic_consume(queue='chatrooms', no_ack=True,
+                             consumer_tag="testtag")
 
     queue = yield conn.queue("testtag")
 
     while True:
         msg = yield queue.get()
-        print 'Received: ' + msg.content.body + ' from channel #' + str(chan.id)
+        print 'Received: %r from channel #%s' % (msg.content.body, chan.id)
         if msg.content.body == "STOP":
             break
 
+    # Cancel consuming from the queue.
     yield chan.basic_cancel("testtag")
 
+    # Close the channel.
     yield chan.channel_close()
 
+    # Close the connection (must be done over channel 0).
     chan0 = yield conn.channel(0)
-
     yield chan0.connection_close()
 
-    reactor.stop()
-
 
 if __name__ == "__main__":
     import sys
     if len(sys.argv) < 7:
         print "%s host port vhost username password path_to_spec" % sys.argv[0]
-        print "e.g. %s localhost 5672 / guest guest ../../specs/standard/amqp0-8.xml" % sys.argv[0]
+        print "e.g. %s localhost 5672 / guest guest " \
+            "../../specs/standard/amqp0-8.xml" % sys.argv[0]
         sys.exit(1)
 
     host = sys.argv[1]
@@ -58,8 +72,10 @@
     delegate = TwistedDelegate()
 
     d = ClientCreator(reactor, AMQClient, delegate=delegate, vhost=vhost,
-        spec=spec).connectTCP(host, port)
+                      spec=spec).connectTCP(host, port)
 
     d.addCallback(gotConnection, username, password)
-
+    d.addErrback(log.err)
+    d.addBoth(lambda _: reactor.callLater(0, reactor.stop))
+
     reactor.run()

=== modified file 'src/examples/simple/txpublisher.py'
--- src/examples/simple/txpublisher.py 2009-08-15 10:31:11 +0000
+++ src/examples/simple/txpublisher.py 2010-03-11 10:38:21 +0000
@@ -1,51 +1,63 @@
 from twisted.internet.defer import inlineCallbacks
 from twisted.internet import reactor, task
 from twisted.internet.protocol import ClientCreator
+from twisted.python import log
 from txamqp.protocol import AMQClient
-from txamqp.client import TwistedDelegate
+from txamqp.client import TwistedDelegate, Closed
 from txamqp.content import Content
 import txamqp.spec
 
 @inlineCallbacks
 def gotConnection(conn, username, password, body, count=1):
+    """
+
+    @param: conn is an instance of AMQClient, the protocol class that
+        handles talking to AMQP.
+    """
     print "Connected to broker."
-    yield conn.authenticate(username, password)
+
+    try:
+        yield conn.authenticate(username, password)
+    except Closed:
+        print "Failed to authenticate with AMQP broker. Check that " \
+            "the username and password you're supplying are correct."
+        return
 
     print "Authenticated. Ready to send messages"
     chan = yield conn.channel(1)
     yield chan.channel_open()
 
-    def send_messages():
-        def message_iterator():
-            for i in range(count):
-                content = body + "-%d" % i
-                msg = Content(content)
-                msg["delivery mode"] = 2
-                chan.basic_publish(exchange="chatservice", content=msg, routing_key="txamqp_chatroom")
-                print "Sending message: %s" % content
-                yield None
-        return task.coiterate(message_iterator())
-
-    yield send_messages()
-
-    stopToken = "STOP"
-    msg = Content(stopToken)
-    msg["delivery mode"] = 2
-    chan.basic_publish(exchange="chatservice", content=msg, routing_key="txamqp_chatroom")
-    print "Sending message: %s" % stopToken
-
+    # Create an exchange (the consumer can do this too, with no ill effect).
+    yield chan.exchange_declare(exchange="chatservice", type="direct",
+                                durable=True, auto_delete=False)
+
+    def sender():
+        for text in ['%s-%d' % (body, i) for i in range(count)] + ['STOP']:
+            msg = Content(text)
+            msg["delivery mode"] = 2
+            print "Sending message: %r" % text
+            yield chan.basic_publish(exchange="chatservice", content=msg,
+                                     routing_key="txamqp_chatroom")
+
+    # Use a coiterator to send all the messages. This ensures the reactor
+    # gets control once in a while to do the actual sending.
+    yield task.coiterate(sender())
+
+    # Close the channel.
     yield chan.channel_close()
 
+    # Close the connection (must be done over channel 0).
     chan0 = yield conn.channel(0)
     yield chan0.connection_close()
-    
-    reactor.stop()
+
 
 if __name__ == "__main__":
     import sys
     if len(sys.argv) < 8:
-        print "%s host port vhost username password path_to_spec content [count]" % sys.argv[0]
-        print "e.g. %s localhost 5672 / guest guest ../../specs/standard/amqp0-8.xml hello 1000" % sys.argv[0]
+        print "%s host port vhost username password path_to_spec content " \
+            "[count]" % sys.argv[0]
+        print "e.g. %s localhost 5672 / guest guest " \
+            "../../specs/standard/amqp0-8.xml hello 1000" % sys.argv[0]
         sys.exit(1)
 
     host = sys.argv[1]
@@ -65,8 +77,10 @@
     delegate = TwistedDelegate()
 
     d = ClientCreator(reactor, AMQClient, delegate=delegate, vhost=vhost,
-        spec=spec).connectTCP(host, port)
+                      spec=spec).connectTCP(host, port)
 
     d.addCallback(gotConnection, username, password, content, count)
+    d.addErrback(log.err)
+    d.addBoth(lambda _: reactor.callLater(0, reactor.stop))
 
     reactor.run()

=== modified file 'src/examples/thrift/README'
--- src/examples/thrift/README 2009-08-15 10:31:11 +0000
+++ src/examples/thrift/README 2010-03-11 10:38:21 +0000
@@ -1,6 +1,10 @@
 Instructions
 ============
 
+This file contains some brief instructions on how to get and install Thrift
+for Python, to use it to generate Twisted Python code to set-up a simple
+service, and to run the example client and server code in this directory.
+
 Dependencies
 ------------
 
@@ -25,9 +29,9 @@
 
 - General
 
-1. Download Thrift source code [1]
+1. Download the Thrift source code from http://incubator.apache.org/thrift/
 2. Check if it contains support for Twisted, you'll need revision 749795 or
-greater.
+   greater.
 3. Compile Thrift with suppport for Python.
 4. Install Thrift.
 
@@ -37,27 +41,29 @@
 1. Generate Thrift Python services:
    $ thrift --gen py:twisted shared.thrift
    $ thrift --gen py:twisted tutorial.thrift
+
 2. Start a server:
    $ python server.py host port vhost username password path_to_spec
+
    Note that you can start as many servers as you want at the same time.
-   Incoming requests will be round-robined and the servers load will be
+   Incoming requests will be round-robined; the servers' loads will be
    balanced automatically.
+
 3. Start a client:
    $ python client.py host port vhost username password path_to_spec
 
-Where:
-- host: The host where the AMQP broker is located.
-- port: The port where the AMQP broker listens on.
-- vhost: The virtual host (depending on your broker configuration, default
-  in most cases is '/').
-- username: The username to log in to the broker (default in most cases
-  is 'guest').
-- password: The username to log in to the broker (default in most cases
-  is 'guest').
-- path_to_spec: The path to the AMQP spec that you want to use. Keep in mind
-  that depending on the broker you use, you will need a different spec:
-  - RabbitMQ 1.5.3: $TXAMQP_PATH/src/specs/standard/amqp0-8.xml
-  - OpenAMQ 1.3c5: $TXAMQP_PATH/src/specs/standard/amqp0-9.xml
-  - Qpid M3 (Java): $TXAMQP_PATH/src/specs/qpid/amqp.0-8.xml
-
-1 - http://incubator.apache.org/thrift/
+   Where:
+   - host: The host where the AMQP broker is located.
+   - port: The port the AMQP broker listens on.
+   - vhost: The broker virtual host (depends on your broker
+     configuration, default in most cases is '/').
+   - username: The username to log in to the broker with (default in
+     most cases is 'guest').
+   - password: The password to log in to the broker with (default in
+     most cases is 'guest').
+   - path_to_spec: The path to the AMQP spec that you want to use. Note
+     that depending on the broker you use, you will need a
+     different spec:
+       RabbitMQ 1.5.3: $TXAMQP_PATH/src/specs/standard/amqp0-8.xml
+       OpenAMQ 1.3c5: $TXAMQP_PATH/src/specs/standard/amqp0-9.xml
+       Qpid M3 (Java): $TXAMQP_PATH/src/specs/qpid/amqp.0-8.xml

=== modified file 'src/txamqp/client.py'
--- src/txamqp/client.py 2009-08-08 17:54:09 +0000
+++ src/txamqp/client.py 2010-03-11 10:38:21 +0000
@@ -1,11 +1,18 @@
-# coding: utf-8
+# -*- coding: utf-8; -*-
+
 from twisted.internet import defer
 from txamqp.delegate import Delegate
 
+
 class Closed(Exception):
     pass
 
+
 class TwistedEvent(object):
+    """A class that acts like an event that can be fired (via set). This is
+    modeled on threading.Event, but in a Twisted style. Hence wait returns
+    a Deferred instead of blocking.
+    """
     def __init__(self):
         self.deferred = defer.Deferred()
         self.alreadyCalled = False
@@ -20,8 +27,19 @@
         deferred, self.deferred = self.deferred, defer.Deferred()
         deferred.callback(True)
 
+
 class TwistedDelegate(Delegate):
 
+    """
+    When the broker sends a method call to the client, such as
+    basic_deliver, channel_close, channel_flow, etc., the method is routed
+    to a TwistedDelegate.
+
+    If you wanted to have something else receiving the messages from the
+    broker, you would replace this class, which is what the Thrift contrib
+    code does (see src/txamqp/contrib/thrift in the txAMQP distribution).
+    """
+
     def connection_start(self, ch, msg):
         ch.connection_start_ok(mechanism=self.client.mechanism,
                                response=self.client.response,

=== modified file 'src/txamqp/codec.py'
--- src/txamqp/codec.py 2008-10-29 18:31:04 +0000
+++ src/txamqp/codec.py 2010-03-11 10:38:21 +0000
@@ -1,4 +1,6 @@
 #!/usr/bin/env python
+# -*- coding: utf-8; -*-
+
 
 #
 # Licensed to the Apache Software Foundation (ASF) under one

=== modified file 'src/txamqp/connection.py'
--- src/txamqp/connection.py 2009-04-27 17:00:08 +0000
+++ src/txamqp/connection.py 2010-03-11 10:38:21 +0000
@@ -1,3 +1,5 @@
+# -*- coding: utf-8; -*-
+
 #
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements. See the NOTICE file

=== modified file 'src/txamqp/content.py'
--- src/txamqp/content.py 2009-07-07 22:45:21 +0000
+++ src/txamqp/content.py 2010-03-11 10:38:21 +0000
@@ -1,3 +1,5 @@
+# -*- coding: utf-8; -*-
+
 #
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements. See the NOTICE file

=== modified file 'src/txamqp/delegate.py'
--- src/txamqp/delegate.py 2008-10-29 18:31:04 +0000
+++ src/txamqp/delegate.py 2010-03-11 10:38:21 +0000
@@ -1,3 +1,5 @@
+# -*- coding: utf-8; -*-
+
 #
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements. See the NOTICE file
@@ -20,7 +22,14 @@
 import inspect
 from spec import pythonize
 
-class Delegate:
+
+class Delegate(object):
+    """
+    This is the base class to which the txAMQP protocol class routes messages
+    from the broker. It is subclassed by TwistedDelegate, or anything else
+    that feels like it (for an example, see the ThriftTwistedDelegate class
+    in src/txamqp/contrib/thrift/client.py).
+    """
 
     def __init__(self):
         self.handlers = {}
@@ -29,11 +38,23 @@
         self.invoke_all("init")
 
     def invoke_all(self, meth, *args, **kwargs):
+        """Call the init method (if any) in all superclasses of the subclass of
+        Delegate. This currently doesn't call anything, but is here in case a
+        future subclass needs an initialization mechanism.
+        """
         for cls in inspect.getmro(self.__class__):
             if hasattr(cls, meth):
                 getattr(cls, meth)(self, *args, **kwargs)
 
     def dispatch(self, channel, message):
+        """
+        Dispatch an incoming AMQP method call to the client's handler. The
+        method to call is either in self.handlers or is looked up on self. In
+        the latter case, the name of the method is composed of the AMQP class
+        ('basic', 'exchange', 'queue', etc) and the AMQP method ('return',
+        'deliver', 'declare', etc). I.e., full method names look like
+        'basic_declare' etc.
+        """
         method = message.method
 
         try:

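
The dispatch docstring added above describes finding a handler either in self.handlers or by a name built from the AMQP class and the AMQP method. A rough standalone sketch of that lookup, written for Python 3. `SketchDelegate`, the format of its handler keys, and the way arguments are passed are all hypothetical; this does not reproduce txAMQP's actual `Delegate.dispatch`:

```python
class SketchDelegate(object):
    """Hypothetical stand-in for a Delegate subclass; not txAMQP code."""

    def __init__(self):
        # Explicit overrides, keyed here by (AMQP class, AMQP method).
        # The key format is an assumption made for this sketch.
        self.handlers = {}

    def dispatch(self, klass, method, *args):
        # Prefer an explicitly registered handler; otherwise look up an
        # attribute named "<class>_<method>" on the delegate itself.
        handler = self.handlers.get((klass, method))
        if handler is None:
            handler = getattr(self, "%s_%s" % (klass, method), None)
        if handler is None:
            raise NotImplementedError("%s_%s" % (klass, method))
        return handler(*args)

    def basic_deliver(self, body):
        return "delivered %r" % (body,)


delegate = SketchDelegate()

# Found by name on the delegate: 'basic' + '_' + 'deliver'.
by_name = delegate.dispatch("basic", "deliver", "hello")

# Found in the handlers table, which takes priority over name lookup.
delegate.handlers[("channel", "close")] = lambda reason: "closed: " + reason
by_table = delegate.dispatch("channel", "close", "shutdown")
```

Replacing the delegate class, as the Thrift contrib code is said to do, amounts to supplying a different set of `<class>_<method>` attributes.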
=== modified file 'src/txamqp/message.py'
--- src/txamqp/message.py 2008-10-29 18:31:04 +0000
+++ src/txamqp/message.py 2010-03-11 10:38:21 +0000
@@ -1,3 +1,5 @@
+# -*- coding: utf-8; -*-
+
 #
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements. See the NOTICE file

=== modified file 'src/txamqp/protocol.py'
--- src/txamqp/protocol.py 2009-11-13 18:50:13 +0000
+++ src/txamqp/protocol.py 2010-03-11 10:38:21 +0000
@@ -1,4 +1,5 @@
-# coding: utf-8
+# -*- coding: utf-8; -*-
+
 from twisted.python import log
 from twisted.internet import defer, protocol
 from twisted.internet.task import LoopingCall
@@ -17,10 +18,12 @@
 class GarbageException(Exception):
     pass
 
-# An AMQP channel is a virtual connection that shares the
-# same socket with others channels. One can have many channels
-# per connection
 class AMQChannel(object):
+    """
+    An AMQP channel is a virtual connection that shares the same socket
+    with others channels. One can have many channels per connection.
+
+    """
 
     def __init__(self, id, outgoing):
         self.id = id
@@ -45,21 +48,33 @@
         payload = frame.payload
         if isinstance(payload, Method):
             if payload.method.response:
+                # If the method is a response (to an earlier method call on
+                # the broker, such as queue_declare), put the frame into
+                # the responses queue.
                 self.queue = self.responses
             else:
+                # This is a incoming message from AMQP that requires
+                # processing (e.g., connection_tune, channel_flow,
+                # basic_deliver). We put the incoming queue into the work
+                # queue of the AMQClient protocol so that as it receives
+                # subsequent frames (in order), it can put them into our
+                # incoming queue, which the worker (in AMQClient) consumes
+                # and builds into a complete incoming message for dispatch.
+                # This makes sure that incoming frames are properly
+                # separated by channel number.
                 self.queue = self.incoming
                 work.put(self.incoming)
         self.queue.put(frame)
 
     @defer.inlineCallbacks
-    def invoke(self, method, args, content = None):
+    def invoke(self, method, args, content=None):
         if self.closed:
             raise Closed(self.reason)
         frame = Frame(self.id, Method(method, *args))
         self.outgoing.put(frame)
 
         if method.content:
-            if content == None:
+            if content is None:
                 content = Content()
             self.writeContent(method.klass, content, self.outgoing)
 
@@ -90,16 +105,28 @@
 
     def writeContent(self, klass, content, queue):
         size = content.size()
-        header = Frame(self.id, Header(klass, content.weight(), size, **content.properties))
+        header = Frame(self.id, Header(klass, content.weight(), size,
+                                       **content.properties))
         queue.put(header)
         for child in content.children:
             self.writeContent(klass, child, queue)
-        # should split up if content.body exceeds max frame size
+        # Should split this up if content.body exceeds max frame size.
         if size > 0:
             queue.put(Frame(self.id, Body(content.body)))
 
+
 class FrameReceiver(protocol.Protocol, basic._PauseableMixin):
 
+    """
+    A message from AMQP takes the form of one or more frames. The
+    FrameReceiver class reads the binary data from the transport and calls
+    frameReceived (which must be implemented by a subclass) for each
+    complete frame.
+
+    This class can also send frames to the AMQP broker.
+
+    """
+
     frame_mode = False
     MAX_LENGTH = 4096
     HEADER_LENGTH = 1 + 2 + 4 + 1
@@ -134,7 +161,8 @@
         payload = Frame.DECODERS[frameType].decode(self.spec, c)
         end = c.decode_octet()
         if end != self.FRAME_END:
-            raise GarbageException('frame error: expected %r, got %r' % (self.FRAME_END, end))
+            raise GarbageException('frame error: expected %r, got %r' %
+                                   (self.FRAME_END, end))
         frame = Frame(channel, payload)
         return frame
 
@@ -151,7 +179,8 @@
         while self.frame_mode and not self.paused:
             sz = len(self.__buffer) - self.HEADER_LENGTH
             if sz >= 0:
-                length, = struct.unpack("!I", self.__buffer[3:7]) # size = 4 bytes
+                # size = 4 bytes:
+                length, = struct.unpack("!I", self.__buffer[3:7])
                 if sz >= length:
                     packet = self.__buffer[:self.HEADER_LENGTH + length]
                     self.__buffer = self.__buffer[self.HEADER_LENGTH + length:]
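
The `HEADER_LENGTH = 1 + 2 + 4 + 1` constant and the four-byte size read from offsets 3 to 7 in the hunk above match the AMQP 0-8 frame layout: a one-octet frame type, a two-octet channel number, a four-octet payload size, the payload itself, and a one-octet frame-end marker (0xCE). A minimal standalone sketch of the same buffer-splitting idea, written for Python 3 and independent of txAMQP. The `split_frames` name and the tuple it returns are illustrative assumptions, not part of the branch:

```python
import struct

FRAME_END = 0xCE               # frame-end octet defined by AMQP 0-8
HEADER_LENGTH = 1 + 2 + 4 + 1  # type + channel + size + frame-end


def split_frames(buffer):
    """Return (frames, leftover); each frame is (type, channel, payload)."""
    frames = []
    while len(buffer) >= HEADER_LENGTH:
        # "!BHI" is network byte order: octet, short, long (7 bytes in all).
        frame_type, channel, size = struct.unpack("!BHI", buffer[:7])
        total = HEADER_LENGTH + size
        if len(buffer) < total:
            break  # incomplete frame: wait for more bytes
        payload = buffer[7:7 + size]
        end = buffer[total - 1]
        if end != FRAME_END:
            raise ValueError("frame error: expected %r, got %r"
                             % (FRAME_END, end))
        frames.append((frame_type, channel, payload))
        buffer = buffer[total:]
    return frames, buffer


# One complete frame on channel 1 followed by the first bytes of another.
one_frame = struct.pack("!BHI", 1, 1, 3) + b"abc" + bytes([FRAME_END])
frames, leftover = split_frames(one_frame + one_frame[:5])
```

The leftover bytes stay in the buffer until the rest of the frame arrives, which is the role `self.__buffer` plays in `FrameReceiver`.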
@@ -196,10 +225,29 @@
         content = body.payload.content
         buf.write(content)
         read += len(content)
-    defer.returnValue(Content(buf.getvalue(), children, header.properties.copy()))
+    defer.returnValue(Content(buf.getvalue(), children,
+                              header.properties.copy()))
 
 class AMQClient(FrameReceiver):
 
+    """
+    This is the main protocol class for talking to an AMQP broker.
+
+    All communication with the broker takes place in a virtual 'channel'
+    (of which there may be many), so this class makes use of a (dynamically
+    constructed) factory class for producing channel instances.
+
+    The flow of control here is not easy to follow! Frames processed by the
+    FrameReceiver class (which we inherit from) are passed successively to
+    self.frameReceived which (in self.processFrame) passes them to the
+    appropriate channel instance. When the frame payload is a method call
+    (as opposed to a response), the channel puts its incoming queue onto
+    our self.work queue, which results in our self.worker passing it (the
+    channel's incoming queue) to self.dispatch, which reads the method
+    content (if any), builds the complete incoming message and passes it to
+    the delegate (see TwistedDelegate in client.py) to make the appropriate
+    method call."""
+
     channelClass = AMQChannel
 
     # Max unreceived heartbeat frames. The AMQP standard says it's 3.
@@ -214,23 +262,62 @@
214262
215 self.vhost = vhost263 self.vhost = vhost
216264
265 # Make a factory class that can generate new channel class
266 # instances. This is done using type as the class creator because
267 # one of the two superclasses of the class we're making was made
268 # dynamically from the AMQP spec.
217 self.channelFactory = type("Channel%s" % self.spec.klass.__name__,269 self.channelFactory = type("Channel%s" % self.spec.klass.__name__,
218 (self.channelClass, self.spec.klass), {})270 (self.channelClass, self.spec.klass), {})
271
272 # A dictionary of channel instances. As dictionaries are not thread
273 # safe, also make a lock for accessing the dictionary.
219 self.channels = {}274 self.channels = {}
220 self.channelLock = defer.DeferredLock()275 self.channelLock = defer.DeferredLock()
221276
277 # A queue for sending messages to AMQP.
222 self.outgoing = defer.DeferredQueue()278 self.outgoing = defer.DeferredQueue()
279
280 # A queue of incoming work in progress. Some AMQP method calls are
281 # composed of several incoming messages (which in turn may be
282 # composed of several frames). The objects that will be put into
283 # the work queue (by a channel instance) are themselves queues.
284 # Items are consumed from them to assemble an incoming message.
223 self.work = defer.DeferredQueue()285 self.work = defer.DeferredQueue()
224286
287 # The started event will fire once the connection is established
288 # and the initial handshake has completed (prior to authentication).
225 self.started = TwistedEvent()289 self.started = TwistedEvent()
226290
291 # A dictionary to hold basic_deliver messages coming from AMQP.
292 # Each of these messages contains a tag that the queue consumer may
293 # specify (or else the broker assigns one) when it sends a
294 # basic_consume to the broker. The tag is used as a key into the
295 # self.queues dictionary. The queuelock is used as a lock for the
296 # dictionary when we need to modify it, due to the non-thread-safeness
297 # of dictionaries in Python.
298 self.queues = {}
227 self.queueLock = defer.DeferredLock()299 self.queueLock = defer.DeferredLock()
300
301 # There are two cases in which the broker may send a basic_return
302 # message: for messages that are marked as mandatory, and for
303 # messages marked as immediate, when they cannot be delivered as
304 # such. For details, see the AMQP spec. The basic_return_queue is
305 # used to hold these incoming messages.
228 self.basic_return_queue = TimeoutDeferredQueue()306 self.basic_return_queue = TimeoutDeferredQueue()
229307
230 self.queues = {}308 # Prepare to write outgoing messages to the broker by having our
231309 # writer method await the first outgoing message on the outgoing
310 # queue. Thereafter it will arrange for itself to be called on
311 # subsequent outgoing messages.
232 self.outgoing.get().addCallback(self.writer)312 self.outgoing.get().addCallback(self.writer)
313
314 # Prepare to deal with incoming messages from the broker by having
315 # our worker method await the first set of incoming messages on the
316 # work queue. Thereafter it will arrange for itself to be called on
317 # subsequent incoming messages.
233 self.work.get().addCallback(self.worker)318 self.work.get().addCallback(self.worker)
319
320 # Arrange for heartbeat traffic, if requested.
234 self.heartbeatInterval = heartbeat321 self.heartbeatInterval = heartbeat
235 if self.heartbeatInterval > 0:322 if self.heartbeatInterval > 0:
236 if clock is None:323 if clock is None:
@@ -296,10 +383,22 @@
296 self.outgoing.get().addCallback(self.writer)383 self.outgoing.get().addCallback(self.writer)
297384
298 def worker(self, queue):385 def worker(self, queue):
386 """
387 @param queue: This is a queue from a channel instance created by
388 self.channelFactory (i.e., normally the incoming queue of an
389 AMQChannel subclass). See the dispatch() method of AMQChannel for
390 the work.put call that puts a channel's queue into our self.work
391 queue, which results in the channel's queue arriving here as the
392 argument.
393 """
299 d = self.dispatch(queue)394 d = self.dispatch(queue)
395
396 # Set things up so that we (i.e., self.worker) also handle future
397 # work arriving on the self.work queue.
300 def cb(ign):398 def cb(ign):
301 self.work.get().addCallback(self.worker)399 self.work.get().addCallback(self.worker)
302 d.addCallback(cb)400 d.addCallback(cb)
401
303 d.addErrback(self.close)402 d.addErrback(self.close)
304403
305 @defer.inlineCallbacks404 @defer.inlineCallbacks
@@ -315,7 +414,7 @@
315 message = Message(payload.method, payload.args, content)414 message = Message(payload.method, payload.args, content)
316 self.delegate.dispatch(channel, message)415 self.delegate.dispatch(channel, message)
317416
- # As soon as we connect to the target AMQP broker, send the init string
+ # When we connect to the target AMQP broker, send the init string.
319 def connectionMade(self):418 def connectionMade(self):
320 self.sendInitString()419 self.sendInitString()
321 self.setFrameMode()420 self.setFrameMode()
@@ -339,7 +438,8 @@
339 self.reschedule_checkHB()438 self.reschedule_checkHB()
340439
341 @defer.inlineCallbacks440 @defer.inlineCallbacks
- def authenticate(self, username, password, mechanism='AMQPLAIN', locale='en_US'):
+ def authenticate(self, username, password, mechanism='AMQPLAIN',
+ locale='en_US'):
343 if self.check_0_8():443 if self.check_0_8():
344 response = {"LOGIN": username, "PASSWORD": password}444 response = {"LOGIN": username, "PASSWORD": password}
345 else:445 else:
@@ -374,4 +474,3 @@
374 if self.checkHB.active():474 if self.checkHB.active():
375 self.checkHB.cancel()475 self.checkHB.cancel()
376 self.close(reason)476 self.close(reason)
377
378477
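
Note on the FrameReceiver hunks in src/txamqp/protocol.py: the buffer handling there is a length-prefixed split, and it may be easier to follow in isolation. What follows is a minimal standalone sketch, not the txAMQP code. The names split_frames and make_frame are hypothetical, the 0xCE frame-end value comes from the AMQP 0-8 spec, and reading the final "+ 1" in HEADER_LENGTH as the trailing frame-end octet is an assumption based on the hunk.

```python
import struct

# Type octet (1) + channel short (2) + size long (4), plus, by assumption,
# the single frame-end octet that follows the payload.
HEADER_LENGTH = 1 + 2 + 4 + 1
FRAME_END = 0xCE  # frame-end marker (206) in the AMQP 0-8 spec


def make_frame(frame_type, channel, payload):
    """Hypothetical helper: build one raw frame from its parts."""
    header = struct.pack("!BHI", frame_type, channel, len(payload))
    return header + payload + bytes([FRAME_END])


def split_frames(buffer):
    """Hypothetical helper: return (complete_packets, leftover_bytes)."""
    packets = []
    while len(buffer) >= HEADER_LENGTH:
        # The payload size is a 4-byte big-endian integer at offset 3.
        (length,) = struct.unpack("!I", buffer[3:7])
        total = HEADER_LENGTH + length
        if len(buffer) < total:
            break  # incomplete frame: wait for more data
        packets.append(buffer[:total])
        buffer = buffer[total:]
    return packets, buffer


frame = make_frame(1, 0, b"abc")
# One complete frame followed by the first 5 bytes of the next one.
packets, leftover = split_frames(frame + frame[:5])
```

The leftover bytes stay buffered until the rest of the frame arrives, which appears to be the role self.__buffer plays in the hunk.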
=== modified file 'src/txamqp/queue.py'
--- src/txamqp/queue.py 2009-05-14 15:03:24 +0000
+++ src/txamqp/queue.py 2010-03-11 10:38:21 +0000
@@ -1,4 +1,5 @@
-# coding: utf-8
+# -*- coding: utf-8; -*-
2
2from twisted.internet.defer import DeferredQueue3from twisted.internet.defer import DeferredQueue
34
4class Empty(Exception):5class Empty(Exception):
56
=== modified file 'src/txamqp/spec.py'
--- src/txamqp/spec.py 2009-07-17 16:04:44 +0000
+++ src/txamqp/spec.py 2010-03-11 10:38:21 +0000
@@ -1,3 +1,5 @@
1# -*- coding: utf-8; -*-
2
1#3#
2# Licensed to the Apache Software Foundation (ASF) under one4# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file5# or more contributor license agreements. See the NOTICE file
46
=== modified file 'src/txamqp/testlib.py'
--- src/txamqp/testlib.py 2009-08-08 18:03:05 +0000
+++ src/txamqp/testlib.py 2010-03-11 10:38:21 +0000
@@ -1,3 +1,5 @@
1# -*- coding: utf-8; -*-
2
1#3#
2# Licensed to the Apache Software Foundation (ASF) under one4# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file5# or more contributor license agreements. See the NOTICE file
46
=== modified file 'src/txamqp/xmlutil.py'
--- src/txamqp/xmlutil.py 2009-07-09 21:28:09 +0000
+++ src/txamqp/xmlutil.py 2010-03-11 10:38:21 +0000
@@ -1,3 +1,5 @@
1# -*- coding: utf-8; -*-
2
1#3#
2# Licensed to the Apache Software Foundation (ASF) under one4# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file5# or more contributor license agreements. See the NOTICE file
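
Note on the channelFactory comment in the AMQClient.__init__ hunk of src/txamqp/protocol.py: building a class with type() is needed when one base class only exists at run time. The sketch below illustrates the idea under stated assumptions. AMQChannelSketch and make_spec_class are hypothetical stand-ins for AMQChannel and for the class that txAMQP generates from the AMQP spec; neither is the real API.

```python
class AMQChannelSketch(object):
    """Hypothetical stand-in for AMQChannel: holds per-channel state."""

    def __init__(self, channel_id):
        self.id = channel_id


def make_spec_class(name, method_names):
    """Hypothetical stand-in for the class generated from the AMQP spec."""

    def make_method(method_name):
        def method(self, *args):
            # A real implementation would encode and send a method frame.
            return (self.id, method_name, args)
        method.__name__ = method_name
        return method

    namespace = dict((m, make_method(m)) for m in method_names)
    return type(name, (object,), namespace)


# This class does not exist until run time, so a "class" statement cannot
# name it as a base ahead of time.
spec_klass = make_spec_class("Spec0_8", ["basic_publish", "queue_declare"])

# Same shape as the hunk: combine the static base and the generated one.
channel_factory = type("Channel%s" % spec_klass.__name__,
                       (AMQChannelSketch, spec_klass), {})

channel = channel_factory(1)
result = channel.basic_publish("hello")
```

Instances of the combined class get the channel state from one base and the generated methods from the other.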
