Skip to content

Commit

Permalink
Implement an eventfd-based ping source for Linux.
Browse files Browse the repository at this point in the history
This separates and reorganises the underlying mechanics for the ping source,
and implements an eventfd-based ping for Linux.
  • Loading branch information
detly committed Apr 5, 2022
1 parent 7835e53 commit a67260f
Show file tree
Hide file tree
Showing 4 changed files with 541 additions and 143 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
associated event callback.
- **Breaking:** The minimum supported Rust version is now 1.53.0
- Introduce `EventLoop::try_new_high_precision()` for sub-millisecond accuracy in the event loop
- The `PingSource` event source now uses an `eventfd` instead of a pipe on Linux.

## 0.9.2 -- 2021-12-27

Expand Down
332 changes: 189 additions & 143 deletions src/sources/ping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,59 +9,34 @@
//! block to construct event sources whose source of event is not file descriptor, but rather an
//! userspace source (like an other thread).
use std::{os::unix::io::RawFd, sync::Arc};

use nix::{
fcntl::OFlag,
unistd::{close, read, write},
};

use super::generic::Generic;
use crate::{EventSource, Interest, Mode, Poll, PostAction, Readiness, Token, TokenFactory};
use nix::unistd::close;
use std::os::unix::io::RawFd;

// The ping source has platform-dependent implementations provided by modules
// under this one. These modules should expose:
// - a make_ping() function
// - a Ping type
// - a PingSource type
//
// See eg. the pipe implementation for these items' specific requirements.

#[cfg(target_os = "linux")]
mod eventfd;
#[cfg(target_os = "linux")]
use eventfd as platform;

#[cfg(not(target_os = "linux"))]
mod pipe;
#[cfg(not(target_os = "linux"))]
use pipe as platform;

/// Create a new ping event source
///
/// you are given a [`Ping`] instance, which can be cloned and used to ping the
/// event loop, and a [`PingSource`], which you can insert in your event loop to
/// receive the pings.
pub fn make_ping() -> std::io::Result<(Ping, PingSource)> {
#[cfg(not(target_os = "macos"))]
let (read, write) = {
use nix::unistd::pipe2;

pipe2(OFlag::O_CLOEXEC | OFlag::O_NONBLOCK)?
};

// macOS does not have pipe2, but we can emulate the behavior of pipe2 by setting the flags after calling pipe.
#[cfg(target_os = "macos")]
let (read, write) = {
use nix::{
fcntl::{fcntl, FcntlArg},
unistd::pipe,
};

let (read, write) = pipe()?;

let read_flags = OFlag::from_bits_truncate(fcntl(read, FcntlArg::F_GETFD)?)
| OFlag::O_CLOEXEC
| OFlag::O_NONBLOCK;
let write_flags = OFlag::from_bits_truncate(fcntl(write, FcntlArg::F_GETFD)?)
| OFlag::O_CLOEXEC
| OFlag::O_NONBLOCK;

fcntl(read, FcntlArg::F_SETFL(read_flags))?;
fcntl(write, FcntlArg::F_SETFL(write_flags))?;

(read, write)
};

let source = PingSource {
pipe: Generic::new(read, Interest::READ, Mode::Level),
};
let ping = Ping {
pipe: Arc::new(CloseOnDrop(write)),
};
Ok((ping, source))
platform::make_ping()
}

