Skip to content

Commit

Permalink
Support CompletionToken instead of CompletionHandler
Browse files Browse the repository at this point in the history
  • Loading branch information
seva-deriv committed Dec 12, 2022
1 parent d426258 commit 713d192
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 89 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ Boost::ASIO low-level redis client (connector),
- works on linux (clang, gcc) and windows (msvc)
- synchronous & asynchronous interface
- inspired by [beast](https://github.com/vinniefalco/Beast)
- requirements: boost `v1.69` minimum
- requirements: boost `v1.70` minimum

## Changelog

Expand Down Expand Up @@ -781,6 +781,7 @@ MIT
- [Ronny Nowak](https://github.com/dargun)
- [Stephen Chisholm](https://github.com/sbchisholm)
- [amensel](https://github.com/amensel)
- [Usevalad Sauta](https://github.com/VsevolodSauta)

## See also
- https://github.com/Cylix/cpp_redis
Expand Down
12 changes: 6 additions & 6 deletions include/bredis/Connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,19 +44,19 @@ template <typename NextLayer> class Connection {
inline const NextLayer &next_layer() const { return stream_; }

/* asynchronous interface */
template <typename DynamicBuffer, typename WriteCallback>
BOOST_ASIO_INITFN_RESULT_TYPE(WriteCallback,
template <typename DynamicBuffer, typename CompletionToken>
BOOST_ASIO_INITFN_RESULT_TYPE(CompletionToken,
void(boost::system::error_code, std::size_t))
async_write(DynamicBuffer &tx_buff, const command_wrapper_t &command,
WriteCallback &&write_callback);
CompletionToken &&completion_token);

template <typename DynamicBuffer, typename ReadCallback,
template <typename DynamicBuffer, typename CompletionToken,
typename Policy = bredis::parsing_policy::keep_result>
BOOST_ASIO_INITFN_RESULT_TYPE(ReadCallback,
BOOST_ASIO_INITFN_RESULT_TYPE(CompletionToken,
void(boost::system::error_code,
BREDIS_PARSE_RESULT(DynamicBuffer,
Policy)))
async_read(DynamicBuffer &rx_buff, ReadCallback &&read_callback,
async_read(DynamicBuffer &rx_buff, CompletionToken &&completion_token,
std::size_t replies_count = 1, Policy policy = Policy{});

/* synchronous interface */
Expand Down
71 changes: 23 additions & 48 deletions include/bredis/impl/async_op.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -111,16 +111,19 @@ struct result_visitor_t<Iterator, parsing_policy::keep_result>
}
};

template <typename DynamicBuffer, typename Policy> struct async_read_op_impl {
template <typename NextLayer, typename DynamicBuffer, typename Policy> struct async_read_op_impl {
NextLayer &stream_;
DynamicBuffer &rx_buff_;
std::size_t replies_count_;
enum class state_t { init, read, done } state_ = state_t::init;

async_read_op_impl(DynamicBuffer &rx_buff, std::size_t replies_count)
: rx_buff_{rx_buff}, replies_count_{replies_count} {}
using Iterator = typename to_iterator<DynamicBuffer>::iterator_t;
using ResultVisitor = result_visitor_t<Iterator, Policy>;
using positive_result_t = parse_result_mapper_t<Iterator, Policy>;

async_read_op_impl(NextLayer &stream, DynamicBuffer &rx_buff, std::size_t replies_count) :
stream_(stream), rx_buff_(rx_buff), replies_count_(replies_count) {}

positive_result_t op(boost::system::error_code &error_code,
std::size_t /*bytes_transferred*/) {

Expand Down Expand Up @@ -149,53 +152,25 @@ template <typename DynamicBuffer, typename Policy> struct async_read_op_impl {
}
return result;
}
};

template <typename NextLayer, typename DynamicBuffer, typename ReadCallback,
typename Policy>
class async_read_op {
NextLayer &stream_;
DynamicBuffer &rx_buff_;
std::size_t replies_count_;
ReadCallback callback_;

public:
async_read_op(async_read_op &&) = default;
async_read_op(const async_read_op &) = default;

template <class DeducedHandler>
async_read_op(DeducedHandler &&deduced_handler, NextLayer &stream,
DynamicBuffer &rx_buff, std::size_t replies_count)
: stream_(stream), rx_buff_(rx_buff), replies_count_(replies_count),
callback_(std::forward<ReadCallback>(deduced_handler)) {}

void operator()(boost::system::error_code, std::size_t bytes_transferred);

friend bool asio_handler_is_continuation(async_read_op *op) {
using boost::asio::asio_handler_is_continuation;
return asio_handler_is_continuation(std::addressof(op->callback_));
}

boost::asio::associated_allocator_t<ReadCallback> get_allocator() const noexcept
{
return boost::asio::get_associated_allocator(callback_);
}

boost::asio::associated_executor_t<ReadCallback> get_executor() const noexcept
{
return boost::asio::get_associated_executor(callback_);
template<typename Self>
void operator()(Self& self, boost::system::error_code error_code = {}, std::size_t bytes_transferred = {}) {
switch (state_) {
case state_t::init:
state_ = state_t::read;
async_read_until(stream_, rx_buff_, MatchResult<Iterator>(replies_count_), std::move(self));
break;
case state_t::read:
{
state_ = state_t::done;
auto result = op(error_code, bytes_transferred); // Do not inline! We are sequencing computations
self.complete(error_code, result);
break;
}
default:
assert(false && "We are in unexpected state");
}
}
};

template <typename NextLayer, typename DynamicBuffer, typename ReadCallback,
typename Policy>
void async_read_op<NextLayer, DynamicBuffer, ReadCallback, Policy>::
operator()(boost::system::error_code error_code,
std::size_t bytes_transferred) {
using op_impl = async_read_op_impl<DynamicBuffer, Policy>;
callback_(
error_code,
op_impl(rx_buff_, replies_count_).op(error_code, bytes_transferred));
}

} // namespace bredis
45 changes: 11 additions & 34 deletions include/bredis/impl/connection.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -18,60 +18,37 @@
namespace bredis {

template <typename NextLayer>
template <typename DynamicBuffer, typename WriteCallback>
BOOST_ASIO_INITFN_RESULT_TYPE(WriteCallback,
template <typename DynamicBuffer, typename CompletionToken>
BOOST_ASIO_INITFN_RESULT_TYPE(CompletionToken,
void(boost::system::error_code, std::size_t))
Connection<NextLayer>::async_write(DynamicBuffer &tx_buff,
const command_wrapper_t &command,
WriteCallback &&write_callback) {
CompletionToken &&write_callback) {
namespace asio = boost::asio;
namespace sys = boost::system;

using boost::asio::async_write;
using Signature = void(boost::system::error_code, std::size_t);
using Callback = boost::decay_t<WriteCallback>;
using AsyncResult = asio::async_result<Callback, Signature>;
using CompletionHandler = typename AsyncResult::completion_handler_type;
using serializer_t = command_serializer_visitor<DynamicBuffer>;

boost::apply_visitor(serializer_t(tx_buff), command);

CompletionHandler handler(std::forward<WriteCallback>(write_callback));
AsyncResult result(handler);
async_write(stream_, tx_buff, std::move(handler));
return result.get();
return async_write(stream_, tx_buff, std::forward<CompletionToken>(write_callback));
}

template <typename NextLayer>
template <typename DynamicBuffer, typename ReadCallback, typename Policy>
BOOST_ASIO_INITFN_RESULT_TYPE(ReadCallback,
void(const boost::system::error_code,
template <typename DynamicBuffer, typename CompletionToken, typename Policy>
BOOST_ASIO_INITFN_RESULT_TYPE(CompletionToken,
void(boost::system::error_code,
BREDIS_PARSE_RESULT(DynamicBuffer, Policy)))
Connection<NextLayer>::async_read(DynamicBuffer &rx_buff,
ReadCallback &&read_callback,
CompletionToken &&completion_token,
std::size_t replies_count, Policy) {

namespace asio = boost::asio;
namespace sys = boost::system;

using boost::asio::async_read_until;
using Iterator = typename to_iterator<DynamicBuffer>::iterator_t;
using ParseResult = BREDIS_PARSE_RESULT(DynamicBuffer, Policy);
using Signature = void(boost::system::error_code, ParseResult);
using Callback = boost::decay_t<ReadCallback>;
using AsyncResult = asio::async_result<Callback, Signature>;
using CompletionHandler = typename AsyncResult::completion_handler_type;
using ReadOp =
async_read_op<NextLayer, DynamicBuffer, CompletionHandler, Policy>;

CompletionHandler handler(std::forward<ReadCallback>(read_callback));
AsyncResult result(handler);

ReadOp async_op(std::move(handler), stream_, rx_buff, replies_count);

async_read_until(stream_, rx_buff, MatchResult<Iterator>(replies_count),
std::move(async_op));
return result.get();
return boost::asio::async_compose<CompletionToken, Signature>(
async_read_op_impl<NextLayer, DynamicBuffer, Policy>{stream_, rx_buff, replies_count},
completion_token, stream_);
}

template <typename NextLayer>
Expand Down

0 comments on commit 713d192

Please sign in to comment.