Skip to content

Commit

Permalink
feat: add OffsetDelete admin API
Browse files Browse the repository at this point in the history
resolves: tulios#1549
  • Loading branch information
ehm-93 committed Jun 11, 2023
1 parent ff3b111 commit a591393
Show file tree
Hide file tree
Showing 11 changed files with 204 additions and 0 deletions.
41 changes: 41 additions & 0 deletions src/admin/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -986,6 +986,46 @@ module.exports = ({
})
}

/**
*
* @param {Object} request
* @param {string} request.groupId
* @param {import("../../types").TopicPartitions[]} request.topics
* @returns {Promise<void>}
*/
const offsetDelete = ({ groupId, topics }) => {
if (!groupId || typeof groupId !== 'string') {
throw new KafkaJSNonRetriableError(`Invalid groupId value ${groupId}`)
}

if (!topics || !Array.isArray(topics)) {
throw new KafkaJSNonRetriableError(`Invalid partitions value ${topics}`)
}

if (topics.length === 0) {
return
}

const retrier = createRetry(retry)

return retrier(async (bail, retryCount, retryTime) => {
try {
await cluster.refreshMetadataIfNecessary()

const coordinator = await cluster.findGroupCoordinator({ groupId })

await coordinator.offsetDelete({ groupId, topics })
} catch (e) {
if (e.type === 'NOT_CONTROLLER' || e.type === 'COORDINATOR_NOT_AVAILABLE') {
logger.warn('Could not delete groups', { error: e.message, retryCount, retryTime })
throw e
}

bail(e)
}
})
}

/**
* Delete topic records up to the selected partition offsets
*
Expand Down Expand Up @@ -1602,5 +1642,6 @@ module.exports = ({
deleteTopicRecords,
alterPartitionReassignments,
listPartitionReassignments,
offsetDelete,
}
}
10 changes: 10 additions & 0 deletions src/broker/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -896,6 +896,16 @@ module.exports = class Broker {
return await this[PRIVATE.SEND_REQUEST](listPartitionReassignments({ topics, timeout }))
}

/**
* @param {Object} request
* @param {string} request.groupId
* @param {import("../../types").TopicPartitions[]} request.topics
*/
async offsetDelete({ groupId, topics }) {
const offsetDelete = this.lookupRequest(apiKeys.OffsetDelete, requests.OffsetDelete)
return await this[PRIVATE.SEND_REQUEST](offsetDelete({ groupId, topics }))
}

/**
* @private
*/
Expand Down
10 changes: 10 additions & 0 deletions src/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,15 @@ class KafkaJSAlterPartitionReassignmentsError extends KafkaJSProtocolError {
}
}

class KafkaJSOffsetDeleteError extends KafkaJSProtocolError {
constructor(e, topicName, partition) {
super(e)
this.topic = topicName
this.partition = partition
this.name = 'KafkaJSOffsetDeleteError'
}
}

class KafkaJSAggregateError extends Error {
constructor(message, errors) {
super(message)
Expand Down Expand Up @@ -304,6 +313,7 @@ module.exports = {
KafkaJSFetcherRebalanceError,
KafkaJSNoBrokerAvailableError,
KafkaJSAlterPartitionReassignmentsError,
KafkaJSOffsetDeleteError,
isRebalancing,
isKafkaJSError,
}
1 change: 1 addition & 0 deletions src/protocol/requests/apiKeys.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,5 @@ module.exports = {
IncrementalAlterConfigs: 44,
AlterPartitionReassignments: 45,
ListPartitionReassignments: 46,
OffsetDelete: 47,
}
1 change: 1 addition & 0 deletions src/protocol/requests/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ const requests = {
IncrementalAlterConfigs: noImplementedRequestDefinitions,
AlterPartitionReassignments: require('./alterPartitionReassignments'),
ListPartitionReassignments: require('./listPartitionReassignments'),
OffsetDelete: require('./offsetDelete'),
}

const names = Object.keys(apiKeys)
Expand Down
12 changes: 12 additions & 0 deletions src/protocol/requests/offsetDelete/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
const versions = {
0: ({ groupId, topics }) => {
const request = require('./v0/request')
const response = require('./v0/response')
return { request: request({ groupId, topics }), response }
},
}

module.exports = {
versions: Object.keys(versions),
protocol: ({ version }) => versions[version],
}
29 changes: 29 additions & 0 deletions src/protocol/requests/offsetDelete/v0/request.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
const Encoder = require('../../../encoder')
const { OffsetDelete: apiKey } = require('../../apiKeys')

/**
* OffsetDelete Request (Version: 0) => group_id [topics]
* group_id => STRING
* topics => name [partitions]
* name => STRING
* partitions => partition_index
* partition_index => INT32
*/
module.exports = ({ groupId, topics }) => ({
apiKey,
apiVersion: 0,
apiName: 'OffsetDelete',
encode: async () => {
return new Encoder()
.writeString(groupId)
.writeArray(topics === null ? topics : topics.map(encodeTopics))
},
})

const encodeTopics = ({ topic, partitions }) => {
return new Encoder().writeString(topic).writeArray(partitions.map(encodePartitions))
}

const encodePartitions = partition => {
return new Encoder().writeInt32(partition)
}
5 changes: 5 additions & 0 deletions src/protocol/requests/offsetDelete/v0/request.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
describe('Protocol > Requests > OffsetDelete > v0', () => {
test('request', () => {
throw new Error('TODO')
})
})
82 changes: 82 additions & 0 deletions src/protocol/requests/offsetDelete/v0/response.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
const { KafkaJSAggregateError, KafkaJSOffsetDeleteError } = require('../../../../errors')
const Decoder = require('../../../decoder')
const { failure, createErrorFromCode } = require('../../../error')

/**
* OffsetDelete Response (Version: 0) => error_code throttle_time_ms [topics]
* error_code => INT16
* throttle_time_ms => INT32
* topics => name [partitions]
* name => STRING
* partitions => partition_index error_code
* partition_index => INT32
* error_code => INT16
*/

/**
* @param {Decoder} decoder
*/
const decodePartition = decoder => {
const partition = {
partition: decoder.readInt32(),
errorCode: decoder.readInt16(),
}

return partition
}

/**
* @param {Decoder} decoder
*/
const decodeTopics = decoder => {
const topic = {
topic: decoder.readString(),
partitions: decoder.readArray(decodePartition),
}

return topic
}

const decode = async rawData => {
const decoder = new Decoder(rawData)

const errorCode = decoder.readInt16()
const throttleTime = decoder.readInt32()
return {
errorCode,
throttleTime,
topics: decoder.readArray(decodeTopics),
}
}

const parse = async data => {
if (failure(data.errorCode)) {
throw createErrorFromCode(data.errorCode)
}

const topicPartitionsWithError = data.topics.flatMap(({ topic, partitions }) =>
partitions
.filter(partition => failure(partition.errorCode))
.map(partition => ({
...partition,
topic,
}))
)

if (topicPartitionsWithError.length > 0) {
throw new KafkaJSAggregateError(
'Errors deleting offsets',
topicPartitionsWithError.map(
({ topic, partition, errorCode }) =>
new KafkaJSOffsetDeleteError(createErrorFromCode(errorCode), topic, partition)
)
)
}

return data
}

module.exports = {
decode,
parse,
}
5 changes: 5 additions & 0 deletions src/protocol/requests/offsetDelete/v0/response.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
describe('Protocol > Requests > OffsetDelete > v0', () => {
test('response', () => {
throw new Error('TODO')
})
})
8 changes: 8 additions & 0 deletions types/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,10 @@ export type Admin = {
topics?: TopicPartitions[]
timeout?: number
}): Promise<ListPartitionReassignmentsResponse>
offsetDelete(request: {
groupId: string,
topics: TopicPartitions[],
}): Promise<void>
logger(): Logger
on(
eventName: AdminEvents['CONNECT'],
Expand Down Expand Up @@ -702,6 +706,10 @@ export type Broker = {
topics?: TopicPartitions[]
timeout?: number
}): Promise<ListPartitionReassignmentsResponse>
offsetDelete(request: {
groupId: string,
topics: TopicPartitions[],
}): Promise<void>
}

interface MessageSetEntry {
Expand Down

0 comments on commit a591393

Please sign in to comment.