Merge lp://staging/~jonathan-stoppani/txamqp/catch-closed-queues into lp://staging/txamqp

Proposed by Jonathan Stoppani
Status: Merged
Merged at revision: not available
Proposed branch: lp://staging/~jonathan-stoppani/txamqp/catch-closed-queues
Merge into: lp://staging/txamqp
Diff against target: None lines
To merge this branch: bzr merge lp://staging/~jonathan-stoppani/txamqp/catch-closed-queues
Reviewer Review Type Date Requested Status
txAMQP Team Pending
Review via email: mp+9745@code.staging.launchpad.net
To post a comment you must log in.
Revision history for this message
Jonathan Stoppani (jonathan-stoppani) wrote :

Solves the incorrect shutdown of the sample client and allows correct channel closing.

The problem was solved by adding a errback to the TimeoutDeferreQueue instance used by ThriftAMQClient which only traps a "Closed()" failure.

As I don't know well the internals of txAMQP could be that some more cleanup code should be added to the errback, but the client is shutting down properly, thus I assumed all worked correctly.

Greets

Revision history for this message
Esteve Fernandez (esteve) wrote :

Sorry for the delay, thanks Jonathan for your patch. It looks great, I just made some small modifications (lp:~esteve/txamqp/catch-closed-queues) to it:

- separated handling of client's and server's errbacks
- added two empty functions (handleClientQueueError and handleClosedServerQueue), which can be overridden by subclasses to provide more fine-grained error handling

Let me know what you think of them. Thanks!

Revision history for this message
Jonathan Stoppani (jonathan-stoppani) wrote :

Hi Esteve,
I just took a look at your changes and I have no issues ;-)

Thanks for reviewing and no problems for the delay...

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'src/txamqp/contrib/thrift/protocol.py'
2--- src/txamqp/contrib/thrift/protocol.py 2009-06-09 13:35:48 +0000
3+++ src/txamqp/contrib/thrift/protocol.py 2009-08-06 08:30:08 +0000
4@@ -2,7 +2,7 @@
5 from txamqp.protocol import AMQClient
6 from txamqp.contrib.thrift.transport import TwistedAMQPTransport
7 from txamqp.content import Content
8-from txamqp.queue import TimeoutDeferredQueue
9+from txamqp.queue import TimeoutDeferredQueue, Closed
10
11 from twisted.internet import defer
12 from twisted.python import log
13@@ -71,15 +71,18 @@
14 thriftClient = clientClass(amqpTransport, oprot_factory)
15
16 queue = yield self.queue(reply.consumer_tag)
17- queue.get().addCallback(self.parseClientMessage, channel, queue,
18- thriftClient, iprot_factory=iprot_factory)
19+ d = queue.get()
20+ d.addCallback(self.parseClientMessage, channel, queue, thriftClient,
21+ iprot_factory=iprot_factory)
22+ d.addErrback(self.handleClosedQueue)
23
24 basicReturnQueue = yield self.thriftBasicReturnQueue(thriftClientName)
25-
26- basicReturnQueue.get().addCallback(
27- self.parseClientUnrouteableMessage, channel, basicReturnQueue,
28- thriftClient, iprot_factory=iprot_factory)
29-
30+
31+ d = basicReturnQueue.get()
32+ d.addCallback(self.parseClientUnrouteableMessage, channel,
33+ basicReturnQueue, thriftClient, iprot_factory=iprot_factory)
34+ d.addErrback(self.handleClosedQueue)
35+
36 defer.returnValue(thriftClient)
37
38 def parseClientMessage(self, msg, channel, queue, thriftClient,
39@@ -102,8 +105,11 @@
40 method(iprot, mtype, rseqid)
41
42 channel.basic_ack(deliveryTag, True)
43- queue.get().addCallback(self.parseClientMessage, channel, queue,
44- thriftClient, iprot_factory=iprot_factory)
45+
46+ d = queue.get()
47+ d.addCallback(self.parseClientMessage, channel, queue, thriftClient,
48+ iprot_factory=iprot_factory)
49+ d.addErrback(self.handleClosedQueue)
50
51 def parseClientUnrouteableMessage(self, msg, channel, queue, thriftClient,
52 iprot_factory=None):
53@@ -129,9 +135,10 @@
54 message='Unrouteable message, routing key = %r calling function %r'
55 % (msg.routing_key, fname)))
56
57- queue.get().addCallback(
58- self.parseClientUnrouteableMessage, channel, queue,
59+ d = queue.get()
60+ d.addCallback(self.parseClientUnrouteableMessage, channel, queue,
61 thriftClient, iprot_factory=iprot_factory)
62+ d.addErrback(self.handleClosedQueue)
63
64 def parseServerMessage(self, msg, channel, exchange, queue, processor,
65 iprot_factory=None, oprot_factory=None):
66@@ -157,9 +164,14 @@
67 d = processor.process(iprot, oprot)
68 channel.basic_ack(deliveryTag, True)
69
70- queue.get().addCallback(self.parseServerMessage, channel,
71- exchange, queue, processor, iprot_factory, oprot_factory)
72+ d = queue.get()
73+ d.addCallback(self.parseServerMessage, channel, exchange, queue,
74+ processor, iprot_factory, oprot_factory)
75+ d.addErrback(self.handleClosedQueue)
76
77+ def handleClosedQueue(self, failure):
78+ # The queue is closed. Catch the exception and cleanup as needed.
79+ failure.trap(Closed)
80
81 class IThriftAMQClientFactory(Interface):
82

Subscribers

People subscribed via source and target branches

to status/vote changes: