Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix detachment #40

Merged
merged 6 commits into from
Dec 6, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Fix detachment bug
Signed-off-by: Michael X. Grey <mxgrey@intrinsic.ai>
mxgrey committed Dec 6, 2024
commit fad22f006f3087fb9730c27ca2fb3a3843cf686d
6 changes: 6 additions & 0 deletions src/flush.rs
Original file line number Diff line number Diff line change
@@ -87,6 +87,12 @@ fn flush_impulses_impl(

let mut loop_count = 0;
while !roster.is_empty() {
for e in roster.deferred_despawn.drain(..) {
if let Some(e_mut) = world.get_entity_mut(e) {
e_mut.despawn_recursive();
}
}

let parameters = world.get_resource_or_insert_with(FlushParameters::default);
let flush_loop_limit = parameters.flush_loop_limit;
let single_threaded_poll_limit = parameters.single_threaded_poll_limit;
19 changes: 14 additions & 5 deletions src/input.rs
Original file line number Diff line number Diff line change
@@ -141,6 +141,7 @@ pub trait ManageInput {
session: Entity,
data: T,
only_if_active: bool,
roster: &mut OperationRoster,
) -> Result<bool, OperationError>;

/// Get an input that is ready to be taken, or else produce an error.
@@ -166,7 +167,7 @@ impl<'w> ManageInput for EntityWorldMut<'w> {
data: T,
roster: &mut OperationRoster,
) -> Result<(), OperationError> {
if unsafe { self.sneak_input(session, data, true)? } {
if unsafe { self.sneak_input(session, data, true, roster)? } {
roster.queue(self.id());
}
Ok(())
@@ -178,7 +179,7 @@ impl<'w> ManageInput for EntityWorldMut<'w> {
data: T,
roster: &mut OperationRoster,
) -> Result<(), OperationError> {
if unsafe { self.sneak_input(session, data, true)? } {
if unsafe { self.sneak_input(session, data, true, roster)? } {
roster.defer(self.id());
}
Ok(())
@@ -189,6 +190,7 @@ impl<'w> ManageInput for EntityWorldMut<'w> {
session: Entity,
data: T,
only_if_active: bool,
roster: &mut OperationRoster,
) -> Result<bool, OperationError> {
dbg!(session);
if only_if_active {
@@ -212,15 +214,22 @@ impl<'w> ManageInput for EntityWorldMut<'w> {
dbg!(session);
storage.reverse_queue.insert(0, Input { session, data });
} else if !self.contains::<UnusedTarget>() {
let id = self.id();
if let Some(detached) = self.get::<Detached>() {
if detached.is_detached() {
// The input is going to a detached node that will not
// respond any further. We can
// The input is going to a detached impulse that will not
// react any further. We need to tell that detached impulse
// to despawn since it is no longer needed.
roster.defer_despawn(id);

// No error occurred, but the caller should not queue the
// operation into the roster because it is being despawned.
return Ok(false);
}
}

dbg!(session);
let expected = self.get::<InputTypeIndicator>().map(|i| i.name);
let id = self.id();
// If the input is being fed to an unused target then we can
// generally ignore it, although it may indicate a bug in the user's
// workflow because workflow branches that end in an unused target
8 changes: 8 additions & 0 deletions src/operation.rs
Original file line number Diff line number Diff line change
@@ -223,6 +223,9 @@ pub struct OperationRoster {
pub(crate) disposed: Vec<DisposalNotice>,
/// Tell a scope to attempt cleanup
pub(crate) cleanup_finished: Vec<Cleanup>,
/// Despawn these entities while no other operation is running. This is used
/// to cleanup detached impulses that receive no input.
pub(crate) deferred_despawn: Vec<Entity>,
}

impl OperationRoster {
@@ -262,6 +265,10 @@ impl OperationRoster {
self.cleanup_finished.push(cleanup);
}

pub fn defer_despawn(&mut self, source: Entity) {
self.deferred_despawn.push(source);
}

pub fn is_empty(&self) -> bool {
self.queue.is_empty()
&& self.awake.is_empty()
@@ -270,6 +277,7 @@ impl OperationRoster {
&& self.unblock.is_empty()
&& self.disposed.is_empty()
&& self.cleanup_finished.is_empty()
&& self.deferred_despawn.is_empty()
}

pub fn append(&mut self, other: &mut Self) {
10 changes: 8 additions & 2 deletions src/operation/injection.rs
Original file line number Diff line number Diff line change
@@ -120,10 +120,16 @@ where
// roster to register the task as an operation. In fact it does not
// implement Operation at all. It is just a temporary container for the
// input and the stream targets.
unsafe {
let execute = unsafe {
world
.entity_mut(task)
.sneak_input(session, request, false)?;
.sneak_input(session, request, false, roster)?
};

if !execute {
// If giving the input failed then this workflow will not be able to
// proceed. Therefore we should report that this is broken.
None.or_broken()?;
}

let mut storage = world.get_mut::<InjectionStorage>(source).or_broken()?;
32 changes: 23 additions & 9 deletions src/operation/scope.rs
Original file line number Diff line number Diff line change
@@ -772,8 +772,15 @@ where
// use this session as input despite not being active because we are
// passing it to an operation that will only use it to begin a
// cleanup workflow.
finish_cleanup_workflow_mut.sneak_input(scoped_session, FinishCleanupSignal::CheckAwaitingSession, false)?;
roster.queue(finish_cleanup);
let add_to_queue = finish_cleanup_workflow_mut.sneak_input(
scoped_session,
FinishCleanupSignal::CheckAwaitingSession,
false,
roster,
)?;
if add_to_queue {
roster.queue(finish_cleanup);
}
}

for begin in begin_cleanup_workflows {
@@ -787,19 +794,26 @@ where
// We execute the begin nodes immediately so that they can load up the
// finish_cancel node with all their cancellation behavior IDs before
// the finish_cancel node gets executed.
unsafe {
let execute = unsafe {
// INVARIANT: We can use sneak_input here because we execute the
// recipient node immediately after giving the input.
world
.get_entity_mut(begin.source)
.or_broken()?
.sneak_input(scoped_session, FinishCleanupSignal::DeductFinishedCleanup, false)?;
.sneak_input(
scoped_session,
FinishCleanupSignal::DeductFinishedCleanup,
false,
roster,
)?
};
if execute {
execute_operation(OperationRequest {
source: begin.source,
world,
roster,
});
}
execute_operation(OperationRequest {
source: begin.source,
world,
roster,
});
}

Ok(())