diff --git a/.changeset/ninety-dancers-brush.md b/.changeset/ninety-dancers-brush.md new file mode 100644 index 0000000000..a67832c44e --- /dev/null +++ b/.changeset/ninety-dancers-brush.md @@ -0,0 +1,5 @@ +--- +"@workflow/core": patch +--- + +Make `getWorld` asynchronous so it can use dynamic imports diff --git a/docs/content/docs/api-reference/workflow-api/get-world.mdx b/docs/content/docs/api-reference/workflow-api/get-world.mdx index 60d7c80b7e..669d4c99c2 100644 --- a/docs/content/docs/api-reference/workflow-api/get-world.mdx +++ b/docs/content/docs/api-reference/workflow-api/get-world.mdx @@ -3,14 +3,14 @@ title: getWorld description: Access the World instance for low-level storage, queuing, and streaming operations. --- -Retrieves the World instance for direct access to workflow storage, queuing, and streaming backends. This function returns a `World` which provides low-level access to manage workflow runs, steps, events, and hooks. +Retrieves the World instance for direct access to workflow storage, queuing, and streaming backends. This async function returns a `Promise` which provides low-level access to manage workflow runs, steps, events, and hooks. Use this function when you need direct access to the underlying workflow infrastructure, such as listing all runs, querying events, or implementing custom workflow management logic. ```typescript lineNumbers import { getWorld } from "workflow/runtime"; -const world = getWorld(); +const world = await getWorld(); ``` ## API Signature @@ -21,7 +21,7 @@ This function does not accept any parameters. 
### Returns -Returns a `World` object: +Returns a `Promise` object: { const { getWorld } = await import("workflow/runtime"); - await getWorld().start?.(); + const world = await getWorld(); + await world.start?.(); }; ``` @@ -85,7 +87,8 @@ import { defineNitroPlugin } from "nitro/~internal/runtime/plugin"; export default defineNitroPlugin(async () => { const { getWorld } = await import("workflow/runtime"); - await getWorld().start?.(); + const world = await getWorld(); + await world.start?.(); }); ``` @@ -155,7 +158,8 @@ Number of concurrent workers polling for jobs. Default: `10` ### Programmatic configuration -{/* @skip-typecheck: incomplete code sample */} +{/*@skip-typecheck: incomplete code sample*/} + ```typescript title="workflow.config.ts" lineNumbers import { createWorld } from "@workflow/world-postgres"; @@ -186,6 +190,7 @@ Deploy your application to any cloud that supports long-running servers: - Platform-as-a-Service providers (Railway, Render, Fly.io, etc.) Ensure your deployment has: + 1. Network access to your PostgreSQL database 2. Environment variables configured correctly 3. 
The `start()` function called on server initialization diff --git a/packages/cli/src/lib/inspect/env.ts b/packages/cli/src/lib/inspect/env.ts index c385f11fae..a34de547f3 100644 --- a/packages/cli/src/lib/inspect/env.ts +++ b/packages/cli/src/lib/inspect/env.ts @@ -42,6 +42,7 @@ export const getEnvVars = (): Record => { WORKFLOW_VERCEL_PROJECT: env.WORKFLOW_VERCEL_PROJECT || '', WORKFLOW_VERCEL_TEAM: env.WORKFLOW_VERCEL_TEAM || '', WORKFLOW_LOCAL_UI: env.WORKFLOW_LOCAL_UI || '', + WORKFLOW_LOCAL_BASE_URL: env.WORKFLOW_LOCAL_BASE_URL || '', PORT: env.PORT || '', WORKFLOW_LOCAL_DATA_DIR: env.WORKFLOW_LOCAL_DATA_DIR || '', WORKFLOW_MANIFEST_PATH: env.WORKFLOW_MANIFEST_PATH || '', diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts index 6e229a4971..10ac925aa2 100644 --- a/packages/core/src/runtime.ts +++ b/packages/core/src/runtime.ts @@ -16,7 +16,11 @@ import { withHealthCheck, } from './runtime/helpers.js'; import { handleSuspension } from './runtime/suspension-handler.js'; -import { getWorld, getWorldHandlers } from './runtime/world.js'; +import { + getWorld, + getWorldHandlers, + type WorldHandlers, +} from './runtime/world.js'; import { remapErrorStack } from './source-map.js'; import * as Attribute from './telemetry/semantic-conventions.js'; import { linkToCurrentContext, trace, withTraceContext } from './telemetry.js'; @@ -62,236 +66,238 @@ export { export function workflowEntrypoint( workflowCode: string ): (req: Request) => Promise { - const handler = getWorldHandlers().createQueueHandler( - '__wkf_workflow_', - async (message_, metadata) => { - // Check if this is a health check message - // NOTE: Health check messages are intentionally unauthenticated for monitoring purposes. - // They only write a simple status response to a stream and do not expose sensitive data. - // The stream name includes a unique correlationId that must be known by the caller. 
- const healthCheck = parseHealthCheckPayload(message_); - if (healthCheck) { - await handleHealthCheckMessage(healthCheck, 'workflow'); - return; - } - - const { - runId, - traceCarrier: traceContext, - requestedAt, - } = WorkflowInvokePayloadSchema.parse(message_); - // Extract the workflow name from the topic name - const workflowName = metadata.queueName.slice('__wkf_workflow_'.length); - const spanLinks = await linkToCurrentContext(); - - // Invoke user workflow within the propagated trace context - return await withTraceContext(traceContext, async () => { - const world = getWorld(); - return trace( - `WORKFLOW ${workflowName}`, - { links: spanLinks }, - async (span) => { - span?.setAttributes({ - ...Attribute.WorkflowName(workflowName), - ...Attribute.WorkflowOperation('execute'), - // Standard OTEL messaging conventions - ...Attribute.MessagingSystem('vercel-queue'), - ...Attribute.MessagingDestinationName(metadata.queueName), - ...Attribute.MessagingMessageId(metadata.messageId), - ...Attribute.MessagingOperationType('process'), - ...getQueueOverhead({ requestedAt }), - }); - - // TODO: validate `workflowName` exists before consuming message? + const handler = (worldHandlers: WorldHandlers) => + worldHandlers.createQueueHandler( + '__wkf_workflow_', + async (message_, metadata) => { + // Check if this is a health check message + // NOTE: Health check messages are intentionally unauthenticated for monitoring purposes. + // They only write a simple status response to a stream and do not expose sensitive data. + // The stream name includes a unique correlationId that must be known by the caller. 
+ const healthCheck = parseHealthCheckPayload(message_); + if (healthCheck) { + await handleHealthCheckMessage(healthCheck, 'workflow'); + return; + } - span?.setAttributes({ - ...Attribute.WorkflowRunId(runId), - ...Attribute.WorkflowTracePropagated(!!traceContext), - }); + const { + runId, + traceCarrier: traceContext, + requestedAt, + } = WorkflowInvokePayloadSchema.parse(message_); + // Extract the workflow name from the topic name + const workflowName = metadata.queueName.slice('__wkf_workflow_'.length); + const spanLinks = await linkToCurrentContext(); - let workflowStartedAt = -1; - try { - let workflowRun = await world.runs.get(runId); - - if (workflowRun.status === 'pending') { - // Transition run to 'running' via event (event-sourced architecture) - const result = await world.events.create(runId, { - eventType: 'run_started', - specVersion: SPEC_VERSION_CURRENT, - }); - // Use the run entity from the event response (no extra get call needed) - if (!result.run) { - throw new WorkflowRuntimeError( - `Event creation for 'run_started' did not return the run entity for run "${runId}"` - ); - } - workflowRun = result.run; - } + // Invoke user workflow within the propagated trace context + return await withTraceContext(traceContext, async () => { + const world = await getWorld(); + return trace( + `WORKFLOW ${workflowName}`, + { links: spanLinks }, + async (span) => { + span?.setAttributes({ + ...Attribute.WorkflowName(workflowName), + ...Attribute.WorkflowOperation('execute'), + // Standard OTEL messaging conventions + ...Attribute.MessagingSystem('vercel-queue'), + ...Attribute.MessagingDestinationName(metadata.queueName), + ...Attribute.MessagingMessageId(metadata.messageId), + ...Attribute.MessagingOperationType('process'), + ...getQueueOverhead({ requestedAt }), + }); - // At this point, the workflow is "running" and `startedAt` should - // definitely be set. 
- if (!workflowRun.startedAt) { - throw new WorkflowRuntimeError( - `Workflow run "${runId}" has no "startedAt" timestamp` - ); - } - workflowStartedAt = +workflowRun.startedAt; + // TODO: validate `workflowName` exists before consuming message? span?.setAttributes({ - ...Attribute.WorkflowRunStatus(workflowRun.status), - ...Attribute.WorkflowStartedAt(workflowStartedAt), + ...Attribute.WorkflowRunId(runId), + ...Attribute.WorkflowTracePropagated(!!traceContext), }); - if (workflowRun.status !== 'running') { - // Workflow has already completed or failed, so we can skip it - runtimeLogger.info( - 'Workflow already completed or failed, skipping', - { - workflowRunId: runId, - status: workflowRun.status, - } - ); - - // TODO: for `cancel`, we actually want to propagate a WorkflowCancelled event - // inside the workflow context so the user can gracefully exit. this is SIGTERM - // TODO: furthermore, there should be a timeout or a way to force cancel SIGKILL - // so that we actually exit here without replaying the workflow at all, in the case - // the replaying the workflow is itself failing. + let workflowStartedAt = -1; + try { + let workflowRun = await world.runs.get(runId); - return; - } + if (workflowRun.status === 'pending') { + // Transition run to 'running' via event (event-sourced architecture) + const result = await world.events.create(runId, { + eventType: 'run_started', + specVersion: SPEC_VERSION_CURRENT, + }); + // Use the run entity from the event response (no extra get call needed) + if (!result.run) { + throw new WorkflowRuntimeError( + `Event creation for 'run_started' did not return the run entity for run "${runId}"` + ); + } + workflowRun = result.run; + } - // Load all events into memory before running - const events = await getAllWorkflowRunEvents(workflowRun.runId); + // At this point, the workflow is "running" and `startedAt` should + // definitely be set. 
+ if (!workflowRun.startedAt) { + throw new WorkflowRuntimeError( + `Workflow run "${runId}" has no "startedAt" timestamp` + ); + } + workflowStartedAt = +workflowRun.startedAt; - // Check for any elapsed waits and create wait_completed events - const now = Date.now(); + span?.setAttributes({ + ...Attribute.WorkflowRunStatus(workflowRun.status), + ...Attribute.WorkflowStartedAt(workflowStartedAt), + }); - // Pre-compute completed correlation IDs for O(n) lookup instead of O(n²) - const completedWaitIds = new Set( - events - .filter((e) => e.eventType === 'wait_completed') - .map((e) => e.correlationId) - ); + if (workflowRun.status !== 'running') { + // Workflow has already completed or failed, so we can skip it + runtimeLogger.info( + 'Workflow already completed or failed, skipping', + { + workflowRunId: runId, + status: workflowRun.status, + } + ); - // Collect all waits that need completion - const waitsToComplete = events - .filter( - (e): e is typeof e & { correlationId: string } => - e.eventType === 'wait_created' && - e.correlationId !== undefined && - !completedWaitIds.has(e.correlationId) && - now >= (e.eventData.resumeAt as Date).getTime() - ) - .map((e) => ({ - eventType: 'wait_completed' as const, - specVersion: SPEC_VERSION_CURRENT, - correlationId: e.correlationId, - })); + // TODO: for `cancel`, we actually want to propagate a WorkflowCancelled event + // inside the workflow context so the user can gracefully exit. this is SIGTERM + // TODO: furthermore, there should be a timeout or a way to force cancel SIGKILL + // so that we actually exit here without replaying the workflow at all, in the case + // the replaying the workflow is itself failing. 
- // Create all wait_completed events - for (const waitEvent of waitsToComplete) { - const result = await world.events.create(runId, waitEvent); - // Add the event to the events array so the workflow can see it - events.push(result.event!); - } + return; + } - const result = await runWorkflow( - workflowCode, - workflowRun, - events - ); + // Load all events into memory before running + const events = await getAllWorkflowRunEvents(workflowRun.runId); - // Complete the workflow run via event (event-sourced architecture) - await world.events.create(runId, { - eventType: 'run_completed', - specVersion: SPEC_VERSION_CURRENT, - eventData: { - output: result, - }, - }); + // Check for any elapsed waits and create wait_completed events + const now = Date.now(); - span?.setAttributes({ - ...Attribute.WorkflowRunStatus('completed'), - ...Attribute.WorkflowEventsCount(events.length), - }); - } catch (err) { - if (WorkflowSuspension.is(err)) { - const suspensionMessage = buildWorkflowSuspensionMessage( - runId, - err.stepCount, - err.hookCount, - err.waitCount + // Pre-compute completed correlation IDs for O(n) lookup instead of O(n²) + const completedWaitIds = new Set( + events + .filter((e) => e.eventType === 'wait_completed') + .map((e) => e.correlationId) ); - if (suspensionMessage) { - runtimeLogger.debug(suspensionMessage); - } - const result = await handleSuspension({ - suspension: err, - world, - runId, - workflowName, - workflowStartedAt, - span, - }); + // Collect all waits that need completion + const waitsToComplete = events + .filter( + (e): e is typeof e & { correlationId: string } => + e.eventType === 'wait_created' && + e.correlationId !== undefined && + !completedWaitIds.has(e.correlationId) && + now >= (e.eventData.resumeAt as Date).getTime() + ) + .map((e) => ({ + eventType: 'wait_completed' as const, + specVersion: SPEC_VERSION_CURRENT, + correlationId: e.correlationId, + })); - if (result.timeoutSeconds !== undefined) { - return { timeoutSeconds: 
result.timeoutSeconds }; + // Create all wait_completed events + for (const waitEvent of waitsToComplete) { + const result = await world.events.create(runId, waitEvent); + // Add the event to the events array so the workflow can see it + events.push(result.event!); } - } else { - // NOTE: this error could be an error thrown in user code, or could also be a WorkflowRuntimeError - // (for instance when the event log is corrupted, this is thrown by the event consumer). We could - // specially handle these if needed. - - const errorName = getErrorName(err); - const errorMessage = - err instanceof Error ? err.message : String(err); - let errorStack = getErrorStack(err); - // Remap error stack using source maps to show original source locations - if (errorStack) { - const parsedName = parseWorkflowName(workflowName); - const filename = parsedName?.moduleSpecifier || workflowName; - errorStack = remapErrorStack( - errorStack, - filename, - workflowCode - ); - } + const result = await runWorkflow( + workflowCode, + workflowRun, + events + ); - runtimeLogger.error('Error while running workflow', { - workflowRunId: runId, - errorName, - errorStack, - }); - // Fail the workflow run via event (event-sourced architecture) + // Complete the workflow run via event (event-sourced architecture) await world.events.create(runId, { - eventType: 'run_failed', + eventType: 'run_completed', specVersion: SPEC_VERSION_CURRENT, eventData: { - error: { - message: errorMessage, - stack: errorStack, - }, - // TODO: include error codes when we define them + output: result, }, }); span?.setAttributes({ - ...Attribute.WorkflowRunStatus('failed'), - ...Attribute.WorkflowErrorName(errorName), - ...Attribute.WorkflowErrorMessage(String(err)), + ...Attribute.WorkflowRunStatus('completed'), + ...Attribute.WorkflowEventsCount(events.length), }); + } catch (err) { + if (WorkflowSuspension.is(err)) { + const suspensionMessage = buildWorkflowSuspensionMessage( + runId, + err.stepCount, + err.hookCount, + 
err.waitCount + ); + if (suspensionMessage) { + runtimeLogger.debug(suspensionMessage); + } + + const result = await handleSuspension({ + suspension: err, + world, + runId, + workflowName, + workflowStartedAt, + span, + }); + + if (result.timeoutSeconds !== undefined) { + return { timeoutSeconds: result.timeoutSeconds }; + } + } else { + // NOTE: this error could be an error thrown in user code, or could also be a WorkflowRuntimeError + // (for instance when the event log is corrupted, this is thrown by the event consumer). We could + // specially handle these if needed. + + const errorName = getErrorName(err); + const errorMessage = + err instanceof Error ? err.message : String(err); + let errorStack = getErrorStack(err); + + // Remap error stack using source maps to show original source locations + if (errorStack) { + const parsedName = parseWorkflowName(workflowName); + const filename = + parsedName?.moduleSpecifier || workflowName; + errorStack = remapErrorStack( + errorStack, + filename, + workflowCode + ); + } + + runtimeLogger.error('Error while running workflow', { + workflowRunId: runId, + errorName, + errorStack, + }); + // Fail the workflow run via event (event-sourced architecture) + await world.events.create(runId, { + eventType: 'run_failed', + specVersion: SPEC_VERSION_CURRENT, + eventData: { + error: { + message: errorMessage, + stack: errorStack, + }, + // TODO: include error codes when we define them + }, + }); + + span?.setAttributes({ + ...Attribute.WorkflowRunStatus('failed'), + ...Attribute.WorkflowErrorName(errorName), + ...Attribute.WorkflowErrorMessage(String(err)), + }); + } } } - } - ); // End withTraceContext - }); - } - ); + ); // End withTraceContext + }); + } + ); - return withHealthCheck(handler); + return withHealthCheck(async (req) => handler(await getWorldHandlers())(req)); } // this is a no-op placeholder as the client is diff --git a/packages/core/src/runtime/helpers.ts b/packages/core/src/runtime/helpers.ts index 
0c1e361330..d5d2bfe30a 100644 --- a/packages/core/src/runtime/helpers.ts +++ b/packages/core/src/runtime/helpers.ts @@ -65,7 +65,7 @@ export async function handleHealthCheckMessage( healthCheck: HealthCheckPayload, endpoint: 'workflow' | 'step' ): Promise { - const world = getWorld(); + const world = await getWorld(); const streamName = getHealthCheckStreamName(healthCheck.correlationId); const response = JSON.stringify({ healthy: true, @@ -259,7 +259,7 @@ export async function getAllWorkflowRunEvents(runId: string): Promise { let hasMore = true; let pagesLoaded = 0; - const world = getWorld(); + const world = await getWorld(); while (hasMore) { // TODO: we're currently loading all the data with resolveRef behaviour. We need to update this // to lazyload the data from the world instead so that we can optimize and make the event log loading diff --git a/packages/core/src/runtime/resume-hook.ts b/packages/core/src/runtime/resume-hook.ts index b94d511038..00cb82c466 100644 --- a/packages/core/src/runtime/resume-hook.ts +++ b/packages/core/src/runtime/resume-hook.ts @@ -24,7 +24,7 @@ import { getWorld } from './world.js'; * @param token - The unique token identifying the hook */ export async function getHookByToken(token: string): Promise { - const world = getWorld(); + const world = await getWorld(); const hook = await world.hooks.getByToken(token); if (typeof hook.metadata !== 'undefined') { hook.metadata = hydrateStepArguments(hook.metadata as any, [], hook.runId); @@ -67,7 +67,7 @@ export async function resumeHook( ): Promise { return await waitedUntil(() => { return trace('hook.resume', async (span) => { - const world = getWorld(); + const world = await getWorld(); try { const hook = diff --git a/packages/core/src/runtime/run.ts b/packages/core/src/runtime/run.ts index 298e5850e0..70dffeec25 100644 --- a/packages/core/src/runtime/run.ts +++ b/packages/core/src/runtime/run.ts @@ -5,8 +5,8 @@ import { } from '@workflow/errors'; import { SPEC_VERSION_CURRENT, - type 
World, type WorkflowRunStatus, + type World, } from '@workflow/world'; import { getExternalRevivers, @@ -55,18 +55,19 @@ export class Run { * The world object. * @internal */ - private world: World; + private worldPromise: Promise; constructor(runId: string) { this.runId = runId; - this.world = getWorld(); + this.worldPromise = getWorld(); } /** * Cancels the workflow run. */ async cancel(): Promise { - await this.world.events.create(this.runId, { + const world = await this.worldPromise; + await world.events.create(this.runId, { eventType: 'run_cancelled', specVersion: SPEC_VERSION_CURRENT, }); @@ -76,7 +77,9 @@ export class Run { * The status of the workflow run. */ get status(): Promise { - return this.world.runs.get(this.runId).then((run) => run.status); + return this.worldPromise.then((world) => + world.runs.get(this.runId).then((run) => run.status) + ); } /** @@ -91,14 +94,18 @@ export class Run { * The name of the workflow. */ get workflowName(): Promise { - return this.world.runs.get(this.runId).then((run) => run.workflowName); + return this.worldPromise.then((world) => + world.runs.get(this.runId).then((run) => run.workflowName) + ); } /** * The timestamp when the workflow run was created. */ get createdAt(): Promise { - return this.world.runs.get(this.runId).then((run) => run.createdAt); + return this.worldPromise.then((world) => + world.runs.get(this.runId).then((run) => run.createdAt) + ); } /** @@ -106,7 +113,9 @@ export class Run { * Returns undefined if the workflow has not started yet. */ get startedAt(): Promise { - return this.world.runs.get(this.runId).then((run) => run.startedAt); + return this.worldPromise.then((world) => + world.runs.get(this.runId).then((run) => run.startedAt) + ); } /** @@ -114,7 +123,9 @@ export class Run { * Returns undefined if the workflow has not completed yet. 
*/ get completedAt(): Promise { - return this.world.runs.get(this.runId).then((run) => run.completedAt); + return this.worldPromise.then((world) => + world.runs.get(this.runId).then((run) => run.completedAt) + ); } /** @@ -148,9 +159,10 @@ export class Run { * @returns The workflow return value. */ private async pollReturnValue(): Promise { + const world = await this.worldPromise; while (true) { try { - const run = await this.world.runs.get(this.runId); + const run = await world.runs.get(this.runId); if (run.status === 'completed') { return hydrateWorkflowReturnValue(run.output, [], this.runId); diff --git a/packages/core/src/runtime/start.ts b/packages/core/src/runtime/start.ts index 13e001b452..5e52754ebd 100644 --- a/packages/core/src/runtime/start.ts +++ b/packages/core/src/runtime/start.ts @@ -3,13 +3,13 @@ import { WorkflowRuntimeError } from '@workflow/errors'; import { withResolvers } from '@workflow/utils'; import type { WorkflowInvokePayload, World } from '@workflow/world'; import { isLegacySpecVersion, SPEC_VERSION_CURRENT } from '@workflow/world'; -import { Run } from './run.js'; import type { Serializable } from '../schemas.js'; import { dehydrateWorkflowArguments } from '../serialization.js'; import * as Attribute from '../telemetry/semantic-conventions.js'; import { serializeTraceCarrier, trace } from '../telemetry.js'; import { waitedUntil } from '../util.js'; import { version as workflowCoreVersion } from '../version.js'; +import { Run } from './run.js'; import { getWorld } from './world.js'; export interface StartOptions { @@ -99,7 +99,7 @@ export async function start( ...Attribute.WorkflowArgumentsCount(args.length), }); - const world = opts?.world ?? getWorld(); + const world = opts?.world ?? (await getWorld()); const deploymentId = opts.deploymentId ?? 
(await world.getDeploymentId()); const ops: Promise[] = []; const { promise: runIdPromise, resolve: resolveRunId } = diff --git a/packages/core/src/runtime/step-handler.ts b/packages/core/src/runtime/step-handler.ts index 38a6652b8a..5384b47c53 100644 --- a/packages/core/src/runtime/step-handler.ts +++ b/packages/core/src/runtime/step-handler.ts @@ -31,478 +31,486 @@ import { queueMessage, withHealthCheck, } from './helpers.js'; -import { getWorld, getWorldHandlers } from './world.js'; +import { getWorld, getWorldHandlers, type WorldHandlers } from './world.js'; const DEFAULT_STEP_MAX_RETRIES = 3; -const stepHandler = getWorldHandlers().createQueueHandler( - '__wkf_step_', - async (message_, metadata) => { - // Check if this is a health check message - // NOTE: Health check messages are intentionally unauthenticated for monitoring purposes. - // They only write a simple status response to a stream and do not expose sensitive data. - // The stream name includes a unique correlationId that must be known by the caller. - const healthCheck = parseHealthCheckPayload(message_); - if (healthCheck) { - await handleHealthCheckMessage(healthCheck, 'step'); - return; - } +const stepHandler = (worldHandlers: WorldHandlers) => + worldHandlers.createQueueHandler( + '__wkf_step_', + async (message_, metadata) => { + // Check if this is a health check message + // NOTE: Health check messages are intentionally unauthenticated for monitoring purposes. + // They only write a simple status response to a stream and do not expose sensitive data. + // The stream name includes a unique correlationId that must be known by the caller. 
+ const healthCheck = parseHealthCheckPayload(message_); + if (healthCheck) { + await handleHealthCheckMessage(healthCheck, 'step'); + return; + } + + const { + workflowName, + workflowRunId, + workflowStartedAt, + stepId, + traceCarrier: traceContext, + requestedAt, + } = StepInvokePayloadSchema.parse(message_); + const spanLinks = await linkToCurrentContext(); + // Execute step within the propagated trace context + return await withTraceContext(traceContext, async () => { + // Extract the step name from the topic name + const stepName = metadata.queueName.slice('__wkf_step_'.length); + const world = await getWorld(); + + // Get the port early to avoid async operations during step execution + const port = await getPort(); + + return trace( + `step ${stepName}`, + { kind: await getSpanKind('CONSUMER'), links: spanLinks }, + async (span) => { + span?.setAttributes({ + ...Attribute.StepName(stepName), + ...Attribute.StepAttempt(metadata.attempt), + // Standard OTEL messaging conventions + ...Attribute.MessagingSystem('vercel-queue'), + ...Attribute.MessagingDestinationName(metadata.queueName), + ...Attribute.MessagingMessageId(metadata.messageId), + ...Attribute.MessagingOperationType('process'), + ...getQueueOverhead({ requestedAt }), + }); - const { - workflowName, - workflowRunId, - workflowStartedAt, - stepId, - traceCarrier: traceContext, - requestedAt, - } = StepInvokePayloadSchema.parse(message_); - const spanLinks = await linkToCurrentContext(); - // Execute step within the propagated trace context - return await withTraceContext(traceContext, async () => { - // Extract the step name from the topic name - const stepName = metadata.queueName.slice('__wkf_step_'.length); - const world = getWorld(); - - // Get the port early to avoid async operations during step execution - const port = await getPort(); - - return trace( - `step ${stepName}`, - { kind: await getSpanKind('CONSUMER'), links: spanLinks }, - async (span) => { - span?.setAttributes({ - 
...Attribute.StepName(stepName), - ...Attribute.StepAttempt(metadata.attempt), - // Standard OTEL messaging conventions - ...Attribute.MessagingSystem('vercel-queue'), - ...Attribute.MessagingDestinationName(metadata.queueName), - ...Attribute.MessagingMessageId(metadata.messageId), - ...Attribute.MessagingOperationType('process'), - ...getQueueOverhead({ requestedAt }), - }); - - const stepFn = getStepFunction(stepName); - if (!stepFn) { - throw new Error(`Step "${stepName}" not found`); - } - if (typeof stepFn !== 'function') { - throw new Error( - `Step "${stepName}" is not a function (got ${typeof stepFn})` - ); - } + const stepFn = getStepFunction(stepName); + if (!stepFn) { + throw new Error(`Step "${stepName}" not found`); + } + if (typeof stepFn !== 'function') { + throw new Error( + `Step "${stepName}" is not a function (got ${typeof stepFn})` + ); + } + + const maxRetries = stepFn.maxRetries ?? DEFAULT_STEP_MAX_RETRIES; - const maxRetries = stepFn.maxRetries ?? DEFAULT_STEP_MAX_RETRIES; - - span?.setAttributes({ - ...Attribute.WorkflowName(workflowName), - ...Attribute.WorkflowRunId(workflowRunId), - ...Attribute.StepId(stepId), - ...Attribute.StepMaxRetries(maxRetries), - ...Attribute.StepTracePropagated(!!traceContext), - }); - - let step = await world.steps.get(workflowRunId, stepId); - - runtimeLogger.debug('Step execution details', { - stepName, - stepId: step.stepId, - status: step.status, - attempt: step.attempt, - }); - - span?.setAttributes({ - ...Attribute.StepStatus(step.status), - }); - - // Check if the step has a `retryAfter` timestamp that hasn't been reached yet - const now = Date.now(); - if (step.retryAfter && step.retryAfter.getTime() > now) { - const timeoutSeconds = Math.ceil( - (step.retryAfter.getTime() - now) / 1000 - ); span?.setAttributes({ - ...Attribute.StepRetryTimeoutSeconds(timeoutSeconds), - }); - runtimeLogger.debug('Step retryAfter timestamp not yet reached', { - stepName, - stepId: step.stepId, - retryAfter: 
step.retryAfter, - timeoutSeconds, + ...Attribute.WorkflowName(workflowName), + ...Attribute.WorkflowRunId(workflowRunId), + ...Attribute.StepId(stepId), + ...Attribute.StepMaxRetries(maxRetries), + ...Attribute.StepTracePropagated(!!traceContext), }); - return { timeoutSeconds }; - } - let result: unknown; - - // Check max retries FIRST before any state changes. - // step.attempt tracks how many times step_started has been called. - // If step.attempt >= maxRetries, we've already tried maxRetries times. - // This handles edge cases where the step handler is invoked after max retries have been exceeded - // (e.g., when the step repeatedly times out or fails before reaching the catch handler). - // Without this check, the step would retry forever. - // Note: maxRetries is the number of RETRIES after the first attempt, so total attempts = maxRetries + 1 - // Use > here (not >=) because this guards against re-invocation AFTER all attempts are used. - // The post-failure check uses >= to decide whether to retry after a failure. 
- if (step.attempt > maxRetries + 1) { - const retryCount = step.attempt - 1; - const errorMessage = `Step "${stepName}" exceeded max retries (${retryCount} ${pluralize('retry', 'retries', retryCount)})`; - stepLogger.error('Step exceeded max retries', { - workflowRunId, + let step = await world.steps.get(workflowRunId, stepId); + + runtimeLogger.debug('Step execution details', { stepName, - retryCount, - }); - // Fail the step via event (event-sourced architecture) - await world.events.create(workflowRunId, { - eventType: 'step_failed', - specVersion: SPEC_VERSION_CURRENT, - correlationId: stepId, - eventData: { - error: errorMessage, - stack: step.error?.stack, - }, + stepId: step.stepId, + status: step.status, + attempt: step.attempt, }); span?.setAttributes({ - ...Attribute.StepStatus('failed'), - ...Attribute.StepRetryExhausted(true), + ...Attribute.StepStatus(step.status), }); - // Re-invoke the workflow to handle the failed step - await queueMessage( - world, - `__wkf_workflow_${workflowName}`, - { - runId: workflowRunId, - traceCarrier: await serializeTraceCarrier(), - requestedAt: new Date(), - }, - { - headers: { 'x-workflow-run-id': workflowRunId }, - } - ); - return; - } + // Check if the step has a `retryAfter` timestamp that hasn't been reached yet + const now = Date.now(); + if (step.retryAfter && step.retryAfter.getTime() > now) { + const timeoutSeconds = Math.ceil( + (step.retryAfter.getTime() - now) / 1000 + ); + span?.setAttributes({ + ...Attribute.StepRetryTimeoutSeconds(timeoutSeconds), + }); + runtimeLogger.debug('Step retryAfter timestamp not yet reached', { + stepName, + stepId: step.stepId, + retryAfter: step.retryAfter, + timeoutSeconds, + }); + return { timeoutSeconds }; + } - try { - if (!['pending', 'running'].includes(step.status)) { - // We should only be running the step if it's either - // a) pending - initial state, or state set on re-try - // b) running - if a step fails mid-execution, like a function timeout - // otherwise, the 
step has been invoked erroneously - stepLogger.warn('Step invoked erroneously, skipping execution', { + let result: unknown; + + // Check max retries FIRST before any state changes. + // step.attempt tracks how many times step_started has been called. + // If step.attempt > maxRetries + 1, every allowed attempt (maxRetries + 1 in total) has already been started. + // This handles edge cases where the step handler is invoked after max retries have been exceeded + // (e.g., when the step repeatedly times out or fails before reaching the catch handler). + // Without this check, the step would retry forever. + // Note: maxRetries is the number of RETRIES after the first attempt, so total attempts = maxRetries + 1 + // Use > here (not >=) because this guards against re-invocation AFTER all attempts are used. + // The post-failure check uses >= to decide whether to retry after a failure. + if (step.attempt > maxRetries + 1) { + const retryCount = step.attempt - 1; + const errorMessage = `Step "${stepName}" exceeded max retries (${retryCount} ${pluralize('retry', 'retries', retryCount)})`; + stepLogger.error('Step exceeded max retries', { workflowRunId, stepName, - expectedStatus: ['pending', 'running'], - actualStatus: step.status, + retryCount, + }); + // Fail the step via event (event-sourced architecture) + await world.events.create(workflowRunId, { + eventType: 'step_failed', + specVersion: SPEC_VERSION_CURRENT, + correlationId: stepId, + eventData: { + error: errorMessage, + stack: step.error?.stack, + }, }); + span?.setAttributes({ - ...Attribute.StepSkipped(true), - ...Attribute.StepSkipReason(step.status), + ...Attribute.StepStatus('failed'), + ...Attribute.StepRetryExhausted(true), }); - // There's a chance that a step terminates correctly, but the underlying process - // fails or gets killed before the stepEntrypoint has a chance to re-enqueue the run.
- // The queue lease expires and stepEntrypoint again, which leads us here, so - // we optimistically re-enqueue the workflow if the step is in a terminal state, - // under the assumption that this edge case happened. - // Until we move to atomic entity/event updates (World V2), there _could_ be an edge case - // where the we execute this code based on the `step` entity status, but the runtime - // failed to create the `step_completed` event (due to failing between step and event update), - // in which case, this might lead to an infinite loop. - // https://vercel.slack.com/archives/C09125LC4AX/p1765313809066679 - const isTerminalStep = [ - 'completed', - 'failed', - 'cancelled', - ].includes(step.status); - if (isTerminalStep) { - await queueMessage( - world, - `__wkf_workflow_${workflowName}`, - { - runId: workflowRunId, - traceCarrier: await serializeTraceCarrier(), - requestedAt: new Date(), - }, + + // Re-invoke the workflow to handle the failed step + await queueMessage( + world, + `__wkf_workflow_${workflowName}`, + { + runId: workflowRunId, + traceCarrier: await serializeTraceCarrier(), + requestedAt: new Date(), + }, + { + headers: { 'x-workflow-run-id': workflowRunId }, + } + ); + return; + } + + try { + if (!['pending', 'running'].includes(step.status)) { + // We should only be running the step if it's either + // a) pending - initial state, or state set on re-try + // b) running - if a step fails mid-execution, like a function timeout + // otherwise, the step has been invoked erroneously + stepLogger.warn( + 'Step invoked erroneously, skipping execution', { - headers: { 'x-workflow-run-id': workflowRunId }, + workflowRunId, + stepName, + expectedStatus: ['pending', 'running'], + actualStatus: step.status, } ); + span?.setAttributes({ + ...Attribute.StepSkipped(true), + ...Attribute.StepSkipReason(step.status), + }); + // There's a chance that a step terminates correctly, but the underlying process + // fails or gets killed before the stepEntrypoint has 
a chance to re-enqueue the run. + // The queue lease expires and stepEntrypoint runs again, which leads us here, so + // we optimistically re-enqueue the workflow if the step is in a terminal state, + // under the assumption that this edge case happened. + // Until we move to atomic entity/event updates (World V2), there _could_ be an edge case + // where we execute this code based on the `step` entity status, but the runtime + // failed to create the `step_completed` event (due to failing between step and event update), + // in which case, this might lead to an infinite loop. + // https://vercel.slack.com/archives/C09125LC4AX/p1765313809066679 + const isTerminalStep = [ + 'completed', + 'failed', + 'cancelled', + ].includes(step.status); + if (isTerminalStep) { + await queueMessage( + world, + `__wkf_workflow_${workflowName}`, + { + runId: workflowRunId, + traceCarrier: await serializeTraceCarrier(), + requestedAt: new Date(), + }, + { + headers: { 'x-workflow-run-id': workflowRunId }, + } + ); + } + return; } - return; - } - // Start the step via event (event-sourced architecture) - // step_started increments the attempt counter in the World implementation - const startResult = await world.events.create(workflowRunId, { - eventType: 'step_started', - specVersion: SPEC_VERSION_CURRENT, - correlationId: stepId, - }); + // Start the step via event (event-sourced architecture) + // step_started increments the attempt counter in the World implementation + const startResult = await world.events.create(workflowRunId, { + eventType: 'step_started', + specVersion: SPEC_VERSION_CURRENT, + correlationId: stepId, + }); - // Use the step entity from the event response (no extra get call needed) - if (!startResult.step) { - throw new WorkflowRuntimeError( - `step_started event for "${stepId}" did not return step entity` - ); - } - step = startResult.step; + // Use the step entity from the event response (no extra get call needed) + if (!startResult.step) { + throw new
WorkflowRuntimeError( + `step_started event for "${stepId}" did not return step entity` + ); + } + step = startResult.step; - // step.attempt is now the current attempt number (after increment) - const attempt = step.attempt; + // step.attempt is now the current attempt number (after increment) + const attempt = step.attempt; - if (!step.startedAt) { - throw new WorkflowRuntimeError( - `Step "${stepId}" has no "startedAt" timestamp` + if (!step.startedAt) { + throw new WorkflowRuntimeError( + `Step "${stepId}" has no "startedAt" timestamp` + ); + } + // Hydrate the step input arguments, closure variables, and thisVal + // Track deserialization time for observability + // NOTE: This captures only the synchronous portion of hydration. Any async + // operations (e.g., stream loading) are added to `ops` and executed later + // via Promise.all(ops) - their timing is not included in this measurement. + const deserializeStartTime = Date.now(); + const ops: Promise[] = []; + const hydratedInput = hydrateStepArguments( + step.input, + ops, + workflowRunId ); - } - // Hydrate the step input arguments, closure variables, and thisVal - // Track deserialization time for observability - // NOTE: This captures only the synchronous portion of hydration. Any async - // operations (e.g., stream loading) are added to `ops` and executed later - // via Promise.all(ops) - their timing is not included in this measurement. - const deserializeStartTime = Date.now(); - const ops: Promise[] = []; - const hydratedInput = hydrateStepArguments( - step.input, - ops, - workflowRunId - ); - const deserializeTimeMs = Date.now() - deserializeStartTime; + const deserializeTimeMs = Date.now() - deserializeStartTime; - const args = hydratedInput.args; - const thisVal = hydratedInput.thisVal ?? null; + const args = hydratedInput.args; + const thisVal = hydratedInput.thisVal ?? 
null; - span?.setAttributes({ - ...Attribute.StepArgumentsCount(args.length), - ...Attribute.QueueDeserializeTimeMs(deserializeTimeMs), - }); + span?.setAttributes({ + ...Attribute.StepArgumentsCount(args.length), + ...Attribute.QueueDeserializeTimeMs(deserializeTimeMs), + }); - // Track execution time for observability - const executionStartTime = Date.now(); - result = await contextStorage.run( - { - stepMetadata: { - stepId, - stepStartedAt: new Date(+step.startedAt), - attempt, - }, - workflowMetadata: { - workflowRunId, - workflowStartedAt: new Date(+workflowStartedAt), - // TODO: there should be a getUrl method on the world interface itself. This - // solution only works for vercel + local worlds. - url: process.env.VERCEL_URL - ? `https://${process.env.VERCEL_URL}` - : `http://localhost:${port ?? 3000}`, + // Track execution time for observability + const executionStartTime = Date.now(); + result = await contextStorage.run( + { + stepMetadata: { + stepId, + stepStartedAt: new Date(+step.startedAt), + attempt, + }, + workflowMetadata: { + workflowRunId, + workflowStartedAt: new Date(+workflowStartedAt), + // TODO: there should be a getUrl method on the world interface itself. This + // solution only works for vercel + local worlds. + url: process.env.VERCEL_URL + ? `https://${process.env.VERCEL_URL}` + : `http://localhost:${port ?? 
3000}`, + }, + ops, + closureVars: hydratedInput.closureVars, }, - ops, - closureVars: hydratedInput.closureVars, - }, - () => stepFn.apply(thisVal, args) - ); - const executionTimeMs = Date.now() - executionStartTime; - - span?.setAttributes({ - ...Attribute.QueueExecutionTimeMs(executionTimeMs), - }); - - // NOTE: None of the code from this point is guaranteed to run - // Since the step might fail or cause a function timeout and the process might be SIGKILL'd - // The workflow runtime must be resilient to the below code not executing on a failed step - // Track serialization time for observability - const serializeStartTime = Date.now(); - result = dehydrateStepReturnValue(result, ops, workflowRunId); - const serializeTimeMs = Date.now() - serializeStartTime; - - span?.setAttributes({ - ...Attribute.QueueSerializeTimeMs(serializeTimeMs), - }); - - waitUntil( - Promise.all(ops).catch((err) => { - // Ignore expected client disconnect errors (e.g., browser refresh during streaming) - const isAbortError = - err?.name === 'AbortError' || err?.name === 'ResponseAborted'; - if (!isAbortError) throw err; - }) - ); + () => stepFn.apply(thisVal, args) + ); + const executionTimeMs = Date.now() - executionStartTime; - // Complete the step via event (event-sourced architecture) - // The event creation atomically updates the step entity - // result was dehydrated above by dehydrateStepReturnValue, which returns Uint8Array - await world.events.create(workflowRunId, { - eventType: 'step_completed', - specVersion: SPEC_VERSION_CURRENT, - correlationId: stepId, - eventData: { - result: result as Uint8Array, - }, - }); + span?.setAttributes({ + ...Attribute.QueueExecutionTimeMs(executionTimeMs), + }); - span?.setAttributes({ - ...Attribute.StepStatus('completed'), - ...Attribute.StepResultType(typeof result), - }); - } catch (err: unknown) { - span?.setAttributes({ - ...Attribute.StepErrorName(getErrorName(err)), - ...Attribute.StepErrorMessage(String(err)), - }); + // NOTE: None 
of the code from this point is guaranteed to run + // Since the step might fail or cause a function timeout and the process might be SIGKILL'd + // The workflow runtime must be resilient to the below code not executing on a failed step + // Track serialization time for observability + const serializeStartTime = Date.now(); + result = dehydrateStepReturnValue(result, ops, workflowRunId); + const serializeTimeMs = Date.now() - serializeStartTime; - if (WorkflowAPIError.is(err)) { - if (err.status === 410) { - // Workflow has already completed, so no-op - stepLogger.info( - 'Workflow run already completed, skipping step', - { - workflowRunId, - stepId, - message: err.message, - } - ); - return; - } - } + span?.setAttributes({ + ...Attribute.QueueSerializeTimeMs(serializeTimeMs), + }); - if (FatalError.is(err)) { - const errorStack = getErrorStack(err); - stepLogger.error( - 'Encountered FatalError while executing step, bubbling up to parent workflow', - { - workflowRunId, - stepName, - errorStack, - } + waitUntil( + Promise.all(ops).catch((err) => { + // Ignore expected client disconnect errors (e.g., browser refresh during streaming) + const isAbortError = + err?.name === 'AbortError' || + err?.name === 'ResponseAborted'; + if (!isAbortError) throw err; + }) ); - // Fail the step via event (event-sourced architecture) + + // Complete the step via event (event-sourced architecture) + // The event creation atomically updates the step entity + // result was dehydrated above by dehydrateStepReturnValue, which returns Uint8Array await world.events.create(workflowRunId, { - eventType: 'step_failed', + eventType: 'step_completed', specVersion: SPEC_VERSION_CURRENT, correlationId: stepId, eventData: { - error: String(err), - stack: errorStack, + result: result as Uint8Array, }, }); span?.setAttributes({ - ...Attribute.StepStatus('failed'), - ...Attribute.StepFatalError(true), + ...Attribute.StepStatus('completed'), + ...Attribute.StepResultType(typeof result), }); - } else { 
- const maxRetries = stepFn.maxRetries ?? DEFAULT_STEP_MAX_RETRIES; - // step.attempt was incremented by step_started, use it here - const currentAttempt = step.attempt; - + } catch (err: unknown) { span?.setAttributes({ - ...Attribute.StepAttempt(currentAttempt), - ...Attribute.StepMaxRetries(maxRetries), + ...Attribute.StepErrorName(getErrorName(err)), + ...Attribute.StepErrorMessage(String(err)), }); - // Note: maxRetries is the number of RETRIES after the first attempt, so total attempts = maxRetries + 1 - if (currentAttempt >= maxRetries + 1) { - // Max retries reached + if (WorkflowAPIError.is(err)) { + if (err.status === 410) { + // Workflow has already completed, so no-op + stepLogger.info( + 'Workflow run already completed, skipping step', + { + workflowRunId, + stepId, + message: err.message, + } + ); + return; + } + } + + if (FatalError.is(err)) { const errorStack = getErrorStack(err); - const retryCount = step.attempt - 1; stepLogger.error( - 'Max retries reached, bubbling error to parent workflow', + 'Encountered FatalError while executing step, bubbling up to parent workflow', { workflowRunId, stepName, - attempt: step.attempt, - retryCount, errorStack, } ); - const errorMessage = `Step "${stepName}" failed after ${maxRetries} ${pluralize('retry', 'retries', maxRetries)}: ${String(err)}`; // Fail the step via event (event-sourced architecture) await world.events.create(workflowRunId, { eventType: 'step_failed', specVersion: SPEC_VERSION_CURRENT, correlationId: stepId, eventData: { - error: errorMessage, + error: String(err), stack: errorStack, }, }); span?.setAttributes({ ...Attribute.StepStatus('failed'), - ...Attribute.StepRetryExhausted(true), + ...Attribute.StepFatalError(true), }); } else { - // Not at max retries yet - log as a retryable error - if (RetryableError.is(err)) { - stepLogger.warn( - 'Encountered RetryableError, step will be retried', + const maxRetries = + stepFn.maxRetries ?? 
DEFAULT_STEP_MAX_RETRIES; + // step.attempt was incremented by step_started, use it here + const currentAttempt = step.attempt; + + span?.setAttributes({ + ...Attribute.StepAttempt(currentAttempt), + ...Attribute.StepMaxRetries(maxRetries), + }); + + // Note: maxRetries is the number of RETRIES after the first attempt, so total attempts = maxRetries + 1 + if (currentAttempt >= maxRetries + 1) { + // Max retries reached + const errorStack = getErrorStack(err); + const retryCount = step.attempt - 1; + stepLogger.error( + 'Max retries reached, bubbling error to parent workflow', { workflowRunId, stepName, - attempt: currentAttempt, - message: err.message, + attempt: step.attempt, + retryCount, + errorStack, } ); + const errorMessage = `Step "${stepName}" failed after ${maxRetries} ${pluralize('retry', 'retries', maxRetries)}: ${String(err)}`; + // Fail the step via event (event-sourced architecture) + await world.events.create(workflowRunId, { + eventType: 'step_failed', + specVersion: SPEC_VERSION_CURRENT, + correlationId: stepId, + eventData: { + error: errorMessage, + stack: errorStack, + }, + }); + + span?.setAttributes({ + ...Attribute.StepStatus('failed'), + ...Attribute.StepRetryExhausted(true), + }); } else { + // Not at max retries yet - log as a retryable error + if (RetryableError.is(err)) { + stepLogger.warn( + 'Encountered RetryableError, step will be retried', + { + workflowRunId, + stepName, + attempt: currentAttempt, + message: err.message, + } + ); + } else { + const errorStack = getErrorStack(err); + stepLogger.warn('Encountered Error, step will be retried', { + workflowRunId, + stepName, + attempt: currentAttempt, + errorStack, + }); + } + // Set step to pending for retry via event (event-sourced architecture) + // step_retrying records the error and sets status to pending const errorStack = getErrorStack(err); - stepLogger.warn('Encountered Error, step will be retried', { - workflowRunId, - stepName, - attempt: currentAttempt, - errorStack, + await 
world.events.create(workflowRunId, { + eventType: 'step_retrying', + specVersion: SPEC_VERSION_CURRENT, + correlationId: stepId, + eventData: { + error: String(err), + stack: errorStack, + ...(RetryableError.is(err) && { + retryAfter: err.retryAfter, + }), + }, }); - } - // Set step to pending for retry via event (event-sourced architecture) - // step_retrying records the error and sets status to pending - const errorStack = getErrorStack(err); - await world.events.create(workflowRunId, { - eventType: 'step_retrying', - specVersion: SPEC_VERSION_CURRENT, - correlationId: stepId, - eventData: { - error: String(err), - stack: errorStack, - ...(RetryableError.is(err) && { - retryAfter: err.retryAfter, - }), - }, - }); - const timeoutSeconds = Math.max( - 1, - RetryableError.is(err) - ? Math.ceil((+err.retryAfter.getTime() - Date.now()) / 1000) - : 1 - ); + const timeoutSeconds = Math.max( + 1, + RetryableError.is(err) + ? Math.ceil( + (+err.retryAfter.getTime() - Date.now()) / 1000 + ) + : 1 + ); - span?.setAttributes({ - ...Attribute.StepRetryTimeoutSeconds(timeoutSeconds), - ...Attribute.StepRetryWillRetry(true), - }); + span?.setAttributes({ + ...Attribute.StepRetryTimeoutSeconds(timeoutSeconds), + ...Attribute.StepRetryWillRetry(true), + }); - // It's a retryable error - so have the queue keep the message visible - // so that it gets retried. - return { timeoutSeconds }; + // It's a retryable error - so have the queue keep the message visible + // so that it gets retried. 
+ return { timeoutSeconds }; + } } } - } - await queueMessage( - world, - `__wkf_workflow_${workflowName}`, - { - runId: workflowRunId, - traceCarrier: await serializeTraceCarrier(), - requestedAt: new Date(), - }, - { - headers: { 'x-workflow-run-id': workflowRunId }, - } - ); - } - ); - }); - } -); + await queueMessage( + world, + `__wkf_workflow_${workflowName}`, + { + runId: workflowRunId, + traceCarrier: await serializeTraceCarrier(), + requestedAt: new Date(), + }, + { + headers: { 'x-workflow-run-id': workflowRunId }, + } + ); + } + ); + }); + } + ); /** * A single route that handles any step execution request and routes to the @@ -510,4 +518,6 @@ const stepHandler = getWorldHandlers().createQueueHandler( * for each step, this is temporary. */ export const stepEntrypoint: (req: Request) => Promise = - /* @__PURE__ */ withHealthCheck(stepHandler); + /* @__PURE__ */ withHealthCheck(async (req) => + stepHandler(await getWorldHandlers())(req) + ); diff --git a/packages/core/src/runtime/world.ts b/packages/core/src/runtime/world.ts index 1da089e729..fc9f9d2e85 100644 --- a/packages/core/src/runtime/world.ts +++ b/packages/core/src/runtime/world.ts @@ -1,17 +1,22 @@ import { createRequire } from 'node:module'; -import { join } from 'node:path'; +import { resolve } from 'node:path'; +import { pathToFileURL } from 'node:url'; import type { World } from '@workflow/world'; import { createLocalWorld } from '@workflow/world-local'; import { createVercelWorld } from '@workflow/world-vercel'; -const require = createRequire(join(process.cwd(), 'index.js')); - const WorldCache = Symbol.for('@workflow/world//cache'); const StubbedWorldCache = Symbol.for('@workflow/world//stubbedCache'); +const WorldCachePromise = Symbol.for('@workflow/world//cachePromise'); +const StubbedWorldCachePromise = Symbol.for( + '@workflow/world//stubbedCachePromise' +); const globalSymbols: typeof globalThis & { [WorldCache]?: World; [StubbedWorldCache]?: World; + [WorldCachePromise]?: Promise; + 
[StubbedWorldCachePromise]?: Promise; } = globalThis; function defaultWorld(): 'vercel' | 'local' { @@ -22,12 +27,45 @@ function defaultWorld(): 'vercel' | 'local' { return 'local'; } +/** + * This hides the dynamic import behind a function to prevent the bundler from + * trying to resolve it at build time, instead of at runtime, since the world + * being imported might not exist at build time. + */ +const dynamicImport = new Function('specifier', 'return import(specifier)') as ( + specifier: string +) => Promise; + +function resolveModulePath(specifier: string): string { + // Already a file:// URL + if (specifier.startsWith('file://')) { + return specifier; + } + // Absolute path - convert to file:// URL + if (specifier.startsWith('/')) { + return pathToFileURL(specifier).href; + } + // Relative path - resolve relative to cwd and convert to file:// URL + if (specifier.startsWith('./') || specifier.startsWith('../')) { + return pathToFileURL(resolve(process.cwd(), specifier)).href; + } + // Package specifier - use require.resolve to find the package + try { + const require = createRequire( + pathToFileURL(process.cwd() + '/package.json').href + ); + return pathToFileURL(require.resolve(specifier)).href; + } catch { + return specifier; + } +} + /** * Create a new world instance based on environment variables. * WORKFLOW_TARGET_WORLD is used to determine the target world. 
* All other environment variables are specific to the target world */ -export const createWorld = (): World => { +export const createWorld = async (): Promise => { const targetWorld = process.env.WORKFLOW_TARGET_WORLD || defaultWorld(); if (targetWorld === 'vercel') { @@ -47,7 +85,8 @@ export const createWorld = (): World => { }); } - const mod = require(targetWorld); + const resolvedPath = resolveModulePath(targetWorld); + const mod = await dynamicImport(resolvedPath); if (typeof mod === 'function') { return mod() as World; } else if (typeof mod.default === 'function') { @@ -61,6 +100,8 @@ export const createWorld = (): World => { ); }; +export type WorldHandlers = Pick; + /** * Some functions from the world are needed at build time, but we do NOT want * to cache the world in those instances for general use, since we don't have @@ -70,22 +111,30 @@ export const createWorld = (): World => { * Once we migrate to a file-based configuration (workflow.config.ts), we should * be able to re-combine getWorld and getWorldHandlers into one singleton. 
*/ -export const getWorldHandlers = (): Pick => { +export const getWorldHandlers = async (): Promise => { if (globalSymbols[StubbedWorldCache]) { return globalSymbols[StubbedWorldCache]; } - const _world = createWorld(); + // Store the promise immediately to prevent race conditions with concurrent calls + if (!globalSymbols[StubbedWorldCachePromise]) { + globalSymbols[StubbedWorldCachePromise] = createWorld(); + } + const _world = await globalSymbols[StubbedWorldCachePromise]; globalSymbols[StubbedWorldCache] = _world; return { createQueueHandler: _world.createQueueHandler, }; }; -export const getWorld = (): World => { +export const getWorld = async (): Promise => { if (globalSymbols[WorldCache]) { return globalSymbols[WorldCache]; } - globalSymbols[WorldCache] = createWorld(); + // Store the promise immediately to prevent race conditions with concurrent calls + if (!globalSymbols[WorldCachePromise]) { + globalSymbols[WorldCachePromise] = createWorld(); + } + globalSymbols[WorldCache] = await globalSymbols[WorldCachePromise]; return globalSymbols[WorldCache]; }; @@ -96,4 +145,6 @@ export const getWorld = (): World => { export const setWorld = (world: World | undefined): void => { globalSymbols[WorldCache] = world; globalSymbols[StubbedWorldCache] = world; + globalSymbols[WorldCachePromise] = undefined; + globalSymbols[StubbedWorldCachePromise] = undefined; }; diff --git a/packages/core/src/serialization.ts b/packages/core/src/serialization.ts index fb4ce0b284..59b39bb480 100644 --- a/packages/core/src/serialization.ts +++ b/packages/core/src/serialization.ts @@ -252,7 +252,7 @@ export class WorkflowServerReadableStream extends ReadableStream { pull: async (controller) => { let reader = this.#reader; if (!reader) { - const world = getWorld(); + const world = await getWorld(); const stream = await world.readFromStream(name, startIndex); reader = this.#reader = stream.getReader(); } @@ -293,7 +293,6 @@ export class WorkflowServerWritableStream extends WritableStream { 
if (typeof name !== 'string' || name.length === 0) { throw new Error(`"name" is required, got "${name}"`); } - const world = getWorld(); // Buffering state for batched writes let buffer: Uint8Array[] = []; @@ -314,6 +313,7 @@ export class WorkflowServerWritableStream extends WritableStream { const _runId = await runId; + const world = await getWorld(); // Use writeToStreamMulti if available for batch writes if ( typeof world.writeToStreamMulti === 'function' && @@ -362,6 +362,7 @@ export class WorkflowServerWritableStream extends WritableStream { await flush(); const _runId = await runId; + const world = await getWorld(); await world.closeStream(name, _runId); }, abort() { diff --git a/packages/web-shared/src/api/workflow-server-actions.ts b/packages/web-shared/src/api/workflow-server-actions.ts index 3122cb4217..d62e1d05e5 100644 --- a/packages/web-shared/src/api/workflow-server-actions.ts +++ b/packages/web-shared/src/api/workflow-server-actions.ts @@ -443,7 +443,7 @@ async function getWorldFromEnv(userEnvMap: EnvMap): Promise { return cachedWorld; } - const world = createWorld(); + const world = await createWorld(); worldCache.set(cacheKey, world); return world; } diff --git a/packages/world-postgres/HOW_IT_WORKS.md b/packages/world-postgres/HOW_IT_WORKS.md index 0ca48cd32f..1334b4ce4f 100644 --- a/packages/world-postgres/HOW_IT_WORKS.md +++ b/packages/world-postgres/HOW_IT_WORKS.md @@ -41,9 +41,12 @@ In **Next.js**, the `world.setup()` function needs to be added to `instrumentati // instrumentation.ts if (process.env.NEXT_RUNTIME !== "edge") { - import("workflow/api").then(async ({ getWorld }) => { - // start listening to the jobs. 
- await getWorld().start?.(); + import("workflow/runtime").then(async ({ getWorld }) => { + const world = await getWorld(); + if (world.start) { + console.log('Starting workers for pg-boss queues...'); + await world.start(); + } }); } ``` diff --git a/packages/world-testing/src/server.mts b/packages/world-testing/src/server.mts index 9f52d29ec8..c5d61fa6ce 100644 --- a/packages/world-testing/src/server.mts +++ b/packages/world-testing/src/server.mts @@ -65,7 +65,8 @@ const app = new Hono() return ctx.json({ runId, hookId: hook.hookId }); }) .get('/runs/:runId', async (ctx) => { - const run = await getWorld().runs.get(ctx.req.param('runId')); + const world = await getWorld(); + const run = await world.runs.get(ctx.req.param('runId')); // Custom JSON serialization to handle Uint8Array as base64 const json = JSON.stringify(run, (_key, value) => { if (value instanceof Uint8Array) { @@ -112,7 +113,7 @@ serve( } } - const world = getWorld(); + const world = await getWorld(); if (world.start) { console.log(`starting background tasks...`); await world.start().then( diff --git a/workbench/astro/scripts/start-with-pg.mjs b/workbench/astro/scripts/start-with-pg.mjs index c6c5060906..def1d9f795 100644 --- a/workbench/astro/scripts/start-with-pg.mjs +++ b/workbench/astro/scripts/start-with-pg.mjs @@ -8,7 +8,11 @@ async function main() { if (process.env.WORKFLOW_TARGET_WORLD === '@workflow/world-postgres') { console.log('Starting Postgres World...'); const { getWorld } = await import('workflow/runtime'); - await getWorld().start?.(); + const world = await getWorld(); + if (world.start) { + console.log('Starting World workers...'); + await world.start(); + } } // Now start the Astro server diff --git a/workbench/astro/src/pages/api/test-health-check.ts b/workbench/astro/src/pages/api/test-health-check.ts index a340a71d14..c4d73daf28 100644 --- a/workbench/astro/src/pages/api/test-health-check.ts +++ b/workbench/astro/src/pages/api/test-health-check.ts @@ -11,7 +11,7 @@ export 
async function POST({ request }: { request: Request }) { `Testing queue-based health check for endpoint: ${endpoint}, timeout: ${timeout}ms` ); - const world = getWorld(); + const world = await getWorld(); const result = await healthCheck(world, endpoint, { timeout }); console.log(`Health check result:`, result); diff --git a/workbench/example/api/test-health-check.ts b/workbench/example/api/test-health-check.ts index d838392cd2..3659ff734c 100644 --- a/workbench/example/api/test-health-check.ts +++ b/workbench/example/api/test-health-check.ts @@ -11,7 +11,7 @@ export async function POST(req: Request) { `Testing queue-based health check for endpoint: ${endpoint}, timeout: ${timeout}ms` ); - const world = getWorld(); + const world = await getWorld(); const result = await healthCheck(world, endpoint, { timeout }); console.log(`Health check result:`, result); diff --git a/workbench/express/src/index.ts b/workbench/express/src/index.ts index d29fcd53b9..717b783c0d 100644 --- a/workbench/express/src/index.ts +++ b/workbench/express/src/index.ts @@ -223,7 +223,7 @@ app.post('/api/test-health-check', async (req, res) => { `Testing queue-based health check for endpoint: ${endpoint}, timeout: ${timeout}ms` ); - const world = getWorld(); + const world = await getWorld(); const result = await healthCheck(world, endpoint, { timeout }); console.log(`Health check result:`, result); diff --git a/workbench/fastify/src/index.ts b/workbench/fastify/src/index.ts index e2c15da35b..3db872c79d 100644 --- a/workbench/fastify/src/index.ts +++ b/workbench/fastify/src/index.ts @@ -276,7 +276,7 @@ server.post('/api/test-health-check', async (req: any, reply) => { `Testing queue-based health check for endpoint: ${endpoint}, timeout: ${timeout}ms` ); - const world = getWorld(); + const world = await getWorld(); const result = await healthCheck(world, endpoint, { timeout }); console.log(`Health check result:`, result); diff --git a/workbench/hono/src/index.ts b/workbench/hono/src/index.ts index 
02232a07e4..2f7d6ff7d1 100644 --- a/workbench/hono/src/index.ts +++ b/workbench/hono/src/index.ts @@ -211,7 +211,7 @@ app.post('/api/test-health-check', async ({ req }) => { `Testing queue-based health check for endpoint: ${endpoint}, timeout: ${timeout}ms` ); - const world = getWorld(); + const world = await getWorld(); const result = await healthCheck(world, endpoint, { timeout }); console.log(`Health check result:`, result); diff --git a/workbench/nest/src/app.controller.ts b/workbench/nest/src/app.controller.ts index b48ad3012b..20139f353f 100644 --- a/workbench/nest/src/app.controller.ts +++ b/workbench/nest/src/app.controller.ts @@ -254,7 +254,7 @@ export class AppController { `Testing queue-based health check for endpoint: ${endpoint}, timeout: ${timeout}ms` ); - const world = getWorld(); + const world = await getWorld(); const result = await healthCheck(world, endpoint as 'workflow' | 'step', { timeout, }); diff --git a/workbench/nest/src/main.ts b/workbench/nest/src/main.ts index 100c8774eb..b0c22548c6 100644 --- a/workbench/nest/src/main.ts +++ b/workbench/nest/src/main.ts @@ -7,8 +7,11 @@ async function bootstrap() { // Start the Postgres World if configured if (process.env.WORKFLOW_TARGET_WORLD === '@workflow/world-postgres') { const { getWorld } = await import('workflow/runtime'); - console.log('Starting Postgres World...'); - await getWorld().start?.(); + const world = await getWorld(); + if (world.start) { + console.log('Starting World workers...'); + await world.start(); + } } const app = await NestFactory.create(AppModule, { diff --git a/workbench/nextjs-turbopack/app/api/test-health-check/route.ts b/workbench/nextjs-turbopack/app/api/test-health-check/route.ts index d838392cd2..3659ff734c 100644 --- a/workbench/nextjs-turbopack/app/api/test-health-check/route.ts +++ b/workbench/nextjs-turbopack/app/api/test-health-check/route.ts @@ -11,7 +11,7 @@ export async function POST(req: Request) { `Testing queue-based health check for endpoint: 
${endpoint}, timeout: ${timeout}ms` ); - const world = getWorld(); + const world = await getWorld(); const result = await healthCheck(world, endpoint, { timeout }); console.log(`Health check result:`, result); diff --git a/workbench/nextjs-turbopack/instrumentation.ts b/workbench/nextjs-turbopack/instrumentation.ts index 174137a971..508eddd8d3 100644 --- a/workbench/nextjs-turbopack/instrumentation.ts +++ b/workbench/nextjs-turbopack/instrumentation.ts @@ -5,6 +5,10 @@ registerOTel({ serviceName: 'example-nextjs-workflow' }); if (process.env.NEXT_RUNTIME !== 'edge') { // kickstart the world import('workflow/runtime').then(async ({ getWorld }) => { - await getWorld().start?.(); + const world = await getWorld(); + if (world.start) { + console.log('Starting World workers...'); + await world.start(); + } }); } diff --git a/workbench/nextjs-webpack/app/api/test-health-check/route.ts b/workbench/nextjs-webpack/app/api/test-health-check/route.ts index d838392cd2..3659ff734c 100644 --- a/workbench/nextjs-webpack/app/api/test-health-check/route.ts +++ b/workbench/nextjs-webpack/app/api/test-health-check/route.ts @@ -11,7 +11,7 @@ export async function POST(req: Request) { `Testing queue-based health check for endpoint: ${endpoint}, timeout: ${timeout}ms` ); - const world = getWorld(); + const world = await getWorld(); const result = await healthCheck(world, endpoint, { timeout }); console.log(`Health check result:`, result); diff --git a/workbench/nitro-v2/server/api/test-health-check.post.ts b/workbench/nitro-v2/server/api/test-health-check.post.ts index 4e48e30b7f..32cc532b9e 100644 --- a/workbench/nitro-v2/server/api/test-health-check.post.ts +++ b/workbench/nitro-v2/server/api/test-health-check.post.ts @@ -13,7 +13,7 @@ export default defineEventHandler(async (event) => { `Testing queue-based health check for endpoint: ${endpoint}, timeout: ${timeout}ms` ); - const world = getWorld(); + const world = await getWorld(); const result = await healthCheck(world, endpoint, { 
timeout }); console.log(`Health check result:`, result); diff --git a/workbench/nitro-v3/plugins/start-pg-world.ts b/workbench/nitro-v3/plugins/start-pg-world.ts index 7e9cff2247..43f690fdad 100644 --- a/workbench/nitro-v3/plugins/start-pg-world.ts +++ b/workbench/nitro-v3/plugins/start-pg-world.ts @@ -5,8 +5,11 @@ import { defineNitroPlugin } from 'nitro/~internal/runtime/plugin'; export default defineNitroPlugin(async () => { if (process.env.WORKFLOW_TARGET_WORLD === '@workflow/world-postgres') { import('workflow/runtime').then(async ({ getWorld }) => { - console.log('Starting Postgres World...'); - await getWorld().start?.(); + const world = await getWorld(); + if (world.start) { + console.log('Starting World workers...'); + await world.start(); + } }); } }); diff --git a/workbench/nitro-v3/routes/api/test-health-check.post.ts b/workbench/nitro-v3/routes/api/test-health-check.post.ts index 86222b7a32..84a62e597c 100644 --- a/workbench/nitro-v3/routes/api/test-health-check.post.ts +++ b/workbench/nitro-v3/routes/api/test-health-check.post.ts @@ -11,7 +11,7 @@ export default async ({ req }: { req: Request }) => { `Testing queue-based health check for endpoint: ${endpoint}, timeout: ${timeout}ms` ); - const world = getWorld(); + const world = await getWorld(); const result = await healthCheck(world, endpoint, { timeout }); console.log(`Health check result:`, result); diff --git a/workbench/nuxt/server/api/test-health-check.post.ts b/workbench/nuxt/server/api/test-health-check.post.ts index 4e48e30b7f..32cc532b9e 100644 --- a/workbench/nuxt/server/api/test-health-check.post.ts +++ b/workbench/nuxt/server/api/test-health-check.post.ts @@ -13,7 +13,7 @@ export default defineEventHandler(async (event) => { `Testing queue-based health check for endpoint: ${endpoint}, timeout: ${timeout}ms` ); - const world = getWorld(); + const world = await getWorld(); const result = await healthCheck(world, endpoint, { timeout }); console.log(`Health check result:`, result); diff 
--git a/workbench/nuxt/server/plugins/start-pg-world.ts b/workbench/nuxt/server/plugins/start-pg-world.ts index 2824d2b3ec..613d8e3110 100644 --- a/workbench/nuxt/server/plugins/start-pg-world.ts +++ b/workbench/nuxt/server/plugins/start-pg-world.ts @@ -5,8 +5,11 @@ import { defineNitroPlugin } from '#imports'; export default defineNitroPlugin(async () => { if (process.env.WORKFLOW_TARGET_WORLD === '@workflow/world-postgres') { import('workflow/runtime').then(async ({ getWorld }) => { - console.log('Starting Postgres World...'); - await getWorld().start?.(); + const world = await getWorld(); + if (world.start) { + console.log('Starting World workers...'); + await world.start(); + } }); } }); diff --git a/workbench/sveltekit/src/hooks.server.ts b/workbench/sveltekit/src/hooks.server.ts index 16d598cf0b..619b303f8b 100644 --- a/workbench/sveltekit/src/hooks.server.ts +++ b/workbench/sveltekit/src/hooks.server.ts @@ -5,7 +5,10 @@ export const init: ServerInit = async () => { // Needed since we test this in CI if (process.env.WORKFLOW_TARGET_WORLD === '@workflow/world-postgres') { const { getWorld } = await import('workflow/runtime'); - console.log('Starting Postgres World...'); - await getWorld().start?.(); + const world = await getWorld(); + if (world.start) { + console.log('Starting World workers...'); + await world.start(); + } } }; diff --git a/workbench/sveltekit/src/routes/api/test-health-check/+server.ts b/workbench/sveltekit/src/routes/api/test-health-check/+server.ts index c68147a822..07d8ba3ac7 100644 --- a/workbench/sveltekit/src/routes/api/test-health-check/+server.ts +++ b/workbench/sveltekit/src/routes/api/test-health-check/+server.ts @@ -12,7 +12,7 @@ export const POST: RequestHandler = async ({ request }) => { `Testing queue-based health check for endpoint: ${endpoint}, timeout: ${timeout}ms` ); - const world = getWorld(); + const world = await getWorld(); const result = await healthCheck(world, endpoint, { timeout }); console.log(`Health check 
result:`, result); diff --git a/workbench/vite/routes/api/test-health-check.post.ts b/workbench/vite/routes/api/test-health-check.post.ts index 86222b7a32..84a62e597c 100644 --- a/workbench/vite/routes/api/test-health-check.post.ts +++ b/workbench/vite/routes/api/test-health-check.post.ts @@ -11,7 +11,7 @@ export default async ({ req }: { req: Request }) => { `Testing queue-based health check for endpoint: ${endpoint}, timeout: ${timeout}ms` ); - const world = getWorld(); + const world = await getWorld(); const result = await healthCheck(world, endpoint, { timeout }); console.log(`Health check result:`, result);