From 06a7a8279d1ac2219dd6783afc43b17cc8186ab6 Mon Sep 17 00:00:00 2001 From: "Michael X. Grey" Date: Wed, 8 Jan 2025 16:52:34 +0000 Subject: [PATCH] Introduce CommonOperations vs RegistrationBuilder Signed-off-by: Michael X. Grey --- src/diagram/fork_result.rs | 5 +- src/diagram/join.rs | 10 +- src/diagram/node_registry.rs | 516 ++++++++++++++++---------------- src/diagram/serialization.rs | 23 +- src/diagram/split_serialized.rs | 28 +- src/diagram/testing.rs | 36 +-- src/diagram/transform.rs | 1 + src/diagram/unzip.rs | 8 +- src/diagram/workflow_builder.rs | 8 +- 9 files changed, 318 insertions(+), 317 deletions(-) diff --git a/src/diagram/fork_result.rs b/src/diagram/fork_result.rs index ff4715d..2a37ea9 100644 --- a/src/diagram/fork_result.rs +++ b/src/diagram/fork_result.rs @@ -77,13 +77,12 @@ mod tests { fixture .registry - .registration_builder() - .with_fork_result() .register_node_builder( "check_even".to_string(), "check_even".to_string(), |builder: &mut Builder, _config: ()| builder.create_map_block(&check_even), - ); + ) + .with_fork_result(); fn echo(s: String) -> String { s diff --git a/src/diagram/join.rs b/src/diagram/join.rs index 9293f5d..9efd6c6 100644 --- a/src/diagram/join.rs +++ b/src/diagram/join.rs @@ -8,7 +8,7 @@ use tracing::debug; use crate::{Builder, IterBufferable, Output}; use super::{ - DiagramError, DynOutput, NextOperation, NodeRegistry, SerializeMessage, SourceOperation, + DiagramError, DynOutput, NextOperation, DataRegistry, SerializeMessage, SourceOperation, }; #[derive(Debug, Serialize, Deserialize, JsonSchema)] @@ -25,7 +25,7 @@ pub struct JoinOp { pub(super) no_serialize: Option, } -pub(super) fn register_join_impl(registry: &mut NodeRegistry) +pub(super) fn register_join_impl(registry: &mut DataRegistry) where T: Send + Sync + 'static, Serializer: SerializeMessage>, @@ -43,7 +43,7 @@ where /// [`serde_json::Value`]. pub(super) fn serialize_and_join( builder: &mut Builder, - registry: &NodeRegistry, + registry: &DataRegistry, outputs: Vec, ) -> Result, DiagramError> { debug!("serialize and join outputs {:?}", outputs); @@ -138,8 +138,8 @@ mod tests { fixture .registry - .registration_builder() - .with_opaque_request() + .opt_out() + .no_request_deserializing() .register_node_builder( "serialize_join_output".to_string(), "serialize_join_output".to_string(), diff --git a/src/diagram/node_registry.rs b/src/diagram/node_registry.rs index 24a7589..6db71ba 100644 --- a/src/diagram/node_registry.rs +++ b/src/diagram/node_registry.rs @@ -146,11 +146,9 @@ pub struct NodeRegistration { pub(super) metadata: NodeMetadata, /// Creates an instance of the registered node. - create_node_impl: - RefCell Result>>, + create_node_impl: CreateNodeFn, - fork_clone_impl: - Option Result, DiagramError>>>, + fork_clone_impl: Option, unzip_impl: Option Result, DiagramError>>>, @@ -171,6 +169,21 @@ pub struct NodeRegistration { } impl NodeRegistration { + fn new( + metadata: NodeMetadata, + create_node_impl: CreateNodeFn, + fork_clone_impl: Option, + ) -> NodeRegistration { + NodeRegistration { + metadata, + create_node_impl, + fork_clone_impl, + unzip_impl: None, + fork_result_impl: None, + split_impl: None, + } + } + pub(super) fn create_node( &self, builder: &mut Builder, @@ -235,232 +248,196 @@ impl NodeRegistration { } } -pub struct RegistrationBuilder< - 'a, - DeserializeImpl, - SerializeImpl, - ForkCloneImpl, - UnzipImpl, - ForkResultImpl, - SplitImpl, -> { +type CreateNodeFn = RefCell Result>>; +type ForkCloneFn = Box Result, DiagramError>>; + +pub struct CommonOperations<'a, Deserialize, Serialize, Cloneable> { registry: &'a mut NodeRegistry, - _unused: PhantomData<( - DeserializeImpl, - SerializeImpl, - ForkCloneImpl, - UnzipImpl, - ForkResultImpl, - SplitImpl, - )>, + _ignore: PhantomData<(Deserialize, Serialize, Cloneable)>, } -impl<'a, DeserializeImpl, SerializeImpl, ForkCloneImpl, UnzipImpl, ForkResultImpl, SplitImpl> - RegistrationBuilder< - 'a, - DeserializeImpl, - SerializeImpl, - ForkCloneImpl, - UnzipImpl, - ForkResultImpl, - SplitImpl, - > -{ - pub fn new(registry: &'a mut NodeRegistry) -> Self { - Self { - registry, - _unused: Default::default(), - } - } - - /// Register a node builder using the default registration config. +impl<'a, DeserializeImpl, SerializeImpl, Cloneable> CommonOperations<'a, DeserializeImpl, SerializeImpl, Cloneable> { + /// Register a node builder with the specified common operations. /// /// # Arguments /// /// * `id` - Id of the builder, this must be unique. /// * `name` - Friendly name for the builder, this is only used for display purposes. /// * `f` - The node builder to register. - pub fn register_node_builder( - &mut self, + pub fn register_node_builder( + self, id: BuilderId, name: String, mut f: impl FnMut(&mut Builder, Config) -> Node + 'static, - ) where + ) -> RegistrationBuilder<'a, Request, Response, Streams> + where Config: JsonSchema + DeserializeOwned, Request: Send + Sync + 'static, Response: Send + Sync + 'static, + Streams: StreamPack, DeserializeImpl: DeserializeMessage, SerializeImpl: SerializeMessage, - ForkCloneImpl: DynForkClone, - UnzipImpl: DynUnzip, - ForkResultImpl: DynForkResult, - SplitImpl: DynSplit, + Cloneable: DynForkClone, { - Config::json_schema(&mut self.registry.gen); - - let request = RequestMetadata { - schema: DeserializeImpl::json_schema(&mut self.registry.gen) - .unwrap_or_else(|| self.registry.gen.subschema_for::<()>()), - deserializable: DeserializeImpl::deserializable(), - }; - let response = ResponseMetadata { - schema: SerializeImpl::json_schema(&mut self.registry.gen) - .unwrap_or_else(|| self.registry.gen.subschema_for::<()>()), - serializable: SerializeImpl::serializable(), - cloneable: ForkCloneImpl::CLONEABLE, - unzip_slots: UnzipImpl::UNZIP_SLOTS, - fork_result: ForkResultImpl::SUPPORTED, - splittable: SplitImpl::SUPPORTED, - }; + register_deserialize::(&mut self.registry.data); + register_serialize::(&mut self.registry.data); - let reg = NodeRegistration { - metadata: NodeMetadata { + let registration = NodeRegistration::new( + NodeMetadata { id: id.clone(), name, - request, - response, - config_schema: self.registry.gen.subschema_for::(), + request: RequestMetadata { + schema: DeserializeImpl::json_schema(&mut self.registry.data.schema_generator) + .unwrap_or_else(|| self.registry.data.schema_generator.subschema_for::<()>()), + deserializable: DeserializeImpl::deserializable() + }, + response: ResponseMetadata::new( + SerializeImpl::json_schema(&mut self.registry.data.schema_generator) + .unwrap_or_else(|| self.registry.data.schema_generator.subschema_for::<()>()), + SerializeImpl::serializable(), + Cloneable::CLONEABLE, + ), + config_schema: self.registry.data.schema_generator.subschema_for::(), }, - create_node_impl: RefCell::new(Box::new(move |builder, config| { + RefCell::new(Box::new(move |builder, config| { let config = serde_json::from_value(config)?; let n = f(builder, config); Ok(DynNode::new(n.output, n.input)) })), - fork_clone_impl: if ForkCloneImpl::CLONEABLE { + if Cloneable::CLONEABLE { Some(Box::new(|builder, output, amount| { - ForkCloneImpl::dyn_fork_clone(builder, output, amount) + Cloneable::dyn_fork_clone(builder, output, amount) })) } else { None }, - unzip_impl: if UnzipImpl::UNZIP_SLOTS > 0 { - Some(Box::new(|builder, output| { - UnzipImpl::dyn_unzip(builder, output) - })) - } else { - None - }, - fork_result_impl: if ForkResultImpl::SUPPORTED { - Some(Box::new(|builder, output| { - ForkResultImpl::dyn_fork_result(builder, output) - })) - } else { - None - }, - split_impl: if SplitImpl::SUPPORTED { - Some(Box::new(|builder, output, split_op| { - SplitImpl::dyn_split(builder, output, split_op) - })) - } else { - None - }, - }; + ); + self.registry.nodes.insert(id.clone(), registration); + + // SAFETY: We inserted an entry at this ID a moment ago + let node = self.registry.nodes.get_mut(&id).unwrap(); - self.registry.nodes.insert(id, reg); + RegistrationBuilder:: { + node, + data: &mut self.registry.data, + _ignore: Default::default(), + } + } - register_deserialize::(self.registry); - register_serialize::(self.registry); + /// Opt out of deserializing the request of the node. Use this to build a + /// node whose request type is not deserializable. + pub fn no_request_deserializing(self) -> CommonOperations<'a, OpaqueMessageDeserializer, SerializeImpl, Cloneable> { + CommonOperations { + registry: self.registry, + _ignore: Default::default(), + } + } - UnzipImpl::on_register(&mut self.registry); - SplitImpl::on_register(&mut self.registry); + /// Opt out of serializing the response of the node. Use this to build a + /// node whose response type is not serializable. + pub fn no_response_serializing(self) -> CommonOperations<'a, DeserializeImpl, OpaqueMessageSerializer, Cloneable> { + CommonOperations { + registry: self.registry, + _ignore: Default::default(), + } } - /// Mark the node as having a non deserializable request. This allows nodes with - /// non deserializable requests to be registered but any nodes registered this way will not - /// be able to be connected to "Start" or any operation that requires deserialization. - pub fn with_opaque_request( - self, - ) -> RegistrationBuilder< - 'a, - OpaqueMessageDeserializer, - SerializeImpl, - ForkCloneImpl, - UnzipImpl, - ForkResultImpl, - SplitImpl, - > { - RegistrationBuilder::new(self.registry) - } - - /// Mark the node as having a non serializable response. This allows nodes with - /// non serializable responses to be registered but any nodes registered this way will not - /// be able to be connected to "Terminate" or any operation that requires serialization. - pub fn with_opaque_response( - self, - ) -> RegistrationBuilder< - 'a, - DeserializeImpl, - OpaqueMessageSerializer, - ForkCloneImpl, - UnzipImpl, - ForkResultImpl, - SplitImpl, - > { - RegistrationBuilder::new(self.registry) - } - - /// Mark the node as having a cloneable response. This is required in order for the node - /// to be able to be connected to a "Fork Clone" operation. - pub fn with_response_cloneable( - self, - ) -> RegistrationBuilder< - 'a, - DeserializeImpl, - SerializeImpl, - DefaultImpl, - UnzipImpl, - ForkResultImpl, - SplitImpl, - > { - RegistrationBuilder::new(self.registry) + /// Opt out of cloning the response of the node. Use this to build a node + /// whose response type is not cloneable. + pub fn no_response_cloning(self) -> CommonOperations<'a, DeserializeImpl, SerializeImpl, NotSupported> { + CommonOperations { + registry: self.registry, + _ignore: Default::default(), + } } +} + +pub struct RegistrationBuilder<'a, Request, Response, Streams> { + node: &'a mut NodeRegistration, + data: &'a mut DataRegistry, + _ignore: PhantomData<(Request, Response, Streams)>, +} +impl<'a, Request, Response, Streams> RegistrationBuilder<'a, Request, Response, Streams> { /// Mark the node as having a unzippable response. This is required in order for the node /// to be able to be connected to a "Unzip" operation. - pub fn with_unzippable( - self, - ) -> RegistrationBuilder< - 'a, - DeserializeImpl, - SerializeImpl, - ForkCloneImpl, - DefaultImpl, - ForkResultImpl, - SplitImpl, - > { - RegistrationBuilder::new(self.registry) + pub fn with_unzip(self) -> Self + where + DefaultImpl: DynUnzip, + { + self.with_unzip_impl::() + } + + /// Mark the node as having an unzippable response whose elements are not serializable. + pub fn with_unzip_unserializable(self) -> Self + where + DefaultImpl: DynUnzip, + { + self.with_unzip_impl::() + } + + fn with_unzip_impl(self) -> Self + where + UnzipImpl: DynUnzip, + { + self.node.metadata.response.unzip_slots = UnzipImpl::UNZIP_SLOTS; + self.node.unzip_impl = if UnzipImpl::UNZIP_SLOTS > 0 { + Some(Box::new(|builder, output| { + UnzipImpl::dyn_unzip(builder, output) + })) + } else { + None + }; + + UnzipImpl::on_register(self.data); + + self } /// Mark the node as having a [`Result<_, _>`] response. This is required in order for the node /// to be able to be connected to a "Fork Result" operation. - pub fn with_fork_result( - self, - ) -> RegistrationBuilder< - 'a, - DeserializeImpl, - SerializeImpl, - ForkCloneImpl, - UnzipImpl, - DefaultImpl, - SplitImpl, - > { - RegistrationBuilder::new(self.registry) - } - - /// Mark the node as having a splittable response. This is required in order for the node - /// to be able to be connected to a "Split" operation. - pub fn with_splittable( - self, - ) -> RegistrationBuilder< - 'a, - DeserializeImpl, - SerializeImpl, - ForkCloneImpl, - UnzipImpl, - ForkResultImpl, - DefaultImpl, - > { - RegistrationBuilder::new(self.registry) + pub fn with_fork_result(self) -> Self + where + DefaultImpl: DynForkResult, + { + self.node.metadata.response.fork_result = true; + self.node.fork_result_impl = Some(Box::new(|builder, output| { + >::dyn_fork_result(builder, output) + })); + + self + } + + /// Mark the node as having a splittable response. This is required in order + /// for the node to be able to be connected to a "Split" operation. + pub fn with_split(self) -> Self + where + DefaultImpl: DynSplit, + { + self.with_split_impl::() + } + + /// Mark the node as having a splittable response but the items from the split + /// are unserializable. + pub fn with_split_unserializable(self) -> Self + where + DefaultImpl: DynSplit, + { + self.with_split_impl::() + } + + pub fn with_split_impl(self) -> Self + where + SplitImpl: DynSplit, + { + self.node.metadata.response.splittable = true; + self.node.split_impl = Some(Box::new(|builder, output, split_op| { + SplitImpl::dyn_split(builder, output, split_op) + })); + + SplitImpl::on_register(self.data); + + self } } @@ -469,17 +446,20 @@ pub trait IntoNodeRegistration { self, id: BuilderId, name: String, - gen: &mut SchemaGenerator, + schema_generator: &mut SchemaGenerator, ) -> NodeRegistration; } pub struct NodeRegistry { nodes: HashMap, + pub(super) data: DataRegistry, +} +pub struct DataRegistry { /// List of all request and response types used in all registered nodes, this only /// contains serializable types, non serializable types are opaque and is only compatible /// with itself. - gen: SchemaGenerator, + schema_generator: SchemaGenerator, pub(super) deserialize_impls: HashMap< TypeId, @@ -503,10 +483,12 @@ impl Default for NodeRegistry { settings.definitions_path = "#/types/".to_string(); NodeRegistry { nodes: Default::default(), - gen: SchemaGenerator::new(settings), - deserialize_impls: HashMap::new(), - serialize_impls: HashMap::new(), - join_impls: HashMap::new(), + data: DataRegistry{ + schema_generator: SchemaGenerator::new(settings), + deserialize_impls: HashMap::new(), + serialize_impls: HashMap::new(), + join_impls: HashMap::new(), + } } } } @@ -516,19 +498,48 @@ impl NodeRegistry { Self::default() } - /// Create a new [`RegistrationBuilder`]. By default, it is configured for nodes with - /// deserializable request and serializable responses and without support for any interconnect - /// operations like "fork_clone" and "unzip". See [`RegistrationBuilder`] for more information - /// about these operations. + /// Register a node builder with all the common operations (deserialize the + /// request, serialize the response, and clone the response) enabled. + /// + /// You will receive a [`RegistrationBuilder`] which you can then use to + /// enable more operations around your node, such as fork result, split, + /// or unzip. The data types of your node need to be suitable for those + /// operations or else the compiler will not allow you to enable them. /// /// ``` /// use bevy_impulse::NodeRegistry; /// /// let mut registry = NodeRegistry::default(); - /// registry.registration_builder().register_node_builder("echo".to_string(), "echo".to_string(), + /// registry.register_node_builder("echo".to_string(), "echo".to_string(), /// |builder, _config: ()| builder.create_map_block(|msg: String| msg)); /// ``` /// + /// # Arguments + /// + /// * `id` - Id of the builder, this must be unique. + /// * `name` - Friendly name for the builder, this is only used for display purposes. + /// * `f` - The node builder to register. + pub fn register_node_builder( + &mut self, + id: BuilderId, + name: String, + builder: impl FnMut(&mut Builder, Config) -> Node + 'static, + ) -> RegistrationBuilder + where + Config: JsonSchema + DeserializeOwned, + Request: Send + Sync + 'static + DynType + DeserializeOwned, + Response: Send + Sync + 'static + DynType + Serialize + Clone, + { + self.opt_out().register_node_builder(id, name, builder) + } + + /// In some cases the common operations of deserialization, serialization, + /// and cloning cannot be performed for the request or response of a node. + /// When that happens you can still register your node builder by calling + /// this function and explicitly disabling the common operations that your + /// node cannot support. + /// + /// /// In order for the request to be deserializable, it must implement [`schemars::JsonSchema`] and [`serde::de::DeserializeOwned`]. /// In order for the response to be serializable, it must implement [`schemars::JsonSchema`] and [`serde::Serialize`]. /// @@ -554,53 +565,24 @@ impl NodeRegistry { /// } /// /// let mut registry = NodeRegistry::default(); - /// registry.registration_builder() - /// .with_opaque_request() - /// .with_opaque_response() + /// registry + /// .opt_out() + /// .no_request_deserializing() + /// .no_response_serializing() + /// .no_response_cloning() /// .register_node_builder("echo".to_string(), "echo".to_string(), |builder, _config: ()| { /// builder.create_map_block(|msg: NonSerializable| msg) /// }); /// ``` /// - /// Note that nodes registered this way cannot be connected to "Start" or "Terminate". - pub fn registration_builder( - &mut self, - ) -> RegistrationBuilder< - DefaultDeserializer, - DefaultSerializer, - NotSupported, - NotSupported, - NotSupported, - NotSupported, - > { - RegistrationBuilder::new(self) - } - - /// Register a node builder using the default registration config. - /// - /// This is a equivalent to - /// - /// ```text - /// registry.registration_builder().register_node_builder(f) - /// ``` - /// - /// # Arguments - /// - /// * `id` - Id of the builder, this must be unique. - /// * `name` - Friendly name for the builder, this is only used for display purposes. - /// * `f` - The node builder to register. - pub fn register_node_builder( - &mut self, - id: BuilderId, - name: String, - builder: impl FnMut(&mut Builder, Config) -> Node + 'static, - ) where - Config: JsonSchema + DeserializeOwned, - Request: Send + Sync + 'static + DynType + DeserializeOwned, - Response: Send + Sync + 'static + DynType + Serialize, - { - self.registration_builder() - .register_node_builder(id, name, builder) + /// Note that nodes registered without deserialization cannot be connected + /// to the workflow start, and nodes registered without serialization cannot + /// be connected to the workflow termination. + pub fn opt_out(&mut self) -> CommonOperations { + CommonOperations { + registry: self, + _ignore: Default::default(), + } } pub(super) fn get_registration(&self, id: &Q) -> Result<&NodeRegistration, DiagramError> @@ -646,7 +628,7 @@ impl Serialize for NodeRegistry { &SerializeWith { value: self } }, )?; - s.serialize_field("types", self.gen.definitions())?; + s.serialize_field("types", self.data.schema_generator.definitions())?; s.end() } } @@ -665,11 +647,14 @@ mod tests { #[test] fn test_register_node_builder() { let mut registry = NodeRegistry::default(); - registry.register_node_builder( - "multiply3_uncloneable".to_string(), - "Test Name".to_string(), - |builder, _config: ()| builder.create_map_block(multiply3), - ); + registry + .opt_out() + .no_response_cloning() + .register_node_builder( + "multiply3_uncloneable".to_string(), + "Test Name".to_string(), + |builder, _config: ()| builder.create_map_block(multiply3), + ); let registration = registry.get_registration("multiply3_uncloneable").unwrap(); assert!(registration.metadata.request.deserializable); assert!(registration.metadata.response.serializable); @@ -681,14 +666,12 @@ mod tests { fn test_register_cloneable_node() { let mut registry = NodeRegistry::default(); registry - .registration_builder() - .with_response_cloneable() .register_node_builder( - "multiply3_uncloneable".to_string(), + "multiply3".to_string(), "Test Name".to_string(), |builder, _config: ()| builder.create_map_block(multiply3), ); - let registration = registry.get_registration("multiply3_uncloneable").unwrap(); + let registration = registry.get_registration("multiply3").unwrap(); assert!(registration.metadata.request.deserializable); assert!(registration.metadata.response.serializable); assert!(registration.metadata.response.cloneable); @@ -700,13 +683,14 @@ mod tests { let mut registry = NodeRegistry::default(); let tuple_resp = |_: ()| -> (i64,) { (1,) }; registry - .registration_builder() - .with_unzippable() + .opt_out() + .no_response_cloning() .register_node_builder( "multiply3_uncloneable".to_string(), "Test Name".to_string(), move |builder: &mut Builder, _config: ()| builder.create_map_block(tuple_resp), - ); + ) + .with_unzip(); let registration = registry.get_registration("multiply3_uncloneable").unwrap(); assert!(registration.metadata.request.deserializable); assert!(registration.metadata.response.serializable); @@ -719,25 +703,24 @@ mod tests { let mut registry = NodeRegistry::default(); let vec_resp = |_: ()| -> Vec { vec![1, 2] }; registry - .registration_builder() - .with_splittable() .register_node_builder( "vec_resp".to_string(), "Test Name".to_string(), move |builder: &mut Builder, _config: ()| builder.create_map_block(vec_resp), - ); + ) + .with_split(); let registration = registry.get_registration("vec_resp").unwrap(); assert!(registration.metadata.response.splittable); let map_resp = |_: ()| -> HashMap { HashMap::new() }; registry - .registration_builder() - .with_splittable() .register_node_builder( "map_resp".to_string(), "Test Name".to_string(), move |builder: &mut Builder, _config: ()| builder.create_map_block(map_resp), - ); + ) + .with_split(); + let registration = registry.get_registration("map_resp").unwrap(); assert!(registration.metadata.response.splittable); @@ -777,8 +760,9 @@ mod tests { let mut registry = NodeRegistry::default(); registry - .registration_builder() - .with_opaque_request() + .opt_out() + .no_request_deserializing() + .no_response_cloning() .register_node_builder( "opaque_request_map".to_string(), "Test Name".to_string(), @@ -793,8 +777,9 @@ mod tests { let opaque_response_map = |_: ()| NonSerializableRequest {}; registry - .registration_builder() - .with_opaque_response() + .opt_out() + .no_response_serializing() + .no_response_cloning() .register_node_builder( "opaque_response_map".to_string(), "Test Name".to_string(), @@ -811,9 +796,10 @@ mod tests { let opaque_req_resp_map = |_: NonSerializableRequest| NonSerializableRequest {}; registry - .registration_builder() - .with_opaque_request() - .with_opaque_response() + .opt_out() + .no_request_deserializing() + .no_response_serializing() + .no_response_cloning() .register_node_builder( "opaque_req_resp_map".to_string(), "Test Name".to_string(), diff --git a/src/diagram/serialization.rs b/src/diagram/serialization.rs index 4bd0567..995ebea 100644 --- a/src/diagram/serialization.rs +++ b/src/diagram/serialization.rs @@ -4,7 +4,7 @@ use schemars::{gen::SchemaGenerator, schema::Schema, JsonSchema}; use serde::{de::DeserializeOwned, Serialize}; use tracing::debug; -use super::NodeRegistry; +use super::DataRegistry; #[derive(thiserror::Error, Debug)] pub enum SerializationError { @@ -67,6 +67,23 @@ pub struct ResponseMetadata { pub(super) splittable: bool, } +impl ResponseMetadata { + pub(super) fn new( + schema: Schema, + serializable: bool, + cloneable: bool, + ) -> ResponseMetadata { + ResponseMetadata { + schema, + serializable, + cloneable, + unzip_slots: 0, + fork_result: false, + splittable: false + } + } +} + pub trait SerializeMessage { fn type_name() -> String; @@ -177,7 +194,7 @@ impl DeserializeMessage for OpaqueMessageDeserializer { } } -pub(super) fn register_deserialize(registry: &mut NodeRegistry) +pub(super) fn register_deserialize(registry: &mut DataRegistry) where Deserializer: DeserializeMessage, T: Send + Sync + 'static, @@ -212,7 +229,7 @@ where ); } -pub(super) fn register_serialize(registry: &mut NodeRegistry) +pub(super) fn register_serialize(registry: &mut DataRegistry) where Serializer: SerializeMessage, T: Send + Sync + 'static, diff --git a/src/diagram/split_serialized.rs b/src/diagram/split_serialized.rs index c535465..cd548f9 100644 --- a/src/diagram/split_serialized.rs +++ b/src/diagram/split_serialized.rs @@ -30,7 +30,7 @@ use crate::{ use super::{ impls::{DefaultImpl, NotSupported}, join::register_join_impl, - register_serialize, DiagramError, DynOutput, NextOperation, NodeRegistry, SerializeMessage, + register_serialize, DiagramError, DynOutput, NextOperation, DataRegistry, SerializeMessage, }; #[derive(Debug, Serialize, Deserialize, JsonSchema)] @@ -214,7 +214,7 @@ pub trait DynSplit { split_op: &'a SplitOp, ) -> Result, DiagramError>; - fn on_register(registry: &mut NodeRegistry); + fn on_register(registry: &mut DataRegistry); } impl DynSplit for NotSupported { @@ -228,7 +228,7 @@ impl DynSplit for NotSupported { Err(DiagramError::NotSplittable) } - fn on_register(_registry: &mut NodeRegistry) {} + fn on_register(_registry: &mut DataRegistry) {} } impl DynSplit for DefaultImpl @@ -248,7 +248,7 @@ where split_chain(chain, split_op) } - fn on_register(registry: &mut NodeRegistry) { + fn on_register(registry: &mut DataRegistry) { register_serialize::(registry); register_join_impl::(registry); } @@ -382,13 +382,12 @@ mod tests { fixture .registry - .registration_builder() - .with_splittable() .register_node_builder( "split_list".to_string(), "split_list".to_string(), |builder: &mut Builder, _config: ()| builder.create_map_block(&split_list), - ); + ) + .with_split(); let diagram = Diagram::from_json(json!({ "version": "0.1.0", @@ -423,13 +422,12 @@ mod tests { fixture .registry - .registration_builder() - .with_splittable() .register_node_builder( "split_list".to_string(), "split_list".to_string(), |builder: &mut Builder, _config: ()| builder.create_map_block(&split_list), - ); + ) + .with_split(); let diagram = Diagram::from_json(json!({ "version": "0.1.0", @@ -468,13 +466,12 @@ mod tests { fixture .registry - .registration_builder() - .with_splittable() .register_node_builder( "split_map".to_string(), "split_map".to_string(), |builder: &mut Builder, _config: ()| builder.create_map_block(&split_map), - ); + ) + .with_split(); let diagram = Diagram::from_json(json!({ "version": "0.1.0", @@ -509,13 +506,12 @@ mod tests { fixture .registry - .registration_builder() - .with_splittable() .register_node_builder( "split_list".to_string(), "split_list".to_string(), |builder: &mut Builder, _config: ()| builder.create_map_block(&split_list), - ); + ) + .with_split(); let diagram = Diagram::from_json(json!({ "version": "0.1.0", diff --git a/src/diagram/testing.rs b/src/diagram/testing.rs index 879af19..5acd3be 100644 --- a/src/diagram/testing.rs +++ b/src/diagram/testing.rs @@ -84,27 +84,27 @@ fn opaque_response(_: i64) -> Unserializable { /// create a new node registry with some basic nodes registered fn new_registry_with_basic_nodes() -> NodeRegistry { let mut registry = NodeRegistry::default(); - registry.register_node_builder( - "multiply3_uncloneable".to_string(), - "multiply3_uncloneable".to_string(), - |builder: &mut Builder, _config: ()| builder.create_map_block(multiply3), - ); registry - .registration_builder() - .with_response_cloneable() + .opt_out() + .no_response_cloning() + .register_node_builder( + "multiply3_uncloneable".to_string(), + "multiply3_uncloneable".to_string(), + |builder: &mut Builder, _config: ()| builder.create_map_block(multiply3), + ); + registry .register_node_builder( "multiply3".to_string(), "multiply3".to_string(), |builder: &mut Builder, _config: ()| builder.create_map_block(multiply3), ); registry - .registration_builder() - .with_unzippable() .register_node_builder( "multiply3_5".to_string(), "multiply3_5".to_string(), |builder: &mut Builder, _config: ()| builder.create_map_block(multiply3_5), - ); + ) + .with_unzip(); registry.register_node_builder( "multiplyBy".to_string(), @@ -113,25 +113,27 @@ fn new_registry_with_basic_nodes() -> NodeRegistry { ); registry - .registration_builder() - .with_opaque_request() - .with_opaque_response() + .opt_out() + .no_request_deserializing() + .no_response_serializing() + .no_response_cloning() .register_node_builder( "opaque".to_string(), "opaque".to_string(), |builder: &mut Builder, _config: ()| builder.create_map_block(opaque), ); registry - .registration_builder() - .with_opaque_request() + .opt_out() + .no_request_deserializing() .register_node_builder( "opaque_request".to_string(), "opaque_request".to_string(), |builder: &mut Builder, _config: ()| builder.create_map_block(opaque_request), ); registry - .registration_builder() - .with_opaque_response() + .opt_out() + .no_response_serializing() + .no_response_cloning() .register_node_builder( "opaque_response".to_string(), "opaque_response".to_string(), diff --git a/src/diagram/transform.rs b/src/diagram/transform.rs index 36547f0..9ad17a6 100644 --- a/src/diagram/transform.rs +++ b/src/diagram/transform.rs @@ -41,6 +41,7 @@ pub(super) fn transform_output( output.into_output() } else { let serialize = registry + .data .serialize_impls .get(&output.type_id) .ok_or(DiagramError::NotSerializable)?; diff --git a/src/diagram/unzip.rs b/src/diagram/unzip.rs index 5a0eb8e..8a51f18 100644 --- a/src/diagram/unzip.rs +++ b/src/diagram/unzip.rs @@ -9,7 +9,7 @@ use super::{ impls::{DefaultImpl, NotSupported}, join::register_join_impl, register_serialize as register_serialize_impl, DiagramError, DynOutput, NextOperation, - NodeRegistry, SerializeMessage, + DataRegistry, SerializeMessage, }; #[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] @@ -24,7 +24,7 @@ pub trait DynUnzip { fn dyn_unzip(builder: &mut Builder, output: DynOutput) -> Result, DiagramError>; /// Called when a node is registered. - fn on_register(registry: &mut NodeRegistry); + fn on_register(registry: &mut DataRegistry); } impl DynUnzip for NotSupported { @@ -37,7 +37,7 @@ impl DynUnzip for NotSupported { Err(DiagramError::NotUnzippable) } - fn on_register(_registry: &mut NodeRegistry) {} + fn on_register(_registry: &mut DataRegistry) {} } macro_rules! dyn_unzip_impl { @@ -66,7 +66,7 @@ macro_rules! dyn_unzip_impl { Ok(outputs) } - fn on_register(registry: &mut NodeRegistry) + fn on_register(registry: &mut DataRegistry) { // Register serialize functions for all items in the tuple. // For a tuple of (T1, T2, T3), registers serialize for T1, T2 and T3. diff --git a/src/diagram/workflow_builder.rs b/src/diagram/workflow_builder.rs index dd89960..e94a2d8 100644 --- a/src/diagram/workflow_builder.rs +++ b/src/diagram/workflow_builder.rs @@ -284,10 +284,10 @@ fn connect_vertex<'a>( } let joined_output = if join_op.no_serialize.unwrap_or(false) { - let join_impl = ®istry.join_impls[&ordered_outputs[0].type_id]; + let join_impl = ®istry.data.join_impls[&ordered_outputs[0].type_id]; join_impl(builder, ordered_outputs)? } else { - serialize_and_join(builder, registry, ordered_outputs)?.into() + serialize_and_join(builder, ®istry.data, ordered_outputs)?.into() }; let out_edge = edges.get_mut(&target.out_edges[0]).unwrap(); @@ -509,7 +509,7 @@ fn deserialize( DiagramOperation::Node(node_op) => { let reg = registry.get_registration(&node_op.builder)?; if reg.metadata.request.deserializable { - let deserialize_impl = ®istry.deserialize_impls[&input_type]; + let deserialize_impl = ®istry.data.deserialize_impls[&input_type]; deserialize_impl(builder, serialized) } else { Err(DiagramError::NotSerializable) @@ -538,7 +538,7 @@ fn serialize( let reg = registry.get_registration(&origin.builder)?; if reg.metadata.response.serializable { - let serialize_impl = ®istry.serialize_impls[&output.type_id]; + let serialize_impl = ®istry.data.serialize_impls[&output.type_id]; serialize_impl(builder, output) } else { Err(DiagramError::NotSerializable)