Skip to content

Commit

Permalink
fix mem order
Browse files Browse the repository at this point in the history
  • Loading branch information
poor-circle committed Jan 26, 2025
1 parent fad99b5 commit c06428c
Showing 1 changed file with 70 additions and 43 deletions.
113 changes: 70 additions & 43 deletions include/ylt/coro_io/coro_io.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,10 @@ void cancel(T &io_object) {
template <typename ret_type, typename IO_func, typename io_object>
inline async_simple::coro::Lazy<ret_type> async_io(IO_func io_func,
io_object &obj) noexcept {
constexpr int no_cancel_flag = 0;
constexpr int could_cancel_flag = 1;
constexpr int start_cancel_flag = 2;
constexpr int finish_cancel_flag = 3;
callback_awaitor<ret_type> awaitor;
auto slot = co_await async_simple::coro::CurrentSlot{};
if (!slot) {
Expand All @@ -233,54 +237,77 @@ inline async_simple::coro::Lazy<ret_type> async_io(IO_func io_func,
}
else {
auto executor = obj.get_executor();
auto lock = std::make_shared<std::atomic<bool>>();
auto lock = std::make_shared<std::atomic<int>>();
bool hasCanceled;
auto result = co_await awaitor.await_resume(
[&, &lock_ref = lock](auto handler) mutable {
auto lock = lock_ref;
hasCanceled = !slot->emplace(
async_simple::SignalType::Terminate,
[&obj, weak_lock = std::weak_ptr{lock}](
async_simple::SignalType signalType,
async_simple::Signal *signal) {
if (auto ptr = weak_lock.lock(); ptr) {
bool expected = false;
if (!ptr->compare_exchange_strong(
expected, true, std::memory_order_acq_rel)) {
detail::cancel(obj);
}
}
});
if (hasCanceled) {
asio::dispatch(executor, [handler]() {
handler.set_value(
std::make_error_code(std::errc::operation_canceled));
handler.resume();
});
}
else {
io_func([&, handler](auto &&...args) mutable {
slot->clear(async_simple::Terminate);
handler.set_value(std::forward<decltype(args)>(args)...);
handler.resume();
});
bool expected = false;
if (!lock->compare_exchange_strong(expected, true,
auto result = co_await awaitor.await_resume([&, &lock_ref = lock](
auto handler) mutable {
auto lock = lock_ref;
hasCanceled = !slot->emplace(
async_simple::SignalType::Terminate,
[&obj, lock](async_simple::SignalType signalType,
async_simple::Signal *signal) {
int expected = no_cancel_flag;
if (!lock->compare_exchange_strong(expected, could_cancel_flag,
std::memory_order_acq_rel)) {
detail::cancel(obj);
if (expected == could_cancel_flag) {
if (lock->compare_exchange_strong(expected, start_cancel_flag,
std::memory_order_release)) {
obj.cancel();
lock->store(finish_cancel_flag, std::memory_order_release);
}
}
}
lock = nullptr;
}
});
if (hasCanceled) {
asio::dispatch(executor, [handler]() {
handler.set_value(
std::make_error_code(std::errc::operation_canceled));
handler.resume();
});
}
else {
io_func([&, handler](auto &&...args) mutable {
slot->clear(async_simple::Terminate);
handler.set_value(std::forward<decltype(args)>(args)...);
handler.resume();
});
int expected = no_cancel_flag;
if (!lock->compare_exchange_strong(expected, could_cancel_flag,
std::memory_order_acq_rel)) {
if (expected == could_cancel_flag) {
if (lock->compare_exchange_strong(expected, start_cancel_flag,
std::memory_order_release)) {
obj.cancel();
lock->store(finish_cancel_flag, std::memory_order_release);
}
}
}
}
});
if (!hasCanceled) {
auto weak_lock = std::weak_ptr{lock};
lock = nullptr;
// wait cancel finish to make sure io object's life-time
for (; !weak_lock.expired();) {
co_await coro_io::post(
[]() {
},
executor);
int expected = no_cancel_flag;
if (!lock->compare_exchange_strong(expected, finish_cancel_flag,
std::memory_order_acq_rel)) {
if (expected != finish_cancel_flag) {
do {
if (expected == could_cancel_flag) {
if (lock->compare_exchange_strong(expected, finish_cancel_flag,
std::memory_order_acq_rel) ||
expected == finish_cancel_flag) {
break;
}
}
// flag is start_cancel_flag now.
// wait cancel finish to make sure io object's life-time
for (;
lock->load(std::memory_order_acquire) == start_cancel_flag;) {
co_await coro_io::post(
[]() {
},
executor);
}
} while (0);
}
}
}
co_return result;
Expand Down

0 comments on commit c06428c

Please sign in to comment.