Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement static membership feature on client side (KIP-345) #1594

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ const ResourcePatternTypes = require('./src/protocol/resourcePatternTypes')
const { isRebalancing, isKafkaJSError, ...errors } = require('./src/errors')
const { LEVELS } = require('./src/loggers')

// local two

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

errant comment maybe?


module.exports = {
Kafka,
PartitionAssigners,
Expand Down
3 changes: 3 additions & 0 deletions src/admin/__tests__/describeGroups.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,15 @@ describe('Admin', () => {
clientHost: expect.any(String),
clientId: expect.any(String),
memberId: expect.any(String),
groupInstanceId: expect.any(Object),
memberAssignment: expect.anything(),
memberMetadata: expect.anything(),
},
],
protocol: 'RoundRobinAssigner',
protocolType: 'consumer',
state: 'Stable',
authorizedOperations: expect.any(Number),
}))
)
})
Expand All @@ -117,6 +119,7 @@ describe('Admin', () => {
protocol: '',
protocolType: '',
state: 'Dead',
authorizedOperations: expect.any(Number) || null,
},
],
})
Expand Down
7 changes: 5 additions & 2 deletions src/broker/__tests__/describeGroups.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ const {
const createConsumer = require('../../consumer')

describe('Broker > DescribeGroups', () => {
let groupId, topicName, seedBroker, broker, cluster, consumer
let groupId, topicName, seedBroker, broker, cluster, consumer, groupInstanceId

beforeEach(async () => {
groupId = `consumer-group-id-${secureRandom()}`
groupInstanceId = `group-instance-id-${secureRandom()}`
topicName = `test-topic-${secureRandom()}`
seedBroker = new Broker({
connectionPool: createConnectionPool(),
Expand All @@ -37,6 +38,7 @@ describe('Broker > DescribeGroups', () => {
consumer = createConsumer({
cluster,
groupId,
groupInstanceId,
maxWaitTimeInMs: 1,
logger: newLogger(),
})
Expand All @@ -55,7 +57,6 @@ describe('Broker > DescribeGroups', () => {
const response = await broker.describeGroups({ groupIds: [groupId] })

expect(response).toEqual({
clientSideThrottleTime: expect.optional(0),
throttleTime: 0,
groups: [
{
Expand All @@ -67,12 +68,14 @@ describe('Broker > DescribeGroups', () => {
clientId: expect.any(String),
memberAssignment: expect.anything(),
memberId: expect.any(String),
groupInstanceId: expect.anything(),
memberMetadata: expect.anything(),
},
],
protocol: 'RoundRobinAssigner',
protocolType: 'consumer',
state: 'Stable',
authorizedOperations: expect.any(Number),
},
],
})
Expand Down
35 changes: 24 additions & 11 deletions src/broker/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ module.exports = class Broker {
[]
)

return await this[PRIVATE.SEND_REQUEST](
const res = await this[PRIVATE.SEND_REQUEST](
fetch({
replicaId,
isolationLevel,
Expand All @@ -328,6 +328,7 @@ module.exports = class Broker {
rackId,
})
)
return res
}

/**
Expand All @@ -336,11 +337,14 @@ module.exports = class Broker {
* @param {string} request.groupId The group id
* @param {number} request.groupGenerationId The generation of the group
* @param {string} request.memberId The member id assigned by the group coordinator
* @param {string} request.groupInstanceId
* @returns {Promise}
*/
async heartbeat({ groupId, groupGenerationId, memberId }) {
async heartbeat({ groupId, groupGenerationId, memberId, groupInstanceId }) {
const heartbeat = this.lookupRequest(apiKeys.Heartbeat, requests.Heartbeat)
return await this[PRIVATE.SEND_REQUEST](heartbeat({ groupId, groupGenerationId, memberId }))
return await this[PRIVATE.SEND_REQUEST](
heartbeat({ groupId, groupGenerationId, memberId, groupInstanceId })
)
}

/**
Expand All @@ -365,6 +369,7 @@ module.exports = class Broker {
* @param {number} request.rebalanceTimeout The maximum time that the coordinator will wait for each member
* to rejoin when rebalancing the group
* @param {string} [request.memberId=""] The assigned consumer id or an empty string for a new consumer
* @param {string} [request.groupInstanceId=""] The assigned group instance id or an empty string for a new consumer
* @param {string} [request.protocolType="consumer"] Unique name for class of protocols implemented by group
* @param {Array} request.groupProtocols List of protocols that the member supports (assignment strategy)
* [{ name: 'AssignerName', metadata: '{"version": 1, "topics": []}' }]
Expand All @@ -375,6 +380,7 @@ module.exports = class Broker {
sessionTimeout,
rebalanceTimeout,
memberId = '',
groupInstanceId, // TODO: make this empty by default
protocolType = 'consumer',
groupProtocols,
}) {
Expand All @@ -386,13 +392,18 @@ module.exports = class Broker {
sessionTimeout,
rebalanceTimeout,
memberId: assignedMemberId,
groupInstanceId,
protocolType,
groupProtocols,
})
)

try {
return await makeRequest()
const initialJoinData = await makeRequest()
if (initialJoinData.errorCode !== 79) {
return initialJoinData
}
return await makeRequest(initialJoinData.memberId)
} catch (error) {
if (error.name === 'KafkaJSMemberIdRequired') {
return makeRequest(error.memberId)
Expand All @@ -407,11 +418,12 @@ module.exports = class Broker {
* @param {object} request
* @param {string} request.groupId
* @param {string} request.memberId
* @param {string} request.groupInstanceId
* @returns {Promise}
*/
async leaveGroup({ groupId, memberId }) {
async leaveGroup({ groupId, memberId, groupInstanceId }) {
const leaveGroup = this.lookupRequest(apiKeys.LeaveGroup, requests.LeaveGroup)
return await this[PRIVATE.SEND_REQUEST](leaveGroup({ groupId, memberId }))
return await this[PRIVATE.SEND_REQUEST](leaveGroup({ groupId, memberId, groupInstanceId }))
}

/**
Expand All @@ -420,16 +432,18 @@ module.exports = class Broker {
* @param {string} request.groupId
* @param {number} request.generationId
* @param {string} request.memberId
* @param {string} request.groupInstanceId
* @param {object} request.groupAssignment
* @returns {Promise}
*/
async syncGroup({ groupId, generationId, memberId, groupAssignment }) {
async syncGroup({ groupId, generationId, memberId, groupAssignment, groupInstanceId }) {
const syncGroup = this.lookupRequest(apiKeys.SyncGroup, requests.SyncGroup)
return await this[PRIVATE.SEND_REQUEST](
syncGroup({
groupId,
generationId,
memberId,
groupInstanceId,
groupAssignment,
})
)
Expand Down Expand Up @@ -476,8 +490,7 @@ module.exports = class Broker {
* @param {string} request.groupId
* @param {number} request.groupGenerationId
* @param {string} request.memberId
* @param {number} [request.retentionTime=-1] -1 signals to the broker that its default configuration
* should be used.
* @param {string} request.groupInstanceId
* @param {object} request.topics Topics to commit offsets, e.g:
* [
* {
Expand All @@ -489,14 +502,14 @@ module.exports = class Broker {
* ]
* @returns {Promise}
*/
async offsetCommit({ groupId, groupGenerationId, memberId, retentionTime, topics }) {
async offsetCommit({ groupId, groupGenerationId, memberId, groupInstanceId, topics }) {
const offsetCommit = this.lookupRequest(apiKeys.OffsetCommit, requests.OffsetCommit)
return await this[PRIVATE.SEND_REQUEST](
offsetCommit({
groupId,
groupGenerationId,
memberId,
retentionTime,
groupInstanceId,
topics,
})
)
Expand Down
4 changes: 3 additions & 1 deletion src/consumer/__tests__/commitOffsets.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@ const {
} = require('testHelpers')

describe('Consumer', () => {
let topicName, groupId, cluster, producer, consumer
let topicName, groupId, cluster, producer, consumer, groupInstanceId

beforeEach(async () => {
topicName = `test-topic-${secureRandom()}`
groupId = `consumer-group-id-${secureRandom()}`
groupInstanceId = `group-instance-id-${secureRandom()}`

await createTopic({ topic: topicName })

Expand All @@ -31,6 +32,7 @@ describe('Consumer', () => {
consumer = createConsumer({
cluster,
groupId,
groupInstanceId,
logger: newLogger(),
})
})
Expand Down
4 changes: 3 additions & 1 deletion src/consumer/__tests__/consumeMessages.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@ const {
} = require('testHelpers')

describe('Consumer', () => {
let topicName, groupId, cluster, producer, consumer, admin
let topicName, groupId, cluster, producer, consumer, admin, groupInstanceId

beforeEach(async () => {
topicName = `test-topic-${secureRandom()}`
groupId = `consumer-group-id-${secureRandom()}`
groupInstanceId = `group-instance-id-${secureRandom()}`

await createTopic({ topic: topicName })

Expand All @@ -45,6 +46,7 @@ describe('Consumer', () => {
consumer = createConsumer({
cluster,
groupId,
groupInstanceId,
maxWaitTimeInMs: 100,
logger: newLogger(),
})
Expand Down
4 changes: 3 additions & 1 deletion src/consumer/__tests__/controlBatches.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ const {
} = require('testHelpers')

describe('Consumer', () => {
let topicName, groupId, transactionalId, cluster, producer, consumer
let topicName, groupId, transactionalId, cluster, producer, consumer, groupInstanceId
const maxBytes = 170

beforeEach(async () => {
topicName = `test-topic-${secureRandom()}`
groupId = `consumer-group-id-${secureRandom()}`
groupInstanceId = `group-instance-id-${secureRandom()}`
transactionalId = `transaction-id-${secureRandom()}`

await createTopic({ topic: topicName })
Expand All @@ -42,6 +43,7 @@ describe('Consumer', () => {
consumer = createConsumer({
cluster,
groupId,
groupInstanceId,
maxWaitTimeInMs: 100,
maxBytes,
maxBytesPerPartition: maxBytes,
Expand Down
2 changes: 2 additions & 0 deletions src/consumer/__tests__/describeGroup.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,15 @@ describe('Consumer', () => {
clientHost: expect.any(String),
clientId: expect.any(String),
memberId: expect.any(String),
groupInstanceId: expect.any(Object),
memberAssignment: expect.anything(),
memberMetadata: expect.anything(),
},
],
protocol: 'RoundRobinAssigner',
protocolType: 'consumer',
state: 'Stable',
authorizedOperations: expect.anything(),
})
})
})
Expand Down
6 changes: 5 additions & 1 deletion src/consumer/__tests__/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,28 @@ const waitFor = require('../../utils/waitFor')
const uniq = require('../../utils/uniq')

describe('Consumer', () => {
let topicName, groupId, consumer, consumer2, producer
let topicName, groupId, consumer, consumer2, producer, groupInstanceId, groupInstanceId2

describe('#run', () => {
beforeEach(async () => {
topicName = `test-topic-${secureRandom()}`
groupId = `consumer-group-id-${secureRandom()}`
groupInstanceId = `group-instance-id-${secureRandom()}`
groupInstanceId2 = `group-instance-id-${secureRandom()}`

await createTopic({ topic: topicName })
consumer = createConsumer({
cluster: createCluster({ metadataMaxAge: 50 }),
groupId,
groupInstanceId,
heartbeatInterval: 100,
maxWaitTimeInMs: 100,
logger: newLogger(),
})
consumer2 = createConsumer({
cluster: createCluster({ metadataMaxAge: 50 }),
groupId,
groupInstanceId: groupInstanceId2,
heartbeatInterval: 100,
maxWaitTimeInMs: 100,
logger: newLogger(),
Expand Down
Loading