From 8fce41b488130f06db761539832463f602d630a1 Mon Sep 17 00:00:00 2001 From: Damien Arrachequesne Date: Wed, 21 Feb 2024 11:30:31 +0100 Subject: [PATCH] test(cluster): init the test suite for the clustered adapter --- lib/cluster-adapter.ts | 107 +++++++---- lib/index.ts | 9 +- package-lock.json | 414 ++++++++++++++++++++++++++++++++++++++- package.json | 5 +- test/cluster-adapter.ts | 416 ++++++++++++++++++++++++++++++++++++++++ test/util.ts | 15 ++ 6 files changed, 924 insertions(+), 42 deletions(-) create mode 100644 test/cluster-adapter.ts create mode 100644 test/util.ts diff --git a/lib/cluster-adapter.ts b/lib/cluster-adapter.ts index fc63a75..33a59a0 100644 --- a/lib/cluster-adapter.ts +++ b/lib/cluster-adapter.ts @@ -20,14 +20,14 @@ type DistributiveOmit = T extends any : never; /** - * The unique ID of this server + * The unique ID of a server */ -type ServerId = string; +export type ServerId = string; /** - * The id of a message (for the Connection state recovery) + * The unique ID of a message (for the connection state recovery feature) */ -type Offset = string; +export type Offset = string; export interface ClusterAdapterOptions { /** @@ -101,7 +101,7 @@ export type ClusterMessage = { type: MessageType.SERVER_SIDE_EMIT; data: { requestId?: string; - packet: unknown; + packet: any[]; }; } ); @@ -196,10 +196,15 @@ export abstract class ClusterAdapter extends Adapter { */ protected async onMessage(message: ClusterMessage, offset?: string) { if (message.uid === this.uid) { - return debug("ignore message from self"); + return debug("[%s] ignore message from self", this.uid); } - debug("new event of type %d from %s", message.type, message.uid); + debug( + "[%s] new event of type %d from %s", + this.uid, + message.type, + message.uid + ); switch (message.type) { case MessageType.BROADCAST: { @@ -209,7 +214,11 @@ export abstract class ClusterAdapter extends Adapter { message.data.packet, decodeOptions(message.data.opts), (clientCount) => { - debug("waiting for %d client acknowledgements", clientCount); + debug( + "[%s] waiting for %d client acknowledgements", + this.uid, + clientCount + ); this.publishResponse(message.uid, { type: MessageType.BROADCAST_CLIENT_COUNT, data: { @@ -219,7 +228,11 @@ export abstract class ClusterAdapter extends Adapter { }); }, (arg) => { - debug("received acknowledgement with value %j", arg); + debug( + "[%s] received acknowledgement with value %j", + this.uid, + arg + ); this.publishResponse(message.uid, { type: MessageType.BROADCAST_ACK, data: { @@ -256,7 +269,11 @@ export abstract class ClusterAdapter extends Adapter { break; case MessageType.FETCH_SOCKETS: { - debug("calling fetchSockets with opts %j", message.data.opts); + debug( + "[%s] calling fetchSockets with opts %j", + this.uid, + message.data.opts + ); const localSockets = await super.fetchSockets( decodeOptions(message.data.opts) ); @@ -281,7 +298,7 @@ export abstract class ClusterAdapter extends Adapter { } case MessageType.SERVER_SIDE_EMIT: { - const packet = message.data.packet as unknown[]; + const packet = message.data.packet; const withAck = message.data.requestId !== undefined; if (!withAck) { this.nsp._onServerSideEmit(packet); @@ -294,7 +311,7 @@ export abstract class ClusterAdapter extends Adapter { return; } called = true; - debug("calling acknowledgement with %j", arg); + debug("[%s] calling acknowledgement with %j", this.uid, arg); this.publishResponse(message.uid, { type: MessageType.SERVER_SIDE_EMIT_RESPONSE, data: { @@ -304,8 +321,7 @@ export abstract class ClusterAdapter extends Adapter { }); }; - packet.push(callback); - this.nsp._onServerSideEmit(packet); + this.nsp._onServerSideEmit([...packet, callback]); break; } @@ -323,7 +339,7 @@ export abstract class ClusterAdapter extends Adapter { break; default: - debug("unknown message type: %s", message.type); + debug("[%s] unknown message type: %s", this.uid, message.type); } } @@ -336,7 +352,12 @@ export abstract class ClusterAdapter extends Adapter { protected onResponse(response: ClusterResponse) { const requestId = response.data.requestId; - debug("received response %s to request %s", response.type, requestId); + debug( + "[%s] received response %s to request %s", + this.uid, + response.type, + requestId + ); switch (response.type) { case MessageType.BROADCAST_CLIENT_COUNT: { @@ -391,7 +412,7 @@ export abstract class ClusterAdapter extends Adapter { default: // @ts-ignore - debug("unknown response type: %s", response.type); + debug("[%s] unknown response type: %s", this.uid, response.type); } } @@ -409,7 +430,11 @@ export abstract class ClusterAdapter extends Adapter { }); this.addOffsetIfNecessary(packet, opts, offset); } catch (e) { - return debug("error while broadcasting message: %s", e.message); + return debug( + "[%s] error while broadcasting message: %s", + this.uid, + e.message + ); } } @@ -454,6 +479,11 @@ export abstract class ClusterAdapter extends Adapter { if (!onlyLocal) { const requestId = randomId(); + this.ackRequests.set(requestId, { + clientCountCallback, + ack, + }); + this.publish({ type: MessageType.BROADCAST, data: { @@ -463,11 +493,6 @@ export abstract class ClusterAdapter extends Adapter { }, }); - this.ackRequests.set(requestId, { - clientCountCallback, - ack, - }); - // we have no way to know at this level whether the server has received an acknowledgement from each client, so we // will simply clean up the ackRequests map after the given delay setTimeout(() => { @@ -591,7 +616,8 @@ export abstract class ClusterAdapter extends Adapter { const expectedResponseCount = (await this.serverCount()) - 1; debug( - 'waiting for %d responses to "serverSideEmit" request', + '[%s] waiting for %d responses to "serverSideEmit" request', + this.uid, expectedResponseCount ); @@ -637,7 +663,7 @@ export abstract class ClusterAdapter extends Adapter { message: DistributiveOmit ): void { this.publishAndReturnOffset(message).catch((err) => { - debug("error while publishing message: %s", err); + debug("[%s] error while publishing message: %s", this.uid, err); }); } @@ -666,7 +692,7 @@ export abstract class ClusterAdapter extends Adapter { (response as ClusterResponse).nsp = this.nsp.name; this.doPublishResponse(requesterUid, response as ClusterResponse).catch( (err) => { - debug("error while publishing response: %s", err); + debug("[%s] error while publishing response: %s", this.uid, err); } ); } @@ -714,7 +740,7 @@ export abstract class ClusterAdapterWithHeartbeat extends ClusterAdapter { this.nodesMap.forEach((lastSeen, uid) => { const nodeSeemsDown = now - lastSeen > this._opts.heartbeatTimeout; if (nodeSeemsDown) { - debug("node %s seems down", uid); + debug("[%s] node %s seems down", this.uid, uid); this.removeNode(uid); } }); @@ -751,7 +777,7 @@ export abstract class ClusterAdapterWithHeartbeat extends ClusterAdapter { override async onMessage(message: ClusterMessage, offset?: string) { if (message.uid === this.uid) { - return debug("ignore message from self"); + return debug("[%s] ignore message from self", this.uid); } if (message.uid && message.uid !== EMITTER_UID) { @@ -759,6 +785,13 @@ export abstract class ClusterAdapterWithHeartbeat extends ClusterAdapter { this.nodesMap.set(message.uid, Date.now()); } + debug( + "[%s] new event of type %d from %s", + this.uid, + message.type, + message.uid + ); + switch (message.type) { case MessageType.INITIAL_HEARTBEAT: this.publish({ @@ -777,14 +810,6 @@ export abstract class ClusterAdapterWithHeartbeat extends ClusterAdapter { } override serverCount(): Promise { - const now = Date.now(); - this.nodesMap.forEach((lastSeen, uid) => { - const nodeSeemsDown = now - lastSeen > this._opts.heartbeatTimeout; - if (nodeSeemsDown) { - debug("node %s seems down", uid); - this.nodesMap.delete(uid); - } - }); return Promise.resolve(1 + this.nodesMap.size); } @@ -810,7 +835,8 @@ export abstract class ClusterAdapterWithHeartbeat extends ClusterAdapter { const expectedResponseCount = this.nodesMap.size; debug( - 'waiting for %d responses to "serverSideEmit" request', + '[%s] waiting for %d responses to "serverSideEmit" request', + this.uid, expectedResponseCount ); @@ -905,7 +931,12 @@ export abstract class ClusterAdapterWithHeartbeat extends ClusterAdapter { override onResponse(response: ClusterResponse) { const requestId = response.data.requestId; - debug("received response %s to request %s", response.type, requestId); + debug( + "[%s] received response %s to request %s", + this.uid, + response.type, + requestId + ); switch (response.type) { case MessageType.FETCH_SOCKETS_RESPONSE: { diff --git a/lib/index.ts b/lib/index.ts index fc56679..7743723 100644 --- a/lib/index.ts +++ b/lib/index.ts @@ -506,4 +506,11 @@ function shouldIncludePacket( return included && notExcluded; } -export { ClusterAdapter } from "./cluster-adapter"; +export { + ClusterAdapter, + ClusterAdapterWithHeartbeat, + ClusterMessage, + ClusterResponse, + ServerId, + Offset, +} from "./cluster-adapter"; diff --git a/package-lock.json b/package-lock.json index 01eb7b8..9a49af0 100644 --- a/package-lock.json +++ b/package-lock.json @@ -14,12 +14,15 @@ }, "devDependencies": { "@types/debug": "^4.1.12", + "@types/expect.js": "^0.3.32", "@types/mocha": "^10.0.1", "@types/node": "^14.11.2", "expect.js": "^0.3.1", "mocha": "^10.2.0", "nyc": "^15.1.0", "prettier": "^2.8.1", + "socket.io": "^4.7.4", + "socket.io-client": "^4.7.4", "ts-node": "^10.9.1", "typescript": "^4.9.4" } @@ -375,6 +378,12 @@ "@jridgewell/sourcemap-codec": "^1.4.10" } }, + "node_modules/@socket.io/component-emitter": { + "version": "3.1.0", + "resolved": "https://registry.npmjs.org/@socket.io/component-emitter/-/component-emitter-3.1.0.tgz", + "integrity": "sha512-+9jVqKhRSpsc591z5vX+X5Yyw+he/HCB4iQ/RYxw35CEPaY1gnsNE43nf9n9AaYjAQrTiI/mOwKUKdUs9vf7Xg==", + "dev": true + }, "node_modules/@tsconfig/node10": { "version": "1.0.9", "resolved": "https://registry.npmjs.org/@tsconfig/node10/-/node10-1.0.9.tgz", @@ -405,6 +414,21 @@ "integrity": "sha512-rr+OQyAjxze7GgWrSaJwydHStIhHq2lvY3BOC2Mj7KnzI7XK0Uw1TOOdI9lDoajEbSWLiYgoo4f1R51erQfhPQ==", "dev": true }, + "node_modules/@types/cookie": { + "version": "0.4.1", + "resolved": "https://registry.npmjs.org/@types/cookie/-/cookie-0.4.1.tgz", + "integrity": "sha512-XW/Aa8APYr6jSVVA1y/DEIZX0/GMKLEVekNG727R8cs56ahETkRAy/3DR7+fJyh7oUgGwNQaRfXCun0+KbWY7Q==", + "dev": true + }, + "node_modules/@types/cors": { + "version": "2.8.17", + "resolved": "https://registry.npmjs.org/@types/cors/-/cors-2.8.17.tgz", + "integrity": "sha512-8CGDvrBj1zgo2qE+oS3pOCyYNqCPryMWY2bGfwA0dcfopWGgxs+78df0Rs3rc9THP4JkOhLsAa+15VdpAqkcUA==", + "dev": true, + "dependencies": { + "@types/node": "*" + } + }, "node_modules/@types/debug": { "version": "4.1.12", "resolved": "https://registry.npmjs.org/@types/debug/-/debug-4.1.12.tgz", @@ -414,6 +438,12 @@ "@types/ms": "*" } }, + "node_modules/@types/expect.js": { + "version": "0.3.32", + "resolved": "https://registry.npmjs.org/@types/expect.js/-/expect.js-0.3.32.tgz", + "integrity": "sha512-vUK0KSPtQTeANmOfiqsNNA/8hJ0xz8gOyB0ZhYRtoYOZBtZYir7ujNGr6GKw2hJAjltW0ocCNIGn9YxIXTT99Q==", + "dev": true + }, "node_modules/@types/mocha": { "version": "10.0.1", "resolved": "https://registry.npmjs.org/@types/mocha/-/mocha-10.0.1.tgz", @@ -432,6 +462,19 @@ "integrity": "sha512-jiE3QIxJ8JLNcb1Ps6rDbysDhN4xa8DJJvuC9prr6w+1tIh+QAbYyNF3tyiZNLDBIuBCf4KEcV2UvQm/V60xfA==", "dev": true }, + "node_modules/accepts": { + "version": "1.3.8", + "resolved": "https://registry.npmjs.org/accepts/-/accepts-1.3.8.tgz", + "integrity": "sha512-PYAthTa2m2VKxuvSD3DPC/Gy+U+sOA1LAuT8mkmRuvw+NACSaeXEQ+NHcVF7rONl6qcaxV3Uuemwawk+7+SJLw==", + "dev": true, + "dependencies": { + "mime-types": "~2.1.34", + "negotiator": "0.6.3" + }, + "engines": { + "node": ">= 0.6" + } + }, "node_modules/acorn": { "version": "8.8.1", "resolved": "https://registry.npmjs.org/acorn/-/acorn-8.8.1.tgz", @@ -539,6 +582,15 @@ "integrity": "sha1-ibTRmasr7kneFk6gK4nORi1xt2c=", "dev": true }, + "node_modules/base64id": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/base64id/-/base64id-2.0.0.tgz", + "integrity": "sha512-lGe34o6EHj9y3Kts9R4ZYs/Gr+6N7MCaMlIFA3F1R2O5/m7K06AxfSeO5530PEERE6/WyEg3lsuyw4GHlPZHog==", + "dev": true, + "engines": { + "node": "^4.5.0 || >= 5.9" + } + }, "node_modules/binary-extensions": { "version": "2.2.0", "resolved": "https://registry.npmjs.org/binary-extensions/-/binary-extensions-2.2.0.tgz", @@ -759,6 +811,28 @@ "integrity": "sha512-Gd2UZBJDkXlY7GbJxfsE8/nvKkUEU1G38c1siN6QP6a9PT9MmHB8GnpscSmMJSoF8LOIrt8ud/wPtojys4G6+g==", "dev": true }, + "node_modules/cookie": { + "version": "0.4.2", + "resolved": "https://registry.npmjs.org/cookie/-/cookie-0.4.2.tgz", + "integrity": "sha512-aSWTXFzaKWkvHO1Ny/s+ePFpvKsPnjc551iI41v3ny/ow6tBG5Vd+FuqGNhh1LxOmVzOlGUriIlOaokOvhaStA==", + "dev": true, + "engines": { + "node": ">= 0.6" + } + }, + "node_modules/cors": { + "version": "2.8.5", + "resolved": "https://registry.npmjs.org/cors/-/cors-2.8.5.tgz", + "integrity": "sha512-KIHbLJqu73RGr/hnbrO9uBeixNGuvSQjul/jdFvS/KFSIH1hWVd1ng7zOHx+YrEfInLG7q4n6GHQ9cDtxv/P6g==", + "dev": true, + "dependencies": { + "object-assign": "^4", + "vary": "^1" + }, + "engines": { + "node": ">= 0.10" + } + }, "node_modules/create-require": { "version": "1.1.1", "resolved": "https://registry.npmjs.org/create-require/-/create-require-1.1.1.tgz", @@ -831,6 +905,49 @@ "integrity": "sha512-MSjYzcWNOA0ewAHpz0MxpYFvwg6yjy1NG3xteoqz644VCo/RPgnr1/GGt+ic3iJTzQ8Eu3TdM14SawnVUmGE6A==", "dev": true }, + "node_modules/engine.io": { + "version": "6.5.4", + "resolved": "https://registry.npmjs.org/engine.io/-/engine.io-6.5.4.tgz", + "integrity": "sha512-KdVSDKhVKyOi+r5uEabrDLZw2qXStVvCsEB/LN3mw4WFi6Gx50jTyuxYVCwAAC0U46FdnzP/ScKRBTXb/NiEOg==", + "dev": true, + "dependencies": { + "@types/cookie": "^0.4.1", + "@types/cors": "^2.8.12", + "@types/node": ">=10.0.0", + "accepts": "~1.3.4", + "base64id": "2.0.0", + "cookie": "~0.4.1", + "cors": "~2.8.5", + "debug": "~4.3.1", + "engine.io-parser": "~5.2.1", + "ws": "~8.11.0" + }, + "engines": { + "node": ">=10.2.0" + } + }, + "node_modules/engine.io-client": { + "version": "6.5.3", + "resolved": "https://registry.npmjs.org/engine.io-client/-/engine.io-client-6.5.3.tgz", + "integrity": "sha512-9Z0qLB0NIisTRt1DZ/8U2k12RJn8yls/nXMZLn+/N8hANT3TcYjKFKcwbw5zFQiN4NTde3TSY9zb79e1ij6j9Q==", + "dev": true, + "dependencies": { + "@socket.io/component-emitter": "~3.1.0", + "debug": "~4.3.1", + "engine.io-parser": "~5.2.1", + "ws": "~8.11.0", + "xmlhttprequest-ssl": "~2.0.0" + } + }, + "node_modules/engine.io-parser": { + "version": "5.2.2", + "resolved": "https://registry.npmjs.org/engine.io-parser/-/engine.io-parser-5.2.2.tgz", + "integrity": "sha512-RcyUFKA93/CXH20l4SoVvzZfrSDMOTUS3bWVpTt2FuFP+XYrL8i8oonHP7WInRyVHXh0n/ORtoeiE1os+8qkSw==", + "dev": true, + "engines": { + "node": ">=10.0.0" + } + }, "node_modules/es6-error": { "version": "4.1.1", "resolved": "https://registry.npmjs.org/es6-error/-/es6-error-4.1.1.tgz", @@ -871,7 +988,7 @@ "node_modules/expect.js": { "version": "0.3.1", "resolved": "https://registry.npmjs.org/expect.js/-/expect.js-0.3.1.tgz", - "integrity": "sha1-sKWaDS7/VDdUTr8M6qYBWEHQm1s=", + "integrity": "sha512-okDF/FAPEul1ZFLae4hrgpIqAeapoo5TRdcg/lD0iN9S3GWrBFIJwNezGH1DMtIz+RxU4RrFmMq7WUUvDg3J6A==", "dev": true }, "node_modules/fill-range": { @@ -1478,6 +1595,27 @@ "integrity": "sha512-s8UhlNe7vPKomQhC1qFelMokr/Sc3AgNbso3n74mVPA5LTZwkB9NlXf4XPamLxJE8h0gh73rM94xvwRT2CVInw==", "dev": true }, + "node_modules/mime-db": { + "version": "1.52.0", + "resolved": "https://registry.npmjs.org/mime-db/-/mime-db-1.52.0.tgz", + "integrity": "sha512-sPU4uV7dYlvtWJxwwxHD0PuihVNiE7TyAbQ5SWxDCB9mUYvOgroQOwYQQOKPJ8CIbE+1ETVlOoK1UC2nU3gYvg==", + "dev": true, + "engines": { + "node": ">= 0.6" + } + }, + "node_modules/mime-types": { + "version": "2.1.35", + "resolved": "https://registry.npmjs.org/mime-types/-/mime-types-2.1.35.tgz", + "integrity": "sha512-ZDY+bPm5zTTF+YpCrAU9nK0UgICYPT0QtT1NZWFv4s++TNkcgVaT0g6+4R2uI4MjQjzysHB1zxuWL50hzaeXiw==", + "dev": true, + "dependencies": { + "mime-db": "1.52.0" + }, + "engines": { + "node": ">= 0.6" + } + }, "node_modules/minimatch": { "version": "3.1.2", "resolved": "https://registry.npmjs.org/minimatch/-/minimatch-3.1.2.tgz", @@ -1637,6 +1775,15 @@ "node": "^10 || ^12 || ^13.7 || ^14 || >=15.0.1" } }, + "node_modules/negotiator": { + "version": "0.6.3", + "resolved": "https://registry.npmjs.org/negotiator/-/negotiator-0.6.3.tgz", + "integrity": "sha512-+EUsqGPLsM+j/zdChZjsnX51g4XrHFOIXwfnCVPGlQk/k5giakcKsuxCObBRu6DSm9opw/O6slWbJdghQM4bBg==", + "dev": true, + "engines": { + "node": ">= 0.6" + } + }, "node_modules/node-preload": { "version": "0.2.1", "resolved": "https://registry.npmjs.org/node-preload/-/node-preload-0.2.1.tgz", @@ -1845,6 +1992,15 @@ "node": ">=6" } }, + "node_modules/object-assign": { + "version": "4.1.1", + "resolved": "https://registry.npmjs.org/object-assign/-/object-assign-4.1.1.tgz", + "integrity": "sha512-rJgTQnkUnH1sFw8yT6VSU3zD3sWmu6sZhIseY8VX+GRu3P6F7Fu+JNDoXfklElbLJSnc3FUQHVe4cU5hj+BcUg==", + "dev": true, + "engines": { + "node": ">=0.10.0" + } + }, "node_modules/once": { "version": "1.4.0", "resolved": "https://registry.npmjs.org/once/-/once-1.4.0.tgz", @@ -2211,6 +2367,61 @@ "integrity": "sha512-VUJ49FC8U1OxwZLxIbTTrDvLnf/6TDgxZcK8wxR8zs13xpx7xbG60ndBlhNrFi2EMuFRoeDoJO7wthSLq42EjA==", "dev": true }, + "node_modules/socket.io": { + "version": "4.7.4", + "resolved": "https://registry.npmjs.org/socket.io/-/socket.io-4.7.4.tgz", + "integrity": "sha512-DcotgfP1Zg9iP/dH9zvAQcWrE0TtbMVwXmlV4T4mqsvY+gw+LqUGPfx2AoVyRk0FLME+GQhufDMyacFmw7ksqw==", + "dev": true, + "dependencies": { + "accepts": "~1.3.4", + "base64id": "~2.0.0", + "cors": "~2.8.5", + "debug": "~4.3.2", + "engine.io": "~6.5.2", + "socket.io-adapter": "~2.5.2", + "socket.io-parser": "~4.2.4" + }, + "engines": { + "node": ">=10.2.0" + } + }, + "node_modules/socket.io-adapter": { + "version": "2.5.2", + "resolved": "https://registry.npmjs.org/socket.io-adapter/-/socket.io-adapter-2.5.2.tgz", + "integrity": "sha512-87C3LO/NOMc+eMcpcxUBebGjkpMDkNBS9tf7KJqcDsmL936EChtVva71Dw2q4tQcuVC+hAUy4an2NO/sYXmwRA==", + "dev": true, + "dependencies": { + "ws": "~8.11.0" + } + }, + "node_modules/socket.io-client": { + "version": "4.7.4", + "resolved": "https://registry.npmjs.org/socket.io-client/-/socket.io-client-4.7.4.tgz", + "integrity": "sha512-wh+OkeF0rAVCrABWQBaEjLfb7DVPotMbu0cgWgyR0v6eA4EoVnAwcIeIbcdTE3GT/H3kbdLl7OoH2+asoDRIIg==", + "dev": true, + "dependencies": { + "@socket.io/component-emitter": "~3.1.0", + "debug": "~4.3.2", + "engine.io-client": "~6.5.2", + "socket.io-parser": "~4.2.4" + }, + "engines": { + "node": ">=10.0.0" + } + }, + "node_modules/socket.io-parser": { + "version": "4.2.4", + "resolved": "https://registry.npmjs.org/socket.io-parser/-/socket.io-parser-4.2.4.tgz", + "integrity": "sha512-/GbIKmo8ioc+NIWIhwdecY0ge+qVBSMdgxGygevmdHj24bsfgtCmcUUcQ5ZzcylGFHsN3k4HB4Cgkl96KVnuew==", + "dev": true, + "dependencies": { + "@socket.io/component-emitter": "~3.1.0", + "debug": "~4.3.1" + }, + "engines": { + "node": ">=10.0.0" + } + }, "node_modules/source-map": { "version": "0.5.7", "resolved": "https://registry.npmjs.org/source-map/-/source-map-0.5.7.tgz", @@ -2436,6 +2647,15 @@ "integrity": "sha512-wa7YjyUGfNZngI/vtK0UHAN+lgDCxBPCylVXGp0zu59Fz5aiGtNXaq3DhIov063MorB+VfufLh3JlF2KdTK3xg==", "dev": true }, + "node_modules/vary": { + "version": "1.1.2", + "resolved": "https://registry.npmjs.org/vary/-/vary-1.1.2.tgz", + "integrity": "sha512-BNGbWLfd0eUPabhkXUVm0j8uuvREyTh5ovRa/dyow/BqAbZJyC+5fU+IzQOzmAKzYqYRAISoRhdQr3eIZ/PXqg==", + "dev": true, + "engines": { + "node": ">= 0.8" + } + }, "node_modules/which": { "version": "2.0.2", "resolved": "https://registry.npmjs.org/which/-/which-2.0.2.tgz", @@ -2551,6 +2771,15 @@ } } }, + "node_modules/xmlhttprequest-ssl": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/xmlhttprequest-ssl/-/xmlhttprequest-ssl-2.0.0.tgz", + "integrity": "sha512-QKxVRxiRACQcVuQEYFsI1hhkrMlrXHPegbbd1yn9UHOmRxY+si12nQYzri3vbzt8VdTTRviqcKxcyllFas5z2A==", + "dev": true, + "engines": { + "node": ">=0.4.0" + } + }, "node_modules/y18n": { "version": "4.0.3", "resolved": "https://registry.npmjs.org/y18n/-/y18n-4.0.3.tgz", @@ -2955,6 +3184,12 @@ "@jridgewell/sourcemap-codec": "^1.4.10" } }, + "@socket.io/component-emitter": { + "version": "3.1.0", + "resolved": "https://registry.npmjs.org/@socket.io/component-emitter/-/component-emitter-3.1.0.tgz", + "integrity": "sha512-+9jVqKhRSpsc591z5vX+X5Yyw+he/HCB4iQ/RYxw35CEPaY1gnsNE43nf9n9AaYjAQrTiI/mOwKUKdUs9vf7Xg==", + "dev": true + }, "@tsconfig/node10": { "version": "1.0.9", "resolved": "https://registry.npmjs.org/@tsconfig/node10/-/node10-1.0.9.tgz", @@ -2985,6 +3220,21 @@ "integrity": "sha512-rr+OQyAjxze7GgWrSaJwydHStIhHq2lvY3BOC2Mj7KnzI7XK0Uw1TOOdI9lDoajEbSWLiYgoo4f1R51erQfhPQ==", "dev": true }, + "@types/cookie": { + "version": "0.4.1", + "resolved": "https://registry.npmjs.org/@types/cookie/-/cookie-0.4.1.tgz", + "integrity": "sha512-XW/Aa8APYr6jSVVA1y/DEIZX0/GMKLEVekNG727R8cs56ahETkRAy/3DR7+fJyh7oUgGwNQaRfXCun0+KbWY7Q==", + "dev": true + }, + "@types/cors": { + "version": "2.8.17", + "resolved": "https://registry.npmjs.org/@types/cors/-/cors-2.8.17.tgz", + "integrity": "sha512-8CGDvrBj1zgo2qE+oS3pOCyYNqCPryMWY2bGfwA0dcfopWGgxs+78df0Rs3rc9THP4JkOhLsAa+15VdpAqkcUA==", + "dev": true, + "requires": { + "@types/node": "*" + } + }, "@types/debug": { "version": "4.1.12", "resolved": "https://registry.npmjs.org/@types/debug/-/debug-4.1.12.tgz", @@ -2994,6 +3244,12 @@ "@types/ms": "*" } }, + "@types/expect.js": { + "version": "0.3.32", + "resolved": "https://registry.npmjs.org/@types/expect.js/-/expect.js-0.3.32.tgz", + "integrity": "sha512-vUK0KSPtQTeANmOfiqsNNA/8hJ0xz8gOyB0ZhYRtoYOZBtZYir7ujNGr6GKw2hJAjltW0ocCNIGn9YxIXTT99Q==", + "dev": true + }, "@types/mocha": { "version": "10.0.1", "resolved": "https://registry.npmjs.org/@types/mocha/-/mocha-10.0.1.tgz", @@ -3012,6 +3268,16 @@ "integrity": "sha512-jiE3QIxJ8JLNcb1Ps6rDbysDhN4xa8DJJvuC9prr6w+1tIh+QAbYyNF3tyiZNLDBIuBCf4KEcV2UvQm/V60xfA==", "dev": true }, + "accepts": { + "version": "1.3.8", + "resolved": "https://registry.npmjs.org/accepts/-/accepts-1.3.8.tgz", + "integrity": "sha512-PYAthTa2m2VKxuvSD3DPC/Gy+U+sOA1LAuT8mkmRuvw+NACSaeXEQ+NHcVF7rONl6qcaxV3Uuemwawk+7+SJLw==", + "dev": true, + "requires": { + "mime-types": "~2.1.34", + "negotiator": "0.6.3" + } + }, "acorn": { "version": "8.8.1", "resolved": "https://registry.npmjs.org/acorn/-/acorn-8.8.1.tgz", @@ -3095,6 +3361,12 @@ "integrity": "sha1-ibTRmasr7kneFk6gK4nORi1xt2c=", "dev": true }, + "base64id": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/base64id/-/base64id-2.0.0.tgz", + "integrity": "sha512-lGe34o6EHj9y3Kts9R4ZYs/Gr+6N7MCaMlIFA3F1R2O5/m7K06AxfSeO5530PEERE6/WyEg3lsuyw4GHlPZHog==", + "dev": true + }, "binary-extensions": { "version": "2.2.0", "resolved": "https://registry.npmjs.org/binary-extensions/-/binary-extensions-2.2.0.tgz", @@ -3272,6 +3544,22 @@ } } }, + "cookie": { + "version": "0.4.2", + "resolved": "https://registry.npmjs.org/cookie/-/cookie-0.4.2.tgz", + "integrity": "sha512-aSWTXFzaKWkvHO1Ny/s+ePFpvKsPnjc551iI41v3ny/ow6tBG5Vd+FuqGNhh1LxOmVzOlGUriIlOaokOvhaStA==", + "dev": true + }, + "cors": { + "version": "2.8.5", + "resolved": "https://registry.npmjs.org/cors/-/cors-2.8.5.tgz", + "integrity": "sha512-KIHbLJqu73RGr/hnbrO9uBeixNGuvSQjul/jdFvS/KFSIH1hWVd1ng7zOHx+YrEfInLG7q4n6GHQ9cDtxv/P6g==", + "dev": true, + "requires": { + "object-assign": "^4", + "vary": "^1" + } + }, "create-require": { "version": "1.1.1", "resolved": "https://registry.npmjs.org/create-require/-/create-require-1.1.1.tgz", @@ -3324,6 +3612,43 @@ "integrity": "sha512-MSjYzcWNOA0ewAHpz0MxpYFvwg6yjy1NG3xteoqz644VCo/RPgnr1/GGt+ic3iJTzQ8Eu3TdM14SawnVUmGE6A==", "dev": true }, + "engine.io": { + "version": "6.5.4", + "resolved": "https://registry.npmjs.org/engine.io/-/engine.io-6.5.4.tgz", + "integrity": "sha512-KdVSDKhVKyOi+r5uEabrDLZw2qXStVvCsEB/LN3mw4WFi6Gx50jTyuxYVCwAAC0U46FdnzP/ScKRBTXb/NiEOg==", + "dev": true, + "requires": { + "@types/cookie": "^0.4.1", + "@types/cors": "^2.8.12", + "@types/node": ">=10.0.0", + "accepts": "~1.3.4", + "base64id": "2.0.0", + "cookie": "~0.4.1", + "cors": "~2.8.5", + "debug": "~4.3.1", + "engine.io-parser": "~5.2.1", + "ws": "~8.11.0" + } + }, + "engine.io-client": { + "version": "6.5.3", + "resolved": "https://registry.npmjs.org/engine.io-client/-/engine.io-client-6.5.3.tgz", + "integrity": "sha512-9Z0qLB0NIisTRt1DZ/8U2k12RJn8yls/nXMZLn+/N8hANT3TcYjKFKcwbw5zFQiN4NTde3TSY9zb79e1ij6j9Q==", + "dev": true, + "requires": { + "@socket.io/component-emitter": "~3.1.0", + "debug": "~4.3.1", + "engine.io-parser": "~5.2.1", + "ws": "~8.11.0", + "xmlhttprequest-ssl": "~2.0.0" + } + }, + "engine.io-parser": { + "version": "5.2.2", + "resolved": "https://registry.npmjs.org/engine.io-parser/-/engine.io-parser-5.2.2.tgz", + "integrity": "sha512-RcyUFKA93/CXH20l4SoVvzZfrSDMOTUS3bWVpTt2FuFP+XYrL8i8oonHP7WInRyVHXh0n/ORtoeiE1os+8qkSw==", + "dev": true + }, "es6-error": { "version": "4.1.1", "resolved": "https://registry.npmjs.org/es6-error/-/es6-error-4.1.1.tgz", @@ -3351,7 +3676,7 @@ "expect.js": { "version": "0.3.1", "resolved": "https://registry.npmjs.org/expect.js/-/expect.js-0.3.1.tgz", - "integrity": "sha1-sKWaDS7/VDdUTr8M6qYBWEHQm1s=", + "integrity": "sha512-okDF/FAPEul1ZFLae4hrgpIqAeapoo5TRdcg/lD0iN9S3GWrBFIJwNezGH1DMtIz+RxU4RrFmMq7WUUvDg3J6A==", "dev": true }, "fill-range": { @@ -3789,6 +4114,21 @@ "integrity": "sha512-s8UhlNe7vPKomQhC1qFelMokr/Sc3AgNbso3n74mVPA5LTZwkB9NlXf4XPamLxJE8h0gh73rM94xvwRT2CVInw==", "dev": true }, + "mime-db": { + "version": "1.52.0", + "resolved": "https://registry.npmjs.org/mime-db/-/mime-db-1.52.0.tgz", + "integrity": "sha512-sPU4uV7dYlvtWJxwwxHD0PuihVNiE7TyAbQ5SWxDCB9mUYvOgroQOwYQQOKPJ8CIbE+1ETVlOoK1UC2nU3gYvg==", + "dev": true + }, + "mime-types": { + "version": "2.1.35", + "resolved": "https://registry.npmjs.org/mime-types/-/mime-types-2.1.35.tgz", + "integrity": "sha512-ZDY+bPm5zTTF+YpCrAU9nK0UgICYPT0QtT1NZWFv4s++TNkcgVaT0g6+4R2uI4MjQjzysHB1zxuWL50hzaeXiw==", + "dev": true, + "requires": { + "mime-db": "1.52.0" + } + }, "minimatch": { "version": "3.1.2", "resolved": "https://registry.npmjs.org/minimatch/-/minimatch-3.1.2.tgz", @@ -3906,6 +4246,12 @@ "integrity": "sha512-p1sjXuopFs0xg+fPASzQ28agW1oHD7xDsd9Xkf3T15H3c/cifrFHVwrh74PdoklAPi+i7MdRsE47vm2r6JoB+w==", "dev": true }, + "negotiator": { + "version": "0.6.3", + "resolved": "https://registry.npmjs.org/negotiator/-/negotiator-0.6.3.tgz", + "integrity": "sha512-+EUsqGPLsM+j/zdChZjsnX51g4XrHFOIXwfnCVPGlQk/k5giakcKsuxCObBRu6DSm9opw/O6slWbJdghQM4bBg==", + "dev": true + }, "node-preload": { "version": "0.2.1", "resolved": "https://registry.npmjs.org/node-preload/-/node-preload-0.2.1.tgz", @@ -4071,6 +4417,12 @@ } } }, + "object-assign": { + "version": "4.1.1", + "resolved": "https://registry.npmjs.org/object-assign/-/object-assign-4.1.1.tgz", + "integrity": "sha512-rJgTQnkUnH1sFw8yT6VSU3zD3sWmu6sZhIseY8VX+GRu3P6F7Fu+JNDoXfklElbLJSnc3FUQHVe4cU5hj+BcUg==", + "dev": true + }, "once": { "version": "1.4.0", "resolved": "https://registry.npmjs.org/once/-/once-1.4.0.tgz", @@ -4329,6 +4681,52 @@ "integrity": "sha512-VUJ49FC8U1OxwZLxIbTTrDvLnf/6TDgxZcK8wxR8zs13xpx7xbG60ndBlhNrFi2EMuFRoeDoJO7wthSLq42EjA==", "dev": true }, + "socket.io": { + "version": "4.7.4", + "resolved": "https://registry.npmjs.org/socket.io/-/socket.io-4.7.4.tgz", + "integrity": "sha512-DcotgfP1Zg9iP/dH9zvAQcWrE0TtbMVwXmlV4T4mqsvY+gw+LqUGPfx2AoVyRk0FLME+GQhufDMyacFmw7ksqw==", + "dev": true, + "requires": { + "accepts": "~1.3.4", + "base64id": "~2.0.0", + "cors": "~2.8.5", + "debug": "~4.3.2", + "engine.io": "~6.5.2", + "socket.io-adapter": "~2.5.2", + "socket.io-parser": "~4.2.4" + } + }, + "socket.io-adapter": { + "version": "2.5.2", + "resolved": "https://registry.npmjs.org/socket.io-adapter/-/socket.io-adapter-2.5.2.tgz", + "integrity": "sha512-87C3LO/NOMc+eMcpcxUBebGjkpMDkNBS9tf7KJqcDsmL936EChtVva71Dw2q4tQcuVC+hAUy4an2NO/sYXmwRA==", + "dev": true, + "requires": { + "ws": "~8.11.0" + } + }, + "socket.io-client": { + "version": "4.7.4", + "resolved": "https://registry.npmjs.org/socket.io-client/-/socket.io-client-4.7.4.tgz", + "integrity": "sha512-wh+OkeF0rAVCrABWQBaEjLfb7DVPotMbu0cgWgyR0v6eA4EoVnAwcIeIbcdTE3GT/H3kbdLl7OoH2+asoDRIIg==", + "dev": true, + "requires": { + "@socket.io/component-emitter": "~3.1.0", + "debug": "~4.3.2", + "engine.io-client": "~6.5.2", + "socket.io-parser": "~4.2.4" + } + }, + "socket.io-parser": { + "version": "4.2.4", + "resolved": "https://registry.npmjs.org/socket.io-parser/-/socket.io-parser-4.2.4.tgz", + "integrity": "sha512-/GbIKmo8ioc+NIWIhwdecY0ge+qVBSMdgxGygevmdHj24bsfgtCmcUUcQ5ZzcylGFHsN3k4HB4Cgkl96KVnuew==", + "dev": true, + "requires": { + "@socket.io/component-emitter": "~3.1.0", + "debug": "~4.3.1" + } + }, "source-map": { "version": "0.5.7", "resolved": "https://registry.npmjs.org/source-map/-/source-map-0.5.7.tgz", @@ -4484,6 +4882,12 @@ "integrity": "sha512-wa7YjyUGfNZngI/vtK0UHAN+lgDCxBPCylVXGp0zu59Fz5aiGtNXaq3DhIov063MorB+VfufLh3JlF2KdTK3xg==", "dev": true }, + "vary": { + "version": "1.1.2", + "resolved": "https://registry.npmjs.org/vary/-/vary-1.1.2.tgz", + "integrity": "sha512-BNGbWLfd0eUPabhkXUVm0j8uuvREyTh5ovRa/dyow/BqAbZJyC+5fU+IzQOzmAKzYqYRAISoRhdQr3eIZ/PXqg==", + "dev": true + }, "which": { "version": "2.0.2", "resolved": "https://registry.npmjs.org/which/-/which-2.0.2.tgz", @@ -4566,6 +4970,12 @@ "integrity": "sha512-HPG3wQd9sNQoT9xHyNCXoDUa+Xw/VevmY9FoHyQ+g+rrMn4j6FB4np7Z0OhdTgjx6MgQLK7jwSy1YecU1+4Asg==", "requires": {} }, + "xmlhttprequest-ssl": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/xmlhttprequest-ssl/-/xmlhttprequest-ssl-2.0.0.tgz", + "integrity": "sha512-QKxVRxiRACQcVuQEYFsI1hhkrMlrXHPegbbd1yn9UHOmRxY+si12nQYzri3vbzt8VdTTRviqcKxcyllFas5z2A==", + "dev": true + }, "y18n": { "version": "4.0.3", "resolved": "https://registry.npmjs.org/y18n/-/y18n-4.0.3.tgz", diff --git a/package.json b/package.json index e3efe80..6590722 100644 --- a/package.json +++ b/package.json @@ -18,17 +18,20 @@ }, "devDependencies": { "@types/debug": "^4.1.12", + "@types/expect.js": "^0.3.32", "@types/mocha": "^10.0.1", "@types/node": "^14.11.2", "expect.js": "^0.3.1", "mocha": "^10.2.0", "nyc": "^15.1.0", "prettier": "^2.8.1", + "socket.io": "^4.7.4", + "socket.io-client": "^4.7.4", "ts-node": "^10.9.1", "typescript": "^4.9.4" }, "scripts": { - "test": "npm run format:check && tsc && nyc mocha --require ts-node/register test/index.ts", + "test": "npm run format:check && tsc && nyc mocha --require ts-node/register test/*.ts", "format:check": "prettier --parser typescript --check 'lib/**/*.ts' 'test/**/*.ts'", "format:fix": "prettier --parser typescript --write 'lib/**/*.ts' 'test/**/*.ts'", "prepack": "tsc" diff --git a/test/cluster-adapter.ts b/test/cluster-adapter.ts new file mode 100644 index 0000000..d5d6fdb --- /dev/null +++ b/test/cluster-adapter.ts @@ -0,0 +1,416 @@ +import { createServer } from "http"; +import { Server, Socket as ServerSocket } from "socket.io"; +import { io as ioc, Socket as ClientSocket } from "socket.io-client"; +import expect = require("expect.js"); +import type { AddressInfo } from "net"; +import { times, shouldNotHappen } from "./util"; +import { + ClusterAdapterWithHeartbeat, + type ClusterMessage, + type ClusterResponse, +} from "../lib"; +import { EventEmitter } from "events"; + +const NODES_COUNT = 3; + +class EventEmitterAdapter extends ClusterAdapterWithHeartbeat { + private offset = 1; + + constructor(nsp, readonly eventBus) { + super(nsp, {}); + this.eventBus.on("message", (message) => { + this.onMessage(message as ClusterMessage); + }); + } + + protected doPublish(message: ClusterMessage): Promise { + this.eventBus.emit("message", message); + return Promise.resolve(String(++this.offset)); + } + + protected doPublishResponse( + requesterUid: string, + response: ClusterResponse + ): Promise { + this.eventBus.emit("message", response); + return Promise.resolve(); + } +} + +describe("cluster adapter", () => { + let servers: Server[], + serverSockets: ServerSocket[], + clientSockets: ClientSocket[]; + + beforeEach((done) => { + servers = []; + serverSockets = []; + clientSockets = []; + + const eventBus = new EventEmitter(); + for (let i = 1; i <= NODES_COUNT; i++) { + const httpServer = createServer(); + const io = new Server(httpServer); + // @ts-ignore + io.adapter(function (nsp) { + return new EventEmitterAdapter(nsp, eventBus); + }); + httpServer.listen(() => { + const port = (httpServer.address() as AddressInfo).port; + const clientSocket = ioc(`http://localhost:${port}`); + + io.on("connection", async (socket) => { + clientSockets.push(clientSocket); + serverSockets.push(socket); + servers.push(io); + if (servers.length === NODES_COUNT) { + // ensure all nodes know each other + servers[0].emit("ping"); + servers[1].emit("ping"); + servers[2].emit("ping"); + + done(); + } + }); + }); + } + }); + + afterEach(() => { + servers.forEach((server) => { + // @ts-ignore + server.httpServer.close(); + server.of("/").adapter.close(); + }); + clientSockets.forEach((socket) => { + socket.disconnect(); + }); + }); + + describe("broadcast", function () { + it("broadcasts to all clients", (done) => { + const partialDone = times(3, done); + + clientSockets.forEach((clientSocket) => { + clientSocket.on("test", (arg1, arg2, arg3) => { + expect(arg1).to.eql(1); + expect(arg2).to.eql("2"); + expect(Buffer.isBuffer(arg3)).to.be(true); + partialDone(); + }); + }); + + servers[0].emit("test", 1, "2", Buffer.from([3, 4])); + }); + + it("broadcasts to all clients in a namespace", (done) => { + const partialDone = times(3, () => { + servers.forEach((server) => server.of("/custom").adapter.close()); + done(); + }); + + servers.forEach((server) => server.of("/custom")); + + const onConnect = times(3, async () => { + servers[0].of("/custom").emit("test"); + }); + + clientSockets.forEach((clientSocket) => { + const socket = clientSocket.io.socket("/custom"); + socket.on("connect", onConnect); + socket.on("test", () => { + socket.disconnect(); + partialDone(); + }); + }); + }); + + it("broadcasts to all clients in a room", (done) => { + serverSockets[1].join("room1"); + + clientSockets[0].on("test", shouldNotHappen(done)); + clientSockets[1].on("test", () => done()); + clientSockets[2].on("test", shouldNotHappen(done)); + + servers[0].to("room1").emit("test"); + }); + + it("broadcasts to all clients except in room", (done) => { + const partialDone = times(2, done); + serverSockets[1].join("room1"); + + clientSockets[0].on("test", () => partialDone()); + clientSockets[1].on("test", shouldNotHappen(done)); + clientSockets[2].on("test", () => partialDone()); + + servers[0].of("/").except("room1").emit("test"); + }); + + it("broadcasts to local clients only", (done) => { + clientSockets[0].on("test", () => done()); + clientSockets[1].on("test", shouldNotHappen(done)); + clientSockets[2].on("test", shouldNotHappen(done)); + + servers[0].local.emit("test"); + }); + + it("broadcasts with multiple acknowledgements", (done) => { + clientSockets[0].on("test", (cb) => cb(1)); + clientSockets[1].on("test", (cb) => cb(2)); + clientSockets[2].on("test", (cb) => cb(3)); + + servers[0].timeout(50).emit("test", (err: Error, responses: any[]) => { + expect(err).to.be(null); + expect(responses).to.contain(1); + expect(responses).to.contain(2); + expect(responses).to.contain(3); + + setTimeout(() => { + // @ts-ignore + expect(servers[0].of("/").adapter.ackRequests.size).to.eql(0); + + done(); + }, 50); + }); + }); + + it("broadcasts with multiple acknowledgements (binary content)", (done) => { + clientSockets[0].on("test", (cb) => cb(Buffer.from([1]))); + clientSockets[1].on("test", (cb) => cb(Buffer.from([2]))); + clientSockets[2].on("test", (cb) => cb(Buffer.from([3]))); + + servers[0].timeout(500).emit("test", (err: Error, responses: any[]) => { + expect(err).to.be(null); + responses.forEach((response) => { + expect(Buffer.isBuffer(response)).to.be(true); + }); + + done(); + }); + }); + + it("broadcasts with multiple acknowledgements (no client)", (done) => { + servers[0] + .to("abc") + .timeout(500) + .emit("test", (err: Error, responses: any[]) => { + expect(err).to.be(null); + expect(responses).to.eql([]); + + done(); + }); + }); + + it("broadcasts with multiple acknowledgements (timeout)", (done) => { + clientSockets[0].on("test", (cb) => cb(1)); + clientSockets[1].on("test", (cb) => cb(2)); + clientSockets[2].on("test", (_cb) => { + // do nothing + }); + + servers[0].timeout(50).emit("test", (err: Error, responses: any[]) => { + expect(err).to.be.an(Error); + expect(responses).to.contain(1); + expect(responses).to.contain(2); + + done(); + }); + }); + + it("broadcasts with a single acknowledgement (local)", async () => { + clientSockets[0].on("test", () => expect().fail()); + clientSockets[1].on("test", (cb) => cb(2)); + clientSockets[2].on("test", () => expect().fail()); + + const response = await serverSockets[1].emitWithAck("test"); + expect(response).to.eql(2); + }); + + it("broadcasts with a single acknowledgement (remote)", async () => { + clientSockets[0].on("test", () => expect().fail()); + clientSockets[1].on("test", (cb) => cb(2)); + clientSockets[2].on("test", () => expect().fail()); + + const sockets = await servers[0].in(serverSockets[1].id).fetchSockets(); + expect(sockets.length).to.eql(1); + + const response = await sockets[0].timeout(500).emitWithAck("test"); + expect(response).to.eql(2); + }); + }); + + describe("socketsJoin", () => { + it("makes all socket instances join the specified room", async () => { + servers[0].socketsJoin("room1"); + + expect(serverSockets[0].rooms.has("room1")).to.be(true); + expect(serverSockets[1].rooms.has("room1")).to.be(true); + expect(serverSockets[2].rooms.has("room1")).to.be(true); + }); + + it("makes the matching socket instances join the specified room", async () => { + serverSockets[0].join("room1"); + serverSockets[2].join("room1"); + + servers[0].in("room1").socketsJoin("room2"); + + expect(serverSockets[0].rooms.has("room2")).to.be(true); + expect(serverSockets[1].rooms.has("room2")).to.be(false); + expect(serverSockets[2].rooms.has("room2")).to.be(true); + }); + + it("makes the given socket instance join the specified room", async () => { + servers[0].in(serverSockets[1].id).socketsJoin("room3"); + + expect(serverSockets[0].rooms.has("room3")).to.be(false); + expect(serverSockets[1].rooms.has("room3")).to.be(true); + expect(serverSockets[2].rooms.has("room3")).to.be(false); + }); + }); + + describe("socketsLeave", () => { + it("makes all socket instances leave the specified room", async () => { + serverSockets[0].join("room1"); + serverSockets[2].join("room1"); + + servers[0].socketsLeave("room1"); + + expect(serverSockets[0].rooms.has("room1")).to.be(false); + expect(serverSockets[1].rooms.has("room1")).to.be(false); + expect(serverSockets[2].rooms.has("room1")).to.be(false); + }); + + it("makes the matching socket instances leave the specified room", async () => { + serverSockets[0].join(["room1", "room2"]); + serverSockets[1].join(["room1", "room2"]); + serverSockets[2].join(["room2"]); + + servers[0].in("room1").socketsLeave("room2"); + + expect(serverSockets[0].rooms.has("room2")).to.be(false); + expect(serverSockets[1].rooms.has("room2")).to.be(false); + expect(serverSockets[2].rooms.has("room2")).to.be(true); + }); + + it("makes the given socket instance leave the specified room", async () => { + serverSockets[0].join("room3"); + serverSockets[1].join("room3"); + serverSockets[2].join("room3"); + + servers[0].in(serverSockets[1].id).socketsLeave("room3"); + + expect(serverSockets[0].rooms.has("room3")).to.be(true); + expect(serverSockets[1].rooms.has("room3")).to.be(false); + expect(serverSockets[2].rooms.has("room3")).to.be(true); + }); + }); + + describe("disconnectSockets", () => { + it("makes all socket instances disconnect", (done) => { + const partialDone = times(3, done); + + clientSockets.forEach((clientSocket) => { + clientSocket.on("disconnect", (reason) => { + expect(reason).to.eql("io server disconnect"); + partialDone(); + }); + }); + + servers[0].disconnectSockets(true); + }); + }); + + describe("fetchSockets", () => { + it("returns all socket instances", async () => { + const sockets = await servers[0].fetchSockets(); + + expect(sockets).to.be.an(Array); + expect(sockets).to.have.length(3); + // @ts-ignore + expect(servers[0].of("/").adapter.requests.size).to.eql(0); // clean up + }); + + it("returns a single socket instance", async () => { + serverSockets[1].data = "test" as any; + + const [remoteSocket] = await servers[0] + .in(serverSockets[1].id) + .fetchSockets(); + + expect(remoteSocket.handshake).to.eql(serverSockets[1].handshake); + expect(remoteSocket.data).to.eql("test"); + expect(remoteSocket.rooms.size).to.eql(1); + }); + + it("returns only local socket instances", async () => { + const sockets = await servers[0].local.fetchSockets(); + + expect(sockets).to.have.length(1); + }); + }); + + describe("serverSideEmit", () => { + it("sends an event to other server instances", (done) => { + const partialDone = times(2, done); + + servers[0].on("hello", shouldNotHappen(done)); + + servers[1].on("hello", (arg1, arg2, arg3) => { + expect(arg1).to.eql("world"); + expect(arg2).to.eql(1); + expect(arg3).to.eql("2"); + partialDone(); + }); + + servers[2].of("/").on("hello", () => partialDone()); + + servers[0].serverSideEmit("hello", "world", 1, "2"); + }); + + it("sends an event and receives a response from the other server instances", (done) => { + servers[0].on("hello", shouldNotHappen(done)); + servers[1].on("hello", (cb) => cb(2)); + servers[2].on("hello", (cb) => cb("3")); + + servers[0].serverSideEmit("hello", (err: Error, response: any) => { + expect(err).to.be(null); + expect(response).to.be.an(Array); + expect(response).to.contain(2); + expect(response).to.contain("3"); + done(); + }); + }); + + it("sends an event but timeout if one server does not respond", function (done) { + this.timeout(6000); + + servers[0].on("hello", shouldNotHappen(done)); + servers[1].on("hello", (cb) => cb(2)); + servers[2].on("hello", () => { + // do nothing + }); + + servers[0].serverSideEmit("hello", (err: Error, response: any) => { + expect(err.message).to.be("timeout reached: missing 1 responses"); + expect(response).to.be.an(Array); + expect(response).to.contain(2); + done(); + }); + }); + + it("succeeds even if an instance leaves the cluster", (done) => { + servers[0].on("hello", shouldNotHappen(done)); + servers[1].on("hello", (cb) => cb(2)); + servers[2].on("hello", () => { + servers[2].of("/").adapter.close(); + }); + + servers[0].serverSideEmit("hello", (err: Error, response: any) => { + expect(err).to.be(null); + expect(response).to.be.an(Array); + expect(response).to.contain(2); + done(); + }); + }); + }); +}); diff --git a/test/util.ts b/test/util.ts new file mode 100644 index 0000000..8109ef1 --- /dev/null +++ b/test/util.ts @@ -0,0 +1,15 @@ +export function times(count: number, fn: () => void) { + let i = 0; + return () => { + i++; + if (i === count) { + fn(); + } else if (i > count) { + throw new Error(`too many calls: ${i} instead of ${count}`); + } + }; +} + +export function shouldNotHappen(done) { + return () => done(new Error("should not happen")); +}