Skip to content

Commit

Permalink
feat: Expose other kqueue filters (#112)
Browse files Browse the repository at this point in the history
  • Loading branch information
notgull authored Apr 13, 2023
1 parent 0c3f75f commit 913b236
Show file tree
Hide file tree
Showing 9 changed files with 562 additions and 32 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ concurrent-queue = "2.2.0"
futures-lite = "1.11.0"
log = "0.4.11"
parking = "2.0.0"
polling = "2.0.0"
polling = "2.6.0"
rustix = { version = "0.37.1", default-features = false, features = ["std", "fs"] }
slab = "0.4.2"
socket2 = { version = "0.4.2", features = ["all"] }
Expand Down
54 changes: 54 additions & 0 deletions examples/kqueue-process.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
//! Uses the `async_io::os::kqueue` module to wait for a process to terminate.
//!
//! Run with:
//!
//! ```
//! cargo run --example kqueue-process
//! ```
#[cfg(any(
target_os = "macos",
target_os = "ios",
target_os = "tvos",
target_os = "watchos",
target_os = "freebsd",
target_os = "netbsd",
target_os = "openbsd",
target_os = "dragonfly",
))]
fn main() -> std::io::Result<()> {
use std::process::Command;

use async_io::os::kqueue::{Exit, Filter};
use futures_lite::future;

future::block_on(async {
// Spawn a process.
let process = Command::new("sleep")
.arg("3")
.spawn()
.expect("failed to spawn process");

// Wrap the process in an `Async` object that waits for it to exit.
let process = Filter::new(Exit::new(process))?;

// Wait for the process to exit.
process.ready().await?;

Ok(())
})
}

#[cfg(not(any(
target_os = "macos",
target_os = "ios",
target_os = "tvos",
target_os = "watchos",
target_os = "freebsd",
target_os = "netbsd",
target_os = "openbsd",
target_os = "dragonfly",
)))]
fn main() {
println!("This example only works for kqueue-enabled platforms.");
}
6 changes: 4 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ use crate::reactor::{Reactor, Source};
mod driver;
mod reactor;

pub mod os;

pub use driver::block_on;
pub use reactor::{Readable, ReadableOwned, Writable, WritableOwned};

Expand Down Expand Up @@ -685,7 +687,7 @@ impl<T: AsRawFd> Async<T> {
#[cfg(unix)]
impl<T: AsRawFd> AsRawFd for Async<T> {
fn as_raw_fd(&self) -> RawFd {
self.source.raw
self.get_ref().as_raw_fd()
}
}

Expand Down Expand Up @@ -761,7 +763,7 @@ impl<T: AsRawSocket> Async<T> {
#[cfg(windows)]
impl<T: AsRawSocket> AsRawSocket for Async<T> {
fn as_raw_socket(&self) -> RawSocket {
self.source.raw
self.get_ref().as_raw_socket()
}
}

Expand Down
13 changes: 13 additions & 0 deletions src/os.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
//! Platform-specific functionality.
#[cfg(any(
target_os = "macos",
target_os = "ios",
target_os = "tvos",
target_os = "watchos",
target_os = "freebsd",
target_os = "netbsd",
target_os = "openbsd",
target_os = "dragonfly",
))]
pub mod kqueue;
269 changes: 269 additions & 0 deletions src/os/kqueue.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,269 @@
//! Functionality that is only available for `kqueue`-based platforms.
use __private::QueueableSealed;

use crate::reactor::{Reactor, Readable, Registration};
use crate::Async;

use std::convert::{TryFrom, TryInto};
use std::future::Future;
use std::io::{Error, Result};
use std::os::unix::io::{AsRawFd, RawFd};
use std::pin::Pin;
use std::process::Child;
use std::task::{Context, Poll};

#[cfg(not(async_io_no_io_safety))]
use std::os::unix::io::{AsFd, BorrowedFd, OwnedFd};

/// A wrapper around a queueable object that waits until it is ready.
///
/// The underlying `kqueue` implementation can be used to poll for events besides file descriptor
/// read/write readiness. This API makes these faculties available to the user.
///
/// See the [`Queueable`] trait and its implementors for objects that currently support being registered
/// into the reactor.
#[derive(Debug)]
pub struct Filter<T>(Async<T>);

impl<T> AsRef<T> for Filter<T> {
fn as_ref(&self) -> &T {
self.0.as_ref()
}
}

impl<T> AsMut<T> for Filter<T> {
fn as_mut(&mut self) -> &mut T {
self.0.as_mut()
}
}

impl<T: Queueable> Filter<T> {
/// Create a new [`Filter`] around a [`Queueable`].
///
/// # Examples
///
/// ```no_run
/// use std::process::Command;
/// use async_io::os::kqueue::{Exit, Filter};
///
/// // Create a new process to wait for.
/// let mut child = Command::new("sleep").arg("5").spawn().unwrap();
///
/// // Wrap the process in an `Async` object that waits for it to exit.
/// let process = Filter::new(Exit::new(child)).unwrap();
///
/// // Wait for the process to exit.
/// # async_io::block_on(async {
/// process.ready().await.unwrap();
/// # });
/// ```
pub fn new(mut filter: T) -> Result<Self> {
Ok(Self(Async {
source: Reactor::get().insert_io(filter.registration())?,
io: Some(filter),
}))
}
}

impl<T: AsRawFd> AsRawFd for Filter<T> {
fn as_raw_fd(&self) -> RawFd {
self.0.as_raw_fd()
}
}

#[cfg(not(async_io_no_io_safety))]
impl<T: AsFd> AsFd for Filter<T> {
fn as_fd(&self) -> BorrowedFd<'_> {
self.0.as_fd()
}
}

