diff --git a/docs/admin.md b/docs/admin.md index aa6e325a..13a2fd5b 100644 --- a/docs/admin.md +++ b/docs/admin.md @@ -128,6 +128,52 @@ Options: | -------- | ------------------------------- | -------------------------------------------------------------------------------- | | topics | `DescribeLogDirsRequestTopic[]` | Array of topics specifying the topics and partitions for which to describe logs. | +### `listOffsets(options[, callback])` + +Lists offsets for specified topic partitions. + +The return value is an array of topics, each containing partition offset information including the offset, timestamp, partition index, and leader epoch. + +Options: + +| Property | Type | Description | +| -------------- | -------------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| topics | `TopicOffsetRequest[]` | Array of topics with their partitions to query offsets for. Each topic has `name` and `partitions` (array of `{ partitionIndex, timestamp }`). | +| isolationLevel | `IsolationLevel` \| `null` | Isolation level for reading offsets. Valid values are `IsolationLevels.READ_UNCOMMITTED` (0) or `IsolationLevels.READ_COMMITTED` (1). Defaults to `READ_UNCOMMITTED`. | + +Example: + +```typescript +const offsets = await admin.listOffsets({ + topics: [ + { + name: 'my-topic', + partitions: [ + { partitionIndex: 0, timestamp: -1n }, // Latest offset + { partitionIndex: 1, timestamp: -2n } // Earliest offset + ] + } + ], + isolationLevel: IsolationLevels.READ_COMMITTED +}) + +// offsets is an array of AdminListedOffsetsTopic: +// [ +// { +// name: 'my-topic', +// partitions: [ +// { partitionIndex: 0, offset: 12345n, timestamp: 1234567890n, leaderEpoch: 1 }, +// { partitionIndex: 1, offset: 100n, timestamp: 1234567800n, leaderEpoch: 1 } +// ] +// } +// ] +``` + +Special timestamp values: +- `-1n` - Get the latest offset (end of the log) +- `-2n` - Get the earliest offset (beginning of the log) +- Any other positive value - Get the offset at or after the specified timestamp + ### `close([callback])` Closes the admin and all its connections. diff --git a/docs/diagnostic.md b/docs/diagnostic.md index 3f167826..fce55593 100644 --- a/docs/diagnostic.md +++ b/docs/diagnostic.md @@ -73,6 +73,7 @@ Each tracing channel publishes events with the following common properties: | `plt:kafka:admin:groups` | `Admin` | Traces a `Admin.listGroups`, `Admin.describeGroups` or `Admin.deleteGroups` request. | | `plt:kafka:admin:clientQuotas` | `Admin` | Traces a `Admin.describeClientQuotas` or `Admin.alterClientQuotas` request. | | `plt:kafka:admin:logDirs` | `Admin` | Traces a `Admin.describeLogDirs` request. | +| `plt:kafka:admin:offsets` | `Admin` | Traces a `Admin.listOffsets` request. | | `plt:kafka:producer:initIdempotent` | `Producer` | Traces a `Producer.initIdempotentProducer` request. | | `plt:kafka:producer:sends` | `Producer` | Traces a `Producer.send` request. | | `plt:kafka:consumer:group` | `Consumer` | Traces a `Consumer.findGroupCoordinator`, `Consumer.joinGroup` or `Consumer.leaveGroup` requests. | diff --git a/playground/apis/consumer/fetch.ts b/playground/apis/consumer/fetch.ts index ccf27a90..75879037 100644 --- a/playground/apis/consumer/fetch.ts +++ b/playground/apis/consumer/fetch.ts @@ -2,7 +2,7 @@ import { api as fetchV17 } from '../../../src/apis/consumer/fetch-v17.ts' import { api as offsetCommitV9 } from '../../../src/apis/consumer/offset-commit-v9.ts' import { api as offsetFetchV9 } from '../../../src/apis/consumer/offset-fetch-v9.ts' import { api as syncGroupV5 } from '../../../src/apis/consumer/sync-group-v5.ts' -import { FetchIsolationLevels, FindCoordinatorKeyTypes } from '../../../src/apis/enumerations.ts' +import { IsolationLevels, FindCoordinatorKeyTypes } from '../../../src/apis/enumerations.ts' import { api as findCoordinatorV6 } from '../../../src/apis/metadata/find-coordinator-v6.ts' import { api as metadataV12 } from '../../../src/apis/metadata/metadata-v12.ts' import type { KafkaRecord } from '../../../src/index.ts' @@ -72,7 +72,7 @@ for (let i = 0; i < 3; i++) { 0, 0, 1024 ** 3, - FetchIsolationLevels.READ_UNCOMMITTED, + IsolationLevels.READ_UNCOMMITTED, -1, -1, [ diff --git a/src/apis/callbacks.ts b/src/apis/callbacks.ts index 513930e1..b36e4c0c 100644 --- a/src/apis/callbacks.ts +++ b/src/apis/callbacks.ts @@ -29,10 +29,19 @@ export function createPromisifiedCallback (): CallbackWithPromise ( +export function runConcurrentCallbacks | Map> ( errorMessage: string, - collection: unknown[] | Set | Map, - operation: (item: any, cb: Callback) => void, + collection: Collection, + operation: ( + item: Collection extends Map + ? [K, V] + : Collection extends Set + ? U + : Collection extends (infer U)[] + ? U + : never, + cb: Callback + ) => void, callback: Callback ): void { let remaining = Array.isArray(collection) ? collection.length : collection.size diff --git a/src/apis/enumerations.ts b/src/apis/enumerations.ts index fb31983f..8fde6a03 100644 --- a/src/apis/enumerations.ts +++ b/src/apis/enumerations.ts @@ -32,9 +32,8 @@ export const allowedGroupProtocols = Object.values(GroupProtocols) export type GroupProtocol = keyof typeof GroupProtocols // ./consumer/fetch.ts -export const FetchIsolationLevels = { READ_UNCOMMITTED: 0, READ_COMMITTED: 1 } -export const allowedFetchIsolationLevels = Object.values(FetchIsolationLevels) as number[] -export type FetchIsolationLevel = keyof typeof FetchIsolationLevels +export const IsolationLevels = { READ_UNCOMMITTED: 0, READ_COMMITTED: 1 } as const +export type IsolationLevel = (typeof IsolationLevels)[keyof typeof IsolationLevels] export const ListOffsetTimestamps = { LATEST: -1n, EARLIEST: -2n } export type ListOffsetTimestamp = keyof typeof ListOffsetTimestamps diff --git a/src/clients/admin/admin.ts b/src/clients/admin/admin.ts index 1bddabdf..22bbbd17 100644 --- a/src/clients/admin/admin.ts +++ b/src/clients/admin/admin.ts @@ -9,6 +9,11 @@ import { type CreateTopicsRequestTopicAssignment, type CreateTopicsResponse } from '../../apis/admin/create-topics-v7.ts' +import { + type ListOffsetsRequest, + type ListOffsetsRequestTopic, + type ListOffsetsResponse +} from '../../apis/consumer/list-offsets-v9.ts' import { type DeleteGroupsRequest, type DeleteGroupsResponse } from '../../apis/admin/delete-groups-v2.ts' import { type DeleteTopicsRequest, @@ -34,13 +39,14 @@ import { type CallbackWithPromise } from '../../apis/callbacks.ts' import { type Callback } from '../../apis/definitions.ts' -import { FindCoordinatorKeyTypes, type ConsumerGroupState } from '../../apis/enumerations.ts' +import { IsolationLevels, FindCoordinatorKeyTypes, type ConsumerGroupState } from '../../apis/enumerations.ts' import { type FindCoordinatorRequest, type FindCoordinatorResponse } from '../../apis/metadata/find-coordinator-v6.ts' import { type MetadataRequest, type MetadataResponse } from '../../apis/metadata/metadata-v12.ts' import { adminClientQuotasChannel, adminGroupsChannel, adminLogDirsChannel, + adminOffsetsChannel, adminTopicsChannel, createDiagnosticContext } from '../../diagnostic.ts' @@ -70,9 +76,12 @@ import { describeGroupsOptionsValidator, describeLogDirsOptionsValidator, listGroupsOptionsValidator, + adminListOffsetsOptionsValidator, listTopicsOptionsValidator } from './options.ts' import { + type AdminListedOffsetsTopic, + type AdminListOffsetsOptions, type AdminOptions, type AlterClientQuotasOptions, type BrokerLogDirDescription, @@ -89,6 +98,7 @@ import { type ListGroupsOptions, type ListTopicsOptions } from './types.ts' +import { type Broker } from '../../index.ts' export class Admin extends Base { constructor (options: AdminOptions) { @@ -392,6 +402,38 @@ export class Admin extends Base { return callback[kCallbackPromise] } + listOffsets (options: AdminListOffsetsOptions, callback: CallbackWithPromise): void + listOffsets (options: AdminListOffsetsOptions): Promise + listOffsets ( + options: AdminListOffsetsOptions, + callback?: CallbackWithPromise + ): void | Promise { + if (!callback) { + callback = createPromisifiedCallback() + } + + if (this[kCheckNotClosed](callback)) { + return callback[kCallbackPromise] + } + + const validationError = this[kValidateOptions](options, adminListOffsetsOptionsValidator, '/options', false) + if (validationError) { + callback(validationError, undefined as unknown as AdminListedOffsetsTopic[]) + return callback[kCallbackPromise] + } + + adminOffsetsChannel.traceCallback( + this.#listOffsets, + 1, + createDiagnosticContext({ client: this, operation: 'listOffsets', options }), + this, + options, + callback + ) + + return callback![kCallbackPromise] + } + #listTopics (options: ListTopicsOptions, callback: CallbackWithPromise): void { const includeInternals = options.includeInternals ?? false @@ -574,7 +616,7 @@ export class Admin extends Base { return } - runConcurrentCallbacks( + runConcurrentCallbacks>( 'Listing groups failed.', metadata.brokers, ([, broker], concurrentCallback) => { @@ -658,7 +700,7 @@ export class Admin extends Base { coordinator.push(group) } - runConcurrentCallbacks( + runConcurrentCallbacks>( 'Describing groups failed.', coordinators, ([node, groups], concurrentCallback) => { @@ -940,7 +982,7 @@ export class Admin extends Base { return } - runConcurrentCallbacks( + runConcurrentCallbacks>( 'Describing log dirs failed.', metadata.brokers, ([id, broker], concurrentCallback) => { @@ -987,4 +1029,100 @@ export class Admin extends Base { ) }) } + + #listOffsets (options: AdminListOffsetsOptions, callback: CallbackWithPromise): void { + this[kMetadata]({ topics: options.topics.map(topic => topic.name) }, (error, metadata) => { + if (error) { + callback(error, undefined as unknown as AdminListedOffsetsTopic[]) + return + } + + const requests = new Map() + + for (const topic of options.topics) { + for (const partition of topic.partitions) { + const { leader, leaderEpoch } = metadata.topics.get(topic.name)!.partitions[partition.partitionIndex] + let leaderRequests = requests.get(leader) + if (!leaderRequests) { + leaderRequests = [] + requests.set(leader, leaderRequests) + } + + let topicRequest = leaderRequests.find(t => t.name === topic.name) + if (!topicRequest) { + topicRequest = { name: topic.name, partitions: [] } + leaderRequests.push(topicRequest) + } + + topicRequest.partitions.push({ + partitionIndex: partition.partitionIndex, + currentLeaderEpoch: leaderEpoch, + timestamp: partition.timestamp ?? -1n + }) + } + } + + runConcurrentCallbacks>( + 'Listing offsets failed.', + requests, + ([leader, requests], concurrentCallback) => { + this[kGetConnection](metadata.brokers.get(leader)!, (error, connection) => { + if (error) { + concurrentCallback(error, undefined as unknown as ListOffsetsResponse) + return + } + this[kPerformWithRetry]( + 'listOffsets', + retryCallback => { + this[kGetApi]('ListOffsets', (error, api) => { + if (error) { + retryCallback(error, undefined as unknown as ListOffsetsResponse) + return + } + + api( + connection, + -1, + options.isolationLevel ?? IsolationLevels.READ_UNCOMMITTED, + Array.from(requests.values()), + retryCallback + ) + }) + }, + concurrentCallback, + 0 + ) + }) + }, + (error, responses) => { + if (error) { + callback(error, undefined as unknown as AdminListedOffsetsTopic[]) + return + } + + const ret: AdminListedOffsetsTopic[] = [] + + for (const response of responses) { + for (const topic of response.topics) { + let topicOffsets = ret.find(t => t.name === topic.name) + if (!topicOffsets) { + topicOffsets = { name: topic.name, partitions: [] } + ret.push(topicOffsets) + } + for (const partition of topic.partitions) { + topicOffsets.partitions.push({ + offset: partition.offset, + timestamp: partition.timestamp, + partitionIndex: partition.partitionIndex, + leaderEpoch: partition.leaderEpoch + }) + } + } + } + + callback(null, ret) + } + ) + }) + } } diff --git a/src/clients/admin/options.ts b/src/clients/admin/options.ts index 38758a81..c6f93017 100644 --- a/src/clients/admin/options.ts +++ b/src/clients/admin/options.ts @@ -1,4 +1,4 @@ -import { ClientQuotaMatchTypes, ConsumerGroupStates } from '../../apis/enumerations.ts' +import { ClientQuotaMatchTypes, ConsumerGroupStates, IsolationLevels } from '../../apis/enumerations.ts' import { ajv, listErrorMessage } from '../../utils.ts' import { idProperty } from '../base/options.ts' @@ -199,6 +199,40 @@ export const describeLogDirsOptionsSchema = { additionalProperties: false } +export const adminListOffsetsOptionsSchema = { + type: 'object', + properties: { + topics: { + type: 'array', + items: { + type: 'object', + properties: { + name: { type: 'string', minLength: 1 }, + partitions: { + type: 'array', + items: { + type: 'object', + properties: { + partitionIndex: { type: 'number', minimum: 0 }, + timestamp: { bigint: true } + }, + required: ['partitionIndex', 'timestamp'], + additionalProperties: false + }, + minItems: 1 + } + }, + required: ['name', 'partitions'], + additionalProperties: false + }, + minItems: 1 + }, + isolationLevel: { type: ['number', 'null'], enum: [null, ...Object.values(IsolationLevels)] } + }, + required: ['topics'], + additionalProperties: false +} + export const createTopicsOptionsValidator = ajv.compile(createTopicOptionsSchema) export const listTopicsOptionsValidator = ajv.compile(listTopicOptionsSchema) export const deleteTopicsOptionsValidator = ajv.compile(deleteTopicOptionsSchema) @@ -208,3 +242,4 @@ export const deleteGroupsOptionsValidator = ajv.compile(deleteGroupsOptionsSchem export const describeClientQuotasOptionsValidator = ajv.compile(describeClientQuotasOptionsSchema) export const alterClientQuotasOptionsValidator = ajv.compile(alterClientQuotasOptionsSchema) export const describeLogDirsOptionsValidator = ajv.compile(describeLogDirsOptionsSchema) +export const adminListOffsetsOptionsValidator = ajv.compile(adminListOffsetsOptionsSchema) diff --git a/src/clients/admin/types.ts b/src/clients/admin/types.ts index ac86e197..a6391ece 100644 --- a/src/clients/admin/types.ts +++ b/src/clients/admin/types.ts @@ -6,8 +6,8 @@ import { type DescribeLogDirsResponse, type DescribeLogDirsResponseResult } from '../../apis/admin/describe-log-dirs-v4.ts' -import { type ConsumerGroupState } from '../../apis/enumerations.ts' -import { type NullableString } from '../../protocol/definitions.ts' +import { type IsolationLevel, type ConsumerGroupState } from '../../apis/enumerations.ts' +import { type Nullable, type NullableString } from '../../protocol/definitions.ts' import { type BaseOptions } from '../base/types.ts' import { type ExtendedGroupProtocolSubscription, type GroupAssignment } from '../consumer/types.ts' @@ -98,3 +98,30 @@ export interface BrokerLogDirDescription { throttleTimeMs: DescribeLogDirsResponse['throttleTimeMs'] results: Omit[] } + +export interface PartitionTimestamp { + partitionIndex: number + timestamp: bigint +} + +export interface TopicOffsetRequest { + name: string + partitions: PartitionTimestamp[] +} + +export interface AdminListOffsetsOptions { + topics: TopicOffsetRequest[] + isolationLevel?: Nullable +} + +export interface AdminListedOffsetsPartition { + partitionIndex: number + timestamp: bigint + offset: bigint + leaderEpoch: number +} + +export interface AdminListedOffsetsTopic { + name: string + partitions: AdminListedOffsetsPartition[] +} diff --git a/src/clients/base/base.ts b/src/clients/base/base.ts index ba2e4acb..4a41a996 100644 --- a/src/clients/base/base.ts +++ b/src/clients/base/base.ts @@ -223,7 +223,7 @@ export class Base extends EventEm nodes = Array.from(metadata.brokers.keys()) } - runConcurrentCallbacks<[number, Connection]>( + runConcurrentCallbacks<[number, Connection], number[]>( 'Connecting to brokers failed.', nodes, (nodeId: number, concurrentCallback) => { diff --git a/src/clients/consumer/consumer.ts b/src/clients/consumer/consumer.ts index af8b90db..58516709 100644 --- a/src/clients/consumer/consumer.ts +++ b/src/clients/consumer/consumer.ts @@ -38,7 +38,7 @@ import { type SyncGroupRequestAssignment, type SyncGroupResponse } from '../../apis/consumer/sync-group-v5.ts' -import { FetchIsolationLevels, FindCoordinatorKeyTypes } from '../../apis/enumerations.ts' +import { FindCoordinatorKeyTypes } from '../../apis/enumerations.ts' import { type FindCoordinatorRequest, type FindCoordinatorResponse } from '../../apis/metadata/find-coordinator-v6.ts' import { consumerCommitsChannel, @@ -89,7 +89,7 @@ import { groupIdAndOptionsValidator, groupOptionsValidator, listCommitsOptionsValidator, - listOffsetsOptionsValidator + consumerListOffsetsOptionsValidator } from './options.ts' import { roundRobinAssigner } from './partitions-assigners.ts' import { TopicsMap } from './topics-map.ts' @@ -106,7 +106,7 @@ import { type GroupPartitionsAssigner, type GroupProtocolSubscription, type ListCommitsOptions, - type ListOffsetsOptions, + type ConsumerListOffsetsOptions, type Offsets, type OffsetsWithTimestamps } from './types.ts' @@ -400,9 +400,9 @@ export class Consumer): void - listOffsets (options: ListOffsetsOptions): Promise - listOffsets (options: ListOffsetsOptions, callback?: CallbackWithPromise): void | Promise { + listOffsets (options: ConsumerListOffsetsOptions, callback: CallbackWithPromise): void + listOffsets (options: ConsumerListOffsetsOptions): Promise + listOffsets (options: ConsumerListOffsetsOptions, callback?: CallbackWithPromise): void | Promise { if (!callback) { callback = createPromisifiedCallback() } @@ -411,7 +411,7 @@ export class Consumer): void - listOffsetsWithTimestamps (options: ListOffsetsOptions): Promise listOffsetsWithTimestamps ( - options: ListOffsetsOptions, + options: ConsumerListOffsetsOptions, + callback: CallbackWithPromise + ): void + listOffsetsWithTimestamps (options: ConsumerListOffsetsOptions): Promise + listOffsetsWithTimestamps ( + options: ConsumerListOffsetsOptions, callback?: CallbackWithPromise ): void | Promise { if (!callback) { @@ -444,7 +447,7 @@ export class Consumer ): void { let topics = options.topics @@ -849,7 +852,7 @@ export class Consumer( + runConcurrentCallbacks>>( 'Listing offsets failed.', requests, ([leader, requests], concurrentCallback) => { @@ -871,7 +874,7 @@ export class Consumer( + runConcurrentCallbacks>>( 'Closing streams failed.', this.#streams, (stream, concurrentCallback) => { diff --git a/src/clients/consumer/options.ts b/src/clients/consumer/options.ts index d59fe0ce..183b8ad1 100644 --- a/src/clients/consumer/options.ts +++ b/src/clients/consumer/options.ts @@ -1,4 +1,4 @@ -import { allowedFetchIsolationLevels, allowedGroupProtocols } from '../../apis/enumerations.ts' +import { allowedGroupProtocols, IsolationLevels } from '../../apis/enumerations.ts' import { ajv } from '../../utils.ts' import { idProperty, topicWithPartitionAndOffsetProperties } from '../base/options.ts' import { serdeProperties } from '../serde.ts' @@ -61,7 +61,7 @@ export const consumeOptionsProperties = { minBytes: { type: 'number', minimum: 0 }, maxBytes: { type: 'number', minimum: 0 }, maxWaitTime: { type: 'number', minimum: 0 }, - isolationLevel: { type: 'string', enum: allowedFetchIsolationLevels }, + isolationLevel: { type: 'string', enum: Object.values(IsolationLevels) }, deserializers: serdeProperties, highWaterMark: { type: 'number', minimum: 1 } } @@ -188,7 +188,7 @@ export const listCommitsOptionsSchema = { additionalProperties: false } -export const listOffsetsOptionsSchema = { +export const consumerListOffsetsOptionsSchema = { type: 'object', properties: { topics: { type: 'array', items: idProperty }, @@ -199,7 +199,7 @@ export const listOffsetsOptionsSchema = { items: { type: 'number', minimum: 0 } } }, - isolationLevel: { type: 'string', enum: allowedFetchIsolationLevels }, + isolationLevel: { type: 'string', enum: Object.values(IsolationLevels) }, timestamp: { bigint: true } }, required: ['topics'], @@ -243,7 +243,7 @@ export const consumerOptionsValidator = ajv.compile(consumerOptionsSchema) export const fetchOptionsValidator = ajv.compile(fetchOptionsSchema) export const commitOptionsValidator = ajv.compile(commitOptionsSchema) export const listCommitsOptionsValidator = ajv.compile(listCommitsOptionsSchema) -export const listOffsetsOptionsValidator = ajv.compile(listOffsetsOptionsSchema) +export const consumerListOffsetsOptionsValidator = ajv.compile(consumerListOffsetsOptionsSchema) export const getLagOptionsValidator = ajv.compile(getLagOptionsSchema) export const defaultConsumerOptions = { @@ -255,6 +255,6 @@ export const defaultConsumerOptions = { minBytes: 1, maxBytes: 1_048_576 * 10, // 10 MB maxWaitTime: 5_000, - isolationLevel: 'READ_COMMITTED', + isolationLevel: IsolationLevels.READ_COMMITTED, highWaterMark: 1024 } satisfies Partial> diff --git a/src/clients/consumer/types.ts b/src/clients/consumer/types.ts index b4211fd7..46cdaf23 100644 --- a/src/clients/consumer/types.ts +++ b/src/clients/consumer/types.ts @@ -1,5 +1,5 @@ import { type FetchRequestTopic } from '../../apis/consumer/fetch-v17.ts' -import { type FetchIsolationLevel, type GroupProtocols } from '../../apis/enumerations.ts' +import { type IsolationLevel, type GroupProtocols } from '../../apis/enumerations.ts' import { type KafkaRecord, type Message } from '../../protocol/records.ts' import { type BaseOptions, type ClusterMetadata, type TopicWithPartitionAndOffset } from '../base/types.ts' import { type Deserializers } from '../serde.ts' @@ -89,7 +89,7 @@ export interface ConsumeBaseOptions { minBytes?: number maxBytes?: number maxWaitTime?: number - isolationLevel?: FetchIsolationLevel + isolationLevel?: IsolationLevel deserializers?: Partial> highWaterMark?: number } @@ -132,11 +132,11 @@ export interface ListCommitsOptions { topics: GroupAssignment[] } -export interface ListOffsetsOptions { +export interface ConsumerListOffsetsOptions { topics: string[] partitions?: Record timestamp?: bigint - isolationLevel?: FetchIsolationLevel + isolationLevel?: IsolationLevel } -export type GetLagOptions = Omit +export type GetLagOptions = Omit diff --git a/src/clients/producer/producer.ts b/src/clients/producer/producer.ts index af34d0ca..e0ea930c 100644 --- a/src/clients/producer/producer.ts +++ b/src/clients/producer/producer.ts @@ -405,7 +405,7 @@ export class Producer( + runConcurrentCallbacks>( 'Producing messages failed.', messagesByDestination, ([destination, destinationMessages], concurrentCallback) => { diff --git a/src/diagnostic.ts b/src/diagnostic.ts index 11f2b0dc..a140f024 100644 --- a/src/diagnostic.ts +++ b/src/diagnostic.ts @@ -77,6 +77,7 @@ export const adminTopicsChannel = createTracingChannel('a export const adminGroupsChannel = createTracingChannel('admin:groups') export const adminClientQuotasChannel = createTracingChannel('admin:clientQuotas') export const adminLogDirsChannel = createTracingChannel('admin:logDirs') +export const adminOffsetsChannel = createTracingChannel('admin:offsets') // Producer channels export const producerInitIdempotentChannel = createTracingChannel('producer:initIdempotent') diff --git a/src/network/connection-pool.ts b/src/network/connection-pool.ts index 6c3e8003..9a96f857 100644 --- a/src/network/connection-pool.ts +++ b/src/network/connection-pool.ts @@ -92,7 +92,7 @@ export class ConnectionPool extends EventEmitter { this.#closed = true - runConcurrentCallbacks( + runConcurrentCallbacks>( 'Closing connections failed.', this.#connections, ([key, connection]: [string, Connection], cb: Callback) => { diff --git a/src/protocol/definitions.ts b/src/protocol/definitions.ts index 13d5995c..37dc33a1 100644 --- a/src/protocol/definitions.ts +++ b/src/protocol/definitions.ts @@ -17,4 +17,5 @@ export const EMPTY_TAGGED_FIELDS_BUFFER = Buffer.from([0]) export type Collection = string | Buffer | DynamicBuffer | Array | Map | Set -export type NullableString = string | undefined | null +export type Nullable = T | null | undefined +export type NullableString = Nullable diff --git a/test/clients/admin/admin.test.ts b/test/clients/admin/admin.test.ts index 65287791..e80f9d95 100644 --- a/test/clients/admin/admin.test.ts +++ b/test/clients/admin/admin.test.ts @@ -2,13 +2,14 @@ import { deepStrictEqual, ok, strictEqual } from 'node:assert' import { randomUUID } from 'node:crypto' import { test } from 'node:test' import { scheduler } from 'node:timers/promises' -import { ClientQuotaEntityTypes, ClientQuotaKeys } from '../../../src/apis/enumerations.ts' +import { ClientQuotaEntityTypes, ClientQuotaKeys, ListOffsetTimestamps } from '../../../src/apis/enumerations.ts' import { kConnections } from '../../../src/clients/base/base.ts' import { Admin, adminClientQuotasChannel, adminGroupsChannel, adminLogDirsChannel, + adminOffsetsChannel, adminTopicsChannel, alterClientQuotasV1, type BrokerLogDirDescription, @@ -25,13 +26,16 @@ import { type GroupBase, instancesChannel, listGroupsV5, + listOffsetsV9, MultipleErrors, sleep, + stringSerializers, UnsupportedApiError } from '../../../src/index.ts' import { createAdmin, createCreationChannelVerifier, + createProducer, createTopic, createTracingChannelVerifier, kafkaBootstrapServers, @@ -44,6 +48,7 @@ import { mockUnavailableAPI, retry } from '../../helpers.ts' +import { type ListOffsetsResponse } from '../../../src/apis/consumer/list-offsets-v9.ts' test('constructor should initialize properly', t => { const created = createCreationChannelVerifier(instancesChannel) @@ -175,6 +180,16 @@ test('all operations should fail when admin is closed', async t => { } catch (error) { strictEqual(error.message, 'Client is closed.') } + + // Attempt to call listOffsets on closed admin + try { + await admin.listOffsets({ + topics: [{ name: 'test-topic', partitions: [{ partitionIndex: 0, timestamp: BigInt(0) }] }] + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message, 'Client is closed.') + } }) test('listTopics should list topics and support diagnostic channels', async t => { @@ -2409,3 +2424,475 @@ test('describeLogDirs should handle unavailable API errors', async t => { strictEqual(error.errors[0].message.includes('Unsupported API DescribeLogDirs.'), true) } }) + +test('listOffsets should list offsets for topics and partitions', async t => { + const admin = createAdmin(t) + + const topicName = `test-topic-${randomUUID()}` + await admin.createTopics({ + topics: [topicName], + partitions: 2, + replicas: 1 + }) + + const producer = createProducer(t, { acks: 1, serializers: stringSerializers }) + + const messages = Array.from({ length: 10 }, (_, i) => ({ + key: `key-${i}`, + value: `value-${i}`, + topic: topicName + })) + + for (const message of messages) { + await producer.send({ messages: [message] }) + + await scheduler.wait(100) + } + + await scheduler.wait(500) + + const listOffsetsResult = await admin.listOffsets({ + topics: [ + { + name: topicName, + partitions: [ + { partitionIndex: 0, timestamp: BigInt(0) }, + { partitionIndex: 1, timestamp: BigInt(0) } + ] + } + ] + }) + + strictEqual(Array.isArray(listOffsetsResult), true) + strictEqual(listOffsetsResult.length, 1) + strictEqual(listOffsetsResult[0].name, topicName) + strictEqual(listOffsetsResult[0].partitions.length, 2) + strictEqual(listOffsetsResult[0].partitions[0].partitionIndex, 0) + strictEqual(listOffsetsResult[0].partitions[1].partitionIndex, 1) + strictEqual(listOffsetsResult[0].partitions[0].offset, 0n) + strictEqual(listOffsetsResult[0].partitions[1].offset, 0n) + + const firstTimestamps = listOffsetsResult[0].partitions.map(p => p.timestamp) + + const listOffsetsResult2 = await admin.listOffsets({ + topics: [ + { + name: topicName, + partitions: [ + { partitionIndex: 0, timestamp: firstTimestamps[0] + 1n }, + { partitionIndex: 1, timestamp: firstTimestamps[1] + 1n } + ] + } + ] + }) + + strictEqual(listOffsetsResult2[0].partitions[0].offset, 1n) + strictEqual(listOffsetsResult2[0].partitions[1].offset, 1n) + + const listOffsetsResultEarliest = await admin.listOffsets({ + topics: [ + { + name: topicName, + partitions: [ + { partitionIndex: 0, timestamp: ListOffsetTimestamps.EARLIEST }, + { partitionIndex: 1, timestamp: ListOffsetTimestamps.EARLIEST } + ] + } + ] + }) + + strictEqual(listOffsetsResultEarliest[0].partitions[0].offset, 0n) + strictEqual(listOffsetsResultEarliest[0].partitions[1].offset, 0n) + + const listOffsetsResultLatest = await admin.listOffsets({ + topics: [ + { + name: topicName, + partitions: [ + { partitionIndex: 0, timestamp: ListOffsetTimestamps.LATEST }, + { partitionIndex: 1, timestamp: ListOffsetTimestamps.LATEST } + ] + } + ] + }) + + strictEqual(listOffsetsResultLatest[0].partitions[0].offset + listOffsetsResultLatest[0].partitions[1].offset, 10n) + + await admin.deleteTopics({ topics: [topicName] }) +}) + +test('listOffsets should support diagnostic channels', async t => { + const admin = createAdmin(t) + + const topicName = `test-topic-${randomUUID()}` + await admin.createTopics({ + topics: [topicName], + partitions: 1, + replicas: 1 + }) + + const options = { + topics: [ + { + name: topicName, + partitions: [{ partitionIndex: 0, timestamp: BigInt(0) }] + } + ] + } + + const verifyTracingChannel = createTracingChannelVerifier( + adminOffsetsChannel, + 'client', + { + start (context: ClientDiagnosticEvent) { + deepStrictEqual(context, { + client: admin, + operation: 'listOffsets', + options, + operationId: mockedOperationId + }) + }, + asyncStart (context: ClientDiagnosticEvent) { + const result = context.result as ListOffsetsResponse[] + ok(result) + strictEqual(Array.isArray(result), true) + }, + error (context: ClientDiagnosticEvent) { + ok(typeof context === 'undefined') + } + }, + (_label: string, data: ClientDiagnosticEvent) => data.operation === 'listOffsets' + ) + + await admin.listOffsets(options) + + verifyTracingChannel() +}) + +test('listOffsets should validate options in strict mode', async t => { + const admin = createAdmin(t, { strict: true }) + + // Test with missing required field (topics) + try { + // @ts-expect-error - Intentionally passing invalid options + await admin.listOffsets({}) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('topics'), true) + } + + // Test with invalid additional property in options + try { + await admin.listOffsets({ + topics: [ + { + name: 'test-topic', + partitions: [{ partitionIndex: 0, timestamp: BigInt(0) }] + } + ], + invalidProperty: true + } as any) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('must NOT have additional properties'), true) + } + + // Test with invalid type for topics + try { + // @ts-expect-error - Intentionally passing invalid options + await admin.listOffsets({ topics: 'not-an-array' }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('topics'), true) + } + + // Test with invalid type for isolationLevel + try { + await admin.listOffsets({ + topics: [ + { + name: 'test-topic', + partitions: [{ partitionIndex: 0, timestamp: BigInt(0) }] + } + ], + isolationLevel: 'not-a-number' + } as any) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('isolationLevel'), true) + } + + // Test with invalid value for isolationLevel + try { + await admin.listOffsets({ + topics: [ + { + name: 'test-topic', + partitions: [{ partitionIndex: 0, timestamp: BigInt(0) }] + } + ], + // @ts-expect-error - Intentionally passing invalid options + isolationLevel: 5 + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('isolationLevel'), true) + } + + // Test with empty topics array + try { + await admin.listOffsets({ topics: [] }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('topics'), true) + } + + // Test with invalid topic object (missing name) + try { + await admin.listOffsets({ + topics: [{ partitions: [{ partitionIndex: 0, timestamp: BigInt(0) }] }] as any + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('name'), true) + } + + // Test with invalid topic object (missing partitions) + try { + await admin.listOffsets({ + topics: [{ name: 'test-topic' }] as any + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('partitions'), true) + } + + // Test with invalid name type + try { + await admin.listOffsets({ + topics: [{ name: 123, partitions: [{ partitionIndex: 0, timestamp: BigInt(0) }] }] as any + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('name'), true) + } + + // Test with empty name + try { + await admin.listOffsets({ + topics: [{ name: '', partitions: [{ partitionIndex: 0, timestamp: BigInt(0) }] }] as any + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('name'), true) + } + + // Test with invalid partitions type + try { + await admin.listOffsets({ + topics: [{ name: 'test-topic', partitions: 'not-an-array' }] as any + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('partitions'), true) + } + + // Test with invalid additional property in topic + try { + await admin.listOffsets({ + topics: [ + { name: 'test-topic', partitions: [{ partitionIndex: 0, timestamp: BigInt(0) }], invalidProperty: true } as any + ] + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('must NOT have additional properties'), true) + } + + // Test with empty partitions array + try { + await admin.listOffsets({ + topics: [{ name: 'test-topic', partitions: [] }] as any + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('partitions'), true) + } + + // Test with invalid partition object (missing partitionIndex) + try { + await admin.listOffsets({ + topics: [{ name: 'test-topic', partitions: [{ timestamp: BigInt(0) }] as any }] + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('partitionIndex'), true) + } + + // Test with invalid partition object (missing timestamp) + try { + await admin.listOffsets({ + topics: [{ name: 'test-topic', partitions: [{ partitionIndex: 0 }] as any }] + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('timestamp'), true) + } + + // Test with invalid partitionIndex type + try { + await admin.listOffsets({ + topics: [{ name: 'test-topic', partitions: [{ partitionIndex: 'not-a-number', timestamp: BigInt(0) }] as any }] + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('partitionIndex'), true) + } + + // Test with invalid partitionIndex value + try { + await admin.listOffsets({ + topics: [{ name: 'test-topic', partitions: [{ partitionIndex: -1, timestamp: BigInt(0) }] as any }] + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('partitionIndex'), true) + } + + // Test with invalid timestamp type + try { + await admin.listOffsets({ + topics: [{ name: 'test-topic', partitions: [{ partitionIndex: 0, timestamp: 'not-a-bigint' }] as any }] + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('timestamp'), true) + } + + // Test with invalid additional property in partition + try { + await admin.listOffsets({ + topics: [ + { name: 'test-topic', partitions: [{ partitionIndex: 0, timestamp: BigInt(0), invalidProperty: true }] as any } + ] + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('must NOT have additional properties'), true) + } +}) + +test('listOffsets should handle errors from Base.metadata', async t => { + const admin = createAdmin(t) + + mockMetadata(admin, 1) + + try { + // Attempt to delete groups - should fail with connection error + await admin.listOffsets({ + topics: [ + { + name: 'test-topic', + partitions: [{ partitionIndex: 0, timestamp: BigInt(0) }] + } + ] + }) + throw new Error('Expected error not thrown') + } catch (error) { + // Error should contain our mock error message + strictEqual(error instanceof MultipleErrors, true) + strictEqual(error.message.includes(mockedErrorMessage), true) + } +}) + +test('listOffsets should handle errors from Connection.get', async t => { + const admin = createAdmin(t) + + const topicName = `test-topic-${randomUUID()}` + await admin.createTopics({ + topics: [topicName], + partitions: 1, + replicas: 1 + }) + + await scheduler.wait(1000) + + mockConnectionPoolGet(admin[kConnections], 2) + + try { + // Attempt to delete groups - should fail with connection error + await admin.listOffsets({ + topics: [ + { + name: topicName, + partitions: [{ partitionIndex: 0, timestamp: BigInt(0) }] + } + ] + }) + throw new Error('Expected error not thrown') + } catch (error) { + // Error should contain our mock error message + strictEqual(error instanceof MultipleErrors, true) + strictEqual(error.message.includes('Listing offsets failed.'), true) + } +}) + +test('listOffsets should handle errors from the API', async t => { + const admin = createAdmin(t) + + const topicName = `test-topic-${randomUUID()}` + await admin.createTopics({ + topics: [topicName], + partitions: 1, + replicas: 1 + }) + + await scheduler.wait(1000) + + mockAPI(admin[kConnections], listOffsetsV9.api.key) + + try { + await admin.listOffsets({ + topics: [ + { + name: topicName, + partitions: [{ partitionIndex: 0, timestamp: BigInt(0) }] + } + ] + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error instanceof MultipleErrors, true) + strictEqual(error.message.includes('Listing offsets failed.'), true) + } +}) + +test('listOffsets should handle unavailable API errors', async t => { + const admin = createAdmin(t) + + const topicName = `test-topic-${randomUUID()}` + await admin.createTopics({ + topics: [topicName], + partitions: 1, + replicas: 1 + }) + + await scheduler.wait(1000) + + mockUnavailableAPI(admin, 'ListOffsets') + + try { + await admin.listOffsets({ + topics: [ + { + name: topicName, + partitions: [{ partitionIndex: 0, timestamp: BigInt(0) }] + } + ] + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error instanceof MultipleErrors, true) + strictEqual(error.errors[0].message.includes('Unsupported API ListOffsets.'), true) + } +}) diff --git a/test/clients/consumer/consumer.test.ts b/test/clients/consumer/consumer.test.ts index d71bca37..0992a21d 100644 --- a/test/clients/consumer/consumer.test.ts +++ b/test/clients/consumer/consumer.test.ts @@ -27,6 +27,7 @@ import { type GroupPartitionsAssignments, heartbeatV4, instancesChannel, + IsolationLevels, joinGroupV9, leaveGroupV5, MessagesStream, @@ -164,7 +165,7 @@ test('constructor should initialize with custom options', t => { minBytes: 100, maxBytes: 5242880, // 5MB maxWaitTime: 3000, - isolationLevel: 'READ_UNCOMMITTED', + isolationLevel: IsolationLevels.READ_UNCOMMITTED, highWaterMark: 512 }) @@ -1777,7 +1778,7 @@ test('listOffsets should use custom isolation level when provided', async t => { const topic = await createTopic(t, true) // Use a specific isolation level - const isolationLevel = 'READ_COMMITTED' + const isolationLevel = IsolationLevels.READ_COMMITTED const offsets = await consumer.listOffsets({ topics: [topic], isolationLevel }) // Verification is implicit - if the call doesn't throw, it succeeded