[core] Handle 429 and 500 errors from worlds in runtime #966
Changes from all commits: 8764b75, fde5d99, 87add00, c60aa2a, 9f9c30a, 2172560, bae9c7d, fa848ed, 64fc6b1, d228e80, 6e8adaf, 9f75d33
```diff
@@ -0,0 +1,8 @@
+---
+"@workflow/errors": patch
+"@workflow/world": patch
+"@workflow/world-vercel": patch
+"@workflow/core": patch
+---
+
+Add 429 throttle retry handling and 500 server error retry with exponential backoff to the workflow and step runtimes
```
Large diffs are not rendered by default.
```diff
@@ -1,3 +1,4 @@
+import { WorkflowAPIError } from '@workflow/errors';
 import type {
   Event,
   HealthCheckPayload,
@@ -6,6 +7,7 @@ import type {
 } from '@workflow/world';
 import { HealthCheckPayloadSchema } from '@workflow/world';
 import { monotonicFactory } from 'ulid';
+import { runtimeLogger } from '../logger.js';
 import * as Attribute from '../telemetry/semantic-conventions.js';
 import { getSpanKind, trace } from '../telemetry.js';
 import { getWorld } from './world.js';
@@ -17,7 +19,7 @@ const DEFAULT_HEALTH_CHECK_TIMEOUT = 30_000;
  * Pattern for safe workflow names. Only allows alphanumeric characters,
  * underscores, hyphens, dots, and forward slashes (for namespaced workflows).
  */
-const SAFE_WORKFLOW_NAME_PATTERN = /^[a-zA-Z0-9_\-.\/]+$/;
+const SAFE_WORKFLOW_NAME_PATTERN = /^[a-zA-Z0-9_\-./]+$/;

 /**
  * Validates a workflow name and returns the corresponding queue name.
@@ -398,3 +400,70 @@ export function getQueueOverhead(message: { requestedAt?: Date }) {
     return;
   }
 }
+
+/**
+ * Wraps a queue handler with HTTP 429 throttle retry logic.
+ * - retryAfter < 10s: waits in-process via setTimeout, then retries once
+ * - retryAfter >= 10s: returns { timeoutSeconds } to defer to the queue
+ *
+ * Safe to retry the entire handler because 429 is sent from server middleware
+ * before the request is processed — no server state has changed.
+ */
```
Collaborator: The comment says "Safe to retry the entire handler because 429 is sent from server middleware before the request is processed — no server state has changed." This holds if the 429 always hits on the first API call in the handler, but the handler makes multiple world API calls sequentially. For the workflow handler this is probably fine, since replay is deterministic and events are idempotent. For the step handler it is more concerning: the retry would re-execute user step code, which may not be idempotent. Is the assumption that the workflow-server's rate-limiting middleware rejects at the connection level (so all calls in one handler invocation either succeed or all fail)? If so, it's worth documenting that assumption.

Member (Author): Good catch, fixed.
```diff
+// biome-ignore lint/suspicious/noConfusingVoidType: matches Queue handler return type
+export async function withThrottleRetry(
+  fn: () => Promise<void | { timeoutSeconds: number }>
+): Promise<void | { timeoutSeconds: number }> {
+  try {
+    return await fn();
+  } catch (err) {
+    if (WorkflowAPIError.is(err) && err.status === 429) {
+      const retryAfterSeconds = Math.max(
+        // If we don't have a retry-after value, 30s seems a reasonable default
+        // to avoid re-trying during the unknown rate-limiting period.
+        1,
+        typeof err.retryAfter === 'number' ? err.retryAfter : 30
+      );
+
+      if (retryAfterSeconds < 10) {
+        runtimeLogger.warn(
+          'Throttled by workflow-server (429), retrying in-process',
+          {
```
Collaborator: Nit: if …
```diff
+            retryAfterSeconds,
+            url: err.url,
+          }
+        );
+        // Short wait: sleep in-process, then retry once
+        await new Promise((resolve) =>
+          setTimeout(resolve, retryAfterSeconds * 1000)
```
Collaborator: Should we account for function execution time limits, specifically in the case of the Vercel world? If the serverless function is already close to the end of its execution limit and the workflow server throws a 429, adding a 10-second sleep could exceed the function execution limit and the function could get SIGKILLed midway.

Member (Author): The workflow layer should never take much more than a few seconds, so I think it's highly unlikely we'd run into timeouts. I'm not too worried about this, but it is technically a concern.
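One way the runtime could guard against that, sketched under the assumption that the caller knows its remaining execution budget (the `deadlineMs` parameter below is hypothetical and not part of this PR), is to skip the in-process sleep whenever the wait would not fit:

```ts
// Hypothetical sketch: only sleep in-process if the 429 wait fits within the
// remaining execution budget; otherwise signal the caller to defer to the queue.
// `deadlineMs` (an absolute epoch-ms deadline) is an assumed parameter, not from this PR.
async function sleepWithinBudget(
  retryAfterSeconds: number,
  deadlineMs: number
): Promise<boolean> {
  const waitMs = retryAfterSeconds * 1000;
  const remainingMs = deadlineMs - Date.now();
  // Leave headroom so the retried call itself can still complete before the limit.
  if (waitMs > remainingMs - 5_000) {
    return false; // not enough budget: defer via { timeoutSeconds } instead
  }
  await new Promise((resolve) => setTimeout(resolve, waitMs));
  return true;
}
```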
```diff
+        );
+        try {
+          return await fn();
+        } catch (retryErr) {
+          // If the retry also gets throttled, defer to queue
+          if (WorkflowAPIError.is(retryErr) && retryErr.status === 429) {
+            const retryRetryAfter = Math.max(
+              1,
+              typeof retryErr.retryAfter === 'number' ? retryErr.retryAfter : 1
+            );
+            runtimeLogger.warn('Throttled again on retry, deferring to queue', {
+              retryAfterSeconds: retryRetryAfter,
+            });
+            return { timeoutSeconds: retryRetryAfter };
+          }
+          throw retryErr;
+        }
+      }
+
+      // Long wait: defer to queue infrastructure
+      runtimeLogger.warn(
+        'Throttled by workflow-server (429), deferring to queue',
+        {
+          retryAfterSeconds,
+          url: err.url,
+        }
+      );
+      return { timeoutSeconds: retryAfterSeconds };
+    }
+    throw err;
+  }
+}
```
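For context, a rough illustration of how a queue handler body might be wrapped with `withThrottleRetry`; the import path and `processWorkflowMessage` helper are assumptions for illustration, not code from this PR:

```ts
// Illustrative only: a 429 thrown by any world call inside the wrapped body is
// either retried in-process (short Retry-After) or deferred via { timeoutSeconds }.
import { withThrottleRetry } from './queue.js'; // path assumed

async function processWorkflowMessage(payload: { runId: string }): Promise<void> {
  // ... call world APIs, replay the workflow, enqueue steps, etc.
}

export const handleWorkflowMessage = (payload: { runId: string }) =>
  withThrottleRetry(async () => {
    await processWorkflowMessage(payload);
    // Returning nothing acknowledges the message as usual.
  });
```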
```diff
@@ -126,6 +126,19 @@ const stepHandler = getWorldHandlers().createQueueHandler(
       step = startResult.step;
     } catch (err) {
       if (WorkflowAPIError.is(err)) {
+        if (WorkflowAPIError.is(err) && err.status === 429) {
```
Collaborator: The step handler has 429 handling here inside the catch block, …
```diff
+          const retryRetryAfter = Math.max(
+            1,
+            typeof err.retryAfter === 'number' ? err.retryAfter : 1
+          );
+          runtimeLogger.warn(
+            'Throttled again on retry, deferring to queue',
+            {
```
Collaborator: Redundant check: this is already inside the `if (WorkflowAPIError.is(err))` branch, so the extra `WorkflowAPIError.is(err)` call is unnecessary.
```diff
+              retryAfterSeconds: retryRetryAfter,
+            }
+          );
+          return { timeoutSeconds: retryRetryAfter };
+        }
         // 410 Gone: Workflow has already completed
         if (err.status === 410) {
           console.warn(
```
```diff
@@ -6,18 +6,18 @@ import { type StructuredError, StructuredErrorSchema } from '@workflow/world';
 import { decode, encode } from 'cbor-x';
 import type { z } from 'zod';
 import {
-  trace,
+  ErrorType,
   getSpanKind,
   HttpRequestMethod,
   HttpResponseStatusCode,
-  UrlFull,
+  PeerService,
+  RpcService,
+  RpcSystem,
   ServerAddress,
   ServerPort,
-  ErrorType,
+  trace,
+  UrlFull,
   WorldParseFormat,
-  PeerService,
-  RpcSystem,
-  RpcService,
```
Collaborator: Note: the HTTP …
```diff
 } from './telemetry.js';
 import { version } from './version.js';
@@ -292,10 +292,23 @@ export async function makeRequest<T>({
       `Failed to fetch, reproduce with:\ncurl -X ${request.method} ${stringifiedHeaders} "${url}"`
     );
   }
+
+  // Parse Retry-After header for 429 responses (value is in seconds)
+  let retryAfter: number | undefined;
+  if (response.status === 429) {
+    const retryAfterHeader = response.headers.get('Retry-After');
+    if (retryAfterHeader) {
+      const parsed = parseInt(retryAfterHeader, 10);
+      if (!Number.isNaN(parsed)) {
+        retryAfter = parsed;
+      }
+    }
+  }
+
   const error = new WorkflowAPIError(
     errorData.message ||
       `${request.method} ${endpoint} -> HTTP ${response.status}: ${response.statusText}`,
-    { url, status: response.status, code: errorData.code }
+    { url, status: response.status, code: errorData.code, retryAfter }
   );
   // Record error attributes per OTEL conventions
   span?.setAttributes({
```
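One nuance: per RFC 9110, Retry-After may carry either delay-seconds or an HTTP-date, and the parsing above only handles the seconds form. A hedged sketch of accepting both (not part of this PR):

```ts
// Sketch only: normalize Retry-After (delay-seconds or HTTP-date) to seconds.
function parseRetryAfter(header: string): number | undefined {
  const seconds = Number.parseInt(header, 10);
  if (!Number.isNaN(seconds)) {
    return seconds;
  }
  const dateMs = Date.parse(header); // e.g. "Wed, 21 Oct 2026 07:28:00 GMT"
  if (!Number.isNaN(dateMs)) {
    return Math.max(0, Math.ceil((dateMs - Date.now()) / 1000));
  }
  return undefined;
}
```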
```diff
@@ -25,6 +25,8 @@ export const WorkflowInvokePayloadSchema = z.object({
   runId: z.string(),
   traceCarrier: TraceCarrierSchema.optional(),
   requestedAt: z.coerce.date().optional(),
+  /** Number of times this message has been re-enqueued due to server errors (5xx) */
+  serverErrorRetryCount: z.number().int().optional(),
 });

 export const StepInvokePayloadSchema = z.object({
@@ -60,6 +62,8 @@ export interface QueueOptions {
   deploymentId?: string;
   idempotencyKey?: string;
   headers?: Record<string, string>;
+  /** Delay message delivery by this many seconds */
+  delaySeconds?: number;
```
Collaborator: Good that this was promoted from the Vercel-specific type to the shared interface. Note that the local world's queue doesn't implement delaySeconds.

Member (Author): Seems fine to ignore for the local world, since there are no 429s.

Collaborator: We probably do want to support delaySeconds in the local world anyway, though, as we start using this queue option for more use cases in the future. Even if it's not implemented in this PR, I think we should leave an explicit comment that the local world ignores it, since it's a nuance. We should later have an e2e test that checks this behaviour and would fail on the local world without proper queue support.
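If the local world were to honor it, a minimal in-memory sketch could look like the following; the `enqueueLocal`/`dispatch` names and message shape are assumptions, not the actual local world API:

```ts
// Hypothetical sketch of a local in-memory queue honoring delaySeconds.
// A real implementation would also persist the message across restarts.
interface LocalQueueOptions {
  delaySeconds?: number;
}

function enqueueLocal(
  message: unknown,
  opts: LocalQueueOptions,
  dispatch: (message: unknown) => Promise<void>
): void {
  const delayMs = (opts.delaySeconds ?? 0) * 1000;
  setTimeout(() => {
    void dispatch(message); // deliver after the requested delay
  }, delayMs);
}
```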
```diff
 }

 export interface Queue {
```
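The 5xx retry path itself isn't visible in the rendered hunks, but given the new `serverErrorRetryCount` payload field and the `delaySeconds` queue option, the exponential backoff described in the changeset presumably works along these lines; the constants, cap, and `enqueue` signature below are assumptions, not code from this PR:

```ts
// Hedged sketch of 5xx handling: re-enqueue the message with an exponentially
// growing delay, tracking attempts via serverErrorRetryCount. All constants
// and the enqueue signature are illustrative assumptions.
const BASE_DELAY_SECONDS = 5;
const MAX_DELAY_SECONDS = 300;
const MAX_SERVER_ERROR_RETRIES = 5;

function serverErrorBackoffSeconds(retryCount: number): number {
  // 5s, 10s, 20s, 40s, ... capped at 300s
  return Math.min(MAX_DELAY_SECONDS, BASE_DELAY_SECONDS * 2 ** retryCount);
}

async function requeueOnServerError(
  payload: { serverErrorRetryCount?: number },
  enqueue: (payload: unknown, opts: { delaySeconds: number }) => Promise<void>
): Promise<void> {
  const retryCount = payload.serverErrorRetryCount ?? 0;
  if (retryCount >= MAX_SERVER_ERROR_RETRIES) {
    // Give up and surface the failure instead of retrying forever.
    throw new Error('Exceeded server error retry limit');
  }
  await enqueue(
    { ...payload, serverErrorRetryCount: retryCount + 1 },
    { delaySeconds: serverErrorBackoffSeconds(retryCount) }
  );
}
```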
Comment: I'm confused by this, but apparently it works?

Reply: Yeah, I double-checked it. Seems to work fine.