Skip to content

Commit

Permalink
Merge pull request #460 from Ironclad/aryamanagrawal/optional-traces
Browse files Browse the repository at this point in the history
Make GraphProcessor trace optional
  • Loading branch information
ZhangYiJiang authored Jan 14, 2025
2 parents c507a75 + abe29cb commit 8216dca
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 38 deletions.
4 changes: 3 additions & 1 deletion packages/core/src/api/createProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ export type RunGraphOptions = {
};
abortSignal?: AbortSignal;
registry?: NodeRegistration;
includeTrace?: boolean;
getChatNodeEndpoint?: ProcessContext['getChatNodeEndpoint'];
tokenizer?: Tokenizer;
} & {
Expand Down Expand Up @@ -77,7 +78,8 @@ export function coreCreateProcessor(project: Project, options: RunGraphOptions)
throw new Error(`Graph not found, and no main graph specified.`);
}

const processor = new GraphProcessor(project, graphId as GraphId, options.registry);
// TODO: Consolidate options into one object
const processor = new GraphProcessor(project, graphId as GraphId, options.registry, options.includeTrace);

if (options.onStart) {
processor.on('start', options.onStart);
Expand Down
77 changes: 40 additions & 37 deletions packages/core/src/model/GraphProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,8 @@ export class GraphProcessor {
readonly #registry: NodeRegistration;
id = nanoid();

readonly #includeTrace?: boolean = true;

executor?: 'nodejs' | 'browser';

/** If set, specifies the node(s) that the graph will run TO, instead of the nodes without any dependents. */
Expand Down Expand Up @@ -246,7 +248,7 @@ export class GraphProcessor {
return this.#running;
}

constructor(project: Project, graphId?: GraphId, registry?: NodeRegistration) {
constructor(project: Project, graphId?: GraphId, registry?: NodeRegistration, includeTrace?: boolean) {
this.#project = project;
const graph = graphId
? project.graphs[graphId]
Expand All @@ -259,6 +261,7 @@ export class GraphProcessor {
}
this.#graph = graph;

this.#includeTrace = includeTrace;
this.#nodeInstances = {};
this.#connections = {};
this.#nodesById = {};
Expand Down Expand Up @@ -370,6 +373,15 @@ export class GraphProcessor {
});
}

#emitTraceEvent(eventData: string) {
if (this.#includeTrace) {
this.#emitter.emit(
'trace',
eventData,
);
}
}

