Skip to content

Commit

Permalink
feat(core): Handle cycles in workflows when partially executing them (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
despairblue authored Oct 18, 2024
1 parent b4b543d commit 321d6de
Show file tree
Hide file tree
Showing 13 changed files with 469 additions and 53 deletions.
143 changes: 143 additions & 0 deletions packages/core/src/PartialExecutionUtils/DirectedGraph.ts
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,149 @@ export class DirectedGraph {
);
}

/**
* Returns all strongly connected components.
*
* Strongly connected components are a set of nodes where it's possible to
* reach every node from every node.
*
* Strongly connected components are mutually exclusive in directed graphs,
* e.g. they cannot overlap.
*
* The smallest strongly connected component is a single node, since it can
* reach itself from itself by not following any edges.
*
* The algorithm implement here is Tarjan's algorithm.
*
* Example:
* ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐
* │node1├────►node2◄────┤node3├────►node5│
* └─────┘ └──┬──┘ └──▲──┘ └▲───┬┘
* │ │ │ │
* ┌──▼──┐ │ ┌┴───▼┐
* │node4├───────┘ │node6│
* └─────┘ └─────┘
*
* The strongly connected components are
* 1. node1
* 2. node2, node4, node3
* 3. node5, node6
*
* Further reading:
* https://en.wikipedia.org/wiki/Strongly_connected_component
* https://www.youtube.com/watch?v=wUgWX0nc4NY
*/
getStronglyConnectedComponents(): Array<Set<INode>> {
let id = 0;
const visited = new Set<INode>();
const ids = new Map<INode, number>();
const lowLinkValues = new Map<INode, number>();
const stack: INode[] = [];
const stronglyConnectedComponents: Array<Set<INode>> = [];

const followNode = (node: INode) => {
if (visited.has(node)) {
return;
}

visited.add(node);
lowLinkValues.set(node, id);
ids.set(node, id);
id++;
stack.push(node);

const directChildren = this.getDirectChildConnections(node).map((c) => c.to);
for (const child of directChildren) {
followNode(child);

// if node is on stack min the low id
if (stack.includes(child)) {
const childLowLinkValue = lowLinkValues.get(child);
const ownLowLinkValue = lowLinkValues.get(node);
a.ok(childLowLinkValue !== undefined);
a.ok(ownLowLinkValue !== undefined);
const lowestLowLinkValue = Math.min(childLowLinkValue, ownLowLinkValue);

lowLinkValues.set(node, lowestLowLinkValue);
}
}

// after we visited all children, check if the low id is the same as the
// nodes id, which means we found a strongly connected component
const ownId = ids.get(node);
const ownLowLinkValue = lowLinkValues.get(node);
a.ok(ownId !== undefined);
a.ok(ownLowLinkValue !== undefined);

if (ownId === ownLowLinkValue) {
// pop from the stack until the stack is empty or we find a node that
// has a different low id
const scc: Set<INode> = new Set();
let next = stack.at(-1);

while (next && lowLinkValues.get(next) === ownId) {
stack.pop();
scc.add(next);
next = stack.at(-1);
}

if (scc.size > 0) {
stronglyConnectedComponents.push(scc);
}
}
};

for (const node of this.nodes.values()) {
followNode(node);
}

return stronglyConnectedComponents;
}

private depthFirstSearchRecursive(
from: INode,
fn: (node: INode) => boolean,
seen: Set<INode>,
): INode | undefined {
if (seen.has(from)) {
return undefined;
}
seen.add(from);

if (fn(from)) {
return from;
}

for (const childConnection of this.getDirectChildConnections(from)) {
const found = this.depthFirstSearchRecursive(childConnection.to, fn, seen);

if (found) {
return found;
}
}

return undefined;
}

/**
* Like `Array.prototype.find` but for directed graphs.
*
* Starting from, and including, the `from` node this calls the provided
* predicate function with every child node until the predicate function
* returns true.
*
* The search is depth first, meaning every branch is exhausted before the
* next branch is tried.
*
* The first node for which the predicate function returns true is returned.
*
* If the graph is exhausted and the predicate function never returned true,
* undefined is returned instead.
*/
depthFirstSearch({ from, fn }: { from: INode; fn: (node: INode) => boolean }): INode | undefined {
return this.depthFirstSearchRecursive(from, fn, new Set());
}

