Merge lp://staging/~txamqpteam/txamqp/support-basic-return into lp://staging/txamqp
- support-basic-return
- Merge into trunk
Status: | Merged |
---|---|
Merged at revision: | not available |
Proposed branch: | lp://staging/~txamqpteam/txamqp/support-basic-return |
Merge into: | lp://staging/txamqp |
Diff against target: | None lines |
To merge this branch: | bzr merge lp://staging/~txamqpteam/txamqp/support-basic-return |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Dan Di Spaltro | code | Approve | |
Esteve Fernandez | Approve | ||
Review via email:
|
Commit message
Description of the change

Terry Jones (terrycojones) wrote : | # |

Esteve Fernandez (esteve) : | # |

Esteve Fernandez (esteve) wrote : | # |
Dan, given that you're the only one I know using Thrift and AMQP together (apart from Terry and me), could you review this branch? Thanks!
- 22. By Terry Jones
-
Cleaned up handling of KeyError n txamqp/
contrib/ thrift/ client. py (which had a silly logic error - of mine). Added a __repr__ to the Content class in txamqp/content.py. And I now have a reproducible example of 'headers' not being in msg.content when trying to get the thriftClientName in txamqp/ contrib/ thrift/ client. py though haven't tried to dig into why it happens.

Dan Di Spaltro (dan-dispaltro) wrote : | # |
+1
Ran the code works well, very handy to boot, now our services are more reliable. The log stuff could probably be cleaned up a bit and lines shortened but that is just cosmetic.

Esteve Fernandez (esteve) wrote : | # |
> +1
>
> Ran the code works well, very handy to boot, now our services are more
> reliable. The log stuff could probably be cleaned up a bit and lines
> shortened but that is just cosmetic.
Thanks! Just merged it.

