Skip to content

Commit

Permalink
feat: export connection and query diagnostics_channel (#111)
Browse files Browse the repository at this point in the history
Drop Node.js < 16.17.0 support
  • Loading branch information
fengmk2 authored Jun 10, 2023
1 parent 586317b commit 64aa75d
Show file tree
Hide file tree
Showing 12 changed files with 160 additions and 17 deletions.
5 changes: 4 additions & 1 deletion .eslintrc
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
{
"extends": "eslint-config-egg/typescript"
"extends": [
"eslint-config-egg/typescript",
"eslint-config-egg/lib/rules/enforce-node-prefix"
]
}
2 changes: 1 addition & 1 deletion .github/workflows/nodejs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@ jobs:
uses: node-modules/github-actions/.github/workflows/node-test-mysql.yml@master
with:
os: 'ubuntu-latest'
version: '16, 18, 20'
version: '16.17.0, 16, 18, 20'
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"version": "6.1.0",
"description": "Aliyun RDS client",
"main": "lib/client.js",
"types": "lib/client.d.ts",
"files": [
"lib"
],
Expand Down Expand Up @@ -43,7 +44,7 @@
"mysql"
],
"engines": {
"node": ">= 14.17.0"
"node": ">= 16.17.0"
},
"author": "fengmk2 <[email protected]> (https://github.com/fengmk2)",
"license": "MIT"
Expand Down
35 changes: 35 additions & 0 deletions src/channels.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import diagnosticsChannel from 'node:diagnostics_channel';
import type { PoolConnectionPromisify } from './types';
import type { RDSClient } from './client';

export default {
// pool events https://github.com/mysqljs/mysql#pool-events
connectionNew: diagnosticsChannel.channel('ali-rds:connection:new'),
connectionAcquire: diagnosticsChannel.channel('ali-rds:connection:acquire'),
connectionRelease: diagnosticsChannel.channel('ali-rds:connection:release'),
connectionEnqueue: diagnosticsChannel.channel('ali-rds:connection:enqueue'),
// query
queryStart: diagnosticsChannel.channel('ali-rds:query:start'),
queryEnd: diagnosticsChannel.channel('ali-rds:query:end'),
};

export interface ConnectionMessage {
client: RDSClient;
connection: PoolConnectionPromisify;
}

export interface ConnectionEnqueueMessage {
client: RDSClient;
}

export interface QueryStartMessage {
connection: PoolConnectionPromisify;
sql: string;
}

export interface QueryEndMessage {
connection: PoolConnectionPromisify;
sql: string;
duration: number;
error?: Error;
}
42 changes: 36 additions & 6 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import { RDSConnection } from './connection';
import { RDSTransaction } from './transaction';
import { RDSPoolConfig } from './PoolConfig';
import literals from './literals';
import channels from './channels';
import type { ConnectionMessage, ConnectionEnqueueMessage } from './channels';

interface PoolPromisify extends Omit<Pool, 'query'> {
query(sql: string): Promise<any>;
Expand Down Expand Up @@ -39,8 +41,8 @@ export class RDSClient extends Operator {
// get connection options from getConnectionConfig method every time
if (mysqlOptions.getConnectionConfig) {
// eslint-disable-next-line @typescript-eslint/no-var-requires
const Pool = require('mysql/lib/Pool');
this.#pool = new Pool({
const MySQLPool = require('mysql/lib/Pool');
this.#pool = new MySQLPool({
config: new RDSPoolConfig(mysqlOptions, mysqlOptions.getConnectionConfig),
});
// override _needsChangeUser to return false
Expand All @@ -57,11 +59,39 @@ export class RDSClient extends Operator {
});
this.#connectionStorage = connectionStorage || new AsyncLocalStorage();
this.#connectionStorageKey = connectionStorageKey || RDSClient.#DEFAULT_STORAGE_KEY;
// https://github.com/mysqljs/mysql#pool-events
this.#pool.on('connection', (connection: PoolConnectionPromisify) => {
channels.connectionNew.publish({
client: this,
connection,
} as ConnectionMessage);
});
this.#pool.on('enqueue', () => {
channels.connectionEnqueue.publish({
client: this,
} as ConnectionEnqueueMessage);
});
this.#pool.on('acquire', (connection: PoolConnectionPromisify) => {
channels.connectionAcquire.publish({
client: this,
connection,
} as ConnectionMessage);
});
this.#pool.on('release', (connection: PoolConnectionPromisify) => {
channels.connectionRelease.publish({
client: this,
connection,
} as ConnectionMessage);
});
}

