Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make GraphProcessor trace optional #460

Merged
merged 5 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion packages/core/src/api/createProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
} from '../index.js';
import { mapValues } from '../utils/typeSafety.js';
import { getProcessorEvents, getProcessorSSEStream, getSingleNodeStream } from './streaming.js';
import { GraphProcessor } from '../model/GraphProcessor.js';

Check warning on line 19 in packages/core/src/api/createProcessor.ts

View workflow job for this annotation

GitHub Actions / build

Dependency cycle via ./Nodes.js:36=>./nodes/CallGraphNode.js:210

Check warning on line 19 in packages/core/src/api/createProcessor.ts

View workflow job for this annotation

GitHub Actions / build

Dependency cycle via ./Nodes.js:36=>./nodes/CallGraphNode.js:210
import { deserializeProject } from '../utils/serialization/serialization.js';
import { DEFAULT_CHAT_NODE_TIMEOUT } from '../utils/defaults.js';
import type { Tokenizer } from '../integrations/Tokenizer.js';
Expand All @@ -38,6 +38,7 @@
};
abortSignal?: AbortSignal;
registry?: NodeRegistration;
includeTrace?: boolean;
getChatNodeEndpoint?: ProcessContext['getChatNodeEndpoint'];
tokenizer?: Tokenizer;
} & {
Expand Down Expand Up @@ -77,7 +78,8 @@
throw new Error(`Graph not found, and no main graph specified.`);
}

const processor = new GraphProcessor(project, graphId as GraphId, options.registry);
// TODO: Consolidate options into one object
const processor = new GraphProcessor(project, graphId as GraphId, options.registry, options.includeTrace);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be preferred for optional properties to be grouped into an options object so they can be specified in any order, but this class is exported out of core which technically makes it part of the public interface. While I think most users would use it through createGraphProcessor we can't update this without a breaking change, but we can at least note this for the future


if (options.onStart) {
processor.on('start', options.onStart);
Expand Down
77 changes: 40 additions & 37 deletions packages/core/src/model/GraphProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import { P, match } from 'ts-pattern';
import type { Opaque } from 'type-fest';
import { coerceTypeOptional } from '../utils/coerceType.js';
import { globalRivetNodeRegistry } from './Nodes.js';

Check warning on line 36 in packages/core/src/model/GraphProcessor.ts

View workflow job for this annotation

GitHub Actions / build

Dependency cycle via ./nodes/CallGraphNode.js:210=>../../api/createProcessor.js:15

Check warning on line 36 in packages/core/src/model/GraphProcessor.ts

View workflow job for this annotation

GitHub Actions / build

Dependency cycle via ./nodes/CallGraphNode.js:210=>../../api/createProcessor.js:15
import type { BuiltInNodeType, BuiltInNodes } from './Nodes.js';
import type { NodeRegistration } from './NodeRegistration.js';
import { getPluginConfig } from '../utils/index.js';
Expand Down Expand Up @@ -187,6 +187,8 @@
readonly #registry: NodeRegistration;
id = nanoid();

readonly #includeTrace?: boolean = true;

executor?: 'nodejs' | 'browser';

/** If set, specifies the node(s) that the graph will run TO, instead of the nodes without any dependents. */
Expand Down Expand Up @@ -246,7 +248,16 @@
return this.#running;
}

constructor(project: Project, graphId?: GraphId, registry?: NodeRegistration) {
emitTraceEvent(eventData: string) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be # and also after the constructor (usually the first function defined in most classes)

if (this.#includeTrace) {
this.#emitter.emit(
'trace',
eventData,
);
}
}

constructor(project: Project, graphId?: GraphId, registry?: NodeRegistration, includeTrace?: boolean) {
this.#project = project;
const graph = graphId
? project.graphs[graphId]
Expand All @@ -259,6 +270,7 @@
}
this.#graph = graph;

