Skip to content

Commit 023e956

Browse files
committed
time: delay the Arc::clone until registering timer
There are two usage of this handle for timer: 1. Ensure the time driver is enabled. 2. Registering or clear the entry from the global wheel. For (1), we just need the `&Handle`, no need to make a clone. For (2), we can delay the `.clone()` until we are about to register the entry. Delaying the `Arc::clone` improves the performance on multi-core machine. Signed-off-by: ADD-SP <[email protected]>
1 parent 0d234c3 commit 023e956

File tree

6 files changed

+122
-68
lines changed

6 files changed

+122
-68
lines changed

tokio/src/runtime/driver.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ impl Handle {
110110
pub(crate) fn time(&self) -> &crate::runtime::time::Handle {
111111
self.time
112112
.as_ref()
113-
.expect("A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers.")
113+
.expect(crate::util::error::TIME_DISABLED_ERROR)
114114
}
115115

116116
pub(crate) fn clock(&self) -> &Clock {

tokio/src/runtime/scheduler/mod.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,24 @@ cfg_rt! {
9494
}
9595
}
9696

97+
/// # Panics
98+
///
99+
/// Panics if the current [`Context`] is not available
100+
/// in the current thread.
101+
// remove this `allow(dead_code)` when this method
102+
// is used by other other modules except the `time`.
103+
#[cfg_attr(not(feature = "time"), allow(dead_code))]
104+
#[track_caller]
105+
pub(crate) fn with_current<F, R>(f: F) -> R
106+
where
107+
F: FnOnce(&Handle) -> R,
108+
{
109+
match context::with_current(|hdl| f(hdl)) {
110+
Ok(ret) => ret,
111+
Err(e) => panic!("{e}"),
112+
}
113+
}
114+
97115
pub(crate) fn blocking_spawner(&self) -> &blocking::Spawner {
98116
match_flavor!(self, Handle(h) => &h.blocking_spawner)
99117
}
@@ -268,8 +286,19 @@ cfg_not_rt! {
268286
))]
269287
impl Handle {
270288
#[track_caller]
289+
#[cfg_attr(feature = "time", allow(dead_code))]
271290
pub(crate) fn current() -> Handle {
272291
panic!("{}", crate::util::error::CONTEXT_MISSING_ERROR)
273292
}
293+
294+
cfg_time! {
295+
#[track_caller]
296+
pub(crate) fn with_current<F, R>(_f: F) -> R
297+
where
298+
F: FnOnce(&Handle) -> R,
299+
{
300+
panic!("{}", crate::util::error::CONTEXT_MISSING_ERROR)
301+
}
302+
}
274303
}
275304
}

tokio/src/runtime/time/entry.rs

Lines changed: 34 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,10 @@ use crate::loom::sync::atomic::AtomicU64;
5959
use crate::loom::sync::atomic::Ordering;
6060

6161
use crate::runtime::scheduler;
62+
use crate::runtime::time;
6263
use crate::sync::AtomicWaker;
6364
use crate::time::Instant;
65+
use crate::util::error::{RUNTIME_SHUTTING_DOWN_ERROR, TIME_DISABLED_ERROR};
6466
use crate::util::linked_list;
6567

