From 3e129af92e6337880facf6005bc7689150fbbb15 Mon Sep 17 00:00:00 2001 From: Tommy Brunn Date: Mon, 3 Dec 2018 14:38:45 +0100 Subject: [PATCH] Merge pull request #227 from tulios/fix-subscribing-to-new-topic-226 Always assign partitions based on subscribed topics --- .../assignmentForUnsubscribedTopic.spec.js | 61 +++++++++++++++++++ src/consumer/consumerGroup.js | 22 +++++-- src/consumer/runner.js | 2 + testHelpers/index.js | 4 +- 4 files changed, 81 insertions(+), 8 deletions(-) diff --git a/src/consumer/__tests__/assignmentForUnsubscribedTopic.spec.js b/src/consumer/__tests__/assignmentForUnsubscribedTopic.spec.js index 065c817ad..493a3e483 100644 --- a/src/consumer/__tests__/assignmentForUnsubscribedTopic.spec.js +++ b/src/consumer/__tests__/assignmentForUnsubscribedTopic.spec.js @@ -53,4 +53,65 @@ describe('Consumer', () => { consumer2.run({ eachMessage: () => {} }) await waitForConsumerToJoinGroup(consumer2) }) + + it('Starts consuming from new topics after already having assignments', async () => { + consumer2 = createConsumer({ + cluster: createCluster({ metadataMaxAge: 50 }), + groupId, + heartbeatInterval: 100, + maxWaitTimeInMs: 100, + logger: newLogger(), + }) + + // Both consumers receive assignments for one topic + let assignments = await Promise.all( + [consumer1, consumer2].map(async consumer => { + await consumer.connect() + await consumer.subscribe({ topic: topicNames[0] }) + consumer.run({ eachMessage: () => {} }) + return waitForConsumerToJoinGroup(consumer) + }) + ) + + assignments.forEach(assignment => + expect(Object.keys(assignment.payload.memberAssignment)).toEqual([topicNames[0]]) + ) + + // One consumer is replaced with a new one, subscribing to the old topic as well as a new one + await consumer1.disconnect() + consumer1 = createConsumer({ + cluster: createCluster({ metadataMaxAge: 50 }), + groupId, + heartbeatInterval: 100, + maxWaitTimeInMs: 100, + logger: newLogger(), + }) + await consumer1.connect() + await Promise.all(topicNames.map(topic => consumer1.subscribe({ topic }))) + consumer1.run({ eachMessage: () => {} }) + await waitForConsumerToJoinGroup(consumer1) + + // Second consumer is also replaced, subscribing to both topics + await consumer2.disconnect() + consumer2 = createConsumer({ + cluster: createCluster({ metadataMaxAge: 50 }), + groupId, + heartbeatInterval: 100, + maxWaitTimeInMs: 100, + logger: newLogger(), + }) + + await consumer2.connect() + await Promise.all(topicNames.map(topic => consumer2.subscribe({ topic }))) + consumer2.run({ eachMessage: () => {} }) + + // Both consumers are assigned to both topics + assignments = await Promise.all( + [consumer1, consumer2].map(consumer => waitForConsumerToJoinGroup(consumer)) + ) + + assignments.forEach(assignment => + expect(Object.keys(assignment.payload.memberAssignment)).toEqual(topicNames) + ) + }) }) diff --git a/src/consumer/consumerGroup.js b/src/consumer/consumerGroup.js index 78f0d8140..0d6b99027 100644 --- a/src/consumer/consumerGroup.js +++ b/src/consumer/consumerGroup.js @@ -41,6 +41,7 @@ module.exports = class ConsumerGroup { this.cluster = cluster this.groupId = groupId this.topics = topics + this.topicsSubscribed = topics this.topicConfigurations = topicConfigurations this.logger = logger.namespace('ConsumerGroup') this.instrumentationEmitter = instrumentationEmitter @@ -102,7 +103,16 @@ module.exports = class ConsumerGroup { async sync() { let assignment = [] - const { groupId, generationId, memberId, members, groupProtocol, topics, coordinator } = this + const { + groupId, + generationId, + memberId, + members, + groupProtocol, + topics, + topicsSubscribed, + coordinator, + } = this if (this.isLeader()) { this.logger.debug('Chosen as group leader', { groupId, generationId, memberId, topics }) @@ -116,13 +126,13 @@ module.exports = class ConsumerGroup { await this.cluster.refreshMetadata() - assignment = await assigner.assign({ members, topics }) + assignment = await assigner.assign({ members, topics: topicsSubscribed }) this.logger.debug('Group assignment', { groupId, generationId, - topics, groupProtocol, assignment, + topics: topicsSubscribed, }) } @@ -145,7 +155,7 @@ module.exports = class ConsumerGroup { let currentMemberAssignment = decodedAssignment const assignedTopics = keys(currentMemberAssignment) - const topicsNotSubscribed = arrayDiff(assignedTopics, this.topics) + const topicsNotSubscribed = arrayDiff(assignedTopics, topicsSubscribed) if (topicsNotSubscribed.length > 0) { this.logger.warn('Consumer group received unsubscribed topics', { @@ -153,7 +163,7 @@ module.exports = class ConsumerGroup { generationId, memberId, assignedTopics, - topicsSubscribed: this.topics, + topicsSubscribed, topicsNotSubscribed, }) @@ -426,7 +436,7 @@ module.exports = class ConsumerGroup { generatePartitionsPerSubscribedTopic() { const map = new Map() - for (let topic of this.topics) { + for (let topic of this.topicsSubscribed) { const partitions = this.cluster .findTopicPartitionMetadata(topic) .map(m => m.partitionId) diff --git a/src/consumer/runner.js b/src/consumer/runner.js index 80647c1e3..631c22f2a 100644 --- a/src/consumer/runner.js +++ b/src/consumer/runner.js @@ -48,6 +48,8 @@ module.exports = class Runner { await this.consumerGroup.join() await this.consumerGroup.sync() + this.running = true + const payload = { groupId: this.consumerGroup.groupId, memberId: this.consumerGroup.memberId, diff --git a/testHelpers/index.js b/testHelpers/index.js index 3e60e5cf1..835502fc3 100644 --- a/testHelpers/index.js +++ b/testHelpers/index.js @@ -128,9 +128,9 @@ const waitForMessages = (buffer, { number = 1, delay = 50 } = {}) => const waitForConsumerToJoinGroup = (consumer, { maxWait = 10000 } = {}) => new Promise((resolve, reject) => { const timeoutId = setTimeout(() => reject(new Error('Timeout')), maxWait) - consumer.on(consumer.events.GROUP_JOIN, () => { + consumer.on(consumer.events.GROUP_JOIN, event => { clearTimeout(timeoutId) - resolve() + resolve(event) }) consumer.on(consumer.events.CRASH, event => { clearTimeout(timeoutId)