From 67f12b6a15a825122680e2d2433f62e3d262501f Mon Sep 17 00:00:00 2001 From: tulios Date: Wed, 26 Jun 2019 09:46:13 +0200 Subject: [PATCH 1/6] Update logger typescript types --- types/index.d.ts | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/types/index.d.ts b/types/index.d.ts index f9b922fd7..87840bece 100644 --- a/types/index.d.ts +++ b/types/index.d.ts @@ -341,13 +341,22 @@ export enum logLevel { DEBUG = 5, } -export type LogEntry = { namespace: string; level: logLevel; label: string; log: string } +export interface LogEntry { + namespace: string + level: logLevel + label: string + log: LoggerEntryContent +} + +export interface LoggerEntryContent { + readonly timestamp: Date + readonly message: string + [key: string]: any +} export type Logger = (entry: LogEntry) => void -export type logCreator = ( - logLevel: string -) => (namespace: string, level: string, label: string, log: string) => void +export type logCreator = (logLevel: string) => (entry: LogEntry) => void export type Broker = { isConnected(): boolean From 30b74dc6ce21082fc58bf8baa94e4af6168535c6 Mon Sep 17 00:00:00 2001 From: tulios Date: Wed, 26 Jun 2019 09:46:28 +0200 Subject: [PATCH 2/6] Fix pause/resume typescript types --- types/index.d.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/types/index.d.ts b/types/index.d.ts index 87840bece..03247a6d3 100644 --- a/types/index.d.ts +++ b/types/index.d.ts @@ -600,8 +600,8 @@ export type Consumer = { }): Promise seek(topicPartition: { topic: string; partition: number; offset: string }): void describeGroup(): Promise - pause(topicPartitions: TopicPartitions[]): void - resume(topicPartitions: TopicPartitions[]): void + pause(topics: Array<{ topic: string }>): void + resume(topics: Array<{ topic: string }>): void on(eventName: ValueOf, listener: (...args: any[]) => void): void logger(): Logger events: ConsumerEvents From fb0727134fc7da64645cb70d4679940ecef90d1d Mon Sep 17 00:00:00 2001 From: tulios Date: Wed, 26 Jun 2019 09:46:57 +0200 Subject: [PATCH 3/6] Extract EachMessage and EachBatch typescript interfaces --- types/index.d.ts | 32 ++++++++++++++++++-------------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/types/index.d.ts b/types/index.d.ts index 03247a6d3..a1060eb5a 100644 --- a/types/index.d.ts +++ b/types/index.d.ts @@ -572,6 +572,22 @@ export interface OffsetsByTopicPartition { topics: TopicOffsets[] } +export interface EachMessagePayload { + topic: string + partition: number + message: KafkaMessage +} + +export interface EachBatchPayload { + batch: Batch + resolveOffset(offset: string): void + heartbeat(): Promise + commitOffsetsIfNecessary(offsets?: Offsets): Promise + uncommittedOffsets(): Promise + isRunning(): boolean + isStale(): boolean +} + export type Consumer = { connect(): Promise disconnect(): Promise @@ -583,20 +599,8 @@ export type Consumer = { autoCommitThreshold?: number | null eachBatchAutoResolve?: boolean partitionsConsumedConcurrently?: number - eachBatch?: ( - batch: { - batch: Batch - resolveOffset(offset: string): void - heartbeat(): Promise - commitOffsetsIfNecessary(offsets?: Offsets): Promise - uncommittedOffsets(): Promise - isRunning(): boolean - isStale(): boolean - } - ) => Promise - eachMessage?: ( - message: { topic: string; partition: number; message: KafkaMessage } - ) => Promise + eachBatch?: (payload: EachBatchPayload) => Promise + eachMessage?: (payload: EachMessagePayload) => Promise }): Promise seek(topicPartition: { topic: string; partition: number; offset: string }): void describeGroup(): Promise From 65b8ecccafa8517fed366ed59abbfd758183594c Mon Sep 17 00:00:00 2001 From: tulios Date: Wed, 26 Jun 2019 09:47:15 +0200 Subject: [PATCH 4/6] Fix general typescript types --- types/index.d.ts | 53 ++++++++++++++++++++++++++---------------------- 1 file changed, 29 insertions(+), 24 deletions(-) diff --git a/types/index.d.ts b/types/index.d.ts index a1060eb5a..928150dac 100644 --- a/types/index.d.ts +++ b/types/index.d.ts @@ -29,7 +29,7 @@ export interface KafkaConfig { export type ISocketFactory = ( host: string, port: number, - ssl: tls.SecureContextOptions, + ssl: tls.ConnectionOptions, onConnect: () => void ) => net.Socket @@ -50,16 +50,27 @@ export interface ProducerConfig { maxInFlightRequests?: number } -export type ICustomPartitioner = () => ( - message: { topic: string; partitionMetadata: PartitionMetadata[]; message: Message } -) => number +export interface PartitionerArgs { + topic: string + partitionMetadata: PartitionMetadata[] + message: Message +} + +export type ICustomPartitioner = () => (args: PartitionerArgs) => number +export type DefaultPartitioner = (args: PartitionerArgs) => number +export type JavaCompatiblePartitioner = (args: PartitionerArgs) => number + +export const Partitioners: { + DefaultPartitioner: DefaultPartitioner + JavaCompatiblePartitioner: JavaCompatiblePartitioner +} export interface Message { - key?: string | Buffer - value: string | Buffer | null - partition?: string | number + key?: Buffer | null + value: Buffer | null + partition?: number headers?: IHeaders - timestamp?: number | string + timestamp?: string } export type PartitionMetadata = { @@ -95,6 +106,15 @@ export interface PartitionAssigner { new (config: { cluster: Cluster }): Assigner } +export interface CoordinatorMetadata { + errorCode: number + coordinator: { + nodeId: number + host: string + port: number + } +} + export type Cluster = { isConnected(): boolean connect(): Promise @@ -107,9 +127,7 @@ export type Cluster = { findTopicPartitionMetadata(topic: string): PartitionMetadata[] findLeaderForPartitions(topic: string, partitions: number[]): { [leader: string]: number[] } findGroupCoordinator(group: { groupId: string }): Promise - findGroupCoordinatorMetadata(group: { - groupId: string - }): Promise<{ errorCode: number; coordinator: { nodeId: number; host: string; port: number } }> + findGroupCoordinatorMetadata(group: { groupId: string }): Promise defaultOffset(config: { fromBeginning: boolean }): number fetchTopicsOffset( topics: Array<{ @@ -320,19 +338,6 @@ export const AssignerProtocol: { MemberAssignment: ISerializer } -export type DefaultPartitioner = ( - message: { topic: string; partitionMetadata: PartitionMetadata[]; message: Message } -) => number - -export type JavaCompatiblePartitioner = ( - message: { topic: string; partitionMetadata: PartitionMetadata[]; message: Message } -) => number - -export const Partitioners: { - DefaultPartitioner: DefaultPartitioner - JavaCompatiblePartitioner: JavaCompatiblePartitioner -} - export enum logLevel { NOTHING = 0, ERROR = 1, From 3ebebb975144c286306aface7a9026daaba91c81 Mon Sep 17 00:00:00 2001 From: tulios Date: Wed, 26 Jun 2019 09:50:56 +0200 Subject: [PATCH 5/6] Bump version (v1.9.2) and update changelog --- CHANGELOG.md | 4 ++++ package.json | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 37ae3f3c8..d668f2d71 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html). +## [1.9.2] - 2019-06-26 +### Fixed + - Fix typescript types for Logger, consumer pause and resume, eachMessage and EachBatch interfaces #409 + ## [1.9.1] - 2019-06-25 ### Fixed - Fix typescript types for SSL, SASL and batch #407 diff --git a/package.json b/package.json index 40fd73d75..99f87b22a 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "kafkajs", - "version": "1.9.1", + "version": "1.9.2", "description": "A modern Apache Kafka client for node.js", "author": "Tulio Ornelas ", "main": "index.js", From 56d3aa2c206c66a38463f4872516215792803d76 Mon Sep 17 00:00:00 2001 From: tulios Date: Wed, 26 Jun 2019 10:03:47 +0200 Subject: [PATCH 6/6] Fix typescript test --- types/tests.ts | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/types/tests.ts b/types/tests.ts index 8469bffc2..d0a61d04d 100644 --- a/types/tests.ts +++ b/types/tests.ts @@ -63,15 +63,15 @@ const runConsumer = async () => { await consumer.disconnect() } -runConsumer().catch(e => console.error(`[example/consumer] ${e.message}`, e)) +runConsumer().catch(console.error) // PRODUCER const producer = kafka.producer({ allowAutoTopicCreation: true }) const getRandomNumber = () => Math.round(Math.random() * 1000) const createMessage = (num: number) => ({ - key: `key-${num}`, - value: `value-${num}-${new Date().toISOString()}`, + key: Buffer.from(`key-${num}`), + value: Buffer.from(`value-${num}-${new Date().toISOString()}`), }) const sendMessage = () => { @@ -84,7 +84,7 @@ const sendMessage = () => { .map(_ => createMessage(getRandomNumber())), }) .then(console.log) - .catch(e => console.error(`[example/producer] ${e.message}`, e)) + .catch(console.error) } const runProducer = async () => { @@ -93,7 +93,7 @@ const runProducer = async () => { await producer.disconnect() } -runProducer().catch(e => console.error(`[example/producer] ${e.message}`, e)) +runProducer().catch(console.error) // ADMIN const admin = kafka.admin({ retry: { retries: 10 } }) @@ -109,17 +109,19 @@ const runAdmin = async () => { await admin.disconnect() } -runAdmin().catch(e => console.error(`[example/admin] ${e.message}`, e)) +runAdmin().catch(console.error) // OTHERS -;async () => { +const produceWithGZIP = async () => { await producer.send({ topic: 'topic-name', compression: CompressionTypes.GZIP, - messages: [{ key: 'key1', value: 'hello world!' }], + messages: [{ key: Buffer.from('key1'), value: Buffer.from('hello world!') }], }) } +produceWithGZIP().catch(console.error) + const SnappyCodec: any = undefined CompressionCodecs[CompressionTypes.Snappy] = SnappyCodec