Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: RD-14290-support-AWS-Lambda-Stream #533

Merged
merged 9 commits into from
Feb 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions src/tracer/tracer.interface.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
import type { Handler } from 'aws-lambda';
import type { Callback, Context, Handler } from 'aws-lambda';

import { ExecutionTags } from '../globals';
import * as LumigoLogger from '../lumigoLogger';

export type ResponseStreamHandler<TEvent = any, TResult = any> = (
event: TEvent,
responseStream: any,
context: Context,
callback?: Callback<TResult>
) => void | Promise<TResult>;

export interface Tracer {
trace: (handler: Handler) => Handler;
trace<T extends Handler | ResponseStreamHandler>(handler: T): T;
addExecutionTag: typeof ExecutionTags.addTag;
info: typeof LumigoLogger.info;
warn: typeof LumigoLogger.warn;
Expand Down
13 changes: 7 additions & 6 deletions src/tracer/tracer.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -818,18 +818,19 @@ describe('tracer', () => {
});

test('responseStreamFunctionLogic - tracer disabled and decorator marked as responseStream', async () => {
const handler = jest.fn(async () => {});
const handler = jest.fn(async (event, responseStream, _) => responseStream);
handler[HANDLER_STREAMING] = STREAM_RESPONSE;

const { event, context } = new HandlerInputsBuilder().build();
const { event, context, responseStream } = new HandlerInputsBuilder()
// here we put mocked data to stream, because handler is mocked as well. Mocked `handler` just return response stream back
.withResponseStream({ hello: 'world' })
.build();

const decoratedUserHandler = tracer.trace({})(handler);
await decoratedUserHandler(event, context);
const result = await decoratedUserHandler(event, responseStream, context);

expect(decoratedUserHandler[HANDLER_STREAMING]).toEqual(STREAM_RESPONSE);
expect(spies.warnClient).toHaveBeenCalledWith(
'Tracer is disabled, running on a response stream function'
);
expect(result).toEqual({ hello: 'world' });
});

test('performStepFunctionLogic - Happy flow', async () => {
Expand Down
201 changes: 131 additions & 70 deletions src/tracer/tracer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import {
import { runOneTimeWrapper } from '../utils/functionUtils';
import { TraceOptions } from './trace-options.type';
import { GenericSpan } from '../types/spans/basicSpan';
import { ResponseStreamHandler } from './tracer.interface';

export const HANDLER_CALLBACKED = 'handler_callbacked';
export const HANDLER_STREAMING = Symbol.for('aws.lambda.runtime.handler.streaming');
Expand All @@ -49,86 +50,138 @@ export const MAX_ELEMENTS_IN_EXTRA = 10;
export const LEAK_MESSAGE =
'Execution leak detected. More information is available in: https://docs.lumigo.io/docs/execution-leak-detected';

export const trace =
({ token, debug, edgeHost, switchOff, stepFunction }: TraceOptions) =>
(userHandler: Handler) => {
const isResponseStreamFunction = userHandler[HANDLER_STREAMING] === STREAM_RESPONSE;
const decoratedUserHandler = async <Event = any>(
event: Event,
context?: Context,
callback?: Callback
): Promise<Handler> => {
if (!!switchOff || isSwitchedOff()) {
info(
`The '${SWITCH_OFF_FLAG}' environment variable is set to 'true': this invocation will not be traced by Lumigo`
);
return userHandler(event, context, callback);
}
const isResponseStreamFunction = (userHandler: any) =>
userHandler[HANDLER_STREAMING] === STREAM_RESPONSE;

const runUserHandler = <Event>(
userHandler: any,
event: Event,
context: Context,
callback?: Callback,
responseStream?: any
) =>
isResponseStreamFunction(userHandler)
? userHandler(event, responseStream, context, callback)
: userHandler(event, context, callback);

const processUserHandler = async <Event>(
userHandler: any,
event: Event,
context: Context,
options: TraceOptions,
callback?: Callback,
responseStream?: any
) => {
const { token, debug, edgeHost, switchOff, stepFunction } = options;

if (!!switchOff || isSwitchedOff()) {
info(
`The '${SWITCH_OFF_FLAG}' environment variable is set to 'true': this invocation will not be traced by Lumigo`
);
return runUserHandler(userHandler, event, context, callback, responseStream);
}

if (!isAwsEnvironment()) {
warnClient('Tracer is disabled, running on non-aws environment');
return userHandler(event, context, callback);
}
if (isResponseStreamFunction) {
warnClient('Tracer is disabled, running on a response stream function');
return userHandler(event, context, callback);
}
try {
TracerGlobals.setHandlerInputs({ event, context });
TracerGlobals.setTracerInputs({
token,
debug,
edgeHost,
switchOff,
stepFunction,
lambdaTimeout: context.getRemainingTimeInMillis(),
});
ExecutionTags.autoTagEvent(event);
} catch (err) {
logger.warn('Failed to start tracer', err);
}
if (!isAwsEnvironment()) {
warnClient('Tracer is disabled, running on non-aws environment');
return runUserHandler(userHandler, event, context, callback, responseStream);
}

if (!context || !isAwsContext(context)) {
logger.warnClient(
'missing context parameter - learn more at https://docs.lumigo.io/docs/nodejs'
);
const { err, data, type } = await promisifyUserHandler(userHandler, event, context);
return performPromisifyType(err, data, type, callback);
}
try {
TracerGlobals.setHandlerInputs({ event, context });
TracerGlobals.setTracerInputs({
token,
debug,
edgeHost,
switchOff,
stepFunction,
lambdaTimeout: context.getRemainingTimeInMillis(),
});
ExecutionTags.autoTagEvent(event);
} catch (err) {
logger.warn('Failed to start tracer', err);
}

if (context.__wrappedByLumigo) {
const { err, data, type } = await promisifyUserHandler(userHandler, event, context);
return performPromisifyType(err, data, type, callback);
}
context.__wrappedByLumigo = true;
if (!context || !isAwsContext(context)) {
logger.warnClient(
'missing context parameter - learn more at https://docs.lumigo.io/docs/nodejs'
);
const { err, data, type } = await promisifyUserHandler(
userHandler,
event,
context,
responseStream
);
return performPromisifyType(err, data, type, callback);
}

const functionSpan = getFunctionSpan(event, context);
if (context.__wrappedByLumigo) {
const { err, data, type } = await promisifyUserHandler(
userHandler,
event,
context,
responseStream
);
return performPromisifyType(err, data, type, callback);
}
context.__wrappedByLumigo = true;

await hookUnhandledRejection(functionSpan);
const functionSpan = getFunctionSpan(event, context);

const pStartTrace = startTrace(functionSpan);
const pUserHandler = promisifyUserHandler(userHandler, event, context);
await hookUnhandledRejection(functionSpan);

let [, handlerReturnValue] = await Promise.all([pStartTrace, pUserHandler]);
const pStartTrace = startTrace(functionSpan);
const pUserHandler = promisifyUserHandler(userHandler, event, context, responseStream);

handlerReturnValue = normalizeLambdaError(handlerReturnValue);
let [, handlerReturnValue] = await Promise.all([pStartTrace, pUserHandler]);

if (isStepFunction()) {
handlerReturnValue = performStepFunctionLogic(handlerReturnValue);
}
handlerReturnValue = normalizeLambdaError(handlerReturnValue);

const cleanedHandlerReturnValue = removeLumigoFromStacktrace(handlerReturnValue);
if (isStepFunction()) {
handlerReturnValue = performStepFunctionLogic(handlerReturnValue);
}

await endTrace(functionSpan, cleanedHandlerReturnValue);
const { err, data, type } = cleanedHandlerReturnValue;
const cleanedHandlerReturnValue = removeLumigoFromStacktrace(handlerReturnValue);

return performPromisifyType(err, data, type, callback);
};
await endTrace(functionSpan, cleanedHandlerReturnValue);
const { err, data, type } = cleanedHandlerReturnValue;

if (isResponseStreamFunction) {
decoratedUserHandler[HANDLER_STREAMING] = STREAM_RESPONSE;
}
return decoratedUserHandler;
return performPromisifyType(err, data, type, callback);
};

const decorateUserHandler = <T extends Handler | ResponseStreamHandler>(
userHandler: T,
options: TraceOptions
) => {
const decoratedUserHandler = async <Event = any>(
event: Event,
context?: Context,
callback?: Callback
): Promise<Handler> => {
return await processUserHandler(userHandler, event, context, options, callback, undefined);
};

const decoratedResponseStreamUserHandler = async <Event = any>(
event: Event,
responseStream?: any,
context?: Context,
callback?: Callback
): Promise<ResponseStreamHandler> => {
return await processUserHandler(userHandler, event, context, options, callback, responseStream);
};

if (isResponseStreamFunction(userHandler)) {
logger.debug('Function has response stream in the handler');
decoratedResponseStreamUserHandler[HANDLER_STREAMING] = STREAM_RESPONSE;
return decoratedResponseStreamUserHandler as T;
} else {
return decoratedUserHandler as T;
}
};

export const trace =
(options: TraceOptions) =>
<T extends Handler | ResponseStreamHandler>(userHandler: T): T => {
return decorateUserHandler(userHandler, options);
};

export const startTrace = async (functionSpan: GenericSpan) => {
Expand Down Expand Up @@ -191,11 +244,14 @@ export const isCallbacked = (handlerReturnValue) => {
export function promisifyUserHandler(
userHandler,
event,
context
context,
responseStream?
): Promise<{ err: any; data: any; type: string }> {
return new Promise((resolve) => {
try {
const result = userHandler(event, context, callbackResolver(resolve));
const result = isResponseStreamFunction(userHandler)
? userHandler(event, responseStream, context, callbackResolver(resolve))
: userHandler(event, context, callbackResolver(resolve));
if (isPromise(result)) {
result
.then((data) => resolve({ err: null, data, type: ASYNC_HANDLER_RESOLVED }))
Expand Down Expand Up @@ -286,7 +342,12 @@ const logLeakedSpans = (allSpans) => {
});
};

const performPromisifyType = (err, data, type, callback): Handler => {
const performPromisifyType = <T extends Handler | ResponseStreamHandler>(
err,
data: T,
type,
callback
): T => {
switch (type) {
case HANDLER_CALLBACKED:
callback(err, data);
Expand Down
7 changes: 7 additions & 0 deletions testUtils/handlerInputsBuilder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { Context } from 'aws-lambda';
export class HandlerInputsBuilder {
_event: any;
_context: Context;
_responseStream: any;

static DEFAULT_AWS_REQUEST_ID = HttpSpanBuilder.DEFAULT_PARENT_ID;
static DEFAULT_INVOKED_FUNCTION_ARN = HttpSpanBuilder.DEFAULT_ARN;
Expand Down Expand Up @@ -45,8 +46,14 @@ export class HandlerInputsBuilder {
return this;
};

withResponseStream = (stream: any) => {
this._responseStream = stream;
return this;
};

build = () => ({
event: this._event,
context: this._context,
responseStream: this._responseStream,
});
}
Loading