diff --git a/lib/broadcast-operator.ts b/lib/broadcast-operator.ts new file mode 100644 index 0000000..28625da --- /dev/null +++ b/lib/broadcast-operator.ts @@ -0,0 +1,228 @@ +import debugModule from "debug"; +import { PacketType } from "socket.io-parser"; +import type { + EventNames, + EventParams, + EventsMap, + TypedEventBroadcaster, +} from "./typed-events"; +import { UID, RESERVED_EVENTS, BroadcastFlags, BroadcastOptions } from "./util"; + +const debug = debugModule("socket.io-emitter"); + +/** + * Request types, for messages between nodes + */ + +export enum RequestType { + SOCKETS = 0, + ALL_ROOMS = 1, + REMOTE_JOIN = 2, + REMOTE_LEAVE = 3, + REMOTE_DISCONNECT = 4, + REMOTE_FETCH = 5, + SERVER_SIDE_EMIT = 6, +} + +export class BroadcastOperator + implements TypedEventBroadcaster { + constructor( + private readonly redisClient: any, + private readonly broadcastOptions: BroadcastOptions, + private readonly rooms: Set = new Set(), + private readonly exceptRooms: Set = new Set(), + private readonly flags: BroadcastFlags = {} + ) {} + + /** + * Targets a room when emitting. + * + * @param room + * @return a new BroadcastOperator instance + * @public + */ + public to(room: string | string[]): BroadcastOperator { + const rooms = new Set(this.rooms); + if (Array.isArray(room)) { + room.forEach((r) => rooms.add(r)); + } else { + rooms.add(room); + } + return new BroadcastOperator( + this.redisClient, + this.broadcastOptions, + rooms, + this.exceptRooms, + this.flags + ); + } + + /** + * Targets a room when emitting. + * + * @param room + * @return a new BroadcastOperator instance + * @public + */ + public in(room: string | string[]): BroadcastOperator { + return this.to(room); + } + + /** + * Excludes a room when emitting. + * + * @param room + * @return a new BroadcastOperator instance + * @public + */ + public except(room: string | string[]): BroadcastOperator { + const exceptRooms = new Set(this.exceptRooms); + if (Array.isArray(room)) { + room.forEach((r) => exceptRooms.add(r)); + } else { + exceptRooms.add(room); + } + return new BroadcastOperator( + this.redisClient, + this.broadcastOptions, + this.rooms, + exceptRooms, + this.flags + ); + } + + /** + * Sets the compress flag. + * + * @param compress - if `true`, compresses the sending data + * @return a new BroadcastOperator instance + * @public + */ + public compress(compress: boolean): BroadcastOperator { + const flags = Object.assign({}, this.flags, { compress }); + return new BroadcastOperator( + this.redisClient, + this.broadcastOptions, + this.rooms, + this.exceptRooms, + flags + ); + } + + /** + * Sets a modifier for a subsequent event emission that the event data may be lost if the client is not ready to + * receive messages (because of network slowness or other issues, or because they’re connected through long polling + * and is in the middle of a request-response cycle). + * + * @return a new BroadcastOperator instance + * @public + */ + public get volatile(): BroadcastOperator { + const flags = Object.assign({}, this.flags, { volatile: true }); + return new BroadcastOperator( + this.redisClient, + this.broadcastOptions, + this.rooms, + this.exceptRooms, + flags + ); + } + + /** + * Emits to all clients. + * + * @return Always true + * @public + */ + public emit>( + ev: Ev, + ...args: EventParams + ): true { + if (RESERVED_EVENTS.has(ev)) { + throw new Error(`"${ev}" is a reserved event name`); + } + + // set up packet object + const data = [ev, ...args]; + const packet = { + type: PacketType.EVENT, + data: data, + nsp: this.broadcastOptions.nsp, + }; + + const opts = { + rooms: [...this.rooms], + flags: this.flags, + except: [...this.exceptRooms], + }; + + const msg = this.broadcastOptions.parser.encode([UID, packet, opts]); + let channel = this.broadcastOptions.broadcastChannel; + if (this.rooms && this.rooms.size === 1) { + channel += this.rooms.keys().next().value + "#"; + } + + debug("publishing message to channel %s", channel); + + this.redisClient.publish(channel, msg); + + return true; + } + + /** + * Makes the matching socket instances join the specified rooms + * + * @param rooms + * @public + */ + public socketsJoin(rooms: string | string[]): void { + const request = JSON.stringify({ + type: RequestType.REMOTE_JOIN, + opts: { + rooms: [...this.rooms], + except: [...this.exceptRooms], + }, + rooms: Array.isArray(rooms) ? rooms : [rooms], + }); + + this.redisClient.publish(this.broadcastOptions.requestChannel, request); + } + + /** + * Makes the matching socket instances leave the specified rooms + * + * @param rooms + * @public + */ + public socketsLeave(rooms: string | string[]): void { + const request = JSON.stringify({ + type: RequestType.REMOTE_LEAVE, + opts: { + rooms: [...this.rooms], + except: [...this.exceptRooms], + }, + rooms: Array.isArray(rooms) ? rooms : [rooms], + }); + + this.redisClient.publish(this.broadcastOptions.requestChannel, request); + } + + /** + * Makes the matching socket instances disconnect + * + * @param close - whether to close the underlying connection + * @public + */ + public disconnectSockets(close: boolean = false): void { + const request = JSON.stringify({ + type: RequestType.REMOTE_DISCONNECT, + opts: { + rooms: [...this.rooms], + except: [...this.exceptRooms], + }, + close, + }); + + this.redisClient.publish(this.broadcastOptions.requestChannel, request); + } +} diff --git a/lib/index.ts b/lib/index.ts index 8bd29e1..a006758 100644 --- a/lib/index.ts +++ b/lib/index.ts @@ -1,6 +1,4 @@ -import { PacketType } from "socket.io-parser"; import msgpack = require("notepack.io"); -import debugModule from "debug"; import type { DefaultEventsMap, EventNames, @@ -8,28 +6,12 @@ import type { EventsMap, TypedEventBroadcaster, } from "./typed-events"; - -const debug = debugModule("socket.io-emitter"); - -const UID = "emitter"; - -/** - * Request types, for messages between nodes - */ - -enum RequestType { - SOCKETS = 0, - ALL_ROOMS = 1, - REMOTE_JOIN = 2, - REMOTE_LEAVE = 3, - REMOTE_DISCONNECT = 4, - REMOTE_FETCH = 5, - SERVER_SIDE_EMIT = 6, -} - -interface Parser { - encode: (msg: any) => any; -} +import { Parser, UID, BroadcastOptions } from "./util"; +import { BroadcastOperator, RequestType } from "./broadcast-operator"; +import { + ShardedBroadcastOperator, + MessageType, +} from "./sharded-broadcast-operator"; export interface EmitterOptions { /** @@ -41,18 +23,9 @@ export interface EmitterOptions { * Defaults to notepack.io, a MessagePack implementation. */ parser?: Parser; -} -interface BroadcastOptions { - nsp: string; - broadcastChannel: string; - requestChannel: string; - parser: Parser; -} - -interface BroadcastFlags { - volatile?: boolean; - compress?: boolean; + sharded?: boolean; + subscriptionMode?: string; } export class Emitter { @@ -77,6 +50,11 @@ export class Emitter { requestChannel: this.opts.key + "-request#" + nsp + "#", parser: this.opts.parser, }; + if (this.opts.sharded) { + this.broadcastOptions = Object.assign(this.broadcastOptions, { + subscriptionMode: this.opts.subscriptionMode ?? "dynamic", + }); + } } /** @@ -103,7 +81,7 @@ export class Emitter { ev: Ev, ...args: EventParams ): true { - return new BroadcastOperator( + return this.newBroadcastOperator( this.redisClient, this.broadcastOptions ).emit(ev, ...args); @@ -116,10 +94,11 @@ export class Emitter { * @return BroadcastOperator * @public */ - public to(room: string | string[]): BroadcastOperator { - return new BroadcastOperator(this.redisClient, this.broadcastOptions).to( - room - ); + public to(room: string | string[]): TypedEventBroadcaster { + return this.newBroadcastOperator( + this.redisClient, + this.broadcastOptions + ).to(room); } /** @@ -129,10 +108,11 @@ export class Emitter { * @return BroadcastOperator * @public */ - public in(room: string | string[]): BroadcastOperator { - return new BroadcastOperator(this.redisClient, this.broadcastOptions).in( - room - ); + public in(room: string | string[]): TypedEventBroadcaster { + return this.newBroadcastOperator( + this.redisClient, + this.broadcastOptions + ).in(room); } /** @@ -142,8 +122,8 @@ export class Emitter { * @return BroadcastOperator * @public */ - public except(room: string | string[]): BroadcastOperator { - return new BroadcastOperator( + public except(room: string | string[]): TypedEventBroadcaster { + return this.newBroadcastOperator( this.redisClient, this.broadcastOptions ).except(room); @@ -157,8 +137,8 @@ export class Emitter { * @return BroadcastOperator * @public */ - public get volatile(): BroadcastOperator { - return new BroadcastOperator(this.redisClient, this.broadcastOptions) + public get volatile(): TypedEventBroadcaster { + return this.newBroadcastOperator(this.redisClient, this.broadcastOptions) .volatile; } @@ -169,8 +149,8 @@ export class Emitter { * @return BroadcastOperator * @public */ - public compress(compress: boolean): BroadcastOperator { - return new BroadcastOperator( + public compress(compress: boolean): TypedEventBroadcaster { + return this.newBroadcastOperator( this.redisClient, this.broadcastOptions ).compress(compress); @@ -183,7 +163,7 @@ export class Emitter { * @public */ public socketsJoin(rooms: string | string[]): void { - return new BroadcastOperator( + return this.newBroadcastOperator( this.redisClient, this.broadcastOptions ).socketsJoin(rooms); @@ -196,7 +176,7 @@ export class Emitter { * @public */ public socketsLeave(rooms: string | string[]): void { - return new BroadcastOperator( + return this.newBroadcastOperator( this.redisClient, this.broadcastOptions ).socketsLeave(rooms); @@ -209,7 +189,7 @@ export class Emitter { * @public */ public disconnectSockets(close: boolean = false): void { - return new BroadcastOperator( + return this.newBroadcastOperator( this.redisClient, this.broadcastOptions ).disconnectSockets(close); @@ -227,224 +207,35 @@ export class Emitter { throw new Error("Acknowledgements are not supported"); } - const request = JSON.stringify({ - uid: UID, - type: RequestType.SERVER_SIDE_EMIT, - data: args, - }); - - this.redisClient.publish(this.broadcastOptions.requestChannel, request); - } -} - -export const RESERVED_EVENTS: ReadonlySet = new Set([ - "connect", - "connect_error", - "disconnect", - "disconnecting", - "newListener", - "removeListener", -]); - -export class BroadcastOperator - implements TypedEventBroadcaster { - constructor( - private readonly redisClient: any, - private readonly broadcastOptions: BroadcastOptions, - private readonly rooms: Set = new Set(), - private readonly exceptRooms: Set = new Set(), - private readonly flags: BroadcastFlags = {} - ) {} - - /** - * Targets a room when emitting. - * - * @param room - * @return a new BroadcastOperator instance - * @public - */ - public to(room: string | string[]): BroadcastOperator { - const rooms = new Set(this.rooms); - if (Array.isArray(room)) { - room.forEach((r) => rooms.add(r)); + if (this.opts.sharded) { + const shardedMessage = { + uid: UID, + type: MessageType.SERVER_SIDE_EMIT, + data: { + packet: args, + }, + }; + const broadcaster = this.newBroadcastOperator( + this.redisClient, + this.broadcastOptions + ) as ShardedBroadcastOperator; + broadcaster.publishMessage(shardedMessage); } else { - rooms.add(room); - } - return new BroadcastOperator( - this.redisClient, - this.broadcastOptions, - rooms, - this.exceptRooms, - this.flags - ); - } - - /** - * Targets a room when emitting. - * - * @param room - * @return a new BroadcastOperator instance - * @public - */ - public in(room: string | string[]): BroadcastOperator { - return this.to(room); - } - - /** - * Excludes a room when emitting. - * - * @param room - * @return a new BroadcastOperator instance - * @public - */ - public except(room: string | string[]): BroadcastOperator { - const exceptRooms = new Set(this.exceptRooms); - if (Array.isArray(room)) { - room.forEach((r) => exceptRooms.add(r)); - } else { - exceptRooms.add(room); - } - return new BroadcastOperator( - this.redisClient, - this.broadcastOptions, - this.rooms, - exceptRooms, - this.flags - ); - } - - /** - * Sets the compress flag. - * - * @param compress - if `true`, compresses the sending data - * @return a new BroadcastOperator instance - * @public - */ - public compress(compress: boolean): BroadcastOperator { - const flags = Object.assign({}, this.flags, { compress }); - return new BroadcastOperator( - this.redisClient, - this.broadcastOptions, - this.rooms, - this.exceptRooms, - flags - ); - } - - /** - * Sets a modifier for a subsequent event emission that the event data may be lost if the client is not ready to - * receive messages (because of network slowness or other issues, or because they’re connected through long polling - * and is in the middle of a request-response cycle). - * - * @return a new BroadcastOperator instance - * @public - */ - public get volatile(): BroadcastOperator { - const flags = Object.assign({}, this.flags, { volatile: true }); - return new BroadcastOperator( - this.redisClient, - this.broadcastOptions, - this.rooms, - this.exceptRooms, - flags - ); - } - - /** - * Emits to all clients. - * - * @return Always true - * @public - */ - public emit>( - ev: Ev, - ...args: EventParams - ): true { - if (RESERVED_EVENTS.has(ev)) { - throw new Error(`"${ev}" is a reserved event name`); - } - - // set up packet object - const data = [ev, ...args]; - const packet = { - type: PacketType.EVENT, - data: data, - nsp: this.broadcastOptions.nsp, - }; - - const opts = { - rooms: [...this.rooms], - flags: this.flags, - except: [...this.exceptRooms], - }; - - const msg = this.broadcastOptions.parser.encode([UID, packet, opts]); - let channel = this.broadcastOptions.broadcastChannel; - if (this.rooms && this.rooms.size === 1) { - channel += this.rooms.keys().next().value + "#"; + const request = JSON.stringify({ + uid: UID, + type: RequestType.SERVER_SIDE_EMIT, + data: args, + }); + this.redisClient.publish(this.broadcastOptions.requestChannel, request); } - - debug("publishing message to channel %s", channel); - - this.redisClient.publish(channel, msg); - - return true; } - /** - * Makes the matching socket instances join the specified rooms - * - * @param rooms - * @public - */ - public socketsJoin(rooms: string | string[]): void { - const request = JSON.stringify({ - type: RequestType.REMOTE_JOIN, - opts: { - rooms: [...this.rooms], - except: [...this.exceptRooms], - }, - rooms: Array.isArray(rooms) ? rooms : [rooms], - }); - - this.redisClient.publish(this.broadcastOptions.requestChannel, request); - } - - /** - * Makes the matching socket instances leave the specified rooms - * - * @param rooms - * @public - */ - public socketsLeave(rooms: string | string[]): void { - const request = JSON.stringify({ - type: RequestType.REMOTE_LEAVE, - opts: { - rooms: [...this.rooms], - except: [...this.exceptRooms], - }, - rooms: Array.isArray(rooms) ? rooms : [rooms], - }); - - this.redisClient.publish(this.broadcastOptions.requestChannel, request); - } - - /** - * Makes the matching socket instances disconnect - * - * @param close - whether to close the underlying connection - * @public - */ - public disconnectSockets(close: boolean = false): void { - const request = JSON.stringify({ - type: RequestType.REMOTE_DISCONNECT, - opts: { - rooms: [...this.rooms], - except: [...this.exceptRooms], - }, - close, - }); - - this.redisClient.publish(this.broadcastOptions.requestChannel, request); + private newBroadcastOperator( + redisClient: any, + broadcastOptions: any + ): TypedEventBroadcaster { + return this.opts.sharded + ? new ShardedBroadcastOperator(redisClient, broadcastOptions) + : new BroadcastOperator(redisClient, broadcastOptions); } } diff --git a/lib/sharded-broadcast-operator.ts b/lib/sharded-broadcast-operator.ts new file mode 100644 index 0000000..ab397f8 --- /dev/null +++ b/lib/sharded-broadcast-operator.ts @@ -0,0 +1,309 @@ +import debugModule from "debug"; +import { PacketType } from "socket.io-parser"; +import type { + EventNames, + EventParams, + EventsMap, + TypedEventBroadcaster, +} from "./typed-events"; +import { + SPUBLISH, + hasBinary, + UID, + RESERVED_EVENTS, + BroadcastOptions, +} from "./util"; + +const debug = debugModule("socket.io-emitter"); + +export type Room = string; + +interface ShardedBroadcastFlags { + volatile?: boolean; + compress?: boolean; +} + +function encodeOptions(opts: { + rooms: Set; + except?: Set; + flags?: ShardedBroadcastFlags; +}) { + return { + rooms: [...opts.rooms], + except: [...opts.except], + flags: opts.flags, + }; +} + +export enum MessageType { + INITIAL_HEARTBEAT = 1, + HEARTBEAT, + BROADCAST, + SOCKETS_JOIN, + SOCKETS_LEAVE, + DISCONNECT_SOCKETS, + FETCH_SOCKETS, + FETCH_SOCKETS_RESPONSE, + SERVER_SIDE_EMIT, + SERVER_SIDE_EMIT_RESPONSE, + BROADCAST_CLIENT_COUNT, + BROADCAST_ACK, +} + +export interface ClusterMessage { + uid: string; + type: MessageType; + data?: Record; +} + +export class ShardedBroadcastOperator + implements TypedEventBroadcaster { + constructor( + private readonly redisClient: any, + private readonly broadcastOptions: BroadcastOptions, + private readonly rooms: Set = new Set(), + private readonly exceptRooms: Set = new Set(), + private readonly flags: ShardedBroadcastFlags = {} + ) {} + + /** + * Targets a room when emitting. + * + * @param room + * @return a new ShardedBroadcastOperator instance + * @public + */ + public to(room: string | string[]): ShardedBroadcastOperator { + const rooms = new Set(this.rooms); + if (Array.isArray(room)) { + room.forEach((r) => rooms.add(r)); + } else { + rooms.add(room); + } + return new ShardedBroadcastOperator( + this.redisClient, + this.broadcastOptions, + rooms, + this.exceptRooms, + this.flags + ); + } + + /** + * Targets a room when emitting. + * + * @param room + * @return a new BroadcastOperator instance + * @public + */ + public in(room: string | string[]): ShardedBroadcastOperator { + return this.to(room); + } + + /** + * Excludes a room when emitting. + * + * @param room + * @return a new ShardedBroadcastOperator instance + * @public + */ + public except(room: string | string[]): ShardedBroadcastOperator { + const exceptRooms = new Set(this.exceptRooms); + if (Array.isArray(room)) { + room.forEach((r) => exceptRooms.add(r)); + } else { + exceptRooms.add(room); + } + return new ShardedBroadcastOperator( + this.redisClient, + this.broadcastOptions, + this.rooms, + exceptRooms, + this.flags + ); + } + + /** + * Sets the compress flag. + * + * @param compress - if `true`, compresses the sending data + * @return a new ShardedBroadcastOperator instance + * @public + */ + public compress(compress: boolean): ShardedBroadcastOperator { + const flags = Object.assign({}, this.flags, { compress }); + return new ShardedBroadcastOperator( + this.redisClient, + this.broadcastOptions, + this.rooms, + this.exceptRooms, + flags + ); + } + + /** + * Sets a modifier for a subsequent event emission that the event data may be lost if the client is not ready to + * receive messages (because of network slowness or other issues, or because they’re connected through long polling + * and is in the middle of a request-response cycle). + * + * @return a new ShardedBroadcastOperator instance + * @public + */ + public get volatile(): ShardedBroadcastOperator { + const flags = Object.assign({}, this.flags, { volatile: true }); + return new ShardedBroadcastOperator( + this.redisClient, + this.broadcastOptions, + this.rooms, + this.exceptRooms, + flags + ); + } + + /** + * Emits to all clients. + * + * @return Always true + * @public + */ + public emit>( + ev: Ev, + ...args: EventParams + ): true { + if (RESERVED_EVENTS.has(ev)) { + throw new Error(`"${ev}" is a reserved event name`); + } + + // set up packet object + const data = [ev, ...args]; + const packet = { + type: PacketType.EVENT, + data: data, + nsp: this.broadcastOptions.nsp, + }; + + const msg = { + type: MessageType.BROADCAST, + uid: UID, + data: { + packet, + opts: encodeOptions({ + rooms: this.rooms, + except: this.exceptRooms, + flags: this.flags, + }), + }, + }; + + this.publishMessage(msg); + return true; + } + + /** + * Makes the matching socket instances join the specified rooms + * + * @param rooms + * @public + */ + public socketsJoin(rooms: string | string[]): void { + const msg = { + type: MessageType.SOCKETS_JOIN, + data: { + opts: encodeOptions({ + rooms: this.rooms, + except: this.exceptRooms, + flags: this.flags, + }), + rooms, + }, + }; + this.publishMessage(msg); + } + + /** + * Makes the matching socket instances leave the specified rooms + * + * @param rooms + * @public + */ + public socketsLeave(rooms: string | string[]): void { + const msg = { + type: MessageType.SOCKETS_LEAVE, + data: { + opts: encodeOptions({ + rooms: this.rooms, + except: this.exceptRooms, + flags: this.flags, + }), + rooms, + }, + }; + this.publishMessage(msg); + } + + /** + * Makes the matching socket instances disconnect + * + * @param close - whether to close the underlying connection + * @public + */ + public disconnectSockets(close: boolean = false): void { + const msg = { + type: MessageType.DISCONNECT_SOCKETS, + data: { + opts: encodeOptions({ + rooms: this.rooms, + except: this.exceptRooms, + flags: this.flags, + }), + close, + }, + }; + this.publishMessage(msg); + } + + public publishMessage(message: Omit) { + const msg: ClusterMessage = { + ...message, + uid: UID, + }; + const channel = this.computeChannel(message); + debug("publishing message of type %s to %s", message.type, channel); + SPUBLISH(this.redisClient, channel, this.encode(msg)); + } + + private encode(message: ClusterMessage) { + const mayContainBinary = [ + MessageType.BROADCAST, + MessageType.BROADCAST_ACK, + MessageType.FETCH_SOCKETS_RESPONSE, + MessageType.SERVER_SIDE_EMIT, + MessageType.SERVER_SIDE_EMIT_RESPONSE, + ].includes(message.type); + + if (mayContainBinary && hasBinary(message.data)) { + return this.broadcastOptions.parser.encode(message); + } else { + return JSON.stringify(message); + } + } + + private computeChannel(message) { + // broadcast with ack can not use a dynamic channel, because the serverCount() method return the number of all + // servers, not only the ones where the given room exists + const useDynamicChannel = + this.broadcastOptions.subscriptionMode === "dynamic" && + message.type === MessageType.BROADCAST && + message.data.requestId === undefined && + message.data.opts.rooms.length === 1; + if (useDynamicChannel) { + return this.dynamicChannel(message.data.opts.rooms[0]); + } else { + return this.broadcastOptions.broadcastChannel; + } + } + + private dynamicChannel(room) { + return this.broadcastOptions.broadcastChannel + room + "#"; + } +} diff --git a/lib/typed-events.ts b/lib/typed-events.ts index 33f7bb8..841f14a 100644 --- a/lib/typed-events.ts +++ b/lib/typed-events.ts @@ -30,8 +30,16 @@ export type EventParams< * strictly typed `emit` method. */ export interface TypedEventBroadcaster { + in(room: string | string[]): TypedEventBroadcaster; + to(room: string | string[]): TypedEventBroadcaster; + except(room: string | string[]): TypedEventBroadcaster; + compress(compress: boolean): TypedEventBroadcaster; + readonly volatile: TypedEventBroadcaster; emit>( ev: Ev, ...args: EventParams - ): boolean; + ): true; + socketsJoin(rooms: string | string[]): void; + socketsLeave(rooms: string | string[]): void; + disconnectSockets(close: boolean): void; } diff --git a/lib/util.ts b/lib/util.ts new file mode 100644 index 0000000..3c77f6e --- /dev/null +++ b/lib/util.ts @@ -0,0 +1,81 @@ +export const UID = "emitter"; + +export interface Parser { + encode: (msg: any) => any; +} + +export interface BroadcastFlags { + volatile?: boolean; + compress?: boolean; +} + +export const RESERVED_EVENTS: ReadonlySet = new Set([ + "connect", + "connect_error", + "disconnect", + "disconnecting", + "newListener", + "removeListener", +]); + +export interface BroadcastOptions { + nsp: string; + broadcastChannel: string; + requestChannel: string; + parser: Parser; + subscriptionMode?: string; +} + +/** + * Whether the client comes from the `redis` package + * + * @param redisClient + * + * @see https://github.com/redis/node-redis + */ +function isRedisV4Client(redisClient: any) { + return typeof redisClient.sSubscribe === "function"; +} + +export function SPUBLISH( + redisClient: any, + channel: string, + payload: string | Uint8Array +) { + if (isRedisV4Client(redisClient)) { + redisClient.sPublish(channel, payload); + } else { + redisClient.spublish(channel, payload); + } +} + +export function hasBinary(obj: any, toJSON?: boolean): boolean { + if (!obj || typeof obj !== "object") { + return false; + } + + if (obj instanceof ArrayBuffer || ArrayBuffer.isView(obj)) { + return true; + } + + if (Array.isArray(obj)) { + for (let i = 0, l = obj.length; i < l; i++) { + if (hasBinary(obj[i])) { + return true; + } + } + return false; + } + + for (const key in obj) { + if (Object.prototype.hasOwnProperty.call(obj, key) && hasBinary(obj[key])) { + return true; + } + } + + if (obj.toJSON && typeof obj.toJSON === "function" && !toJSON) { + return hasBinary(obj.toJSON(), true); + } + + return false; +}