From 1bb12bed3eec8e1a24b5577e1dc289b4cf43a2fc Mon Sep 17 00:00:00 2001 From: 1u0n <1u0n@users.noreply.github.com> Date: Sun, 31 Dec 2023 11:57:21 +0100 Subject: [PATCH] enable fetcher immediate stop consumer.disconnect() waits for all fetchers' ongoing fetch() to return before unsubscribing and closing up. If maxWaitTimeInMs is high, this can take long. This change makes the fetcher stop immediately not waiting for fetch() to return, resulting in fast and clean disconnects -- only significant if you're using high maxWaitTimeInMs. --- src/consumer/fetcher.js | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/consumer/fetcher.js b/src/consumer/fetcher.js index fcea2ba4c..878e52515 100644 --- a/src/consumer/fetcher.js +++ b/src/consumer/fetcher.js @@ -49,10 +49,11 @@ const createFetcher = ({ const start = async () => { if (isRunning) return isRunning = true + const stopPromise = new Promise(resolve => emitter.once('stop', resolve)) while (isRunning) { try { - const batches = await fetch(nodeId) + const batches = await Promise.any([fetch(nodeId), stopPromise]) if (isRunning) { const availableBatches = filterUnassignedBatches(batches) @@ -77,7 +78,9 @@ const createFetcher = ({ const stop = async () => { if (!isRunning) return isRunning = false - await new Promise(resolve => emitter.once('end', () => resolve())) + const endPromise = new Promise(resolve => emitter.once('end', resolve)) + emitter.emit('stop'); + await endPromise; } return { start, stop, getWorkerQueue }