From 7835e53b61fae03487da050600b20af3d749b5a3 Mon Sep 17 00:00:00 2001 From: Victor Berger Date: Mon, 21 Mar 2022 15:59:24 +0100 Subject: [PATCH] Rework timer event sources around Poll The reworks the `Timer` event source to have it directly driven by the core polling of the event loop, instead of relying on external timing mechanisms (like a thread or a timerfd). Fixes #9 Fixes #83 Fixes #84 --- CHANGELOG.md | 7 +- README.md | 26 +- examples/high_precision.rs | 30 ++ examples/timer.rs | 29 +- src/lib.rs | 26 +- src/loop_logic.rs | 124 +++----- src/sources/channel.rs | 12 +- src/sources/futures.rs | 4 +- src/sources/generic.rs | 10 +- src/sources/mod.rs | 2 +- src/sources/ping.rs | 6 +- src/sources/timer.rs | 546 ++++++++++++++++++++++++++++++++++ src/sources/timer/mod.rs | 499 ------------------------------- src/sources/timer/threaded.rs | 137 --------- src/sources/timer/timerfd.rs | 122 -------- src/sources/transient.rs | 23 +- src/sys/epoll.rs | 129 +++++--- src/sys/kqueue.rs | 21 +- src/sys/mod.rs | 42 ++- 19 files changed, 825 insertions(+), 970 deletions(-) create mode 100644 examples/high_precision.rs create mode 100644 src/sources/timer.rs delete mode 100644 src/sources/timer/mod.rs delete mode 100644 src/sources/timer/threaded.rs delete mode 100644 src/sources/timer/timerfd.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 75adfcf6..d9c29a08 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,7 +14,12 @@ associated `Error` type on the `EventSource` trait. - **Breaking:** Many API functions now use Calloop's own error type (`calloop::Error`) instead of `std::io::Error` as the error variants of their returned results. -- On Linux `Timer` is now driven by `timerfd`. +- **Breaking:** The `Timer` event source has been completely reworked and is now directly driven by + calloop polling mechanism instead of a background thread. Timer multiplexing is now handled by + creating multiple `Timer`s, and self-repeating timers is handled by the return value of the + associated event callback. +- **Breaking:** The minimum supported Rust version is now 1.53.0 +- Introduce `EventLoop::try_new_high_precision()` for sub-millisecond accuracy in the event loop ## 0.9.2 -- 2021-12-27 diff --git a/README.md b/README.md index 43c52789..57e3f9a7 100644 --- a/README.md +++ b/README.md @@ -26,7 +26,7 @@ For simple uses, you can just add event sources with callbacks to the event loop. For example, here's a runnable program that exits after five seconds: ```rust -use calloop::{timer::Timer, EventLoop, LoopSignal}; +use calloop::{timer::{Timer, TimeoutAction}, EventLoop, LoopSignal}; fn main() { // Create the event loop. The loop is parameterised by the kind of shared @@ -43,18 +43,8 @@ fn main() { // callbacks. let handle = event_loop.handle(); - // Create our event source, a timer. Note that this is also parameterised by - // the data for the events it generates. We've let Rust infer that here. - let source = Timer::new().expect("Failed to create timer event source!"); - - // Most event source APIs provide two things: an event source to go into the - // event loop, and some way of triggering that source from elsewhere. In - // this case, we use a handle to the timer to set timeouts. - // - // Note that this can go before or after the call to insert_source(), and - // even inside another event callback. - let timer_handle = source.handle(); - timer_handle.add_timeout(std::time::Duration::from_secs(5), "Timeout reached!"); + // Create our event source, a timer, that will expire in 2 seconds + let source = Timer::from_duration(std::time::Duration::from_secs(2)); // Inserting an event source takes this general form. It can also be done // from within the callback of another event source. @@ -65,7 +55,8 @@ fn main() { // a callback that is invoked whenever this source generates an event |event, _metadata, shared_data| { // This callback is given 3 values: - // - the event generated by the source (in our case, a string slice) + // - the event generated by the source (in our case, timer events are the Instant + // representing the deadline for which it has fired) // - &mut access to some metadata, specific to the event source (in our case, a // timer handle) // - &mut access to the global shared data that was passed to EventLoop::run or @@ -73,8 +64,13 @@ fn main() { // // The return type is just () because nothing uses it. Some // sources will expect a Result of some kind instead. - println!("Event fired: {}", event); + println!("Timeout for {:?} expired!", event); + // notify the event loop to stop running using the signal in the shared data + // (see below) shared_data.stop(); + // The timer event source requires us to return a TimeoutAction to + // specify if the timer should be rescheduled. In our case we just drop it. + TimeoutAction::Drop }, ) .expect("Failed to insert event source!"); diff --git a/examples/high_precision.rs b/examples/high_precision.rs new file mode 100644 index 00000000..6b65feb8 --- /dev/null +++ b/examples/high_precision.rs @@ -0,0 +1,30 @@ +use std::time::{Duration, Instant}; + +use calloop::{ + timer::{TimeoutAction, Timer}, + EventLoop, +}; + +fn main() { + let mut event_loop = + EventLoop::try_new_high_precision().expect("Failed to initialize the event loop!"); + + let before = Instant::now(); + + event_loop + .handle() + .insert_source( + Timer::from_duration(Duration::from_micros(20)), + |_, _, _| TimeoutAction::Drop, + ) + .unwrap(); + + event_loop.dispatch(None, &mut ()).unwrap(); + + let elapsed = before.elapsed(); + + println!( + "The event loop slept for {} microseconds.", + elapsed.as_micros() + ); +} diff --git a/examples/timer.rs b/examples/timer.rs index e214db15..36f55500 100644 --- a/examples/timer.rs +++ b/examples/timer.rs @@ -1,4 +1,7 @@ -use calloop::{timer::Timer, EventLoop, LoopSignal}; +use calloop::{ + timer::{TimeoutAction, Timer}, + EventLoop, LoopSignal, +}; fn main() { // Create the event loop. The loop is parameterised by the kind of shared @@ -15,18 +18,8 @@ fn main() { // callbacks. let handle = event_loop.handle(); - // Create our event source, a timer. Note that this is also parameterised by - // the data for the events it generates. We've let Rust infer that here. - let source = Timer::new().expect("Failed to create timer event source!"); - - // Most event source APIs provide two things: an event source to go into the - // event loop, and some way of triggering that source from elsewhere. In - // this case, we use a handle to the timer to set timeouts. - // - // Note that this can go before or after the call to insert_source(), and - // even inside another event callback. - let timer_handle = source.handle(); - timer_handle.add_timeout(std::time::Duration::from_secs(1), "Timeout reached!"); + // Create our event source, a timer, that will expire in 2 seconds + let source = Timer::from_duration(std::time::Duration::from_secs(2)); // Inserting an event source takes this general form. It can also be done // from within the callback of another event source. @@ -37,7 +30,8 @@ fn main() { // a callback that is invoked whenever this source generates an event |event, _metadata, shared_data| { // This callback is given 3 values: - // - the event generated by the source (in our case, a string slice) + // - the event generated by the source (in our case, timer events are the Instant + // representing the deadline for which it has fired) // - &mut access to some metadata, specific to the event source (in our case, a // timer handle) // - &mut access to the global shared data that was passed to EventLoop::run or @@ -45,8 +39,13 @@ fn main() { // // The return type is just () because nothing uses it. Some // sources will expect a Result of some kind instead. - println!("Event fired: {}", event); + println!("Timeout for {:?} expired!", event); + // notify the event loop to stop running using the signal in the shared data + // (see below) shared_data.stop(); + // The timer event source requires us to return a TimeoutAction to + // specify if the timer should be rescheduled. In our case we just drop it. + TimeoutAction::Drop }, ) .expect("Failed to insert event source!"); diff --git a/src/lib.rs b/src/lib.rs index 9e174589..17685c82 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -20,7 +20,7 @@ //! loop. For example, here's a runnable program that exits after five seconds: //! //! ```no_run -//! use calloop::{timer::Timer, EventLoop, LoopSignal}; +//! use calloop::{timer::{Timer, TimeoutAction}, EventLoop, LoopSignal}; //! //! fn main() { //! // Create the event loop. The loop is parameterised by the kind of shared @@ -37,18 +37,8 @@ //! // callbacks. //! let handle = event_loop.handle(); //! -//! // Create our event source, a timer. Note that this is also parameterised by -//! // the data for the events it generates. We've let Rust infer that here. -//! let source = Timer::new().expect("Failed to create timer event source!"); -//! -//! // Most event source APIs provide two things: an event source to go into the -//! // event loop, and some way of triggering that source from elsewhere. In -//! // this case, we use a handle to the timer to set timeouts. -//! // -//! // Note that this can go before or after the call to insert_source(), and -//! // even inside another event callback. -//! let timer_handle = source.handle(); -//! timer_handle.add_timeout(std::time::Duration::from_secs(5), "Timeout reached!"); +//! // Create our event source, a timer, that will expire in 2 seconds +//! let source = Timer::from_duration(std::time::Duration::from_secs(2)); //! //! // Inserting an event source takes this general form. It can also be done //! // from within the callback of another event source. @@ -59,7 +49,8 @@ //! // a callback that is invoked whenever this source generates an event //! |event, _metadata, shared_data| { //! // This callback is given 3 values: -//! // - the event generated by the source (in our case, a string slice) +//! // - the event generated by the source (in our case, timer events are the Instant +//! // representing the deadline for which it has fired) //! // - &mut access to some metadata, specific to the event source (in our case, a //! // timer handle) //! // - &mut access to the global shared data that was passed to EventLoop::run or @@ -67,8 +58,13 @@ //! // //! // The return type is just () because nothing uses it. Some //! // sources will expect a Result of some kind instead. -//! println!("Event fired: {}", event); +//! println!("Timeout for {:?} expired!", event); +//! // notify the event loop to stop running using the signal in the shared data +//! // (see below) //! shared_data.stop(); +//! // The timer event source requires us to return a TimeoutAction to +//! // specify if the timer should be rescheduled. In our case we just drop it. +//! TimeoutAction::Drop //! }, //! ) //! .expect("Failed to insert event source!"); diff --git a/src/loop_logic.rs b/src/loop_logic.rs index 993868ea..c97aae9c 100644 --- a/src/loop_logic.rs +++ b/src/loop_logic.rs @@ -5,7 +5,7 @@ use std::os::unix::io::AsRawFd; use std::rc::Rc; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant}; use slotmap::SlotMap; @@ -220,7 +220,22 @@ impl<'l, Data> EventLoop<'l, Data> { /// /// Fails if the initialization of the polling system failed. pub fn try_new() -> crate::Result { - let poll = Poll::new()?; + Self::inner_new(false) + } + + /// Create a new event loop in high precision mode + /// + /// On some platforms it requires to setup more resources to enable high-precision + /// (sub millisecond) capabilities, so you should use this constructor if you need + /// this kind of precision. + /// + /// Fails if the initialization of the polling system failed. + pub fn try_new_high_precision() -> crate::Result { + Self::inner_new(true) + } + + fn inner_new(high_precision: bool) -> crate::Result { + let poll = Poll::new(high_precision)?; let handle = LoopHandle { inner: Rc::new(LoopInner { poll: RefCell::new(poll), @@ -248,10 +263,10 @@ impl<'l, Data> EventLoop<'l, Data> { mut timeout: Option, data: &mut Data, ) -> crate::Result<()> { + let now = Instant::now(); let events = { let mut poll = self.handle.inner.poll.borrow_mut(); loop { - let now = std::time::Instant::now(); let result = poll.poll(timeout); match result { @@ -467,8 +482,8 @@ mod tests { use std::time::Duration; use crate::{ - generic::Generic, ping::*, timer::Timer, Dispatcher, Interest, Mode, Poll, PostAction, - Readiness, RegistrationToken, Token, TokenFactory, + generic::Generic, ping::*, Dispatcher, Interest, Mode, Poll, PostAction, Readiness, + RegistrationToken, Token, TokenFactory, }; use super::EventLoop; @@ -484,7 +499,7 @@ mod tests { }); event_loop - .dispatch(Some(Duration::from_millis(0)), &mut dispatched) + .dispatch(Some(Duration::ZERO), &mut dispatched) .unwrap(); assert!(dispatched); @@ -504,7 +519,7 @@ mod tests { idle.cancel(); event_loop - .dispatch(Duration::from_millis(0), &mut dispatched) + .dispatch(Duration::ZERO, &mut dispatched) .unwrap(); assert!(!dispatched); @@ -576,7 +591,7 @@ mod tests { ping.ping(); let mut dispatched = false; event_loop - .dispatch(Duration::from_millis(0), &mut dispatched) + .dispatch(Duration::ZERO, &mut dispatched) .unwrap(); assert!(dispatched); @@ -585,7 +600,7 @@ mod tests { event_loop.handle().disable(&ping_token).unwrap(); let mut dispatched = false; event_loop - .dispatch(Duration::from_millis(0), &mut dispatched) + .dispatch(Duration::ZERO, &mut dispatched) .unwrap(); assert!(!dispatched); @@ -596,7 +611,7 @@ mod tests { event_loop.handle().enable(&ping_token).unwrap(); let mut dispatched = false; event_loop - .dispatch(Duration::from_millis(0), &mut dispatched) + .dispatch(Duration::ZERO, &mut dispatched) .unwrap(); assert!(dispatched); } @@ -678,14 +693,14 @@ mod tests { let mut dispatched = 0; ping1.ping(); event_loop - .dispatch(Duration::from_millis(0), &mut dispatched) + .dispatch(Duration::ZERO, &mut dispatched) .unwrap(); assert_eq!(dispatched, 1); dispatched = 0; ping2.ping(); event_loop - .dispatch(Duration::from_millis(0), &mut dispatched) + .dispatch(Duration::ZERO, &mut dispatched) .unwrap(); assert_eq!(dispatched, 2); @@ -693,7 +708,7 @@ mod tests { ping1.ping(); ping2.ping(); event_loop - .dispatch(Duration::from_millis(0), &mut dispatched) + .dispatch(Duration::ZERO, &mut dispatched) .unwrap(); assert_eq!(dispatched, 3); } @@ -744,7 +759,7 @@ mod tests { // first dispatch, nothing is readable let mut dispatched = false; event_loop - .dispatch(Duration::from_millis(0), &mut dispatched) + .dispatch(Duration::ZERO, &mut dispatched) .unwrap(); assert!(!dispatched); @@ -752,14 +767,14 @@ mod tests { write(sock2, &[1, 2, 3]).unwrap(); dispatched = false; event_loop - .dispatch(Duration::from_millis(0), &mut dispatched) + .dispatch(Duration::ZERO, &mut dispatched) .unwrap(); assert!(dispatched); // All has been read, no longer readable dispatched = false; event_loop - .dispatch(Duration::from_millis(0), &mut dispatched) + .dispatch(Duration::ZERO, &mut dispatched) .unwrap(); assert!(!dispatched); @@ -770,7 +785,7 @@ mod tests { // the socket is writable dispatched = false; event_loop - .dispatch(Duration::from_millis(0), &mut dispatched) + .dispatch(Duration::ZERO, &mut dispatched) .unwrap(); assert!(dispatched); @@ -781,7 +796,7 @@ mod tests { // the socket is not readable dispatched = false; event_loop - .dispatch(Duration::from_millis(0), &mut dispatched) + .dispatch(Duration::ZERO, &mut dispatched) .unwrap(); assert!(!dispatched); } @@ -805,80 +820,11 @@ mod tests { let mut opt_src = Some(ping_token); - event_loop - .dispatch(Duration::from_millis(0), &mut opt_src) - .unwrap(); + event_loop.dispatch(Duration::ZERO, &mut opt_src).unwrap(); assert!(opt_src.is_none()); } - #[test] - fn non_static_source() { - let mut flag = false; - - { - let timer = Timer::new().unwrap(); - let handle = timer.handle(); - handle.add_timeout(std::time::Duration::from_millis(5), &mut flag); - - let mut fired = false; - - let mut event_loop = EventLoop::::try_new().unwrap(); - let _timer_token = event_loop - .handle() - .insert_source(timer, |flag, _, fired| { - *flag = true; - *fired = true; - }) - .unwrap(); - - event_loop - .dispatch(Duration::from_millis(500), &mut fired) - .unwrap(); - - assert!(fired); - } - - assert!(flag); - } - - #[test] - fn non_static_dispatcher() { - let mut flag = false; - - { - let timer = Timer::<&mut bool>::new().unwrap(); - let _ = timer - .handle() - .add_timeout(std::time::Duration::from_millis(5), &mut flag); - - let dispatcher = Dispatcher::new(timer, |flag, _, fired| { - *flag = true; - *fired = true; - }); - - let mut fired = false; - - let mut event_loop = EventLoop::::try_new().unwrap(); - let disp_token = event_loop - .handle() - .register_dispatcher(dispatcher.clone()) - .unwrap(); - - event_loop - .dispatch(Duration::from_millis(500), &mut fired) - .unwrap(); - assert!(fired); - - event_loop.handle().remove(disp_token); - - let timer = dispatcher.into_source_inner(); - timer.handle().cancel_all_timeouts(); - } - - assert!(flag); - } - #[test] fn non_static_data() { use std::sync::mpsc; @@ -901,7 +847,7 @@ mod tests { ping.ping(); event_loop - .dispatch(Duration::from_millis(0), &mut ref_sender) + .dispatch(Duration::ZERO, &mut ref_sender) .unwrap(); } diff --git a/src/sources/channel.rs b/src/sources/channel.rs index 72641157..8a053b62 100644 --- a/src/sources/channel.rs +++ b/src/sources/channel.rs @@ -225,7 +225,7 @@ mod tests { // nothing is sent, nothing is received event_loop - .dispatch(Some(::std::time::Duration::from_millis(0)), &mut got) + .dispatch(Some(::std::time::Duration::ZERO), &mut got) .unwrap(); assert_eq!(got, (false, false)); @@ -233,7 +233,7 @@ mod tests { // a message is send tx.send(()).unwrap(); event_loop - .dispatch(Some(::std::time::Duration::from_millis(0)), &mut got) + .dispatch(Some(::std::time::Duration::ZERO), &mut got) .unwrap(); assert_eq!(got, (true, false)); @@ -241,7 +241,7 @@ mod tests { // the sender is dropped ::std::mem::drop(tx); event_loop - .dispatch(Some(::std::time::Duration::from_millis(0)), &mut got) + .dispatch(Some(::std::time::Duration::ZERO), &mut got) .unwrap(); assert_eq!(got, (true, true)); @@ -273,7 +273,7 @@ mod tests { // nothing is sent, nothing is received event_loop - .dispatch(Some(::std::time::Duration::from_millis(0)), &mut received) + .dispatch(Some(::std::time::Duration::ZERO), &mut received) .unwrap(); assert_eq!(received.0, 0); @@ -286,7 +286,7 @@ mod tests { // empty it event_loop - .dispatch(Some(::std::time::Duration::from_millis(0)), &mut received) + .dispatch(Some(::std::time::Duration::ZERO), &mut received) .unwrap(); assert_eq!(received.0, 2); @@ -298,7 +298,7 @@ mod tests { // final read of the channel event_loop - .dispatch(Some(::std::time::Duration::from_millis(0)), &mut received) + .dispatch(Some(::std::time::Duration::ZERO), &mut received) .unwrap(); assert_eq!(received.0, 3); diff --git a/src/sources/futures.rs b/src/sources/futures.rs index 28befd66..f44ffc95 100644 --- a/src/sources/futures.rs +++ b/src/sources/futures.rs @@ -194,7 +194,7 @@ mod tests { let fut = async { 42 }; event_loop - .dispatch(Some(::std::time::Duration::from_millis(0)), &mut got) + .dispatch(Some(::std::time::Duration::ZERO), &mut got) .unwrap(); // the future is not yet inserted, and thus has not yet run @@ -203,7 +203,7 @@ mod tests { sched.schedule(fut).unwrap(); event_loop - .dispatch(Some(::std::time::Duration::from_millis(0)), &mut got) + .dispatch(Some(::std::time::Duration::ZERO), &mut got) .unwrap(); // the future has run diff --git a/src/sources/generic.rs b/src/sources/generic.rs index 3cef8a1a..375539f1 100644 --- a/src/sources/generic.rs +++ b/src/sources/generic.rs @@ -187,7 +187,7 @@ mod tests { .unwrap(); event_loop - .dispatch(Some(::std::time::Duration::from_millis(0)), &mut dispached) + .dispatch(Some(::std::time::Duration::ZERO), &mut dispached) .unwrap(); assert!(!dispached); @@ -197,7 +197,7 @@ mod tests { tx.flush().unwrap(); event_loop - .dispatch(Some(::std::time::Duration::from_millis(0)), &mut dispached) + .dispatch(Some(::std::time::Duration::ZERO), &mut dispached) .unwrap(); assert!(dispached); @@ -224,7 +224,7 @@ mod tests { let generic_token = handle.register_dispatcher(dispatcher.clone()).unwrap(); event_loop - .dispatch(Some(::std::time::Duration::from_millis(0)), &mut dispached) + .dispatch(Some(::std::time::Duration::ZERO), &mut dispached) .unwrap(); assert!(!dispached); @@ -238,7 +238,7 @@ mod tests { tx.flush().unwrap(); event_loop - .dispatch(Some(::std::time::Duration::from_millis(0)), &mut dispached) + .dispatch(Some(::std::time::Duration::ZERO), &mut dispached) .unwrap(); // the source has not been dispatched, as the source is no longer here @@ -262,7 +262,7 @@ mod tests { .unwrap(); event_loop - .dispatch(Some(::std::time::Duration::from_millis(0)), &mut dispached) + .dispatch(Some(::std::time::Duration::ZERO), &mut dispached) .unwrap(); // the has now been properly dispatched diff --git a/src/sources/mod.rs b/src/sources/mod.rs index 9e136d1f..2134ef2c 100644 --- a/src/sources/mod.rs +++ b/src/sources/mod.rs @@ -24,7 +24,7 @@ pub mod transient; /// `PostAction` values can be combined with the `|` (bit-or) operator (or with /// `|=`) with the result that: /// - if both values are identical, the result is that value -/// - if they are different, the result is [`Reregister`] +/// - if they are different, the result is [`Reregister`](PostAction::Reregister) /// /// Bit-or-ing these results is useful for composed sources to combine the /// results of their child sources, but note that it only applies to the child diff --git a/src/sources/ping.rs b/src/sources/ping.rs index a71a970f..10f38b66 100644 --- a/src/sources/ping.rs +++ b/src/sources/ping.rs @@ -203,14 +203,14 @@ mod tests { let mut dispatched = false; event_loop - .dispatch(std::time::Duration::from_millis(0), &mut dispatched) + .dispatch(std::time::Duration::ZERO, &mut dispatched) .unwrap(); assert!(dispatched); // Ping has been drained an no longer generates events let mut dispatched = false; event_loop - .dispatch(std::time::Duration::from_millis(0), &mut dispatched) + .dispatch(std::time::Duration::ZERO, &mut dispatched) .unwrap(); assert!(!dispatched); } @@ -230,7 +230,7 @@ mod tests { // If the sender is closed from the start, the ping should first trigger // once, disabling itself but not invoking the callback event_loop - .dispatch(std::time::Duration::from_millis(0), &mut dispatched) + .dispatch(std::time::Duration::ZERO, &mut dispatched) .unwrap(); assert!(!dispatched); diff --git a/src/sources/timer.rs b/src/sources/timer.rs new file mode 100644 index 00000000..cf9634a7 --- /dev/null +++ b/src/sources/timer.rs @@ -0,0 +1,546 @@ +//! Timer event source +//! +//! The [`Timer`] is an event source that will fire its event after a certain amount of time +//! specified at creation. Its timing is tracked directly by the event loop core logic, and it does +//! not consume any system resource. +//! +//! The timer precision depends on whether the loop was initialized in high-precision mode. If not, +//! you can expect precision of order of 1 millisecond, if you need sub-millisecond precision, +//! make sure you initialize the [`EventLoop`](crate::EventLoop) using +//! [`EventLoop::try_new_high_precision()`](crate::EventLoop::try_new_high_precision). Note also +//! that if you need to rely on good precision timers in general, you may need to enable realtime +//! features of your OS to ensure your thread is quickly woken up by the system scheduler. +//! +//! The provided event is an [`Instant`] representing the deadline for which this timer has fired +//! (which can be earlier than the current time depending on the event loop congestion). +//! +//! The callback associated with this event source is expected to return a [`TimeoutAction`], which +//! can be used to implement self-repeating timers by telling calloop to reprogram the same timer +//! for a later timeout after it has fired. + +/* + * This module provides two main types: + * + * - `Timer` is the user-facing type that represents a timer event source + * - `TimerWheel` is an internal data structure for tracking registered timeouts, it is used by + * the polling logic in sys/mod.rs + */ + +use std::{ + cell::RefCell, + collections::BinaryHeap, + rc::Rc, + task::Waker, + time::{Duration, Instant}, +}; + +use crate::{EventSource, LoopHandle, Poll, PostAction, Readiness, Token, TokenFactory}; + +#[derive(Debug)] +struct Registration { + token: Token, + wheel: Rc>, + counter: u32, +} + +/// A timer event source +#[derive(Debug)] +pub struct Timer { + registration: Option, + deadline: Instant, +} + +impl Timer { + /// Create a timer that will fire immediately when inserted in the event loop + pub fn immediate() -> Timer { + Self::from_deadline(Instant::now()) + } + + /// Create a timer that will fire after a given duration from now + pub fn from_duration(duration: Duration) -> Timer { + Self::from_deadline(Instant::now() + duration) + } + + /// Create a timer that will fire at a given instant + pub fn from_deadline(deadline: Instant) -> Timer { + Timer { + registration: None, + deadline, + } + } +} + +impl EventSource for Timer { + type Event = Instant; + type Metadata = (); + type Ret = TimeoutAction; + type Error = std::io::Error; + + fn process_events( + &mut self, + _: Readiness, + token: Token, + mut callback: F, + ) -> Result + where + F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret, + { + if let Some(ref registration) = self.registration { + if registration.token != token { + return Ok(PostAction::Continue); + } + let new_deadline = match callback(self.deadline, &mut ()) { + TimeoutAction::Drop => return Ok(PostAction::Remove), + TimeoutAction::ToInstant(instant) => instant, + TimeoutAction::ToDuration(duration) => Instant::now() + duration, + }; + // If we received an event, we MUST have a valid counter value + registration.wheel.borrow_mut().insert_reuse( + registration.counter, + new_deadline, + registration.token, + ); + self.deadline = new_deadline; + } + Ok(PostAction::Continue) + } + + fn register(&mut self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()> { + let wheel = poll.timers.clone(); + let token = token_factory.token(); + let counter = wheel.borrow_mut().insert(self.deadline, token); + self.registration = Some(Registration { + token, + wheel, + counter, + }); + Ok(()) + } + + fn reregister( + &mut self, + poll: &mut Poll, + token_factory: &mut TokenFactory, + ) -> crate::Result<()> { + self.unregister(poll)?; + self.register(poll, token_factory) + } + + fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> { + if let Some(registration) = self.registration.take() { + poll.timers.borrow_mut().cancel(registration.counter); + } + Ok(()) + } +} + +/// Action to reschedule a timeout if necessary +#[derive(Debug)] +pub enum TimeoutAction { + /// Don't reschedule this timer + Drop, + /// Reschedule this timer to a given [`Instant`] + ToInstant(Instant), + /// Reschedule this timer to a given [`Duration`] in the future + ToDuration(Duration), +} + +// Internal representation of a timeout registered in the TimerWheel +#[derive(Debug)] +struct TimeoutData { + deadline: Instant, + token: RefCell>, + counter: u32, +} + +// A data structure for tracking registered timeouts +#[derive(Debug)] +pub(crate) struct TimerWheel { + heap: BinaryHeap, + counter: u32, +} + +impl TimerWheel { + pub(crate) fn new() -> TimerWheel { + TimerWheel { + heap: BinaryHeap::new(), + counter: 0, + } + } + + pub(crate) fn insert(&mut self, deadline: Instant, token: Token) -> u32 { + self.heap.push(TimeoutData { + deadline, + token: RefCell::new(Some(token)), + counter: self.counter, + }); + let ret = self.counter; + self.counter += 1; + ret + } + + pub(crate) fn insert_reuse(&mut self, counter: u32, deadline: Instant, token: Token) { + self.heap.push(TimeoutData { + deadline, + token: RefCell::new(Some(token)), + counter, + }); + } + + pub(crate) fn cancel(&mut self, counter: u32) { + self.heap + .iter() + .find(|data| data.counter == counter) + .map(|data| data.token.take()); + } + + pub(crate) fn next_expired(&mut self, now: Instant) -> Option<(u32, Token)> { + loop { + // check if there is an expired item + if let Some(data) = self.heap.peek() { + if data.deadline > now { + return None; + } + // there is an expired timeout, continue the + // loop body + } else { + return None; + } + + // There is an item in the heap, this unwrap cannot blow + let data = self.heap.pop().unwrap(); + if let Some(token) = data.token.into_inner() { + return Some((data.counter, token)); + } + // otherwise this timeout was cancelled, continue looping + } + } + + pub(crate) fn next_deadline(&self) -> Option { + self.heap.peek().map(|data| data.deadline) + } +} + +// trait implementations for TimeoutData + +impl std::cmp::Ord for TimeoutData { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + // earlier values have priority + self.deadline.cmp(&other.deadline).reverse() + } +} + +impl std::cmp::PartialOrd for TimeoutData { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +// This impl is required for PartialOrd but actually never used +// and the type is private, so ignore its coverage +impl std::cmp::PartialEq for TimeoutData { + #[cfg_attr(coverage, no_coverage)] + fn eq(&self, other: &Self) -> bool { + self.deadline == other.deadline + } +} + +impl std::cmp::Eq for TimeoutData {} + +// Logic for timer futures + +/// A future that resolves once a certain timeout is expired +pub struct TimeoutFuture { + deadline: Instant, + waker: Rc>>, +} + +impl std::fmt::Debug for TimeoutFuture { + #[cfg_attr(coverage, no_coverage)] + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("TimeoutFuture") + .field("deadline", &self.deadline) + .finish_non_exhaustive() + } +} + +impl TimeoutFuture { + /// Create a future that resolves after a given duration + pub fn from_duration(handle: &LoopHandle<'_, Data>, duration: Duration) -> TimeoutFuture { + Self::from_deadline(handle, Instant::now() + duration) + } + + /// Create a future that resolves at a given instant + pub fn from_deadline(handle: &LoopHandle<'_, Data>, deadline: Instant) -> TimeoutFuture { + let timer = Timer::from_deadline(deadline); + let waker = Rc::new(RefCell::new(None::)); + let waker2 = waker.clone(); + handle + .insert_source(timer, move |_, &mut (), _| { + if let Some(waker) = waker2.borrow_mut().clone() { + waker.wake() + } + TimeoutAction::Drop + }) + .unwrap(); + + TimeoutFuture { deadline, waker } + } +} + +impl std::future::Future for TimeoutFuture { + type Output = (); + + fn poll( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + if std::time::Instant::now() >= self.deadline { + return std::task::Poll::Ready(()); + } + *self.waker.borrow_mut() = Some(cx.waker().clone()); + std::task::Poll::Pending + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::*; + use std::time::Duration; + + #[test] + fn simple_timer() { + let mut event_loop = EventLoop::try_new().unwrap(); + + let mut dispatched = false; + + event_loop + .handle() + .insert_source( + Timer::from_duration(Duration::from_millis(100)), + |_, &mut (), dispatched| { + *dispatched = true; + TimeoutAction::Drop + }, + ) + .unwrap(); + + event_loop + .dispatch(Some(Duration::ZERO), &mut dispatched) + .unwrap(); + // not yet dispatched + assert!(!dispatched); + + event_loop + .dispatch(Some(Duration::from_millis(150)), &mut dispatched) + .unwrap(); + // now dispatched + assert!(dispatched); + } + + #[test] + fn simple_timer_instant() { + let mut event_loop = EventLoop::try_new().unwrap(); + + let mut dispatched = false; + + event_loop + .handle() + .insert_source( + Timer::from_duration(Duration::from_millis(100)), + |_, &mut (), dispatched| { + *dispatched = true; + TimeoutAction::Drop + }, + ) + .unwrap(); + + event_loop + .dispatch(Some(Duration::from_millis(150)), &mut dispatched) + .unwrap(); + // now dispatched + assert!(dispatched); + } + + #[test] + fn immediate_timer() { + let mut event_loop = EventLoop::try_new().unwrap(); + + let mut dispatched = false; + + event_loop + .handle() + .insert_source(Timer::immediate(), |_, &mut (), dispatched| { + *dispatched = true; + TimeoutAction::Drop + }) + .unwrap(); + + event_loop + .dispatch(Some(Duration::ZERO), &mut dispatched) + .unwrap(); + // now dispatched + assert!(dispatched); + } + + // We cannot actually test high precision timers, as they are only high precision in release mode + // This test is here to ensure that the high-precision codepath are executed and work as intended + // even if we cannot test if they are actually high precision + #[test] + fn high_precision_timer() { + let mut event_loop = EventLoop::try_new_high_precision().unwrap(); + + let mut dispatched = false; + + event_loop + .handle() + .insert_source( + Timer::from_duration(Duration::from_millis(100)), + |_, &mut (), dispatched| { + *dispatched = true; + TimeoutAction::Drop + }, + ) + .unwrap(); + + event_loop + .dispatch(Some(Duration::ZERO), &mut dispatched) + .unwrap(); + // not yet dispatched + assert!(!dispatched); + + event_loop + .dispatch(Some(Duration::from_micros(10200)), &mut dispatched) + .unwrap(); + // yet not dispatched + assert!(!dispatched); + + event_loop + .dispatch(Some(Duration::from_millis(100)), &mut dispatched) + .unwrap(); + // now dispatched + assert!(dispatched); + } + + #[test] + fn cancel_timer() { + let mut event_loop = EventLoop::try_new().unwrap(); + + let mut dispatched = false; + + let token = event_loop + .handle() + .insert_source( + Timer::from_duration(Duration::from_millis(100)), + |_, &mut (), dispatched| { + *dispatched = true; + TimeoutAction::Drop + }, + ) + .unwrap(); + + event_loop + .dispatch(Some(Duration::ZERO), &mut dispatched) + .unwrap(); + // not yet dispatched + assert!(!dispatched); + + event_loop.handle().remove(token); + + event_loop + .dispatch(Some(Duration::from_millis(150)), &mut dispatched) + .unwrap(); + // still not dispatched + assert!(!dispatched); + } + + #[test] + fn repeating_timer() { + let mut event_loop = EventLoop::try_new().unwrap(); + + let mut dispatched = 0; + + event_loop + .handle() + .insert_source( + Timer::from_duration(Duration::from_millis(500)), + |_, &mut (), dispatched| { + *dispatched += 1; + TimeoutAction::ToDuration(Duration::from_millis(500)) + }, + ) + .unwrap(); + + event_loop + .dispatch(Some(Duration::from_millis(250)), &mut dispatched) + .unwrap(); + assert_eq!(dispatched, 0); + + event_loop + .dispatch(Some(Duration::from_millis(510)), &mut dispatched) + .unwrap(); + assert_eq!(dispatched, 1); + + event_loop + .dispatch(Some(Duration::from_millis(510)), &mut dispatched) + .unwrap(); + assert_eq!(dispatched, 2); + + event_loop + .dispatch(Some(Duration::from_millis(510)), &mut dispatched) + .unwrap(); + assert_eq!(dispatched, 3); + } + + #[cfg(feature = "executor")] + #[test] + fn timeout_future() { + let mut event_loop = EventLoop::try_new().unwrap(); + + let mut dispatched = 0; + + let timeout_1 = + TimeoutFuture::from_duration(&event_loop.handle(), Duration::from_millis(500)); + let timeout_2 = + TimeoutFuture::from_duration(&event_loop.handle(), Duration::from_millis(1500)); + + let (exec, sched) = crate::sources::futures::executor().unwrap(); + event_loop + .handle() + .insert_source(exec, move |(), &mut (), got| { + *got += 1; + }) + .unwrap(); + + sched.schedule(timeout_1).unwrap(); + sched.schedule(timeout_2).unwrap(); + + // We do a 0-timeout dispatch after every regular dispatch to let the timeout triggers + // flow back to the executor + + event_loop + .dispatch(Some(Duration::ZERO), &mut dispatched) + .unwrap(); + event_loop + .dispatch(Some(Duration::ZERO), &mut dispatched) + .unwrap(); + assert_eq!(dispatched, 0); + + event_loop + .dispatch(Some(Duration::from_millis(1000)), &mut dispatched) + .unwrap(); + event_loop + .dispatch(Some(Duration::ZERO), &mut dispatched) + .unwrap(); + assert_eq!(dispatched, 1); + + event_loop + .dispatch(Some(Duration::from_millis(1100)), &mut dispatched) + .unwrap(); + event_loop + .dispatch(Some(Duration::ZERO), &mut dispatched) + .unwrap(); + assert_eq!(dispatched, 2); + } +} diff --git a/src/sources/timer/mod.rs b/src/sources/timer/mod.rs deleted file mode 100644 index 911cd90f..00000000 --- a/src/sources/timer/mod.rs +++ /dev/null @@ -1,499 +0,0 @@ -//! Timer-based event sources -//! -//! A [`Timer`](Timer) is a general time-tracking object. It is used by setting timeouts, -//! and generates events whenever a timeout expires. -//! -//! The [`Timer`](Timer) event source provides an handle [`TimerHandle`](TimerHandle), which -//! is used to set or cancel timeouts. This handle is cloneable and can be sent accross threads -//! if `T: Send`, allowing you to setup timeouts from any point of your program. - -use std::cell::RefCell; -use std::collections::BinaryHeap; -use std::sync::{Arc, Mutex}; -use std::time::{Duration, Instant}; - -use crate::{EventSource, Poll, PostAction, Readiness, Token, TokenFactory}; - -#[cfg(any( - target_os = "dragonfly", - target_os = "freebsd", - target_os = "netbsd", - target_os = "openbsd", - target_os = "macos" -))] -mod threaded; -#[cfg(target_os = "linux")] -mod timerfd; - -#[cfg(any( - target_os = "dragonfly", - target_os = "freebsd", - target_os = "netbsd", - target_os = "openbsd", - target_os = "macos" -))] -use threaded::{TimerScheduler, TimerSource}; -#[cfg(target_os = "linux")] -use timerfd::{TimerScheduler, TimerSource}; - -/// An error arising from processing events for a timer. -#[derive(thiserror::Error, Debug)] -#[error("{0}")] -pub struct TimerError( - #[cfg(target_os = "linux")] std::io::Error, - #[cfg(any( - target_os = "dragonfly", - target_os = "freebsd", - target_os = "netbsd", - target_os = "openbsd", - target_os = "macos" - ))] - crate::ping::PingError, -); - -/// A Timer event source -/// -/// It generates events of type `(T, TimerHandle)`, providing you -/// an handle inside the event callback, allowing you to set new timeouts -/// as a response to a timeout being reached (for reccuring ticks for example). -#[derive(Debug)] -pub struct Timer { - inner: Arc>>, - source: TimerSource, -} - -impl Timer { - /// Create a new timer - pub fn new() -> crate::Result> { - let (scheduler, source) = TimerScheduler::new()?; - let inner = TimerInner::new(scheduler); - Ok(Timer { - inner: Arc::new(Mutex::new(inner)), - source, - }) - } - - /// Get an handle for this timer - pub fn handle(&self) -> TimerHandle { - TimerHandle { - inner: self.inner.clone(), - } - } -} - -/// An handle to a timer, used to set or cancel timeouts -/// -/// This handle can be cloned, and can be sent accross thread as long -/// as `T: Send`. -#[derive(Debug)] -pub struct TimerHandle { - inner: Arc>>, -} - -// Manual impl of `Clone` as #[derive(Clone)] adds a `T: Clone` bound -impl Clone for TimerHandle { - #[cfg_attr(coverage, no_coverage)] - fn clone(&self) -> TimerHandle { - TimerHandle { - inner: self.inner.clone(), - } - } -} - -/// An itentifier to cancel a timeout if necessary -#[derive(Debug)] -pub struct Timeout { - counter: u32, -} - -impl TimerHandle { - /// Set a new timeout - /// - /// The associated `data` will be given as argument to the callback. - /// - /// The returned `Timeout` can be used to cancel it. You can drop it if you don't - /// plan to cancel this timeout. - pub fn add_timeout(&self, delay_from_now: Duration, data: T) -> Timeout { - self.inner - .lock() - .unwrap() - .insert(Instant::now() + delay_from_now, data) - } - - /// Cancel a previsouly set timeout and retrieve the associated data - /// - /// This method returns `None` if the timeout does not exist (it has already fired - /// or has already been cancelled). - pub fn cancel_timeout(&self, timeout: &Timeout) -> Option { - self.inner.lock().unwrap().cancel(timeout) - } - - /// Cancel all planned timeouts for this timer - /// - /// All associated data will be dropped. - pub fn cancel_all_timeouts(&self) { - self.inner.lock().unwrap().cancel_all(); - } -} - -impl EventSource for Timer { - type Event = T; - type Metadata = TimerHandle; - type Ret = (); - type Error = TimerError; - - fn process_events( - &mut self, - readiness: Readiness, - token: Token, - mut callback: C, - ) -> Result - where - C: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret, - { - let mut handle = TimerHandle { - inner: self.inner.clone(), - }; - let inner = &self.inner; - self.source - .process_events(readiness, token, |(), &mut ()| { - loop { - let next_expired: Option = { - let mut guard = inner.lock().unwrap(); - guard.next_expired() - }; - if let Some(val) = next_expired { - callback(val, &mut handle); - } else { - break; - } - } - // now compute the next timeout and signal if necessary - inner.lock().unwrap().reschedule(); - }) - .map_err(TimerError) - } - - fn register(&mut self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()> { - self.source.register(poll, token_factory) - } - - fn reregister( - &mut self, - poll: &mut Poll, - token_factory: &mut TokenFactory, - ) -> crate::Result<()> { - self.source.reregister(poll, token_factory) - } - - fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> { - self.source.unregister(poll) - } -} - -/* - * Timer logic - */ - -#[derive(Debug)] -struct TimeoutData { - deadline: Instant, - data: RefCell>, - counter: u32, -} - -#[derive(Debug)] -struct TimerInner { - heap: BinaryHeap>, - scheduler: TimerScheduler, - counter: u32, -} - -impl TimerInner { - fn new(scheduler: TimerScheduler) -> TimerInner { - TimerInner { - heap: BinaryHeap::new(), - scheduler, - counter: 0, - } - } - - fn insert(&mut self, deadline: Instant, value: T) -> Timeout { - self.heap.push(TimeoutData { - deadline, - data: RefCell::new(Some(value)), - counter: self.counter, - }); - let ret = Timeout { - counter: self.counter, - }; - self.counter += 1; - self.reschedule(); - ret - } - - fn cancel(&mut self, timeout: &Timeout) -> Option { - for data in self.heap.iter() { - if data.counter == timeout.counter { - let udata = data.data.borrow_mut().take(); - self.reschedule(); - return udata; - } - } - None - } - - fn cancel_all(&mut self) { - self.heap.clear(); - self.reschedule(); - } - - fn next_expired(&mut self) -> Option { - let now = Instant::now(); - loop { - // check if there is an expired item - if let Some(data) = self.heap.peek() { - if data.deadline > now { - return None; - } - // there is an expired timeout, continue the - // loop body - } else { - return None; - } - - // There is an item in the heap, this unwrap cannot blow - let data = self.heap.pop().unwrap(); - if let Some(val) = data.data.into_inner() { - return Some(val); - } - // otherwise this timeout was cancelled, continue looping - } - } - - fn reschedule(&mut self) { - if let Some(next_deadline) = self.heap.peek().map(|data| data.deadline) { - self.scheduler.reschedule(next_deadline); - } else { - self.scheduler.deschedule(); - } - } -} - -// trait implementations for TimeoutData - -impl std::cmp::Ord for TimeoutData { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - // earlier values have priority - self.deadline.cmp(&other.deadline).reverse() - } -} - -impl std::cmp::PartialOrd for TimeoutData { - fn partial_cmp(&self, other: &Self) -> Option { - // earlier values have priority - Some(self.deadline.cmp(&other.deadline).reverse()) - } -} - -impl std::cmp::PartialEq for TimeoutData { - fn eq(&self, other: &Self) -> bool { - // earlier values have priority - self.deadline == other.deadline - } -} - -impl std::cmp::Eq for TimeoutData {} - -/* - * Tests - */ - -#[cfg(test)] -mod tests { - use std::time::Duration; - - use super::*; - - #[test] - fn single_timer() { - let mut event_loop = crate::EventLoop::try_new().unwrap(); - - let evl_handle = event_loop.handle(); - - let mut fired = false; - - let timer = Timer::<()>::new().unwrap(); - let timer_handle = timer.handle(); - evl_handle - .insert_source(timer, move |(), _, f| { - *f = true; - }) - .unwrap(); - - timer_handle.add_timeout(Duration::from_millis(300), ()); - - event_loop - .dispatch(Some(::std::time::Duration::from_millis(100)), &mut fired) - .unwrap(); - - // it should not have fired yet - assert!(!fired); - - event_loop.dispatch(None, &mut fired).unwrap(); - - // it should have fired now - assert!(fired); - } - - #[test] - fn instant_timer() { - let mut event_loop = crate::EventLoop::try_new().unwrap(); - - let evl_handle = event_loop.handle(); - let mut fired = false; - let timer = Timer::<()>::new().unwrap(); - let timer_handle = timer.handle(); - evl_handle - .insert_source(timer, move |(), _, f| { - *f = true; - }) - .unwrap(); - timer_handle.add_timeout(Duration::from_nanos(0), ()); - event_loop.dispatch(None, &mut fired).unwrap(); - - // The timer should fire right away. - assert!(fired); - } - - #[test] - fn multi_timout_order() { - let mut event_loop = crate::EventLoop::try_new().unwrap(); - - let evl_handle = event_loop.handle(); - - let mut fired = Vec::new(); - - let timer = Timer::::new().unwrap(); - let timer_handle = timer.handle(); - - evl_handle - .insert_source(timer, |val, _, fired: &mut Vec| { - fired.push(val); - }) - .unwrap(); - - timer_handle.add_timeout(Duration::from_millis(300), 1); - timer_handle.add_timeout(Duration::from_millis(100), 2); - timer_handle.add_timeout(Duration::from_millis(600), 3); - - // 3 dispatches as each returns once at least one event occured - - event_loop - .dispatch(Some(::std::time::Duration::from_millis(200)), &mut fired) - .unwrap(); - - assert_eq!(&fired, &[2]); - - event_loop - .dispatch(Some(::std::time::Duration::from_millis(300)), &mut fired) - .unwrap(); - - assert_eq!(&fired, &[2, 1]); - - event_loop - .dispatch(Some(::std::time::Duration::from_millis(400)), &mut fired) - .unwrap(); - - assert_eq!(&fired, &[2, 1, 3]); - } - - #[test] - fn timer_cancel() { - let mut event_loop = crate::EventLoop::try_new().unwrap(); - - let evl_handle = event_loop.handle(); - - let mut fired = Vec::new(); - - let timer = Timer::::new().unwrap(); - let timer_handle = timer.handle(); - - evl_handle - .insert_source(timer, |val, _, fired: &mut Vec| fired.push(val)) - .unwrap(); - - let timeout1 = timer_handle.add_timeout(Duration::from_millis(300), 1); - let timeout2 = timer_handle.add_timeout(Duration::from_millis(100), 2); - let timeout3 = timer_handle.add_timeout(Duration::from_millis(600), 3); - - // 3 dispatches as each returns once at least one event occured - // - // The timeouts 1 and 3 and not cancelled right away, but still before they - // fire - - event_loop - .dispatch(Some(::std::time::Duration::from_millis(200)), &mut fired) - .unwrap(); - - assert_eq!(&fired, &[2]); - - // timeout2 has already fired, we cancel timeout1 - assert_eq!(timer_handle.cancel_timeout(&timeout2), None); - assert_eq!(timer_handle.cancel_timeout(&timeout1), Some(1)); - - event_loop - .dispatch(Some(::std::time::Duration::from_millis(300)), &mut fired) - .unwrap(); - - assert_eq!(&fired, &[2]); - - // cancel timeout3 - assert_eq!(timer_handle.cancel_timeout(&timeout3), Some(3)); - - event_loop - .dispatch(Some(::std::time::Duration::from_millis(600)), &mut fired) - .unwrap(); - - assert_eq!(&fired, &[2]); - } - - #[test] - fn timeout_cancel_early() { - // Cancelling an earlier timeout should not prevent later ones from running - let mut event_loop = crate::EventLoop::try_new().unwrap(); - let handle = event_loop.handle(); - - let timer_source = Timer::new().unwrap(); - let timers = timer_source.handle(); - - handle - .insert_source(timer_source, |_, _, count| { - *count += 1; - }) - .unwrap(); - timers.add_timeout(Duration::from_secs(1), ()); - - let sooner = timers.add_timeout(Duration::from_millis(500), ()); - timers.cancel_timeout(&sooner); - - let mut timeout_count = 0; - event_loop - .dispatch( - Some(::std::time::Duration::from_secs(2)), - &mut timeout_count, - ) - .unwrap(); - // first timeout was cancelled, but the event loop still wakes up for nothing - assert_eq!(timeout_count, 0); - - event_loop - .dispatch( - Some(::std::time::Duration::from_secs(2)), - &mut timeout_count, - ) - .unwrap(); - // second dispatch gets the second timeout - assert_eq!(timeout_count, 1); - } -} diff --git a/src/sources/timer/threaded.rs b/src/sources/timer/threaded.rs deleted file mode 100644 index eac2a5c0..00000000 --- a/src/sources/timer/threaded.rs +++ /dev/null @@ -1,137 +0,0 @@ -//! Timer scheduler which is using thread to schedule timers. - -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Arc, Mutex}; -use std::time::Instant; - -use crate::ping::{make_ping, Ping, PingError, PingSource}; -use crate::{EventSource, Poll, PostAction, Readiness, Token, TokenFactory}; - -#[derive(Debug)] -pub struct TimerSource { - source: PingSource, -} - -impl TimerSource { - fn new() -> std::io::Result<(Ping, Self)> { - let (ping, source) = make_ping()?; - Ok((ping, Self { source })) - } -} - -impl EventSource for TimerSource { - type Event = (); - type Metadata = (); - type Ret = (); - type Error = PingError; - - fn process_events( - &mut self, - readiness: Readiness, - token: Token, - mut callback: C, - ) -> Result - where - C: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret, - { - self.source.process_events(readiness, token, |_, &mut _| { - callback((), &mut ()); - }) - } - - fn register(&mut self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()> { - self.source.register(poll, token_factory) - } - - fn reregister( - &mut self, - poll: &mut Poll, - token_factory: &mut TokenFactory, - ) -> crate::Result<()> { - self.source.reregister(poll, token_factory) - } - - fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> { - self.source.unregister(poll) - } -} - -#[derive(Debug)] -pub struct TimerScheduler { - current_deadline: Arc>>, - kill_switch: Arc, - thread: Option>, -} - -impl TimerScheduler { - pub fn new() -> crate::Result<(TimerScheduler, TimerSource)> { - let current_deadline = Arc::new(Mutex::new(None::)); - let thread_deadline = current_deadline.clone(); - - let kill_switch = Arc::new(AtomicBool::new(false)); - let thread_kill = kill_switch.clone(); - - let (ping, source) = TimerSource::new()?; - - let thread = std::thread::Builder::new() - .name("calloop timer".into()) - .spawn(move || loop { - // stop if requested - if thread_kill.load(Ordering::Acquire) { - return; - } - // otherwise check the timeout - let opt_deadline: Option = { - // subscope to ensure the mutex does not remain locked while the thread is parked - let guard = thread_deadline.lock().unwrap(); - *guard - }; - if let Some(deadline) = opt_deadline { - if let Some(remaining) = deadline.checked_duration_since(Instant::now()) { - // it is not yet expired, go to sleep until it - std::thread::park_timeout(remaining); - } else { - // it is expired, wake the event loop and go to sleep - ping.ping(); - std::thread::park(); - } - } else { - // there is none, got to sleep - std::thread::park(); - } - })?; - - let scheduler = TimerScheduler { - current_deadline, - kill_switch, - thread: Some(thread), - }; - Ok((scheduler, source)) - } - - pub fn reschedule(&mut self, new_deadline: Instant) { - let mut deadline_guard = self.current_deadline.lock().unwrap(); - if let Some(current_deadline) = *deadline_guard { - if new_deadline < current_deadline || current_deadline <= Instant::now() { - *deadline_guard = Some(new_deadline); - self.thread.as_ref().unwrap().thread().unpark(); - } - } else { - *deadline_guard = Some(new_deadline); - self.thread.as_ref().unwrap().thread().unpark(); - } - } - - pub fn deschedule(&mut self) { - *(self.current_deadline.lock().unwrap()) = None; - } -} - -impl Drop for TimerScheduler { - fn drop(&mut self) { - self.kill_switch.store(true, Ordering::Release); - let thread = self.thread.take().unwrap(); - thread.thread().unpark(); - let _ = thread.join(); - } -} diff --git a/src/sources/timer/timerfd.rs b/src/sources/timer/timerfd.rs deleted file mode 100644 index a7834ca0..00000000 --- a/src/sources/timer/timerfd.rs +++ /dev/null @@ -1,122 +0,0 @@ -//! Timer scheduler which is using timerfd system interface to schedule timers. - -use std::os::unix::io::{AsRawFd, RawFd}; -use std::time::{Duration, Instant}; - -use nix::sys::time::TimeSpec; -use nix::sys::timerfd::{ClockId, Expiration, TimerFd, TimerFlags, TimerSetTimeFlags}; - -use crate::generic::Generic; -use crate::{EventSource, Poll, PostAction, Readiness, Token, TokenFactory}; - -/// Timerfd timer resolution. It's derived from timespec. -const TIMER_RESOLUTION: Duration = Duration::from_nanos(1); - -#[derive(Debug)] -pub struct TimerScheduler { - current_deadline: Option, - timerfd: TimerFd, -} - -impl TimerScheduler { - pub fn new() -> crate::Result<(Self, TimerSource)> { - let timerfd = TimerFd::new( - ClockId::CLOCK_MONOTONIC, - TimerFlags::TFD_CLOEXEC | TimerFlags::TFD_NONBLOCK, - )?; - - let source = TimerSource::new(&timerfd); - let scheduler = Self { - timerfd, - current_deadline: None, - }; - - Ok((scheduler, source)) - } - - pub fn reschedule(&mut self, new_deadline: Instant) { - let now = Instant::now(); - - // We should handle the case when duration is zero. Since timerfd can't do that we pass the - // timer resolution, which is 1ns triggering the timer right away. - let time = TimeSpec::from_duration(std::cmp::max( - new_deadline.saturating_duration_since(now), - TIMER_RESOLUTION, - )); - - let time = match self.current_deadline { - Some(current_deadline) if new_deadline > current_deadline && current_deadline > now => { - return; - } - _ => time, - }; - - self.current_deadline = Some(new_deadline); - - let expiration = Expiration::OneShot(time); - let flags = TimerSetTimeFlags::empty(); - self.timerfd - .set(expiration, flags) - .expect("setting timerfd failed."); - } - - pub fn deschedule(&mut self) { - self.current_deadline = None; - self.timerfd.unset().expect("failed unsetting timerfd."); - } -} - -#[derive(Debug)] -pub struct TimerSource { - source: Generic, -} - -impl TimerSource { - fn new(timerfd: &TimerFd) -> Self { - Self { - source: Generic::new( - timerfd.as_raw_fd(), - crate::Interest::READ, - crate::Mode::Level, - ), - } - } -} - -impl EventSource for TimerSource { - type Event = (); - type Metadata = (); - type Ret = (); - type Error = std::io::Error; - - fn process_events( - &mut self, - readiness: Readiness, - token: Token, - mut callback: C, - ) -> Result - where - C: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret, - { - self.source.process_events(readiness, token, |_, &mut _| { - callback((), &mut ()); - Ok(PostAction::Continue) - }) - } - - fn register(&mut self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()> { - self.source.register(poll, token_factory) - } - - fn reregister( - &mut self, - poll: &mut Poll, - token_factory: &mut TokenFactory, - ) -> crate::Result<()> { - self.source.reregister(poll, token_factory) - } - - fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> { - self.source.unregister(poll) - } -} diff --git a/src/sources/transient.rs b/src/sources/transient.rs index 8a9854dd..94504b70 100644 --- a/src/sources/transient.rs +++ b/src/sources/transient.rs @@ -276,9 +276,6 @@ mod tests { time::Duration, }; - // This can be removed in favour of Duration::ZERO when the MSRV is 1.53.0 - const DURATION_ZERO: Duration = Duration::from_nanos(0); - #[test] fn test_transient_drop() { // A test source that sets a flag when it's dropped. @@ -365,7 +362,7 @@ mod tests { // First loop run: the ping generates an event for the inner source. pinger.ping(); - event_loop.dispatch(DURATION_ZERO, &mut fired).unwrap(); + event_loop.dispatch(Duration::ZERO, &mut fired).unwrap(); assert!(fired); assert!(dropped.load(Ordering::Relaxed)); @@ -376,7 +373,7 @@ mod tests { pinger.ping(); - event_loop.dispatch(DURATION_ZERO, &mut fired).unwrap(); + event_loop.dispatch(Duration::ZERO, &mut fired).unwrap(); assert!(!fired); } @@ -410,7 +407,7 @@ mod tests { drop(sender); // Run loop once to process events. - event_loop.dispatch(DURATION_ZERO, &mut msg_queue).unwrap(); + event_loop.dispatch(Duration::ZERO, &mut msg_queue).unwrap(); assert!(matches!( msg_queue.as_slice(), @@ -552,24 +549,24 @@ mod tests { // First loop run: the ping generates an event for the inner source. // The ID should be 1 after the increment in register(). pinger.ping(); - event_loop.dispatch(DURATION_ZERO, &mut id).unwrap(); + event_loop.dispatch(Duration::ZERO, &mut id).unwrap(); assert_eq!(id, 1); // Second loop run: the ID should be 2 after the previous // process_events(). pinger.ping(); - event_loop.dispatch(DURATION_ZERO, &mut id).unwrap(); + event_loop.dispatch(Duration::ZERO, &mut id).unwrap(); assert_eq!(id, 2); // Third loop run: the ID should be 3 after another process_events(). pinger.ping(); - event_loop.dispatch(DURATION_ZERO, &mut id).unwrap(); + event_loop.dispatch(Duration::ZERO, &mut id).unwrap(); assert_eq!(id, 3); // Fourth loop run: the callback is no longer called by the inner // source, so our local ID is not incremented. pinger.ping(); - event_loop.dispatch(DURATION_ZERO, &mut id).unwrap(); + event_loop.dispatch(Duration::ZERO, &mut id).unwrap(); assert_eq!(id, 3); // Remove the dispatcher so we can inspect the sources. @@ -651,13 +648,13 @@ mod tests { // Ping here and not later, to check that disabling after an event is // triggered but not processed does not discard the event. pinger.ping(); - event_loop.dispatch(DURATION_ZERO, &mut fired).unwrap(); + event_loop.dispatch(Duration::ZERO, &mut fired).unwrap(); assert!(fired); // Source should now be disabled. pinger.ping(); fired = false; - event_loop.dispatch(DURATION_ZERO, &mut fired).unwrap(); + event_loop.dispatch(Duration::ZERO, &mut fired).unwrap(); assert!(!fired); // Re-enable the source. @@ -666,7 +663,7 @@ mod tests { // Trigger another event. pinger.ping(); fired = false; - event_loop.dispatch(DURATION_ZERO, &mut fired).unwrap(); + event_loop.dispatch(Duration::ZERO, &mut fired).unwrap(); assert!(fired); } } diff --git a/src/sys/epoll.rs b/src/sys/epoll.rs index 81803f35..1e7d4560 100644 --- a/src/sys/epoll.rs +++ b/src/sys/epoll.rs @@ -1,66 +1,127 @@ -use std::os::unix::io::RawFd; +use std::os::unix::io::{AsRawFd, RawFd}; use super::{Interest, Mode, PollEvent, Readiness, Token}; -use nix::sys::epoll; +use nix::sys::{ + epoll::{ + epoll_create1, epoll_ctl, epoll_wait, EpollCreateFlags, EpollEvent, EpollFlags, EpollOp, + }, + time::TimeSpec, + timerfd::{ClockId, Expiration, TimerFd, TimerFlags, TimerSetTimeFlags}, +}; pub struct Epoll { epoll_fd: RawFd, + timer_fd: Option, } -fn make_flags(interest: Interest, mode: Mode) -> epoll::EpollFlags { - let mut flags = epoll::EpollFlags::empty(); +const TIMER_DATA: u64 = u64::MAX; + +fn make_flags(interest: Interest, mode: Mode) -> EpollFlags { + let mut flags = EpollFlags::empty(); if interest.readable { - flags |= epoll::EpollFlags::EPOLLIN; + flags |= EpollFlags::EPOLLIN; } if interest.writable { - flags |= epoll::EpollFlags::EPOLLOUT; + flags |= EpollFlags::EPOLLOUT; } match mode { Mode::Level => { /* This is the default */ } - Mode::Edge => flags |= epoll::EpollFlags::EPOLLET, - Mode::OneShot => flags |= epoll::EpollFlags::EPOLLONESHOT, + Mode::Edge => flags |= EpollFlags::EPOLLET, + Mode::OneShot => flags |= EpollFlags::EPOLLONESHOT, } flags } -fn flags_to_readiness(flags: epoll::EpollFlags) -> Readiness { +fn flags_to_readiness(flags: EpollFlags) -> Readiness { Readiness { - readable: flags.contains(epoll::EpollFlags::EPOLLIN), - writable: flags.contains(epoll::EpollFlags::EPOLLOUT), - error: flags.contains(epoll::EpollFlags::EPOLLERR), + readable: flags.contains(EpollFlags::EPOLLIN), + writable: flags.contains(EpollFlags::EPOLLOUT), + error: flags.contains(EpollFlags::EPOLLERR), } } impl Epoll { - pub(crate) fn new() -> crate::Result { - let epoll_fd = epoll::epoll_create1(epoll::EpollCreateFlags::EPOLL_CLOEXEC)?; - Ok(Epoll { epoll_fd }) + pub(crate) fn new(high_precision: bool) -> crate::Result { + let epoll_fd = epoll_create1(EpollCreateFlags::EPOLL_CLOEXEC)?; + let mut timer_fd = None; + if high_precision { + // Prepare a timerfd for precise time tracking and register it to the event queue + // This timerfd allows for nanosecond precision in setting the timout up (though in practice + // we rather get ~10 microsecond precision), while epoll_wait() API only allows millisecond + // granularity + let timer = TimerFd::new( + ClockId::CLOCK_MONOTONIC, + TimerFlags::TFD_CLOEXEC | TimerFlags::TFD_NONBLOCK, + )?; + let mut timer_event = EpollEvent::new(EpollFlags::EPOLLIN, TIMER_DATA); + epoll_ctl( + epoll_fd, + EpollOp::EpollCtlAdd, + timer.as_raw_fd(), + &mut timer_event, + )?; + timer_fd = Some(timer); + } + Ok(Epoll { epoll_fd, timer_fd }) } pub(crate) fn poll( &mut self, timeout: Option, ) -> crate::Result> { - let mut buffer = [epoll::EpollEvent::empty(); 32]; - let timeout = timeout.map(|d| d.as_millis() as isize).unwrap_or(-1); - let n_ready = epoll::epoll_wait(self.epoll_fd, &mut buffer, timeout)?; + let mut buffer = [EpollEvent::empty(); 32]; + if let Some(ref timer) = self.timer_fd { + if let Some(timeout) = timeout { + // Set up the precise timer + timer.set( + Expiration::OneShot(TimeSpec::from_duration(timeout)), + TimerSetTimeFlags::empty(), + )?; + } + } + // add 1 to the millisecond wait, to round up for timer tracking. If the high precision timer is set up + // it'll fire before that timeout + let timeout = timeout.map(|d| (d.as_millis() + 1) as isize).unwrap_or(-1); + let n_ready = epoll_wait(self.epoll_fd, &mut buffer, timeout)?; let events = buffer .iter() .take(n_ready) - .map(|event| { - // In C, the underlying data type is a union including a void - // pointer; in Rust's FFI bindings, it only exposes the u64. The - // round-trip conversion is valid however. - let token_ptr = event.data() as usize as *const Token; - PollEvent { - readiness: flags_to_readiness(event.events()), - // Why this is safe: it points to memory boxed and owned by - // the parent Poller type. - token: unsafe { *token_ptr }, + .flat_map(|event| { + if event.data() == TIMER_DATA { + // We woke up because the high-precision timer fired, we need to disarm it by reading its + // contents to ensure it will be ready for next time + // Timer is created in non-blocking mode, and should have already fired anyway, this + // cannot possibly block + let _ = self + .timer_fd + .as_ref() + .expect("Got an event from high-precision timer while it is not set up?!") + .wait(); + // don't forward this event to downstream + None + } else { + // In C, the underlying data type is a union including a void + // pointer; in Rust's FFI bindings, it only exposes the u64. The + // round-trip conversion is valid however. + let token_ptr = event.data() as usize as *const Token; + Some(PollEvent { + readiness: flags_to_readiness(event.events()), + // Why this is safe: it points to memory boxed and owned by + // the parent Poller type. + token: unsafe { *token_ptr }, + }) } }) .collect(); + if let Some(ref timer) = self.timer_fd { + // in all cases, disarm the timer + timer.unset()?; + // clear the timer in case it fired between epoll_wait and now, as timer is in + // non-blocking mode, this will return Err(WouldBlock) if it had not fired, so + // we ignore the error + let _ = timer.wait(); + } Ok(events) } @@ -71,9 +132,8 @@ impl Epoll { mode: Mode, token: *const Token, ) -> crate::Result<()> { - let mut event = epoll::EpollEvent::new(make_flags(interest, mode), token as usize as u64); - epoll::epoll_ctl(self.epoll_fd, epoll::EpollOp::EpollCtlAdd, fd, &mut event) - .map_err(Into::into) + let mut event = EpollEvent::new(make_flags(interest, mode), token as usize as u64); + epoll_ctl(self.epoll_fd, EpollOp::EpollCtlAdd, fd, &mut event).map_err(Into::into) } pub fn reregister( @@ -83,13 +143,12 @@ impl Epoll { mode: Mode, token: *const Token, ) -> crate::Result<()> { - let mut event = epoll::EpollEvent::new(make_flags(interest, mode), token as usize as u64); - epoll::epoll_ctl(self.epoll_fd, epoll::EpollOp::EpollCtlMod, fd, &mut event) - .map_err(Into::into) + let mut event = EpollEvent::new(make_flags(interest, mode), token as usize as u64); + epoll_ctl(self.epoll_fd, EpollOp::EpollCtlMod, fd, &mut event).map_err(Into::into) } pub fn unregister(&mut self, fd: RawFd) -> crate::Result<()> { - epoll::epoll_ctl(self.epoll_fd, epoll::EpollOp::EpollCtlDel, fd, None).map_err(Into::into) + epoll_ctl(self.epoll_fd, EpollOp::EpollCtlDel, fd, None).map_err(Into::into) } } diff --git a/src/sys/kqueue.rs b/src/sys/kqueue.rs index 0934f80d..6aef68fd 100644 --- a/src/sys/kqueue.rs +++ b/src/sys/kqueue.rs @@ -1,6 +1,9 @@ use std::{io, os::unix::io::RawFd}; -use nix::sys::event::{kevent, kevent_ts, kqueue, EventFilter, EventFlag, FilterFlag, KEvent}; +use nix::{ + libc::{c_long, time_t, timespec}, + sys::event::{kevent_ts, kqueue, EventFilter, EventFlag, FilterFlag, KEvent}, +}; use super::{Interest, Mode, PollEvent, Readiness, Token}; @@ -17,7 +20,8 @@ fn mode_to_flag(mode: Mode) -> EventFlag { } impl Kqueue { - pub(crate) fn new() -> crate::Result { + // Kqueue is always high precision + pub(crate) fn new(_high_precision: bool) -> crate::Result { let kq = kqueue()?; Ok(Kqueue { kq }) } @@ -35,10 +39,15 @@ impl Kqueue { 0, ); 32]; - let nevents = match timeout { - None => kevent_ts(self.kq, &[], &mut buffer, None), - Some(t) => kevent(self.kq, &[], &mut buffer, t.as_millis() as usize), - }?; + let nevents = kevent_ts( + self.kq, + &[], + &mut buffer, + timeout.map(|d| timespec { + tv_sec: d.as_secs() as time_t, + tv_nsec: d.subsec_nanos() as c_long, + }), + )?; let ret = buffer .iter() diff --git a/src/sys/mod.rs b/src/sys/mod.rs index 8531f3fc..defbe11d 100644 --- a/src/sys/mod.rs +++ b/src/sys/mod.rs @@ -1,7 +1,7 @@ -use std::{convert::TryInto, os::unix::io::RawFd}; +use std::{cell::RefCell, convert::TryInto, os::unix::io::RawFd, rc::Rc, time::Duration}; use vec_map::VecMap; -use crate::loop_logic::CalloopKey; +use crate::{loop_logic::CalloopKey, sources::timer::TimerWheel}; #[cfg(target_os = "linux")] mod epoll; @@ -176,6 +176,7 @@ pub struct Poll { // VecMap which has that exact constraint for its keys. If that ever // changes, this will need to be changed to a different structure. tokens: VecMap<*mut Token>, + pub(crate) timers: Rc>, } impl std::fmt::Debug for Poll { @@ -186,18 +187,47 @@ impl std::fmt::Debug for Poll { } impl Poll { - pub(crate) fn new() -> crate::Result { + pub(crate) fn new(high_precision: bool) -> crate::Result { Ok(Poll { - poller: Poller::new()?, + poller: Poller::new(high_precision)?, tokens: VecMap::new(), + timers: Rc::new(RefCell::new(TimerWheel::new())), }) } pub(crate) fn poll( &mut self, - timeout: Option, + mut timeout: Option, ) -> crate::Result> { - self.poller.poll(timeout) + let now = std::time::Instant::now(); + // adjust the timeout for the timers + if let Some(next_timeout) = self.timers.borrow().next_deadline() { + if next_timeout <= now { + timeout = Some(Duration::ZERO); + } else if let Some(deadline) = timeout { + timeout = Some(std::cmp::min(deadline, next_timeout - now)); + } else { + timeout = Some(next_timeout - now); + } + }; + + let mut events = self.poller.poll(timeout)?; + + // Update 'now' as some time may have elapsed in poll() + let now = std::time::Instant::now(); + let mut timers = self.timers.borrow_mut(); + while let Some((_, token)) = timers.next_expired(now) { + events.push(PollEvent { + readiness: Readiness { + readable: true, + writable: false, + error: false, + }, + token, + }); + } + + Ok(events) } /// Register a new file descriptor for polling