Merge lp://staging/~jonathan-stoppani/txamqp/catch-closed-queues into lp://staging/txamqp

Proposed by Jonathan Stoppani
Status: Merged
Merged at revision: not available
Proposed branch: lp://staging/~jonathan-stoppani/txamqp/catch-closed-queues
Merge into: lp://staging/txamqp
Diff against target: None lines
To merge this branch: bzr merge lp://staging/~jonathan-stoppani/txamqp/catch-closed-queues
Reviewer Review Type Date Requested Status
txAMQP Team Pending
Review via email: mp+9745@code.staging.launchpad.net
To post a comment you must log in.
Revision history for this message
Jonathan Stoppani (jonathan-stoppani) wrote :

Solves the incorrect shutdown of the sample client and allows correct channel closing.

The problem was solved by adding an errback to the TimeoutDeferredQueue instance used by ThriftAMQClient, which only traps a "Closed()" failure.

As I don't know the internals of txAMQP well, it could be that some more cleanup code should be added to the errback, but the client is shutting down properly, so I assumed everything worked correctly.

Greets

Revision history for this message
Esteve Fernandez (esteve) wrote :

Sorry for the delay, and thanks, Jonathan, for your patch. It looks great; I just made some small modifications to it (lp:~esteve/txamqp/catch-closed-queues):

- separated handling of client's and server's errbacks
- added two empty functions (handleClientQueueError and handleClosedServerQueue), which can be overridden by subclasses to provide more fine-grained error handling

Let me know what you think of them. Thanks!

Revision history for this message
Jonathan Stoppani (jonathan-stoppani) wrote :

Hi Esteve,
I just took a look at your changes and I have no issues ;-)

Thanks for reviewing, and no problem about the delay...

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'src/txamqp/contrib/thrift/protocol.py'
--- src/txamqp/contrib/thrift/protocol.py 2009-06-09 13:35:48 +0000
+++ src/txamqp/contrib/thrift/protocol.py 2009-08-06 08:30:08 +0000
@@ -2,7 +2,7 @@
 from txamqp.protocol import AMQClient
 from txamqp.contrib.thrift.transport import TwistedAMQPTransport
 from txamqp.content import Content
-from txamqp.queue import TimeoutDeferredQueue
+from txamqp.queue import TimeoutDeferredQueue, Closed
 
 from twisted.internet import defer
 from twisted.python import log
@@ -71,15 +71,18 @@
71 thriftClient = clientClass(amqpTransport, oprot_factory)71 thriftClient = clientClass(amqpTransport, oprot_factory)
7272
73 queue = yield self.queue(reply.consumer_tag)73 queue = yield self.queue(reply.consumer_tag)
74 queue.get().addCallback(self.parseClientMessage, channel, queue,74 d = queue.get()
75 thriftClient, iprot_factory=iprot_factory)75 d.addCallback(self.parseClientMessage, channel, queue, thriftClient,
76 iprot_factory=iprot_factory)
77 d.addErrback(self.handleClosedQueue)
7678
77 basicReturnQueue = yield self.thriftBasicReturnQueue(thriftClientName)79 basicReturnQueue = yield self.thriftBasicReturnQueue(thriftClientName)
7880
79 basicReturnQueue.get().addCallback(81 d = basicReturnQueue.get()
80 self.parseClientUnrouteableMessage, channel, basicReturnQueue,82 d.addCallback(self.parseClientUnrouteableMessage, channel,
81 thriftClient, iprot_factory=iprot_factory)83 basicReturnQueue, thriftClient, iprot_factory=iprot_factory)
8284 d.addErrback(self.handleClosedQueue)
85
83 defer.returnValue(thriftClient)86 defer.returnValue(thriftClient)
8487
85 def parseClientMessage(self, msg, channel, queue, thriftClient,88 def parseClientMessage(self, msg, channel, queue, thriftClient,
@@ -102,8 +105,11 @@
102 method(iprot, mtype, rseqid)105 method(iprot, mtype, rseqid)
103106
104 channel.basic_ack(deliveryTag, True)107 channel.basic_ack(deliveryTag, True)
105 queue.get().addCallback(self.parseClientMessage, channel, queue,108
106 thriftClient, iprot_factory=iprot_factory)109 d = queue.get()
110 d.addCallback(self.parseClientMessage, channel, queue, thriftClient,
111 iprot_factory=iprot_factory)
112 d.addErrback(self.handleClosedQueue)
107113
108 def parseClientUnrouteableMessage(self, msg, channel, queue, thriftClient,114 def parseClientUnrouteableMessage(self, msg, channel, queue, thriftClient,
109 iprot_factory=None):115 iprot_factory=None):
@@ -129,9 +135,10 @@
129 message='Unrouteable message, routing key = %r calling function %r'135 message='Unrouteable message, routing key = %r calling function %r'
130 % (msg.routing_key, fname)))136 % (msg.routing_key, fname)))
131137
132 queue.get().addCallback(138 d = queue.get()
133 self.parseClientUnrouteableMessage, channel, queue,139 d.addCallback(self.parseClientUnrouteableMessage, channel, queue,
134 thriftClient, iprot_factory=iprot_factory)140 thriftClient, iprot_factory=iprot_factory)
141 d.addErrback(self.handleClosedQueue)
135142
136 def parseServerMessage(self, msg, channel, exchange, queue, processor,143 def parseServerMessage(self, msg, channel, exchange, queue, processor,
137 iprot_factory=None, oprot_factory=None):144 iprot_factory=None, oprot_factory=None):
@@ -157,9 +164,14 @@
157 d = processor.process(iprot, oprot)164 d = processor.process(iprot, oprot)
158 channel.basic_ack(deliveryTag, True)165 channel.basic_ack(deliveryTag, True)
159166
160 queue.get().addCallback(self.parseServerMessage, channel,167 d = queue.get()
161 exchange, queue, processor, iprot_factory, oprot_factory)168 d.addCallback(self.parseServerMessage, channel, exchange, queue,
169 processor, iprot_factory, oprot_factory)
170 d.addErrback(self.handleClosedQueue)
162171
172 def handleClosedQueue(self, failure):
173 # The queue is closed. Catch the exception and cleanup as needed.
174 failure.trap(Closed)
163175
164class IThriftAMQClientFactory(Interface):176class IThriftAMQClientFactory(Interface):
165177

Subscribers

People subscribed via source and target branches

to status/vote changes: