Merge lp://staging/~txamqpteam/txamqp/sync-with-thrift into lp://staging/txamqp
- sync-with-thrift
- Merge into trunk
Proposed by
Dan Di Spaltro
Status: | Merged |
---|---|
Approved by: | Esteve Fernandez |
Approved revision: | 14 |
Merged at revision: | not available |
Proposed branch: | lp://staging/~txamqpteam/txamqp/sync-with-thrift |
Merge into: | lp://staging/txamqp |
Diff against target: | None lines |
To merge this branch: | bzr merge lp://staging/~txamqpteam/txamqp/sync-with-thrift |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Esteve Fernandez | Approve | ||
Review via email:
|
Commit message
Description of the change
To post a comment you must log in.
Revision history for this message

Dan Di Spaltro (dan-dispaltro) wrote : | # |
Revision history for this message

Esteve Fernandez (esteve) : | # |
review:
Approve
Revision history for this message

Esteve Fernandez (esteve) wrote : | # |
> This should be merged in to the main branch since it follows thrift
> development much more closely. It seems fairly stable, we use it in over 8
> production apps.
Thanks for taking the time to review it, I just pushed it to trunk.
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === modified file 'src/examples/README' |
2 | --- src/examples/README 2008-11-20 17:04:06 +0000 |
3 | +++ src/examples/README 2009-03-06 11:28:34 +0000 |
4 | @@ -26,8 +26,8 @@ |
5 | - General |
6 | |
7 | 1. Download Thrift source code [1] |
8 | -2. Check if it contains support for Twisted, download and apply |
9 | -patch [2] if not. |
10 | +2. Check if it contains support for Twisted, you'll need revision 749795 or |
11 | +greater. |
12 | 3. Compile Thrift with suppport for Python. |
13 | 4. Install Thrift. |
14 | |
15 | @@ -56,9 +56,8 @@ |
16 | is 'guest'). |
17 | - path_to_spec: The path to the AMQP spec that you want to use. Keep in mind |
18 | that depending on the broker you use, you will need a different spec: |
19 | - - RabbitMQ 1.4.0: $TXAMQP_PATH/src/specs/standard/amqp0-8.xml |
20 | - - OpenAMQ 1.2d6: $TXAMQP_PATH/src/specs/standard/amqp0-9.xml |
21 | + - RabbitMQ 1.5.3: $TXAMQP_PATH/src/specs/standard/amqp0-8.xml |
22 | + - OpenAMQ 1.3c5: $TXAMQP_PATH/src/specs/standard/amqp0-9.xml |
23 | - Qpid M3 (Java): $TXAMQP_PATH/src/specs/qpid/amqp.0-8.xml |
24 | |
25 | 1 - http://incubator.apache.org/thrift/ |
26 | -2 - http://issues.apache.org/jira/browse/THRIFT-148 |
27 | |
28 | === modified file 'src/examples/client.py' |
29 | --- src/examples/client.py 2008-12-08 21:48:31 +0000 |
30 | +++ src/examples/client.py 2009-02-11 22:28:45 +0000 |
31 | @@ -13,9 +13,9 @@ |
32 | from twisted.internet.protocol import ClientCreator |
33 | |
34 | import txamqp.spec |
35 | -from txamqp.protocol import AMQClient |
36 | from txamqp.client import TwistedDelegate |
37 | -from txamqp.contrib.thrift import TwistedAMQPTransport |
38 | +from txamqp.contrib.thrift.transport import TwistedAMQPTransport |
39 | +from txamqp.contrib.thrift.protocol import ThriftAMQClient |
40 | |
41 | servicesExchange = "services" |
42 | responsesExchange = "responses" |
43 | @@ -37,21 +37,9 @@ |
44 | print "Got an error" |
45 | print error.value.why |
46 | |
47 | -def parseClientMessage(msg, channel, queue, pfactory, thriftClient): |
48 | - deliveryTag = msg.delivery_tag |
49 | - tr = TTransport.TMemoryBuffer(msg.content.body) |
50 | - iprot = pfactory.getProtocol(tr) |
51 | - (fname, mtype, rseqid) = iprot.readMessageBegin() |
52 | - |
53 | - m = getattr(thriftClient, 'recv_' + fname) |
54 | - m(iprot, mtype, rseqid) |
55 | - |
56 | - channel.basic_ack(deliveryTag, True) |
57 | - queue.get().addCallback(parseClientMessage, channel, queue, pfactory, thriftClient) |
58 | - |
59 | @defer.inlineCallbacks |
60 | -def prepareClient(client, authentication): |
61 | - yield client.start(authentication) |
62 | +def prepareClient(client, username, password): |
63 | + yield client.authenticate(username, password) |
64 | |
65 | channel = yield client.channel(1) |
66 | |
67 | @@ -59,22 +47,11 @@ |
68 | yield channel.exchange_declare(exchange=servicesExchange, type="direct") |
69 | yield channel.exchange_declare(exchange=responsesExchange, type="direct") |
70 | |
71 | - reply = yield channel.queue_declare(exclusive=True, auto_delete=True) |
72 | - |
73 | - responseQueue = reply.queue |
74 | - |
75 | - yield channel.queue_bind(queue=responseQueue, exchange=responsesExchange, |
76 | - routing_key=responseQueue) |
77 | - |
78 | - amqpTransport = TwistedAMQPTransport(channel, servicesExchange, calculatorKey, |
79 | - replyTo=responseQueue, replyToField=replyToField) |
80 | pfactory = TBinaryProtocol.TBinaryProtocolFactory() |
81 | - tm = TTwisted.TwistedMemoryBuffer(amqpTransport) |
82 | - thriftClient = tutorial.Calculator.Client(tm, pfactory) |
83 | - |
84 | - reply = yield channel.basic_consume(queue=responseQueue) |
85 | - queue = yield client.queue(reply.consumer_tag) |
86 | - queue.get().addCallback(parseClientMessage, channel, queue, pfactory, thriftClient) |
87 | + thriftClient = yield client.createThriftClient(responsesExchange, |
88 | + servicesExchange, calculatorKey, tutorial.Calculator.Client, |
89 | + iprot_factory=pfactory, oprot_factory=pfactory) |
90 | + |
91 | defer.returnValue(thriftClient) |
92 | |
93 | def gotClient(client): |
94 | @@ -82,26 +59,31 @@ |
95 | |
96 | d2 = client.add(1, 2).addCallback(gotAddResults) |
97 | |
98 | - w = Work({'num1': 2, 'num2': 3, 'op': Operation.ADD}) |
99 | - |
100 | - d3 = client.calculate(1, w).addCallbacks(gotCalculateResults, gotCalculateErrors) |
101 | - |
102 | - w = Work({'num1': 2, 'num2': 3, 'op': Operation.SUBTRACT}) |
103 | - |
104 | - d4 = client.calculate(2, w).addCallbacks(gotCalculateResults, gotCalculateErrors) |
105 | - |
106 | - w = Work({'num1': 2, 'num2': 3, 'op': Operation.MULTIPLY}) |
107 | - |
108 | - d5 = client.calculate(3, w).addCallbacks(gotCalculateResults, gotCalculateErrors) |
109 | - |
110 | - w = Work({'num1': 2, 'num2': 3, 'op': Operation.DIVIDE}) |
111 | - |
112 | - d6 = client.calculate(4, w).addCallbacks(gotCalculateResults, gotCalculateErrors) |
113 | + w = Work(num1=2, num2=3, op=Operation.ADD) |
114 | + |
115 | + d3 = client.calculate(1, w).addCallbacks(gotCalculateResults, |
116 | + gotCalculateErrors) |
117 | + |
118 | + w = Work(num1=2, num2=3, op=Operation.SUBTRACT) |
119 | + |
120 | + d4 = client.calculate(2, w).addCallbacks(gotCalculateResults, |
121 | + gotCalculateErrors) |
122 | + |
123 | + w = Work(num1=2, num2=3, op=Operation.MULTIPLY) |
124 | + |
125 | + d5 = client.calculate(3, w).addCallbacks(gotCalculateResults, |
126 | + gotCalculateErrors) |
127 | + |
128 | + w = Work(num1=2, num2=3, op=Operation.DIVIDE) |
129 | + |
130 | + d6 = client.calculate(4, w).addCallbacks(gotCalculateResults, |
131 | + gotCalculateErrors) |
132 | |
133 | # This will fire an errback |
134 | - w = Work({'num1': 2, 'num2': 0, 'op': Operation.DIVIDE}) |
135 | + w = Work(num1=2, num2=0, op=Operation.DIVIDE) |
136 | |
137 | - d7 = client.calculate(5, w).addCallbacks(gotCalculateResults, gotCalculateErrors) |
138 | + d7 = client.calculate(5, w).addCallbacks(gotCalculateResults, |
139 | + gotCalculateErrors) |
140 | |
141 | d8 = client.zip() |
142 | |
143 | @@ -126,15 +108,7 @@ |
144 | |
145 | delegate = TwistedDelegate() |
146 | |
147 | - d = ClientCreator(reactor, AMQClient, delegate, vhost, |
148 | + d = ClientCreator(reactor, ThriftAMQClient, delegate, vhost, |
149 | spec).connectTCP(host, port) |
150 | - |
151 | - if (spec.major, spec.minor) == (8, 0): |
152 | - authentication = {"LOGIN": username, "PASSWORD": password} |
153 | - replyToField = "reply to" |
154 | - else: |
155 | - authentication = "\0" + username + "\0" + password |
156 | - replyToField = "reply-to" |
157 | - |
158 | - d.addCallback(prepareClient, authentication).addCallback(gotClient) |
159 | + d.addCallback(prepareClient, username, password).addCallback(gotClient) |
160 | reactor.run() |
161 | |
162 | === modified file 'src/examples/server.py' |
163 | --- src/examples/server.py 2008-10-29 18:31:04 +0000 |
164 | +++ src/examples/server.py 2009-02-11 22:28:45 +0000 |
165 | @@ -13,9 +13,11 @@ |
166 | from twisted.internet.protocol import ClientCreator |
167 | |
168 | import txamqp.spec |
169 | -from txamqp.protocol import AMQClient |
170 | from txamqp.client import TwistedDelegate |
171 | -from txamqp.contrib.thrift import TwistedAMQPTransport |
172 | +from txamqp.contrib.thrift.transport import TwistedAMQPTransport |
173 | +from txamqp.contrib.thrift.protocol import ThriftAMQClient |
174 | + |
175 | +from zope.interface import implements |
176 | |
177 | servicesExchange = "services" |
178 | responsesExchange = "responses" |
179 | @@ -23,6 +25,7 @@ |
180 | calculatorKey = "calculator" |
181 | |
182 | class CalculatorHandler(object): |
183 | + implements(tutorial.Calculator.Iface) |
184 | |
185 | operations = { |
186 | Operation.ADD: int.__add__, |
187 | @@ -35,7 +38,7 @@ |
188 | # Just assume that it may take a long time |
189 | results = self.operations[w.op](w.num1, w.num2) |
190 | d = defer.Deferred() |
191 | - reactor.callLater(3, d.callback, results) |
192 | + reactor.callLater(0, d.callback, results) |
193 | return d |
194 | |
195 | def ping(self): |
196 | @@ -50,30 +53,14 @@ |
197 | try: |
198 | return self._dispatchWork(w) |
199 | except Exception, e: |
200 | - return defer.fail(InvalidOperation({'logid': logid, 'why': e.message})) |
201 | + return defer.fail(InvalidOperation(what=logid, why=e.message)) |
202 | |
203 | def zip(self): |
204 | print "zip() called from client" |
205 | |
206 | -def parseServerMessage(msg, channel, queue, processor, pfactory): |
207 | - deliveryTag = msg.delivery_tag |
208 | - try: |
209 | - replyTo = msg.content[replyToField] |
210 | - except KeyError: |
211 | - replyTo = None |
212 | - tr = TwistedAMQPTransport(channel, responsesExchange, routingKey=replyTo) |
213 | - tmi = TTransport.TMemoryBuffer(msg.content.body) |
214 | - tmo = TTwisted.TwistedMemoryBuffer(tr) |
215 | - iprot = pfactory.getProtocol(tmi) |
216 | - oprot = pfactory.getProtocol(tmo) |
217 | - processor.process(iprot, oprot) |
218 | - channel.basic_ack(deliveryTag, True) |
219 | - queue.get().addCallback(parseServerMessage, channel, queue, processor, |
220 | - pfactory) |
221 | - |
222 | @defer.inlineCallbacks |
223 | -def prepareClient(client, authentication): |
224 | - yield client.start(authentication) |
225 | +def prepareClient(client, username, password): |
226 | + yield client.authenticate(username, password) |
227 | |
228 | channel = yield client.channel(1) |
229 | |
230 | @@ -90,8 +77,8 @@ |
231 | |
232 | reply = yield channel.basic_consume(queue=calculatorQueue) |
233 | queue = yield client.queue(reply.consumer_tag) |
234 | - queue.get().addCallback(parseServerMessage, channel, queue, processor, |
235 | - pfactory) |
236 | + queue.get().addCallback(client.parseServerMessage, channel, responsesExchange, |
237 | + queue, processor, pfactory, pfactory) |
238 | |
239 | if __name__ == '__main__': |
240 | import sys |
241 | @@ -112,15 +99,7 @@ |
242 | |
243 | print 'Starting the server...' |
244 | |
245 | - d = ClientCreator(reactor, AMQClient, delegate, vhost, |
246 | + d = ClientCreator(reactor, ThriftAMQClient, delegate, vhost, |
247 | spec).connectTCP(host, port) |
248 | - |
249 | - if (spec.major, spec.minor) == (8, 0): |
250 | - authentication = {"LOGIN": username, "PASSWORD": password} |
251 | - replyToField = "reply to" |
252 | - else: |
253 | - authentication = "\0" + username + "\0" + password |
254 | - replyToField = "reply-to" |
255 | - |
256 | - d.addCallback(prepareClient, authentication) |
257 | + d.addCallback(prepareClient, username, password) |
258 | reactor.run() |
259 | |
260 | === modified file 'src/txamqp/client.py' |
261 | --- src/txamqp/client.py 2008-10-29 18:31:04 +0000 |
262 | +++ src/txamqp/client.py 2009-02-11 22:28:45 +0000 |
263 | @@ -11,12 +11,15 @@ |
264 | self.alreadyCalled = False |
265 | |
266 | def set(self): |
267 | - deferred, self.deferred = self.deferred, defer.Deferred() |
268 | - deferred.callback(None) |
269 | + self.deferred.callback(True) |
270 | |
271 | def wait(self): |
272 | return self.deferred |
273 | |
274 | + def reset(self): |
275 | + deferred, self.deferred = self.deferred, defer.Deferred() |
276 | + deferred.callback(True) |
277 | + |
278 | class TwistedDelegate(Delegate): |
279 | |
280 | def connection_start(self, ch, msg): |
281 | @@ -40,4 +43,4 @@ |
282 | |
283 | def close(self, reason): |
284 | self.client.closed = True |
285 | - self.client.started.set() |
286 | + self.client.started.reset() |
287 | |
288 | === added directory 'src/txamqp/contrib/thrift' |
289 | === added file 'src/txamqp/contrib/thrift/__init__.py' |
290 | === added file 'src/txamqp/contrib/thrift/protocol.py' |
291 | --- src/txamqp/contrib/thrift/protocol.py 1970-01-01 00:00:00 +0000 |
292 | +++ src/txamqp/contrib/thrift/protocol.py 2009-02-11 22:28:45 +0000 |
293 | @@ -0,0 +1,108 @@ |
294 | +from zope.interface import Interface, Attribute |
295 | +from txamqp.protocol import AMQClient |
296 | +from txamqp.contrib.thrift.transport import TwistedAMQPTransport |
297 | +from txamqp.content import Content |
298 | + |
299 | +from twisted.internet import defer |
300 | +from twisted.python import log |
301 | + |
302 | +from thrift.protocol import TBinaryProtocol |
303 | +from thrift.transport import TTwisted, TTransport |
304 | + |
305 | +class ThriftAMQClient(AMQClient): |
306 | + |
307 | + def __init__(self, *args, **kwargs): |
308 | + AMQClient.__init__(self, *args, **kwargs) |
309 | + |
310 | + if self.check_0_8(): |
311 | + self.replyToField = "reply to" |
312 | + else: |
313 | + self.replyToField = "reply-to" |
314 | + |
315 | + @defer.inlineCallbacks |
316 | + def createThriftClient(self, responsesExchange, serviceExchange, |
317 | + routingKey, clientClass, responseQueue=None, iprot_factory=None, |
318 | + oprot_factory=None): |
319 | + |
320 | + channel = yield self.channel(1) |
321 | + |
322 | + if responseQueue is None: |
323 | + reply = yield channel.queue_declare(exclusive=True, |
324 | + auto_delete=True) |
325 | + |
326 | + responseQueue = reply.queue |
327 | + |
328 | + yield channel.queue_bind(queue=responseQueue, |
329 | + exchange=responsesExchange, routing_key=responseQueue) |
330 | + |
331 | + reply = yield channel.basic_consume(queue=responseQueue) |
332 | + |
333 | + log.msg("Consuming messages on queue: %s" % responseQueue) |
334 | + |
335 | + amqpTransport = TwistedAMQPTransport(channel, serviceExchange, |
336 | + routingKey, replyTo=responseQueue, replyToField=self.replyToField) |
337 | + |
338 | + if iprot_factory is None: |
339 | + iprot_factory = self.factory.iprot_factory |
340 | + |
341 | + if oprot_factory is None: |
342 | + oprot_factory = self.factory.oprot_factory |
343 | + |
344 | + thriftClient = clientClass(amqpTransport, oprot_factory) |
345 | + |
346 | + queue = yield self.queue(reply.consumer_tag) |
347 | + queue.get().addCallback(self.parseClientMessage, channel, queue, |
348 | + thriftClient, iprot_factory=iprot_factory) |
349 | + |
350 | + defer.returnValue(thriftClient) |
351 | + |
352 | + def parseClientMessage(self, msg, channel, queue, thriftClient, |
353 | + iprot_factory=None): |
354 | + deliveryTag = msg.delivery_tag |
355 | + tr = TTransport.TMemoryBuffer(msg.content.body) |
356 | + if iprot_factory is None: |
357 | + iprot = self.factory.iprot_factory.getProtocol(tr) |
358 | + else: |
359 | + iprot = iprot_factory.getProtocol(tr) |
360 | + (fname, mtype, rseqid) = iprot.readMessageBegin() |
361 | + |
362 | + method = getattr(thriftClient, 'recv_' + fname) |
363 | + method(iprot, mtype, rseqid) |
364 | + |
365 | + channel.basic_ack(deliveryTag, True) |
366 | + queue.get().addCallback(self.parseClientMessage, channel, queue, |
367 | + thriftClient, iprot_factory=iprot_factory) |
368 | + |
369 | + def parseServerMessage(self, msg, channel, exchange, queue, processor, |
370 | + iprot_factory=None, oprot_factory=None): |
371 | + deliveryTag = msg.delivery_tag |
372 | + try: |
373 | + replyTo = msg.content[self.replyToField] |
374 | + except KeyError: |
375 | + replyTo = None |
376 | + |
377 | + tmi = TTransport.TMemoryBuffer(msg.content.body) |
378 | + tr = TwistedAMQPTransport(channel, exchange, replyTo) |
379 | + |
380 | + if iprot_factory is None: |
381 | + iprot = self.factory.iprot_factory.getProtocol(tmi) |
382 | + else: |
383 | + iprot = iprot_factory.getProtocol(tmi) |
384 | + |
385 | + if oprot_factory is None: |
386 | + oprot = self.factory.oprot_factory.getProtocol(tr) |
387 | + else: |
388 | + oprot = oprot_factory.getProtocol(tr) |
389 | + |
390 | + d = processor.process(iprot, oprot) |
391 | + channel.basic_ack(deliveryTag, True) |
392 | + |
393 | + return queue.get().addCallback(self.parseServerMessage, channel, |
394 | + exchange, queue, processor, iprot_factory, oprot_factory) |
395 | + |
396 | + |
397 | +class IThriftAMQClientFactory(Interface): |
398 | + |
399 | + iprot_factory = Attribute("Input protocol factory") |
400 | + oprot_factory = Attribute("Input protocol factory") |
401 | + processor = Attribute("Thrift processor") |
402 | |
403 | === added file 'src/txamqp/contrib/thrift/service.py' |
404 | --- src/txamqp/contrib/thrift/service.py 1970-01-01 00:00:00 +0000 |
405 | +++ src/txamqp/contrib/thrift/service.py 2009-02-11 22:28:45 +0000 |
406 | @@ -0,0 +1,7 @@ |
407 | +from zope.interface import Interface, Attribute |
408 | + |
409 | +class IThriftAMQClientFactory(Interface): |
410 | + |
411 | + iprot_factory = Attribute("Input protocol factory") |
412 | + oprot_factory = Attribute("Input protocol factory") |
413 | + processor = Attribute("Thrift processor") |
414 | |
415 | === renamed file 'src/txamqp/contrib/thrift.py' => 'src/txamqp/contrib/thrift/transport.py' |
416 | --- src/txamqp/contrib/thrift.py 2008-10-29 18:31:04 +0000 |
417 | +++ src/txamqp/contrib/thrift/transport.py 2009-02-11 22:28:45 +0000 |
418 | @@ -1,16 +1,17 @@ |
419 | from txamqp.content import Content |
420 | +from thrift.transport import TTwisted |
421 | |
422 | -class TwistedAMQPTransport(object): |
423 | - def __init__(self, channel, exchange, routingKey, replyTo=None, |
424 | - replyToField="reply to"): |
425 | +class TwistedAMQPTransport(TTwisted.TMessageSenderTransport): |
426 | + def __init__(self, channel, exchange, routingKey, replyTo=None, replyToField=None): |
427 | + TTwisted.TMessageSenderTransport.__init__(self) |
428 | self.channel = channel |
429 | self.exchange = exchange |
430 | self.routingKey = routingKey |
431 | self.replyTo = replyTo |
432 | self.replyToField = replyToField |
433 | |
434 | - def write(self, data): |
435 | - content = Content(body=data) |
436 | + def sendMessage(self, message): |
437 | + content = Content(body=message) |
438 | if self.replyTo: |
439 | content[self.replyToField] = self.replyTo |
440 | |
441 | |
442 | === modified file 'src/txamqp/protocol.py' |
443 | --- src/txamqp/protocol.py 2009-02-05 19:22:56 +0000 |
444 | +++ src/txamqp/protocol.py 2009-02-11 22:28:45 +0000 |
445 | @@ -226,6 +226,9 @@ |
446 | self.outgoing.get().addCallback(self.writer) |
447 | self.work.get().addCallback(self.worker) |
448 | |
449 | + def check_0_8(self): |
450 | + return (self.spec.minor, self.spec.major) == (0, 8) |
451 | + |
452 | @defer.inlineCallbacks |
453 | def channel(self, id): |
454 | yield self.channelLock.acquire() |
455 | @@ -297,6 +300,15 @@ |
456 | ch.dispatch(frame, self.work) |
457 | |
458 | @defer.inlineCallbacks |
459 | + def authenticate(self, username, password, mechanism='AMQPLAIN', locale='en_US'): |
460 | + if self.check_0_8(): |
461 | + response = {"LOGIN": username, "PASSWORD": password} |
462 | + else: |
463 | + response = "\0" + username + "\0" + password |
464 | + |
465 | + yield self.start(response, mechanism, locale) |
466 | + |
467 | + @defer.inlineCallbacks |
468 | def start(self, response, mechanism='AMQPLAIN', locale='en_US'): |
469 | self.response = response |
470 | self.mechanism = mechanism |
471 | |
472 | === modified file 'src/txamqp/testlib.py' |
473 | --- src/txamqp/testlib.py 2008-10-29 18:31:04 +0000 |
474 | +++ src/txamqp/testlib.py 2009-02-11 22:28:45 +0000 |
475 | @@ -39,6 +39,9 @@ |
476 | self.user = 'guest' |
477 | self.password = 'guest' |
478 | self.vhost = 'localhost' |
479 | + self.queues = [] |
480 | + self.exchanges = [] |
481 | + self.connectors = [] |
482 | |
483 | @inlineCallbacks |
484 | def connect(self, host=None, port=None, spec=None, user=None, password=None, vhost=None): |
485 | @@ -56,16 +59,11 @@ |
486 | self.connectors.append(c) |
487 | client = yield onConn |
488 | |
489 | - yield client.start({"LOGIN": user, "PASSWORD": password}) |
490 | + yield client.authenticate(user, password) |
491 | returnValue(client) |
492 | |
493 | @inlineCallbacks |
494 | def setUp(self): |
495 | - self.queues = [] |
496 | - self.exchanges = [] |
497 | - |
498 | - self.connectors = [] |
499 | - |
500 | self.client = yield self.connect() |
501 | |
502 | self.channel = yield self.client.channel(1) |
This should be merged into the main branch since it follows Thrift development much more closely. It seems fairly stable; we use it in over 8 production apps.