Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 6 additions & 9 deletions crates/test-util/src/wast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,13 @@ pub fn find_tests(root: &Path) -> Result<Vec<WastTest>> {
)
.with_context(|| format!("failed to add tests from `{}`", cm_tests.display()))?;

// Temporarily work around upstream tests that loop forever.
//
// Now that `thread.yield` and `CALLBACK_CODE_YIELD` are both no-ops in
// non-blocking contexts, these tests need to be updated; meanwhile, we skip
// them.
//
// TODO: remove this once
// https://github.com/WebAssembly/component-model/pull/578 has been merged:
// Temporarily work around upstream tests that fail in unexpected ways (e.g.
// panics, loops, etc).
{
let skip_list = &["drop-subtask.wast", "async-calls-sync.wast"];
let skip_list = &[
// FIXME(#12510)
"drop-cross-task-borrow.wast",
];
tests.retain(|test| {
test.path
.file_name()
Expand Down
70 changes: 40 additions & 30 deletions crates/wasmtime/src/runtime/component/concurrent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ use crate::component::{
use crate::fiber::{self, StoreFiber, StoreFiberYield};
use crate::prelude::*;
use crate::store::{Store, StoreId, StoreInner, StoreOpaque, StoreToken};
use crate::vm::component::{CallContext, ComponentInstance, HandleTable, ResourceTables};
use crate::vm::component::{CallContext, ComponentInstance, InstanceState, ResourceTables};
use crate::vm::{AlwaysMut, SendSyncPtr, VMFuncRef, VMMemoryDefinition, VMStore};
use crate::{
AsContext, AsContextMut, FuncType, Result, StoreContext, StoreContextMut, ValRaw, ValType,
Expand Down Expand Up @@ -655,7 +655,7 @@ impl GuestCall {
.concurrent_state_mut()
.get_mut(self.thread.task)?
.instance;
let state = store.instance_state(instance);
let state = store.instance_state(instance).concurrent_state();

let ready = match &self.kind {
GuestCallKind::DeliverEvent { .. } => !state.do_not_enter,
Expand Down Expand Up @@ -1325,6 +1325,7 @@ impl<T> StoreContextMut<'_, T> {
let instance = state.get_mut(call.thread.task)?.instance;
self.0
.instance_state(instance)
.concurrent_state()
.pending
.insert(call.thread, call.kind);
}
Expand Down Expand Up @@ -1561,20 +1562,11 @@ impl StoreOpaque {
}
}

/// Helper function to retrieve the `ConcurrentInstanceState` for the
/// Helper function to retrieve the `InstanceState` for the
/// specified instance.
fn instance_state(&mut self, instance: RuntimeInstance) -> &mut ConcurrentInstanceState {
fn instance_state(&mut self, instance: RuntimeInstance) -> &mut InstanceState {
self.component_instance_mut(instance.instance)
.instance_state(instance.index)
.concurrent_state()
}

/// Helper function to retrieve the `HandleTable` for the specified
/// instance.
fn handle_table(&mut self, instance: RuntimeInstance) -> &mut HandleTable {
self.component_instance_mut(instance.instance)
.instance_state(instance.index)
.handle_table()
}

fn set_thread(&mut self, thread: Option<QualifiedThreadId>) -> Option<QualifiedThreadId> {
Expand Down Expand Up @@ -1633,15 +1625,19 @@ impl StoreOpaque {
/// cannot be entered again until the next call returns.
fn enter_instance(&mut self, instance: RuntimeInstance) {
log::trace!("enter {instance:?}");
self.instance_state(instance).do_not_enter = true;
self.instance_state(instance)
.concurrent_state()
.do_not_enter = true;
}

/// Record that we've exited a (sub-)component instance previously entered
/// with `Self::enter_instance` and then calls `Self::partition_pending`.
/// See the documentation for the latter for details.
fn exit_instance(&mut self, instance: RuntimeInstance) -> Result<()> {
log::trace!("exit {instance:?}");
self.instance_state(instance).do_not_enter = false;
self.instance_state(instance)
.concurrent_state()
.do_not_enter = false;
self.partition_pending(instance)
}

Expand All @@ -1650,13 +1646,16 @@ impl StoreOpaque {
///
/// See `GuestCall::is_ready` for details.
fn partition_pending(&mut self, instance: RuntimeInstance) -> Result<()> {
for (thread, kind) in mem::take(&mut self.instance_state(instance).pending).into_iter() {
for (thread, kind) in
mem::take(&mut self.instance_state(instance).concurrent_state().pending).into_iter()
{
let call = GuestCall { thread, kind };
if call.is_ready(self)? {
self.concurrent_state_mut()
.push_high_priority(WorkItem::GuestCall(call));
} else {
self.instance_state(instance)
.concurrent_state()
.pending
.insert(call.thread, call.kind);
}
Expand All @@ -1671,7 +1670,7 @@ impl StoreOpaque {
caller_instance: RuntimeInstance,
modify: impl FnOnce(u16) -> Option<u16>,
) -> Result<()> {
let state = self.instance_state(caller_instance);
let state = self.instance_state(caller_instance).concurrent_state();
let old = state.backpressure;
let new = modify(old).ok_or_else(|| format_err!("backpressure counter overflow"))?;
state.backpressure = new;
Expand Down Expand Up @@ -1918,10 +1917,11 @@ impl Instance {
}

let set = store
.handle_table(RuntimeInstance {
.instance_state(RuntimeInstance {
instance: self.id().instance(),
index: runtime_instance,
})
.handle_table()
.waitable_set_rep(handle)?;

Ok(TableId::<WaitableSet>::new(set))
Expand Down Expand Up @@ -2034,10 +2034,11 @@ impl Instance {
.get_mut(guest_thread.thread)?
.instance_rep;
store
.handle_table(RuntimeInstance {
.instance_state(RuntimeInstance {
instance: self.id().instance(),
index: runtime_instance,
})
.thread_handle_table()
.guest_thread_remove(guest_id.unwrap())?;

store.concurrent_state_mut().delete(guest_thread.thread)?;
Expand Down Expand Up @@ -2665,7 +2666,8 @@ impl Instance {
// waitable and return the status.
let handle = store
.0
.handle_table(caller_instance)
.instance_state(caller_instance)
.handle_table()
.subtask_insert_guest(guest_thread.task.rep())?;
store
.0
Expand Down Expand Up @@ -2847,10 +2849,11 @@ impl Instance {
store.0.concurrent_state_mut().push_future(future);
let handle = store
.0
.handle_table(RuntimeInstance {
.instance_state(RuntimeInstance {
instance: self.id().instance(),
index: caller_instance,
})
.handle_table()
.subtask_insert_host(task.rep())?;
store.0.concurrent_state_mut().get_mut(task)?.common.handle = Some(handle);
log::trace!(
Expand Down Expand Up @@ -2981,10 +2984,11 @@ impl Instance {
) -> Result<u32> {
let set = store.concurrent_state_mut().push(WaitableSet::default())?;
let handle = store
.handle_table(RuntimeInstance {
.instance_state(RuntimeInstance {
instance: self.id().instance(),
index: caller_instance,
})
.handle_table()
.waitable_set_insert(set.rep())?;
log::trace!("new waitable set {set:?} (handle {handle})");
Ok(handle)
Expand All @@ -2998,10 +3002,11 @@ impl Instance {
set: u32,
) -> Result<()> {
let rep = store
.handle_table(RuntimeInstance {
.instance_state(RuntimeInstance {
instance: self.id().instance(),
index: caller_instance,
})
.handle_table()
.waitable_set_remove(set)?;

log::trace!("drop waitable set {rep} (handle {set})");
Expand Down Expand Up @@ -3056,10 +3061,11 @@ impl Instance {
self.waitable_join(store, caller_instance, task_id, 0)?;

let (rep, is_host) = store
.handle_table(RuntimeInstance {
.instance_state(RuntimeInstance {
instance: self.id().instance(),
index: caller_instance,
})
.handle_table()
.subtask_remove(task_id)?;

let concurrent_state = store.concurrent_state_mut();
Expand Down Expand Up @@ -3132,10 +3138,11 @@ impl Instance {
..
} = &self.id().get(store).component().env_component().options[options];
let rep = store
.handle_table(RuntimeInstance {
.instance_state(RuntimeInstance {
instance: self.id().instance(),
index: caller_instance,
})
.handle_table()
.waitable_set_rep(set)?;

self.waitable_check(
Expand Down Expand Up @@ -3164,10 +3171,11 @@ impl Instance {
..
} = &self.id().get(store).component().env_component().options[options];
let rep = store
.handle_table(RuntimeInstance {
.instance_state(RuntimeInstance {
instance: self.id().instance(),
index: caller_instance,
})
.handle_table()
.waitable_set_rep(set)?;

self.waitable_check(
Expand Down Expand Up @@ -3315,10 +3323,11 @@ impl Instance {
runtime_instance: RuntimeComponentInstanceIndex,
) -> Result<u32> {
let guest_id = store
.handle_table(RuntimeInstance {
.instance_state(RuntimeInstance {
instance: self.id().instance(),
index: runtime_instance,
})
.thread_handle_table()
.guest_thread_insert(thread_id.rep())?;
store
.concurrent_state_mut()
Expand Down Expand Up @@ -3494,10 +3503,11 @@ impl Instance {
}

let (rep, is_host) = store
.handle_table(RuntimeInstance {
.instance_state(RuntimeInstance {
instance: self.id().instance(),
index: caller_instance,
})
.handle_table()
.subtask_rep(task_id)?;
let (waitable, expected_caller_instance) = if is_host {
let id = TableId::<HostTask>::new(rep);
Expand Down Expand Up @@ -3556,7 +3566,7 @@ impl Instance {
assert!(concurrent_state.get_mut(guest_task)?.ready_to_delete());

// Not yet started; cancel and remove from pending
let pending = &mut store.instance_state(instance).pending;
let pending = &mut store.instance_state(instance).concurrent_state().pending;
let pending_count = pending.len();
pending.retain(|thread, _| thread.task != guest_task);
// If there were no pending threads for this task, we're in an error state
Expand Down Expand Up @@ -4252,7 +4262,7 @@ impl GuestThread {
guest_thread: u32,
) -> Result<TableId<Self>> {
let rep = state.instance_states().0[caller_instance]
.handle_table()
.thread_handle_table()
.guest_thread_rep(guest_thread)?;
Ok(TableId::new(rep))
}
Expand Down
13 changes: 12 additions & 1 deletion crates/wasmtime/src/runtime/vm/component.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ mod resources;

pub use self::handle_table::{HandleTable, RemovedResource};
#[cfg(feature = "component-model-async")]
pub use self::handle_table::{TransmitLocalState, Waitable};
pub use self::handle_table::{ThreadHandleTable, TransmitLocalState, Waitable};
#[cfg(feature = "component-model-async")]
pub use self::resources::CallContext;
pub use self::resources::{CallContexts, ResourceTables, TypedResource, TypedResourceIndex};
Expand All @@ -62,6 +62,11 @@ pub struct InstanceState {
/// is used directly to translate guest handles to host representations and
/// vice-versa.
handle_table: HandleTable,

/// Dedicated table for threads that is separate from `handle_table`. Part
/// of the component-model-threading proposal.
#[cfg(feature = "component-model-async")]
thread_handle_table: ThreadHandleTable,
}

impl InstanceState {
Expand All @@ -75,6 +80,12 @@ impl InstanceState {
pub fn handle_table(&mut self) -> &mut HandleTable {
&mut self.handle_table
}

/// State of thread handles.
#[cfg(feature = "component-model-async")]
pub fn thread_handle_table(&mut self) -> &mut ThreadHandleTable {
&mut self.thread_handle_table
}
}

/// Runtime representation of a component instance and all state necessary for
Expand Down
16 changes: 12 additions & 4 deletions crates/wasmtime/src/runtime/vm/component/handle_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ enum Slot {
},

/// Represents a guest thread handle.
#[cfg(feature = "component-model-async")]
GuestThread {
rep: u32,
},
Expand Down Expand Up @@ -571,16 +572,23 @@ impl HandleTable {
_ => bail!("handle is not a waitable"),
}
}
}

#[derive(Default)]
#[cfg(feature = "component-model-async")]
pub struct ThreadHandleTable(HandleTable);

#[cfg(feature = "component-model-async")]
impl ThreadHandleTable {
/// Inserts the guest thread `rep` into this table, returning the index it
/// now resides at.
pub fn guest_thread_insert(&mut self, rep: u32) -> Result<u32> {
self.insert(Slot::GuestThread { rep })
self.0.insert(Slot::GuestThread { rep })
}

/// Returns the `rep` of a guest thread pointed to by `idx`.
pub fn guest_thread_rep(&mut self, idx: u32) -> Result<u32> {
match self.get_mut(idx)? {
match self.0.get_mut(idx)? {
Slot::GuestThread { rep } => Ok(*rep),
_ => bail!("handle is not a guest thread"),
}
Expand All @@ -590,11 +598,11 @@ impl HandleTable {
///
/// Returns the internal `rep`.
pub fn guest_thread_remove(&mut self, idx: u32) -> Result<u32> {
let rep = match self.get_mut(idx)? {
let rep = match self.0.get_mut(idx)? {
Slot::GuestThread { rep } => *rep,
_ => bail!("handle is not a guest thread"),
};
self.remove(idx)?;
self.0.remove(idx)?;
Ok(rep)
}
}
6 changes: 3 additions & 3 deletions tests/all/component_model/resources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ fn mismatch_intrinsics() -> Result<()> {
let ctor = i.get_typed_func::<(u32,), (ResourceAny,)>(&mut store, "ctor")?;
assert_eq!(
ctor.call(&mut store, (100,)).unwrap_err().to_string(),
"handle index 2 used with the wrong type, expected guest-defined \
"handle index 1 used with the wrong type, expected guest-defined \
resource but found a different guest-defined resource",
);

Expand Down Expand Up @@ -1372,8 +1372,8 @@ fn guest_different_host_same() -> Result<()> {
(func (export "f") (param i32 i32)
;; different types, but everything goes into the same
;; handle index namespace
(if (i32.ne (local.get 0) (i32.const 2)) (then (unreachable)))
(if (i32.ne (local.get 1) (i32.const 3)) (then (unreachable)))
(if (i32.ne (local.get 0) (i32.const 1)) (then (unreachable)))
(if (i32.ne (local.get 1) (i32.const 2)) (then (unreachable)))

;; host should end up getting the same resource
(call $f (local.get 0) (local.get 1))
Expand Down
Loading