Merge lp://staging/~txamqpteam/txamqp/sync-with-thrift into lp://staging/txamqp
- sync-with-thrift
- Merge into trunk
Proposed by
Dan Di Spaltro
| Status: | Merged |
|---|---|
| Approved by: | Esteve Fernandez |
| Approved revision: | 14 |
| Merged at revision: | not available |
| Proposed branch: | lp://staging/~txamqpteam/txamqp/sync-with-thrift |
| Merge into: | lp://staging/txamqp |
| Diff against target: | None lines |
| To merge this branch: | bzr merge lp://staging/~txamqpteam/txamqp/sync-with-thrift |
| Related bugs: |
| Reviewer | Review Type | Date Requested | Status |
|---|---|---|---|
| Esteve Fernandez | Approve | ||
Review via email:
Commit message
Description of the change
To post a comment you must log in.
Revision history for this message
| Dan Di Spaltro (dan-dispaltro) wrote : | # |
Revision history for this message
| Esteve Fernandez (esteve) : | # |
review:
Approve
Revision history for this message
| Esteve Fernandez (esteve) wrote : | # |
> This should be merged into the main branch since it follows Thrift
> development much more closely. It seems fairly stable; we use it in over 8
> production apps.
Thanks for taking the time to review it, I just pushed it to trunk.
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
| 1 | === modified file 'src/examples/README' |
| 2 | --- src/examples/README 2008-11-20 17:04:06 +0000 |
| 3 | +++ src/examples/README 2009-03-06 11:28:34 +0000 |
| 4 | @@ -26,8 +26,8 @@ |
| 5 | - General |
| 6 | |
| 7 | 1. Download Thrift source code [1] |
| 8 | -2. Check if it contains support for Twisted, download and apply |
| 9 | -patch [2] if not. |
| 10 | +2. Check if it contains support for Twisted, you'll need revision 749795 or |
| 11 | +greater. |
| 12 | 3. Compile Thrift with suppport for Python. |
| 13 | 4. Install Thrift. |
| 14 | |
| 15 | @@ -56,9 +56,8 @@ |
| 16 | is 'guest'). |
| 17 | - path_to_spec: The path to the AMQP spec that you want to use. Keep in mind |
| 18 | that depending on the broker you use, you will need a different spec: |
| 19 | - - RabbitMQ 1.4.0: $TXAMQP_PATH/src/specs/standard/amqp0-8.xml |
| 20 | - - OpenAMQ 1.2d6: $TXAMQP_PATH/src/specs/standard/amqp0-9.xml |
| 21 | + - RabbitMQ 1.5.3: $TXAMQP_PATH/src/specs/standard/amqp0-8.xml |
| 22 | + - OpenAMQ 1.3c5: $TXAMQP_PATH/src/specs/standard/amqp0-9.xml |
| 23 | - Qpid M3 (Java): $TXAMQP_PATH/src/specs/qpid/amqp.0-8.xml |
| 24 | |
| 25 | 1 - http://incubator.apache.org/thrift/ |
| 26 | -2 - http://issues.apache.org/jira/browse/THRIFT-148 |
| 27 | |
| 28 | === modified file 'src/examples/client.py' |
| 29 | --- src/examples/client.py 2008-12-08 21:48:31 +0000 |
| 30 | +++ src/examples/client.py 2009-02-11 22:28:45 +0000 |
| 31 | @@ -13,9 +13,9 @@ |
| 32 | from twisted.internet.protocol import ClientCreator |
| 33 | |
| 34 | import txamqp.spec |
| 35 | -from txamqp.protocol import AMQClient |
| 36 | from txamqp.client import TwistedDelegate |
| 37 | -from txamqp.contrib.thrift import TwistedAMQPTransport |
| 38 | +from txamqp.contrib.thrift.transport import TwistedAMQPTransport |
| 39 | +from txamqp.contrib.thrift.protocol import ThriftAMQClient |
| 40 | |
| 41 | servicesExchange = "services" |
| 42 | responsesExchange = "responses" |
| 43 | @@ -37,21 +37,9 @@ |
| 44 | print "Got an error" |
| 45 | print error.value.why |
| 46 | |
| 47 | -def parseClientMessage(msg, channel, queue, pfactory, thriftClient): |
| 48 | - deliveryTag = msg.delivery_tag |
| 49 | - tr = TTransport.TMemoryBuffer(msg.content.body) |
| 50 | - iprot = pfactory.getProtocol(tr) |
| 51 | - (fname, mtype, rseqid) = iprot.readMessageBegin() |
| 52 | - |
| 53 | - m = getattr(thriftClient, 'recv_' + fname) |
| 54 | - m(iprot, mtype, rseqid) |
| 55 | - |
| 56 | - channel.basic_ack(deliveryTag, True) |
| 57 | - queue.get().addCallback(parseClientMessage, channel, queue, pfactory, thriftClient) |
| 58 | - |
| 59 | @defer.inlineCallbacks |
| 60 | -def prepareClient(client, authentication): |
| 61 | - yield client.start(authentication) |
| 62 | +def prepareClient(client, username, password): |
| 63 | + yield client.authenticate(username, password) |
| 64 | |
| 65 | channel = yield client.channel(1) |
| 66 | |
| 67 | @@ -59,22 +47,11 @@ |
| 68 | yield channel.exchange_declare(exchange=servicesExchange, type="direct") |
| 69 | yield channel.exchange_declare(exchange=responsesExchange, type="direct") |
| 70 | |
| 71 | - reply = yield channel.queue_declare(exclusive=True, auto_delete=True) |
| 72 | - |
| 73 | - responseQueue = reply.queue |
| 74 | - |
| 75 | - yield channel.queue_bind(queue=responseQueue, exchange=responsesExchange, |
| 76 | - routing_key=responseQueue) |
| 77 | - |
| 78 | - amqpTransport = TwistedAMQPTransport(channel, servicesExchange, calculatorKey, |
| 79 | - replyTo=responseQueue, replyToField=replyToField) |
| 80 | pfactory = TBinaryProtocol.TBinaryProtocolFactory() |
| 81 | - tm = TTwisted.TwistedMemoryBuffer(amqpTransport) |
| 82 | - thriftClient = tutorial.Calculator.Client(tm, pfactory) |
| 83 | - |
| 84 | - reply = yield channel.basic_consume(queue=responseQueue) |
| 85 | - queue = yield client.queue(reply.consumer_tag) |
| 86 | - queue.get().addCallback(parseClientMessage, channel, queue, pfactory, thriftClient) |
| 87 | + thriftClient = yield client.createThriftClient(responsesExchange, |
| 88 | + servicesExchange, calculatorKey, tutorial.Calculator.Client, |
| 89 | + iprot_factory=pfactory, oprot_factory=pfactory) |
| 90 | + |
| 91 | defer.returnValue(thriftClient) |
| 92 | |
| 93 | def gotClient(client): |
| 94 | @@ -82,26 +59,31 @@ |
| 95 | |
| 96 | d2 = client.add(1, 2).addCallback(gotAddResults) |
| 97 | |
| 98 | - w = Work({'num1': 2, 'num2': 3, 'op': Operation.ADD}) |
| 99 | - |
| 100 | - d3 = client.calculate(1, w).addCallbacks(gotCalculateResults, gotCalculateErrors) |
| 101 | - |
| 102 | - w = Work({'num1': 2, 'num2': 3, 'op': Operation.SUBTRACT}) |
| 103 | - |
| 104 | - d4 = client.calculate(2, w).addCallbacks(gotCalculateResults, gotCalculateErrors) |
| 105 | - |
| 106 | - w = Work({'num1': 2, 'num2': 3, 'op': Operation.MULTIPLY}) |
| 107 | - |
| 108 | - d5 = client.calculate(3, w).addCallbacks(gotCalculateResults, gotCalculateErrors) |
| 109 | - |
| 110 | - w = Work({'num1': 2, 'num2': 3, 'op': Operation.DIVIDE}) |
| 111 | - |
| 112 | - d6 = client.calculate(4, w).addCallbacks(gotCalculateResults, gotCalculateErrors) |
| 113 | + w = Work(num1=2, num2=3, op=Operation.ADD) |
| 114 | + |
| 115 | + d3 = client.calculate(1, w).addCallbacks(gotCalculateResults, |
| 116 | + gotCalculateErrors) |
| 117 | + |
| 118 | + w = Work(num1=2, num2=3, op=Operation.SUBTRACT) |
| 119 | + |
| 120 | + d4 = client.calculate(2, w).addCallbacks(gotCalculateResults, |
| 121 | + gotCalculateErrors) |
| 122 | + |
| 123 | + w = Work(num1=2, num2=3, op=Operation.MULTIPLY) |
| 124 | + |
| 125 | + d5 = client.calculate(3, w).addCallbacks(gotCalculateResults, |
| 126 | + gotCalculateErrors) |
| 127 | + |
| 128 | + w = Work(num1=2, num2=3, op=Operation.DIVIDE) |
| 129 | + |
| 130 | + d6 = client.calculate(4, w).addCallbacks(gotCalculateResults, |
| 131 | + gotCalculateErrors) |
| 132 | |
| 133 | # This will fire an errback |
| 134 | - w = Work({'num1': 2, 'num2': 0, 'op': Operation.DIVIDE}) |
| 135 | + w = Work(num1=2, num2=0, op=Operation.DIVIDE) |
| 136 | |
| 137 | - d7 = client.calculate(5, w).addCallbacks(gotCalculateResults, gotCalculateErrors) |
| 138 | + d7 = client.calculate(5, w).addCallbacks(gotCalculateResults, |
| 139 | + gotCalculateErrors) |
| 140 | |
| 141 | d8 = client.zip() |
| 142 | |
| 143 | @@ -126,15 +108,7 @@ |
| 144 | |
| 145 | delegate = TwistedDelegate() |
| 146 | |
| 147 | - d = ClientCreator(reactor, AMQClient, delegate, vhost, |
| 148 | + d = ClientCreator(reactor, ThriftAMQClient, delegate, vhost, |
| 149 | spec).connectTCP(host, port) |
| 150 | - |
| 151 | - if (spec.major, spec.minor) == (8, 0): |
| 152 | - authentication = {"LOGIN": username, "PASSWORD": password} |
| 153 | - replyToField = "reply to" |
| 154 | - else: |
| 155 | - authentication = "\0" + username + "\0" + password |
| 156 | - replyToField = "reply-to" |
| 157 | - |
| 158 | - d.addCallback(prepareClient, authentication).addCallback(gotClient) |
| 159 | + d.addCallback(prepareClient, username, password).addCallback(gotClient) |
| 160 | reactor.run() |
| 161 | |
| 162 | === modified file 'src/examples/server.py' |
| 163 | --- src/examples/server.py 2008-10-29 18:31:04 +0000 |
| 164 | +++ src/examples/server.py 2009-02-11 22:28:45 +0000 |
| 165 | @@ -13,9 +13,11 @@ |
| 166 | from twisted.internet.protocol import ClientCreator |
| 167 | |
| 168 | import txamqp.spec |
| 169 | -from txamqp.protocol import AMQClient |
| 170 | from txamqp.client import TwistedDelegate |
| 171 | -from txamqp.contrib.thrift import TwistedAMQPTransport |
| 172 | +from txamqp.contrib.thrift.transport import TwistedAMQPTransport |
| 173 | +from txamqp.contrib.thrift.protocol import ThriftAMQClient |
| 174 | + |
| 175 | +from zope.interface import implements |
| 176 | |
| 177 | servicesExchange = "services" |
| 178 | responsesExchange = "responses" |
| 179 | @@ -23,6 +25,7 @@ |
| 180 | calculatorKey = "calculator" |
| 181 | |
| 182 | class CalculatorHandler(object): |
| 183 | + implements(tutorial.Calculator.Iface) |
| 184 | |
| 185 | operations = { |
| 186 | Operation.ADD: int.__add__, |
| 187 | @@ -35,7 +38,7 @@ |
| 188 | # Just assume that it may take a long time |
| 189 | results = self.operations[w.op](w.num1, w.num2) |
| 190 | d = defer.Deferred() |
| 191 | - reactor.callLater(3, d.callback, results) |
| 192 | + reactor.callLater(0, d.callback, results) |
| 193 | return d |
| 194 | |
| 195 | def ping(self): |
| 196 | @@ -50,30 +53,14 @@ |
| 197 | try: |
| 198 | return self._dispatchWork(w) |
| 199 | except Exception, e: |
| 200 | - return defer.fail(InvalidOperation({'logid': logid, 'why': e.message})) |
| 201 | + return defer.fail(InvalidOperation(what=logid, why=e.message)) |
| 202 | |
| 203 | def zip(self): |
| 204 | print "zip() called from client" |
| 205 | |
| 206 | -def parseServerMessage(msg, channel, queue, processor, pfactory): |
| 207 | - deliveryTag = msg.delivery_tag |
| 208 | - try: |
| 209 | - replyTo = msg.content[replyToField] |
| 210 | - except KeyError: |
| 211 | - replyTo = None |
| 212 | - tr = TwistedAMQPTransport(channel, responsesExchange, routingKey=replyTo) |
| 213 | - tmi = TTransport.TMemoryBuffer(msg.content.body) |
| 214 | - tmo = TTwisted.TwistedMemoryBuffer(tr) |
| 215 | - iprot = pfactory.getProtocol(tmi) |
| 216 | - oprot = pfactory.getProtocol(tmo) |
| 217 | - processor.process(iprot, oprot) |
| 218 | - channel.basic_ack(deliveryTag, True) |
| 219 | - queue.get().addCallback(parseServerMessage, channel, queue, processor, |
| 220 | - pfactory) |
| 221 | - |
| 222 | @defer.inlineCallbacks |
| 223 | -def prepareClient(client, authentication): |
| 224 | - yield client.start(authentication) |
| 225 | +def prepareClient(client, username, password): |
| 226 | + yield client.authenticate(username, password) |
| 227 | |
| 228 | channel = yield client.channel(1) |
| 229 | |
| 230 | @@ -90,8 +77,8 @@ |
| 231 | |
| 232 | reply = yield channel.basic_consume(queue=calculatorQueue) |
| 233 | queue = yield client.queue(reply.consumer_tag) |
| 234 | - queue.get().addCallback(parseServerMessage, channel, queue, processor, |
| 235 | - pfactory) |
| 236 | + queue.get().addCallback(client.parseServerMessage, channel, responsesExchange, |
| 237 | + queue, processor, pfactory, pfactory) |
| 238 | |
| 239 | if __name__ == '__main__': |
| 240 | import sys |
| 241 | @@ -112,15 +99,7 @@ |
| 242 | |
| 243 | print 'Starting the server...' |
| 244 | |
| 245 | - d = ClientCreator(reactor, AMQClient, delegate, vhost, |
| 246 | + d = ClientCreator(reactor, ThriftAMQClient, delegate, vhost, |
| 247 | spec).connectTCP(host, port) |
| 248 | - |
| 249 | - if (spec.major, spec.minor) == (8, 0): |
| 250 | - authentication = {"LOGIN": username, "PASSWORD": password} |
| 251 | - replyToField = "reply to" |
| 252 | - else: |
| 253 | - authentication = "\0" + username + "\0" + password |
| 254 | - replyToField = "reply-to" |
| 255 | - |
| 256 | - d.addCallback(prepareClient, authentication) |
| 257 | + d.addCallback(prepareClient, username, password) |
| 258 | reactor.run() |
| 259 | |
| 260 | === modified file 'src/txamqp/client.py' |
| 261 | --- src/txamqp/client.py 2008-10-29 18:31:04 +0000 |
| 262 | +++ src/txamqp/client.py 2009-02-11 22:28:45 +0000 |
| 263 | @@ -11,12 +11,15 @@ |
| 264 | self.alreadyCalled = False |
| 265 | |
| 266 | def set(self): |
| 267 | - deferred, self.deferred = self.deferred, defer.Deferred() |
| 268 | - deferred.callback(None) |
| 269 | + self.deferred.callback(True) |
| 270 | |
| 271 | def wait(self): |
| 272 | return self.deferred |
| 273 | |
| 274 | + def reset(self): |
| 275 | + deferred, self.deferred = self.deferred, defer.Deferred() |
| 276 | + deferred.callback(True) |
| 277 | + |
| 278 | class TwistedDelegate(Delegate): |
| 279 | |
| 280 | def connection_start(self, ch, msg): |
| 281 | @@ -40,4 +43,4 @@ |
| 282 | |
| 283 | def close(self, reason): |
| 284 | self.client.closed = True |
| 285 | - self.client.started.set() |
| 286 | + self.client.started.reset() |
| 287 | |
| 288 | === added directory 'src/txamqp/contrib/thrift' |
| 289 | === added file 'src/txamqp/contrib/thrift/__init__.py' |
| 290 | === added file 'src/txamqp/contrib/thrift/protocol.py' |
| 291 | --- src/txamqp/contrib/thrift/protocol.py 1970-01-01 00:00:00 +0000 |
| 292 | +++ src/txamqp/contrib/thrift/protocol.py 2009-02-11 22:28:45 +0000 |
| 293 | @@ -0,0 +1,108 @@ |
| 294 | +from zope.interface import Interface, Attribute |
| 295 | +from txamqp.protocol import AMQClient |
| 296 | +from txamqp.contrib.thrift.transport import TwistedAMQPTransport |
| 297 | +from txamqp.content import Content |
| 298 | + |
| 299 | +from twisted.internet import defer |
| 300 | +from twisted.python import log |
| 301 | + |
| 302 | +from thrift.protocol import TBinaryProtocol |
| 303 | +from thrift.transport import TTwisted, TTransport |
| 304 | + |
| 305 | +class ThriftAMQClient(AMQClient): |
| 306 | + |
| 307 | + def __init__(self, *args, **kwargs): |
| 308 | + AMQClient.__init__(self, *args, **kwargs) |
| 309 | + |
| 310 | + if self.check_0_8(): |
| 311 | + self.replyToField = "reply to" |
| 312 | + else: |
| 313 | + self.replyToField = "reply-to" |
| 314 | + |
| 315 | + @defer.inlineCallbacks |
| 316 | + def createThriftClient(self, responsesExchange, serviceExchange, |
| 317 | + routingKey, clientClass, responseQueue=None, iprot_factory=None, |
| 318 | + oprot_factory=None): |
| 319 | + |
| 320 | + channel = yield self.channel(1) |
| 321 | + |
| 322 | + if responseQueue is None: |
| 323 | + reply = yield channel.queue_declare(exclusive=True, |
| 324 | + auto_delete=True) |
| 325 | + |
| 326 | + responseQueue = reply.queue |
| 327 | + |
| 328 | + yield channel.queue_bind(queue=responseQueue, |
| 329 | + exchange=responsesExchange, routing_key=responseQueue) |
| 330 | + |
| 331 | + reply = yield channel.basic_consume(queue=responseQueue) |
| 332 | + |
| 333 | + log.msg("Consuming messages on queue: %s" % responseQueue) |
| 334 | + |
| 335 | + amqpTransport = TwistedAMQPTransport(channel, serviceExchange, |
| 336 | + routingKey, replyTo=responseQueue, replyToField=self.replyToField) |
| 337 | + |
| 338 | + if iprot_factory is None: |
| 339 | + iprot_factory = self.factory.iprot_factory |
| 340 | + |
| 341 | + if oprot_factory is None: |
| 342 | + oprot_factory = self.factory.oprot_factory |
| 343 | + |
| 344 | + thriftClient = clientClass(amqpTransport, oprot_factory) |
| 345 | + |
| 346 | + queue = yield self.queue(reply.consumer_tag) |
| 347 | + queue.get().addCallback(self.parseClientMessage, channel, queue, |
| 348 | + thriftClient, iprot_factory=iprot_factory) |
| 349 | + |
| 350 | + defer.returnValue(thriftClient) |
| 351 | + |
| 352 | + def parseClientMessage(self, msg, channel, queue, thriftClient, |
| 353 | + iprot_factory=None): |
| 354 | + deliveryTag = msg.delivery_tag |
| 355 | + tr = TTransport.TMemoryBuffer(msg.content.body) |
| 356 | + if iprot_factory is None: |
| 357 | + iprot = self.factory.iprot_factory.getProtocol(tr) |
| 358 | + else: |
| 359 | + iprot = iprot_factory.getProtocol(tr) |
| 360 | + (fname, mtype, rseqid) = iprot.readMessageBegin() |
| 361 | + |
| 362 | + method = getattr(thriftClient, 'recv_' + fname) |
| 363 | + method(iprot, mtype, rseqid) |
| 364 | + |
| 365 | + channel.basic_ack(deliveryTag, True) |
| 366 | + queue.get().addCallback(self.parseClientMessage, channel, queue, |
| 367 | + thriftClient, iprot_factory=iprot_factory) |
| 368 | + |
| 369 | + def parseServerMessage(self, msg, channel, exchange, queue, processor, |
| 370 | + iprot_factory=None, oprot_factory=None): |
| 371 | + deliveryTag = msg.delivery_tag |
| 372 | + try: |
| 373 | + replyTo = msg.content[self.replyToField] |
| 374 | + except KeyError: |
| 375 | + replyTo = None |
| 376 | + |
| 377 | + tmi = TTransport.TMemoryBuffer(msg.content.body) |
| 378 | + tr = TwistedAMQPTransport(channel, exchange, replyTo) |
| 379 | + |
| 380 | + if iprot_factory is None: |
| 381 | + iprot = self.factory.iprot_factory.getProtocol(tmi) |
| 382 | + else: |
| 383 | + iprot = iprot_factory.getProtocol(tmi) |
| 384 | + |
| 385 | + if oprot_factory is None: |
| 386 | + oprot = self.factory.oprot_factory.getProtocol(tr) |
| 387 | + else: |
| 388 | + oprot = oprot_factory.getProtocol(tr) |
| 389 | + |
| 390 | + d = processor.process(iprot, oprot) |
| 391 | + channel.basic_ack(deliveryTag, True) |
| 392 | + |
| 393 | + return queue.get().addCallback(self.parseServerMessage, channel, |
| 394 | + exchange, queue, processor, iprot_factory, oprot_factory) |
| 395 | + |
| 396 | + |
| 397 | +class IThriftAMQClientFactory(Interface): |
| 398 | + |
| 399 | + iprot_factory = Attribute("Input protocol factory") |
| 400 | + oprot_factory = Attribute("Input protocol factory") |
| 401 | + processor = Attribute("Thrift processor") |
| 402 | |
| 403 | === added file 'src/txamqp/contrib/thrift/service.py' |
| 404 | --- src/txamqp/contrib/thrift/service.py 1970-01-01 00:00:00 +0000 |
| 405 | +++ src/txamqp/contrib/thrift/service.py 2009-02-11 22:28:45 +0000 |
| 406 | @@ -0,0 +1,7 @@ |
| 407 | +from zope.interface import Interface, Attribute |
| 408 | + |
| 409 | +class IThriftAMQClientFactory(Interface): |
| 410 | + |
| 411 | + iprot_factory = Attribute("Input protocol factory") |
| 412 | + oprot_factory = Attribute("Input protocol factory") |
| 413 | + processor = Attribute("Thrift processor") |
| 414 | |
| 415 | === renamed file 'src/txamqp/contrib/thrift.py' => 'src/txamqp/contrib/thrift/transport.py' |
| 416 | --- src/txamqp/contrib/thrift.py 2008-10-29 18:31:04 +0000 |
| 417 | +++ src/txamqp/contrib/thrift/transport.py 2009-02-11 22:28:45 +0000 |
| 418 | @@ -1,16 +1,17 @@ |
| 419 | from txamqp.content import Content |
| 420 | +from thrift.transport import TTwisted |
| 421 | |
| 422 | -class TwistedAMQPTransport(object): |
| 423 | - def __init__(self, channel, exchange, routingKey, replyTo=None, |
| 424 | - replyToField="reply to"): |
| 425 | +class TwistedAMQPTransport(TTwisted.TMessageSenderTransport): |
| 426 | + def __init__(self, channel, exchange, routingKey, replyTo=None, replyToField=None): |
| 427 | + TTwisted.TMessageSenderTransport.__init__(self) |
| 428 | self.channel = channel |
| 429 | self.exchange = exchange |
| 430 | self.routingKey = routingKey |
| 431 | self.replyTo = replyTo |
| 432 | self.replyToField = replyToField |
| 433 | |
| 434 | - def write(self, data): |
| 435 | - content = Content(body=data) |
| 436 | + def sendMessage(self, message): |
| 437 | + content = Content(body=message) |
| 438 | if self.replyTo: |
| 439 | content[self.replyToField] = self.replyTo |
| 440 | |
| 441 | |
| 442 | === modified file 'src/txamqp/protocol.py' |
| 443 | --- src/txamqp/protocol.py 2009-02-05 19:22:56 +0000 |
| 444 | +++ src/txamqp/protocol.py 2009-02-11 22:28:45 +0000 |
| 445 | @@ -226,6 +226,9 @@ |
| 446 | self.outgoing.get().addCallback(self.writer) |
| 447 | self.work.get().addCallback(self.worker) |
| 448 | |
| 449 | + def check_0_8(self): |
| 450 | + return (self.spec.minor, self.spec.major) == (0, 8) |
| 451 | + |
| 452 | @defer.inlineCallbacks |
| 453 | def channel(self, id): |
| 454 | yield self.channelLock.acquire() |
| 455 | @@ -297,6 +300,15 @@ |
| 456 | ch.dispatch(frame, self.work) |
| 457 | |
| 458 | @defer.inlineCallbacks |
| 459 | + def authenticate(self, username, password, mechanism='AMQPLAIN', locale='en_US'): |
| 460 | + if self.check_0_8(): |
| 461 | + response = {"LOGIN": username, "PASSWORD": password} |
| 462 | + else: |
| 463 | + response = "\0" + username + "\0" + password |
| 464 | + |
| 465 | + yield self.start(response, mechanism, locale) |
| 466 | + |
| 467 | + @defer.inlineCallbacks |
| 468 | def start(self, response, mechanism='AMQPLAIN', locale='en_US'): |
| 469 | self.response = response |
| 470 | self.mechanism = mechanism |
| 471 | |
| 472 | === modified file 'src/txamqp/testlib.py' |
| 473 | --- src/txamqp/testlib.py 2008-10-29 18:31:04 +0000 |
| 474 | +++ src/txamqp/testlib.py 2009-02-11 22:28:45 +0000 |
| 475 | @@ -39,6 +39,9 @@ |
| 476 | self.user = 'guest' |
| 477 | self.password = 'guest' |
| 478 | self.vhost = 'localhost' |
| 479 | + self.queues = [] |
| 480 | + self.exchanges = [] |
| 481 | + self.connectors = [] |
| 482 | |
| 483 | @inlineCallbacks |
| 484 | def connect(self, host=None, port=None, spec=None, user=None, password=None, vhost=None): |
| 485 | @@ -56,16 +59,11 @@ |
| 486 | self.connectors.append(c) |
| 487 | client = yield onConn |
| 488 | |
| 489 | - yield client.start({"LOGIN": user, "PASSWORD": password}) |
| 490 | + yield client.authenticate(user, password) |
| 491 | returnValue(client) |
| 492 | |
| 493 | @inlineCallbacks |
| 494 | def setUp(self): |
| 495 | - self.queues = [] |
| 496 | - self.exchanges = [] |
| 497 | - |
| 498 | - self.connectors = [] |
| 499 | - |
| 500 | self.client = yield self.connect() |
| 501 | |
| 502 | self.channel = yield self.client.channel(1) |
This should be merged into the main branch since it follows Thrift development much more closely. It seems fairly stable; we use it in over 8 production apps.