From e8c09880281822e689468f459fbb755f9464dcad Mon Sep 17 00:00:00 2001 From: Shaya Potter Date: Thu, 4 Apr 2024 22:14:20 +0300 Subject: [PATCH] add push message handler registration and make all pubsub use it. --- packages/client/lib/client/commands-queue.ts | 65 +++++++++++++++----- packages/client/lib/client/pub-sub.ts | 49 +++++++-------- 2 files changed, 72 insertions(+), 42 deletions(-) diff --git a/packages/client/lib/client/commands-queue.ts b/packages/client/lib/client/commands-queue.ts index 15e8a747b9..8be44ef5cb 100644 --- a/packages/client/lib/client/commands-queue.ts +++ b/packages/client/lib/client/commands-queue.ts @@ -2,7 +2,7 @@ import { SinglyLinkedList, DoublyLinkedNode, DoublyLinkedList } from './linked-l import encodeCommand from '../RESP/encoder'; import { Decoder, PUSH_TYPE_MAPPING, RESP_TYPES } from '../RESP/decoder'; import { TypeMapping, ReplyUnion, RespVersions, RedisArgument } from '../RESP/types'; -import { ChannelListeners, PubSub, PubSubCommand, PubSubListener, PubSubType, PubSubTypeListeners } from './pub-sub'; +import { COMMANDS, ChannelListeners, PUBSUB_TYPE, PubSub, PubSubCommand, PubSubListener, PubSubType, PubSubTypeListeners } from './pub-sub'; import { AbortError, ErrorReply } from '../errors'; import { MonitorCallback } from '.'; @@ -51,6 +51,8 @@ export default class RedisCommandsQueue { #chainInExecution: symbol | undefined; readonly decoder; readonly #pubSub = new PubSub(); + readonly #pushHandlers: Map) => unknown> = new Map(); + readonly #builtInSet = new Set; get isPubSubActive() { return this.#pubSub.isActive; @@ -64,6 +66,21 @@ export default class RedisCommandsQueue { this.#respVersion = respVersion; this.#maxLength = maxLength; this.#onShardedChannelMoved = onShardedChannelMoved; + + this.#pushHandlers.set(COMMANDS[PUBSUB_TYPE.CHANNELS].message.toString(), this.#pubSub.handleMessageReplyChannel.bind(this.#pubSub)); + this.#pushHandlers.set(COMMANDS[PUBSUB_TYPE.CHANNELS].subscribe.toString(), this.#handleStatusReply.bind(this)); + this.#pushHandlers.set(COMMANDS[PUBSUB_TYPE.CHANNELS].unsubscribe.toString(), this.#handleStatusReply.bind(this)); + this.#pushHandlers.set(COMMANDS[PUBSUB_TYPE.PATTERNS].message.toString(), this.#pubSub.handleMessageReplyPattern.bind(this.#pubSub)); + this.#pushHandlers.set(COMMANDS[PUBSUB_TYPE.PATTERNS].subscribe.toString(), this.#handleStatusReply.bind(this)); + this.#pushHandlers.set(COMMANDS[PUBSUB_TYPE.PATTERNS].unsubscribe.toString(), this.#handleStatusReply.bind(this)); + this.#pushHandlers.set(COMMANDS[PUBSUB_TYPE.SHARDED].message.toString(), this.#pubSub.handleMessageReplySharded.bind(this.#pubSub)); + this.#pushHandlers.set(COMMANDS[PUBSUB_TYPE.SHARDED].subscribe.toString(), this.#handleStatusReply.bind(this)); + this.#pushHandlers.set(COMMANDS[PUBSUB_TYPE.SHARDED].unsubscribe.toString(), this.#handleShardedUnsubscribe.bind(this)); + + for (const str in this.#pushHandlers.keys) { + this.#builtInSet.add(str); + } + this.decoder = this.#initiateDecoder(); } @@ -75,28 +92,44 @@ export default class RedisCommandsQueue { this.#waitingForReply.shift()!.reject(err); } - #onPush(push: Array) { - // TODO: type - if (this.#pubSub.handleMessageReply(push)) return true; - - const isShardedUnsubscribe = PubSub.isShardedUnsubscribe(push); - if (isShardedUnsubscribe && !this.#waitingForReply.length) { + #handleStatusReply(push: Array) { + const head = this.#waitingForReply.head!.value; + if ( + (Number.isNaN(head.channelsCounter!) && push[2] === 0) || + --head.channelsCounter! === 0 + ) { + this.#waitingForReply.shift()!.resolve(); + } + } + + #handleShardedUnsubscribe(push: Array) { + if (!this.#waitingForReply.length) { const channel = push[1].toString(); this.#onShardedChannelMoved( channel, this.#pubSub.removeShardedListeners(channel) ); - return true; - } else if (isShardedUnsubscribe || PubSub.isStatusReply(push)) { - const head = this.#waitingForReply.head!.value; - if ( - (Number.isNaN(head.channelsCounter!) && push[2] === 0) || - --head.channelsCounter! === 0 - ) { - this.#waitingForReply.shift()!.resolve(); - } + } else { + this.#handleStatusReply(push); + } + } + + addPushHandler(messageType: string, handler: (pushMsg: Array) => unknown) { + if (this.#builtInSet.has(messageType)) { + throw new Error("Cannot override built in push message handler"); + } + + this.#pushHandlers.set(messageType, handler); + } + + #onPush(push: Array) { + const handler = this.#pushHandlers.get(push[0].toString()); + if (handler) { + handler(push); return true; } + + return false; } #getTypeMapping() { diff --git a/packages/client/lib/client/pub-sub.ts b/packages/client/lib/client/pub-sub.ts index 1387aea841..246707953e 100644 --- a/packages/client/lib/client/pub-sub.ts +++ b/packages/client/lib/client/pub-sub.ts @@ -11,7 +11,7 @@ export type PUBSUB_TYPE = typeof PUBSUB_TYPE; export type PubSubType = PUBSUB_TYPE[keyof PUBSUB_TYPE]; -const COMMANDS = { +export const COMMANDS = { [PUBSUB_TYPE.CHANNELS]: { subscribe: Buffer.from('subscribe'), unsubscribe: Buffer.from('unsubscribe'), @@ -344,32 +344,29 @@ export class PubSub { return commands; } - handleMessageReply(reply: Array): boolean { - if (COMMANDS[PUBSUB_TYPE.CHANNELS].message.equals(reply[0])) { - this.#emitPubSubMessage( - PUBSUB_TYPE.CHANNELS, - reply[2], - reply[1] - ); - return true; - } else if (COMMANDS[PUBSUB_TYPE.PATTERNS].message.equals(reply[0])) { - this.#emitPubSubMessage( - PUBSUB_TYPE.PATTERNS, - reply[3], - reply[2], - reply[1] - ); - return true; - } else if (COMMANDS[PUBSUB_TYPE.SHARDED].message.equals(reply[0])) { - this.#emitPubSubMessage( - PUBSUB_TYPE.SHARDED, - reply[2], - reply[1] - ); - return true; - } + handleMessageReplyChannel(push: Array) { + this.#emitPubSubMessage( + PUBSUB_TYPE.CHANNELS, + push[2], + push[1] + ); + } - return false; + handleMessageReplyPattern(push: Array) { + this.#emitPubSubMessage( + PUBSUB_TYPE.PATTERNS, + push[3], + push[2], + push[1] + ); + } + + handleMessageReplySharded(push: Array) { + this.#emitPubSubMessage( + PUBSUB_TYPE.SHARDED, + push[2], + push[1] + ); } removeShardedListeners(channel: string): ChannelListeners {