toWorkflow(parameters: Omit<WorkflowParameters, 'nodes' | 'connections'>): Workflow {
return new Workflow({
...parameters,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
// XX denotes that the node is disabled
// PD denotes that the node has pinned data

import type { INode } from 'n8n-workflow';
import { NodeConnectionType } from 'n8n-workflow';

import { createNodeData, defaultWorkflowParameter } from './helpers';
Expand Down Expand Up @@ -89,6 +90,115 @@ describe('DirectedGraph', () => {
});
});

describe('getStronglyConnectedComponents', () => {
// ┌─────┐ ┌─────┐ ┌─────┐
// │node1├───►│node2├───►│node4│
// └─────┘ └──┬──┘ └─────┘
// ▲ │
// │ │
// ┌──┴──┐ │
// │node3│◄──────┘
// └─────┘
test('find strongly connected components', () => {
// ARRANGE
const node1 = createNodeData({ name: 'Node1' });
const node2 = createNodeData({ name: 'Node2' });
const node3 = createNodeData({ name: 'Node3' });
const node4 = createNodeData({ name: 'Node4' });
const graph = new DirectedGraph()
.addNodes(node1, node2, node3, node4)
.addConnections(
{ from: node1, to: node2 },
{ from: node2, to: node3 },
{ from: node3, to: node1 },
{ from: node2, to: node4 },
);

// ACT
const stronglyConnectedComponents = graph.getStronglyConnectedComponents();

// ASSERT
expect(stronglyConnectedComponents).toHaveLength(2);
expect(stronglyConnectedComponents).toContainEqual(new Set([node4]));
expect(stronglyConnectedComponents).toContainEqual(new Set([node3, node2, node1]));
});

// ┌────┐
// ┌───────┐ │ ├─
// │trigger├──┬──►loop│
// └───────┘ │ │ ├────┐
// │ └────┘ │
// └─────────┐ │
// ┌────┐ │ │
// ┌───►node├─┘ │
// │ └────┘ │
// │ │
// └─────────────┘
test('find strongly connected components even if they use different output indexes', () => {
// ARRANGE
const trigger = createNodeData({ name: 'trigger' });
const loop = createNodeData({ name: 'loop' });
const node = createNodeData({ name: 'node' });
const graph = new DirectedGraph()
.addNodes(trigger, loop, node)
.addConnections(
{ from: trigger, to: loop },
{ from: loop, outputIndex: 1, to: node },
{ from: node, to: loop },
);

// ACT
const stronglyConnectedComponents = graph.getStronglyConnectedComponents();

// ASSERT
expect(stronglyConnectedComponents).toHaveLength(2);
expect(stronglyConnectedComponents).toContainEqual(new Set([trigger]));
expect(stronglyConnectedComponents).toContainEqual(new Set([node, loop]));
});
});

describe('depthFirstSearch', () => {
// ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐
// │node0├───►│node1├───►│node2├───►│node4│───►│node5│
// └─────┘ └─────┘ └──┬──┘ └─────┘ └─────┘
// ▲ │
// │ │
// ┌──┴──┐ │
// │node3│◄──────┘
// └─────┘
test('calls nodes in the correct order and stops when it found the node', () => {
// ARRANGE
const node0 = createNodeData({ name: 'Node0' });
const node1 = createNodeData({ name: 'Node1' });
const node2 = createNodeData({ name: 'Node2' });
const node3 = createNodeData({ name: 'Node3' });
const node4 = createNodeData({ name: 'Node4' });
const node5 = createNodeData({ name: 'Node5' });
const graph = new DirectedGraph()
.addNodes(node0, node1, node2, node3, node4, node5)
.addConnections(
{ from: node0, to: node1 },
{ from: node1, to: node2 },
{ from: node2, to: node3 },
{ from: node3, to: node1 },
{ from: node2, to: node4 },
{ from: node4, to: node5 },
);
const fn = jest.fn().mockImplementation((node: INode) => node === node4);

// ACT
const foundNode = graph.depthFirstSearch({
from: node0,
fn,
});

// ASSERT
expect(foundNode).toBe(node4);
expect(fn).toHaveBeenCalledTimes(5);
expect(fn.mock.calls).toEqual([[node0], [node1], [node2], [node3], [node4]]);
});
});

describe('getParentConnections', () => {
// ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐
// │node1├──►│node2├──►│node3│──►│node4│
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ describe('cleanRunData', () => {
};

// ACT
const newRunData = cleanRunData(runData, graph, [node1]);
const newRunData = cleanRunData(runData, graph, new Set([node1]));

// ASSERT
expect(newRunData).toEqual({});
Expand All @@ -47,7 +47,7 @@ describe('cleanRunData', () => {
};

// ACT
const newRunData = cleanRunData(runData, graph, [node2]);
const newRunData = cleanRunData(runData, graph, new Set([node2]));

// ASSERT
expect(newRunData).toEqual({ [node1.name]: runData[node1.name] });
Expand Down Expand Up @@ -78,7 +78,7 @@ describe('cleanRunData', () => {
};

// ACT
const newRunData = cleanRunData(runData, graph, [node2]);
const newRunData = cleanRunData(runData, graph, new Set([node2]));

// ASSERT
// TODO: Find out if this is a desirable result in milestone 2
Expand Down
Loading

0 comments on commit 321d6de

Please sign in to comment.