Skip to content

Commit

Permalink
refactor core: prepare SetupWakeups for non-coroutine space
Browse files Browse the repository at this point in the history
Use a new API with `EarlyWakeup` in each `SetupWakeups`.
11553e62a3ae20ec544148cb079e73b602af214c
  • Loading branch information
ArkadyRudenko committed Aug 1, 2024
1 parent 7191f23 commit b5df56f
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 57 deletions.
1 change: 1 addition & 0 deletions core/include/userver/engine/semaphore.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ class CancellableSemaphore final {

private:
enum class TryLockStatus { kSuccess, kTransientFailure, kPermanentFailure };
class SemaphoreWaitStrategy;

TryLockStatus DoTryLock(Counter count);
TryLockStatus LockFastPath(Counter count);
Expand Down
6 changes: 1 addition & 5 deletions core/src/engine/impl/future_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,7 @@ void FutureStateBase::WaitForResult() {
}

EarlyWakeup FutureStateBase::TryAppendWaiter(TaskContext& waiter) {
if (finish_waiters_->GetSignalOrAppend(&waiter)) {
waiter.WakeupCurrent();
return EarlyWakeup{true};
}
return EarlyWakeup{false};
return EarlyWakeup{finish_waiters_->GetSignalOrAppend(&waiter)};
}

void FutureStateBase::RemoveWaiter(TaskContext& context) noexcept {
Expand Down
53 changes: 34 additions & 19 deletions core/src/engine/impl/mutex_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ class MutexImpl {
private:
class MutexWaitStrategy;

bool TryLockWithTaskContext(TaskContext& current);

bool LockFastPath(TaskContext&) noexcept;
bool LockSlowPath(TaskContext&, Deadline);

Expand All @@ -48,29 +50,28 @@ template <>
class MutexImpl<WaitList>::MutexWaitStrategy final : public WaitStrategy {
public:
MutexWaitStrategy(MutexImpl<WaitList>& mutex, TaskContext& current)
: mutex_(mutex),
current_(current),
waiter_token_(mutex_.lock_waiters_),
lock_(mutex_.lock_waiters_) {}
: mutex_(mutex), current_(current), waiter_token_(mutex_.lock_waiters_) {}

EarlyWakeup SetupWakeups() override {
mutex_.lock_waiters_.Append(lock_, &current_);
lock_.unlock();
WaitList::Lock lock(mutex_.lock_waiters_);
if (mutex_.TryLockWithTaskContext(current_)) {
return EarlyWakeup{true};
}
// A race is not possible here, because check + Append is performed under
// WaitList::Lock, and notification also takes WaitList::Lock.
mutex_.lock_waiters_.Append(lock, &current_);
return EarlyWakeup{false};
}

void DisableWakeups() noexcept override {
lock_.lock();
mutex_.lock_waiters_.Remove(lock_, current_);
WaitList::Lock lock(mutex_.lock_waiters_);
mutex_.lock_waiters_.Remove(lock, current_);
}

private:
MutexImpl<WaitList>& mutex_;
TaskContext& current_;
const WaitList::WaitersScopeCounter waiter_token_;
WaitList::Lock lock_;
};

template <>
Expand All @@ -80,6 +81,9 @@ class MutexImpl<WaitListLight>::MutexWaitStrategy final : public WaitStrategy {
: mutex_(mutex), current_(current) {}

EarlyWakeup SetupWakeups() override {
if (TryLock()) {
return EarlyWakeup{true};
}
mutex_.lock_waiters_.Append(&current_);
if (mutex_.owner_.load() == nullptr) {
mutex_.lock_waiters_.Remove(current_);
Expand All @@ -93,6 +97,17 @@ class MutexImpl<WaitListLight>::MutexWaitStrategy final : public WaitStrategy {
}

private:
bool TryLock() {
TaskContext* expected = nullptr;
if (mutex_.owner_.compare_exchange_strong(expected, &current_,
std::memory_order_relaxed)) {
return true;
}
UINVARIANT(expected != &current_,
"MutexImpl is locked twice from the same task");
return false;
}

MutexImpl<WaitListLight>& mutex_;
TaskContext& current_;
};
Expand Down Expand Up @@ -121,21 +136,16 @@ bool MutexImpl<Waiters>::LockFastPath(TaskContext& current) noexcept {

template <class Waiters>
bool MutexImpl<Waiters>::LockSlowPath(TaskContext& current, Deadline deadline) {
TaskContext* expected = nullptr;

const engine::TaskCancellationBlocker block_cancels;
MutexWaitStrategy wait_manager{*this, current};
while (!owner_.compare_exchange_strong(expected, &current,
std::memory_order_relaxed)) {
UINVARIANT(expected != &current,
"MutexImpl is locked twice from the same task");

while (true) {
const auto wakeup_source = current.Sleep(wait_manager, deadline);
if (owner_.load() == &current) {
return true;
}
if (!HasWaitSucceeded(wakeup_source)) {
return false;
}

expected = nullptr;
}
return true;
}
Expand Down Expand Up @@ -170,11 +180,16 @@ void MutexImpl<Waiters>::unlock() {

template <class Waiters>
bool MutexImpl<Waiters>::try_lock() {
auto& current = current_task::GetCurrentTaskContext();
return TryLockWithTaskContext(current);
}

template <class Waiters>
bool MutexImpl<Waiters>::TryLockWithTaskContext(TaskContext& current) {
#if USERVER_IMPL_HAS_TSAN
__tsan_mutex_pre_lock(this, __tsan_mutex_try_lock);
#endif

auto& current = current_task::GetCurrentTaskContext();
const auto result = LockFastPath(current);

#if USERVER_IMPL_HAS_TSAN
Expand Down
2 changes: 0 additions & 2 deletions core/src/engine/impl/wait_list_light.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ void WaitListLight::Append(
bool WaitListLight::GetSignalOrAppend(
boost::intrusive_ptr<TaskContext>&& context) noexcept {
UASSERT(context);
UASSERT(context->IsCurrent());

const Waiter new_waiter{context.get(), context->GetEpoch()};
LOG_TRACE() << "Append waiter=" << fmt::to_string(new_waiter)
Expand Down Expand Up @@ -144,7 +143,6 @@ void WaitListLight::SetSignalAndWakeupOne() {
}

void WaitListLight::Remove(TaskContext& context) noexcept {
UASSERT(context.IsCurrent());
const Waiter expected{&context, context.GetEpoch()};

auto old_waiter = expected;
Expand Down
49 changes: 27 additions & 22 deletions core/src/engine/semaphore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,39 +12,46 @@ USERVER_NAMESPACE_BEGIN

namespace engine {

namespace {

class SemaphoreWaitStrategy final : public impl::WaitStrategy {
class CancellableSemaphore::SemaphoreWaitStrategy final
: public impl::WaitStrategy {
public:
SemaphoreWaitStrategy(impl::WaitList& waiters,
impl::TaskContext& current) noexcept
: waiters_(waiters),
SemaphoreWaitStrategy(impl::TaskContext& current, CancellableSemaphore& sem,
CancellableSemaphore::Counter count) noexcept
: sem_(sem),
current_(current),
waiter_token_(waiters_),
lock_(waiters_) {}
waiter_token_(*sem_.lock_waiters_),
count_(count) {}

impl::EarlyWakeup SetupWakeups() override {
waiters_.Append(lock_, &current_);
lock_.unlock();
impl::WaitList::Lock lock(*sem_.lock_waiters_);
status_ = sem_.DoTryLock(count_);
if (status_ != TryLockStatus::kTransientFailure) {
return impl::EarlyWakeup{status_ == TryLockStatus::kSuccess};
}
if (sem_.UsedApprox() <= sem_.GetCapacity() - count_) {
return impl::EarlyWakeup{true};
}
// A race is not possible here, because check + Append is performed under
// WaitList::Lock, and notification also takes WaitList::Lock.
sem_.lock_waiters_->Append(lock, &current_);
return impl::EarlyWakeup{false};
}

void DisableWakeups() noexcept override {
lock_.lock();
waiters_.Remove(lock_, current_);
impl::WaitList::Lock lock(*sem_.lock_waiters_);
sem_.lock_waiters_->Remove(lock, current_);
}

TryLockStatus GetTryLockStatus() const noexcept { return status_; }

private:
impl::WaitList& waiters_;
CancellableSemaphore& sem_;
impl::TaskContext& current_;
const impl::WaitList::WaitersScopeCounter waiter_token_;
impl::WaitList::Lock lock_;
const CancellableSemaphore::Counter count_;
TryLockStatus status_{TryLockStatus::kTransientFailure};
};

} // namespace

CancellableSemaphore::CancellableSemaphore(Counter capacity)
: acquired_locks_(0), capacity_(capacity) {}

Expand Down Expand Up @@ -143,15 +150,13 @@ bool CancellableSemaphore::LockSlowPath(Deadline deadline,
UASSERT(count > 0);

auto& current = current_task::GetCurrentTaskContext();
SemaphoreWaitStrategy wait_strategy{*lock_waiters_, current};
SemaphoreWaitStrategy wait_strategy{current, *this, count};

while (true) {
const auto status = DoTryLock(count);
if (status != TryLockStatus::kTransientFailure) {
return status == TryLockStatus::kSuccess;
}

const auto wakeup_source = current.Sleep(wait_strategy, deadline);
if (wait_strategy.GetTryLockStatus() != TryLockStatus::kTransientFailure) {
return wait_strategy.GetTryLockStatus() == TryLockStatus::kSuccess;
}
if (!impl::HasWaitSucceeded(wakeup_source)) {
return false;
}
Expand Down
8 changes: 0 additions & 8 deletions core/src/engine/task/task_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -472,14 +472,6 @@ void TaskContext::Wakeup(WakeupSource source, NoEpoch) {
}
}

void TaskContext::WakeupCurrent() {
UASSERT(IsCurrent());
UASSERT(GetState() == Task::State::kRunning);

sleep_state_.FetchOrFlags<std::memory_order_seq_cst>(
static_cast<SleepFlags>(WakeupSource::kWaitList));
}

class TaskContext::LocalStorageGuard {
public:
explicit LocalStorageGuard(TaskContext& context) : context_(context) {
Expand Down
1 change: 0 additions & 1 deletion core/src/engine/task/task_context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,6 @@ class TaskContext final : public ContextAccessor {
// normally non-blocking, except corner cases in TaskProcessor::Schedule()
void Wakeup(WakeupSource, SleepState::Epoch epoch);
void Wakeup(WakeupSource, NoEpoch);
void WakeupCurrent();

static void CoroFunc(TaskPipe& task_pipe);

Expand Down

0 comments on commit b5df56f

Please sign in to comment.