Skip to content

Commit

Permalink
feat: send response to the requesting node only
Browse files Browse the repository at this point in the history
Previously, a request would be sent to all listening nodes, on channel
`${key}-request#${nsp}#` (e.g. "socket.io-request#/#"), and the other
nodes would respond on a common channel `${key}-response#${nsp}#`, so
every node get the response, instead of only the requesting node.

This commit adds a new option: "publishOnSpecificResponseChannel". If
it's set to true, then the other nodes will now respond on
`${key}-response#${nsp}#${uid}#`, which is the channel specific to the
requesting node, thus reducing the noise for the other nodes.

To upgrade an existing deployment, users will need to upgrade all nodes
to the latest version with publishOnSpecificResponseChannel = false,
and then toggle the option on each node.

Note: the option will default to true in the next major release

Related: #407
  • Loading branch information
darrachequesne committed Nov 29, 2021
1 parent aa681b3 commit f66de11
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 10 deletions.
54 changes: 46 additions & 8 deletions lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,18 @@ export interface RedisAdapterOptions {
* @default 5000
*/
requestsTimeout: number;
/**
* Whether to publish a response to the channel specific to the requesting node.
*
* - if true, the response will be published to `${key}-request#${nsp}#${uid}#`
* - if false, the response will be published to `${key}-request#${nsp}#`
*
* This option currently defaults to false for backward compatibility, but will be set to true in the next major
* release.
*
* @default false
*/
publishOnSpecificResponseChannel: boolean;
}

/**
Expand All @@ -66,6 +78,7 @@ export function createAdapter(
export class RedisAdapter extends Adapter {
public readonly uid;
public readonly requestsTimeout: number;
public readonly publishOnSpecificResponseChannel: boolean;

private readonly channel: string;
private readonly requestChannel: string;
Expand All @@ -92,12 +105,14 @@ export class RedisAdapter extends Adapter {

this.uid = uid2(6);
this.requestsTimeout = opts.requestsTimeout || 5000;
this.publishOnSpecificResponseChannel = !!opts.publishOnSpecificResponseChannel;

const prefix = opts.key || "socket.io";

this.channel = prefix + "#" + nsp.name + "#";
this.requestChannel = prefix + "-request#" + this.nsp.name + "#";
this.responseChannel = prefix + "-response#" + this.nsp.name + "#";
const specificResponseChannel = this.responseChannel + this.uid + "#";

const onError = (err) => {
if (err) {
Expand All @@ -115,7 +130,7 @@ export class RedisAdapter extends Adapter {
true
);
this.subClient.subscribe(
[this.requestChannel, this.responseChannel],
[this.requestChannel, this.responseChannel, specificResponseChannel],
(msg, channel) => {
this.onrequest(channel, msg);
}
Expand All @@ -125,7 +140,7 @@ export class RedisAdapter extends Adapter {
this.subClient.on("pmessageBuffer", this.onmessage.bind(this));

this.subClient.subscribe(
[this.requestChannel, this.responseChannel],
[this.requestChannel, this.responseChannel, specificResponseChannel],
onError
);
this.subClient.on("messageBuffer", this.onrequest.bind(this));
Expand Down Expand Up @@ -217,7 +232,7 @@ export class RedisAdapter extends Adapter {
sockets: [...sockets],
});

this.pubClient.publish(this.responseChannel, response);
this.publishResponse(request, response);
break;

case RequestType.ALL_ROOMS:
Expand All @@ -230,7 +245,7 @@ export class RedisAdapter extends Adapter {
rooms: [...this.rooms.keys()],
});

this.pubClient.publish(this.responseChannel, response);
this.publishResponse(request, response);
break;

case RequestType.REMOTE_JOIN:
Expand All @@ -253,7 +268,7 @@ export class RedisAdapter extends Adapter {
requestId: request.requestId,
});

this.pubClient.publish(this.responseChannel, response);
this.publishResponse(request, response);
break;

case RequestType.REMOTE_LEAVE:
Expand All @@ -276,7 +291,7 @@ export class RedisAdapter extends Adapter {
requestId: request.requestId,
});

this.pubClient.publish(this.responseChannel, response);
this.publishResponse(request, response);
break;

case RequestType.REMOTE_DISCONNECT:
Expand All @@ -299,7 +314,7 @@ export class RedisAdapter extends Adapter {
requestId: request.requestId,
});

this.pubClient.publish(this.responseChannel, response);
this.publishResponse(request, response);
break;

case RequestType.REMOTE_FETCH:
Expand Down Expand Up @@ -327,7 +342,7 @@ export class RedisAdapter extends Adapter {
}),
});

this.pubClient.publish(this.responseChannel, response);
this.publishResponse(request, response);
break;

case RequestType.SERVER_SIDE_EMIT:
Expand Down Expand Up @@ -366,6 +381,20 @@ export class RedisAdapter extends Adapter {
}
}

/**
* Send the response to the requesting node
* @param request
* @param response
* @private
*/
private publishResponse(request, response) {
const responseChannel = this.publishOnSpecificResponseChannel
? `${this.responseChannel}${request.uid}#`
: this.responseChannel;
debug("publishing response to channel %s", responseChannel);
this.pubClient.publish(responseChannel, response);
}

