diff --git a/Cargo.toml b/Cargo.toml index af399dc..ad86e0e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,7 +29,7 @@ concurrent-queue = "2" futures-lite = "1.11.0" log = "0.4.11" parking = "2.0.0" -polling = "2.0.0" +polling = "2.6.0" rustix = { version = "0.37.1", default-features = false, features = ["std", "fs"] } slab = "0.4.2" socket2 = { version = "0.4.2", features = ["all"] } diff --git a/examples/kqueue-process.rs b/examples/kqueue-process.rs new file mode 100644 index 0000000..c88ccc4 --- /dev/null +++ b/examples/kqueue-process.rs @@ -0,0 +1,54 @@ +//! Uses the `async_io::os::kqueue` module to wait for a process to terminate. +//! +//! Run with: +//! +//! ``` +//! cargo run --example kqueue-process +//! ``` + +#[cfg(any( + target_os = "macos", + target_os = "ios", + target_os = "tvos", + target_os = "watchos", + target_os = "freebsd", + target_os = "netbsd", + target_os = "openbsd", + target_os = "dragonfly", +))] +fn main() -> std::io::Result<()> { + use std::process::Command; + + use async_io::os::kqueue::{Exit, Filter}; + use futures_lite::future; + + future::block_on(async { + // Spawn a process. + let process = Command::new("sleep") + .arg("3") + .spawn() + .expect("failed to spawn process"); + + // Wrap the process in an `Async` object that waits for it to exit. + let process = Filter::new(Exit::new(process))?; + + // Wait for the process to exit. + process.ready().await?; + + Ok(()) + }) +} + +#[cfg(not(any( + target_os = "macos", + target_os = "ios", + target_os = "tvos", + target_os = "watchos", + target_os = "freebsd", + target_os = "netbsd", + target_os = "openbsd", + target_os = "dragonfly", +)))] +fn main() { + println!("This example only works for kqueue-enabled platforms."); +} diff --git a/src/lib.rs b/src/lib.rs index 450c1b8..4e90b31 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -88,6 +88,8 @@ use crate::reactor::{Reactor, Source}; mod driver; mod reactor; +pub mod os; + pub use driver::block_on; pub use reactor::{Readable, ReadableOwned, Writable, WritableOwned}; @@ -664,7 +666,7 @@ impl Async { #[cfg(unix)] impl AsRawFd for Async { fn as_raw_fd(&self) -> RawFd { - self.source.raw + self.get_ref().as_raw_fd() } } @@ -740,7 +742,7 @@ impl Async { #[cfg(windows)] impl AsRawSocket for Async { fn as_raw_socket(&self) -> RawSocket { - self.source.raw + self.get_ref().as_raw_socket() } } diff --git a/src/os.rs b/src/os.rs new file mode 100644 index 0000000..5db3c57 --- /dev/null +++ b/src/os.rs @@ -0,0 +1,13 @@ +//! Platform-specific functionality. + +#[cfg(any( + target_os = "macos", + target_os = "ios", + target_os = "tvos", + target_os = "watchos", + target_os = "freebsd", + target_os = "netbsd", + target_os = "openbsd", + target_os = "dragonfly", +))] +pub mod kqueue; diff --git a/src/os/kqueue.rs b/src/os/kqueue.rs new file mode 100644 index 0000000..78b026e --- /dev/null +++ b/src/os/kqueue.rs @@ -0,0 +1,269 @@ +//! Functionality that is only available for `kqueue`-based platforms. + +use __private::QueueableSealed; + +use crate::reactor::{Reactor, Readable, Registration}; +use crate::Async; + +use std::convert::{TryFrom, TryInto}; +use std::future::Future; +use std::io::{Error, Result}; +use std::os::unix::io::{AsRawFd, RawFd}; +use std::pin::Pin; +use std::process::Child; +use std::task::{Context, Poll}; + +#[cfg(not(async_io_no_io_safety))] +use std::os::unix::io::{AsFd, BorrowedFd, OwnedFd}; + +/// A wrapper around a queueable object that waits until it is ready. +/// +/// The underlying `kqueue` implementation can be used to poll for events besides file descriptor +/// read/write readiness. This API makes these faculties available to the user. +/// +/// See the [`Queueable`] trait and its implementors for objects that currently support being registered +/// into the reactor. +#[derive(Debug)] +pub struct Filter(Async); + +impl AsRef for Filter { + fn as_ref(&self) -> &T { + self.0.as_ref() + } +} + +impl AsMut for Filter { + fn as_mut(&mut self) -> &mut T { + self.0.as_mut() + } +} + +impl Filter { + /// Create a new [`Filter`] around a [`Queueable`]. + /// + /// # Examples + /// + /// ```no_run + /// use std::process::Command; + /// use async_io::os::kqueue::{Exit, Filter}; + /// + /// // Create a new process to wait for. + /// let mut child = Command::new("sleep").arg("5").spawn().unwrap(); + /// + /// // Wrap the process in an `Async` object that waits for it to exit. + /// let process = Filter::new(Exit::new(child)).unwrap(); + /// + /// // Wait for the process to exit. + /// # async_io::block_on(async { + /// process.ready().await.unwrap(); + /// # }); + /// ``` + pub fn new(mut filter: T) -> Result { + Ok(Self(Async { + source: Reactor::get().insert_io(filter.registration())?, + io: Some(filter), + })) + } +} + +impl AsRawFd for Filter { + fn as_raw_fd(&self) -> RawFd { + self.0.as_raw_fd() + } +} + +#[cfg(not(async_io_no_io_safety))] +impl AsFd for Filter { + fn as_fd(&self) -> BorrowedFd<'_> { + self.0.as_fd() + } +} + +#[cfg(not(async_io_no_io_safety))] +impl> TryFrom for Filter { + type Error = Error; + + fn try_from(fd: OwnedFd) -> Result { + Ok(Self(Async::try_from(fd)?)) + } +} + +#[cfg(not(async_io_no_io_safety))] +impl> TryFrom> for OwnedFd { + type Error = Error; + + fn try_from(filter: Filter) -> Result { + filter.0.try_into() + } +} + +impl Filter { + /// Gets a reference to the underlying [`Queueable`] object. + /// + /// # Examples + /// + /// ``` + /// use async_io::os::kqueue::{Exit, Filter}; + /// + /// # futures_lite::future::block_on(async { + /// let child = std::process::Command::new("sleep").arg("5").spawn().unwrap(); + /// let process = Filter::new(Exit::new(child)).unwrap(); + /// let inner = process.get_ref(); + /// # }); + /// ``` + pub fn get_ref(&self) -> &T { + self.0.get_ref() + } + + /// Gets a mutable reference to the underlying [`Queueable`] object. + /// + /// # Examples + /// + /// ``` + /// use async_io::os::kqueue::{Exit, Filter}; + /// + /// # futures_lite::future::block_on(async { + /// let child = std::process::Command::new("sleep").arg("5").spawn().unwrap(); + /// let mut process = Filter::new(Exit::new(child)).unwrap(); + /// let inner = process.get_mut(); + /// # }); + /// ``` + pub fn get_mut(&mut self) -> &mut T { + self.0.get_mut() + } + + /// Unwraps the inner [`Queueable`] object. + /// + /// # Examples + /// + /// ``` + /// use async_io::os::kqueue::{Exit, Filter}; + /// + /// # futures_lite::future::block_on(async { + /// let child = std::process::Command::new("sleep").arg("5").spawn().unwrap(); + /// let process = Filter::new(Exit::new(child)).unwrap(); + /// let inner = process.into_inner().unwrap(); + /// # }); + /// ``` + pub fn into_inner(self) -> Result { + self.0.into_inner() + } + + /// Waits until the [`Queueable`] object is ready. + /// + /// This method completes when the underlying [`Queueable`] object has completed. See the documentation + /// for the [`Queueable`] object for more information. + /// + /// # Examples + /// + /// ```no_run + /// use std::process::Command; + /// use async_io::os::kqueue::{Exit, Filter}; + /// + /// # futures_lite::future::block_on(async { + /// let child = Command::new("sleep").arg("5").spawn()?; + /// let process = Filter::new(Exit::new(child))?; + /// + /// // Wait for the process to exit. + /// process.ready().await?; + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn ready(&self) -> Ready<'_, T> { + Ready(self.0.readable()) + } + + /// Polls the I/O handle for readiness. + /// + /// When this method returns [`Poll::Ready`], that means that the OS has delivered a notification + /// that the underlying [`Queueable`] object is ready. See the documentation for the [`Queueable`] + /// object for more information. + /// + /// # Caveats + /// + /// Two different tasks should not call this method concurrently. Otherwise, conflicting tasks + /// will just keep waking each other in turn, thus wasting CPU time. + /// + /// # Examples + /// + /// ```no_run + /// use std::process::Command; + /// use async_io::os::kqueue::{Exit, Filter}; + /// use futures_lite::future; + /// + /// # futures_lite::future::block_on(async { + /// let child = Command::new("sleep").arg("5").spawn()?; + /// let process = Filter::new(Exit::new(child))?; + /// + /// // Wait for the process to exit. + /// future::poll_fn(|cx| process.poll_ready(cx)).await?; + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { + self.0.poll_readable(cx) + } +} + +/// Future for [`Filter::ready`]. +#[must_use = "futures do nothing unless you `.await` or poll them"] +#[derive(Debug)] +pub struct Ready<'a, T>(Readable<'a, T>); + +impl Future for Ready<'_, T> { + type Output = Result<()>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + Pin::new(&mut self.0).poll(cx) + } +} + +/// Objects that can be registered into the reactor via a [`Async`](crate::Async). +/// +/// These objects represent other filters associated with the `kqueue` runtime aside from readability +/// and writability. Rather than waiting on readable/writable, they wait on "readiness". This is +/// typically used for signals and child process exits. +pub trait Queueable: QueueableSealed {} + +/// An object representing a signal. +/// +/// When registered into [`Async`](crate::Async) via [`with_filter`](AsyncKqueueExt::with_filter), +/// it will return a [`readable`](crate::Async::readable) event when the signal is received. +#[derive(Debug, Copy, Clone, Eq, PartialEq, PartialOrd, Ord, Hash)] +pub struct Signal(pub i32); + +impl QueueableSealed for Signal { + fn registration(&mut self) -> Registration { + Registration::Signal(*self) + } +} +impl Queueable for Signal {} + +/// Wait for a child process to exit. +/// +/// When registered into [`Async`](crate::Async) via [`with_filter`](AsyncKqueueExt::with_filter), +/// it will return a [`readable`](crate::Async::readable) event when the child process exits. +#[derive(Debug)] +pub struct Exit(Option); + +impl Exit { + /// Create a new `Exit` object. + pub fn new(child: Child) -> Self { + Self(Some(child)) + } +} + +impl QueueableSealed for Exit { + fn registration(&mut self) -> Registration { + Registration::Process(self.0.take().expect("Cannot reregister child")) + } +} +impl Queueable for Exit {} + +mod __private { + use crate::reactor::Registration; + + #[doc(hidden)] + pub trait QueueableSealed { + /// Get a registration object for this filter. + fn registration(&mut self) -> Registration; + } +} diff --git a/src/reactor.rs b/src/reactor.rs index 6b933ba..56d3ce7 100644 --- a/src/reactor.rs +++ b/src/reactor.rs @@ -5,10 +5,6 @@ use std::future::Future; use std::io; use std::marker::PhantomData; use std::mem; -#[cfg(unix)] -use std::os::unix::io::RawFd; -#[cfg(windows)] -use std::os::windows::io::RawSocket; use std::panic; use std::pin::Pin; use std::sync::atomic::{AtomicUsize, Ordering}; @@ -22,6 +18,31 @@ use futures_lite::ready; use polling::{Event, Poller}; use slab::Slab; +// Choose the proper implementation of `Registration` based on the target platform. +cfg_if::cfg_if! { + if #[cfg(windows)] { + mod windows; + pub use windows::Registration; + } else if #[cfg(any( + target_os = "macos", + target_os = "ios", + target_os = "tvos", + target_os = "watchos", + target_os = "freebsd", + target_os = "netbsd", + target_os = "openbsd", + target_os = "dragonfly", + ))] { + mod kqueue; + pub use kqueue::Registration; + } else if #[cfg(unix)] { + mod unix; + pub use unix::Registration; + } else { + compile_error!("unsupported platform"); + } +} + const READ: usize = 0; const WRITE: usize = 1; @@ -88,17 +109,13 @@ impl Reactor { } /// Registers an I/O source in the reactor. - pub(crate) fn insert_io( - &self, - #[cfg(unix)] raw: RawFd, - #[cfg(windows)] raw: RawSocket, - ) -> io::Result> { + pub(crate) fn insert_io(&self, raw: impl Into) -> io::Result> { // Create an I/O source for this file descriptor. let source = { let mut sources = self.sources.lock().unwrap(); let key = sources.vacant_entry().key(); let source = Arc::new(Source { - raw, + registration: raw.into(), key, state: Default::default(), }); @@ -107,7 +124,7 @@ impl Reactor { }; // Register the file descriptor. - if let Err(err) = self.poller.add(raw, Event::none(source.key)) { + if let Err(err) = source.registration.add(&self.poller, source.key) { let mut sources = self.sources.lock().unwrap(); sources.remove(source.key); return Err(err); @@ -120,7 +137,7 @@ impl Reactor { pub(crate) fn remove_io(&self, source: &Source) -> io::Result<()> { let mut sources = self.sources.lock().unwrap(); sources.remove(source.key); - self.poller.delete(source.raw) + source.registration.delete(&self.poller) } /// Registers a timer in the reactor. @@ -299,8 +316,8 @@ impl ReactorLock<'_> { // e.g. we were previously interested in both readability and writability, // but only one of them was emitted. if !state[READ].is_empty() || !state[WRITE].is_empty() { - self.reactor.poller.modify( - source.raw, + source.registration.modify( + &self.reactor.poller, Event { key: source.key, readable: !state[READ].is_empty(), @@ -341,13 +358,8 @@ enum TimerOp { /// A registered source of I/O events. #[derive(Debug)] pub(crate) struct Source { - /// Raw file descriptor on Unix platforms. - #[cfg(unix)] - pub(crate) raw: RawFd, - - /// Raw socket handle on Windows. - #[cfg(windows)] - pub(crate) raw: RawSocket, + /// This source's registration into the reactor. + registration: Registration, /// The key of this source obtained during registration. key: usize, @@ -436,8 +448,8 @@ impl Source { // Update interest in this I/O handle. if was_empty { - Reactor::get().poller.modify( - self.raw, + self.registration.modify( + &Reactor::get().poller, Event { key: self.key, readable: !state[READ].is_empty(), @@ -490,7 +502,7 @@ impl Future for Readable<'_, T> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { ready!(Pin::new(&mut self.0).poll(cx))?; - log::trace!("readable: fd={}", self.0.handle.source.raw); + log::trace!("readable: fd={:?}", self.0.handle.source.registration); Poll::Ready(Ok(())) } } @@ -510,7 +522,7 @@ impl Future for ReadableOwned { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { ready!(Pin::new(&mut self.0).poll(cx))?; - log::trace!("readable_owned: fd={}", self.0.handle.source.raw); + log::trace!("readable_owned: fd={:?}", self.0.handle.source.registration); Poll::Ready(Ok(())) } } @@ -530,7 +542,7 @@ impl Future for Writable<'_, T> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { ready!(Pin::new(&mut self.0).poll(cx))?; - log::trace!("writable: fd={}", self.0.handle.source.raw); + log::trace!("writable: fd={:?}", self.0.handle.source.registration); Poll::Ready(Ok(())) } } @@ -550,7 +562,7 @@ impl Future for WritableOwned { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { ready!(Pin::new(&mut self.0).poll(cx))?; - log::trace!("writable_owned: fd={}", self.0.handle.source.raw); + log::trace!("writable_owned: fd={:?}", self.0.handle.source.registration); Poll::Ready(Ok(())) } } @@ -610,8 +622,8 @@ impl> + Clone, T> Future for Ready { // Update interest in this I/O handle. if was_empty { - Reactor::get().poller.modify( - handle.borrow().source.raw, + handle.borrow().source.registration.modify( + &Reactor::get().poller, Event { key: handle.borrow().source.key, readable: !state[READ].is_empty(), diff --git a/src/reactor/kqueue.rs b/src/reactor/kqueue.rs new file mode 100644 index 0000000..4c575a2 --- /dev/null +++ b/src/reactor/kqueue.rs @@ -0,0 +1,87 @@ +// SPDX-License-Identifier: MIT OR Apache-2.0 + +use crate::os::kqueue::Signal; + +use polling::os::kqueue::{PollerKqueueExt, Process, ProcessOps, Signal as PollSignal}; +use polling::{Event, PollMode, Poller}; + +use std::fmt; +use std::io::Result; +use std::os::unix::io::RawFd; +use std::process::Child; + +/// The raw registration into the reactor. +/// +/// This needs to be public, since it is technically exposed through the `QueueableSealed` trait. +#[doc(hidden)] +pub enum Registration { + /// Raw file descriptor for readability/writability. + Fd(RawFd), + + /// Raw signal number for signal delivery. + Signal(Signal), + + /// Process for process termination. + Process(Child), +} + +impl fmt::Debug for Registration { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Fd(raw) => fmt::Debug::fmt(raw, f), + Self::Signal(signal) => fmt::Debug::fmt(signal, f), + Self::Process(process) => fmt::Debug::fmt(process, f), + } + } +} + +impl From for Registration { + #[inline] + fn from(raw: RawFd) -> Self { + Self::Fd(raw) + } +} + +impl Registration { + /// Registers the object into the reactor. + #[inline] + pub(crate) fn add(&self, poller: &Poller, token: usize) -> Result<()> { + match self { + Self::Fd(raw) => poller.add(*raw, Event::none(token)), + Self::Signal(signal) => { + poller.add_filter(PollSignal(signal.0), token, PollMode::Oneshot) + } + Self::Process(process) => poller.add_filter( + Process::new(process, ProcessOps::Exit), + token, + PollMode::Oneshot, + ), + } + } + + /// Re-registers the object into the reactor. + #[inline] + pub(crate) fn modify(&self, poller: &Poller, interest: Event) -> Result<()> { + match self { + Self::Fd(raw) => poller.modify(*raw, interest), + Self::Signal(signal) => { + poller.modify_filter(PollSignal(signal.0), interest.key, PollMode::Oneshot) + } + Self::Process(process) => poller.modify_filter( + Process::new(process, ProcessOps::Exit), + interest.key, + PollMode::Oneshot, + ), + } + } + + /// Deregisters the object from the reactor. + #[inline] + pub(crate) fn delete(&self, poller: &Poller) -> Result<()> { + match self { + Self::Fd(raw) => poller.delete(*raw), + Self::Signal(signal) => poller.delete_filter(PollSignal(signal.0)), + Self::Process(process) => poller.delete_filter(Process::new(process, ProcessOps::Exit)), + } + } +} diff --git a/src/reactor/unix.rs b/src/reactor/unix.rs new file mode 100644 index 0000000..2db2437 --- /dev/null +++ b/src/reactor/unix.rs @@ -0,0 +1,47 @@ +// SPDX-License-Identifier: MIT OR Apache-2.0 + +use polling::{Event, Poller}; + +use std::fmt; +use std::io::Result; +use std::os::unix::io::RawFd; + +/// The raw registration into the reactor. +#[doc(hidden)] +pub struct Registration { + /// Raw file descriptor on Unix. + raw: RawFd, +} + +impl fmt::Debug for Registration { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Debug::fmt(&self.raw, f) + } +} + +impl From for Registration { + #[inline] + fn from(raw: RawFd) -> Self { + Self { raw } + } +} + +impl Registration { + /// Registers the object into the reactor. + #[inline] + pub(crate) fn add(&self, poller: &Poller, token: usize) -> Result<()> { + poller.add(self.raw, Event::none(token)) + } + + /// Re-registers the object into the reactor. + #[inline] + pub(crate) fn modify(&self, poller: &Poller, interest: Event) -> Result<()> { + poller.modify(self.raw, interest) + } + + /// Deregisters the object from the reactor. + #[inline] + pub(crate) fn delete(&self, poller: &Poller) -> Result<()> { + poller.delete(self.raw) + } +} diff --git a/src/reactor/windows.rs b/src/reactor/windows.rs new file mode 100644 index 0000000..59f247f --- /dev/null +++ b/src/reactor/windows.rs @@ -0,0 +1,46 @@ +// SPDX-License-Identifier: MIT OR Apache-2.0 + +use polling::{Event, Poller}; +use std::fmt; +use std::io::Result; +use std::os::windows::io::RawSocket; + +/// The raw registration into the reactor. +#[doc(hidden)] +pub struct Registration { + /// Raw socket handle on Windows. + raw: RawSocket, +} + +impl fmt::Debug for Registration { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Debug::fmt(&self.raw, f) + } +} + +impl From for Registration { + #[inline] + fn from(raw: RawSocket) -> Self { + Self { raw } + } +} + +impl Registration { + /// Registers the object into the reactor. + #[inline] + pub(crate) fn add(&self, poller: &Poller, token: usize) -> Result<()> { + poller.add(self.raw, Event::none(token)) + } + + /// Re-registers the object into the reactor. + #[inline] + pub(crate) fn modify(&self, poller: &Poller, interest: Event) -> Result<()> { + poller.modify(self.raw, interest) + } + + /// Deregisters the object from the reactor. + #[inline] + pub(crate) fn delete(&self, poller: &Poller) -> Result<()> { + poller.delete(self.raw) + } +}