Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions docs/admin.md
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,52 @@ Options:
| -------- | ------------------------------- | -------------------------------------------------------------------------------- |
| topics | `DescribeLogDirsRequestTopic[]` | Array of topics specifying the topics and partitions for which to describe logs. |

### `listOffsets(options[, callback])`

Lists offsets for specified topic partitions.

The return value is an array of topics, each containing partition offset information including the offset, timestamp, partition index, and leader epoch.

Options:

| Property | Type | Description |
| -------------- | -------------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| topics | `TopicOffsetRequest[]` | Array of topics with their partitions to query offsets for. Each topic has `name` and `partitions` (array of `{ partitionIndex, timestamp }`). |
| isolationLevel | `IsolationLevel` \| `null` | Isolation level for reading offsets. Valid values are `IsolationLevels.READ_UNCOMMITTED` (0) or `IsolationLevels.READ_COMMITTED` (1). Defaults to `READ_UNCOMMITTED`. |

Example:

```typescript
const offsets = await admin.listOffsets({
topics: [
{
name: 'my-topic',
partitions: [
{ partitionIndex: 0, timestamp: -1n }, // Latest offset
{ partitionIndex: 1, timestamp: -2n } // Earliest offset
]
}
],
isolationLevel: IsolationLevels.READ_COMMITTED
})

// offsets is an array of AdminListedOffsetsTopic:
// [
// {
// name: 'my-topic',
// partitions: [
// { partitionIndex: 0, offset: 12345n, timestamp: 1234567890n, leaderEpoch: 1 },
// { partitionIndex: 1, offset: 100n, timestamp: 1234567800n, leaderEpoch: 1 }
// ]
// }
// ]
```

Special timestamp values:
- `-1n` - Get the latest offset (end of the log)
- `-2n` - Get the earliest offset (beginning of the log)
- Any other positive value - Get the offset at or after the specified timestamp

### `close([callback])`

