Merge lp:~txamqpteam/txamqp/support-basic-return into lp:txamqp

Proposed by Terry Jones
Status: Merged
Merged at revision: not available
Proposed branch: lp:~txamqpteam/txamqp/support-basic-return
Merge into: lp:txamqp
Diff against target: None lines
To merge this branch: bzr merge lp:~txamqpteam/txamqp/support-basic-return
Reviewer Review Type Date Requested Status
Dan Di Spaltro code Approve
Esteve Fernandez Approve
Review via email: mp+7561@code.launchpad.net
To post a comment you must log in.
Revision history for this message
Terry Jones (terrycojones) wrote :

This is work done with Esteve to add support for the AMQP basic return message. It adds a mandatory argument to outgoing messages, resulting in a basic return if the message is not routable.

There is an outstanding issue with what, if anything, to do with Thrift oneway methods. They have no client side deferred whose errback can be called if they are unroutable.

Revision history for this message
Esteve Fernandez (esteve) :
review: Approve
Revision history for this message
Esteve Fernandez (esteve) wrote :

Dan, given that you're the only one I know using Thrift and AMQP together (apart from Terry and me), could you review this branch? Thanks!

22. By Terry Jones

Cleaned up handling of KeyError n txamqp/contrib/thrift/client.py (which had a silly logic error - of mine). Added a __repr__ to the Content class in txamqp/content.py. And I now have a reproducible example of 'headers' not being in msg.content when trying to get the thriftClientName in txamqp/contrib/thrift/client.py though haven't tried to dig into why it happens.

Revision history for this message
Dan Di Spaltro (dan-dispaltro) wrote :

+1

Ran the code works well, very handy to boot, now our services are more reliable. The log stuff could probably be cleaned up a bit and lines shortened but that is just cosmetic.

review: Approve (code)
Revision history for this message
Esteve Fernandez (esteve) wrote :

> +1
>
> Ran the code works well, very handy to boot, now our services are more
> reliable. The log stuff could probably be cleaned up a bit and lines
> shortened but that is just cosmetic.

Thanks! Just merged it.

Revision history for this message
Terry Jones (terrycojones) wrote :

Hi Dan

>>>>> "Dan" == Dan Di Spaltro <email address hidden> writes:
Dan> Ran the code works well, very handy to boot, now our services are more
Dan> reliable. The log stuff could probably be cleaned up a bit and lines
Dan> shortened but that is just cosmetic.

The logging in src/txamqp/contrib/thrift/client.py (if that's what you were
referring to) is there due to very the occasional absence of 'headers' in
msg.content. I've seen it happen a couple of times, and had it in
reproducible form (though in the complex setup that is FluidDB, so not easy
to reduce and post for others). But I've not had time to look at it. So I
left the logging there in the hope that someone else might see this pop up
and feel like digging into it :-) I'm sure I will at some point.

It may be benign / harmless. Esteve, I think, came up with a scenario in
which this might happen, but I forget the details.

