Skip to content

Commit

Permalink
Merge branch 'koonpeng/service-registry' into serialized_split
Browse files Browse the repository at this point in the history
Signed-off-by: Teo Koon Peng <[email protected]>
  • Loading branch information
koonpeng committed Dec 4, 2024
2 parents ce8f621 + 1065f58 commit 35f5da8
Show file tree
Hide file tree
Showing 9 changed files with 661 additions and 500 deletions.
40 changes: 39 additions & 1 deletion diagram.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@
"type"
],
"properties": {
"config": true,
"config": {
"default": null
},
"next": {
"type": "string"
},
Expand Down Expand Up @@ -101,6 +103,42 @@
]
}
}
},
{
"type": "object",
"required": [
"err",
"ok",
"type"
],
"properties": {
"err": {
"type": "string"
},
"ok": {
"type": "string"
},
"type": {
"type": "string",
"enum": [
"forkResult"
]
}
}
},
{
"type": "object",
"required": [
"type"
],
"properties": {
"type": {
"type": "string",
"enum": [
"dispose"
]
}
}
}
]
}
Expand Down
163 changes: 77 additions & 86 deletions src/diagram.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
mod fork_clone;
mod fork_result;
mod impls;
mod node_registry;
mod serialization;
mod split_serialized;
mod unzip;
mod workflow_builder;

use log::debug;
Expand Down Expand Up @@ -72,7 +76,7 @@ pub enum DiagramOperation {
}

type ScopeStart = serde_json::Value;
type ScopeTerminate = Result<serde_json::Value, Box<dyn Error + Send + Sync>>;
type ScopeTerminate = serde_json::Value;
type DynScope<Streams> = Scope<ScopeStart, ScopeTerminate, Streams>;

#[derive(JsonSchema, Serialize, Deserialize)]
Expand Down Expand Up @@ -169,23 +173,16 @@ impl Diagram {
}
}

#[derive(Clone, Debug)]
pub struct TypeMismatch {
output_type: String,
input_type: String,
}

