Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add participantChannel to isolate keeping count of participants #408

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ export class RedisAdapter extends Adapter {
public readonly requestsTimeout: number;

private readonly channel: string;
private readonly participantChannel: string;
private readonly requestChannel: string;
private readonly responseChannel: string;
private requests: Map<string, Request> = new Map();
Expand Down Expand Up @@ -94,6 +95,7 @@ export class RedisAdapter extends Adapter {
const prefix = opts.key || "socket.io";

this.channel = prefix + "#" + nsp.name + "#";
this.participantChannel = prefix + "-participant#" + this.nsp.name + "#";
this.requestChannel = prefix + "-request#" + this.nsp.name + "#";
this.responseChannel = prefix + "-response#" + this.nsp.name + "#";

Expand All @@ -107,7 +109,7 @@ export class RedisAdapter extends Adapter {
this.subClient.on("pmessageBuffer", this.onmessage.bind(this));

this.subClient.subscribe(
[this.requestChannel, this.responseChannel],
[this.participantChannel, this.requestChannel, this.responseChannel],
onError
);
this.subClient.on("messageBuffer", this.onrequest.bind(this));
Expand Down Expand Up @@ -855,12 +857,17 @@ export class RedisAdapter extends Adapter {
const nodes = this.pubClient.nodes();
return Promise.all(
nodes.map((node) =>
node.send_command("pubsub", ["numsub", this.requestChannel])
node.send_command("pubsub", [
"numsub",
this.requestChannel,
this.participantChannel,
])
)
).then((values) => {
let numSub = 0;
values.map((value) => {
numSub += parseInt(value[1], 10);
// Fall back to requestChannel for backwards compatibility
numSub += parseInt(value[3] || value[1], 10);
});
return numSub;
});
Expand All @@ -869,10 +876,11 @@ export class RedisAdapter extends Adapter {
return new Promise((resolve, reject) => {
this.pubClient.send_command(
"pubsub",
["numsub", this.requestChannel],
["numsub", this.requestChannel, this.participantChannel],
(err, numSub) => {
if (err) return reject(err);
resolve(parseInt(numSub[1], 10));
// Fall back to requestChannel for backwards compatibility
resolve(parseInt(numSub[3] || numSub[1], 10));
}
);
});
Expand Down
74 changes: 74 additions & 0 deletions test/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,80 @@ let socket1, socket2, socket3;
// do nothing
});
});

it("sends an event but timeout if there are external non-participating subcribers", (done) => {
namespace1.adapter.requestsTimeout = 100;

const spySub = suite.createRedisClient();
done = ((done) => (...rest) => {
spySub.quit();
done(...rest);
})(done);

spySub.subscribe(namespace1.adapter.participantChannel, (err) => {
expect(err).to.not.be.ok();

namespace1.serverSideEmit("hello", (err, response) => {
expect(err).to.be.ok();
expect(err.message).to.be(
"timeout reached: only 2 responses received out of 3"
);
expect(response).to.be.an(Array);
expect(response).to.have.length(2);
expect(response).to.contain(2);
expect(response).to.contain(3);
done();
});

namespace1.on("hello", () => {
done(new Error("should not happen"));
});

namespace2.on("hello", (cb) => {
cb(2);
});

namespace3.on("hello", (cb) => {
cb(3);
});
});
});

it("sends an event and receives a response from the other server instances with external non-participating subcribers on the request/response channels", (done) => {
namespace1.adapter.requestsTimeout = 100;
const { requestChannel, responseChannel } = namespace1.adapter;

const spySub = suite.createRedisClient();
done = ((done) => (...rest) => {
spySub.quit();
done(...rest);
})(done);

spySub.subscribe([requestChannel, responseChannel], (err) => {
expect(err).to.not.be.ok();

namespace1.serverSideEmit("hello", (err, response) => {
expect(err).to.not.be.ok();
expect(response).to.be.an(Array);
expect(response).to.have.length(2);
expect(response).to.contain(2);
expect(response).to.contain(3);
done();
});

namespace1.on("hello", () => {
done(new Error("should not happen"));
});

namespace2.on("hello", (cb) => {
cb(2);
});

namespace3.on("hello", (cb) => {
cb(3);
});
});
});
});
});
});
Expand Down