From fdf7f0799b683269ace9714fe186af43a6c2cfb7 Mon Sep 17 00:00:00 2001 From: fengmk2 Date: Wed, 4 Dec 2024 09:52:35 +0800 Subject: [PATCH] fix: should got undici:client:sendHeaders message on H2 closes https://github.com/node-modules/urllib/issues/510 --- src/HttpClient.ts | 4 +- src/utils.ts | 6 +- test/diagnostics_channel.test.ts | 139 ++++++++++++++++++++++++++++++- 3 files changed, 143 insertions(+), 6 deletions(-) diff --git a/src/HttpClient.ts b/src/HttpClient.ts index 1d0f0b1e..f2e88b4c 100644 --- a/src/HttpClient.ts +++ b/src/HttpClient.ts @@ -680,8 +680,8 @@ export class HttpClient extends EventEmitter { res, }; - debug('Request#%d got response, status: %s, headers: %j, timing: %j', - requestId, res.status, res.headers, res.timing); + debug('Request#%d got response, status: %s, headers: %j, timing: %j, socket: %j', + requestId, res.status, res.headers, res.timing, res.socket); if (args.retry > 0 && requestContext.retries < args.retry) { const isRetry = args.isRetry ?? defaultIsRetry; diff --git a/src/utils.ts b/src/utils.ts index 04f7a688..ebc8b2ab 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -172,13 +172,13 @@ export function updateSocketInfo(socketInfo: SocketInfo, internalOpaque: any, er socketInfo.remotePort = socket.remotePort; socketInfo.remoteFamily = socket.remoteFamily; } + if (Array.isArray(socket.autoSelectFamilyAttemptedAddresses)) { + socketInfo.attemptedRemoteAddresses = socket.autoSelectFamilyAttemptedAddresses; + } socketInfo.bytesRead = socket.bytesRead; socketInfo.bytesWritten = socket.bytesWritten; if (socket[symbols.kSocketConnectErrorTime]) { socketInfo.connectErrorTime = socket[symbols.kSocketConnectErrorTime]; - if (Array.isArray(socket.autoSelectFamilyAttemptedAddresses)) { - socketInfo.attemptedRemoteAddresses = socket.autoSelectFamilyAttemptedAddresses; - } socketInfo.connectProtocol = socket[symbols.kSocketConnectProtocol]; socketInfo.connectHost = socket[symbols.kSocketConnectHost]; socketInfo.connectPort = socket[symbols.kSocketConnectPort]; diff --git a/test/diagnostics_channel.test.ts b/test/diagnostics_channel.test.ts index d04ec466..f184b87c 100644 --- a/test/diagnostics_channel.test.ts +++ b/test/diagnostics_channel.test.ts @@ -1,8 +1,11 @@ import { strict as assert } from 'node:assert'; import diagnosticsChannel from 'node:diagnostics_channel'; import { setTimeout as sleep } from 'node:timers/promises'; +import { createSecureServer } from 'node:http2'; +import { once } from 'node:events'; import { describe, it, beforeEach, afterEach } from 'vitest'; -import urllib from '../src/index.js'; +import pem from 'https-pem'; +import urllib, { HttpClient } from '../src/index.js'; import type { RequestDiagnosticsMessage, ResponseDiagnosticsMessage, @@ -138,6 +141,140 @@ describe('diagnostics_channel.test.ts', () => { diagnosticsChannel.unsubscribe('undici:request:trailers', onMessage); }); + it('should support trace socket info with H2 by undici:client:sendHeaders and undici:request:trailers', async () => { + const server = createSecureServer(pem); + server.on('stream', (stream, headers) => { + stream.respond({ + 'content-type': 'text/plain; charset=utf-8', + 'x-custom-h2': 'hello', + ':status': 200, + }); + if (headers[':method'] !== 'HEAD') { + stream.end('hello h2!'); + } + }); + + server.listen(0); + await once(server, 'listening'); + + const kRequests = Symbol('requests'); + let lastRequestOpaque: any; + let kHandler: any; + function onMessage(message: any, name: string | symbol) { + if (name === 'undici:client:connected') { + // console.log('%s %j', name, message.connectParams); + message.socket[kRequests] = 0; + return; + } + const { request, socket } = message; + if (!kHandler) { + const symbols = Object.getOwnPropertySymbols(request); + for (const symbol of symbols) { + if (symbol.description === 'handler') { + kHandler = symbol; + break; + } + } + } + const handler = request[kHandler]; + let opaque = handler.opaque || handler.opts?.opaque; + assert(opaque); + opaque = opaque[symbols.kRequestOriginalOpaque]; + if (opaque && name === 'undici:client:sendHeaders' && socket) { + socket[kRequests]++; + opaque.tracer.socket = { + localAddress: socket.localAddress, + localPort: socket.localPort, + remoteAddress: socket.remoteAddress, + remotePort: socket.remotePort, + remoteFamily: socket.remoteFamily, + timeout: socket.timeout, + bytesWritten: socket.bytesWritten, + bytesRead: socket.bytesRead, + requests: socket[kRequests], + }; + } + // console.log('%s emit, %s %s, opaque: %j', name, request.method, request.origin, opaque); + lastRequestOpaque = opaque; + // console.log(request); + } + diagnosticsChannel.subscribe('undici:client:connected', onMessage); + diagnosticsChannel.subscribe('undici:client:sendHeaders', onMessage); + diagnosticsChannel.subscribe('undici:request:trailers', onMessage); + + const httpClient = new HttpClient({ + allowH2: true, + connect: { + rejectUnauthorized: false, + }, + }); + + let traceId = `mock-traceid-${Date.now()}`; + _url = `https://localhost:${server.address().port}`; + let response = await httpClient.request(`${_url}?head=true`, { + method: 'HEAD', + opaque: { + tracer: { traceId }, + }, + }); + assert.equal(response.status, 200); + assert(response.url.startsWith(_url)); + assert(!response.redirected); + assert.equal(lastRequestOpaque.tracer.traceId, traceId); + assert(lastRequestOpaque.tracer.socket); + assert.equal(lastRequestOpaque.tracer.socket.requests, 1); + + // HEAD, GET 请求都走同一个 http2 session socket + await sleep(1); + traceId = `mock-traceid-${Date.now()}`; + response = await httpClient.request(_url, { + method: 'GET', + opaque: { + tracer: { traceId }, + }, + }); + assert.equal(response.status, 200); + assert.equal(lastRequestOpaque.tracer.traceId, traceId); + assert(lastRequestOpaque.tracer.socket); + assert.equal(lastRequestOpaque.tracer.socket.requests, 2); + + await sleep(1); + traceId = `mock-traceid-${Date.now()}`; + response = await httpClient.request(_url, { + method: 'GET', + opaque: { + tracer: { traceId }, + }, + }); + assert.equal(response.status, 200); + assert.equal(lastRequestOpaque.tracer.traceId, traceId); + assert(lastRequestOpaque.tracer.socket); + assert.equal(lastRequestOpaque.tracer.socket.requests, 3); + + // socket 复用 1000 次 + let count = 1000; + while (count-- > 0) { + await sleep(1); + traceId = `mock-traceid-${Date.now()}`; + response = await httpClient.request(`${_url}?count=${count}`, { + method: 'GET', + opaque: { + tracer: { traceId }, + }, + }); + assert.equal(response.status, 200); + assert.equal(lastRequestOpaque.tracer.traceId, traceId); + assert(lastRequestOpaque.tracer.socket); + assert.equal(lastRequestOpaque.tracer.socket.requests, 3 + 1000 - count); + } + assert.equal(lastRequestOpaque.tracer.socket.requests, 1003); + + diagnosticsChannel.unsubscribe('undici:client:connected', onMessage); + diagnosticsChannel.unsubscribe('undici:client:sendHeaders', onMessage); + diagnosticsChannel.unsubscribe('undici:request:trailers', onMessage); + server.close(); + }); + it('should support trace request by urllib:request and urllib:response', async () => { let lastRequestOpaque: any; let socket: any;