From 5624cfafbb5b05a1bc26fe815f67c78f3a33d13b Mon Sep 17 00:00:00 2001 From: Dan Caragea Date: Fri, 9 Jun 2023 16:33:26 +0200 Subject: [PATCH] Fix regression when throttle timeout is marginal (#2) * fix regression when timeout is marginal See https://github.com/tulios/kafkajs/issues/1556 fixes * always use the calculated scheduled timeout or 0 --------- Co-authored-by: MDSLKTR --- src/network/requestQueue/index.js | 22 ++++++++++++++-------- src/network/requestQueue/index.spec.js | 1 - 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/src/network/requestQueue/index.js b/src/network/requestQueue/index.js index eee14d66c..8ab6997ae 100644 --- a/src/network/requestQueue/index.js +++ b/src/network/requestQueue/index.js @@ -9,7 +9,6 @@ const PRIVATE = { } const REQUEST_QUEUE_EMPTY = 'requestQueueEmpty' -const CHECK_PENDING_REQUESTS_INTERVAL = 10 module.exports = class RequestQueue extends EventEmitter { /** @@ -56,6 +55,13 @@ module.exports = class RequestQueue extends EventEmitter { */ this.throttledUntil = -1 + /** + * Current timestamp when the throttling was set + * + * @type {number} + */ + this.throttleCurrentTimestamp = 0 + /** * Timeout id if we have scheduled a check for pending requests due to client-side throttling * @@ -105,7 +111,8 @@ module.exports = class RequestQueue extends EventEmitter { maybeThrottle(clientSideThrottleTime) { if (clientSideThrottleTime !== null && clientSideThrottleTime > 0) { this.logger.debug(`Client side throttling in effect for ${clientSideThrottleTime}ms`) - const minimumThrottledUntil = Date.now() + clientSideThrottleTime + this.throttleCurrentTimestamp = Date.now() + const minimumThrottledUntil = this.throttleCurrentTimestamp + clientSideThrottleTime this.throttledUntil = Math.max(minimumThrottledUntil, this.throttledUntil) } } @@ -309,15 +316,14 @@ module.exports = class RequestQueue extends EventEmitter { // will be fine, and potentially fix up a new timeout if needed at that time. // Note that if we're merely "overloaded" by having too many inflight requests // we will anyways check the queue when one of them gets fulfilled. - let scheduleAt = this.throttledUntil - Date.now() - if (!this.throttleCheckTimeoutId) { - if (this.pending.length > 0) { - scheduleAt = scheduleAt > 0 ? scheduleAt : CHECK_PENDING_REQUESTS_INTERVAL - } + const timeUntilUnthrottled = this.throttledUntil - this.throttleCurrentTimestamp + + if (timeUntilUnthrottled > 0 && !this.throttleCheckTimeoutId) { this.throttleCheckTimeoutId = setTimeout(() => { this.throttleCheckTimeoutId = null + this.throttleCurrentTimestamp = Date.now() this.checkPendingRequests() - }, scheduleAt) + }, Math.max(timeUntilUnthrottled, 0)) } } } diff --git a/src/network/requestQueue/index.spec.js b/src/network/requestQueue/index.spec.js index 238fbf310..dbfe0e28e 100644 --- a/src/network/requestQueue/index.spec.js +++ b/src/network/requestQueue/index.spec.js @@ -225,7 +225,6 @@ describe('Network > RequestQueue', () => { const before = Date.now() const clientSideThrottleTime = 1 requestQueue.maybeThrottle(clientSideThrottleTime) - // Sleep until the marginal delay is passed before calling scheduleCheckPendingRequests() await sleep(clientSideThrottleTime) requestQueue.scheduleCheckPendingRequests()