diff --git a/crates/test-util/src/wast.rs b/crates/test-util/src/wast.rs index 5408ee33feb4..a40f95cfbf64 100644 --- a/crates/test-util/src/wast.rs +++ b/crates/test-util/src/wast.rs @@ -185,9 +185,13 @@ fn component_test_config(test: &Path) -> TestConfig { if let Some(parent) = test.parent() { if parent.ends_with("async") - || ["trap-in-post-return.wast"] - .into_iter() - .any(|name| Some(name) == test.file_name().and_then(|s| s.to_str())) + || [ + "trap-in-post-return.wast", + "resources.wast", + "multiple-resources.wast", + ] + .into_iter() + .any(|name| Some(name) == test.file_name().and_then(|s| s.to_str())) { ret.component_model_async = Some(true); ret.component_model_async_stackful = Some(true); diff --git a/crates/wasmtime/src/config.rs b/crates/wasmtime/src/config.rs index 0b868f9a9037..435c4333ba16 100644 --- a/crates/wasmtime/src/config.rs +++ b/crates/wasmtime/src/config.rs @@ -3009,6 +3009,23 @@ impl Config { self.shared_memory = enable; self } + + #[cfg(feature = "component-model")] + #[inline] + pub(crate) fn cm_concurrency_enabled(&self) -> bool { + cfg!(feature = "component-model-async") + && (self.enabled_features.contains(WasmFeatures::CM_ASYNC) + || self + .enabled_features + .contains(WasmFeatures::CM_ASYNC_BUILTINS) + || self + .enabled_features + .contains(WasmFeatures::CM_ASYNC_STACKFUL) + || self.enabled_features.contains(WasmFeatures::CM_THREADING) + || self + .enabled_features + .contains(WasmFeatures::CM_ERROR_CONTEXT)) + } } impl Default for Config { diff --git a/crates/wasmtime/src/runtime/component/concurrent.rs b/crates/wasmtime/src/runtime/component/concurrent.rs index 1af58bb64f30..52f5227d0503 100644 --- a/crates/wasmtime/src/runtime/component/concurrent.rs +++ b/crates/wasmtime/src/runtime/component/concurrent.rs @@ -55,6 +55,7 @@ use crate::component::{ HasData, HasSelf, Instance, Resource, ResourceTable, ResourceTableError, RuntimeInstance, }; use crate::fiber::{self, StoreFiber, StoreFiberYield}; +use crate::prelude::*; use crate::store::{Store, StoreId, StoreInner, StoreOpaque, StoreToken}; use crate::vm::component::{CallContext, ComponentInstance, HandleTable, ResourceTables}; use crate::vm::{AlwaysMut, SendSyncPtr, VMFuncRef, VMMemoryDefinition, VMStore}; @@ -891,6 +892,10 @@ impl Store { where T: Send + 'static, { + ensure!( + self.as_context().0.cm_concurrency_enabled(), + "cannot use `run_concurrent` without enabling component-model async" + ); self.as_context_mut().run_concurrent(fun).await } @@ -987,8 +992,6 @@ impl StoreContextMut<'_, T> { /// /// # Store-blocking behavior /// - /// - /// /// At this time there are certain situations in which the `Future` returned /// by the `AsyncFnOnce` passed to this function will not be polled for an /// extended period of time, despite one or more `Waker::wake` events having @@ -1059,6 +1062,10 @@ impl StoreContextMut<'_, T> { where T: Send + 'static, { + ensure!( + self.0.cm_concurrency_enabled(), + "cannot use `run_concurrent` without enabling component-model async" + ); self.do_run_concurrent(fun, false).await } @@ -1080,6 +1087,7 @@ impl StoreContextMut<'_, T> { where T: Send + 'static, { + debug_assert!(self.0.cm_concurrency_enabled()); check_recursive_run(); let token = StoreToken::new(self.as_context_mut()); @@ -1390,7 +1398,8 @@ impl StoreOpaque { /// - The top-level instance is not already on the current task's call stack. /// - The instance is not in need of a post-return function call. /// - `self` has not been poisoned due to a trap. - pub(crate) fn may_enter(&mut self, instance: RuntimeInstance) -> bool { + pub(crate) fn may_enter_concurrent(&mut self, instance: RuntimeInstance) -> bool { + debug_assert!(self.cm_concurrency_enabled()); let state = self.concurrent_state_mut(); if let Some(caller) = state.guest_thread { instance != state.get_mut(caller.task).unwrap().instance @@ -1512,7 +1521,8 @@ impl StoreOpaque { .set_task_may_block(may_block) } - pub(crate) fn check_blocking(&mut self) -> Result<()> { + pub(crate) fn check_blocking_concurrent(&mut self) -> Result<()> { + debug_assert!(self.cm_concurrency_enabled()); let state = self.concurrent_state_mut(); let task = state.guest_thread.unwrap().task; let instance = state.get_mut(task).unwrap().instance.instance; diff --git a/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs b/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs index f1fa44d110dd..a352e89f805a 100644 --- a/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs +++ b/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs @@ -12,7 +12,7 @@ use crate::component::{ use crate::store::{StoreOpaque, StoreToken}; use crate::vm::component::{ComponentInstance, HandleTable, TransmitLocalState}; use crate::vm::{AlwaysMut, VMStore}; -use crate::{AsContextMut, StoreContextMut, ValRaw}; +use crate::{AsContext, AsContextMut, StoreContextMut, ValRaw}; use crate::{ Error, Result, bail, error::{Context as _, format_err}, @@ -1123,6 +1123,8 @@ impl FutureReader { where T: func::Lower + func::Lift + Send + Sync + 'static, { + assert!(store.as_context().0.cm_concurrency_enabled()); + struct Producer

(P); impl> StreamProducer @@ -1450,6 +1452,11 @@ where { /// Create a new `GuardedFutureReader` with the specified `accessor` and `reader`. pub fn new(accessor: A, reader: FutureReader) -> Self { + assert!( + accessor + .as_accessor() + .with(|a| a.as_context().0.cm_concurrency_enabled()) + ); Self { reader: Some(reader), accessor, @@ -1503,6 +1510,7 @@ impl StreamReader { where T: func::Lower + func::Lift + Send + Sync + 'static, { + assert!(store.as_context().0.cm_concurrency_enabled()); Self::new_( store .as_context_mut() @@ -1778,6 +1786,11 @@ where /// Create a new `GuardedStreamReader` with the specified `accessor` and /// `reader`. pub fn new(accessor: A, reader: StreamReader) -> Self { + assert!( + accessor + .as_accessor() + .with(|a| a.as_context().0.cm_concurrency_enabled()) + ); Self { reader: Some(reader), accessor, diff --git a/crates/wasmtime/src/runtime/component/concurrent_disabled.rs b/crates/wasmtime/src/runtime/component/concurrent_disabled.rs index 0437c1f540d7..bf710393212e 100644 --- a/crates/wasmtime/src/runtime/component/concurrent_disabled.rs +++ b/crates/wasmtime/src/runtime/component/concurrent_disabled.rs @@ -1,7 +1,6 @@ use crate::component::func::{LiftContext, LowerContext}; use crate::component::matching::InstanceType; -use crate::component::{ComponentType, Lift, Lower, RuntimeInstance, Val}; -use crate::store::StoreOpaque; +use crate::component::{ComponentType, Lift, Lower, Val}; use crate::{Result, bail, error::format_err}; use core::convert::Infallible; use core::mem::MaybeUninit; @@ -150,21 +149,3 @@ unsafe impl Lower for StreamAny { match self.0 {} } } - -impl StoreOpaque { - pub(crate) fn check_blocking(&mut self) -> Result<()> { - Ok(()) - } - - pub(crate) fn may_enter(&mut self, instance: RuntimeInstance) -> bool { - if self.trapped() { - return false; - } - - let flags = self - .component_instance(instance.instance) - .instance_flags(instance.index); - - unsafe { !flags.needs_post_return() } - } -} diff --git a/crates/wasmtime/src/runtime/component/func.rs b/crates/wasmtime/src/runtime/component/func.rs index 0cbbff714f2f..817c18686cc6 100644 --- a/crates/wasmtime/src/runtime/component/func.rs +++ b/crates/wasmtime/src/runtime/component/func.rs @@ -263,26 +263,24 @@ impl Func { let store = store.as_context_mut(); #[cfg(feature = "component-model-async")] - { - store + if store.0.cm_concurrency_enabled() { + return store .run_concurrent_trap_on_idle(async |store| { self.call_concurrent_dynamic(store, params, results, false) .await .map(drop) }) - .await? - } - #[cfg(not(feature = "component-model-async"))] - { - assert!( - store.0.async_support(), - "cannot use `call_async` without enabling async support in the config" - ); - let mut store = store; - store - .on_fiber(|store| self.call_impl(store, params, results)) - .await? + .await?; } + + assert!( + store.0.async_support(), + "cannot use `call_async` without enabling async support in the config" + ); + let mut store = store; + store + .on_fiber(|store| self.call_impl(store, params, results)) + .await? } fn check_params_results( diff --git a/crates/wasmtime/src/runtime/component/func/options.rs b/crates/wasmtime/src/runtime/component/func/options.rs index 55df5005cce7..7a1948346bd4 100644 --- a/crates/wasmtime/src/runtime/component/func/options.rs +++ b/crates/wasmtime/src/runtime/component/func/options.rs @@ -331,7 +331,7 @@ pub struct LiftContext<'a> { not(feature = "component-model-async"), allow(unused, reason = "easier to not #[cfg] away") )] - concurrent_state: &'a mut ConcurrentState, + concurrent_state: Option<&'a mut ConcurrentState>, } #[doc(hidden)] @@ -408,7 +408,7 @@ impl<'a> LiftContext<'a> { #[cfg(feature = "component-model-async")] pub(crate) fn concurrent_state_mut(&mut self) -> &mut ConcurrentState { - self.concurrent_state + self.concurrent_state.as_deref_mut().unwrap() } /// Lifts an `own` resource from the guest at the `idx` specified into its diff --git a/crates/wasmtime/src/runtime/component/func/typed.rs b/crates/wasmtime/src/runtime/component/func/typed.rs index 62ade6e71ba4..ee25c3737d3b 100644 --- a/crates/wasmtime/src/runtime/component/func/typed.rs +++ b/crates/wasmtime/src/runtime/component/func/typed.rs @@ -186,8 +186,9 @@ where store.0.async_support(), "cannot use `call_async` when async support is not enabled on the config" ); + #[cfg(feature = "component-model-async")] - { + if store.0.cm_concurrency_enabled() { use crate::component::concurrent::TaskId; use crate::runtime::vm::SendSyncPtr; use core::ptr::NonNull; @@ -236,18 +237,16 @@ where }; let result = concurrent::queue_call(wrapper.store.as_context_mut(), prepared)?; - wrapper + return wrapper .store .as_context_mut() .run_concurrent_trap_on_idle(async |_| Ok(result.await?.0)) - .await? - } - #[cfg(not(feature = "component-model-async"))] - { - store - .on_fiber(|store| self.call_impl(store, params)) - .await? + .await?; } + + store + .on_fiber(|store| self.call_impl(store, params)) + .await? } /// Start a concurrent call to this function. @@ -321,6 +320,7 @@ where { let result = accessor.as_accessor().with(|mut store| { let mut store = store.as_context_mut(); + assert!(store.0.cm_concurrency_enabled()); assert!( store.0.async_support(), "cannot use `call_concurrent` when async support is not enabled on the config" @@ -379,6 +379,7 @@ where Return: 'static, { use crate::component::storage::slice_to_storage; + debug_assert!(store.0.cm_concurrency_enabled()); let param_count = if Params::flatten_count() <= MAX_FLAT_PARAMS { Params::flatten_count() diff --git a/crates/wasmtime/src/runtime/component/mod.rs b/crates/wasmtime/src/runtime/component/mod.rs index a641c02d8dab..7273cf6a3c34 100644 --- a/crates/wasmtime/src/runtime/component/mod.rs +++ b/crates/wasmtime/src/runtime/component/mod.rs @@ -765,3 +765,38 @@ pub(crate) mod concurrent_disabled; #[cfg(not(feature = "component-model-async"))] pub(crate) use concurrent_disabled as concurrent; + +impl crate::runtime::store::StoreOpaque { + #[cfg(feature = "component-model-async")] + pub(crate) fn cm_concurrency_enabled(&self) -> bool { + let enabled = self.concurrent_state().is_some(); + debug_assert_eq!(enabled, self.engine().config().cm_concurrency_enabled()); + enabled + } + + pub(crate) fn check_blocking(&mut self) -> crate::Result<()> { + #[cfg(feature = "component-model-async")] + if self.cm_concurrency_enabled() { + return self.check_blocking_concurrent(); + } + + Ok(()) + } + + pub(crate) fn may_enter(&mut self, instance: RuntimeInstance) -> bool { + #[cfg(feature = "component-model-async")] + if self.cm_concurrency_enabled() { + return self.may_enter_concurrent(instance); + } + + if self.trapped() { + return false; + } + + let flags = self + .component_instance(instance.instance) + .instance_flags(instance.index); + + unsafe { !flags.needs_post_return() } + } +} diff --git a/crates/wasmtime/src/runtime/store.rs b/crates/wasmtime/src/runtime/store.rs index f41c401daf10..1be1b73ea267 100644 --- a/crates/wasmtime/src/runtime/store.rs +++ b/crates/wasmtime/src/runtime/store.rs @@ -543,7 +543,7 @@ pub struct StoreOpaque { #[cfg(feature = "component-model")] host_resource_data: crate::component::HostResourceData, #[cfg(feature = "component-model")] - concurrent_state: concurrent::ConcurrentState, + concurrent_state: Option, /// State related to the executor of wasm code. /// @@ -771,7 +771,11 @@ impl Store { host_resource_data: Default::default(), executor: Executor::new(engine), #[cfg(feature = "component-model")] - concurrent_state: Default::default(), + concurrent_state: if engine.config().cm_concurrency_enabled() { + Some(Default::default()) + } else { + None + }, #[cfg(feature = "debug")] breakpoints: Default::default(), }; @@ -854,7 +858,9 @@ impl Store { // in their `Drop::drop` implementations, in which case they'll need to // be called from with in the context of a `tls::set` closure. #[cfg(feature = "component-model-async")] - ComponentStoreData::drop_fibers_and_futures(&mut **self.inner); + if self.inner.concurrent_state.is_some() { + ComponentStoreData::drop_fibers_and_futures(&mut **self.inner); + } // Ensure all fiber stacks, even cached ones, are all flushed out to the // instance allocator. @@ -2562,14 +2568,14 @@ at https://bytecodealliance.org/security. &mut vm::component::HandleTable, &mut crate::component::HostResourceData, Pin<&mut vm::component::ComponentInstance>, - &mut concurrent::ConcurrentState, + Option<&mut concurrent::ConcurrentState>, ) { ( &mut self.component_calls, &mut self.component_host_table, &mut self.host_resource_data, instance.id().from_data_get_mut(&mut self.store_data), - &mut self.concurrent_state, + self.concurrent_state.as_mut(), ) } @@ -2578,9 +2584,15 @@ at https://bytecodealliance.org/security. &mut self.async_state } + #[cfg(feature = "component-model-async")] + pub(crate) fn concurrent_state(&self) -> Option<&concurrent::ConcurrentState> { + self.concurrent_state.as_ref() + } + #[cfg(feature = "component-model-async")] pub(crate) fn concurrent_state_mut(&mut self) -> &mut concurrent::ConcurrentState { - &mut self.concurrent_state + debug_assert!(self.engine().config().cm_concurrency_enabled()); + self.concurrent_state.as_mut().unwrap() } #[cfg(feature = "async")] diff --git a/src/commands/run.rs b/src/commands/run.rs index 377a34e47c89..8db019d54e21 100644 --- a/src/commands/run.rs +++ b/src/commands/run.rs @@ -618,26 +618,36 @@ impl RunCommand { .expect("found export index"); let mut results = vec![Val::Bool(false); func_type.results().len()]; + self.call_component_func(store, ¶ms, func, &mut results) + .await?; + println!("{}", DisplayFuncResults(&results)); + Ok(instance) + } + + #[cfg(feature = "component-model")] + async fn call_component_func( + &self, + store: &mut Store, + params: &[wasmtime::component::Val], + func: wasmtime::component::Func, + results: &mut Vec, + ) -> Result<(), Error> { #[cfg(feature = "component-model-async")] - { + if self.run.common.wasm.component_model_async.unwrap_or(false) { store .run_concurrent(async |store| { - let task = func.call_concurrent(store, ¶ms, &mut results).await?; + let task = func.call_concurrent(store, params, results).await?; task.block(store).await; wasmtime::error::Ok(()) }) .await??; + return Ok(()); } - #[cfg(not(feature = "component-model-async"))] - { - func.call_async(&mut *store, ¶ms, &mut results).await?; - func.post_return_async(&mut *store).await?; - } - - println!("{}", DisplayFuncResults(&results)); - Ok(instance) + func.call_async(&mut *store, ¶ms, results).await?; + func.post_return_async(&mut *store).await?; + Ok(()) } /// Execute the default behavior for components on the CLI, looking for diff --git a/src/commands/serve.rs b/src/commands/serve.rs index 98d8b36368dd..56b7856a4e21 100644 --- a/src/commands/serve.rs +++ b/src/commands/serve.rs @@ -403,6 +403,7 @@ impl ServeCommand { .common .config(use_pooling_allocator_by_default().unwrap_or(None))?; config.wasm_component_model(true); + config.wasm_component_model_async(true); config.async_support(true); if self.run.common.wasm.timeout.is_some() { diff --git a/tests/all/component_model/async_dynamic.rs b/tests/all/component_model/async_dynamic.rs index d761342abc4c..860777d1e755 100644 --- a/tests/all/component_model/async_dynamic.rs +++ b/tests/all/component_model/async_dynamic.rs @@ -2,8 +2,10 @@ use wasmtime::component::{Component, FutureAny, FutureReader, Linker, StreamAny, use wasmtime::{Config, Engine, Result, Store}; #[test] -fn simple_type_conversions() { - let engine = Engine::default(); +fn simple_type_conversions() -> Result<()> { + let mut config = Config::new(); + config.wasm_component_model_async(true); + let engine = Engine::new(&config)?; let mut store = Store::new(&engine, ()); let f = FutureReader::new(&mut store, async { wasmtime::error::Ok(10_u32) }); @@ -19,6 +21,8 @@ fn simple_type_conversions() { assert!(s.clone().try_into_stream_reader::().is_err()); let s = s.try_into_stream_reader::().unwrap(); s.try_into_stream_any(&mut store).unwrap().close(&mut store); + + Ok(()) } #[test]