diff --git a/lib/index.ts b/lib/index.ts index c0f5cb6..f13480a 100644 --- a/lib/index.ts +++ b/lib/index.ts @@ -66,6 +66,7 @@ export class RedisAdapter extends Adapter { public readonly requestsTimeout: number; private readonly channel: string; + private readonly participantChannel: string; private readonly requestChannel: string; private readonly responseChannel: string; private requests: Map = new Map(); @@ -94,6 +95,7 @@ export class RedisAdapter extends Adapter { const prefix = opts.key || "socket.io"; this.channel = prefix + "#" + nsp.name + "#"; + this.participantChannel = prefix + "-participant#" + this.nsp.name + "#"; this.requestChannel = prefix + "-request#" + this.nsp.name + "#"; this.responseChannel = prefix + "-response#" + this.nsp.name + "#"; @@ -107,7 +109,7 @@ export class RedisAdapter extends Adapter { this.subClient.on("pmessageBuffer", this.onmessage.bind(this)); this.subClient.subscribe( - [this.requestChannel, this.responseChannel], + [this.participantChannel, this.requestChannel, this.responseChannel], onError ); this.subClient.on("messageBuffer", this.onrequest.bind(this)); @@ -855,12 +857,17 @@ export class RedisAdapter extends Adapter { const nodes = this.pubClient.nodes(); return Promise.all( nodes.map((node) => - node.send_command("pubsub", ["numsub", this.requestChannel]) + node.send_command("pubsub", [ + "numsub", + this.requestChannel, + this.participantChannel, + ]) ) ).then((values) => { let numSub = 0; values.map((value) => { - numSub += parseInt(value[1], 10); + // Fall back to requestChannel for backwards compatibility + numSub += parseInt(value[3] || value[1], 10); }); return numSub; }); @@ -869,10 +876,11 @@ export class RedisAdapter extends Adapter { return new Promise((resolve, reject) => { this.pubClient.send_command( "pubsub", - ["numsub", this.requestChannel], + ["numsub", this.requestChannel, this.participantChannel], (err, numSub) => { if (err) return reject(err); - resolve(parseInt(numSub[1], 10)); + // Fall back to requestChannel for backwards compatibility + resolve(parseInt(numSub[3] || numSub[1], 10)); } ); }); diff --git a/test/index.ts b/test/index.ts index 5d46441..d950b39 100644 --- a/test/index.ts +++ b/test/index.ts @@ -411,6 +411,80 @@ let socket1, socket2, socket3; // do nothing }); }); + + it("sends an event but timeout if there are external non-participating subcribers", (done) => { + namespace1.adapter.requestsTimeout = 100; + + const spySub = suite.createRedisClient(); + done = ((done) => (...rest) => { + spySub.quit(); + done(...rest); + })(done); + + spySub.subscribe(namespace1.adapter.participantChannel, (err) => { + expect(err).to.not.be.ok(); + + namespace1.serverSideEmit("hello", (err, response) => { + expect(err).to.be.ok(); + expect(err.message).to.be( + "timeout reached: only 2 responses received out of 3" + ); + expect(response).to.be.an(Array); + expect(response).to.have.length(2); + expect(response).to.contain(2); + expect(response).to.contain(3); + done(); + }); + + namespace1.on("hello", () => { + done(new Error("should not happen")); + }); + + namespace2.on("hello", (cb) => { + cb(2); + }); + + namespace3.on("hello", (cb) => { + cb(3); + }); + }); + }); + + it("sends an event and receives a response from the other server instances with external non-participating subcribers on the request/response channels", (done) => { + namespace1.adapter.requestsTimeout = 100; + const { requestChannel, responseChannel } = namespace1.adapter; + + const spySub = suite.createRedisClient(); + done = ((done) => (...rest) => { + spySub.quit(); + done(...rest); + })(done); + + spySub.subscribe([requestChannel, responseChannel], (err) => { + expect(err).to.not.be.ok(); + + namespace1.serverSideEmit("hello", (err, response) => { + expect(err).to.not.be.ok(); + expect(response).to.be.an(Array); + expect(response).to.have.length(2); + expect(response).to.contain(2); + expect(response).to.contain(3); + done(); + }); + + namespace1.on("hello", () => { + done(new Error("should not happen")); + }); + + namespace2.on("hello", (cb) => { + cb(2); + }); + + namespace3.on("hello", (cb) => { + cb(3); + }); + }); + }); }); }); });