diff --git a/src/consumer/fetchManager.js b/src/consumer/fetchManager.js index 309a0986e..6b5cf65e4 100644 --- a/src/consumer/fetchManager.js +++ b/src/consumer/fetchManager.js @@ -2,7 +2,7 @@ const seq = require('../utils/seq') const createFetcher = require('./fetcher') const createWorker = require('./worker') const createWorkerQueue = require('./workerQueue') -const { KafkaJSFetcherRebalanceError } = require('../errors') +const { KafkaJSFetcherRebalanceError, KafkaJSNoBrokerAvailableError } = require('../errors') /** @typedef {ReturnType} FetchManager */ @@ -34,6 +34,10 @@ const createFetchManager = ({ const nodeIds = getNodeIds() const partitionAssignments = new Map() + if (nodeIds.length === 0) { + throw new KafkaJSNoBrokerAvailableError() + } + const validateShouldRebalance = () => { const current = getNodeIds() const hasChanged = diff --git a/src/consumer/fetchManager.spec.js b/src/consumer/fetchManager.spec.js index 9dbf26df8..bfbe65dd0 100644 --- a/src/consumer/fetchManager.spec.js +++ b/src/consumer/fetchManager.spec.js @@ -4,7 +4,7 @@ const createFetchManager = require('./fetchManager') const Batch = require('./batch') const { newLogger } = require('testHelpers') const waitFor = require('../utils/waitFor') -const { KafkaJSNonRetriableError } = require('../errors') +const { KafkaJSNonRetriableError, KafkaJSNoBrokerAvailableError } = require('../errors') describe('FetchManager', () => { let fetchManager, fetch, handler, getNodeIds, concurrency, batchSize @@ -94,4 +94,10 @@ describe('FetchManager', () => { await expect(fetchManagerPromise).rejects.toThrow('Node not found') }) }) + + it('should throw an error when there are no brokers available', async () => { + getNodeIds.mockImplementation(() => seq(0)) + + await expect(fetchManager.start()).rejects.toThrowError(new KafkaJSNoBrokerAvailableError()) + }) }) diff --git a/src/consumer/runner.js b/src/consumer/runner.js index 7504d9b98..8ced6fa7b 100644 --- a/src/consumer/runner.js +++ b/src/consumer/runner.js @@ -144,7 +144,7 @@ module.exports = class Runner extends EventEmitter { return bail(e) } - if (e.name === 'KafkaJSConnectionError') { + if (e.name === 'KafkaJSNoBrokerAvailableError') { return bail(e) } diff --git a/src/errors.js b/src/errors.js index 67db90ad0..266192e19 100644 --- a/src/errors.js +++ b/src/errors.js @@ -258,6 +258,13 @@ class KafkaJSAggregateError extends Error { class KafkaJSFetcherRebalanceError extends Error {} +class KafkaJSNoBrokerAvailableError extends KafkaJSError { + constructor() { + super('No broker available') + this.name = 'KafkaJSNoBrokerAvailableError' + } +} + const isRebalancing = e => e.type === 'REBALANCE_IN_PROGRESS' || e.type === 'NOT_COORDINATOR_FOR_GROUP' || @@ -295,6 +302,7 @@ module.exports = { KafkaJSCreateTopicError, KafkaJSAggregateError, KafkaJSFetcherRebalanceError, + KafkaJSNoBrokerAvailableError, KafkaJSAlterPartitionReassignmentsError, isRebalancing, isKafkaJSError,