Skip to content

Commit

Permalink
fix client
Browse files Browse the repository at this point in the history
  • Loading branch information
poor-circle committed Jan 17, 2025
1 parent 2d0fa63 commit fe93d38
Showing 1 changed file with 31 additions and 42 deletions.
73 changes: 31 additions & 42 deletions include/ylt/coro_rpc/impl/coro_rpc_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -343,14 +343,9 @@ class coro_rpc_client {

uint32_t get_client_id() const { return config_.client_id; }

coro_rpc::rpc_error close() {
void close() {
// ELOG_INFO << "client_id " << config_.client_id << " close";
auto ec = close_socket_async(control_);
if (ec) {
return ec;
}
else
return {};
close_socket_async(control_);
}

bool set_req_attachment(std::string_view attachment) {
Expand Down Expand Up @@ -380,25 +375,19 @@ class coro_rpc_client {
bool value = false;
};

rpc_error reset() {
auto ec = close_socket_async(control_);
if (ec) {
return ec;
}
async_simple::coro::Lazy<void> reset() {
co_await close_socket(control_);
control_->socket_ =
asio::ip::tcp::socket(control_->executor_.get_asio_executor());
control_->is_timeout_ = false;
control_->has_closed_ = false;
return {};
co_return;
}
static bool is_ok(coro_rpc::err_code ec) noexcept { return !ec; }

[[nodiscard]] async_simple::coro::Lazy<coro_rpc::err_code> connect_impl() {
if (should_reset_) {
auto ec = reset();
if (ec) {
co_return errc::io_error;
}
co_await reset();
}
else {
should_reset_ = true;
Expand Down Expand Up @@ -492,7 +481,7 @@ class coro_rpc_client {
}
if (auto self = socket_watcher.lock()) {
self->is_timeout_ = is_timeout;
close_socket(self);
close_socket_async(self);
co_return true;
}
co_return false;
Expand Down Expand Up @@ -720,39 +709,42 @@ class coro_rpc_client {
executor_(executor) {}
};

static rpc_error close_socket_async(
static void close_socket_async(
std::shared_ptr<coro_rpc_client::control_t> control) {
bool expected = false;
if (!control->has_closed_.compare_exchange_strong(expected, true)) {
return {};
return;
}
auto p = std::make_unique<async_simple::Promise<std::error_code>>();
auto future = p->getFuture();
asio::dispatch(control->executor_.get_asio_executor(),
[control, p = std::move(p)]() {
[control]() {
assert(&control->executor_.get_asio_executor().context() ==
&control->socket_.get_executor().context());
p->setValue(close_socket(std::move(control)));
control->has_closed_ = true;
asio::error_code ec;
control->socket_.shutdown(asio::ip::tcp::socket::shutdown_both, ec);
control->socket_.close(ec);
});
future.wait();
auto v = future.value();
if (v) {
return rpc_error{errc::io_error, v.message()};
}
else
return {};
return;
}

static std::error_code close_socket(
static async_simple::coro::Lazy<void> close_socket(
std::shared_ptr<coro_rpc_client::control_t> control) {
control->has_closed_ = true;
bool expected = false;
asio::error_code ec;
control->socket_.shutdown(asio::ip::tcp::socket::shutdown_both, ec);
control->socket_.close(ec);
return ec;
if (!control->has_closed_.compare_exchange_strong(expected, true)) {
co_return;
}
co_await coro_io::post([control]() {
assert(&control->executor_.get_asio_executor().context() ==
&control->socket_.get_executor().context());
control->has_closed_ = true;
asio::error_code ec;
control->socket_.shutdown(asio::ip::tcp::socket::shutdown_both, ec);
control->socket_.close(ec);
},&control->executor_);
co_return;
}


#ifdef UNIT_TEST_INJECT
public:
coro_rpc::err_code sync_connect(const std::string &host,
Expand Down Expand Up @@ -890,7 +882,7 @@ class coro_rpc_client {
break;
}
} while (true);
close_socket(controller);
close_socket_async(controller);
send_err_response(controller.get(), ret.first);
co_return;
}
Expand Down Expand Up @@ -920,10 +912,7 @@ class coro_rpc_client {
handle_response_buffer<T>(ret.buffer_.read_buf_, ret.errc_, has_error);
if (has_error) {
if (auto w = watcher.lock(); w) {
auto ec = close_socket_async(std::move(w));
if (ec) {
co_return coro_rpc::unexpected<rpc_error>{std::move(ec)};
}
close_socket_async(std::move(w));
}
}
if (result) {
Expand Down

0 comments on commit fe93d38

Please sign in to comment.