Skip to content

Commit

Permalink
Diagram::spawn_workflow now takes Commands
Browse files Browse the repository at this point in the history
Signed-off-by: Teo Koon Peng <[email protected]>
  • Loading branch information
koonpeng committed Jan 3, 2025
1 parent 3cbbd13 commit 6d8340f
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 51 deletions.
14 changes: 9 additions & 5 deletions examples/diagram/calculator/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::{error::Error, fs::File, str::FromStr};

use bevy_impulse::{Diagram, ImpulsePlugin, NodeRegistry, RequestExt, RunCommandsOnWorldExt};
use bevy_impulse::{
Diagram, DiagramError, ImpulsePlugin, NodeRegistry, Promise, RequestExt, RunCommandsOnWorldExt,
};
use clap::Parser;

#[derive(Parser, Debug)]
Expand Down Expand Up @@ -38,10 +40,12 @@ fn main() -> Result<(), Box<dyn Error>> {
let diagram = Diagram::from_reader(file)?;

let request = serde_json::Value::from_str(&args.request)?;
let workflow = diagram.spawn_io_workflow(&mut app, &registry)?;
let mut promise = app
.world
.command(|cmds| cmds.request(request, workflow).take_response());
let mut promise =
app.world
.command(|cmds| -> Result<Promise<serde_json::Value>, DiagramError> {
let workflow = diagram.spawn_io_workflow(cmds, &registry)?;
Ok(cmds.request(request, workflow).take_response())
})?;

while promise.peek().is_pending() {
app.update();
Expand Down
56 changes: 24 additions & 32 deletions src/diagram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ mod transform;
mod unzip;
mod workflow_builder;

use bevy_ecs::system::Commands;
use fork_clone::ForkCloneOp;
use fork_result::ForkResultOp;
use join::JoinOp;
Expand All @@ -25,7 +26,6 @@ use workflow_builder::create_workflow;
use std::{collections::HashMap, io::Read};

use crate::{Builder, Scope, Service, SpawnWorkflowExt, SplitConnectionError, StreamPack};
use bevy_app::App;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};

Expand Down Expand Up @@ -284,9 +284,9 @@ pub enum DiagramOperation {
Dispose,
}

type ScopeStart = serde_json::Value;
type ScopeTerminate = serde_json::Value;
type DynScope<Streams> = Scope<ScopeStart, ScopeTerminate, Streams>;
type DiagramStart = serde_json::Value;
type DiagramTerminate = serde_json::Value;
type DiagramScope<Streams = ()> = Scope<DiagramStart, DiagramTerminate, Streams>;

#[derive(JsonSchema, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
Expand Down Expand Up @@ -331,11 +331,13 @@ impl Diagram {
/// let workflow = diagram.spawn_io_workflow(&mut app, &registry)?;
/// # Ok::<_, DiagramError>(())
/// ```
pub fn spawn_workflow<Streams>(
// TODO(koonpeng): Support streams other than `()` #43.
/* pub */
fn spawn_workflow<Streams>(
&self,
app: &mut App,
cmds: &mut Commands,
registry: &NodeRegistry,
) -> Result<Service<ScopeStart, ScopeTerminate, Streams>, DiagramError>
) -> Result<Service<DiagramStart, DiagramTerminate, Streams>, DiagramError>
where
Streams: StreamPack,
{
Expand All @@ -353,17 +355,15 @@ impl Diagram {
};
}

let w = app
.world
.spawn_workflow(|scope: DynScope<Streams>, builder: &mut Builder| {
debug!(
"spawn workflow, scope input: {:?}, terminate: {:?}",
scope.input.id(),
scope.terminate.id()
);
let w = cmds.spawn_workflow(|scope: DiagramScope<Streams>, builder: &mut Builder| {
debug!(
"spawn workflow, scope input: {:?}, terminate: {:?}",
scope.input.id(),
scope.terminate.id()
);

unwrap_or_return!(create_workflow(scope, builder, registry, self));
});
unwrap_or_return!(create_workflow(scope, builder, registry, self));
});

if let Some(err) = err {
return Err(err);
Expand All @@ -375,10 +375,10 @@ impl Diagram {
/// Wrapper to [spawn_workflow::<()>](Self::spawn_workflow).
pub fn spawn_io_workflow(
&self,
app: &mut App,
cmds: &mut Commands,
registry: &NodeRegistry,
) -> Result<Service<ScopeStart, ScopeTerminate, ()>, DiagramError> {
self.spawn_workflow::<()>(app, registry)
) -> Result<Service<DiagramStart, DiagramTerminate, ()>, DiagramError> {
self.spawn_workflow::<()>(cmds, registry)
}

pub fn from_json(value: serde_json::Value) -> Result<Self, serde_json::Error> {
Expand Down Expand Up @@ -550,9 +550,7 @@ mod tests {
}))
.unwrap();

let err = diagram
.spawn_io_workflow(&mut fixture.context.app, &fixture.registry)
.unwrap_err();
let err = fixture.spawn_io_workflow(&diagram).unwrap_err();
assert!(matches!(err, DiagramError::CannotConnectStart), "{:?}", err);
}

Expand All @@ -578,9 +576,7 @@ mod tests {
}))
.unwrap();