#[cfg(not(async_io_no_io_safety))]
impl<T: AsRawFd + From<OwnedFd>> TryFrom<OwnedFd> for Filter<T> {
type Error = Error;

fn try_from(fd: OwnedFd) -> Result<Self> {
Ok(Self(Async::try_from(fd)?))
}
}

#[cfg(not(async_io_no_io_safety))]
impl<T: Into<OwnedFd>> TryFrom<Filter<T>> for OwnedFd {
type Error = Error;

fn try_from(filter: Filter<T>) -> Result<Self> {
filter.0.try_into()
}
}

impl<T> Filter<T> {
/// Gets a reference to the underlying [`Queueable`] object.
///
/// # Examples
///
/// ```
/// use async_io::os::kqueue::{Exit, Filter};
///
/// # futures_lite::future::block_on(async {
/// let child = std::process::Command::new("sleep").arg("5").spawn().unwrap();
/// let process = Filter::new(Exit::new(child)).unwrap();
/// let inner = process.get_ref();
/// # });
/// ```
pub fn get_ref(&self) -> &T {
self.0.get_ref()
}

/// Gets a mutable reference to the underlying [`Queueable`] object.
///
/// # Examples
///
/// ```
/// use async_io::os::kqueue::{Exit, Filter};
///
/// # futures_lite::future::block_on(async {
/// let child = std::process::Command::new("sleep").arg("5").spawn().unwrap();
/// let mut process = Filter::new(Exit::new(child)).unwrap();
/// let inner = process.get_mut();
/// # });
/// ```
pub fn get_mut(&mut self) -> &mut T {
self.0.get_mut()
}

/// Unwraps the inner [`Queueable`] object.
///
/// # Examples
///
/// ```
/// use async_io::os::kqueue::{Exit, Filter};
///
/// # futures_lite::future::block_on(async {
/// let child = std::process::Command::new("sleep").arg("5").spawn().unwrap();
/// let process = Filter::new(Exit::new(child)).unwrap();
/// let inner = process.into_inner().unwrap();
/// # });
/// ```
pub fn into_inner(self) -> Result<T> {
self.0.into_inner()
}

/// Waits until the [`Queueable`] object is ready.
///
/// This method completes when the underlying [`Queueable`] object has completed. See the documentation
/// for the [`Queueable`] object for more information.
///
/// # Examples
///
/// ```no_run
/// use std::process::Command;
/// use async_io::os::kqueue::{Exit, Filter};
///
/// # futures_lite::future::block_on(async {
/// let child = Command::new("sleep").arg("5").spawn()?;
/// let process = Filter::new(Exit::new(child))?;
///
/// // Wait for the process to exit.
/// process.ready().await?;
/// # std::io::Result::Ok(()) });
/// ```
pub fn ready(&self) -> Ready<'_, T> {
Ready(self.0.readable())
}

/// Polls the I/O handle for readiness.
///
/// When this method returns [`Poll::Ready`], that means that the OS has delivered a notification
/// that the underlying [`Queueable`] object is ready. See the documentation for the [`Queueable`]
/// object for more information.
///
/// # Caveats
///
/// Two different tasks should not call this method concurrently. Otherwise, conflicting tasks
/// will just keep waking each other in turn, thus wasting CPU time.
///
/// # Examples
///
/// ```no_run
/// use std::process::Command;
/// use async_io::os::kqueue::{Exit, Filter};
/// use futures_lite::future;
///
/// # futures_lite::future::block_on(async {
/// let child = Command::new("sleep").arg("5").spawn()?;
/// let process = Filter::new(Exit::new(child))?;
///
/// // Wait for the process to exit.
/// future::poll_fn(|cx| process.poll_ready(cx)).await?;
/// # std::io::Result::Ok(()) });
/// ```
pub fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<()>> {
self.0.poll_readable(cx)
}
}

/// Future for [`Filter::ready`].
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[derive(Debug)]
pub struct Ready<'a, T>(Readable<'a, T>);

impl<T> Future for Ready<'_, T> {
type Output = Result<()>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut self.0).poll(cx)
}
}

/// Objects that can be registered into the reactor via a [`Async`](crate::Async).
///
/// These objects represent other filters associated with the `kqueue` runtime aside from readability
/// and writability. Rather than waiting on readable/writable, they wait on "readiness". This is
/// typically used for signals and child process exits.
pub trait Queueable: QueueableSealed {}

/// An object representing a signal.
///
/// When registered into [`Async`](crate::Async) via [`with_filter`](AsyncKqueueExt::with_filter),
/// it will return a [`readable`](crate::Async::readable) event when the signal is received.
#[derive(Debug, Copy, Clone, Eq, PartialEq, PartialOrd, Ord, Hash)]
pub struct Signal(pub i32);

impl QueueableSealed for Signal {
fn registration(&mut self) -> Registration {
Registration::Signal(*self)
}
}
impl Queueable for Signal {}

/// Wait for a child process to exit.
///
/// When registered into [`Async`](crate::Async) via [`with_filter`](AsyncKqueueExt::with_filter),
/// it will return a [`readable`](crate::Async::readable) event when the child process exits.
#[derive(Debug)]
pub struct Exit(Option<Child>);

impl Exit {
/// Create a new `Exit` object.
pub fn new(child: Child) -> Self {
Self(Some(child))
}
}

impl QueueableSealed for Exit {
fn registration(&mut self) -> Registration {
Registration::Process(self.0.take().expect("Cannot reregister child"))
}
}
impl Queueable for Exit {}

mod __private {
use crate::reactor::Registration;

#[doc(hidden)]
pub trait QueueableSealed {
/// Get a registration object for this filter.
fn registration(&mut self) -> Registration;
}
}
Loading

0 comments on commit 913b236

Please sign in to comment.