Skip to content

Commit

Permalink
Use Arc instead of dup() to manage multiple eventfds.
Browse files Browse the repository at this point in the history
  • Loading branch information
detly committed Apr 3, 2022
1 parent e82f88c commit f3accfc
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 62 deletions.
88 changes: 86 additions & 2 deletions src/sources/ping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
//! block to construct event sources whose source of event is not file descriptor, but rather an
//! userspace source (like an other thread).
use nix::unistd::close;
use std::os::unix::io::RawFd;

// The ping source has platform-dependent implementations provided by modules
// under this one. These modules should expose:
// - a make_ping() function
Expand Down Expand Up @@ -55,6 +58,17 @@ pub type PingSource = platform::PingSource;
#[error(transparent)]
pub struct PingError(Box<dyn std::error::Error + Sync + Send>);

#[derive(Debug)]
struct CloseOnDrop(RawFd);

impl Drop for CloseOnDrop {
fn drop(&mut self) {
if let Err(e) = close(self.0) {
log::warn!("[calloop] Failed to close ping fd: {:?}", e);
}
}
}

#[cfg(test)]
mod tests {
use crate::transient::TransientSource;
Expand Down Expand Up @@ -188,13 +202,83 @@ mod tests {
sender.ping();
drop(sender);

// There should be no event, but the loop still needs to wake up to
// process the close event (just like in the ping_closed() test).
// There should be an event, but the source should be removed from the
// loop immediately after.
event_loop
.dispatch(Duration::ZERO, &mut dispatched)
.unwrap();
assert!(dispatched);

// Pull the source wrapper out.

event_loop.handle().remove(token);
let wrapper = dispatcher.into_source_inner();

// Check that the inner source is now gone.
assert!(matches!(wrapper, TransientSource::None));
}

#[test]
fn ping_multiple_senders() {
// This is like ping_removed() but for testing the behaviour of multiple
// senders.

// This keeps track of whether the event fired.
let mut dispatched = false;

let mut event_loop = crate::EventLoop::<bool>::try_new().unwrap();

let (sender0, source) = make_ping().unwrap();
let wrapper = TransientSource::from(source);
let sender1 = sender0.clone();
let sender2 = sender1.clone();

// Check that the source starts off in the wrapper.
assert!(!matches!(wrapper, TransientSource::None));

// Put the source in the loop.

let dispatcher =
crate::Dispatcher::new(wrapper, |(), &mut (), dispatched| *dispatched = true);

let token = event_loop
.handle()
.register_dispatcher(dispatcher.clone())
.unwrap();

// Send a ping AND drop the sender and check that it's actually removed.
sender0.ping();
drop(sender0);

// There should be an event, and the source should remain in the loop.
event_loop
.dispatch(Duration::ZERO, &mut dispatched)
.unwrap();
assert!(dispatched);

// Now test that the clones still work. Drop after the dispatch loop
// instead of before, this time.
dispatched = false;

sender1.ping();

event_loop
.dispatch(Duration::ZERO, &mut dispatched)
.unwrap();
assert!(dispatched);

// Finally, drop all of them without sending anything.

dispatched = false;

drop(sender1);
drop(sender2);

event_loop
.dispatch(Duration::ZERO, &mut dispatched)
.unwrap();
assert!(!dispatched);

// Pull the source wrapper out.

event_loop.handle().remove(token);
Expand Down
99 changes: 51 additions & 48 deletions src/sources/ping/eventfd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
//! can then check the LSB and if it's set, we know it was a close event. This
//! only works if a close event never fires more than once.
use std::{os::unix::io::RawFd, sync::Arc};
use std::{os::unix::io::RawFd, sync::Arc, sync::Weak};

use nix::sys::eventfd::{eventfd, EfdFlags};
use nix::unistd::{close, dup, read, write};
use nix::unistd::{read, write};

use super::PingError;
use super::{CloseOnDrop, PingError};
use crate::{
generic::Generic, EventSource, Interest, Mode, Poll, PostAction, Readiness, Token, TokenFactory,
};
Expand All @@ -36,34 +36,22 @@ pub fn make_ping() -> std::io::Result<(Ping, PingSource)> {
let read = eventfd(0, EfdFlags::EFD_CLOEXEC | EfdFlags::EFD_NONBLOCK)?;

// We only have one fd for the eventfd. If the sending end closes it when
// all copies are dropped, the receiving end will be closed as well. We can
// avoid this by duplicating the FD.
match dup(read) {
Ok(write) => {
let source = PingSource {
event: Generic::new(read, Interest::READ, Mode::Level),
};

let ping = Ping {
event: Arc::new(FlagOnDrop { fd: write }),
};

Ok((ping, source))
}
Err(dup_err) => {
if let Err(close_err) = close(read) {
// There's not much we can do with this, especially since this
// might simply have the same cause as the error we're already
// propagating.
log::error!(
"Error closing ping eventfd while handling error: {:?}",
close_err
);
}

Err(dup_err.into())
}
}
// all copies are dropped, the receiving end will be closed as well. We need
// to make sure the fd is not closed until all holders of it have dropped
// it.

let fd_arc = Arc::new(CloseOnDrop(read));

let ping = Ping {
event: Arc::new(FlagOnDrop(Arc::downgrade(&fd_arc))),
};

let source = PingSource {
event: Generic::new(read, Interest::READ, Mode::Level),
_fd: fd_arc,
};

Ok((ping, source))
}

// Helper functions for the event source IO.
Expand Down Expand Up @@ -106,6 +94,10 @@ fn drain_ping(fd: RawFd) -> std::io::Result<u64> {
#[derive(Debug)]
pub struct PingSource {
event: Generic<RawFd>,

// This is held only to ensure that there is a single owner of the "strong"
// Arc that the senders have weak refs to.
_fd: Arc<CloseOnDrop>,
}

impl EventSource for PingSource {
Expand Down Expand Up @@ -163,42 +155,53 @@ impl EventSource for PingSource {
}
}

impl Drop for PingSource {
fn drop(&mut self) {
if let Err(e) = close(self.event.file) {
log::warn!("[calloop] Failed to close read ping: {:?}", e);
}
}
}

#[derive(Clone, Debug)]
pub struct Ping {
// Arc because potentially shared with clones.
// This is an Arc because it's potentially shared with clones. The last one
// dropped needs to signal to the event source via the eventfd.
event: Arc<FlagOnDrop>,
}

impl Ping {
/// Send a ping to the `PingSource`
/// Send a ping to the `PingSource`.
pub fn ping(&self) {
if let Err(e) = send_ping(self.event.fd, INCREMENT_PING) {
if let Some(Err(e)) = self.event.map(|fd| send_ping(fd, INCREMENT_PING)) {
log::warn!("[calloop] Failed to write a ping: {:?}", e);
}
}
}

// This manages signalling to the PingSource when it's dropped. There should
// only ever be one of these per PingSource.
#[derive(Debug)]
struct FlagOnDrop {
fd: RawFd,
struct FlagOnDrop(Weak<CloseOnDrop>);

impl FlagOnDrop {
/// Applies the function `op` to the wrapped file descriptor, if it still
/// exists behind the weak ref. If it does, returns `Some(result)`,
/// otherwise `None`.
fn map<F, O>(&self, op: F) -> Option<O>
where
F: FnOnce(RawFd) -> O,
{
self.0
.upgrade()
.as_deref()
.map(|fd_wrapper| op(fd_wrapper.0))
}
}

impl Drop for FlagOnDrop {
fn drop(&mut self) {
if let Err(e) = send_ping(self.fd, INCREMENT_CLOSE) {
if let Some(Err(e)) = self.map(|fd| send_ping(fd, INCREMENT_CLOSE)) {
log::warn!("[calloop] Failed to send close ping: {:?}", e);
}

if let Err(e) = close(self.fd) {
log::warn!("[calloop] Failed to close write ping: {:?}", e);
}
// If the above is Some(Ok(_)), the source will be woken up and will
// remove itself from the loop.

// If it is None, then either the source has been dropped and the
// original fd was closed anyway, or the fd has been taken out of the
// source and Arc, and the original fd has a new owner.
}
}
13 changes: 1 addition & 12 deletions src/sources/ping/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{os::unix::io::RawFd, sync::Arc};
use nix::fcntl::OFlag;
use nix::unistd::{close, read, write};

use super::PingError;
use super::{CloseOnDrop, PingError};
use crate::{
generic::Generic, EventSource, Interest, Mode, Poll, PostAction, Readiness, Token, TokenFactory,
};
Expand Down Expand Up @@ -162,14 +162,3 @@ impl Ping {
}
}
}

#[derive(Debug)]
struct CloseOnDrop(RawFd);

impl Drop for CloseOnDrop {
fn drop(&mut self) {
if let Err(e) = close(self.0) {
log::warn!("[calloop] Failed to close write ping: {:?}", e);
}
}
}

0 comments on commit f3accfc

Please sign in to comment.