diff --git a/packages/client/lib/client/commands-queue.ts b/packages/client/lib/client/commands-queue.ts index 0f58ddab52..91b7154d0a 100644 --- a/packages/client/lib/client/commands-queue.ts +++ b/packages/client/lib/client/commands-queue.ts @@ -52,7 +52,7 @@ export default class RedisCommandsQueue { readonly decoder; readonly #pubSub = new PubSub(); readonly #pushHandlers: Map) => unknown> = new Map(); - readonly #builtInSet = new Set; + readonly #builtInSet: ReadonlySet; get isPubSubActive() { return this.#pubSub.isActive; @@ -76,9 +76,11 @@ export default class RedisCommandsQueue { this.#pushHandlers.set(COMMANDS[PUBSUB_TYPE.SHARDED].message.toString(), this.#pubSub.handleMessageReplySharded.bind(this.#pubSub)); this.#pushHandlers.set(COMMANDS[PUBSUB_TYPE.SHARDED].subscribe.toString(), this.#handleStatusReply.bind(this)); this.#pushHandlers.set(COMMANDS[PUBSUB_TYPE.SHARDED].unsubscribe.toString(), this.#handleShardedUnsubscribe.bind(this)); - + + const s = new Set(); + this.#builtInSet = s; for (const str in this.#pushHandlers.keys) { - this.#builtInSet.add(str); + s.add(str); } this.decoder = this.#initiateDecoder(); @@ -122,6 +124,14 @@ export default class RedisCommandsQueue { this.#pushHandlers.set(messageType, handler); } + removePushHandler(messageType: string) { + if (this.#builtInSet.has(messageType)) { + throw new Error("Cannot override built in push message handler"); + } + + this.#pushHandlers.delete(messageType); + } + #onPush(push: Array) { const handler = this.#pushHandlers.get(push[0].toString()); if (handler) { @@ -141,9 +151,7 @@ export default class RedisCommandsQueue { onReply: reply => this.#onReply(reply), onErrorReply: err => this.#onErrorReply(err), onPush: push => { - if (!this.#onPush(push)) { - - } + return this.#onPush(push); }, getTypeMapping: () => this.#getTypeMapping() }); diff --git a/packages/client/lib/client/index.spec.ts b/packages/client/lib/client/index.spec.ts index 2fd689b9d7..47cb0d62d0 100644 --- a/packages/client/lib/client/index.spec.ts +++ b/packages/client/lib/client/index.spec.ts @@ -9,6 +9,7 @@ import { MATH_FUNCTION, loadMathFunction } from '../commands/FUNCTION_LOAD.spec' import { RESP_TYPES } from '../RESP/decoder'; import { BlobStringReply, NumberReply } from '../RESP/types'; import { SortedSetMember } from '../commands/generic-transformers'; +import { createClient } from '../..'; export const SQUARE_SCRIPT = defineScript({ SCRIPT: @@ -769,4 +770,79 @@ describe('Client', () => { } }, GLOBAL.SERVERS.OPEN); }); + + describe('Push Handlers', () => { + testUtils.testWithClient('RESP2: add/remove invalidate handler, and validate its called', async client => { + const key = 'x' + + const duplicate = await client.duplicate().connect(); + try { + const id = await duplicate.clientId(); + + let nodeResolve; + + const promise = new Promise((res) => { + nodeResolve = res; + }); + + duplicate.addPushHandler("invalidate", (push: Array) => { + assert.equal(push[0].toString(), "invalidate"); + assert.notEqual(push[1], null); + assert.equal(push[1].length, 1); + assert.equal(push[1][0].toString(), key); + // this test removing the handler, + // as flushAll in cleanup of test will issue a full invalidate, + // which would fail if this handler is called on it + duplicate.removePushHandler("invalidate"); + nodeResolve(); + }) + + await client.sendCommand(['CLIENT', 'TRACKING', 'ON', 'REDIRECT', id.toString()]); + await client.get(key); + await client.set(key, '1'); + + // force an invalidate all + await client.flushAll(); + + await nodeResolve; + } finally { + duplicate.destroy(); + } + }, { + ...GLOBAL.SERVERS.OPEN + }); + + testUtils.testWithClient('RESP3: add/remove invalidate handler, and validate its called', async client => { + const key = 'x' + + let nodeResolve; + + const promise = new Promise((res) => { + nodeResolve = res; + }); + + client.addPushHandler("invalidate", (push: Array) => { + assert.equal(push[0].toString(), "invalidate"); + assert.equal(push[1].length, 1); + assert.equal(push[1].length, 1); + assert.equal(push[1][0].toString(), key); + // this test removing the handler, + // as flushAll in cleanup of test will issue a full invalidate, + // which would fail if this handler is called on it + client.removePushHandler("invalidate"); + nodeResolve(); + }) + + await client.sendCommand(['CLIENT', 'TRACKING', 'ON']); + await client.get(key); + await client.set(key, '1'); + + await nodeResolve; + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + RESP: 3 + } + }); + }); }); diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index 3efa793eeb..7ffacc4f3f 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -573,6 +573,14 @@ export default class RedisClient< return this as unknown as RedisClientType; } + addPushHandler(messageType: string, handler: (pushMsg: Array) => unknown) { + this._self.#queue.addPushHandler(messageType, handler); + } + + removePushHandler(messageType: string) { + this._self.#queue.removePushHandler(messageType); + } + sendCommand( args: Array, options?: CommandOptions