diff --git a/.changeset/throttle-retry-handling.md b/.changeset/throttle-retry-handling.md new file mode 100644 index 0000000000..2cfcfbf097 --- /dev/null +++ b/.changeset/throttle-retry-handling.md @@ -0,0 +1,8 @@ +--- +"@workflow/errors": patch +"@workflow/world": patch +"@workflow/world-vercel": patch +"@workflow/core": patch +--- + +Add 429 throttle retry handling and 500 server error retry with exponential backoff to the workflow and step runtimes diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts index 99e63d0951..994727ba84 100644 --- a/packages/core/src/runtime.ts +++ b/packages/core/src/runtime.ts @@ -1,4 +1,4 @@ -import { WorkflowRuntimeError } from '@workflow/errors'; +import { WorkflowAPIError, WorkflowRuntimeError } from '@workflow/errors'; import { parseWorkflowName } from '@workflow/utils/parse-name'; import { type Event, @@ -11,9 +11,12 @@ import { runtimeLogger } from './logger.js'; import { getAllWorkflowRunEvents, getQueueOverhead, + getWorkflowQueueName, handleHealthCheckMessage, parseHealthCheckPayload, + queueMessage, withHealthCheck, + withThrottleRetry, } from './runtime/helpers.js'; import { handleSuspension } from './runtime/suspension-handler.js'; import { getWorld, getWorldHandlers } from './runtime/world.js'; @@ -21,6 +24,7 @@ import { remapErrorStack } from './source-map.js'; import * as Attribute from './telemetry/semantic-conventions.js'; import { linkToCurrentContext, + serializeTraceCarrier, trace, withTraceContext, withWorkflowBaggage, @@ -50,11 +54,11 @@ export { export { cancelRun, listStreams, + type ReadStreamOptions, + type RecreateRunOptions, readStream, recreateRunFromExisting, reenqueueRun, - type ReadStreamOptions, - type RecreateRunOptions, type StopSleepOptions, type StopSleepResult, wakeUpRun, @@ -96,6 +100,7 @@ export function workflowEntrypoint( runId, traceCarrier: traceContext, requestedAt, + serverErrorRetryCount, } = WorkflowInvokePayloadSchema.parse(message_); // Extract the workflow name from the topic name const workflowName = metadata.queueName.slice('__wkf_workflow_'.length); @@ -130,200 +135,239 @@ export function workflowEntrypoint( ...Attribute.WorkflowTracePropagated(!!traceContext), }); - let workflowStartedAt = -1; - try { - let workflowRun = await world.runs.get(runId); + return await withThrottleRetry(async () => { + let workflowStartedAt = -1; + try { + let workflowRun = await world.runs.get(runId); - if (workflowRun.status === 'pending') { - // Transition run to 'running' via event (event-sourced architecture) - const result = await world.events.create(runId, { - eventType: 'run_started', - specVersion: SPEC_VERSION_CURRENT, - }); - // Use the run entity from the event response (no extra get call needed) - if (!result.run) { + if (workflowRun.status === 'pending') { + // Transition run to 'running' via event (event-sourced architecture) + const result = await world.events.create(runId, { + eventType: 'run_started', + specVersion: SPEC_VERSION_CURRENT, + }); + // Use the run entity from the event response (no extra get call needed) + if (!result.run) { + throw new WorkflowRuntimeError( + `Event creation for 'run_started' did not return the run entity for run "${runId}"` + ); + } + workflowRun = result.run; + } + + // At this point, the workflow is "running" and `startedAt` should + // definitely be set. 
+ if (!workflowRun.startedAt) { throw new WorkflowRuntimeError( - `Event creation for 'run_started' did not return the run entity for run "${runId}"` + `Workflow run "${runId}" has no "startedAt" timestamp` ); } - workflowRun = result.run; - } + workflowStartedAt = +workflowRun.startedAt; - // At this point, the workflow is "running" and `startedAt` should - // definitely be set. - if (!workflowRun.startedAt) { - throw new WorkflowRuntimeError( - `Workflow run "${runId}" has no "startedAt" timestamp` - ); - } - workflowStartedAt = +workflowRun.startedAt; + span?.setAttributes({ + ...Attribute.WorkflowRunStatus(workflowRun.status), + ...Attribute.WorkflowStartedAt(workflowStartedAt), + }); + + if (workflowRun.status !== 'running') { + // Workflow has already completed or failed, so we can skip it + runtimeLogger.info( + 'Workflow already completed or failed, skipping', + { + workflowRunId: runId, + status: workflowRun.status, + } + ); - span?.setAttributes({ - ...Attribute.WorkflowRunStatus(workflowRun.status), - ...Attribute.WorkflowStartedAt(workflowStartedAt), - }); + // TODO: for `cancel`, we actually want to propagate a WorkflowCancelled event + // inside the workflow context so the user can gracefully exit. this is SIGTERM + // TODO: furthermore, there should be a timeout or a way to force cancel SIGKILL + // so that we actually exit here without replaying the workflow at all, in the case + // the replaying the workflow is itself failing. - if (workflowRun.status !== 'running') { - // Workflow has already completed or failed, so we can skip it - runtimeLogger.info( - 'Workflow already completed or failed, skipping', - { - workflowRunId: runId, - status: workflowRun.status, - } + return; + } + + // Load all events into memory before running + const events = await getAllWorkflowRunEvents( + workflowRun.runId ); - // TODO: for `cancel`, we actually want to propagate a WorkflowCancelled event - // inside the workflow context so the user can gracefully exit. this is SIGTERM - // TODO: furthermore, there should be a timeout or a way to force cancel SIGKILL - // so that we actually exit here without replaying the workflow at all, in the case - // the replaying the workflow is itself failing. 
+ // Check for any elapsed waits and create wait_completed events + const now = Date.now(); - return; - } + // Pre-compute completed correlation IDs for O(n) lookup instead of O(n²) + const completedWaitIds = new Set( + events + .filter((e) => e.eventType === 'wait_completed') + .map((e) => e.correlationId) + ); - // Load all events into memory before running - const events = await getAllWorkflowRunEvents( - workflowRun.runId - ); + // Collect all waits that need completion + const waitsToComplete = events + .filter( + (e): e is typeof e & { correlationId: string } => + e.eventType === 'wait_created' && + e.correlationId !== undefined && + !completedWaitIds.has(e.correlationId) && + now >= (e.eventData.resumeAt as Date).getTime() + ) + .map((e) => ({ + eventType: 'wait_completed' as const, + specVersion: SPEC_VERSION_CURRENT, + correlationId: e.correlationId, + })); - // Check for any elapsed waits and create wait_completed events - const now = Date.now(); + // Create all wait_completed events + for (const waitEvent of waitsToComplete) { + const result = await world.events.create( + runId, + waitEvent + ); + // Add the event to the events array so the workflow can see it + events.push(result.event!); + } - // Pre-compute completed correlation IDs for O(n) lookup instead of O(n²) - const completedWaitIds = new Set( - events - .filter((e) => e.eventType === 'wait_completed') - .map((e) => e.correlationId) - ); + const result = await trace( + 'workflow.replay', + {}, + async (replaySpan) => { + replaySpan?.setAttributes({ + ...Attribute.WorkflowEventsCount(events.length), + }); + return await runWorkflow( + workflowCode, + workflowRun, + events + ); + } + ); - // Collect all waits that need completion - const waitsToComplete = events - .filter( - (e): e is typeof e & { correlationId: string } => - e.eventType === 'wait_created' && - e.correlationId !== undefined && - !completedWaitIds.has(e.correlationId) && - now >= (e.eventData.resumeAt as Date).getTime() - ) - .map((e) => ({ - eventType: 'wait_completed' as const, + // Complete the workflow run via event (event-sourced architecture) + await world.events.create(runId, { + eventType: 'run_completed', specVersion: SPEC_VERSION_CURRENT, - correlationId: e.correlationId, - })); - - // Create all wait_completed events - for (const waitEvent of waitsToComplete) { - const result = await world.events.create(runId, waitEvent); - // Add the event to the events array so the workflow can see it - events.push(result.event!); - } + eventData: { + output: result, + }, + }); - const result = await trace( - 'workflow.replay', - {}, - async (replaySpan) => { - replaySpan?.setAttributes({ - ...Attribute.WorkflowEventsCount(events.length), - }); - return await runWorkflow( - workflowCode, - workflowRun, - events + span?.setAttributes({ + ...Attribute.WorkflowRunStatus('completed'), + ...Attribute.WorkflowEventsCount(events.length), + }); + } catch (err) { + if (WorkflowSuspension.is(err)) { + const suspensionMessage = buildWorkflowSuspensionMessage( + runId, + err.stepCount, + err.hookCount, + err.waitCount ); - } - ); + if (suspensionMessage) { + runtimeLogger.debug(suspensionMessage); + } - // Complete the workflow run via event (event-sourced architecture) - await world.events.create(runId, { - eventType: 'run_completed', - specVersion: SPEC_VERSION_CURRENT, - eventData: { - output: result, - }, - }); + const result = await handleSuspension({ + suspension: err, + world, + runId, + workflowName, + workflowStartedAt, + span, + }); - span?.setAttributes({ - 
...Attribute.WorkflowRunStatus('completed'), - ...Attribute.WorkflowEventsCount(events.length), - }); - } catch (err) { - if (WorkflowSuspension.is(err)) { - const suspensionMessage = buildWorkflowSuspensionMessage( - runId, - err.stepCount, - err.hookCount, - err.waitCount - ); - if (suspensionMessage) { - runtimeLogger.debug(suspensionMessage); - } + if (result.timeoutSeconds !== undefined) { + return { timeoutSeconds: result.timeoutSeconds }; + } + } else { + // Retry server errors (5xx) with exponential backoff before failing the run + if ( + WorkflowAPIError.is(err) && + err.status !== undefined && + err.status >= 500 + ) { + const retryCount = serverErrorRetryCount ?? 0; + const delaySecondSteps = [5, 30, 120]; // 5s, 30s, 120s + if (retryCount < delaySecondSteps.length) { + runtimeLogger.warn( + 'Server error (5xx), re-enqueueing workflow with backoff', + { + workflowRunId: runId, + retryCount, + delaySeconds: delaySecondSteps[retryCount], + error: err.message, + } + ); + await queueMessage( + world, + getWorkflowQueueName(workflowName), + { + runId, + serverErrorRetryCount: retryCount + 1, + traceCarrier: await serializeTraceCarrier(), + requestedAt: new Date(), + }, + { delaySeconds: delaySecondSteps[retryCount] } + ); + return; // Don't fail the run, retry later + } + // Fall through to run_failed after exhausting retries + } - const result = await handleSuspension({ - suspension: err, - world, - runId, - workflowName, - workflowStartedAt, - span, - }); + // NOTE: this error could be an error thrown in user code, or could also be a WorkflowRuntimeError + // (for instance when the event log is corrupted, this is thrown by the event consumer). We could + // specially handle these if needed. - if (result.timeoutSeconds !== undefined) { - return { timeoutSeconds: result.timeoutSeconds }; - } - } else { - // NOTE: this error could be an error thrown in user code, or could also be a WorkflowRuntimeError - // (for instance when the event log is corrupted, this is thrown by the event consumer). We could - // specially handle these if needed. + // Record exception for OTEL error tracking + if (err instanceof Error) { + span?.recordException?.(err); + } - // Record exception for OTEL error tracking - if (err instanceof Error) { - span?.recordException?.(err); - } + const errorName = getErrorName(err); + const errorMessage = + err instanceof Error ? err.message : String(err); + let errorStack = getErrorStack(err); - const errorName = getErrorName(err); - const errorMessage = - err instanceof Error ? 
err.message : String(err); - let errorStack = getErrorStack(err); + // Remap error stack using source maps to show original source locations + if (errorStack) { + const parsedName = parseWorkflowName(workflowName); + const filename = + parsedName?.moduleSpecifier || workflowName; + errorStack = remapErrorStack( + errorStack, + filename, + workflowCode + ); + } - // Remap error stack using source maps to show original source locations - if (errorStack) { - const parsedName = parseWorkflowName(workflowName); - const filename = - parsedName?.moduleSpecifier || workflowName; - errorStack = remapErrorStack( + runtimeLogger.error('Error while running workflow', { + workflowRunId: runId, + errorName, errorStack, - filename, - workflowCode - ); - } - - runtimeLogger.error('Error while running workflow', { - workflowRunId: runId, - errorName, - errorStack, - }); - // Fail the workflow run via event (event-sourced architecture) - await world.events.create(runId, { - eventType: 'run_failed', - specVersion: SPEC_VERSION_CURRENT, - eventData: { - error: { - message: errorMessage, - stack: errorStack, + }); + // Fail the workflow run via event (event-sourced architecture) + await world.events.create(runId, { + eventType: 'run_failed', + specVersion: SPEC_VERSION_CURRENT, + eventData: { + error: { + message: errorMessage, + stack: errorStack, + }, + // TODO: include error codes when we define them }, - // TODO: include error codes when we define them - }, - }); + }); - span?.setAttributes({ - ...Attribute.WorkflowRunStatus('failed'), - ...Attribute.WorkflowErrorName(errorName), - ...Attribute.WorkflowErrorMessage(String(err)), - ...Attribute.ErrorType(errorName), - }); + span?.setAttributes({ + ...Attribute.WorkflowRunStatus('failed'), + ...Attribute.WorkflowErrorName(errorName), + ...Attribute.WorkflowErrorMessage(String(err)), + ...Attribute.ErrorType(errorName), + }); + } } - } + }); // End withThrottleRetry } ); // End trace } diff --git a/packages/core/src/runtime/helpers.ts b/packages/core/src/runtime/helpers.ts index 9681f68189..c964917552 100644 --- a/packages/core/src/runtime/helpers.ts +++ b/packages/core/src/runtime/helpers.ts @@ -1,3 +1,4 @@ +import { WorkflowAPIError } from '@workflow/errors'; import type { Event, HealthCheckPayload, @@ -6,6 +7,7 @@ import type { } from '@workflow/world'; import { HealthCheckPayloadSchema } from '@workflow/world'; import { monotonicFactory } from 'ulid'; +import { runtimeLogger } from '../logger.js'; import * as Attribute from '../telemetry/semantic-conventions.js'; import { getSpanKind, trace } from '../telemetry.js'; import { getWorld } from './world.js'; @@ -17,7 +19,7 @@ const DEFAULT_HEALTH_CHECK_TIMEOUT = 30_000; * Pattern for safe workflow names. Only allows alphanumeric characters, * underscores, hyphens, dots, and forward slashes (for namespaced workflows). */ -const SAFE_WORKFLOW_NAME_PATTERN = /^[a-zA-Z0-9_\-.\/]+$/; +const SAFE_WORKFLOW_NAME_PATTERN = /^[a-zA-Z0-9_\-./]+$/; /** * Validates a workflow name and returns the corresponding queue name. @@ -398,3 +400,70 @@ export function getQueueOverhead(message: { requestedAt?: Date }) { return; } } + +/** + * Wraps a queue handler with HTTP 429 throttle retry logic. + * - retryAfter < 10s: waits in-process via setTimeout, then retries once + * - retryAfter >= 10s: returns { timeoutSeconds } to defer to the queue + * + * Safe to retry the entire handler because 429 is sent from server middleware + * before the request is processed — no server state has changed. 
+ */ +// biome-ignore lint/suspicious/noConfusingVoidType: matches Queue handler return type +export async function withThrottleRetry( + fn: () => Promise +): Promise { + try { + return await fn(); + } catch (err) { + if (WorkflowAPIError.is(err) && err.status === 429) { + const retryAfterSeconds = Math.max( + // If we don't have a retry-after value, 30s seems a reasonable default + // to avoid re-trying during the unknown rate-limiting period. + 1, + typeof err.retryAfter === 'number' ? err.retryAfter : 30 + ); + + if (retryAfterSeconds < 10) { + runtimeLogger.warn( + 'Throttled by workflow-server (429), retrying in-process', + { + retryAfterSeconds, + url: err.url, + } + ); + // Short wait: sleep in-process, then retry once + await new Promise((resolve) => + setTimeout(resolve, retryAfterSeconds * 1000) + ); + try { + return await fn(); + } catch (retryErr) { + // If the retry also gets throttled, defer to queue + if (WorkflowAPIError.is(retryErr) && retryErr.status === 429) { + const retryRetryAfter = Math.max( + 1, + typeof retryErr.retryAfter === 'number' ? retryErr.retryAfter : 1 + ); + runtimeLogger.warn('Throttled again on retry, deferring to queue', { + retryAfterSeconds: retryRetryAfter, + }); + return { timeoutSeconds: retryRetryAfter }; + } + throw retryErr; + } + } + + // Long wait: defer to queue infrastructure + runtimeLogger.warn( + 'Throttled by workflow-server (429), deferring to queue', + { + retryAfterSeconds, + url: err.url, + } + ); + return { timeoutSeconds: retryAfterSeconds }; + } + throw err; + } +} diff --git a/packages/core/src/runtime/step-handler.ts b/packages/core/src/runtime/step-handler.ts index 65daaa33c1..aacce7a6a7 100644 --- a/packages/core/src/runtime/step-handler.ts +++ b/packages/core/src/runtime/step-handler.ts @@ -126,6 +126,19 @@ const stepHandler = getWorldHandlers().createQueueHandler( step = startResult.step; } catch (err) { if (WorkflowAPIError.is(err)) { + if (WorkflowAPIError.is(err) && err.status === 429) { + const retryRetryAfter = Math.max( + 1, + typeof err.retryAfter === 'number' ? 
err.retryAfter : 1 + ); + runtimeLogger.warn( + 'Throttled again on retry, deferring to queue', + { + retryAfterSeconds: retryRetryAfter, + } + ); + return { timeoutSeconds: retryRetryAfter }; + } // 410 Gone: Workflow has already completed if (err.status === 410) { console.warn( diff --git a/packages/errors/src/index.ts b/packages/errors/src/index.ts index 59017b9be1..c7dcedf4a3 100644 --- a/packages/errors/src/index.ts +++ b/packages/errors/src/index.ts @@ -101,10 +101,18 @@ export class WorkflowAPIError extends WorkflowError { status?: number; code?: string; url?: string; + /** Retry-After value in seconds, present on 429 responses */ + retryAfter?: number; constructor( message: string, - options?: { status?: number; url?: string; code?: string; cause?: unknown } + options?: { + status?: number; + url?: string; + code?: string; + retryAfter?: number; + cause?: unknown; + } ) { super(message, { cause: options?.cause, @@ -113,6 +121,7 @@ export class WorkflowAPIError extends WorkflowError { this.status = options?.status; this.code = options?.code; this.url = options?.url; + this.retryAfter = options?.retryAfter; } static is(value: unknown): value is WorkflowAPIError { diff --git a/packages/world-vercel/src/queue.ts b/packages/world-vercel/src/queue.ts index 4d7b26efe7..bfed59e674 100644 --- a/packages/world-vercel/src/queue.ts +++ b/packages/world-vercel/src/queue.ts @@ -51,7 +51,7 @@ const MAX_DELAY_SECONDS = Number( type QueueFunction = ( queueName: ValidQueueName, payload: QueuePayload, - opts?: QueueOptions & { delaySeconds?: number } + opts?: QueueOptions ) => ReturnType; export function createQueue(config?: APIConfig): Queue { @@ -71,7 +71,7 @@ export function createQueue(config?: APIConfig): Queue { const queue: QueueFunction = async ( queueName, payload, - opts?: QueueOptions & { delaySeconds?: number } + opts?: QueueOptions ) => { // Check if we have a deployment ID either from options or environment const deploymentId = opts?.deploymentId ?? 
process.env.VERCEL_DEPLOYMENT_ID; diff --git a/packages/world-vercel/src/utils.ts b/packages/world-vercel/src/utils.ts index 6a91ddd34d..d1ad41b47a 100644 --- a/packages/world-vercel/src/utils.ts +++ b/packages/world-vercel/src/utils.ts @@ -6,18 +6,18 @@ import { type StructuredError, StructuredErrorSchema } from '@workflow/world'; import { decode, encode } from 'cbor-x'; import type { z } from 'zod'; import { - trace, + ErrorType, getSpanKind, HttpRequestMethod, HttpResponseStatusCode, - UrlFull, + PeerService, + RpcService, + RpcSystem, ServerAddress, ServerPort, - ErrorType, + trace, + UrlFull, WorldParseFormat, - PeerService, - RpcSystem, - RpcService, } from './telemetry.js'; import { version } from './version.js'; @@ -292,10 +292,23 @@ export async function makeRequest({ `Failed to fetch, reproduce with:\ncurl -X ${request.method} ${stringifiedHeaders} "${url}"` ); } + + // Parse Retry-After header for 429 responses (value is in seconds) + let retryAfter: number | undefined; + if (response.status === 429) { + const retryAfterHeader = response.headers.get('Retry-After'); + if (retryAfterHeader) { + const parsed = parseInt(retryAfterHeader, 10); + if (!Number.isNaN(parsed)) { + retryAfter = parsed; + } + } + } + const error = new WorkflowAPIError( errorData.message || `${request.method} ${endpoint} -> HTTP ${response.status}: ${response.statusText}`, - { url, status: response.status, code: errorData.code } + { url, status: response.status, code: errorData.code, retryAfter } ); // Record error attributes per OTEL conventions span?.setAttributes({ diff --git a/packages/world/src/queue.ts b/packages/world/src/queue.ts index 743684f121..5fcd6108a1 100644 --- a/packages/world/src/queue.ts +++ b/packages/world/src/queue.ts @@ -25,6 +25,8 @@ export const WorkflowInvokePayloadSchema = z.object({ runId: z.string(), traceCarrier: TraceCarrierSchema.optional(), requestedAt: z.coerce.date().optional(), + /** Number of times this message has been re-enqueued due to server errors (5xx) */ + serverErrorRetryCount: z.number().int().optional(), }); export const StepInvokePayloadSchema = z.object({ @@ -60,6 +62,8 @@ export interface QueueOptions { deploymentId?: string; idempotencyKey?: string; headers?: Record; + /** Delay message delivery by this many seconds */ + delaySeconds?: number; } export interface Queue {
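
Reviewer note: below is a minimal, standalone sketch of the 429 decision flow this diff introduces, for readers who want to see the two branches side by side without reading the full handler. It does not import the real `withThrottleRetry` or `WorkflowAPIError` (the helper lives in the package-internal `runtime/helpers.ts`); the `ThrottleError` class, `handleWithThrottle` function, and `IN_PROCESS_RETRY_THRESHOLD_SECONDS` constant are illustrative stand-ins, not part of the public API. The thresholds mirror the diff: a Retry-After under 10 seconds is retried once in-process, anything longer is handed back to the queue as `{ timeoutSeconds }`.

```ts
// Illustrative stand-in for WorkflowAPIError (the real class lives in @workflow/errors).
class ThrottleError extends Error {
  constructor(
    message: string,
    public status?: number,
    public retryAfter?: number
  ) {
    super(message);
  }
}

// Mirrors the "< 10s retries in-process, >= 10s defers to the queue" split above.
const IN_PROCESS_RETRY_THRESHOLD_SECONDS = 10;

// Sketch of the decision flow: retry once in-process for short Retry-After
// values, otherwise return { timeoutSeconds } so the queue redelivers later.
async function handleWithThrottle<T>(
  fn: () => Promise<T>
): Promise<T | { timeoutSeconds: number }> {
  try {
    return await fn();
  } catch (err) {
    if (!(err instanceof ThrottleError) || err.status !== 429) throw err;
    // No Retry-After header: fall back to a conservative 30s, as the diff does.
    const retryAfterSeconds = Math.max(1, err.retryAfter ?? 30);
    if (retryAfterSeconds >= IN_PROCESS_RETRY_THRESHOLD_SECONDS) {
      // Long wait: defer to the queue infrastructure.
      return { timeoutSeconds: retryAfterSeconds };
    }
    // Short wait: sleep in-process, then retry the handler once.
    await new Promise((resolve) =>
      setTimeout(resolve, retryAfterSeconds * 1000)
    );
    return await fn();
  }
}

// Usage: a handler throttled once with a 3s Retry-After succeeds on the retry.
async function demo() {
  let calls = 0;
  const result = await handleWithThrottle(async () => {
    calls += 1;
    if (calls === 1) throw new ThrottleError('throttled', 429, 3);
    return 'ok';
  });
  console.log(result); // 'ok' after roughly 3 seconds
}

demo();
```

Retrying the whole handler this way is safe for the same reason stated in the `withThrottleRetry` docstring: a 429 is rejected by server middleware before any state changes, so replaying the handler cannot duplicate work.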