6668
use pin_project_lite::pin_project;
@@ -285,9 +287,6 @@ pin_project! {
285287
// before polling.
286288
#[derive(Debug)]
287289
pub(crate) struct TimerEntry {
288-
// Arc reference to the runtime handle. We can only free the driver after
289-
// deregistering everything from their respective timer wheels.
290-
driver: scheduler::Handle,
291290
// Shared inner structure; this is part of an intrusive linked list, and
292291
// therefore other references can exist to it while mutable references to
293292
// Entry exist.
@@ -340,6 +339,10 @@ pub(crate) struct TimerShared {
340339
/// Only accessed under the entry lock.
341340
pointers: linked_list::Pointers<TimerShared>,
342341

342+
// Arc reference to the runtime handle. We can only free the driver after
343+
// deregistering everything from their respective timer wheels.
344+
driver: scheduler::Handle,
345+
343346
/// The time when the [`TimerEntry`] was registered into the Wheel,
344347
/// [`STATE_DEREGISTERED`] means it is not registered.
345348
///
@@ -384,11 +387,19 @@ generate_addr_of_methods! {
384387

385388
impl TimerShared {
386389
pub(super) fn new() -> Self {
387-
Self {
388-
registered_when: AtomicU64::new(0),
389-
pointers: linked_list::Pointers::new(),
390-
state: StateCell::default(),
391-
_p: PhantomPinned,
390+
// ensure both scheduler handle and time driver are available,
391+
// otherwise panic
392+
let maybe_hdl =
393+
scheduler::Handle::with_current(|hdl| hdl.driver().time.as_ref().map(|_| hdl.clone()));
394+
match maybe_hdl {
395+
Some(hdl) => Self {
396+
driver: hdl,
397+
registered_when: AtomicU64::new(0),
398+
pointers: linked_list::Pointers::new(),
399+
state: StateCell::default(),
400+
_p: PhantomPinned,
401+
},
402+
None => panic!("{TIME_DISABLED_ERROR}"),
392403
}
393404
}
394405

@@ -453,6 +464,10 @@ impl TimerShared {
453464
pub(super) fn might_be_registered(&self) -> bool {
454465
self.state.might_be_registered()
455466
}
467+
468+
fn driver(&self) -> &time::Handle {
469+
self.driver.driver().time()
470+
}
456471
}
457472

458473
unsafe impl linked_list::Link for TimerShared {
@@ -479,12 +494,8 @@ unsafe impl linked_list::Link for TimerShared {
479494

480495
impl TimerEntry {
481496
#[track_caller]
482-
pub(crate) fn new(handle: scheduler::Handle, deadline: Instant) -> Self {
483-
// Panic if the time driver is not enabled
484-
let _ = handle.driver().time();
485-
497+
pub(crate) fn new(deadline: Instant) -> Self {
486498
Self {
487-
driver: handle,
488499
inner: None,
489500
deadline,
490501
registered: false,
@@ -565,15 +576,14 @@ impl TimerEntry {
565576
// driver did so far and happens-before everything the driver does in
566577
// the future. While we have the lock held, we also go ahead and
567578
// deregister the entry if necessary.
568-
unsafe { self.driver().clear_entry(NonNull::from(inner)) };
579+
unsafe { inner.driver().clear_entry(NonNull::from(inner)) };
569580
}
570581

571582
pub(crate) fn reset(mut self: Pin<&mut Self>, new_time: Instant, reregister: bool) {
572583
let this = self.as_mut().project();
573584
*this.deadline = new_time;
574585
*this.registered = reregister;
575586

576-
let tick = self.driver().time_source().deadline_to_tick(new_time);
577587
let inner = match self.inner() {
578588
Some(inner) => inner,
579589
None => {
@@ -582,15 +592,17 @@ impl TimerEntry {
582592
.expect("inner should already be initialized by `this.init_inner()`")
583593
}
584594
};
595+
let tick = inner.driver().time_source().deadline_to_tick(new_time);
585596

586597
if inner.extend_expiration(tick).is_ok() {
587598
return;
588599
}
589600

590601
if reregister {
591602
unsafe {
592-
self.driver()
593-
.reregister(&self.driver.driver().io, tick, inner.into());
603+
inner
604+
.driver()
605+
.reregister(&inner.driver.driver().io, tick, inner.into());
594606
}
595607
}
596608
}
@@ -599,12 +611,6 @@ impl TimerEntry {
599611
mut self: Pin<&mut Self>,
600612
cx: &mut Context<'_>,
601613
) -> Poll<Result<(), super::Error>> {
602-
assert!(
603-
!self.driver().is_shutdown(),
604-
"{}",
605-
crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR
606-
);
607-
608614
if !self.registered {
609615
let deadline = self.deadline;
610616
self.as_mut().reset(deadline, true);
@@ -613,16 +619,13 @@ impl TimerEntry {
613619
let inner = self
614620
.inner()
615621
.expect("inner should already be initialized by `self.reset()`");
616-
inner.state.poll(cx.waker())
617-
}
618622

619-
pub(crate) fn driver(&self) -> &super::Handle {
620-
self.driver.driver().time()
621-
}
623+
assert!(
624+
!inner.driver().is_shutdown(),
625+
"{RUNTIME_SHUTTING_DOWN_ERROR}"
626+
);
622627

623-
#[cfg(all(tokio_unstable, feature = "tracing"))]
624-
pub(crate) fn clock(&self) -> &super::Clock {
625-
self.driver.driver().clock()
628+
inner.state.poll(cx.waker())
626629
}
627630
}
628631

tokio/src/runtime/time/tests/mod.rs

Lines changed: 40 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#![cfg(not(target_os = "wasi"))]
22

3+
use std::future::poll_fn;
34
use std::{task::Context, time::Duration};
45

56
#[cfg(not(loom))]
@@ -45,16 +46,19 @@ fn single_timer() {
4546
model(|| {
4647
let rt = rt(false);
4748
let handle = rt.handle();
49+
let handle_clone = handle.clone();
4850

49-
let handle_ = handle.clone();
5051
let jh = thread::spawn(move || {
51-
let entry = TimerEntry::new(
52-
handle_.inner.clone(),
53-
handle_.inner.driver().clock().now() + Duration::from_secs(1),
54-
);
52+
let _guard = handle_clone.enter();
53+
let clock = handle_clone.inner.driver().clock();
54+
let entry = TimerEntry::new(clock.now() + Duration::from_secs(1));
5555
pin!(entry);
5656

57-
block_on(std::future::poll_fn(|cx| entry.as_mut().poll_elapsed(cx))).unwrap();
57+
block_on(poll_fn(|cx| {
58+
let _guard = handle_clone.enter();
59+
entry.as_mut().poll_elapsed(cx)
60+
}))
61+
.unwrap();
5862
});
5963

6064
thread::yield_now();
@@ -74,13 +78,13 @@ fn drop_timer() {
7478
model(|| {
7579
let rt = rt(false);
7680
let handle = rt.handle();
81+
let handle_clone = handle.clone();
7782

78-
let handle_ = handle.clone();
7983
let jh = thread::spawn(move || {
80-
let entry = TimerEntry::new(
81-
handle_.inner.clone(),
82-
handle_.inner.driver().clock().now() + Duration::from_secs(1),
83-
);
84+
let _guard = handle_clone.enter();
85+
86+
let clock = handle_clone.inner.driver().clock();
87+
let entry = TimerEntry::new(clock.now() + Duration::from_secs(1));
8488
pin!(entry);
8589

8690
let _ = entry
@@ -108,20 +112,24 @@ fn change_waker() {
108112
model(|| {
109113
let rt = rt(false);
110114
let handle = rt.handle();
115+
let handle_clone = handle.clone();
111116

112-
let handle_ = handle.clone();
113117
let jh = thread::spawn(move || {
114-
let entry = TimerEntry::new(
115-
handle_.inner.clone(),
116-
handle_.inner.driver().clock().now() + Duration::from_secs(1),
117-
);
118+
let _guard = handle_clone.enter();
119+
120+
let clock = handle_clone.inner.driver().clock();
121+
let entry = TimerEntry::new(clock.now() + Duration::from_secs(1));
118122
pin!(entry);
119123

120124
let _ = entry
121125
.as_mut()
122126
.poll_elapsed(&mut Context::from_waker(futures::task::noop_waker_ref()));
123127

124-
block_on(std::future::poll_fn(|cx| entry.as_mut().poll_elapsed(cx))).unwrap();
128+
block_on(poll_fn(|cx| {
129+
let _guard = handle_clone.enter();
130+
entry.as_mut().poll_elapsed(cx)
131+
}))
132+
.unwrap();
125133
});
126134

127135
thread::yield_now();
@@ -143,13 +151,15 @@ fn reset_future() {
143151

144152
let rt = rt(false);
145153
let handle = rt.handle();
154+
let handle_clone = handle.clone();
146155

147-
let handle_ = handle.clone();
148156
let finished_early_ = finished_early.clone();
149157
let start = handle.inner.driver().clock().now();
150158

151159
let jh = thread::spawn(move || {
152-
let entry = TimerEntry::new(handle_.inner.clone(), start + Duration::from_secs(1));
160+
let _guard = handle_clone.enter();
161+
162+
let entry = TimerEntry::new(start + Duration::from_secs(1));
153163
pin!(entry);
154164

155165
let _ = entry
@@ -159,7 +169,11 @@ fn reset_future() {
159169
entry.as_mut().reset(start + Duration::from_secs(2), true);
160170

161171
// shouldn't complete before 2s
162-
block_on(std::future::poll_fn(|cx| entry.as_mut().poll_elapsed(cx))).unwrap();
172+
block_on(poll_fn(|cx| {
173+
let _guard = handle_clone.enter();
174+
entry.as_mut().poll_elapsed(cx)
175+
}))
176+
.unwrap();
163177

164178
finished_early_.store(true, Ordering::Relaxed);
165179
});
@@ -202,14 +216,13 @@ fn normal_or_miri<T>(normal: T, miri: T) -> T {
202216
fn poll_process_levels() {
203217
let rt = rt(true);
204218
let handle = rt.handle();
219+
let clock = handle.inner.driver().clock();
220+
let _guard = handle.enter();
205221

206222
let mut entries = vec![];
207223

208224
for i in 0..normal_or_miri(1024, 64) {
209-
let mut entry = Box::pin(TimerEntry::new(
210-
handle.inner.clone(),
211-
handle.inner.driver().clock().now() + Duration::from_millis(i),
212-
));
225+
let mut entry = Box::pin(TimerEntry::new(clock.now() + Duration::from_millis(i)));
213226

214227
let _ = entry
215228
.as_mut()
@@ -239,11 +252,10 @@ fn poll_process_levels_targeted() {
239252

240253
let rt = rt(true);
241254
let handle = rt.handle();
255+
let clock = handle.inner.driver().clock();
256+
let _guard = handle.enter();
242257

243-
let e1 = TimerEntry::new(
244-
handle.inner.clone(),
245-
handle.inner.driver().clock().now() + Duration::from_millis(193),
246-
);
258+
let e1 = TimerEntry::new(clock.now() + Duration::from_millis(193));
247259
pin!(e1);
248260

249261
let handle = handle.inner.driver().time();

tokio/src/time/sleep.rs

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
use crate::runtime::time::TimerEntry;
22
use crate::time::{error::Error, Duration, Instant};
3+
use crate::util::error::TIME_DISABLED_ERROR;
34
use crate::util::trace;
45

6+
use crate::runtime::scheduler;
57
use pin_project_lite::pin_project;
68
use std::future::Future;
79
use std::panic::Location;
@@ -251,9 +253,11 @@ impl Sleep {
251253
deadline: Instant,
252254
location: Option<&'static Location<'static>>,
253255
) -> Sleep {
254-
use crate::runtime::scheduler;
255-
let handle = scheduler::Handle::current();
256-
let entry = TimerEntry::new(handle, deadline);
256+
// ensure both scheduler handle and time driver are available,
257+
// otherwise panic
258+
let is_time_enabled = scheduler::Handle::with_current(|hdl| hdl.driver().time.is_some());
259+
assert!(is_time_enabled, "{TIME_DISABLED_ERROR}");
260+
let entry = TimerEntry::new(deadline);
257261
#[cfg(all(tokio_unstable, feature = "tracing"))]
258262
let inner = {
259263
let handle = scheduler::Handle::current();
@@ -380,11 +384,14 @@ impl Sleep {
380384
tracing::trace_span!("runtime.resource.async_op.poll");
381385

382386
let duration = {
383-
let clock = me.entry.clock();
384-
let time_source = me.entry.driver().time_source();
385-
let now = time_source.now(clock);
386-
let deadline_tick = time_source.deadline_to_tick(deadline);
387-
deadline_tick.saturating_sub(now)
387+
scheduler::Handle::with_current(|hdl| {
388+
let driver = hdl.driver();
389+
let clock = driver.clock();
390+
let time_source = driver.time().time_source();
391+
let now = time_source.now(clock);
392+
let deadline_tick = time_source.deadline_to_tick(deadline);
393+
deadline_tick.saturating_sub(now)
394+
})
388395
};
389396

390397
tracing::trace!(

0 commit comments

Comments
 (0)