Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .changeset/client-side-runid.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
"@workflow/core": patch
"@workflow/world": patch
---

Generate runId client-side in start() and simplify runId types

The `runId` is now generated client-side using ULID before serialization, rather than waiting for the server response. This simplifies the `Streamer` interface and `WorkflowServerWritableStream` to accept `string` instead of `string | Promise<string>` for `runId`.
12 changes: 7 additions & 5 deletions packages/core/src/runtime/start.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,10 @@ describe('start', () => {
let mockQueue: ReturnType<typeof vi.fn>;

beforeEach(() => {
mockEventsCreate = vi.fn().mockResolvedValue({
run: { runId: 'wrun_test123', status: 'pending' },
mockEventsCreate = vi.fn().mockImplementation((runId) => {
return Promise.resolve({
run: { runId: runId ?? 'wrun_test123', status: 'pending' },
});
});
mockQueue = vi.fn().mockResolvedValue(undefined);

Expand All @@ -105,7 +107,7 @@ describe('start', () => {
await start(validWorkflow, []);

expect(mockEventsCreate).toHaveBeenCalledWith(
null,
expect.stringMatching(/^wrun_/),
expect.objectContaining({
eventType: 'run_created',
specVersion: SPEC_VERSION_CURRENT,
Expand All @@ -124,7 +126,7 @@ describe('start', () => {
await start(validWorkflow, [], { specVersion: SPEC_VERSION_LEGACY });

expect(mockEventsCreate).toHaveBeenCalledWith(
null,
expect.stringMatching(/^wrun_/),
expect.objectContaining({
eventType: 'run_created',
specVersion: SPEC_VERSION_LEGACY,
Expand All @@ -143,7 +145,7 @@ describe('start', () => {
await start(validWorkflow, [], { specVersion: 1 });

expect(mockEventsCreate).toHaveBeenCalledWith(
null,
expect.stringMatching(/^wrun_/),
expect.objectContaining({
eventType: 'run_created',
specVersion: 1,
Expand Down
27 changes: 18 additions & 9 deletions packages/core/src/runtime/start.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
import { waitUntil } from '@vercel/functions';
import { WorkflowRuntimeError } from '@workflow/errors';
import { withResolvers } from '@workflow/utils';
import type { WorkflowInvokePayload, World } from '@workflow/world';
import { isLegacySpecVersion, SPEC_VERSION_CURRENT } from '@workflow/world';
import { Run } from './run.js';
import { monotonicFactory } from 'ulid';
import type { Serializable } from '../schemas.js';
import { dehydrateWorkflowArguments } from '../serialization.js';
import * as Attribute from '../telemetry/semantic-conventions.js';
import { serializeTraceCarrier, trace } from '../telemetry.js';
import { waitedUntil } from '../util.js';
import { version as workflowCoreVersion } from '../version.js';
import { getWorkflowQueueName } from './helpers.js';
import { Run } from './run.js';
import { getWorld } from './world.js';

/** ULID generator for client-side runId generation */
const ulid = monotonicFactory();

export interface StartOptions {
/**
* The deployment ID to use for the workflow run.
Expand Down Expand Up @@ -103,8 +106,10 @@ export async function start<TArgs extends unknown[], TResult>(
const world = opts?.world ?? getWorld();
const deploymentId = opts.deploymentId ?? (await world.getDeploymentId());
const ops: Promise<void>[] = [];
const { promise: runIdPromise, resolve: resolveRunId } =
withResolvers<string>();

// Generate runId client-side so we have it before serialization
// (required for future E2E encryption where runId is part of the encryption context)
const runId = `wrun_${ulid()}`;

// Serialize current trace context to propagate across queue boundary
const traceCarrier = await serializeTraceCarrier();
Expand All @@ -113,16 +118,16 @@ export async function start<TArgs extends unknown[], TResult>(
const v1Compat = isLegacySpecVersion(specVersion);

// Create run via run_created event (event-sourced architecture)
// Pass null for runId - the server generates it and returns it in the response
// Pass client-generated runId - server will accept and use it
const workflowArguments = dehydrateWorkflowArguments(
args,
ops,
runIdPromise,
runId,
globalThis,
v1Compat
);
const result = await world.events.create(
null,
runId,
{
eventType: 'run_created',
specVersion,
Expand All @@ -143,8 +148,12 @@ export async function start<TArgs extends unknown[], TResult>(
);
}

const runId = result.run.runId;
resolveRunId(runId);
// Verify server accepted our runId
if (result.run.runId !== runId) {
throw new WorkflowRuntimeError(
`Server returned different runId than requested: expected ${runId}, got ${result.run.runId}`
);
}

waitUntil(
Promise.all(ops).catch((err) => {
Expand Down
37 changes: 14 additions & 23 deletions packages/core/src/serialization.ts
Original file line number Diff line number Diff line change
Expand Up @@ -280,15 +280,9 @@ export class WorkflowServerReadableStream extends ReadableStream<Uint8Array> {
const STREAM_FLUSH_INTERVAL_MS = 10;

export class WorkflowServerWritableStream extends WritableStream<Uint8Array> {
constructor(name: string, runId: string | Promise<string>) {
// runId can be a promise, because we need a runID to write to a stream,
// but at class instantiation time, we might not have a run ID yet. This
// mainly happens when calling start() for a workflow with already-serialized
// arguments.
if (typeof runId !== 'string' && !(runId instanceof Promise)) {
throw new Error(
`"runId" must be a string or a promise that resolves to a string, got "${typeof runId}"`
);
constructor(name: string, runId: string) {
if (typeof runId !== 'string') {
throw new Error(`"runId" must be a string, got "${typeof runId}"`);
}
if (typeof name !== 'string' || name.length === 0) {
throw new Error(`"name" is required, got "${name}"`);
Expand All @@ -312,18 +306,16 @@ export class WorkflowServerWritableStream extends WritableStream<Uint8Array> {
// This prevents data loss if the write operation fails
const chunksToFlush = buffer.slice();

const _runId = await runId;

// Use writeToStreamMulti if available for batch writes
if (
typeof world.writeToStreamMulti === 'function' &&
chunksToFlush.length > 1
) {
await world.writeToStreamMulti(name, _runId, chunksToFlush);
await world.writeToStreamMulti(name, runId, chunksToFlush);
} else {
// Fall back to sequential writes
for (const chunk of chunksToFlush) {
await world.writeToStream(name, _runId, chunk);
await world.writeToStream(name, runId, chunk);
}
}

Expand Down Expand Up @@ -361,8 +353,7 @@ export class WorkflowServerWritableStream extends WritableStream<Uint8Array> {
// Flush any remaining buffered chunks
await flush();

const _runId = await runId;
await world.closeStream(name, _runId);
await world.closeStream(name, runId);
},
abort() {
// Clean up timer to prevent leaks
Expand Down Expand Up @@ -620,7 +611,7 @@ function getCommonReducers(global: Record<string, any> = globalThis) {
export function getExternalReducers(
global: Record<string, any> = globalThis,
ops: Promise<void>[],
runId: string | Promise<string>
runId: string
): Reducers {
return {
...getCommonReducers(global),
Expand Down Expand Up @@ -727,7 +718,7 @@ export function getWorkflowReducers(
function getStepReducers(
global: Record<string, any> = globalThis,
ops: Promise<void>[],
runId: string | Promise<string>
runId: string
): Reducers {
return {
...getCommonReducers(global),
Expand Down Expand Up @@ -926,7 +917,7 @@ export function getCommonRevivers(global: Record<string, any> = globalThis) {
export function getExternalRevivers(
global: Record<string, any> = globalThis,
ops: Promise<void>[],
runId: string | Promise<string>
runId: string
): Revivers {
return {
...getCommonRevivers(global),
Expand Down Expand Up @@ -1138,7 +1129,7 @@ export function getWorkflowRevivers(
function getStepRevivers(
global: Record<string, any> = globalThis,
ops: Promise<void>[],
runId: string | Promise<string>
runId: string
): Revivers {
return {
...getCommonRevivers(global),
Expand Down Expand Up @@ -1315,7 +1306,7 @@ function getStepRevivers(
export function dehydrateWorkflowArguments(
value: unknown,
ops: Promise<void>[],
runId: string | Promise<string>,
runId: string,
global: Record<string, any> = globalThis,
v1Compat = false
): Uint8Array | unknown {
Expand Down Expand Up @@ -1412,7 +1403,7 @@ export function dehydrateWorkflowReturnValue(
export function hydrateWorkflowReturnValue(
value: Uint8Array | unknown,
ops: Promise<void>[],
runId: string | Promise<string>,
runId: string,
global: Record<string, any> = globalThis,
extraRevivers: Record<string, (value: any) => any> = {}
) {
Expand Down Expand Up @@ -1480,7 +1471,7 @@ export function dehydrateStepArguments(
export function hydrateStepArguments(
value: Uint8Array | unknown,
ops: Promise<any>[],
runId: string | Promise<string>,
runId: string,
global: Record<string, any> = globalThis,
extraRevivers: Record<string, (value: any) => any> = {}
) {
Expand Down Expand Up @@ -1519,7 +1510,7 @@ export function hydrateStepArguments(
export function dehydrateStepReturnValue(
value: unknown,
ops: Promise<any>[],
runId: string | Promise<string>,
runId: string,
global: Record<string, any> = globalThis,
v1Compat = false
): Uint8Array | unknown {
Expand Down
50 changes: 2 additions & 48 deletions packages/core/src/writable-stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,10 @@ describe('WorkflowServerWritableStream', () => {
});

describe('constructor validation', () => {
it('should throw error when runId is not a string or promise', () => {
it('should throw error when runId is not a string', () => {
expect(() => {
new WorkflowServerWritableStream('test-stream', 123 as any);
}).toThrow(
'"runId" must be a string or a promise that resolves to a string'
);
}).toThrow('"runId" must be a string');
});

it('should throw error when name is empty', () => {
Expand All @@ -51,15 +49,6 @@ describe('WorkflowServerWritableStream', () => {
new WorkflowServerWritableStream('test-stream', 'run-123');
}).not.toThrow();
});

it('should accept a promise runId', () => {
expect(() => {
new WorkflowServerWritableStream(
'test-stream',
Promise.resolve('run-123')
);
}).not.toThrow();
});
});

describe('buffering behavior', () => {
Expand Down Expand Up @@ -279,41 +268,6 @@ describe('WorkflowServerWritableStream', () => {
});
});

describe('promise runId handling', () => {
it('should wait for runId promise before writing', async () => {
let resolveRunId: (value: string) => void;
const runIdPromise = new Promise<string>((resolve) => {
resolveRunId = resolve;
});

const stream = new WorkflowServerWritableStream(
'test-stream',
runIdPromise
);
const writer = stream.getWriter();

// Write and trigger flush
await writer.write(new Uint8Array([1, 2, 3]));
await vi.advanceTimersByTimeAsync(10);

// Write should not have happened yet because runId is not resolved
expect(mockWorld.writeToStream).not.toHaveBeenCalled();

// Resolve runId
resolveRunId!('resolved-run-123');
await vi.advanceTimersByTimeAsync(0); // Let promises settle

// Now the write should have happened
expect(mockWorld.writeToStream).toHaveBeenCalledWith(
'test-stream',
'resolved-run-123',
new Uint8Array([1, 2, 3])
);

await writer.close();
});
});

describe('empty buffer handling', () => {
it('should not call write methods when buffer is empty on close', async () => {
const stream = new WorkflowServerWritableStream('test-stream', 'run-123');
Expand Down
2 changes: 1 addition & 1 deletion packages/world-local/src/storage/events-storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ export function createEventsStorage(basedir: string): Storage['events'] {
const eventId = `evnt_${monotonicUlid()}`;
const now = new Date();

// For run_created events, generate runId server-side if null or empty
// For run_created events, use client-provided runId or generate one server-side
let effectiveRunId: string;
if (data.eventType === 'run_created' && (!runId || runId === '')) {
effectiveRunId = `wrun_${monotonicUlid()}`;
Expand Down
2 changes: 1 addition & 1 deletion packages/world-postgres/src/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ export function createEventsStorage(drizzle: Drizzle): Storage['events'] {
async create(runId, data, params): Promise<EventResult> {
const eventId = `wevt_${ulid()}`;

// For run_created events, generate runId server-side if null or empty
// For run_created events, use client-provided runId or generate one server-side
let effectiveRunId: string;
if (data.eventType === 'run_created' && (!runId || runId === '')) {
effectiveRunId = `wrun_${ulid()}`;
Expand Down
2 changes: 1 addition & 1 deletion packages/world-vercel/src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ export async function createWorkflowRunEvent(
return { event: wireResult };
}

// For run_created events, runId is null - use "null" string in the URL path
// For run_created events, runId may be client-provided or null
const runIdPath = id === null ? 'null' : id;

const remoteRefBehavior = eventsNeedingResolve.has(data.eventType)
Expand Down
2 changes: 1 addition & 1 deletion packages/world/src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ export type AnyEventRequest = z.infer<typeof CreateEventSchema>;

/**
* Event request for creating a new workflow run.
* Must be used with runId: null since the server generates the runId.
* Can be used with a client-generated runId or null for server-generated.
*/
export type RunCreatedEventRequest = z.infer<typeof RunCreatedEventSchema>;

Expand Down
14 changes: 7 additions & 7 deletions packages/world/src/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import type {
export interface Streamer {
writeToStream(
name: string,
runId: string | Promise<string>,
runId: string,
chunk: string | Uint8Array
): Promise<void>;

Expand All @@ -38,16 +38,16 @@ export interface Streamer {
* If not implemented, the caller should fall back to sequential writeToStream() calls.
*
* @param name - The stream name
* @param runId - The run ID (can be a promise)
* @param runId - The run ID
* @param chunks - Array of chunks to write, in order
*/
writeToStreamMulti?(
name: string,
runId: string | Promise<string>,
runId: string,
chunks: (string | Uint8Array)[]
): Promise<void>;

closeStream(name: string, runId: string | Promise<string>): Promise<void>;
closeStream(name: string, runId: string): Promise<void>;
readFromStream(
name: string,
startIndex?: number
Expand Down Expand Up @@ -127,15 +127,15 @@ export interface Storage {
events: {
/**
* Create a run_created event to start a new workflow run.
* The runId parameter must be null - the server generates and returns the runId.
* The runId may be provided by the client or left as null for the server to generate.
*
* @param runId - Must be null for run_created events
* @param runId - Client-generated runId, or null for server-generated
* @param data - The run_created event data
* @param params - Optional parameters for event creation
* @returns Promise resolving to the created event and run entity
*/
create(
runId: null,
runId: string | null,
data: RunCreatedEventRequest,
params?: CreateEventParams
): Promise<EventResult>;
Expand Down