Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix detachment #40

Merged
merged 6 commits into from
Dec 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -959,11 +959,15 @@ mod tests {
let mut promise = context.command(|commands| commands.request(5, workflow).take_response());

context.run_with_conditions(&mut promise, Duration::from_secs(2));
assert!(
context.no_unhandled_errors(),
"{:#?}",
context.get_unhandled_errors(),
);
assert!(promise.peek().is_cancelled());
let channel_output = receiver.try_recv().unwrap();
assert_eq!(channel_output, 5);
assert!(receiver.try_recv().is_err());
assert!(context.no_unhandled_errors());
assert!(context.confirm_buffers_empty().is_ok());

let (cancel_sender, mut cancel_receiver) = unbounded_channel();
Expand Down
6 changes: 6 additions & 0 deletions src/flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,12 @@ fn flush_impulses_impl(

let mut loop_count = 0;
while !roster.is_empty() {
for e in roster.deferred_despawn.drain(..) {
if let Some(e_mut) = world.get_entity_mut(e) {
e_mut.despawn_recursive();
}
}

let parameters = world.get_resource_or_insert_with(FlushParameters::default);
let flush_loop_limit = parameters.flush_loop_limit;
let single_threaded_poll_limit = parameters.single_threaded_poll_limit;
Expand Down
49 changes: 48 additions & 1 deletion src/impulse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ where
// this one is finished.
self.commands
.entity(source)
.insert(Cancellable::new(cancel_impulse))
.insert((Cancellable::new(cancel_impulse), ImpulseMarker))
.remove::<UnusedTarget>()
.set_parent(target);
provider.connect(None, source, target, self.commands);
Impulse {
Expand Down Expand Up @@ -484,6 +485,38 @@ mod tests {
assert!(context.no_unhandled_errors());
}

#[test]
fn test_detach() {
// This is a regression test that covers a bug which existed due to
// an incorrect handling of detached impulses when giving input.
let mut context = TestingContext::minimal_plugins();
let service = context.spawn_delayed_map(Duration::from_millis(1), |n| n + 1);

context.command(|commands| {
commands.provide(0).then(service).detach();
});

let (sender, mut promise) = Promise::<()>::new();
context.run_with_conditions(&mut promise, Duration::from_millis(5));
assert!(
context.no_unhandled_errors(),
"Unhandled errors: {:#?}",
context.get_unhandled_errors(),
);

// The promise and sender only exist because run_with_conditions requires
// them. Moreover we need to make sure that sender does not get dropped
// prematurely by the compiler, otherwise the promise will have the run
// exit prematurely. Therefore we call .send(()) here to guarantee the
// compiler knows to keep it alive until the running is finished.
//
// We have observed that using `let (_, mut promise) = ` will cause the
// sender to drop prematurely, so we don't want to risk that there are
// other cases where that may happen. It is important for the run to
// last multiple cycles.
sender.send(()).ok();
}

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct UnitLabel;

Expand Down Expand Up @@ -547,6 +580,20 @@ mod tests {
);

verify_delivery_instruction_matrix(service, &mut context);

let async_service = service;
let service = context.spawn_io_workflow(|scope, builder| {
scope
.input
.chain(builder)
.then(async_service)
.connect(scope.terminate);
});

verify_delivery_instruction_matrix(service, &mut context);

// We don't test blocking services because blocking services are always
// serial no matter what, so delivery instructions have no effect for them.
}

fn verify_delivery_instruction_matrix(
Expand Down
54 changes: 50 additions & 4 deletions src/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ use smallvec::SmallVec;
use backtrace::Backtrace;

use crate::{
Broken, BufferStorage, Cancel, Cancellation, CancellationCause, DeferredRoster, OperationError,
OperationRoster, OrBroken, SessionStatus, UnusedTarget,
Broken, BufferStorage, Cancel, Cancellation, CancellationCause, DeferredRoster, Detached,
MiscellaneousFailure, OperationError, OperationRoster, OrBroken, SessionStatus,
UnhandledErrors, UnusedTarget,
};

/// This contains data that has been provided as input into an operation, along
Expand Down Expand Up @@ -69,15 +70,31 @@ impl<T> Default for InputStorage<T> {
}
}

/// Used to keep track of the expected input type for an operation
#[derive(Component)]
pub(crate) struct InputTypeIndicator {
pub(crate) name: &'static str,
}

impl InputTypeIndicator {
fn new<T>() -> Self {
Self {
name: std::any::type_name::<T>(),
}
}
}

#[derive(Bundle)]
pub struct InputBundle<T: 'static + Send + Sync> {
storage: InputStorage<T>,
indicator: InputTypeIndicator,
}

impl<T: 'static + Send + Sync> InputBundle<T> {
pub fn new() -> Self {
Self {
storage: Default::default(),
indicator: InputTypeIndicator::new::<T>(),
}
}
}
Expand Down Expand Up @@ -125,6 +142,7 @@ pub trait ManageInput {
session: Entity,
data: T,
only_if_active: bool,
roster: &mut OperationRoster,
) -> Result<bool, OperationError>;

/// Get an input that is ready to be taken, or else produce an error.
Expand All @@ -150,7 +168,7 @@ impl<'w> ManageInput for EntityWorldMut<'w> {
data: T,
roster: &mut OperationRoster,
) -> Result<(), OperationError> {
if unsafe { self.sneak_input(session, data, true)? } {
if unsafe { self.sneak_input(session, data, true, roster)? } {
roster.queue(self.id());
}
Ok(())
Expand All @@ -162,7 +180,7 @@ impl<'w> ManageInput for EntityWorldMut<'w> {
data: T,
roster: &mut OperationRoster,
) -> Result<(), OperationError> {
if unsafe { self.sneak_input(session, data, true)? } {
if unsafe { self.sneak_input(session, data, true, roster)? } {
roster.defer(self.id());
}
Ok(())
Expand All @@ -173,6 +191,7 @@ impl<'w> ManageInput for EntityWorldMut<'w> {
session: Entity,
data: T,
only_if_active: bool,
roster: &mut OperationRoster,
) -> Result<bool, OperationError> {
if only_if_active {
let active_session =
Expand All @@ -193,13 +212,40 @@ impl<'w> ManageInput for EntityWorldMut<'w> {
if let Some(mut storage) = self.get_mut::<InputStorage<T>>() {
storage.reverse_queue.insert(0, Input { session, data });
} else if !self.contains::<UnusedTarget>() {
let id = self.id();
if let Some(detached) = self.get::<Detached>() {
if detached.is_detached() {
// The input is going to a detached impulse that will not
// react any further. We need to tell that detached impulse
// to despawn since it is no longer needed.
roster.defer_despawn(id);

// No error occurred, but the caller should not queue the
// operation into the roster because it is being despawned.
return Ok(false);
}
}

let expected = self.get::<InputTypeIndicator>().map(|i| i.name);
// If the input is being fed to an unused target then we can
// generally ignore it, although it may indicate a bug in the user's
// workflow because workflow branches that end in an unused target
// will be spuriously dropped when the scope terminates.

// However in this case, the target is not unused but also does not
// have the correct input storage type. This indicates
self.world_mut()
.get_resource_or_insert_with(|| UnhandledErrors::default())
.miscellaneous
.push(MiscellaneousFailure {
error: std::sync::Arc::new(anyhow::anyhow!(
"Incorrect input type for operation [{:?}]: received [{}], expected [{}]",
id,
std::any::type_name::<T>(),
expected.unwrap_or("<null>"),
)),
backtrace: None,
});
None.or_broken()?;
}
Ok(true)
Expand Down
19 changes: 19 additions & 0 deletions src/operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,9 @@ pub struct OperationRoster {
pub(crate) disposed: Vec<DisposalNotice>,
/// Tell a scope to attempt cleanup
pub(crate) cleanup_finished: Vec<Cleanup>,
/// Despawn these entities while no other operation is running. This is used
/// to cleanup detached impulses that receive no input.
pub(crate) deferred_despawn: Vec<Entity>,
}

impl OperationRoster {
Expand Down Expand Up @@ -262,6 +265,10 @@ impl OperationRoster {
self.cleanup_finished.push(cleanup);
}

pub fn defer_despawn(&mut self, source: Entity) {
self.deferred_despawn.push(source);
}

pub fn is_empty(&self) -> bool {
self.queue.is_empty()
&& self.awake.is_empty()
Expand All @@ -270,6 +277,7 @@ impl OperationRoster {
&& self.unblock.is_empty()
&& self.disposed.is_empty()
&& self.cleanup_finished.is_empty()
&& self.deferred_despawn.is_empty()
}

pub fn append(&mut self, other: &mut Self) {
Expand Down Expand Up @@ -319,6 +327,17 @@ pub(crate) struct Blocker {
pub(crate) serve_next: fn(Blocker, &mut World, &mut OperationRoster),
}

impl std::fmt::Debug for Blocker {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Blocker")
.field("provider", &self.provider)
.field("source", &self.source)
.field("session", &self.session)
.field("label", &self.label)
.finish()
}
}

#[derive(Clone, Debug)]
pub enum OperationError {
Broken(Option<Backtrace>),
Expand Down
10 changes: 8 additions & 2 deletions src/operation/injection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,16 @@ where
// roster to register the task as an operation. In fact it does not
// implement Operation at all. It is just a temporary container for the
// input and the stream targets.
unsafe {
let execute = unsafe {
world
.entity_mut(task)
.sneak_input(session, request, false)?;
.sneak_input(session, request, false, roster)?
};

if !execute {
// If giving the input failed then this workflow will not be able to
// proceed. Therefore we should report that this is broken.
None.or_broken()?;
}

let mut storage = world.get_mut::<InjectionStorage>(source).or_broken()?;
Expand Down
Loading
Loading