diff --git a/CHANGELOG.md b/CHANGELOG.md index d9c29a08..67e4cbd0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ associated event callback. - **Breaking:** The minimum supported Rust version is now 1.53.0 - Introduce `EventLoop::try_new_high_precision()` for sub-millisecond accuracy in the event loop +- The `PingSource` event source now uses an `eventfd` instead of a pipe on Linux. ## 0.9.2 -- 2021-12-27 diff --git a/src/sources/ping.rs b/src/sources/ping.rs index 10f38b66..16dc9dc7 100644 --- a/src/sources/ping.rs +++ b/src/sources/ping.rs @@ -4,20 +4,31 @@ //! [`Ping::ping`](Ping#method.ping) method is called. If the event source is pinged multiple times //! between a single dispatching, it'll only generate one event. //! -//! This event loop is a simple way of waking up the event loop from an other part of your program +//! This event source is a simple way of waking up the event loop from an other part of your program //! (and is what backs the [`LoopSignal`](crate::LoopSignal)). It can also be used as a building //! block to construct event sources whose source of event is not file descriptor, but rather an //! userspace source (like an other thread). -use std::{os::unix::io::RawFd, sync::Arc}; +use nix::unistd::close; +use std::os::unix::io::RawFd; -use nix::{ - fcntl::OFlag, - unistd::{close, read, write}, -}; +// The ping source has platform-dependent implementations provided by modules +// under this one. These modules should expose: +// - a make_ping() function +// - a Ping type +// - a PingSource type +// +// See eg. the pipe implementation for these items' specific requirements. -use super::generic::Generic; -use crate::{EventSource, Interest, Mode, Poll, PostAction, Readiness, Token, TokenFactory}; +#[cfg(target_os = "linux")] +mod eventfd; +#[cfg(target_os = "linux")] +use eventfd as platform; + +#[cfg(not(target_os = "linux"))] +mod pipe; +#[cfg(not(target_os = "linux"))] +use pipe as platform; /// Create a new ping event source /// @@ -25,43 +36,7 @@ use crate::{EventSource, Interest, Mode, Poll, PostAction, Readiness, Token, Tok /// event loop, and a [`PingSource`], which you can insert in your event loop to /// receive the pings. pub fn make_ping() -> std::io::Result<(Ping, PingSource)> { - #[cfg(not(target_os = "macos"))] - let (read, write) = { - use nix::unistd::pipe2; - - pipe2(OFlag::O_CLOEXEC | OFlag::O_NONBLOCK)? - }; - - // macOS does not have pipe2, but we can emulate the behavior of pipe2 by setting the flags after calling pipe. - #[cfg(target_os = "macos")] - let (read, write) = { - use nix::{ - fcntl::{fcntl, FcntlArg}, - unistd::pipe, - }; - - let (read, write) = pipe()?; - - let read_flags = OFlag::from_bits_truncate(fcntl(read, FcntlArg::F_GETFD)?) - | OFlag::O_CLOEXEC - | OFlag::O_NONBLOCK; - let write_flags = OFlag::from_bits_truncate(fcntl(write, FcntlArg::F_GETFD)?) - | OFlag::O_CLOEXEC - | OFlag::O_NONBLOCK; - - fcntl(read, FcntlArg::F_SETFL(read_flags))?; - fcntl(write, FcntlArg::F_SETFL(write_flags))?; - - (read, write) - }; - - let source = PingSource { - pipe: Generic::new(read, Interest::READ, Mode::Level), - }; - let ping = Ping { - pipe: Arc::new(CloseOnDrop(write)), - }; - Ok((ping, source)) + platform::make_ping() } /// The ping event source @@ -70,103 +45,18 @@ pub fn make_ping() -> std::io::Result<(Ping, PingSource)> { /// /// If you use it directly, it will automatically remove itself from the event loop /// once all [`Ping`] instances are dropped. -#[derive(Debug)] -pub struct PingSource { - pipe: Generic, -} - -impl EventSource for PingSource { - type Event = (); - type Metadata = (); - type Ret = (); - type Error = PingError; - - fn process_events( - &mut self, - readiness: Readiness, - token: Token, - mut callback: C, - ) -> Result - where - C: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret, - { - self.pipe - .process_events(readiness, token, |_, &mut fd| { - let mut buf = [0u8; 32]; - let mut read_something = false; - let mut action = PostAction::Continue; - loop { - match read(fd, &mut buf) { - Ok(0) => { - // The other end of the pipe was closed, mark ourselved to for removal - action = PostAction::Remove; - break; - } - Ok(_) => read_something = true, - - Err(e) => { - let e: std::io::Error = e.into(); - - if e.kind() == std::io::ErrorKind::WouldBlock { - // nothing more to read - break; - } else { - // propagate error - return Err(e); - } - } - } - } - if read_something { - callback((), &mut ()); - } - Ok(action) - }) - .map_err(|e| PingError(e.into())) - } - - fn register(&mut self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()> { - self.pipe.register(poll, token_factory) - } - - fn reregister( - &mut self, - poll: &mut Poll, - token_factory: &mut TokenFactory, - ) -> crate::Result<()> { - self.pipe.reregister(poll, token_factory) - } - - fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> { - self.pipe.unregister(poll) - } -} - -impl Drop for PingSource { - fn drop(&mut self) { - if let Err(e) = close(self.pipe.file) { - log::warn!("[calloop] Failed to close read ping: {:?}", e); - } - } -} +pub type Ping = platform::Ping; /// The Ping handle /// /// This handle can be cloned and sent accross threads. It can be used to /// send pings to the `PingSource`. -#[derive(Clone, Debug)] -pub struct Ping { - pipe: Arc, -} +pub type PingSource = platform::PingSource; -impl Ping { - /// Send a ping to the `PingSource` - pub fn ping(&self) { - if let Err(e) = write(self.pipe.0, &[0u8]) { - log::warn!("[calloop] Failed to write a ping: {:?}", e); - } - } -} +/// An error arising from processing events for a ping. +#[derive(thiserror::Error, Debug)] +#[error(transparent)] +pub struct PingError(Box); #[derive(Debug)] struct CloseOnDrop(RawFd); @@ -174,18 +64,16 @@ struct CloseOnDrop(RawFd); impl Drop for CloseOnDrop { fn drop(&mut self) { if let Err(e) = close(self.0) { - log::warn!("[calloop] Failed to close write ping: {:?}", e); + log::warn!("[calloop] Failed to close ping fd: {:?}", e); } } } -/// An error arising from processing events for a ping. -#[derive(thiserror::Error, Debug)] -#[error(transparent)] -pub struct PingError(Box); - #[cfg(test)] mod tests { + use crate::transient::TransientSource; + use std::time::Duration; + use super::*; #[test] @@ -241,4 +129,162 @@ mod tests { .unwrap(); assert!(now.elapsed() >= std::time::Duration::from_millis(100)); } + + #[test] + fn ping_removed() { + // This keeps track of whether the event fired. + let mut dispatched = false; + + let mut event_loop = crate::EventLoop::::try_new().unwrap(); + + let (sender, source) = make_ping().unwrap(); + let wrapper = TransientSource::from(source); + + // Check that the source starts off in the wrapper. + assert!(!matches!(wrapper, TransientSource::None)); + + // Put the source in the loop. + + let dispatcher = + crate::Dispatcher::new(wrapper, |(), &mut (), dispatched| *dispatched = true); + + let token = event_loop + .handle() + .register_dispatcher(dispatcher.clone()) + .unwrap(); + + // Drop the sender and check that it's actually removed. + drop(sender); + + // There should be no event, but the loop still needs to wake up to + // process the close event (just like in the ping_closed() test). + event_loop + .dispatch(Duration::ZERO, &mut dispatched) + .unwrap(); + assert!(!dispatched); + + // Pull the source wrapper out. + + event_loop.handle().remove(token); + let wrapper = dispatcher.into_source_inner(); + + // Check that the inner source is now gone. + assert!(matches!(wrapper, TransientSource::None)); + } + + #[test] + fn ping_fired_and_removed() { + // This is like ping_removed() with the single difference that we fire a + // ping and drop it between two successive dispatches of the loop. + + // This keeps track of whether the event fired. + let mut dispatched = false; + + let mut event_loop = crate::EventLoop::::try_new().unwrap(); + + let (sender, source) = make_ping().unwrap(); + let wrapper = TransientSource::from(source); + + // Check that the source starts off in the wrapper. + assert!(!matches!(wrapper, TransientSource::None)); + + // Put the source in the loop. + + let dispatcher = + crate::Dispatcher::new(wrapper, |(), &mut (), dispatched| *dispatched = true); + + let token = event_loop + .handle() + .register_dispatcher(dispatcher.clone()) + .unwrap(); + + // Send a ping AND drop the sender and check that it's actually removed. + sender.ping(); + drop(sender); + + // There should be an event, but the source should be removed from the + // loop immediately after. + event_loop + .dispatch(Duration::ZERO, &mut dispatched) + .unwrap(); + assert!(dispatched); + + // Pull the source wrapper out. + + event_loop.handle().remove(token); + let wrapper = dispatcher.into_source_inner(); + + // Check that the inner source is now gone. + assert!(matches!(wrapper, TransientSource::None)); + } + + #[test] + fn ping_multiple_senders() { + // This is like ping_removed() but for testing the behaviour of multiple + // senders. + + // This keeps track of whether the event fired. + let mut dispatched = false; + + let mut event_loop = crate::EventLoop::::try_new().unwrap(); + + let (sender0, source) = make_ping().unwrap(); + let wrapper = TransientSource::from(source); + let sender1 = sender0.clone(); + let sender2 = sender1.clone(); + + // Check that the source starts off in the wrapper. + assert!(!matches!(wrapper, TransientSource::None)); + + // Put the source in the loop. + + let dispatcher = + crate::Dispatcher::new(wrapper, |(), &mut (), dispatched| *dispatched = true); + + let token = event_loop + .handle() + .register_dispatcher(dispatcher.clone()) + .unwrap(); + + // Send a ping AND drop the sender and check that it's actually removed. + sender0.ping(); + drop(sender0); + + // There should be an event, and the source should remain in the loop. + event_loop + .dispatch(Duration::ZERO, &mut dispatched) + .unwrap(); + assert!(dispatched); + + // Now test that the clones still work. Drop after the dispatch loop + // instead of before, this time. + dispatched = false; + + sender1.ping(); + + event_loop + .dispatch(Duration::ZERO, &mut dispatched) + .unwrap(); + assert!(dispatched); + + // Finally, drop all of them without sending anything. + + dispatched = false; + + drop(sender1); + drop(sender2); + + event_loop + .dispatch(Duration::ZERO, &mut dispatched) + .unwrap(); + assert!(!dispatched); + + // Pull the source wrapper out. + + event_loop.handle().remove(token); + let wrapper = dispatcher.into_source_inner(); + + // Check that the inner source is now gone. + assert!(matches!(wrapper, TransientSource::None)); + } } diff --git a/src/sources/ping/eventfd.rs b/src/sources/ping/eventfd.rs new file mode 100644 index 00000000..e1a9289c --- /dev/null +++ b/src/sources/ping/eventfd.rs @@ -0,0 +1,189 @@ +//! Eventfd based implementation of the ping event source. +//! +//! # Implementation notes +//! +//! The eventfd is a much lighter signalling mechanism provided by the Linux +//! kernel. Rather than write an arbitrary sequence of bytes, it only has a +//! 64-bit counter. +//! +//! To avoid closing the eventfd early, we wrap it in a RAII-style closer +//! `CloseOnDrop` in `make_ping()`. When all the senders are dropped, another +//! wrapper `FlagOnDrop` handles signalling this to the event source, which is +//! the sole owner of the eventfd itself. The senders have weak references to +//! the eventfd, and if the source is dropped before the senders, they will +//! simply not do anything (except log a message). +//! +//! To differentiate between regular ping events and close ping events, we add 2 +//! to the counter for regular events and 1 for close events. In the source we +//! can then check the LSB and if it's set, we know it was a close event. This +//! only works if a close event never fires more than once. + +use std::{os::unix::io::RawFd, sync::Arc}; + +use nix::sys::eventfd::{eventfd, EfdFlags}; +use nix::unistd::{read, write}; + +use super::{CloseOnDrop, PingError}; +use crate::{ + generic::Generic, EventSource, Interest, Mode, Poll, PostAction, Readiness, Token, TokenFactory, +}; + +// These are not bitfields! They are increments to add to the eventfd counter. +// Since the fd can only be closed once, we can effectively use the +// INCREMENT_CLOSE value as a bitmask when checking. +const INCREMENT_PING: u64 = 0x2; +const INCREMENT_CLOSE: u64 = 0x1; + +#[inline] +pub fn make_ping() -> std::io::Result<(Ping, PingSource)> { + let read = eventfd(0, EfdFlags::EFD_CLOEXEC | EfdFlags::EFD_NONBLOCK)?; + + // We only have one fd for the eventfd. If the sending end closes it when + // all copies are dropped, the receiving end will be closed as well. We need + // to make sure the fd is not closed until all holders of it have dropped + // it. + + let fd_arc = Arc::new(CloseOnDrop(read)); + + let ping = Ping { + event: Arc::new(FlagOnDrop(Arc::clone(&fd_arc))), + }; + + let source = PingSource { + event: Generic::new(read, Interest::READ, Mode::Level), + _fd: fd_arc, + }; + + Ok((ping, source)) +} + +// Helper functions for the event source IO. + +#[inline] +fn send_ping(fd: RawFd, count: u64) -> std::io::Result<()> { + assert!(count > 0); + match write(fd, &count.to_ne_bytes()) { + // The write succeeded, the ping will wake up the loop. + Ok(_) => Ok(()), + + // The counter hit its cap, which means previous calls to write() will + // wake up the loop. + Err(nix::errno::Errno::EAGAIN) => Ok(()), + + // Anything else is a real error. + Err(e) => Err(e.into()), + } +} + +#[inline] +fn drain_ping(fd: RawFd) -> std::io::Result { + // The eventfd counter is effectively a u64. + const NBYTES: usize = 8; + let mut buf = [0u8; NBYTES]; + + match read(fd, &mut buf) { + // Reading from an eventfd should only ever produce 8 bytes. No looping + // is required. + Ok(NBYTES) => Ok(u64::from_ne_bytes(buf)), + + Ok(_) => unreachable!(), + + // Any other error can be propagated. + Err(e) => Err(e.into()), + } +} + +// The event source is simply a generic source with one of the eventfds. +#[derive(Debug)] +pub struct PingSource { + event: Generic, + + // This is held only to ensure that there is an owner of the fd that lives + // as long as the Generic source, so that the fd is not closed unexpectedly + // when all the senders are dropped. + _fd: Arc, +} + +impl EventSource for PingSource { + type Event = (); + type Metadata = (); + type Ret = (); + type Error = PingError; + + fn process_events( + &mut self, + readiness: Readiness, + token: Token, + mut callback: C, + ) -> Result + where + C: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret, + { + self.event + .process_events(readiness, token, |_, &mut fd| { + let counter = drain_ping(fd)?; + + // If the LSB is set, it means we were closed. If anything else + // is also set, it means we were pinged. The two are not + // mutually exclusive. + let close = (counter & INCREMENT_CLOSE) != 0; + let ping = (counter & (u64::MAX - 1)) != 0; + + if ping { + callback((), &mut ()); + } + + if close { + Ok(PostAction::Remove) + } else { + Ok(PostAction::Continue) + } + }) + .map_err(|e| PingError(e.into())) + } + + fn register(&mut self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()> { + self.event.register(poll, token_factory) + } + + fn reregister( + &mut self, + poll: &mut Poll, + token_factory: &mut TokenFactory, + ) -> crate::Result<()> { + self.event.reregister(poll, token_factory) + } + + fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> { + self.event.unregister(poll) + } +} + +#[derive(Clone, Debug)] +pub struct Ping { + // This is an Arc because it's potentially shared with clones. The last one + // dropped needs to signal to the event source via the eventfd. + event: Arc, +} + +impl Ping { + /// Send a ping to the `PingSource`. + pub fn ping(&self) { + if let Err(e) = send_ping(self.event.0 .0, INCREMENT_PING) { + log::warn!("[calloop] Failed to write a ping: {:?}", e); + } + } +} + +/// This manages signalling to the PingSource when it's dropped. There should +/// only ever be one of these per PingSource. +#[derive(Debug)] +struct FlagOnDrop(Arc); + +impl Drop for FlagOnDrop { + fn drop(&mut self) { + if let Err(e) = send_ping(self.0 .0, INCREMENT_CLOSE) { + log::warn!("[calloop] Failed to send close ping: {:?}", e); + } + } +} diff --git a/src/sources/ping/pipe.rs b/src/sources/ping/pipe.rs new file mode 100644 index 00000000..5766b4d7 --- /dev/null +++ b/src/sources/ping/pipe.rs @@ -0,0 +1,162 @@ +//! Pipe based implementation of the ping event source, using the pipe or pipe2 +//! syscall. Sending a ping involves writing to one end of a pipe, and the other +//! end becoming readable is what wakes up the event loop. + +use std::{os::unix::io::RawFd, sync::Arc}; + +use nix::fcntl::OFlag; +use nix::unistd::{close, read, write}; + +use super::{CloseOnDrop, PingError}; +use crate::{ + generic::Generic, EventSource, Interest, Mode, Poll, PostAction, Readiness, Token, TokenFactory, +}; + +#[cfg(target_os = "macos")] +#[inline] +fn make_ends() -> std::io::Result<(RawFd, RawFd)> { + // macOS does not have pipe2, but we can emulate the behavior of pipe2 by + // setting the flags after calling pipe. + use nix::{ + fcntl::{fcntl, FcntlArg}, + unistd::pipe, + }; + + let (read, write) = pipe()?; + + let read_flags = OFlag::from_bits_truncate(fcntl(read, FcntlArg::F_GETFD)?) + | OFlag::O_CLOEXEC + | OFlag::O_NONBLOCK; + let write_flags = OFlag::from_bits_truncate(fcntl(write, FcntlArg::F_GETFD)?) + | OFlag::O_CLOEXEC + | OFlag::O_NONBLOCK; + + fcntl(read, FcntlArg::F_SETFL(read_flags))?; + fcntl(write, FcntlArg::F_SETFL(write_flags))?; + + Ok((read, write)) +} + +#[cfg(not(target_os = "macos"))] +#[inline] +fn make_ends() -> std::io::Result<(RawFd, RawFd)> { + Ok(nix::unistd::pipe2(OFlag::O_CLOEXEC | OFlag::O_NONBLOCK)?) +} + +#[inline] +pub fn make_ping() -> std::io::Result<(Ping, PingSource)> { + let (read, write) = make_ends()?; + + let source = PingSource { + pipe: Generic::new(read, Interest::READ, Mode::Level), + }; + let ping = Ping { + pipe: Arc::new(CloseOnDrop(write)), + }; + Ok((ping, source)) +} + +// Helper functions for the event source IO. + +#[inline] +fn send_ping(fd: RawFd) -> std::io::Result<()> { + write(fd, &[0u8])?; + Ok(()) +} + +// The event source is simply a generic source with the FD of the read end of +// the pipe. +#[derive(Debug)] +pub struct PingSource { + pipe: Generic, +} + +impl EventSource for PingSource { + type Event = (); + type Metadata = (); + type Ret = (); + type Error = PingError; + + fn process_events( + &mut self, + readiness: Readiness, + token: Token, + mut callback: C, + ) -> Result + where + C: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret, + { + self.pipe + .process_events(readiness, token, |_, &mut fd| { + let mut buf = [0u8; 32]; + let mut read_something = false; + let mut action = PostAction::Continue; + + loop { + match read(fd, &mut buf) { + Ok(0) => { + // The other end of the pipe was closed, mark ourselves + // for removal. + action = PostAction::Remove; + break; + } + + // Got one or more pings. + Ok(_) => read_something = true, + + // Nothing more to read. + Err(nix::errno::Errno::EAGAIN) => break, + + // Propagate error. + Err(e) => return Err(e.into()), + } + } + + if read_something { + callback((), &mut ()); + } + Ok(action) + }) + .map_err(|e| PingError(e.into())) + } + + fn register(&mut self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()> { + self.pipe.register(poll, token_factory) + } + + fn reregister( + &mut self, + poll: &mut Poll, + token_factory: &mut TokenFactory, + ) -> crate::Result<()> { + self.pipe.reregister(poll, token_factory) + } + + fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> { + self.pipe.unregister(poll) + } +} + +impl Drop for PingSource { + fn drop(&mut self) { + if let Err(e) = close(self.pipe.file) { + log::warn!("[calloop] Failed to close read ping: {:?}", e); + } + } +} + +// The sending end of the ping writes zeroes to the write end of the pipe. +#[derive(Clone, Debug)] +pub struct Ping { + pipe: Arc, +} + +// The sending end of the ping writes zeroes to the write end of the pipe. +impl Ping { + /// Send a ping to the `PingSource` + pub fn ping(&self) { + if let Err(e) = send_ping(self.pipe.0) { + log::warn!("[calloop] Failed to write a ping: {:?}", e); + } + } +}