Skip to content

Commit

Permalink
Merge pull request #227 from tulios/fix-subscribing-to-new-topic-226
Browse files Browse the repository at this point in the history
Always assign partitions based on subscribed topics
  • Loading branch information
Nevon authored and tulios committed Dec 3, 2018
1 parent a2abda2 commit 3e129af
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 8 deletions.
61 changes: 61 additions & 0 deletions src/consumer/__tests__/assignmentForUnsubscribedTopic.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,65 @@ describe('Consumer', () => {
consumer2.run({ eachMessage: () => {} })
await waitForConsumerToJoinGroup(consumer2)
})

it('Starts consuming from new topics after already having assignments', async () => {
consumer2 = createConsumer({
cluster: createCluster({ metadataMaxAge: 50 }),
groupId,
heartbeatInterval: 100,
maxWaitTimeInMs: 100,
logger: newLogger(),
})

// Both consumers receive assignments for one topic
let assignments = await Promise.all(
[consumer1, consumer2].map(async consumer => {
await consumer.connect()
await consumer.subscribe({ topic: topicNames[0] })
consumer.run({ eachMessage: () => {} })
return waitForConsumerToJoinGroup(consumer)
})
)

assignments.forEach(assignment =>
expect(Object.keys(assignment.payload.memberAssignment)).toEqual([topicNames[0]])
)

// One consumer is replaced with a new one, subscribing to the old topic as well as a new one
await consumer1.disconnect()
consumer1 = createConsumer({
cluster: createCluster({ metadataMaxAge: 50 }),
groupId,
heartbeatInterval: 100,
maxWaitTimeInMs: 100,
logger: newLogger(),
})
await consumer1.connect()
await Promise.all(topicNames.map(topic => consumer1.subscribe({ topic })))
consumer1.run({ eachMessage: () => {} })
await waitForConsumerToJoinGroup(consumer1)

// Second consumer is also replaced, subscribing to both topics
await consumer2.disconnect()
consumer2 = createConsumer({
cluster: createCluster({ metadataMaxAge: 50 }),
groupId,
heartbeatInterval: 100,
maxWaitTimeInMs: 100,
logger: newLogger(),
})

await consumer2.connect()
await Promise.all(topicNames.map(topic => consumer2.subscribe({ topic })))
consumer2.run({ eachMessage: () => {} })

// Both consumers are assigned to both topics
assignments = await Promise.all(
[consumer1, consumer2].map(consumer => waitForConsumerToJoinGroup(consumer))
)

assignments.forEach(assignment =>
expect(Object.keys(assignment.payload.memberAssignment)).toEqual(topicNames)
)
})
})
22 changes: 16 additions & 6 deletions src/consumer/consumerGroup.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ module.exports = class ConsumerGroup {
this.cluster = cluster
this.groupId = groupId
this.topics = topics
this.topicsSubscribed = topics
this.topicConfigurations = topicConfigurations
this.logger = logger.namespace('ConsumerGroup')
this.instrumentationEmitter = instrumentationEmitter
Expand Down Expand Up @@ -102,7 +103,16 @@ module.exports = class ConsumerGroup {

async sync() {
let assignment = []
const { groupId, generationId, memberId, members, groupProtocol, topics, coordinator } = this
const {
groupId,
generationId,
memberId,
members,
groupProtocol,
topics,
topicsSubscribed,
coordinator,
} = this

if (this.isLeader()) {
this.logger.debug('Chosen as group leader', { groupId, generationId, memberId, topics })
Expand All @@ -116,13 +126,13 @@ module.exports = class ConsumerGroup {

await this.cluster.refreshMetadata()

assignment = await assigner.assign({ members, topics })
assignment = await assigner.assign({ members, topics: topicsSubscribed })
this.logger.debug('Group assignment', {
groupId,
generationId,
topics,
groupProtocol,
assignment,
topics: topicsSubscribed,
})
}

Expand All @@ -145,15 +155,15 @@ module.exports = class ConsumerGroup {

let currentMemberAssignment = decodedAssignment
const assignedTopics = keys(currentMemberAssignment)
const topicsNotSubscribed = arrayDiff(assignedTopics, this.topics)
const topicsNotSubscribed = arrayDiff(assignedTopics, topicsSubscribed)

if (topicsNotSubscribed.length > 0) {
this.logger.warn('Consumer group received unsubscribed topics', {
groupId,
generationId,
memberId,
assignedTopics,
topicsSubscribed: this.topics,
topicsSubscribed,
topicsNotSubscribed,
})

Expand Down Expand Up @@ -426,7 +436,7 @@ module.exports = class ConsumerGroup {
generatePartitionsPerSubscribedTopic() {
const map = new Map()

for (let topic of this.topics) {
for (let topic of this.topicsSubscribed) {
const partitions = this.cluster
.findTopicPartitionMetadata(topic)
.map(m => m.partitionId)
Expand Down
2 changes: 2 additions & 0 deletions src/consumer/runner.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ module.exports = class Runner {
await this.consumerGroup.join()
await this.consumerGroup.sync()

this.running = true

const payload = {
groupId: this.consumerGroup.groupId,
memberId: this.consumerGroup.memberId,
Expand Down
4 changes: 2 additions & 2 deletions testHelpers/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,9 @@ const waitForMessages = (buffer, { number = 1, delay = 50 } = {}) =>
const waitForConsumerToJoinGroup = (consumer, { maxWait = 10000 } = {}) =>
new Promise((resolve, reject) => {
const timeoutId = setTimeout(() => reject(new Error('Timeout')), maxWait)
consumer.on(consumer.events.GROUP_JOIN, () => {
consumer.on(consumer.events.GROUP_JOIN, event => {
clearTimeout(timeoutId)
resolve()
resolve(event)
})
consumer.on(consumer.events.CRASH, event => {
clearTimeout(timeoutId)
Expand Down

0 comments on commit 3e129af

Please sign in to comment.