Skip to content

Commit

Permalink
feat: export agent pool stats
Browse files Browse the repository at this point in the history
  • Loading branch information
fengmk2 committed Dec 21, 2023
1 parent 5bb3a62 commit 196a3b6
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 24 deletions.
39 changes: 38 additions & 1 deletion src/HttpClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ import {
Dispatcher,
Agent,
getGlobalDispatcher,
Pool,
} from 'undici';
import { kClients } from 'undici/lib/core/symbols.js';
import { FormData as FormDataNode } from 'formdata-node';
import { FormDataEncoder } from 'form-data-encoder';
import createUserAgent from 'default-user-agent';
Expand Down Expand Up @@ -136,7 +138,7 @@ function defaultIsRetry(response: HttpClientResponse) {
return response.status >= 500;
}

type RequestContext = {
export type RequestContext = {
retries: number;
socketErrorRetries: number;
requestStartTime?: number;
Expand All @@ -157,6 +159,20 @@ export type ResponseDiagnosticsMessage = {
error?: Error;
};

export interface PoolStat {
/** Number of open socket connections in this pool. */
connected: number;
/** Number of open socket connections in this pool that do not have an active request. */
free: number;
/** Number of pending requests across all clients in this pool. */
pending: number;
/** Number of queued requests across all clients in this pool. */
queued: number;
/** Number of currently active requests across all clients in this pool. */
running: number;
/** Number of active, pending, or queued requests across all clients in this pool. */
size: number;
}

export class HttpClient extends EventEmitter {
#defaultArgs?: RequestOptions;
Expand Down Expand Up @@ -187,6 +203,27 @@ export class HttpClient extends EventEmitter {
this.#dispatcher = dispatcher;
}

getDispatcherPoolStats() {
const agent = this.getDispatcher();
// origin => Pool Instance
const clients: Map<string, WeakRef<Pool>> = agent[kClients];
const poolStatsMap: Record<string, PoolStat> = {};
for (const [ key, ref ] of clients) {
const pool = ref.deref();
const stats = pool?.stats;
if (!stats) continue;
poolStatsMap[key] = {
connected: stats.connected,
free: stats.free,
pending: stats.pending,
queued: stats.queued,
running: stats.running,
size: stats.size,
} satisfies PoolStat;
}
return poolStatsMap;
}

async request<T = any>(url: RequestURL, options?: RequestOptions) {
return await this.#requestInternal<T>(url, options);
}
Expand Down
8 changes: 4 additions & 4 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import LRU from 'ylru';
import { HttpClient, HEADER_USER_AGENT } from './HttpClient.js';
import { RequestOptions, RequestURL } from './Request.js';

let httpclient: HttpClient;
let httpClient: HttpClient;
const domainSocketHttpclients = new LRU(50);

export async function request<T = any>(url: RequestURL, options?: RequestOptions) {
Expand All @@ -17,10 +17,10 @@ export async function request<T = any>(url: RequestURL, options?: RequestOptions
return await domainSocketHttpclient.request<T>(url, options);
}

if (!httpclient) {
httpclient = new HttpClient({});
if (!httpClient) {
httpClient = new HttpClient({});
}
return await httpclient.request<T>(url, options);
return await httpClient.request<T>(url, options);
}

// export curl method is keep compatible with urllib.curl()
Expand Down
56 changes: 37 additions & 19 deletions test/keep-alive-header.test.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import { strict as assert } from 'node:assert';
import { describe, it, beforeAll, afterAll } from 'vitest';
import urllib from '../src';
import { HttpClient } from '../src';
import { startServer } from './fixtures/server';
import { sleep } from './utils';

describe('keep-alive-header.test.ts', () => {
// should shorter than server keepalive timeout
// https://zhuanlan.zhihu.com/p/34147188
const keepAliveTimeout = 2000;
const httpClient = new HttpClient();
let close: any;
let _url: string;
beforeAll(async () => {
Expand All @@ -25,97 +26,114 @@ describe('keep-alive-header.test.ts', () => {
const max = process.env.TEST_KEEPALIVE_COUNT ? parseInt(process.env.TEST_KEEPALIVE_COUNT) : 3;
let otherSideClosed = 0;
let readECONNRESET = 0;
const origin = _url.substring(0, _url.length - 1);
while (count < max) {
count++;
try {
let response = await urllib.request(_url);
const task = httpClient.request(_url);
console.log('after request stats: %o', httpClient.getDispatcherPoolStats());
assert.equal(httpClient.getDispatcherPoolStats()[origin].pending, 1);
assert.equal(httpClient.getDispatcherPoolStats()[origin].size, 1);
let response = await task;
console.log('after response stats: %o', httpClient.getDispatcherPoolStats());
assert.equal(httpClient.getDispatcherPoolStats()[origin].pending, 0);
assert.equal(httpClient.getDispatcherPoolStats()[origin].connected, 1);
// console.log(response.res.socket);
assert.equal(response.status, 200);
// console.log(response.headers);
assert.equal(response.headers.connection, 'keep-alive');
assert.equal(response.headers['keep-alive'], 'timeout=2');
response = await urllib.request(_url);
response = await httpClient.request(_url);
assert.equal(response.status, 200);
// console.log(response.headers);
assert.equal(response.headers.connection, 'keep-alive');
assert.equal(response.headers['keep-alive'], 'timeout=2');
response = await urllib.request(_url);
response = await httpClient.request(_url);
assert.equal(response.status, 200);
// console.log(response.headers);
assert.equal(response.headers.connection, 'keep-alive');
assert.equal(response.headers['keep-alive'], 'timeout=2');
response = await urllib.request(_url);
response = await httpClient.request(_url);
assert.equal(response.status, 200);
// console.log(response.headers);
assert.equal(response.headers.connection, 'keep-alive');
assert.equal(response.headers['keep-alive'], 'timeout=2');
response = await urllib.request(_url);
response = await httpClient.request(_url);
assert.equal(response.status, 200);
assert.equal(response.headers.connection, 'keep-alive');
assert.equal(response.headers['keep-alive'], 'timeout=2');
response = await urllib.request(_url);
response = await httpClient.request(_url);
assert.equal(response.status, 200);
assert.equal(response.headers.connection, 'keep-alive');
assert.equal(response.headers['keep-alive'], 'timeout=2');
response = await urllib.request(_url);
response = await httpClient.request(_url);
assert.equal(response.status, 200);
assert.equal(response.headers.connection, 'keep-alive');
assert.equal(response.headers['keep-alive'], 'timeout=2');
response = await urllib.request(_url);
response = await httpClient.request(_url);
assert.equal(response.status, 200);
assert.equal(response.headers.connection, 'keep-alive');
assert.equal(response.headers['keep-alive'], 'timeout=2');
response = await urllib.request(_url);
response = await httpClient.request(_url);
assert.equal(response.status, 200);
// console.log(response.headers);
assert.equal(response.headers.connection, 'keep-alive');
assert.equal(response.headers['keep-alive'], 'timeout=2');
assert(parseInt(response.headers['x-requests-persocket'] as string) > 1);
await sleep(keepAliveTimeout / 2);
response = await urllib.request(_url);
response = await httpClient.request(_url);
// console.log(response.res.socket);
assert.equal(response.status, 200);
// console.log(response.headers);
assert.equal(response.headers.connection, 'keep-alive');
assert.equal(response.headers['keep-alive'], 'timeout=2');
response = await urllib.request(_url);
response = await httpClient.request(_url);
assert.equal(response.status, 200);
// console.log(response.headers);
assert.equal(response.headers.connection, 'keep-alive');
assert.equal(response.headers['keep-alive'], 'timeout=2');
response = await urllib.request(_url);
response = await httpClient.request(_url);
assert.equal(response.status, 200);
// console.log(response.headers);
assert.equal(response.headers.connection, 'keep-alive');
assert.equal(response.headers['keep-alive'], 'timeout=2');
response = await urllib.request(_url);
response = await httpClient.request(_url);
assert.equal(response.status, 200);
// console.log(response.headers);
assert.equal(response.headers.connection, 'keep-alive');
assert.equal(response.headers['keep-alive'], 'timeout=2');
response = await urllib.request(_url);
response = await httpClient.request(_url);
assert.equal(response.status, 200);
assert.equal(response.headers.connection, 'keep-alive');
assert.equal(response.headers['keep-alive'], 'timeout=2');
response = await urllib.request(_url);
response = await httpClient.request(_url);
assert.equal(response.status, 200);
assert.equal(response.headers.connection, 'keep-alive');
assert.equal(response.headers['keep-alive'], 'timeout=2');
response = await urllib.request(_url);
response = await httpClient.request(_url);
assert.equal(response.status, 200);
assert.equal(response.headers.connection, 'keep-alive');
assert.equal(response.headers['keep-alive'], 'timeout=2');
response = await urllib.request(_url);
response = await httpClient.request(_url);
assert.equal(response.status, 200);
assert.equal(response.headers.connection, 'keep-alive');
assert.equal(response.headers['keep-alive'], 'timeout=2');
response = await urllib.request(_url);
response = await httpClient.request(_url);
assert.equal(response.status, 200);
// console.log(response.headers);
assert.equal(response.headers.connection, 'keep-alive');
assert.equal(response.headers['keep-alive'], 'timeout=2');
assert(parseInt(response.headers['x-requests-persocket'] as string) > 1);
console.log('before sleep stats: %o', httpClient.getDispatcherPoolStats());
// { connected: 2, free: 1, pending: 0, queued: 0, running: 0, size: 0 }
assert.equal(httpClient.getDispatcherPoolStats()[origin].connected, 2);
assert.equal(httpClient.getDispatcherPoolStats()[origin].free, 1);
await sleep(keepAliveTimeout);
console.log('after sleep stats: %o', httpClient.getDispatcherPoolStats());
// { connected: 0, free: 0, pending: 0, queued: 0, running: 0, size: 0 }
assert.equal(httpClient.getDispatcherPoolStats()[origin].connected, 0);
assert.equal(httpClient.getDispatcherPoolStats()[origin].free, 0);
assert.equal(httpClient.getDispatcherPoolStats()[origin].size, 0);
} catch (err) {
if (err.message === 'other side closed') {
console.log(err);
Expand Down

0 comments on commit 196a3b6

Please sign in to comment.