Merge lp://staging/~elachuni/txamqp/heartbeat-loopingcalls into lp://staging/txamqp
- heartbeat-loopingcalls
- Merge into trunk
Proposed by
Anthony Lenton
Status: | Merged |
---|---|
Merged at revision: | not available |
Proposed branch: | lp://staging/~elachuni/txamqp/heartbeat-loopingcalls |
Merge into: | lp://staging/txamqp |
Diff against target: | None lines |
To merge this branch: | bzr merge lp://staging/~elachuni/txamqp/heartbeat-loopingcalls |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
txAMQP Team | Pending | ||
Review via email: mp+9503@code.staging.launchpad.net |
Commit message
Description of the change
To post a comment you must log in.
Revision history for this message
Anthony Lenton (elachuni) wrote : | # |
Revision history for this message
Esteve Fernandez (esteve) wrote : | # |
Hi Anthony, sorry for the delay. Thank you very much for your patch.
> This is a branch that implements heartbeat frames using LoopingCall, per
> Esteve's suggestion.
I modified it a bit (lp:~esteve/txamqp/heartbeat-loopingcalls):
- added configurable heartbeats and client classes to the test cases, so overriding connect and setUp is no longer needed, which makes HeartbeatTests a bit shorter
- re-added lastSent and lastReceived from your original patch. I didn't see their value at first, sorry, but now I think they're a great idea for accounting purposes
What do you think of it?
Thanks for your continuing contributions!
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === modified file 'src/txamqp/client.py' |
2 | --- src/txamqp/client.py 2009-05-28 09:46:03 +0000 |
3 | +++ src/txamqp/client.py 2009-06-11 19:45:31 +0000 |
4 | @@ -29,7 +29,8 @@ |
5 | |
6 | def connection_tune(self, ch, msg): |
7 | self.client.MAX_LENGTH = msg.frame_max |
8 | - ch.connection_tune_ok(*msg.fields) |
9 | + args = msg.channel_max, msg.frame_max, self.client.heartbeatInterval |
10 | + ch.connection_tune_ok(*args) |
11 | self.client.started.reset() |
12 | |
13 | @defer.inlineCallbacks |
14 | |
15 | === modified file 'src/txamqp/connection.py' |
16 | --- src/txamqp/connection.py 2008-10-29 18:31:04 +0000 |
17 | +++ src/txamqp/connection.py 2009-04-27 17:00:08 +0000 |
18 | @@ -196,3 +196,15 @@ |
19 | |
20 | def __str__(self): |
21 | return "Body(%r)" % self.content |
22 | + |
23 | +class Heartbeat(Payload): |
24 | + type = Frame.HEARTBEAT |
25 | + def __str__(self): |
26 | + return "Heartbeat()" |
27 | + |
28 | + def encode(self, enc): |
29 | + enc.encode_long(0) |
30 | + |
31 | + def decode(spec, dec): |
32 | + dec.decode_long() |
33 | + return Heartbeat() |
34 | |
35 | === modified file 'src/txamqp/protocol.py' |
36 | --- src/txamqp/protocol.py 2009-06-18 08:13:49 +0000 |
37 | +++ src/txamqp/protocol.py 2009-07-31 11:43:51 +0000 |
38 | @@ -1,16 +1,18 @@ |
39 | # coding: utf-8 |
40 | from twisted.python import log |
41 | -from twisted.internet import defer, protocol |
42 | +from twisted.internet import defer, protocol, reactor |
43 | +from twisted.internet.task import LoopingCall |
44 | from twisted.protocols import basic |
45 | from txamqp import spec |
46 | from txamqp.codec import Codec, EOF |
47 | -from txamqp.connection import Header, Frame, Method, Body |
48 | +from txamqp.connection import Header, Frame, Method, Body, Heartbeat |
49 | from txamqp.message import Message |
50 | from txamqp.content import Content |
51 | from txamqp.queue import TimeoutDeferredQueue, Empty, Closed as QueueClosed |
52 | from txamqp.client import TwistedEvent, TwistedDelegate, Closed |
53 | from cStringIO import StringIO |
54 | import struct |
55 | +from time import time |
56 | |
57 | class GarbageException(Exception): |
58 | pass |
59 | @@ -27,10 +29,10 @@ |
60 | self.responses = TimeoutDeferredQueue() |
61 | |
62 | self.queue = None |
63 | - |
64 | + |
65 | self.closed = False |
66 | self.reason = None |
67 | - |
68 | + |
69 | def close(self, reason): |
70 | if self.closed: |
71 | return |
72 | @@ -200,7 +202,10 @@ |
73 | |
74 | channelClass = AMQChannel |
75 | |
76 | - def __init__(self, delegate, vhost, *args, **kwargs): |
77 | + # Max unreceived heartbeat frames. The AMQP standard says it's 3. |
78 | + MAX_UNSEEN_HEARTBEAT = 3 |
79 | + |
80 | + def __init__(self, delegate, vhost, heartbeat=0, *args, **kwargs): |
81 | FrameReceiver.__init__(self, *args, **kwargs) |
82 | self.delegate = delegate |
83 | |
84 | @@ -225,6 +230,27 @@ |
85 | |
86 | self.outgoing.get().addCallback(self.writer) |
87 | self.work.get().addCallback(self.worker) |
88 | + self.heartbeatInterval = heartbeat |
89 | + self.checkHB = None |
90 | + self.sendHB = None |
91 | + if self.heartbeatInterval > 0: |
92 | + d = self.started.wait() |
93 | + d.addCallback(self.reschedule_sendHB) |
94 | + d.addCallback(self.reschedule_checkHB) |
95 | + |
96 | + def reschedule_sendHB(self, dummy=None): |
97 | + if self.heartbeatInterval > 0: |
98 | + if self.sendHB is None: |
99 | + self.sendHB = LoopingCall(self.sendHeartbeat) |
100 | + elif self.sendHB.running: |
101 | + self.sendHB.stop() |
102 | + self.sendHB.start(self.heartbeatInterval, now=False) |
103 | + |
104 | + def reschedule_checkHB(self, dummy=None): |
105 | + if self.checkHB is not None and self.checkHB.active(): |
106 | + self.checkHB.cancel() |
107 | + self.checkHB = reactor.callLater(self.heartbeatInterval * |
108 | + self.MAX_UNSEEN_HEARTBEAT, self.checkHeartbeat) |
109 | |
110 | def check_0_8(self): |
111 | return (self.spec.minor, self.spec.major) == (0, 8) |
112 | @@ -254,7 +280,7 @@ |
113 | finally: |
114 | self.queueLock.release() |
115 | defer.returnValue(q) |
116 | - |
117 | + |
118 | def close(self, reason): |
119 | for ch in self.channels.values(): |
120 | ch.close(reason) |
121 | @@ -294,10 +320,18 @@ |
122 | def frameReceived(self, frame): |
123 | self.processFrame(frame) |
124 | |
125 | + def sendFrame(self, frame): |
126 | + if frame.payload.type != Frame.HEARTBEAT: |
127 | + self.reschedule_sendHB() |
128 | + FrameReceiver.sendFrame(self, frame) |
129 | + |
130 | @defer.inlineCallbacks |
131 | def processFrame(self, frame): |
132 | ch = yield self.channel(frame.channel) |
133 | - ch.dispatch(frame, self.work) |
134 | + if frame.payload.type != Frame.HEARTBEAT: |
135 | + ch.dispatch(frame, self.work) |
136 | + if self.heartbeatInterval > 0: |
137 | + self.reschedule_checkHB() |
138 | |
139 | @defer.inlineCallbacks |
140 | def authenticate(self, username, password, mechanism='AMQPLAIN', locale='en_US'): |
141 | @@ -319,5 +353,19 @@ |
142 | channel0 = yield self.channel(0) |
143 | yield channel0.connection_open(self.vhost) |
144 | |
145 | + def sendHeartbeat(self): |
146 | + self.sendFrame(Frame(0, Heartbeat())) |
147 | + |
148 | + def checkHeartbeat(self): |
149 | + if self.checkHB is not None and self.checkHB.active(): |
150 | + self.checkHB.cancel() |
151 | + self.checkHB = None |
152 | + self.transport.loseConnection() |
153 | + |
154 | def connectionLost(self, reason): |
155 | + if self.heartbeatInterval > 0 and self.sendHB.running: |
156 | + self.sendHB.stop() |
157 | + if self.checkHB is not None and self.checkHB.active(): |
158 | + self.checkHB.cancel() |
159 | self.close(reason) |
160 | + |
161 | |
162 | === added file 'src/txamqp/test/test_heartbeat.py' |
163 | --- src/txamqp/test/test_heartbeat.py 1970-01-01 00:00:00 +0000 |
164 | +++ src/txamqp/test/test_heartbeat.py 2009-07-31 11:43:51 +0000 |
165 | @@ -0,0 +1,59 @@ |
166 | +from time import time |
167 | +import txamqp |
168 | +from txamqp.testlib import TestBase |
169 | +from txamqp.protocol import AMQClient, TwistedDelegate |
170 | +from twisted.internet import reactor, protocol |
171 | +from twisted.internet.defer import Deferred, inlineCallbacks, returnValue |
172 | + |
173 | +class SpyAMQClient(AMQClient): |
174 | + called_reschedule_check = 0 |
175 | + called_send_hb = 0 |
176 | + def reschedule_checkHB(self, dummy=None): |
177 | + AMQClient.reschedule_checkHB(self) |
178 | + self.called_reschedule_check += 1 |
179 | + def sendHeartbeat(self): |
180 | + AMQClient.sendHeartbeat(self) |
181 | + self.called_send_hb += 1 |
182 | + |
183 | +class HeartbeatTests(TestBase): |
184 | + """ |
185 | + Tests handling of heartbeat frames |
186 | + """ |
187 | + |
188 | + @inlineCallbacks |
189 | + def connect(self): |
190 | + delegate = TwistedDelegate() |
191 | + onConn = Deferred() |
192 | + p = SpyAMQClient(delegate, self.vhost, heartbeat=1, |
193 | + spec=txamqp.spec.load(self.spec)) |
194 | + f = protocol._InstanceFactory(reactor, p, onConn) |
195 | + c = reactor.connectTCP(self.host, self.port, f) |
196 | + self.connectors.append(c) |
197 | + client = yield onConn |
198 | + |
199 | + yield client.authenticate(self.user, self.password) |
200 | + returnValue(client) |
201 | + |
202 | + |
203 | + @inlineCallbacks |
204 | + def setUp(self): |
205 | + """ Set up a heartbeat frame per second """ |
206 | + self.client = yield self.connect() |
207 | + |
208 | + self.channel = yield self.client.channel(1) |
209 | + yield self.channel.channel_open() |
210 | + |
211 | + def test_heartbeat(self): |
212 | + """ |
213 | + Test that heartbeat frames are sent and received |
214 | + """ |
215 | + d = Deferred() |
216 | + def checkPulse(dummy): |
217 | + t = time() |
218 | + self.assertTrue(self.client.called_send_hb, |
219 | + "A heartbeat frame was recently sent") |
220 | + self.assertTrue(self.client.called_reschedule_check, |
221 | + "A heartbeat frame was recently received") |
222 | + d.addCallback(checkPulse) |
223 | + reactor.callLater(3, d.callback, None) |
224 | + return d |
225 | |
226 | === modified file 'src/txamqp/testlib.py' |
227 | --- src/txamqp/testlib.py 2009-06-17 16:01:31 +0000 |
228 | +++ src/txamqp/testlib.py 2009-07-09 13:26:53 +0000 |
229 | @@ -6,9 +6,9 @@ |
230 | # to you under the Apache License, Version 2.0 (the |
231 | # "License"); you may not use this file except in compliance |
232 | # with the License. You may obtain a copy of the License at |
233 | -# |
234 | +# |
235 | # http://www.apache.org/licenses/LICENSE-2.0 |
236 | -# |
237 | +# |
238 | # Unless required by applicable law or agreed to in writing, |
239 | # software distributed under the License is distributed on an |
240 | # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
241 | @@ -78,29 +78,33 @@ |
242 | self.user = 'guest' |
243 | self.password = 'guest' |
244 | self.vhost = 'localhost' |
245 | + self.heartbeat = 0 |
246 | self.queues = [] |
247 | self.exchanges = [] |
248 | self.connectors = [] |
249 | |
250 | @inlineCallbacks |
251 | - def connect(self, host=None, port=None, spec=None, user=None, password=None, vhost=None): |
252 | + def connect(self, host=None, port=None, spec=None, user=None, |
253 | + password=None, vhost=None, heartbeat=None): |
254 | host = host or self.host |
255 | port = port or self.port |
256 | spec = spec or self.spec |
257 | user = user or self.user |
258 | password = password or self.password |
259 | vhost = vhost or self.vhost |
260 | + heartbeat = heartbeat or self.heartbeat |
261 | |
262 | delegate = TwistedDelegate() |
263 | onConn = Deferred() |
264 | - f = protocol._InstanceFactory(reactor, AMQClient(delegate, vhost, txamqp.spec.load(spec)), onConn) |
265 | + p = AMQClient(delegate, vhost, heartbeat=heartbeat, spec=txamqp.spec.load(spec)) |
266 | + f = protocol._InstanceFactory(reactor, p, onConn) |
267 | c = reactor.connectTCP(host, port, f) |
268 | self.connectors.append(c) |
269 | client = yield onConn |
270 | |
271 | yield client.authenticate(user, password) |
272 | returnValue(client) |
273 | - |
274 | + |
275 | @inlineCallbacks |
276 | def setUp(self): |
277 | self.client = yield self.connect() |
This is a branch that implements heartbeat frames using LoopingCall, per Esteve's suggestion.