-
-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
time: avoid traversing entries in the time wheel twice #6718
base: master
Are you sure you want to change the base?
Changes from 7 commits
73d6560
cd70a24
1410a9f
71f60d5
96fef56
004367c
6aaeb88
b1cf26d
08a688a
36b8c07
955ec31
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,7 +8,7 @@ | |
|
||
mod entry; | ||
pub(crate) use entry::TimerEntry; | ||
use entry::{EntryList, TimerHandle, TimerShared, MAX_SAFE_MILLIS_DURATION}; | ||
use entry::{EntryList, TimerHandle, TimerShared, MAX_SAFE_MILLIS_DURATION, STATE_DEREGISTERED}; | ||
|
||
mod handle; | ||
pub(crate) use self::handle::Handle; | ||
|
@@ -204,7 +204,6 @@ impl Driver { | |
.inner | ||
.next_wake | ||
.store(next_wake_time(expiration_time)); | ||
|
||
// Safety: After updating the `next_wake`, we drop all the locks. | ||
drop(locks); | ||
|
||
|
@@ -324,23 +323,53 @@ impl Handle { | |
now = lock.elapsed(); | ||
} | ||
|
||
while let Some(entry) = lock.poll(now) { | ||
debug_assert!(unsafe { entry.is_pending() }); | ||
|
||
// SAFETY: We hold the driver lock, and just removed the entry from any linked lists. | ||
if let Some(waker) = unsafe { entry.fire(Ok(())) } { | ||
waker_list.push(waker); | ||
|
||
if !waker_list.can_push() { | ||
// Wake a batch of wakers. To avoid deadlock, we must do this with the lock temporarily dropped. | ||
drop(lock); | ||
|
||
waker_list.wake_all(); | ||
|
||
lock = self.inner.lock_sharded_wheel(id); | ||
while let Some(expiration) = lock.poll(now) { | ||
lock.set_elapsed(expiration.deadline); | ||
// It is critical for `GuardedLinkedList` safety that the guard node is | ||
// pinned in memory and is not dropped until the guarded list is dropped. | ||
let guard = TimerShared::new(id); | ||
pin!(guard); | ||
let guard_handle = guard.as_ref().get_ref().handle(); | ||
|
||
// * This list will be still guarded by the lock of the Wheel with the specefied id. | ||
// `EntryWaitersList` wrapper makes sure we hold the lock to modify it. | ||
// * This wrapper will empty the list on drop. It is critical for safety | ||
// that we will not leave any list entry with a pointer to the local | ||
// guard node after this function returns / panics. | ||
// Safety: The `TimerShared` inside this `TimerHandle` is pinned in the memory. | ||
let mut list = unsafe { lock.get_waiters_list(&expiration, guard_handle, id, self) }; | ||
|
||
while let Some(entry) = list.pop_back_locked(&mut lock) { | ||
let deadline = expiration.deadline; | ||
// Try to expire the entry; this is cheap (doesn't synchronize) if | ||
// the timer is not expired, and updates cached_when. | ||
match unsafe { entry.mark_firing(deadline) } { | ||
Ok(()) => { | ||
// Entry was expired. | ||
// SAFETY: We hold the driver lock, and just removed the entry from any linked lists. | ||
if let Some(waker) = unsafe { entry.fire(Ok(())) } { | ||
waker_list.push(waker); | ||
|
||
if !waker_list.can_push() { | ||
// Wake a batch of wakers. To avoid deadlock, | ||
// we must do this with the lock temporarily dropped. | ||
drop(lock); | ||
waker_list.wake_all(); | ||
|
||
lock = self.inner.lock_sharded_wheel(id); | ||
} | ||
} | ||
} | ||
Err(state) if state == STATE_DEREGISTERED => {} | ||
Err(state) => { | ||
// Safety: This Entry has not expired. | ||
unsafe { lock.reinsert_entry(entry, deadline, state) }; | ||
} | ||
} | ||
} | ||
lock.occupied_bit_maintain(&expiration); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The old code doesn't call this. Why is it necessary now? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The reason I add There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would it be accurate to say that this is an operation that must be called when we are done with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does it need to be called before There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Calling There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What about I would really like to see a loom test where a timer is inserted during the I think you can do the loom test along these lines:
Spawning the thread after registering the 32 timers should reduce the size of the loom test, since the region with more than 1 thread will be shorter. As long as the |
||
} | ||
|
||
let next_wake_up = lock.poll_at(); | ||
drop(lock); | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can
STATE_DEREGISTERED
actually happen?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No. I think it will not happen.