From 397009e4ec78ea35dc1bb88236c2f368d1a7a4dd Mon Sep 17 00:00:00 2001 From: jtnunley Date: Fri, 3 Feb 2023 11:41:44 -0800 Subject: [PATCH 1/8] feat: Expose other kqueue filters --- Cargo.toml | 2 +- examples/kqueue-process.rs | 55 ++++++++++ src/lib.rs | 6 +- src/os.rs | 20 ++++ src/os/kqueue.rs | 100 ++++++++++++++++++ src/reactor.rs | 54 +++++----- src/reactor/registration.rs | 200 ++++++++++++++++++++++++++++++++++++ 7 files changed, 405 insertions(+), 32 deletions(-) create mode 100644 examples/kqueue-process.rs create mode 100644 src/os.rs create mode 100644 src/os/kqueue.rs create mode 100644 src/reactor/registration.rs diff --git a/Cargo.toml b/Cargo.toml index af399dc..ad86e0e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,7 +29,7 @@ concurrent-queue = "2" futures-lite = "1.11.0" log = "0.4.11" parking = "2.0.0" -polling = "2.0.0" +polling = "2.6.0" rustix = { version = "0.37.1", default-features = false, features = ["std", "fs"] } slab = "0.4.2" socket2 = { version = "0.4.2", features = ["all"] } diff --git a/examples/kqueue-process.rs b/examples/kqueue-process.rs new file mode 100644 index 0000000..16fe6b3 --- /dev/null +++ b/examples/kqueue-process.rs @@ -0,0 +1,55 @@ +//! Uses the `async_io::os::kqueue` module to wait for a process to terminate. +//! +//! Run with: +//! +//! ``` +//! cargo run --example kqueue-process +//! ``` + +#[cfg(any( + target_os = "macos", + target_os = "ios", + target_os = "tvos", + target_os = "watchos", + target_os = "freebsd", + target_os = "netbsd", + target_os = "openbsd", + target_os = "dragonfly", +))] +fn main() -> std::io::Result<()> { + use std::process::Command; + + use async_io::os::kqueue::{AsyncKqueueExt, Exit}; + use async_io::Async; + use futures_lite::future; + + future::block_on(async { + // Spawn a process. + let process = Command::new("sleep") + .arg("3") + .spawn() + .expect("failed to spawn process"); + + // Wrap the process in an `Async` object that waits for it to exit. + let process = Async::with_filter(Exit::new(process))?; + + // Wait for the process to exit. + process.readable().await?; + + Ok(()) + }) +} + +#[cfg(not(any( + target_os = "macos", + target_os = "ios", + target_os = "tvos", + target_os = "watchos", + target_os = "freebsd", + target_os = "netbsd", + target_os = "openbsd", + target_os = "dragonfly", +)))] +fn main() { + println!("This example only works for kqueue-enabled platforms."); +} diff --git a/src/lib.rs b/src/lib.rs index 450c1b8..4e90b31 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -88,6 +88,8 @@ use crate::reactor::{Reactor, Source}; mod driver; mod reactor; +pub mod os; + pub use driver::block_on; pub use reactor::{Readable, ReadableOwned, Writable, WritableOwned}; @@ -664,7 +666,7 @@ impl Async { #[cfg(unix)] impl AsRawFd for Async { fn as_raw_fd(&self) -> RawFd { - self.source.raw + self.get_ref().as_raw_fd() } } @@ -740,7 +742,7 @@ impl Async { #[cfg(windows)] impl AsRawSocket for Async { fn as_raw_socket(&self) -> RawSocket { - self.source.raw + self.get_ref().as_raw_socket() } } diff --git a/src/os.rs b/src/os.rs new file mode 100644 index 0000000..713d0ab --- /dev/null +++ b/src/os.rs @@ -0,0 +1,20 @@ +//! Platform-specific functionality. + +#[cfg(any( + target_os = "macos", + target_os = "ios", + target_os = "tvos", + target_os = "watchos", + target_os = "freebsd", + target_os = "netbsd", + target_os = "openbsd", + target_os = "dragonfly", +))] +pub mod kqueue; + +mod __private { + #[doc(hidden)] + pub trait AsyncSealed {} + + impl AsyncSealed for crate::Async {} +} diff --git a/src/os/kqueue.rs b/src/os/kqueue.rs new file mode 100644 index 0000000..7a30c1c --- /dev/null +++ b/src/os/kqueue.rs @@ -0,0 +1,100 @@ +//! Functionality that is only available for `kqueue`-based platforms. + +use super::__private::AsyncSealed; +use __private::FilterSealed; + +use crate::reactor::{Reactor, Registration}; +use crate::Async; + +use std::io::Result; +use std::process::Child; + +/// An extension trait for [`Async`](crate::Async) that provides the ability to register other +/// queueable objects into the reactor. +/// +/// The underlying `kqueue` implementation can be used to poll for events besides file descriptor +/// read/write readiness. This API makes these faculties available to the user. +/// +/// See the [`Filter`] trait and its implementors for objects that currently support being registered +/// into the reactor. +pub trait AsyncKqueueExt: AsyncSealed { + /// Create a new [`Async`](crate::Async) around a [`Filter`]. + /// + /// # Examples + /// + /// ```no_run + /// use std::process::Command; + /// + /// use async_io::Async; + /// use async_io::os::kqueue::{AsyncKqueueExt, Exit}; + /// + /// // Create a new process to wait for. + /// let mut child = Command::new("sleep").arg("5").spawn().unwrap(); + /// + /// // Wrap the process in an `Async` object that waits for it to exit. + /// let process = Async::with_filter(Exit::new(child)).unwrap(); + /// + /// // Wait for the process to exit. + /// # async_io::block_on(async { + /// process.readable().await.unwrap(); + /// # }); + /// ``` + fn with_filter(filter: T) -> Result>; +} + +impl AsyncKqueueExt for Async { + fn with_filter(mut filter: T) -> Result> { + Ok(Async { + source: Reactor::get().insert_io(filter.registration())?, + io: Some(filter), + }) + } +} + +/// Objects that can be registered into the reactor via a [`Async`](crate::Async). +pub trait Filter: FilterSealed {} + +/// An object representing a signal. +/// +/// When registered into [`Async`](crate::Async) via [`with_filter`](AsyncKqueueExt::with_filter), +/// it will return a [`readable`](crate::Async::readable) event when the signal is received. +#[derive(Debug, Copy, Clone, Eq, PartialEq, PartialOrd, Ord, Hash)] +pub struct Signal(pub i32); + +impl FilterSealed for Signal { + fn registration(&mut self) -> Registration { + (*self).into() + } +} +impl Filter for Signal {} + +/// Wait for a child process to exit. +/// +/// When registered into [`Async`](crate::Async) via [`with_filter`](AsyncKqueueExt::with_filter), +/// it will return a [`readable`](crate::Async::readable) event when the child process exits. +#[derive(Debug)] +pub struct Exit(Option); + +impl Exit { + /// Create a new `Exit` object. + pub fn new(child: Child) -> Self { + Self(Some(child)) + } +} + +impl FilterSealed for Exit { + fn registration(&mut self) -> Registration { + self.0.take().expect("Cannot reregister child").into() + } +} +impl Filter for Exit {} + +mod __private { + use crate::reactor::Registration; + + #[doc(hidden)] + pub trait FilterSealed { + /// Get a registration object for this filter. + fn registration(&mut self) -> Registration; + } +} diff --git a/src/reactor.rs b/src/reactor.rs index 6b933ba..595f409 100644 --- a/src/reactor.rs +++ b/src/reactor.rs @@ -5,10 +5,6 @@ use std::future::Future; use std::io; use std::marker::PhantomData; use std::mem; -#[cfg(unix)] -use std::os::unix::io::RawFd; -#[cfg(windows)] -use std::os::windows::io::RawSocket; use std::panic; use std::pin::Pin; use std::sync::atomic::{AtomicUsize, Ordering}; @@ -22,6 +18,9 @@ use futures_lite::ready; use polling::{Event, Poller}; use slab::Slab; +mod registration; +pub use registration::Registration; + const READ: usize = 0; const WRITE: usize = 1; @@ -88,17 +87,13 @@ impl Reactor { } /// Registers an I/O source in the reactor. - pub(crate) fn insert_io( - &self, - #[cfg(unix)] raw: RawFd, - #[cfg(windows)] raw: RawSocket, - ) -> io::Result> { + pub(crate) fn insert_io(&self, raw: impl Into) -> io::Result> { // Create an I/O source for this file descriptor. let source = { let mut sources = self.sources.lock().unwrap(); let key = sources.vacant_entry().key(); let source = Arc::new(Source { - raw, + registration: raw.into(), key, state: Default::default(), }); @@ -107,7 +102,7 @@ impl Reactor { }; // Register the file descriptor. - if let Err(err) = self.poller.add(raw, Event::none(source.key)) { + if let Err(err) = source.registration.add(&self.poller, source.key) { let mut sources = self.sources.lock().unwrap(); sources.remove(source.key); return Err(err); @@ -120,7 +115,7 @@ impl Reactor { pub(crate) fn remove_io(&self, source: &Source) -> io::Result<()> { let mut sources = self.sources.lock().unwrap(); sources.remove(source.key); - self.poller.delete(source.raw) + source.registration.delete(&self.poller) } /// Registers a timer in the reactor. @@ -299,8 +294,8 @@ impl ReactorLock<'_> { // e.g. we were previously interested in both readability and writability, // but only one of them was emitted. if !state[READ].is_empty() || !state[WRITE].is_empty() { - self.reactor.poller.modify( - source.raw, + source.registration.modify( + &self.reactor.poller, Event { key: source.key, readable: !state[READ].is_empty(), @@ -341,13 +336,8 @@ enum TimerOp { /// A registered source of I/O events. #[derive(Debug)] pub(crate) struct Source { - /// Raw file descriptor on Unix platforms. - #[cfg(unix)] - pub(crate) raw: RawFd, - - /// Raw socket handle on Windows. - #[cfg(windows)] - pub(crate) raw: RawSocket, + /// This source's registration into the reactor. + registration: Registration, /// The key of this source obtained during registration. key: usize, @@ -436,8 +426,8 @@ impl Source { // Update interest in this I/O handle. if was_empty { - Reactor::get().poller.modify( - self.raw, + self.registration.modify( + &Reactor::get().poller, Event { key: self.key, readable: !state[READ].is_empty(), @@ -490,7 +480,7 @@ impl Future for Readable<'_, T> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { ready!(Pin::new(&mut self.0).poll(cx))?; - log::trace!("readable: fd={}", self.0.handle.source.raw); + log::trace!("readable: fd={:?}", &self.0.handle.source.registration); Poll::Ready(Ok(())) } } @@ -510,7 +500,10 @@ impl Future for ReadableOwned { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { ready!(Pin::new(&mut self.0).poll(cx))?; - log::trace!("readable_owned: fd={}", self.0.handle.source.raw); + log::trace!( + "readable_owned: fd={:?}", + &self.0.handle.source.registration + ); Poll::Ready(Ok(())) } } @@ -530,7 +523,7 @@ impl Future for Writable<'_, T> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { ready!(Pin::new(&mut self.0).poll(cx))?; - log::trace!("writable: fd={}", self.0.handle.source.raw); + log::trace!("writable: fd={:?}", &self.0.handle.source.registration); Poll::Ready(Ok(())) } } @@ -550,7 +543,10 @@ impl Future for WritableOwned { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { ready!(Pin::new(&mut self.0).poll(cx))?; - log::trace!("writable_owned: fd={}", self.0.handle.source.raw); + log::trace!( + "writable_owned: fd={:?}", + &self.0.handle.source.registration + ); Poll::Ready(Ok(())) } } @@ -610,8 +606,8 @@ impl> + Clone, T> Future for Ready { // Update interest in this I/O handle. if was_empty { - Reactor::get().poller.modify( - handle.borrow().source.raw, + handle.borrow().source.registration.modify( + &Reactor::get().poller, Event { key: handle.borrow().source.key, readable: !state[READ].is_empty(), diff --git a/src/reactor/registration.rs b/src/reactor/registration.rs new file mode 100644 index 0000000..fd86946 --- /dev/null +++ b/src/reactor/registration.rs @@ -0,0 +1,200 @@ +#[cfg(windows)] +mod impl_ { + use polling::{Event, Poller}; + use std::io::Result; + use std::os::windows::io::{AsRawSocket, RawSocket}; + + /// The raw registration into the reactor. + #[derive(Debug)] + #[doc(hidden)] + pub struct Registration { + /// Raw socket handle on Windows. + raw: RawSocket, + } + + impl From for Registration { + fn from(raw: RawSocket) -> Self { + Self { raw } + } + } + + impl Registration { + /// Registers the object into the reactor. + pub(crate) fn add(&self, poller: &Poller, token: usize) -> Result<()> { + poller.add(self.raw, Event::none(token)) + } + + /// Re-registers the object into the reactor. + pub(crate) fn modify(&self, poller: &Poller, interest: Event) -> Result<()> { + poller.modify(self.raw, interest) + } + + /// Deregisters the object from the reactor. + pub(crate) fn delete(&self, poller: &Poller) -> Result<()> { + poller.delete(self.raw) + } + } +} + +#[cfg(any( + target_os = "macos", + target_os = "ios", + target_os = "tvos", + target_os = "watchos", + target_os = "freebsd", + target_os = "netbsd", + target_os = "openbsd", + target_os = "dragonfly", +))] +mod impl_ { + use crate::os::kqueue::Signal; + + use polling::os::kqueue::{PollerKqueueExt, Process, ProcessOps, Signal as PollSignal}; + use polling::{Event, PollMode, Poller}; + + use std::io::Result; + use std::os::unix::io::{AsRawFd, RawFd}; + use std::process::Child; + + /// The raw registration into the reactor. + /// + /// This needs to be public, since it is technically exposed through the `QueueableSealed` trait. + #[derive(Debug)] + #[doc(hidden)] + pub enum Registration { + /// Raw file descriptor for readability/writability. + Fd(RawFd), + + /// Raw signal number for signal delivery. + Signal(Signal), + + /// Process for process termination. + Process(Child), + } + + impl From for Registration { + fn from(raw: RawFd) -> Self { + Self::Fd(raw) + } + } + + impl AsRawFd for Registration { + fn as_raw_fd(&self) -> RawFd { + match self { + Self::Fd(raw) => *raw, + _ => panic!("not a file descriptor"), + } + } + } + + impl From for Registration { + fn from(signal: Signal) -> Self { + Self::Signal(signal) + } + } + + impl From for Registration { + fn from(process: Child) -> Self { + Self::Process(process) + } + } + + impl Registration { + /// Registers the object into the reactor. + pub(crate) fn add(&self, poller: &Poller, token: usize) -> Result<()> { + match self { + Self::Fd(raw) => poller.add(*raw, Event::none(token)), + Self::Signal(signal) => { + poller.add_filter(PollSignal(signal.0), token, PollMode::Oneshot) + } + Self::Process(process) => poller.add_filter( + Process::new(process, ProcessOps::Exit), + token, + PollMode::Oneshot, + ), + } + } + + /// Re-registers the object into the reactor. + pub(crate) fn modify(&self, poller: &Poller, interest: Event) -> Result<()> { + match self { + Self::Fd(raw) => poller.modify(*raw, interest), + Self::Signal(signal) => { + poller.modify_filter(PollSignal(signal.0), interest.key, PollMode::Oneshot) + } + Self::Process(process) => poller.modify_filter( + Process::new(process, ProcessOps::Exit), + interest.key, + PollMode::Oneshot, + ), + } + } + + /// Deregisters the object from the reactor. + pub(crate) fn delete(&self, poller: &Poller) -> Result<()> { + match self { + Self::Fd(raw) => poller.delete(*raw), + Self::Signal(signal) => poller.delete_filter(PollSignal(signal.0)), + Self::Process(process) => { + poller.delete_filter(Process::new(process, ProcessOps::Exit)) + } + } + } + } +} + +#[cfg(not(any( + windows, + target_os = "macos", + target_os = "ios", + target_os = "tvos", + target_os = "watchos", + target_os = "freebsd", + target_os = "netbsd", + target_os = "openbsd", + target_os = "dragonfly", +)))] +mod impl_ { + use polling::{Event, Poller}; + use std::io::Result; + use std::os::unix::io::{AsRawFd, RawFd}; + + /// The raw registration into the reactor. + #[derive(Debug)] + #[doc(hidden)] + pub struct Registration { + /// Raw file descriptor on Unix. + raw: RawFd, + } + + impl From for Registration { + fn from(raw: RawFd) -> Self { + Self { raw } + } + } + + impl AsRawFd for Registration { + fn as_raw_fd(&self) -> RawFd { + self.raw + } + } + + impl Registration { + /// Registers the object into the reactor. + pub(crate) fn add(&self, poller: &Poller, token: usize) -> Result<()> { + poller.add(self.raw, Event::none(token)) + } + + /// Re-registers the object into the reactor. + pub(crate) fn modify(&self, poller: &Poller, interest: Event) -> Result<()> { + poller.modify(self.raw, interest) + } + + /// Deregisters the object from the reactor. + pub(crate) fn delete(&self, poller: &Poller) -> Result<()> { + poller.delete(self.raw) + } + } +} + +pub use impl_::Registration; From 069126dddeed310969a4b415fb8060104516ee51 Mon Sep 17 00:00:00 2001 From: jtnunley Date: Fri, 3 Feb 2023 16:30:37 -0800 Subject: [PATCH 2/8] Fix windows build error --- src/reactor/registration.rs | 25 +++++-------------------- 1 file changed, 5 insertions(+), 20 deletions(-) diff --git a/src/reactor/registration.rs b/src/reactor/registration.rs index fd86946..2096427 100644 --- a/src/reactor/registration.rs +++ b/src/reactor/registration.rs @@ -2,7 +2,7 @@ mod impl_ { use polling::{Event, Poller}; use std::io::Result; - use std::os::windows::io::{AsRawSocket, RawSocket}; + use std::os::windows::io::RawSocket; /// The raw registration into the reactor. #[derive(Debug)] @@ -16,7 +16,7 @@ mod impl_ { fn from(raw: RawSocket) -> Self { Self { raw } } - } + } impl Registration { /// Registers the object into the reactor. @@ -53,7 +53,7 @@ mod impl_ { use polling::{Event, PollMode, Poller}; use std::io::Result; - use std::os::unix::io::{AsRawFd, RawFd}; + use std::os::unix::io::RawFd; use std::process::Child; /// The raw registration into the reactor. @@ -76,16 +76,7 @@ mod impl_ { fn from(raw: RawFd) -> Self { Self::Fd(raw) } - } - - impl AsRawFd for Registration { - fn as_raw_fd(&self) -> RawFd { - match self { - Self::Fd(raw) => *raw, - _ => panic!("not a file descriptor"), - } - } - } + } impl From for Registration { fn from(signal: Signal) -> Self { @@ -157,7 +148,7 @@ mod impl_ { mod impl_ { use polling::{Event, Poller}; use std::io::Result; - use std::os::unix::io::{AsRawFd, RawFd}; + use std::os::unix::io::RawFd; /// The raw registration into the reactor. #[derive(Debug)] @@ -173,12 +164,6 @@ mod impl_ { } } - impl AsRawFd for Registration { - fn as_raw_fd(&self) -> RawFd { - self.raw - } - } - impl Registration { /// Registers the object into the reactor. pub(crate) fn add(&self, poller: &Poller, token: usize) -> Result<()> { From 0a367ca21b8e479da43c4238b46d21683806659c Mon Sep 17 00:00:00 2001 From: jtnunley Date: Fri, 3 Feb 2023 16:35:14 -0800 Subject: [PATCH 3/8] rustfmt --- src/reactor/registration.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/reactor/registration.rs b/src/reactor/registration.rs index 2096427..37b62a1 100644 --- a/src/reactor/registration.rs +++ b/src/reactor/registration.rs @@ -16,7 +16,7 @@ mod impl_ { fn from(raw: RawSocket) -> Self { Self { raw } } - } + } impl Registration { /// Registers the object into the reactor. @@ -76,7 +76,7 @@ mod impl_ { fn from(raw: RawFd) -> Self { Self::Fd(raw) } - } + } impl From for Registration { fn from(signal: Signal) -> Self { From 09625275fe691a087925f6e06520f48d24e8733e Mon Sep 17 00:00:00 2001 From: jtnunley Date: Tue, 11 Apr 2023 09:12:00 -0700 Subject: [PATCH 4/8] Split registration.rs into three files --- src/reactor.rs | 26 ++++- src/reactor/kqueue.rs | 85 +++++++++++++++++ src/reactor/registration.rs | 185 ------------------------------------ src/reactor/unix.rs | 36 +++++++ src/reactor/windows.rs | 36 +++++++ 5 files changed, 181 insertions(+), 187 deletions(-) create mode 100644 src/reactor/kqueue.rs delete mode 100644 src/reactor/registration.rs create mode 100644 src/reactor/unix.rs create mode 100644 src/reactor/windows.rs diff --git a/src/reactor.rs b/src/reactor.rs index 595f409..268f7a7 100644 --- a/src/reactor.rs +++ b/src/reactor.rs @@ -18,8 +18,30 @@ use futures_lite::ready; use polling::{Event, Poller}; use slab::Slab; -mod registration; -pub use registration::Registration; +// Choose the proper implementation of `Registration` based on the target platform. +cfg_if::cfg_if! { + if #[cfg(windows)] { + mod windows; + pub use windows::Registration; + } else if #[cfg(any( + target_os = "macos", + target_os = "ios", + target_os = "tvos", + target_os = "watchos", + target_os = "freebsd", + target_os = "netbsd", + target_os = "openbsd", + target_os = "dragonfly", + ))] { + mod kqueue; + pub use kqueue::Registration; + } else if #[cfg(unix)] { + mod unix; + pub use unix::Registration; + } else { + compile_error!("unsupported platform"); + } +} const READ: usize = 0; const WRITE: usize = 1; diff --git a/src/reactor/kqueue.rs b/src/reactor/kqueue.rs new file mode 100644 index 0000000..e17a0f8 --- /dev/null +++ b/src/reactor/kqueue.rs @@ -0,0 +1,85 @@ +// SPDX-License-Identifier: MIT OR Apache-2.0 + +use crate::os::kqueue::Signal; + +use polling::os::kqueue::{PollerKqueueExt, Process, ProcessOps, Signal as PollSignal}; +use polling::{Event, PollMode, Poller}; + +use std::io::Result; +use std::os::unix::io::RawFd; +use std::process::Child; + +/// The raw registration into the reactor. +/// +/// This needs to be public, since it is technically exposed through the `QueueableSealed` trait. +#[derive(Debug)] +#[doc(hidden)] +pub enum Registration { + /// Raw file descriptor for readability/writability. + Fd(RawFd), + + /// Raw signal number for signal delivery. + Signal(Signal), + + /// Process for process termination. + Process(Child), +} + +impl From for Registration { + fn from(raw: RawFd) -> Self { + Self::Fd(raw) + } +} + +impl From for Registration { + fn from(signal: Signal) -> Self { + Self::Signal(signal) + } +} + +impl From for Registration { + fn from(process: Child) -> Self { + Self::Process(process) + } +} + +impl Registration { + /// Registers the object into the reactor. + pub(crate) fn add(&self, poller: &Poller, token: usize) -> Result<()> { + match self { + Self::Fd(raw) => poller.add(*raw, Event::none(token)), + Self::Signal(signal) => { + poller.add_filter(PollSignal(signal.0), token, PollMode::Oneshot) + } + Self::Process(process) => poller.add_filter( + Process::new(process, ProcessOps::Exit), + token, + PollMode::Oneshot, + ), + } + } + + /// Re-registers the object into the reactor. + pub(crate) fn modify(&self, poller: &Poller, interest: Event) -> Result<()> { + match self { + Self::Fd(raw) => poller.modify(*raw, interest), + Self::Signal(signal) => { + poller.modify_filter(PollSignal(signal.0), interest.key, PollMode::Oneshot) + } + Self::Process(process) => poller.modify_filter( + Process::new(process, ProcessOps::Exit), + interest.key, + PollMode::Oneshot, + ), + } + } + + /// Deregisters the object from the reactor. + pub(crate) fn delete(&self, poller: &Poller) -> Result<()> { + match self { + Self::Fd(raw) => poller.delete(*raw), + Self::Signal(signal) => poller.delete_filter(PollSignal(signal.0)), + Self::Process(process) => poller.delete_filter(Process::new(process, ProcessOps::Exit)), + } + } +} diff --git a/src/reactor/registration.rs b/src/reactor/registration.rs deleted file mode 100644 index 37b62a1..0000000 --- a/src/reactor/registration.rs +++ /dev/null @@ -1,185 +0,0 @@ -#[cfg(windows)] -mod impl_ { - use polling::{Event, Poller}; - use std::io::Result; - use std::os::windows::io::RawSocket; - - /// The raw registration into the reactor. - #[derive(Debug)] - #[doc(hidden)] - pub struct Registration { - /// Raw socket handle on Windows. - raw: RawSocket, - } - - impl From for Registration { - fn from(raw: RawSocket) -> Self { - Self { raw } - } - } - - impl Registration { - /// Registers the object into the reactor. - pub(crate) fn add(&self, poller: &Poller, token: usize) -> Result<()> { - poller.add(self.raw, Event::none(token)) - } - - /// Re-registers the object into the reactor. - pub(crate) fn modify(&self, poller: &Poller, interest: Event) -> Result<()> { - poller.modify(self.raw, interest) - } - - /// Deregisters the object from the reactor. - pub(crate) fn delete(&self, poller: &Poller) -> Result<()> { - poller.delete(self.raw) - } - } -} - -#[cfg(any( - target_os = "macos", - target_os = "ios", - target_os = "tvos", - target_os = "watchos", - target_os = "freebsd", - target_os = "netbsd", - target_os = "openbsd", - target_os = "dragonfly", -))] -mod impl_ { - use crate::os::kqueue::Signal; - - use polling::os::kqueue::{PollerKqueueExt, Process, ProcessOps, Signal as PollSignal}; - use polling::{Event, PollMode, Poller}; - - use std::io::Result; - use std::os::unix::io::RawFd; - use std::process::Child; - - /// The raw registration into the reactor. - /// - /// This needs to be public, since it is technically exposed through the `QueueableSealed` trait. - #[derive(Debug)] - #[doc(hidden)] - pub enum Registration { - /// Raw file descriptor for readability/writability. - Fd(RawFd), - - /// Raw signal number for signal delivery. - Signal(Signal), - - /// Process for process termination. - Process(Child), - } - - impl From for Registration { - fn from(raw: RawFd) -> Self { - Self::Fd(raw) - } - } - - impl From for Registration { - fn from(signal: Signal) -> Self { - Self::Signal(signal) - } - } - - impl From for Registration { - fn from(process: Child) -> Self { - Self::Process(process) - } - } - - impl Registration { - /// Registers the object into the reactor. - pub(crate) fn add(&self, poller: &Poller, token: usize) -> Result<()> { - match self { - Self::Fd(raw) => poller.add(*raw, Event::none(token)), - Self::Signal(signal) => { - poller.add_filter(PollSignal(signal.0), token, PollMode::Oneshot) - } - Self::Process(process) => poller.add_filter( - Process::new(process, ProcessOps::Exit), - token, - PollMode::Oneshot, - ), - } - } - - /// Re-registers the object into the reactor. - pub(crate) fn modify(&self, poller: &Poller, interest: Event) -> Result<()> { - match self { - Self::Fd(raw) => poller.modify(*raw, interest), - Self::Signal(signal) => { - poller.modify_filter(PollSignal(signal.0), interest.key, PollMode::Oneshot) - } - Self::Process(process) => poller.modify_filter( - Process::new(process, ProcessOps::Exit), - interest.key, - PollMode::Oneshot, - ), - } - } - - /// Deregisters the object from the reactor. - pub(crate) fn delete(&self, poller: &Poller) -> Result<()> { - match self { - Self::Fd(raw) => poller.delete(*raw), - Self::Signal(signal) => poller.delete_filter(PollSignal(signal.0)), - Self::Process(process) => { - poller.delete_filter(Process::new(process, ProcessOps::Exit)) - } - } - } - } -} - -#[cfg(not(any( - windows, - target_os = "macos", - target_os = "ios", - target_os = "tvos", - target_os = "watchos", - target_os = "freebsd", - target_os = "netbsd", - target_os = "openbsd", - target_os = "dragonfly", -)))] -mod impl_ { - use polling::{Event, Poller}; - use std::io::Result; - use std::os::unix::io::RawFd; - - /// The raw registration into the reactor. - #[derive(Debug)] - #[doc(hidden)] - pub struct Registration { - /// Raw file descriptor on Unix. - raw: RawFd, - } - - impl From for Registration { - fn from(raw: RawFd) -> Self { - Self { raw } - } - } - - impl Registration { - /// Registers the object into the reactor. - pub(crate) fn add(&self, poller: &Poller, token: usize) -> Result<()> { - poller.add(self.raw, Event::none(token)) - } - - /// Re-registers the object into the reactor. - pub(crate) fn modify(&self, poller: &Poller, interest: Event) -> Result<()> { - poller.modify(self.raw, interest) - } - - /// Deregisters the object from the reactor. - pub(crate) fn delete(&self, poller: &Poller) -> Result<()> { - poller.delete(self.raw) - } - } -} - -pub use impl_::Registration; diff --git a/src/reactor/unix.rs b/src/reactor/unix.rs new file mode 100644 index 0000000..f020f96 --- /dev/null +++ b/src/reactor/unix.rs @@ -0,0 +1,36 @@ +// SPDX-License-Identifier: MIT OR Apache-2.0 + +use polling::{Event, Poller}; +use std::io::Result; +use std::os::unix::io::RawFd; + +/// The raw registration into the reactor. +#[derive(Debug)] +#[doc(hidden)] +pub struct Registration { + /// Raw file descriptor on Unix. + raw: RawFd, +} + +impl From for Registration { + fn from(raw: RawFd) -> Self { + Self { raw } + } +} + +impl Registration { + /// Registers the object into the reactor. + pub(crate) fn add(&self, poller: &Poller, token: usize) -> Result<()> { + poller.add(self.raw, Event::none(token)) + } + + /// Re-registers the object into the reactor. + pub(crate) fn modify(&self, poller: &Poller, interest: Event) -> Result<()> { + poller.modify(self.raw, interest) + } + + /// Deregisters the object from the reactor. + pub(crate) fn delete(&self, poller: &Poller) -> Result<()> { + poller.delete(self.raw) + } +} diff --git a/src/reactor/windows.rs b/src/reactor/windows.rs new file mode 100644 index 0000000..c92810b --- /dev/null +++ b/src/reactor/windows.rs @@ -0,0 +1,36 @@ +// SPDX-License-Identifier: MIT OR Apache-2.0 + +use polling::{Event, Poller}; +use std::io::Result; +use std::os::windows::io::RawSocket; + +/// The raw registration into the reactor. +#[derive(Debug)] +#[doc(hidden)] +pub struct Registration { + /// Raw socket handle on Windows. + raw: RawSocket, +} + +impl From for Registration { + fn from(raw: RawSocket) -> Self { + Self { raw } + } +} + +impl Registration { + /// Registers the object into the reactor. + pub(crate) fn add(&self, poller: &Poller, token: usize) -> Result<()> { + poller.add(self.raw, Event::none(token)) + } + + /// Re-registers the object into the reactor. + pub(crate) fn modify(&self, poller: &Poller, interest: Event) -> Result<()> { + poller.modify(self.raw, interest) + } + + /// Deregisters the object from the reactor. + pub(crate) fn delete(&self, poller: &Poller) -> Result<()> { + poller.delete(self.raw) + } +} From 7133b0cb37eb1d2ac432730b29ef84f5cb6df560 Mon Sep 17 00:00:00 2001 From: jtnunley Date: Tue, 11 Apr 2023 09:15:20 -0700 Subject: [PATCH 5/8] Adjust debug output --- src/reactor.rs | 14 ++++---------- src/reactor/kqueue.rs | 12 +++++++++++- src/reactor/unix.rs | 9 ++++++++- src/reactor/windows.rs | 8 +++++++- 4 files changed, 30 insertions(+), 13 deletions(-) diff --git a/src/reactor.rs b/src/reactor.rs index 268f7a7..56d3ce7 100644 --- a/src/reactor.rs +++ b/src/reactor.rs @@ -502,7 +502,7 @@ impl Future for Readable<'_, T> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { ready!(Pin::new(&mut self.0).poll(cx))?; - log::trace!("readable: fd={:?}", &self.0.handle.source.registration); + log::trace!("readable: fd={:?}", self.0.handle.source.registration); Poll::Ready(Ok(())) } } @@ -522,10 +522,7 @@ impl Future for ReadableOwned { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { ready!(Pin::new(&mut self.0).poll(cx))?; - log::trace!( - "readable_owned: fd={:?}", - &self.0.handle.source.registration - ); + log::trace!("readable_owned: fd={:?}", self.0.handle.source.registration); Poll::Ready(Ok(())) } } @@ -545,7 +542,7 @@ impl Future for Writable<'_, T> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { ready!(Pin::new(&mut self.0).poll(cx))?; - log::trace!("writable: fd={:?}", &self.0.handle.source.registration); + log::trace!("writable: fd={:?}", self.0.handle.source.registration); Poll::Ready(Ok(())) } } @@ -565,10 +562,7 @@ impl Future for WritableOwned { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { ready!(Pin::new(&mut self.0).poll(cx))?; - log::trace!( - "writable_owned: fd={:?}", - &self.0.handle.source.registration - ); + log::trace!("writable_owned: fd={:?}", self.0.handle.source.registration); Poll::Ready(Ok(())) } } diff --git a/src/reactor/kqueue.rs b/src/reactor/kqueue.rs index e17a0f8..34a4711 100644 --- a/src/reactor/kqueue.rs +++ b/src/reactor/kqueue.rs @@ -5,6 +5,7 @@ use crate::os::kqueue::Signal; use polling::os::kqueue::{PollerKqueueExt, Process, ProcessOps, Signal as PollSignal}; use polling::{Event, PollMode, Poller}; +use std::fmt; use std::io::Result; use std::os::unix::io::RawFd; use std::process::Child; @@ -12,7 +13,6 @@ use std::process::Child; /// The raw registration into the reactor. /// /// This needs to be public, since it is technically exposed through the `QueueableSealed` trait. -#[derive(Debug)] #[doc(hidden)] pub enum Registration { /// Raw file descriptor for readability/writability. @@ -25,6 +25,16 @@ pub enum Registration { Process(Child), } +impl fmt::Debug for Registration { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Fd(raw) => fmt::Debug::fmt(raw, f), + Self::Signal(signal) => fmt::Debug::fmt(signal, f), + Self::Process(process) => fmt::Debug::fmt(process, f), + } + } +} + impl From for Registration { fn from(raw: RawFd) -> Self { Self::Fd(raw) diff --git a/src/reactor/unix.rs b/src/reactor/unix.rs index f020f96..1ad806f 100644 --- a/src/reactor/unix.rs +++ b/src/reactor/unix.rs @@ -1,17 +1,24 @@ // SPDX-License-Identifier: MIT OR Apache-2.0 use polling::{Event, Poller}; + +use std::fmt; use std::io::Result; use std::os::unix::io::RawFd; /// The raw registration into the reactor. -#[derive(Debug)] #[doc(hidden)] pub struct Registration { /// Raw file descriptor on Unix. raw: RawFd, } +impl fmt::Debug for Registration { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Debug::fmt(&self.raw, f) + } +} + impl From for Registration { fn from(raw: RawFd) -> Self { Self { raw } diff --git a/src/reactor/windows.rs b/src/reactor/windows.rs index c92810b..2a86c41 100644 --- a/src/reactor/windows.rs +++ b/src/reactor/windows.rs @@ -1,17 +1,23 @@ // SPDX-License-Identifier: MIT OR Apache-2.0 use polling::{Event, Poller}; +use std::fmt; use std::io::Result; use std::os::windows::io::RawSocket; /// The raw registration into the reactor. -#[derive(Debug)] #[doc(hidden)] pub struct Registration { /// Raw socket handle on Windows. raw: RawSocket, } +impl fmt::Debug for Registration { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Debug::fmt(&self.raw, f) + } +} + impl From for Registration { fn from(raw: RawSocket) -> Self { Self { raw } From c080b9528466f3c2e1ca0d919c7b510c21c2682f Mon Sep 17 00:00:00 2001 From: jtnunley Date: Tue, 11 Apr 2023 09:36:48 -0700 Subject: [PATCH 6/8] Replace the Async extension with Filter --- examples/kqueue-process.rs | 7 +- src/os.rs | 7 -- src/os/kqueue.rs | 207 ++++++++++++++++++++++++++++++++----- 3 files changed, 186 insertions(+), 35 deletions(-) diff --git a/examples/kqueue-process.rs b/examples/kqueue-process.rs index 16fe6b3..c88ccc4 100644 --- a/examples/kqueue-process.rs +++ b/examples/kqueue-process.rs @@ -19,8 +19,7 @@ fn main() -> std::io::Result<()> { use std::process::Command; - use async_io::os::kqueue::{AsyncKqueueExt, Exit}; - use async_io::Async; + use async_io::os::kqueue::{Exit, Filter}; use futures_lite::future; future::block_on(async { @@ -31,10 +30,10 @@ fn main() -> std::io::Result<()> { .expect("failed to spawn process"); // Wrap the process in an `Async` object that waits for it to exit. - let process = Async::with_filter(Exit::new(process))?; + let process = Filter::new(Exit::new(process))?; // Wait for the process to exit. - process.readable().await?; + process.ready().await?; Ok(()) }) diff --git a/src/os.rs b/src/os.rs index 713d0ab..5db3c57 100644 --- a/src/os.rs +++ b/src/os.rs @@ -11,10 +11,3 @@ target_os = "dragonfly", ))] pub mod kqueue; - -mod __private { - #[doc(hidden)] - pub trait AsyncSealed {} - - impl AsyncSealed for crate::Async {} -} diff --git a/src/os/kqueue.rs b/src/os/kqueue.rs index 7a30c1c..2feb72a 100644 --- a/src/os/kqueue.rs +++ b/src/os/kqueue.rs @@ -1,24 +1,33 @@ //! Functionality that is only available for `kqueue`-based platforms. -use super::__private::AsyncSealed; -use __private::FilterSealed; +use __private::QueueableSealed; -use crate::reactor::{Reactor, Registration}; +use crate::reactor::{Reactor, Readable, Registration}; use crate::Async; -use std::io::Result; +use std::convert::{TryFrom, TryInto}; +use std::future::Future; +use std::io::{Error, Result}; +use std::os::unix::io::{AsRawFd, RawFd}; +use std::pin::Pin; use std::process::Child; +use std::task::{Context, Poll}; -/// An extension trait for [`Async`](crate::Async) that provides the ability to register other -/// queueable objects into the reactor. +#[cfg(not(async_io_no_io_safety))] +use std::os::unix::io::{AsFd, BorrowedFd, OwnedFd}; + +/// A wrapper around a queueable object that waits until it is ready. /// /// The underlying `kqueue` implementation can be used to poll for events besides file descriptor /// read/write readiness. This API makes these faculties available to the user. /// -/// See the [`Filter`] trait and its implementors for objects that currently support being registered +/// See the [`Queueable`] trait and its implementors for objects that currently support being registered /// into the reactor. -pub trait AsyncKqueueExt: AsyncSealed { - /// Create a new [`Async`](crate::Async) around a [`Filter`]. +#[derive(Debug)] +pub struct Filter(Async); + +impl Filter { + /// Create a new [`Filter`] around a [`Queueable`]. /// /// # Examples /// @@ -26,33 +35,183 @@ pub trait AsyncKqueueExt: AsyncSealed { /// use std::process::Command; /// /// use async_io::Async; - /// use async_io::os::kqueue::{AsyncKqueueExt, Exit}; + /// use async_io::os::kqueue::{Exit, Filter}; /// /// // Create a new process to wait for. /// let mut child = Command::new("sleep").arg("5").spawn().unwrap(); /// /// // Wrap the process in an `Async` object that waits for it to exit. - /// let process = Async::with_filter(Exit::new(child)).unwrap(); + /// let process = Filter::new(Exit::new(child)).unwrap(); /// /// // Wait for the process to exit. /// # async_io::block_on(async { /// process.readable().await.unwrap(); /// # }); /// ``` - fn with_filter(filter: T) -> Result>; -} - -impl AsyncKqueueExt for Async { - fn with_filter(mut filter: T) -> Result> { - Ok(Async { + pub fn new(mut filter: T) -> Result { + Ok(Self(Async { source: Reactor::get().insert_io(filter.registration())?, io: Some(filter), - }) + })) + } +} + +impl AsRawFd for Filter { + fn as_raw_fd(&self) -> RawFd { + self.0.as_raw_fd() + } +} + +#[cfg(not(async_io_no_io_safety))] +impl AsFd for Filter { + fn as_fd(&self) -> BorrowedFd<'_> { + self.0.as_fd() + } +} + +#[cfg(not(async_io_no_io_safety))] +impl> TryFrom for Filter { + type Error = Error; + + fn try_from(fd: OwnedFd) -> Result { + Ok(Self(Async::try_from(fd)?)) + } +} + +#[cfg(not(async_io_no_io_safety))] +impl> TryFrom> for OwnedFd { + type Error = Error; + + fn try_from(filter: Filter) -> Result { + filter.0.try_into() + } +} + +impl Filter { + /// Gets a reference to the underlying [`Queueable`] object. + /// + /// # Examples + /// + /// ``` + /// use async_io::os::kqueue::{Exit, Filter}; + /// + /// # futures_lite::future::block_on(async { + /// let child = std::process::Command::new("sleep").arg("5").spawn().unwrap(); + /// let process = Filter::new(Exit::new(child)).unwrap(); + /// let inner = process.get_ref(); + /// # }); + /// ``` + pub fn get_ref(&self) -> &T { + self.0.get_ref() + } + + /// Gets a mutable reference to the underlying [`Queueable`] object. + /// + /// # Examples + /// + /// ``` + /// use async_io::os::kqueue::{Exit, Filter}; + /// + /// # futures_lite::future::block_on(async { + /// let child = std::process::Command::new("sleep").arg("5").spawn().unwrap(); + /// let mut process = Filter::new(Exit::new(child)).unwrap(); + /// let inner = process.get_mut(); + /// # }); + /// ``` + pub fn get_mut(&mut self) -> &mut T { + self.0.get_mut() + } + + /// Unwraps the inner [`Queueable`] object. + /// + /// # Examples + /// + /// ``` + /// use async_io::os::kqueue::{Exit, Filter}; + /// + /// # futures_lite::future::block_on(async { + /// let child = std::process::Command::new("sleep").arg("5").spawn().unwrap(); + /// let process = Filter::new(Exit::new(child)).unwrap(); + /// let inner = process.into_inner().unwrap(); + /// # }); + /// ``` + pub fn into_inner(self) -> Result { + self.0.into_inner() + } + + /// Waits until the [`Queueable`] object is ready. + /// + /// This method completes when the underlying [`Queueable`] object has completed. See the documentation + /// for the [`Queueable`] object for more information. + /// + /// # Examples + /// + /// ```no_run + /// use std::process::Command; + /// use async_io::os::kqueue::{Exit, Filter}; + /// + /// # futures_lite::future::block_on(async { + /// let child = Command::new("sleep").arg("5").spawn()?; + /// let process = Filter::new(Exit::new(child))?; + /// + /// // Wait for the process to exit. + /// process.ready().await?; + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn ready(&self) -> Ready<'_, T> { + Ready(self.0.readable()) + } + + /// Polls the I/O handle for readiness. + /// + /// When this method returns [`Poll::Ready`], that means that the OS has delivered a notification + /// that the underlying [`Queueable`] object is ready. See the documentation for the [`Queueable`] + /// object for more information. + /// + /// # Caveats + /// + /// Two different tasks should not call this method concurrently. Otherwise, conflicting tasks + /// will just keep waking each other in turn, thus wasting CPU time. + /// + /// # Examples + /// + /// ```no_run + /// use std::process::Command; + /// use async_io::os::kqueue::{Exit, Filter}; + /// use futures_lite::future; + /// + /// # futures_lite::future::block_on(async { + /// let child = Command::new("sleep").arg("5").spawn()?; + /// let process = Filter::new(Exit::new(child))?; + /// + /// // Wait for the process to exit. + /// future::poll_fn(|cx| process.poll_ready(cx)).await?; + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { + self.0.poll_readable(cx) + } +} + +/// Future for [`Filter::ready`]. +#[must_use = "futures do nothing unless you `.await` or poll them"] +#[derive(Debug)] +pub struct Ready<'a, T>(Readable<'a, T>); + +impl Future for Ready<'_, T> { + type Output = Result<()>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + Pin::new(&mut self.0).poll(cx) } } /// Objects that can be registered into the reactor via a [`Async`](crate::Async). -pub trait Filter: FilterSealed {} +/// +/// These objects represent other filters associated with the `kqueue` runtime aside from readability +/// and writability. Rather than waiting on readable/writable, they wait on "readiness". This is +/// typically used for signals and child process exits. +pub trait Queueable: QueueableSealed {} /// An object representing a signal. /// @@ -61,12 +220,12 @@ pub trait Filter: FilterSealed {} #[derive(Debug, Copy, Clone, Eq, PartialEq, PartialOrd, Ord, Hash)] pub struct Signal(pub i32); -impl FilterSealed for Signal { +impl QueueableSealed for Signal { fn registration(&mut self) -> Registration { (*self).into() } } -impl Filter for Signal {} +impl Queueable for Signal {} /// Wait for a child process to exit. /// @@ -82,18 +241,18 @@ impl Exit { } } -impl FilterSealed for Exit { +impl QueueableSealed for Exit { fn registration(&mut self) -> Registration { self.0.take().expect("Cannot reregister child").into() } } -impl Filter for Exit {} +impl Queueable for Exit {} mod __private { use crate::reactor::Registration; #[doc(hidden)] - pub trait FilterSealed { + pub trait QueueableSealed { /// Get a registration object for this filter. fn registration(&mut self) -> Registration; } From 318dd73b3ff947567c8d11f92533eca42ded5355 Mon Sep 17 00:00:00 2001 From: jtnunley Date: Tue, 11 Apr 2023 14:10:16 -0700 Subject: [PATCH 7/8] Review comments --- src/os/kqueue.rs | 20 +++++++++++++++----- src/reactor/kqueue.rs | 12 ------------ 2 files changed, 15 insertions(+), 17 deletions(-) diff --git a/src/os/kqueue.rs b/src/os/kqueue.rs index 2feb72a..78b026e 100644 --- a/src/os/kqueue.rs +++ b/src/os/kqueue.rs @@ -26,6 +26,18 @@ use std::os::unix::io::{AsFd, BorrowedFd, OwnedFd}; #[derive(Debug)] pub struct Filter(Async); +impl AsRef for Filter { + fn as_ref(&self) -> &T { + self.0.as_ref() + } +} + +impl AsMut for Filter { + fn as_mut(&mut self) -> &mut T { + self.0.as_mut() + } +} + impl Filter { /// Create a new [`Filter`] around a [`Queueable`]. /// @@ -33,8 +45,6 @@ impl Filter { /// /// ```no_run /// use std::process::Command; - /// - /// use async_io::Async; /// use async_io::os::kqueue::{Exit, Filter}; /// /// // Create a new process to wait for. @@ -45,7 +55,7 @@ impl Filter { /// /// // Wait for the process to exit. /// # async_io::block_on(async { - /// process.readable().await.unwrap(); + /// process.ready().await.unwrap(); /// # }); /// ``` pub fn new(mut filter: T) -> Result { @@ -222,7 +232,7 @@ pub struct Signal(pub i32); impl QueueableSealed for Signal { fn registration(&mut self) -> Registration { - (*self).into() + Registration::Signal(*self) } } impl Queueable for Signal {} @@ -243,7 +253,7 @@ impl Exit { impl QueueableSealed for Exit { fn registration(&mut self) -> Registration { - self.0.take().expect("Cannot reregister child").into() + Registration::Process(self.0.take().expect("Cannot reregister child")) } } impl Queueable for Exit {} diff --git a/src/reactor/kqueue.rs b/src/reactor/kqueue.rs index 34a4711..0aacc23 100644 --- a/src/reactor/kqueue.rs +++ b/src/reactor/kqueue.rs @@ -41,18 +41,6 @@ impl From for Registration { } } -impl From for Registration { - fn from(signal: Signal) -> Self { - Self::Signal(signal) - } -} - -impl From for Registration { - fn from(process: Child) -> Self { - Self::Process(process) - } -} - impl Registration { /// Registers the object into the reactor. pub(crate) fn add(&self, poller: &Poller, token: usize) -> Result<()> { From 820933739e5310ac94859c2f059a1e29178a88e3 Mon Sep 17 00:00:00 2001 From: John Nunley Date: Tue, 11 Apr 2023 15:34:52 -0700 Subject: [PATCH 8/8] Inline small functions --- src/reactor/kqueue.rs | 4 ++++ src/reactor/unix.rs | 4 ++++ src/reactor/windows.rs | 4 ++++ 3 files changed, 12 insertions(+) diff --git a/src/reactor/kqueue.rs b/src/reactor/kqueue.rs index 0aacc23..4c575a2 100644 --- a/src/reactor/kqueue.rs +++ b/src/reactor/kqueue.rs @@ -36,6 +36,7 @@ impl fmt::Debug for Registration { } impl From for Registration { + #[inline] fn from(raw: RawFd) -> Self { Self::Fd(raw) } @@ -43,6 +44,7 @@ impl From for Registration { impl Registration { /// Registers the object into the reactor. + #[inline] pub(crate) fn add(&self, poller: &Poller, token: usize) -> Result<()> { match self { Self::Fd(raw) => poller.add(*raw, Event::none(token)), @@ -58,6 +60,7 @@ impl Registration { } /// Re-registers the object into the reactor. + #[inline] pub(crate) fn modify(&self, poller: &Poller, interest: Event) -> Result<()> { match self { Self::Fd(raw) => poller.modify(*raw, interest), @@ -73,6 +76,7 @@ impl Registration { } /// Deregisters the object from the reactor. + #[inline] pub(crate) fn delete(&self, poller: &Poller) -> Result<()> { match self { Self::Fd(raw) => poller.delete(*raw), diff --git a/src/reactor/unix.rs b/src/reactor/unix.rs index 1ad806f..2db2437 100644 --- a/src/reactor/unix.rs +++ b/src/reactor/unix.rs @@ -20,6 +20,7 @@ impl fmt::Debug for Registration { } impl From for Registration { + #[inline] fn from(raw: RawFd) -> Self { Self { raw } } @@ -27,16 +28,19 @@ impl From for Registration { impl Registration { /// Registers the object into the reactor. + #[inline] pub(crate) fn add(&self, poller: &Poller, token: usize) -> Result<()> { poller.add(self.raw, Event::none(token)) } /// Re-registers the object into the reactor. + #[inline] pub(crate) fn modify(&self, poller: &Poller, interest: Event) -> Result<()> { poller.modify(self.raw, interest) } /// Deregisters the object from the reactor. + #[inline] pub(crate) fn delete(&self, poller: &Poller) -> Result<()> { poller.delete(self.raw) } diff --git a/src/reactor/windows.rs b/src/reactor/windows.rs index 2a86c41..59f247f 100644 --- a/src/reactor/windows.rs +++ b/src/reactor/windows.rs @@ -19,6 +19,7 @@ impl fmt::Debug for Registration { } impl From for Registration { + #[inline] fn from(raw: RawSocket) -> Self { Self { raw } } @@ -26,16 +27,19 @@ impl From for Registration { impl Registration { /// Registers the object into the reactor. + #[inline] pub(crate) fn add(&self, poller: &Poller, token: usize) -> Result<()> { poller.add(self.raw, Event::none(token)) } /// Re-registers the object into the reactor. + #[inline] pub(crate) fn modify(&self, poller: &Poller, interest: Event) -> Result<()> { poller.modify(self.raw, interest) } /// Deregisters the object from the reactor. + #[inline] pub(crate) fn delete(&self, poller: &Poller) -> Result<()> { poller.delete(self.raw) }