Terry Jones (terrycojones) wrote : | # |
Hi Dan
>>>>> "Dan" == Dan Di Spaltro <email address hidden> writes:
Dan> Ran the code works well, very handy to boot, now our services are more
Dan> reliable. The log stuff could probably be cleaned up a bit and lines
Dan> shortened but that is just cosmetic.
The logging in src/txamqp/
referring to) is there due to very the occasional absence of 'headers' in
msg.content. I've seen it happen a couple of times, and had it in
reproducible form (though in the complex setup that is FluidDB, so not easy
to reduce and post for others). But I've not had time to look at it. So I
left the logging there in the hope that someone else might see this pop up
and feel like digging into it :-) I'm sure I will at some point.
It may be benign / harmless. Esteve, I think, came up with a scenario in
which this might happen, but I forget the details.
Terry
Preview Diff
1 | === modified file 'src/examples/client.py' |
2 | --- src/examples/client.py 2009-02-11 22:28:45 +0000 |
3 | +++ src/examples/client.py 2009-06-02 20:33:36 +0000 |
4 | @@ -6,7 +6,7 @@ |
5 | sys.path.insert(0, os.path.join(os.path.abspath(os.path.split(sys.argv[0])[0]), 'gen-py')) |
6 | import tutorial.Calculator |
7 | from tutorial.ttypes import * |
8 | -from thrift.transport import TTwisted |
9 | +from thrift.transport import TTwisted, TTransport |
10 | from thrift.protocol import TBinaryProtocol |
11 | |
12 | from twisted.internet import reactor, defer |
13 | @@ -34,9 +34,15 @@ |
14 | print results |
15 | |
16 | def gotCalculateErrors(error): |
17 | - print "Got an error" |
18 | + error.trap(InvalidOperation) |
19 | + print "Got a calculator error" |
20 | print error.value.why |
21 | |
22 | +def gotTransportError(error): |
23 | + error.trap(TTransport.TTransportException) |
24 | + print "Got an AMQP unroutable message error:" |
25 | + print error.value.message |
26 | + |
27 | @defer.inlineCallbacks |
28 | def prepareClient(client, username, password): |
29 | yield client.authenticate(username, password) |
30 | @@ -48,6 +54,11 @@ |
31 | yield channel.exchange_declare(exchange=responsesExchange, type="direct") |
32 | |
33 | pfactory = TBinaryProtocol.TBinaryProtocolFactory() |
34 | + |
35 | + # To trigger an unroutable message error (caught in the above |
36 | + # gotTransportError errback), change the routing key (i.e., |
37 | + # calculatorKey) in the following to be something invalid, like |
38 | + # calculatorKey + 'xxx'. |
39 | thriftClient = yield client.createThriftClient(responsesExchange, |
40 | servicesExchange, calculatorKey, tutorial.Calculator.Client, |
41 | iprot_factory=pfactory, oprot_factory=pfactory) |
42 | @@ -55,35 +66,35 @@ |
43 | defer.returnValue(thriftClient) |
44 | |
45 | def gotClient(client): |
46 | - d1 = client.ping().addCallback(gotPing) |
47 | + d1 = client.ping().addCallback(gotPing).addErrback(gotTransportError) |
48 | |
49 | - d2 = client.add(1, 2).addCallback(gotAddResults) |
50 | + d2 = client.add(1, 2).addCallback(gotAddResults).addErrback(gotTransportError) |
51 | |
52 | w = Work(num1=2, num2=3, op=Operation.ADD) |
53 | |
54 | d3 = client.calculate(1, w).addCallbacks(gotCalculateResults, |
55 | - gotCalculateErrors) |
56 | + gotCalculateErrors).addErrback(gotTransportError) |
57 | |
58 | w = Work(num1=2, num2=3, op=Operation.SUBTRACT) |
59 | |
60 | d4 = client.calculate(2, w).addCallbacks(gotCalculateResults, |
61 | - gotCalculateErrors) |
62 | + gotCalculateErrors).addErrback(gotTransportError) |
63 | |
64 | w = Work(num1=2, num2=3, op=Operation.MULTIPLY) |
65 | |
66 | d5 = client.calculate(3, w).addCallbacks(gotCalculateResults, |
67 | - gotCalculateErrors) |
68 | + gotCalculateErrors).addErrback(gotTransportError) |
69 | |
70 | w = Work(num1=2, num2=3, op=Operation.DIVIDE) |
71 | |
72 | d6 = client.calculate(4, w).addCallbacks(gotCalculateResults, |
73 | - gotCalculateErrors) |
74 | + gotCalculateErrors).addErrback(gotTransportError) |
75 | |
76 | # This will fire an errback |
77 | w = Work(num1=2, num2=0, op=Operation.DIVIDE) |
78 | |
79 | d7 = client.calculate(5, w).addCallbacks(gotCalculateResults, |
80 | - gotCalculateErrors) |
81 | + gotCalculateErrors).addErrback(gotTransportError) |
82 | |
83 | d8 = client.zip() |
84 | |
85 | |
86 | === modified file 'src/txamqp/client.py' |
87 | --- src/txamqp/client.py 2009-05-28 09:46:03 +0000 |
88 | +++ src/txamqp/client.py 2009-06-02 20:33:36 +0000 |
89 | @@ -36,6 +36,9 @@ |
90 | def basic_deliver(self, ch, msg): |
91 | (yield self.client.queue(msg.consumer_tag)).put(msg) |
92 | |
93 | + def basic_return_(self, ch, msg): |
94 | + self.client.basic_return_queue.put(msg) |
95 | + |
96 | def channel_close(self, ch, msg): |
97 | ch.close(msg) |
98 | |
99 | |
100 | === added file 'src/txamqp/contrib/thrift/client.py' |
101 | --- src/txamqp/contrib/thrift/client.py 1970-01-01 00:00:00 +0000 |
102 | +++ src/txamqp/contrib/thrift/client.py 2009-06-09 13:29:13 +0000 |
103 | @@ -0,0 +1,21 @@ |
104 | +from twisted.internet import defer |
105 | + |
106 | +from txamqp.client import TwistedDelegate |
107 | + |
108 | + |
109 | +class ThriftTwistedDelegate(TwistedDelegate): |
110 | + |
111 | + @defer.inlineCallbacks |
112 | + def basic_return_(self, ch, msg): |
113 | + try: |
114 | + thriftClientName = msg.content['headers']['thriftClientName'] |
115 | + except KeyError: |
116 | + from twisted.python import log |
117 | + if 'headers' in msg.content: |
118 | + log.msg("'headers' not in msg.content: %r" % msg.content) |
119 | + else: |
120 | + log.msg("'thriftClientName' not in msg.content headers: %r" % |
121 | + msg.content['headers']) |
122 | + else: |
123 | + (yield self.client.thriftBasicReturnQueue(thriftClientName))\ |
124 | + .put(msg) |
125 | |
126 | === modified file 'src/txamqp/contrib/thrift/protocol.py' |
127 | --- src/txamqp/contrib/thrift/protocol.py 2009-05-29 00:20:49 +0000 |
128 | +++ src/txamqp/contrib/thrift/protocol.py 2009-06-09 13:35:48 +0000 |
129 | @@ -2,6 +2,7 @@ |
130 | from txamqp.protocol import AMQClient |
131 | from txamqp.contrib.thrift.transport import TwistedAMQPTransport |
132 | from txamqp.content import Content |
133 | +from txamqp.queue import TimeoutDeferredQueue |
134 | |
135 | from twisted.internet import defer |
136 | from twisted.python import log |
137 | @@ -19,6 +20,22 @@ |
138 | else: |
139 | self.replyToField = "reply-to" |
140 | |
141 | + self.thriftBasicReturnQueueLock = defer.DeferredLock() |
142 | + self.thriftBasicReturnQueues = {} |
143 | + |
144 | + @defer.inlineCallbacks |
145 | + def thriftBasicReturnQueue(self, key): |
146 | + yield self.thriftBasicReturnQueueLock.acquire() |
147 | + try: |
148 | + try: |
149 | + q = self.thriftBasicReturnQueues[key] |
150 | + except KeyError: |
151 | + q = TimeoutDeferredQueue() |
152 | + self.thriftBasicReturnQueues[key] = q |
153 | + finally: |
154 | + self.thriftBasicReturnQueueLock.release() |
155 | + defer.returnValue(q) |
156 | + |
157 | @defer.inlineCallbacks |
158 | def createThriftClient(self, responsesExchange, serviceExchange, |
159 | routingKey, clientClass, channel=1, responseQueue=None, iprot_factory=None, |
160 | @@ -39,8 +56,11 @@ |
161 | |
162 | log.msg("Consuming messages on queue: %s" % responseQueue) |
163 | |
164 | - amqpTransport = TwistedAMQPTransport(channel, serviceExchange, |
165 | - routingKey, replyTo=responseQueue, replyToField=self.replyToField) |
166 | + thriftClientName = clientClass.__name__ + routingKey |
167 | + |
168 | + amqpTransport = TwistedAMQPTransport( |
169 | + channel, serviceExchange, routingKey, clientName=thriftClientName, |
170 | + replyTo=responseQueue, replyToField=self.replyToField) |
171 | |
172 | if iprot_factory is None: |
173 | iprot_factory = self.factory.iprot_factory |
174 | @@ -54,10 +74,16 @@ |
175 | queue.get().addCallback(self.parseClientMessage, channel, queue, |
176 | thriftClient, iprot_factory=iprot_factory) |
177 | |
178 | + basicReturnQueue = yield self.thriftBasicReturnQueue(thriftClientName) |
179 | + |
180 | + basicReturnQueue.get().addCallback( |
181 | + self.parseClientUnrouteableMessage, channel, basicReturnQueue, |
182 | + thriftClient, iprot_factory=iprot_factory) |
183 | + |
184 | defer.returnValue(thriftClient) |
185 | |
186 | def parseClientMessage(self, msg, channel, queue, thriftClient, |
187 | - iprot_factory=None): |
188 | + iprot_factory=None): |
189 | deliveryTag = msg.delivery_tag |
190 | tr = TTransport.TMemoryBuffer(msg.content.body) |
191 | if iprot_factory is None: |
192 | @@ -66,6 +92,12 @@ |
193 | iprot = iprot_factory.getProtocol(tr) |
194 | (fname, mtype, rseqid) = iprot.readMessageBegin() |
195 | |
196 | + if rseqid in thriftClient._reqs: |
197 | + # log.msg('Got reply: fname = %r, rseqid = %s, mtype = %r, routing key = %r, client = %r, msg.content.body = %r' % (fname, rseqid, mtype, msg.routing_key, thriftClient, msg.content.body)) |
198 | + pass |
199 | + else: |
200 | + log.msg('Missing rseqid! fname = %r, rseqid = %s, mtype = %r, routing key = %r, client = %r, msg.content.body = %r' % (fname, rseqid, mtype, msg.routing_key, thriftClient, msg.content.body)) |
201 | + |
202 | method = getattr(thriftClient, 'recv_' + fname) |
203 | method(iprot, mtype, rseqid) |
204 | |
205 | @@ -73,6 +105,34 @@ |
206 | queue.get().addCallback(self.parseClientMessage, channel, queue, |
207 | thriftClient, iprot_factory=iprot_factory) |
208 | |
209 | + def parseClientUnrouteableMessage(self, msg, channel, queue, thriftClient, |
210 | + iprot_factory=None): |
211 | + tr = TTransport.TMemoryBuffer(msg.content.body) |
212 | + if iprot_factory is None: |
213 | + iprot = self.factory.iprot_factory.getProtocol(tr) |
214 | + else: |
215 | + iprot = iprot_factory.getProtocol(tr) |
216 | + (fname, mtype, rseqid) = iprot.readMessageBegin() |
217 | + |
218 | + # log.msg('Got unroutable. fname = %r, rseqid = %s, mtype = %r, routing key = %r, client = %r, msg.content.body = %r' % (fname, rseqid, mtype, msg.routing_key, thriftClient, msg.content.body)) |
219 | + |
220 | + try: |
221 | + d = thriftClient._reqs.pop(rseqid) |
222 | + except KeyError: |
223 | + # KeyError will occur if the remote Thrift method is oneway, |
224 | + # since there is no outstanding local request deferred for |
225 | + # oneway calls. |
226 | + pass |
227 | + else: |
228 | + d.errback(TTransport.TTransportException( |
229 | + type=TTransport.TTransportException.NOT_OPEN, |
230 | + message='Unrouteable message, routing key = %r calling function %r' |
231 | + % (msg.routing_key, fname))) |
232 | + |
233 | + queue.get().addCallback( |
234 | + self.parseClientUnrouteableMessage, channel, queue, |
235 | + thriftClient, iprot_factory=iprot_factory) |
236 | + |
237 | def parseServerMessage(self, msg, channel, exchange, queue, processor, |
238 | iprot_factory=None, oprot_factory=None): |
239 | deliveryTag = msg.delivery_tag |
240 | |
241 | === modified file 'src/txamqp/contrib/thrift/transport.py' |
242 | --- src/txamqp/contrib/thrift/transport.py 2009-02-11 22:28:45 +0000 |
243 | +++ src/txamqp/contrib/thrift/transport.py 2009-06-05 13:34:08 +0000 |
244 | @@ -2,18 +2,26 @@ |
245 | from thrift.transport import TTwisted |
246 | |
247 | class TwistedAMQPTransport(TTwisted.TMessageSenderTransport): |
248 | - def __init__(self, channel, exchange, routingKey, replyTo=None, replyToField=None): |
249 | + def __init__(self, channel, exchange, routingKey, clientName=None, |
250 | + replyTo=None, replyToField=None): |
251 | TTwisted.TMessageSenderTransport.__init__(self) |
252 | self.channel = channel |
253 | self.exchange = exchange |
254 | self.routingKey = routingKey |
255 | + # clientName is the name of the client class we are trying to get |
256 | + # the message through to. We need to send it seeing as the message |
257 | + # may be unroutable and we need a basic return that will tell us |
258 | + # who were trying to reach. |
259 | + self.clientName = clientName |
260 | self.replyTo = replyTo |
261 | self.replyToField = replyToField |
262 | |
263 | def sendMessage(self, message): |
264 | content = Content(body=message) |
265 | + if self.clientName: |
266 | + content['headers'] = { 'thriftClientName' : self.clientName } |
267 | if self.replyTo: |
268 | content[self.replyToField] = self.replyTo |
269 | |
270 | self.channel.basic_publish(exchange=self.exchange, |
271 | - routing_key=self.routingKey, content=content) |
272 | + routing_key=self.routingKey, content=content, mandatory=True) |
273 | |
274 | === modified file 'src/txamqp/protocol.py' |
275 | --- src/txamqp/protocol.py 2009-06-01 16:35:03 +0000 |
276 | +++ src/txamqp/protocol.py 2009-06-09 13:35:48 +0000 |
277 | @@ -220,6 +220,7 @@ |
278 | self.started = TwistedEvent() |
279 | |
280 | self.queueLock = defer.DeferredLock() |
281 | + self.basic_return_queue = TimeoutDeferredQueue() |
282 | |
283 | self.queues = {} |
284 | |
285 | @@ -254,7 +255,7 @@ |
286 | finally: |
287 | self.queueLock.release() |
288 | defer.returnValue(q) |
289 | - |
290 | + |
291 | def close(self, reason): |
292 | for ch in self.channels.values(): |
293 | ch.close(reason) |
This is work done with Esteve to add support for the AMQP basic return message. It adds a mandatory argument to outgoing messages, resulting in a basic return if the message is not routable.
There is an outstanding issue with what, if anything, to do with Thrift oneway methods. They have no client side deferred whose errback can be called if they are unroutable.