Merge lp:~terrycojones/txamqp/lots-of-comments into lp:txamqp
Proposed by: Terry Jones
Status: | Approved |
---|---|
Approved by: | Thomas Herve |
Approved revision: | 43 |
Proposed branch: | lp:~terrycojones/txamqp/lots-of-comments |
Merge into: | lp:txamqp |
Diff against target: | 830 lines (+305/-101), 17 files modified: doc/README (+29/-20), setup.py (+3/-1), src/examples/simple/README (+5/-1), src/examples/simple/txconsumer.py (+31/-15), src/examples/simple/txpublisher.py (+40/-26), src/examples/thrift/README (+25/-19), src/txamqp/client.py (+19/-1), src/txamqp/codec.py (+2/-0), src/txamqp/connection.py (+2/-0), src/txamqp/content.py (+2/-0), src/txamqp/delegate.py (+22/-1), src/txamqp/message.py (+2/-0), src/txamqp/protocol.py (+115/-16), src/txamqp/queue.py (+2/-1), src/txamqp/spec.py (+2/-0), src/txamqp/testlib.py (+2/-0), src/txamqp/xmlutil.py (+2/-0) |
To merge this branch: | bzr merge lp:~terrycojones/txamqp/lots-of-comments |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Thomas Herve | Approve | ||
Review via email: mp+21048@code.launchpad.net |
Commit message
Description of the change
This branch is aimed at making the txAMQP code base easier for people to understand. The changes are almost all docstrings, comments, README clarifications, and some cleaning / simplification of the src/examples/simple code.
- 42. By Terry Jones
  Added code to print a friendly message on failure to authenticate with the broker.
- 43. By Terry Jones
  Make sure the reactor is running when we go to stop it.
Unmerged revisions
- 43. By Terry Jones
  Make sure the reactor is running when we go to stop it.
- 42. By Terry Jones
  Added code to print a friendly message on failure to authenticate with the broker.
- 41. By Terry Jones
  Simplified the simple example publisher code.
- 40. By Terry Jones
  Added a bunch of comments to try to help people (mainly me, to start with) understand what's going on.
Preview Diff
1 | === modified file 'doc/README' |
2 | --- doc/README 2009-08-15 10:27:56 +0000 |
3 | +++ doc/README 2010-03-11 10:38:21 +0000 |
4 | @@ -1,51 +1,60 @@ |
5 | -This is a quick post on getting started using the txAMQP library with RabbitMQ, |
6 | -based on Dan Reverri's article "Getting started with RabbitMQ, Python, Twisted, |
7 | -and txAMQP on Ubuntu 9.04" [http://www.apparatusproject.org/blog/?p=38] |
8 | +Here is some information you may need for getting started using the txAMQP |
9 | +library with RabbitMQ, based on Dan Reverri's article "Getting started with |
10 | +RabbitMQ, Python, Twisted, and txAMQP on Ubuntu 9.04" |
11 | +[http://www.apparatusproject.org/blog/?p=38] |
12 | |
13 | Install RabbitMQ |
14 | +---------------- |
15 | |
16 | If you're using Debian (or a derivative, like Ubuntu) and RabbitMQ is not |
17 | included in your distribution, you can add RabbitMQ's APT repository: |
18 | |
19 | http://www.rabbitmq.com/debian.html#apt |
20 | |
21 | -Running RabbitMQ |
22 | +Run RabbitMQ |
23 | +------------ |
24 | |
25 | If you installed RabbitMQ using the Debian package, it will be automatically |
26 | started. If not, you'll find an init.d file in /etc/init.d/rabbitmq-server |
27 | |
28 | Install Twisted |
29 | +--------------- |
30 | |
31 | -As of 2009-07-09, txAMQP is being developed using Twisted 8.2.0, it may |
32 | -work with older versions, but it's not guaranteed. In case you are using |
33 | -Ubuntu, and the version available is not recent enough, you can install |
34 | -Twisted through its PPA: |
35 | +txAMQP was developed under Twisted 8.2.0; it may work with older versions, |
36 | +but there is no guarantee. txAMQP is known to run successfully on Twisted |
37 | +9.0 and 10.0. If you are using Ubuntu, and your version of Twisted is not |
38 | +recent enough, you can install Twisted from its PPA: |
39 | |
40 | https://launchpad.net/~twisted-dev/+archive/ppa |
41 | |
42 | Install txAMQP |
43 | +-------------- |
44 | |
45 | The instructions below install txAQMP from trunk. There are Ubuntu packages |
46 | -available if you prefer, refer to the following page for more details: |
47 | +available if you prefer: refer to the following page for more details: |
48 | |
49 | https://launchpad.net/~txamqpteam/+archive/ppa |
50 | |
51 | a. Install Bazaar |
52 | |
53 | -$ sudo apt-get install bzr |
54 | + $ sudo apt-get install bzr |
55 | |
56 | b. Fetch txAQMP |
57 | |
58 | -$ cd ~ |
59 | -$ bzr branch lp:txamqp txamqp |
60 | - |
61 | -If you get the error "bzr: ERROR: Unknown repository format: 'Bazaar |
62 | -RepositoryFormatKnitPack6 (bzr 1.9)", you'll need to upgrade Bazaar. You may |
63 | -do so following the instructions at: |
64 | - |
65 | -http://bazaar-vcs.org/Download |
66 | + $ bzr branch lp:txamqp txamqp |
67 | + |
68 | + If you get the error "bzr: ERROR: Unknown repository format: 'Bazaar |
69 | + RepositoryFormatKnitPack6 (bzr 1.9)" or similar, you'll need to upgrade |
70 | + Bazaar. To do so, follow the instructions at: http://bazaar-vcs.org/Download |
71 | |
72 | c. Install txAMQP |
73 | |
74 | -$ cd txamqp |
75 | -$ sudo python setup.py install |
76 | + $ cd txamqp |
77 | + $ sudo python setup.py install |
78 | + |
79 | +Run a txAMQP example |
80 | +-------------------- |
81 | + |
82 | +You can confirm that txAMQP is properly installed and working correctly by |
83 | +running the scripts in the src/examples/simple directory. See the README |
84 | +file in that directory for detailed instructions. |
85 | |
86 | === modified file 'setup.py' |
87 | --- setup.py 2009-08-16 11:30:01 +0000 |
88 | +++ setup.py 2010-03-11 10:38:21 +0000 |
89 | @@ -10,7 +10,9 @@ |
90 | from setuptools import setup, find_packages |
91 | except ImportError: |
92 | from distutils.core import setup |
93 | - setupdict['packages'] = ['txamqp', 'txamqp.contrib', 'txamqp.contrib.thrift'] |
94 | + setupdict['packages'] = ['txamqp', |
95 | + 'txamqp.contrib', |
96 | + 'txamqp.contrib.thrift'] |
97 | setupdict['package_dir'] = { |
98 | 'txamqp': 'src/txamqp', |
99 | 'txamqp.contrib': 'src/txamqp/contrib', |
100 | |
101 | === modified file 'src/examples/simple/README' |
102 | --- src/examples/simple/README 2009-08-15 10:31:11 +0000 |
103 | +++ src/examples/simple/README 2010-03-11 10:38:21 +0000 |
104 | @@ -1,6 +1,10 @@ |
105 | Instructions |
106 | ============ |
107 | |
108 | +This directory contains a simple example of code that uses txAMQP to |
109 | +connect to an AMQP broker, create an exchange, and then publish and receive |
110 | +a number of messages. |
111 | + |
112 | Requirements |
113 | ------------ |
114 | |
115 | @@ -14,7 +18,7 @@ |
116 | a) In one of them run the txconsumer.py script: |
117 | $ python txconsumer.py host port vhost username password path_to_spec |
118 | e.g. txconsumer.py localhost 5672 / guest guest ../../specs/standard/amqp0-8.xml |
119 | - b) In the other one run the txpublisher.py script: |
120 | + b) In the other, run the txpublisher.py script: |
121 | $ python txpublisher.py host port vhost username password path_to_spec content [count] |
122 | e.g. txpublisher.py localhost 5672 / guest guest ../../specs/standard/amqp0-8.xml hello 1000 |
123 | |
124 | |
125 | === modified file 'src/examples/simple/txconsumer.py' |
126 | --- src/examples/simple/txconsumer.py 2009-08-15 10:31:11 +0000 |
127 | +++ src/examples/simple/txconsumer.py 2010-03-11 10:38:21 +0000 |
128 | @@ -1,50 +1,64 @@ |
129 | from twisted.internet.defer import inlineCallbacks |
130 | from twisted.internet import reactor |
131 | from twisted.internet.protocol import ClientCreator |
132 | +from twisted.python import log |
133 | from txamqp.protocol import AMQClient |
134 | -from txamqp.client import TwistedDelegate |
135 | +from txamqp.client import TwistedDelegate, Closed |
136 | import txamqp.spec |
137 | |
138 | @inlineCallbacks |
139 | def gotConnection(conn, username, password): |
140 | print "Connected to broker." |
141 | - yield conn.authenticate(username, password) |
142 | + |
143 | + try: |
144 | + yield conn.authenticate(username, password) |
145 | + except Closed: |
146 | + print "Failed to authenticate with AMQP broker. Check that " \ |
147 | + "the username and password you're supplying are correct." |
148 | + return |
149 | |
150 | print "Authenticated. Ready to receive messages" |
151 | chan = yield conn.channel(1) |
152 | yield chan.channel_open() |
153 | |
154 | - yield chan.queue_declare(queue="chatrooms", durable=True, exclusive=False, auto_delete=False) |
155 | - yield chan.exchange_declare(exchange="chatservice", type="direct", durable=True, auto_delete=False) |
156 | - |
157 | - yield chan.queue_bind(queue="chatrooms", exchange="chatservice", routing_key="txamqp_chatroom") |
158 | - |
159 | - yield chan.basic_consume(queue='chatrooms', no_ack=True, consumer_tag="testtag") |
160 | + # Create an exchange (the producer can do this too, with no ill effect). |
161 | + yield chan.exchange_declare(exchange="chatservice", type="direct", |
162 | + durable=True, auto_delete=False) |
163 | + |
164 | + yield chan.queue_declare(queue="chatrooms", durable=True, |
165 | + exclusive=False, auto_delete=False) |
166 | + |
167 | + yield chan.queue_bind(queue="chatrooms", exchange="chatservice", |
168 | + routing_key="txamqp_chatroom") |
169 | + |
170 | + yield chan.basic_consume(queue='chatrooms', no_ack=True, |
171 | + consumer_tag="testtag") |
172 | |
173 | queue = yield conn.queue("testtag") |
174 | |
175 | while True: |
176 | msg = yield queue.get() |
177 | - print 'Received: ' + msg.content.body + ' from channel #' + str(chan.id) |
178 | + print 'Received: %r from channel #%s' % (msg.content.body, chan.id) |
179 | if msg.content.body == "STOP": |
180 | break |
181 | |
182 | + # Cancel consuming from the queue. |
183 | yield chan.basic_cancel("testtag") |
184 | |
185 | + # Close the channel. |
186 | yield chan.channel_close() |
187 | |
188 | + # Close the connection (must be done over channel 0). |
189 | chan0 = yield conn.channel(0) |
190 | - |
191 | yield chan0.connection_close() |
192 | |
193 | - reactor.stop() |
194 | - |
195 | |
196 | if __name__ == "__main__": |
197 | import sys |
198 | if len(sys.argv) < 7: |
199 | print "%s host port vhost username password path_to_spec" % sys.argv[0] |
200 | - print "e.g. %s localhost 5672 / guest guest ../../specs/standard/amqp0-8.xml" % sys.argv[0] |
201 | + print "e.g. %s localhost 5672 / guest guest " \ |
202 | + "../../specs/standard/amqp0-8.xml" % sys.argv[0] |
203 | sys.exit(1) |
204 | |
205 | host = sys.argv[1] |
206 | @@ -58,8 +72,10 @@ |
207 | delegate = TwistedDelegate() |
208 | |
209 | d = ClientCreator(reactor, AMQClient, delegate=delegate, vhost=vhost, |
210 | - spec=spec).connectTCP(host, port) |
211 | + spec=spec).connectTCP(host, port) |
212 | |
213 | d.addCallback(gotConnection, username, password) |
214 | - |
215 | + d.addErrback(log.err) |
216 | + d.addBoth(lambda _: reactor.callLater(0, reactor.stop)) |
217 | + |
218 | reactor.run() |
219 | |
220 | === modified file 'src/examples/simple/txpublisher.py' |
221 | --- src/examples/simple/txpublisher.py 2009-08-15 10:31:11 +0000 |
222 | +++ src/examples/simple/txpublisher.py 2010-03-11 10:38:21 +0000 |
223 | @@ -1,51 +1,63 @@ |
224 | from twisted.internet.defer import inlineCallbacks |
225 | from twisted.internet import reactor, task |
226 | from twisted.internet.protocol import ClientCreator |
227 | +from twisted.python import log |
228 | from txamqp.protocol import AMQClient |
229 | -from txamqp.client import TwistedDelegate |
230 | +from txamqp.client import TwistedDelegate, Closed |
231 | from txamqp.content import Content |
232 | import txamqp.spec |
233 | |
234 | @inlineCallbacks |
235 | def gotConnection(conn, username, password, body, count=1): |
236 | + """ |
237 | + |
238 | + @param conn: an instance of AMQClient, the protocol class that |
239 | + handles talking to AMQP. |
240 | + """ |
241 | print "Connected to broker." |
242 | - yield conn.authenticate(username, password) |
243 | + |
244 | + try: |
245 | + yield conn.authenticate(username, password) |
246 | + except Closed: |
247 | + print "Failed to authenticate with AMQP broker. Check that " \ |
248 | + "the username and password you're supplying are correct." |
249 | + return |
250 | |
251 | print "Authenticated. Ready to send messages" |
252 | chan = yield conn.channel(1) |
253 | yield chan.channel_open() |
254 | |
255 | - def send_messages(): |
256 | - def message_iterator(): |
257 | - for i in range(count): |
258 | - content = body + "-%d" % i |
259 | - msg = Content(content) |
260 | - msg["delivery mode"] = 2 |
261 | - chan.basic_publish(exchange="chatservice", content=msg, routing_key="txamqp_chatroom") |
262 | - print "Sending message: %s" % content |
263 | - yield None |
264 | - return task.coiterate(message_iterator()) |
265 | - |
266 | - yield send_messages() |
267 | - |
268 | - stopToken = "STOP" |
269 | - msg = Content(stopToken) |
270 | - msg["delivery mode"] = 2 |
271 | - chan.basic_publish(exchange="chatservice", content=msg, routing_key="txamqp_chatroom") |
272 | - print "Sending message: %s" % stopToken |
273 | - |
274 | + # Create an exchange (the consumer can do this too, with no ill effect). |
275 | + yield chan.exchange_declare(exchange="chatservice", type="direct", |
276 | + durable=True, auto_delete=False) |
277 | + |
278 | + def sender(): |
279 | + for text in ['%s-%d' % (body, i) for i in range(count)] + ['STOP']: |
280 | + msg = Content(text) |
281 | + msg["delivery mode"] = 2 |
282 | + print "Sending message: %r" % text |
283 | + yield chan.basic_publish(exchange="chatservice", content=msg, |
284 | + routing_key="txamqp_chatroom") |
285 | + |
286 | + # Use a coiterator to send all the messages. This ensures the reactor |
287 | + # gets control once in a while to do the actual sending. |
288 | + yield task.coiterate(sender()) |
289 | + |
290 | + # Close the channel. |
291 | yield chan.channel_close() |
292 | |
293 | + # Close the connection (must be done over channel 0). |
294 | chan0 = yield conn.channel(0) |
295 | yield chan0.connection_close() |
296 | - |
297 | - reactor.stop() |
298 | + |
299 | |
300 | if __name__ == "__main__": |
301 | import sys |
302 | if len(sys.argv) < 8: |
303 | - print "%s host port vhost username password path_to_spec content [count]" % sys.argv[0] |
304 | - print "e.g. %s localhost 5672 / guest guest ../../specs/standard/amqp0-8.xml hello 1000" % sys.argv[0] |
305 | + print "%s host port vhost username password path_to_spec content " \ |
306 | + "[count]" % sys.argv[0] |
307 | + print "e.g. %s localhost 5672 / guest guest " \ |
308 | + "../../specs/standard/amqp0-8.xml hello 1000" % sys.argv[0] |
309 | sys.exit(1) |
310 | |
311 | host = sys.argv[1] |
312 | @@ -65,8 +77,10 @@ |
313 | delegate = TwistedDelegate() |
314 | |
315 | d = ClientCreator(reactor, AMQClient, delegate=delegate, vhost=vhost, |
316 | - spec=spec).connectTCP(host, port) |
317 | + spec=spec).connectTCP(host, port) |
318 | |
319 | d.addCallback(gotConnection, username, password, content, count) |
320 | + d.addErrback(log.err) |
321 | + d.addBoth(lambda _: reactor.callLater(0, reactor.stop)) |
322 | |
323 | reactor.run() |
324 | |
325 | === modified file 'src/examples/thrift/README' |
326 | --- src/examples/thrift/README 2009-08-15 10:31:11 +0000 |
327 | +++ src/examples/thrift/README 2010-03-11 10:38:21 +0000 |
328 | @@ -1,6 +1,10 @@ |
329 | Instructions |
330 | ============ |
331 | |
332 | +This file contains some brief instructions on how to get and install Thrift |
333 | +for Python, to use it to generate Twisted Python code to set up a simple |
334 | +service, and to run the example client and server code in this directory. |
335 | + |
336 | Dependencies |
337 | ------------ |
338 | |
339 | @@ -25,9 +29,9 @@ |
340 | |
341 | - General |
342 | |
343 | -1. Download Thrift source code [1] |
344 | +1. Download the Thrift source code from http://incubator.apache.org/thrift/ |
345 | 2. Check if it contains support for Twisted, you'll need revision 749795 or |
346 | -greater. |
347 | + greater. |
348 | 3. Compile Thrift with suppport for Python. |
349 | 4. Install Thrift. |
350 | |
351 | @@ -37,27 +41,29 @@ |
352 | 1. Generate Thrift Python services: |
353 | $ thrift --gen py:twisted shared.thrift |
354 | $ thrift --gen py:twisted tutorial.thrift |
355 | + |
356 | 2. Start a server: |
357 | $ python server.py host port vhost username password path_to_spec |
358 | + |
359 | Note that you can start as many servers as you want at the same time. |
360 | - Incoming requests will be round-robined and the servers load will be |
361 | + Incoming requests will be round-robined; the servers' loads will be |
362 | balanced automatically. |
363 | + |
364 | 3. Start a client: |
365 | $ python client.py host port vhost username password path_to_spec |
366 | |
367 | -Where: |
368 | -- host: The host where the AMQP broker is located. |
369 | -- port: The port where the AMQP broker listens on. |
370 | -- vhost: The virtual host (depending on your broker configuration, default |
371 | - in most cases is '/'). |
372 | -- username: The username to log in to the broker (default in most cases |
373 | - is 'guest'). |
374 | -- password: The username to log in to the broker (default in most cases |
375 | - is 'guest'). |
376 | -- path_to_spec: The path to the AMQP spec that you want to use. Keep in mind |
377 | - that depending on the broker you use, you will need a different spec: |
378 | - - RabbitMQ 1.5.3: $TXAMQP_PATH/src/specs/standard/amqp0-8.xml |
379 | - - OpenAMQ 1.3c5: $TXAMQP_PATH/src/specs/standard/amqp0-9.xml |
380 | - - Qpid M3 (Java): $TXAMQP_PATH/src/specs/qpid/amqp.0-8.xml |
381 | - |
382 | -1 - http://incubator.apache.org/thrift/ |
383 | + Where: |
384 | + - host: The host where the AMQP broker is located. |
385 | + - port: The port the AMQP broker listens on. |
386 | + - vhost: The broker virtual host (depends on your broker |
387 | + configuration, default in most cases is '/'). |
388 | + - username: The username to log in to the broker with (default in |
389 | + most cases is 'guest'). |
390 | + - password: The password to log in to the broker with (default in |
391 | + most cases is 'guest'). |
392 | + - path_to_spec: The path to the AMQP spec that you want to use. Note |
393 | + that depending on the broker you use, you will need a |
394 | + different spec: |
395 | + RabbitMQ 1.5.3: $TXAMQP_PATH/src/specs/standard/amqp0-8.xml |
396 | + OpenAMQ 1.3c5: $TXAMQP_PATH/src/specs/standard/amqp0-9.xml |
397 | + Qpid M3 (Java): $TXAMQP_PATH/src/specs/qpid/amqp.0-8.xml |
398 | |
399 | === modified file 'src/txamqp/client.py' |
400 | --- src/txamqp/client.py 2009-08-08 17:54:09 +0000 |
401 | +++ src/txamqp/client.py 2010-03-11 10:38:21 +0000 |
402 | @@ -1,11 +1,18 @@ |
403 | -# coding: utf-8 |
404 | +# -*- coding: utf-8; -*- |
405 | + |
406 | from twisted.internet import defer |
407 | from txamqp.delegate import Delegate |
408 | |
409 | + |
410 | class Closed(Exception): |
411 | pass |
412 | |
413 | + |
414 | class TwistedEvent(object): |
415 | + """A class that acts like an event that can be fired (via set). This is |
416 | + modeled on threading.Event, but in a Twisted style. Hence wait returns |
417 | + a Deferred instead of blocking. |
418 | + """ |
419 | def __init__(self): |
420 | self.deferred = defer.Deferred() |
421 | self.alreadyCalled = False |
422 | @@ -20,8 +27,19 @@ |
423 | deferred, self.deferred = self.deferred, defer.Deferred() |
424 | deferred.callback(True) |
425 | |
426 | + |
427 | class TwistedDelegate(Delegate): |
428 | |
429 | + """ |
430 | + When the broker sends a method call to the client, such as |
431 | + basic_deliver, channel_close, channel_flow, etc., the method is routed |
432 | + to a TwistedDelegate. |
433 | + |
434 | + If you wanted to have something else receiving the messages from the |
435 | + broker, you would replace this class, which is what the Thrift contrib |
436 | + code does (see src/txamqp/contrib/thrift in the txAMQP distribution). |
437 | + """ |
438 | + |
439 | def connection_start(self, ch, msg): |
440 | ch.connection_start_ok(mechanism=self.client.mechanism, |
441 | response=self.client.response, |
442 | |
443 | === modified file 'src/txamqp/codec.py' |
444 | --- src/txamqp/codec.py 2008-10-29 18:31:04 +0000 |
445 | +++ src/txamqp/codec.py 2010-03-11 10:38:21 +0000 |
446 | @@ -1,4 +1,6 @@ |
447 | #!/usr/bin/env python |
448 | +# -*- coding: utf-8; -*- |
449 | + |
450 | |
451 | # |
452 | # Licensed to the Apache Software Foundation (ASF) under one |
453 | |
454 | === modified file 'src/txamqp/connection.py' |
455 | --- src/txamqp/connection.py 2009-04-27 17:00:08 +0000 |
456 | +++ src/txamqp/connection.py 2010-03-11 10:38:21 +0000 |
457 | @@ -1,3 +1,5 @@ |
458 | +# -*- coding: utf-8; -*- |
459 | + |
460 | # |
461 | # Licensed to the Apache Software Foundation (ASF) under one |
462 | # or more contributor license agreements. See the NOTICE file |
463 | |
464 | === modified file 'src/txamqp/content.py' |
465 | --- src/txamqp/content.py 2009-07-07 22:45:21 +0000 |
466 | +++ src/txamqp/content.py 2010-03-11 10:38:21 +0000 |
467 | @@ -1,3 +1,5 @@ |
468 | +# -*- coding: utf-8; -*- |
469 | + |
470 | # |
471 | # Licensed to the Apache Software Foundation (ASF) under one |
472 | # or more contributor license agreements. See the NOTICE file |
473 | |
474 | === modified file 'src/txamqp/delegate.py' |
475 | --- src/txamqp/delegate.py 2008-10-29 18:31:04 +0000 |
476 | +++ src/txamqp/delegate.py 2010-03-11 10:38:21 +0000 |
477 | @@ -1,3 +1,5 @@ |
478 | +# -*- coding: utf-8; -*- |
479 | + |
480 | # |
481 | # Licensed to the Apache Software Foundation (ASF) under one |
482 | # or more contributor license agreements. See the NOTICE file |
483 | @@ -20,7 +22,14 @@ |
484 | import inspect |
485 | from spec import pythonize |
486 | |
487 | -class Delegate: |
488 | + |
489 | +class Delegate(object): |
490 | + """ |
491 | + This is the base class to which the txAMQP protocol class routes messages |
492 | + from the broker. It is subclassed by TwistedDelegate, or anything else |
493 | + that feels like it (for an example, see the ThriftTwistedDelegate class |
494 | + in src/txamqp/contrib/thrift/client.py). |
495 | + """ |
496 | |
497 | def __init__(self): |
498 | self.handlers = {} |
499 | @@ -29,11 +38,23 @@ |
500 | self.invoke_all("init") |
501 | |
502 | def invoke_all(self, meth, *args, **kwargs): |
503 | + """Call the init method (if any) in all superclasses of the subclass of |
504 | + Delegate. This currently doesn't call anything, but is here in case a |
505 | + future subclass needs an initialization mechanism. |
506 | + """ |
507 | for cls in inspect.getmro(self.__class__): |
508 | if hasattr(cls, meth): |
509 | getattr(cls, meth)(self, *args, **kwargs) |
510 | |
511 | def dispatch(self, channel, message): |
512 | + """ |
513 | + Dispatch an incoming AMQP method call to the client's handler. The |
514 | + method to call is either in self.handlers or is looked up on self. In |
515 | + the latter case, the name of the method is composed of the AMQP class |
516 | + ('basic', 'exchange', 'queue', etc) and the AMQP method ('return', |
517 | + 'deliver', 'declare', etc). I.e., full method names look like |
518 | + 'basic_deliver' etc. |
519 | + """ |
520 | method = message.method |
521 | |
522 | try: |
523 | |
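The naming scheme described in the dispatch docstring above can be sketched roughly as follows. This is a minimal illustration, not the txAMQP implementation: `SketchDelegate`, `handler_name`, and the string keys used in `handlers` are hypothetical, and the real `Delegate.dispatch` receives a channel and a message object rather than plain strings.

```python
class SketchDelegate(object):
    """Hypothetical stand-in for a Delegate subclass; not txAMQP's class."""

    def __init__(self):
        # Explicitly registered handlers are consulted before attribute lookup.
        self.handlers = {}

    def handler_name(self, amqp_class, amqp_method):
        # Join the AMQP class and method names with an underscore. Dashes
        # become underscores so the result is a valid Python identifier
        # (an assumption about how names such as 'declare-ok' are handled).
        return ("%s_%s" % (amqp_class, amqp_method)).replace("-", "_")

    def dispatch(self, amqp_class, amqp_method, *args):
        name = self.handler_name(amqp_class, amqp_method)
        handler = self.handlers.get(name)
        if handler is None:
            # Fall back to a method of that name defined on the delegate.
            handler = getattr(self, name)
        return handler(*args)

    def basic_deliver(self, body):
        return "delivered %s" % body


delegate = SketchDelegate()
result = delegate.dispatch("basic", "deliver", "hello")
```

Here `result` comes from the `basic_deliver` method, because no explicit handler was registered under that name.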
524 | === modified file 'src/txamqp/message.py' |
525 | --- src/txamqp/message.py 2008-10-29 18:31:04 +0000 |
526 | +++ src/txamqp/message.py 2010-03-11 10:38:21 +0000 |
527 | @@ -1,3 +1,5 @@ |
528 | +# -*- coding: utf-8; -*- |
529 | + |
530 | # |
531 | # Licensed to the Apache Software Foundation (ASF) under one |
532 | # or more contributor license agreements. See the NOTICE file |
533 | |
534 | === modified file 'src/txamqp/protocol.py' |
535 | --- src/txamqp/protocol.py 2009-11-13 18:50:13 +0000 |
536 | +++ src/txamqp/protocol.py 2010-03-11 10:38:21 +0000 |
537 | @@ -1,4 +1,5 @@ |
538 | -# coding: utf-8 |
539 | +# -*- coding: utf-8; -*- |
540 | + |
541 | from twisted.python import log |
542 | from twisted.internet import defer, protocol |
543 | from twisted.internet.task import LoopingCall |
544 | @@ -17,10 +18,12 @@ |
545 | class GarbageException(Exception): |
546 | pass |
547 | |
548 | -# An AMQP channel is a virtual connection that shares the |
549 | -# same socket with others channels. One can have many channels |
550 | -# per connection |
551 | class AMQChannel(object): |
552 | + """ |
553 | + An AMQP channel is a virtual connection that shares the same socket |
554 | + with other channels. One can have many channels per connection. |
555 | + |
556 | + """ |
557 | |
558 | def __init__(self, id, outgoing): |
559 | self.id = id |
560 | @@ -45,21 +48,33 @@ |
561 | payload = frame.payload |
562 | if isinstance(payload, Method): |
563 | if payload.method.response: |
564 | + # If the method is a response (to an earlier method call on |
565 | + # the broker, such as queue_declare), put the frame into |
566 | + # the responses queue. |
567 | self.queue = self.responses |
568 | else: |
569 | + # This is an incoming message from AMQP that requires |
570 | + # processing (e.g., connection_tune, channel_flow, |
571 | + # basic_deliver). We put the incoming queue into the work |
572 | + # queue of the AMQClient protocol so that as it receives |
573 | + # subsequent frames (in order), it can put them into our |
574 | + # incoming queue, which the worker (in AMQClient) consumes |
575 | + # and builds into a complete incoming message for dispatch. |
576 | + # This makes sure that incoming frames are properly |
577 | + # separated by channel number. |
578 | self.queue = self.incoming |
579 | work.put(self.incoming) |
580 | self.queue.put(frame) |
581 | |
582 | @defer.inlineCallbacks |
583 | - def invoke(self, method, args, content = None): |
584 | + def invoke(self, method, args, content=None): |
585 | if self.closed: |
586 | raise Closed(self.reason) |
587 | frame = Frame(self.id, Method(method, *args)) |
588 | self.outgoing.put(frame) |
589 | |
590 | if method.content: |
591 | - if content == None: |
592 | + if content is None: |
593 | content = Content() |
594 | self.writeContent(method.klass, content, self.outgoing) |
595 | |
596 | @@ -90,16 +105,28 @@ |
597 | |
598 | def writeContent(self, klass, content, queue): |
599 | size = content.size() |
600 | - header = Frame(self.id, Header(klass, content.weight(), size, **content.properties)) |
601 | + header = Frame(self.id, Header(klass, content.weight(), size, |
602 | + **content.properties)) |
603 | queue.put(header) |
604 | for child in content.children: |
605 | self.writeContent(klass, child, queue) |
606 | - # should split up if content.body exceeds max frame size |
607 | + # Should split this up if content.body exceeds max frame size. |
608 | if size > 0: |
609 | queue.put(Frame(self.id, Body(content.body))) |
610 | |
611 | + |
612 | class FrameReceiver(protocol.Protocol, basic._PauseableMixin): |
613 | |
614 | + """ |
615 | + A message from AMQP takes the form of one or more frames. The |
616 | + FrameReceiver class reads the binary data from the transport and calls |
617 | + frameReceived (which must be implemented by a subclass) for each |
618 | + complete frame. |
619 | + |
620 | + This class can also send frames to the AMQP broker. |
621 | + |
622 | + """ |
623 | + |
624 | frame_mode = False |
625 | MAX_LENGTH = 4096 |
626 | HEADER_LENGTH = 1 + 2 + 4 + 1 |
627 | @@ -134,7 +161,8 @@ |
628 | payload = Frame.DECODERS[frameType].decode(self.spec, c) |
629 | end = c.decode_octet() |
630 | if end != self.FRAME_END: |
631 | - raise GarbageException('frame error: expected %r, got %r' % (self.FRAME_END, end)) |
632 | + raise GarbageException('frame error: expected %r, got %r' % |
633 | + (self.FRAME_END, end)) |
634 | frame = Frame(channel, payload) |
635 | return frame |
636 | |
637 | @@ -151,7 +179,8 @@ |
638 | while self.frame_mode and not self.paused: |
639 | sz = len(self.__buffer) - self.HEADER_LENGTH |
640 | if sz >= 0: |
641 | - length, = struct.unpack("!I", self.__buffer[3:7]) # size = 4 bytes |
642 | + # size = 4 bytes: |
643 | + length, = struct.unpack("!I", self.__buffer[3:7]) |
644 | if sz >= length: |
645 | packet = self.__buffer[:self.HEADER_LENGTH + length] |
646 | self.__buffer = self.__buffer[self.HEADER_LENGTH + length:] |
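The buffer handling in the hunk above relies on the AMQP frame layout: one octet of frame type, two octets of channel number, four octets of payload size, the payload, and a closing frame-end octet. The following standalone sketch shows the same length-prefixed splitting using only the standard library. The helper names `build_frame` and `split_frames` are hypothetical and are not part of txAMQP; only `HEADER_LENGTH` and the `"!I"` unpack of bytes 3 to 7 mirror the diff.

```python
import struct

# 1 octet type + 2 octets channel + 4 octets payload size + 1 octet frame end,
# mirroring HEADER_LENGTH in FrameReceiver.
HEADER_LENGTH = 1 + 2 + 4 + 1
FRAME_END = 0xCE  # frame-end octet used by AMQP 0-8/0-9


def build_frame(frame_type, channel, payload):
    """Encode a frame (hypothetical helper, for demonstration only)."""
    header = struct.pack("!BHI", frame_type, channel, len(payload))
    return header + payload + struct.pack("!B", FRAME_END)


def split_frames(buf):
    """Return (complete_frames, leftover_bytes) for a receive buffer."""
    frames = []
    while len(buf) >= HEADER_LENGTH:
        # The payload size is a 4-byte big-endian integer at offset 3.
        (length,) = struct.unpack("!I", buf[3:7])
        total = HEADER_LENGTH + length
        if len(buf) < total:
            break  # wait for more data before cutting this frame
        frames.append(buf[:total])
        buf = buf[total:]
    return frames, buf


first = build_frame(1, 1, b"abc")
second = build_frame(3, 2, b"")
frames, leftover = split_frames(first + second + first[:5])
```

After this runs, `frames` holds the two complete frames and `leftover` holds the five bytes of the partial third frame, which would stay in the buffer until more data arrives.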
647 | @@ -196,10 +225,29 @@ |
648 | content = body.payload.content |
649 | buf.write(content) |
650 | read += len(content) |
651 | - defer.returnValue(Content(buf.getvalue(), children, header.properties.copy())) |
652 | + defer.returnValue(Content(buf.getvalue(), children, |
653 | + header.properties.copy())) |
654 | |
655 | class AMQClient(FrameReceiver): |
656 | |
657 | + """ |
658 | + This is the main protocol class for talking to an AMQP broker. |
659 | + |
660 | + All communication with the broker takes place in a virtual 'channel' |
661 | + (of which there may be many), so this class makes use of a (dynamically |
662 | + constructed) factory class for producing channel instances. |
663 | + |
664 | + The flow of control here is not easy to follow! Frames processed by the |
665 | + FrameReceiver class (which we inherit from) are passed successively to |
666 | + self.frameReceived which (in self.processFrame) passes them to the |
667 | + appropriate channel instance. When the frame payload is a method call |
668 | + (as opposed to a response), the channel puts its incoming queue onto |
669 | + our self.work queue, which results in our self.worker passing it (the |
670 | + channel's incoming queue) to self.dispatch, which reads the method |
671 | + content (if any), builds the complete incoming message and passes it to |
672 | + the delegate (see TwistedDelegate in client.py) to make the appropriate |
673 | + method call.""" |
674 | + |
675 | channelClass = AMQChannel |
676 | |
677 | # Max unreceived heartbeat frames. The AMQP standard says it's 3. |
678 | @@ -214,23 +262,62 @@ |
679 | |
680 | self.vhost = vhost |
681 | |
682 | + # Make a factory class that can generate new channel class |
683 | + # instances. This is done using type as the class creator because |
684 | + # one of the two superclasses of the class we're making was made |
685 | + # dynamically from the AMQP spec. |
686 | self.channelFactory = type("Channel%s" % self.spec.klass.__name__, |
687 | (self.channelClass, self.spec.klass), {}) |
688 | + |
689 | + # A dictionary of channel instances. As dictionaries are not thread |
690 | + # safe, also make a lock for accessing the dictionary. |
691 | self.channels = {} |
692 | self.channelLock = defer.DeferredLock() |
693 | |
694 | + # A queue for sending messages to AMQP. |
695 | self.outgoing = defer.DeferredQueue() |
696 | + |
697 | + # A queue of incoming work in progress. Some AMQP method calls are |
698 | + # composed of several incoming messages (which in turn may be |
699 | + # composed of several frames). The objects that will be put into |
700 | + # the work queue (by a channel instance) are themselves queues. |
701 | + # These will be consumed from to assemble an incoming message. |
702 | self.work = defer.DeferredQueue() |
703 | |
704 | + # The started event will fire once the connection is established |
705 | + # and the initial handshake has completed (prior to authentication). |
706 | self.started = TwistedEvent() |
707 | |
708 | + # A dictionary to hold basic_deliver messages coming from AMQP. |
709 | + # Each of these messages contains a tag that the queue consumer may |
710 | + # specify (or else the broker assigns one) when it sends a |
711 | + # basic_consume to the broker. The tag is used as a key into the |
712 | + # self.queues dictionary. The queuelock is used as a lock for the |
713 | + # dictionary when we need to modify it, due to the non-thread-safeness |
714 | + # of dictionaries in Python. |
715 | + self.queues = {} |
716 | self.queueLock = defer.DeferredLock() |
717 | + |
718 | + # There are two cases in which the broker may send a basic_return |
719 | + # message: for messages that are marked as mandatory and/or |
720 | + # immediate, but that cannot be delivered as such. For details, see |
721 | + # the AMQP spec. The basic_return_queue is used to hold these |
722 | + # incoming messages. |
723 | self.basic_return_queue = TimeoutDeferredQueue() |
724 | |
725 | - self.queues = {} |
726 | - |
727 | + # Prepare to write outgoing messages to the broker by having our |
728 | + # writer method await the first outgoing message on the outgoing |
729 | + # queue. Thereafter it will arrange for itself to be called on |
730 | + # subsequent outgoing messages. |
731 | self.outgoing.get().addCallback(self.writer) |
732 | + |
733 | + # Prepare to deal with incoming messages from the broker by having |
734 | + # our worker method await the first set of incoming messages on the |
735 | + # work queue. Thereafter it will arrange for itself to be called on |
736 | + # subsequent incoming messages. |
737 | self.work.get().addCallback(self.worker) |
738 | + |
739 | + # Arrange for heartbeat traffic, if requested. |
740 | self.heartbeatInterval = heartbeat |
741 | if self.heartbeatInterval > 0: |
742 | if clock is None: |
743 | @@ -296,10 +383,22 @@ |
744 | self.outgoing.get().addCallback(self.writer) |
745 | |
746 | def worker(self, queue): |
747 | + """ |
748 | + @param queue: This is a queue from a channel instance created by |
749 | + self.channelFactory (i.e., normally the incoming queue of an |
750 | + AMQChannel subclass. See the dispatch() method of AMQChannel for |
751 | + the work.put call that puts a channel's queue into our self.work |
752 | + queue, which results in the channel's queue arriving here as the |
753 | + argument). |
754 | + """ |
755 | d = self.dispatch(queue) |
756 | + |
757 | + # Set things up so that we (i.e., self.worker) also handle future |
758 | + # work arriving on the self.work queue. |
759 | def cb(ign): |
760 | self.work.get().addCallback(self.worker) |
761 | d.addCallback(cb) |
762 | + |
763 | d.addErrback(self.close) |
764 | |
765 | @defer.inlineCallbacks |
766 | @@ -315,7 +414,7 @@ |
767 | message = Message(payload.method, payload.args, content) |
768 | self.delegate.dispatch(channel, message) |
769 | |
770 | - # As soon as we connect to the target AMQP broker, send the init string |
771 | + # When we connect to the target AMQP broker, send the init string. |
772 | def connectionMade(self): |
773 | self.sendInitString() |
774 | self.setFrameMode() |
775 | @@ -339,7 +438,8 @@ |
776 | self.reschedule_checkHB() |
777 | |
778 | @defer.inlineCallbacks |
779 | - def authenticate(self, username, password, mechanism='AMQPLAIN', locale='en_US'): |
780 | + def authenticate(self, username, password, mechanism='AMQPLAIN', |
781 | + locale='en_US'): |
782 | if self.check_0_8(): |
783 | response = {"LOGIN": username, "PASSWORD": password} |
784 | else: |
785 | @@ -374,4 +474,3 @@ |
786 | if self.checkHB.active(): |
787 | self.checkHB.cancel() |
788 | self.close(reason) |
789 | - |
790 | |
791 | === modified file 'src/txamqp/queue.py' |
792 | --- src/txamqp/queue.py 2009-05-14 15:03:24 +0000 |
793 | +++ src/txamqp/queue.py 2010-03-11 10:38:21 +0000 |
794 | @@ -1,4 +1,5 @@ |
795 | -# coding: utf-8 |
796 | +# -*- coding: utf-8; -*- |
797 | + |
798 | from twisted.internet.defer import DeferredQueue |
799 | |
800 | class Empty(Exception): |
801 | |
802 | === modified file 'src/txamqp/spec.py' |
803 | --- src/txamqp/spec.py 2009-07-17 16:04:44 +0000 |
804 | +++ src/txamqp/spec.py 2010-03-11 10:38:21 +0000 |
805 | @@ -1,3 +1,5 @@ |
806 | +# -*- coding: utf-8; -*- |
807 | + |
808 | # |
809 | # Licensed to the Apache Software Foundation (ASF) under one |
810 | # or more contributor license agreements. See the NOTICE file |
811 | |
812 | === modified file 'src/txamqp/testlib.py' |
813 | --- src/txamqp/testlib.py 2009-08-08 18:03:05 +0000 |
814 | +++ src/txamqp/testlib.py 2010-03-11 10:38:21 +0000 |
815 | @@ -1,3 +1,5 @@ |
816 | +# -*- coding: utf-8; -*- |
817 | + |
818 | # |
819 | # Licensed to the Apache Software Foundation (ASF) under one |
820 | # or more contributor license agreements. See the NOTICE file |
821 | |
822 | === modified file 'src/txamqp/xmlutil.py' |
823 | --- src/txamqp/xmlutil.py 2009-07-09 21:28:09 +0000 |
824 | +++ src/txamqp/xmlutil.py 2010-03-11 10:38:21 +0000 |
825 | @@ -1,3 +1,5 @@ |
826 | +# -*- coding: utf-8; -*- |
827 | + |
828 | # |
829 | # Licensed to the Apache Software Foundation (ASF) under one |
830 | # or more contributor license agreements. See the NOTICE file |
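The comment added above the channelFactory assignment in protocol.py explains that type() is used because one of the two superclasses is generated from the AMQP spec at runtime. A minimal sketch of that idea follows. BaseChannel, make_spec_class and the method names are hypothetical stand-ins for illustration, not txAMQP's own code.

```python
# Hypothetical stand-ins; none of these names come from txAMQP.
class BaseChannel(object):
    """Plays the role of a hand-written channel class."""

    def __init__(self, channel_id):
        self.id = channel_id


def make_spec_class(name, method_names):
    """Build a class at runtime, roughly as a spec loader might."""
    def make_method(method_name):
        def method(self, *args):
            # Just report which method was called and with what.
            return (method_name, args)
        return method

    attrs = dict((m, make_method(m)) for m in method_names)
    return type(name, (object,), attrs)


spec_class = make_spec_class("Spec0_8", ["basic_publish", "basic_consume"])

# Combine the static base class and the dynamically built one into a
# new class, mirroring the three-argument type() call in the diff.
channel_factory = type("Channel%s" % spec_class.__name__,
                       (BaseChannel, spec_class), {})

channel = channel_factory(1)
```

Calling channel_factory(1) here returns an instance that has both the id attribute from BaseChannel and the methods generated for the spec class.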
[1] The link to the blog post is broken; you should probably remove it.
[2]
+ $ bzr branch lp:txamqp txamqp
"bzr branch lp:txamqp" is enough to work.
[3]
+ - path_to_spec: The path to the AMQP spec that you want to use. Note
+ that depending on the broker you use, you will need a
+ different spec:
+ RabbitMQ 1.5.3: $TXAMQP_PATH/src/specs/standard/amqp0-8.xml
I think this path changed.
[4]
-# coding: utf-8
+# -*- coding: utf-8; -*-
We don't really need those as the files are pure ASCII afaict.
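For context, Python 2 assumes ASCII source by default, so a coding declaration only matters once a file contains non-ASCII bytes. A rough way to check the "pure ASCII" claim is sketched below. is_pure_ascii is a hypothetical helper, not something in the branch.

```python
def is_pure_ascii(data):
    """Return True if every byte in data is below 0x80."""
    # bytearray() yields integers on both Python 2 and Python 3.
    return all(byte < 0x80 for byte in bytearray(data))


ascii_source = b"from twisted.internet.defer import DeferredQueue\n"
non_ascii_source = u"# caf\u00e9\n".encode("utf-8")
```

To check a real file, read it in binary mode and pass the bytes to the helper.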
[5]
+ """A class that acts like an event that can be fired (via set). This is
+ """Call the init method (if any) in all superclasses of the subclass of
Can you break the line at the beginning?
[6]
+ the delegate (see TwistedDelegate in client.py) to make the appropriate
+ method call."""
And at the end? :)
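One reading of comments [5] and [6] is that the opening and closing quotes should each sit on their own line, as the worker() docstring in this diff already does. A small sketch of that layout, using a hypothetical class rather than the real TwistedEvent:

```python
import inspect


class ExampleEvent(object):  # hypothetical, for illustration only
    """
    A class that acts like an event that can be fired.

    The opening and closing quotes each sit on their own line.
    """


# inspect.getdoc() strips the surrounding blank lines and indentation,
# so the extra line breaks do not change what documentation tools see.
summary = inspect.getdoc(ExampleEvent).splitlines()[0]
```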
[7] There are a bunch of conflicts, mostly trivial.
Thanks, +1!