From 8ff0413a1858e6a76675be35be28b616be3f5504 Mon Sep 17 00:00:00 2001 From: Damien Arrachequesne Date: Wed, 21 Feb 2024 14:30:46 +0100 Subject: [PATCH] refactor: use the ClusterAdapter class from socket.io-adapter package The ClusterAdapter class has been moved to [1], so that this adapter only needs to implement to pub/sub mechanism. Also, [2] should reduce the number of "timeout reached: only x responses received out of y" errors, since the fetchSockets() requests will now succeed even if a server leaves the cluster. [1]: https://github.com/socketio/socket.io-adapter [2]: https://github.com/socketio/socket.io-adapter/commit/0e23ff0cc671e3186510f7cfb8a4c1147457296f Related: https://github.com/socketio/socket.io-redis-streams-adapter/issues/6 --- lib/adapter.ts | 69 +++-- lib/cluster-adapter.ts | 556 ----------------------------------------- package-lock.json | 97 ++++--- package.json | 2 +- test/serverSideEmit.ts | 25 +- 5 files changed, 134 insertions(+), 615 deletions(-) delete mode 100644 lib/cluster-adapter.ts diff --git a/lib/adapter.ts b/lib/adapter.ts index 70a2bb7..b6b74ac 100644 --- a/lib/adapter.ts +++ b/lib/adapter.ts @@ -1,11 +1,12 @@ -import type { PrivateSessionId, Session } from "socket.io-adapter"; -import { decode, encode } from "@msgpack/msgpack"; import { - ClusterAdapter, - ClusterAdapterOptions, - ClusterMessage, - MessageType, -} from "./cluster-adapter"; + ClusterAdapterWithHeartbeat, + type ClusterMessage, + type PrivateSessionId, + type Session, + type ServerId, + type ClusterResponse, +} from "socket.io-adapter"; +import { decode, encode } from "@msgpack/msgpack"; import debugModule from "debug"; import { hasBinary, XADD, XREAD } from "./util"; @@ -13,7 +14,21 @@ const debug = debugModule("socket.io-redis-streams-adapter"); const RESTORE_SESSION_MAX_XRANGE_CALLS = 100; -export interface RedisStreamsAdapterOptions extends ClusterAdapterOptions { +// TODO ClusterAdapterOptions should be exported by the socket.io-adapter package +interface ClusterAdapterOptions { + /** + * The number of ms between two heartbeats. + * @default 5_000 + */ + heartbeatInterval?: number; + /** + * The number of ms without heartbeat before we consider a node down. + * @default 10_000 + */ + heartbeatTimeout?: number; +} + +export interface RedisStreamsAdapterOptions { /** * The name of the Redis stream. */ @@ -45,7 +60,7 @@ interface RawClusterMessage { */ export function createAdapter( redisClient: any, - opts?: RedisStreamsAdapterOptions + opts?: RedisStreamsAdapterOptions & ClusterAdapterOptions ) { const namespaceToAdapters = new Map(); const options = Object.assign( @@ -122,16 +137,20 @@ export function createAdapter( }; } -class RedisStreamsAdapter extends ClusterAdapter { +class RedisStreamsAdapter extends ClusterAdapterWithHeartbeat { readonly #redisClient: any; readonly #opts: Required; - constructor(nsp, redisClient, opts: Required) { + constructor( + nsp, + redisClient, + opts: Required & ClusterAdapterOptions + ) { super(nsp, opts); this.#redisClient = redisClient; this.#opts = opts; - this.initHeartbeat(); + this.init(); } override doPublish(message: ClusterMessage) { @@ -145,6 +164,14 @@ class RedisStreamsAdapter extends ClusterAdapter { ); } + protected doPublishResponse( + requesterUid: ServerId, + response: ClusterResponse + ): Promise { + // @ts-ignore + return this.doPublish(response); + } + static encode(message: ClusterMessage): RawClusterMessage { const rawMessage: RawClusterMessage = { uid: message.uid, @@ -152,18 +179,23 @@ class RedisStreamsAdapter extends ClusterAdapter { type: message.type.toString(), }; + // @ts-ignore if (message.data) { + // TODO MessageType should be exported by the socket.io-adapter package const mayContainBinary = [ - MessageType.BROADCAST, - MessageType.BROADCAST_ACK, - MessageType.FETCH_SOCKETS_RESPONSE, - MessageType.SERVER_SIDE_EMIT, - MessageType.SERVER_SIDE_EMIT_RESPONSE, + 3, // MessageType.BROADCAST, + 8, // MessageType.FETCH_SOCKETS_RESPONSE, + 9, // MessageType.SERVER_SIDE_EMIT, + 10, // MessageType.SERVER_SIDE_EMIT_RESPONSE, + 12, // MessageType.BROADCAST_ACK, ].includes(message.type); + // @ts-ignore if (mayContainBinary && hasBinary(message.data)) { + // @ts-ignore rawMessage.data = Buffer.from(encode(message.data)).toString("base64"); } else { + // @ts-ignore rawMessage.data = JSON.stringify(message.data); } } @@ -191,8 +223,10 @@ class RedisStreamsAdapter extends ClusterAdapter { if (rawMessage.data) { if (rawMessage.data.startsWith("{")) { + // @ts-ignore message.data = JSON.parse(rawMessage.data); } else { + // @ts-ignore message.data = decode(Buffer.from(rawMessage.data, "base64")) as Record< string, unknown @@ -261,6 +295,7 @@ class RedisStreamsAdapter extends ClusterAdapter { if (entry.message.nsp === this.nsp.name && entry.message.type === "3") { const message = RedisStreamsAdapter.decode(entry.message); + // @ts-ignore if (shouldIncludePacket(session.rooms, message.data.opts)) { // @ts-ignore session.missedPackets.push(message.data.packet.data); diff --git a/lib/cluster-adapter.ts b/lib/cluster-adapter.ts deleted file mode 100644 index 6d49b1c..0000000 --- a/lib/cluster-adapter.ts +++ /dev/null @@ -1,556 +0,0 @@ -import { Adapter, BroadcastOptions, Room } from "socket.io-adapter"; -import debugModule from "debug"; -import { randomId } from "./util"; - -const debug = debugModule("socket.io-adapter"); -const EMITTER_UID = "emitter"; -const DEFAULT_TIMEOUT = 5000; - -export interface ClusterAdapterOptions { - /** - * The number of ms between two heartbeats. - * @default 5_000 - */ - heartbeatInterval?: number; - /** - * The number of ms without heartbeat before we consider a node down. - * @default 10_000 - */ - heartbeatTimeout?: number; -} - -export enum MessageType { - INITIAL_HEARTBEAT = 1, - HEARTBEAT, - BROADCAST, - SOCKETS_JOIN, - SOCKETS_LEAVE, - DISCONNECT_SOCKETS, - FETCH_SOCKETS, - FETCH_SOCKETS_RESPONSE, - SERVER_SIDE_EMIT, - SERVER_SIDE_EMIT_RESPONSE, - BROADCAST_CLIENT_COUNT, - BROADCAST_ACK, -} - -export interface ClusterMessage { - uid: string; - nsp: string; - type: MessageType; - data?: Record; -} - -interface ClusterRequest { - type: MessageType; - resolve: Function; - timeout: NodeJS.Timeout; - expected: number; - current: number; - responses: any[]; -} - -interface ClusterAckRequest { - clientCountCallback: (clientCount: number) => void; - ack: (...args: any[]) => void; -} - -function encodeOptions(opts: BroadcastOptions) { - return { - rooms: [...opts.rooms], - except: [...opts.except], - flags: opts.flags, - }; -} - -function decodeOptions(opts): BroadcastOptions { - return { - rooms: new Set(opts.rooms), - except: new Set(opts.except), - flags: opts.flags, - }; -} - -export abstract class ClusterAdapter extends Adapter { - readonly #opts: Required; - readonly #uid: string; - - #heartbeatTimer: NodeJS.Timeout; - #nodesMap: Map = new Map(); // uid => timestamp of last message - #requests: Map = new Map(); - #ackRequests: Map = new Map(); - - protected constructor(nsp, opts: Required) { - super(nsp); - this.#opts = opts; - this.#uid = randomId(); - } - - protected initHeartbeat() { - this.#publish({ - type: MessageType.INITIAL_HEARTBEAT, - }); - } - - #scheduleHeartbeat() { - if (this.#heartbeatTimer) { - clearTimeout(this.#heartbeatTimer); - } - this.#heartbeatTimer = setTimeout(() => { - this.#publish({ - type: MessageType.HEARTBEAT, - }); - }, this.#opts.heartbeatInterval); - } - - override close(): Promise | void { - clearTimeout(this.#heartbeatTimer); - } - - public async onMessage(message: ClusterMessage, offset: string) { - if (message.uid === this.#uid) { - return debug("ignore message from self"); - } - - if (message.uid && message.uid !== EMITTER_UID) { - this.#nodesMap.set(message.uid, Date.now()); - } - - debug("new event of type %d from %s", message.type, message.uid); - - switch (message.type) { - case MessageType.INITIAL_HEARTBEAT: - this.#publish({ - type: MessageType.HEARTBEAT, - }); - break; - - case MessageType.BROADCAST: { - const withAck = message.data.requestId !== undefined; - if (withAck) { - super.broadcastWithAck( - message.data.packet, - decodeOptions(message.data.opts), - (clientCount) => { - debug("waiting for %d client acknowledgements", clientCount); - this.#publish({ - type: MessageType.BROADCAST_CLIENT_COUNT, - data: { - requestId: message.data.requestId, - clientCount, - }, - }); - }, - (arg) => { - debug("received acknowledgement with value %j", arg); - this.#publish({ - type: MessageType.BROADCAST_ACK, - data: { - requestId: message.data.requestId, - packet: arg, - }, - }); - } - ); - } else { - const packet = message.data.packet; - const opts = decodeOptions(message.data.opts); - - this.#addOffsetIfNecessary(packet, opts, offset); - - super.broadcast(packet, opts); - } - break; - } - - case MessageType.BROADCAST_CLIENT_COUNT: { - const request = this.#ackRequests.get(message.data.requestId as string); - request?.clientCountCallback(message.data.clientCount as number); - break; - } - - case MessageType.BROADCAST_ACK: { - const request = this.#ackRequests.get(message.data.requestId as string); - request?.ack(message.data.packet); - break; - } - - case MessageType.SOCKETS_JOIN: - super.addSockets( - decodeOptions(message.data.opts), - message.data.rooms as string[] - ); - break; - - case MessageType.SOCKETS_LEAVE: - super.delSockets( - decodeOptions(message.data.opts), - message.data.rooms as string[] - ); - break; - - case MessageType.DISCONNECT_SOCKETS: - super.disconnectSockets( - decodeOptions(message.data.opts), - message.data.close as boolean - ); - break; - - case MessageType.FETCH_SOCKETS: { - debug("calling fetchSockets with opts %j", message.data.opts); - const localSockets = await super.fetchSockets( - decodeOptions(message.data.opts) - ); - - this.#publish({ - type: MessageType.FETCH_SOCKETS_RESPONSE, - data: { - requestId: message.data.requestId, - sockets: localSockets.map((socket) => { - // remove sessionStore from handshake, as it may contain circular references - const { sessionStore, ...handshake } = socket.handshake; - return { - id: socket.id, - handshake, - rooms: [...socket.rooms], - data: socket.data, - }; - }), - }, - }); - break; - } - - case MessageType.FETCH_SOCKETS_RESPONSE: { - const requestId = message.data.requestId as string; - const request = this.#requests.get(requestId); - - if (!request) { - return; - } - - request.current++; - (message.data.sockets as any[]).forEach((socket) => - request.responses.push(socket) - ); - - if (request.current === request.expected) { - clearTimeout(request.timeout); - request.resolve(request.responses); - this.#requests.delete(requestId); - } - break; - } - - case MessageType.SERVER_SIDE_EMIT: { - const packet = message.data.packet as unknown[]; - const withAck = message.data.requestId !== undefined; - if (!withAck) { - this.nsp._onServerSideEmit(packet); - return; - } - let called = false; - const callback = (arg: any) => { - // only one argument is expected - if (called) { - return; - } - called = true; - debug("calling acknowledgement with %j", arg); - this.#publish({ - type: MessageType.SERVER_SIDE_EMIT_RESPONSE, - data: { - requestId: message.data.requestId, - packet: arg, - }, - }); - }; - - packet.push(callback); - this.nsp._onServerSideEmit(packet); - break; - } - - case MessageType.SERVER_SIDE_EMIT_RESPONSE: { - const requestId = message.data.requestId as string; - const request = this.#requests.get(requestId); - - if (!request) { - return; - } - - request.current++; - request.responses.push(message.data.packet); - - if (request.current === request.expected) { - clearTimeout(request.timeout); - request.resolve(null, request.responses); - this.#requests.delete(requestId); - } - } - } - } - - override async broadcast(packet: any, opts: BroadcastOptions) { - const onlyLocal = opts.flags?.local; - - if (!onlyLocal) { - try { - const offset = await this.#publish({ - type: MessageType.BROADCAST, - data: { - packet, - opts: encodeOptions(opts), - }, - }); - this.#addOffsetIfNecessary(packet, opts, offset); - } catch (e) { - return debug("error while broadcasting message: %s", e.message); - } - } - - super.broadcast(packet, opts); - } - - /** - * Adds an offset at the end of the data array in order to allow the client to receive any missed packets when it - * reconnects after a temporary disconnection. - * - * @param packet - * @param opts - * @param offset - * @private - */ - #addOffsetIfNecessary(packet: any, opts: BroadcastOptions, offset: string) { - if (!this.nsp.server.opts.connectionStateRecovery) { - return; - } - const isEventPacket = packet.type === 2; - // packets with acknowledgement are not stored because the acknowledgement function cannot be serialized and - // restored on another server upon reconnection - const withoutAcknowledgement = packet.id === undefined; - const notVolatile = opts.flags?.volatile === undefined; - - if (isEventPacket && withoutAcknowledgement && notVolatile) { - packet.data.push(offset); - } - } - - override broadcastWithAck( - packet: any, - opts: BroadcastOptions, - clientCountCallback: (clientCount: number) => void, - ack: (...args: any[]) => void - ) { - const onlyLocal = opts?.flags?.local; - if (!onlyLocal) { - const requestId = randomId(); - - this.#publish({ - type: MessageType.BROADCAST, - data: { - packet, - requestId, - opts: encodeOptions(opts), - }, - }); - - this.#ackRequests.set(requestId, { - clientCountCallback, - ack, - }); - - // we have no way to know at this level whether the server has received an acknowledgement from each client, so we - // will simply clean up the ackRequests map after the given delay - setTimeout(() => { - this.#ackRequests.delete(requestId); - }, opts.flags!.timeout); - } - - super.broadcastWithAck(packet, opts, clientCountCallback, ack); - } - - override serverCount(): Promise { - return Promise.resolve(1 + this.#nodesMap.size); - } - - /** - * - * @param opts - * @param rooms - */ - override addSockets(opts: BroadcastOptions, rooms: Room[]) { - super.addSockets(opts, rooms); - - const onlyLocal = opts.flags?.local; - if (onlyLocal) { - return; - } - - this.#publish({ - type: MessageType.SOCKETS_JOIN, - data: { - opts: encodeOptions(opts), - rooms, - }, - }); - } - - override delSockets(opts: BroadcastOptions, rooms: Room[]) { - super.delSockets(opts, rooms); - - const onlyLocal = opts.flags?.local; - if (onlyLocal) { - return; - } - - this.#publish({ - type: MessageType.SOCKETS_LEAVE, - data: { - opts: encodeOptions(opts), - rooms, - }, - }); - } - - override disconnectSockets(opts: BroadcastOptions, close: boolean) { - super.disconnectSockets(opts, close); - - const onlyLocal = opts.flags?.local; - if (onlyLocal) { - return; - } - - this.#publish({ - type: MessageType.DISCONNECT_SOCKETS, - data: { - opts: encodeOptions(opts), - close, - }, - }); - } - - #getExpectedResponseCount() { - this.#nodesMap.forEach((lastSeen, uid) => { - const nodeSeemsDown = Date.now() - lastSeen > this.#opts.heartbeatTimeout; - if (nodeSeemsDown) { - debug("node %s seems down", uid); - this.#nodesMap.delete(uid); - } - }); - return this.#nodesMap.size; - } - - async fetchSockets(opts: BroadcastOptions): Promise { - const localSockets = await super.fetchSockets(opts); - const expectedResponseCount = this.#getExpectedResponseCount(); - - if (opts.flags?.local || expectedResponseCount === 0) { - return localSockets; - } - - const requestId = randomId(); - - return new Promise((resolve, reject) => { - const timeout = setTimeout(() => { - const storedRequest = this.#requests.get(requestId); - if (storedRequest) { - reject( - new Error( - `timeout reached: only ${storedRequest.current} responses received out of ${storedRequest.expected}` - ) - ); - this.#requests.delete(requestId); - } - }, opts.flags.timeout || DEFAULT_TIMEOUT); - - const storedRequest = { - type: MessageType.FETCH_SOCKETS, - resolve, - timeout, - current: 0, - expected: expectedResponseCount, - responses: localSockets, - }; - this.#requests.set(requestId, storedRequest); - - this.#publish({ - type: MessageType.FETCH_SOCKETS, - data: { - opts: encodeOptions(opts), - requestId, - }, - }); - }); - } - - override serverSideEmit(packet: any[]) { - const withAck = typeof packet[packet.length - 1] === "function"; - - if (!withAck) { - return this.#publish({ - type: MessageType.SERVER_SIDE_EMIT, - data: { - packet, - }, - }); - } - - const ack = packet.pop(); - const expectedResponseCount = this.#getExpectedResponseCount(); - - debug( - 'waiting for %d responses to "serverSideEmit" request', - expectedResponseCount - ); - - if (expectedResponseCount <= 0) { - return ack(null, []); - } - - const requestId = randomId(); - - const timeout = setTimeout(() => { - const storedRequest = this.#requests.get(requestId); - if (storedRequest) { - ack( - new Error( - `timeout reached: only ${storedRequest.current} responses received out of ${storedRequest.expected}` - ), - storedRequest.responses - ); - this.#requests.delete(requestId); - } - }, DEFAULT_TIMEOUT); - - const storedRequest = { - type: MessageType.SERVER_SIDE_EMIT, - resolve: ack, - timeout, - current: 0, - expected: expectedResponseCount, - responses: [], - }; - this.#requests.set(requestId, storedRequest); - - this.#publish({ - type: MessageType.SERVER_SIDE_EMIT, - data: { - requestId, // the presence of this attribute defines whether an acknowledgement is needed - packet, - }, - }); - } - - #publish(message: Omit) { - this.#scheduleHeartbeat(); - - return this.doPublish({ - uid: this.#uid, - nsp: this.nsp.name, - ...message, - }); - } - - abstract doPublish(message: ClusterMessage): Promise; -} diff --git a/package-lock.json b/package-lock.json index 3e0c7c1..4d5d2ab 100644 --- a/package-lock.json +++ b/package-lock.json @@ -440,9 +440,9 @@ "dev": true }, "node_modules/@types/cors": { - "version": "2.8.13", - "resolved": "https://registry.npmjs.org/@types/cors/-/cors-2.8.13.tgz", - "integrity": "sha512-RG8AStHlUiV5ysZQKq97copd2UmVYw3/pRMLefISZ3S1hK104Cwm7iLQ3fTKx+lsUH2CE8FlLaYeEA2LSeqYUA==", + "version": "2.8.17", + "resolved": "https://registry.npmjs.org/@types/cors/-/cors-2.8.17.tgz", + "integrity": "sha512-8CGDvrBj1zgo2qE+oS3pOCyYNqCPryMWY2bGfwA0dcfopWGgxs+78df0Rs3rc9THP4JkOhLsAa+15VdpAqkcUA==", "dev": true, "dependencies": { "@types/node": "*" @@ -914,9 +914,9 @@ "dev": true }, "node_modules/engine.io": { - "version": "6.4.1", - "resolved": "https://registry.npmjs.org/engine.io/-/engine.io-6.4.1.tgz", - "integrity": "sha512-JFYQurD/nbsA5BSPmbaOSLa3tSVj8L6o4srSwXXY3NqE+gGUNmmPTbhn8tjzcCtSqhFgIeqef81ngny8JM25hw==", + "version": "6.5.4", + "resolved": "https://registry.npmjs.org/engine.io/-/engine.io-6.5.4.tgz", + "integrity": "sha512-KdVSDKhVKyOi+r5uEabrDLZw2qXStVvCsEB/LN3mw4WFi6Gx50jTyuxYVCwAAC0U46FdnzP/ScKRBTXb/NiEOg==", "dev": true, "dependencies": { "@types/cookie": "^0.4.1", @@ -927,11 +927,11 @@ "cookie": "~0.4.1", "cors": "~2.8.5", "debug": "~4.3.1", - "engine.io-parser": "~5.0.3", + "engine.io-parser": "~5.2.1", "ws": "~8.11.0" }, "engines": { - "node": ">=10.0.0" + "node": ">=10.2.0" } }, "node_modules/engine.io-client": { @@ -956,6 +956,15 @@ "node": ">=10.0.0" } }, + "node_modules/engine.io/node_modules/engine.io-parser": { + "version": "5.2.2", + "resolved": "https://registry.npmjs.org/engine.io-parser/-/engine.io-parser-5.2.2.tgz", + "integrity": "sha512-RcyUFKA93/CXH20l4SoVvzZfrSDMOTUS3bWVpTt2FuFP+XYrL8i8oonHP7WInRyVHXh0n/ORtoeiE1os+8qkSw==", + "dev": true, + "engines": { + "node": ">=10.0.0" + } + }, "node_modules/es6-error": { "version": "4.1.1", "resolved": "https://registry.npmjs.org/es6-error/-/es6-error-4.1.1.tgz", @@ -2428,27 +2437,29 @@ "dev": true }, "node_modules/socket.io": { - "version": "4.6.1", - "resolved": "https://registry.npmjs.org/socket.io/-/socket.io-4.6.1.tgz", - "integrity": "sha512-KMcaAi4l/8+xEjkRICl6ak8ySoxsYG+gG6/XfRCPJPQ/haCRIJBTL4wIl8YCsmtaBovcAXGLOShyVWQ/FG8GZA==", + "version": "4.7.4", + "resolved": "https://registry.npmjs.org/socket.io/-/socket.io-4.7.4.tgz", + "integrity": "sha512-DcotgfP1Zg9iP/dH9zvAQcWrE0TtbMVwXmlV4T4mqsvY+gw+LqUGPfx2AoVyRk0FLME+GQhufDMyacFmw7ksqw==", "dev": true, "dependencies": { "accepts": "~1.3.4", "base64id": "~2.0.0", + "cors": "~2.8.5", "debug": "~4.3.2", - "engine.io": "~6.4.1", + "engine.io": "~6.5.2", "socket.io-adapter": "~2.5.2", - "socket.io-parser": "~4.2.1" + "socket.io-parser": "~4.2.4" }, "engines": { - "node": ">=10.0.0" + "node": ">=10.2.0" } }, "node_modules/socket.io-adapter": { - "version": "2.5.2", - "resolved": "https://registry.npmjs.org/socket.io-adapter/-/socket.io-adapter-2.5.2.tgz", - "integrity": "sha512-87C3LO/NOMc+eMcpcxUBebGjkpMDkNBS9tf7KJqcDsmL936EChtVva71Dw2q4tQcuVC+hAUy4an2NO/sYXmwRA==", + "version": "2.5.3", + "resolved": "https://registry.npmjs.org/socket.io-adapter/-/socket.io-adapter-2.5.3.tgz", + "integrity": "sha512-OtkQtynXUM0JSEwmI6YlEJ5hU9kpDUVjda0hx8QVffKhqum53xhynH8eTCyjHSfI8FiJnyfK8I3Dlc88Jr81Dg==", "dependencies": { + "debug": "~4.3.4", "ws": "~8.11.0" } }, @@ -2468,9 +2479,9 @@ } }, "node_modules/socket.io-parser": { - "version": "4.2.2", - "resolved": "https://registry.npmjs.org/socket.io-parser/-/socket.io-parser-4.2.2.tgz", - "integrity": "sha512-DJtziuKypFkMMHCm2uIshOYC7QaylbtzQwiMYDuCKy3OPkjLzu4B2vAhTlqipRHHzrI0NJeBAizTK7X+6m1jVw==", + "version": "4.2.4", + "resolved": "https://registry.npmjs.org/socket.io-parser/-/socket.io-parser-4.2.4.tgz", + "integrity": "sha512-/GbIKmo8ioc+NIWIhwdecY0ge+qVBSMdgxGygevmdHj24bsfgtCmcUUcQ5ZzcylGFHsN3k4HB4Cgkl96KVnuew==", "dev": true, "dependencies": { "@socket.io/component-emitter": "~3.1.0", @@ -3316,9 +3327,9 @@ "dev": true }, "@types/cors": { - "version": "2.8.13", - "resolved": "https://registry.npmjs.org/@types/cors/-/cors-2.8.13.tgz", - "integrity": "sha512-RG8AStHlUiV5ysZQKq97copd2UmVYw3/pRMLefISZ3S1hK104Cwm7iLQ3fTKx+lsUH2CE8FlLaYeEA2LSeqYUA==", + "version": "2.8.17", + "resolved": "https://registry.npmjs.org/@types/cors/-/cors-2.8.17.tgz", + "integrity": "sha512-8CGDvrBj1zgo2qE+oS3pOCyYNqCPryMWY2bGfwA0dcfopWGgxs+78df0Rs3rc9THP4JkOhLsAa+15VdpAqkcUA==", "dev": true, "requires": { "@types/node": "*" @@ -3683,9 +3694,9 @@ "dev": true }, "engine.io": { - "version": "6.4.1", - "resolved": "https://registry.npmjs.org/engine.io/-/engine.io-6.4.1.tgz", - "integrity": "sha512-JFYQurD/nbsA5BSPmbaOSLa3tSVj8L6o4srSwXXY3NqE+gGUNmmPTbhn8tjzcCtSqhFgIeqef81ngny8JM25hw==", + "version": "6.5.4", + "resolved": "https://registry.npmjs.org/engine.io/-/engine.io-6.5.4.tgz", + "integrity": "sha512-KdVSDKhVKyOi+r5uEabrDLZw2qXStVvCsEB/LN3mw4WFi6Gx50jTyuxYVCwAAC0U46FdnzP/ScKRBTXb/NiEOg==", "dev": true, "requires": { "@types/cookie": "^0.4.1", @@ -3696,8 +3707,16 @@ "cookie": "~0.4.1", "cors": "~2.8.5", "debug": "~4.3.1", - "engine.io-parser": "~5.0.3", + "engine.io-parser": "~5.2.1", "ws": "~8.11.0" + }, + "dependencies": { + "engine.io-parser": { + "version": "5.2.2", + "resolved": "https://registry.npmjs.org/engine.io-parser/-/engine.io-parser-5.2.2.tgz", + "integrity": "sha512-RcyUFKA93/CXH20l4SoVvzZfrSDMOTUS3bWVpTt2FuFP+XYrL8i8oonHP7WInRyVHXh0n/ORtoeiE1os+8qkSw==", + "dev": true + } } }, "engine.io-client": { @@ -4799,24 +4818,26 @@ "dev": true }, "socket.io": { - "version": "4.6.1", - "resolved": "https://registry.npmjs.org/socket.io/-/socket.io-4.6.1.tgz", - "integrity": "sha512-KMcaAi4l/8+xEjkRICl6ak8ySoxsYG+gG6/XfRCPJPQ/haCRIJBTL4wIl8YCsmtaBovcAXGLOShyVWQ/FG8GZA==", + "version": "4.7.4", + "resolved": "https://registry.npmjs.org/socket.io/-/socket.io-4.7.4.tgz", + "integrity": "sha512-DcotgfP1Zg9iP/dH9zvAQcWrE0TtbMVwXmlV4T4mqsvY+gw+LqUGPfx2AoVyRk0FLME+GQhufDMyacFmw7ksqw==", "dev": true, "requires": { "accepts": "~1.3.4", "base64id": "~2.0.0", + "cors": "~2.8.5", "debug": "~4.3.2", - "engine.io": "~6.4.1", + "engine.io": "~6.5.2", "socket.io-adapter": "~2.5.2", - "socket.io-parser": "~4.2.1" + "socket.io-parser": "~4.2.4" } }, "socket.io-adapter": { - "version": "2.5.2", - "resolved": "https://registry.npmjs.org/socket.io-adapter/-/socket.io-adapter-2.5.2.tgz", - "integrity": "sha512-87C3LO/NOMc+eMcpcxUBebGjkpMDkNBS9tf7KJqcDsmL936EChtVva71Dw2q4tQcuVC+hAUy4an2NO/sYXmwRA==", + "version": "2.5.3", + "resolved": "https://registry.npmjs.org/socket.io-adapter/-/socket.io-adapter-2.5.3.tgz", + "integrity": "sha512-OtkQtynXUM0JSEwmI6YlEJ5hU9kpDUVjda0hx8QVffKhqum53xhynH8eTCyjHSfI8FiJnyfK8I3Dlc88Jr81Dg==", "requires": { + "debug": "~4.3.4", "ws": "~8.11.0" } }, @@ -4833,9 +4854,9 @@ } }, "socket.io-parser": { - "version": "4.2.2", - "resolved": "https://registry.npmjs.org/socket.io-parser/-/socket.io-parser-4.2.2.tgz", - "integrity": "sha512-DJtziuKypFkMMHCm2uIshOYC7QaylbtzQwiMYDuCKy3OPkjLzu4B2vAhTlqipRHHzrI0NJeBAizTK7X+6m1jVw==", + "version": "4.2.4", + "resolved": "https://registry.npmjs.org/socket.io-parser/-/socket.io-parser-4.2.4.tgz", + "integrity": "sha512-/GbIKmo8ioc+NIWIhwdecY0ge+qVBSMdgxGygevmdHj24bsfgtCmcUUcQ5ZzcylGFHsN3k4HB4Cgkl96KVnuew==", "dev": true, "requires": { "@socket.io/component-emitter": "~3.1.0", diff --git a/package.json b/package.json index 8ad6de1..0b4673c 100644 --- a/package.json +++ b/package.json @@ -31,7 +31,7 @@ "debug": "~4.3.1" }, "peerDependencies": { - "socket.io-adapter": "^2.5.2" + "socket.io-adapter": "^2.5.3" }, "devDependencies": { "@types/expect.js": "^0.3.29", diff --git a/test/serverSideEmit.ts b/test/serverSideEmit.ts index ae51486..e7667bc 100644 --- a/test/serverSideEmit.ts +++ b/test/serverSideEmit.ts @@ -61,9 +61,7 @@ describe("serverSideEmit()", function () { this.timeout(6000); servers[0].serverSideEmit("hello", (err: Error, response: any) => { - expect(err.message).to.be( - "timeout reached: only 1 responses received out of 2" - ); + expect(err.message).to.be("timeout reached: missing 1 responses"); expect(response).to.be.an(Array); expect(response).to.contain(2); done(); @@ -81,4 +79,25 @@ describe("serverSideEmit()", function () { // do nothing }); }); + + it("succeeds even if an instance leaves the cluster", (done) => { + servers[0].on("hello", () => { + done(new Error("should not happen")); + }); + + servers[1].on("hello", (cb) => { + cb(2); + }); + + servers[2].on("hello", (cb) => { + servers[2].of("/").adapter.close(); + }); + + servers[0].serverSideEmit("hello", (err: Error, response: any) => { + expect(err).to.be(null); + expect(response).to.be.an(Array); + expect(response).to.contain(2); + done(); + }); + }); });