Skip to content

Commit

Permalink
fix: should got undici:client:sendHeaders message on H2
Browse files Browse the repository at this point in the history
  • Loading branch information
fengmk2 committed Dec 4, 2024
1 parent 104664d commit ba54c54
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 6 deletions.
4 changes: 2 additions & 2 deletions src/HttpClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -680,8 +680,8 @@ export class HttpClient extends EventEmitter {
res,
};

debug('Request#%d got response, status: %s, headers: %j, timing: %j',
requestId, res.status, res.headers, res.timing);
debug('Request#%d got response, status: %s, headers: %j, timing: %j, socket: %j',
requestId, res.status, res.headers, res.timing, res.socket);

if (args.retry > 0 && requestContext.retries < args.retry) {
const isRetry = args.isRetry ?? defaultIsRetry;
Expand Down
6 changes: 3 additions & 3 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -172,13 +172,13 @@ export function updateSocketInfo(socketInfo: SocketInfo, internalOpaque: any, er
socketInfo.remotePort = socket.remotePort;
socketInfo.remoteFamily = socket.remoteFamily;
}
if (Array.isArray(socket.autoSelectFamilyAttemptedAddresses)) {
socketInfo.attemptedRemoteAddresses = socket.autoSelectFamilyAttemptedAddresses;
}
socketInfo.bytesRead = socket.bytesRead;
socketInfo.bytesWritten = socket.bytesWritten;
if (socket[symbols.kSocketConnectErrorTime]) {
socketInfo.connectErrorTime = socket[symbols.kSocketConnectErrorTime];
if (Array.isArray(socket.autoSelectFamilyAttemptedAddresses)) {
socketInfo.attemptedRemoteAddresses = socket.autoSelectFamilyAttemptedAddresses;
}
socketInfo.connectProtocol = socket[symbols.kSocketConnectProtocol];
socketInfo.connectHost = socket[symbols.kSocketConnectHost];
socketInfo.connectPort = socket[symbols.kSocketConnectPort];
Expand Down
139 changes: 138 additions & 1 deletion test/diagnostics_channel.test.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import { strict as assert } from 'node:assert';
import diagnosticsChannel from 'node:diagnostics_channel';
import { setTimeout as sleep } from 'node:timers/promises';
import { createSecureServer } from 'node:http2';
import { once } from 'node:events';
import { describe, it, beforeEach, afterEach } from 'vitest';
import urllib from '../src/index.js';
import pem from 'https-pem';
import urllib, { HttpClient } from '../src/index.js';
import type {
RequestDiagnosticsMessage,
ResponseDiagnosticsMessage,
Expand Down Expand Up @@ -138,6 +141,140 @@ describe('diagnostics_channel.test.ts', () => {
diagnosticsChannel.unsubscribe('undici:request:trailers', onMessage);
});

it('should support trace socket info with H2 by undici:client:sendHeaders and undici:request:trailers', async () => {
const server = createSecureServer(pem);
server.on('stream', (stream, headers) => {
stream.respond({
'content-type': 'text/plain; charset=utf-8',
'x-custom-h2': 'hello',
':status': 200,
});
if (headers[':method'] !== 'HEAD') {
stream.end('hello h2!');
}
});

server.listen(0);
await once(server, 'listening');

const kRequests = Symbol('requests');
let lastRequestOpaque: any;
let kHandler: any;
function onMessage(message: any, name: string | symbol) {
if (name === 'undici:client:connected') {
// console.log('%s %j', name, message.connectParams);
message.socket[kRequests] = 0;
return;
}
const { request, socket } = message;
if (!kHandler) {
const symbols = Object.getOwnPropertySymbols(request);
for (const symbol of symbols) {
if (symbol.description === 'handler') {
kHandler = symbol;
break;
}
}
}
const handler = request[kHandler];
let opaque = handler.opaque || handler.opts?.opaque;
assert(opaque);
opaque = opaque[symbols.kRequestOriginalOpaque];
if (opaque && name === 'undici:client:sendHeaders' && socket) {
socket[kRequests]++;
opaque.tracer.socket = {
localAddress: socket.localAddress,
localPort: socket.localPort,
remoteAddress: socket.remoteAddress,
remotePort: socket.remotePort,
remoteFamily: socket.remoteFamily,
timeout: socket.timeout,
bytesWritten: socket.bytesWritten,
bytesRead: socket.bytesRead,
requests: socket[kRequests],
};
}
// console.log('%s emit, %s %s, opaque: %j', name, request.method, request.origin, opaque);
lastRequestOpaque = opaque;
// console.log(request);
}
diagnosticsChannel.subscribe('undici:client:connected', onMessage);
diagnosticsChannel.subscribe('undici:client:sendHeaders', onMessage);
diagnosticsChannel.subscribe('undici:request:trailers', onMessage);

const httpClient = new HttpClient({
allowH2: true,
connect: {
rejectUnauthorized: false,
},
});

let traceId = `mock-traceid-${Date.now()}`;
_url = `https://localhost:${server.address().port}`;
let response = await httpClient.request(`${_url}?head=true`, {
method: 'HEAD',
opaque: {
tracer: { traceId },
},
});
assert.equal(response.status, 200);
assert(response.url.startsWith(_url));
assert(!response.redirected);
assert.equal(lastRequestOpaque.tracer.traceId, traceId);
assert(lastRequestOpaque.tracer.socket);
assert.equal(lastRequestOpaque.tracer.socket.requests, 1);

// HEAD, GET 请求都走同一个 http2 session socket
await sleep(1);
traceId = `mock-traceid-${Date.now()}`;
response = await httpClient.request(_url, {
method: 'GET',
opaque: {
tracer: { traceId },
},
});
assert.equal(response.status, 200);
assert.equal(lastRequestOpaque.tracer.traceId, traceId);
assert(lastRequestOpaque.tracer.socket);
assert.equal(lastRequestOpaque.tracer.socket.requests, 2);

await sleep(1);
traceId = `mock-traceid-${Date.now()}`;
response = await httpClient.request(_url, {
method: 'GET',
opaque: {
tracer: { traceId },
},
});
assert.equal(response.status, 200);
assert.equal(lastRequestOpaque.tracer.traceId, traceId);
assert(lastRequestOpaque.tracer.socket);
assert.equal(lastRequestOpaque.tracer.socket.requests, 3);

// socket 复用 1000 次
let count = 1000;
while (count-- > 0) {
await sleep(1);
traceId = `mock-traceid-${Date.now()}`;
response = await httpClient.request(`${_url}?count=${count}`, {
method: 'GET',
opaque: {
tracer: { traceId },
},
});
assert.equal(response.status, 200);
assert.equal(lastRequestOpaque.tracer.traceId, traceId);
assert(lastRequestOpaque.tracer.socket);
assert.equal(lastRequestOpaque.tracer.socket.requests, 3 + 1000 - count);
}
assert.equal(lastRequestOpaque.tracer.socket.requests, 1003);

diagnosticsChannel.unsubscribe('undici:client:connected', onMessage);
diagnosticsChannel.unsubscribe('undici:client:sendHeaders', onMessage);
diagnosticsChannel.unsubscribe('undici:request:trailers', onMessage);
server.close();
});

it('should support trace request by urllib:request and urllib:response', async () => {
let lastRequestOpaque: any;
let socket: any;
Expand Down

0 comments on commit ba54c54

Please sign in to comment.