Skip to content

Commit

Permalink
update async-simple (#882)
Browse files Browse the repository at this point in the history
* update async-simple
  • Loading branch information
poor-circle authored Jan 20, 2025
1 parent 17a2139 commit 45d5061
Show file tree
Hide file tree
Showing 61 changed files with 3,658 additions and 764 deletions.
5 changes: 2 additions & 3 deletions include/ylt/coro_io/client_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -257,9 +257,8 @@ class client_pool : public std::enable_shared_from_this<
this->weak_from_this(), clients,
(std::max)(collect_time, std::chrono::milliseconds{50}),
pool_config_.idle_queue_per_max_clear_count)
.via(coro_io::get_global_executor())
.start([](auto&&) {
});
.directlyStart([](auto&&) {
},coro_io::get_global_executor());
}
}
}
Expand Down
60 changes: 0 additions & 60 deletions include/ylt/coro_io/coro_io.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -429,66 +429,6 @@ async_simple::coro::Lazy<std::pair<
});
}

template <typename T>
inline decltype(auto) select_impl(T &pair) {
using Func = std::tuple_element_t<1, std::remove_cvref_t<T>>;
using ValueType =
typename std::tuple_element_t<0, std::remove_cvref_t<T>>::ValueType;
using return_type = std::invoke_result_t<Func, async_simple::Try<ValueType>>;

auto &callback = std::get<1>(pair);
if constexpr (coro_io::is_lazy_v<return_type>) {
auto executor = std::get<0>(pair).getExecutor();
return std::make_pair(
std::move(std::get<0>(pair)),
[executor, callback = std::move(callback)](auto &&val) {
if (executor) {
callback(std::move(val)).via(executor).start([](auto &&) {
});
}
else {
callback(std::move(val)).start([](auto &&) {
});
}
});
}
else {
return pair;
}
}

template <typename... T>
inline auto select(T &&...args) {
return async_simple::coro::collectAny(select_impl(args)...);
}

template <typename T, typename Callback>
inline auto select(std::vector<T> vec, Callback callback) {
if constexpr (coro_io::is_lazy_v<Callback>) {
std::vector<async_simple::Executor *> executors;
for (auto &lazy : vec) {
executors.push_back(lazy.getExecutor());
}

return async_simple::coro::collectAny(
std::move(vec),
[executors, callback = std::move(callback)](size_t index, auto &&val) {
auto executor = executors[index];
if (executor) {
callback(index, std::move(val)).via(executor).start([](auto &&) {
});
}
else {
callback(index, std::move(val)).start([](auto &&) {
});
}
});
}
else {
return async_simple::coro::collectAny(std::move(vec), std::move(callback));
}
}

