Skip to content

Commit

Permalink
Implement an eventfd-based ping source for Linux.
Browse files Browse the repository at this point in the history
This separates and reorganises the underlying mechanics for the ping source,
and implements an eventfd-based ping for Linux.
  • Loading branch information
detly committed Apr 3, 2022
1 parent 7835e53 commit 0c9cb1b
Show file tree
Hide file tree
Showing 4 changed files with 433 additions and 149 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ readme = "README.md"
codecov = { repository = "Smithay/calloop" }

[dependencies]
cfg-if = "1.0.0"
log = "0.4"
nix = "0.23"
futures-util = { version = "0.3.5", optional = true, default-features = false, features = ["std"]}
Expand Down
216 changes: 67 additions & 149 deletions src/sources/ping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,59 +9,31 @@
//! block to construct event sources whose source of event is not file descriptor, but rather an
//! userspace source (like an other thread).
use std::{os::unix::io::RawFd, sync::Arc};

use nix::{
fcntl::OFlag,
unistd::{close, read, write},
};

use super::generic::Generic;
use crate::{EventSource, Interest, Mode, Poll, PostAction, Readiness, Token, TokenFactory};
// The ping source has platform-dependent implementations provided by modules
// under this one. These modules should expose:
// - a make_ping() function
// - a Ping type
// - a PingSource type
//
// See eg. the pipe implementation for these items' specific requirements.

cfg_if::cfg_if! {
if #[cfg(target_os = "linux")] {
mod eventfd;
use eventfd as platform;
} else {
mod pipe;
use pipe as platform;
}
}

/// Create a new ping event source
///
/// you are given a [`Ping`] instance, which can be cloned and used to ping the
/// event loop, and a [`PingSource`], which you can insert in your event loop to
/// receive the pings.
pub fn make_ping() -> std::io::Result<(Ping, PingSource)> {
#[cfg(not(target_os = "macos"))]
let (read, write) = {
use nix::unistd::pipe2;

pipe2(OFlag::O_CLOEXEC | OFlag::O_NONBLOCK)?
};

// macOS does not have pipe2, but we can emulate the behavior of pipe2 by setting the flags after calling pipe.
#[cfg(target_os = "macos")]
let (read, write) = {
use nix::{
fcntl::{fcntl, FcntlArg},
unistd::pipe,
};

let (read, write) = pipe()?;

let read_flags = OFlag::from_bits_truncate(fcntl(read, FcntlArg::F_GETFD)?)
| OFlag::O_CLOEXEC
| OFlag::O_NONBLOCK;
let write_flags = OFlag::from_bits_truncate(fcntl(write, FcntlArg::F_GETFD)?)
| OFlag::O_CLOEXEC
| OFlag::O_NONBLOCK;

fcntl(read, FcntlArg::F_SETFL(read_flags))?;
fcntl(write, FcntlArg::F_SETFL(write_flags))?;

(read, write)
};

let source = PingSource {
pipe: Generic::new(read, Interest::READ, Mode::Level),
};
let ping = Ping {
pipe: Arc::new(CloseOnDrop(write)),
};
Ok((ping, source))
platform::make_ping()
}

/// The ping event source
Expand All @@ -70,114 +42,13 @@ pub fn make_ping() -> std::io::Result<(Ping, PingSource)> {
///
/// If you use it directly, it will automatically remove itself from the event loop
/// once all [`Ping`] instances are dropped.
#[derive(Debug)]
pub struct PingSource {
pipe: Generic<RawFd>,
}

impl EventSource for PingSource {
type Event = ();
type Metadata = ();
type Ret = ();
type Error = PingError;

fn process_events<C>(
&mut self,
readiness: Readiness,
token: Token,
mut callback: C,
) -> Result<PostAction, Self::Error>
where
C: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
{
self.pipe
.process_events(readiness, token, |_, &mut fd| {
let mut buf = [0u8; 32];
let mut read_something = false;
let mut action = PostAction::Continue;
loop {
match read(fd, &mut buf) {
Ok(0) => {
// The other end of the pipe was closed, mark ourselved to for removal
action = PostAction::Remove;
break;
}
Ok(_) => read_something = true,

Err(e) => {
let e: std::io::Error = e.into();

if e.kind() == std::io::ErrorKind::WouldBlock {
// nothing more to read
break;
} else {
// propagate error
return Err(e);
}
}
}
}
if read_something {
callback((), &mut ());
}
Ok(action)
})
.map_err(|e| PingError(e.into()))
}

fn register(&mut self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()> {
self.pipe.register(poll, token_factory)
}

fn reregister(
&mut self,
poll: &mut Poll,
token_factory: &mut TokenFactory,
) -> crate::Result<()> {
self.pipe.reregister(poll, token_factory)
}

fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> {
self.pipe.unregister(poll)
}
}

impl Drop for PingSource {
fn drop(&mut self) {
if let Err(e) = close(self.pipe.file) {
log::warn!("[calloop] Failed to close read ping: {:?}", e);
}
}
}
pub type Ping = platform::Ping;

/// The Ping handle
///
/// This handle can be cloned and sent accross threads. It can be used to
/// send pings to the `PingSource`.
#[derive(Clone, Debug)]
pub struct Ping {
pipe: Arc<CloseOnDrop>,
}

impl Ping {
/// Send a ping to the `PingSource`
pub fn ping(&self) {
if let Err(e) = write(self.pipe.0, &[0u8]) {
log::warn!("[calloop] Failed to write a ping: {:?}", e);
}
}
}

#[derive(Debug)]
struct CloseOnDrop(RawFd);

impl Drop for CloseOnDrop {
fn drop(&mut self) {
if let Err(e) = close(self.0) {
log::warn!("[calloop] Failed to close write ping: {:?}", e);
}
}
}
pub type PingSource = platform::PingSource;

/// An error arising from processing events for a ping.
#[derive(thiserror::Error, Debug)]
Expand All @@ -186,6 +57,8 @@ pub struct PingError(Box<dyn std::error::Error + Sync + Send>);

#[cfg(test)]
mod tests {
use std::time::Duration;

use super::*;

#[test]
Expand Down Expand Up @@ -241,4 +114,49 @@ mod tests {
.unwrap();
assert!(now.elapsed() >= std::time::Duration::from_millis(100));
}

#[test]
fn ping_removed() {
use crate::transient::TransientSource;
use std::time::Duration;

// This keeps track of whether the event fired.
let mut dispatched = false;

let mut event_loop = crate::EventLoop::<bool>::try_new().unwrap();

let (sender, source) = make_ping().unwrap();
let wrapper = TransientSource::from(source);

// Check that the source starts off in the wrapper.
assert!(!matches!(wrapper, TransientSource::None));

// Put the source in the loop.

let dispatcher =
crate::Dispatcher::new(wrapper, |(), &mut (), dispatched| *dispatched = true);

let token = event_loop
.handle()
.register_dispatcher(dispatcher.clone())
.unwrap();

// Drop the sender and check that it's actually removed.
drop(sender);

// There should be no event, but the loop still needs to wake up to
// process the close event (just like in the ping_closed() test).
event_loop
.dispatch(Duration::ZERO, &mut dispatched)
.unwrap();
assert!(!dispatched);

// Pull the source wrapper out.

event_loop.handle().remove(token);
let wrapper = dispatcher.into_source_inner();

// Check that the inner source is now gone.
assert!(matches!(wrapper, TransientSource::None));
}
}
Loading

0 comments on commit 0c9cb1b

Please sign in to comment.