Merge lp://staging/~elachuni/txamqp/heartbeat-loopingcalls into lp://staging/txamqp
- heartbeat-loopingcalls
- Merge into trunk
Proposed by
Anthony Lenton
Status: | Merged |
---|---|
Merged at revision: | not available |
Proposed branch: | lp://staging/~elachuni/txamqp/heartbeat-loopingcalls |
Merge into: | lp://staging/txamqp |
Diff against target: | None lines |
To merge this branch: | bzr merge lp://staging/~elachuni/txamqp/heartbeat-loopingcalls |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
txAMQP Team | Pending | ||
Review via email: mp+9503@code.staging.launchpad.net |
Commit message
Description of the change
To post a comment you must log in.
Revision history for this message
Anthony Lenton (elachuni) wrote : | # |
Revision history for this message
Esteve Fernandez (esteve) wrote : | # |
Hi Anthony, sorry for the delay. Thank you very much for your patch.
> This is a branch that implements heartbeat frames using LoopingCall, per
> Esteve's suggestion.
I modified it a bit (lp:~esteve/txamqp/heartbeat-loopingcalls):
- added configurable heartbeats and client classes in testcases, so overriding connect and setUp is not needed, making HeartbeatTests a bit shorter
- re-added lastSent and lastReceived from your original patch. I didn't realize it at first, sorry, but now I think it's a great idea for accounting purposes
What do you think of it?
Thanks for your continuing contributions!
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === modified file 'src/txamqp/client.py' | |||
2 | --- src/txamqp/client.py 2009-05-28 09:46:03 +0000 | |||
3 | +++ src/txamqp/client.py 2009-06-11 19:45:31 +0000 | |||
4 | @@ -29,7 +29,8 @@ | |||
5 | 29 | 29 | ||
6 | 30 | def connection_tune(self, ch, msg): | 30 | def connection_tune(self, ch, msg): |
7 | 31 | self.client.MAX_LENGTH = msg.frame_max | 31 | self.client.MAX_LENGTH = msg.frame_max |
9 | 32 | ch.connection_tune_ok(*msg.fields) | 32 | args = msg.channel_max, msg.frame_max, self.client.heartbeatInterval |
10 | 33 | ch.connection_tune_ok(*args) | ||
11 | 33 | self.client.started.reset() | 34 | self.client.started.reset() |
12 | 34 | 35 | ||
13 | 35 | @defer.inlineCallbacks | 36 | @defer.inlineCallbacks |
14 | 36 | 37 | ||
15 | === modified file 'src/txamqp/connection.py' | |||
16 | --- src/txamqp/connection.py 2008-10-29 18:31:04 +0000 | |||
17 | +++ src/txamqp/connection.py 2009-04-27 17:00:08 +0000 | |||
18 | @@ -196,3 +196,15 @@ | |||
19 | 196 | 196 | ||
20 | 197 | def __str__(self): | 197 | def __str__(self): |
21 | 198 | return "Body(%r)" % self.content | 198 | return "Body(%r)" % self.content |
22 | 199 | |||
23 | 200 | class Heartbeat(Payload): | ||
24 | 201 | type = Frame.HEARTBEAT | ||
25 | 202 | def __str__(self): | ||
26 | 203 | return "Heartbeat()" | ||
27 | 204 | |||
28 | 205 | def encode(self, enc): | ||
29 | 206 | enc.encode_long(0) | ||
30 | 207 | |||
31 | 208 | def decode(spec, dec): | ||
32 | 209 | dec.decode_long() | ||
33 | 210 | return Heartbeat() | ||
34 | 199 | 211 | ||
35 | === modified file 'src/txamqp/protocol.py' | |||
36 | --- src/txamqp/protocol.py 2009-06-18 08:13:49 +0000 | |||
37 | +++ src/txamqp/protocol.py 2009-07-31 11:43:51 +0000 | |||
38 | @@ -1,16 +1,18 @@ | |||
39 | 1 | # coding: utf-8 | 1 | # coding: utf-8 |
40 | 2 | from twisted.python import log | 2 | from twisted.python import log |
42 | 3 | from twisted.internet import defer, protocol | 3 | from twisted.internet import defer, protocol, reactor |
43 | 4 | from twisted.internet.task import LoopingCall | ||
44 | 4 | from twisted.protocols import basic | 5 | from twisted.protocols import basic |
45 | 5 | from txamqp import spec | 6 | from txamqp import spec |
46 | 6 | from txamqp.codec import Codec, EOF | 7 | from txamqp.codec import Codec, EOF |
48 | 7 | from txamqp.connection import Header, Frame, Method, Body | 8 | from txamqp.connection import Header, Frame, Method, Body, Heartbeat |
49 | 8 | from txamqp.message import Message | 9 | from txamqp.message import Message |
50 | 9 | from txamqp.content import Content | 10 | from txamqp.content import Content |
51 | 10 | from txamqp.queue import TimeoutDeferredQueue, Empty, Closed as QueueClosed | 11 | from txamqp.queue import TimeoutDeferredQueue, Empty, Closed as QueueClosed |
52 | 11 | from txamqp.client import TwistedEvent, TwistedDelegate, Closed | 12 | from txamqp.client import TwistedEvent, TwistedDelegate, Closed |
53 | 12 | from cStringIO import StringIO | 13 | from cStringIO import StringIO |
54 | 13 | import struct | 14 | import struct |
55 | 15 | from time import time | ||
56 | 14 | 16 | ||
57 | 15 | class GarbageException(Exception): | 17 | class GarbageException(Exception): |
58 | 16 | pass | 18 | pass |
59 | @@ -27,10 +29,10 @@ | |||
60 | 27 | self.responses = TimeoutDeferredQueue() | 29 | self.responses = TimeoutDeferredQueue() |
61 | 28 | 30 | ||
62 | 29 | self.queue = None | 31 | self.queue = None |
64 | 30 | 32 | ||
65 | 31 | self.closed = False | 33 | self.closed = False |
66 | 32 | self.reason = None | 34 | self.reason = None |
68 | 33 | 35 | ||
69 | 34 | def close(self, reason): | 36 | def close(self, reason): |
70 | 35 | if self.closed: | 37 | if self.closed: |
71 | 36 | return | 38 | return |
72 | @@ -200,7 +202,10 @@ | |||
73 | 200 | 202 | ||
74 | 201 | channelClass = AMQChannel | 203 | channelClass = AMQChannel |
75 | 202 | 204 | ||
77 | 203 | def __init__(self, delegate, vhost, *args, **kwargs): | 205 | # Max unreceived heartbeat frames. The AMQP standard says it's 3. |
78 | 206 | MAX_UNSEEN_HEARTBEAT = 3 | ||
79 | 207 | |||
80 | 208 | def __init__(self, delegate, vhost, heartbeat=0, *args, **kwargs): | ||
81 | 204 | FrameReceiver.__init__(self, *args, **kwargs) | 209 | FrameReceiver.__init__(self, *args, **kwargs) |
82 | 205 | self.delegate = delegate | 210 | self.delegate = delegate |
83 | 206 | 211 | ||
84 | @@ -225,6 +230,27 @@ | |||
85 | 225 | 230 | ||
86 | 226 | self.outgoing.get().addCallback(self.writer) | 231 | self.outgoing.get().addCallback(self.writer) |
87 | 227 | self.work.get().addCallback(self.worker) | 232 | self.work.get().addCallback(self.worker) |
88 | 233 | self.heartbeatInterval = heartbeat | ||
89 | 234 | self.checkHB = None | ||
90 | 235 | self.sendHB = None | ||
91 | 236 | if self.heartbeatInterval > 0: | ||
92 | 237 | d = self.started.wait() | ||
93 | 238 | d.addCallback(self.reschedule_sendHB) | ||
94 | 239 | d.addCallback(self.reschedule_checkHB) | ||
95 | 240 | |||
96 | 241 | def reschedule_sendHB(self, dummy=None): | ||
97 | 242 | if self.heartbeatInterval > 0: | ||
98 | 243 | if self.sendHB is None: | ||
99 | 244 | self.sendHB = LoopingCall(self.sendHeartbeat) | ||
100 | 245 | elif self.sendHB.running: | ||
101 | 246 | self.sendHB.stop() | ||
102 | 247 | self.sendHB.start(self.heartbeatInterval, now=False) | ||
103 | 248 | |||
104 | 249 | def reschedule_checkHB(self, dummy=None): | ||
105 | 250 | if self.checkHB is not None and self.checkHB.active(): | ||
106 | 251 | self.checkHB.cancel() | ||
107 | 252 | self.checkHB = reactor.callLater(self.heartbeatInterval * | ||
108 | 253 | self.MAX_UNSEEN_HEARTBEAT, self.checkHeartbeat) | ||
109 | 228 | 254 | ||
110 | 229 | def check_0_8(self): | 255 | def check_0_8(self): |
111 | 230 | return (self.spec.minor, self.spec.major) == (0, 8) | 256 | return (self.spec.minor, self.spec.major) == (0, 8) |
112 | @@ -254,7 +280,7 @@ | |||
113 | 254 | finally: | 280 | finally: |
114 | 255 | self.queueLock.release() | 281 | self.queueLock.release() |
115 | 256 | defer.returnValue(q) | 282 | defer.returnValue(q) |
117 | 257 | 283 | ||
118 | 258 | def close(self, reason): | 284 | def close(self, reason): |
119 | 259 | for ch in self.channels.values(): | 285 | for ch in self.channels.values(): |
120 | 260 | ch.close(reason) | 286 | ch.close(reason) |
121 | @@ -294,10 +320,18 @@ | |||
122 | 294 | def frameReceived(self, frame): | 320 | def frameReceived(self, frame): |
123 | 295 | self.processFrame(frame) | 321 | self.processFrame(frame) |
124 | 296 | 322 | ||
125 | 323 | def sendFrame(self, frame): | ||
126 | 324 | if frame.payload.type != Frame.HEARTBEAT: | ||
127 | 325 | self.reschedule_sendHB() | ||
128 | 326 | FrameReceiver.sendFrame(self, frame) | ||
129 | 327 | |||
130 | 297 | @defer.inlineCallbacks | 328 | @defer.inlineCallbacks |
131 | 298 | def processFrame(self, frame): | 329 | def processFrame(self, frame): |
132 | 299 | ch = yield self.channel(frame.channel) | 330 | ch = yield self.channel(frame.channel) |
134 | 300 | ch.dispatch(frame, self.work) | 331 | if frame.payload.type != Frame.HEARTBEAT: |
135 | 332 | ch.dispatch(frame, self.work) | ||
136 | 333 | if self.heartbeatInterval > 0: | ||
137 | 334 | self.reschedule_checkHB() | ||
138 | 301 | 335 | ||
139 | 302 | @defer.inlineCallbacks | 336 | @defer.inlineCallbacks |
140 | 303 | def authenticate(self, username, password, mechanism='AMQPLAIN', locale='en_US'): | 337 | def authenticate(self, username, password, mechanism='AMQPLAIN', locale='en_US'): |
141 | @@ -319,5 +353,19 @@ | |||
142 | 319 | channel0 = yield self.channel(0) | 353 | channel0 = yield self.channel(0) |
143 | 320 | yield channel0.connection_open(self.vhost) | 354 | yield channel0.connection_open(self.vhost) |
144 | 321 | 355 | ||
145 | 356 | def sendHeartbeat(self): | ||
146 | 357 | self.sendFrame(Frame(0, Heartbeat())) | ||
147 | 358 | |||
148 | 359 | def checkHeartbeat(self): | ||
149 | 360 | if self.checkHB is not None and self.checkHB.active(): | ||
150 | 361 | self.checkHB.cancel() | ||
151 | 362 | self.checkHB = None | ||
152 | 363 | self.transport.loseConnection() | ||
153 | 364 | |||
154 | 322 | def connectionLost(self, reason): | 365 | def connectionLost(self, reason): |
155 | 366 | if self.heartbeatInterval > 0 and self.sendHB.running: | ||
156 | 367 | self.sendHB.stop() | ||
157 | 368 | if self.checkHB is not None and self.checkHB.active(): | ||
158 | 369 | self.checkHB.cancel() | ||
159 | 323 | self.close(reason) | 370 | self.close(reason) |
160 | 371 | |||
161 | 324 | 372 | ||
162 | === added file 'src/txamqp/test/test_heartbeat.py' | |||
163 | --- src/txamqp/test/test_heartbeat.py 1970-01-01 00:00:00 +0000 | |||
164 | +++ src/txamqp/test/test_heartbeat.py 2009-07-31 11:43:51 +0000 | |||
165 | @@ -0,0 +1,59 @@ | |||
166 | 1 | from time import time | ||
167 | 2 | import txamqp | ||
168 | 3 | from txamqp.testlib import TestBase | ||
169 | 4 | from txamqp.protocol import AMQClient, TwistedDelegate | ||
170 | 5 | from twisted.internet import reactor, protocol | ||
171 | 6 | from twisted.internet.defer import Deferred, inlineCallbacks, returnValue | ||
172 | 7 | |||
173 | 8 | class SpyAMQClient(AMQClient): | ||
174 | 9 | called_reschedule_check = 0 | ||
175 | 10 | called_send_hb = 0 | ||
176 | 11 | def reschedule_checkHB(self, dummy=None): | ||
177 | 12 | AMQClient.reschedule_checkHB(self) | ||
178 | 13 | self.called_reschedule_check += 1 | ||
179 | 14 | def sendHeartbeat(self): | ||
180 | 15 | AMQClient.sendHeartbeat(self) | ||
181 | 16 | self.called_send_hb += 1 | ||
182 | 17 | |||
183 | 18 | class HeartbeatTests(TestBase): | ||
184 | 19 | """ | ||
185 | 20 | Tests handling of heartbeat frames | ||
186 | 21 | """ | ||
187 | 22 | |||
188 | 23 | @inlineCallbacks | ||
189 | 24 | def connect(self): | ||
190 | 25 | delegate = TwistedDelegate() | ||
191 | 26 | onConn = Deferred() | ||
192 | 27 | p = SpyAMQClient(delegate, self.vhost, heartbeat=1, | ||
193 | 28 | spec=txamqp.spec.load(self.spec)) | ||
194 | 29 | f = protocol._InstanceFactory(reactor, p, onConn) | ||
195 | 30 | c = reactor.connectTCP(self.host, self.port, f) | ||
196 | 31 | self.connectors.append(c) | ||
197 | 32 | client = yield onConn | ||
198 | 33 | |||
199 | 34 | yield client.authenticate(self.user, self.password) | ||
200 | 35 | returnValue(client) | ||
201 | 36 | |||
202 | 37 | |||
203 | 38 | @inlineCallbacks | ||
204 | 39 | def setUp(self): | ||
205 | 40 | """ Set up a heartbeat frame per second """ | ||
206 | 41 | self.client = yield self.connect() | ||
207 | 42 | |||
208 | 43 | self.channel = yield self.client.channel(1) | ||
209 | 44 | yield self.channel.channel_open() | ||
210 | 45 | |||
211 | 46 | def test_heartbeat(self): | ||
212 | 47 | """ | ||
213 | 48 | Test that heartbeat frames are sent and received | ||
214 | 49 | """ | ||
215 | 50 | d = Deferred() | ||
216 | 51 | def checkPulse(dummy): | ||
217 | 52 | t = time() | ||
218 | 53 | self.assertTrue(self.client.called_send_hb, | ||
219 | 54 | "A heartbeat frame was recently sent") | ||
220 | 55 | self.assertTrue(self.client.called_reschedule_check, | ||
221 | 56 | "A heartbeat frame was recently received") | ||
222 | 57 | d.addCallback(checkPulse) | ||
223 | 58 | reactor.callLater(3, d.callback, None) | ||
224 | 59 | return d | ||
225 | 0 | 60 | ||
226 | === modified file 'src/txamqp/testlib.py' | |||
227 | --- src/txamqp/testlib.py 2009-06-17 16:01:31 +0000 | |||
228 | +++ src/txamqp/testlib.py 2009-07-09 13:26:53 +0000 | |||
229 | @@ -6,9 +6,9 @@ | |||
230 | 6 | # to you under the Apache License, Version 2.0 (the | 6 | # to you under the Apache License, Version 2.0 (the |
231 | 7 | # "License"); you may not use this file except in compliance | 7 | # "License"); you may not use this file except in compliance |
232 | 8 | # with the License. You may obtain a copy of the License at | 8 | # with the License. You may obtain a copy of the License at |
234 | 9 | # | 9 | # |
235 | 10 | # http://www.apache.org/licenses/LICENSE-2.0 | 10 | # http://www.apache.org/licenses/LICENSE-2.0 |
237 | 11 | # | 11 | # |
238 | 12 | # Unless required by applicable law or agreed to in writing, | 12 | # Unless required by applicable law or agreed to in writing, |
239 | 13 | # software distributed under the License is distributed on an | 13 | # software distributed under the License is distributed on an |
240 | 14 | # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | 14 | # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
241 | @@ -78,29 +78,33 @@ | |||
242 | 78 | self.user = 'guest' | 78 | self.user = 'guest' |
243 | 79 | self.password = 'guest' | 79 | self.password = 'guest' |
244 | 80 | self.vhost = 'localhost' | 80 | self.vhost = 'localhost' |
245 | 81 | self.heartbeat = 0 | ||
246 | 81 | self.queues = [] | 82 | self.queues = [] |
247 | 82 | self.exchanges = [] | 83 | self.exchanges = [] |
248 | 83 | self.connectors = [] | 84 | self.connectors = [] |
249 | 84 | 85 | ||
250 | 85 | @inlineCallbacks | 86 | @inlineCallbacks |
252 | 86 | def connect(self, host=None, port=None, spec=None, user=None, password=None, vhost=None): | 87 | def connect(self, host=None, port=None, spec=None, user=None, |
253 | 88 | password=None, vhost=None, heartbeat=None): | ||
254 | 87 | host = host or self.host | 89 | host = host or self.host |
255 | 88 | port = port or self.port | 90 | port = port or self.port |
256 | 89 | spec = spec or self.spec | 91 | spec = spec or self.spec |
257 | 90 | user = user or self.user | 92 | user = user or self.user |
258 | 91 | password = password or self.password | 93 | password = password or self.password |
259 | 92 | vhost = vhost or self.vhost | 94 | vhost = vhost or self.vhost |
260 | 95 | heartbeat = heartbeat or self.heartbeat | ||
261 | 93 | 96 | ||
262 | 94 | delegate = TwistedDelegate() | 97 | delegate = TwistedDelegate() |
263 | 95 | onConn = Deferred() | 98 | onConn = Deferred() |
265 | 96 | f = protocol._InstanceFactory(reactor, AMQClient(delegate, vhost, txamqp.spec.load(spec)), onConn) | 99 | p = AMQClient(delegate, vhost, heartbeat=heartbeat, spec=txamqp.spec.load(spec)) |
266 | 100 | f = protocol._InstanceFactory(reactor, p, onConn) | ||
267 | 97 | c = reactor.connectTCP(host, port, f) | 101 | c = reactor.connectTCP(host, port, f) |
268 | 98 | self.connectors.append(c) | 102 | self.connectors.append(c) |
269 | 99 | client = yield onConn | 103 | client = yield onConn |
270 | 100 | 104 | ||
271 | 101 | yield client.authenticate(user, password) | 105 | yield client.authenticate(user, password) |
272 | 102 | returnValue(client) | 106 | returnValue(client) |
274 | 103 | 107 | ||
275 | 104 | @inlineCallbacks | 108 | @inlineCallbacks |
276 | 105 | def setUp(self): | 109 | def setUp(self): |
277 | 106 | self.client = yield self.connect() | 110 | self.client = yield self.connect() |
This is a branch that implements heartbeat frames using LoopingCall, per Esteve's suggestion.