this.#includeTrace = includeTrace;
this.#nodeInstances = {};
this.#connections = {};
this.#nodesById = {};
Expand Down Expand Up @@ -747,7 +759,8 @@
if (this.#hasPreloadedData) {
for (const node of this.#graph.nodes) {
if (this.#nodeResults.has(node.id)) {
this.#emitter.emit('trace', `Node ${node.title} has preloaded data`);

this.emitTraceEvent(`Node ${node.title} has preloaded data`);

await this.#emitter.emit('nodeStart', {
node,
Expand Down Expand Up @@ -869,10 +882,8 @@
if (!inputsReady) {
return;
}
this.#emitter.emit(
'trace',
`Node ${node.title} has required inputs nodes: ${inputNodes.map((n) => n.title).join(', ')}`,
);

this.emitTraceEvent(`Node ${node.title} has required inputs nodes: ${inputNodes.map((n) => n.title).join(', ')}`);

const attachedData = this.#getAttachedDataTo(node);

Expand All @@ -898,7 +909,9 @@
this.#processingQueue.addAll(
inputNodes.map((inputNode) => {
return async () => {
this.#emitter.emit('trace', `Fetching required data for node ${inputNode.title} (${inputNode.id})`);

this.emitTraceEvent(`Fetching required data for node ${inputNode.title} (${inputNode.id})`);

await this.#fetchNodeDataAndProcessNode(inputNode);
};
}),
Expand All @@ -912,32 +925,32 @@
const builtInNode = node as BuiltInNodes;

if (this.#ignoreNodes.has(node.id)) {
this.#emitter.emit('trace', `Node ${node.title} is ignored`);
this.emitTraceEvent(`Node ${node.title} is ignored`);
return;
}

if (this.runToNodeIds) {
const dependencyNodes = this.getDependencyNodesDeep(node.id);

if (this.runToNodeIds.some((runTo) => runTo !== node.id && dependencyNodes.includes(runTo))) {
this.#emitter.emit('trace', `Node ${node.title} is excluded due to runToNodeIds`);
this.emitTraceEvent(`Node ${node.title} is excluded due to runToNodeIds`);
return;
}
}

if (this.#currentlyProcessing.has(node.id)) {
this.#emitter.emit('trace', `Node ${node.title} is already being processed`);
this.emitTraceEvent(`Node ${node.title} is already being processed`);
return;
}

// For a loop controller, it can run multiple times, otherwise we already processed this node so bail out
if (this.#visitedNodes.has(node.id) && node.type !== 'loopController') {
this.#emitter.emit('trace', `Node ${node.title} has already been processed`);
this.emitTraceEvent(`Node ${node.title} has already been processed`);
return;
}

if (this.#erroredNodes.has(node.id)) {
this.#emitter.emit('trace', `Node ${node.title} has already errored`);
this.emitTraceEvent(`Node ${node.title} has already errored`);
return;
}

Expand All @@ -946,7 +959,7 @@
// Check if all input nodes are free of errors
for (const inputNode of inputNodes) {
if (this.#erroredNodes.has(inputNode.id)) {
this.#emitter.emit('trace', `Node ${node.title} has errored input node ${inputNode.title}`);
this.emitTraceEvent(`Node ${node.title} has errored input node ${inputNode.title}`);
return;
}
}
Expand All @@ -958,19 +971,16 @@
return connectionToInput || !input.required;
});

if (!inputsReady) {
await this.#emitter.emit(
'trace',
`Node ${node.title} has required inputs nodes: ${inputNodes.map((n) => n.title).join(', ')}`,
);
if (!inputsReady) {
this.emitTraceEvent(`Node ${node.title} has required inputs nodes: ${inputNodes.map((n) => n.title).join(', ')}`);
return;
}

// Excluded because control flow is still in a loop - difference between "will not execute" and "has not executed yet"
const inputValues = this.#getInputValuesForNode(node);

if (this.#excludedDueToControlFlow(node, inputValues, nanoid() as ProcessId, 'loop-not-broken')) {
this.#emitter.emit('trace', `Node ${node.title} is excluded due to control flow`);
this.emitTraceEvent(`Node ${node.title} is excluded due to control flow`);
return;
}

Expand Down Expand Up @@ -1001,7 +1011,7 @@
}

if (waitingForInputNode) {
this.#emitter.emit('trace', `Node ${node.title} is waiting for input node ${waitingForInputNode}`);
this.emitTraceEvent(`Node ${node.title} is waiting for input node ${waitingForInputNode}`);
return;
}

Expand All @@ -1018,7 +1028,7 @@
}

if (attachedData.races?.completed) {
this.#emitter.emit('trace', `Node ${node.title} is part of a race that was completed`);
this.emitTraceEvent(`Node ${node.title} is part of a race that was completed`);
return;
}

