diff --git a/src/admin/index.js b/src/admin/index.js index 8e98691b1..d9dc63eba 100644 --- a/src/admin/index.js +++ b/src/admin/index.js @@ -986,6 +986,46 @@ module.exports = ({ }) } + /** + * + * @param {Object} request + * @param {string} request.groupId + * @param {import("../../types").TopicPartitions[]} request.topics + * @returns {Promise} + */ + const offsetDelete = ({ groupId, topics }) => { + if (!groupId || typeof groupId !== 'string') { + throw new KafkaJSNonRetriableError(`Invalid groupId value ${groupId}`) + } + + if (!topics || !Array.isArray(topics)) { + throw new KafkaJSNonRetriableError(`Invalid partitions value ${topics}`) + } + + if (topics.length === 0) { + return + } + + const retrier = createRetry(retry) + + return retrier(async (bail, retryCount, retryTime) => { + try { + await cluster.refreshMetadataIfNecessary() + + const coordinator = await cluster.findGroupCoordinator({ groupId }) + + await coordinator.offsetDelete({ groupId, topics }) + } catch (e) { + if (e.type === 'NOT_CONTROLLER' || e.type === 'COORDINATOR_NOT_AVAILABLE') { + logger.warn('Could not delete groups', { error: e.message, retryCount, retryTime }) + throw e + } + + bail(e) + } + }) + } + /** * Delete topic records up to the selected partition offsets * @@ -1602,5 +1642,6 @@ module.exports = ({ deleteTopicRecords, alterPartitionReassignments, listPartitionReassignments, + offsetDelete, } } diff --git a/src/broker/index.js b/src/broker/index.js index bb86cc5d5..5dd2f2f54 100644 --- a/src/broker/index.js +++ b/src/broker/index.js @@ -896,6 +896,16 @@ module.exports = class Broker { return await this[PRIVATE.SEND_REQUEST](listPartitionReassignments({ topics, timeout })) } + /** + * @param {Object} request + * @param {string} request.groupId + * @param {import("../../types").TopicPartitions[]} request.topics + */ + async offsetDelete({ groupId, topics }) { + const offsetDelete = this.lookupRequest(apiKeys.OffsetDelete, requests.OffsetDelete) + return await this[PRIVATE.SEND_REQUEST](offsetDelete({ groupId, topics })) + } + /** * @private */ diff --git a/src/errors.js b/src/errors.js index 266192e19..ff16e23dd 100644 --- a/src/errors.js +++ b/src/errors.js @@ -248,6 +248,15 @@ class KafkaJSAlterPartitionReassignmentsError extends KafkaJSProtocolError { } } +class KafkaJSOffsetDeleteError extends KafkaJSProtocolError { + constructor(e, topicName, partition) { + super(e) + this.topic = topicName + this.partition = partition + this.name = 'KafkaJSOffsetDeleteError' + } +} + class KafkaJSAggregateError extends Error { constructor(message, errors) { super(message) @@ -304,6 +313,7 @@ module.exports = { KafkaJSFetcherRebalanceError, KafkaJSNoBrokerAvailableError, KafkaJSAlterPartitionReassignmentsError, + KafkaJSOffsetDeleteError, isRebalancing, isKafkaJSError, } diff --git a/src/protocol/requests/apiKeys.js b/src/protocol/requests/apiKeys.js index 442739efd..03a1d2bf5 100644 --- a/src/protocol/requests/apiKeys.js +++ b/src/protocol/requests/apiKeys.js @@ -46,4 +46,5 @@ module.exports = { IncrementalAlterConfigs: 44, AlterPartitionReassignments: 45, ListPartitionReassignments: 46, + OffsetDelete: 47, } diff --git a/src/protocol/requests/index.js b/src/protocol/requests/index.js index 96137bc28..ddbc71f87 100644 --- a/src/protocol/requests/index.js +++ b/src/protocol/requests/index.js @@ -74,6 +74,7 @@ const requests = { IncrementalAlterConfigs: noImplementedRequestDefinitions, AlterPartitionReassignments: require('./alterPartitionReassignments'), ListPartitionReassignments: require('./listPartitionReassignments'), + OffsetDelete: require('./offsetDelete'), } const names = Object.keys(apiKeys) diff --git a/src/protocol/requests/offsetDelete/fixtures/v0_request.json b/src/protocol/requests/offsetDelete/fixtures/v0_request.json new file mode 100644 index 000000000..43e5a341e --- /dev/null +++ b/src/protocol/requests/offsetDelete/fixtures/v0_request.json @@ -0,0 +1 @@ +{"type":"Buffer","data":[0,3,102,111,111,0,0,0,1,0,3,98,97,114,0,0,0,4,0,0,0,0,0,0,0,1,0,0,0,2,0,0,0,3]} diff --git a/src/protocol/requests/offsetDelete/fixtures/v0_response.json b/src/protocol/requests/offsetDelete/fixtures/v0_response.json new file mode 100644 index 000000000..440194078 --- /dev/null +++ b/src/protocol/requests/offsetDelete/fixtures/v0_response.json @@ -0,0 +1 @@ +{"type":"Buffer","data":[0,0,0,0,0,0,0,0,0,1,0,3,98,97,114,0,0,0,4,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,2,0,0,0,0,0,3,0,0]} \ No newline at end of file diff --git a/src/protocol/requests/offsetDelete/index.js b/src/protocol/requests/offsetDelete/index.js new file mode 100644 index 000000000..6c4749784 --- /dev/null +++ b/src/protocol/requests/offsetDelete/index.js @@ -0,0 +1,12 @@ +const versions = { + 0: ({ groupId, topics }) => { + const request = require('./v0/request') + const response = require('./v0/response') + return { request: request({ groupId, topics }), response } + }, +} + +module.exports = { + versions: Object.keys(versions), + protocol: ({ version }) => versions[version], +} diff --git a/src/protocol/requests/offsetDelete/v0/request.js b/src/protocol/requests/offsetDelete/v0/request.js new file mode 100644 index 000000000..8e8f8a7e1 --- /dev/null +++ b/src/protocol/requests/offsetDelete/v0/request.js @@ -0,0 +1,35 @@ +const Encoder = require('../../../encoder') +const { OffsetDelete: apiKey } = require('../../apiKeys') + +/** + * OffsetDelete Request (Version: 0) => group_id [topics] + * group_id => STRING + * topics => name [partitions] + * name => STRING + * partitions => partition_index + * partition_index => INT32 + */ + +/** + * @param {Object} request + * @param {string} request.groupId + * @param {import("../../../../../types").TopicPartitions[]} request.topics + */ +module.exports = ({ groupId, topics }) => ({ + apiKey, + apiVersion: 0, + apiName: 'OffsetDelete', + encode: async () => { + return new Encoder() + .writeString(groupId) + .writeArray(topics === null ? topics : topics.map(encodeTopics)) + }, +}) + +const encodeTopics = ({ topic, partitions }) => { + return new Encoder().writeString(topic).writeArray(partitions.map(encodePartitions)) +} + +const encodePartitions = partition => { + return new Encoder().writeInt32(partition) +} diff --git a/src/protocol/requests/offsetDelete/v0/request.spec.js b/src/protocol/requests/offsetDelete/v0/request.spec.js new file mode 100644 index 000000000..8f8402a4c --- /dev/null +++ b/src/protocol/requests/offsetDelete/v0/request.spec.js @@ -0,0 +1,21 @@ +const offsetDeleteRequest = require('./request') + +describe('Protocol > Requests > OffsetDelete > v0', () => { + test('request', async () => { + const { apiKey, apiVersion, apiName, encode } = offsetDeleteRequest({ + groupId: 'foo', + topics: [ + { + topic: 'bar', + partitions: [0, 1, 2, 3], + }, + ], + }) + const { buffer } = await encode() + + expect(apiKey).toEqual(47) + expect(apiVersion).toEqual(0) + expect(apiName).toEqual('OffsetDelete') + expect(buffer).toEqual(Buffer.from(require('../fixtures/v0_request.json'))) + }) +}) diff --git a/src/protocol/requests/offsetDelete/v0/response.js b/src/protocol/requests/offsetDelete/v0/response.js new file mode 100644 index 000000000..b3f39e6f0 --- /dev/null +++ b/src/protocol/requests/offsetDelete/v0/response.js @@ -0,0 +1,82 @@ +const { KafkaJSAggregateError, KafkaJSOffsetDeleteError } = require('../../../../errors') +const Decoder = require('../../../decoder') +const { failure, createErrorFromCode } = require('../../../error') + +/** + * OffsetDelete Response (Version: 0) => error_code throttle_time_ms [topics] + * error_code => INT16 + * throttle_time_ms => INT32 + * topics => name [partitions] + * name => STRING + * partitions => partition_index error_code + * partition_index => INT32 + * error_code => INT16 + */ + +/** + * @param {Decoder} decoder + */ +const decodePartition = decoder => { + const partition = { + partition: decoder.readInt32(), + errorCode: decoder.readInt16(), + } + + return partition +} + +/** + * @param {Decoder} decoder + */ +const decodeTopics = decoder => { + const topic = { + topic: decoder.readString(), + partitions: decoder.readArray(decodePartition), + } + + return topic +} + +const decode = async rawData => { + const decoder = new Decoder(rawData) + + const errorCode = decoder.readInt16() + const throttleTime = decoder.readInt32() + return { + errorCode, + throttleTime, + topics: decoder.readArray(decodeTopics), + } +} + +const parse = async data => { + if (failure(data.errorCode)) { + throw createErrorFromCode(data.errorCode) + } + + const topicPartitionsWithError = data.topics.flatMap(({ topic, partitions }) => + partitions + .filter(partition => failure(partition.errorCode)) + .map(partition => ({ + ...partition, + topic, + })) + ) + + if (topicPartitionsWithError.length > 0) { + throw new KafkaJSAggregateError( + 'Errors deleting offsets', + topicPartitionsWithError.map( + ({ topic, partition, errorCode }) => + new KafkaJSOffsetDeleteError(createErrorFromCode(errorCode), topic, partition) + ) + ) + } + + return data +} + +module.exports = { + decode, + parse, +} diff --git a/src/protocol/requests/offsetDelete/v0/response.spec.js b/src/protocol/requests/offsetDelete/v0/response.spec.js new file mode 100644 index 000000000..5f5ab011d --- /dev/null +++ b/src/protocol/requests/offsetDelete/v0/response.spec.js @@ -0,0 +1,36 @@ +const { decode, parse } = require('./response') + +describe('Protocol > Requests > OffsetDelete > v0', () => { + test('response', async () => { + const data = await decode(Buffer.from(require('../fixtures/v0_response.json'))) + expect(data).toEqual({ + errorCode: 0, + throttleTime: 0, + topics: [ + { + topic: 'bar', + partitions: [ + { + errorCode: 0, + partition: 0, + }, + { + errorCode: 0, + partition: 1, + }, + { + errorCode: 0, + partition: 2, + }, + { + errorCode: 0, + partition: 3, + }, + ], + }, + ], + }) + + await expect(parse(data)).resolves.toBeTruthy() + }) +}) diff --git a/types/index.d.ts b/types/index.d.ts index 26bcbfea3..d2f60d966 100644 --- a/types/index.d.ts +++ b/types/index.d.ts @@ -549,6 +549,10 @@ export type Admin = { topics?: TopicPartitions[] timeout?: number }): Promise + offsetDelete(request: { + groupId: string, + topics: TopicPartitions[], + }): Promise logger(): Logger on( eventName: AdminEvents['CONNECT'], @@ -702,6 +706,10 @@ export type Broker = { topics?: TopicPartitions[] timeout?: number }): Promise + offsetDelete(request: { + groupId: string, + topics: TopicPartitions[], + }): Promise } interface MessageSetEntry {