#[derive(Debug)]
pub enum DiagramError {
NodeNotFound(NodeId),
OperationNotFound(OperationId),
TypeMismatch(TypeMismatch),
TypeMismatch,
MissingStartOrTerminate,
CannotConnectStart,
NotSerializable,
NotCloneable,
NotUnzippable,
UnzipToTerminate,
CannotForkResult,
BadInterconnectChain,
JsonError(serde_json::Error),
Expand All @@ -196,11 +193,7 @@ impl Display for DiagramError {
match self {
Self::NodeNotFound(node_id) => write!(f, "node [{}] is not registered", node_id),
Self::OperationNotFound(op_id) => write!(f, "operation [{}] not found", op_id),
Self::TypeMismatch(data) => write!(
f,
"output type [{}] does not match input type [{}]",
data.output_type, data.input_type
),
Self::TypeMismatch => f.write_str("output type does not match input type"),
Self::MissingStartOrTerminate => f.write_str("missing start or terminate"),
Self::CannotConnectStart => f.write_str("cannot connect to start"),
Self::NotSerializable => {
Expand All @@ -210,7 +203,6 @@ impl Display for DiagramError {
Self::NotUnzippable => f.write_str(
"the number of unzip slots in response does not match the number of inputs",
),
Self::UnzipToTerminate => f.write_str("cannot unzip to terminate"),
Self::CannotForkResult => f.write_str(
"node must be registered with \"with_fork_result()\" to be able to perform fork result",
),
Expand Down Expand Up @@ -279,55 +271,54 @@ mod tests {
registry.register_node(
"multiply3",
"multiply3",
(|builder: &mut Builder, _config: ()| builder.create_map_block(multiply3))
.into_registration_builder(),
);
registry.register_node(
"multiply3_cloneable",
"multiply3_cloneable",
(|builder: &mut Builder, _config: ()| builder.create_map_block(multiply3))
.into_registration_builder()
.with_response_cloneable(),
);
registry.register_node(
"multiply3_5",
"multiply3_5",
(|builder: &mut Builder, _config: ()| builder.create_map_block(multiply3_5))
.into_registration_builder()
.with_unzippable(),
|builder: &mut Builder, _config: ()| builder.create_map_block(multiply3),
);
registry
.registration_builder()
.with_response_cloneable()
.register_node(
"multiply3_cloneable",
"multiply3_cloneable",
|builder: &mut Builder, _config: ()| builder.create_map_block(multiply3),
);
registry
.registration_builder()
.with_unzippable()
.register_node(
"multiply3_5",
"multiply3_5",
|builder: &mut Builder, _config: ()| builder.create_map_block(multiply3_5),
);

registry.register_node(
"multiplyBy",
"multiplyBy",
(|builder: &mut Builder, config: i64| {
builder.create_map_block(move |a: i64| a * config)
})
.into_registration_builder(),
|builder: &mut Builder, config: i64| builder.create_map_block(move |a: i64| a * config),
);

registry.register_node(
"opaque",
"opaque",
(|builder: &mut Builder, _config: ()| builder.create_map_block(opaque))
.into_registration_builder()
.with_opaque_request()
.with_opaque_response(),
);
registry.register_node(
"opaque_request",
"opaque_request",
(|builder: &mut Builder, _config: ()| builder.create_map_block(opaque_request))
.into_registration_builder()
.with_opaque_request(),
);
registry.register_node(
"opaque_response",
"opaque_response",
(|builder: &mut Builder, _config: ()| builder.create_map_block(opaque_response))
.into_registration_builder()
.with_opaque_response(),
);
registry
.registration_builder()
.with_opaque_request()
.with_opaque_response()
.register_node("opaque", "opaque", |builder: &mut Builder, _config: ()| {
builder.create_map_block(opaque)
});
registry
.registration_builder()
.with_opaque_request()
.register_node(
"opaque_request",
"opaque_request",
|builder: &mut Builder, _config: ()| builder.create_map_block(opaque_request),
);
registry
.registration_builder()
.with_opaque_response()
.register_node(
"opaque_response",
"opaque_response",
|builder: &mut Builder, _config: ()| builder.create_map_block(opaque_response),
);
registry
}

Expand Down Expand Up @@ -548,7 +539,7 @@ mod tests {
let err = diagram
.spawn_io_workflow(&mut context.app, &mut registry)
.unwrap_err();
assert!(matches!(err, DiagramError::TypeMismatch(_)), "{:?}", err);
assert!(matches!(err, DiagramError::TypeMismatch), "{:?}", err);
}

#[test]
Expand Down Expand Up @@ -696,7 +687,7 @@ mod tests {
context.command(|cmds| cmds.request(serde_json::Value::from(4), w).take_response());
context.run_while_pending(&mut promise);
let result = unwrap_promise(promise);
assert_eq!(result.unwrap(), 12);
assert_eq!(result, 12);
}

#[test]
Expand Down Expand Up @@ -804,9 +795,7 @@ mod tests {
}

#[test]
fn test_no_unzip_to_terminate() {
// we cannot unzip to terminate because the serialization registered is for (T1, T2, ...) but when we
// unzip to terminate we need to serialize T1.
fn test_unzip_to_terminate() {
let mut registry = new_registry_with_basic_nodes();
let mut context = TestingContext::minimal_plugins();

Expand All @@ -829,20 +818,24 @@ mod tests {
(
"unzip".to_string(),
DiagramOperation::Unzip(UnzipOp {
next: vec!["terminate".to_string()],
next: vec!["dispose".to_string(), "terminate".to_string()],
}),
),
("dispose".to_string(), DiagramOperation::Dispose),
(
"terminate".to_string(),
DiagramOperation::Terminate(TerminateOp {}),
),
]),
};

let err = diagram
let w = diagram
.spawn_io_workflow(&mut context.app, &mut registry)
.unwrap_err();
assert!(matches!(err, DiagramError::UnzipToTerminate));
.unwrap();
let mut promise =
context.command(|cmds| cmds.request(serde_json::Value::from(4), w).take_response());
context.run_while_pending(&mut promise);
assert_eq!(promise.take().available().unwrap(), 20);
}

#[test]
Expand Down Expand Up @@ -893,7 +886,7 @@ mod tests {
let mut promise =
context.command(|cmds| cmds.request(serde_json::Value::from(4), w).take_response());
context.run_while_pending(&mut promise);
assert_eq!(promise.take().available().unwrap().unwrap(), 36);
assert_eq!(promise.take().available().unwrap(), 36);
}

#[test]
Expand Down Expand Up @@ -945,7 +938,7 @@ mod tests {
let mut promise =
context.command(|cmds| cmds.request(serde_json::Value::from(4), w).take_response());
context.run_while_pending(&mut promise);
assert_eq!(promise.take().available().unwrap().unwrap(), 60);
assert_eq!(promise.take().available().unwrap(), 60);
}

#[test]
Expand All @@ -961,24 +954,22 @@ mod tests {
}
}

registry.register_node(
"check_even",
"check_even",
(|builder: &mut Builder, _config: ()| builder.create_map_block(&check_even))
.into_registration_builder()
.with_fork_result(),
);
registry
.registration_builder()
.with_fork_result()
.register_node(
"check_even",
"check_even",
|builder: &mut Builder, _config: ()| builder.create_map_block(&check_even),
);

fn echo(s: String) -> String {
s
}

registry.register_node(
"echo",
"echo",
(|builder: &mut Builder, _config: ()| builder.create_map_block(&echo))
.into_registration_builder(),
);
registry.register_node("echo", "echo", |builder: &mut Builder, _config: ()| {
builder.create_map_block(&echo)
});

let diagram = Diagram {
ops: HashMap::from([
Expand Down Expand Up @@ -1032,12 +1023,12 @@ mod tests {
let mut promise =
context.command(|cmds| cmds.request(serde_json::Value::from(4), w).take_response());
context.run_while_pending(&mut promise);
assert_eq!(promise.take().available().unwrap().unwrap(), "even");
assert_eq!(promise.take().available().unwrap(), "even");

let mut promise =
context.command(|cmds| cmds.request(serde_json::Value::from(3), w).take_response());
context.run_while_pending(&mut promise);
assert_eq!(promise.take().available().unwrap().unwrap(), "odd");
assert_eq!(promise.take().available().unwrap(), "odd");
}

#[test]
Expand Down Expand Up @@ -1088,7 +1079,7 @@ mod tests {
let mut promise =
context.command(|cmds| cmds.request(serde_json::Value::from(4), w).take_response());
context.run_while_pending(&mut promise);
assert_eq!(promise.take().available().unwrap().unwrap(), 36);
assert_eq!(promise.take().available().unwrap(), 36);
}

#[test]
Expand Down Expand Up @@ -1117,7 +1108,7 @@ mod tests {
let mut promise =
context.command(|cmds| cmds.request(serde_json::Value::from(4), w).take_response());
context.run_while_pending(&mut promise);
assert_eq!(promise.take().available().unwrap().unwrap(), 4);
assert_eq!(promise.take().available().unwrap(), 4);
}

#[test]
Expand Down Expand Up @@ -1150,6 +1141,6 @@ mod tests {
let mut promise =
context.command(|cmds| cmds.request(serde_json::Value::from(4), w).take_response());
context.run_while_pending(&mut promise);
assert_eq!(promise.take().available().unwrap().unwrap(), 28);
assert_eq!(promise.take().available().unwrap(), 28);
}
}
Loading

0 comments on commit 35f5da8

Please sign in to comment.