Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/clean-doodles-learn.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@workflow/core": patch
---

Set `stepId` property on function in `registerStepFunction` for serialization support
5 changes: 5 additions & 0 deletions .changeset/tasty-rules-stick.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@workflow/swc-plugin": patch
---

Set `stepId` property on step functions in "client" mode for serialization support
64 changes: 64 additions & 0 deletions packages/core/e2e/e2e.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1545,6 +1545,70 @@ describe('e2e', () => {
}
);

test(
'stepFunctionAsStartArgWorkflow - step function reference passed as start() argument',
{ timeout: 120_000 },
async () => {
// This test verifies that step function references can be:
// 1. Serialized in the client bundle (the SWC plugin sets stepId property on the function)
// 2. Passed as arguments to start()
// 3. Deserialized in the workflow bundle (using WORKFLOW_USE_STEP from globalThis)
// 4. Invoked from within a step function in the workflow
//
// In client mode, the SWC plugin sets the `stepId` property directly on step functions
// (e.g., `myStepFn.stepId = "step//..."`). This allows the serialization layer to detect
// step functions and serialize them by their stepId.
//
// The workflow receives a step function reference (add) and:
// 1. Calls stepFn(3, 5) directly -> 8
// 2. Passes it to invokeStepFn(stepFn, 3, 5) -> stepFn(3, 5) = 8
// 3. Calls stepFn(8, 8) -> 16

// Look up the stepId for the `add` function from 98_duplicate_case.ts
// This simulates what the SWC plugin does in client mode: setting stepId on the function
const manifest = await fetchManifest();
const stepFile = Object.keys(manifest.steps).find((f) =>
f.includes('98_duplicate_case')
);
assert(stepFile, 'Could not find 98_duplicate_case in manifest steps');
const addStepInfo = manifest.steps[stepFile]?.['add'];
assert(addStepInfo, 'Could not find "add" step in manifest');

// Create a function reference with stepId, mimicking what the SWC client transform does
const addStepRef = Object.assign(() => {}, {
stepId: addStepInfo.stepId,
});

const run = await start(await e2e('stepFunctionAsStartArgWorkflow'), [
addStepRef,
3,
5,
]);
const returnValue = await run.returnValue;

// Verify the workflow result
// directResult: stepFn called directly from workflow code = add(3, 5) = 8
// viaStepResult: stepFn called via invokeStepFn = add(3, 5) = 8
// doubled: stepFn(8, 8) = 16
expect(returnValue).toEqual({
directResult: 8,
viaStepResult: 8,
doubled: 16,
});

// Verify the run completed successfully via CLI
const { json: runData } = await cliInspectJson(
`runs ${run.runId} --withData`
);
expect(runData.status).toBe('completed');
expect(runData.output).toEqual({
directResult: 8,
viaStepResult: 8,
doubled: 16,
});
}
);

// ==================== PAGES ROUTER TESTS ====================
// Tests for Next.js Pages Router API endpoint (only runs for nextjs-turbopack and nextjs-webpack)
const isNextJsApp =
Expand Down
5 changes: 4 additions & 1 deletion packages/core/src/private.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,18 @@ export type StepFunction<
Result extends Serializable | unknown = unknown,
> = ((...args: Args) => Promise<Result>) & {
maxRetries?: number;
stepId?: string;
};

const registeredSteps = new Map<string, StepFunction>();

/**
* Register a step function to be served in the server bundle
* Register a step function to be served in the server bundle.
* Also sets the stepId property on the function for serialization support.
*/
export function registerStepFunction(stepId: string, stepFn: StepFunction) {
registeredSteps.set(stepId, stepFn);
stepFn.stepId = stepId;
}

