From 4abe2c3b82b331ced179a2c6e921e81fa4933ef1 Mon Sep 17 00:00:00 2001 From: MDSLKTR Date: Mon, 8 May 2023 22:29:48 +0200 Subject: [PATCH 1/3] fix regression when timeout is marginal See https://github.com/tulios/kafkajs/issues/1556 fixes --- src/network/requestQueue/index.js | 22 ++++++++++++++-------- src/network/requestQueue/index.spec.js | 1 - 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/src/network/requestQueue/index.js b/src/network/requestQueue/index.js index eee14d66c..076c19504 100644 --- a/src/network/requestQueue/index.js +++ b/src/network/requestQueue/index.js @@ -9,7 +9,6 @@ const PRIVATE = { } const REQUEST_QUEUE_EMPTY = 'requestQueueEmpty' -const CHECK_PENDING_REQUESTS_INTERVAL = 10 module.exports = class RequestQueue extends EventEmitter { /** @@ -56,6 +55,13 @@ module.exports = class RequestQueue extends EventEmitter { */ this.throttledUntil = -1 + /** + * Current timestamp when the throttling was set + * + * @type {number} + */ + this.throttleCurrentTimestamp = 0 + /** * Timeout id if we have scheduled a check for pending requests due to client-side throttling * @@ -105,7 +111,8 @@ module.exports = class RequestQueue extends EventEmitter { maybeThrottle(clientSideThrottleTime) { if (clientSideThrottleTime !== null && clientSideThrottleTime > 0) { this.logger.debug(`Client side throttling in effect for ${clientSideThrottleTime}ms`) - const minimumThrottledUntil = Date.now() + clientSideThrottleTime + this.throttleCurrentTimestamp = Date.now() + const minimumThrottledUntil = this.throttleCurrentTimestamp + clientSideThrottleTime this.throttledUntil = Math.max(minimumThrottledUntil, this.throttledUntil) } } @@ -309,15 +316,14 @@ module.exports = class RequestQueue extends EventEmitter { // will be fine, and potentially fix up a new timeout if needed at that time. // Note that if we're merely "overloaded" by having too many inflight requests // we will anyways check the queue when one of them gets fulfilled. - let scheduleAt = this.throttledUntil - Date.now() - if (!this.throttleCheckTimeoutId) { - if (this.pending.length > 0) { - scheduleAt = scheduleAt > 0 ? scheduleAt : CHECK_PENDING_REQUESTS_INTERVAL - } + const timeUntilUnthrottled = this.throttledUntil - this.throttleCurrentTimestamp + + if (timeUntilUnthrottled > 0 && !this.throttleCheckTimeoutId) { this.throttleCheckTimeoutId = setTimeout(() => { this.throttleCheckTimeoutId = null + this.throttleCurrentTimestamp = Date.now() this.checkPendingRequests() - }, scheduleAt) + }, 0) } } } diff --git a/src/network/requestQueue/index.spec.js b/src/network/requestQueue/index.spec.js index 238fbf310..dbfe0e28e 100644 --- a/src/network/requestQueue/index.spec.js +++ b/src/network/requestQueue/index.spec.js @@ -225,7 +225,6 @@ describe('Network > RequestQueue', () => { const before = Date.now() const clientSideThrottleTime = 1 requestQueue.maybeThrottle(clientSideThrottleTime) - // Sleep until the marginal delay is passed before calling scheduleCheckPendingRequests() await sleep(clientSideThrottleTime) requestQueue.scheduleCheckPendingRequests() From 463076d45400833571fb66005347a4234e204794 Mon Sep 17 00:00:00 2001 From: MDSLKTR Date: Thu, 11 May 2023 12:50:11 +0200 Subject: [PATCH 2/3] always use the calculated scheduled timeout or 0 --- src/network/requestQueue/index.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/network/requestQueue/index.js b/src/network/requestQueue/index.js index 076c19504..8ab6997ae 100644 --- a/src/network/requestQueue/index.js +++ b/src/network/requestQueue/index.js @@ -323,7 +323,7 @@ module.exports = class RequestQueue extends EventEmitter { this.throttleCheckTimeoutId = null this.throttleCurrentTimestamp = Date.now() this.checkPendingRequests() - }, 0) + }, Math.max(timeUntilUnthrottled, 0)) } } } From 600ec04d31f566ae2c091f9481505644ab9f4308 Mon Sep 17 00:00:00 2001 From: Simon Kunz Date: Tue, 20 Jun 2023 15:33:27 +0200 Subject: [PATCH 3/3] Update src/network/requestQueue/index.js Co-authored-by: Siim Sams --- src/network/requestQueue/index.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/network/requestQueue/index.js b/src/network/requestQueue/index.js index 8ab6997ae..f64eadba7 100644 --- a/src/network/requestQueue/index.js +++ b/src/network/requestQueue/index.js @@ -323,7 +323,7 @@ module.exports = class RequestQueue extends EventEmitter { this.throttleCheckTimeoutId = null this.throttleCurrentTimestamp = Date.now() this.checkPendingRequests() - }, Math.max(timeUntilUnthrottled, 0)) + }, timeUntilUnthrottled) } } }