diff --git a/.changeset/client-side-runid.md b/.changeset/client-side-runid.md new file mode 100644 index 000000000..e1b88ff33 --- /dev/null +++ b/.changeset/client-side-runid.md @@ -0,0 +1,8 @@ +--- +"@workflow/core": patch +"@workflow/world": patch +--- + +Generate runId client-side in start() and simplify runId types + +The `runId` is now generated client-side using ULID before serialization, rather than waiting for the server response. This simplifies the `Streamer` interface and `WorkflowServerWritableStream` to accept `string` instead of `string | Promise` for `runId`. diff --git a/packages/core/src/runtime/start.test.ts b/packages/core/src/runtime/start.test.ts index ff11bffd7..0f75de275 100644 --- a/packages/core/src/runtime/start.test.ts +++ b/packages/core/src/runtime/start.test.ts @@ -81,8 +81,10 @@ describe('start', () => { let mockQueue: ReturnType; beforeEach(() => { - mockEventsCreate = vi.fn().mockResolvedValue({ - run: { runId: 'wrun_test123', status: 'pending' }, + mockEventsCreate = vi.fn().mockImplementation((runId) => { + return Promise.resolve({ + run: { runId: runId ?? 'wrun_test123', status: 'pending' }, + }); }); mockQueue = vi.fn().mockResolvedValue(undefined); @@ -105,7 +107,7 @@ describe('start', () => { await start(validWorkflow, []); expect(mockEventsCreate).toHaveBeenCalledWith( - null, + expect.stringMatching(/^wrun_/), expect.objectContaining({ eventType: 'run_created', specVersion: SPEC_VERSION_CURRENT, @@ -124,7 +126,7 @@ describe('start', () => { await start(validWorkflow, [], { specVersion: SPEC_VERSION_LEGACY }); expect(mockEventsCreate).toHaveBeenCalledWith( - null, + expect.stringMatching(/^wrun_/), expect.objectContaining({ eventType: 'run_created', specVersion: SPEC_VERSION_LEGACY, @@ -143,7 +145,7 @@ describe('start', () => { await start(validWorkflow, [], { specVersion: 1 }); expect(mockEventsCreate).toHaveBeenCalledWith( - null, + expect.stringMatching(/^wrun_/), expect.objectContaining({ eventType: 'run_created', specVersion: 1, diff --git a/packages/core/src/runtime/start.ts b/packages/core/src/runtime/start.ts index a51e34b93..b536145fc 100644 --- a/packages/core/src/runtime/start.ts +++ b/packages/core/src/runtime/start.ts @@ -1,9 +1,8 @@ import { waitUntil } from '@vercel/functions'; import { WorkflowRuntimeError } from '@workflow/errors'; -import { withResolvers } from '@workflow/utils'; import type { WorkflowInvokePayload, World } from '@workflow/world'; import { isLegacySpecVersion, SPEC_VERSION_CURRENT } from '@workflow/world'; -import { Run } from './run.js'; +import { monotonicFactory } from 'ulid'; import type { Serializable } from '../schemas.js'; import { dehydrateWorkflowArguments } from '../serialization.js'; import * as Attribute from '../telemetry/semantic-conventions.js'; @@ -11,8 +10,12 @@ import { serializeTraceCarrier, trace } from '../telemetry.js'; import { waitedUntil } from '../util.js'; import { version as workflowCoreVersion } from '../version.js'; import { getWorkflowQueueName } from './helpers.js'; +import { Run } from './run.js'; import { getWorld } from './world.js'; +/** ULID generator for client-side runId generation */ +const ulid = monotonicFactory(); + export interface StartOptions { /** * The deployment ID to use for the workflow run. @@ -103,8 +106,10 @@ export async function start( const world = opts?.world ?? getWorld(); const deploymentId = opts.deploymentId ?? (await world.getDeploymentId()); const ops: Promise[] = []; - const { promise: runIdPromise, resolve: resolveRunId } = - withResolvers(); + + // Generate runId client-side so we have it before serialization + // (required for future E2E encryption where runId is part of the encryption context) + const runId = `wrun_${ulid()}`; // Serialize current trace context to propagate across queue boundary const traceCarrier = await serializeTraceCarrier(); @@ -113,16 +118,16 @@ export async function start( const v1Compat = isLegacySpecVersion(specVersion); // Create run via run_created event (event-sourced architecture) - // Pass null for runId - the server generates it and returns it in the response + // Pass client-generated runId - server will accept and use it const workflowArguments = dehydrateWorkflowArguments( args, ops, - runIdPromise, + runId, globalThis, v1Compat ); const result = await world.events.create( - null, + runId, { eventType: 'run_created', specVersion, @@ -143,8 +148,12 @@ export async function start( ); } - const runId = result.run.runId; - resolveRunId(runId); + // Verify server accepted our runId + if (result.run.runId !== runId) { + throw new WorkflowRuntimeError( + `Server returned different runId than requested: expected ${runId}, got ${result.run.runId}` + ); + } waitUntil( Promise.all(ops).catch((err) => { diff --git a/packages/core/src/serialization.ts b/packages/core/src/serialization.ts index bd41a3684..5b4b18d70 100644 --- a/packages/core/src/serialization.ts +++ b/packages/core/src/serialization.ts @@ -280,15 +280,9 @@ export class WorkflowServerReadableStream extends ReadableStream { const STREAM_FLUSH_INTERVAL_MS = 10; export class WorkflowServerWritableStream extends WritableStream { - constructor(name: string, runId: string | Promise) { - // runId can be a promise, because we need a runID to write to a stream, - // but at class instantiation time, we might not have a run ID yet. This - // mainly happens when calling start() for a workflow with already-serialized - // arguments. - if (typeof runId !== 'string' && !(runId instanceof Promise)) { - throw new Error( - `"runId" must be a string or a promise that resolves to a string, got "${typeof runId}"` - ); + constructor(name: string, runId: string) { + if (typeof runId !== 'string') { + throw new Error(`"runId" must be a string, got "${typeof runId}"`); } if (typeof name !== 'string' || name.length === 0) { throw new Error(`"name" is required, got "${name}"`); @@ -312,18 +306,16 @@ export class WorkflowServerWritableStream extends WritableStream { // This prevents data loss if the write operation fails const chunksToFlush = buffer.slice(); - const _runId = await runId; - // Use writeToStreamMulti if available for batch writes if ( typeof world.writeToStreamMulti === 'function' && chunksToFlush.length > 1 ) { - await world.writeToStreamMulti(name, _runId, chunksToFlush); + await world.writeToStreamMulti(name, runId, chunksToFlush); } else { // Fall back to sequential writes for (const chunk of chunksToFlush) { - await world.writeToStream(name, _runId, chunk); + await world.writeToStream(name, runId, chunk); } } @@ -361,8 +353,7 @@ export class WorkflowServerWritableStream extends WritableStream { // Flush any remaining buffered chunks await flush(); - const _runId = await runId; - await world.closeStream(name, _runId); + await world.closeStream(name, runId); }, abort() { // Clean up timer to prevent leaks @@ -620,7 +611,7 @@ function getCommonReducers(global: Record = globalThis) { export function getExternalReducers( global: Record = globalThis, ops: Promise[], - runId: string | Promise + runId: string ): Reducers { return { ...getCommonReducers(global), @@ -727,7 +718,7 @@ export function getWorkflowReducers( function getStepReducers( global: Record = globalThis, ops: Promise[], - runId: string | Promise + runId: string ): Reducers { return { ...getCommonReducers(global), @@ -926,7 +917,7 @@ export function getCommonRevivers(global: Record = globalThis) { export function getExternalRevivers( global: Record = globalThis, ops: Promise[], - runId: string | Promise + runId: string ): Revivers { return { ...getCommonRevivers(global), @@ -1138,7 +1129,7 @@ export function getWorkflowRevivers( function getStepRevivers( global: Record = globalThis, ops: Promise[], - runId: string | Promise + runId: string ): Revivers { return { ...getCommonRevivers(global), @@ -1315,7 +1306,7 @@ function getStepRevivers( export function dehydrateWorkflowArguments( value: unknown, ops: Promise[], - runId: string | Promise, + runId: string, global: Record = globalThis, v1Compat = false ): Uint8Array | unknown { @@ -1412,7 +1403,7 @@ export function dehydrateWorkflowReturnValue( export function hydrateWorkflowReturnValue( value: Uint8Array | unknown, ops: Promise[], - runId: string | Promise, + runId: string, global: Record = globalThis, extraRevivers: Record any> = {} ) { @@ -1480,7 +1471,7 @@ export function dehydrateStepArguments( export function hydrateStepArguments( value: Uint8Array | unknown, ops: Promise[], - runId: string | Promise, + runId: string, global: Record = globalThis, extraRevivers: Record any> = {} ) { @@ -1519,7 +1510,7 @@ export function hydrateStepArguments( export function dehydrateStepReturnValue( value: unknown, ops: Promise[], - runId: string | Promise, + runId: string, global: Record = globalThis, v1Compat = false ): Uint8Array | unknown { diff --git a/packages/core/src/writable-stream.test.ts b/packages/core/src/writable-stream.test.ts index 6941b6534..6eb9433c2 100644 --- a/packages/core/src/writable-stream.test.ts +++ b/packages/core/src/writable-stream.test.ts @@ -32,12 +32,10 @@ describe('WorkflowServerWritableStream', () => { }); describe('constructor validation', () => { - it('should throw error when runId is not a string or promise', () => { + it('should throw error when runId is not a string', () => { expect(() => { new WorkflowServerWritableStream('test-stream', 123 as any); - }).toThrow( - '"runId" must be a string or a promise that resolves to a string' - ); + }).toThrow('"runId" must be a string'); }); it('should throw error when name is empty', () => { @@ -51,15 +49,6 @@ describe('WorkflowServerWritableStream', () => { new WorkflowServerWritableStream('test-stream', 'run-123'); }).not.toThrow(); }); - - it('should accept a promise runId', () => { - expect(() => { - new WorkflowServerWritableStream( - 'test-stream', - Promise.resolve('run-123') - ); - }).not.toThrow(); - }); }); describe('buffering behavior', () => { @@ -279,41 +268,6 @@ describe('WorkflowServerWritableStream', () => { }); }); - describe('promise runId handling', () => { - it('should wait for runId promise before writing', async () => { - let resolveRunId: (value: string) => void; - const runIdPromise = new Promise((resolve) => { - resolveRunId = resolve; - }); - - const stream = new WorkflowServerWritableStream( - 'test-stream', - runIdPromise - ); - const writer = stream.getWriter(); - - // Write and trigger flush - await writer.write(new Uint8Array([1, 2, 3])); - await vi.advanceTimersByTimeAsync(10); - - // Write should not have happened yet because runId is not resolved - expect(mockWorld.writeToStream).not.toHaveBeenCalled(); - - // Resolve runId - resolveRunId!('resolved-run-123'); - await vi.advanceTimersByTimeAsync(0); // Let promises settle - - // Now the write should have happened - expect(mockWorld.writeToStream).toHaveBeenCalledWith( - 'test-stream', - 'resolved-run-123', - new Uint8Array([1, 2, 3]) - ); - - await writer.close(); - }); - }); - describe('empty buffer handling', () => { it('should not call write methods when buffer is empty on close', async () => { const stream = new WorkflowServerWritableStream('test-stream', 'run-123'); diff --git a/packages/world-local/src/storage/events-storage.ts b/packages/world-local/src/storage/events-storage.ts index 03f7aca11..eaa066002 100644 --- a/packages/world-local/src/storage/events-storage.ts +++ b/packages/world-local/src/storage/events-storage.ts @@ -41,7 +41,7 @@ export function createEventsStorage(basedir: string): Storage['events'] { const eventId = `evnt_${monotonicUlid()}`; const now = new Date(); - // For run_created events, generate runId server-side if null or empty + // For run_created events, use client-provided runId or generate one server-side let effectiveRunId: string; if (data.eventType === 'run_created' && (!runId || runId === '')) { effectiveRunId = `wrun_${monotonicUlid()}`; diff --git a/packages/world-postgres/src/storage.ts b/packages/world-postgres/src/storage.ts index b7b024021..d7b207658 100644 --- a/packages/world-postgres/src/storage.ts +++ b/packages/world-postgres/src/storage.ts @@ -287,7 +287,7 @@ export function createEventsStorage(drizzle: Drizzle): Storage['events'] { async create(runId, data, params): Promise { const eventId = `wevt_${ulid()}`; - // For run_created events, generate runId server-side if null or empty + // For run_created events, use client-provided runId or generate one server-side let effectiveRunId: string; if (data.eventType === 'run_created' && (!runId || runId === '')) { effectiveRunId = `wrun_${ulid()}`; diff --git a/packages/world-vercel/src/events.ts b/packages/world-vercel/src/events.ts index 9c4eddce1..f58b2b1f3 100644 --- a/packages/world-vercel/src/events.ts +++ b/packages/world-vercel/src/events.ts @@ -138,7 +138,7 @@ export async function createWorkflowRunEvent( return { event: wireResult }; } - // For run_created events, runId is null - use "null" string in the URL path + // For run_created events, runId may be client-provided or null const runIdPath = id === null ? 'null' : id; const remoteRefBehavior = eventsNeedingResolve.has(data.eventType) diff --git a/packages/world/src/events.ts b/packages/world/src/events.ts index c7719ca4f..61f164f11 100644 --- a/packages/world/src/events.ts +++ b/packages/world/src/events.ts @@ -282,7 +282,7 @@ export type AnyEventRequest = z.infer; /** * Event request for creating a new workflow run. - * Must be used with runId: null since the server generates the runId. + * Can be used with a client-generated runId or null for server-generated. */ export type RunCreatedEventRequest = z.infer; diff --git a/packages/world/src/interfaces.ts b/packages/world/src/interfaces.ts index c737a619e..a68e5437c 100644 --- a/packages/world/src/interfaces.ts +++ b/packages/world/src/interfaces.ts @@ -26,7 +26,7 @@ import type { export interface Streamer { writeToStream( name: string, - runId: string | Promise, + runId: string, chunk: string | Uint8Array ): Promise; @@ -38,16 +38,16 @@ export interface Streamer { * If not implemented, the caller should fall back to sequential writeToStream() calls. * * @param name - The stream name - * @param runId - The run ID (can be a promise) + * @param runId - The run ID * @param chunks - Array of chunks to write, in order */ writeToStreamMulti?( name: string, - runId: string | Promise, + runId: string, chunks: (string | Uint8Array)[] ): Promise; - closeStream(name: string, runId: string | Promise): Promise; + closeStream(name: string, runId: string): Promise; readFromStream( name: string, startIndex?: number @@ -127,15 +127,15 @@ export interface Storage { events: { /** * Create a run_created event to start a new workflow run. - * The runId parameter must be null - the server generates and returns the runId. + * The runId may be provided by the client or left as null for the server to generate. * - * @param runId - Must be null for run_created events + * @param runId - Client-generated runId, or null for server-generated * @param data - The run_created event data * @param params - Optional parameters for event creation * @returns Promise resolving to the created event and run entity */ create( - runId: null, + runId: string | null, data: RunCreatedEventRequest, params?: CreateEventParams ): Promise;