diff --git a/Cargo.lock b/Cargo.lock index 7b55467..977c1ca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -529,6 +529,16 @@ dependencies = [ "typenum", ] +[[package]] +name = "ctrlc" +version = "3.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "672465ae37dc1bc6380a6547a8883d5dd397b0f1faaad4f265726cc7042a5345" +dependencies = [ + "nix 0.28.0", + "windows-sys 0.52.0", +] + [[package]] name = "dashmap" version = "5.5.1" @@ -653,6 +663,7 @@ dependencies = [ "dirs 5.0.1", "dns-lookup", "downcast-rs", + "dune_event_loop", "enable-ansi-support", "futures", "httparse", @@ -693,6 +704,21 @@ dependencies = [ "zstd", ] +[[package]] +name = "dune_event_loop" +version = "0.1.0" +source = "git+https://github.com/aalykiot/dune-event-loop?branch=main#0ab67d13de3f00858a242ceab8646ec5ae296501" +dependencies = [ + "anyhow", + "ctrlc", + "downcast-rs", + "mio", + "notify", + "rayon", + "signal-hook", + "signal-hook-mio", +] + [[package]] name = "either" version = "1.9.0" @@ -2243,6 +2269,27 @@ dependencies = [ "digest", ] +[[package]] +name = "signal-hook" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8621587d4798caf8eb44879d42e56b9a93ea5dcd315a6487c357130095b62801" +dependencies = [ + "libc", + "signal-hook-registry", +] + +[[package]] +name = "signal-hook-mio" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29ad2e15f37ec9a6cc544097b78a1ec90001e9f71b81338ca39f430adaca99af" +dependencies = [ + "libc", + "mio", + "signal-hook", +] + [[package]] name = "signal-hook-registry" version = "1.4.1" diff --git a/Cargo.toml b/Cargo.toml index 33f9383..49baf22 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,11 +13,9 @@ name = "dune" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html -[target.'cfg(unix)'.dependencies] -nix = { version = "0.28.0", features = ["signal"] } - -[target.'cfg(windows)'.dependencies] -enable-ansi-support = "0.2.1" +[dependencies.dune_event_loop] +git = "https://github.com/aalykiot/dune-event-loop" +branch = "main" [dependencies] v8 = { version = "0.89.0", default-features = false } @@ -66,5 +64,11 @@ tokio = { version = "1.37.0", features = ["full"] } axum = { version = "0.7.5", features = ["ws"] } uuid = { version = "1.8.0", features = ["v4", "fast-rng"] } +[target.'cfg(unix)'.dependencies] +nix = { version = "0.28.0", features = ["signal"] } + +[target.'cfg(windows)'.dependencies] +enable-ansi-support = "0.2.1" + [dev-dependencies] assert_fs = "1.1.1" diff --git a/examples/signals.js b/examples/signals.js new file mode 100644 index 0000000..4665df2 --- /dev/null +++ b/examples/signals.js @@ -0,0 +1,17 @@ +import http from 'http'; + +// We need somehow to keep the event-loop alive. +http.createServer(() => {}).listen(3000); + +let shouldExit = false; + +// Exit on fast double CTRL+C key press. +const onSignal = () => { + if (shouldExit) process.exit(0); + shouldExit = true; + setTimeout(() => { + shouldExit = false; + }, 500); +}; + +process.on('SIGINT', onSignal); diff --git a/src/bindings.rs b/src/bindings.rs index 4355627..aee2ce3 100644 --- a/src/bindings.rs +++ b/src/bindings.rs @@ -7,6 +7,7 @@ use crate::net; use crate::perf_hooks; use crate::process; use crate::promise; +use crate::signals; use crate::stdio; use crate::timers; use anyhow::Error; @@ -28,6 +29,7 @@ lazy_static! { ("net", net::initialize), ("promise", promise::initialize), ("http_parser", http_parser::initialize), + ("signals", signals::initialize), ]; HashMap::from_iter(bindings.into_iter()) }; diff --git a/src/dns.rs b/src/dns.rs index 3f4bac0..dcdd0d8 100644 --- a/src/dns.rs +++ b/src/dns.rs @@ -1,12 +1,12 @@ use crate::bindings::set_exception_code; use crate::bindings::set_function_to; use crate::bindings::set_property_to; -use crate::event_loop::LoopHandle; -use crate::event_loop::TaskResult; use crate::runtime::JsFuture; use crate::runtime::JsRuntime; use anyhow::Result; use dns_lookup::lookup_host; +use dune_event_loop::LoopHandle; +use dune_event_loop::TaskResult; use std::net::IpAddr; pub fn initialize(scope: &mut v8::HandleScope) -> v8::Global { diff --git a/src/event_loop.rs b/src/event_loop.rs deleted file mode 100644 index 948e3f2..0000000 --- a/src/event_loop.rs +++ /dev/null @@ -1,1177 +0,0 @@ -use anyhow::anyhow; -use anyhow::Result; -use downcast_rs::impl_downcast; -use downcast_rs::Downcast; -use mio::net::TcpListener; -use mio::net::TcpStream; -use mio::Events; -use mio::Interest; -use mio::Poll; -use mio::Registry; -use mio::Token; -use mio::Waker; -pub use notify::Event as FsEvent; -use notify::RecommendedWatcher; -use notify::RecursiveMode; -use notify::Watcher; -use rayon::ThreadPool; -use rayon::ThreadPoolBuilder; -use std::any::type_name; -use std::borrow::Cow; -use std::cell::Cell; -use std::collections::BTreeMap; -use std::collections::HashMap; -use std::collections::LinkedList; -use std::io; -use std::io::Read; -use std::io::Write; -use std::net::Shutdown; -use std::net::SocketAddr; -use std::num::NonZeroUsize; -use std::path::Path; -use std::path::PathBuf; -use std::rc::Rc; -use std::sync::mpsc; -use std::sync::Arc; -use std::sync::Mutex; -use std::thread; -use std::time::Duration; -use std::time::Instant; - -/// Wrapper type for resource identification. -pub type Index = u32; - -/// All objects that are tracked by the event-loop should implement the `Resource` trait. -pub trait Resource: Downcast + 'static { - /// Returns a string representation of the resource. - fn name(&self) -> Cow { - type_name::().into() - } - /// Custom way to close any resources. - fn close(&mut self) {} -} - -impl_downcast!(Resource); - -/// Describes a timer resource. -struct TimerWrap { - cb: Box, - expires_at: Duration, - repeat: bool, -} - -impl Resource for TimerWrap {} - -/// Describes an async task. -struct TaskWrap { - inner: Option, -} - -impl Resource for TaskWrap {} - -// Wrapper types for the task resource. -type Task = Box TaskResult + Send>; -type TaskOnFinish = Box; -pub type TaskResult = Option>>; - -// Wrapper types for different TCP callbacks. -type TcpOnConnection = Box) + 'static>; -type TcpListenerOnConnection = Box) + 'static>; -type TcpOnWrite = Box) + 'static>; -type TcpOnRead = Box>) + 'static>; - -// Wrapper around check callbacks. -type OnCheck = Box; - -// Wrapper around close callbacks. -type OnClose = Box; - -// Wrapper around fs events callbacks. -type FsWatchOnEvent = Box; - -/// Describes a TCP connection. -struct TcpStreamWrap { - id: Index, - socket: TcpStream, - on_connection: Option, - on_read: Option, - write_queue: LinkedList<(Vec, TcpOnWrite)>, -} - -impl Resource for TcpStreamWrap { - #[allow(unused_must_use)] - fn close(&mut self) { - // Shutdown the write side of the stream. - self.socket.shutdown(Shutdown::Write); - } -} - -/// Describes a TCP server. -struct TcpListenerWrap { - id: Index, - socket: TcpListener, - on_connection: TcpListenerOnConnection, -} - -impl Resource for TcpListenerWrap {} - -#[allow(dead_code)] -/// Useful information about a TCP socket. -pub struct TcpSocketInfo { - pub id: Index, - pub host: SocketAddr, - pub remote: SocketAddr, -} - -/// Describes a callback that will run once after the Poll phase. -pub struct CheckWrap { - cb: Option, -} - -impl Resource for CheckWrap {} - -/// Describes a file-system watcher. -pub struct FsWatcherWrap { - pub inner: Option, - pub on_event: Option, - pub path: PathBuf, - pub recursive: bool, -} - -impl Resource for FsWatcherWrap {} - -#[allow(clippy::enum_variant_names)] -enum Action { - TimerReq(Index, TimerWrap), - TimerRemoveReq(Index), - SpawnReq(Index, Task, TaskWrap), - TcpConnectionReq(Index, TcpStreamWrap), - TcpListenReq(Index, TcpListenerWrap), - TcpWriteReq(Index, Vec, TcpOnWrite), - TcpReadStartReq(Index, TcpOnRead), - TcpCloseReq(Index, OnClose), - TcpShutdownReq(Index), - CheckReq(Index, CheckWrap), - CheckRemoveReq(Index), - FsEventStartReq(Index, FsWatcherWrap), - FsEventStopReq(Index), -} - -enum Event { - /// A thread-pool task has been completed. - ThreadPool(Index, TaskResult), - /// A network operation is available. - Network(TcpEvent), - /// A file-system change has been detected. - Watch(Index, FsEvent), -} - -#[derive(Debug)] -enum TcpEvent { - /// Socket is (probably) ready for reading. - Read(Index), - /// Socket is (probably) ready for writing. - Write(Index), -} - -/// An instance that knows how to handle fs events. -struct FsEventHandler { - id: Index, - waker: Arc, - sender: Arc>>, -} - -impl notify::EventHandler for FsEventHandler { - /// Handles an event. - fn handle_event(&mut self, event: notify::Result) { - // Notify the main thread about this fs event. - let event = Event::Watch(self.id, event.unwrap()); - - self.sender.lock().unwrap().send(event).unwrap(); - self.waker.wake().unwrap(); - } -} - -pub struct EventLoop { - index: Rc>, - resources: HashMap>, - timer_queue: BTreeMap, - action_queue: mpsc::Receiver, - action_queue_empty: Rc>, - action_dispatcher: Rc>, - check_queue: Vec, - close_queue: Vec<(Index, Option)>, - thread_pool: ThreadPool, - thread_pool_tasks: usize, - event_dispatcher: Arc>>, - event_queue: mpsc::Receiver, - network_events: Registry, - poll: Poll, - waker: Arc, -} - -//--------------------------------------------------------- -// PUBLICLY EXPOSED METHODS. -//--------------------------------------------------------- - -impl EventLoop { - /// Creates a new event-loop instance. - pub fn new(num_threads: usize) -> Self { - // Number of threads should always be a positive non-zero number. - assert!(num_threads > 0); - - let (action_dispatcher, action_queue) = mpsc::channel(); - let (event_dispatcher, event_queue) = mpsc::channel(); - - let thread_pool = ThreadPoolBuilder::new() - .num_threads(num_threads) - .build() - .unwrap(); - - let event_dispatcher = Arc::new(Mutex::new(event_dispatcher)); - - // Create network handles. - let poll = Poll::new().unwrap(); - let waker = Waker::new(poll.registry(), Token(0)).unwrap(); - let registry = poll.registry().try_clone().unwrap(); - - EventLoop { - index: Rc::new(Cell::new(1)), - resources: HashMap::new(), - timer_queue: BTreeMap::new(), - action_queue, - action_queue_empty: Rc::new(Cell::new(true)), - action_dispatcher: Rc::new(action_dispatcher), - check_queue: Vec::new(), - close_queue: Vec::new(), - thread_pool, - thread_pool_tasks: 0, - event_dispatcher, - event_queue, - poll, - network_events: registry, - waker: Arc::new(waker), - } - } - - /// Returns a new handle to the event-loop. - pub fn handle(&self) -> LoopHandle { - LoopHandle { - index: self.index.clone(), - actions: self.action_dispatcher.clone(), - actions_queue_empty: self.action_queue_empty.clone(), - } - } - - /// Returns a new interrupt-handle to the event-loop (sharable across threads). - pub fn interrupt_handle(&self) -> LoopInterruptHandle { - LoopInterruptHandle { - waker: self.waker.clone(), - } - } - - /// Returns if there are pending events still ongoing. - pub fn has_pending_events(&self) -> bool { - !(self.resources.is_empty() && self.action_queue_empty.get() && self.thread_pool_tasks == 0) - } - - /// Performs a single tick of the event-loop. - pub fn tick(&mut self) { - self.prepare(); - self.run_timers(); - self.run_poll(); - self.run_check(); - self.run_close(); - } -} - -//--------------------------------------------------------- -// EVENT LOOP PHASES. -//--------------------------------------------------------- - -impl EventLoop { - /// Drains the action_queue for requested async actions. - fn prepare(&mut self) { - while let Ok(action) = self.action_queue.try_recv() { - match action { - Action::TimerReq(index, timer) => self.timer_req(index, timer), - Action::TimerRemoveReq(index) => self.timer_remove_req(index), - Action::SpawnReq(index, task, t_wrap) => self.spawn_req(index, task, t_wrap), - Action::TcpConnectionReq(index, tc_wrap) => self.tcp_connection_req(index, tc_wrap), - Action::TcpListenReq(index, tc_wrap) => self.tcp_listen_req(index, tc_wrap), - Action::TcpWriteReq(index, data, cb) => self.tcp_write_req(index, data, cb), - Action::TcpReadStartReq(index, cb) => self.tcp_read_start_req(index, cb), - Action::TcpCloseReq(index, cb) => self.tcp_close_req(index, cb), - Action::TcpShutdownReq(index) => self.tcp_shutdown_req(index), - Action::CheckReq(index, cb) => self.check_req(index, cb), - Action::CheckRemoveReq(index) => self.check_remove_req(index), - Action::FsEventStartReq(index, w_wrap) => self.fs_event_start_req(index, w_wrap), - Action::FsEventStopReq(index) => self.fs_event_stop_req(index), - }; - } - self.action_queue_empty.set(true); - } - - /// Runs all expired timers. - fn run_timers(&mut self) { - // Note: We use this intermediate vector so we don't have Rust complaining - // about holding multiple references. - let timers_to_remove: Vec = self - .timer_queue - .range(..Instant::now()) - .map(|(k, _)| *k) - .collect(); - - let indexes: Vec = timers_to_remove - .iter() - .filter_map(|instant| self.timer_queue.remove(instant)) - .collect(); - - indexes.iter().for_each(|index| { - // Create a new event-loop handle to pass in timer's callback. - let handle = self.handle(); - - if let Some(timer) = self - .resources - .get_mut(index) - .map(|resource| resource.downcast_mut::().unwrap()) - { - // Run timer's callback. - (timer.cb)(handle); - - // If the timer is repeatable reschedule it, otherwise drop it. - if timer.repeat { - let time_key = Instant::now() + timer.expires_at; - self.timer_queue.insert(time_key, *index); - } else { - self.resources.remove(index); - } - } - }); - - self.prepare(); - } - - /// Polls for new I/O events (async-tasks, networking, etc). - fn run_poll(&mut self) { - // Based on what resources the event-loop is currently running will decide - // how long we should wait on the this phase. - let timeout = if self.has_pending_events() { - let refs = self.check_queue.len() + self.close_queue.len(); - match self.timer_queue.iter().next() { - _ if refs > 0 => Some(Duration::ZERO), - Some((t, _)) => Some(*t - Instant::now()), - None => None, - } - } else { - Some(Duration::ZERO) - }; - - let mut events = Events::with_capacity(1024); - - // Poll for new network events (this will block the thread). - if let Err(e) = self.poll.poll(&mut events, timeout) { - match e.kind() { - io::ErrorKind::Interrupted => return, - _ => panic!("{}", e), - }; - } - - for event in &events { - // Note: Token(0) is a special token signaling that someone woke us up. - if event.token() == Token(0) { - continue; - } - - let event_type = match ( - event.is_readable() || event.is_read_closed(), - event.is_writable(), - ) { - (true, false) => TcpEvent::Read(event.token().0 as u32), - (false, true) => TcpEvent::Write(event.token().0 as u32), - _ => continue, - }; - - self.event_dispatcher - .lock() - .unwrap() - .send(Event::Network(event_type)) - .unwrap(); - } - - while let Ok(event) = self.event_queue.try_recv() { - match event { - Event::ThreadPool(index, result) => self.task_complete(index, result), - Event::Watch(index, event) => self.fs_event(index, event), - Event::Network(tcp_event) => match tcp_event { - TcpEvent::Write(index) => self.tcp_socket_write(index), - TcpEvent::Read(index) => self.tcp_socket_read(index), - }, - } - self.prepare(); - } - } - - /// Runs all check callbacks. - fn run_check(&mut self) { - // Create a new event-loop handle. - let handle = self.handle(); - - for rid in self.check_queue.drain(..) { - // Remove resource from the event-loop. - let mut resource = match self.resources.remove(&rid) { - Some(resource) => resource, - None => continue, - }; - - if let Some(cb) = resource - .downcast_mut::() - .map(|wrap| wrap.cb.take().unwrap()) - { - // Run callback. - (cb)(handle.clone()); - } - } - self.prepare(); - } - - /// Cleans up `dying` resources. - fn run_close(&mut self) { - // Create a new event-loop handle. - let handle = self.handle(); - - // Clean up resources. - for (rid, on_close) in self.close_queue.drain(..) { - if let Some(mut resource) = self.resources.remove(&rid) { - resource.close(); - if let Some(cb) = on_close { - (cb)(handle.clone()); - } - } - } - self.prepare(); - } -} - -//--------------------------------------------------------- -// INTERNAL (AFTER) ASYNC OPERATION HANDLES. -//--------------------------------------------------------- - -impl EventLoop { - /// Runs callback of finished async task. - fn task_complete(&mut self, index: Index, result: TaskResult) { - if let Some(mut resource) = self.resources.remove(&index) { - let task_wrap = resource.downcast_mut::().unwrap(); - let callback = task_wrap.inner.take().unwrap(); - (callback)(self.handle(), result); - } - self.thread_pool_tasks -= 1; - } - - /// Tries to write to a (ready) TCP socket. - /// `ready` = the operation won't block the current thread. - fn tcp_socket_write(&mut self, index: Index) { - // Create a new handle. - let handle = self.handle(); - - // Try to get a reference to the resource. - let resource = match self.resources.get_mut(&index) { - Some(resource) => resource, - None => return, - }; - - // Cast resource to TcpStreamWrap. - let tcp_wrap = resource.downcast_mut::().unwrap(); - - // Check if the socket is in error state. - if let Ok(Some(e)) | Err(e) = tcp_wrap.socket.take_error() { - // If `on_connection` is available it means the socket error happened - // while trying to connect. - if let Some(on_connection) = tcp_wrap.on_connection.take() { - (on_connection)(handle, index, Result::Err(e.into())); - return; - } - // Otherwise the error happened while writing. - if let Some((_, on_write)) = tcp_wrap.write_queue.pop_front() { - (on_write)(handle, index, Result::Err(e.into())); - return; - } - } - - // Note: If the on_connection callback is None it means that in some - // previous iteration we made sure the TCP socket is well connected - // with the remote host. - - if let Some(on_connection) = tcp_wrap.on_connection.take() { - // Run socket's on_connection callback. - (on_connection)( - handle.clone(), - index, - Ok(TcpSocketInfo { - id: index, - host: tcp_wrap.socket.local_addr().unwrap(), - remote: tcp_wrap.socket.peer_addr().unwrap(), - }), - ); - - let token = Token(index as usize); - - self.network_events - .reregister(&mut tcp_wrap.socket, token, Interest::READABLE) - .unwrap(); - } - - loop { - // Due to loop ownership issues we need to clone the handle. - let handle = handle.clone(); - - // Connection is OK, let's write some bytes... - let (data, on_write) = match tcp_wrap.write_queue.pop_front() { - Some(value) => value, - None => break, - }; - - match tcp_wrap.socket.write(&data) { - // We want to write the entire `data` buffer in a single go. If we - // write less we'll return a short write error (same as - // `io::Write::write_all` does). - Ok(n) if n < data.len() => { - let err_message = io::ErrorKind::WriteZero.to_string(); - (on_write)(handle, index, Result::Err(anyhow!("{}", err_message))); - } - // All bytes were written to socket. - Ok(n) => (on_write)(handle, index, Result::Ok(n)), - // Would block "errors" are the OS's way of saying that the - // connection is not actually ready to perform this I/O operation. - Err(e) if e.kind() == io::ErrorKind::WouldBlock => { - // Since we couldn't send this data we need to put it - // back into the write_queue. - tcp_wrap.write_queue.push_front((data, on_write)); - break; - } - Err(e) if e.kind() == io::ErrorKind::Interrupted => continue, - // An important error seems to have accrued. - Err(e) => (on_write)(handle, index, Result::Err(e.into())), - }; - } - - // Unregister write interest if the write_queue is empty. - if tcp_wrap.write_queue.is_empty() { - let token = Token(tcp_wrap.id as usize); - self.network_events - .reregister(&mut tcp_wrap.socket, token, Interest::READABLE) - .unwrap(); - } - } - - /// Tries to read from a (ready) TCP socket. - /// `ready` = the operation won't block the current thread. - fn tcp_socket_read(&mut self, index: Index) { - // Create a new handle. - let handle = self.handle(); - - // Try to get a reference to the resource. - let resource = match self.resources.get_mut(&index) { - Some(resource) => resource, - None => return, - }; - - // Check if the TCP read event is really a TCP accept for some listener. - if resource.downcast_ref::().is_some() { - self.tcp_try_accept(index); - return; - } - - // Cast resource to TcpStreamWrap. - let tcp_wrap = resource.downcast_mut::().unwrap(); - - let mut data = vec![]; - let mut data_buf = [0; 4096]; - - // This will help us catch errors and FIN packets. - let mut read_error: Option = None; - let mut connection_closed = false; - - // We can (maybe) read from the connection. - loop { - match tcp_wrap.socket.read(&mut data_buf) { - // Reading 0 bytes means the other side has closed the - // connection or is done writing. - Ok(0) => { - connection_closed = true; - break; - } - Ok(n) => data.extend_from_slice(&data_buf[..n]), - // Would block "errors" are the OS's way of saying that the - // connection is not actually ready to perform this I/O operation. - Err(e) if e.kind() == io::ErrorKind::WouldBlock => break, - Err(e) if e.kind() == io::ErrorKind::Interrupted => continue, - // Other errors we'll be considered fatal. - Err(e) => read_error = Some(e), - } - } - - // Note: If a FIN packet received without us listening on the TCP stream, it means - // that the other side closed the connection so we'll schedule the resource - // for removal. - - let on_read = match tcp_wrap.on_read.as_mut() { - Some(on_read) => on_read, - None if !connection_closed => return, - None => { - // Deregister any interest on that socket. - self.network_events - .deregister(&mut tcp_wrap.socket) - .unwrap(); - // Schedule resource clean-up. - self.close_queue.push((index, None)); - return; - } - }; - - // Check if we had any errors while reading. - if let Some(e) = read_error { - // Run on_read callback. - (on_read)(handle, index, Result::Err(e.into())); - return; - } - - match data.len() { - // FIN packet. - 0 => (on_read)(handle, index, Result::Ok(data)), - // We read some bytes. - _ if !connection_closed => (on_read)(handle, index, Result::Ok(data)), - // FIN packet is included to the bytes we read. - _ => { - (on_read)(handle.clone(), index, Result::Ok(data)); - (on_read)(handle, index, Result::Ok(vec![])); - } - }; - } - - /// Tries to accept a new TCP connection. - fn tcp_try_accept(&mut self, index: Index) { - // Create a new handle. - let handle = self.handle(); - - // Try to get a reference to the resource. - let resource = match self.resources.get_mut(&index) { - Some(resource) => resource, - None => return, - }; - - // Note: In case the downcast to TcpListenerWrap fails it means that the event - // fired by the network thread is not for a TCP accept. - - let tcp_wrap = match resource.downcast_mut::() { - Some(tcp_wrap) => tcp_wrap, - None => return, - }; - - let on_connection = tcp_wrap.on_connection.as_mut(); - let mut new_resources = vec![]; - - loop { - // Create a new handle. - let handle = handle.clone(); - - // Received an event for the TCP server socket, which indicates we can accept a connections. - let (socket, _) = match tcp_wrap.socket.accept() { - Ok(sock) => sock, - // If we get a `WouldBlock` error we know our - // listener has no more incoming connections queued, - // so we can return to polling and wait for some - // more. - Err(e) if e.kind() == io::ErrorKind::WouldBlock => break, - Err(e) => { - (on_connection)(handle, index, Result::Err(e.into())); - break; - } - }; - - // Create a new ID for the socket. - let id = handle.index(); - - // Create a TCP wrap from the raw socket. - let mut stream = TcpStreamWrap { - id, - socket, - on_connection: None, - on_read: None, - write_queue: LinkedList::new(), - }; - - (on_connection)( - handle, - id, - Ok(TcpSocketInfo { - id, - host: stream.socket.local_addr().unwrap(), - remote: stream.socket.peer_addr().unwrap(), - }), - ); - - // Initialize socket with a READABLE event. - self.network_events - .register(&mut stream.socket, Token(id as usize), Interest::READABLE) - .unwrap(); - - new_resources.push((id, Box::new(stream))); - } - - // Register the new TCP streams to the event-loop. - for (id, stream) in new_resources.drain(..) { - self.resources.insert(id, stream); - } - } - - /// Runs callback referring to specific fs event. - fn fs_event(&mut self, index: Index, event: FsEvent) { - // Try to get a reference to the resource. - let handle = self.handle(); - let resource = match self.resources.get_mut(&index) { - Some(resource) => resource, - None => return, - }; - - // Get a mut reference to the callback. - let on_event = match resource.downcast_mut::() { - Some(w_wrap) => w_wrap.on_event.as_mut().unwrap(), - None => return, - }; - - // Run watcher's cb. - (on_event)(handle, event); - } -} - -//--------------------------------------------------------- -// INTERNAL (SCHEDULING) ASYNC OPERATION HANDLES. -//--------------------------------------------------------- - -impl EventLoop { - /// Schedules a new timer. - fn timer_req(&mut self, index: Index, timer: TimerWrap) { - let time_key = Instant::now() + timer.expires_at; - self.resources.insert(index, Box::new(timer)); - self.timer_queue.insert(time_key, index); - } - - /// Removes an existed timer. - fn timer_remove_req(&mut self, index: Index) { - self.resources.remove(&index); - self.timer_queue.retain(|_, v| *v != index); - } - - /// Spawns a new task to the thread-pool. - fn spawn_req( - &mut self, - index: Index, - task: Box TaskResult + Send>, - task_wrap: TaskWrap, - ) { - let notifier = self.event_dispatcher.clone(); - - if task_wrap.inner.is_some() { - self.resources.insert(index, Box::new(task_wrap)); - } - - self.thread_pool.spawn({ - let waker = self.waker.clone(); - move || { - let result = (task)(); - let notifier = notifier.lock().unwrap(); - - notifier.send(Event::ThreadPool(index, result)).unwrap(); - waker.wake().unwrap(); - } - }); - - self.thread_pool_tasks += 1; - } - - /// Registers interest for connecting to a TCP socket. - fn tcp_connection_req(&mut self, index: Index, mut tcp_wrap: TcpStreamWrap) { - // When we create a new TCP socket connection we have to make sure - // it's well connected with the remote host. - // - // See https://docs.rs/mio/0.8.4/mio/net/struct.TcpStream.html#notes - let socket = &mut tcp_wrap.socket; - let token = Token(tcp_wrap.id as usize); - - self.network_events - .register(socket, token, Interest::WRITABLE) - .unwrap(); - - self.resources.insert(index, Box::new(tcp_wrap)); - } - - /// Registers the TCP listener to the event-loop. - fn tcp_listen_req(&mut self, index: Index, mut tcp_wrap: TcpListenerWrap) { - let listener = &mut tcp_wrap.socket; - let token = Token(tcp_wrap.id as usize); - - self.network_events - .register(listener, token, Interest::READABLE) - .unwrap(); - - self.resources.insert(index, Box::new(tcp_wrap)); - } - - /// Registers interest for writing to an open TCP socket. - fn tcp_write_req(&mut self, index: Index, data: Vec, on_write: TcpOnWrite) { - let resource = match self.resources.get_mut(&index) { - Some(resource) => resource, - None => return, - }; - - // Cast resource to TcpStreamWrap. - let tcp_wrap = resource.downcast_mut::().unwrap(); - let token = Token(index as usize); - - // Push data to socket's write queue. - tcp_wrap.write_queue.push_back((data, on_write)); - - let interest = Interest::READABLE.add(Interest::WRITABLE); - - self.network_events - .reregister(&mut tcp_wrap.socket, token, interest) - .unwrap(); - } - - /// Registers interest for reading of an open TCP socket. - fn tcp_read_start_req(&mut self, index: Index, on_read: TcpOnRead) { - let resource = match self.resources.get_mut(&index) { - Some(resource) => resource, - None => return, - }; - - // Cast resource to TcpStreamWrap. - let tcp_wrap = resource.downcast_mut::().unwrap(); - let token = Token(index as usize); - - // Register the on_read callback. - tcp_wrap.on_read = Some(on_read); - - let interest = match tcp_wrap.write_queue.len() { - 0 => Interest::READABLE, - _ => Interest::READABLE.add(Interest::WRITABLE), - }; - - self.network_events - .reregister(&mut tcp_wrap.socket, token, interest) - .unwrap(); - } - - /// Schedules a TCP socket shutdown. - fn tcp_close_req(&mut self, index: Index, on_close: Box) { - // Schedule resource for graceful shutdown and removal. - self.close_queue.push((index, Some(on_close))); - } - - /// Closes the write side of the stream. - fn tcp_shutdown_req(&mut self, index: Index) { - // Get resource by it's ID. - let resource = match self.resources.get_mut(&index) { - Some(resource) => resource, - None => return, - }; - - // Cast resource to TcpStreamWrap. - resource.downcast_mut::().unwrap().close(); - } - - /// Schedules a new check callback. - fn check_req(&mut self, index: Index, check_wrap: CheckWrap) { - // Add the check_wrap to the event loop. - self.resources.insert(index, Box::new(check_wrap)); - self.check_queue.push(index); - } - - /// Removes a check callback from the event-loop. - fn check_remove_req(&mut self, index: Index) { - self.resources.remove(&index); - self.check_queue.retain(|v| *v != index); - } - - /// Subscribes a new fs watcher to the event-loop. - fn fs_event_start_req(&mut self, index: Index, mut wrap: FsWatcherWrap) { - // Create an appropriate watcher for the current system. - let mut watcher = RecommendedWatcher::new( - FsEventHandler { - waker: self.waker.clone(), - sender: self.event_dispatcher.clone(), - id: index, - }, - notify::Config::default(), - ) - .unwrap(); - - let recursive_mode = match wrap.recursive { - true => RecursiveMode::Recursive, - _ => RecursiveMode::NonRecursive, - }; - - // Start watching requested path(s). - watcher.watch(&wrap.path, recursive_mode).unwrap(); - - wrap.inner = Some(watcher); - self.resources.insert(index, Box::new(wrap)); - } - - /// Stops an fs watcher and removes it from the event-loop. - fn fs_event_stop_req(&mut self, index: Index) { - self.resources.remove(&index); - } -} - -impl Default for EventLoop { - fn default() -> Self { - let default_pool_size = unsafe { NonZeroUsize::new_unchecked(4) }; - let num_cores = thread::available_parallelism().unwrap_or(default_pool_size); - - Self::new(num_cores.into()) - } -} - -#[derive(Clone)] -pub struct LoopHandle { - index: Rc>, - actions: Rc>, - actions_queue_empty: Rc>, -} - -#[allow(dead_code)] -impl LoopHandle { - /// Returns the next available resource index. - pub fn index(&self) -> Index { - let index = self.index.get(); - self.index.set(index + 1); - index - } - - /// Schedules a new timer to the event-loop. - pub fn timer(&self, delay: u64, repeat: bool, cb: F) -> Index - where - F: FnMut(LoopHandle) + 'static, - { - let index = self.index(); - let expires_at = Duration::from_millis(delay); - - let timer = TimerWrap { - cb: Box::new(cb), - expires_at, - repeat, - }; - - self.actions.send(Action::TimerReq(index, timer)).unwrap(); - self.actions_queue_empty.set(false); - - index - } - - /// Removes a scheduled timer from the event-loop. - pub fn remove_timer(&self, index: &Index) { - self.actions.send(Action::TimerRemoveReq(*index)).unwrap(); - self.actions_queue_empty.set(false); - } - - /// Spawns a new task without blocking the main thread. - pub fn spawn(&self, task: F, task_cb: Option) -> Index - where - F: FnOnce() -> TaskResult + Send + 'static, - U: FnOnce(LoopHandle, TaskResult) + 'static, - { - let index = self.index(); - - // Note: I tried to use `.and_then` instead of this ugly match statement but Rust complains - // about mismatch types having no idea why. - let task_cb: Option> = match task_cb { - Some(cb) => Some(Box::new(cb)), - None => None, - }; - - let task_wrap = TaskWrap { inner: task_cb }; - - self.actions - .send(Action::SpawnReq(index, Box::new(task), task_wrap)) - .unwrap(); - - self.actions_queue_empty.set(false); - - index - } - - /// Creates a new TCP stream and issue a non-blocking connect to the specified address. - pub fn tcp_connect(&self, address: &str, on_connection: F) -> Result - where - F: FnOnce(LoopHandle, Index, Result) + 'static, - { - // Create a SocketAddr from the provided string. - let address: SocketAddr = address.parse()?; - let index = self.index(); - - // Connect the stream. - let socket = TcpStream::connect(address)?; - - let stream = TcpStreamWrap { - id: index, - socket, - on_connection: Some(Box::new(on_connection)), - on_read: None, - write_queue: LinkedList::new(), - }; - - self.actions - .send(Action::TcpConnectionReq(index, stream)) - .unwrap(); - - self.actions_queue_empty.set(false); - - Ok(index) - } - - /// Starts listening for incoming connections. - pub fn tcp_listen(&self, host: &str, on_connection: F) -> Result - where - F: FnMut(LoopHandle, Index, Result) + 'static, - { - // Create a SocketAddr from the provided host. - let address: SocketAddr = host.parse()?; - let index = self.index(); - - // Bind address to the socket. - let socket = TcpListener::bind(address)?; - - let listener = TcpListenerWrap { - id: index, - socket, - on_connection: Box::new(on_connection), - }; - - self.actions - .send(Action::TcpListenReq(index, listener)) - .unwrap(); - - self.actions_queue_empty.set(false); - - Ok(index) - } - - /// Writes bytes to an open TCP socket. - pub fn tcp_write(&self, index: Index, data: &[u8], on_write: F) - where - F: FnOnce(LoopHandle, Index, Result) + 'static, - { - self.actions - .send(Action::TcpWriteReq( - index, - data.to_vec(), - Box::new(on_write), - )) - .unwrap(); - - self.actions_queue_empty.set(false); - } - - /// Starts reading from an open socket. - pub fn tcp_read_start(&self, index: Index, on_read: F) - where - F: FnMut(LoopHandle, Index, Result>) + 'static, - { - self.actions - .send(Action::TcpReadStartReq(index, Box::new(on_read))) - .unwrap(); - - self.actions_queue_empty.set(false); - } - - /// Closes an open TCP socket. - pub fn tcp_close(&self, index: Index, on_close: F) - where - F: FnOnce(LoopHandle) + 'static, - { - self.actions - .send(Action::TcpCloseReq(index, Box::new(on_close))) - .unwrap(); - - self.actions_queue_empty.set(false); - } - - /// Closes the write side of the TCP stream. - pub fn tcp_shutdown(&self, index: Index) { - self.actions.send(Action::TcpShutdownReq(index)).unwrap(); - self.actions_queue_empty.set(false); - } - - /// Schedules a new check callback. - pub fn check(&self, on_check: F) -> Index - where - F: FnOnce(LoopHandle) + 'static, - { - let index = self.index(); - let on_check = Box::new(on_check); - - self.actions - .send(Action::CheckReq(index, CheckWrap { cb: Some(on_check) })) - .unwrap(); - - self.actions_queue_empty.set(false); - - index - } - - /// Removes a check callback from the event-loop. - pub fn remove_check(&self, index: &Index) { - self.actions.send(Action::CheckRemoveReq(*index)).unwrap(); - self.actions_queue_empty.set(false); - } - - /// Creates a watcher that will watch the specified path for changes. - pub fn fs_event_start(&self, path: P, recursive: bool, on_event: F) -> Result - where - F: FnMut(LoopHandle, FsEvent) + 'static, - P: AsRef, - { - let index = self.index(); - let on_event = Box::new(on_event); - - // Check if path exists. - std::fs::metadata(path.as_ref())?; - - // Note: We don't have access to internal mpsc channels so will - // create the watcher at a later stage. - let watcher_wrap = FsWatcherWrap { - inner: None, - on_event: Some(on_event), - path: path.as_ref().to_path_buf(), - recursive, - }; - - self.actions - .send(Action::FsEventStartReq(index, watcher_wrap)) - .unwrap(); - - self.actions_queue_empty.set(false); - - Ok(index) - } - - /// Stops watch handle, the callback will no longer be called. - pub fn fs_event_stop(&self, index: &Index) { - self.actions.send(Action::FsEventStopReq(*index)).unwrap(); - self.actions_queue_empty.set(false); - } -} - -#[derive(Clone)] -pub struct LoopInterruptHandle { - waker: Arc, -} - -impl LoopInterruptHandle { - // Interrupts the poll phase of the event-loop. - pub fn interrupt(&self) { - self.waker.wake().unwrap(); - } -} diff --git a/src/file.rs b/src/file.rs index 5bff4eb..2a7f77c 100644 --- a/src/file.rs +++ b/src/file.rs @@ -5,14 +5,14 @@ use crate::bindings::set_function_to; use crate::bindings::set_internal_ref; use crate::bindings::set_property_to; use crate::bindings::throw_exception; -use crate::event_loop::FsEvent; -use crate::event_loop::LoopHandle; -use crate::event_loop::TaskResult; use crate::runtime::JsFuture; use crate::runtime::JsRuntime; use anyhow::anyhow; use anyhow::bail; use anyhow::Result; +use dune_event_loop::FsEvent; +use dune_event_loop::LoopHandle; +use dune_event_loop::TaskResult; use notify::EventKind; use serde::Deserialize; use serde::Serialize; diff --git a/src/hooks.rs b/src/hooks.rs index da4e38f..5733ee7 100644 --- a/src/hooks.rs +++ b/src/hooks.rs @@ -1,14 +1,14 @@ use crate::bindings::set_exception_code; use crate::bindings::throw_type_error; use crate::errors::unwrap_or_exit; -use crate::event_loop::LoopHandle; -use crate::event_loop::TaskResult; use crate::modules::load_import; use crate::modules::resolve_import; use crate::modules::EsModuleFuture; use crate::modules::ModuleGraph; use crate::modules::ModuleStatus; use crate::runtime::JsRuntime; +use dune_event_loop::LoopHandle; +use dune_event_loop::TaskResult; use std::cell::RefCell; use std::rc::Rc; diff --git a/src/inspector.rs b/src/inspector.rs index 382246a..09dd97e 100644 --- a/src/inspector.rs +++ b/src/inspector.rs @@ -5,7 +5,6 @@ use crate::errors::generic_error; use crate::errors::unwrap_or_exit; -use crate::event_loop::LoopInterruptHandle; use axum::extract::ws::Message; use axum::extract::ws::WebSocket; use axum::extract::ws::WebSocketUpgrade; @@ -14,6 +13,7 @@ use axum::response::IntoResponse; use axum::routing::get; use axum::Json; use axum::Router; +use dune_event_loop::LoopInterruptHandle; use futures::sink::SinkExt; use futures::stream::StreamExt; use serde::Serialize; diff --git a/src/js/main.js b/src/js/main.js index 5b8b2ce..68c50f6 100644 --- a/src/js/main.js +++ b/src/js/main.js @@ -1,7 +1,7 @@ +import process from 'process'; import timers from 'timers'; import fetch from '@web/fetch'; import structuredClone from '@web/clone'; -import { cloneFunction } from 'util'; import { Console, prompt, wrapConsole } from 'console'; import { AbortController, AbortSignal } from '@web/abort'; import { TextEncoder, TextDecoder } from '@web/text_encoding'; @@ -15,73 +15,6 @@ function makeGlobal(name, value) { globalThis[name] = value; } -// Note: Adding a caching layer to `process.binding` allows us to not -// cross the JavaScript <-> Rust bridge every time we need native methods. - -let cache = new Map(); -let internalBinding = cloneFunction(process.binding); - -process.binding = (name) => { - // Check bindings cache. - if (cache.has(name)) return cache.get(name); - - // Load binding (from rust), save it to cache. - const binding = internalBinding(name); - cache.set(name, binding); - - return binding; -}; - -const kill = cloneFunction(process.kill); - -process.kill = (pid, signal = 'SIGKILL') => { - // Check arguments. - if (!pid || Number.isNaN(Number.parseInt(pid))) { - throw new TypeError(`The "pid" argument must be of type number.`); - } - kill(pid, signal); -}; - -const nextTick = cloneFunction(process.nextTick); - -process.nextTick = (callback, ...args) => { - // Check if callback is a valid function. - if (typeof callback !== 'function') { - throw new TypeError(`The "callback" argument must be of type function.`); - } - nextTick(() => callback(...args)); -}; - -/* Setting up STDOUT, STDIN and STDERR streams. */ - -Object.defineProperty(process, 'stdout', { - get() { - return { - write: process.binding('stdio').write, - end() {}, - }; - }, - configurable: true, -}); - -Object.defineProperty(process, 'stdin', { - get() { - return { - read: process.binding('stdio').read, - }; - }, - configurable: true, -}); - -Object.defineProperty(process, 'stderr', { - get() { - return { - write: process.binding('stdio').writeError, - }; - }, - configurable: true, -}); - const console = new Console(); const consoleFromV8 = globalThis['console']; @@ -89,6 +22,7 @@ wrapConsole(console, consoleFromV8); /* Initialize global environment for user script */ +makeGlobal('process', process); makeGlobal('console', console); makeGlobal('prompt', prompt); diff --git a/src/js/process.js b/src/js/process.js index cf7ed64..961e7ce 100644 --- a/src/js/process.js +++ b/src/js/process.js @@ -1,5 +1,122 @@ -'use strict'; +import { EventEmitter } from 'events'; +import { cloneFunction as clone } from 'util'; -// Re-export process as a native module. +const cache = new Map(); + +const internalBinding = clone(process.binding); +const kill = clone(process.kill); +const nextTick = clone(process.nextTick); + +// Note: Integrating a caching layer into process.binding enables us +// to avoid traversing the JavaScript - Rust bridge for native method +// access on every occasion. +process.binding = (name) => { + // Check bindings cache. + if (cache.has(name)) return cache.get(name); + + // Load binding (from rust), save it to cache. + const binding = internalBinding(name); + cache.set(name, binding); + + return binding; +}; + +process.kill = (pid, signal = 'SIGKILL') => { + // Check arguments. + if (!pid || Number.isNaN(Number.parseInt(pid))) { + throw new TypeError(`The "pid" argument must be of type number.`); + } + kill(pid, signal); +}; + +process.nextTick = (callback, ...args) => { + // Check if callback is a valid function. + if (typeof callback !== 'function') { + throw new TypeError(`The "callback" argument must be of type function.`); + } + nextTick(() => callback(...args)); +}; + +function defineStream(name, getter) { + Object.defineProperty(process, name, { + get: getter, + configurable: true, + enumerable: true, + }); +} + +const io = process.binding('stdio'); + +defineStream('stdout', () => ({ + write: io.write, + end() {}, +})); + +defineStream('stdin', () => ({ + read: io.read, +})); + +defineStream('stderr', () => ({ + write: io.writeError, +})); + +const os = process.binding('signals'); + +// Note: To transform the process object, initialized in Rust, into +// an event emitter, we must manually instantiate the object fields +// and extend the prototype. +process._events = {}; +process._eventsCount = 0; + +Object.setPrototypeOf(process, EventEmitter.prototype); + +const activeSignals = new Map(); +const isSignal = (type) => os.signals.includes(type); + +const signalEmitFunction = (type) => { + process.emit(type); + if (process.listenerCount(type) === 0) { + stopListeningIfNoListener(type); + } +}; + +function startListeningIfSignal(type) { + // Check if the type is a valid signal. + if (!isSignal(type) || activeSignals.has(type)) return; + + // Define a handler function for the signal. + const callback = signalEmitFunction.bind(this, type); + const signal = os.startSignal(type, callback); + + activeSignals.set(type, signal); +} + +function stopListeningIfNoListener(type) { + // Retrieve the internal ID of the signal. + const signal = activeSignals.get(type); + + // Remove the signal. + if (signal && process.listenerCount(type) === 0) { + os.cancelSignal(signal); + activeSignals.delete(type); + } +} + +// Note: To ensure the full functionality, it's essential to 'override' +// specific methods inherited from the EventEmitter prototype. + +for (const method of ['on', 'once']) { + process[method] = (event, ...args) => { + EventEmitter.prototype[method].call(process, event, ...args); + startListeningIfSignal(event); + }; +} + +for (const method of ['removeListener', 'removeAllListeners']) { + process[method] = (event, ...args) => { + EventEmitter.prototype[method].call(process, event, ...args); + stopListeningIfNoListener(event); + }; +} export default process; diff --git a/src/main.rs b/src/main.rs index 3477c73..8a3f118 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,7 +3,6 @@ mod cli; mod dns; mod dotenv; mod errors; -mod event_loop; mod file; mod hooks; mod http_parser; @@ -16,6 +15,7 @@ mod process; mod promise; mod repl; mod runtime; +mod signals; mod stdio; mod timers; mod tools; diff --git a/src/modules.rs b/src/modules.rs index 00110f0..d9af38f 100644 --- a/src/modules.rs +++ b/src/modules.rs @@ -1,8 +1,6 @@ use crate::errors::generic_error; use crate::errors::unwrap_or_exit; use crate::errors::JsError; -use crate::event_loop::LoopHandle; -use crate::event_loop::TaskResult; use crate::loaders::CoreModuleLoader; use crate::loaders::FsModuleLoader; use crate::loaders::ModuleLoader; @@ -12,6 +10,8 @@ use crate::runtime::JsRuntime; use anyhow::anyhow; use anyhow::Error; use anyhow::Result; +use dune_event_loop::LoopHandle; +use dune_event_loop::TaskResult; use lazy_static::lazy_static; use regex::Regex; use serde_json::Value; diff --git a/src/net.rs b/src/net.rs index 3aafc3f..89d16e9 100644 --- a/src/net.rs +++ b/src/net.rs @@ -2,12 +2,12 @@ use crate::bindings::set_exception_code; use crate::bindings::set_function_to; use crate::bindings::set_property_to; use crate::errors::JsError; -use crate::event_loop::Index; -use crate::event_loop::LoopHandle; -use crate::event_loop::TcpSocketInfo; use crate::runtime::JsFuture; use crate::runtime::JsRuntime; use anyhow::Result; +use dune_event_loop::Index; +use dune_event_loop::LoopHandle; +use dune_event_loop::TcpSocketInfo; use std::net::IpAddr; use std::rc::Rc; diff --git a/src/process.rs b/src/process.rs index b230b32..979a144 100644 --- a/src/process.rs +++ b/src/process.rs @@ -31,25 +31,12 @@ pub fn initialize<'s>( // This represents the global `process` object. let process = create_object_under(scope, global, "process"); - // `process.cwd()` - current working directory. set_function_to(scope, process, "cwd", cwd); - - // `process.exit([code])` - exits the program with the given code. set_function_to(scope, process, "exit", exit); - - // `process.memoryUsage()` - an object describing the memory usage. set_function_to(scope, process, "memoryUsage", memory_usage); - - // `process.nextTick()` - adds callback to the "next tick queue". set_function_to(scope, process, "nextTick", next_tick); - - // `process.uptime()` - a number describing the amount of time (in seconds) the process is running. set_function_to(scope, process, "uptime", uptime); - - // `process.kill()` - sends the signal to the process identified by pid. set_function_to(scope, process, "kill", kill); - - // `process.binding()` - exposes native modules to JavaScript. set_function_to(scope, process, "binding", bind); process diff --git a/src/runtime.rs b/src/runtime.rs index 3b6bde7..8c1c3c6 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -1,10 +1,6 @@ use crate::bindings; use crate::errors::unwrap_or_exit; use crate::errors::JsError; -use crate::event_loop::EventLoop; -use crate::event_loop::LoopHandle; -use crate::event_loop::LoopInterruptHandle; -use crate::event_loop::TaskResult; use crate::hooks::host_import_module_dynamically_cb; use crate::hooks::host_initialize_import_meta_object_cb; use crate::hooks::module_resolve_cb; @@ -24,6 +20,10 @@ use crate::process; use anyhow::bail; use anyhow::Error; use anyhow::Ok; +use dune_event_loop::EventLoop; +use dune_event_loop::LoopHandle; +use dune_event_loop::LoopInterruptHandle; +use dune_event_loop::TaskResult; use std::cell::RefCell; use std::cmp; use std::collections::HashMap; diff --git a/src/signals.rs b/src/signals.rs new file mode 100644 index 0000000..9cd5c77 --- /dev/null +++ b/src/signals.rs @@ -0,0 +1,150 @@ +use crate::bindings::set_function_to; +use crate::bindings::set_property_to; +use crate::bindings::throw_exception; +use crate::errors::JsError; +use crate::runtime::JsFuture; +use crate::runtime::JsRuntime; +use anyhow::anyhow; +use dune_event_loop::LoopHandle; +use dune_event_loop::Signal; +use std::rc::Rc; + +#[cfg(windows)] +const SIGNALS: [(&str, i32); 6] = [ + ("SIGABRT", Signal::SIGABRT), + ("SIGFPE", Signal::SIGFPE), + ("SIGILL", Signal::SIGILL), + ("SIGINT", Signal::SIGINT), + ("SIGSEGV", Signal::SIGSEGV), + ("SIGTERM", Signal::SIGTERM), +]; + +#[cfg(not(windows))] +const SIGNALS: [(&str, i32); 29] = [ + ("SIGABRT", Signal::SIGABRT), + ("SIGALRM", Signal::SIGALRM), + ("SIGBUS", Signal::SIGBUS), + ("SIGCHLD", Signal::SIGCHLD), + ("SIGCONT", Signal::SIGCONT), + ("SIGFPE", Signal::SIGFPE), + ("SIGHUP", Signal::SIGHUP), + ("SIGILL", Signal::SIGILL), + ("SIGINT", Signal::SIGINT), + ("SIGIO", Signal::SIGIO), + ("SIGKILL", Signal::SIGKILL), + ("SIGPIPE", Signal::SIGPIPE), + ("SIGPROF", Signal::SIGPROF), + ("SIGQUIT", Signal::SIGQUIT), + ("SIGSEGV", Signal::SIGSEGV), + ("SIGSTOP", Signal::SIGSTOP), + ("SIGSYS", Signal::SIGSYS), + ("SIGTERM", Signal::SIGTERM), + ("SIGTRAP", Signal::SIGTRAP), + ("SIGTSTP", Signal::SIGTSTP), + ("SIGTTIN", Signal::SIGTTIN), + ("SIGTTOU", Signal::SIGTTOU), + ("SIGURG", Signal::SIGURG), + ("SIGUSR1", Signal::SIGUSR1), + ("SIGUSR2", Signal::SIGUSR2), + ("SIGVTALRM", Signal::SIGVTALRM), + ("SIGWINCH", Signal::SIGWINCH), + ("SIGXCPU", Signal::SIGXCPU), + ("SIGXFSZ", Signal::SIGXFSZ), +]; + +pub fn initialize(scope: &mut v8::HandleScope) -> v8::Global { + // Create local JS object. + let target = v8::Object::new(scope); + let signals = v8::Array::new(scope, SIGNALS.len() as i32); + + set_function_to(scope, target, "startSignal", start_signal); + set_function_to(scope, target, "cancelSignal", cancel_signal); + + // Create a JS array containing the available signals. + SIGNALS.iter().enumerate().for_each(|(i, (signal, _))| { + let index = i as u32; + let signal = v8::String::new(scope, signal).unwrap(); + signals.set_index(scope, index, signal.into()).unwrap(); + }); + + set_property_to(scope, target, "signals", signals.into()); + + // Return v8 global handle. + v8::Global::new(scope, target) +} + +struct SignalFuture(Rc>); + +impl JsFuture for SignalFuture { + fn run(&mut self, scope: &mut v8::HandleScope) { + let undefined = v8::undefined(scope).into(); + let callback = v8::Local::new(scope, (*self.0).clone()); + let tc_scope = &mut v8::TryCatch::new(scope); + + callback.call(tc_scope, undefined, &[]); + + // On exception, report it and exit. + if tc_scope.has_caught() { + let exception = tc_scope.exception().unwrap(); + let exception = JsError::from_v8_exception(tc_scope, exception, None); + println!("{exception:?}"); + std::process::exit(1); + } + } +} + +/// Registers a signal listener to the event-loop. +fn start_signal( + scope: &mut v8::HandleScope, + args: v8::FunctionCallbackArguments, + mut rv: v8::ReturnValue, +) { + // Get signal type from javascript. + let signal_type = args.get(0).to_rust_string_lossy(scope); + let signal_type = match SIGNALS + .iter() + .find(|(signal, _)| *signal == signal_type.as_str()) + { + Some((_, signum)) => signum.to_owned(), + None => { + let exception = anyhow!("Invalid signal provided."); + throw_exception(scope, &exception); + return; + } + }; + + // Get signal's listener handler. + let callback = v8::Local::::try_from(args.get(1)).unwrap(); + let callback = Rc::new(v8::Global::new(scope, callback)); + + let state_rc = JsRuntime::state(scope); + + let signal_cb = { + let state_rc = state_rc.clone(); + move |_: LoopHandle, _: i32| { + let mut state = state_rc.borrow_mut(); + let future = SignalFuture(Rc::clone(&callback)); + state.pending_futures.push(Box::new(future)); + } + }; + + // Schedule a new signal listener to the event-loop. + let state = state_rc.borrow(); + let id = state.handle.signal_start(signal_type, signal_cb).unwrap(); + + // Return timeout's internal id. + rv.set(v8::Number::new(scope, id as f64).into()); +} + +/// Removes a signal listener to the event-loop. +fn cancel_signal( + scope: &mut v8::HandleScope, + args: v8::FunctionCallbackArguments, + _: v8::ReturnValue, +) { + // Get handlers internal token. + let id = args.get(0).int32_value(scope).unwrap() as u32; + let state_rc = JsRuntime::state(scope); + + state_rc.borrow().handle.signal_stop(&id); +} diff --git a/src/timers.rs b/src/timers.rs index 260a5bc..0864946 100644 --- a/src/timers.rs +++ b/src/timers.rs @@ -1,8 +1,8 @@ use crate::bindings::set_function_to; use crate::errors::JsError; -use crate::event_loop::LoopHandle; use crate::runtime::JsFuture; use crate::runtime::JsRuntime; +use dune_event_loop::LoopHandle; use std::rc::Rc; pub fn initialize(scope: &mut v8::HandleScope) -> v8::Global {