diff --git a/lib/cluster-adapter.ts b/lib/cluster-adapter.ts index cf6baa1..143bb6f 100644 --- a/lib/cluster-adapter.ts +++ b/lib/cluster-adapter.ts @@ -503,55 +503,64 @@ export abstract class ClusterAdapter extends Adapter { super.broadcastWithAck(packet, opts, clientCountCallback, ack); } - override addSockets(opts: BroadcastOptions, rooms: Room[]) { - super.addSockets(opts, rooms); - + override async addSockets(opts: BroadcastOptions, rooms: Room[]) { const onlyLocal = opts.flags?.local; - if (onlyLocal) { - return; + + if (!onlyLocal) { + try { + await this.publishAndReturnOffset({ + type: MessageType.SOCKETS_JOIN, + data: { + opts: encodeOptions(opts), + rooms, + }, + }); + } catch (e) { + debug("[%s] error while publishing message: %s", this.uid, e.message); + } } - this.publish({ - type: MessageType.SOCKETS_JOIN, - data: { - opts: encodeOptions(opts), - rooms, - }, - }); + super.addSockets(opts, rooms); } - override delSockets(opts: BroadcastOptions, rooms: Room[]) { - super.delSockets(opts, rooms); - + override async delSockets(opts: BroadcastOptions, rooms: Room[]) { const onlyLocal = opts.flags?.local; - if (onlyLocal) { - return; + + if (!onlyLocal) { + try { + await this.publishAndReturnOffset({ + type: MessageType.SOCKETS_LEAVE, + data: { + opts: encodeOptions(opts), + rooms, + }, + }); + } catch (e) { + debug("[%s] error while publishing message: %s", this.uid, e.message); + } } - this.publish({ - type: MessageType.SOCKETS_LEAVE, - data: { - opts: encodeOptions(opts), - rooms, - }, - }); + super.delSockets(opts, rooms); } - override disconnectSockets(opts: BroadcastOptions, close: boolean) { - super.disconnectSockets(opts, close); - + override async disconnectSockets(opts: BroadcastOptions, close: boolean) { const onlyLocal = opts.flags?.local; - if (onlyLocal) { - return; + + if (!onlyLocal) { + try { + await this.publishAndReturnOffset({ + type: MessageType.DISCONNECT_SOCKETS, + data: { + opts: encodeOptions(opts), + close, + }, + }); + } catch (e) { + debug("[%s] error while publishing message: %s", this.uid, e.message); + } } - this.publish({ - type: MessageType.DISCONNECT_SOCKETS, - data: { - opts: encodeOptions(opts), - close, - }, - }); + super.disconnectSockets(opts, close); } async fetchSockets(opts: BroadcastOptions): Promise { diff --git a/test/cluster-adapter.ts b/test/cluster-adapter.ts index d5d6fdb..350ecda 100644 --- a/test/cluster-adapter.ts +++ b/test/cluster-adapter.ts @@ -3,7 +3,7 @@ import { Server, Socket as ServerSocket } from "socket.io"; import { io as ioc, Socket as ClientSocket } from "socket.io-client"; import expect = require("expect.js"); import type { AddressInfo } from "net"; -import { times, shouldNotHappen } from "./util"; +import { times, shouldNotHappen, sleep } from "./util"; import { ClusterAdapterWithHeartbeat, type ClusterMessage, @@ -243,6 +243,8 @@ describe("cluster adapter", () => { it("makes all socket instances join the specified room", async () => { servers[0].socketsJoin("room1"); + await sleep(); + expect(serverSockets[0].rooms.has("room1")).to.be(true); expect(serverSockets[1].rooms.has("room1")).to.be(true); expect(serverSockets[2].rooms.has("room1")).to.be(true); @@ -254,6 +256,8 @@ describe("cluster adapter", () => { servers[0].in("room1").socketsJoin("room2"); + await sleep(); + expect(serverSockets[0].rooms.has("room2")).to.be(true); expect(serverSockets[1].rooms.has("room2")).to.be(false); expect(serverSockets[2].rooms.has("room2")).to.be(true); @@ -275,6 +279,8 @@ describe("cluster adapter", () => { servers[0].socketsLeave("room1"); + await sleep(); + expect(serverSockets[0].rooms.has("room1")).to.be(false); expect(serverSockets[1].rooms.has("room1")).to.be(false); expect(serverSockets[2].rooms.has("room1")).to.be(false); @@ -287,6 +293,8 @@ describe("cluster adapter", () => { servers[0].in("room1").socketsLeave("room2"); + await sleep(); + expect(serverSockets[0].rooms.has("room2")).to.be(false); expect(serverSockets[1].rooms.has("room2")).to.be(false); expect(serverSockets[2].rooms.has("room2")).to.be(true); @@ -318,6 +326,22 @@ describe("cluster adapter", () => { servers[0].disconnectSockets(true); }); + + it("sends a packet before all socket instances disconnect", (done) => { + const partialDone = times(3, done); + + clientSockets.forEach((clientSocket) => { + clientSocket.on("disconnect", shouldNotHappen(done)); + + clientSocket.on("bye", () => { + clientSocket.off("disconnect"); + clientSocket.on("disconnect", partialDone); + }); + }); + + servers[0].emit("bye"); + servers[0].disconnectSockets(true); + }); }); describe("fetchSockets", () => { diff --git a/test/util.ts b/test/util.ts index 8109ef1..ecdf6df 100644 --- a/test/util.ts +++ b/test/util.ts @@ -13,3 +13,7 @@ export function times(count: number, fn: () => void) { export function shouldNotHappen(done) { return () => done(new Error("should not happen")); } + +export function sleep() { + return new Promise((resolve) => process.nextTick(resolve)); +}