Skip to content

Commit 6783b30

Browse files
committed
protocol.Serializer: Implement ping.
1 parent 7447308 commit 6783b30

File tree

3 files changed

+194
-17
lines changed

3 files changed

+194
-17
lines changed

src/elle/protocol/Serializer.cc

Lines changed: 123 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,9 @@ namespace elle
156156
keep_going = 0,
157157
interrupt = 1,
158158
message = 2,
159-
max = message,
159+
ping = 3,
160+
pong = 4,
161+
max = pong,
160162
};
161163

162164
static
@@ -174,14 +176,39 @@ namespace elle
174176
Impl(std::iostream& stream,
175177
elle::Buffer::Size chunk_size,
176178
bool checksum,
177-
elle::Version const& version)
178-
: _stream(stream)
179+
elle::Version const& version,
180+
boost::optional<std::chrono::milliseconds> ping_period,
181+
boost::optional<std::chrono::milliseconds> ping_timeout)
182+
: _scheduler(reactor::scheduler())
183+
, _pings(0)
184+
, _pongs(0)
185+
, _ping_period(std::move(ping_period))
186+
, _ping_delay(std::move(ping_timeout))
187+
, _pinger(this->_scheduler.io_service())
188+
, _pinger_handler(
189+
[this] (boost::system::error_code const& error)
190+
{
191+
if (error == boost::system::errc::operation_canceled)
192+
return;
193+
else if (error)
194+
ELLE_ABORT("%s: unexpected timer error: %s", this, error);
195+
this->ping();
196+
this->_pinger.expires_from_now(
197+
boost::posix_time::milliseconds(this->_ping_period->count()));
198+
this->_pinger.async_wait(this->_pinger_handler);
199+
})
200+
, _stream(stream)
179201
, _chunk_size(chunk_size)
180202
, _checksum(checksum)
181203
, _version(version)
182204
, _lock_write()
183205
, _lock_read()
184-
{}
206+
{
207+
if (bool(this->_ping_period) != bool(this->_ping_delay))
208+
elle::err("specify either both ping period and timeout or neither");
209+
if (this->_ping_period)
210+
this->_pinger_handler({});
211+
}
185212

186213
virtual
187214
~Impl() = default;
@@ -274,11 +301,31 @@ namespace elle
274301
void
275302
write_control(Control control)
276303
{
304+
if (control != Control::pong && control != Control::ping)
305+
this->write_pings_pongs();
277306
ELLE_DUMP_SCOPE("send control %s", (int) control);
278307
char c = static_cast<char>(control);
279308
this->_stream.write(&c, 1);
280309
}
281310

311+
void
312+
write_pings_pongs()
313+
{
314+
while (this->_pongs || this->_pings)
315+
{
316+
while (this->_pongs)
317+
{
318+
--this->_pongs;
319+
this->write_control(Control::pong);
320+
}
321+
while (this->_pings)
322+
{
323+
--this->_pings;
324+
this->write_control(Control::ping);
325+
}
326+
}
327+
}
328+
282329
bool
283330
read_control()
284331
{
@@ -305,10 +352,70 @@ namespace elle
305352
case Control::message:
306353
ignore_message(this->_stream, this->version());
307354
break;
355+
case Control::ping:
356+
this->pinged();
357+
break;
358+
case Control::pong:
359+
if (this->_ping_timers.empty())
360+
ELLE_ERR("%s: unexpected pong", this);
361+
else
362+
this->_ping_timers.pop_front();
363+
break;
308364
}
309365
}
310366
}
311367