Closes the admin and all its connections.
Expand Down
1 change: 1 addition & 0 deletions docs/diagnostic.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ Each tracing channel publishes events with the following common properties:
| `plt:kafka:admin:groups` | `Admin` | Traces a `Admin.listGroups`, `Admin.describeGroups` or `Admin.deleteGroups` request. |
| `plt:kafka:admin:clientQuotas` | `Admin` | Traces a `Admin.describeClientQuotas` or `Admin.alterClientQuotas` request. |
| `plt:kafka:admin:logDirs` | `Admin` | Traces a `Admin.describeLogDirs` request. |
| `plt:kafka:admin:offsets` | `Admin` | Traces a `Admin.listOffsets` request. |
| `plt:kafka:producer:initIdempotent` | `Producer` | Traces a `Producer.initIdempotentProducer` request. |
| `plt:kafka:producer:sends` | `Producer` | Traces a `Producer.send` request. |
| `plt:kafka:consumer:group` | `Consumer` | Traces a `Consumer.findGroupCoordinator`, `Consumer.joinGroup` or `Consumer.leaveGroup` requests. |
Expand Down
4 changes: 2 additions & 2 deletions playground/apis/consumer/fetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { api as fetchV17 } from '../../../src/apis/consumer/fetch-v17.ts'
import { api as offsetCommitV9 } from '../../../src/apis/consumer/offset-commit-v9.ts'
import { api as offsetFetchV9 } from '../../../src/apis/consumer/offset-fetch-v9.ts'
import { api as syncGroupV5 } from '../../../src/apis/consumer/sync-group-v5.ts'
import { FetchIsolationLevels, FindCoordinatorKeyTypes } from '../../../src/apis/enumerations.ts'
import { IsolationLevels, FindCoordinatorKeyTypes } from '../../../src/apis/enumerations.ts'
import { api as findCoordinatorV6 } from '../../../src/apis/metadata/find-coordinator-v6.ts'
import { api as metadataV12 } from '../../../src/apis/metadata/metadata-v12.ts'
import type { KafkaRecord } from '../../../src/index.ts'
Expand Down Expand Up @@ -72,7 +72,7 @@ for (let i = 0; i < 3; i++) {
0,
0,
1024 ** 3,
FetchIsolationLevels.READ_UNCOMMITTED,
IsolationLevels.READ_UNCOMMITTED,
-1,
-1,
[
Expand Down
15 changes: 12 additions & 3 deletions src/apis/callbacks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,19 @@ export function createPromisifiedCallback<ReturnType> (): CallbackWithPromise<Re
return callback
}

export function runConcurrentCallbacks<ReturnType> (
export function runConcurrentCallbacks<ReturnType, Collection extends any[] | Set<any> | Map<any, any>> (
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Revert this.

errorMessage: string,
collection: unknown[] | Set<unknown> | Map<unknown, unknown>,
operation: (item: any, cb: Callback<ReturnType>) => void,
collection: Collection,
operation: (
item: Collection extends Map<infer K, infer V>
? [K, V]
: Collection extends Set<infer U>
? U
: Collection extends (infer U)[]
? U
: never,
cb: Callback<ReturnType>
) => void,
callback: Callback<ReturnType[]>
): void {
let remaining = Array.isArray(collection) ? collection.length : collection.size
Expand Down
5 changes: 2 additions & 3 deletions src/apis/enumerations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,8 @@ export const allowedGroupProtocols = Object.values(GroupProtocols)
export type GroupProtocol = keyof typeof GroupProtocols

// ./consumer/fetch.ts
export const FetchIsolationLevels = { READ_UNCOMMITTED: 0, READ_COMMITTED: 1 }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Revert all this renamings.

export const allowedFetchIsolationLevels = Object.values(FetchIsolationLevels) as number[]
export type FetchIsolationLevel = keyof typeof FetchIsolationLevels
export const IsolationLevels = { READ_UNCOMMITTED: 0, READ_COMMITTED: 1 } as const
export type IsolationLevel = (typeof IsolationLevels)[keyof typeof IsolationLevels]

export const ListOffsetTimestamps = { LATEST: -1n, EARLIEST: -2n }
export type ListOffsetTimestamp = keyof typeof ListOffsetTimestamps
Expand Down
146 changes: 142 additions & 4 deletions src/clients/admin/admin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ import {
type CreateTopicsRequestTopicAssignment,
type CreateTopicsResponse
} from '../../apis/admin/create-topics-v7.ts'
import {
type ListOffsetsRequest,
type ListOffsetsRequestTopic,
type ListOffsetsResponse
} from '../../apis/consumer/list-offsets-v9.ts'
import { type DeleteGroupsRequest, type DeleteGroupsResponse } from '../../apis/admin/delete-groups-v2.ts'
import {
type DeleteTopicsRequest,
Expand All @@ -34,13 +39,14 @@ import {
type CallbackWithPromise
} from '../../apis/callbacks.ts'
import { type Callback } from '../../apis/definitions.ts'
import { FindCoordinatorKeyTypes, type ConsumerGroupState } from '../../apis/enumerations.ts'
import { IsolationLevels, FindCoordinatorKeyTypes, type ConsumerGroupState } from '../../apis/enumerations.ts'
import { type FindCoordinatorRequest, type FindCoordinatorResponse } from '../../apis/metadata/find-coordinator-v6.ts'
import { type MetadataRequest, type MetadataResponse } from '../../apis/metadata/metadata-v12.ts'
import {
adminClientQuotasChannel,
adminGroupsChannel,
adminLogDirsChannel,
adminOffsetsChannel,
adminTopicsChannel,
createDiagnosticContext
} from '../../diagnostic.ts'
Expand Down Expand Up @@ -70,9 +76,12 @@ import {
describeGroupsOptionsValidator,
describeLogDirsOptionsValidator,
listGroupsOptionsValidator,
adminListOffsetsOptionsValidator,
listTopicsOptionsValidator
} from './options.ts'
import {
type AdminListedOffsetsTopic,
type AdminListOffsetsOptions,
type AdminOptions,
type AlterClientQuotasOptions,
type BrokerLogDirDescription,
Expand All @@ -89,6 +98,7 @@ import {
type ListGroupsOptions,
type ListTopicsOptions
} from './types.ts'
import { type Broker } from '../../index.ts'

export class Admin extends Base<AdminOptions> {
constructor (options: AdminOptions) {
Expand Down Expand Up @@ -392,6 +402,38 @@ export class Admin extends Base<AdminOptions> {
return callback[kCallbackPromise]
}

listOffsets (options: AdminListOffsetsOptions, callback: CallbackWithPromise<AdminListedOffsetsTopic[]>): void
listOffsets (options: AdminListOffsetsOptions): Promise<AdminListedOffsetsTopic[]>
listOffsets (
options: AdminListOffsetsOptions,
callback?: CallbackWithPromise<AdminListedOffsetsTopic[]>
): void | Promise<AdminListedOffsetsTopic[]> {
if (!callback) {
callback = createPromisifiedCallback()
}

if (this[kCheckNotClosed](callback)) {
return callback[kCallbackPromise]
}

const validationError = this[kValidateOptions](options, adminListOffsetsOptionsValidator, '/options', false)
if (validationError) {
callback(validationError, undefined as unknown as AdminListedOffsetsTopic[])
return callback[kCallbackPromise]
}

adminOffsetsChannel.traceCallback(
this.#listOffsets,
1,
createDiagnosticContext({ client: this, operation: 'listOffsets', options }),
this,
options,
callback
)

return callback![kCallbackPromise]
}

#listTopics (options: ListTopicsOptions, callback: CallbackWithPromise<string[]>): void {
const includeInternals = options.includeInternals ?? false

Expand Down Expand Up @@ -574,7 +616,7 @@ export class Admin extends Base<AdminOptions> {
return
}

runConcurrentCallbacks<ListGroupsResponse>(
runConcurrentCallbacks<ListGroupsResponse, Map<number, Broker>>(
'Listing groups failed.',
metadata.brokers,
([, broker], concurrentCallback) => {
Expand Down Expand Up @@ -658,7 +700,7 @@ export class Admin extends Base<AdminOptions> {
coordinator.push(group)
}

runConcurrentCallbacks<DescribeGroupsResponse>(
runConcurrentCallbacks<DescribeGroupsResponse, Map<number, string[]>>(
'Describing groups failed.',
coordinators,
([node, groups], concurrentCallback) => {
Expand Down Expand Up @@ -940,7 +982,7 @@ export class Admin extends Base<AdminOptions> {
return
}

runConcurrentCallbacks<BrokerLogDirDescription>(
runConcurrentCallbacks<BrokerLogDirDescription, Map<number, Broker>>(
'Describing log dirs failed.',
metadata.brokers,
([id, broker], concurrentCallback) => {
Expand Down Expand Up @@ -987,4 +1029,100 @@ export class Admin extends Base<AdminOptions> {
)
})
}

#listOffsets (options: AdminListOffsetsOptions, callback: CallbackWithPromise<AdminListedOffsetsTopic[]>): void {
this[kMetadata]({ topics: options.topics.map(topic => topic.name) }, (error, metadata) => {
if (error) {
callback(error, undefined as unknown as AdminListedOffsetsTopic[])
return
}

const requests = new Map<number, ListOffsetsRequestTopic[]>()

for (const topic of options.topics) {
for (const partition of topic.partitions) {
const { leader, leaderEpoch } = metadata.topics.get(topic.name)!.partitions[partition.partitionIndex]
let leaderRequests = requests.get(leader)
if (!leaderRequests) {
leaderRequests = []
requests.set(leader, leaderRequests)
}

let topicRequest = leaderRequests.find(t => t.name === topic.name)
if (!topicRequest) {
topicRequest = { name: topic.name, partitions: [] }
leaderRequests.push(topicRequest)
}

topicRequest.partitions.push({
partitionIndex: partition.partitionIndex,
currentLeaderEpoch: leaderEpoch,
timestamp: partition.timestamp ?? -1n
})
}
}

runConcurrentCallbacks<ListOffsetsResponse, Map<number, ListOffsetsRequestTopic[]>>(
'Listing offsets failed.',
requests,
([leader, requests], concurrentCallback) => {
this[kGetConnection](metadata.brokers.get(leader)!, (error, connection) => {
if (error) {
concurrentCallback(error, undefined as unknown as ListOffsetsResponse)
return
}
this[kPerformWithRetry](
'listOffsets',
retryCallback => {
this[kGetApi]<ListOffsetsRequest, ListOffsetsResponse>('ListOffsets', (error, api) => {
if (error) {
retryCallback(error, undefined as unknown as ListOffsetsResponse)
return
}

api(
connection,
-1,
options.isolationLevel ?? IsolationLevels.READ_UNCOMMITTED,
Array.from(requests.values()),
retryCallback
)
})
},
concurrentCallback,
0
)
})
},
(error, responses) => {
if (error) {
callback(error, undefined as unknown as AdminListedOffsetsTopic[])
return
}

const ret: AdminListedOffsetsTopic[] = []

for (const response of responses) {
for (const topic of response.topics) {
let topicOffsets = ret.find(t => t.name === topic.name)
if (!topicOffsets) {
topicOffsets = { name: topic.name, partitions: [] }
ret.push(topicOffsets)
}
for (const partition of topic.partitions) {
topicOffsets.partitions.push({
offset: partition.offset,
timestamp: partition.timestamp,
partitionIndex: partition.partitionIndex,
leaderEpoch: partition.leaderEpoch
})
}
}
}

callback(null, ret)
}
)
})
}
}
37 changes: 36 additions & 1 deletion src/clients/admin/options.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { ClientQuotaMatchTypes, ConsumerGroupStates } from '../../apis/enumerations.ts'
import { ClientQuotaMatchTypes, ConsumerGroupStates, IsolationLevels } from '../../apis/enumerations.ts'
import { ajv, listErrorMessage } from '../../utils.ts'
import { idProperty } from '../base/options.ts'

Expand Down Expand Up @@ -199,6 +199,40 @@ export const describeLogDirsOptionsSchema = {
additionalProperties: false
}

export const adminListOffsetsOptionsSchema = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rename to listOffsetsOptionsSchema

type: 'object',
properties: {
topics: {
type: 'array',
items: {
type: 'object',
properties: {
name: { type: 'string', minLength: 1 },
partitions: {
type: 'array',
items: {
type: 'object',
properties: {
partitionIndex: { type: 'number', minimum: 0 },
timestamp: { bigint: true }
},
required: ['partitionIndex', 'timestamp'],
additionalProperties: false
},
minItems: 1
}
},
required: ['name', 'partitions'],
additionalProperties: false
},
minItems: 1
},
isolationLevel: { type: ['number', 'null'], enum: [null, ...Object.values(IsolationLevels)] }
},
required: ['topics'],
additionalProperties: false
}

export const createTopicsOptionsValidator = ajv.compile(createTopicOptionsSchema)
export const listTopicsOptionsValidator = ajv.compile(listTopicOptionsSchema)
export const deleteTopicsOptionsValidator = ajv.compile(deleteTopicOptionsSchema)
Expand All @@ -208,3 +242,4 @@ export const deleteGroupsOptionsValidator = ajv.compile(deleteGroupsOptionsSchem
export const describeClientQuotasOptionsValidator = ajv.compile(describeClientQuotasOptionsSchema)
export const alterClientQuotasOptionsValidator = ajv.compile(alterClientQuotasOptionsSchema)
export const describeLogDirsOptionsValidator = ajv.compile(describeLogDirsOptionsSchema)
export const adminListOffsetsOptionsValidator = ajv.compile(adminListOffsetsOptionsSchema)
Loading