From fe93d38aa7913c5b9c8c73cd4ed65420b16cf1bb Mon Sep 17 00:00:00 2001 From: "Zezheng.Li" Date: Fri, 17 Jan 2025 17:02:10 +0800 Subject: [PATCH] fix client --- include/ylt/coro_rpc/impl/coro_rpc_client.hpp | 73 ++++++++----------- 1 file changed, 31 insertions(+), 42 deletions(-) diff --git a/include/ylt/coro_rpc/impl/coro_rpc_client.hpp b/include/ylt/coro_rpc/impl/coro_rpc_client.hpp index 69531ca8c..86b9d5df0 100644 --- a/include/ylt/coro_rpc/impl/coro_rpc_client.hpp +++ b/include/ylt/coro_rpc/impl/coro_rpc_client.hpp @@ -343,14 +343,9 @@ class coro_rpc_client { uint32_t get_client_id() const { return config_.client_id; } - coro_rpc::rpc_error close() { + void close() { // ELOG_INFO << "client_id " << config_.client_id << " close"; - auto ec = close_socket_async(control_); - if (ec) { - return ec; - } - else - return {}; + close_socket_async(control_); } bool set_req_attachment(std::string_view attachment) { @@ -380,25 +375,19 @@ class coro_rpc_client { bool value = false; }; - rpc_error reset() { - auto ec = close_socket_async(control_); - if (ec) { - return ec; - } + async_simple::coro::Lazy reset() { + co_await close_socket(control_); control_->socket_ = asio::ip::tcp::socket(control_->executor_.get_asio_executor()); control_->is_timeout_ = false; control_->has_closed_ = false; - return {}; + co_return; } static bool is_ok(coro_rpc::err_code ec) noexcept { return !ec; } [[nodiscard]] async_simple::coro::Lazy connect_impl() { if (should_reset_) { - auto ec = reset(); - if (ec) { - co_return errc::io_error; - } + co_await reset(); } else { should_reset_ = true; @@ -492,7 +481,7 @@ class coro_rpc_client { } if (auto self = socket_watcher.lock()) { self->is_timeout_ = is_timeout; - close_socket(self); + close_socket_async(self); co_return true; } co_return false; @@ -720,39 +709,42 @@ class coro_rpc_client { executor_(executor) {} }; - static rpc_error close_socket_async( + static void close_socket_async( std::shared_ptr control) { bool expected = false; if (!control->has_closed_.compare_exchange_strong(expected, true)) { - return {}; + return; } - auto p = std::make_unique>(); - auto future = p->getFuture(); asio::dispatch(control->executor_.get_asio_executor(), - [control, p = std::move(p)]() { + [control]() { assert(&control->executor_.get_asio_executor().context() == &control->socket_.get_executor().context()); - p->setValue(close_socket(std::move(control))); + control->has_closed_ = true; + asio::error_code ec; + control->socket_.shutdown(asio::ip::tcp::socket::shutdown_both, ec); + control->socket_.close(ec); }); - future.wait(); - auto v = future.value(); - if (v) { - return rpc_error{errc::io_error, v.message()}; - } - else - return {}; + return; } - static std::error_code close_socket( + static async_simple::coro::Lazy close_socket( std::shared_ptr control) { - control->has_closed_ = true; bool expected = false; - asio::error_code ec; - control->socket_.shutdown(asio::ip::tcp::socket::shutdown_both, ec); - control->socket_.close(ec); - return ec; + if (!control->has_closed_.compare_exchange_strong(expected, true)) { + co_return; + } + co_await coro_io::post([control]() { + assert(&control->executor_.get_asio_executor().context() == + &control->socket_.get_executor().context()); + control->has_closed_ = true; + asio::error_code ec; + control->socket_.shutdown(asio::ip::tcp::socket::shutdown_both, ec); + control->socket_.close(ec); + },&control->executor_); + co_return; } + #ifdef UNIT_TEST_INJECT public: coro_rpc::err_code sync_connect(const std::string &host, @@ -890,7 +882,7 @@ class coro_rpc_client { break; } } while (true); - close_socket(controller); + close_socket_async(controller); send_err_response(controller.get(), ret.first); co_return; } @@ -920,10 +912,7 @@ class coro_rpc_client { handle_response_buffer(ret.buffer_.read_buf_, ret.errc_, has_error); if (has_error) { if (auto w = watcher.lock(); w) { - auto ec = close_socket_async(std::move(w)); - if (ec) { - co_return coro_rpc::unexpected{std::move(ec)}; - } + close_socket_async(std::move(w)); } } if (result) {