368+
void
369+
ping()
370+
{
371+
++this->_pings;
372+
if (!this->_lock_write.locked())
373+
{
374+
new reactor::Thread(
375+
this->_scheduler,
376+
elle::sprintf("%s pinger", this),
377+
[this]
378+
{
379+
elle::reactor::Lock lock(this->_lock_write);
380+
this->write_pings_pongs();
381+
this->_stream.flush();
382+
},
383+
true);
384+
}
385+
this->_ping_timers.emplace_back(this->_scheduler.io_service());
386+
this->_ping_timers.back().expires_from_now(
387+
boost::posix_time::milliseconds(this->_ping_delay->count()));
388+
this->_ping_timers.back().async_wait(
389+
[this] (boost::system::error_code const& error)
390+
{
391+
if (!error)
392+
this->_ping_timeout();
393+
});
394+
}
395+
396+
void
397+
pinged()
398+
{
399+
++this->_pongs;
400+
if (!this->_lock_write.locked())
401+
{
402+
elle::reactor::Lock lock(this->_lock_write);
403+
this->write_pings_pongs();
404+
this->_stream.flush();
405+
}
406+
}
407+
408+
ELLE_ATTRIBUTE(elle::reactor::Scheduler&, scheduler);
409+
ELLE_ATTRIBUTE(int, pings);
410+
ELLE_ATTRIBUTE(int, pongs);
411+
ELLE_ATTRIBUTE(boost::optional<std::chrono::milliseconds>, ping_period);
412+
ELLE_ATTRIBUTE(boost::optional<std::chrono::milliseconds>, ping_delay);
413+
ELLE_ATTRIBUTE(boost::asio::deadline_timer, pinger);
414+
ELLE_ATTRIBUTE(std::function<void (boost::system::error_code const&)>,
415+
pinger_handler);
416+
ELLE_ATTRIBUTE(std::list<boost::asio::deadline_timer>, ping_timers);
417+
ELLE_ATTRIBUTE_RX(boost::signals2::signal<void ()>, ping_timeout);
418+
312419
void
313420
_write(elle::Buffer const& packet)
314421
{
@@ -360,6 +467,8 @@ namespace elle
360467
this->write_control(Control::keep_going);
361468
send();
362469
};
470+
this->write_pings_pongs();
471+
this->_stream.flush();
363472
}
364473
}
365474
catch (elle::reactor::Terminate const&)
@@ -369,6 +478,7 @@ namespace elle
369478
ELLE_DEBUG("interrupted after sending %s bytes over %s",
370479
offset, packet.size());
371480
this->write_control(Control::interrupt);
481+
this->write_pings_pongs();
372482
this->_stream.flush();
373483
}
374484
throw;
@@ -405,9 +515,12 @@ namespace elle
405515
| Construction |
406516
`-------------*/
407517

408-
Serializer::Serializer(std::iostream& stream,
409-
elle::Version const& version,
410-
bool checksum)
518+
Serializer::Serializer(
519+
std::iostream& stream,
520+
elle::Version const& version,
521+
bool checksum,
522+
boost::optional<std::chrono::milliseconds> ping_period,
523+
boost::optional<std::chrono::milliseconds> ping_timeout)
411524
: Super(*elle::reactor::Scheduler::scheduler())
412525
, _stream(stream)
413526
, _version(version)
@@ -430,7 +543,9 @@ namespace elle
430543
}
431544
ELLE_TRACE("using version: '%s'", this->version());
432545
this->_impl.reset(
433-
new Impl(stream, this->_chunk_size, checksum, version));
546+
new Impl(stream, this->_chunk_size, checksum, version,
547+
std::move(ping_period), std::move(ping_timeout)));
548+
this->_impl->ping_timeout().connect(this->_ping_timeout);
434549
}
435550

436551
Serializer::~Serializer()

src/elle/protocol/Serializer.hh

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#pragma once
22

3+
#include <chrono>
34
#include <iostream>
45

56
#include <elle/reactor/mutex.hh>
@@ -65,7 +66,9 @@ namespace elle
6566
/// packets sent.
6667
Serializer(std::iostream& stream,
6768
elle::Version const& version = elle::Version(0, 1, 0),
68-
bool checksum = true);
69+
bool checksum = true,
70+
boost::optional<std::chrono::milliseconds> ping_period = {},
71+
boost::optional<std::chrono::milliseconds> ping_timeout = {});
6972
~Serializer();
7073

7174
/*----------.
@@ -102,6 +105,7 @@ namespace elle
102105
ELLE_ATTRIBUTE_R(elle::Version, version, override);
103106
ELLE_ATTRIBUTE_R(elle::Buffer::Size, chunk_size);
104107
ELLE_ATTRIBUTE_R(bool, checksum);
108+
ELLE_ATTRIBUTE_RX(boost::signals2::signal<void ()>, ping_timeout);
105109
public:
106110
class Impl;
107111
private:

tests/elle/protocol/serializer.cc

Lines changed: 66 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,8 @@ class SocketInstrumentation
191191
, _alice_routed(0)
192192
, _bob_routed(0)
193193
{
194+
this->_alice_barrier.open();
195+
this->_bob_barrier.open();
194196
this->_a_server.listen();
195197
this->_b_server.listen();
196198
this->_alice.reset(
@@ -248,6 +250,7 @@ class SocketInstrumentation
248250
{
249251
auto route = [&] (elle::reactor::network::Socket* sender,
250252
elle::reactor::network::Socket* recipient,
253+
elle::reactor::Barrier& barrier,
251254
int64_t& routed,
252255
Conf& conf)
253256
{
@@ -259,8 +262,7 @@ class SocketInstrumentation
259262
char buffer[1024];
260263
ELLE_DEBUG("reading");
261264
int64_t size =
262-
sender->read_some(elle::WeakBuffer(buffer),
263-
valgrind(250_ms, 10));
265+
sender->read_some(elle::WeakBuffer(buffer));
264266
ELLE_DEBUG("read %s", size);
265267
conf(elle::ConstWeakBuffer(buffer, size));
266268
if (conf.corrupt_offset >= 0
@@ -281,6 +283,7 @@ class SocketInstrumentation
281283
routed = conf.quota;
282284
}
283285
ELLE_DEBUG("%s: route %s bytes", *this, size);
286+
elle::reactor::wait(barrier);
284287
recipient->write(elle::ConstWeakBuffer(buffer, size));
285288
}
286289
}
@@ -291,16 +294,20 @@ class SocketInstrumentation
291294
};
292295
scope.run_background("A to B",
293296
std::bind(route, a.get(), b.get(),
297+
std::ref(this->_alice_barrier),
294298
std::ref(this->_alice_routed),
295299
std::ref(*this->alice_conf)));
296300
scope.run_background("B to A",
297301
std::bind(route, b.get(), a.get(),
302+
std::ref(this->_bob_barrier),
298303
std::ref(this->_bob_routed),
299304
std::ref(*this->bob_conf)));
300305
scope.wait();
301306
};
302307
}
303308

309+
ELLE_ATTRIBUTE_RX(elle::reactor::Barrier, alice_barrier);
310+
ELLE_ATTRIBUTE_RX(elle::reactor::Barrier, bob_barrier);
304311
elle::reactor::network::TCPServer _a_server;
305312
elle::reactor::network::TCPServer _b_server;
306313
std::unique_ptr<elle::reactor::network::TCPSocket> _alice;
@@ -348,7 +355,9 @@ dialog(elle::Version const& version,
348355
std::function<void (elle::protocol::Serializer&)> const& a,
349356
std::function<void (elle::protocol::Serializer&)> const& b,
350357
std::function<void (elle::reactor::Thread&, elle::reactor::Thread&,
351-
SocketProvider&)> const& f = {})
358+
SocketProvider&)> const& f = {},
359+
boost::optional<std::chrono::milliseconds> ping_period = {},
360+
boost::optional<std::chrono::milliseconds> ping_timeout = {})
352361
{
353362
SocketProvider sockets;
354363
std::unique_ptr<elle::protocol::Serializer> alice;
@@ -360,14 +369,16 @@ dialog(elle::Version const& version,
360369
[&]
361370
{
362371
alice.reset(new elle::protocol::Serializer(
363-
sockets.alice(), version, checksum));
372+
sockets.alice(), version, checksum,
373+
ping_period, ping_timeout));
364374
});
365375
scope.run_background(
366376
"setup bob's serializer",
367377
[&]
368378
{
369379
bob.reset(new elle::protocol::Serializer(
370-
sockets.bob(), version, checksum));
380+
sockets.bob(), version, checksum,
381+
ping_period, ping_timeout));
371382
});
372383
scope.wait();
373384
};
@@ -878,7 +889,6 @@ ELLE_TEST_SCHEDULED(interruption2)
878889
// Check that terminating a Channel.read() call does not lose an unrelated
879890
// packet.
880891
namespace ip = elle::protocol;
881-
882892
for (auto version: {elle::Version{0, 2, 0}, elle::Version{0, 3, 0}})
883893
{
884894
ELLE_LOG("test version %s", version);
@@ -954,7 +964,6 @@ ELLE_TEST_SCHEDULED(interruption2)
954964
ELLE_TRACE("read on %s", c.id());
955965
auto buf = c.read();
956966
BOOST_CHECK_EQUAL(buf.string(), "foo");
957-
958967
// just read some more to check we're still on
959968
ip::Channel c3(stream);
960969
c3.write(elle::Buffer("baz"));
@@ -964,7 +973,6 @@ ELLE_TEST_SCHEDULED(interruption2)
964973
ELLE_TRACE("read c3");
965974
buf = c3.read();
966975
BOOST_CHECK_EQUAL(buf.string(), "baz");
967-
968976
// check killing reader thread before any data is read
969977
b.close();
970978
elle::reactor::Thread t2("read nothing", [&] {
@@ -995,6 +1003,55 @@ ELLE_TEST_SCHEDULED(interruption2)
9951003
}
9961004
}
9971005

1006+
ELLE_TEST_SCHEDULED(ping)
1007+
{
1008+
elle::reactor::Barrier alice;
1009+
elle::reactor::Barrier bob;
1010+
bool timeout_expected = false;
1011+
int timeouts = 0;
1012+
dialog<SocketInstrumentation>(
1013+
elle::Version(0, 3, 0),
1014+
true,
1015+
[] (SocketInstrumentation& socket)
1016+
{},
1017+
[&] (elle::protocol::Serializer& s)
1018+
{
1019+
s.ping_timeout().connect(
1020+
[&] { ++timeouts; BOOST_TEST(timeout_expected); });
1021+
s.write(elle::Buffer("alice"));
1022+
BOOST_TEST(s.read() == elle::Buffer("bob"));
1023+
alice.open();
1024+
BOOST_TEST(s.read() == elle::Buffer("bob"));
1025+
},
1026+
[&] (elle::protocol::Serializer& s)
1027+
{
1028+
s.ping_timeout().connect(
1029+
[&] { ++timeouts; BOOST_TEST(timeout_expected); });
1030+
s.write(elle::Buffer("bob"));
1031+
BOOST_TEST(s.read() == elle::Buffer("alice"));
1032+
bob.open();
1033+
BOOST_TEST(s.read() == elle::Buffer("alice"));
1034+
},
1035+
[&] (elle::reactor::Thread& a,
1036+
elle::reactor::Thread& b,
1037+
SocketInstrumentation& socket)
1038+
{
1039+
elle::reactor::wait(elle::reactor::Waitables{&alice, &bob});
1040+
// Let some pings go through
1041+
elle::reactor::sleep(500_ms);
1042+
timeout_expected = true;
1043+
socket.alice_barrier().close();
1044+
socket.bob_barrier().close();
1045+
// Let some timeouts go through
1046+
elle::reactor::sleep(500_ms);
1047+
a.terminate();
1048+
b.terminate();
1049+
BOOST_TEST(timeouts >= 2);
1050+
},
1051+
std::chrono::milliseconds(200),
1052+
std::chrono::milliseconds(100));
1053+
}
1054+
9981055
ELLE_TEST_SUITE()
9991056
{
10001057
auto& suite = boost::unit_test::framework::master_test_suite();
@@ -1008,4 +1065,5 @@ ELLE_TEST_SUITE()
10081065
suite.add(BOOST_TEST_CASE(termination), 0, valgrind(3, 10));
10091066
suite.add(BOOST_TEST_CASE(eof), 0, valgrind(3));
10101067
suite.add(BOOST_TEST_CASE(message), 0, valgrind(3));
1068+
suite.add(BOOST_TEST_CASE(ping), 0, valgrind(3));
10111069
}

0 commit comments

Comments
 (0)