diff --git a/README.md b/README.md index 14a75bc..a2c76c0 100644 --- a/README.md +++ b/README.md @@ -16,6 +16,8 @@ import { DynamoStreamHandler } from '@lifeomic/delta'; const stream = new DynamoStreamHandler({ logger, + // Optionally specify a list of image keys to obfuscate the values of + loggerObfuscateImageKeys: ['api-secret'], parse: (item) => { // parse the item using your custom logic, e.g. using zod or ajv. return { id: item.id }; @@ -165,12 +167,22 @@ test('something', async () => { ### Parallel Processing + Ordering -By default, the abstractions in `@lifeomic/delta` (`DynamoStreamHandler` and `SQSMessageHandler`) will process events in parallel. To control the parallelization, specify a `concurrency` value when creating the handler. +By default, the abstractions in `@lifeomic/delta` (`DynamoStreamHandler` and +`SQSMessageHandler`) will process events in parallel. To control the +parallelization, specify a `concurrency` value when creating the handler. -These abstractions also ensure that within a batch of events correct _ordering_ of events is maintained according to the ordering semantics of the upstream event source, even when processing in parallel. +These abstractions also ensure that within a batch of events correct _ordering_ +of events is maintained according to the ordering semantics of the upstream +event source, even when processing in parallel. -In `DynamoStreamHandler`, events for the same _key_ will always be processed serially -- events from different keys will be processed in parallel. +In `DynamoStreamHandler`, events for the same _key_ will always be processed +serially -- events from different keys will be processed in parallel. -In `SQSMessageHandler`, events with the same `MessageGroupId` will always processed serially -- events with different `MessageGroupId` values will be processed in parallel. 
+In `SQSMessageHandler`, events with the same `MessageGroupId` will always be +processed serially -- events with different `MessageGroupId` values will be +processed in parallel. -**Note**: while the ordering semantics above will always be preserved, events that do _not_ need to be ordered will not necessarily be processed in the same order they were received in the batch (even when using a `concurrency` value of `1`). +**Note**: while the ordering semantics above will always be preserved, events +that do _not_ need to be ordered will not necessarily be processed in the same +order they were received in the batch (even when using a `concurrency` value of +`1`). diff --git a/src/dynamo-streams.test.ts b/src/dynamo-streams.test.ts index 80abfbf..0709fc9 100644 --- a/src/dynamo-streams.test.ts +++ b/src/dynamo-streams.test.ts @@ -223,7 +223,6 @@ describe('DynamoStreamHandler', () => { ); expect(dataSources.doSomething).toHaveBeenCalledTimes(5); - expect(dataSources.doSomething).toHaveBeenNthCalledWith(1, 'insert 1', { id: 'test-id-1', }); @@ -431,6 +430,7 @@ describe('DynamoStreamHandler', () => { describe('error scenarios', () => { const lambda = new DynamoStreamHandler({ logger, + loggerObfuscateImageKeys: ['secret'], parse: testSerializer.parse, createRunContext: () => ({ logger, dataSources }), }).lambda(); @@ -443,7 +443,6 @@ describe('DynamoStreamHandler', () => { ); expect(logger.error).toHaveBeenCalledWith( - expect.anything(), 'The dynamodb property was not present on event', ); }); @@ -456,7 +455,6 @@ describe('DynamoStreamHandler', () => { ); expect(logger.error).toHaveBeenCalledWith( - expect.anything(), 'No NewImage was defined for an INSERT event', ); }); @@ -476,9 +474,14 @@ describe('DynamoStreamHandler', () => { ); expect(logger.error).toHaveBeenCalledWith( - expect.anything(), 'No NewImage was defined for a MODIFY event', ); + expect(logger.child).toHaveBeenCalledWith({ + record: { + eventName: 'MODIFY', + dynamodb: { OldImage: { id: { S: 'test-id' } } 
}, + }, + }); }); test('MODIFY with no OldImage', async () => { @@ -487,7 +490,9 @@ describe('DynamoStreamHandler', () => { Records: [ { eventName: 'MODIFY', - dynamodb: { NewImage: { id: { S: 'test-id' } } }, + dynamodb: { + NewImage: { id: { S: 'test-id' }, secret: { S: 'test-id' } }, + }, }, ], }, @@ -496,9 +501,16 @@ describe('DynamoStreamHandler', () => { ); expect(logger.error).toHaveBeenCalledWith( - expect.anything(), 'No OldImage was defined for a MODIFY event', ); + expect(logger.child).toHaveBeenCalledWith({ + record: { + eventName: 'MODIFY', + dynamodb: { + NewImage: { id: { S: 'test-id' }, secret: { S: 'obfuscated' } }, + }, + }, + }); }); test('REMOVE with no OldImage', async () => { @@ -509,7 +521,6 @@ describe('DynamoStreamHandler', () => { ); expect(logger.error).toHaveBeenCalledWith( - expect.anything(), 'No OldImage was defined for a REMOVE event', ); }); diff --git a/src/dynamo-streams.ts b/src/dynamo-streams.ts index a7acee1..279dfdf 100644 --- a/src/dynamo-streams.ts +++ b/src/dynamo-streams.ts @@ -1,6 +1,10 @@ import { LoggerInterface } from '@lifeomic/logging'; import { v4 as uuid } from 'uuid'; -import { DynamoDBStreamEvent, DynamoDBStreamHandler } from 'aws-lambda'; +import { + DynamoDBStreamEvent, + DynamoDBStreamHandler, + DynamoDBRecord, +} from 'aws-lambda'; import { marshall, unmarshall } from '@aws-sdk/util-dynamodb'; import { BaseContext, @@ -13,6 +17,11 @@ export type DynamoStreamHandlerConfig = { * A logger to use in the context. */ logger: LoggerInterface; + /** + * A listing of keys within a dynamo record's images to obfuscate in logging + * output. + */ + loggerObfuscateImageKeys?: string[]; /** * A function for parsing images from the stream into your custom type. * @@ -60,7 +69,6 @@ export type DynamoStreamHandlerHarnessConfig = { * An optional override for the logger. */ logger?: LoggerInterface; - /** * An optional override for creating the run context. 
*/ @@ -127,6 +135,45 @@ export class DynamoStreamHandler { return copy; } + private obfuscate(blob: any, keys: string[]): any { + if (blob === undefined) return undefined; + const obfuscated = { ...blob }; + keys.forEach((k) => { + if (obfuscated[k]) { + obfuscated[k] = { S: 'obfuscated' }; + } + }); + return obfuscated; + } + + private obfuscateRecord(dynamoRecord: DynamoDBRecord): DynamoDBRecord { + if (this.config.loggerObfuscateImageKeys && dynamoRecord.dynamodb) { + return { + ...dynamoRecord, + dynamodb: { + ...dynamoRecord.dynamodb, + NewImage: this.obfuscate( + dynamoRecord.dynamodb.NewImage, + this.config.loggerObfuscateImageKeys, + ), + OldImage: this.obfuscate( + dynamoRecord.dynamodb.OldImage, + this.config.loggerObfuscateImageKeys, + ), + }, + }; + } + return dynamoRecord; + } + + private obfuscateEvent( + dynamoEvent: DynamoDBStreamEvent, + ): DynamoDBStreamEvent { + return { + Records: dynamoEvent.Records.map((r) => this.obfuscateRecord(r)), + }; + } + /** * Adds an "INSERT" event handler. */ @@ -178,7 +225,10 @@ export class DynamoStreamHandler { ...base, }; - context.logger.info({ event }, 'Processing DynamoDB stream event'); + context.logger.info( + { event: this.obfuscateEvent(event) }, + 'Processing DynamoDB stream event', + ); await processWithOrdering( { @@ -194,7 +244,7 @@ export class DynamoStreamHandler { // We need to order by key -- so, just stringify the key. // // But, add custom logic to ensure that the key object is stringified - // determinstically, regardless of the order of its keys. (e.g. we + // deterministically, regardless of the order of its keys. (e.g. 
we // should stringify { a: 1, b: 2 } and { b: 2, a: 1 } to the same string) // // It's possible that AWS already ensures that the keys are deterministically @@ -210,10 +260,11 @@ export class DynamoStreamHandler { stopOnError: false, }, async (record) => { - const recordLogger = this.config.logger.child({ record }); + const recordLogger = this.config.logger.child({ + record: this.obfuscateRecord(record), + }); if (!record.dynamodb) { recordLogger.error( - { record }, 'The dynamodb property was not present on event', ); return; @@ -233,10 +284,7 @@ export class DynamoStreamHandler { // Handle INSERT events -- invoke the INSERT actions in order. if (record.eventName === 'INSERT') { if (!newEntity) { - recordLogger.error( - { record }, - 'No NewImage was defined for an INSERT event', - ); + recordLogger.error('No NewImage was defined for an INSERT event'); return; } @@ -247,17 +295,11 @@ export class DynamoStreamHandler { // Handle MODIFY events -- invoke the MODIFY actions in order. else if (record.eventName === 'MODIFY') { if (!oldEntity) { - recordLogger.error( - { record }, - 'No OldImage was defined for a MODIFY event', - ); + recordLogger.error('No OldImage was defined for a MODIFY event'); return; } if (!newEntity) { - recordLogger.error( - { record }, - 'No NewImage was defined for a MODIFY event', - ); + recordLogger.error('No NewImage was defined for a MODIFY event'); return; } @@ -272,10 +314,7 @@ export class DynamoStreamHandler { // Handle REMOVE events -- invoke the REMOVE actions in order. else if (record.eventName === 'REMOVE') { if (!oldEntity) { - recordLogger.error( - { record }, - 'No OldImage was defined for a REMOVE event', - ); + recordLogger.error('No OldImage was defined for a REMOVE event'); return; } @@ -290,7 +329,7 @@ export class DynamoStreamHandler { /** * Returns a test harness for exercising the handler, with an optional - * overriden context. + * overridden context. */ harness( options?: DynamoStreamHandlerHarnessConfig,