/// The ping event source
Expand All @@ -70,122 +45,35 @@ pub fn make_ping() -> std::io::Result<(Ping, PingSource)> {
///
/// If you use it directly, it will automatically remove itself from the event loop
/// once all [`Ping`] instances are dropped.
#[derive(Debug)]
pub struct PingSource {
pipe: Generic<RawFd>,
}

impl EventSource for PingSource {
type Event = ();
type Metadata = ();
type Ret = ();
type Error = PingError;

fn process_events<C>(
&mut self,
readiness: Readiness,
token: Token,
mut callback: C,
) -> Result<PostAction, Self::Error>
where
C: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
{
self.pipe
.process_events(readiness, token, |_, &mut fd| {
let mut buf = [0u8; 32];
let mut read_something = false;
let mut action = PostAction::Continue;
loop {
match read(fd, &mut buf) {
Ok(0) => {
// The other end of the pipe was closed, mark ourselved to for removal
action = PostAction::Remove;
break;
}
Ok(_) => read_something = true,

Err(e) => {
let e: std::io::Error = e.into();

if e.kind() == std::io::ErrorKind::WouldBlock {
// nothing more to read
break;
} else {
// propagate error
return Err(e);
}
}
}
}
if read_something {
callback((), &mut ());
}
Ok(action)
})
.map_err(|e| PingError(e.into()))
}

fn register(&mut self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()> {
self.pipe.register(poll, token_factory)
}

fn reregister(
&mut self,
poll: &mut Poll,
token_factory: &mut TokenFactory,
) -> crate::Result<()> {
self.pipe.reregister(poll, token_factory)
}

fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> {
self.pipe.unregister(poll)
}
}

impl Drop for PingSource {
fn drop(&mut self) {
if let Err(e) = close(self.pipe.file) {
log::warn!("[calloop] Failed to close read ping: {:?}", e);
}
}
}
pub type Ping = platform::Ping;

/// The Ping handle
///
/// This handle can be cloned and sent accross threads. It can be used to
/// send pings to the `PingSource`.
#[derive(Clone, Debug)]
pub struct Ping {
pipe: Arc<CloseOnDrop>,
}
pub type PingSource = platform::PingSource;

impl Ping {
/// Send a ping to the `PingSource`
pub fn ping(&self) {
if let Err(e) = write(self.pipe.0, &[0u8]) {
log::warn!("[calloop] Failed to write a ping: {:?}", e);
}
}
}
/// An error arising from processing events for a ping.
#[derive(thiserror::Error, Debug)]
#[error(transparent)]
pub struct PingError(Box<dyn std::error::Error + Sync + Send>);

#[derive(Debug)]
struct CloseOnDrop(RawFd);

impl Drop for CloseOnDrop {
fn drop(&mut self) {
if let Err(e) = close(self.0) {
log::warn!("[calloop] Failed to close write ping: {:?}", e);
log::warn!("[calloop] Failed to close ping fd: {:?}", e);
}
}
}

/// An error arising from processing events for a ping.
#[derive(thiserror::Error, Debug)]
#[error(transparent)]
pub struct PingError(Box<dyn std::error::Error + Sync + Send>);

#[cfg(test)]
mod tests {
use crate::transient::TransientSource;
use std::time::Duration;

use super::*;

#[test]
Expand Down Expand Up @@ -241,4 +129,162 @@ mod tests {
.unwrap();
assert!(now.elapsed() >= std::time::Duration::from_millis(100));
}

#[test]
fn ping_removed() {
// This keeps track of whether the event fired.
let mut dispatched = false;

let mut event_loop = crate::EventLoop::<bool>::try_new().unwrap();

let (sender, source) = make_ping().unwrap();
let wrapper = TransientSource::from(source);

// Check that the source starts off in the wrapper.
assert!(!matches!(wrapper, TransientSource::None));

// Put the source in the loop.

let dispatcher =
crate::Dispatcher::new(wrapper, |(), &mut (), dispatched| *dispatched = true);

let token = event_loop
.handle()
.register_dispatcher(dispatcher.clone())
.unwrap();

// Drop the sender and check that it's actually removed.
drop(sender);

// There should be no event, but the loop still needs to wake up to
// process the close event (just like in the ping_closed() test).
event_loop
.dispatch(Duration::ZERO, &mut dispatched)
.unwrap();
assert!(!dispatched);

// Pull the source wrapper out.

event_loop.handle().remove(token);
let wrapper = dispatcher.into_source_inner();

// Check that the inner source is now gone.
assert!(matches!(wrapper, TransientSource::None));
}

#[test]
fn ping_fired_and_removed() {
// This is like ping_removed() with the single difference that we fire a
// ping and drop it between two successive dispatches of the loop.

// This keeps track of whether the event fired.
let mut dispatched = false;

let mut event_loop = crate::EventLoop::<bool>::try_new().unwrap();

let (sender, source) = make_ping().unwrap();
let wrapper = TransientSource::from(source);

// Check that the source starts off in the wrapper.
assert!(!matches!(wrapper, TransientSource::None));

// Put the source in the loop.

let dispatcher =
crate::Dispatcher::new(wrapper, |(), &mut (), dispatched| *dispatched = true);

let token = event_loop
.handle()
.register_dispatcher(dispatcher.clone())
.unwrap();

// Send a ping AND drop the sender and check that it's actually removed.
sender.ping();
drop(sender);

// There should be an event, but the source should be removed from the
// loop immediately after.
event_loop
.dispatch(Duration::ZERO, &mut dispatched)
.unwrap();
assert!(dispatched);

// Pull the source wrapper out.

event_loop.handle().remove(token);
let wrapper = dispatcher.into_source_inner();

// Check that the inner source is now gone.
assert!(matches!(wrapper, TransientSource::None));
}

#[test]
fn ping_multiple_senders() {
// This is like ping_removed() but for testing the behaviour of multiple
// senders.

// This keeps track of whether the event fired.
let mut dispatched = false;

let mut event_loop = crate::EventLoop::<bool>::try_new().unwrap();

let (sender0, source) = make_ping().unwrap();
let wrapper = TransientSource::from(source);
let sender1 = sender0.clone();
let sender2 = sender1.clone();

// Check that the source starts off in the wrapper.
assert!(!matches!(wrapper, TransientSource::None));

// Put the source in the loop.

let dispatcher =
crate::Dispatcher::new(wrapper, |(), &mut (), dispatched| *dispatched = true);

let token = event_loop
.handle()
.register_dispatcher(dispatcher.clone())
.unwrap();

// Send a ping AND drop the sender and check that it's actually removed.
sender0.ping();
drop(sender0);

// There should be an event, and the source should remain in the loop.
event_loop
.dispatch(Duration::ZERO, &mut dispatched)
.unwrap();
assert!(dispatched);

// Now test that the clones still work. Drop after the dispatch loop
// instead of before, this time.
dispatched = false;

sender1.ping();

event_loop
.dispatch(Duration::ZERO, &mut dispatched)
.unwrap();
assert!(dispatched);

// Finally, drop all of them without sending anything.

dispatched = false;

drop(sender1);
drop(sender2);

event_loop
.dispatch(Duration::ZERO, &mut dispatched)
.unwrap();
assert!(!dispatched);

// Pull the source wrapper out.

event_loop.handle().remove(token);
let wrapper = dispatcher.into_source_inner();

// Check that the inner source is now gone.
assert!(matches!(wrapper, TransientSource::None));
}
}
Loading

0 comments on commit a67260f

Please sign in to comment.