template <typename Socket, typename AsioBuffer>
std::pair<asio::error_code, size_t> read_some(Socket &sock,
AsioBuffer &&buffer) {
Expand Down
1 change: 1 addition & 0 deletions include/ylt/coro_io/detail/client_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ class client_queue {
if (size_[index]) {
std::size_t result =
queue_[index].try_dequeue_bulk(fake_iter{}, max_clear_cnt);

size_[index] -= result;
return result;
}
Expand Down
31 changes: 21 additions & 10 deletions include/ylt/coro_io/io_context_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,20 @@
#include <async_simple/Executor.h>
#include <async_simple/coro/Lazy.h>

#include <asio/dispatch.hpp>
#include <asio/io_context.hpp>
#include <asio/post.hpp>
#include <asio/steady_timer.hpp>
#include <atomic>
#include <cstdint>
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <thread>
#include <type_traits>
#include <vector>

#include "asio/dispatch.hpp"
#ifdef __linux__
#include <pthread.h>
#include <sched.h>
Expand All @@ -51,25 +54,25 @@ class ExecutorWrapper : public async_simple::Executor {
using context_t = std::remove_cvref_t<decltype(executor_.context())>;

virtual bool schedule(Func func) override {
if constexpr (requires(ExecutorImpl e) { e.post(std::move(func)); }) {
executor_.dispatch(std::move(func));
asio::post(executor_, std::move(func));
return true;
}

virtual bool schedule(Func func, uint64_t hint) override {
if (hint >=
static_cast<uint64_t>(async_simple::Executor::Priority::YIELD)) {
asio::post(executor_, std::move(func));
}
else {
asio::dispatch(executor_, std::move(func));
}

return true;
}

virtual bool checkin(Func func, void *ctx) override {
using context_t = std::remove_cvref_t<decltype(executor_.context())>;
auto &executor = *(context_t *)ctx;
if constexpr (requires(ExecutorImpl e) { e.post(std::move(func)); }) {
executor.post(std::move(func));
}
else {
asio::dispatch(executor, std::move(func));
}
asio::post(executor, std::move(func));
return true;
}
virtual void *checkout() override { return &executor_.context(); }
Expand Down Expand Up @@ -99,6 +102,14 @@ class ExecutorWrapper : public async_simple::Executor {
fn();
});
}
void schedule(Func func, Duration dur, uint64_t hint,
async_simple::Slot *slot = nullptr) override {
auto timer = std::make_unique<asio::steady_timer>(executor_, dur);
auto tm = timer.get();
tm->async_wait([fn = std::move(func), timer = std::move(timer)](auto ec) {
fn();
});
}
};

template <typename ExecutorImpl = asio::io_context>
Expand Down
55 changes: 35 additions & 20 deletions include/ylt/coro_rpc/impl/coro_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,17 @@ context_info_t<rpc_protocol> *&set_context();

class coro_connection : public std::enable_shared_from_this<coro_connection> {
public:
template <typename rpc_protocol_t>
struct connection_lazy_ctx : public async_simple::coro::LazyLocalBase {
inline static char tag;
// init LazyLocalBase by unique address
connection_lazy_ctx(std::shared_ptr<context_info_t<rpc_protocol_t>> info)
: LazyLocalBase(&tag), info_(std::move(info)) {}
static bool classof(const LazyLocalBase *base) {
return base->getTypeTag() == &tag;
}
std::shared_ptr<context_info_t<rpc_protocol_t>> info_;
};
/*!
*
* @param io_context
Expand Down Expand Up @@ -276,26 +287,30 @@ class coro_connection : public std::enable_shared_from_this<coro_connection> {
auto coro_handler = router.get_coro_handler(key);
set_rpc_return_by_callback();
router.route_coro(coro_handler, payload, serialize_proto.value(), key)
.via(executor_)
.setLazyLocal((void *)context_info.get())
.start([context_info](auto &&result) mutable {
std::pair<coro_rpc::err_code, std::string> &ret = result.value();
if (ret.first)
AS_UNLIKELY {
ELOGI << "rpc error in function:"
<< context_info->get_rpc_function_name()
<< ". error code:" << ret.first.ec
<< ". message : " << ret.second;
}
auto executor = context_info->conn_->get_executor();
executor->schedule([context_info = std::move(context_info),
ret = std::move(ret)]() mutable {
context_info->conn_->template direct_response_msg<rpc_protocol>(
ret.first, ret.second, context_info->req_head_,
std::move(context_info->resp_attachment_),
std::move(context_info->complete_handler_));
});
});
.template setLazyLocal<connection_lazy_ctx<rpc_protocol>>(
context_info)
.directlyStart(
[context_info](auto &&result) mutable {
std::pair<coro_rpc::err_code, std::string> &ret =
result.value();
if (ret.first)
AS_UNLIKELY {
ELOGI << "rpc error in function:"
<< context_info->get_rpc_function_name()
<< ". error code:" << ret.first.ec
<< ". message : " << ret.second;
}
auto executor = context_info->conn_->get_executor();
executor->schedule([context_info = std::move(context_info),
ret = std::move(ret)]() mutable {
context_info->conn_
->template direct_response_msg<rpc_protocol>(
ret.first, ret.second, context_info->req_head_,
std::move(context_info->resp_attachment_),
std::move(context_info->complete_handler_));
});
},
executor_);
}
else {
coro_rpc::detail::set_context<rpc_protocol>() = context_info.get();
Expand Down
61 changes: 41 additions & 20 deletions include/ylt/coro_rpc/impl/coro_rpc_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ class coro_rpc_client {

void close() {
// ELOG_INFO << "client_id " << config_.client_id << " close";
close_socket(control_);
close_socket_async(control_);
}

bool set_req_attachment(std::string_view attachment) {
Expand Down Expand Up @@ -375,18 +375,19 @@ class coro_rpc_client {
bool value = false;
};

void reset() {
close_socket(control_);
async_simple::coro::Lazy<void> reset() {
co_await close_socket(control_);
control_->socket_ =
asio::ip::tcp::socket(control_->executor_.get_asio_executor());
control_->is_timeout_ = false;
control_->has_closed_ = false;
co_return;
}
static bool is_ok(coro_rpc::err_code ec) noexcept { return !ec; }

[[nodiscard]] async_simple::coro::Lazy<coro_rpc::err_code> connect_impl() {
if (should_reset_) {
reset();
co_await reset();
}
else {
should_reset_ = true;
Expand All @@ -413,17 +414,15 @@ class coro_rpc_client {
std::error_code err_code;
timer_->cancel(err_code);

if (ec) {
if (control_->is_timeout_) {
co_return errc::timed_out;
}
co_return errc::not_connected;
}

if (control_->is_timeout_) {
ELOG_WARN << "client_id " << config_.client_id << " connect timeout";
co_return errc::timed_out;
}
else if (ec) {
ELOG_WARN << "client_id " << config_.client_id
<< " failed:" << ec.message();
co_return errc::not_connected;
}
if (config_.enable_tcp_no_delay == true) {
control_->socket_.set_option(asio::ip::tcp::no_delay(true), ec);
}
Expand Down Expand Up @@ -482,7 +481,7 @@ class coro_rpc_client {
}
if (auto self = socket_watcher.lock()) {
self->is_timeout_ = is_timeout;
close_socket(self);
close_socket_async(self);
co_return true;
}
co_return false;
Expand Down Expand Up @@ -710,18 +709,40 @@ class coro_rpc_client {
executor_(executor) {}
};

static void close_socket(
static void close_socket_async(
std::shared_ptr<coro_rpc_client::control_t> control) {
bool expected = false;
if (!control->has_closed_.compare_exchange_strong(expected, true)) {
return;
}
control->executor_.schedule([control]() {
asio::error_code ignored_ec;
control->socket_.shutdown(asio::ip::tcp::socket::shutdown_both,
ignored_ec);
control->socket_.close(ignored_ec);
asio::dispatch(control->executor_.get_asio_executor(), [control]() {
assert(&control->executor_.get_asio_executor().context() ==
&control->socket_.get_executor().context());
control->has_closed_ = true;
asio::error_code ec;
control->socket_.shutdown(asio::ip::tcp::socket::shutdown_both, ec);
control->socket_.close(ec);
});
return;
}

static async_simple::coro::Lazy<void> close_socket(
std::shared_ptr<coro_rpc_client::control_t> control) {
bool expected = false;
if (!control->has_closed_.compare_exchange_strong(expected, true)) {
co_return;
}
co_await coro_io::post(
[control = control.get()]() {
assert(&control->executor_.get_asio_executor().context() ==
&control->socket_.get_executor().context());
control->has_closed_ = true;
asio::error_code ec;
control->socket_.shutdown(asio::ip::tcp::socket::shutdown_both, ec);
control->socket_.close(ec);
},
&control->executor_);
co_return;
}

#ifdef UNIT_TEST_INJECT
Expand Down Expand Up @@ -861,7 +882,7 @@ class coro_rpc_client {
break;
}
} while (true);
close_socket(controller);
close_socket_async(controller);
send_err_response(controller.get(), ret.first);
co_return;
}
Expand Down Expand Up @@ -891,7 +912,7 @@ class coro_rpc_client {
handle_response_buffer<T>(ret.buffer_.read_buf_, ret.errc_, has_error);
if (has_error) {
if (auto w = watcher.lock(); w) {
close_socket(std::move(w));
close_socket_async(std::move(w));
}
}
if (result) {
Expand Down
5 changes: 3 additions & 2 deletions include/ylt/coro_rpc/impl/protocol/coro_rpc_protocol.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,9 +213,10 @@ using context = coro_rpc::context_base<return_msg_type,

template <typename rpc_protocol = coro_rpc::protocol::coro_rpc_protocol>
async_simple::coro::Lazy<context_info_t<rpc_protocol>*> get_context_in_coro() {
auto* ctx = co_await async_simple::coro::LazyLocals{};
auto* ctx = co_await async_simple::coro::CurrentLazyLocals<
coro_connection::connection_lazy_ctx<rpc_protocol>>{};
assert(ctx != nullptr);
co_return (context_info_t<rpc_protocol>*) ctx;
co_return ctx->info_.get();
}

namespace detail {
Expand Down
5 changes: 2 additions & 3 deletions include/ylt/thirdparty/async_simple/Collect.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,13 @@
#ifndef ASYNC_SIMPLE_COLLECT_H
#define ASYNC_SIMPLE_COLLECT_H

#include <exception>
#ifndef ASYNC_SIMPLE_USE_MODULES
#include <iterator>
#include <vector>
#include "async_simple/Common.h"
#include "async_simple/Future.h"
#include "async_simple/Try.h"

#include <iostream>
#endif // ASYNC_SIMPLE_USE_MODULES

namespace async_simple {

Expand Down
Loading

0 comments on commit 45d5061

Please sign in to comment.