Expand All @@ -1027,8 +1037,7 @@
if (this.slowMode) {
await new Promise((resolve) => setTimeout(resolve, 250));
}

this.#emitter.emit('trace', `Finished processing node ${node.title} (${node.id})`);
this.emitTraceEvent(`Finished processing node ${node.title} (${node.id})`);
this.#visitedNodes.add(node.id);
this.#currentlyProcessing.delete(node.id);
this.#remainingNodes.delete(node.id);
Expand All @@ -1046,10 +1055,10 @@
this.#excludedDueToControlFlow(node, this.#getInputValuesForNode(node), nanoid() as ProcessId);

if (!didBreak) {
this.#emitter.emit('trace', `Loop controller ${node.title} did not break, so we're looping again`);
this.emitTraceEvent(`Loop controller ${node.title} did not break, so we're looping again`);
for (const loopNodeId of attachedData.loopInfo?.nodes ?? []) {
const cycleNode = this.#nodesById[loopNodeId]!;
this.#emitter.emit('trace', `Clearing cycle node ${cycleNode.title} (${cycleNode.id})`);
this.emitTraceEvent(`Clearing cycle node ${cycleNode.title} (${cycleNode.id})`);
this.#visitedNodes.delete(cycleNode.id);
this.#currentlyProcessing.delete(cycleNode.id);
this.#remainingNodes.add(cycleNode.id);
Expand All @@ -1067,7 +1076,7 @@
for (const [nodeId] of allNodesForRace) {
for (const [key, abortController] of this.#nodeAbortControllers.entries()) {
if (key.startsWith(nodeId)) {
this.#emitter.emit('trace', `Aborting node ${nodeId} because other race branch won`);
this.emitTraceEvent(`Aborting node ${nodeId} because other race branch won`);
abortController.abort();
}
}
Expand Down Expand Up @@ -1133,10 +1142,7 @@
// Node is finished, check if we can run any more nodes that depend on this one
this.#processingQueue.addAll(
outputNodes.nodes.map((outputNode) => async () => {
this.#emitter.emit(
'trace',
`Trying to run output node from ${node.title}: ${outputNode.title} (${outputNode.id})`,
);
this.emitTraceEvent(`Trying to run output node from ${node.title}: ${outputNode.title} (${outputNode.id})`);

await this.#processNodeIfAllInputsAvailable(outputNode);
}),
Expand Down Expand Up @@ -1385,7 +1391,7 @@
#nodeErrored(node: ChartNode, e: unknown, processId: ProcessId) {
const error = getError(e);
this.#emitter.emit('nodeError', { node, error, processId });
this.#emitter.emit('trace', `Node ${node.title} (${node.id}-${processId}) errored: ${error.stack}`);
this.emitTraceEvent(`Node ${node.title} (${node.id}-${processId}) errored: ${error.stack}`);
this.#erroredNodes.set(node.id, error.toString());
}

Expand Down Expand Up @@ -1546,7 +1552,7 @@
return processor;
},
trace: (message) => {
this.#emitter.emit('trace', message);
this.emitTraceEvent(message);
},
abortGraph: (error) => {
this.abort(error === undefined, error);
Expand All @@ -1573,7 +1579,7 @@
typeOfExclusion: ControlFlowExcludedDataValue['value'] = undefined,
) {
if (node.disabled) {
this.#emitter.emit('trace', `Excluding node ${node.title} because it's disabled`);
this.emitTraceEvent(`Excluding node ${node.title} because it's disabled`);

this.#visitedNodes.add(node.id);
this.#markAsExcluded(node, processId, inputValues, 'disabled');
Expand Down Expand Up @@ -1607,10 +1613,7 @@
if (inputIsExcludedValue && !allowedToConsumedExcludedValue) {
if (!isWaitingForLoop) {
if (inputIsExcludedValue) {
this.#emitter.emit(
'trace',
`Excluding node ${node.title} because of control flow. Input is has excluded value: ${controlFlowExcludedValues[0]?.[0]}`,
);
this.emitTraceEvent(`Excluding node ${node.title} because of control flow. Input is has excluded value: ${controlFlowExcludedValues[0]?.[0]}`);
}

this.#visitedNodes.add(node.id);
Expand Down
Loading