diff --git a/include/ylt/coro_io/client_pool.hpp b/include/ylt/coro_io/client_pool.hpp index 43f536eb3..a5c656cb8 100644 --- a/include/ylt/coro_io/client_pool.hpp +++ b/include/ylt/coro_io/client_pool.hpp @@ -257,9 +257,8 @@ class client_pool : public std::enable_shared_from_this< this->weak_from_this(), clients, (std::max)(collect_time, std::chrono::milliseconds{50}), pool_config_.idle_queue_per_max_clear_count) - .via(coro_io::get_global_executor()) - .start([](auto&&) { - }); + .directlyStart([](auto&&) { + },coro_io::get_global_executor()); } } } diff --git a/include/ylt/coro_io/coro_io.hpp b/include/ylt/coro_io/coro_io.hpp index 3992116ca..e26fab75c 100644 --- a/include/ylt/coro_io/coro_io.hpp +++ b/include/ylt/coro_io/coro_io.hpp @@ -429,66 +429,6 @@ async_simple::coro::Lazy -inline decltype(auto) select_impl(T &pair) { - using Func = std::tuple_element_t<1, std::remove_cvref_t>; - using ValueType = - typename std::tuple_element_t<0, std::remove_cvref_t>::ValueType; - using return_type = std::invoke_result_t>; - - auto &callback = std::get<1>(pair); - if constexpr (coro_io::is_lazy_v) { - auto executor = std::get<0>(pair).getExecutor(); - return std::make_pair( - std::move(std::get<0>(pair)), - [executor, callback = std::move(callback)](auto &&val) { - if (executor) { - callback(std::move(val)).via(executor).start([](auto &&) { - }); - } - else { - callback(std::move(val)).start([](auto &&) { - }); - } - }); - } - else { - return pair; - } -} - -template -inline auto select(T &&...args) { - return async_simple::coro::collectAny(select_impl(args)...); -} - -template -inline auto select(std::vector vec, Callback callback) { - if constexpr (coro_io::is_lazy_v) { - std::vector executors; - for (auto &lazy : vec) { - executors.push_back(lazy.getExecutor()); - } - - return async_simple::coro::collectAny( - std::move(vec), - [executors, callback = std::move(callback)](size_t index, auto &&val) { - auto executor = executors[index]; - if (executor) { - callback(index, std::move(val)).via(executor).start([](auto &&) { - }); - } - else { - callback(index, std::move(val)).start([](auto &&) { - }); - } - }); - } - else { - return async_simple::coro::collectAny(std::move(vec), std::move(callback)); - } -} - template std::pair read_some(Socket &sock, AsioBuffer &&buffer) { diff --git a/include/ylt/coro_io/detail/client_queue.hpp b/include/ylt/coro_io/detail/client_queue.hpp index 449e28d76..137d65f57 100644 --- a/include/ylt/coro_io/detail/client_queue.hpp +++ b/include/ylt/coro_io/detail/client_queue.hpp @@ -80,6 +80,7 @@ class client_queue { if (size_[index]) { std::size_t result = queue_[index].try_dequeue_bulk(fake_iter{}, max_clear_cnt); + size_[index] -= result; return result; } diff --git a/include/ylt/coro_io/io_context_pool.hpp b/include/ylt/coro_io/io_context_pool.hpp index 9e02e3bbe..0e725f9eb 100644 --- a/include/ylt/coro_io/io_context_pool.hpp +++ b/include/ylt/coro_io/io_context_pool.hpp @@ -17,10 +17,11 @@ #include #include -#include #include +#include #include #include +#include #include #include #include @@ -28,6 +29,8 @@ #include #include #include + +#include "asio/dispatch.hpp" #ifdef __linux__ #include #include @@ -51,25 +54,25 @@ class ExecutorWrapper : public async_simple::Executor { using context_t = std::remove_cvref_t; virtual bool schedule(Func func) override { - if constexpr (requires(ExecutorImpl e) { e.post(std::move(func)); }) { - executor_.dispatch(std::move(func)); + asio::post(executor_, std::move(func)); + return true; + } + + virtual bool schedule(Func func, uint64_t hint) override { + if (hint >= + static_cast(async_simple::Executor::Priority::YIELD)) { + asio::post(executor_, std::move(func)); } else { asio::dispatch(executor_, std::move(func)); } - return true; } virtual bool checkin(Func func, void *ctx) override { using context_t = std::remove_cvref_t; auto &executor = *(context_t *)ctx; - if constexpr (requires(ExecutorImpl e) { e.post(std::move(func)); }) { - executor.post(std::move(func)); - } - else { - asio::dispatch(executor, std::move(func)); - } + asio::post(executor, std::move(func)); return true; } virtual void *checkout() override { return &executor_.context(); } @@ -99,6 +102,14 @@ class ExecutorWrapper : public async_simple::Executor { fn(); }); } + void schedule(Func func, Duration dur, uint64_t hint, + async_simple::Slot *slot = nullptr) override { + auto timer = std::make_unique(executor_, dur); + auto tm = timer.get(); + tm->async_wait([fn = std::move(func), timer = std::move(timer)](auto ec) { + fn(); + }); + } }; template diff --git a/include/ylt/coro_rpc/impl/coro_connection.hpp b/include/ylt/coro_rpc/impl/coro_connection.hpp index be52bc77c..67b6c00d8 100644 --- a/include/ylt/coro_rpc/impl/coro_connection.hpp +++ b/include/ylt/coro_rpc/impl/coro_connection.hpp @@ -122,6 +122,17 @@ context_info_t *&set_context(); class coro_connection : public std::enable_shared_from_this { public: + template + struct connection_lazy_ctx : public async_simple::coro::LazyLocalBase { + inline static char tag; + // init LazyLocalBase by unique address + connection_lazy_ctx(std::shared_ptr> info) + : LazyLocalBase(&tag), info_(std::move(info)) {} + static bool classof(const LazyLocalBase *base) { + return base->getTypeTag() == &tag; + } + std::shared_ptr> info_; + }; /*! * * @param io_context @@ -276,26 +287,30 @@ class coro_connection : public std::enable_shared_from_this { auto coro_handler = router.get_coro_handler(key); set_rpc_return_by_callback(); router.route_coro(coro_handler, payload, serialize_proto.value(), key) - .via(executor_) - .setLazyLocal((void *)context_info.get()) - .start([context_info](auto &&result) mutable { - std::pair &ret = result.value(); - if (ret.first) - AS_UNLIKELY { - ELOGI << "rpc error in function:" - << context_info->get_rpc_function_name() - << ". error code:" << ret.first.ec - << ". message : " << ret.second; - } - auto executor = context_info->conn_->get_executor(); - executor->schedule([context_info = std::move(context_info), - ret = std::move(ret)]() mutable { - context_info->conn_->template direct_response_msg( - ret.first, ret.second, context_info->req_head_, - std::move(context_info->resp_attachment_), - std::move(context_info->complete_handler_)); - }); - }); + .template setLazyLocal>( + context_info) + .directlyStart( + [context_info](auto &&result) mutable { + std::pair &ret = + result.value(); + if (ret.first) + AS_UNLIKELY { + ELOGI << "rpc error in function:" + << context_info->get_rpc_function_name() + << ". error code:" << ret.first.ec + << ". message : " << ret.second; + } + auto executor = context_info->conn_->get_executor(); + executor->schedule([context_info = std::move(context_info), + ret = std::move(ret)]() mutable { + context_info->conn_ + ->template direct_response_msg( + ret.first, ret.second, context_info->req_head_, + std::move(context_info->resp_attachment_), + std::move(context_info->complete_handler_)); + }); + }, + executor_); } else { coro_rpc::detail::set_context() = context_info.get(); diff --git a/include/ylt/coro_rpc/impl/coro_rpc_client.hpp b/include/ylt/coro_rpc/impl/coro_rpc_client.hpp index 8ce0e8335..be718868e 100644 --- a/include/ylt/coro_rpc/impl/coro_rpc_client.hpp +++ b/include/ylt/coro_rpc/impl/coro_rpc_client.hpp @@ -345,7 +345,7 @@ class coro_rpc_client { void close() { // ELOG_INFO << "client_id " << config_.client_id << " close"; - close_socket(control_); + close_socket_async(control_); } bool set_req_attachment(std::string_view attachment) { @@ -375,18 +375,19 @@ class coro_rpc_client { bool value = false; }; - void reset() { - close_socket(control_); + async_simple::coro::Lazy reset() { + co_await close_socket(control_); control_->socket_ = asio::ip::tcp::socket(control_->executor_.get_asio_executor()); control_->is_timeout_ = false; control_->has_closed_ = false; + co_return; } static bool is_ok(coro_rpc::err_code ec) noexcept { return !ec; } [[nodiscard]] async_simple::coro::Lazy connect_impl() { if (should_reset_) { - reset(); + co_await reset(); } else { should_reset_ = true; @@ -413,17 +414,15 @@ class coro_rpc_client { std::error_code err_code; timer_->cancel(err_code); - if (ec) { - if (control_->is_timeout_) { - co_return errc::timed_out; - } - co_return errc::not_connected; - } - if (control_->is_timeout_) { ELOG_WARN << "client_id " << config_.client_id << " connect timeout"; co_return errc::timed_out; } + else if (ec) { + ELOG_WARN << "client_id " << config_.client_id + << " failed:" << ec.message(); + co_return errc::not_connected; + } if (config_.enable_tcp_no_delay == true) { control_->socket_.set_option(asio::ip::tcp::no_delay(true), ec); } @@ -482,7 +481,7 @@ class coro_rpc_client { } if (auto self = socket_watcher.lock()) { self->is_timeout_ = is_timeout; - close_socket(self); + close_socket_async(self); co_return true; } co_return false; @@ -710,18 +709,40 @@ class coro_rpc_client { executor_(executor) {} }; - static void close_socket( + static void close_socket_async( std::shared_ptr control) { bool expected = false; if (!control->has_closed_.compare_exchange_strong(expected, true)) { return; } - control->executor_.schedule([control]() { - asio::error_code ignored_ec; - control->socket_.shutdown(asio::ip::tcp::socket::shutdown_both, - ignored_ec); - control->socket_.close(ignored_ec); + asio::dispatch(control->executor_.get_asio_executor(), [control]() { + assert(&control->executor_.get_asio_executor().context() == + &control->socket_.get_executor().context()); + control->has_closed_ = true; + asio::error_code ec; + control->socket_.shutdown(asio::ip::tcp::socket::shutdown_both, ec); + control->socket_.close(ec); }); + return; + } + + static async_simple::coro::Lazy close_socket( + std::shared_ptr control) { + bool expected = false; + if (!control->has_closed_.compare_exchange_strong(expected, true)) { + co_return; + } + co_await coro_io::post( + [control = control.get()]() { + assert(&control->executor_.get_asio_executor().context() == + &control->socket_.get_executor().context()); + control->has_closed_ = true; + asio::error_code ec; + control->socket_.shutdown(asio::ip::tcp::socket::shutdown_both, ec); + control->socket_.close(ec); + }, + &control->executor_); + co_return; } #ifdef UNIT_TEST_INJECT @@ -861,7 +882,7 @@ class coro_rpc_client { break; } } while (true); - close_socket(controller); + close_socket_async(controller); send_err_response(controller.get(), ret.first); co_return; } @@ -891,7 +912,7 @@ class coro_rpc_client { handle_response_buffer(ret.buffer_.read_buf_, ret.errc_, has_error); if (has_error) { if (auto w = watcher.lock(); w) { - close_socket(std::move(w)); + close_socket_async(std::move(w)); } } if (result) { diff --git a/include/ylt/coro_rpc/impl/protocol/coro_rpc_protocol.hpp b/include/ylt/coro_rpc/impl/protocol/coro_rpc_protocol.hpp index c22fcbc80..f06d278c7 100644 --- a/include/ylt/coro_rpc/impl/protocol/coro_rpc_protocol.hpp +++ b/include/ylt/coro_rpc/impl/protocol/coro_rpc_protocol.hpp @@ -213,9 +213,10 @@ using context = coro_rpc::context_base async_simple::coro::Lazy*> get_context_in_coro() { - auto* ctx = co_await async_simple::coro::LazyLocals{}; + auto* ctx = co_await async_simple::coro::CurrentLazyLocals< + coro_connection::connection_lazy_ctx>{}; assert(ctx != nullptr); - co_return (context_info_t*) ctx; + co_return ctx->info_.get(); } namespace detail { diff --git a/include/ylt/thirdparty/async_simple/Collect.h b/include/ylt/thirdparty/async_simple/Collect.h index 937b86333..4cb7ccc6e 100644 --- a/include/ylt/thirdparty/async_simple/Collect.h +++ b/include/ylt/thirdparty/async_simple/Collect.h @@ -16,14 +16,13 @@ #ifndef ASYNC_SIMPLE_COLLECT_H #define ASYNC_SIMPLE_COLLECT_H -#include +#ifndef ASYNC_SIMPLE_USE_MODULES #include #include -#include "async_simple/Common.h" #include "async_simple/Future.h" #include "async_simple/Try.h" -#include +#endif // ASYNC_SIMPLE_USE_MODULES namespace async_simple { diff --git a/include/ylt/thirdparty/async_simple/Common.h b/include/ylt/thirdparty/async_simple/Common.h index bde017e6d..c8e7ddd33 100644 --- a/include/ylt/thirdparty/async_simple/Common.h +++ b/include/ylt/thirdparty/async_simple/Common.h @@ -16,47 +16,11 @@ #ifndef ASYNC_SIMPLE_COMMON_H #define ASYNC_SIMPLE_COMMON_H +#ifndef ASYNC_SIMPLE_USE_MODULES #include +#include "async_simple/CommonMacros.h" -#if __has_cpp_attribute(likely) && __has_cpp_attribute(unlikely) -#define AS_LIKELY [[likely]] -#define AS_UNLIKELY [[unlikely]] -#else -#define AS_LIKELY -#define AS_UNLIKELY -#endif - -#ifdef _WIN32 -#define AS_INLINE -#else -#define AS_INLINE __attribute__((__always_inline__)) inline -#endif - -#ifdef __clang__ -#if __has_feature(address_sanitizer) -#define AS_INTERNAL_USE_ASAN 1 -#endif // __has_feature(address_sanitizer) -#endif // __clang__ - -#ifdef __GNUC__ -#ifdef __SANITIZE_ADDRESS__ // GCC -#define AS_INTERNAL_USE_ASAN 1 -#endif // __SANITIZE_ADDRESS__ -#endif // __GNUC__ - -#if defined(__alibaba_clang__) && \ - __has_cpp_attribute(ACC::coro_only_destroy_when_complete) -#define CORO_ONLY_DESTROY_WHEN_DONE [[ACC::coro_only_destroy_when_complete]] -#else -#define CORO_ONLY_DESTROY_WHEN_DONE -#endif - -#if defined(__alibaba_clang__) && \ - __has_cpp_attribute(ACC::elideable_after_await) -#define ELIDEABLE_AFTER_AWAIT [[ACC::elideable_after_await]] -#else -#define ELIDEABLE_AFTER_AWAIT -#endif +#endif // ASYNC_SIMPLE_USE_MODULES namespace async_simple { // Different from assert, logicAssert is meaningful in diff --git a/include/ylt/thirdparty/async_simple/CommonMacros.h b/include/ylt/thirdparty/async_simple/CommonMacros.h new file mode 100644 index 000000000..75f434876 --- /dev/null +++ b/include/ylt/thirdparty/async_simple/CommonMacros.h @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2024, Alibaba Group Holding Limited; + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef ASYNC_SIMPLE_COMMON_MACROS_H +#define ASYNC_SIMPLE_COMMON_MACROS_H + +#if __has_cpp_attribute(likely) && __has_cpp_attribute(unlikely) +#define AS_LIKELY [[likely]] +#define AS_UNLIKELY [[unlikely]] +#else +#define AS_LIKELY +#define AS_UNLIKELY +#endif + +#ifdef _WIN32 +#define AS_INLINE +#else +#define AS_INLINE __attribute__((__always_inline__)) inline +#endif + +#ifdef __clang__ +#if __has_feature(address_sanitizer) +#define AS_INTERNAL_USE_ASAN 1 +#endif // __has_feature(address_sanitizer) +#endif // __clang__ + +#ifdef __GNUC__ +#ifdef __SANITIZE_ADDRESS__ // GCC +#define AS_INTERNAL_USE_ASAN 1 +#endif // __SANITIZE_ADDRESS__ +#endif // __GNUC__ + +#if defined(__alibaba_clang__) && \ + __has_cpp_attribute(ACC::coro_only_destroy_when_complete) +#define CORO_ONLY_DESTROY_WHEN_DONE [[ACC::coro_only_destroy_when_complete]] +#else +#define CORO_ONLY_DESTROY_WHEN_DONE +#endif + +#if defined(__alibaba_clang__) && \ + __has_cpp_attribute(ACC::elideable_after_await) +#define ELIDEABLE_AFTER_AWAIT [[ACC::elideable_after_await]] +#else +#define ELIDEABLE_AFTER_AWAIT +#endif + +#endif diff --git a/include/ylt/thirdparty/async_simple/Executor.h b/include/ylt/thirdparty/async_simple/Executor.h index d1edfedf8..c7aaf1967 100644 --- a/include/ylt/thirdparty/async_simple/Executor.h +++ b/include/ylt/thirdparty/async_simple/Executor.h @@ -16,14 +16,22 @@ #ifndef ASYNC_SIMPLE_EXECUTOR_H #define ASYNC_SIMPLE_EXECUTOR_H +#ifndef ASYNC_SIMPLE_USE_MODULES #include +#include #include +#include +#include +#include #include #include #include "async_simple/MoveWrapper.h" +#include "async_simple/Signal.h" #include "async_simple/experimental/coroutine.h" #include "async_simple/util/move_only_function.h" +#endif // ASYNC_SIMPLE_USE_MODULES + namespace async_simple { // Stat information for an executor. // It contains the number of pending task @@ -89,12 +97,39 @@ class Executor { // should guarantee that the func would be executed. virtual bool schedule(Func func) = 0; + // 4-bits priority, less level is more important. Default + // value of async-simple schedule is DEFAULT. For scheduling level >= + // YIELD, if executor always execute the work immediately if other + // works, it may cause dead lock. are waiting. + enum class Priority { + HIGHEST = 0x0, + DEFAULT = 0x7, + YIELD = 0x8, + LOWEST = 0xF + }; + + // Low 16-bit of schedule_info is reserved for async-simple, and the lowest + // 4-bit is stand for priority level. The implementation of scheduling logic + // isn't necessary, which is determined by implementation. However, to avoid + // spinlock/yield deadlock, when priority level >= YIELD, scheduler + // can't always execute the work immediately when other works are + // waiting. + virtual bool schedule(Func func, uint64_t schedule_info) { + return schedule(std::move(func)); + } + // Schedule a move only functor bool schedule_move_only(util::move_only_function func) { MoveWrapper tmp(std::move(func)); return schedule([func = tmp]() { func.get()(); }); } + bool schedule_move_only(util::move_only_function func, + uint64_t schedule_info) { + MoveWrapper tmp(std::move(func)); + return schedule([func = tmp]() { func.get()(); }, schedule_info); + } + // Return true if caller runs in the executor. virtual bool currentThreadInExecutor() const { throw std::logic_error("Not implemented"); @@ -119,11 +154,15 @@ class Executor { const std::string &name() const { return _name; } - // Use - // co_await executor.after(sometime) + // Use co_await executor.after(sometime) // to schedule current execution after some time. TimeAwaitable after(Duration dur); + // Use co_await executor.after(sometime) + // to schedule current execution after some time. + TimeAwaitable after(Duration dur, uint64_t schedule_info, + Slot *slot = nullptr); + // IOExecutor accepts IO read/write requests. // Return nullptr if the executor doesn't offer an IOExecutor. virtual IOExecutor *getIOExecutor() { @@ -132,8 +171,21 @@ class Executor { protected: virtual void schedule(Func func, Duration dur) { - std::thread([this, func = std::move(func), dur]() { - std::this_thread::sleep_for(dur); + return schedule(func, dur, + static_cast(Executor::Priority::DEFAULT), + nullptr); + } + virtual void schedule(Func func, Duration dur, uint64_t schedule_info, + Slot *slot = nullptr) { + std::thread([this, func = std::move(func), dur, slot]() { + auto promise = std::make_unique>(); + auto future = promise->get_future(); + bool hasnt_canceled = signalHelper{Terminate}.tryEmplace( + slot, [p = std::move(promise)](SignalType, Signal *) { + p->set_value(); + }); + if (hasnt_canceled) + future.wait_for(dur); schedule(std::move(func)); }).detach(); } @@ -145,37 +197,59 @@ class Executor { // Awaiter to implement Executor::after. class Executor::TimeAwaiter { public: - TimeAwaiter(Executor *ex, Executor::Duration dur) : _ex(ex), _dur(dur) {} + TimeAwaiter(Executor *ex, Executor::Duration dur, uint64_t schedule_info, + Slot *slot) + : _ex(ex), _dur(dur), _schedule_info(schedule_info), _slot(slot) {} public: - bool await_ready() const noexcept { return false; } + bool await_ready() const noexcept { + return signalHelper{Terminate}.hasCanceled(_slot); + } template - void await_suspend(std::coroutine_handle continuation) { - _ex->schedule(std::move(continuation), _dur); + void await_suspend(coro::CoroHandle continuation) { + _ex->schedule(std::move(continuation), _dur, _schedule_info, _slot); + } + void await_resume() { + signalHelper{Terminate}.checkHasCanceled( + _slot, "async_simple's timer is canceled!"); } - void await_resume() const noexcept {} private: Executor *_ex; Executor::Duration _dur; + uint64_t _schedule_info; + Slot *_slot; }; // Awaitable to implement Executor::after. class Executor::TimeAwaitable { public: - TimeAwaitable(Executor *ex, Executor::Duration dur) : _ex(ex), _dur(dur) {} + TimeAwaitable(Executor *ex, Executor::Duration dur, uint64_t schedule_info, + Slot *slot) + : _ex(ex), _dur(dur), _schedule_info(schedule_info), _slot(slot) {} - auto coAwait(Executor *) { return Executor::TimeAwaiter(_ex, _dur); } + auto coAwait(Executor *) { + return Executor::TimeAwaiter(_ex, _dur, _schedule_info, _slot); + } private: Executor *_ex; Executor::Duration _dur; + uint64_t _schedule_info; + Slot *_slot; }; Executor::TimeAwaitable inline Executor::after(Executor::Duration dur) { - return Executor::TimeAwaitable(this, dur); -}; + return Executor::TimeAwaitable( + this, dur, static_cast(Executor::Priority::DEFAULT), nullptr); +} + +Executor::TimeAwaitable inline Executor::after(Executor::Duration dur, + uint64_t schedule_info, + Slot *slot) { + return Executor::TimeAwaitable(this, dur, schedule_info, slot); +} } // namespace async_simple diff --git a/include/ylt/thirdparty/async_simple/Future.h b/include/ylt/thirdparty/async_simple/Future.h index 014e9578a..b54192ffd 100644 --- a/include/ylt/thirdparty/async_simple/Future.h +++ b/include/ylt/thirdparty/async_simple/Future.h @@ -16,13 +16,18 @@ #ifndef ASYNC_SIMPLE_FUTURE_H #define ASYNC_SIMPLE_FUTURE_H +#ifndef ASYNC_SIMPLE_USE_MODULES +#include +#include #include + #include "async_simple/Executor.h" #include "async_simple/FutureState.h" #include "async_simple/LocalState.h" -#include "async_simple/Promise.h" #include "async_simple/Traits.h" +#endif // ASYNC_SIMPLE_USE_MODULES + namespace async_simple { template diff --git a/include/ylt/thirdparty/async_simple/FutureState.h b/include/ylt/thirdparty/async_simple/FutureState.h index 7f4958399..c411a6d80 100644 --- a/include/ylt/thirdparty/async_simple/FutureState.h +++ b/include/ylt/thirdparty/async_simple/FutureState.h @@ -16,20 +16,19 @@ #ifndef ASYNC_SIMPLE_FUTURESTATE_H #define ASYNC_SIMPLE_FUTURESTATE_H +#ifndef ASYNC_SIMPLE_USE_MODULES #include -#include - #include -#include -#include -#include +#include #include -#include + #include "async_simple/Common.h" #include "async_simple/Executor.h" #include "async_simple/Try.h" #include "async_simple/util/move_only_function.h" +#endif // ASYNC_SIMPLE_USE_MODULES + namespace async_simple { // Details about the State of Future/Promise, diff --git a/include/ylt/thirdparty/async_simple/IOExecutor.h b/include/ylt/thirdparty/async_simple/IOExecutor.h index 4facbbce3..f8759ed7b 100644 --- a/include/ylt/thirdparty/async_simple/IOExecutor.h +++ b/include/ylt/thirdparty/async_simple/IOExecutor.h @@ -16,8 +16,11 @@ #ifndef ASYNC_SIMPLE_IO_EXECUTOR_H #define ASYNC_SIMPLE_IO_EXECUTOR_H +#ifndef ASYNC_SIMPLE_USE_MODULES +#include #include -#include + +#endif // ASYNC_SIMPLE_USE_MODULES namespace async_simple { diff --git a/include/ylt/thirdparty/async_simple/LocalState.h b/include/ylt/thirdparty/async_simple/LocalState.h index 297de380c..b414af4ac 100644 --- a/include/ylt/thirdparty/async_simple/LocalState.h +++ b/include/ylt/thirdparty/async_simple/LocalState.h @@ -17,18 +17,15 @@ #ifndef ASYNC_SIMPLE_LOCALSTATE_H #define ASYNC_SIMPLE_LOCALSTATE_H -#include - -#include +#ifndef ASYNC_SIMPLE_USE_MODULES #include -#include -#include -#include #include -#include "async_simple/Common.h" + #include "async_simple/Executor.h" #include "async_simple/Try.h" +#endif // ASYNC_SIMPLE_USE_MODULES + namespace async_simple { // A component of Future/Promise. LocalState is owned by diff --git a/include/ylt/thirdparty/async_simple/MoveWrapper.h b/include/ylt/thirdparty/async_simple/MoveWrapper.h index 0fbfb9b4a..af55902c7 100644 --- a/include/ylt/thirdparty/async_simple/MoveWrapper.h +++ b/include/ylt/thirdparty/async_simple/MoveWrapper.h @@ -17,9 +17,10 @@ #ifndef ASYNC_SIMPLE_MOVEWRAPPER_H #define ASYNC_SIMPLE_MOVEWRAPPER_H -#include +#ifndef ASYNC_SIMPLE_USE_MODULES +#include -#include "async_simple/Common.h" +#endif // ASYNC_SIMPLE_USE_MODULES namespace async_simple { @@ -27,23 +28,23 @@ namespace async_simple { // copy as move. template class MoveWrapper { - public: - MoveWrapper() = default; - MoveWrapper(T&& value) : _value(std::move(value)) {} +public: + MoveWrapper() = default; + MoveWrapper(T&& value) : _value(std::move(value)) {} - MoveWrapper(const MoveWrapper& other) : _value(std::move(other._value)) {} - MoveWrapper(MoveWrapper&& other) : _value(std::move(other._value)) {} + MoveWrapper(const MoveWrapper& other) : _value(std::move(other._value)) {} + MoveWrapper(MoveWrapper&& other) : _value(std::move(other._value)) {} - MoveWrapper& operator=(const MoveWrapper&) = delete; - MoveWrapper& operator=(MoveWrapper&&) = delete; + MoveWrapper& operator=(const MoveWrapper&) = delete; + MoveWrapper& operator=(MoveWrapper&&) = delete; - T& get() { return _value; } - const T& get() const { return _value; } + T& get() { return _value; } + const T& get() const { return _value; } - ~MoveWrapper() {} + ~MoveWrapper() {} - private: - mutable T _value; +private: + mutable T _value; }; } // namespace async_simple diff --git a/include/ylt/thirdparty/async_simple/Promise.h b/include/ylt/thirdparty/async_simple/Promise.h index 083e840ca..985fc42b5 100644 --- a/include/ylt/thirdparty/async_simple/Promise.h +++ b/include/ylt/thirdparty/async_simple/Promise.h @@ -16,10 +16,13 @@ #ifndef ASYNC_SIMPLE_PROMISE_H #define ASYNC_SIMPLE_PROMISE_H +#ifndef ASYNC_SIMPLE_USE_MODULES #include #include "async_simple/Common.h" #include "async_simple/Future.h" +#endif // ASYNC_SIMPLE_USE_MODULES + namespace async_simple { template diff --git a/include/ylt/thirdparty/async_simple/Signal.h b/include/ylt/thirdparty/async_simple/Signal.h new file mode 100644 index 000000000..8f5dfd302 --- /dev/null +++ b/include/ylt/thirdparty/async_simple/Signal.h @@ -0,0 +1,493 @@ +/* + * Copyright (c) 2024, Alibaba Group Holding Limited; + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef ASYNC_SIMPLE_SIGNAL_H +#define ASYNC_SIMPLE_SIGNAL_H + +#ifndef ASYNC_SIMPLE_USE_MODULES + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "async_simple/Common.h" +#include "util/move_only_function.h" +#endif // ASYNC_SIMPLE_USE_MODULES + +namespace async_simple { + +enum SignalType : uint64_t { + None = 0, + // low 32 bit signal only trigger once. + // regist those signal's handler after triggered will failed and return + // false. + Terminate = 1, // signal type: terminate + // 1-16 bits reserve for async-simple + // 17-32 bits could used for user-defined signal + + // high 32 bit signal, could trigger multiple times, emplace an handler + // 33-48 bits reserve for async-simple + // 49-64 bits could used for user-defined signal + // after signal triggered will always success and return true. + All = UINT64_MAX, +}; + +class Slot; + +namespace detail { +struct SignalSlotSharedState; +constexpr uint64_t high32bits_mask = uint64_t{UINT32_MAX} << 32; +constexpr uint64_t MinMultiTriggerSignal = 1ull << 32; +} // namespace detail + +// The Signal type is used to emit a signal, whereas a Slot is used to +// receive signals. We can create a signal using factory methods and bind +// multiple Slots to the same Signal. When the Signal emits a signal, all +// bound `Slot`s will receive the corresponding signal. +// The Signal's life-time is not shorter than the bound Slots. Because those +// slots owns a shared_ptr of signal. +class Signal : public std::enable_shared_from_this { + friend class Slot; + friend struct detail::SignalSlotSharedState; + +private: + // A list to manage different type of signal's handler. + detail::SignalSlotSharedState& registSlot(SignalType filter); + SignalType UpdateState(std::atomic& state, SignalType type) { + uint64_t expected = state.load(std::memory_order_acquire); + uint64_t validSignal; + do { + validSignal = (expected | type) ^ expected; + } while (validSignal && + !state.compare_exchange_weak(expected, expected | type, + std::memory_order_release)); + // high 32 bits signal is always valid, they can be trigger multiple + // times. low 32 bits signal only trigger once. + return static_cast(validSignal | + (type & detail::high32bits_mask)); + } + + struct PrivateConstructTag {}; + +public: + Signal(PrivateConstructTag){}; + // Same type Signal can only trigger once. This function emit a signal to + // binding slots, then execute the slot callback functions. It will return + // the signal which success triggered. If no signal success triggger, return + // SignalType::none. + SignalType emit(SignalType state) noexcept; + + // Return now signal type. + SignalType state() const noexcept { + return static_cast(_state.load(std::memory_order_acquire)); + } + // Create Signal by Factory function. + // + template + static std::shared_ptr create() { + static_assert(std::is_base_of_v, + "T should be signal or derived from Signal"); + return std::make_shared(PrivateConstructTag{}); + } + virtual ~Signal(); + +private: + // Default _state is zero. It's value means what signal type has triggered. + // It was updated by UpdateState(). If a signal is low 32bits, it can't emit + // multiple times. + std::atomic _state; + std::atomic _slotsHead; +}; +namespace detail { +// Each Slot has a reference to SignalSlotSharedState. +// SignalSlotSharedState lifetime is managed by Signal. +// It will destructed when Signal destructed. +struct SignalSlotSharedState { + // The type of Signal Handler + using Handler = util::move_only_function; + // The type of Transfer Signal Handler + using ChainedSignalHandler = util::move_only_function; + struct HandlerManager { + uint8_t _index; + std::atomic _handler; + HandlerManager* _next; + HandlerManager(uint8_t index, HandlerManager* next) + : _index(index), _next(next) {} + void releaseNodes() { + for (HandlerManager* tmp = _next; tmp != nullptr; tmp = _next) { + _next = _next->_next; + delete tmp; + } + delete this; + } + ~HandlerManager(); + static inline Handler emittedTag; + }; + static bool isMultiTriggerSignal(SignalType type) { + return type >= MinMultiTriggerSignal; + } + SignalSlotSharedState(SignalType filter) + : _handlerManager(), _filter(filter), _next(nullptr) {} + ~SignalSlotSharedState(); + void releaseNodes() { + for (SignalSlotSharedState *node = this->_next, *next_node; + node != nullptr; node = next_node) { + next_node = node->_next; + delete node; + } + delete this; + } + bool triggered(SignalType state) const noexcept { + return static_cast(state) & + static_cast(_filter.load(std::memory_order_acquire)); + } + void setFilter(SignalType filter) noexcept { + _filter.store(filter, std::memory_order_release); + } + SignalType getFilter() const noexcept { + return _filter.load(std::memory_order_acquire); + } + void invoke(SignalType type, SignalType signal, Signal* self, + std::atomic& handlerPtr); + void operator()(SignalType type, Signal* self); + std::atomic _handlerManager; + std::atomic _chainedSignalHandler; + std::atomic _filter; + SignalSlotSharedState* _next; +}; +} // namespace detail + +// `Slot` is used to receive signals for an asynchronous task. We can create a +// signal through factory methods and bind multiple `Slot`s to the same +// `Signal`. When a `Signal` is triggered, all `Slot`s will receive the +// corresponding signal. When construct Slot, we should bing it to a signal and +// extend the signal's life-time by own a shared_ptr of signal. Since each +// asynchronous task should own its own `Slot`, slot objects are not +// thread-safe. We prohibit the concurrent invocation of the public interface of +// the same `Slot`. + +class Slot { + friend class Signal; + +public: + // regist a slot to signal + Slot(Signal* signal, SignalType filter = SignalType::All) + : _signal(signal->shared_from_this()), + _node(signal->registSlot(filter)){}; + Slot(const Slot&) = delete; + +public: + // Register a signal handler. Returns false if the signal has + // already been triggered(and signal can't trigger twice). + template + [[nodiscard]] bool emplace(SignalType type, Args&&... args) { + static_assert(sizeof...(args) >= 1, + "we dont allow emplace an empty signal handler"); + logicAssert(std::popcount(static_cast(type)) == 1, + "It's not allow to emplace for multiple signals"); + // trigger-once signal has already been triggered + if (!detail::SignalSlotSharedState::isMultiTriggerSignal(type) && + (signal()->state() & type)) { + return false; + } + auto handler = std::make_unique( + std::forward(args)...); + auto oldHandlerPtr = loadHandler(type); + auto oldHandler = oldHandlerPtr->load(std::memory_order_acquire); + if (oldHandler == + &detail::SignalSlotSharedState::HandlerManager::emittedTag) { + return false; + } + auto new_handler = handler.release(); + if (!oldHandlerPtr->compare_exchange_strong( + oldHandler, new_handler, std::memory_order_release)) { + // if slot is triggered and new emplace handler doesn't exec, return + // false + assert(oldHandler == + &detail::SignalSlotSharedState::HandlerManager::emittedTag); + delete new_handler; + return false; + } + delete oldHandler; + return true; + } + // Clear the signal handler. If return false, it indicates + // that the signal handler has been executed or handler is empty + bool clear(SignalType type) { + logicAssert(std::popcount(static_cast(type)) == 1, + "It's not allow to emplace for multiple signals"); + auto oldHandlerPtr = loadHandler(type); + if (oldHandlerPtr == nullptr) { + return false; + } + auto oldHandler = oldHandlerPtr->load(std::memory_order_acquire); + if (oldHandler == + &detail::SignalSlotSharedState::HandlerManager::emittedTag || + oldHandler == nullptr) { + return false; + } + if (!oldHandlerPtr->compare_exchange_strong( + oldHandler, nullptr, std::memory_order_release)) { + assert(oldHandler == + &detail::SignalSlotSharedState::HandlerManager::emittedTag); + return false; + } + delete oldHandler; + return true; + } + + class FilterGuard { + friend class Slot; + + public: + ~FilterGuard() noexcept { _slot->_node.setFilter(_oldFilter); } + + private: + FilterGuard(Slot* slot, SignalType newFilter) noexcept + : _slot(slot), _oldFilter(_slot->getFilter()) { + auto filter = static_cast(_oldFilter) & + static_cast(newFilter); + _slot->_node.setFilter(static_cast(filter)); + } + Slot* _slot; + SignalType _oldFilter; + }; + + // Filter signals within the specified scope. If signal type & filter is 0, + // then the signal type will not be triggered within this scope. Nested + // filters are allowed. + // The returned guard object can be used to restore the previous filter. + // It's life-time must shorter than Slot. + [[nodiscard]] FilterGuard setScopedFilter(SignalType filter) noexcept { + return FilterGuard{this, filter}; + } + // Set the current scope's filter. + void setFilter(SignalType filter) noexcept { _node.setFilter(filter); } + + // Get the current scope's filter. + SignalType getFilter() const noexcept { return _node.getFilter(); } + + // Check whether the filtered signal has emitted. + // if return true, the signal has emitted(but handler may hasn't execute + // now) + bool hasTriggered(SignalType type) const noexcept { + return _node.triggered( + static_cast(signal()->state() & type)); + } + bool canceled() const noexcept { + return hasTriggered(SignalType::Terminate); + } + + // The slot holds ownership of the corresponding signal, so the signal's + // lifetime is always no shorter than the slot's. To extend the signal's + // lifetime, you can call signal()->shared_from_this(), or start a new + // coroutine with the signal. + Signal* signal() const noexcept { return _signal.get(); } + + // bind a chained signal to this slot. all triggered signal will transfer to + // this chainedSignal. + void chainedSignal(Signal* chainedSignal) { + if (chainedSignal == nullptr) { + _node._chainedSignalHandler = nullptr; + return; + } + + _node._chainedSignalHandler.store( + new detail::SignalSlotSharedState::ChainedSignalHandler( + [chainedSignal = + chainedSignal->weak_from_this()](SignalType type) { + if (auto signal = chainedSignal.lock(); signal != nullptr) { + signal->emit(type); + } + }), + std::memory_order_release); + } + ~Slot() { + delete _node._chainedSignalHandler.exchange(nullptr, + std::memory_order_release); + } + +private: + template + std::atomic* loadHandler( + SignalType type) { + uint8_t index = std::countr_zero(static_cast(type)); + if (auto iter = _handlerTables.find(index); + iter != _handlerTables.end()) { + return iter->second; + } + if constexpr (!should_emplace) { + return nullptr; + } else { + auto handlerManager = + _node._handlerManager.load(std::memory_order_acquire); + auto newManager = new detail::SignalSlotSharedState::HandlerManager( + index, handlerManager); + // It's ok here, write only has once thread + _node._handlerManager.store(newManager, std::memory_order_release); + _handlerTables.emplace(index, &newManager->_handler); + return &newManager->_handler; + } + } + + // TODO: may be flatmap better? + std::unordered_map*> + _handlerTables; + std::shared_ptr _signal; + detail::SignalSlotSharedState& _node; +}; + +class SignalException : public std::runtime_error { +private: + SignalType _signal; + +public: + explicit SignalException(SignalType signal, std::string msg = "") + : std::runtime_error(std::move(msg)), _signal(signal) {} + SignalType value() const { return _signal; } +}; + +// those helper cancel functions are used for coroutine await_resume, +// await_suspend & await_ready + +struct signalHelper { + signalHelper(SignalType sign) : _sign(sign) {} + template + [[nodiscard]] bool tryEmplace(Slot* slot, Args&&... args) noexcept { + if (slot && + !slot->emplace(_sign, + std::forward(args)...)) { // has canceled + return false; + } + return true; + } + + void checkHasCanceled(Slot* slot, std::string error_msg = "") { + if (slot && !slot->clear(_sign) && slot->canceled()) { + if (error_msg.empty()) { + error_msg = + "async-simple signal triggered and throw exception, signal " + "no: " + + std::to_string(std::countr_zero(uint64_t{_sign})); + } + // TODO: may add option for user to wait signal handler finished? + throw SignalException{_sign, std::move(error_msg)}; + } + } + + bool hasCanceled(const Slot* slot) noexcept { + return slot && slot->hasTriggered(_sign); + } + SignalType _sign; +}; + +inline void detail::SignalSlotSharedState::invoke( + SignalType type, SignalType signal, Signal* self, + std::atomic& handlerPtr) { + if (!isMultiTriggerSignal(type)) { + if (auto* handler = handlerPtr.exchange( + &SignalSlotSharedState::HandlerManager::emittedTag, + std::memory_order_acq_rel); + handler != nullptr) { + (*handler)(signal, self); + delete handler; + } + } else { + if (auto* handler = handlerPtr.load(std::memory_order_acquire); + handler != nullptr) { + (*handler)(signal, self); + } + } +} + +inline void detail::SignalSlotSharedState::operator()(SignalType type, + Signal* self) { + uint64_t state = type & _filter.load(std::memory_order_acquire); + auto handlerManager = _handlerManager.load(std::memory_order_acquire); + if (auto* handler = _chainedSignalHandler.load(std::memory_order_acquire); + handler != nullptr && state) { + (*handler)(static_cast(state)); + } + while (state && handlerManager) { + SignalType nowSignal = + static_cast(uint64_t{1} << handlerManager->_index); + if (nowSignal & state) { + invoke(nowSignal, static_cast(state), self, + handlerManager->_handler); + state ^= nowSignal; + } + handlerManager = handlerManager->_next; + } +} + +inline detail::SignalSlotSharedState::~SignalSlotSharedState() { + if (auto manager = _handlerManager.load(std::memory_order_acquire); + manager) { + manager->releaseNodes(); + } +} + +inline SignalType Signal::emit(SignalType state) noexcept { + if (state != SignalType::None) { + SignalType vaildSignal = UpdateState(_state, state); + if (vaildSignal) { + for (detail::SignalSlotSharedState* node = + _slotsHead.load(std::memory_order_acquire); + node != nullptr; node = node->_next) { + (*node)(vaildSignal, this); + } + } + return vaildSignal; + } + return SignalType::None; +} + +inline detail::SignalSlotSharedState& Signal::registSlot(SignalType filter) { + auto next_node = _slotsHead.load(std::memory_order_acquire); + auto* node = new detail::SignalSlotSharedState{filter}; + node->_next = next_node; + while (!_slotsHead.compare_exchange_weak(node->_next, node, + std::memory_order_release, + std::memory_order_relaxed)) + ; + return *node; +} + +inline Signal::~Signal() { + if (auto node = _slotsHead.load(std::memory_order_acquire); + node != nullptr) { + node->releaseNodes(); + } +} + +inline detail::SignalSlotSharedState::HandlerManager::~HandlerManager() { + auto handler = _handler.load(std::memory_order_acquire); + if (handler != &HandlerManager::emittedTag) { + delete _handler; + } +} + +} // namespace async_simple + +#endif // ASYNC_SIMPLE_SIGNAL_H \ No newline at end of file diff --git a/include/ylt/thirdparty/async_simple/Traits.h b/include/ylt/thirdparty/async_simple/Traits.h index 60b32a09a..409ae3d03 100644 --- a/include/ylt/thirdparty/async_simple/Traits.h +++ b/include/ylt/thirdparty/async_simple/Traits.h @@ -16,10 +16,10 @@ #ifndef ASYNC_SIMPLE_TRAITS_H #define ASYNC_SIMPLE_TRAITS_H -#include -#include "async_simple/Common.h" +#ifndef ASYNC_SIMPLE_USE_MODULES #include "async_simple/Try.h" -#include "async_simple/Unit.h" + +#endif // ASYNC_SIMPLE_USE_MODULES namespace async_simple { diff --git a/include/ylt/thirdparty/async_simple/Try.h b/include/ylt/thirdparty/async_simple/Try.h index 8ec401893..2a13d544e 100644 --- a/include/ylt/thirdparty/async_simple/Try.h +++ b/include/ylt/thirdparty/async_simple/Try.h @@ -16,15 +16,17 @@ #ifndef ASYNC_SIMPLE_TRY_H #define ASYNC_SIMPLE_TRY_H +#ifndef ASYNC_SIMPLE_USE_MODULES #include #include #include -#include #include #include #include "async_simple/Common.h" #include "async_simple/Unit.h" +#endif // ASYNC_SIMPLE_USE_MODULES + namespace async_simple { // Forward declaration @@ -70,8 +72,7 @@ class Try { } template - Try(U&&... value) - requires std::is_constructible_v + Try(U&&... value) requires std::is_constructible_v : _value(std::in_place_type, std::forward(value)...) {} Try(std::exception_ptr error) : _value(error) {} @@ -174,7 +175,7 @@ class Try { bool hasError() const { return _error.operator bool(); } void setException(std::exception_ptr error) { _error = error; } - std::exception_ptr getException() { return _error; } + std::exception_ptr getException() const { return _error; } private: std::exception_ptr _error; diff --git a/include/ylt/thirdparty/async_simple/Unit.h b/include/ylt/thirdparty/async_simple/Unit.h index 4217282d9..6989300db 100644 --- a/include/ylt/thirdparty/async_simple/Unit.h +++ b/include/ylt/thirdparty/async_simple/Unit.h @@ -16,10 +16,6 @@ #ifndef ASYNC_SIMPLE_UNIT_H #define ASYNC_SIMPLE_UNIT_H -#include -#include "async_simple/Common.h" -#include "async_simple/Try.h" - namespace async_simple { // Unit plays the role of a simplest type in case we couldn't diff --git a/include/ylt/thirdparty/async_simple/async_simple.cppm b/include/ylt/thirdparty/async_simple/async_simple.cppm new file mode 100644 index 000000000..3d3a9df45 --- /dev/null +++ b/include/ylt/thirdparty/async_simple/async_simple.cppm @@ -0,0 +1,77 @@ +// There unhandled macro uses found in the body: +// 'CPU_SETSIZE' defined in /usr/include/sched.h:82:10 +// 'CPU_ISSET' defined in /usr/include/sched.h:85:10 +// 'CPU_ZERO' defined in /usr/include/sched.h:87:10 +// 'CPU_SET' defined in /usr/include/sched.h:83:10 +module; +// WARNING: Detected unhandled non interesting includes. +// It is not suggested mix includes and imports from the compiler's +// perspective. Since it may introduce redeclarations within different +// translation units and the compiler is not able to handle such patterns +// efficiently. +// +// See +// https://clang.llvm.org/docs/StandardCPlusPlusModules.html#performance-tips +#include +#include +#include +#include + +#ifdef __linux__ +#include +#endif + +export module async_simple; +import std; +#define ASYNC_SIMPLE_USE_MODULES +export extern "C++" { + #include "util/move_only_function.h" + #include "coro/Traits.h" + #include "MoveWrapper.h" + #include "experimental/coroutine.h" + #include "Executor.h" + #include "CommonMacros.h" + #include "Common.h" + #include "Unit.h" + #include "Try.h" + #include "FutureState.h" + #include "LocalState.h" + #include "Traits.h" + #include "Future.h" + #include "Promise.h" + #include "coro/DetachedCoroutine.h" + #include "coro/ViaCoroutine.h" + #include "coro/Lazy.h" + #include "uthread/internal/thread_impl.h" + #include "uthread/Await.h" + #include "uthread/Latch.h" + #include "uthread/internal/thread.h" + #include "uthread/Uthread.h" + #include "uthread/Async.h" + #include "coro/ConditionVariable.h" + #include "coro/SpinLock.h" + #include "coro/Latch.h" + #include "coro/CountEvent.h" + #include "coro/Collect.h" + #include "IOExecutor.h" + #include "coro/SharedMutex.h" + #include "uthread/Collect.h" + #include "executors/SimpleIOExecutor.h" + #include "coro/Mutex.h" + #include "Collect.h" + #include "util/Condition.h" + #include "coro/Dispatch.h" + #include "coro/Sleep.h" + #include "util/Queue.h" + #include "util/ThreadPool.h" + #include "coro/ResumeBySchedule.h" + #include "coro/FutureAwaiter.h" + #include "coro/SyncAwait.h" + #include "executors/SimpleExecutor.h" + #include "coro/Semaphore.h" + // There are some bugs in clang lower versions. +#if defined(__clang_major__) && __clang_major__ >= 17 + #include "coro/PromiseAllocator.h" + #include "coro/Generator.h" +#endif +} diff --git a/include/ylt/thirdparty/async_simple/coro/Collect.h b/include/ylt/thirdparty/async_simple/coro/Collect.h index 16f239672..cb224556a 100644 --- a/include/ylt/thirdparty/async_simple/coro/Collect.h +++ b/include/ylt/thirdparty/async_simple/coro/Collect.h @@ -16,21 +16,26 @@ #ifndef ASYNC_SIMPLE_CORO_COLLECT_H #define ASYNC_SIMPLE_CORO_COLLECT_H +#ifndef ASYNC_SIMPLE_USE_MODULES #include #include #include -#include +#include #include #include #include #include #include "async_simple/Common.h" +#include "async_simple/Signal.h" #include "async_simple/Try.h" #include "async_simple/Unit.h" #include "async_simple/coro/CountEvent.h" #include "async_simple/coro/Lazy.h" +#include "async_simple/coro/LazyLocalBase.h" #include "async_simple/experimental/coroutine.h" +#endif // ASYNC_SIMPLE_USE_MODULES + namespace async_simple { namespace coro { @@ -48,12 +53,17 @@ namespace detail { template struct CollectAnyResult { CollectAnyResult() : _idx(static_cast(-1)), _value() {} - CollectAnyResult(size_t idx, std::add_rvalue_reference_t value) requires( - !std::is_void_v) + CollectAnyResult(size_t idx, std::add_rvalue_reference_t value) : _idx(idx), _value(std::move(value)) {} CollectAnyResult(const CollectAnyResult&) = delete; CollectAnyResult& operator=(const CollectAnyResult&) = delete; + CollectAnyResult& operator=(CollectAnyResult&& other) { + _idx = std::move(other._idx); + _value = std::move(other._value); + other._idx = static_cast(-1); + return *this; + } CollectAnyResult(CollectAnyResult&& other) : _idx(std::move(other._idx)), _value(std::move(other._value)) { other._idx = static_cast(-1); @@ -84,177 +94,124 @@ struct CollectAnyResult { #endif }; -template +template <> +struct CollectAnyResult { + CollectAnyResult() : _idx(static_cast(-1)), _value() {} + CollectAnyResult(size_t idx) : _idx(idx) {} + + CollectAnyResult(const CollectAnyResult&) = delete; + CollectAnyResult& operator=(const CollectAnyResult&) = delete; + CollectAnyResult& operator=(CollectAnyResult&& other) { + _idx = std::move(other._idx); + _value = std::move(other._value); + other._idx = static_cast(-1); + return *this; + } + CollectAnyResult(CollectAnyResult&& other) + : _idx(std::move(other._idx)), _value(std::move(other._value)) { + other._idx = static_cast(-1); + } + + size_t _idx; + Try _value; + + size_t index() const { return _idx; } + + bool hasError() const { return _value.hasError(); } + // Require hasError() == true. Otherwise it is UB to call + // this method. + std::exception_ptr getException() const { return _value.getException(); } +}; + +template struct CollectAnyAwaiter { using ValueType = typename LazyType::ValueType; using ResultType = CollectAnyResult; - CollectAnyAwaiter(std::vector&& input) - : _input(std::move(input)), _result(nullptr) {} - - CollectAnyAwaiter(std::vector&& input, Callback callback) - : _input(std::move(input)), + CollectAnyAwaiter(Slot* slot, SignalType SignalType, + std::vector&& input) + : _slot(slot), + _SignalType(SignalType), _result(nullptr), - _callback(std::move(callback)) {} + _input(std::move(input)) {} CollectAnyAwaiter(const CollectAnyAwaiter&) = delete; CollectAnyAwaiter& operator=(const CollectAnyAwaiter&) = delete; CollectAnyAwaiter(CollectAnyAwaiter&& other) - : _input(std::move(other._input)), + : _slot(other._slot), + _SignalType(other._SignalType), _result(std::move(other._result)), - _callback(std::move(other._callback)) {} + _input(std::move(other._input)) {} bool await_ready() const noexcept { - return _input.empty() || - (_result && _result->_idx != static_cast(-1)); + return _input.empty() || signalHelper{Terminate}.hasCanceled(_slot); } - void await_suspend(std::coroutine_handle<> continuation) { + bool await_suspend(std::coroutine_handle<> continuation) { auto promise_type = std::coroutine_handle::from_address( continuation.address()) .promise(); + auto executor = promise_type._executor; - // we should take care of input's life-time after resume. - std::vector input(std::move(_input)); - // Make local copies to shared_ptr to avoid deleting objects too early - // if any coroutine finishes before this function. - auto result = std::make_shared(); - auto event = std::make_shared(input.size()); - auto callback = std::move(_callback); - - _result = result; - for (size_t i = 0; - i < input.size() && (result->_idx == static_cast(-1)); - ++i) { - if (!input[i]._coro.promise()._executor) { - input[i]._coro.promise()._executor = executor; - } - if constexpr (std::is_same_v) { - (void)callback; - input[i].start([i, size = input.size(), r = result, - c = continuation, - e = event](Try&& result) mutable { - assert(e != nullptr); + // Make local copies to shared_ptr to avoid deleting objects too early + // if coroutine resume before this function. + auto input = std::move(_input); + auto event = std::make_shared(input.size() + 1); + auto signal = Signal::create(); + if (_slot) + _slot->chainedSignal(signal.get()); + if (!signalHelper{Terminate}.tryEmplace( + _slot, [c = continuation, e = event, size = input.size()]( + SignalType type, Signal*) mutable { auto count = e->downCount(); - if (count == size + 1) { - r->_idx = i; - r->_value = std::move(result); + if (count > size + 1) { c.resume(); } - }); - } else { - input[i].start([i, size = input.size(), r = result, - c = continuation, e = event, - callback](Try&& result) mutable { + })) { // has canceled + return false; + } + + for (size_t i = 0; i < input.size(); ++i) { + if (!input[i]._coro.promise()._executor) { + input[i]._coro.promise()._executor = executor; + } + std::unique_ptr local; + local = std::make_unique(signal.get()); + input[i]._coro.promise()._lazy_local = local.get(); + input[i].start( + [this, i, size = input.size(), c = continuation, e = event, + local = std::move(local)](Try&& result) mutable { assert(e != nullptr); auto count = e->downCount(); - if (count == size + 1) { - r->_idx = i; - (void)(*callback)(i, std::move(result)); + // n+1: n coro + 1 cancel handler + if (count > size + 1) { + _result = std::make_unique(); + _result->_idx = i; + _result->_value = std::move(result); + if (auto ptr = local->getSlot(); ptr) { + ptr->signal()->emit(_SignalType); + } c.resume(); } }); - } } // end for + return true; } auto await_resume() { - if constexpr (std::is_same_v) { - assert(_result != nullptr); - return std::move(*_result); - } else { - return _result->index(); + signalHelper{Terminate}.checkHasCanceled( + _slot, "async_simple::CollectAny is canceled!"); + if (_result == nullptr) { + return ResultType{}; } + return std::move(*_result); } + Slot* _slot; + SignalType _SignalType; + std::unique_ptr _result; std::vector _input; - std::shared_ptr _result; - [[no_unique_address]] Callback _callback; -}; - -template -struct CollectAnyVariadicPairAwaiter { - using InputType = std::tuple; - - CollectAnyVariadicPairAwaiter(Ts&&... inputs) - : _input(std::move(inputs)...), _result(nullptr) {} - - CollectAnyVariadicPairAwaiter(InputType&& inputs) - : _input(std::move(inputs)), _result(nullptr) {} - - CollectAnyVariadicPairAwaiter(const CollectAnyVariadicPairAwaiter&) = - delete; - CollectAnyVariadicPairAwaiter& operator=( - const CollectAnyVariadicPairAwaiter&) = delete; - CollectAnyVariadicPairAwaiter(CollectAnyVariadicPairAwaiter&& other) - : _input(std::move(other._input)), _result(std::move(other._result)) {} - - bool await_ready() const noexcept { - return _result && _result->has_value(); - } - - void await_suspend(std::coroutine_handle<> continuation) { - auto promise_type = - std::coroutine_handle::from_address( - continuation.address()) - .promise(); - auto executor = promise_type._executor; - auto event = - std::make_shared(std::tuple_size()); - auto result = std::make_shared>(); - _result = result; - - auto input = std::move(_input); - - [&](std::index_sequence) { - ( - [&](auto& lazy, auto& callback) { - if (result->has_value()) { - return; - } - - if (!lazy._coro.promise()._executor) { - lazy._coro.promise()._executor = executor; - } - - lazy.start([result, event, continuation, - callback](auto&& res) mutable { - auto count = event->downCount(); - if (count == std::tuple_size() + 1) { - (void)callback(std::move(res)); - *result = I; - continuation.resume(); - } - }); - }(std::get<0>(std::get(input)), - std::get<1>(std::get(input))), - ...); - } - (std::make_index_sequence()); - } - - auto await_resume() { - assert(_result != nullptr); - return std::move(_result->value()); - } - - std::tuple _input; - std::shared_ptr> _result; -}; - -template -struct SimpleCollectAnyVariadicPairAwaiter { - using InputType = std::tuple; - - InputType _inputs; - - SimpleCollectAnyVariadicPairAwaiter(Ts&&... inputs) - : _inputs(std::move(inputs)...) {} - - auto coAwait(Executor* ex) { - return CollectAnyVariadicPairAwaiter(std::move(_inputs)); - } }; template