Skip to content

Commit

Permalink
Prevent crash when re-producing after metadata refresh
Browse files Browse the repository at this point in the history
`sendMessages` would keep a list of brokers to re-produce to, but if a broker was no longer a leader of any partitions, it would crash. Fixes #62.

Co-authored-by tulios <[email protected]>
  • Loading branch information
Nevon committed May 18, 2018
1 parent 4d71d3d commit 0babdbf
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 3 deletions.
9 changes: 7 additions & 2 deletions src/producer/sendMessages.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,13 @@ module.exports = ({ logger, cluster, partitioner }) => {
const partitions = partitionsPerLeader[broker.nodeId]
const topicData = createTopicData({ topic, partitions, messagesPerPartition })

const response = await broker.produce({ acks, timeout, compression, topicData })
responsePerBroker.set(broker, responseSerializer(response))
try {
const response = await broker.produce({ acks, timeout, compression, topicData })
responsePerBroker.set(broker, responseSerializer(response))
} catch (e) {
responsePerBroker.delete(broker)
throw e
}
})
}

Expand Down
30 changes: 29 additions & 1 deletion src/producer/sendMessages.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ describe('Producer > sendMessages', () => {
1: { nodeId: 1, produce: jest.fn(() => createProducerResponse(topic, 0)) },
2: { nodeId: 2, produce: jest.fn(() => createProducerResponse(topic, 1)) },
3: { nodeId: 3, produce: jest.fn(() => createProducerResponse(topic, 2)) },
4: { nodeId: 4, produce: jest.fn(() => createProducerResponse(topic, 1)) },
}
cluster = {
addTargetTopic: jest.fn(),
Expand Down Expand Up @@ -75,8 +76,8 @@ describe('Producer > sendMessages', () => {
expect(brokers[2].produce).toHaveBeenCalledTimes(1)
expect(brokers[3].produce).toHaveBeenCalledTimes(3)
expect(response).toEqual([
{ errorCode: 0, offset: '0', partition: 0, timestamp: '-1', topicName: 'topic-name' },
{ errorCode: 0, offset: '1', partition: 1, timestamp: '-1', topicName: 'topic-name' },
{ errorCode: 0, offset: '0', partition: 0, timestamp: '-1', topicName: 'topic-name' },
{ errorCode: 0, offset: '2', partition: 2, timestamp: '-1', topicName: 'topic-name' },
])
})
Expand Down Expand Up @@ -108,4 +109,31 @@ describe('Producer > sendMessages', () => {
expect(cluster.refreshMetadata).toHaveBeenCalled()
})
}

test('does not re-produce messages to brokers that are no longer leaders after metadata refresh', async () => {
const sendMessages = createSendMessages({ logger: newLogger(), cluster, partitioner })

brokers[2].produce
.mockImplementationOnce(() => {
const e = new Error('Some error broker 1')
e.type = 'NOT_LEADER_FOR_PARTITION'
throw e
})
.mockImplementationOnce(() => createProducerResponse(topic, 0))
cluster.findLeaderForPartitions
.mockImplementationOnce(() => partitionsPerLeader)
.mockImplementationOnce(() => ({
1: [0],
4: [1], // Broker 4 replaces broker 2 as leader for partition 1
3: [2],
}))

const response = await sendMessages({ topic, messages })

expect(response).toEqual([
{ errorCode: 0, offset: '0', partition: 0, timestamp: '-1', topicName: 'topic-name' },
{ errorCode: 0, offset: '2', partition: 2, timestamp: '-1', topicName: 'topic-name' },
{ errorCode: 0, offset: '1', partition: 1, timestamp: '-1', topicName: 'topic-name' },
])
})
})

0 comments on commit 0babdbf

Please sign in to comment.