let err = diagram
.spawn_io_workflow(&mut fixture.context.app, &fixture.registry)
.unwrap_err();
let err = fixture.spawn_io_workflow(&diagram).unwrap_err();
assert!(matches!(err, DiagramError::NotSerializable), "{:?}", err);
}

Expand All @@ -606,9 +602,7 @@ mod tests {
}))
.unwrap();

let err = diagram
.spawn_io_workflow(&mut fixture.context.app, &fixture.registry)
.unwrap_err();
let err = fixture.spawn_io_workflow(&diagram).unwrap_err();
assert!(matches!(err, DiagramError::NotSerializable), "{:?}", err);
}

Expand Down Expand Up @@ -639,9 +633,7 @@ mod tests {
}))
.unwrap();

let err = diagram
.spawn_io_workflow(&mut fixture.context.app, &fixture.registry)
.unwrap_err();
let err = fixture.spawn_io_workflow(&diagram).unwrap_err();
assert!(matches!(err, DiagramError::TypeMismatch), "{:?}", err);
}

Expand Down
4 changes: 1 addition & 3 deletions src/diagram/fork_clone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,7 @@ mod tests {
},
}))
.unwrap();
let err = diagram
.spawn_io_workflow(&mut fixture.context.app, &fixture.registry)
.unwrap_err();
let err = fixture.spawn_io_workflow(&diagram).unwrap_err();
assert!(matches!(err, DiagramError::NotCloneable), "{:?}", err);
}

Expand Down
26 changes: 23 additions & 3 deletions src/diagram/testing.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use std::error::Error;

use crate::{testing::TestingContext, Builder, RequestExt};
use crate::{
testing::TestingContext, Builder, RequestExt, RunCommandsOnWorldExt, Service, StreamPack,
};

use super::{Diagram, NodeRegistry};
use super::{Diagram, DiagramError, DiagramStart, DiagramTerminate, NodeRegistry};

pub(super) struct DiagramTestFixture {
pub(super) context: TestingContext,
Expand All @@ -17,14 +19,32 @@ impl DiagramTestFixture {
}
}

pub(super) fn spawn_workflow<Streams: StreamPack>(
&mut self,
diagram: &Diagram,
) -> Result<Service<DiagramStart, DiagramTerminate, Streams>, DiagramError> {
self.context
.app
.world
.command(|cmds| diagram.spawn_workflow(cmds, &self.registry))
}

/// Equivalent to `self.spawn_workflow::<()>(diagram)`
pub(super) fn spawn_io_workflow(
&mut self,
diagram: &Diagram,
) -> Result<Service<DiagramStart, DiagramTerminate, ()>, DiagramError> {
self.spawn_workflow::<()>(diagram)
}

/// Spawns a workflow from a diagram then run the workflow until completion.
/// Returns the result of the workflow.
pub(super) fn spawn_and_run(
&mut self,
diagram: &Diagram,
request: serde_json::Value,
) -> Result<serde_json::Value, Box<dyn Error>> {
let workflow = diagram.spawn_io_workflow(&mut self.context.app, &self.registry)?;
let workflow = self.spawn_workflow::<()>(diagram)?;
let mut promise = self
.context
.command(|cmds| cmds.request(request, workflow).take_response());
Expand Down
8 changes: 2 additions & 6 deletions src/diagram/unzip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,7 @@ mod tests {
]),
};

let err = diagram
.spawn_io_workflow(&mut fixture.context.app, &fixture.registry)
.unwrap_err();
let err = fixture.spawn_io_workflow(&diagram).unwrap_err();
assert!(matches!(err, DiagramError::NotUnzippable), "{}", err);
}

Expand Down Expand Up @@ -193,9 +191,7 @@ mod tests {
]),
};

let err = diagram
.spawn_io_workflow(&mut fixture.context.app, &fixture.registry)
.unwrap_err();
let err = fixture.spawn_io_workflow(&diagram).unwrap_err();
assert!(matches!(err, DiagramError::NotUnzippable));
}

Expand Down
4 changes: 2 additions & 2 deletions src/diagram/workflow_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{Builder, Output, StreamPack};

use super::{
fork_clone::DynForkClone, impls::DefaultImpl, split_chain, transform::transform_output,
Diagram, DiagramError, DiagramOperation, DynInputSlot, DynOutput, DynScope, NodeRegistry,
Diagram, DiagramError, DiagramOperation, DiagramScope, DynInputSlot, DynOutput, NodeRegistry,
OperationId, SplitOpParams,
};

Expand All @@ -32,7 +32,7 @@ enum EdgeState<'a> {
}

pub(super) fn create_workflow<'a, Streams: StreamPack>(
scope: DynScope<Streams>,
scope: DiagramScope<Streams>,
builder: &mut Builder,
registry: &NodeRegistry,
diagram: &'a Diagram,
Expand Down

0 comments on commit 6d8340f

Please sign in to comment.