// impl Operator._query
protected async _query(sql: string) {
return await this.#pool.query(sql);
async query<T = any>(sql: string, values?: object | any[]): Promise<T> {
const conn = await this.getConnection();
try {
return await conn.query(sql, values);
} finally {
conn.release();
}
}

get pool() {
Expand Down Expand Up @@ -197,7 +227,7 @@ export class RDSClient extends Operator {
* @param scope - scope with code
* @return {Object} - scope return result
*/
async beginTransactionScope(scope: TransactionScope) {
async beginTransactionScope(scope: TransactionScope): Promise<any> {
let ctx = this.#connectionStorage.getStore();
if (ctx) {
return await this.#beginTransactionScope(scope, ctx);
Expand Down
2 changes: 1 addition & 1 deletion src/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ const kWrapToRDS = Symbol('kWrapToRDS');
export class RDSConnection extends Operator {
conn: PoolConnectionPromisify;
constructor(conn: PoolConnectionPromisify) {
super();
super(conn);
this.conn = conn;
if (!this.conn[kWrapToRDS]) {
[
Expand Down
30 changes: 27 additions & 3 deletions src/operator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,33 @@ import {
LockResult, LockTableOption,
SelectOption,
UpdateOption, UpdateResult, UpdateRow,
PoolConnectionPromisify,
} from './types';
import channels from './channels';
import type { QueryStartMessage, QueryEndMessage } from './channels';

const debug = debuglog('ali-rds:operator');

/**
* Operator Interface
*/
export abstract class Operator {
#connection: PoolConnectionPromisify;
constructor(connection?: PoolConnectionPromisify) {
if (connection) {
this.#connection = connection;
}
}

protected beforeQueryHandlers: BeforeQueryHandler[] = [];
protected afterQueryHandlers: AfterQueryHandler[] = [];

get literals() { return literals; }

get threadId() {
return this.#connection?.threadId;
}

beforeQuery(beforeQueryHandler: BeforeQueryHandler) {
this.beforeQueryHandlers.push(beforeQueryHandler);
}
Expand Down Expand Up @@ -66,9 +80,13 @@ export abstract class Operator {
}
}
debug('query %o', sql);
const queryStart = Date.now();
const queryStart = performance.now();
let rows: any;
let lastError: Error | undefined;
channels.queryStart.publish({
sql,
connection: this.#connection,
} as QueryStartMessage);
try {
rows = await this._query(sql);
if (Array.isArray(rows)) {
Expand All @@ -83,10 +101,16 @@ export abstract class Operator {
debug('query error: %o', err);
throw err;
} finally {
const duration = Math.floor((performance.now() - queryStart) * 1000) / 1000;
channels.queryEnd.publish({
sql,
connection: this.#connection,
duration,
error: lastError,
} as QueryEndMessage);
if (this.afterQueryHandlers.length > 0) {
const execDuration = Date.now() - queryStart;
for (const afterQueryHandler of this.afterQueryHandlers) {
afterQueryHandler(sql, rows, execDuration, lastError);
afterQueryHandler(sql, rows, duration, lastError);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ export class RDSTransaction extends Operator {
isRollback = false;
conn: RDSConnection | null;
constructor(conn: RDSConnection) {
super();
super(conn.conn);
this.conn = conn;
}

Expand Down
2 changes: 1 addition & 1 deletion src/types.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { AsyncLocalStorage } from 'async_hooks';
import { AsyncLocalStorage } from 'node:async_hooks';
import type { PoolConnection, PoolConfig, ConnectionConfig } from 'mysql';
import { RDSTransaction } from './transaction';

Expand Down
40 changes: 39 additions & 1 deletion test/PoolConfig.test.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,41 @@
import { strict as assert } from 'node:assert';
import fs from 'node:fs/promises';
import path from 'node:path';
import diagnosticsChannel from 'node:diagnostics_channel';
import mm from 'mm';
import config from './config';
import { RDSClient } from '../src/client';
import type { ConnectionMessage, QueryEndMessage } from '../src/channels';

describe('test/PoolConfig.test.ts', () => {
const prefix = 'prefix-PoolConfig' + process.version + '-';
const table = 'ali-sdk-test-user';
let db: RDSClient;
let index = 0;
let newConnectionCount = 0;
let newConnectionCountByDiagnosticsChannel = 0;
let queryCount = 0;
let queryErrorCount = 0;
let end = false;

before(async () => {
diagnosticsChannel.subscribe('ali-rds:connection:new', message => {
if (end) return;
const { connection } = message as ConnectionMessage;
console.log('[diagnosticsChannel] connection threadId %o created', connection.threadId);
newConnectionCountByDiagnosticsChannel++;
});
diagnosticsChannel.subscribe('ali-rds:query:end', message => {
if (end) return;
const { connection, sql, duration, error } = message as QueryEndMessage;
console.log('[diagnosticsChannel] connection threadId %o query %o, duration: %oms, error: %o',
connection.threadId, sql, duration, error);
queryCount++;
if (error) {
queryErrorCount++;
}
});

db = new RDSClient({
// test getConnectionConfig
connectionLimit: 2,
Expand Down Expand Up @@ -44,7 +68,9 @@ describe('test/PoolConfig.test.ts', () => {
});

after(async () => {
return await db.end();
await db.end();
assert.equal(queryCount, 7);
end = true;
});

afterEach(() => {
Expand Down Expand Up @@ -79,6 +105,18 @@ describe('test/PoolConfig.test.ts', () => {
assert(Array.isArray(results[2]));
assert.equal(index, 3);
assert.equal(newConnectionCount, 2);
assert.equal(newConnectionCountByDiagnosticsChannel, 2);
});

it('should query error', async () => {
assert.equal(queryErrorCount, 0);
await assert.rejects(async () => {
await db.query('show tables wrong sql');
}, (err: Error) => {
assert.match(err.message, /You have an error in your SQL syntax/);
return true;
});
assert.equal(queryErrorCount, 1);
});
});
});
7 changes: 6 additions & 1 deletion test/client.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { AsyncLocalStorage } from 'async_hooks';
import { AsyncLocalStorage } from 'node:async_hooks';
import { strict as assert } from 'node:assert';
import fs from 'node:fs/promises';
import path from 'node:path';
Expand Down Expand Up @@ -1315,6 +1315,8 @@ describe('test/client.test.ts', () => {
assert(result.insertId > 0);
result = await conn.delete(table);
assert(result.affectedRows > 0);
assert.equal(typeof conn.threadId, 'number');
assert(conn.threadId! > 0);
conn.release();
});
});
Expand Down Expand Up @@ -1417,6 +1419,9 @@ describe('test/client.test.ts', () => {
assert.equal(count, 1);

await db.beginTransactionScope(async conn => {
assert.equal(typeof conn.threadId, 'number');
assert(conn.threadId! > 0);
assert.equal(conn.threadId, conn.conn!.threadId);
await conn.query(`insert into ??(name, email, gmt_create, gmt_modified)
values(?, ?, now(), now())`,
[ table, prefix + 'beginTransactionScope1', prefix + '[email protected]' ]);
Expand Down
7 changes: 7 additions & 0 deletions test/operator.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@ describe('test/operator.test.ts', () => {
});
});

describe('get threadId()', () => {
it('should get return undefined when connection not exists', () => {
const op = new CustomOperator();
assert.equal(op.threadId, undefined);
});
});

describe('format()', () => {
it('should get literal string', () => {
const op = new CustomOperator();
Expand Down

0 comments on commit 64aa75d

Please sign in to comment.