Skip to content

Commit 9cff7bc

Browse files
authored
Merge pull request #72 from JohanMabille/client_fix
Fixed stop handling
2 parents ac45ec5 + 7a30042 commit 9cff7bc

5 files changed

+47
-18
lines changed

src/client/xclient_messenger.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -47,4 +47,4 @@ namespace xeus
4747
m_heartbeat_controller.send(stop_msg, zmq::send_flags::none);
4848
(void)m_heartbeat_controller.recv(response);
4949
}
50-
}
50+
}

src/client/xclient_zmq_impl.cpp

+5-1
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,11 @@ namespace xeus
3636

3737
// Has to be in the cpp because incomplete
3838
// types are used in unique_ptr in the header
39-
xclient_zmq_impl::~xclient_zmq_impl() = default;
39+
xclient_zmq_impl::~xclient_zmq_impl()
40+
{
41+
m_iopub_thread.join();
42+
m_heartbeat_thread.join();
43+
}
4044

4145
void xclient_zmq_impl::send_on_shell(xmessage msg)
4246
{

src/client/xheartbeat_client.cpp

+34-6
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
* The full license is in the file LICENSE, distributed with this software. *
88
****************************************************************************/
99

10+
#include <iostream>
11+
1012
#include "xheartbeat_client.hpp"
1113
#include "xclient_zmq_impl.hpp"
1214
#include "../common/xmiddleware_impl.hpp"
@@ -23,6 +25,7 @@ namespace xeus
2325
, m_max_retry(max_retry)
2426
, m_heartbeat_timeout(timeout)
2527
, m_heartbeat_end_point("")
28+
, m_request_stop(false)
2629
{
2730
m_heartbeat_end_point = get_end_point(config.m_transport, config.m_ip, config.m_hb_port);
2831
m_heartbeat.connect(m_heartbeat_end_point);
@@ -42,9 +45,35 @@ namespace xeus
4245

4346
bool xheartbeat_client::wait_for_answer(long timeout)
4447
{
45-
m_heartbeat.set(zmq::sockopt::linger, static_cast<int>(timeout));
46-
zmq::message_t response;
47-
return m_heartbeat.recv(response).has_value();
48+
zmq::pollitem_t items[] = {
49+
{ m_heartbeat, 0, ZMQ_POLLIN, 0 }, { m_controller, 0, ZMQ_POLLIN, 0 }
50+
};
51+
52+
zmq::poll(&items[0], 2, std::chrono::milliseconds(timeout));
53+
try
54+
{
55+
if (items[0].revents & ZMQ_POLLIN)
56+
{
57+
zmq::multipart_t wire_msg;
58+
wire_msg.recv(m_heartbeat);
59+
}
60+
61+
if (items[1].revents & ZMQ_POLLIN)
62+
{
63+
// stop message
64+
zmq::multipart_t wire_msg;
65+
wire_msg.recv(m_controller);
66+
wire_msg.send(m_controller);
67+
m_request_stop = true;
68+
}
69+
70+
return true;
71+
}
72+
catch (std::exception& e)
73+
{
74+
std::cerr << e.what() << std::endl;
75+
}
76+
return false;
4877
}
4978

5079
void xheartbeat_client::register_kernel_status_listener(const kernel_status_listener& l)
@@ -59,10 +88,9 @@ namespace xeus
5988

6089
void xheartbeat_client::run()
6190
{
62-
bool stop = false;
6391
std::size_t retry_count = 0;
6492

65-
while(!stop)
93+
while(!m_request_stop)
6694
{
6795
send_heartbeat_message();
6896
if(!wait_for_answer(m_heartbeat_timeout))
@@ -74,7 +102,7 @@ namespace xeus
74102
else
75103
{
76104
notify_kernel_dead(true);
77-
stop = true;
105+
break;
78106
}
79107
}
80108
else

src/client/xheartbeat_client.hpp

+2-1
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@ namespace xeus
4848
const long m_heartbeat_timeout;
4949

5050
std::string m_heartbeat_end_point;
51+
bool m_request_stop;
5152
};
5253
}
5354

54-
#endif
55+
#endif

src/client/xiopub_client.cpp

+5-9
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,6 @@ namespace xeus
5959

6060
void xiopub_client::run()
6161
{
62-
zmq::multipart_t wire_msg;
6362
zmq::pollitem_t items[] = {
6463
{ m_iopub, 0, ZMQ_POLLIN, 0 }, { m_controller, 0, ZMQ_POLLIN, 0 }
6564
};
@@ -71,6 +70,7 @@ namespace xeus
7170
{
7271
if (items[0].revents & ZMQ_POLLIN)
7372
{
73+
zmq::multipart_t wire_msg;
7474
wire_msg.recv(m_iopub);
7575
xpub_message msg = p_client_impl->deserialize_iopub(wire_msg);
7676
{
@@ -80,15 +80,11 @@ namespace xeus
8080
}
8181
if (items[1].revents & ZMQ_POLLIN)
8282
{
83+
// stop message
84+
zmq::multipart_t wire_msg;
8385
wire_msg.recv(m_controller);
84-
if (wire_msg.size() > 0)
85-
{
86-
std::string received_msg = wire_msg.at(0).to_string();
87-
if (received_msg == "stop")
88-
{
89-
break;
90-
}
91-
}
86+
wire_msg.send(m_controller);
87+
break;
9288
}
9389
}
9490
catch (std::exception& e)

0 commit comments

Comments
 (0)