Skip to content

Commit

Permalink
feat: use AsyncLocalStorage to store request state
Browse files Browse the repository at this point in the history
  • Loading branch information
fengmk2 committed Dec 5, 2024
1 parent b98b502 commit 566877f
Show file tree
Hide file tree
Showing 13 changed files with 139 additions and 155 deletions.
29 changes: 29 additions & 0 deletions examples/helloworld.cjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
const { HttpClient } = require('..');

const httpClient = new HttpClient({
connect: {
timeout: 1500,
},
});
const url = process.argv[2] || 'https://npmmirror.com/';
console.log('timing: %s', url);

async function request() {
let res = await httpClient.request(url, {
followRedirect: false,
});
console.log('---------------------------');
console.log('GET %s, content size: %d, requestUrls: %o, socket: %o, rt: %o',
res.statusCode, res.data.length, res.res.requestUrls, res.res.socket, res.res.rt);
console.log(res.res.timing);

res = await httpClient.request(url, {
followRedirect: false,
});
console.log('---------------------------');
console.log('GET %s, content size: %d, requestUrls: %o, socket: %o, rt: %o',
res.statusCode, res.data.length, res.res.requestUrls, res.res.socket, res.res.rt);
console.log(res.res.timing);
}

request();
3 changes: 3 additions & 0 deletions src/AsyncLocalStorage.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import { AsyncLocalStorage } from 'node:async_hooks';

export const asyncLocalStorage = new AsyncLocalStorage<any>();
27 changes: 0 additions & 27 deletions src/BaseAgent.ts

This file was deleted.

29 changes: 0 additions & 29 deletions src/FetchOpaqueInterceptor.ts

This file was deleted.

5 changes: 2 additions & 3 deletions src/HttpAgent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@ import {
Dispatcher,
buildConnector,
} from 'undici';
import { BaseAgent, BaseAgentOptions } from './BaseAgent.js';

export type CheckAddressFunction = (ip: string, family: number | string, hostname: string) => boolean;

export interface HttpAgentOptions extends BaseAgentOptions {
export interface HttpAgentOptions extends Agent.Options {
lookup?: LookupFunction;
checkAddress?: CheckAddressFunction;
connect?: buildConnector.BuildOptions,
Expand All @@ -32,7 +31,7 @@ class IllegalAddressError extends Error {
}
}

export class HttpAgent extends BaseAgent {
export class HttpAgent extends Agent {
#checkAddress?: CheckAddressFunction;

constructor(options: HttpAgentOptions) {
Expand Down
38 changes: 33 additions & 5 deletions src/HttpClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import {
Agent,
getGlobalDispatcher,
Pool,
interceptors,
} from 'undici';
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
Expand All @@ -40,6 +41,7 @@ import { parseJSON, digestAuthHeader, globalId, performanceTime, isReadable, upd
import symbols from './symbols.js';
import { initDiagnosticsChannel } from './diagnosticsChannel.js';
import { HttpClientConnectTimeoutError, HttpClientRequestTimeoutError } from './HttpClientError.js';
import { asyncLocalStorage } from './AsyncLocalStorage.js';

type Exists<T> = T extends undefined ? never : T;
type UndiciRequestOption = Exists<Parameters<typeof undiciRequest>[1]>;
Expand Down Expand Up @@ -205,7 +207,10 @@ export class HttpClient extends EventEmitter {
this.#dispatcher = new Agent({
allowH2: clientOptions.allowH2,
});
} else {
this.#dispatcher = new Agent();
}
this.#dispatcher = this.#dispatcher.compose(this.#setInterceptors());
initDiagnosticsChannel();
}

Expand Down Expand Up @@ -250,6 +255,21 @@ export class HttpClient extends EventEmitter {
return await this.request<T>(url, options);
}

#setInterceptors() {
return [
(dispatch: any) => {
return function dnsAfterInterceptor(options: any, handler: any) {
const opaque = options.opaque;
const dnslookup = opaque[symbols.kRequestTiming].dnslookup = performanceTime(opaque[symbols.kRequestStartTime]);
debug('Request#%d dns lookup %sms, servername: %s, origin: %s',
opaque[symbols.kRequestId], dnslookup, options.servername, options.origin);
return dispatch(options, handler);
};
},
interceptors.dns(),
];
}

async #requestInternal<T>(url: RequestURL, options?: RequestOptions, requestContext?: RequestContext): Promise<HttpClientResponse<T>> {
const requestId = globalId('HttpClientRequest');
let requestUrl: URL;
Expand Down Expand Up @@ -300,7 +320,7 @@ export class HttpClient extends EventEmitter {
// socket assigned
queuing: 0,
// dns lookup time
// dnslookup: 0,
dnslookup: 0,
// socket connected
connected: 0,
// request headers sent
Expand Down Expand Up @@ -578,9 +598,11 @@ export class HttpClient extends EventEmitter {
this.emit('request', reqMeta);
}

let response = await undiciRequest(requestUrl, requestOptions as UndiciRequestOption);
if (response.statusCode === 401 && (response.headers['www-authenticate'] || response.headers['x-www-authenticate']) &&
!requestOptions.headers.authorization && args.digestAuth) {
let response = await this.#undiciRequest(internalOpaque, requestUrl, requestOptions as UndiciRequestOption);
if (response.statusCode === 401
&& (response.headers['www-authenticate'] || response.headers['x-www-authenticate'])
&& !requestOptions.headers.authorization
&& args.digestAuth) {
// handle digest auth
const authenticateHeaders = response.headers['www-authenticate'] ?? response.headers['x-www-authenticate'];
const authenticate = Array.isArray(authenticateHeaders)
Expand All @@ -597,7 +619,7 @@ export class HttpClient extends EventEmitter {
}
// Ensure the previous response is consumed as we re-use the same variable
await response.body.arrayBuffer();
response = await undiciRequest(requestUrl, requestOptions as UndiciRequestOption);
response = await this.#undiciRequest(internalOpaque, requestUrl, requestOptions as UndiciRequestOption);
}
}
const contentEncoding = response.headers['content-encoding'];
Expand Down Expand Up @@ -765,4 +787,10 @@ export class HttpClient extends EventEmitter {
throw err;
}
}

async #undiciRequest(internalOpaque: unknown, requestUrl: URL, requestOptions: UndiciRequestOption) {
return await asyncLocalStorage.run(internalOpaque, async () => {
return await undiciRequest(requestUrl, requestOptions);
});
}
}
2 changes: 1 addition & 1 deletion src/Response.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export type Timing = {
// socket assigned
queuing: number;
// dns lookup time
// dnslookup: number;
dnslookup: number;
// socket connected
connected: number;
// request headers sent
Expand Down
71 changes: 26 additions & 45 deletions src/diagnosticsChannel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { Socket } from 'node:net';
import { DiagnosticsChannel } from 'undici';
import symbols from './symbols.js';
import { globalId, performanceTime } from './utils.js';
import { asyncLocalStorage } from './AsyncLocalStorage.js';

const debug = debuglog('urllib:DiagnosticsChannel');
let initedDiagnosticsChannel = false;
Expand Down Expand Up @@ -52,47 +53,27 @@ Socket.prototype.destroy = function(err?: any) {
return destroySocket.call(this, err);
};

function getRequestOpaque(request: DiagnosticsChannel.Request, kHandler?: symbol) {
if (!kHandler) return;
const handler = Reflect.get(request, kHandler);
// maxRedirects = 0 will get [Symbol(handler)]: RequestHandler {
// responseHeaders: null,
// opaque: {
// [Symbol(request id)]: 1,
// [Symbol(request start time)]: 465.0712921619415,
// [Symbol(enable request timing or not)]: true,
// [Symbol(request timing)]: [Object],
// [Symbol(request original opaque)]: undefined
// }
return handler?.opts?.opaque ?? handler?.opaque;
}

export function initDiagnosticsChannel() {
// make sure init global DiagnosticsChannel once
if (initedDiagnosticsChannel) return;
initedDiagnosticsChannel = true;

let kHandler: symbol;
// This message is published when a new outgoing request is created.
// Note: a request is only loosely completed to a given socket.
subscribe('undici:request:create', (message, name) => {
const { request } = message as DiagnosticsChannel.RequestCreateMessage;
if (!kHandler) {
const symbols = Object.getOwnPropertySymbols(request);
for (const symbol of symbols) {
if (symbol.description === 'handler') {
kHandler = symbol;
break;
}
}
const opaque = asyncLocalStorage.getStore();
if (!opaque?.[symbols.kRequestId]) {
debug('[%s] opaque not found', name);
return;
}
const opaque = getRequestOpaque(request, kHandler);
// ignore non HttpClient Request
if (!opaque || !opaque[symbols.kRequestId]) return;
debug('[%s] Request#%d %s %s, path: %s, headers: %o',
name, opaque[symbols.kRequestId], request.method, request.origin, request.path, request.headers);
if (!opaque[symbols.kEnableRequestTiming]) return;
opaque[symbols.kRequestTiming].queuing = performanceTime(opaque[symbols.kRequestStartTime]);
let queuing = 0;
if (opaque[symbols.kEnableRequestTiming]) {
queuing = opaque[symbols.kRequestTiming].queuing = performanceTime(opaque[symbols.kRequestStartTime]);
}
debug('[%s] Request#%d %s %s, path: %s, headers: %o, queuing: %d',
name, opaque[symbols.kRequestId], request.method, request.origin, request.path,
request.headers, queuing);
});

subscribe('undici:client:connectError', (message, name) => {
Expand Down Expand Up @@ -141,9 +122,9 @@ export function initDiagnosticsChannel() {

// This message is published right before the first byte of the request is written to the socket.
subscribe('undici:client:sendHeaders', (message, name) => {
const { request, socket } = message as DiagnosticsChannel.ClientSendHeadersMessage & { socket: SocketExtend };
const opaque = getRequestOpaque(request, kHandler);
if (!opaque || !opaque[symbols.kRequestId]) {
const { socket } = message as DiagnosticsChannel.ClientSendHeadersMessage & { socket: SocketExtend };
const opaque = asyncLocalStorage.getStore();
if (!opaque?.[symbols.kRequestId]) {
debug('[%s] opaque not found', name);
return;
}
Expand All @@ -165,10 +146,10 @@ export function initDiagnosticsChannel() {
}
});

subscribe('undici:request:bodySent', (message, name) => {
const { request } = message as DiagnosticsChannel.RequestBodySentMessage;
const opaque = getRequestOpaque(request, kHandler);
if (!opaque || !opaque[symbols.kRequestId]) {
subscribe('undici:request:bodySent', (_message, name) => {
// const { request } = message as DiagnosticsChannel.RequestBodySentMessage;
const opaque = asyncLocalStorage.getStore();
if (!opaque?.[symbols.kRequestId]) {
debug('[%s] opaque not found', name);
return;
}
Expand All @@ -180,9 +161,9 @@ export function initDiagnosticsChannel() {

// This message is published after the response headers have been received, i.e. the response has been completed.
subscribe('undici:request:headers', (message, name) => {
const { request, response } = message as DiagnosticsChannel.RequestHeadersMessage;
const opaque = getRequestOpaque(request, kHandler);
if (!opaque || !opaque[symbols.kRequestId]) {
const { response } = message as DiagnosticsChannel.RequestHeadersMessage;
const opaque = asyncLocalStorage.getStore();
if (!opaque?.[symbols.kRequestId]) {
debug('[%s] opaque not found', name);
return;
}
Expand All @@ -204,10 +185,10 @@ export function initDiagnosticsChannel() {
});

// This message is published after the response body and trailers have been received, i.e. the response has been completed.
subscribe('undici:request:trailers', (message, name) => {
const { request } = message as DiagnosticsChannel.RequestTrailersMessage;
const opaque = getRequestOpaque(request, kHandler);
if (!opaque || !opaque[symbols.kRequestId]) {
subscribe('undici:request:trailers', (_message, name) => {
// const { request } = message as DiagnosticsChannel.RequestTrailersMessage;
const opaque = asyncLocalStorage.getStore();
if (!opaque?.[symbols.kRequestId]) {
debug('[%s] opaque not found', name);
return;
}
Expand Down
Loading

0 comments on commit 566877f

Please sign in to comment.