diff --git a/lib/cluster-adapter.ts b/lib/cluster-adapter.ts new file mode 100644 index 0000000..c2e2161 --- /dev/null +++ b/lib/cluster-adapter.ts @@ -0,0 +1,648 @@ +import { Adapter, BroadcastOptions, Room } from "./index"; +import { debug as debugModule } from "debug"; +import { randomBytes } from "crypto"; + +const debug = debugModule("socket.io-adapter"); +const EMITTER_UID = "emitter"; +const DEFAULT_TIMEOUT = 5000; + +function randomId() { + return randomBytes(8).toString("hex"); +} + +export interface ClusterAdapterOptions { + /** + * The number of ms between two heartbeats. + * @default 5_000 + */ + heartbeatInterval?: number; + /** + * The number of ms without heartbeat before we consider a node down. + * @default 10_000 + */ + heartbeatTimeout?: number; +} + +export enum MessageType { + INITIAL_HEARTBEAT = 1, + HEARTBEAT, + BROADCAST, + SOCKETS_JOIN, + SOCKETS_LEAVE, + DISCONNECT_SOCKETS, + FETCH_SOCKETS, + FETCH_SOCKETS_RESPONSE, + SERVER_SIDE_EMIT, + SERVER_SIDE_EMIT_RESPONSE, + BROADCAST_CLIENT_COUNT, + BROADCAST_ACK, +} + +export interface ClusterMessage { + uid: string; + type: MessageType; + data?: Record; +} + +interface ClusterRequest { + type: MessageType; + resolve: Function; + timeout: NodeJS.Timeout; + expected: number; + current: number; + responses: any[]; +} + +interface ClusterResponse { + type: MessageType; + data: { + requestId: string; + [key: string]: unknown; + }; +} + +interface ClusterAckRequest { + clientCountCallback: (clientCount: number) => void; + ack: (...args: any[]) => void; +} + +function encodeOptions(opts: BroadcastOptions) { + return { + rooms: [...opts.rooms], + except: [...opts.except], + flags: opts.flags, + }; +} + +function decodeOptions(opts): BroadcastOptions { + return { + rooms: new Set(opts.rooms), + except: new Set(opts.except), + flags: opts.flags, + }; +} + +/** + * A cluster-ready adapter. Any extending class must: + * + * - implement {@link ClusterAdapter#publishMessage} and {@link ClusterAdapter#publishResponse} + * - call {@link ClusterAdapter#onMessage} and {@link ClusterAdapter#onResponse} + */ +export abstract class ClusterAdapter extends Adapter { + protected readonly uid: string; + + private requests: Map = new Map(); + private ackRequests: Map = new Map(); + + protected constructor(nsp) { + super(nsp); + this.uid = randomId(); + } + + /** + * Called when receiving a message from another member of the cluster. + * + * @param message + * @param offset + * @protected + */ + protected async onMessage(message: ClusterMessage, offset?: string) { + if (message.uid === this.uid) { + return debug("ignore message from self"); + } + + debug("new event of type %d from %s", message.type, message.uid); + + switch (message.type) { + case MessageType.BROADCAST: { + const withAck = message.data.requestId !== undefined; + if (withAck) { + super.broadcastWithAck( + message.data.packet, + decodeOptions(message.data.opts), + (clientCount) => { + debug("waiting for %d client acknowledgements", clientCount); + this.publishResponse(message.uid, { + type: MessageType.BROADCAST_CLIENT_COUNT, + data: { + requestId: message.data.requestId as string, + clientCount, + }, + }); + }, + (arg) => { + debug("received acknowledgement with value %j", arg); + this.publishResponse(message.uid, { + type: MessageType.BROADCAST_ACK, + data: { + requestId: message.data.requestId as string, + packet: arg, + }, + }); + } + ); + } else { + const packet = message.data.packet; + const opts = decodeOptions(message.data.opts); + + this.addOffsetIfNecessary(packet, opts, offset); + + super.broadcast(packet, opts); + } + break; + } + + case MessageType.SOCKETS_JOIN: + super.addSockets( + decodeOptions(message.data.opts), + message.data.rooms as string[] + ); + break; + + case MessageType.SOCKETS_LEAVE: + super.delSockets( + decodeOptions(message.data.opts), + message.data.rooms as string[] + ); + break; + + case MessageType.DISCONNECT_SOCKETS: + super.disconnectSockets( + decodeOptions(message.data.opts), + message.data.close as boolean + ); + break; + + case MessageType.FETCH_SOCKETS: { + debug("calling fetchSockets with opts %j", message.data.opts); + const localSockets = await super.fetchSockets( + decodeOptions(message.data.opts) + ); + + this.publishResponse(message.uid, { + type: MessageType.FETCH_SOCKETS_RESPONSE, + data: { + requestId: message.data.requestId as string, + sockets: localSockets.map((socket) => { + // remove sessionStore from handshake, as it may contain circular references + const { sessionStore, ...handshake } = socket.handshake; + return { + id: socket.id, + handshake, + rooms: [...socket.rooms], + data: socket.data, + }; + }), + }, + }); + break; + } + + case MessageType.SERVER_SIDE_EMIT: { + const packet = message.data.packet as unknown[]; + const withAck = message.data.requestId !== undefined; + if (!withAck) { + this.nsp._onServerSideEmit(packet); + return; + } + let called = false; + const callback = (arg: any) => { + // only one argument is expected + if (called) { + return; + } + called = true; + debug("calling acknowledgement with %j", arg); + this.publishResponse(message.uid, { + type: MessageType.SERVER_SIDE_EMIT_RESPONSE, + data: { + requestId: message.data.requestId as string, + packet: arg, + }, + }); + }; + + packet.push(callback); + this.nsp._onServerSideEmit(packet); + break; + } + + default: + debug("unknown message type: %s", message.type); + } + } + + /** + * Called when receiving a response from another member of the cluster. + * + * @param response + * @protected + */ + protected onResponse(response: ClusterResponse) { + const requestId = response.data.requestId; + + debug("received response %s to request %s", response.type, requestId); + + switch (response.type) { + case MessageType.BROADCAST_CLIENT_COUNT: { + this.ackRequests + .get(requestId) + ?.clientCountCallback(response.data.clientCount as number); + break; + } + + case MessageType.BROADCAST_ACK: { + this.ackRequests.get(requestId)?.ack(response.data.packet); + break; + } + + case MessageType.FETCH_SOCKETS_RESPONSE: { + const request = this.requests.get(requestId); + + if (!request) { + return; + } + + request.current++; + (response.data.sockets as any[]).forEach((socket) => + request.responses.push(socket) + ); + + if (request.current === request.expected) { + clearTimeout(request.timeout); + request.resolve(request.responses); + this.requests.delete(requestId); + } + break; + } + + case MessageType.SERVER_SIDE_EMIT_RESPONSE: { + const request = this.requests.get(requestId); + + if (!request) { + return; + } + + request.current++; + request.responses.push(response.data.packet); + + if (request.current === request.expected) { + clearTimeout(request.timeout); + request.resolve(null, request.responses); + this.requests.delete(requestId); + } + break; + } + + default: + debug("unknown response type: %s", response.type); + } + } + + override async broadcast(packet: any, opts: BroadcastOptions) { + const onlyLocal = opts.flags?.local; + + if (!onlyLocal) { + try { + const offset = await this.publish({ + type: MessageType.BROADCAST, + data: { + packet, + opts: encodeOptions(opts), + }, + }); + this.addOffsetIfNecessary(packet, opts, offset); + } catch (e) { + return debug("error while broadcasting message: %s", e.message); + } + } + + super.broadcast(packet, opts); + } + + /** + * Adds an offset at the end of the data array in order to allow the client to receive any missed packets when it + * reconnects after a temporary disconnection. + * + * @param packet + * @param opts + * @param offset + * @private + */ + private addOffsetIfNecessary( + packet: any, + opts: BroadcastOptions, + offset: string + ) { + if (!this.nsp.server.opts.connectionStateRecovery) { + return; + } + const isEventPacket = packet.type === 2; + // packets with acknowledgement are not stored because the acknowledgement function cannot be serialized and + // restored on another server upon reconnection + const withoutAcknowledgement = packet.id === undefined; + const notVolatile = opts.flags?.volatile === undefined; + + if (isEventPacket && withoutAcknowledgement && notVolatile) { + packet.data.push(offset); + } + } + + override broadcastWithAck( + packet: any, + opts: BroadcastOptions, + clientCountCallback: (clientCount: number) => void, + ack: (...args: any[]) => void + ) { + const onlyLocal = opts?.flags?.local; + if (!onlyLocal) { + const requestId = randomId(); + + this.publish({ + type: MessageType.BROADCAST, + data: { + packet, + requestId, + opts: encodeOptions(opts), + }, + }); + + this.ackRequests.set(requestId, { + clientCountCallback, + ack, + }); + + // we have no way to know at this level whether the server has received an acknowledgement from each client, so we + // will simply clean up the ackRequests map after the given delay + setTimeout(() => { + this.ackRequests.delete(requestId); + }, opts.flags!.timeout); + } + + super.broadcastWithAck(packet, opts, clientCountCallback, ack); + } + + override addSockets(opts: BroadcastOptions, rooms: Room[]) { + super.addSockets(opts, rooms); + + const onlyLocal = opts.flags?.local; + if (onlyLocal) { + return; + } + + this.publish({ + type: MessageType.SOCKETS_JOIN, + data: { + opts: encodeOptions(opts), + rooms, + }, + }); + } + + override delSockets(opts: BroadcastOptions, rooms: Room[]) { + super.delSockets(opts, rooms); + + const onlyLocal = opts.flags?.local; + if (onlyLocal) { + return; + } + + this.publish({ + type: MessageType.SOCKETS_LEAVE, + data: { + opts: encodeOptions(opts), + rooms, + }, + }); + } + + override disconnectSockets(opts: BroadcastOptions, close: boolean) { + super.disconnectSockets(opts, close); + + const onlyLocal = opts.flags?.local; + if (onlyLocal) { + return; + } + + this.publish({ + type: MessageType.DISCONNECT_SOCKETS, + data: { + opts: encodeOptions(opts), + close, + }, + }); + } + + async fetchSockets(opts: BroadcastOptions): Promise { + const [localSockets, serverCount] = await Promise.all([ + super.fetchSockets(opts), + this.serverCount(), + ]); + const expectedResponseCount = serverCount - 1; + + if (opts.flags?.local || expectedResponseCount === 0) { + return localSockets; + } + + const requestId = randomId(); + + return new Promise((resolve, reject) => { + const timeout = setTimeout(() => { + const storedRequest = this.requests.get(requestId); + if (storedRequest) { + reject( + new Error( + `timeout reached: only ${storedRequest.current} responses received out of ${storedRequest.expected}` + ) + ); + this.requests.delete(requestId); + } + }, opts.flags.timeout || DEFAULT_TIMEOUT); + + const storedRequest = { + type: MessageType.FETCH_SOCKETS, + resolve, + timeout, + current: 0, + expected: expectedResponseCount, + responses: localSockets, + }; + this.requests.set(requestId, storedRequest); + + this.publish({ + type: MessageType.FETCH_SOCKETS, + data: { + opts: encodeOptions(opts), + requestId, + }, + }); + }); + } + + override async serverSideEmit(packet: any[]) { + const withAck = typeof packet[packet.length - 1] === "function"; + + if (!withAck) { + return this.publish({ + type: MessageType.SERVER_SIDE_EMIT, + data: { + packet, + }, + }); + } + + const ack = packet.pop(); + const expectedResponseCount = (await this.serverCount()) - 1; + + debug( + 'waiting for %d responses to "serverSideEmit" request', + expectedResponseCount + ); + + if (expectedResponseCount <= 0) { + return ack(null, []); + } + + const requestId = randomId(); + + const timeout = setTimeout(() => { + const storedRequest = this.requests.get(requestId); + if (storedRequest) { + ack( + new Error( + `timeout reached: only ${storedRequest.current} responses received out of ${storedRequest.expected}` + ), + storedRequest.responses + ); + this.requests.delete(requestId); + } + }, DEFAULT_TIMEOUT); + + const storedRequest = { + type: MessageType.SERVER_SIDE_EMIT, + resolve: ack, + timeout, + current: 0, + expected: expectedResponseCount, + responses: [], + }; + this.requests.set(requestId, storedRequest); + + this.publish({ + type: MessageType.SERVER_SIDE_EMIT, + data: { + requestId, // the presence of this attribute defines whether an acknowledgement is needed + packet, + }, + }); + } + + protected publish(message: Omit) { + return this.publishMessage({ + uid: this.uid, + ...message, + }); + } + + /** + * Send a message to the other members of the cluster. + * + * @param message + * @protected + * @return an offset, if applicable + */ + protected abstract publishMessage(message: ClusterMessage): Promise; + + /** + * Send a response to the given member of the cluster. + * + * @param requesterUid + * @param response + * @protected + */ + protected abstract publishResponse( + requesterUid: string, + response: ClusterResponse + ); +} + +export abstract class ClusterAdapterWithHeartbeat extends ClusterAdapter { + private readonly _opts: Required; + + private heartbeatTimer: NodeJS.Timeout; + private nodesMap: Map = new Map(); // uid => timestamp of last message + + protected constructor(nsp, opts: ClusterAdapterOptions) { + super(nsp); + this._opts = Object.assign( + { + heartbeatInterval: 5_000, + heartbeatTimeout: 10_000, + }, + opts + ); + } + + override init(): Promise | void { + this.publish({ + type: MessageType.INITIAL_HEARTBEAT, + }); + } + + private scheduleHeartbeat() { + if (this.heartbeatTimer) { + clearTimeout(this.heartbeatTimer); + } + this.heartbeatTimer = setTimeout(() => { + this.publish({ + type: MessageType.HEARTBEAT, + }); + }, this._opts.heartbeatInterval); + } + + override close(): Promise | void { + clearTimeout(this.heartbeatTimer); + } + + override async onMessage(message: ClusterMessage, offset?: string) { + if (message.uid === this.uid) { + return debug("ignore message from self"); + } + + if (message.uid && message.uid !== EMITTER_UID) { + // we track the UID of each sender, in order to know how many servers there are in the cluster + this.nodesMap.set(message.uid, Date.now()); + } + + switch (message.type) { + case MessageType.INITIAL_HEARTBEAT: + this.publish({ + type: MessageType.HEARTBEAT, + }); + break; + case MessageType.HEARTBEAT: + // nothing to do + break; + default: + super.onMessage(message, offset); + } + } + + override serverCount(): Promise { + const now = Date.now(); + this.nodesMap.forEach((lastSeen, uid) => { + const nodeSeemsDown = now - lastSeen > this._opts.heartbeatTimeout; + if (nodeSeemsDown) { + debug("node %s seems down", uid); + this.nodesMap.delete(uid); + } + }); + return Promise.resolve(1 + this.nodesMap.size); + } + + override publish(message: Omit) { + this.scheduleHeartbeat(); + + return super.publish(message); + } +} diff --git a/lib/index.ts b/lib/index.ts index 83d4d67..fc56679 100644 --- a/lib/index.ts +++ b/lib/index.ts @@ -505,3 +505,5 @@ function shouldIncludePacket( const notExcluded = sessionRooms.every((room) => !opts.except.has(room)); return included && notExcluded; } + +export { ClusterAdapter } from "./cluster-adapter"; diff --git a/package-lock.json b/package-lock.json index e4dbfec..01eb7b8 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,17 +1,19 @@ { "name": "socket.io-adapter", - "version": "2.5.1", + "version": "2.5.2", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "socket.io-adapter", - "version": "2.5.1", + "version": "2.5.2", "license": "MIT", "dependencies": { + "debug": "~4.3.4", "ws": "~8.11.0" }, "devDependencies": { + "@types/debug": "^4.1.12", "@types/mocha": "^10.0.1", "@types/node": "^14.11.2", "expect.js": "^0.3.1", @@ -403,12 +405,27 @@ "integrity": "sha512-rr+OQyAjxze7GgWrSaJwydHStIhHq2lvY3BOC2Mj7KnzI7XK0Uw1TOOdI9lDoajEbSWLiYgoo4f1R51erQfhPQ==", "dev": true }, + "node_modules/@types/debug": { + "version": "4.1.12", + "resolved": "https://registry.npmjs.org/@types/debug/-/debug-4.1.12.tgz", + "integrity": "sha512-vIChWdVG3LG1SMxEvI/AK+FWJthlrqlTu7fbrlywTkkaONwk/UAGaULXRlf8vkzFBLVm0zkMdCquhL5aOjhXPQ==", + "dev": true, + "dependencies": { + "@types/ms": "*" + } + }, "node_modules/@types/mocha": { "version": "10.0.1", "resolved": "https://registry.npmjs.org/@types/mocha/-/mocha-10.0.1.tgz", "integrity": "sha512-/fvYntiO1GeICvqbQ3doGDIP97vWmvFt83GKguJ6prmQM2iXZfFcq6YE8KteFyRtX2/h5Hf91BYvPodJKFYv5Q==", "dev": true }, + "node_modules/@types/ms": { + "version": "0.7.34", + "resolved": "https://registry.npmjs.org/@types/ms/-/ms-0.7.34.tgz", + "integrity": "sha512-nG96G3Wp6acyAgJqGasjODb+acrI7KltPiRxzHPXnP3NgI28bpQDRv53olbqGXbfcgF5aiiHmO3xpwEpS5Ld9g==", + "dev": true + }, "node_modules/@types/node": { "version": "14.11.2", "resolved": "https://registry.npmjs.org/@types/node/-/node-14.11.2.tgz", @@ -766,7 +783,6 @@ "version": "4.3.4", "resolved": "https://registry.npmjs.org/debug/-/debug-4.3.4.tgz", "integrity": "sha512-PRWFHuSU3eDtQJPvnNY7Jcket1j0t5OuOsFzPPzsekD52Zl8qUfFIPEiswXqIvHWGVHOgX+7G/vCNNhehwxfkQ==", - "dev": true, "dependencies": { "ms": "2.1.2" }, @@ -1607,8 +1623,7 @@ "node_modules/ms": { "version": "2.1.2", "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz", - "integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==", - "dev": true + "integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==" }, "node_modules/nanoid": { "version": "3.3.3", @@ -2970,12 +2985,27 @@ "integrity": "sha512-rr+OQyAjxze7GgWrSaJwydHStIhHq2lvY3BOC2Mj7KnzI7XK0Uw1TOOdI9lDoajEbSWLiYgoo4f1R51erQfhPQ==", "dev": true }, + "@types/debug": { + "version": "4.1.12", + "resolved": "https://registry.npmjs.org/@types/debug/-/debug-4.1.12.tgz", + "integrity": "sha512-vIChWdVG3LG1SMxEvI/AK+FWJthlrqlTu7fbrlywTkkaONwk/UAGaULXRlf8vkzFBLVm0zkMdCquhL5aOjhXPQ==", + "dev": true, + "requires": { + "@types/ms": "*" + } + }, "@types/mocha": { "version": "10.0.1", "resolved": "https://registry.npmjs.org/@types/mocha/-/mocha-10.0.1.tgz", "integrity": "sha512-/fvYntiO1GeICvqbQ3doGDIP97vWmvFt83GKguJ6prmQM2iXZfFcq6YE8KteFyRtX2/h5Hf91BYvPodJKFYv5Q==", "dev": true }, + "@types/ms": { + "version": "0.7.34", + "resolved": "https://registry.npmjs.org/@types/ms/-/ms-0.7.34.tgz", + "integrity": "sha512-nG96G3Wp6acyAgJqGasjODb+acrI7KltPiRxzHPXnP3NgI28bpQDRv53olbqGXbfcgF5aiiHmO3xpwEpS5Ld9g==", + "dev": true + }, "@types/node": { "version": "14.11.2", "resolved": "https://registry.npmjs.org/@types/node/-/node-14.11.2.tgz", @@ -3263,7 +3293,6 @@ "version": "4.3.4", "resolved": "https://registry.npmjs.org/debug/-/debug-4.3.4.tgz", "integrity": "sha512-PRWFHuSU3eDtQJPvnNY7Jcket1j0t5OuOsFzPPzsekD52Zl8qUfFIPEiswXqIvHWGVHOgX+7G/vCNNhehwxfkQ==", - "dev": true, "requires": { "ms": "2.1.2" } @@ -3869,8 +3898,7 @@ "ms": { "version": "2.1.2", "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz", - "integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==", - "dev": true + "integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==" }, "nanoid": { "version": "3.3.3", diff --git a/package.json b/package.json index 62e9071..e3efe80 100644 --- a/package.json +++ b/package.json @@ -13,9 +13,11 @@ "types": "./dist/index.d.ts", "description": "default socket.io in-memory adapter", "dependencies": { + "debug": "~4.3.4", "ws": "~8.11.0" }, "devDependencies": { + "@types/debug": "^4.1.12", "@types/mocha": "^10.0.1", "@types/node": "^14.11.2", "expect.js": "^0.3.1",