Merge lp:~me-smira/txamqp/patches into lp:txamqp

Proposed by Andrey Smirnov
Status: Merged
Merge reported by: Esteve Fernandez
Merged at revision: not available
Proposed branch: lp:~me-smira/txamqp/patches
Merge into: lp:txamqp
Diff against target: 44 lines (+22/-0)
2 files modified
src/txamqp/client.py (+4/-0)
src/txamqp/protocol.py (+18/-0)
To merge this branch: bzr merge lp:~me-smira/txamqp/patches
Reviewer Review Type Date Requested Status
Esteve Fernandez Approve
Review via email: mp+19466@code.launchpad.net
To post a comment you must log in.
Revision history for this message
Andrey Smirnov (me-smira) wrote :

Some small patches I had to apply to txAMQP to keep it working.

I'm using it heavily at qik.com, it receives good load (5M+ messages per day pushed through queues per day).

One of the changes fixes bug with channel-flow messages not being implemented. Other are related to scenario when channel is closed by server due to some error (like "queue doesn't exist") and client should reopen the channel to continue operations.

Thanks for great library!

Revision history for this message
Esteve Fernandez (esteve) :
review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'src/txamqp/client.py'
2--- src/txamqp/client.py 2009-08-08 17:54:09 +0000
3+++ src/txamqp/client.py 2010-02-17 06:06:16 +0000
4@@ -40,7 +40,11 @@
5 def basic_return_(self, ch, msg):
6 self.client.basic_return_queue.put(msg)
7
8+ def channel_flow(self, ch, msg):
9+ ch.channel_flow_ok(active=msg.active)
10+
11 def channel_close(self, ch, msg):
12+ ch.channel_close_ok()
13 ch.close(msg)
14
15 def connection_close(self, ch, msg):
16
17=== modified file 'src/txamqp/protocol.py'
18--- src/txamqp/protocol.py 2009-11-13 18:50:13 +0000
19+++ src/txamqp/protocol.py 2010-02-17 06:06:16 +0000
20@@ -284,6 +284,24 @@
21 self.queueLock.release()
22 defer.returnValue(q)
23
24+ @defer.inlineCallbacks
25+ def closeChannel(self, channel):
26+ yield self.channelLock.acquire()
27+ try:
28+ channel.close(None)
29+ del self.channels[channel.id]
30+ finally:
31+ self.channelLock.release()
32+
33+ @defer.inlineCallbacks
34+ def closeQueue(self, key, queue):
35+ yield self.queueLock.acquire()
36+ try:
37+ queue.close()
38+ del self.queues[key]
39+ finally:
40+ self.queueLock.release()
41+
42 def close(self, reason):
43 for ch in self.channels.values():
44 ch.close(reason)

Subscribers

People subscribed via source and target branches

to status/vote changes: