Skip to content

Commit

Permalink
f
Browse files Browse the repository at this point in the history
  • Loading branch information
fengmk2 committed Dec 5, 2024
1 parent 7f4fd62 commit 9a98389
Show file tree
Hide file tree
Showing 10 changed files with 109 additions and 94 deletions.
66 changes: 30 additions & 36 deletions src/HttpClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,11 @@ import { FormData } from './FormData.js';
import { HttpAgent, CheckAddressFunction } from './HttpAgent.js';
import type { IncomingHttpHeaders } from './IncomingHttpHeaders.js';
import { RequestURL, RequestOptions, HttpMethod, RequestMeta } from './Request.js';
import { RawResponseWithMeta, HttpClientResponse, SocketInfo } from './Response.js';
import { RawResponseWithMeta, HttpClientResponse, SocketInfo, InternalStore } from './Response.js';
import { parseJSON, digestAuthHeader, globalId, performanceTime, isReadable, updateSocketInfo } from './utils.js';
import symbols from './symbols.js';
import { initDiagnosticsChannel } from './diagnosticsChannel.js';
import { HttpClientConnectTimeoutError, HttpClientRequestTimeoutError } from './HttpClientError.js';
import { asyncLocalStorage } from './asyncLocalStorage2.js';
import { asyncLocalStorage } from './asyncLocalStorage.js';

type Exists<T> = T extends undefined ? never : T;
type UndiciRequestOption = Exists<Parameters<typeof undiciRequest>[1]>;
Expand Down Expand Up @@ -137,7 +136,6 @@ function defaultIsRetry(response: HttpClientResponse) {
export type RequestContext = {
retries: number;
socketErrorRetries: number;
requestStartTime?: number;
redirects: number;
history: string[];
};
Expand Down Expand Up @@ -247,7 +245,16 @@ export class HttpClient extends EventEmitter {
}

async request<T = any>(url: RequestURL, options?: RequestOptions) {
return await this.#requestInternal<T>(url, options);
// using opaque to diagnostics channel, binding request and socket
const requestId = globalId('HttpClientRequest');
const internalStore = {
requestId,
requestStartTime: performance.now(),
enableRequestTiming: !!options?.timing,
} as InternalStore;
return await asyncLocalStorage.run(internalStore, async () => {
return await this.#requestInternal<T>(url, options);
});
}

// alias to request, keep compatible with urllib@2 HttpClient.curl
Expand All @@ -259,12 +266,12 @@ export class HttpClient extends EventEmitter {
return [
(dispatch: any) => {
return function dnsAfterInterceptor(options: any, handler: any) {
const opaque = options.opaque;
if (opaque?.[symbols.kEnableRequestTiming]) {
const dnslookup = opaque[symbols.kRequestTiming].dnslookup =
performanceTime(opaque[symbols.kRequestStartTime]);
const store = asyncLocalStorage.getStore();
if (store?.enableRequestTiming) {
const dnslookup = store.requestTiming.dnslookup =
performanceTime(store.requestStartTime);
debug('Request#%d dns lookup %sms, servername: %s, origin: %s',
opaque[symbols.kRequestId], dnslookup, options.servername, options.origin);
store.requestId, dnslookup, options.servername, options.origin);
}
return dispatch(options, handler);
};
Expand All @@ -274,7 +281,6 @@ export class HttpClient extends EventEmitter {
}

async #requestInternal<T>(url: RequestURL, options?: RequestOptions, requestContext?: RequestContext): Promise<HttpClientResponse<T>> {
const requestId = globalId('HttpClientRequest');
let requestUrl: URL;
if (typeof url === 'string') {
if (!PROTO_RE.test(url)) {
Expand All @@ -298,7 +304,6 @@ export class HttpClient extends EventEmitter {
const args = {
retry: 0,
socketErrorRetry: 1,
timing: true,
...this.#defaultArgs,
...options,
// keep method and headers exists on args for request event handler to easy use
Expand All @@ -312,12 +317,11 @@ export class HttpClient extends EventEmitter {
history: [],
...requestContext,
};
if (!requestContext.requestStartTime) {
requestContext.requestStartTime = performance.now();
}
requestContext.history.push(requestUrl.href);
const requestStartTime = requestContext.requestStartTime;

const internalStore = asyncLocalStorage.getStore()!;
const requestStartTime = internalStore.requestStartTime;
const requestId = internalStore.requestId;
// https://developer.chrome.com/docs/devtools/network/reference/?utm_source=devtools#timing-explanation
const timing = {
// socket assigned
Expand All @@ -335,15 +339,9 @@ export class HttpClient extends EventEmitter {
// the response body and trailers have been received
contentDownload: 0,
};
internalStore.requestTiming = timing;
internalStore.enableRequestTiming = !!args.timing;
const originalOpaque = args.opaque;
// using opaque to diagnostics channel, binding request and socket
const internalOpaque = {
[symbols.kRequestId]: requestId,
[symbols.kRequestStartTime]: requestStartTime,
[symbols.kEnableRequestTiming]: !!args.timing,
[symbols.kRequestTiming]: timing,
[symbols.kRequestOriginalOpaque]: originalOpaque,
};
const reqMeta = {
requestId,
url: requestUrl.href,
Expand Down Expand Up @@ -447,7 +445,7 @@ export class HttpClient extends EventEmitter {
headersTimeout,
headers,
bodyTimeout,
opaque: internalOpaque,
opaque: originalOpaque,
dispatcher: args.dispatcher ?? this.#dispatcher,
signal: args.signal,
};
Expand Down Expand Up @@ -592,7 +590,9 @@ export class HttpClient extends EventEmitter {
}

debug('Request#%d %s %s, headers: %j, headersTimeout: %s, bodyTimeout: %s, isStreamingRequest: %s, isStreamingResponse: %s, maxRedirections: %s, redirects: %s',
requestId, requestOptions.method, requestUrl.href, headers, headersTimeout, bodyTimeout, isStreamingRequest, isStreamingResponse, maxRedirects, requestContext.redirects);
requestId, requestOptions.method, requestUrl.href, headers,
headersTimeout, bodyTimeout, isStreamingRequest, isStreamingResponse,
maxRedirects, requestContext.redirects);
requestOptions.headers = headers;
channels.request.publish({
request: reqMeta,
Expand All @@ -601,7 +601,7 @@ export class HttpClient extends EventEmitter {
this.emit('request', reqMeta);
}

let response = await this.#undiciRequest(internalOpaque, requestUrl, requestOptions as UndiciRequestOption);
let response = await undiciRequest(requestUrl, requestOptions as UndiciRequestOption);
if (response.statusCode === 401
&& (response.headers['www-authenticate'] || response.headers['x-www-authenticate'])
&& !requestOptions.headers.authorization
Expand All @@ -622,7 +622,7 @@ export class HttpClient extends EventEmitter {
}
// Ensure the previous response is consumed as we re-use the same variable
await response.body.arrayBuffer();
response = await this.#undiciRequest(internalOpaque, requestUrl, requestOptions as UndiciRequestOption);
response = await undiciRequest(requestUrl, requestOptions as UndiciRequestOption);
}
}
const contentEncoding = response.headers['content-encoding'];
Expand Down Expand Up @@ -690,7 +690,7 @@ export class HttpClient extends EventEmitter {
}
res.rt = performanceTime(requestStartTime);
// get real socket info from internalOpaque
updateSocketInfo(socketInfo, internalOpaque);
updateSocketInfo(socketInfo, internalStore);

const clientResponse: HttpClientResponse = {
opaque: originalOpaque,
Expand Down Expand Up @@ -738,7 +738,7 @@ export class HttpClient extends EventEmitter {

return clientResponse;
} catch (rawError: any) {
updateSocketInfo(socketInfo, internalOpaque, rawError);
updateSocketInfo(socketInfo, internalStore, rawError);
debug('Request#%d throw error: %s, socketErrorRetry: %s, socketErrorRetries: %s, socket: %j',
requestId, rawError, args.socketErrorRetry, requestContext.socketErrorRetries, socketInfo);
let err = rawError;
Expand Down Expand Up @@ -790,10 +790,4 @@ export class HttpClient extends EventEmitter {
throw err;
}
}

async #undiciRequest(internalOpaque: unknown, requestUrl: URL, requestOptions: UndiciRequestOption) {
return await asyncLocalStorage.run(internalOpaque, async () => {
return await undiciRequest(requestUrl, requestOptions);
});
}
}
9 changes: 9 additions & 0 deletions src/Response.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { Readable } from 'node:stream';
import type { Socket } from 'node:net';
import type { IncomingHttpHeaders } from './IncomingHttpHeaders.js';

export type SocketInfo = {
Expand Down Expand Up @@ -41,6 +42,14 @@ export type Timing = {
contentDownload: number;
};

export interface InternalStore {
requestId: number;
requestStartTime: number;
enableRequestTiming: boolean;
requestTiming: Timing;
requestSocket?: Socket;
}

export type RawResponseWithMeta = Readable & {
status: number;
statusCode: number;
Expand Down
4 changes: 4 additions & 0 deletions src/asyncLocalStorage.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
import { AsyncLocalStorage } from 'node:async_hooks';
import type { InternalStore } from './Response.js';

export const asyncLocalStorage = new AsyncLocalStorage<InternalStore>();
3 changes: 0 additions & 3 deletions src/asyncLocalStorage2.ts

This file was deleted.

78 changes: 41 additions & 37 deletions src/diagnosticsChannel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { Socket } from 'node:net';
import { DiagnosticsChannel } from 'undici';
import symbols from './symbols.js';
import { globalId, performanceTime } from './utils.js';
import { asyncLocalStorage } from './asyncLocalStorage2.js';
import { asyncLocalStorage } from './asyncLocalStorage.js';

const debug = debuglog('urllib:DiagnosticsChannel');
let initedDiagnosticsChannel = false;
Expand Down Expand Up @@ -62,17 +62,17 @@ export function initDiagnosticsChannel() {
// Note: a request is only loosely completed to a given socket.
subscribe('undici:request:create', (message, name) => {
const { request } = message as DiagnosticsChannel.RequestCreateMessage;
const opaque = asyncLocalStorage.getStore();
if (!opaque?.[symbols.kRequestId]) {
debug('[%s] opaque not found', name);
const store = asyncLocalStorage.getStore();
if (!store?.requestId) {
debug('[%s] store not found', name);
return;
}
let queuing = 0;
if (opaque[symbols.kEnableRequestTiming]) {
queuing = opaque[symbols.kRequestTiming].queuing = performanceTime(opaque[symbols.kRequestStartTime]);
if (store.enableRequestTiming) {
queuing = store.requestTiming.queuing = performanceTime(store.requestStartTime);
}
debug('[%s] Request#%d %s %s, path: %s, headers: %o, queuing: %d',
name, opaque[symbols.kRequestId], request.method, request.origin, request.path,
name, store.requestId, request.method, request.origin, request.path,
request.headers, queuing);
});

Expand Down Expand Up @@ -123,80 +123,84 @@ export function initDiagnosticsChannel() {
// This message is published right before the first byte of the request is written to the socket.
subscribe('undici:client:sendHeaders', (message, name) => {
const { socket } = message as DiagnosticsChannel.ClientSendHeadersMessage & { socket: SocketExtend };
const opaque = asyncLocalStorage.getStore();
if (!opaque?.[symbols.kRequestId]) {
debug('[%s] opaque not found', name);
const store = asyncLocalStorage.getStore();
if (!store?.requestId) {
debug('[%s] store not found', name);
return;
}

(socket[symbols.kHandledRequests] as number)++;
// attach socket to opaque
opaque[symbols.kRequestSocket] = socket;
// attach socket to store
store.requestSocket = socket;
debug('[%s] Request#%d send headers on Socket#%d (handled %d requests, sock: %o)',
name, opaque[symbols.kRequestId], socket[symbols.kSocketId], socket[symbols.kHandledRequests],
name, store.requestId, socket[symbols.kSocketId], socket[symbols.kHandledRequests],
formatSocket(socket));

if (!opaque[symbols.kEnableRequestTiming]) return;
opaque[symbols.kRequestTiming].requestHeadersSent = performanceTime(opaque[symbols.kRequestStartTime]);
if (!store.enableRequestTiming) return;
store.requestTiming.requestHeadersSent = performanceTime(store.requestStartTime);
// first socket need to calculate the connected time
if (socket[symbols.kHandledRequests] === 1) {
// kSocketStartTime - kRequestStartTime = connected time
opaque[symbols.kRequestTiming].connected =
performanceTime(opaque[symbols.kRequestStartTime], socket[symbols.kSocketStartTime] as number);
store.requestTiming.connected =
performanceTime(store.requestStartTime, socket[symbols.kSocketStartTime] as number);
}
});

subscribe('undici:request:bodySent', (_message, name) => {
// const { request } = message as DiagnosticsChannel.RequestBodySentMessage;
const opaque = asyncLocalStorage.getStore();
if (!opaque?.[symbols.kRequestId]) {
debug('[%s] opaque not found', name);
const store = asyncLocalStorage.getStore();
if (!store?.requestId) {
debug('[%s] store not found', name);
return;
}

debug('[%s] Request#%d send body', name, opaque[symbols.kRequestId]);
if (!opaque[symbols.kEnableRequestTiming]) return;
opaque[symbols.kRequestTiming].requestSent = performanceTime(opaque[symbols.kRequestStartTime]);
debug('[%s] Request#%d send body', name, store.requestId);
if (!store.enableRequestTiming) return;
store.requestTiming.requestSent = performanceTime(store.requestStartTime);
});

// This message is published after the response headers have been received, i.e. the response has been completed.
subscribe('undici:request:headers', (message, name) => {
const { response } = message as DiagnosticsChannel.RequestHeadersMessage;
const opaque = asyncLocalStorage.getStore();
if (!opaque?.[symbols.kRequestId]) {
debug('[%s] opaque not found', name);
const store = asyncLocalStorage.getStore();
if (!store?.requestId) {
debug('[%s] store not found', name);
return;
}

// get socket from opaque
const socket = opaque[symbols.kRequestSocket];
const socket = store.requestSocket as any;
// console.log(name, opaque[symbols.kRequestId], formatSocket(socket), performanceTime(opaque[symbols.kRequestStartTime]), opaque[symbols.kEnableRequestTiming]);
if (socket) {
socket[symbols.kHandledResponses]++;
debug('[%s] Request#%d get %s response headers on Socket#%d (handled %d responses, sock: %o)',
name, opaque[symbols.kRequestId], response.statusCode, socket[symbols.kSocketId], socket[symbols.kHandledResponses],
name, store.requestId, response.statusCode,
socket[symbols.kSocketId], socket[symbols.kHandledResponses],
formatSocket(socket));
} else {
debug('[%s] Request#%d get %s response headers on Unknown Socket',
name, opaque[symbols.kRequestId], response.statusCode);
name, store.requestId, response.statusCode);
}

if (!opaque[symbols.kEnableRequestTiming]) return;
opaque[symbols.kRequestTiming].waiting = performanceTime(opaque[symbols.kRequestStartTime]);
if (!store.enableRequestTiming) return;
// console.log(name, opaque[symbols.kRequestId], 'waiting', opaque[symbols.kRequestTiming]);
store.requestTiming.waiting = performanceTime(store.requestStartTime);
// console.log(name, opaque[symbols.kRequestId], 'waiting', opaque[symbols.kRequestTiming]);
});

// This message is published after the response body and trailers have been received, i.e. the response has been completed.
subscribe('undici:request:trailers', (_message, name) => {
// const { request } = message as DiagnosticsChannel.RequestTrailersMessage;
const opaque = asyncLocalStorage.getStore();
if (!opaque?.[symbols.kRequestId]) {
debug('[%s] opaque not found', name);
const store = asyncLocalStorage.getStore();
if (!store?.requestId) {
debug('[%s] store not found', name);
return;
}

debug('[%s] Request#%d get response body and trailers', name, opaque[symbols.kRequestId]);
debug('[%s] Request#%d get response body and trailers', name, store.requestId);

if (!opaque[symbols.kEnableRequestTiming]) return;
opaque[symbols.kRequestTiming].contentDownload = performanceTime(opaque[symbols.kRequestStartTime]);
if (!store.enableRequestTiming) return;
store.requestTiming.contentDownload = performanceTime(store.requestStartTime);
});

// This message is published if the request is going to error, but it has not errored yet.
Expand Down
2 changes: 1 addition & 1 deletion src/fetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import {
} from './Request.js';
import { RawResponseWithMeta, SocketInfo } from './Response.js';
import { IncomingHttpHeaders } from './IncomingHttpHeaders.js';
import { asyncLocalStorage } from './asyncLocalStorage2.js';
import { asyncLocalStorage } from './asyncLocalStorage.js';

const debug = debuglog('urllib:fetch');

Expand Down
2 changes: 1 addition & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ export {
} from './IncomingHttpHeaders.js';
export * from './HttpClientError.js';
export { FetchFactory, fetch } from './fetch.js';
export { asyncLocalStorage } from './asyncLocalStorage2.js';
export { asyncLocalStorage } from './asyncLocalStorage.js';

export default {
request,
Expand Down
Empty file added src/types.ts
Empty file.
8 changes: 4 additions & 4 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import { performance } from 'node:perf_hooks';
import { ReadableStream } from 'node:stream/web';
import { Blob } from 'node:buffer';
import type { FixJSONCtlChars } from './Request.js';
import { SocketInfo } from './Response.js';
import type { InternalStore, SocketInfo } from './Response.js';
import symbols from './symbols.js';
import { IncomingHttpHeaders } from './IncomingHttpHeaders.js';
import type { IncomingHttpHeaders } from './IncomingHttpHeaders.js';

const JSONCtlCharsMap: Record<string, string> = {
'"': '\\"', // \u0022
Expand Down Expand Up @@ -157,8 +157,8 @@ export function isReadable(stream: any) {
&& typeof stream._readableState === 'object';
}

export function updateSocketInfo(socketInfo: SocketInfo, internalOpaque: any, err?: any) {
const socket = internalOpaque[symbols.kRequestSocket] ?? err?.[symbols.kErrorSocket];
export function updateSocketInfo(socketInfo: SocketInfo, internalStore: InternalStore, err?: any) {
const socket = internalStore.requestSocket ?? err?.[symbols.kErrorSocket];

if (socket) {
socketInfo.id = socket[symbols.kSocketId];
Expand Down
Loading

0 comments on commit 9a98389

Please sign in to comment.