/**
Expand Down
138 changes: 131 additions & 7 deletions packages/core/src/serialization.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1688,25 +1688,58 @@ describe('step function serialization', () => {
});

it('should deserialize step function name through reviver', () => {
const stepName = 'testStep';
const stepName = 'step//test//testStep';
const stepFn = async () => 42;

// Register the step function
registerStepFunction(stepName, stepFn);

// Get the reviver and test it directly
const revivers = getCommonRevivers(vmGlobalThis);
const result = revivers.StepFunction({ stepId: stepName });
// Create a function with stepId property (like registerStepFunction does)
const fnWithStepId = async () => 42;
Object.defineProperty(fnWithStepId, 'stepId', {
value: stepName,
writable: false,
enumerable: false,
configurable: false,
});

// Serialize using workflow reducers (which handle StepFunction)
const dehydrated = dehydrateStepArguments([fnWithStepId], globalThis);

expect(result).toBe(stepFn);
// Hydrate it back using step revivers
const ops: Promise<void>[] = [];
const hydrated = hydrateStepArguments(
dehydrated,
ops,
mockRunId,
globalThis
);

// The hydrated result should be the registered step function
expect(hydrated[0]).toBe(stepFn);
});

it('should throw error when reviver cannot find registered step function', () => {
const revivers = getCommonRevivers(vmGlobalThis);
// Create a function with a non-existent stepId
const fnWithNonExistentStepId = async () => 42;
Object.defineProperty(fnWithNonExistentStepId, 'stepId', {
value: 'nonExistentStep',
writable: false,
enumerable: false,
configurable: false,
});

// Serialize the step function reference
const dehydrated = dehydrateStepArguments(
[fnWithNonExistentStepId],
globalThis
);

// Hydrating should throw an error
const ops: Promise<void>[] = [];
let err: Error | undefined;
try {
revivers.StepFunction({ stepId: 'nonExistentStep' });
hydrateStepArguments(dehydrated, ops, mockRunId, globalThis);
} catch (err_) {
err = err_ as Error;
}
Expand Down Expand Up @@ -1849,6 +1882,97 @@ describe('step function serialization', () => {
// Should return object with stepId
expect(result).toEqual({ stepId: stepName });
});

it('should hydrate step function from workflow arguments using WORKFLOW_USE_STEP', () => {
// This tests the flow: client mode serializes step function with stepId,
// workflow mode deserializes it using WORKFLOW_USE_STEP from vmGlobalThis
const stepId = 'step//workflows/test.ts//addNumbers';

// Create a VM context like the workflow runner does
const { context, globalThis: vmGlobalThis } = createContext({
seed: 'test',
fixedTimestamp: 1714857600000,
});

// Set up WORKFLOW_USE_STEP on the VM's globalThis (like workflow.ts does)
const mockUseStep = (id: string) => {
const fn = (...args: any[]) => {
// Return a promise that resolves with args (like useStep wrapper does)
return Promise.resolve({ calledWithStepId: id, args });
};
fn.stepId = id;
return fn;
};
(vmGlobalThis as any)[Symbol.for('WORKFLOW_USE_STEP')] = mockUseStep;

// Create a function with stepId (like SWC plugin does in client mode)
const clientStepFn = async (a: number, b: number) => a + b;
Object.defineProperty(clientStepFn, 'stepId', {
value: stepId,
writable: false,
enumerable: false,
configurable: false,
});

// Serialize from client side using external reducers
const ops: Promise<void>[] = [];
const dehydrated = dehydrateWorkflowArguments(
[clientStepFn, 3, 5],
ops,
mockRunId,
globalThis
);

// Hydrate in workflow context using VM's globalThis
const hydrated = hydrateWorkflowArguments(dehydrated, vmGlobalThis);

// Verify the hydrated result
expect(Array.isArray(hydrated)).toBe(true);
expect(hydrated).toHaveLength(3);

const [hydratedStepFn, arg1, arg2] = hydrated;

// The step function should be a function (from useStep wrapper)
expect(typeof hydratedStepFn).toBe('function');
expect(arg1).toBe(3);
expect(arg2).toBe(5);

// The hydrated function should have stepId
expect(hydratedStepFn.stepId).toBe(stepId);
});

it('should throw error when WORKFLOW_USE_STEP is not set on globalThis', () => {
const stepId = 'step//workflows/test.ts//missingUseStep';

// Create a VM context WITHOUT setting up WORKFLOW_USE_STEP
const { context, globalThis: vmGlobalThis } = createContext({
seed: 'test',
fixedTimestamp: 1714857600000,
});

// Create a function with stepId
const clientStepFn = async (a: number, b: number) => a + b;
Object.defineProperty(clientStepFn, 'stepId', {
value: stepId,
writable: false,
enumerable: false,
configurable: false,
});

// Serialize from client side
const ops: Promise<void>[] = [];
const dehydrated = dehydrateWorkflowArguments(
[clientStepFn],
ops,
mockRunId,
globalThis
);

// Hydrating should throw because WORKFLOW_USE_STEP is not set
expect(() => hydrateWorkflowArguments(dehydrated, vmGlobalThis)).toThrow(
'WORKFLOW_USE_STEP not found on global object'
);
});
});

describe('custom class serialization', () => {
Expand Down
Loading
Loading