From d9e7c57da2da9c4a0f58685327ff7317bb08d31e Mon Sep 17 00:00:00 2001 From: Aryaman Agrawal Date: Tue, 7 Jan 2025 11:37:56 -0800 Subject: [PATCH] review revision --- packages/core/src/api/createProcessor.ts | 1 + packages/core/src/model/GraphProcessor.ts | 115 +++++++--------------- 2 files changed, 37 insertions(+), 79 deletions(-) diff --git a/packages/core/src/api/createProcessor.ts b/packages/core/src/api/createProcessor.ts index f3f90396..cb0998d0 100644 --- a/packages/core/src/api/createProcessor.ts +++ b/packages/core/src/api/createProcessor.ts @@ -78,6 +78,7 @@ export function coreCreateProcessor(project: Project, options: RunGraphOptions) throw new Error(`Graph not found, and no main graph specified.`); } + // TODO: Consolidate options into one object const processor = new GraphProcessor(project, graphId as GraphId, options.registry, options.includeTrace); if (options.onStart) { diff --git a/packages/core/src/model/GraphProcessor.ts b/packages/core/src/model/GraphProcessor.ts index 1a45e5df..17908793 100644 --- a/packages/core/src/model/GraphProcessor.ts +++ b/packages/core/src/model/GraphProcessor.ts @@ -248,6 +248,15 @@ export class GraphProcessor { return this.#running; } + emitTraceEvent(eventData: string) { + if (this.#includeTrace) { + this.#emitter.emit( + 'trace', + eventData, + ); + } + } + constructor(project: Project, graphId?: GraphId, registry?: NodeRegistration, includeTrace?: boolean) { this.#project = project; const graph = graphId @@ -750,9 +759,8 @@ export class GraphProcessor { if (this.#hasPreloadedData) { for (const node of this.#graph.nodes) { if (this.#nodeResults.has(node.id)) { - if (this.#includeTrace) { - this.#emitter.emit('trace', `Node ${node.title} has preloaded data`); - } + + this.emitTraceEvent(`Node ${node.title} has preloaded data`); await this.#emitter.emit('nodeStart', { node, @@ -874,12 +882,8 @@ export class GraphProcessor { if (!inputsReady) { return; } - if (this.#includeTrace) { - this.#emitter.emit( - 'trace', - `Node ${node.title} has required inputs nodes: ${inputNodes.map((n) => n.title).join(', ')}`, - ); - } + + this.emitTraceEvent(`Node ${node.title} has required inputs nodes: ${inputNodes.map((n) => n.title).join(', ')}`); const attachedData = this.#getAttachedDataTo(node); @@ -905,9 +909,9 @@ export class GraphProcessor { this.#processingQueue.addAll( inputNodes.map((inputNode) => { return async () => { - if (this.#includeTrace) { - this.#emitter.emit('trace', `Fetching required data for node ${inputNode.title} (${inputNode.id})`); - } + + this.emitTraceEvent(`Fetching required data for node ${inputNode.title} (${inputNode.id})`); + await this.#fetchNodeDataAndProcessNode(inputNode); }; }), @@ -921,9 +925,7 @@ export class GraphProcessor { const builtInNode = node as BuiltInNodes; if (this.#ignoreNodes.has(node.id)) { - if (this.#includeTrace) { - this.#emitter.emit('trace', `Node ${node.title} is ignored`); - } + this.emitTraceEvent(`Node ${node.title} is ignored`); return; } @@ -931,32 +933,24 @@ export class GraphProcessor { const dependencyNodes = this.getDependencyNodesDeep(node.id); if (this.runToNodeIds.some((runTo) => runTo !== node.id && dependencyNodes.includes(runTo))) { - if (this.#includeTrace) { - this.#emitter.emit('trace', `Node ${node.title} is excluded due to runToNodeIds`); - } + this.emitTraceEvent(`Node ${node.title} is excluded due to runToNodeIds`); return; } } if (this.#currentlyProcessing.has(node.id)) { - if (this.#includeTrace) { - this.#emitter.emit('trace', `Node ${node.title} is already being processed`); - } + this.emitTraceEvent(`Node ${node.title} is already being processed`); return; } // For a loop controller, it can run multiple times, otherwise we already processed this node so bail out if (this.#visitedNodes.has(node.id) && node.type !== 'loopController') { - if (this.#includeTrace) { - this.#emitter.emit('trace', `Node ${node.title} has already been processed`); - } + this.emitTraceEvent(`Node ${node.title} has already been processed`); return; } if (this.#erroredNodes.has(node.id)) { - if (this.#includeTrace) { - this.#emitter.emit('trace', `Node ${node.title} has already errored`); - } + this.emitTraceEvent(`Node ${node.title} has already errored`); return; } @@ -965,9 +959,7 @@ export class GraphProcessor { // Check if all input nodes are free of errors for (const inputNode of inputNodes) { if (this.#erroredNodes.has(inputNode.id)) { - if (this.#includeTrace) { - this.#emitter.emit('trace', `Node ${node.title} has errored input node ${inputNode.title}`); - } + this.emitTraceEvent(`Node ${node.title} has errored input node ${inputNode.title}`); return; } } @@ -979,13 +971,8 @@ export class GraphProcessor { return connectionToInput || !input.required; }); - if (!inputsReady) { - if (this.#includeTrace) { - await this.#emitter.emit( - 'trace', - `Node ${node.title} has required inputs nodes: ${inputNodes.map((n) => n.title).join(', ')}`, - ); - } + if (!inputsReady) { + this.emitTraceEvent(`Node ${node.title} has required inputs nodes: ${inputNodes.map((n) => n.title).join(', ')}`); return; } @@ -993,9 +980,7 @@ export class GraphProcessor { const inputValues = this.#getInputValuesForNode(node); if (this.#excludedDueToControlFlow(node, inputValues, nanoid() as ProcessId, 'loop-not-broken')) { - if (this.#includeTrace) { - this.#emitter.emit('trace', `Node ${node.title} is excluded due to control flow`); - } + this.emitTraceEvent(`Node ${node.title} is excluded due to control flow`); return; } @@ -1026,9 +1011,7 @@ export class GraphProcessor { } if (waitingForInputNode) { - if (this.#includeTrace) { - this.#emitter.emit('trace', `Node ${node.title} is waiting for input node ${waitingForInputNode}`); - } + this.emitTraceEvent(`Node ${node.title} is waiting for input node ${waitingForInputNode}`); return; } @@ -1045,9 +1028,7 @@ export class GraphProcessor { } if (attachedData.races?.completed) { - if (this.#includeTrace) { - this.#emitter.emit('trace', `Node ${node.title} is part of a race that was completed`); - } + this.emitTraceEvent(`Node ${node.title} is part of a race that was completed`); return; } @@ -1056,9 +1037,7 @@ export class GraphProcessor { if (this.slowMode) { await new Promise((resolve) => setTimeout(resolve, 250)); } - if (this.#includeTrace) { - this.#emitter.emit('trace', `Finished processing node ${node.title} (${node.id})`); - } + this.emitTraceEvent(`Finished processing node ${node.title} (${node.id})`); this.#visitedNodes.add(node.id); this.#currentlyProcessing.delete(node.id); this.#remainingNodes.delete(node.id); @@ -1076,14 +1055,10 @@ export class GraphProcessor { this.#excludedDueToControlFlow(node, this.#getInputValuesForNode(node), nanoid() as ProcessId); if (!didBreak) { - if (this.#includeTrace) { - this.#emitter.emit('trace', `Loop controller ${node.title} did not break, so we're looping again`); - } + this.emitTraceEvent(`Loop controller ${node.title} did not break, so we're looping again`); for (const loopNodeId of attachedData.loopInfo?.nodes ?? []) { const cycleNode = this.#nodesById[loopNodeId]!; - if (this.#includeTrace) { - this.#emitter.emit('trace', `Clearing cycle node ${cycleNode.title} (${cycleNode.id})`); - } + this.emitTraceEvent(`Clearing cycle node ${cycleNode.title} (${cycleNode.id})`); this.#visitedNodes.delete(cycleNode.id); this.#currentlyProcessing.delete(cycleNode.id); this.#remainingNodes.add(cycleNode.id); @@ -1101,9 +1076,7 @@ export class GraphProcessor { for (const [nodeId] of allNodesForRace) { for (const [key, abortController] of this.#nodeAbortControllers.entries()) { if (key.startsWith(nodeId)) { - if (this.#includeTrace) { - this.#emitter.emit('trace', `Aborting node ${nodeId} because other race branch won`); - } + this.emitTraceEvent(`Aborting node ${nodeId} because other race branch won`); abortController.abort(); } } @@ -1169,12 +1142,7 @@ export class GraphProcessor { // Node is finished, check if we can run any more nodes that depend on this one this.#processingQueue.addAll( outputNodes.nodes.map((outputNode) => async () => { - if (this.#includeTrace) { - this.#emitter.emit( - 'trace', - `Trying to run output node from ${node.title}: ${outputNode.title} (${outputNode.id})`, - ); - } + this.emitTraceEvent(`Trying to run output node from ${node.title}: ${outputNode.title} (${outputNode.id})`); await this.#processNodeIfAllInputsAvailable(outputNode); }), @@ -1423,9 +1391,7 @@ export class GraphProcessor { #nodeErrored(node: ChartNode, e: unknown, processId: ProcessId) { const error = getError(e); this.#emitter.emit('nodeError', { node, error, processId }); - if (this.#includeTrace) { - this.#emitter.emit('trace', `Node ${node.title} (${node.id}-${processId}) errored: ${error.stack}`); - } + this.emitTraceEvent(`Node ${node.title} (${node.id}-${processId}) errored: ${error.stack}`); this.#erroredNodes.set(node.id, error.toString()); } @@ -1586,9 +1552,7 @@ export class GraphProcessor { return processor; }, trace: (message) => { - if (this.#includeTrace) { - this.#emitter.emit('trace', message); - } + this.emitTraceEvent(message); }, abortGraph: (error) => { this.abort(error === undefined, error); @@ -1615,9 +1579,7 @@ export class GraphProcessor { typeOfExclusion: ControlFlowExcludedDataValue['value'] = undefined, ) { if (node.disabled) { - if (this.#includeTrace) { - this.#emitter.emit('trace', `Excluding node ${node.title} because it's disabled`); - } + this.emitTraceEvent(`Excluding node ${node.title} because it's disabled`); this.#visitedNodes.add(node.id); this.#markAsExcluded(node, processId, inputValues, 'disabled'); @@ -1651,12 +1613,7 @@ export class GraphProcessor { if (inputIsExcludedValue && !allowedToConsumedExcludedValue) { if (!isWaitingForLoop) { if (inputIsExcludedValue) { - if (this.#includeTrace) { - this.#emitter.emit( - 'trace', - `Excluding node ${node.title} because of control flow. Input is has excluded value: ${controlFlowExcludedValues[0]?.[0]}`, - ); - } + this.emitTraceEvent(`Excluding node ${node.title} because of control flow. Input is has excluded value: ${controlFlowExcludedValues[0]?.[0]}`); } this.#visitedNodes.add(node.id);