diff --git a/src/cluster/__tests__/findBroker.spec.js b/src/cluster/__tests__/findBroker.spec.js index c20221e85..a3c6c9eb9 100644 --- a/src/cluster/__tests__/findBroker.spec.js +++ b/src/cluster/__tests__/findBroker.spec.js @@ -7,7 +7,11 @@ const { } = require('testHelpers') const Broker = require('../../broker') -const { KafkaJSLockTimeout, KafkaJSConnectionError } = require('../../errors') +const { + KafkaJSLockTimeout, + KafkaJSConnectionError, + KafkaJSBrokerNotFound, +} = require('../../errors') describe('Cluster > findBroker', () => { let cluster, topic @@ -77,4 +81,20 @@ describe('Cluster > findBroker', () => { await expect(cluster.findBroker({ nodeId })).resolves.toBeInstanceOf(Broker) expect(cluster.refreshMetadata).toHaveBeenCalled() }) + + test('refresh metadata on KafkaJSBrokerNotFound', async () => { + const nodeId = 0 + cluster.brokerPool.findBroker = jest.fn(() => { + throw new KafkaJSBrokerNotFound('Broker not found') + }) + + jest.spyOn(cluster, 'refreshMetadata') + + await expect(cluster.findBroker({ nodeId })).rejects.toHaveProperty( + 'name', + 'KafkaJSBrokerNotFound' + ) + + expect(cluster.refreshMetadata).toHaveBeenCalled() + }) }) diff --git a/src/cluster/index.js b/src/cluster/index.js index 50e5a44d6..55e6f2d51 100644 --- a/src/cluster/index.js +++ b/src/cluster/index.js @@ -181,7 +181,11 @@ module.exports = class Cluster { return await this.brokerPool.findBroker({ nodeId }) } catch (e) { // The client probably has stale metadata - if (e.name === 'KafkaJSLockTimeout' || e.code === 'ECONNREFUSED') { + if ( + e.name === 'KafkaJSBrokerNotFound' || + e.name === 'KafkaJSLockTimeout' || + e.code === 'ECONNREFUSED' + ) { await this.refreshMetadata() } diff --git a/src/consumer/__tests__/runner.spec.js b/src/consumer/__tests__/runner.spec.js index 99b5686cb..15235b395 100644 --- a/src/consumer/__tests__/runner.spec.js +++ b/src/consumer/__tests__/runner.spec.js @@ -17,6 +17,7 @@ describe('Consumer > Runner', () => { eachBatch = jest.fn() onCrash = jest.fn() consumerGroup = { + connect: jest.fn(), join: jest.fn(), sync: jest.fn(), fetch: jest.fn(), diff --git a/src/consumer/consumerGroup.js b/src/consumer/consumerGroup.js index 216e00f33..d76072caa 100644 --- a/src/consumer/consumerGroup.js +++ b/src/consumer/consumerGroup.js @@ -81,6 +81,11 @@ module.exports = class ConsumerGroup { return this.leaderId && this.memberId === this.leaderId } + async connect() { + await this.cluster.connect() + await this.cluster.refreshMetadataIfNecessary() + } + async join() { const { groupId, sessionTimeout, rebalanceTimeout } = this diff --git a/src/consumer/offsetManager/__tests__/commitOffsets.spec.js b/src/consumer/offsetManager/__tests__/commitOffsets.spec.js index b9daf4fd8..0e99d3268 100644 --- a/src/consumer/offsetManager/__tests__/commitOffsets.spec.js +++ b/src/consumer/offsetManager/__tests__/commitOffsets.spec.js @@ -1,11 +1,14 @@ const OffsetManager = require('../index') const InstrumentationEventEmitter = require('../../../instrumentation/emitter') +const { createErrorFromCode } = require('../../../protocol/error') +const NOT_COORDINATOR_FOR_GROUP_CODE = 16 describe('Consumer > OffsetMananger > commitOffsets', () => { let offsetManager, topic1, topic2, memberAssignment, + mockCluster, mockCoordinator, groupId, generationId, @@ -23,20 +26,24 @@ describe('Consumer > OffsetMananger > commitOffsets', () => { [topic2]: [0, 1, 2, 3], } + mockCluster = { + committedOffsets: jest.fn(() => ({})), + refreshMetadata: jest.fn(() => ({})), + } + mockCoordinator = { offsetCommit: jest.fn(), } offsetManager = new OffsetManager({ - cluster: { - committedOffsets: jest.fn(() => ({})), - }, + cluster: mockCluster, memberAssignment, groupId, generationId, memberId, instrumentationEmitter: new InstrumentationEventEmitter(), }) + offsetManager.getCoordinator = jest.fn(() => mockCoordinator) }) @@ -111,4 +118,16 @@ describe('Consumer > OffsetMananger > commitOffsets', () => { ], }) }) + + it('refreshes metadata on NOT_COORDINATOR_FOR_GROUP protocol error', async () => { + mockCoordinator.offsetCommit.mockImplementation(() => { + throw createErrorFromCode(NOT_COORDINATOR_FOR_GROUP_CODE) + }) + + const offset = Math.random().toString() + const offsets = { topics: [{ topic: topic1, partitions: [{ partition: '0', offset }] }] } + + await expect(offsetManager.commitOffsets(offsets)).rejects.toThrow() + expect(mockCluster.refreshMetadata).toHaveBeenCalled() + }) }) diff --git a/src/consumer/offsetManager/index.js b/src/consumer/offsetManager/index.js index 597f1668a..b987dcdf7 100644 --- a/src/consumer/offsetManager/index.js +++ b/src/consumer/offsetManager/index.js @@ -250,20 +250,30 @@ module.exports = class OffsetManager { topics, } - const coordinator = await this.getCoordinator() - await coordinator.offsetCommit(payload) - this.instrumentationEmitter.emit(COMMIT_OFFSETS, payload) - - // Update local reference of committed offsets - topics.forEach(({ topic, partitions }) => { - const updatedOffsets = partitions.reduce( - (obj, { partition, offset }) => assign(obj, { [partition]: offset }), - {} - ) - assign(this.committedOffsets()[topic], updatedOffsets) - }) + try { + const coordinator = await this.getCoordinator() + await coordinator.offsetCommit(payload) + this.instrumentationEmitter.emit(COMMIT_OFFSETS, payload) + + // Update local reference of committed offsets + topics.forEach(({ topic, partitions }) => { + const updatedOffsets = partitions.reduce( + (obj, { partition, offset }) => assign(obj, { [partition]: offset }), + {} + ) + assign(this.committedOffsets()[topic], updatedOffsets) + }) - this.lastCommit = Date.now() + this.lastCommit = Date.now() + } catch (e) { + // metadata is stale, the coordinator has changed due to a restart or + // broker reassignment + if (e.type === 'NOT_COORDINATOR_FOR_GROUP') { + await this.cluster.refreshMetadata() + } + + throw e + } } async resolveOffsets() { diff --git a/src/consumer/runner.js b/src/consumer/runner.js index c549b957f..f171f4162 100644 --- a/src/consumer/runner.js +++ b/src/consumer/runner.js @@ -83,6 +83,7 @@ module.exports = class Runner { } try { + await this.consumerGroup.connect() await this.join() this.running = true diff --git a/testHelpers/index.js b/testHelpers/index.js index 80de99248..a4e8c87e8 100644 --- a/testHelpers/index.js +++ b/testHelpers/index.js @@ -13,17 +13,14 @@ const socketFactory = defaultSocketFactory() const { createLogger, - LEVELS: { NOTHING, INFO, DEBUG }, + LEVELS: { NOTHING }, } = require('../src/loggers') const LoggerConsole = require('../src/loggers/console') const { Kafka } = require('../index') -const isCI = process.env.TF_BUILD === 'True' -const ciLevel = process.env.VERBOSE ? DEBUG : INFO - const newLogger = (opts = {}) => - createLogger(Object.assign({ level: isCI ? ciLevel : NOTHING, logCreator: LoggerConsole }, opts)) + createLogger(Object.assign({ level: NOTHING, logCreator: LoggerConsole }, opts)) const getHost = () => process.env.HOST_IP || ip.address() const secureRandom = (length = 10) =>