Skip to content

Commit

Permalink
Merge pull request #404 from tulios/refresh-metadata-after-restart
Browse files Browse the repository at this point in the history
Refresh metadata after restart
  • Loading branch information
Nevon authored and tulios committed Jun 25, 2019
1 parent 3666f29 commit 6f99fb4
Show file tree
Hide file tree
Showing 8 changed files with 80 additions and 23 deletions.
22 changes: 21 additions & 1 deletion src/cluster/__tests__/findBroker.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@ const {
} = require('testHelpers')

const Broker = require('../../broker')
const { KafkaJSLockTimeout, KafkaJSConnectionError } = require('../../errors')
const {
KafkaJSLockTimeout,
KafkaJSConnectionError,
KafkaJSBrokerNotFound,
} = require('../../errors')

describe('Cluster > findBroker', () => {
let cluster, topic
Expand Down Expand Up @@ -77,4 +81,20 @@ describe('Cluster > findBroker', () => {
await expect(cluster.findBroker({ nodeId })).resolves.toBeInstanceOf(Broker)
expect(cluster.refreshMetadata).toHaveBeenCalled()
})

test('refresh metadata on KafkaJSBrokerNotFound', async () => {
const nodeId = 0
cluster.brokerPool.findBroker = jest.fn(() => {
throw new KafkaJSBrokerNotFound('Broker not found')
})

jest.spyOn(cluster, 'refreshMetadata')

await expect(cluster.findBroker({ nodeId })).rejects.toHaveProperty(
'name',
'KafkaJSBrokerNotFound'
)

expect(cluster.refreshMetadata).toHaveBeenCalled()
})
})
6 changes: 5 additions & 1 deletion src/cluster/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,11 @@ module.exports = class Cluster {
return await this.brokerPool.findBroker({ nodeId })
} catch (e) {
// The client probably has stale metadata
if (e.name === 'KafkaJSLockTimeout' || e.code === 'ECONNREFUSED') {
if (
e.name === 'KafkaJSBrokerNotFound' ||
e.name === 'KafkaJSLockTimeout' ||
e.code === 'ECONNREFUSED'
) {
await this.refreshMetadata()
}

Expand Down
1 change: 1 addition & 0 deletions src/consumer/__tests__/runner.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ describe('Consumer > Runner', () => {
eachBatch = jest.fn()
onCrash = jest.fn()
consumerGroup = {
connect: jest.fn(),
join: jest.fn(),
sync: jest.fn(),
fetch: jest.fn(),
Expand Down
5 changes: 5 additions & 0 deletions src/consumer/consumerGroup.js
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ module.exports = class ConsumerGroup {
return this.leaderId && this.memberId === this.leaderId
}

async connect() {
await this.cluster.connect()
await this.cluster.refreshMetadataIfNecessary()
}

async join() {
const { groupId, sessionTimeout, rebalanceTimeout } = this

Expand Down
25 changes: 22 additions & 3 deletions src/consumer/offsetManager/__tests__/commitOffsets.spec.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
const OffsetManager = require('../index')
const InstrumentationEventEmitter = require('../../../instrumentation/emitter')
const { createErrorFromCode } = require('../../../protocol/error')
const NOT_COORDINATOR_FOR_GROUP_CODE = 16

describe('Consumer > OffsetMananger > commitOffsets', () => {
let offsetManager,
topic1,
topic2,
memberAssignment,
mockCluster,
mockCoordinator,
groupId,
generationId,
Expand All @@ -23,20 +26,24 @@ describe('Consumer > OffsetMananger > commitOffsets', () => {
[topic2]: [0, 1, 2, 3],
}

mockCluster = {
committedOffsets: jest.fn(() => ({})),
refreshMetadata: jest.fn(() => ({})),
}

mockCoordinator = {
offsetCommit: jest.fn(),
}

offsetManager = new OffsetManager({
cluster: {
committedOffsets: jest.fn(() => ({})),
},
cluster: mockCluster,
memberAssignment,
groupId,
generationId,
memberId,
instrumentationEmitter: new InstrumentationEventEmitter(),
})

offsetManager.getCoordinator = jest.fn(() => mockCoordinator)
})

Expand Down Expand Up @@ -111,4 +118,16 @@ describe('Consumer > OffsetMananger > commitOffsets', () => {
],
})
})

it('refreshes metadata on NOT_COORDINATOR_FOR_GROUP protocol error', async () => {
mockCoordinator.offsetCommit.mockImplementation(() => {
throw createErrorFromCode(NOT_COORDINATOR_FOR_GROUP_CODE)
})

const offset = Math.random().toString()
const offsets = { topics: [{ topic: topic1, partitions: [{ partition: '0', offset }] }] }

await expect(offsetManager.commitOffsets(offsets)).rejects.toThrow()
expect(mockCluster.refreshMetadata).toHaveBeenCalled()
})
})
36 changes: 23 additions & 13 deletions src/consumer/offsetManager/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -250,20 +250,30 @@ module.exports = class OffsetManager {
topics,
}

const coordinator = await this.getCoordinator()
await coordinator.offsetCommit(payload)
this.instrumentationEmitter.emit(COMMIT_OFFSETS, payload)

// Update local reference of committed offsets
topics.forEach(({ topic, partitions }) => {
const updatedOffsets = partitions.reduce(
(obj, { partition, offset }) => assign(obj, { [partition]: offset }),
{}
)
assign(this.committedOffsets()[topic], updatedOffsets)
})
try {
const coordinator = await this.getCoordinator()
await coordinator.offsetCommit(payload)
this.instrumentationEmitter.emit(COMMIT_OFFSETS, payload)

// Update local reference of committed offsets
topics.forEach(({ topic, partitions }) => {
const updatedOffsets = partitions.reduce(
(obj, { partition, offset }) => assign(obj, { [partition]: offset }),
{}
)
assign(this.committedOffsets()[topic], updatedOffsets)
})

this.lastCommit = Date.now()
this.lastCommit = Date.now()
} catch (e) {
// metadata is stale, the coordinator has changed due to a restart or
// broker reassignment
if (e.type === 'NOT_COORDINATOR_FOR_GROUP') {
await this.cluster.refreshMetadata()
}

throw e
}
}

async resolveOffsets() {
Expand Down
1 change: 1 addition & 0 deletions src/consumer/runner.js
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ module.exports = class Runner {
}

try {
await this.consumerGroup.connect()
await this.join()

this.running = true
Expand Down
7 changes: 2 additions & 5 deletions testHelpers/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,14 @@ const socketFactory = defaultSocketFactory()

const {
createLogger,
LEVELS: { NOTHING, INFO, DEBUG },
LEVELS: { NOTHING },
} = require('../src/loggers')

const LoggerConsole = require('../src/loggers/console')
const { Kafka } = require('../index')

const isCI = process.env.TF_BUILD === 'True'
const ciLevel = process.env.VERBOSE ? DEBUG : INFO

const newLogger = (opts = {}) =>
createLogger(Object.assign({ level: isCI ? ciLevel : NOTHING, logCreator: LoggerConsole }, opts))
createLogger(Object.assign({ level: NOTHING, logCreator: LoggerConsole }, opts))

const getHost = () => process.env.HOST_IP || ip.address()
const secureRandom = (length = 10) =>
Expand Down

0 comments on commit 6f99fb4

Please sign in to comment.