5
5
#include < nano/node/transport/socket.hpp>
6
6
#include < nano/node/transport/transport.hpp>
7
7
8
+ #include < boost/asio/use_future.hpp>
9
+ #include < boost/exception/detail/exception_ptr.hpp>
8
10
#include < boost/format.hpp>
9
11
10
12
#include < cstdint>
@@ -36,8 +38,11 @@ nano::transport::socket::socket (nano::node & node_a, boost::asio::ip::tcp::sock
36
38
last_receive_time_or_init{ nano::seconds_since_epoch () },
37
39
default_timeout{ node_a.config .tcp_io_timeout },
38
40
silent_connection_tolerance_time{ node_a.network_params .network .silent_connection_tolerance_time },
41
+ read_buffer{ 16384 },
42
+ buffer_condition{ strand },
39
43
max_queue_size{ max_queue_size_a }
40
44
{
45
+ os_buffer.insert (os_buffer.begin (), 16384 , 0 );
41
46
}
42
47
43
48
nano::transport::socket::~socket ()
@@ -88,6 +93,7 @@ void nano::transport::socket::async_connect (nano::tcp_endpoint const & endpoint
88
93
node_l->observers .socket_connected .notify (*this_l);
89
94
}
90
95
callback (ec);
96
+ this_l->begin_read_loop ();
91
97
}));
92
98
});
93
99
}
@@ -101,31 +107,9 @@ void nano::transport::socket::async_read (std::shared_ptr<std::vector<uint8_t>>
101
107
if (!closed)
102
108
{
103
109
set_default_timeout ();
104
- boost::asio::post (strand, [this_l = shared_from_this (), buffer_a, callback = std::move (callback_a), size_a] () mutable {
105
- boost::asio::async_read (this_l->tcp_socket , boost::asio::buffer (buffer_a->data (), size_a),
106
- boost::asio::bind_executor (this_l->strand ,
107
- [this_l, buffer_a, cbk = std::move (callback)] (boost::system::error_code const & ec, std::size_t size_a) {
108
- debug_assert (this_l->strand .running_in_this_thread ());
109
-
110
- auto node_l = this_l->node_w .lock ();
111
- if (!node_l)
112
- {
113
- return ;
114
- }
115
-
116
- if (ec)
117
- {
118
- node_l->stats .inc (nano::stat::type::tcp, nano::stat::detail::tcp_read_error, nano::stat::dir::in);
119
- this_l->close ();
120
- }
121
- else
122
- {
123
- node_l->stats .add (nano::stat::type::traffic_tcp, nano::stat::detail::all, nano::stat::dir::in, size_a);
124
- this_l->set_last_completion ();
125
- this_l->set_last_receive_time ();
126
- }
127
- cbk (ec, size_a);
128
- }));
110
+ boost::asio::post (strand, [this_l = shared_from_this (), buffer_a, callback = std::move (callback_a), size_a] () {
111
+ this_l->requests .emplace_back (buffer_a, size_a, callback);
112
+ this_l->service_requests_maybe ();
129
113
});
130
114
}
131
115
}
@@ -389,6 +373,15 @@ void nano::transport::socket::close_internal ()
389
373
tcp_socket.shutdown (boost::asio::ip::tcp::socket::shutdown_both, ec);
390
374
tcp_socket.close (ec);
391
375
376
+ // FIXME Encapsulate or simplify this
377
+ for (auto const & request : requests)
378
+ {
379
+ node_l->stats .inc (nano::stat::type::tcp, nano::stat::detail::tcp_read_error, nano::stat::dir::in);
380
+ auto const & [buffer, size, callback] = request;
381
+ callback (boost::asio::error::operation_aborted, 0 );
382
+ }
383
+ requests.clear ();
384
+
392
385
if (ec)
393
386
{
394
387
node_l->stats .inc (nano::stat::type::socket, nano::stat::detail::error_socket_close);
@@ -408,6 +401,90 @@ nano::tcp_endpoint nano::transport::socket::local_endpoint () const
408
401
return local;
409
402
}
410
403
404
+ void nano::transport::socket::begin_read_loop ()
405
+ {
406
+ boost::asio::co_spawn (
407
+ strand, [this_l = shared_from_this ()] () -> asio::awaitable<void > {
408
+ co_await this_l->read_loop ();
409
+ },
410
+ // FIXME This should probably clean up in a structured way by getting a future for this loop and wait for it similar to a thread::join()
411
+ asio::detached);
412
+ }
413
+
414
+ boost::asio::awaitable<void > nano::transport::socket::read_loop ()
415
+ {
416
+ debug_assert (strand.running_in_this_thread ());
417
+
418
+ try
419
+ {
420
+ while (!closed)
421
+ {
422
+ // Wait until there is data available to read in the socket
423
+ co_await tcp_socket.async_wait (boost::asio::ip::tcp::socket::wait_read, boost::asio::use_awaitable);
424
+ if (read_buffer.capacity () == read_buffer.size ())
425
+ {
426
+ // Wait until there is writable space
427
+ co_await buffer_condition.async_wait (boost::asio::use_awaitable);
428
+ }
429
+
430
+ // Read up to as much data from the OS as we can hold in the write section
431
+ auto buffer = boost::asio::buffer (os_buffer.data (), read_buffer.capacity () - read_buffer.size ());
432
+ // Pick up multiple messages in a single syscall
433
+ size_t amount_read = co_await tcp_socket.async_read_some (buffer, boost::asio::use_awaitable);
434
+
435
+ // FIXME This is the undesired copy
436
+ std::transform (os_buffer.begin (), os_buffer.begin () + amount_read, std::back_inserter (read_buffer), [] (uint8_t val) { return val; });
437
+
438
+ service_requests_maybe ();
439
+ }
440
+ }
441
+ catch (boost::system::system_error const & e)
442
+ {
443
+ close ();
444
+ }
445
+ }
446
+
447
+ void nano::transport::socket::service_requests_maybe ()
448
+ {
449
+ debug_assert (strand.running_in_this_thread ());
450
+
451
+ while (!requests.empty ())
452
+ {
453
+ auto front = requests.front ();
454
+ auto const & [buffer, size, callback] = front;
455
+ auto available = read_buffer.size ();
456
+ if (available < size)
457
+ {
458
+ // Once read requests can't be serviced with enough readable data, we're done
459
+ return ;
460
+ }
461
+ std::copy (read_buffer.begin (), read_buffer.begin () + size, buffer->begin ());
462
+ if (read_buffer.capacity () == read_buffer.size ())
463
+ {
464
+ buffer_condition.cancel ();
465
+ }
466
+
467
+ // FIXME having valid iterators will be needed when merging read_buffer and buffer'
468
+ read_buffer.erase (read_buffer.begin (), read_buffer.begin () + size);
469
+
470
+ // FIXME Execute callback outside this socket's strand if possible
471
+ boost::asio::post (strand, [this_l = shared_from_this (), front] () {
472
+ auto const & [buffer, size, callback] = front;
473
+ auto node_l = this_l->node_w .lock ();
474
+ if (!node_l)
475
+ {
476
+ return ;
477
+ }
478
+
479
+ node_l->stats .add (nano::stat::type::traffic_tcp, nano::stat::detail::all, nano::stat::dir::in, size);
480
+ this_l->set_last_completion ();
481
+ this_l->set_last_receive_time ();
482
+ callback (boost::system::error_code{}, size);
483
+ });
484
+ requests.pop_front ();
485
+ }
486
+ }
487
+
411
488
void nano::transport::socket::operator () (nano::object_stream & obs) const
412
489
{
413
490
obs.write (" remote_endpoint" , remote_endpoint ());
0 commit comments