Merge lp:~terrycojones/txamqp/lots-of-comments into lp:txamqp

Proposed by Terry Jones
Status: Approved
Approved by: Thomas Herve
Approved revision: 43
Proposed branch: lp:~terrycojones/txamqp/lots-of-comments
Merge into: lp:txamqp
Diff against target: 830 lines (+305/-101)
17 files modified
doc/README (+29/-20)
setup.py (+3/-1)
src/examples/simple/README (+5/-1)
src/examples/simple/txconsumer.py (+31/-15)
src/examples/simple/txpublisher.py (+40/-26)
src/examples/thrift/README (+25/-19)
src/txamqp/client.py (+19/-1)
src/txamqp/codec.py (+2/-0)
src/txamqp/connection.py (+2/-0)
src/txamqp/content.py (+2/-0)
src/txamqp/delegate.py (+22/-1)
src/txamqp/message.py (+2/-0)
src/txamqp/protocol.py (+115/-16)
src/txamqp/queue.py (+2/-1)
src/txamqp/spec.py (+2/-0)
src/txamqp/testlib.py (+2/-0)
src/txamqp/xmlutil.py (+2/-0)
To merge this branch: bzr merge lp:~terrycojones/txamqp/lots-of-comments
Reviewer          Review Type    Date Requested    Status
Thomas Herve                                       Approve
Review via email: mp+21048@code.launchpad.net

Description of the change

This branch is aimed at making the txAMQP code base easier for people to understand. The changes are almost all docstrings, comments, README clarifications, and some cleaning / simplification of the src/examples/simple code.

42. By Terry Jones

Added code to print a friendly message on failure to authenticate with the broker.

43. By Terry Jones

Make sure the reactor is running when we go to stop it.

Thomas Herve (therve) wrote:

[1] The link to the blog post is broken; maybe you should remove it.

[2]
+ $ bzr branch lp:txamqp txamqp

"bzr branch lp:txamqp" is enough to work.

[3]
+ - path_to_spec: The path to the AMQP spec that you want to use. Note
+ that depending on the broker you use, you will need a
+ different spec:
+ RabbitMQ 1.5.3: $TXAMQP_PATH/src/specs/standard/amqp0-8.xml

I think this path changed.

[4]
-# coding: utf-8
+# -*- coding: utf-8; -*-

We don't really need those as the files are pure ASCII afaict.
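The pure-ASCII claim in [4] is easy to check before dropping the coding lines. A minimal sketch, assuming Python 3; the helper names `is_pure_ascii` and `non_ascii_files` are hypothetical and not part of txAMQP:

```python
def is_pure_ascii(data):
    """Return True if the given bytes contain only 7-bit ASCII characters."""
    try:
        data.decode("ascii")
    except UnicodeDecodeError:
        return False
    return True


def non_ascii_files(paths):
    """Return the subset of paths whose contents are not pure ASCII."""
    offenders = []
    for path in paths:
        # Read raw bytes so no implicit decoding hides a problem.
        with open(path, "rb") as handle:
            if not is_pure_ascii(handle.read()):
                offenders.append(path)
    return offenders
```

Running `non_ascii_files` over the paths of the modified modules would list any file that still needs a coding declaration under Python 2; an empty result supports removing the lines.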

[5]
+ """A class that acts like an event that can be fired (via set). This is

+ """Call the init method (if any) in all superclasses of the subclass of

Can you break the line at the beginning?

[6]
+ the delegate (see TwistedDelegate in client.py) to make the appropriate
+ method call."""

And at the end? :)
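Points [5] and [6] are about layout only: breaking the line after the opening quotes and before the closing ones does not change the documentation a reader sees. A small sketch of the two layouts, assuming the standard library's `inspect.getdoc`; the function names and the abbreviated docstring text are hypothetical:

```python
import inspect


def compact():
    """Call the init method (if any) in all superclasses.
    This currently doesn't call anything."""


def broken():
    """
    Call the init method (if any) in all superclasses.
    This currently doesn't call anything.
    """


# inspect.getdoc strips the common indentation and the leading and
# trailing blank lines, so both layouts yield the same text.
print(inspect.getdoc(compact) == inspect.getdoc(broken))
```

The choice between the two is therefore a matter of consistency with the rest of the code base rather than of behavior.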

[7] There are a bunch of conflicts, mostly trivial.

Thanks, +1!

review: Approve

Unmerged revisions

43. By Terry Jones

Make sure the reactor is running when we go to stop it.

42. By Terry Jones

Added code to print a friendly message on failure to authenticate with the broker.

41. By Terry Jones

Simplified the simple example publisher code.

40. By Terry Jones

Added a bunch of comments to try to help people (mainly me, to start with) understand what's going on.

Preview Diff

1=== modified file 'doc/README'
2--- doc/README 2009-08-15 10:27:56 +0000
3+++ doc/README 2010-03-11 10:38:21 +0000
4@@ -1,51 +1,60 @@
5-This is a quick post on getting started using the txAMQP library with RabbitMQ,
6-based on Dan Reverri's article "Getting started with RabbitMQ, Python, Twisted,
7-and txAMQP on Ubuntu 9.04" [http://www.apparatusproject.org/blog/?p=38]
8+Here is some information you may need for getting started using the txAMQP
9+library with RabbitMQ, based on Dan Reverri's article "Getting started with
10+RabbitMQ, Python, Twisted, and txAMQP on Ubuntu 9.04"
11+[http://www.apparatusproject.org/blog/?p=38]
12
13 Install RabbitMQ
14+----------------
15
16 If you're using Debian (or a derivative, like Ubuntu) and RabbitMQ is not
17 included in your distribution, you can add RabbitMQ's APT repository:
18
19 http://www.rabbitmq.com/debian.html#apt
20
21-Running RabbitMQ
22+Run RabbitMQ
23+------------
24
25 If you installed RabbitMQ using the Debian package, it will be automatically
26 started. If not, you'll find an init.d file in /etc/init.d/rabbitmq-server
27
28 Install Twisted
29+---------------
30
31-As of 2009-07-09, txAMQP is being developed using Twisted 8.2.0, it may
32-work with older versions, but it's not guaranteed. In case you are using
33-Ubuntu, and the version available is not recent enough, you can install
34-Twisted through its PPA:
35+txAMQP was developed under Twisted 8.2.0, it may work with older versions,
36+but there is no guarantee. txAMQP is known to run successfully on Twisted
37+9.0 and 10.0. If you are using Ubuntu, and your version of Twisted is not
38+recent enough, you can install Twisted from its PPA:
39
40 https://launchpad.net/~twisted-dev/+archive/ppa
41
42 Install txAMQP
43+--------------
44
45 The instructions below install txAQMP from trunk. There are Ubuntu packages
46-available if you prefer, refer to the following page for more details:
47+available if you prefer: refer to the following page for more details:
48
49 https://launchpad.net/~txamqpteam/+archive/ppa
50
51 a. Install Bazaar
52
53-$ sudo apt-get install bzr
54+ $ sudo apt-get install bzr
55
56 b. Fetch txAQMP
57
58-$ cd ~
59-$ bzr branch lp:txamqp txamqp
60-
61-If you get the error "bzr: ERROR: Unknown repository format: 'Bazaar
62-RepositoryFormatKnitPack6 (bzr 1.9)", you'll need to upgrade Bazaar. You may
63-do so following the instructions at:
64-
65-http://bazaar-vcs.org/Download
66+ $ bzr branch lp:txamqp txamqp
67+
68+ If you get the error "bzr: ERROR: Unknown repository format: 'Bazaar
69+ RepositoryFormatKnitPack6 (bzr 1.9)" or similar, you'll need to upgrade
70+ Bazaar. To do so, follow the instructions at: http://bazaar-vcs.org/Download
71
72 c. Install txAMQP
73
74-$ cd txamqp
75-$ sudo python setup.py install
76+ $ cd txamqp
77+ $ sudo python setup.py install
78+
79+Run a txAMQP example
80+--------------------
81+
82+You can confirm that txAMQP is properly installed and working correctly by
83+running the scripts in the src/examples/simple directory. See the README
84+file in that directory for detailed instructions.
85
86=== modified file 'setup.py'
87--- setup.py 2009-08-16 11:30:01 +0000
88+++ setup.py 2010-03-11 10:38:21 +0000
89@@ -10,7 +10,9 @@
90 from setuptools import setup, find_packages
91 except ImportError:
92 from distutils.core import setup
93- setupdict['packages'] = ['txamqp', 'txamqp.contrib', 'txamqp.contrib.thrift']
94+ setupdict['packages'] = ['txamqp',
95+ 'txamqp.contrib',
96+ 'txamqp.contrib.thrift']
97 setupdict['package_dir'] = {
98 'txamqp': 'src/txamqp',
99 'txamqp.contrib': 'src/txamqp/contrib',
100
101=== modified file 'src/examples/simple/README'
102--- src/examples/simple/README 2009-08-15 10:31:11 +0000
103+++ src/examples/simple/README 2010-03-11 10:38:21 +0000
104@@ -1,6 +1,10 @@
105 Instructions
106 ============
107
108+This directory contains a simple example of code that uses txAMQP to
109+connect to an AMQP broker, create an exchange, and then publish and receive
110+a number of messages.
111+
112 Requirements
113 ------------
114
115@@ -14,7 +18,7 @@
116 a) In one of them run the txconsumer.py script:
117 $ python txconsumer.py host port vhost username password path_to_spec
118 e.g. txconsumer.py localhost 5672 / guest guest ../../specs/standard/amqp0-8.xml
119- b) In the other one run the txpublisher.py script:
120+ b) In the other, run the txpublisher.py script:
121 $ python txpublisher.py host port vhost username password path_to_spec content [count]
122 e.g. txpublisher.py localhost 5672 / guest guest ../../specs/standard/amqp0-8.xml hello 1000
123
124
125=== modified file 'src/examples/simple/txconsumer.py'
126--- src/examples/simple/txconsumer.py 2009-08-15 10:31:11 +0000
127+++ src/examples/simple/txconsumer.py 2010-03-11 10:38:21 +0000
128@@ -1,50 +1,64 @@
129 from twisted.internet.defer import inlineCallbacks
130 from twisted.internet import reactor
131 from twisted.internet.protocol import ClientCreator
132+from twisted.python import log
133 from txamqp.protocol import AMQClient
134-from txamqp.client import TwistedDelegate
135+from txamqp.client import TwistedDelegate, Closed
136 import txamqp.spec
137
138 @inlineCallbacks
139 def gotConnection(conn, username, password):
140 print "Connected to broker."
141- yield conn.authenticate(username, password)
142+
143+ try:
144+ yield conn.authenticate(username, password)
145+ except Closed:
146+ print "Failed to authenticate with AMQP broker. Check that " \
147+ "the username and password you're supplying are correct."
148+ return
149
150 print "Authenticated. Ready to receive messages"
151 chan = yield conn.channel(1)
152 yield chan.channel_open()
153
154- yield chan.queue_declare(queue="chatrooms", durable=True, exclusive=False, auto_delete=False)
155- yield chan.exchange_declare(exchange="chatservice", type="direct", durable=True, auto_delete=False)
156-
157- yield chan.queue_bind(queue="chatrooms", exchange="chatservice", routing_key="txamqp_chatroom")
158-
159- yield chan.basic_consume(queue='chatrooms', no_ack=True, consumer_tag="testtag")
160+ # Create an exchange (the producer can do this too, with no ill effect).
161+ yield chan.exchange_declare(exchange="chatservice", type="direct",
162+ durable=True, auto_delete=False)
163+
164+ yield chan.queue_declare(queue="chatrooms", durable=True,
165+ exclusive=False, auto_delete=False)
166+
167+ yield chan.queue_bind(queue="chatrooms", exchange="chatservice",
168+ routing_key="txamqp_chatroom")
169+
170+ yield chan.basic_consume(queue='chatrooms', no_ack=True,
171+ consumer_tag="testtag")
172
173 queue = yield conn.queue("testtag")
174
175 while True:
176 msg = yield queue.get()
177- print 'Received: ' + msg.content.body + ' from channel #' + str(chan.id)
178+ print 'Received: %r from channel #%s' % (msg.content.body, chan.id)
179 if msg.content.body == "STOP":
180 break
181
182+ # Cancel consuming from the queue.
183 yield chan.basic_cancel("testtag")
184
185+ # Close the channel.
186 yield chan.channel_close()
187
188+ # Close the connection (must be done over channel 0).
189 chan0 = yield conn.channel(0)
190-
191 yield chan0.connection_close()
192
193- reactor.stop()
194-
195
196 if __name__ == "__main__":
197 import sys
198 if len(sys.argv) < 7:
199 print "%s host port vhost username password path_to_spec" % sys.argv[0]
200- print "e.g. %s localhost 5672 / guest guest ../../specs/standard/amqp0-8.xml" % sys.argv[0]
201+ print "e.g. %s localhost 5672 / guest guest " \
202+ "../../specs/standard/amqp0-8.xml" % sys.argv[0]
203 sys.exit(1)
204
205 host = sys.argv[1]
206@@ -58,8 +72,10 @@
207 delegate = TwistedDelegate()
208
209 d = ClientCreator(reactor, AMQClient, delegate=delegate, vhost=vhost,
210- spec=spec).connectTCP(host, port)
211+ spec=spec).connectTCP(host, port)
212
213 d.addCallback(gotConnection, username, password)
214-
215+ d.addErrback(log.err)
216+ d.addBoth(lambda _: reactor.callLater(0, reactor.stop))
217+
218 reactor.run()
219
220=== modified file 'src/examples/simple/txpublisher.py'
221--- src/examples/simple/txpublisher.py 2009-08-15 10:31:11 +0000
222+++ src/examples/simple/txpublisher.py 2010-03-11 10:38:21 +0000
223@@ -1,51 +1,63 @@
224 from twisted.internet.defer import inlineCallbacks
225 from twisted.internet import reactor, task
226 from twisted.internet.protocol import ClientCreator
227+from twisted.python import log
228 from txamqp.protocol import AMQClient
229-from txamqp.client import TwistedDelegate
230+from txamqp.client import TwistedDelegate, Closed
231 from txamqp.content import Content
232 import txamqp.spec
233
234 @inlineCallbacks
235 def gotConnection(conn, username, password, body, count=1):
236+ """
237+
238+ @param: conn is an instance of AMQClient, the protocol class that
239+ handles talking to AMQP.
240+ """
241 print "Connected to broker."
242- yield conn.authenticate(username, password)
243+
244+ try:
245+ yield conn.authenticate(username, password)
246+ except Closed:
247+ print "Failed to authenticate with AMQP broker. Check that " \
248+ "the username and password you're supplying are correct."
249+ return
250
251 print "Authenticated. Ready to send messages"
252 chan = yield conn.channel(1)
253 yield chan.channel_open()
254
255- def send_messages():
256- def message_iterator():
257- for i in range(count):
258- content = body + "-%d" % i
259- msg = Content(content)
260- msg["delivery mode"] = 2
261- chan.basic_publish(exchange="chatservice", content=msg, routing_key="txamqp_chatroom")
262- print "Sending message: %s" % content
263- yield None
264- return task.coiterate(message_iterator())
265-
266- yield send_messages()
267-
268- stopToken = "STOP"
269- msg = Content(stopToken)
270- msg["delivery mode"] = 2
271- chan.basic_publish(exchange="chatservice", content=msg, routing_key="txamqp_chatroom")
272- print "Sending message: %s" % stopToken
273-
274+ # Create an exchange (the consumer can do this too, with no ill effect).
275+ yield chan.exchange_declare(exchange="chatservice", type="direct",
276+ durable=True, auto_delete=False)
277+
278+ def sender():
279+ for text in ['%s-%d' % (body, i) for i in range(count)] + ['STOP']:
280+ msg = Content(text)
281+ msg["delivery mode"] = 2
282+ print "Sending message: %r" % text
283+ yield chan.basic_publish(exchange="chatservice", content=msg,
284+ routing_key="txamqp_chatroom")
285+
286+ # Use a coiterator to send all the messages. This ensures the reactor
287+ # gets control once in a while to do the actual sending.
288+ yield task.coiterate(sender())
289+
290+ # Close the channel.
291 yield chan.channel_close()
292
293+ # Close the connection (must be done over channel 0).
294 chan0 = yield conn.channel(0)
295 yield chan0.connection_close()
296-
297- reactor.stop()
298+
299
300 if __name__ == "__main__":
301 import sys
302 if len(sys.argv) < 8:
303- print "%s host port vhost username password path_to_spec content [count]" % sys.argv[0]
304- print "e.g. %s localhost 5672 / guest guest ../../specs/standard/amqp0-8.xml hello 1000" % sys.argv[0]
305+ print "%s host port vhost username password path_to_spec content " \
306+ "[count]" % sys.argv[0]
307+ print "e.g. %s localhost 5672 / guest guest " \
308+ "../../specs/standard/amqp0-8.xml hello 1000" % sys.argv[0]
309 sys.exit(1)
310
311 host = sys.argv[1]
312@@ -65,8 +77,10 @@
313 delegate = TwistedDelegate()
314
315 d = ClientCreator(reactor, AMQClient, delegate=delegate, vhost=vhost,
316- spec=spec).connectTCP(host, port)
317+ spec=spec).connectTCP(host, port)
318
319 d.addCallback(gotConnection, username, password, content, count)
320+ d.addErrback(log.err)
321+ d.addBoth(lambda _: reactor.callLater(0, reactor.stop))
322
323 reactor.run()
324
325=== modified file 'src/examples/thrift/README'
326--- src/examples/thrift/README 2009-08-15 10:31:11 +0000
327+++ src/examples/thrift/README 2010-03-11 10:38:21 +0000
328@@ -1,6 +1,10 @@
329 Instructions
330 ============
331
332+This file contains some brief instructions on how to get and install Thrift
333+for Python, to use it to generate Twisted Python code to set-up a simple
334+service, and to run the example client and server code in this directory.
335+
336 Dependencies
337 ------------
338
339@@ -25,9 +29,9 @@
340
341 - General
342
343-1. Download Thrift source code [1]
344+1. Download the Thrift source code from http://incubator.apache.org/thrift/
345 2. Check if it contains support for Twisted, you'll need revision 749795 or
346-greater.
347+ greater.
348 3. Compile Thrift with suppport for Python.
349 4. Install Thrift.
350
351@@ -37,27 +41,29 @@
352 1. Generate Thrift Python services:
353 $ thrift --gen py:twisted shared.thrift
354 $ thrift --gen py:twisted tutorial.thrift
355+
356 2. Start a server:
357 $ python server.py host port vhost username password path_to_spec
358+
359 Note that you can start as many servers as you want at the same time.
360- Incoming requests will be round-robined and the servers load will be
361+ Incoming requests will be round-robined; the servers' loads will be
362 balanced automatically.
363+
364 3. Start a client:
365 $ python client.py host port vhost username password path_to_spec
366
367-Where:
368-- host: The host where the AMQP broker is located.
369-- port: The port where the AMQP broker listens on.
370-- vhost: The virtual host (depending on your broker configuration, default
371- in most cases is '/').
372-- username: The username to log in to the broker (default in most cases
373- is 'guest').
374-- password: The username to log in to the broker (default in most cases
375- is 'guest').
376-- path_to_spec: The path to the AMQP spec that you want to use. Keep in mind
377- that depending on the broker you use, you will need a different spec:
378- - RabbitMQ 1.5.3: $TXAMQP_PATH/src/specs/standard/amqp0-8.xml
379- - OpenAMQ 1.3c5: $TXAMQP_PATH/src/specs/standard/amqp0-9.xml
380- - Qpid M3 (Java): $TXAMQP_PATH/src/specs/qpid/amqp.0-8.xml
381-
382-1 - http://incubator.apache.org/thrift/
383+ Where:
384+ - host: The host where the AMQP broker is located.
385+ - port: The port the AMQP broker listens on.
386+ - vhost: The broker virtual host (depends on your broker
387+ configuration, default in most cases is '/').
388+ - username: The username to log in to the broker with (default in
389+ most cases is 'guest').
390+ - password: The password to log in to the broker with (default in
391+ most cases is 'guest').
392+ - path_to_spec: The path to the AMQP spec that you want to use. Note
393+ that depending on the broker you use, you will need a
394+ different spec:
395+ RabbitMQ 1.5.3: $TXAMQP_PATH/src/specs/standard/amqp0-8.xml
396+ OpenAMQ 1.3c5: $TXAMQP_PATH/src/specs/standard/amqp0-9.xml
397+ Qpid M3 (Java): $TXAMQP_PATH/src/specs/qpid/amqp.0-8.xml
398
399=== modified file 'src/txamqp/client.py'
400--- src/txamqp/client.py 2009-08-08 17:54:09 +0000
401+++ src/txamqp/client.py 2010-03-11 10:38:21 +0000
402@@ -1,11 +1,18 @@
403-# coding: utf-8
404+# -*- coding: utf-8; -*-
405+
406 from twisted.internet import defer
407 from txamqp.delegate import Delegate
408
409+
410 class Closed(Exception):
411 pass
412
413+
414 class TwistedEvent(object):
415+ """A class that acts like an event that can be fired (via set). This is
416+ modeled on threading.Event, but in a Twisted style. Hence wait returns
417+ a Deferred instead of blocking.
418+ """
419 def __init__(self):
420 self.deferred = defer.Deferred()
421 self.alreadyCalled = False
422@@ -20,8 +27,19 @@
423 deferred, self.deferred = self.deferred, defer.Deferred()
424 deferred.callback(True)
425
426+
427 class TwistedDelegate(Delegate):
428
429+ """
430+ When the broker sends a method call to the client, such as
431+ basic_deliver, channel_close, channel_flow, etc., the method is routed
432+ to a TwistedDelegate.
433+
434+ If you wanted to have something else receiving the messages from the
435+ broker, you would replace this class, which is what the Thrift contrib
436+ code does (see src/txamqp/contrib/thrift in the txAMQP distribution).
437+ """
438+
439 def connection_start(self, ch, msg):
440 ch.connection_start_ok(mechanism=self.client.mechanism,
441 response=self.client.response,
442
443=== modified file 'src/txamqp/codec.py'
444--- src/txamqp/codec.py 2008-10-29 18:31:04 +0000
445+++ src/txamqp/codec.py 2010-03-11 10:38:21 +0000
446@@ -1,4 +1,6 @@
447 #!/usr/bin/env python
448+# -*- coding: utf-8; -*-
449+
450
451 #
452 # Licensed to the Apache Software Foundation (ASF) under one
453
454=== modified file 'src/txamqp/connection.py'
455--- src/txamqp/connection.py 2009-04-27 17:00:08 +0000
456+++ src/txamqp/connection.py 2010-03-11 10:38:21 +0000
457@@ -1,3 +1,5 @@
458+# -*- coding: utf-8; -*-
459+
460 #
461 # Licensed to the Apache Software Foundation (ASF) under one
462 # or more contributor license agreements. See the NOTICE file
463
464=== modified file 'src/txamqp/content.py'
465--- src/txamqp/content.py 2009-07-07 22:45:21 +0000
466+++ src/txamqp/content.py 2010-03-11 10:38:21 +0000
467@@ -1,3 +1,5 @@
468+# -*- coding: utf-8; -*-
469+
470 #
471 # Licensed to the Apache Software Foundation (ASF) under one
472 # or more contributor license agreements. See the NOTICE file
473
474=== modified file 'src/txamqp/delegate.py'
475--- src/txamqp/delegate.py 2008-10-29 18:31:04 +0000
476+++ src/txamqp/delegate.py 2010-03-11 10:38:21 +0000
477@@ -1,3 +1,5 @@
478+# -*- coding: utf-8; -*-
479+
480 #
481 # Licensed to the Apache Software Foundation (ASF) under one
482 # or more contributor license agreements. See the NOTICE file
483@@ -20,7 +22,14 @@
484 import inspect
485 from spec import pythonize
486
487-class Delegate:
488+
489+class Delegate(object):
490+ """
491+ This is the base class to which the txAMQP protocol class routes messages
492+ from the broker. It is subclassed by TwistedDelegate, or anything else
493+ that feels like it (for an example, see the ThriftTwistedDelegate class
494+ in src/txamqp/contrib/thrift/client.py).
495+ """
496
497 def __init__(self):
498 self.handlers = {}
499@@ -29,11 +38,23 @@
500 self.invoke_all("init")
501
502 def invoke_all(self, meth, *args, **kwargs):
503+ """Call the init method (if any) in all superclasses of the subclass of
504+ Delegate. This currently doesn't call anything, but is here in case a
505+ future subclass needs an initialization mechanism.
506+ """
507 for cls in inspect.getmro(self.__class__):
508 if hasattr(cls, meth):
509 getattr(cls, meth)(self, *args, **kwargs)
510
511 def dispatch(self, channel, message):
512+ """
513+ Dispatch an incoming AMQP method call to the client's handler. The
514+ method to call is either in self.handlers or is looked up on self. In
515+ the latter case, the name of the method is composed of the AMQP class
516+ ('basic', 'exchange', 'queue', etc) and the AMQP method ('return',
517+ 'deliver', 'declare', etc). I.e., full method names look like
518+ 'basic_declare' etc.
519+ """
520 method = message.method
521
522 try:
523
524=== modified file 'src/txamqp/message.py'
525--- src/txamqp/message.py 2008-10-29 18:31:04 +0000
526+++ src/txamqp/message.py 2010-03-11 10:38:21 +0000
527@@ -1,3 +1,5 @@
528+# -*- coding: utf-8; -*-
529+
530 #
531 # Licensed to the Apache Software Foundation (ASF) under one
532 # or more contributor license agreements. See the NOTICE file
533
534=== modified file 'src/txamqp/protocol.py'
535--- src/txamqp/protocol.py 2009-11-13 18:50:13 +0000
536+++ src/txamqp/protocol.py 2010-03-11 10:38:21 +0000
537@@ -1,4 +1,5 @@
538-# coding: utf-8
539+# -*- coding: utf-8; -*-
540+
541 from twisted.python import log
542 from twisted.internet import defer, protocol
543 from twisted.internet.task import LoopingCall
544@@ -17,10 +18,12 @@
545 class GarbageException(Exception):
546 pass
547
548-# An AMQP channel is a virtual connection that shares the
549-# same socket with others channels. One can have many channels
550-# per connection
551 class AMQChannel(object):
552+ """
553+ An AMQP channel is a virtual connection that shares the same socket
554+ with other channels. One can have many channels per connection.
555+
556+ """
557
558 def __init__(self, id, outgoing):
559 self.id = id
560@@ -45,21 +48,33 @@
561 payload = frame.payload
562 if isinstance(payload, Method):
563 if payload.method.response:
564+ # If the method is a response (to an earlier method call on
565+ # the broker, such as queue_declare), put the frame into
566+ # the responses queue.
567 self.queue = self.responses
568 else:
569+ # This is an incoming message from AMQP that requires
570+ # processing (e.g., connection_tune, channel_flow,
571+ # basic_deliver). We put the incoming queue into the work
572+ # queue of the AMQClient protocol so that as it receives
573+ # subsequent frames (in order), it can put them into our
574+ # incoming queue, which the worker (in AMQClient) consumes
575+ # and builds into a complete incoming message for dispatch.
576+ # This makes sure that incoming frames are properly
577+ # separated by channel number.
578 self.queue = self.incoming
579 work.put(self.incoming)
580 self.queue.put(frame)
581
582 @defer.inlineCallbacks
583- def invoke(self, method, args, content = None):
584+ def invoke(self, method, args, content=None):
585 if self.closed:
586 raise Closed(self.reason)
587 frame = Frame(self.id, Method(method, *args))
588 self.outgoing.put(frame)
589
590 if method.content:
591- if content == None:
592+ if content is None:
593 content = Content()
594 self.writeContent(method.klass, content, self.outgoing)
595
596@@ -90,16 +105,28 @@
597
598 def writeContent(self, klass, content, queue):
599 size = content.size()
600- header = Frame(self.id, Header(klass, content.weight(), size, **content.properties))
601+ header = Frame(self.id, Header(klass, content.weight(), size,
602+ **content.properties))
603 queue.put(header)
604 for child in content.children:
605 self.writeContent(klass, child, queue)
606- # should split up if content.body exceeds max frame size
607+ # Should split this up if content.body exceeds max frame size.
608 if size > 0:
609 queue.put(Frame(self.id, Body(content.body)))
610
611+
612 class FrameReceiver(protocol.Protocol, basic._PauseableMixin):
613
614+ """
615+ A message from AMQP takes the form of one or more frames. The
616+ FrameReceiver class reads the binary data from the transport and calls
617+ frameReceived (which must be implemented by a subclass) for each
618+ complete frame.
619+
620+ This class can also send frames to the AMQP broker.
621+
622+ """
623+
624 frame_mode = False
625 MAX_LENGTH = 4096
626 HEADER_LENGTH = 1 + 2 + 4 + 1
627@@ -134,7 +161,8 @@
628 payload = Frame.DECODERS[frameType].decode(self.spec, c)
629 end = c.decode_octet()
630 if end != self.FRAME_END:
631- raise GarbageException('frame error: expected %r, got %r' % (self.FRAME_END, end))
632+ raise GarbageException('frame error: expected %r, got %r' %
633+ (self.FRAME_END, end))
634 frame = Frame(channel, payload)
635 return frame
636
637@@ -151,7 +179,8 @@
638 while self.frame_mode and not self.paused:
639 sz = len(self.__buffer) - self.HEADER_LENGTH
640 if sz >= 0:
641- length, = struct.unpack("!I", self.__buffer[3:7]) # size = 4 bytes
642+ # size = 4 bytes:
643+ length, = struct.unpack("!I", self.__buffer[3:7])
644 if sz >= length:
645 packet = self.__buffer[:self.HEADER_LENGTH + length]
646 self.__buffer = self.__buffer[self.HEADER_LENGTH + length:]
647@@ -196,10 +225,29 @@
648 content = body.payload.content
649 buf.write(content)
650 read += len(content)
651- defer.returnValue(Content(buf.getvalue(), children, header.properties.copy()))
652+ defer.returnValue(Content(buf.getvalue(), children,
653+ header.properties.copy()))
654
655 class AMQClient(FrameReceiver):
656
657+ """
658+ This is the main protocol class for talking to an AMQP broker.
659+
660+ All communication with the broker takes place in a virtual 'channel'
661+ (of which there may be many), so this class makes use of a (dynamically
662+ constructed) factory class for producing channel instances.
663+
664+ The flow of control here is not easy to follow! Frames processed by the
665+ FrameReceiver class (which we inherit from) are passed successively to
666+ self.frameReceived which (in self.processFrame) passes them to the
667+ appropriate channel instance. When the frame payload is a method call
668+ (as opposed to a response), the channel puts its incoming queue onto
669+ our self.work queue, which results in our self.worker passing it (the
670+ channel's incoming queue) to self.dispatch, which reads the method
671+ content (if any), builds the complete incoming message and passes it to
672+ the delegate (see TwistedDelegate in client.py) to make the appropriate
673+ method call."""
674+
675 channelClass = AMQChannel
676
677 # Max unreceived heartbeat frames. The AMQP standard says it's 3.
678@@ -214,23 +262,62 @@
679
680 self.vhost = vhost
681
682+ # Make a factory class that can generate new channel class
683+ # instances. This is done using type as the class creator because
684+ # one of the two superclasses of the class we're making was made
685+ # dynamically from the AMQP spec.
686 self.channelFactory = type("Channel%s" % self.spec.klass.__name__,
687 (self.channelClass, self.spec.klass), {})
688+
689+ # A dictionary of channel instances. As dictionaries are not thread
690+ # safe, also make a lock for accessing the dictionary.
691 self.channels = {}
692 self.channelLock = defer.DeferredLock()
693
694+ # A queue for sending messages to AMQP.
695 self.outgoing = defer.DeferredQueue()
696+
697+ # A queue of incoming work in progress. Some AMQP method calls are
698+ # composed of several incoming messages (which in turn may be
699+ # composed of several frames). The objects that will be put into
700+ # the work queue (by a channel instance) are themselves queues.
701+ # These will be consumed from to assemble an incoming message.
702 self.work = defer.DeferredQueue()
703
704+ # The started event will fire once the connection is established
705+ # and the initial handshake has completed (prior to authentication).
706 self.started = TwistedEvent()
707
708+ # A dictionary to hold basic_deliver messages coming from AMQP.
709+ # Each of these messages contains a tag that the queue consumer may
710+ # specify (or else the broker assigns one) when it sends a
711+ # basic_consume to the broker. The tag is used as a key into the
712+ # self.queues dictionary. The queuelock is used as a lock for the
713+ # dictionary when we need to modify it, due to the non-thread-safeness
714+ # of dictionaries in Python.
715+ self.queues = {}
716 self.queueLock = defer.DeferredLock()
717+
718+ # There are two cases in which the broker may send a basic_return
719+ # message. For messages that are marked as mandatory and/or
720+ # immediate, but that cannot be delivered as such. For details, see
721+ # the AMQP spec. The basic_return_queue is used to hold these
722+ # incoming messages.
723 self.basic_return_queue = TimeoutDeferredQueue()
724
725- self.queues = {}
726-
727+ # Prepare to write outgoing messages to the broker by having our
728+ # writer method await the first outgoing message on the outgoing
729+ # queue. Thereafter it will arrange for itself to be called on
730+ # subsequent outgoing messages.
731 self.outgoing.get().addCallback(self.writer)
732+
733+ # Prepare to deal with incoming messages from the broker by having
734+ # our worker method await the first set of incoming messages on the
735+ # work queue. Thereafter it will arrange for itself to be called on
736+ # subsequent incoming messages.
737 self.work.get().addCallback(self.worker)
738+
739+ # Arrange for heartbeat traffic, if requested.
740 self.heartbeatInterval = heartbeat
741 if self.heartbeatInterval > 0:
742 if clock is None:
743@@ -296,10 +383,22 @@
744 self.outgoing.get().addCallback(self.writer)
745
746 def worker(self, queue):
747+ """
748+ @param queue: This is a queue from a channel instance created by
749+ self.channelFactory (i.e., normally the incoming queue of an
750+ AMQChannel subclass. See the dispatch() method of AMQChannel for
751+ the work.put call that puts a channel's queue into our self.work
752+ queue, which results in the channel's queue arriving here as the
753+ argument).
754+ """
755 d = self.dispatch(queue)
756+
757+ # Set things up so that we (i.e., self.worker) also handle future
758+ # work arriving on the self.work queue.
759 def cb(ign):
760 self.work.get().addCallback(self.worker)
761 d.addCallback(cb)
762+
763 d.addErrback(self.close)
764
765 @defer.inlineCallbacks
766@@ -315,7 +414,7 @@
767 message = Message(payload.method, payload.args, content)
768 self.delegate.dispatch(channel, message)
769
770- # As soon as we connect to the target AMQP broker, send the init string
771+ # When we connect to the target AMQP broker, send the init string.
772 def connectionMade(self):
773 self.sendInitString()
774 self.setFrameMode()
775@@ -339,7 +438,8 @@
776 self.reschedule_checkHB()
777
778 @defer.inlineCallbacks
779- def authenticate(self, username, password, mechanism='AMQPLAIN', locale='en_US'):
780+ def authenticate(self, username, password, mechanism='AMQPLAIN',
781+ locale='en_US'):
782 if self.check_0_8():
783 response = {"LOGIN": username, "PASSWORD": password}
784 else:
785@@ -374,4 +474,3 @@
786 if self.checkHB.active():
787 self.checkHB.cancel()
788 self.close(reason)
789-
790
791=== modified file 'src/txamqp/queue.py'
792--- src/txamqp/queue.py 2009-05-14 15:03:24 +0000
793+++ src/txamqp/queue.py 2010-03-11 10:38:21 +0000
794@@ -1,4 +1,5 @@
795-# coding: utf-8
796+# -*- coding: utf-8; -*-
797+
798 from twisted.internet.defer import DeferredQueue
799
800 class Empty(Exception):
801
802=== modified file 'src/txamqp/spec.py'
803--- src/txamqp/spec.py 2009-07-17 16:04:44 +0000
804+++ src/txamqp/spec.py 2010-03-11 10:38:21 +0000
805@@ -1,3 +1,5 @@
806+# -*- coding: utf-8; -*-
807+
808 #
809 # Licensed to the Apache Software Foundation (ASF) under one
810 # or more contributor license agreements. See the NOTICE file
811
812=== modified file 'src/txamqp/testlib.py'
813--- src/txamqp/testlib.py 2009-08-08 18:03:05 +0000
814+++ src/txamqp/testlib.py 2010-03-11 10:38:21 +0000
815@@ -1,3 +1,5 @@
816+# -*- coding: utf-8; -*-
817+
818 #
819 # Licensed to the Apache Software Foundation (ASF) under one
820 # or more contributor license agreements. See the NOTICE file
821
822=== modified file 'src/txamqp/xmlutil.py'
823--- src/txamqp/xmlutil.py 2009-07-09 21:28:09 +0000
824+++ src/txamqp/xmlutil.py 2010-03-11 10:38:21 +0000
825@@ -1,3 +1,5 @@
826+# -*- coding: utf-8; -*-
827+
828 #
829 # Licensed to the Apache Software Foundation (ASF) under one
830 # or more contributor license agreements. See the NOTICE file
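
The dispatch docstring added to src/txamqp/delegate.py says that a handler is either found in self.handlers or looked up on the delegate under a name composed of the AMQP class and the AMQP method. A simplified, standard-library-only sketch of that lookup follows. `SketchDelegate`, `handler_name`, the `received` list, and the plain-argument message are hypothetical stand-ins for illustration, not txAMQP's actual API:

```python
class SketchDelegate(object):
    """Route (amqp_class, amqp_method) pairs to methods named class_method."""

    def __init__(self):
        # Explicitly registered handlers take priority over attribute lookup.
        self.handlers = {}
        self.received = []

    @staticmethod
    def handler_name(amqp_class, amqp_method):
        # AMQP spec names may contain hyphens (e.g. consume-ok); turning
        # them into underscores here is an assumption of this sketch.
        return ("%s_%s" % (amqp_class, amqp_method)).replace("-", "_")

    def dispatch(self, channel, amqp_class, amqp_method, *args):
        name = self.handler_name(amqp_class, amqp_method)
        handler = self.handlers.get(name)
        if handler is None:
            # Fall back to a method defined on the delegate itself.
            handler = getattr(self, name, None)
        if handler is None:
            raise AttributeError("no handler for %s" % name)
        return handler(channel, *args)

    def basic_deliver(self, channel, body):
        # Example handler: record the delivery and hand the body back.
        self.received.append((channel, body))
        return body
```

For example, `SketchDelegate().dispatch(1, "basic", "deliver", "hello")` resolves to the `basic_deliver` method. Replacing the delegate class, as the TwistedDelegate docstring suggests the Thrift contrib code does, amounts to supplying a different set of such methods.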