Terry

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'src/examples/client.py'
2--- src/examples/client.py 2009-02-11 22:28:45 +0000
3+++ src/examples/client.py 2009-06-02 20:33:36 +0000
4@@ -6,7 +6,7 @@
5 sys.path.insert(0, os.path.join(os.path.abspath(os.path.split(sys.argv[0])[0]), 'gen-py'))
6 import tutorial.Calculator
7 from tutorial.ttypes import *
8-from thrift.transport import TTwisted
9+from thrift.transport import TTwisted, TTransport
10 from thrift.protocol import TBinaryProtocol
11
12 from twisted.internet import reactor, defer
13@@ -34,9 +34,15 @@
14 print results
15
16 def gotCalculateErrors(error):
17- print "Got an error"
18+ error.trap(InvalidOperation)
19+ print "Got a calculator error"
20 print error.value.why
21
22+def gotTransportError(error):
23+ error.trap(TTransport.TTransportException)
24+ print "Got an AMQP unroutable message error:"
25+ print error.value.message
26+
27 @defer.inlineCallbacks
28 def prepareClient(client, username, password):
29 yield client.authenticate(username, password)
30@@ -48,6 +54,11 @@
31 yield channel.exchange_declare(exchange=responsesExchange, type="direct")
32
33 pfactory = TBinaryProtocol.TBinaryProtocolFactory()
34+
35+ # To trigger an unroutable message error (caught in the above
36+ # gotTransportError errback), change the routing key (i.e.,
37+ # calculatorKey) in the following to be something invalid, like
38+ # calculatorKey + 'xxx'.
39 thriftClient = yield client.createThriftClient(responsesExchange,
40 servicesExchange, calculatorKey, tutorial.Calculator.Client,
41 iprot_factory=pfactory, oprot_factory=pfactory)
42@@ -55,35 +66,35 @@
43 defer.returnValue(thriftClient)
44
45 def gotClient(client):
46- d1 = client.ping().addCallback(gotPing)
47+ d1 = client.ping().addCallback(gotPing).addErrback(gotTransportError)
48
49- d2 = client.add(1, 2).addCallback(gotAddResults)
50+ d2 = client.add(1, 2).addCallback(gotAddResults).addErrback(gotTransportError)
51
52 w = Work(num1=2, num2=3, op=Operation.ADD)
53
54 d3 = client.calculate(1, w).addCallbacks(gotCalculateResults,
55- gotCalculateErrors)
56+ gotCalculateErrors).addErrback(gotTransportError)
57
58 w = Work(num1=2, num2=3, op=Operation.SUBTRACT)
59
60 d4 = client.calculate(2, w).addCallbacks(gotCalculateResults,
61- gotCalculateErrors)
62+ gotCalculateErrors).addErrback(gotTransportError)
63
64 w = Work(num1=2, num2=3, op=Operation.MULTIPLY)
65
66 d5 = client.calculate(3, w).addCallbacks(gotCalculateResults,
67- gotCalculateErrors)
68+ gotCalculateErrors).addErrback(gotTransportError)
69
70 w = Work(num1=2, num2=3, op=Operation.DIVIDE)
71
72 d6 = client.calculate(4, w).addCallbacks(gotCalculateResults,
73- gotCalculateErrors)
74+ gotCalculateErrors).addErrback(gotTransportError)
75
76 # This will fire an errback
77 w = Work(num1=2, num2=0, op=Operation.DIVIDE)
78
79 d7 = client.calculate(5, w).addCallbacks(gotCalculateResults,
80- gotCalculateErrors)
81+ gotCalculateErrors).addErrback(gotTransportError)
82
83 d8 = client.zip()
84
85
86=== modified file 'src/txamqp/client.py'
87--- src/txamqp/client.py 2009-05-28 09:46:03 +0000
88+++ src/txamqp/client.py 2009-06-02 20:33:36 +0000
89@@ -36,6 +36,9 @@
90 def basic_deliver(self, ch, msg):
91 (yield self.client.queue(msg.consumer_tag)).put(msg)
92
93+ def basic_return_(self, ch, msg):
94+ self.client.basic_return_queue.put(msg)
95+
96 def channel_close(self, ch, msg):
97 ch.close(msg)
98
99
100=== added file 'src/txamqp/contrib/thrift/client.py'
101--- src/txamqp/contrib/thrift/client.py 1970-01-01 00:00:00 +0000
102+++ src/txamqp/contrib/thrift/client.py 2009-06-09 13:29:13 +0000
103@@ -0,0 +1,21 @@
104+from twisted.internet import defer
105+
106+from txamqp.client import TwistedDelegate
107+
108+
109+class ThriftTwistedDelegate(TwistedDelegate):
110+
111+ @defer.inlineCallbacks
112+ def basic_return_(self, ch, msg):
113+ try:
114+ thriftClientName = msg.content['headers']['thriftClientName']
115+ except KeyError:
116+ from twisted.python import log
117+ if 'headers' in msg.content:
118+ log.msg("'headers' not in msg.content: %r" % msg.content)
119+ else:
120+ log.msg("'thriftClientName' not in msg.content headers: %r" %
121+ msg.content['headers'])
122+ else:
123+ (yield self.client.thriftBasicReturnQueue(thriftClientName))\
124+ .put(msg)
125
126=== modified file 'src/txamqp/contrib/thrift/protocol.py'
127--- src/txamqp/contrib/thrift/protocol.py 2009-05-29 00:20:49 +0000
128+++ src/txamqp/contrib/thrift/protocol.py 2009-06-09 13:35:48 +0000
129@@ -2,6 +2,7 @@
130 from txamqp.protocol import AMQClient
131 from txamqp.contrib.thrift.transport import TwistedAMQPTransport
132 from txamqp.content import Content
133+from txamqp.queue import TimeoutDeferredQueue
134
135 from twisted.internet import defer
136 from twisted.python import log
137@@ -19,6 +20,22 @@
138 else:
139 self.replyToField = "reply-to"
140
141+ self.thriftBasicReturnQueueLock = defer.DeferredLock()
142+ self.thriftBasicReturnQueues = {}
143+
144+ @defer.inlineCallbacks
145+ def thriftBasicReturnQueue(self, key):
146+ yield self.thriftBasicReturnQueueLock.acquire()
147+ try:
148+ try:
149+ q = self.thriftBasicReturnQueues[key]
150+ except KeyError:
151+ q = TimeoutDeferredQueue()
152+ self.thriftBasicReturnQueues[key] = q
153+ finally:
154+ self.thriftBasicReturnQueueLock.release()
155+ defer.returnValue(q)
156+
157 @defer.inlineCallbacks
158 def createThriftClient(self, responsesExchange, serviceExchange,
159 routingKey, clientClass, channel=1, responseQueue=None, iprot_factory=None,
160@@ -39,8 +56,11 @@
161
162 log.msg("Consuming messages on queue: %s" % responseQueue)
163
164- amqpTransport = TwistedAMQPTransport(channel, serviceExchange,
165- routingKey, replyTo=responseQueue, replyToField=self.replyToField)
166+ thriftClientName = clientClass.__name__ + routingKey
167+
168+ amqpTransport = TwistedAMQPTransport(
169+ channel, serviceExchange, routingKey, clientName=thriftClientName,
170+ replyTo=responseQueue, replyToField=self.replyToField)
171
172 if iprot_factory is None:
173 iprot_factory = self.factory.iprot_factory
174@@ -54,10 +74,16 @@
175 queue.get().addCallback(self.parseClientMessage, channel, queue,
176 thriftClient, iprot_factory=iprot_factory)
177
178+ basicReturnQueue = yield self.thriftBasicReturnQueue(thriftClientName)
179+
180+ basicReturnQueue.get().addCallback(
181+ self.parseClientUnrouteableMessage, channel, basicReturnQueue,
182+ thriftClient, iprot_factory=iprot_factory)
183+
184 defer.returnValue(thriftClient)
185
186 def parseClientMessage(self, msg, channel, queue, thriftClient,
187- iprot_factory=None):
188+ iprot_factory=None):
189 deliveryTag = msg.delivery_tag
190 tr = TTransport.TMemoryBuffer(msg.content.body)
191 if iprot_factory is None:
192@@ -66,6 +92,12 @@
193 iprot = iprot_factory.getProtocol(tr)
194 (fname, mtype, rseqid) = iprot.readMessageBegin()
195
196+ if rseqid in thriftClient._reqs:
197+ # log.msg('Got reply: fname = %r, rseqid = %s, mtype = %r, routing key = %r, client = %r, msg.content.body = %r' % (fname, rseqid, mtype, msg.routing_key, thriftClient, msg.content.body))
198+ pass
199+ else:
200+ log.msg('Missing rseqid! fname = %r, rseqid = %s, mtype = %r, routing key = %r, client = %r, msg.content.body = %r' % (fname, rseqid, mtype, msg.routing_key, thriftClient, msg.content.body))
201+
202 method = getattr(thriftClient, 'recv_' + fname)
203 method(iprot, mtype, rseqid)
204
205@@ -73,6 +105,34 @@
206 queue.get().addCallback(self.parseClientMessage, channel, queue,
207 thriftClient, iprot_factory=iprot_factory)
208
209+ def parseClientUnrouteableMessage(self, msg, channel, queue, thriftClient,
210+ iprot_factory=None):
211+ tr = TTransport.TMemoryBuffer(msg.content.body)
212+ if iprot_factory is None:
213+ iprot = self.factory.iprot_factory.getProtocol(tr)
214+ else:
215+ iprot = iprot_factory.getProtocol(tr)
216+ (fname, mtype, rseqid) = iprot.readMessageBegin()
217+
218+ # log.msg('Got unroutable. fname = %r, rseqid = %s, mtype = %r, routing key = %r, client = %r, msg.content.body = %r' % (fname, rseqid, mtype, msg.routing_key, thriftClient, msg.content.body))
219+
220+ try:
221+ d = thriftClient._reqs.pop(rseqid)
222+ except KeyError:
223+ # KeyError will occur if the remote Thrift method is oneway,
224+ # since there is no outstanding local request deferred for
225+ # oneway calls.
226+ pass
227+ else:
228+ d.errback(TTransport.TTransportException(
229+ type=TTransport.TTransportException.NOT_OPEN,
230+ message='Unrouteable message, routing key = %r calling function %r'
231+ % (msg.routing_key, fname)))
232+
233+ queue.get().addCallback(
234+ self.parseClientUnrouteableMessage, channel, queue,
235+ thriftClient, iprot_factory=iprot_factory)
236+
237 def parseServerMessage(self, msg, channel, exchange, queue, processor,
238 iprot_factory=None, oprot_factory=None):
239 deliveryTag = msg.delivery_tag
240
241=== modified file 'src/txamqp/contrib/thrift/transport.py'
242--- src/txamqp/contrib/thrift/transport.py 2009-02-11 22:28:45 +0000
243+++ src/txamqp/contrib/thrift/transport.py 2009-06-05 13:34:08 +0000
244@@ -2,18 +2,26 @@
245 from thrift.transport import TTwisted
246
247 class TwistedAMQPTransport(TTwisted.TMessageSenderTransport):
248- def __init__(self, channel, exchange, routingKey, replyTo=None, replyToField=None):
249+ def __init__(self, channel, exchange, routingKey, clientName=None,
250+ replyTo=None, replyToField=None):
251 TTwisted.TMessageSenderTransport.__init__(self)
252 self.channel = channel
253 self.exchange = exchange
254 self.routingKey = routingKey
255+ # clientName is the name of the client class we are trying to get
256+ # the message through to. We need to send it seeing as the message
257+ # may be unroutable and we need a basic return that will tell us
258+ # who were trying to reach.
259+ self.clientName = clientName
260 self.replyTo = replyTo
261 self.replyToField = replyToField
262
263 def sendMessage(self, message):
264 content = Content(body=message)
265+ if self.clientName:
266+ content['headers'] = { 'thriftClientName' : self.clientName }
267 if self.replyTo:
268 content[self.replyToField] = self.replyTo
269
270 self.channel.basic_publish(exchange=self.exchange,
271- routing_key=self.routingKey, content=content)
272+ routing_key=self.routingKey, content=content, mandatory=True)
273
274=== modified file 'src/txamqp/protocol.py'
275--- src/txamqp/protocol.py 2009-06-01 16:35:03 +0000
276+++ src/txamqp/protocol.py 2009-06-09 13:35:48 +0000
277@@ -220,6 +220,7 @@
278 self.started = TwistedEvent()
279
280 self.queueLock = defer.DeferredLock()
281+ self.basic_return_queue = TimeoutDeferredQueue()
282
283 self.queues = {}
284
285@@ -254,7 +255,7 @@
286 finally:
287 self.queueLock.release()
288 defer.returnValue(q)
289-
290+
291 def close(self, reason):
292 for ch in self.channels.values():
293 ch.close(reason)

Subscribers

People subscribed via source and target branches

to status/vote changes: