diff --git a/src/consumer/__tests__/emptyAssignment.spec.js b/src/consumer/__tests__/emptyAssignment.spec.js index ea20f3cb8..77b3c6724 100644 --- a/src/consumer/__tests__/emptyAssignment.spec.js +++ b/src/consumer/__tests__/emptyAssignment.spec.js @@ -8,6 +8,7 @@ const { createTopic, newLogger, waitForConsumerToJoinGroup, + waitForNextEvent, } = require('testHelpers') describe('Consumer', () => { @@ -25,7 +26,7 @@ describe('Consumer', () => { consumer2 && (await consumer2.disconnect()) }) - test('can join the group without receiving any assignment', async () => { + test('remains in the group without receiving any assignment', async () => { // Assigns all topic-partitions to the first member. const UnbalancedAssigner = ({ cluster }) => ({ name: 'UnbalancedAssigner', @@ -89,9 +90,23 @@ describe('Consumer', () => { consumer2.run({ eachMessage: () => {} }) // Ensure that both consumers manage to join - await Promise.all([ + const groupJoinEvents = await Promise.all([ waitForConsumerToJoinGroup(consumer1), waitForConsumerToJoinGroup(consumer2), ]) + + const emptyAssignments = groupJoinEvents.filter( + ({ payload }) => Object.entries(payload.memberAssignment).length === 0 + ) + expect(emptyAssignments).toHaveLength(1) + + await Promise.all( + [consumer1, consumer2].map(consumer => waitForNextEvent(consumer, consumer.events.FETCH)) + ) + + // Both consumers should continue to heartbeat even without receiving any assignments + await Promise.all( + [consumer1, consumer2].map(consumer => waitForNextEvent(consumer, consumer.events.HEARTBEAT)) + ) }) }) diff --git a/src/consumer/runner.js b/src/consumer/runner.js index 64582811b..686549cc5 100644 --- a/src/consumer/runner.js +++ b/src/consumer/runner.js @@ -314,6 +314,10 @@ module.exports = class Runner extends EventEmitter { nodeId, }) + if (batches.length === 0) { + await this.heartbeat() + } + return batches }