Skip to content

Commit

Permalink
Merge pull request #1408 from arszen123/fix/consumer-connection-issue
Browse files Browse the repository at this point in the history
Fix consumer connection issue
  • Loading branch information
Nevon authored Feb 27, 2023
2 parents 427bcf9 + dc758a7 commit c7135f7
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 3 deletions.
6 changes: 5 additions & 1 deletion src/consumer/fetchManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ const seq = require('../utils/seq')
const createFetcher = require('./fetcher')
const createWorker = require('./worker')
const createWorkerQueue = require('./workerQueue')
const { KafkaJSFetcherRebalanceError } = require('../errors')
const { KafkaJSFetcherRebalanceError, KafkaJSNoBrokerAvailableError } = require('../errors')

/** @typedef {ReturnType<typeof createFetchManager>} FetchManager */

Expand Down Expand Up @@ -34,6 +34,10 @@ const createFetchManager = ({
const nodeIds = getNodeIds()
const partitionAssignments = new Map()

if (nodeIds.length === 0) {
throw new KafkaJSNoBrokerAvailableError()
}

const validateShouldRebalance = () => {
const current = getNodeIds()
const hasChanged =
Expand Down
8 changes: 7 additions & 1 deletion src/consumer/fetchManager.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ const createFetchManager = require('./fetchManager')
const Batch = require('./batch')
const { newLogger } = require('testHelpers')
const waitFor = require('../utils/waitFor')
const { KafkaJSNonRetriableError } = require('../errors')
const { KafkaJSNonRetriableError, KafkaJSNoBrokerAvailableError } = require('../errors')

describe('FetchManager', () => {
let fetchManager, fetch, handler, getNodeIds, concurrency, batchSize
Expand Down Expand Up @@ -94,4 +94,10 @@ describe('FetchManager', () => {
await expect(fetchManagerPromise).rejects.toThrow('Node not found')
})
})

it('should throw an error when there are no brokers available', async () => {
getNodeIds.mockImplementation(() => seq(0))

await expect(fetchManager.start()).rejects.toThrowError(new KafkaJSNoBrokerAvailableError())
})
})
2 changes: 1 addition & 1 deletion src/consumer/runner.js
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ module.exports = class Runner extends EventEmitter {
return bail(e)
}

if (e.name === 'KafkaJSConnectionError') {
if (e.name === 'KafkaJSNoBrokerAvailableError') {
return bail(e)
}

Expand Down
8 changes: 8 additions & 0 deletions src/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,13 @@ class KafkaJSAggregateError extends Error {

class KafkaJSFetcherRebalanceError extends Error {}

class KafkaJSNoBrokerAvailableError extends KafkaJSError {
constructor() {
super('No broker available')
this.name = 'KafkaJSNoBrokerAvailableError'
}
}

const isRebalancing = e =>
e.type === 'REBALANCE_IN_PROGRESS' ||
e.type === 'NOT_COORDINATOR_FOR_GROUP' ||
Expand Down Expand Up @@ -295,6 +302,7 @@ module.exports = {
KafkaJSCreateTopicError,
KafkaJSAggregateError,
KafkaJSFetcherRebalanceError,
KafkaJSNoBrokerAvailableError,
KafkaJSAlterPartitionReassignmentsError,
isRebalancing,
isKafkaJSError,
Expand Down

0 comments on commit c7135f7

Please sign in to comment.