Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
docs
Browse files Browse the repository at this point in the history
Signed-off-by: Teo Koon Peng <teokoonpeng@gmail.com>
koonpeng committed Dec 5, 2024
1 parent 8547965 commit 891205a
Showing 3 changed files with 145 additions and 42 deletions.
117 changes: 76 additions & 41 deletions src/diagram.rs
Original file line number Diff line number Diff line change
@@ -104,10 +104,44 @@ pub struct Diagram {
}

impl Diagram {
/// Spawns a workflow from this diagram.
///
/// # Examples
///
/// ```
/// use bevy_impulse::{Diagram, DiagramError, NodeRegistry};
///
/// let mut app = bevy_app::App::new();
/// let mut registry = NodeRegistry::default();
/// registry.register_node("echo", "echo", |builder, _config: ()| {
/// builder.create_map_block(|msg: String| msg)
/// });
///
/// let json_str = r#"
/// {
/// "start": {
/// "type": "start",
/// "next": "echo"
/// },
/// "echo": {
/// "type": "node",
/// "nodeId": "echo",
/// "next": "terminate"
/// },
/// "terminate": {
/// "type": "terminate"
/// }
/// }
/// "#;
///
/// let diagram = Diagram::from_json_str(json_str)?;
/// let workflow = diagram.spawn_io_workflow(&mut app, &registry)?;
/// # Ok::<_, DiagramError>(())
/// ```
pub fn spawn_workflow<Streams>(
&self,
app: &mut App,
registry: &mut NodeRegistry,
registry: &NodeRegistry,
) -> Result<Service<ScopeStart, ScopeTerminate, Streams>, DiagramError>
where
Streams: StreamPack,
@@ -164,10 +198,11 @@ impl Diagram {
Ok(w)
}

/// Wrapper to [`spawn_workflow::<()>`].
pub fn spawn_io_workflow(
&self,
app: &mut App,
registry: &mut NodeRegistry,
registry: &NodeRegistry,
) -> Result<Service<ScopeStart, ScopeTerminate, ()>, DiagramError> {
self.spawn_workflow::<()>(app, registry)
}
@@ -353,7 +388,7 @@ mod tests {
#[test]
fn test_no_terminate() {
let mut context = TestingContext::minimal_plugins();
let mut registry = new_registry_with_basic_nodes();
let registry = new_registry_with_basic_nodes();

let diagram = Diagram {
ops: HashMap::from([
@@ -375,7 +410,7 @@ mod tests {
};

let w = diagram
.spawn_io_workflow(&mut context.app, &mut registry)
.spawn_io_workflow(&mut context.app, &registry)
.unwrap();
let mut promise =
context.command(|cmds| cmds.request(serde_json::Value::from(4), w).take_response());
@@ -389,7 +424,7 @@ mod tests {
#[test]
fn test_no_start() {
let mut context = TestingContext::minimal_plugins();
let mut registry = new_registry_with_basic_nodes();
let registry = new_registry_with_basic_nodes();

let diagram = Diagram {
ops: HashMap::from([
@@ -409,7 +444,7 @@ mod tests {
};

let w = diagram
.spawn_io_workflow(&mut context.app, &mut registry)
.spawn_io_workflow(&mut context.app, &registry)
.unwrap();
let mut promise =
context.command(|cmds| cmds.request(serde_json::Value::from(4), w).take_response());
@@ -423,7 +458,7 @@ mod tests {
#[test]
fn test_connect_to_start() {
let mut context = TestingContext::minimal_plugins();
let mut registry = new_registry_with_basic_nodes();
let registry = new_registry_with_basic_nodes();

let diagram = Diagram {
ops: HashMap::from([
@@ -455,15 +490,15 @@ mod tests {
};

let err = diagram
.spawn_io_workflow(&mut context.app, &mut registry)
.spawn_io_workflow(&mut context.app, &registry)
.unwrap_err();
assert!(matches!(err, DiagramError::CannotConnectStart), "{:?}", err);
}

#[test]
fn test_unserializable_start() {
let mut context = TestingContext::minimal_plugins();
let mut registry = new_registry_with_basic_nodes();
let registry = new_registry_with_basic_nodes();

let diagram = Diagram {
ops: HashMap::from([
@@ -489,15 +524,15 @@ mod tests {
};

let err = diagram
.spawn_io_workflow(&mut context.app, &mut registry)
.spawn_io_workflow(&mut context.app, &registry)
.unwrap_err();
assert!(matches!(err, DiagramError::NotSerializable), "{:?}", err);
}

#[test]
fn test_unserializable_terminate() {
let mut context = TestingContext::minimal_plugins();
let mut registry = new_registry_with_basic_nodes();
let registry = new_registry_with_basic_nodes();

let diagram = Diagram {
ops: HashMap::from([
@@ -523,15 +558,15 @@ mod tests {
};

let err = diagram
.spawn_io_workflow(&mut context.app, &mut registry)
.spawn_io_workflow(&mut context.app, &registry)
.unwrap_err();
assert!(matches!(err, DiagramError::NotSerializable), "{:?}", err);
}

#[test]
fn test_mismatch_types() {
let mut context = TestingContext::minimal_plugins();
let mut registry = new_registry_with_basic_nodes();
let registry = new_registry_with_basic_nodes();

let diagram = Diagram {
ops: HashMap::from([
@@ -565,15 +600,15 @@ mod tests {
};

let err = diagram
.spawn_io_workflow(&mut context.app, &mut registry)
.spawn_io_workflow(&mut context.app, &registry)
.unwrap_err();
assert!(matches!(err, DiagramError::TypeMismatch), "{:?}", err);
}

#[test]
fn test_disconnected() {
let mut context = TestingContext::minimal_plugins();
let mut registry = new_registry_with_basic_nodes();
let registry = new_registry_with_basic_nodes();

let diagram = Diagram {
ops: HashMap::from([
@@ -607,7 +642,7 @@ mod tests {
};

let w = diagram
.spawn_io_workflow(&mut context.app, &mut registry)
.spawn_io_workflow(&mut context.app, &registry)
.unwrap();
let mut promise =
context.command(|cmds| cmds.request(serde_json::Value::from(4), w).take_response());
@@ -621,7 +656,7 @@ mod tests {
#[test]
fn test_fork_clone_uncloneable() {
let mut context = TestingContext::minimal_plugins();
let mut registry = new_registry_with_basic_nodes();
let registry = new_registry_with_basic_nodes();

let diagram = Diagram {
ops: HashMap::from([
@@ -661,14 +696,14 @@ mod tests {
};

let err = diagram
.spawn_io_workflow(&mut context.app, &mut registry)
.spawn_io_workflow(&mut context.app, &registry)
.unwrap_err();
assert!(matches!(err, DiagramError::NotCloneable), "{:?}", err);
}

#[test]
fn test_fork_clone() {
let mut registry = new_registry_with_basic_nodes();
let registry = new_registry_with_basic_nodes();
let mut context = TestingContext::minimal_plugins();

let diagram = Diagram {
@@ -709,7 +744,7 @@ mod tests {
};

let w = diagram
.spawn_io_workflow(&mut context.app, &mut registry)
.spawn_io_workflow(&mut context.app, &registry)
.unwrap();
let mut promise =
context.command(|cmds| cmds.request(serde_json::Value::from(4), w).take_response());
@@ -720,7 +755,7 @@ mod tests {

#[test]
fn test_unzip_not_unzippable() {
let mut registry = new_registry_with_basic_nodes();
let registry = new_registry_with_basic_nodes();
let mut context = TestingContext::minimal_plugins();

let diagram = Diagram {
@@ -753,14 +788,14 @@ mod tests {
};

let err = diagram
.spawn_io_workflow(&mut context.app, &mut registry)
.spawn_io_workflow(&mut context.app, &registry)
.unwrap_err();
assert!(matches!(err, DiagramError::NotUnzippable));
}

#[test]
fn test_unzip_to_too_many_slots() {
let mut registry = new_registry_with_basic_nodes();
let registry = new_registry_with_basic_nodes();
let mut context = TestingContext::minimal_plugins();

let diagram = Diagram {
@@ -817,14 +852,14 @@ mod tests {
};

let err = diagram
.spawn_io_workflow(&mut context.app, &mut registry)
.spawn_io_workflow(&mut context.app, &registry)
.unwrap_err();
assert!(matches!(err, DiagramError::NotUnzippable));
}

#[test]
fn test_unzip_to_terminate() {
let mut registry = new_registry_with_basic_nodes();
let registry = new_registry_with_basic_nodes();
let mut context = TestingContext::minimal_plugins();

let diagram = Diagram {
@@ -858,7 +893,7 @@ mod tests {
};

let w = diagram
.spawn_io_workflow(&mut context.app, &mut registry)
.spawn_io_workflow(&mut context.app, &registry)
.unwrap();
let mut promise =
context.command(|cmds| cmds.request(serde_json::Value::from(4), w).take_response());
@@ -868,7 +903,7 @@ mod tests {

#[test]
fn test_unzip() {
let mut registry = new_registry_with_basic_nodes();
let registry = new_registry_with_basic_nodes();
let mut context = TestingContext::minimal_plugins();

let diagram = Diagram {
@@ -909,7 +944,7 @@ mod tests {
};

let w = diagram
.spawn_io_workflow(&mut context.app, &mut registry)
.spawn_io_workflow(&mut context.app, &registry)
.unwrap();
let mut promise =
context.command(|cmds| cmds.request(serde_json::Value::from(4), w).take_response());
@@ -919,7 +954,7 @@ mod tests {

#[test]
fn test_unzip_with_dispose() {
let mut registry = new_registry_with_basic_nodes();
let registry = new_registry_with_basic_nodes();
let mut context = TestingContext::minimal_plugins();

let diagram = Diagram {
@@ -961,7 +996,7 @@ mod tests {
};

let w = diagram
.spawn_io_workflow(&mut context.app, &mut registry)
.spawn_io_workflow(&mut context.app, &registry)
.unwrap();
let mut promise =
context.command(|cmds| cmds.request(serde_json::Value::from(4), w).take_response());
@@ -1046,7 +1081,7 @@ mod tests {
};

let w = diagram
.spawn_io_workflow(&mut context.app, &mut registry)
.spawn_io_workflow(&mut context.app, &registry)
.unwrap();
let mut promise =
context.command(|cmds| cmds.request(serde_json::Value::from(4), w).take_response());
@@ -1108,7 +1143,7 @@ mod tests {
};

let w = diagram
.spawn_io_workflow(&mut context.app, &mut registry)
.spawn_io_workflow(&mut context.app, &registry)
.unwrap();
let mut promise =
context.command(|cmds| cmds.request(serde_json::Value::from(4), w).take_response());
@@ -1168,7 +1203,7 @@ mod tests {
};

let w = diagram
.spawn_io_workflow(&mut context.app, &mut registry)
.spawn_io_workflow(&mut context.app, &registry)
.unwrap();
let mut promise =
context.command(|cmds| cmds.request(serde_json::Value::from(4), w).take_response());
@@ -1232,7 +1267,7 @@ mod tests {
};

let w = diagram
.spawn_io_workflow(&mut context.app, &mut registry)
.spawn_io_workflow(&mut context.app, &registry)
.unwrap();
let mut promise =
context.command(|cmds| cmds.request(serde_json::Value::from(4), w).take_response());
@@ -1290,7 +1325,7 @@ mod tests {
};

let w = diagram
.spawn_io_workflow(&mut context.app, &mut registry)
.spawn_io_workflow(&mut context.app, &registry)
.unwrap();
let mut promise =
context.command(|cmds| cmds.request(serde_json::Value::from(4), w).take_response());
@@ -1300,7 +1335,7 @@ mod tests {

#[test]
fn test_looping_diagram() {
let mut registry = new_registry_with_basic_nodes();
let registry = new_registry_with_basic_nodes();
let mut context = TestingContext::minimal_plugins();

let diagram = Diagram {
@@ -1341,7 +1376,7 @@ mod tests {
};

let w = diagram
.spawn_io_workflow(&mut context.app, &mut registry)
.spawn_io_workflow(&mut context.app, &registry)
.unwrap();
let mut promise =
context.command(|cmds| cmds.request(serde_json::Value::from(4), w).take_response());
@@ -1351,7 +1386,7 @@ mod tests {

#[test]
fn test_noop_diagram() {
let mut registry = new_registry_with_basic_nodes();
let registry = new_registry_with_basic_nodes();
let mut context = TestingContext::minimal_plugins();

let diagram = Diagram {
@@ -1370,7 +1405,7 @@ mod tests {
};

let w = diagram
.spawn_io_workflow(&mut context.app, &mut registry)
.spawn_io_workflow(&mut context.app, &registry)
.unwrap();
let mut promise =
context.command(|cmds| cmds.request(serde_json::Value::from(4), w).take_response());
@@ -1380,7 +1415,7 @@ mod tests {

#[test]
fn test_serialized_diagram() {
let mut registry = new_registry_with_basic_nodes();
let registry = new_registry_with_basic_nodes();
let mut context = TestingContext::minimal_plugins();

let json_str = r#"
@@ -1403,7 +1438,7 @@ mod tests {

let diagram = Diagram::from_json_str(json_str).unwrap();
let w = diagram
.spawn_io_workflow(&mut context.app, &mut registry)
.spawn_io_workflow(&mut context.app, &registry)
.unwrap();
let mut promise =
context.command(|cmds| cmds.request(serde_json::Value::from(4), w).take_response());
68 changes: 68 additions & 0 deletions src/diagram/node_registry.rs
Original file line number Diff line number Diff line change
@@ -345,6 +345,9 @@ impl<'a, DeserializeImpl, SerializeImpl, ForkCloneImpl, UnzipImpl, ForkResultImp
SplitImpl::register_serialize(&mut self.registry);
}

/// Mark the node as having a non deserializable request. This allows nodes with
/// non deserializable requests to be registered but any nodes registered this way will not
/// be able to be connected to "Start" or any operation that requires deserialization.
pub fn with_opaque_request(
self,
) -> RegistrationBuilder<
@@ -359,6 +362,9 @@ impl<'a, DeserializeImpl, SerializeImpl, ForkCloneImpl, UnzipImpl, ForkResultImp
RegistrationBuilder::new(self.registry)
}

/// Mark the node as having a non serializable response. This allows nodes with
/// non serializable responses to be registered but any nodes registered this way will not
/// be able to be connected to "Terminate" or any operation that requires serialization.
pub fn with_opaque_response(
self,
) -> RegistrationBuilder<
@@ -373,6 +379,8 @@ impl<'a, DeserializeImpl, SerializeImpl, ForkCloneImpl, UnzipImpl, ForkResultImp
RegistrationBuilder::new(self.registry)
}

/// Mark the node as having a cloneable response. This is required in order for the node
/// to be able to be connected to a "Fork Clone" operation.
pub fn with_response_cloneable(
self,
) -> RegistrationBuilder<
@@ -387,6 +395,8 @@ impl<'a, DeserializeImpl, SerializeImpl, ForkCloneImpl, UnzipImpl, ForkResultImp
RegistrationBuilder::new(self.registry)
}

/// Mark the node as having a unzippable response. This is required in order for the node
/// to be able to be connected to a "Unzip" operation.
pub fn with_unzippable(
self,
) -> RegistrationBuilder<
@@ -401,6 +411,8 @@ impl<'a, DeserializeImpl, SerializeImpl, ForkCloneImpl, UnzipImpl, ForkResultImp
RegistrationBuilder::new(self.registry)
}

/// Mark the node as having a [`Result<_, _>`] response. This is required in order for the node
/// to be able to be connected to a "Fork Result" operation.
pub fn with_fork_result(
self,
) -> RegistrationBuilder<
@@ -415,6 +427,8 @@ impl<'a, DeserializeImpl, SerializeImpl, ForkCloneImpl, UnzipImpl, ForkResultImp
RegistrationBuilder::new(self.registry)
}

/// Mark the node as having a splittable response. This is required in order for the node
/// to be able to be connected to a "Split" operation.
pub fn with_splittable(
self,
) -> RegistrationBuilder<
@@ -513,6 +527,53 @@ impl Default for NodeRegistry {
}

impl NodeRegistry {
/// Create a new [`RegistrationBuilder`]. By default, it is configured for nodes with
/// deserializable request and serializable responses and without support for any interconnect
/// operations like "fork_clone" and "unzip". See [`RegistrationBuilder`] for more information
/// about these operations.
///
/// ```
/// use bevy_impulse::NodeRegistry;
///
/// let mut registry = NodeRegistry::default();
/// registry.registration_builder().register_node("echo", "echo",
/// |builder, _config: ()| builder.create_map_block(|msg: String| msg));
/// ```
///
/// In order for the request to be deserializable, it must implement [`schemars::JsonSchema`] and [`serde::DeserializeOwned`].
/// In order for the response to be serializable, it must implement [`schemars::JsonSchema`] and [`serde::Serialize`].
///
/// ```
/// use schemars::JsonSchema;
/// use serde::{Deserialize, Serialize};
///
/// #[derive(JsonSchema, Deserialize)]
/// struct DeserializableRequest {}
///
/// #[derive(JsonSchema, Serialize)]
/// struct SerializableResponse {}
/// ```
///
/// If your node have a request or response that is not serializable, there is still
/// a way to register it.
///
/// ```
/// use bevy_impulse::NodeRegistry;
///
/// struct NonSerializable {
/// data: String
/// }
///
/// let mut registry = NodeRegistry::default();
/// registry.registration_builder()
/// .with_opaque_request()
/// .with_opaque_response()
/// .register_node("echo", "echo", |builder, _config: ()| {
/// builder.create_map_block(|msg: NonSerializable| msg)
/// });
/// ```
///
/// Note that nodes registered this way cannot be connected to "Start" or "Terminate".
pub fn registration_builder(
&mut self,
) -> RegistrationBuilder<
@@ -526,6 +587,13 @@ impl NodeRegistry {
RegistrationBuilder::new(self)
}

/// Register a node using the default registration config.
///
/// This is a equivalent to
///
/// ```ignore
/// registry.registration_builder().register_node(f)
/// ```
pub fn register_node<Config, Request, Response, Streams: StreamPack>(
&mut self,
id: &'static str,
2 changes: 1 addition & 1 deletion src/diagram/workflow_builder.rs
Original file line number Diff line number Diff line change
@@ -199,7 +199,7 @@ impl<'b> WorkflowBuilder<'b> {
pub(super) fn new<Streams: StreamPack>(
scope: &DynScope<Streams>,
builder: &mut Builder,
registry: &'b mut NodeRegistry,
registry: &'b NodeRegistry,
diagram: &'b Diagram,
) -> Result<Self, DiagramError> {
// nodes and outputs cannot be cloned, but input can be cloned, so we

0 comments on commit 891205a

Please sign in to comment.