on = undefined! as Emittery<ProcessEvents>['on'];
off = undefined! as Emittery<ProcessEvents>['off'];
once = undefined! as Emittery<ProcessEvents>['once'];
Expand Down Expand Up @@ -747,7 +759,8 @@ export class GraphProcessor {
if (this.#hasPreloadedData) {
for (const node of this.#graph.nodes) {
if (this.#nodeResults.has(node.id)) {
this.#emitter.emit('trace', `Node ${node.title} has preloaded data`);

this.#emitTraceEvent(`Node ${node.title} has preloaded data`);

await this.#emitter.emit('nodeStart', {
node,
Expand Down Expand Up @@ -869,10 +882,8 @@ export class GraphProcessor {
if (!inputsReady) {
return;
}
this.#emitter.emit(
'trace',
`Node ${node.title} has required inputs nodes: ${inputNodes.map((n) => n.title).join(', ')}`,
);

this.#emitTraceEvent(`Node ${node.title} has required inputs nodes: ${inputNodes.map((n) => n.title).join(', ')}`);

const attachedData = this.#getAttachedDataTo(node);

Expand All @@ -898,7 +909,9 @@ export class GraphProcessor {
this.#processingQueue.addAll(
inputNodes.map((inputNode) => {
return async () => {
this.#emitter.emit('trace', `Fetching required data for node ${inputNode.title} (${inputNode.id})`);

this.#emitTraceEvent(`Fetching required data for node ${inputNode.title} (${inputNode.id})`);

await this.#fetchNodeDataAndProcessNode(inputNode);
};
}),
Expand All @@ -912,32 +925,32 @@ export class GraphProcessor {
const builtInNode = node as BuiltInNodes;

if (this.#ignoreNodes.has(node.id)) {
this.#emitter.emit('trace', `Node ${node.title} is ignored`);
this.#emitTraceEvent(`Node ${node.title} is ignored`);
return;
}

if (this.runToNodeIds) {
const dependencyNodes = this.getDependencyNodesDeep(node.id);

if (this.runToNodeIds.some((runTo) => runTo !== node.id && dependencyNodes.includes(runTo))) {
this.#emitter.emit('trace', `Node ${node.title} is excluded due to runToNodeIds`);
this.#emitTraceEvent(`Node ${node.title} is excluded due to runToNodeIds`);
return;
}
}

if (this.#currentlyProcessing.has(node.id)) {
this.#emitter.emit('trace', `Node ${node.title} is already being processed`);
this.#emitTraceEvent(`Node ${node.title} is already being processed`);
return;
}

// For a loop controller, it can run multiple times, otherwise we already processed this node so bail out
if (this.#visitedNodes.has(node.id) && node.type !== 'loopController') {
this.#emitter.emit('trace', `Node ${node.title} has already been processed`);
this.#emitTraceEvent(`Node ${node.title} has already been processed`);
return;
}

if (this.#erroredNodes.has(node.id)) {
this.#emitter.emit('trace', `Node ${node.title} has already errored`);
this.#emitTraceEvent(`Node ${node.title} has already errored`);
return;
}

Expand All @@ -946,7 +959,7 @@ export class GraphProcessor {
// Check if all input nodes are free of errors
for (const inputNode of inputNodes) {
if (this.#erroredNodes.has(inputNode.id)) {
this.#emitter.emit('trace', `Node ${node.title} has errored input node ${inputNode.title}`);
this.#emitTraceEvent(`Node ${node.title} has errored input node ${inputNode.title}`);
return;
}
}
Expand All @@ -958,19 +971,16 @@ export class GraphProcessor {
return connectionToInput || !input.required;
});

if (!inputsReady) {
await this.#emitter.emit(
'trace',
`Node ${node.title} has required inputs nodes: ${inputNodes.map((n) => n.title).join(', ')}`,
);
if (!inputsReady) {
this.#emitTraceEvent(`Node ${node.title} has required inputs nodes: ${inputNodes.map((n) => n.title).join(', ')}`);
return;
}

// Excluded because control flow is still in a loop - difference between "will not execute" and "has not executed yet"
const inputValues = this.#getInputValuesForNode(node);

if (this.#excludedDueToControlFlow(node, inputValues, nanoid() as ProcessId, 'loop-not-broken')) {
this.#emitter.emit('trace', `Node ${node.title} is excluded due to control flow`);
this.#emitTraceEvent(`Node ${node.title} is excluded due to control flow`);
return;
}

Expand Down Expand Up @@ -1001,7 +1011,7 @@ export class GraphProcessor {
}

if (waitingForInputNode) {
this.#emitter.emit('trace', `Node ${node.title} is waiting for input node ${waitingForInputNode}`);
this.#emitTraceEvent(`Node ${node.title} is waiting for input node ${waitingForInputNode}`);
return;
}

Expand All @@ -1018,7 +1028,7 @@ export class GraphProcessor {
}

if (attachedData.races?.completed) {
this.#emitter.emit('trace', `Node ${node.title} is part of a race that was completed`);
this.#emitTraceEvent(`Node ${node.title} is part of a race that was completed`);
return;
}

Expand All @@ -1027,8 +1037,7 @@ export class GraphProcessor {
if (this.slowMode) {
await new Promise((resolve) => setTimeout(resolve, 250));
}

this.#emitter.emit('trace', `Finished processing node ${node.title} (${node.id})`);
this.#emitTraceEvent(`Finished processing node ${node.title} (${node.id})`);
this.#visitedNodes.add(node.id);
this.#currentlyProcessing.delete(node.id);
this.#remainingNodes.delete(node.id);
Expand All @@ -1046,10 +1055,10 @@ export class GraphProcessor {
this.#excludedDueToControlFlow(node, this.#getInputValuesForNode(node), nanoid() as ProcessId);

if (!didBreak) {
this.#emitter.emit('trace', `Loop controller ${node.title} did not break, so we're looping again`);
this.#emitTraceEvent(`Loop controller ${node.title} did not break, so we're looping again`);
for (const loopNodeId of attachedData.loopInfo?.nodes ?? []) {
const cycleNode = this.#nodesById[loopNodeId]!;
this.#emitter.emit('trace', `Clearing cycle node ${cycleNode.title} (${cycleNode.id})`);
this.#emitTraceEvent(`Clearing cycle node ${cycleNode.title} (${cycleNode.id})`);
this.#visitedNodes.delete(cycleNode.id);
this.#currentlyProcessing.delete(cycleNode.id);
this.#remainingNodes.add(cycleNode.id);
Expand All @@ -1067,7 +1076,7 @@ export class GraphProcessor {
for (const [nodeId] of allNodesForRace) {
for (const [key, abortController] of this.#nodeAbortControllers.entries()) {
if (key.startsWith(nodeId)) {
this.#emitter.emit('trace', `Aborting node ${nodeId} because other race branch won`);
this.#emitTraceEvent(`Aborting node ${nodeId} because other race branch won`);
abortController.abort();
}
}
Expand Down Expand Up @@ -1133,10 +1142,7 @@ export class GraphProcessor {
// Node is finished, check if we can run any more nodes that depend on this one
this.#processingQueue.addAll(
outputNodes.nodes.map((outputNode) => async () => {
this.#emitter.emit(
'trace',
`Trying to run output node from ${node.title}: ${outputNode.title} (${outputNode.id})`,
);
this.#emitTraceEvent(`Trying to run output node from ${node.title}: ${outputNode.title} (${outputNode.id})`);

await this.#processNodeIfAllInputsAvailable(outputNode);
}),
Expand Down Expand Up @@ -1385,7 +1391,7 @@ export class GraphProcessor {
#nodeErrored(node: ChartNode, e: unknown, processId: ProcessId) {
const error = getError(e);
this.#emitter.emit('nodeError', { node, error, processId });
this.#emitter.emit('trace', `Node ${node.title} (${node.id}-${processId}) errored: ${error.stack}`);
this.#emitTraceEvent(`Node ${node.title} (${node.id}-${processId}) errored: ${error.stack}`);
this.#erroredNodes.set(node.id, error.toString());
}

Expand Down Expand Up @@ -1546,7 +1552,7 @@ export class GraphProcessor {
return processor;
},
trace: (message) => {
this.#emitter.emit('trace', message);
this.#emitTraceEvent(message);
},
abortGraph: (error) => {
this.abort(error === undefined, error);
Expand All @@ -1573,7 +1579,7 @@ export class GraphProcessor {
typeOfExclusion: ControlFlowExcludedDataValue['value'] = undefined,
) {
if (node.disabled) {
this.#emitter.emit('trace', `Excluding node ${node.title} because it's disabled`);
this.#emitTraceEvent(`Excluding node ${node.title} because it's disabled`);

this.#visitedNodes.add(node.id);
this.#markAsExcluded(node, processId, inputValues, 'disabled');
Expand Down Expand Up @@ -1607,10 +1613,7 @@ export class GraphProcessor {
if (inputIsExcludedValue && !allowedToConsumedExcludedValue) {
if (!isWaitingForLoop) {
if (inputIsExcludedValue) {
this.#emitter.emit(
'trace',
`Excluding node ${node.title} because of control flow. Input is has excluded value: ${controlFlowExcludedValues[0]?.[0]}`,
);
this.#emitTraceEvent(`Excluding node ${node.title} because of control flow. Input is has excluded value: ${controlFlowExcludedValues[0]?.[0]}`);
}

this.#visitedNodes.add(node.id);
Expand Down

0 comments on commit 8216dca

Please sign in to comment.