Skip to content

Commit

Permalink
support dual key+seq split
Browse files Browse the repository at this point in the history
Signed-off-by: Teo Koon Peng <[email protected]>
  • Loading branch information
koonpeng committed Jan 9, 2025
1 parent e3ff585 commit 046f81f
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 76 deletions.
46 changes: 14 additions & 32 deletions diagram.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -150,42 +150,17 @@
{
"description": "If the request is a list-like or map-like object, split it into multiple responses. Note that the split output is a tuple of `(KeyOrIndex, Value)`, nodes receiving a split output should have request of that type instead of just the value type.\n\n# Examples ``` # bevy_impulse::Diagram::from_json_str(r#\" { \"version\": \"0.1.0\", \"start\": \"split\", \"ops\": { \"split\": { \"type\": \"split\", \"index\": [{ \"builtin\": \"terminate\" }] } } } # \"#)?; # Ok::<_, serde_json::Error>(()) ```",
"type": "object",
"oneOf": [
{
"type": "object",
"required": [
"index"
],
"properties": {
"index": {
"type": "array",
"items": {
"$ref": "#/definitions/NextOperation"
}
}
},
"additionalProperties": false
},
{
"type": "object",
"required": [
"key"
],
"properties": {
"key": {
"type": "object",
"additionalProperties": {
"$ref": "#/definitions/NextOperation"
}
}
},
"additionalProperties": false
}
],
"required": [
"type"
],
"properties": {
"keyed": {
"default": {},
"type": "object",
"additionalProperties": {
"$ref": "#/definitions/NextOperation"
}
},
"remaining": {
"anyOf": [
{
Expand All @@ -196,6 +171,13 @@
}
]
},
"sequential": {
"default": [],
"type": "array",
"items": {
"$ref": "#/definitions/NextOperation"
}
},
"type": {
"type": "string",
"enum": [
Expand Down
8 changes: 4 additions & 4 deletions src/diagram/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ mod tests {
"ops": {
"split": {
"type": "split",
"index": ["get_split_value1", "get_split_value2"]
"sequential": ["get_split_value1", "get_split_value2"]
},
"get_split_value1": {
"type": "node",
Expand Down Expand Up @@ -226,9 +226,9 @@ mod tests {
"ops": {
"split": {
"type": "split",
"index": ["getSplitValue1", "getSplitValue2"]
"sequential": ["get_split_value1", "get_split_value2"]
},
"getSplitValue1": {
"get_split_value1": {
"type": "node",
"builder": "get_split_value",
"next": "op1",
Expand All @@ -238,7 +238,7 @@ mod tests {
"builder": "multiply3_uncloneable",
"next": { "builtin": "terminate" },
},
"getSplitValue2": {
"get_split_value2": {
"type": "node",
"builder": "get_split_value",
"next": "op2",
Expand Down
113 changes: 78 additions & 35 deletions src/diagram/split_serialized.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,13 @@ use super::{
#[derive(Debug, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "snake_case")]
pub struct SplitOp {
#[serde(flatten)]
pub(super) params: SplitOpParams,
#[serde(default)]
pub(super) sequential: Vec<NextOperation>,

pub(super) remaining: Option<NextOperation>,
}
#[serde(default)]
pub(super) keyed: HashMap<String, NextOperation>,

#[derive(Debug, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "snake_case")]
pub enum SplitOpParams {
Index(Vec<NextOperation>),
Key(HashMap<String, NextOperation>),
pub(super) remaining: Option<NextOperation>,
}

impl Splittable for Value {
Expand Down Expand Up @@ -180,28 +176,33 @@ where
TypeId::of::<T>(),
split_op
);

enum SeqOrKey<'inner> {
Seq(usize),
Key(&'inner String),
}

chain.split(|mut sb| -> Result<DynSplitOutputs, DiagramError> {
let outputs = match &split_op.params {
SplitOpParams::Index(v) => {
let outputs: HashMap<_, _> = v
.into_iter()
.enumerate()
.map(|(i, op_id)| -> Result<(_, DynOutput), DiagramError> {
Ok((op_id, sb.sequential_output(i)?.into()))
})
.collect::<Result<_, _>>()?;
outputs
}
SplitOpParams::Key(v) => {
let outputs: HashMap<_, _> = v
.into_iter()
.map(|(k, op_id)| -> Result<(_, DynOutput), DiagramError> {
Ok((op_id, sb.specific_output(k.clone())?.into()))
})
.collect::<Result<_, _>>()?;
outputs
}
};
let outputs: HashMap<&NextOperation, DynOutput> = split_op
.sequential
.iter()
.enumerate()
.map(|(i, op_id)| (SeqOrKey::Seq(i), op_id))
.chain(
split_op
.keyed
.iter()
.map(|(k, op_id)| (SeqOrKey::Key(k), op_id)),
)
.map(
|(ki, op_id)| -> Result<(&NextOperation, DynOutput), DiagramError> {
match ki {
SeqOrKey::Seq(i) => Ok((op_id, sb.sequential_output(i)?.into())),
SeqOrKey::Key(k) => Ok((op_id, sb.specific_output(k.clone())?.into())),
}
},
)
.collect::<Result<HashMap<_, _>, _>>()?;
let split_outputs = DynSplitOutputs {
outputs,
remaining: sb.remaining_output()?.into(),
Expand Down Expand Up @@ -406,7 +407,7 @@ mod tests {
},
"split": {
"type": "split",
"index": [{ "builtin": "terminate" }],
"sequential": [{ "builtin": "terminate" }],
},
},
}))
Expand Down Expand Up @@ -446,7 +447,7 @@ mod tests {
},
"split": {
"type": "split",
"key": { "1": { "builtin": "terminate" } },
"keyed": { "1": { "builtin": "terminate" } },
},
},
}))
Expand Down Expand Up @@ -490,7 +491,48 @@ mod tests {
},
"split": {
"type": "split",
"key": { "b": { "builtin": "terminate" } },
"keyed": { "b": { "builtin": "terminate" } },
},
},
}))
.unwrap();

let result = fixture
.spawn_and_run(&diagram, serde_json::Value::from(4))
.unwrap();
assert_eq!(result[1], 2);
}

#[test]
fn test_split_dual_key_seq() {
let mut fixture = DiagramTestFixture::new();

fn split_map(_: i64) -> HashMap<String, i64> {
HashMap::from([("a".to_string(), 1), ("b".to_string(), 2)])
}

fixture
.registry
.registration_builder()
.with_splittable()
.register_node_builder(
NodeBuilderOptions::new("split_map".to_string()),
|builder: &mut Builder, _config: ()| builder.create_map_block(&split_map),
);

let diagram = Diagram::from_json(json!({
"version": "0.1.0",
"start": "op1",
"ops": {
"op1": {
"type": "node",
"builder": "split_map",
"next": "split",
},
"split": {
"type": "split",
"keyed": { "a": { "builtin": "dispose" } },
"sequential": [{ "builtin": "terminate" }],
},
},
}))
Expand All @@ -499,6 +541,7 @@ mod tests {
let result = fixture
.spawn_and_run(&diagram, serde_json::Value::from(4))
.unwrap();
// "a" is "eaten" up by the keyed path, so we should be the result of "b".
assert_eq!(result[1], 2);
}

Expand Down Expand Up @@ -530,7 +573,7 @@ mod tests {
},
"split": {
"type": "split",
"index": [{ "builtin": "dispose" }],
"sequential": [{ "builtin": "dispose" }],
"remaining": { "builtin": "terminate" },
},
},
Expand Down Expand Up @@ -562,7 +605,7 @@ mod tests {
"ops": {
"split": {
"type": "split",
"index": ["getSplitValue"],
"sequential": ["getSplitValue"],
},
"getSplitValue": {
"type": "node",
Expand Down
11 changes: 6 additions & 5 deletions src/diagram/workflow_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{
use super::{
fork_clone::DynForkClone, impls::DefaultImpl, split_chain, transform::transform_output,
BuiltinTarget, Diagram, DiagramError, DiagramOperation, DiagramScope, DynInputSlot, DynOutput,
NextOperation, NodeOp, NodeRegistry, OperationId, SourceOperation, SplitOpParams,
NextOperation, NodeOp, NodeRegistry, OperationId, SourceOperation,
};

struct Vertex<'a> {
Expand Down Expand Up @@ -187,10 +187,11 @@ pub(super) fn create_workflow<'a, Streams: StreamPack>(
)?;
}
DiagramOperation::Split(split_op) => {
let next_op_ids: Vec<&NextOperation> = match &split_op.params {
SplitOpParams::Index(v) => v.iter().collect(),
SplitOpParams::Key(v) => v.values().collect(),
};
let next_op_ids: Vec<&NextOperation> = split_op
.sequential
.iter()
.chain(split_op.keyed.values())
.collect();
for next_op_id in next_op_ids {
add_edge(op_id.clone().into(), next_op_id, EdgeState::Pending)?;
}
Expand Down

0 comments on commit 046f81f

Please sign in to comment.