diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 5cefdea..055170c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -169,6 +169,13 @@ jobs: # --no-self-update is necessary because the windows environment cannot self-update rustup.exe. run: rustup update ${{ matrix.rust }} --no-self-update && rustup default ${{ matrix.rust }} - run: cargo build + - name: Install Other Targets + if: startsWith(matrix.os, 'ubuntu') + run: rustup target add x86_64-unknown-freebsd x86_64-unknown-netbsd + - run: cargo build --target x86_64-unknown-freebsd + if: startsWith(matrix.os, 'ubuntu') + - run: cargo build --target x86_64-unknown-netbsd + if: startsWith(matrix.os, 'ubuntu') clippy: runs-on: ubuntu-latest diff --git a/examples/wait-signal.rs b/examples/wait-signal.rs new file mode 100644 index 0000000..02a4c5d --- /dev/null +++ b/examples/wait-signal.rs @@ -0,0 +1,80 @@ +#[cfg(all( + any( + target_os = "macos", + target_os = "ios", + target_os = "tvos", + target_os = "watchos", + target_os = "freebsd", + target_os = "netbsd", + target_os = "openbsd", + target_os = "dragonfly", + ), + not(polling_test_poll_backend), +))] +mod example { + use polling::os::kqueue::{PollerKqueueExt, Signal}; + use polling::{PollMode, Poller}; + + pub(super) fn main2() { + // Create a poller. + let poller = Poller::new().unwrap(); + + // Register SIGINT in the poller. + let sigint = Signal(libc::SIGINT); + poller.add_filter(sigint, 1, PollMode::Oneshot).unwrap(); + + let mut events = vec![]; + + println!("Press Ctrl+C to exit..."); + + loop { + // Wait for events. + poller.wait(&mut events, None).unwrap(); + + // Process events. + for ev in events.drain(..) { + match ev.key { + 1 => { + println!("SIGINT received"); + return; + } + _ => unreachable!(), + } + } + } + } +} + +#[cfg(all( + any( + target_os = "macos", + target_os = "ios", + target_os = "tvos", + target_os = "watchos", + target_os = "freebsd", + target_os = "netbsd", + target_os = "openbsd", + target_os = "dragonfly", + ), + not(polling_test_poll_backend), +))] +fn main() { + example::main2(); +} + +#[cfg(not(all( + any( + target_os = "macos", + target_os = "ios", + target_os = "tvos", + target_os = "watchos", + target_os = "freebsd", + target_os = "netbsd", + target_os = "openbsd", + target_os = "dragonfly", + ), + not(polling_test_poll_backend), +)))] +fn main() { + eprintln!("This example is only supported on kqueue-based platforms."); +} diff --git a/src/kqueue.rs b/src/kqueue.rs index 7ace814..bc127d0 100644 --- a/src/kqueue.rs +++ b/src/kqueue.rs @@ -65,11 +65,7 @@ impl Poller { log::trace!("add: kqueue_fd={}, fd={}, ev={:?}", self.kqueue_fd, fd, ev); } - let mode_flags = match mode { - PollMode::Oneshot => libc::EV_ONESHOT, - PollMode::Level => 0, - PollMode::Edge => libc::EV_CLEAR, - }; + let mode_flags = mode_to_flags(mode); let read_flags = if ev.readable { libc::EV_ADD | mode_flags @@ -105,7 +101,7 @@ impl Poller { } /// Submit one or more changes to the kernel queue and check to see if they succeeded. - fn submit_changes(&self, changelist: A) -> io::Result<()> + pub(crate) fn submit_changes(&self, changelist: A) -> io::Result<()> where A: Copy + AsRef<[libc::kevent]> + AsMut<[libc::kevent]>, { @@ -229,19 +225,47 @@ impl Events { /// Iterates over I/O events. pub fn iter(&self) -> impl Iterator + '_ { + const READABLES: &[FilterName] = &[ + libc::EVFILT_READ, + libc::EVFILT_VNODE, + libc::EVFILT_PROC, + libc::EVFILT_SIGNAL, + libc::EVFILT_TIMER, + ]; + // On some platforms, closing the read end of a pipe wakes up writers, but the // event is reported as EVFILT_READ with the EV_EOF flag. // // https://github.com/golang/go/commit/23aad448b1e3f7c3b4ba2af90120bde91ac865b4 self.list[..self.len].iter().map(|ev| Event { key: ev.udata as usize, - readable: ev.filter == libc::EVFILT_READ, + readable: READABLES.contains(&ev.filter), writable: ev.filter == libc::EVFILT_WRITE || (ev.filter == libc::EVFILT_READ && (ev.flags & libc::EV_EOF) != 0), }) } } +pub(crate) fn mode_to_flags(mode: PollMode) -> FilterFlags { + match mode { + PollMode::Oneshot => libc::EV_ONESHOT, + PollMode::Level => 0, + PollMode::Edge => libc::EV_CLEAR, + } +} + +#[cfg(target_os = "netbsd")] +pub(crate) type FilterFlags = u32; + +#[cfg(not(target_os = "netbsd"))] +pub(crate) type FilterFlags = libc::c_ushort; + +#[cfg(target_os = "netbsd")] +pub(crate) type FilterName = u32; + +#[cfg(not(target_os = "netbsd"))] +pub(crate) type FilterName = libc::c_short; + #[cfg(any( target_os = "freebsd", target_os = "dragonfly", diff --git a/src/lib.rs b/src/lib.rs index d388283..a6795c7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -120,6 +120,8 @@ cfg_if! { } } +pub mod os; + /// Key associated with notifications. const NOTIFY_KEY: usize = std::usize::MAX; diff --git a/src/os.rs b/src/os.rs new file mode 100644 index 0000000..280adf3 --- /dev/null +++ b/src/os.rs @@ -0,0 +1,21 @@ +//! Platform-specific functionality. + +#[cfg(all( + any( + target_os = "macos", + target_os = "ios", + target_os = "tvos", + target_os = "watchos", + target_os = "freebsd", + target_os = "netbsd", + target_os = "openbsd", + target_os = "dragonfly", + ), + not(polling_test_poll_backend), +))] +pub mod kqueue; + +mod __private { + #[doc(hidden)] + pub trait PollerSealed {} +} diff --git a/src/os/kqueue.rs b/src/os/kqueue.rs new file mode 100644 index 0000000..ad6aa1d --- /dev/null +++ b/src/os/kqueue.rs @@ -0,0 +1,304 @@ +//! Functionality that is only available for `kqueue`-based platforms. + +use crate::sys::{mode_to_flags, FilterFlags}; +use crate::{PollMode, Poller}; + +use std::convert::TryInto; +use std::process::Child; +use std::time::Duration; +use std::{io, mem}; + +use super::__private::PollerSealed; +use __private::FilterSealed; + +// TODO(notgull): We should also have EVFILT_AIO, EVFILT_VNODE and EVFILT_USER. However, the current +// API makes it difficult to effectively express events from these filters. At the next breaking +// change, we should change `Event` to be a struct with private fields, and encode additional +// information in there. + +/// Functionality that is only available for `kqueue`-based platforms. +/// +/// `kqueue` is able to monitor much more than just read/write readiness on file descriptors. Using +/// this extension trait, you can monitor for signals, process exits, and more. See the implementors +/// of the [`Filter`] trait for more information. +pub trait PollerKqueueExt: PollerSealed { + /// Add a filter to the poller. + /// + /// This is similar to [`add`][Poller::add], but it allows you to specify a filter instead of + /// a socket. See the implementors of the [`Filter`] trait for more information. + /// + /// # Examples + /// + /// ```no_run + /// use polling::{Poller, PollMode}; + /// use polling::os::kqueue::{Filter, PollerKqueueExt, Signal}; + /// + /// let poller = Poller::new().unwrap(); + /// + /// // Register the SIGINT signal. + /// poller.add_filter(Signal(libc::SIGINT), 0, PollMode::Oneshot).unwrap(); + /// + /// // Wait for the signal. + /// let mut events = vec![]; + /// poller.wait(&mut events, None).unwrap(); + /// # let _ = events; + /// ``` + fn add_filter(&self, filter: F, key: usize, mode: PollMode) -> io::Result<()>; + + /// Modify a filter in the poller. + /// + /// This is similar to [`modify`][Poller::modify], but it allows you to specify a filter + /// instead of a socket. See the implementors of the [`Filter`] trait for more information. + /// + /// # Examples + /// + /// ```no_run + /// use polling::{Poller, PollMode}; + /// use polling::os::kqueue::{Filter, PollerKqueueExt, Signal}; + /// + /// let poller = Poller::new().unwrap(); + /// + /// // Register the SIGINT signal. + /// poller.add_filter(Signal(libc::SIGINT), 0, PollMode::Oneshot).unwrap(); + /// + /// // Re-register with a different key. + /// poller.modify_filter(Signal(libc::SIGINT), 1, PollMode::Oneshot).unwrap(); + /// + /// // Wait for the signal. + /// let mut events = vec![]; + /// poller.wait(&mut events, None).unwrap(); + /// # let _ = events; + /// ``` + fn modify_filter(&self, filter: F, key: usize, mode: PollMode) -> io::Result<()>; + + /// Remove a filter from the poller. + /// + /// This is used to remove filters that were previously added with + /// [`add_filter`](PollerKqueueExt::add_filter). + /// + /// # Examples + /// + /// ```no_run + /// use polling::{Poller, PollMode}; + /// use polling::os::kqueue::{Filter, PollerKqueueExt, Signal}; + /// + /// let poller = Poller::new().unwrap(); + /// + /// // Register the SIGINT signal. + /// poller.add_filter(Signal(libc::SIGINT), 0, PollMode::Oneshot).unwrap(); + /// + /// // Remove the filter. + /// poller.delete_filter(Signal(libc::SIGINT)).unwrap(); + /// ``` + fn delete_filter(&self, filter: F) -> io::Result<()>; +} + +impl PollerSealed for Poller {} + +impl PollerKqueueExt for Poller { + #[inline(always)] + fn add_filter(&self, filter: F, key: usize, mode: PollMode) -> io::Result<()> { + // No difference between adding and modifying in kqueue. + self.modify_filter(filter, key, mode) + } + + fn modify_filter(&self, filter: F, key: usize, mode: PollMode) -> io::Result<()> { + // Convert the filter into a kevent. + let event = filter.filter(libc::EV_ADD | mode_to_flags(mode), key); + + // Modify the filter. + self.poller.submit_changes([event]) + } + + fn delete_filter(&self, filter: F) -> io::Result<()> { + // Convert the filter into a kevent. + let event = filter.filter(libc::EV_DELETE, 0); + + // Delete the filter. + self.poller.submit_changes([event]) + } +} + +/// A filter that can be registered into a `kqueue`. +pub trait Filter: FilterSealed {} + +unsafe impl FilterSealed for &T { + #[inline(always)] + fn filter(&self, flags: FilterFlags, key: usize) -> libc::kevent { + (**self).filter(flags, key) + } +} + +impl Filter for &T {} + +/// Monitor this signal number. +/// +/// No matter what `PollMode` is specified, this filter will always be +/// oneshot-only. +#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct Signal(pub c_int); + +/// Alias for `libc::c_int`. +#[allow(non_camel_case_types)] +pub type c_int = i32; + +unsafe impl FilterSealed for Signal { + #[inline(always)] + fn filter(&self, flags: FilterFlags, key: usize) -> libc::kevent { + libc::kevent { + ident: self.0 as _, + filter: libc::EVFILT_SIGNAL, + flags: flags | libc::EV_RECEIPT, + udata: key as _, + ..unsafe { mem::zeroed() } + } + } +} + +impl Filter for Signal {} + +/// Monitor a child process. +#[derive(Debug)] +pub struct Process<'a> { + /// The child process to monitor. + child: &'a Child, + + /// The operation to monitor. + ops: ProcessOps, +} + +/// The operations that a monitored process can perform. +#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +#[non_exhaustive] +pub enum ProcessOps { + /// The process exited. + Exit, + + /// The process was forked. + Fork, + + /// The process executed a new process. + Exec, +} + +impl<'a> Process<'a> { + /// Monitor a child process. + pub fn new(child: &'a Child, ops: ProcessOps) -> Self { + Self { child, ops } + } +} + +unsafe impl FilterSealed for Process<'_> { + #[inline(always)] + fn filter(&self, flags: FilterFlags, key: usize) -> libc::kevent { + let fflags = match self.ops { + ProcessOps::Exit => libc::NOTE_EXIT, + ProcessOps::Fork => libc::NOTE_FORK, + ProcessOps::Exec => libc::NOTE_EXEC, + }; + + libc::kevent { + ident: self.child.id() as _, + filter: libc::EVFILT_PROC, + flags: flags | libc::EV_RECEIPT, + fflags, + udata: key as _, + ..unsafe { mem::zeroed() } + } + } +} + +impl Filter for Process<'_> {} + +/// Wait for a timeout to expire. +/// +/// Modifying the timeout after it has been added to the poller will reset it. +#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct Timer { + /// Identifier for the timer. + pub id: usize, + + /// The timeout to wait for. + pub timeout: Duration, +} + +unsafe impl FilterSealed for Timer { + fn filter(&self, flags: FilterFlags, key: usize) -> libc::kevent { + // Figure out the granularity of the timer. + let (fflags, data) = { + #[cfg(not(any(target_os = "dragonfly", target_os = "netbsd", target_os = "openbsd")))] + { + let subsec_nanos = self.timeout.subsec_nanos(); + + match (subsec_nanos % 1_000, subsec_nanos % 1_000_000, subsec_nanos) { + (_, _, 0) => ( + libc::NOTE_SECONDS, + self.timeout.as_secs().try_into().expect("too many seconds"), + ), + (_, 0, _) => ( + // Note: 0 by default means milliseconds. + 0, + self.timeout + .as_millis() + .try_into() + .expect("too many milliseconds"), + ), + (0, _, _) => ( + libc::NOTE_USECONDS, + self.timeout + .as_micros() + .try_into() + .expect("too many microseconds"), + ), + (_, _, _) => ( + libc::NOTE_NSECONDS, + self.timeout + .as_nanos() + .try_into() + .expect("too many nanoseconds"), + ), + } + } + + #[cfg(any(target_os = "dragonfly", target_os = "netbsd", target_os = "openbsd"))] + { + // OpenBSD/Dragonfly/NetBSD only supports milliseconds. + // NetBSD 10 supports NOTE_SECONDS et al, once Rust drops support for + // NetBSD 9 we can use the same code as above. + // See also: https://github.com/rust-lang/libc/pull/3080 + ( + 0, + self.timeout + .as_millis() + .try_into() + .expect("too many milliseconds"), + ) + } + }; + + #[allow(clippy::needless_update)] + libc::kevent { + ident: self.id as _, + filter: libc::EVFILT_TIMER, + flags: flags | libc::EV_RECEIPT, + fflags, + data, + udata: key as _, + ..unsafe { mem::zeroed() } + } + } +} + +impl Filter for Timer {} + +mod __private { + use crate::sys::FilterFlags; + + #[doc(hidden)] + pub unsafe trait FilterSealed { + /// Get the filter for the given event. + /// + /// This filter's flags must have `EV_RECEIPT`. + fn filter(&self, flags: FilterFlags, key: usize) -> libc::kevent; + } +}