Merge lp://staging/~elachuni/txamqp/heartbeat into lp://staging/txamqp
- heartbeat
- Merge into trunk
Status: | Merged |
---|---|
Merged at revision: | not available |
Proposed branch: | lp://staging/~elachuni/txamqp/heartbeat |
Merge into: | lp://staging/txamqp |
Diff against target: | None lines |
To merge this branch: | bzr merge lp://staging/~elachuni/txamqp/heartbeat |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Thomas Herve | Approve | ||
Review via email:
|
Commit message
Description of the change

Anthony Lenton (elachuni) wrote : | # |

Thomas Herve (therve) wrote : | # |
Thanks a lot for this code Anthony! It's going to be pretty useful. Here are some comments:
[1] There is a small conflict with current trunk.
[2] The connectionLostEvent doesn't seem to be used anywhere, it can probably be removed.
[3] The test is a bit bad: I understand you want to test it live, but a test running for 8 seconds is too long. Also, setTimeout is deprecated.
Thanks!
- 20. By Anthony Lenton
-
- Merged changes from trunk
- Removed unused connectionLostEvent
- Fixed test.

Anthony Lenton (elachuni) wrote : | # |
Hi Thomas,
>
> [1] There is a small conflict with current trunk.
Merged and resolved.
>
> [2] The connectionLostEvent doesn't seem to be used anywhere, it can probably
> be removed.
Right, it was being used by client code here to easily access connectionLost events, but definitely shouldn't be part of the same patch. Removed.
>
> [3] The test is a bit bad: I understand you want to test it live, but a test
> running for 8 seconds is too long. Also, setTimeout is deprecated.
I've reduced it to 3 seconds. If that's still too long we'll need to test it some other way, as you can't ask for a heartbeat interval of less than one second. And it's not calling setTimeout now.
>
> Thanks!
Thank you! :)

Thomas Herve (therve) wrote : | # |
OK, some last comments:
1) There is a conflict again :). Be careful here, because connectionLost is now defined in trunk, so you want to move your clean code.
2) The heartbeat test class is called TxTests, it should be named HeartbeatTests. The test file doesn't need the ASF license header too.
Thanks, +1!
- 21. By Anthony Lenton
-
Merged changes from trunk.

