diff --git a/index.js b/index.js index 3fd3cb1c8..78eed9328 100644 --- a/index.js +++ b/index.js @@ -12,6 +12,8 @@ const ResourcePatternTypes = require('./src/protocol/resourcePatternTypes') const { isRebalancing, isKafkaJSError, ...errors } = require('./src/errors') const { LEVELS } = require('./src/loggers') +// local two + module.exports = { Kafka, PartitionAssigners, diff --git a/src/admin/__tests__/describeGroups.spec.js b/src/admin/__tests__/describeGroups.spec.js index cb48b8965..436b4bc93 100644 --- a/src/admin/__tests__/describeGroups.spec.js +++ b/src/admin/__tests__/describeGroups.spec.js @@ -93,6 +93,7 @@ describe('Admin', () => { clientHost: expect.any(String), clientId: expect.any(String), memberId: expect.any(String), + groupInstanceId: expect.any(Object), memberAssignment: expect.anything(), memberMetadata: expect.anything(), }, @@ -100,6 +101,7 @@ describe('Admin', () => { protocol: 'RoundRobinAssigner', protocolType: 'consumer', state: 'Stable', + authorizedOperations: expect.any(Number), })) ) }) @@ -117,6 +119,7 @@ describe('Admin', () => { protocol: '', protocolType: '', state: 'Dead', + authorizedOperations: expect.any(Number) || null, }, ], }) diff --git a/src/broker/__tests__/describeGroups.spec.js b/src/broker/__tests__/describeGroups.spec.js index d16a04dcf..8ed84c76d 100644 --- a/src/broker/__tests__/describeGroups.spec.js +++ b/src/broker/__tests__/describeGroups.spec.js @@ -9,10 +9,11 @@ const { const createConsumer = require('../../consumer') describe('Broker > DescribeGroups', () => { - let groupId, topicName, seedBroker, broker, cluster, consumer + let groupId, topicName, seedBroker, broker, cluster, consumer, groupInstanceId beforeEach(async () => { groupId = `consumer-group-id-${secureRandom()}` + groupInstanceId = `group-instance-id-${secureRandom()}` topicName = `test-topic-${secureRandom()}` seedBroker = new Broker({ connectionPool: createConnectionPool(), @@ -37,6 +38,7 @@ describe('Broker > DescribeGroups', () => { consumer = createConsumer({ cluster, groupId, + groupInstanceId, maxWaitTimeInMs: 1, logger: newLogger(), }) @@ -55,7 +57,6 @@ describe('Broker > DescribeGroups', () => { const response = await broker.describeGroups({ groupIds: [groupId] }) expect(response).toEqual({ - clientSideThrottleTime: expect.optional(0), throttleTime: 0, groups: [ { @@ -67,12 +68,14 @@ describe('Broker > DescribeGroups', () => { clientId: expect.any(String), memberAssignment: expect.anything(), memberId: expect.any(String), + groupInstanceId: expect.anything(), memberMetadata: expect.anything(), }, ], protocol: 'RoundRobinAssigner', protocolType: 'consumer', state: 'Stable', + authorizedOperations: expect.any(Number), }, ], }) diff --git a/src/broker/index.js b/src/broker/index.js index bb86cc5d5..2137472ab 100644 --- a/src/broker/index.js +++ b/src/broker/index.js @@ -317,7 +317,7 @@ module.exports = class Broker { [] ) - return await this[PRIVATE.SEND_REQUEST]( + const res = await this[PRIVATE.SEND_REQUEST]( fetch({ replicaId, isolationLevel, @@ -328,6 +328,7 @@ module.exports = class Broker { rackId, }) ) + return res } /** @@ -336,11 +337,14 @@ module.exports = class Broker { * @param {string} request.groupId The group id * @param {number} request.groupGenerationId The generation of the group * @param {string} request.memberId The member id assigned by the group coordinator + * @param {string} request.groupInstanceId * @returns {Promise} */ - async heartbeat({ groupId, groupGenerationId, memberId }) { + async heartbeat({ groupId, groupGenerationId, memberId, groupInstanceId }) { const heartbeat = this.lookupRequest(apiKeys.Heartbeat, requests.Heartbeat) - return await this[PRIVATE.SEND_REQUEST](heartbeat({ groupId, groupGenerationId, memberId })) + return await this[PRIVATE.SEND_REQUEST]( + heartbeat({ groupId, groupGenerationId, memberId, groupInstanceId }) + ) } /** @@ -365,6 +369,7 @@ module.exports = class Broker { * @param {number} request.rebalanceTimeout The maximum time that the coordinator will wait for each member * to rejoin when rebalancing the group * @param {string} [request.memberId=""] The assigned consumer id or an empty string for a new consumer + * @param {string} [request.groupInstanceId=""] The assigned group instance id or an empty string for a new consumer * @param {string} [request.protocolType="consumer"] Unique name for class of protocols implemented by group * @param {Array} request.groupProtocols List of protocols that the member supports (assignment strategy) * [{ name: 'AssignerName', metadata: '{"version": 1, "topics": []}' }] @@ -375,6 +380,7 @@ module.exports = class Broker { sessionTimeout, rebalanceTimeout, memberId = '', + groupInstanceId, // TODO: make this empty by default protocolType = 'consumer', groupProtocols, }) { @@ -386,13 +392,18 @@ module.exports = class Broker { sessionTimeout, rebalanceTimeout, memberId: assignedMemberId, + groupInstanceId, protocolType, groupProtocols, }) ) try { - return await makeRequest() + const initialJoinData = await makeRequest() + if (initialJoinData.errorCode !== 79) { + return initialJoinData + } + return await makeRequest(initialJoinData.memberId) } catch (error) { if (error.name === 'KafkaJSMemberIdRequired') { return makeRequest(error.memberId) @@ -407,11 +418,12 @@ module.exports = class Broker { * @param {object} request * @param {string} request.groupId * @param {string} request.memberId + * @param {string} request.groupInstanceId * @returns {Promise} */ - async leaveGroup({ groupId, memberId }) { + async leaveGroup({ groupId, memberId, groupInstanceId }) { const leaveGroup = this.lookupRequest(apiKeys.LeaveGroup, requests.LeaveGroup) - return await this[PRIVATE.SEND_REQUEST](leaveGroup({ groupId, memberId })) + return await this[PRIVATE.SEND_REQUEST](leaveGroup({ groupId, memberId, groupInstanceId })) } /** @@ -420,16 +432,18 @@ module.exports = class Broker { * @param {string} request.groupId * @param {number} request.generationId * @param {string} request.memberId + * @param {string} request.groupInstanceId * @param {object} request.groupAssignment * @returns {Promise} */ - async syncGroup({ groupId, generationId, memberId, groupAssignment }) { + async syncGroup({ groupId, generationId, memberId, groupAssignment, groupInstanceId }) { const syncGroup = this.lookupRequest(apiKeys.SyncGroup, requests.SyncGroup) return await this[PRIVATE.SEND_REQUEST]( syncGroup({ groupId, generationId, memberId, + groupInstanceId, groupAssignment, }) ) @@ -476,8 +490,7 @@ module.exports = class Broker { * @param {string} request.groupId * @param {number} request.groupGenerationId * @param {string} request.memberId - * @param {number} [request.retentionTime=-1] -1 signals to the broker that its default configuration - * should be used. + * @param {string} request.groupInstanceId * @param {object} request.topics Topics to commit offsets, e.g: * [ * { @@ -489,14 +502,14 @@ module.exports = class Broker { * ] * @returns {Promise} */ - async offsetCommit({ groupId, groupGenerationId, memberId, retentionTime, topics }) { + async offsetCommit({ groupId, groupGenerationId, memberId, groupInstanceId, topics }) { const offsetCommit = this.lookupRequest(apiKeys.OffsetCommit, requests.OffsetCommit) return await this[PRIVATE.SEND_REQUEST]( offsetCommit({ groupId, groupGenerationId, memberId, - retentionTime, + groupInstanceId, topics, }) ) diff --git a/src/consumer/__tests__/commitOffsets.spec.js b/src/consumer/__tests__/commitOffsets.spec.js index f417f411b..f71a625c5 100644 --- a/src/consumer/__tests__/commitOffsets.spec.js +++ b/src/consumer/__tests__/commitOffsets.spec.js @@ -13,11 +13,12 @@ const { } = require('testHelpers') describe('Consumer', () => { - let topicName, groupId, cluster, producer, consumer + let topicName, groupId, cluster, producer, consumer, groupInstanceId beforeEach(async () => { topicName = `test-topic-${secureRandom()}` groupId = `consumer-group-id-${secureRandom()}` + groupInstanceId = `group-instance-id-${secureRandom()}` await createTopic({ topic: topicName }) @@ -31,6 +32,7 @@ describe('Consumer', () => { consumer = createConsumer({ cluster, groupId, + groupInstanceId, logger: newLogger(), }) }) diff --git a/src/consumer/__tests__/consumeMessages.spec.js b/src/consumer/__tests__/consumeMessages.spec.js index 568b908c9..ac7e708f4 100644 --- a/src/consumer/__tests__/consumeMessages.spec.js +++ b/src/consumer/__tests__/consumeMessages.spec.js @@ -22,11 +22,12 @@ const { } = require('testHelpers') describe('Consumer', () => { - let topicName, groupId, cluster, producer, consumer, admin + let topicName, groupId, cluster, producer, consumer, admin, groupInstanceId beforeEach(async () => { topicName = `test-topic-${secureRandom()}` groupId = `consumer-group-id-${secureRandom()}` + groupInstanceId = `group-instance-id-${secureRandom()}` await createTopic({ topic: topicName }) @@ -45,6 +46,7 @@ describe('Consumer', () => { consumer = createConsumer({ cluster, groupId, + groupInstanceId, maxWaitTimeInMs: 100, logger: newLogger(), }) diff --git a/src/consumer/__tests__/controlBatches.spec.js b/src/consumer/__tests__/controlBatches.spec.js index 03cdc14f5..b14b3ae97 100644 --- a/src/consumer/__tests__/controlBatches.spec.js +++ b/src/consumer/__tests__/controlBatches.spec.js @@ -17,12 +17,13 @@ const { } = require('testHelpers') describe('Consumer', () => { - let topicName, groupId, transactionalId, cluster, producer, consumer + let topicName, groupId, transactionalId, cluster, producer, consumer, groupInstanceId const maxBytes = 170 beforeEach(async () => { topicName = `test-topic-${secureRandom()}` groupId = `consumer-group-id-${secureRandom()}` + groupInstanceId = `group-instance-id-${secureRandom()}` transactionalId = `transaction-id-${secureRandom()}` await createTopic({ topic: topicName }) @@ -42,6 +43,7 @@ describe('Consumer', () => { consumer = createConsumer({ cluster, groupId, + groupInstanceId, maxWaitTimeInMs: 100, maxBytes, maxBytesPerPartition: maxBytes, diff --git a/src/consumer/__tests__/describeGroup.spec.js b/src/consumer/__tests__/describeGroup.spec.js index b37460208..eb6ae9f66 100644 --- a/src/consumer/__tests__/describeGroup.spec.js +++ b/src/consumer/__tests__/describeGroup.spec.js @@ -56,6 +56,7 @@ describe('Consumer', () => { clientHost: expect.any(String), clientId: expect.any(String), memberId: expect.any(String), + groupInstanceId: expect.any(Object), memberAssignment: expect.anything(), memberMetadata: expect.anything(), }, @@ -63,6 +64,7 @@ describe('Consumer', () => { protocol: 'RoundRobinAssigner', protocolType: 'consumer', state: 'Stable', + authorizedOperations: expect.anything(), }) }) }) diff --git a/src/consumer/__tests__/index.spec.js b/src/consumer/__tests__/index.spec.js index 48e23bdba..4d45bfb08 100644 --- a/src/consumer/__tests__/index.spec.js +++ b/src/consumer/__tests__/index.spec.js @@ -14,17 +14,20 @@ const waitFor = require('../../utils/waitFor') const uniq = require('../../utils/uniq') describe('Consumer', () => { - let topicName, groupId, consumer, consumer2, producer + let topicName, groupId, consumer, consumer2, producer, groupInstanceId, groupInstanceId2 describe('#run', () => { beforeEach(async () => { topicName = `test-topic-${secureRandom()}` groupId = `consumer-group-id-${secureRandom()}` + groupInstanceId = `group-instance-id-${secureRandom()}` + groupInstanceId2 = `group-instance-id-${secureRandom()}` await createTopic({ topic: topicName }) consumer = createConsumer({ cluster: createCluster({ metadataMaxAge: 50 }), groupId, + groupInstanceId, heartbeatInterval: 100, maxWaitTimeInMs: 100, logger: newLogger(), @@ -32,6 +35,7 @@ describe('Consumer', () => { consumer2 = createConsumer({ cluster: createCluster({ metadataMaxAge: 50 }), groupId, + groupInstanceId: groupInstanceId2, heartbeatInterval: 100, maxWaitTimeInMs: 100, logger: newLogger(), diff --git a/src/consumer/__tests__/instrumentationEvents.spec.js b/src/consumer/__tests__/instrumentationEvents.spec.js index b12305267..420f8815d 100644 --- a/src/consumer/__tests__/instrumentationEvents.spec.js +++ b/src/consumer/__tests__/instrumentationEvents.spec.js @@ -14,7 +14,7 @@ const { } = require('testHelpers') describe('Consumer > Instrumentation Events', () => { - let topicName, groupId, cluster, producer, consumer, consumer2, message, emitter + let topicName, groupId, cluster, producer, consumer, consumer2, consumer2Clone, message, emitter const createTestConsumer = (opts = {}) => createConsumer({ @@ -37,6 +37,7 @@ describe('Consumer > Instrumentation Events', () => { emitter = new InstrumentationEventEmitter() cluster = createCluster({ instrumentationEmitter: emitter, metadataMaxAge: 50 }) + producer = createProducer({ cluster, createPartitioner: createModPartitioner, @@ -49,11 +50,12 @@ describe('Consumer > Instrumentation Events', () => { afterEach(async () => { consumer && (await consumer.disconnect()) consumer2 && (await consumer2.disconnect()) + consumer2Clone && (await consumer2Clone.disconnect()) producer && (await producer.disconnect()) }) test('on throws an error when provided with an invalid event name', () => { - consumer = createTestConsumer() + consumer = createTestConsumer({ id: 1 }) expect(() => consumer.on('NON_EXISTENT_EVENT', () => {})).toThrow( /Event name should be one of consumer.events./ ) @@ -441,9 +443,12 @@ describe('Consumer > Instrumentation Events', () => { const onRebalancing = jest.fn() const groupId = `consumer-group-id-${secureRandom()}` + const groupInstanceId = `group-instance-id-${secureRandom()}` + const groupInstanceId2 = `group-instance-id-${secureRandom()}` consumer = createTestConsumer({ groupId, + groupInstanceId, cluster: createCluster({ instrumentationEmitter: new InstrumentationEventEmitter(), metadataMaxAge: 50, @@ -452,6 +457,7 @@ describe('Consumer > Instrumentation Events', () => { consumer2 = createTestConsumer({ groupId, + groupInstanceId: groupInstanceId2, cluster: createCluster({ instrumentationEmitter: new InstrumentationEventEmitter(), metadataMaxAge: 50, @@ -487,11 +493,74 @@ describe('Consumer > Instrumentation Events', () => { type: 'consumer.rebalancing', payload: { groupId: groupId, - memberId: memberId, + memberId: expect.stringContaining(groupInstanceId), }, }) }) + it('rebalance only once when a consumer rejoins with same static id', async () => { + const onRebalancing = jest.fn() + + const groupId = `consumer-group-id-${secureRandom()}` + const groupInstanceId = `group-instance-id-${secureRandom()}` + const groupInstanceId2 = `group-instance-id-${secureRandom()}` + + consumer = createTestConsumer({ + groupId, + groupInstanceId, + cluster: createCluster({ + instrumentationEmitter: new InstrumentationEventEmitter(), + metadataMaxAge: 50, + }), + }) + + consumer2 = createTestConsumer({ + groupId, + groupInstanceId: groupInstanceId2, + cluster: createCluster({ + instrumentationEmitter: new InstrumentationEventEmitter(), + metadataMaxAge: 50, + }), + }) + + consumer2Clone = createTestConsumer({ + groupId, + groupInstanceId: groupInstanceId2, + cluster: createCluster({ + instrumentationEmitter: new InstrumentationEventEmitter(), + metadataMaxAge: 50, + }), + }) + + let memberId + consumer.on(consumer.events.GROUP_JOIN, async event => { + memberId = memberId || event.payload.memberId + }) + + consumer.on(consumer.events.REBALANCING, async event => { + onRebalancing(event) + }) + + await consumer.connect() + await consumer.subscribe({ topic: topicName, fromBeginning: true }) + consumer.run({ eachMessage: () => true }) + await waitForConsumerToJoinGroup(consumer, { label: 'consumer1' }) + + await consumer2.connect() + await consumer2.subscribe({ topic: topicName, fromBeginning: true }) + consumer2.run({ eachMessage: () => true }) + await waitForConsumerToJoinGroup(consumer2, { label: 'consumer2' }) + + await consumer2.disconnect() + + await consumer2Clone.connect() + await consumer2Clone.subscribe({ topic: topicName }) + consumer2Clone.run({ eachMessage: () => true }) + await waitForConsumerToJoinGroup(consumer2Clone, { label: 'consumer2Clone' }) + + expect(onRebalancing).toBeCalledTimes(1) + }) + it('emits request events', async () => { const requestListener = jest.fn().mockName('request') diff --git a/src/consumer/consumerGroup.js b/src/consumer/consumerGroup.js index 442312c1a..ffdbf2faa 100644 --- a/src/consumer/consumerGroup.js +++ b/src/consumer/consumerGroup.js @@ -43,6 +43,7 @@ module.exports = class ConsumerGroup { * @param {import('../../types').RetryOptions} options.retry * @param {import('../../types').Cluster} options.cluster * @param {string} options.groupId + * @param {string} options.groupInstanceId * @param {string[]} options.topics * @param {Record} options.topicConfigurations * @param {import('../../types').Logger} options.logger @@ -65,6 +66,7 @@ module.exports = class ConsumerGroup { retry, cluster, groupId, + groupInstanceId, topics, topicConfigurations, logger, @@ -86,6 +88,7 @@ module.exports = class ConsumerGroup { /** @type {import("../../types").Cluster} */ this.cluster = cluster this.groupId = groupId + this.groupInstanceId = groupInstanceId this.topics = topics this.topicsSubscribed = topics this.topicConfigurations = topicConfigurations @@ -130,13 +133,14 @@ module.exports = class ConsumerGroup { this.lastRequest = Date.now() this[PRIVATE.SHARED_HEARTBEAT] = sharedPromiseTo(async ({ interval }) => { - const { groupId, generationId, memberId } = this + const { groupId, generationId, memberId, groupInstanceId } = this const now = Date.now() if (memberId && now >= this.lastRequest + interval) { const payload = { groupId, memberId, + groupInstanceId, groupGenerationId: generationId, } @@ -162,7 +166,7 @@ module.exports = class ConsumerGroup { } async [PRIVATE.JOIN]() { - const { groupId, sessionTimeout, rebalanceTimeout } = this + const { groupId, sessionTimeout, rebalanceTimeout, groupInstanceId } = this this.coordinator = await this.cluster.findGroupCoordinator({ groupId }) @@ -171,6 +175,7 @@ module.exports = class ConsumerGroup { sessionTimeout, rebalanceTimeout, memberId: this.memberId || '', + groupInstanceId, groupProtocols: this.assigners.map(assigner => assigner.protocol({ topics: this.topicsSubscribed, @@ -182,13 +187,14 @@ module.exports = class ConsumerGroup { this.leaderId = groupData.leaderId this.memberId = groupData.memberId this.members = groupData.members + this.groupInstanceId = groupData.groupInstanceId this.groupProtocol = groupData.groupProtocol } async leave() { - const { groupId, memberId } = this + const { groupId, memberId, groupInstanceId } = this if (memberId) { - await this.coordinator.leaveGroup({ groupId, memberId }) + await this.coordinator.leaveGroup({ groupId, memberId, groupInstanceId }) this.memberId = null } } @@ -199,6 +205,7 @@ module.exports = class ConsumerGroup { groupId, generationId, memberId, + groupInstanceId, members, groupProtocol, topics, @@ -234,6 +241,7 @@ module.exports = class ConsumerGroup { groupId, generationId, memberId, + groupInstanceId, groupAssignment: assignment, }) diff --git a/src/consumer/index.js b/src/consumer/index.js index 0489dcf1d..4ace77641 100644 --- a/src/consumer/index.js +++ b/src/consumer/index.js @@ -28,6 +28,7 @@ const specialOffsets = [ * @param {Object} params * @param {import("../../types").Cluster} params.cluster * @param {String} params.groupId + * @param {String} [params.groupInstanceId=null] * @param {import('../../types').RetryOptions} [params.retry] * @param {import('../../types').Logger} params.logger * @param {import('../../types').PartitionAssigner[]} [params.partitionAssigners] @@ -48,6 +49,7 @@ const specialOffsets = [ module.exports = ({ cluster, groupId, + groupInstanceId = null, retry, logger: rootLogger, partitionAssigners = [roundRobin], @@ -210,6 +212,7 @@ module.exports = ({ retry, cluster, groupId, + groupInstanceId, assigners, sessionTimeout, rebalanceTimeout, diff --git a/src/consumer/offsetManager/index.js b/src/consumer/offsetManager/index.js index 9077a9be9..d1364e754 100644 --- a/src/consumer/offsetManager/index.js +++ b/src/consumer/offsetManager/index.js @@ -25,6 +25,7 @@ module.exports = class OffsetManager { * @param {string} options.groupId * @param {number} options.generationId * @param {string} options.memberId + * @param {string} options.groupInstanceId */ constructor({ cluster, @@ -38,6 +39,7 @@ module.exports = class OffsetManager { groupId, generationId, memberId, + groupInstanceId, }) { this.cluster = cluster this.coordinator = coordinator @@ -54,6 +56,7 @@ module.exports = class OffsetManager { this.groupId = groupId this.generationId = generationId this.memberId = memberId + this.groupInstanceId = groupInstanceId this.autoCommit = autoCommit this.autoCommitInterval = autoCommitInterval @@ -244,7 +247,7 @@ module.exports = class OffsetManager { } async commitOffsets(offsets = {}) { - const { groupId, generationId, memberId } = this + const { groupId, generationId, memberId, groupInstanceId } = this const { topics = this.uncommittedOffsets().topics } = offsets if (topics.length === 0) { @@ -256,6 +259,7 @@ module.exports = class OffsetManager { groupId, memberId, groupGenerationId: generationId, + groupInstanceId, topics, } diff --git a/src/index.js b/src/index.js index d6369d7eb..6c9117a63 100644 --- a/src/index.js +++ b/src/index.js @@ -137,6 +137,7 @@ module.exports = class Client { */ consumer({ groupId, + groupInstanceId, partitionAssigners, metadataMaxAge = DEFAULT_METADATA_MAX_AGE, sessionTimeout, @@ -170,6 +171,7 @@ module.exports = class Client { logger: this[PRIVATE.LOGGER], cluster, groupId, + groupInstanceId, partitionAssigners, sessionTimeout, rebalanceTimeout, diff --git a/src/protocol/requests/describeGroups/fixtures/v3_request.json b/src/protocol/requests/describeGroups/fixtures/v3_request.json new file mode 100644 index 000000000..60e5345d6 --- /dev/null +++ b/src/protocol/requests/describeGroups/fixtures/v3_request.json @@ -0,0 +1,10 @@ +{"type":"Buffer","data":[ + 0, 0, 0, 1, 0, 81, 99, 111, 110, 115, 117, 109, + 101, 114, 45, 103, 114, 111, 117, 112, 45, 105, 100, 45, + 52, 100, 101, 48, 97, 97, 49, 48, 101, 102, 57, 52, + 52, 48, 51, 97, 51, 57, 55, 100, 45, 53, 51, 51, + 56, 52, 45, 100, 50, 102, 101, 101, 57, 54, 57, 45, + 49, 52, 52, 54, 45, 52, 49, 54, 54, 45, 98, 99, + 56, 101, 45, 99, 56, 56, 101, 56, 100, 97, 102, 102, + 100, 102, 101, 1 +]} diff --git a/src/protocol/requests/describeGroups/fixtures/v3_response.json b/src/protocol/requests/describeGroups/fixtures/v3_response.json new file mode 100644 index 000000000..34dea7993 --- /dev/null +++ b/src/protocol/requests/describeGroups/fixtures/v3_response.json @@ -0,0 +1 @@ +{"type":"Buffer","data": [0,0,0,12,0,0,0,1,0,0,0,81,99,111,110,115,117,109,101,114,45,103,114,111,117,112,45,105,100,45,52,100,101,48,97,97,49,48,101,102,57,52,52,48,51,97,51,57,55,100,45,53,51,51,56,52,45,100,50,102,101,101,57,54,57,45,49,52,52,54,45,52,49,54,54,45,98,99,56,101,45,99,56,56,101,56,100,97,102,102,100,102,101,0,6,83,116,97,98,108,101,0,13,112,114,111,116,111,99,111,108,32,116,121,112,101,0,18,82,111,117,110,100,82,111,98,105,110,65,115,115,105,103,110,101,114,0,0,0,1,0,8,109,101,109,98,101,114,45,49,0,8,99,108,105,101,110,116,45,49,0,13,99,108,105,101,110,116,45,104,111,115,116,45,49,255,255,255,255,255,255,255,255,0,0,0,92]} diff --git a/src/protocol/requests/describeGroups/fixtures/v4_response.json b/src/protocol/requests/describeGroups/fixtures/v4_response.json new file mode 100644 index 000000000..2334b32c3 --- /dev/null +++ b/src/protocol/requests/describeGroups/fixtures/v4_response.json @@ -0,0 +1 @@ +{"type":"Buffer","data":[0,0,0,12,0,0,0,1,0,0,0,81,99,111,110,115,117,109,101,114,45,103,114,111,117,112,45,105,100,45,52,100,101,48,97,97,49,48,101,102,57,52,52,48,51,97,51,57,55,100,45,53,51,51,56,52,45,100,50,102,101,101,57,54,57,45,49,52,52,54,45,52,49,54,54,45,98,99,56,101,45,99,56,56,101,56,100,97,102,102,100,102,101,0,6,83,116,97,98,108,101,0,13,112,114,111,116,111,99,111,108,32,116,121,112,101,0,18,82,111,117,110,100,82,111,98,105,110,65,115,115,105,103,110,101,114,0,0,0,1,0,8,109,101,109,98,101,114,45,49,0,19,103,114,111,117,112,45,105,110,115,116,97,110,99,101,45,105,100,45,49,0,8,99,108,105,101,110,116,45,49,0,13,99,108,105,101,110,116,45,104,111,115,116,45,49,255,255,255,255,255,255,255,255,0,0,0,92,0,0,0,0] } diff --git a/src/protocol/requests/describeGroups/fixtures/v5_request.json b/src/protocol/requests/describeGroups/fixtures/v5_request.json new file mode 100644 index 000000000..72c3df477 --- /dev/null +++ b/src/protocol/requests/describeGroups/fixtures/v5_request.json @@ -0,0 +1,9 @@ +{"type":"Buffer","data": [ + 2, 82, 99, 111, 110, 115, 117, 109, 101, 114, 45, 103, + 114, 111, 117, 112, 45, 105, 100, 45, 52, 100, 101, 48, + 97, 97, 49, 48, 101, 102, 57, 52, 52, 48, 51, 97, + 51, 57, 55, 100, 45, 53, 51, 51, 56, 52, 45, 100, + 50, 102, 101, 101, 57, 54, 57, 45, 49, 52, 52, 54, + 45, 52, 49, 54, 54, 45, 98, 99, 56, 101, 45, 99, + 56, 56, 101, 56, 100, 97, 102, 102, 100, 102, 101, 1 +]} diff --git a/src/protocol/requests/describeGroups/index.js b/src/protocol/requests/describeGroups/index.js index a139d7131..c48256a8a 100644 --- a/src/protocol/requests/describeGroups/index.js +++ b/src/protocol/requests/describeGroups/index.js @@ -14,6 +14,16 @@ const versions = { const response = require('./v2/response') return { request: request({ groupIds }), response } }, + 3: ({ groupIds }) => { + const request = require('./v3/request') + const response = require('./v3/response') + return { request: request({ groupIds }), response } + }, + 4: ({ groupIds }) => { + const request = require('./v4/request') + const response = require('./v4/response') + return { request: request({ groupIds }), response } + }, } module.exports = { diff --git a/src/protocol/requests/describeGroups/v3/request.js b/src/protocol/requests/describeGroups/v3/request.js new file mode 100644 index 000000000..7cb654abe --- /dev/null +++ b/src/protocol/requests/describeGroups/v3/request.js @@ -0,0 +1,17 @@ +const Encoder = require('../../../encoder') +const { DescribeGroups: apiKey } = require('../../apiKeys') + +/** + * DescribeGroups Request (Version: 3) => [group_ids] include_authorized_operations + * group_ids => STRING + * include_authorized_operations => BOOLEAN + */ + +module.exports = ({ groupIds, includeAuthorizedOperations = true }) => ({ + apiKey, + apiVersion: 3, + apiName: 'DescribeGroups', + encode: async () => { + return new Encoder().writeArray(groupIds).writeBoolean(includeAuthorizedOperations) + }, +}) diff --git a/src/protocol/requests/describeGroups/v3/request.spec.js b/src/protocol/requests/describeGroups/v3/request.spec.js new file mode 100644 index 000000000..afeb3bf79 --- /dev/null +++ b/src/protocol/requests/describeGroups/v3/request.spec.js @@ -0,0 +1,14 @@ +const RequestV2Protocol = require('./request') + +describe('Protocol > Requests > DescribeGroups > v3', () => { + test('request', async () => { + const { buffer } = await RequestV2Protocol({ + groupIds: [ + 'consumer-group-id-4de0aa10ef94403a397d-53384-d2fee969-1446-4166-bc8e-c88e8daffdfe', + ], + includeAuthorizedOperations: true, + }).encode() + + expect(buffer).toEqual(Buffer.from(require('../fixtures/v3_request.json'))) + }) +}) diff --git a/src/protocol/requests/describeGroups/v3/response.js b/src/protocol/requests/describeGroups/v3/response.js new file mode 100644 index 000000000..ea0846b7e --- /dev/null +++ b/src/protocol/requests/describeGroups/v3/response.js @@ -0,0 +1,57 @@ +const Decoder = require('../../../decoder') +const { parse } = require('../v1/response') + +/** + * Starting in version 3 : new: authorized_operations for group + * @see https://kafka.apache.org/protocol.html#The_Messages_DescribeGroups + * + * DescribeGroups Response (Version: 3) => throttle_time_ms [groups] + * throttle_time_ms => INT32 + * groups => error_code group_id state protocol_type protocol [members] authorized_operations + * error_code => INT16 + * group_id => STRING + * state => STRING + * protocol_type => STRING + * protocol => STRING + * members => member_id client_id client_host member_metadata member_assignment + * member_id => STRING + * client_id => STRING + * client_host => STRING + * member_metadata => BYTES + * member_assignment => BYTES + * authorized_operations => INT32 // new + */ + +const decoderMember = decoder => ({ + memberId: decoder.readString(), + clientId: decoder.readString(), + clientHost: decoder.readString(), + memberMetadata: decoder.readBytes(), + memberAssignment: decoder.readBytes(), +}) + +const decodeGroup = decoder => ({ + errorCode: decoder.readInt16(), + groupId: decoder.readString(), + state: decoder.readString(), + protocolType: decoder.readString(), + protocol: decoder.readString(), + members: decoder.readArray(decoderMember), + authorizedOperations: decoder.readInt32(), +}) + +const decode = async rawData => { + const decoder = new Decoder(rawData) + const throttleTime = decoder.readInt32() + const groups = decoder.readArray(decodeGroup) + + return { + throttleTime, + groups, + } +} + +module.exports = { + decode, + parse, +} diff --git a/src/protocol/requests/describeGroups/v3/response.spec.js b/src/protocol/requests/describeGroups/v3/response.spec.js new file mode 100644 index 000000000..6a8d98d6a --- /dev/null +++ b/src/protocol/requests/describeGroups/v3/response.spec.js @@ -0,0 +1,32 @@ +const { decode, parse } = require('./response') + +describe('Protocol > Requests > DescribeGroups > v3', () => { + test('response', async () => { + const data = await decode(Buffer.from(require('../fixtures/v3_response.json'))) + expect(data).toEqual({ + throttleTime: 12, + groups: [ + { + errorCode: 0, + groupId: + 'consumer-group-id-4de0aa10ef94403a397d-53384-d2fee969-1446-4166-bc8e-c88e8daffdfe', + state: 'Stable', + protocolType: 'protocol type', + protocol: 'RoundRobinAssigner', + members: [ + { + memberId: 'member-1', + clientId: 'client-1', + clientHost: 'client-host-1', + memberMetadata: null, + memberAssignment: null, + }, + ], + authorizedOperations: 92, + }, + ], + }) + + await expect(parse(data)).resolves.toBeTruthy() + }) +}) diff --git a/src/protocol/requests/describeGroups/v4/request.js b/src/protocol/requests/describeGroups/v4/request.js new file mode 100644 index 000000000..8c6315e7e --- /dev/null +++ b/src/protocol/requests/describeGroups/v4/request.js @@ -0,0 +1,10 @@ +const requestV3 = require('../v3/request') + +/** + * DescribeGroups Request (Version: 4) => [group_ids] include_authorized_operations + * group_ids => STRING + * include_authorized_operations => BOOLEAN + */ + +module.exports = ({ groupIds, includeAuthorizedOperations = true }) => + Object.assign(requestV3({ groupIds, includeAuthorizedOperations }), { apiVersion: 4 }) diff --git a/src/protocol/requests/describeGroups/v4/request.spec.js b/src/protocol/requests/describeGroups/v4/request.spec.js new file mode 100644 index 000000000..96bce0e4a --- /dev/null +++ b/src/protocol/requests/describeGroups/v4/request.spec.js @@ -0,0 +1,14 @@ +const RequestV4Protocol = require('./request') + +describe('Protocol > Requests > DescribeGroups > v4', () => { + test('request', async () => { + const { buffer } = await RequestV4Protocol({ + groupIds: [ + 'consumer-group-id-4de0aa10ef94403a397d-53384-d2fee969-1446-4166-bc8e-c88e8daffdfe', + ], + includeAuthorizedOperations: true, + }).encode() + + expect(buffer).toEqual(Buffer.from(require('../fixtures/v3_request.json'))) + }) +}) diff --git a/src/protocol/requests/describeGroups/v4/response.js b/src/protocol/requests/describeGroups/v4/response.js new file mode 100644 index 000000000..97fdc1cb9 --- /dev/null +++ b/src/protocol/requests/describeGroups/v4/response.js @@ -0,0 +1,59 @@ +const Decoder = require('../../../decoder') +const { parse } = require('../v1/response') + +/** + * Starting in version 4 : new: member group instance id + * @see https://kafka.apache.org/protocol.html#The_Messages_DescribeGroups + * + * DescribeGroups Response (Version: 4) => throttle_time_ms [groups] + * throttle_time_ms => INT32 + * groups => error_code group_id state protocol_type protocol [members] authorized_operations + * error_code => INT16 + * group_id => STRING + * state => STRING + * protocol_type => STRING + * protocol => STRING + * members => member_id group_instance_id client_id client_host member_metadata member_assignment + * member_id => STRING + * group_instance_id => NULLABLE_STRING // new + * client_id => STRING + * client_host => STRING + * member_metadata => BYTES + * member_assignment => BYTES + * authorized_operations => INT32 + */ + +const decoderMember = decoder => ({ + memberId: decoder.readString(), + groupInstanceId: decoder.readString(), + clientId: decoder.readString(), + clientHost: decoder.readString(), + memberMetadata: decoder.readBytes(), + memberAssignment: decoder.readBytes(), +}) + +const decodeGroup = decoder => ({ + errorCode: decoder.readInt16(), + groupId: decoder.readString(), + state: decoder.readString(), + protocolType: decoder.readString(), + protocol: decoder.readString(), + members: decoder.readArray(decoderMember), + authorizedOperations: decoder.readInt32(), +}) + +const decode = async rawData => { + const decoder = new Decoder(rawData) + const throttleTime = decoder.readInt32() + const groups = decoder.readArray(decodeGroup) + + return { + throttleTime, + groups, + } +} + +module.exports = { + decode, + parse, +} diff --git a/src/protocol/requests/describeGroups/v4/response.spec.js b/src/protocol/requests/describeGroups/v4/response.spec.js new file mode 100644 index 000000000..71d59de69 --- /dev/null +++ b/src/protocol/requests/describeGroups/v4/response.spec.js @@ -0,0 +1,35 @@ +/* eslint-disable prettier/prettier */ +const { decode, parse } = require('./response') + +describe('Protocol > Requests > DescribeGroups > v4', () => { + test('response', async () => { + const data = await decode(Buffer.from(require('../fixtures/v4_response.json'))) + + expect(data).toEqual({ + throttleTime: 12, + groups: [ + { + errorCode: 0, + groupId: + 'consumer-group-id-4de0aa10ef94403a397d-53384-d2fee969-1446-4166-bc8e-c88e8daffdfe', + state: 'Stable', + protocolType: 'protocol type', + protocol: 'RoundRobinAssigner', + members: [ + { + memberId: 'member-1', + groupInstanceId: 'group-instance-id-1', + clientId: 'client-1', + clientHost: 'client-host-1', + memberMetadata: null, + memberAssignment: null, + }, + ], + authorizedOperations: 92, + }, + ], + }) + + await expect(parse(data)).resolves.toBeTruthy() + }) +}) diff --git a/src/protocol/requests/offsetCommit/fixtures/v6_request.json b/src/protocol/requests/offsetCommit/fixtures/v6_request.json new file mode 100644 index 000000000..c8871ee48 --- /dev/null +++ b/src/protocol/requests/offsetCommit/fixtures/v6_request.json @@ -0,0 +1 @@ +{"type":"Buffer","data":[0,80,99,111,110,115,117,109,101,114,45,103,114,111,117,112,45,105,100,45,99,97,50,56,48,54,55,52,51,57,100,54,49,57,52,97,57,54,50,53,45,57,57,56,53,45,99,98,98,56,49,97,57,55,45,53,49,53,49,45,52,54,53,56,45,97,48,53,53,45,99,52,55,57,49,52,55,98,49,48,55,100,0,0,0,1,0,104,116,101,115,116,45,102,53,101,51,53,57,102,102,97,55,98,50,53,55,56,97,99,97,52,98,45,57,57,56,53,45,54,48,100,99,100,48,100,97,45,49,49,51,48,45,52,101,97,97,45,57,57,97,97,45,57,98,100,56,48,102,51,57,101,99,101,98,45,52,52,50,54,99,101,49,57,45,56,49,52,57,45,52,100,54,52,45,98,56,99,51,45,56,52,49,100,51,98,98,55,99,97,50,54,0,0,0,1,0,73,116,101,115,116,45,116,111,112,105,99,45,53,99,50,52,101,102,101,48,97,99,52,49,98,57,49,98,101,101,56,53,45,57,57,56,53,45,56,52,49,100,54,49,52,53,45,99,56,57,55,45,52,52,55,49,45,98,100,48,57,45,97,99,100,56,98,52,99,57,48,53,102,50,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,200,255,255]} diff --git a/src/protocol/requests/offsetCommit/fixtures/v6_request_metadata.json b/src/protocol/requests/offsetCommit/fixtures/v6_request_metadata.json new file mode 100644 index 000000000..2a346b1d7 --- /dev/null +++ b/src/protocol/requests/offsetCommit/fixtures/v6_request_metadata.json @@ -0,0 +1 @@ +{"type":"Buffer","data":[0,80,99,111,110,115,117,109,101,114,45,103,114,111,117,112,45,105,100,45,99,97,50,56,48,54,55,52,51,57,100,54,49,57,52,97,57,54,50,53,45,57,57,56,53,45,99,98,98,56,49,97,57,55,45,53,49,53,49,45,52,54,53,56,45,97,48,53,53,45,99,52,55,57,49,52,55,98,49,48,55,100,0,0,0,1,0,104,116,101,115,116,45,102,53,101,51,53,57,102,102,97,55,98,50,53,55,56,97,99,97,52,98,45,57,57,56,53,45,54,48,100,99,100,48,100,97,45,49,49,51,48,45,52,101,97,97,45,57,57,97,97,45,57,98,100,56,48,102,51,57,101,99,101,98,45,52,52,50,54,99,101,49,57,45,56,49,52,57,45,52,100,54,52,45,98,56,99,51,45,56,52,49,100,51,98,98,55,99,97,50,54,0,0,0,1,0,73,116,101,115,116,45,116,111,112,105,99,45,53,99,50,52,101,102,101,48,97,99,52,49,98,57,49,98,101,101,56,53,45,57,57,56,53,45,56,52,49,100,54,49,52,53,45,99,56,57,55,45,52,52,55,49,45,98,100,48,57,45,97,99,100,56,98,52,99,57,48,53,102,50,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,200,0,4,116,101,115,116]} diff --git a/src/protocol/requests/offsetCommit/fixtures/v7_request.json b/src/protocol/requests/offsetCommit/fixtures/v7_request.json new file mode 100644 index 000000000..f30ea26cf --- /dev/null +++ b/src/protocol/requests/offsetCommit/fixtures/v7_request.json @@ -0,0 +1 @@ +{"type":"Buffer","data":[0,80,99,111,110,115,117,109,101,114,45,103,114,111,117,112,45,105,100,45,99,97,50,56,48,54,55,52,51,57,100,54,49,57,52,97,57,54,50,53,45,57,57,56,53,45,99,98,98,56,49,97,57,55,45,53,49,53,49,45,52,54,53,56,45,97,48,53,53,45,99,52,55,57,49,52,55,98,49,48,55,100,0,0,0,1,0,104,116,101,115,116,45,102,53,101,51,53,57,102,102,97,55,98,50,53,55,56,97,99,97,52,98,45,57,57,56,53,45,54,48,100,99,100,48,100,97,45,49,49,51,48,45,52,101,97,97,45,57,57,97,97,45,57,98,100,56,48,102,51,57,101,99,101,98,45,52,52,50,54,99,101,49,57,45,56,49,52,57,45,52,100,54,52,45,98,56,99,51,45,56,52,49,100,51,98,98,55,99,97,50,54,0,118,103,114,112,117,112,45,105,110,115,116,97,110,99,101,45,105,100,45,45,102,53,101,51,53,57,102,102,97,55,98,50,53,55,56,97,99,97,52,98,45,57,57,56,53,45,54,48,100,99,100,48,100,97,45,49,49,51,48,45,52,101,97,97,45,57,57,97,97,45,57,98,100,56,48,102,51,57,101,99,101,98,45,52,52,50,54,99,101,49,57,45,56,49,52,57,45,52,100,54,52,45,98,56,99,51,45,56,52,49,100,51,98,98,55,99,97,50,54,0,0,0,1,0,73,116,101,115,116,45,116,111,112,105,99,45,53,99,50,52,101,102,101,48,97,99,52,49,98,57,49,98,101,101,56,53,45,57,57,56,53,45,56,52,49,100,54,49,52,53,45,99,56,57,55,45,52,52,55,49,45,98,100,48,57,45,97,99,100,56,98,52,99,57,48,53,102,50,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,200,255,255]} diff --git a/src/protocol/requests/offsetCommit/fixtures/v7_request_metadata.json b/src/protocol/requests/offsetCommit/fixtures/v7_request_metadata.json new file mode 100644 index 000000000..a73ac3213 --- /dev/null +++ b/src/protocol/requests/offsetCommit/fixtures/v7_request_metadata.json @@ -0,0 +1 @@ +{"type":"Buffer","data":[0,80,99,111,110,115,117,109,101,114,45,103,114,111,117,112,45,105,100,45,99,97,50,56,48,54,55,52,51,57,100,54,49,57,52,97,57,54,50,53,45,57,57,56,53,45,99,98,98,56,49,97,57,55,45,53,49,53,49,45,52,54,53,56,45,97,48,53,53,45,99,52,55,57,49,52,55,98,49,48,55,100,0,0,0,1,0,104,116,101,115,116,45,102,53,101,51,53,57,102,102,97,55,98,50,53,55,56,97,99,97,52,98,45,57,57,56,53,45,54,48,100,99,100,48,100,97,45,49,49,51,48,45,52,101,97,97,45,57,57,97,97,45,57,98,100,56,48,102,51,57,101,99,101,98,45,52,52,50,54,99,101,49,57,45,56,49,52,57,45,52,100,54,52,45,98,56,99,51,45,56,52,49,100,51,98,98,55,99,97,50,54,0,118,103,114,112,117,112,45,105,110,115,116,97,110,99,101,45,105,100,45,45,102,53,101,51,53,57,102,102,97,55,98,50,53,55,56,97,99,97,52,98,45,57,57,56,53,45,54,48,100,99,100,48,100,97,45,49,49,51,48,45,52,101,97,97,45,57,57,97,97,45,57,98,100,56,48,102,51,57,101,99,101,98,45,52,52,50,54,99,101,49,57,45,56,49,52,57,45,52,100,54,52,45,98,56,99,51,45,56,52,49,100,51,98,98,55,99,97,50,54,0,0,0,1,0,73,116,101,115,116,45,116,111,112,105,99,45,53,99,50,52,101,102,101,48,97,99,52,49,98,57,49,98,101,101,56,53,45,57,57,56,53,45,56,52,49,100,54,49,52,53,45,99,56,57,55,45,52,52,55,49,45,98,100,48,57,45,97,99,100,56,98,52,99,57,48,53,102,50,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,200,0,4,116,101,115,116]} diff --git a/src/protocol/requests/offsetCommit/index.js b/src/protocol/requests/offsetCommit/index.js index ee9a22e91..7ef3f81cb 100644 --- a/src/protocol/requests/offsetCommit/index.js +++ b/src/protocol/requests/offsetCommit/index.js @@ -67,6 +67,33 @@ const versions = { response, } }, + 6: ({ groupId, groupGenerationId, memberId, topics }) => { + const request = require('./v6/request') + const response = require('./v6/response') + return { + request: request({ + groupId, + groupGenerationId, + memberId, + topics, + }), + response, + } + }, + 7: ({ groupId, groupGenerationId, memberId, groupInstanceId, topics }) => { + const request = require('./v7/request') + const response = require('./v7/response') + return { + request: request({ + groupId, + groupGenerationId, + memberId, + groupInstanceId, + topics, + }), + response, + } + }, } module.exports = { diff --git a/src/protocol/requests/offsetCommit/v6/request.js b/src/protocol/requests/offsetCommit/v6/request.js new file mode 100644 index 000000000..0fcbac1c0 --- /dev/null +++ b/src/protocol/requests/offsetCommit/v6/request.js @@ -0,0 +1,43 @@ +const Encoder = require('../../../encoder') +const { OffsetCommit: apiKey } = require('../../apiKeys') + +/** + * Version 6 group_instance_id added + * + * OffsetCommit Request (Version: 6) => group_id generation_id member_id [topics] + * group_id => STRING + * generation_id => INT32 + * member_id => STRING + * topics => topic [partitions] + * topic => STRING + * partitions => partition offset metadata + * partition => INT32 + * offset => INT64 + * committed_leader_epoch => INT32 // new + * metadata => NULLABLE_STRING + */ + +module.exports = ({ groupId, groupGenerationId, memberId, topics }) => ({ + apiKey, + apiVersion: 6, + apiName: 'OffsetCommit', + encode: async () => { + return new Encoder() + .writeString(groupId) + .writeInt32(groupGenerationId) + .writeString(memberId) + .writeArray(topics.map(encodeTopic)) + }, +}) + +const encodeTopic = ({ topic, partitions }) => { + return new Encoder().writeString(topic).writeArray(partitions.map(encodePartition)) +} + +const encodePartition = ({ partition, offset, committedLeaderEpoch, metadata = null }) => { + return new Encoder() + .writeInt32(partition) + .writeInt64(offset) + .writeInt32(committedLeaderEpoch) + .writeString(metadata) +} diff --git a/src/protocol/requests/offsetCommit/v6/request.spec.js b/src/protocol/requests/offsetCommit/v6/request.spec.js new file mode 100644 index 000000000..558fef5c9 --- /dev/null +++ b/src/protocol/requests/offsetCommit/v6/request.spec.js @@ -0,0 +1,37 @@ +const RequestV6Protocol = require('./request') + +describe('Protocol > Requests > OffsetCommit > v6', () => { + test('request', async () => { + const { buffer } = await RequestV6Protocol({ + groupId: 'consumer-group-id-ca28067439d6194a9625-9985-cbb81a97-5151-4658-a055-c479147b107d', + groupGenerationId: 1, + memberId: + 'test-f5e359ffa7b2578aca4b-9985-60dcd0da-1130-4eaa-99aa-9bd80f39eceb-4426ce19-8149-4d64-b8c3-841d3bb7ca26', + topics: [ + { + topic: 'test-topic-5c24efe0ac41b91bee85-9985-841d6145-c897-4471-bd09-acd8b4c905f2', + partitions: [{ partition: 0, offset: '0', committedLeaderEpoch: 456, metadata: null }], + }, + ], + }).encode() + + expect(buffer).toEqual(Buffer.from(require('../fixtures/v6_request.json'))) + }) + + test('request with metadata', async () => { + const { buffer } = await RequestV6Protocol({ + groupId: 'consumer-group-id-ca28067439d6194a9625-9985-cbb81a97-5151-4658-a055-c479147b107d', + groupGenerationId: 1, + memberId: + 'test-f5e359ffa7b2578aca4b-9985-60dcd0da-1130-4eaa-99aa-9bd80f39eceb-4426ce19-8149-4d64-b8c3-841d3bb7ca26', + topics: [ + { + topic: 'test-topic-5c24efe0ac41b91bee85-9985-841d6145-c897-4471-bd09-acd8b4c905f2', + partitions: [{ partition: 0, offset: '0', committedLeaderEpoch: 456, metadata: 'test' }], + }, + ], + }).encode() + + expect(buffer).toEqual(Buffer.from(require('../fixtures/v6_request_metadata.json'))) + }) +}) diff --git a/src/protocol/requests/offsetCommit/v6/response.js b/src/protocol/requests/offsetCommit/v6/response.js new file mode 100644 index 000000000..ac5e1421d --- /dev/null +++ b/src/protocol/requests/offsetCommit/v6/response.js @@ -0,0 +1,15 @@ +const { parse, decode } = require('../v5/response') + +/** + * OffsetCommit Response (Version: 6) => throttle_time_ms [responses] + * throttle_time_ms => INT32 + * responses => topic [partition_responses] + * topic => STRING + * partition_responses => partition error_code + * partition => INT32 + * error_code => INT16 + */ +module.exports = { + decode, + parse, +} diff --git a/src/protocol/requests/offsetCommit/v6/response.spec.js b/src/protocol/requests/offsetCommit/v6/response.spec.js new file mode 100644 index 000000000..5fecf463a --- /dev/null +++ b/src/protocol/requests/offsetCommit/v6/response.spec.js @@ -0,0 +1,19 @@ +const { decode, parse } = require('./response') + +describe('Protocol > Requests > OffsetCommit > v6', () => { + test('response', async () => { + const data = await decode(Buffer.from(require('../fixtures/v5_response.json'))) + expect(data).toEqual({ + throttleTime: 0, + clientSideThrottleTime: 0, + responses: [ + { + topic: 'test-topic-5c24efe0ac41b91bee85-9985-841d6145-c897-4471-bd09-acd8b4c905f2', + partitions: [{ partition: 0, errorCode: 0 }], + }, + ], + }) + + await expect(parse(data)).resolves.toBeTruthy() + }) +}) diff --git a/src/protocol/requests/offsetCommit/v7/request.js b/src/protocol/requests/offsetCommit/v7/request.js new file mode 100644 index 000000000..605e6aa05 --- /dev/null +++ b/src/protocol/requests/offsetCommit/v7/request.js @@ -0,0 +1,45 @@ +const Encoder = require('../../../encoder') +const { OffsetCommit: apiKey } = require('../../apiKeys') + +/** + * Version 7 group_instance_id added + * + * OffsetCommit Request (Version: 7) => group_id generation_id member_id [topics] + * group_id => STRING + * generation_id => INT32 + * member_id => STRING + * group_instance_id => NULLABLE_STRING //new + * topics => topic [partitions] + * topic => STRING + * partitions => partition offset metadata + * partition => INT32 + * offset => INT64 + * committed_leader_epoch => INT32 + * metadata => NULLABLE_STRING + */ + +module.exports = ({ groupId, groupGenerationId, groupInstanceId = null, memberId, topics }) => ({ + apiKey, + apiVersion: 7, + apiName: 'OffsetCommit', + encode: async () => { + return new Encoder() + .writeString(groupId) + .writeInt32(groupGenerationId) + .writeString(memberId) + .writeString(groupInstanceId) + .writeArray(topics.map(encodeTopic)) + }, +}) + +const encodeTopic = ({ topic, partitions }) => { + return new Encoder().writeString(topic).writeArray(partitions.map(encodePartition)) +} + +const encodePartition = ({ partition, offset, committedLeaderEpoch, metadata = null }) => { + return new Encoder() + .writeInt32(partition) + .writeInt64(offset) + .writeInt32(committedLeaderEpoch) + .writeString(metadata) +} diff --git a/src/protocol/requests/offsetCommit/v7/request.spec.js b/src/protocol/requests/offsetCommit/v7/request.spec.js new file mode 100644 index 000000000..7a9f07185 --- /dev/null +++ b/src/protocol/requests/offsetCommit/v7/request.spec.js @@ -0,0 +1,41 @@ +const RequestV7Protocol = require('./request') + +describe('Protocol > Requests > OffsetCommit > v7', () => { + test('request', async () => { + const { buffer } = await RequestV7Protocol({ + groupId: 'consumer-group-id-ca28067439d6194a9625-9985-cbb81a97-5151-4658-a055-c479147b107d', + groupGenerationId: 1, + memberId: + 'test-f5e359ffa7b2578aca4b-9985-60dcd0da-1130-4eaa-99aa-9bd80f39eceb-4426ce19-8149-4d64-b8c3-841d3bb7ca26', + groupInstanceId: + 'grpup-instance-id--f5e359ffa7b2578aca4b-9985-60dcd0da-1130-4eaa-99aa-9bd80f39eceb-4426ce19-8149-4d64-b8c3-841d3bb7ca26', + topics: [ + { + topic: 'test-topic-5c24efe0ac41b91bee85-9985-841d6145-c897-4471-bd09-acd8b4c905f2', + partitions: [{ partition: 0, offset: '0', committedLeaderEpoch: 456, metadata: null }], + }, + ], + }).encode() + + expect(buffer).toEqual(Buffer.from(require('../fixtures/v7_request.json'))) + }) + + test('request with metadata', async () => { + const { buffer } = await RequestV7Protocol({ + groupId: 'consumer-group-id-ca28067439d6194a9625-9985-cbb81a97-5151-4658-a055-c479147b107d', + groupGenerationId: 1, + memberId: + 'test-f5e359ffa7b2578aca4b-9985-60dcd0da-1130-4eaa-99aa-9bd80f39eceb-4426ce19-8149-4d64-b8c3-841d3bb7ca26', + groupInstanceId: + 'grpup-instance-id--f5e359ffa7b2578aca4b-9985-60dcd0da-1130-4eaa-99aa-9bd80f39eceb-4426ce19-8149-4d64-b8c3-841d3bb7ca26', + topics: [ + { + topic: 'test-topic-5c24efe0ac41b91bee85-9985-841d6145-c897-4471-bd09-acd8b4c905f2', + partitions: [{ partition: 0, offset: '0', committedLeaderEpoch: 456, metadata: 'test' }], + }, + ], + }).encode() + + expect(buffer).toEqual(Buffer.from(require('../fixtures/v7_request_metadata.json'))) + }) +}) diff --git a/src/protocol/requests/offsetCommit/v7/response.js b/src/protocol/requests/offsetCommit/v7/response.js new file mode 100644 index 000000000..38842f4a6 --- /dev/null +++ b/src/protocol/requests/offsetCommit/v7/response.js @@ -0,0 +1,15 @@ +const { parse, decode } = require('../v6/response') + +/** + * OffsetCommit Response (Version: 7) => throttle_time_ms [responses] + * throttle_time_ms => INT32 + * responses => topic [partition_responses] + * topic => STRING + * partition_responses => partition error_code + * partition => INT32 + * error_code => INT16 + */ +module.exports = { + decode, + parse, +} diff --git a/src/protocol/requests/offsetCommit/v7/response.spec.js b/src/protocol/requests/offsetCommit/v7/response.spec.js new file mode 100644 index 000000000..341430fde --- /dev/null +++ b/src/protocol/requests/offsetCommit/v7/response.spec.js @@ -0,0 +1,19 @@ +const { decode, parse } = require('./response') + +describe('Protocol > Requests > OffsetCommit > v7', () => { + test('response', async () => { + const data = await decode(Buffer.from(require('../fixtures/v5_response.json'))) + expect(data).toEqual({ + throttleTime: 0, + clientSideThrottleTime: 0, + responses: [ + { + topic: 'test-topic-5c24efe0ac41b91bee85-9985-841d6145-c897-4471-bd09-acd8b4c905f2', + partitions: [{ partition: 0, errorCode: 0 }], + }, + ], + }) + + await expect(parse(data)).resolves.toBeTruthy() + }) +}) diff --git a/types/index.d.ts b/types/index.d.ts index 26bcbfea3..541b5e4ab 100644 --- a/types/index.d.ts +++ b/types/index.d.ts @@ -151,6 +151,7 @@ export interface IHeaders { export interface ConsumerConfig { groupId: string + groupInstanceId?: string partitionAssigners?: PartitionAssigner[] metadataMaxAge?: number sessionTimeout?: number @@ -661,6 +662,7 @@ export type Broker = { groupId: string groupGenerationId: number memberId: string + groupInstanceId?: string retentionTime?: number topics: TopicOffsets[] }): Promise @@ -970,6 +972,7 @@ export type ConsumerReceivedUnsubcribedTopicsEvent = InstrumentationEvent<{ groupId: string generationId: number memberId: string + groupInstanceId?: string assignedTopics: string[] topicsSubscribed: string[] topicsNotSubscribed: string[]