/**
* Called on response from another node
*
Expand Down Expand Up @@ -510,6 +539,7 @@ export class RedisAdapter extends Adapter {

const requestId = uid2(6);
const request = JSON.stringify({
uid: this.uid,
requestId,
type: RequestType.SOCKETS,
rooms: [...rooms],
Expand Down Expand Up @@ -554,6 +584,7 @@ export class RedisAdapter extends Adapter {

const requestId = uid2(6);
const request = JSON.stringify({
uid: this.uid,
requestId,
type: RequestType.ALL_ROOMS,
});
Expand Down Expand Up @@ -598,6 +629,7 @@ export class RedisAdapter extends Adapter {
}

const request = JSON.stringify({
uid: this.uid,
requestId,
type: RequestType.REMOTE_JOIN,
sid: id,
Expand Down Expand Up @@ -641,6 +673,7 @@ export class RedisAdapter extends Adapter {
}

const request = JSON.stringify({
uid: this.uid,
requestId,
type: RequestType.REMOTE_LEAVE,
sid: id,
Expand Down Expand Up @@ -684,6 +717,7 @@ export class RedisAdapter extends Adapter {
}

const request = JSON.stringify({
uid: this.uid,
requestId,
type: RequestType.REMOTE_DISCONNECT,
sid: id,
Expand Down Expand Up @@ -729,6 +763,7 @@ export class RedisAdapter extends Adapter {
const requestId = uid2(6);

const request = JSON.stringify({
uid: this.uid,
requestId,
type: RequestType.REMOTE_FETCH,
opts: {
Expand Down Expand Up @@ -766,6 +801,7 @@ export class RedisAdapter extends Adapter {
}

const request = JSON.stringify({
uid: this.uid,
type: RequestType.REMOTE_JOIN,
opts: {
rooms: [...opts.rooms],
Expand All @@ -783,6 +819,7 @@ export class RedisAdapter extends Adapter {
}

const request = JSON.stringify({
uid: this.uid,
type: RequestType.REMOTE_LEAVE,
opts: {
rooms: [...opts.rooms],
Expand All @@ -800,6 +837,7 @@ export class RedisAdapter extends Adapter {
}

const request = JSON.stringify({
uid: this.uid,
type: RequestType.REMOTE_DISCONNECT,
opts: {
rooms: [...opts.rooms],
Expand Down
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@
"main": "./dist/index.js",
"types": "./dist/index.d.ts",
"scripts": {
"test": "npm run format:check && tsc && npm run test:redis-v4 && npm run test:redis-v3 && npm run test:ioredis",
"test": "npm run format:check && tsc && npm run test:redis-v4 && npm run test:redis-v4-specific-channel && npm run test:redis-v3 && npm run test:ioredis",
"test:redis-v4": "nyc mocha --bail --require ts-node/register test/index.ts",
"test:redis-v4-specific-channel": "SPECIFIC_CHANNEL=1 nyc mocha --bail --require ts-node/register test/index.ts",
"test:redis-v3": "REDIS_CLIENT=redis-v3 nyc mocha --bail --require ts-node/register test/index.ts",
"test:ioredis": "REDIS_CLIENT=ioredis nyc mocha --bail --require ts-node/register test/index.ts",
"format:check": "prettier --parser typescript --check 'lib/**/*.ts' 'test/**/*.ts'",
Expand Down
7 changes: 6 additions & 1 deletion test/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,12 @@ function _create() {
return async (nsp, fn?) => {
const httpServer = createServer();
const sio = new Server(httpServer);
sio.adapter(createAdapter(await createClient(), await createClient()));
sio.adapter(
createAdapter(await createClient(), await createClient(), {
publishOnSpecificResponseChannel:
process.env.SPECIFIC_CHANNEL !== undefined,
})
);
httpServer.listen((err) => {
if (err) throw err; // abort tests
if ("function" == typeof nsp) {
Expand Down

0 comments on commit f66de11

Please sign in to comment.