Esteve Fernandez (esteve) wrote : | # |
Sorry for the slow response. I think adding heartbeat is a very useful feature, thanks for taking the time for implementing it. However, there's a couple of issues:
1 - The virtual host was changed to "/", instead of "localhost". We really need to implement "profiles" or whatever, so we can select some sane defaults for every broker. Until then, I'm not comfortable with introducing a change that's not intrinsic to the problem your branch solves.
2 - Although the code works and is clear, I'd rather use a LoopingCall (http://
class AMQClient(...):
# Max unreceived heartbeat frames. The AMQP standard says it's 3.
MAX_
def __init__(self, ...):
...
if self.heartbeatI
d = self.started.wait()
def sendHeartbeat(
def checkHeartbeat(
def processFrame(self, frame):
...
if frame.payload.type != Frame.HEARTBEAT:
What do you think guys? The changes are minimal and I think they are a bit easier to read.

Esteve Fernandez (esteve) wrote : | # |
I just realized that self.checkHB doesn't need to be a LoopingCall, reactor.callLater is enough.
- 22. By Anthony Lenton
-
Putting back 'localhost' vhost.

Anthony Lenton (elachuni) wrote : | # |
> 1 - The virtual host was changed to "/", instead of "localhost".
Yikes, that was unintentionally left in from running the tests :-/
I've put the 'localhost' vhost back in rev.22, sorry for that.
> 2 - Although the code works and is clear, I'd rather use a LoopingCall (http:/
> /twistedmatrix.
> . It's already built into Twisted, so you don't need to implement things like
> lastSent, lastReceived, etc. I think it would be a bit easier to do something
> like this (it's just pseudo-python, can't guarantee that it works):
>
> (...code...)
>
> What do you think guys? The changes are minimal and I think they are a bit
> easier to read.
I gave your code a try and it works well, with a couple of changes. However I don't really see much benefit in using LoopingCall in this case:
* checkHB could use a regular callLater call, as you've pointed out.
* The rescheduling of sendHB should actually be done in sendFrame, not in processFrame, as it's sending
frames (not receiving them) that postpones the need to send a HB frame.
* After changing that it's practically the same to be using a regular callLater instead of a LoopingCall for sendHB; if in sendFrame you reschedule sendHB whatever the payload type (instead of filtering out HBs) it effectively behaves like a LoopingCall.
* We do get rid of lastSent and lastReceived, but instead we'd be using two callbacks instead of one (checkHeartbeat and sendHeartbeat vs. heartbeatHandler), and two 'callback handles' (checkHB and sendHB vs. pendingHeartbeat).
* The free bonus you get with lastSent and lastReceived is a bit of accountability about how long ago you sent and received frames (including heartbeat frames), which makes it easier to test.
I haven't pushed these changes into this branch, but if you want I can push a working version using LoopingCalls on some other branch so we can compare.
- 23. By Anthony Lenton
-
Removed License header and fixed class name, per therve's review.
Preview Diff
1 | === modified file 'src/txamqp/client.py' |
2 | --- src/txamqp/client.py 2009-05-04 11:12:36 +0000 |
3 | +++ src/txamqp/client.py 2009-05-05 15:41:08 +0000 |
4 | @@ -29,7 +29,8 @@ |
5 | |
6 | def connection_tune(self, ch, msg): |
7 | self.client.MAX_LENGTH = msg.frame_max |
8 | - ch.connection_tune_ok(*msg.fields) |
9 | + args = msg.channel_max, msg.frame_max, self.client.heartbeatInterval |
10 | + ch.connection_tune_ok(*args) |
11 | self.client.started.set() |
12 | |
13 | @defer.inlineCallbacks |
14 | |
15 | === modified file 'src/txamqp/connection.py' |
16 | --- src/txamqp/connection.py 2008-10-29 18:31:04 +0000 |
17 | +++ src/txamqp/connection.py 2009-04-27 17:00:08 +0000 |
18 | @@ -196,3 +196,15 @@ |
19 | |
20 | def __str__(self): |
21 | return "Body(%r)" % self.content |
22 | + |
23 | +class Heartbeat(Payload): |
24 | + type = Frame.HEARTBEAT |
25 | + def __str__(self): |
26 | + return "Heartbeat()" |
27 | + |
28 | + def encode(self, enc): |
29 | + enc.encode_long(0) |
30 | + |
31 | + def decode(spec, dec): |
32 | + dec.decode_long() |
33 | + return Heartbeat() |
34 | |
35 | === modified file 'src/txamqp/protocol.py' |
36 | --- src/txamqp/protocol.py 2009-05-04 11:12:36 +0000 |
37 | +++ src/txamqp/protocol.py 2009-06-04 14:42:57 +0000 |
38 | @@ -1,16 +1,17 @@ |
39 | # coding: utf-8 |
40 | from twisted.python import log |
41 | -from twisted.internet import defer, protocol |
42 | +from twisted.internet import defer, protocol, reactor |
43 | from twisted.protocols import basic |
44 | from txamqp import spec |
45 | from txamqp.codec import Codec, EOF |
46 | -from txamqp.connection import Header, Frame, Method, Body |
47 | +from txamqp.connection import Header, Frame, Method, Body, Heartbeat |
48 | from txamqp.message import Message |
49 | from txamqp.content import Content |
50 | from txamqp.queue import TimeoutDeferredQueue, Empty, Closed as QueueClosed |
51 | from txamqp.client import TwistedEvent, TwistedDelegate, Closed |
52 | from cStringIO import StringIO |
53 | import struct |
54 | +from time import time |
55 | |
56 | class GarbageException(Exception): |
57 | pass |
58 | @@ -200,7 +201,7 @@ |
59 | |
60 | channelClass = AMQChannel |
61 | |
62 | - def __init__(self, delegate, vhost, *args, **kwargs): |
63 | + def __init__(self, delegate, vhost, heartbeat=0, *args, **kwargs): |
64 | FrameReceiver.__init__(self, *args, **kwargs) |
65 | self.delegate = delegate |
66 | |
67 | @@ -218,6 +219,9 @@ |
68 | self.work = defer.DeferredQueue() |
69 | |
70 | self.started = TwistedEvent() |
71 | + self.connectionLostEvent = TwistedEvent() |
72 | + self.lastSent = time() |
73 | + self.lastReceived = time() |
74 | |
75 | self.queueLock = defer.DeferredLock() |
76 | |
77 | @@ -225,6 +229,10 @@ |
78 | |
79 | self.outgoing.get().addCallback(self.writer) |
80 | self.work.get().addCallback(self.worker) |
81 | + self.heartbeatInterval = heartbeat |
82 | + self.pendingHeartbeat = None |
83 | + if self.heartbeatInterval > 0: |
84 | + self.started.wait().addCallback(self.heartbeatHandler) |
85 | |
86 | def check_0_8(self): |
87 | return (self.spec.minor, self.spec.major) == (0, 8) |
88 | @@ -293,13 +301,25 @@ |
89 | self.sendInitString() |
90 | self.setFrameMode() |
91 | |
92 | + def connectionLost(self, reason): |
93 | + if self.pendingHeartbeat is not None and self.pendingHeartbeat.active(): |
94 | + self.pendingHeartbeat.cancel() |
95 | + self.pendingHeartbeat = None |
96 | + self.connectionLostEvent.set() |
97 | + |
98 | def frameReceived(self, frame): |
99 | self.processFrame(frame) |
100 | |
101 | + def sendFrame(self, frame): |
102 | + self.lastSent = time() |
103 | + FrameReceiver.sendFrame(self, frame) |
104 | + |
105 | @defer.inlineCallbacks |
106 | def processFrame(self, frame): |
107 | + self.lastReceived = time() |
108 | ch = yield self.channel(frame.channel) |
109 | - ch.dispatch(frame, self.work) |
110 | + if frame.payload.type != Frame.HEARTBEAT: |
111 | + ch.dispatch(frame, self.work) |
112 | |
113 | @defer.inlineCallbacks |
114 | def authenticate(self, username, password, mechanism='AMQPLAIN', locale='en_US'): |
115 | @@ -320,3 +340,15 @@ |
116 | |
117 | channel0 = yield self.channel(0) |
118 | yield channel0.connection_open(self.vhost) |
119 | + |
120 | + def heartbeatHandler (self, dummy=None): |
121 | + now = time() |
122 | + if self.lastSent + self.heartbeatInterval < now: |
123 | + self.sendFrame(Frame(0, Heartbeat())) |
124 | + if self.lastReceived + self.heartbeatInterval * 3 < now: |
125 | + self.transport.loseConnection() |
126 | + tple = None |
127 | + if self.transport.connected: |
128 | + tple = reactor.callLater(self.heartbeatInterval, self.heartbeatHandler) |
129 | + self.pendingHeartbeat = tple |
130 | + |
131 | |
132 | === added file 'src/txamqp/test/test_heartbeat.py' |
133 | --- src/txamqp/test/test_heartbeat.py 1970-01-01 00:00:00 +0000 |
134 | +++ src/txamqp/test/test_heartbeat.py 2009-06-04 16:26:58 +0000 |
135 | @@ -0,0 +1,57 @@ |
136 | +# |
137 | +# Licensed to the Apache Software Foundation (ASF) under one |
138 | +# or more contributor license agreements. See the NOTICE file |
139 | +# distributed with this work for additional information |
140 | +# regarding copyright ownership. The ASF licenses this file |
141 | +# to you under the Apache License, Version 2.0 (the |
142 | +# "License"); you may not use this file except in compliance |
143 | +# with the License. You may obtain a copy of the License at |
144 | +# |
145 | +# http://www.apache.org/licenses/LICENSE-2.0 |
146 | +# |
147 | +# Unless required by applicable law or agreed to in writing, |
148 | +# software distributed under the License is distributed on an |
149 | +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
150 | +# KIND, either express or implied. See the License for the |
151 | +# specific language governing permissions and limitations |
152 | +# under the License. |
153 | +# |
154 | + |
155 | +from time import time |
156 | +#from txamqp.client import Closed |
157 | +#from txamqp.queue import Empty |
158 | +#from txamqp.content import Content |
159 | +from txamqp.testlib import TestBase |
160 | + |
161 | +#from twisted.internet.defer import, returnValue, |
162 | +from twisted.internet.defer import Deferred, inlineCallbacks |
163 | + |
164 | +class TxTests(TestBase): |
165 | + """ |
166 | + Tests handling of heartbeat frames |
167 | + """ |
168 | + |
169 | + |
170 | + @inlineCallbacks |
171 | + def setUp(self): |
172 | + """ Set up a heartbeat frame per second """ |
173 | + self.client = yield self.connect(heartbeat=1) |
174 | + |
175 | + self.channel = yield self.client.channel(1) |
176 | + yield self.channel.channel_open() |
177 | + |
178 | + |
179 | + @inlineCallbacks |
180 | + def test_heartbeat(self): |
181 | + """ |
182 | + Test that heartbeat frames are sent and received |
183 | + """ |
184 | + d = Deferred() |
185 | + d.setTimeout(8) |
186 | + d.addBoth(lambda x:True) |
187 | + yield d |
188 | + t = time() |
189 | + self.assertTrue(self.client.lastSent > t - 3, |
190 | + "A heartbeat frame was recently sent") |
191 | + self.assertTrue(self.client.lastReceived > t - 3, |
192 | + "A heartbeat frame was recently received") |
193 | |
194 | === modified file 'src/txamqp/testlib.py' |
195 | --- src/txamqp/testlib.py 2009-02-11 22:28:45 +0000 |
196 | +++ src/txamqp/testlib.py 2009-06-04 16:26:58 +0000 |
197 | @@ -39,22 +39,26 @@ |
198 | self.user = 'guest' |
199 | self.password = 'guest' |
200 | self.vhost = 'localhost' |
201 | + self.heartbeat = 0 |
202 | self.queues = [] |
203 | self.exchanges = [] |
204 | self.connectors = [] |
205 | |
206 | @inlineCallbacks |
207 | - def connect(self, host=None, port=None, spec=None, user=None, password=None, vhost=None): |
208 | + def connect(self, host=None, port=None, spec=None, user=None, |
209 | + password=None, vhost=None, heartbeat=None): |
210 | host = host or self.host |
211 | port = port or self.port |
212 | spec = spec or self.spec |
213 | user = user or self.user |
214 | password = password or self.password |
215 | vhost = vhost or self.vhost |
216 | + heartbeat = heartbeat or self.heartbeat |
217 | |
218 | delegate = TwistedDelegate() |
219 | onConn = Deferred() |
220 | - f = protocol._InstanceFactory(reactor, AMQClient(delegate, vhost, txamqp.spec.load(spec)), onConn) |
221 | + p = AMQClient(delegate, vhost, heartbeat=heartbeat, spec=txamqp.spec.load(spec)) |
222 | + f = protocol._InstanceFactory(reactor, p, onConn) |
223 | c = reactor.connectTCP(host, port, f) |
224 | self.connectors.append(c) |
225 | client = yield onConn |
Added handling for heartbeat frames.
The heartbeat interval is configurable by a AMQClient constructor parameter.
Added a brief test to check that it works.
It's only tested on RabbitMQ, but it should work on qpid too.