From 4625d85cdf5ce7c72bf9314946b659f325449649 Mon Sep 17 00:00:00 2001 From: arszen123 <aliarszen@gmail.com> Date: Mon, 4 Jul 2022 12:34:04 +0200 Subject: [PATCH 1/2] Fix consumer connection issue --- src/consumer/fetchManager.js | 6 +++++- src/consumer/fetchManager.spec.js | 8 +++++++- src/consumer/runner.js | 2 +- src/errors.js | 8 ++++++++ 4 files changed, 21 insertions(+), 3 deletions(-) diff --git a/src/consumer/fetchManager.js b/src/consumer/fetchManager.js index 309a0986e..6b5cf65e4 100644 --- a/src/consumer/fetchManager.js +++ b/src/consumer/fetchManager.js @@ -2,7 +2,7 @@ const seq = require('../utils/seq') const createFetcher = require('./fetcher') const createWorker = require('./worker') const createWorkerQueue = require('./workerQueue') -const { KafkaJSFetcherRebalanceError } = require('../errors') +const { KafkaJSFetcherRebalanceError, KafkaJSNoBrokerAvailableError } = require('../errors') /** @typedef {ReturnType<typeof createFetchManager>} FetchManager */ @@ -34,6 +34,10 @@ const createFetchManager = ({ const nodeIds = getNodeIds() const partitionAssignments = new Map() + if (nodeIds.length === 0) { + throw new KafkaJSNoBrokerAvailableError() + } + const validateShouldRebalance = () => { const current = getNodeIds() const hasChanged = diff --git a/src/consumer/fetchManager.spec.js b/src/consumer/fetchManager.spec.js index 9dbf26df8..d6d14d2d0 100644 --- a/src/consumer/fetchManager.spec.js +++ b/src/consumer/fetchManager.spec.js @@ -4,7 +4,7 @@ const createFetchManager = require('./fetchManager') const Batch = require('./batch') const { newLogger } = require('testHelpers') const waitFor = require('../utils/waitFor') -const { KafkaJSNonRetriableError } = require('../errors') +const { KafkaJSNonRetriableError, KafkaJSNoBrokerAvailableError } = require('../errors') describe('FetchManager', () => { let fetchManager, fetch, handler, getNodeIds, concurrency, batchSize @@ -94,4 +94,10 @@ describe('FetchManager', () => { await expect(fetchManagerPromise).rejects.toThrow('Node not found') }) }) + + it('should throw an error when there are no avaiblae brokers', async () => { + getNodeIds.mockImplementation(() => seq(0)) + + await expect(fetchManager.start()).rejects.toThrowError(new KafkaJSNoBrokerAvailableError()) + }) }) diff --git a/src/consumer/runner.js b/src/consumer/runner.js index 7504d9b98..8ced6fa7b 100644 --- a/src/consumer/runner.js +++ b/src/consumer/runner.js @@ -144,7 +144,7 @@ module.exports = class Runner extends EventEmitter { return bail(e) } - if (e.name === 'KafkaJSConnectionError') { + if (e.name === 'KafkaJSNoBrokerAvailableError') { return bail(e) } diff --git a/src/errors.js b/src/errors.js index 81a927c4f..4c30b27f8 100644 --- a/src/errors.js +++ b/src/errors.js @@ -248,6 +248,13 @@ class KafkaJSAggregateError extends Error { class KafkaJSFetcherRebalanceError extends Error {} +class KafkaJSNoBrokerAvailableError extends KafkaJSError { + constructor() { + super('No broker available') + this.name = 'KafkaJSNoBrokerAvailableError' + } +} + const isRebalancing = e => e.type === 'REBALANCE_IN_PROGRESS' || e.type === 'NOT_COORDINATOR_FOR_GROUP' @@ -283,6 +290,7 @@ module.exports = { KafkaJSCreateTopicError, KafkaJSAggregateError, KafkaJSFetcherRebalanceError, + KafkaJSNoBrokerAvailableError, isRebalancing, isKafkaJSError, } From dc758a74eae9a6aa15ec2f0735921301d2d24fc4 Mon Sep 17 00:00:00 2001 From: Tommy Brunn <tommy.brunn@gmail.com> Date: Mon, 27 Feb 2023 13:34:45 +0100 Subject: [PATCH 2/2] Fix test name MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Márcio Saeger <marcio@marciosaeger.com.br> --- src/consumer/fetchManager.spec.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/consumer/fetchManager.spec.js b/src/consumer/fetchManager.spec.js index d6d14d2d0..bfbe65dd0 100644 --- a/src/consumer/fetchManager.spec.js +++ b/src/consumer/fetchManager.spec.js @@ -95,7 +95,7 @@ describe('FetchManager', () => { }) }) - it('should throw an error when there are no avaiblae brokers', async () => { + it('should throw an error when there are no brokers available', async () => { getNodeIds.mockImplementation(() => seq(0)) await expect(fetchManager.start()).rejects.toThrowError(new KafkaJSNoBrokerAvailableError())