diff --git a/.changeset/e2e-encryption.md b/.changeset/e2e-encryption.md new file mode 100644 index 000000000..f2d8ce1d5 --- /dev/null +++ b/.changeset/e2e-encryption.md @@ -0,0 +1,19 @@ +--- +"@workflow/cli": patch +"@workflow/core": patch +"@workflow/web-shared": patch +"@workflow/world-vercel": patch +"@workflow/world": patch +"@workflow/world-testing": patch +--- + +Add end-to-end encryption for workflow user data + +This implements AES-256-GCM encryption with per-run key derivation via HKDF-SHA256 for workflow user data. + +Key changes: +- Add encryption module with `createEncryptor()` and `createEncryptorFromEnv()` functions +- Add `Encryptor`, `EncryptionContext`, `KeyMaterial` interfaces to `@workflow/world` +- Make all (de)hydrate serialization functions async and accept encryptor parameter +- Update `runWorkflow()` to take world as 4th parameter +- Update `WorkflowOrchestratorContext` to include `runId` and `world` diff --git a/packages/core/src/runtime/run.ts b/packages/core/src/runtime/run.ts index f72d90419..fe9feb344 100644 --- a/packages/core/src/runtime/run.ts +++ b/packages/core/src/runtime/run.ts @@ -153,11 +153,11 @@ export class Run { const run = await this.world.runs.get(this.runId); if (run.status === 'completed') { - return await hydrateWorkflowReturnValue( + return (await hydrateWorkflowReturnValue( run.output, this.runId, this.world - ); + )) as TResult; } if (run.status === 'cancelled') { diff --git a/packages/core/src/runtime/step-handler.ts b/packages/core/src/runtime/step-handler.ts index 1bd42aa8d..33ed47c36 100644 --- a/packages/core/src/runtime/step-handler.ts +++ b/packages/core/src/runtime/step-handler.ts @@ -280,17 +280,17 @@ const stepHandler = getWorldHandlers().createQueueHandler( // operations (e.g., stream loading) are added to `ops` and executed later // via Promise.all(ops) - their timing is not included in this measurement. const ops: Promise[] = []; - const hydratedInput = await trace( + const hydratedInput = (await trace( 'step.hydrate', {}, async (hydrateSpan) => { const startTime = Date.now(); - const result = await hydrateStepArguments( + const result = (await hydrateStepArguments( step.input, workflowRunId, world, ops - ); + )) as any; const durationMs = Date.now() - startTime; hydrateSpan?.setAttributes({ ...Attribute.StepArgumentsCount(result.args.length), @@ -298,7 +298,7 @@ const stepHandler = getWorldHandlers().createQueueHandler( }); return result; } - ); + )) as any; const args = hydratedInput.args; const thisVal = hydratedInput.thisVal ?? null; diff --git a/packages/core/src/serialization.test.ts b/packages/core/src/serialization.test.ts index 655a7d5ff..a00a27cbf 100644 --- a/packages/core/src/serialization.test.ts +++ b/packages/core/src/serialization.test.ts @@ -1,6 +1,7 @@ import { runInContext } from 'node:vm'; import type { WorkflowRuntimeError } from '@workflow/errors'; import { WORKFLOW_DESERIALIZE, WORKFLOW_SERIALIZE } from '@workflow/serde'; +import type { Encryptor } from '@workflow/world'; import { describe, expect, it } from 'vitest'; import { registerSerializationClass } from './class-serialization.js'; import { getStepFunction, registerStepFunction } from './private.js'; @@ -2883,3 +2884,263 @@ describe('decodeFormatPrefix legacy compatibility', () => { expect(decoded).toBe('["test"]'); }); }); +describe('encryption integration', () => { + // Create a mock encryptor that actually encrypts/decrypts + const createTestEncryptor = (): Encryptor => { + // Simple XOR-based "encryption" for testing (NOT secure, just for tests) + const xorKey = 0x42; + + return { + async encrypt(data: Uint8Array, context: { runId: string }) { + // Add 'encr' prefix and XOR the data + const result = new Uint8Array(4 + data.length); + result.set(new TextEncoder().encode('encr'), 0); + for (let i = 0; i < data.length; i++) { + result[4 + i] = + data[i] ^ + xorKey ^ + (context.runId.charCodeAt(i % context.runId.length) & 0xff); + } + return result; + }, + async decrypt(data: Uint8Array, context: { runId: string }) { + // Check prefix and XOR back + const prefix = new TextDecoder().decode(data.subarray(0, 4)); + if (prefix !== 'encr') { + throw new Error(`Invalid prefix: ${prefix}`); + } + const result = new Uint8Array(data.length - 4); + for (let i = 0; i < result.length; i++) { + result[i] = + data[4 + i] ^ + xorKey ^ + (context.runId.charCodeAt(i % context.runId.length) & 0xff); + } + return result; + }, + }; + }; + + it('should encrypt workflow arguments when encryptor is provided', async () => { + const testEncryptor = createTestEncryptor(); + const testRunId = 'wrun_test123'; + const testValue = { message: 'secret data', count: 42 }; + + const encrypted = await dehydrateWorkflowArguments( + testValue, + testRunId, + testEncryptor, + [], + globalThis, + false + ); + + // Should be a Uint8Array with 'encr' prefix + expect(encrypted).toBeInstanceOf(Uint8Array); + const prefix = new TextDecoder().decode( + (encrypted as Uint8Array).subarray(0, 4) + ); + expect(prefix).toBe('encr'); + }); + + it('should decrypt workflow arguments when encryptor is provided', async () => { + const testEncryptor = createTestEncryptor(); + const testRunId = 'wrun_test123'; + const testValue = { message: 'secret data', count: 42 }; + + const encrypted = await dehydrateWorkflowArguments( + testValue, + testRunId, + testEncryptor, + [], + globalThis, + false + ); + + const decrypted = await hydrateWorkflowArguments( + encrypted, + testRunId, + testEncryptor, + globalThis, + {} + ); + + expect(decrypted).toEqual(testValue); + }); + + it('should fail to decrypt with wrong runId', async () => { + const testEncryptor = createTestEncryptor(); + const testRunId = 'wrun_test123'; + const wrongRunId = 'wrun_wrong456'; + const testValue = { message: 'secret data' }; + + const encrypted = await dehydrateWorkflowArguments( + testValue, + testRunId, + testEncryptor, + [], + globalThis, + false + ); + + // Decrypting with wrong runId should produce garbage (in real crypto would fail auth) + // Our simple XOR produces bytes that aren't valid JSON, so hydration throws + await expect( + hydrateWorkflowArguments( + encrypted, + wrongRunId, + testEncryptor, + globalThis, + {} + ) + ).rejects.toThrow(); + }); + + it('should not encrypt when no encryptor is provided', async () => { + const noEncryptor: Encryptor = {}; + const testRunId = 'wrun_test123'; + const testValue = { message: 'plain data' }; + + const serialized = await dehydrateWorkflowArguments( + testValue, + testRunId, + noEncryptor, + [], + globalThis, + false + ); + + // Should be a Uint8Array with 'devl' prefix (not encrypted) + expect(serialized).toBeInstanceOf(Uint8Array); + const prefix = new TextDecoder().decode( + (serialized as Uint8Array).subarray(0, 4) + ); + expect(prefix).toBe('devl'); + }); + + it('should handle unencrypted data when encryptor is provided', async () => { + const testEncryptor = createTestEncryptor(); + const testRunId = 'wrun_test123'; + const testValue = { message: 'plain data' }; + + // Serialize without encryption + const serialized = await dehydrateWorkflowArguments( + testValue, + testRunId, + {}, // no encryptor + [], + globalThis, + false + ); + + // Hydrate with encryptor - should still work because data isn't encrypted + const hydrated = await hydrateWorkflowArguments( + serialized, + testRunId, + testEncryptor, + globalThis, + {} + ); + + expect(hydrated).toEqual(testValue); + }); + + it('should encrypt step arguments', async () => { + const testEncryptor = createTestEncryptor(); + const testRunId = 'wrun_test123'; + const testValue = ['arg1', { nested: 'value' }, 123]; + + // dehydrateStepArguments signature: (value, runId, encryptor, global, v1Compat) + const encrypted = await dehydrateStepArguments( + testValue, + testRunId, + testEncryptor, + globalThis, + false + ); + + // Should have 'encr' prefix + expect(encrypted).toBeInstanceOf(Uint8Array); + const prefix = new TextDecoder().decode( + (encrypted as Uint8Array).subarray(0, 4) + ); + expect(prefix).toBe('encr'); + + // Should round-trip correctly + // hydrateStepArguments signature: (value, runId, encryptor, ops, global) + const decrypted = await hydrateStepArguments( + encrypted, + testRunId, + testEncryptor, + [], + globalThis + ); + + expect(decrypted).toEqual(testValue); + }); + + it('should encrypt step return values', async () => { + const testEncryptor = createTestEncryptor(); + const testRunId = 'wrun_test123'; + const testValue = { result: 'success', data: [1, 2, 3] }; + + const encrypted = await dehydrateStepReturnValue( + testValue, + testRunId, + testEncryptor, + [], + globalThis + ); + + // Should have 'encr' prefix + expect(encrypted).toBeInstanceOf(Uint8Array); + const prefix = new TextDecoder().decode( + (encrypted as Uint8Array).subarray(0, 4) + ); + expect(prefix).toBe('encr'); + + // Should round-trip correctly + const decrypted = await hydrateStepReturnValue( + encrypted, + testRunId, + testEncryptor, + globalThis + ); + + expect(decrypted).toEqual(testValue); + }); + + it('should encrypt workflow return values', async () => { + const testEncryptor = createTestEncryptor(); + const testRunId = 'wrun_test123'; + const testValue = { final: 'result', timestamp: Date.now() }; + + // dehydrateWorkflowReturnValue signature: (value, runId, encryptor, global) + const encrypted = await dehydrateWorkflowReturnValue( + testValue, + testRunId, + testEncryptor, + globalThis + ); + + // Should have 'encr' prefix + expect(encrypted).toBeInstanceOf(Uint8Array); + const prefix = new TextDecoder().decode( + (encrypted as Uint8Array).subarray(0, 4) + ); + expect(prefix).toBe('encr'); + + // Should round-trip correctly + // hydrateWorkflowReturnValue signature: (value, runId, encryptor, ops, global, extraRevivers) + const decrypted = await hydrateWorkflowReturnValue( + encrypted, + testRunId, + testEncryptor, + [], + globalThis, + {} + ); + + expect(decrypted).toEqual(testValue); + }); +}); diff --git a/packages/core/src/serialization.ts b/packages/core/src/serialization.ts index d7e3e6af7..da45d2135 100644 --- a/packages/core/src/serialization.ts +++ b/packages/core/src/serialization.ts @@ -1,6 +1,6 @@ import { WorkflowRuntimeError } from '@workflow/errors'; import { WORKFLOW_DESERIALIZE, WORKFLOW_SERIALIZE } from '@workflow/serde'; -import type { Encryptor } from '@workflow/world'; +import type { EncryptionContext, Encryptor } from '@workflow/world'; import { DevalueError, parse, stringify, unflatten } from 'devalue'; import { monotonicFactory } from 'ulid'; import { getSerializationClass } from './class-serialization.js'; @@ -53,6 +53,8 @@ import { export const SerializationFormat = { /** devalue stringify/parse with TextEncoder/TextDecoder */ DEVALUE_V1: 'devl', + /** Encrypted payload (inner payload has its own format prefix) */ + ENCRYPTED: 'encr', } as const; export type SerializationFormatType = @@ -95,6 +97,38 @@ export function encodeWithFormatPrefix( return result; } +/** + * Peek at the format prefix without consuming it. + * Useful for checking if data is encrypted before deciding how to process it. + * + * @param data - The format-prefixed data + * @returns The format identifier, or null if data is legacy/non-binary + */ +export function peekFormatPrefix( + data: Uint8Array | unknown +): SerializationFormatType | null { + if (!(data instanceof Uint8Array) || data.length < FORMAT_PREFIX_LENGTH) { + return null; + } + const prefixBytes = data.subarray(0, FORMAT_PREFIX_LENGTH); + const format = formatDecoder.decode(prefixBytes); + const knownFormats = Object.values(SerializationFormat) as string[]; + if (!knownFormats.includes(format)) { + return null; + } + return format as SerializationFormatType; +} + +/** + * Check if data is encrypted (has 'encr' format prefix). + * + * @param data - The format-prefixed data + * @returns true if data has the encrypted format prefix + */ +export function isEncrypted(data: Uint8Array | unknown): boolean { + return peekFormatPrefix(data) === SerializationFormat.ENCRYPTED; +} + /** * Decode a format-prefixed payload. * @@ -239,10 +273,70 @@ export function getDeserializeStream( return stream; } +/** + * Create a transform stream that encrypts each chunk. + * Used to encrypt stream data before writing to storage. + * + * @param encryptor - Encryptor instance with encrypt function + * @param runId - Run ID for encryption context + * @returns TransformStream that encrypts Uint8Array chunks + */ +export function getEncryptStream( + encryptor: Encryptor, + runId: string +): TransformStream { + return new TransformStream({ + async transform(chunk, controller) { + if (!encryptor.encrypt) { + // No encryption available, pass through unchanged + controller.enqueue(chunk); + return; + } + + const encrypted = await encryptor.encrypt(chunk, { runId }); + controller.enqueue(encrypted); + }, + }); +} + +/** + * Create a transform stream that decrypts each chunk. + * Used to decrypt stream data read from storage. + * + * @param encryptor - Encryptor instance with decrypt function + * @param runId - Run ID for decryption context + * @returns TransformStream that decrypts Uint8Array chunks + */ +export function getDecryptStream( + encryptor: Encryptor, + runId: string +): TransformStream { + return new TransformStream({ + async transform(chunk, controller) { + // Check if chunk is encrypted + if (!isEncrypted(chunk)) { + // Not encrypted, pass through unchanged + controller.enqueue(chunk); + return; + } + + if (!encryptor.decrypt) { + throw new WorkflowRuntimeError( + 'Encrypted stream data encountered but Encryptor does not support decryption. ' + + 'Ensure VERCEL_DEPLOYMENT_KEY is set or provide an Encryptor with decryption support.' + ); + } + + const decrypted = await encryptor.decrypt(chunk, { runId }); + controller.enqueue(decrypted); + }, + }); +} + export class WorkflowServerReadableStream extends ReadableStream { #reader?: ReadableStreamDefaultReader; - constructor(name: string, startIndex?: number) { + constructor(name: string, startIndex?: number, runId?: string) { if (typeof name !== 'string' || name.length === 0) { throw new Error(`"name" is required, got "${name}"`); } @@ -267,7 +361,22 @@ export class WorkflowServerReadableStream extends ReadableStream { this.#reader = undefined; controller.close(); } else { - controller.enqueue(result.value); + // Decrypt chunk if encrypted and runId is provided + let chunk = result.value; + if (runId && isEncrypted(chunk)) { + const world = getWorld(); + if (!world.decrypt) { + controller.error( + new WorkflowRuntimeError( + 'Encrypted stream data encountered but World does not support decryption. ' + + 'Ensure VERCEL_DEPLOYMENT_KEY is set.' + ) + ); + return; + } + chunk = await world.decrypt(chunk, { runId }); + } + controller.enqueue(chunk); } }, }); @@ -305,7 +414,14 @@ export class WorkflowServerWritableStream extends WritableStream { // Copy chunks to flush, but don't clear buffer until write succeeds // This prevents data loss if the write operation fails - const chunksToFlush = buffer.slice(); + let chunksToFlush = buffer.slice(); + + // Encrypt chunks if world supports encryption + if (world.encrypt) { + chunksToFlush = await Promise.all( + chunksToFlush.map((chunk) => world.encrypt!(chunk, { runId })) + ); + } // Use writeToStreamMulti if available for batch writes if ( @@ -653,7 +769,7 @@ export function getExternalReducers( const streamId = ((global as any)[STABLE_ULID] || defaultUlid)(); const name = `strm_${streamId}`; - const readable = new WorkflowServerReadableStream(name); + const readable = new WorkflowServerReadableStream(name, undefined, runId); ops.push(readable.pipeTo(value)); return { name }; @@ -739,12 +855,6 @@ function getStepReducers( let type = value[STREAM_TYPE_SYMBOL]; if (!name) { - if (!runId) { - throw new Error( - 'ReadableStream cannot be serialized without a valid runId' - ); - } - const streamId = ((global as any)[STABLE_ULID] || defaultUlid)(); name = `strm_${streamId}`; type = getStreamType(value); @@ -773,16 +883,10 @@ function getStepReducers( let name = value[STREAM_NAME_SYMBOL]; if (!name) { - if (!runId) { - throw new Error( - 'WritableStream cannot be serialized without a valid runId' - ); - } - const streamId = ((global as any)[STABLE_ULID] || defaultUlid)(); name = `strm_${streamId}`; ops.push( - new WorkflowServerReadableStream(name) + new WorkflowServerReadableStream(name, undefined, runId) .pipeThrough( getDeserializeStream(getStepRevivers(global, ops, runId)) ) @@ -959,7 +1063,8 @@ export function getExternalRevivers( const readable = new WorkflowServerReadableStream( value.name, - value.startIndex + value.startIndex, + runId ); if (value.type === 'bytes') { // For byte streams, use flushable pipe with lock polling @@ -1227,7 +1332,11 @@ function getStepRevivers( return response.body; } - const readable = new WorkflowServerReadableStream(value.name); + const readable = new WorkflowServerReadableStream( + value.name, + undefined, + runId + ); if (value.type === 'bytes') { // For byte streams, use flushable pipe with lock polling const state = createFlushableState(); @@ -1265,12 +1374,6 @@ function getStepRevivers( } }, WritableStream: (value) => { - if (!runId) { - throw new Error( - 'WritableStream cannot be revived without a valid runId' - ); - } - const serialize = getSerializeStream(getStepReducers(global, ops, runId)); const serverWritable = new WorkflowServerWritableStream( value.name, @@ -1294,20 +1397,82 @@ function getStepRevivers( }; } +// ============================================================================ +// Encryption Helpers +// ============================================================================ + +/** + * Encrypt data if the world supports encryption. + * Returns original data if encryption is not available. + * + * @param data - Serialized data to encrypt + * @param world - World instance (may have encrypt function) + * @param context - Encryption context with runId + * @returns Encrypted data if encryption available, original data otherwise + */ +export async function maybeEncrypt( + data: Uint8Array, + encryptor: Encryptor, + context: EncryptionContext +): Promise { + if (encryptor.encrypt) { + return encryptor.encrypt(data, context); + } + return data; +} + +/** + * Decrypt data if it has the 'encr' prefix. + * Throws if encrypted but encryptor doesn't support decryption. + * + * @param data - Data that may be encrypted + * @param encryptor - Encryptor instance (may have decrypt function) + * @param context - Encryption context with runId + * @returns Decrypted data if encrypted, original data otherwise + */ +export async function maybeDecrypt( + data: Uint8Array | unknown, + encryptor: Encryptor, + context: EncryptionContext +): Promise { + if (!(data instanceof Uint8Array)) { + return data; + } + + if (isEncrypted(data)) { + if (!encryptor.decrypt) { + throw new WorkflowRuntimeError( + 'Encrypted data encountered but Encryptor does not support decryption. ' + + 'Ensure VERCEL_DEPLOYMENT_KEY is set or provide an Encryptor with decryption support.' + ); + } + return encryptor.decrypt(data, context); + } + + return data; +} + +// ============================================================================ +// Dehydrate / Hydrate Functions +// ============================================================================ + /** * Called from the `start()` function to serialize the workflow arguments * into a format that can be saved to the database and then hydrated from * within the workflow execution environment. * - * @param value - * @param global - * @param runId + * @param value - The value to serialize + * @param runId - The workflow run ID (required for encryption context) + * @param world - World instance for encryption support + * @param ops - Promise array for stream operations + * @param global - Global object for serialization context + * @param v1Compat - Enable legacy v1 compatibility mode * @returns The dehydrated value as binary data (Uint8Array) with format prefix */ export async function dehydrateWorkflowArguments( value: unknown, runId: string, - _encryptor: Encryptor, + encryptor: Encryptor, ops: Promise[] = [], global: Record = globalThis, v1Compat = false @@ -1318,7 +1483,13 @@ export async function dehydrateWorkflowArguments( return revive(str); } const payload = new TextEncoder().encode(str); - return encodeWithFormatPrefix(SerializationFormat.DEVALUE_V1, payload); + const serialized = encodeWithFormatPrefix( + SerializationFormat.DEVALUE_V1, + payload + ) as Uint8Array; + + // Encrypt if world supports encryption + return maybeEncrypt(serialized, encryptor, { runId }); } catch (error) { throw new WorkflowRuntimeError( formatSerializationError('workflow arguments', error), @@ -1332,25 +1503,30 @@ export async function dehydrateWorkflowArguments( * arguments from the database at the start of workflow execution. * * @param value - Binary serialized data (Uint8Array) with format prefix - * @param global - * @param extraRevivers + * @param runId - Run ID for decryption context + * @param world - World instance for decryption support + * @param global - Global object for deserialization context + * @param extraRevivers - Additional revivers for custom types * @returns The hydrated value */ export async function hydrateWorkflowArguments( value: Uint8Array | unknown, - _runId: string, - _encryptor: Encryptor, + runId: string, + encryptor: Encryptor, global: Record = globalThis, extraRevivers: Record any> = {} -) { - if (!(value instanceof Uint8Array)) { - return unflatten(value as any[], { +): Promise { + // Decrypt if needed + const decrypted = await maybeDecrypt(value, encryptor, { runId }); + + if (!(decrypted instanceof Uint8Array)) { + return unflatten(decrypted as any[], { ...getWorkflowRevivers(global), ...extraRevivers, }); } - const { format, payload } = decodeFormatPrefix(value); + const { format, payload } = decodeFormatPrefix(decrypted); if (format === SerializationFormat.DEVALUE_V1) { const str = new TextDecoder().decode(payload); @@ -1368,24 +1544,28 @@ export async function hydrateWorkflowArguments( * Called at the end of a completed workflow execution to serialize the * return value into a format that can be saved to the database. * - * @param value - * @param global + * @param value - The value to serialize + * @param runId - Run ID for encryption context + * @param world - World instance for encryption support + * @param global - Global object for serialization context * @returns The dehydrated value as binary data (Uint8Array) with format prefix */ export async function dehydrateWorkflowReturnValue( value: unknown, - _runId: string, - _encryptor: Encryptor, - global: Record = globalThis, - v1Compat = false + runId: string, + encryptor: Encryptor, + global: Record = globalThis ): Promise { try { const str = stringify(value, getWorkflowReducers(global)); - if (v1Compat) { - return revive(str); - } const payload = new TextEncoder().encode(str); - return encodeWithFormatPrefix(SerializationFormat.DEVALUE_V1, payload); + const serialized = encodeWithFormatPrefix( + SerializationFormat.DEVALUE_V1, + payload + ) as Uint8Array; + + // Encrypt if world supports encryption + return maybeEncrypt(serialized, encryptor, { runId }); } catch (error) { throw new WorkflowRuntimeError( formatSerializationError('workflow return value', error), @@ -1400,28 +1580,32 @@ export async function dehydrateWorkflowReturnValue( * return value of a completed workflow run. * * @param value - Binary serialized data (Uint8Array) with format prefix - * @param ops - * @param global - * @param extraRevivers - * @param runId + * @param runId - Run ID for decryption context + * @param world - World instance for decryption support + * @param ops - Promise array for stream operations + * @param global - Global object for deserialization context + * @param extraRevivers - Additional revivers for custom types * @returns The hydrated return value, ready to be consumed by the client */ export async function hydrateWorkflowReturnValue( value: Uint8Array | unknown, runId: string, - _encryptor: Encryptor, + encryptor: Encryptor, ops: Promise[] = [], global: Record = globalThis, extraRevivers: Record any> = {} -) { - if (!(value instanceof Uint8Array)) { - return unflatten(value as any[], { +): Promise { + // Decrypt if needed + const decrypted = await maybeDecrypt(value, encryptor, { runId }); + + if (!(decrypted instanceof Uint8Array)) { + return unflatten(decrypted as any[], { ...getExternalRevivers(global, ops, runId), ...extraRevivers, }); } - const { format, payload } = decodeFormatPrefix(value); + const { format, payload } = decodeFormatPrefix(decrypted); if (format === SerializationFormat.DEVALUE_V1) { const str = new TextDecoder().decode(payload); @@ -1440,14 +1624,17 @@ export async function hydrateWorkflowReturnValue( * Dehydrates values from within the workflow execution environment * into a format that can be saved to the database. * - * @param value - * @param global + * @param value - The value to serialize + * @param runId - Run ID for encryption context + * @param world - World instance for encryption support + * @param global - Global object for serialization context + * @param v1Compat - Enable legacy v1 compatibility mode * @returns The dehydrated value as binary data (Uint8Array) with format prefix */ export async function dehydrateStepArguments( value: unknown, - _runId: string, - _encryptor: Encryptor, + runId: string, + encryptor: Encryptor, global: Record = globalThis, v1Compat = false ): Promise { @@ -1457,7 +1644,13 @@ export async function dehydrateStepArguments( return revive(str); } const payload = new TextEncoder().encode(str); - return encodeWithFormatPrefix(SerializationFormat.DEVALUE_V1, payload); + const serialized = encodeWithFormatPrefix( + SerializationFormat.DEVALUE_V1, + payload + ) as Uint8Array; + + // Encrypt if world supports encryption + return maybeEncrypt(serialized, encryptor, { runId }); } catch (error) { throw new WorkflowRuntimeError( formatSerializationError('step arguments', error), @@ -1471,28 +1664,32 @@ export async function dehydrateStepArguments( * from the database at the start of the step execution. * * @param value - Binary serialized data (Uint8Array) with format prefix - * @param ops - * @param global - * @param extraRevivers - * @param runId + * @param runId - Run ID for decryption context + * @param world - World instance for decryption support + * @param ops - Promise array for stream operations + * @param global - Global object for deserialization context + * @param extraRevivers - Additional revivers for custom types * @returns The hydrated value, ready to be consumed by the step user-code function */ export async function hydrateStepArguments( value: Uint8Array | unknown, runId: string, - _encryptor: Encryptor, + encryptor: Encryptor, ops: Promise[] = [], global: Record = globalThis, extraRevivers: Record any> = {} -) { - if (!(value instanceof Uint8Array)) { - return unflatten(value as any[], { +): Promise { + // Decrypt if needed + const decrypted = await maybeDecrypt(value, encryptor, { runId }); + + if (!(decrypted instanceof Uint8Array)) { + return unflatten(decrypted as any[], { ...getStepRevivers(global, ops, runId), ...extraRevivers, }); } - const { format, payload } = decodeFormatPrefix(value); + const { format, payload } = decodeFormatPrefix(decrypted); if (format === SerializationFormat.DEVALUE_V1) { const str = new TextDecoder().decode(payload); @@ -1511,16 +1708,18 @@ export async function hydrateStepArguments( * Dehydrates values from within the step execution environment * into a format that can be saved to the database. * - * @param value - * @param ops - * @param global - * @param runId + * @param value - The value to serialize + * @param runId - Run ID for encryption context + * @param world - World instance for encryption support + * @param ops - Promise array for stream operations + * @param global - Global object for serialization context + * @param v1Compat - Enable legacy v1 compatibility mode * @returns The dehydrated value as binary data (Uint8Array) with format prefix */ export async function dehydrateStepReturnValue( value: unknown, runId: string, - _encryptor: Encryptor, + encryptor: Encryptor, ops: Promise[] = [], global: Record = globalThis, v1Compat = false @@ -1531,7 +1730,13 @@ export async function dehydrateStepReturnValue( return revive(str); } const payload = new TextEncoder().encode(str); - return encodeWithFormatPrefix(SerializationFormat.DEVALUE_V1, payload); + const serialized = encodeWithFormatPrefix( + SerializationFormat.DEVALUE_V1, + payload + ) as Uint8Array; + + // Encrypt if world supports encryption + return maybeEncrypt(serialized, encryptor, { runId }); } catch (error) { throw new WorkflowRuntimeError( formatSerializationError('step return value', error), @@ -1545,25 +1750,30 @@ export async function dehydrateStepReturnValue( * Hydrates the return value of a step from the database. * * @param value - Binary serialized data (Uint8Array) with format prefix - * @param global - * @param extraRevivers + * @param runId - Run ID for decryption context + * @param world - World instance for decryption support + * @param global - Global object for deserialization context + * @param extraRevivers - Additional revivers for custom types * @returns The hydrated return value of a step, ready to be consumed by the workflow handler */ export async function hydrateStepReturnValue( value: Uint8Array | unknown, - _runId: string, - _encryptor: Encryptor, + runId: string, + encryptor: Encryptor, global: Record = globalThis, extraRevivers: Record any> = {} -) { - if (!(value instanceof Uint8Array)) { - return unflatten(value as any[], { +): Promise { + // Decrypt if needed + const decrypted = await maybeDecrypt(value, encryptor, { runId }); + + if (!(decrypted instanceof Uint8Array)) { + return unflatten(decrypted as any[], { ...getWorkflowRevivers(global), ...extraRevivers, }); } - const { format, payload } = decodeFormatPrefix(value); + const { format, payload } = decodeFormatPrefix(decrypted); if (format === SerializationFormat.DEVALUE_V1) { const str = new TextDecoder().decode(payload); diff --git a/packages/core/src/step.ts b/packages/core/src/step.ts index 6cd93d2c7..80bea2076 100644 --- a/packages/core/src/step.ts +++ b/packages/core/src/step.ts @@ -149,7 +149,7 @@ export function createUseStep(ctx: WorkflowOrchestratorContext) { ctx.globalThis ) .then((hydratedResult) => { - resolve(hydratedResult); + resolve(hydratedResult as Result); }) .catch((error) => { reject(error); diff --git a/packages/core/src/workflow.ts b/packages/core/src/workflow.ts index 45d77a340..f564f9d88 100644 --- a/packages/core/src/workflow.ts +++ b/packages/core/src/workflow.ts @@ -627,12 +627,12 @@ export async function runWorkflow( ); } - const args = await hydrateWorkflowArguments( + const args = (await hydrateWorkflowArguments( workflowRun.input, workflowRun.runId, encryptor, vmGlobalThis - ); + )) as any[]; span?.setAttributes({ ...Attribute.WorkflowArgumentsCount(args.length), diff --git a/packages/core/src/workflow/hook.ts b/packages/core/src/workflow/hook.ts index 36f19b20a..6a6fdf888 100644 --- a/packages/core/src/workflow/hook.ts +++ b/packages/core/src/workflow/hook.ts @@ -101,7 +101,7 @@ export function createCreateHook(ctx: WorkflowOrchestratorContext) { ctx.globalThis ) .then((payload) => { - next.resolve(payload); + next.resolve(payload as T); }) .catch((error) => { next.reject(error); @@ -152,7 +152,7 @@ export function createCreateHook(ctx: WorkflowOrchestratorContext) { ctx.globalThis ) .then((payload) => { - resolvers.resolve(payload); + resolvers.resolve(payload as T); }) .catch((error) => { resolvers.reject(error);