Skip to content

Commit

Permalink
refactor: use the ClusterAdapter class from socket.io-adapter package
Browse files Browse the repository at this point in the history
The ClusterAdapter class has been moved to [1], so that this adapter
only needs to implement to pub/sub mechanism.

Also, [2] should reduce the number of "timeout reached: only x
responses received out of y" errors, since the fetchSockets() requests
will now succeed even if a server leaves the cluster.

[1]: https://github.com/socketio/socket.io-adapter
[2]: socketio/socket.io-adapter@0e23ff0

Related: #6
  • Loading branch information
darrachequesne committed Feb 21, 2024
1 parent 44e10ae commit 8ff0413
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 615 deletions.
69 changes: 52 additions & 17 deletions lib/adapter.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,34 @@
import type { PrivateSessionId, Session } from "socket.io-adapter";
import { decode, encode } from "@msgpack/msgpack";
import {
ClusterAdapter,
ClusterAdapterOptions,
ClusterMessage,
MessageType,
} from "./cluster-adapter";
ClusterAdapterWithHeartbeat,
type ClusterMessage,
type PrivateSessionId,
type Session,
type ServerId,
type ClusterResponse,
} from "socket.io-adapter";
import { decode, encode } from "@msgpack/msgpack";
import debugModule from "debug";
import { hasBinary, XADD, XREAD } from "./util";

const debug = debugModule("socket.io-redis-streams-adapter");

const RESTORE_SESSION_MAX_XRANGE_CALLS = 100;

export interface RedisStreamsAdapterOptions extends ClusterAdapterOptions {
// TODO ClusterAdapterOptions should be exported by the socket.io-adapter package
interface ClusterAdapterOptions {
/**
* The number of ms between two heartbeats.
* @default 5_000
*/
heartbeatInterval?: number;
/**
* The number of ms without heartbeat before we consider a node down.
* @default 10_000
*/
heartbeatTimeout?: number;
}

export interface RedisStreamsAdapterOptions {
/**
* The name of the Redis stream.
*/
Expand Down Expand Up @@ -45,7 +60,7 @@ interface RawClusterMessage {
*/
export function createAdapter(
redisClient: any,
opts?: RedisStreamsAdapterOptions
opts?: RedisStreamsAdapterOptions & ClusterAdapterOptions
) {
const namespaceToAdapters = new Map<string, RedisStreamsAdapter>();
const options = Object.assign(
Expand Down Expand Up @@ -122,16 +137,20 @@ export function createAdapter(
};
}

class RedisStreamsAdapter extends ClusterAdapter {
class RedisStreamsAdapter extends ClusterAdapterWithHeartbeat {
readonly #redisClient: any;
readonly #opts: Required<RedisStreamsAdapterOptions>;

constructor(nsp, redisClient, opts: Required<RedisStreamsAdapterOptions>) {
constructor(
nsp,
redisClient,
opts: Required<RedisStreamsAdapterOptions> & ClusterAdapterOptions
) {
super(nsp, opts);
this.#redisClient = redisClient;
this.#opts = opts;

this.initHeartbeat();
this.init();
}

override doPublish(message: ClusterMessage) {
Expand All @@ -145,25 +164,38 @@ class RedisStreamsAdapter extends ClusterAdapter {
);
}

protected doPublishResponse(
requesterUid: ServerId,
response: ClusterResponse
): Promise<void> {
// @ts-ignore
return this.doPublish(response);
}

static encode(message: ClusterMessage): RawClusterMessage {
const rawMessage: RawClusterMessage = {
uid: message.uid,
nsp: message.nsp,
type: message.type.toString(),
};

// @ts-ignore
if (message.data) {
// TODO MessageType should be exported by the socket.io-adapter package
const mayContainBinary = [
MessageType.BROADCAST,
MessageType.BROADCAST_ACK,
MessageType.FETCH_SOCKETS_RESPONSE,
MessageType.SERVER_SIDE_EMIT,
MessageType.SERVER_SIDE_EMIT_RESPONSE,
3, // MessageType.BROADCAST,
8, // MessageType.FETCH_SOCKETS_RESPONSE,
9, // MessageType.SERVER_SIDE_EMIT,
10, // MessageType.SERVER_SIDE_EMIT_RESPONSE,
12, // MessageType.BROADCAST_ACK,
].includes(message.type);

// @ts-ignore
if (mayContainBinary && hasBinary(message.data)) {
// @ts-ignore
rawMessage.data = Buffer.from(encode(message.data)).toString("base64");
} else {
// @ts-ignore
rawMessage.data = JSON.stringify(message.data);
}
}
Expand Down Expand Up @@ -191,8 +223,10 @@ class RedisStreamsAdapter extends ClusterAdapter {

if (rawMessage.data) {
if (rawMessage.data.startsWith("{")) {
// @ts-ignore
message.data = JSON.parse(rawMessage.data);
} else {
// @ts-ignore
message.data = decode(Buffer.from(rawMessage.data, "base64")) as Record<
string,
unknown
Expand Down Expand Up @@ -261,6 +295,7 @@ class RedisStreamsAdapter extends ClusterAdapter {
if (entry.message.nsp === this.nsp.name && entry.message.type === "3") {
const message = RedisStreamsAdapter.decode(entry.message);

// @ts-ignore
if (shouldIncludePacket(session.rooms, message.data.opts)) {
// @ts-ignore
session.missedPackets.push(message.data.packet.data);
Expand Down
Loading

0 comments on commit 8ff0413

Please sign in to comment.