Skip to content

Commit

Permalink
Merge pull request #409 from tulios/fix-typings-logger
Browse files Browse the repository at this point in the history
Fix TypeScript types (logger, pause/resume, eachBatch/eachMessage interfaces)
  • Loading branch information
tulios authored Jun 26, 2019
2 parents 4f7c63c + 56d3aa2 commit fd13841
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 53 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

## [1.9.2] - 2019-06-26
### Fixed
- Fix TypeScript types for Logger, consumer pause and resume, and the eachMessage and eachBatch payload interfaces #409

## [1.9.1] - 2019-06-25
### Fixed
- Fix TypeScript types for SSL, SASL and batch #407
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "kafkajs",
"version": "1.9.1",
"version": "1.9.2",
"description": "A modern Apache Kafka client for node.js",
"author": "Tulio Ornelas <[email protected]>",
"main": "index.js",
Expand Down
106 changes: 62 additions & 44 deletions types/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ export interface KafkaConfig {
export type ISocketFactory = (
host: string,
port: number,
ssl: tls.SecureContextOptions,
ssl: tls.ConnectionOptions,
onConnect: () => void
) => net.Socket

Expand All @@ -50,16 +50,27 @@ export interface ProducerConfig {
maxInFlightRequests?: number
}

export type ICustomPartitioner = () => (
message: { topic: string; partitionMetadata: PartitionMetadata[]; message: Message }
) => number
export interface PartitionerArgs {
topic: string
partitionMetadata: PartitionMetadata[]
message: Message
}

export type ICustomPartitioner = () => (args: PartitionerArgs) => number
export type DefaultPartitioner = (args: PartitionerArgs) => number
export type JavaCompatiblePartitioner = (args: PartitionerArgs) => number

export const Partitioners: {
DefaultPartitioner: DefaultPartitioner
JavaCompatiblePartitioner: JavaCompatiblePartitioner
}

export interface Message {
key?: string | Buffer
value: string | Buffer | null
partition?: string | number
key?: Buffer | null
value: Buffer | null
partition?: number
headers?: IHeaders
timestamp?: number | string
timestamp?: string
}

export type PartitionMetadata = {
Expand Down Expand Up @@ -95,6 +106,15 @@ export interface PartitionAssigner {
new (config: { cluster: Cluster }): Assigner
}

export interface CoordinatorMetadata {
errorCode: number
coordinator: {
nodeId: number
host: string
port: number
}
}

export type Cluster = {
isConnected(): boolean
connect(): Promise<void>
Expand All @@ -107,9 +127,7 @@ export type Cluster = {
findTopicPartitionMetadata(topic: string): PartitionMetadata[]
findLeaderForPartitions(topic: string, partitions: number[]): { [leader: string]: number[] }
findGroupCoordinator(group: { groupId: string }): Promise<Broker>
findGroupCoordinatorMetadata(group: {
groupId: string
}): Promise<{ errorCode: number; coordinator: { nodeId: number; host: string; port: number } }>
findGroupCoordinatorMetadata(group: { groupId: string }): Promise<CoordinatorMetadata>
defaultOffset(config: { fromBeginning: boolean }): number
fetchTopicsOffset(
topics: Array<{
Expand Down Expand Up @@ -320,19 +338,6 @@ export const AssignerProtocol: {
MemberAssignment: ISerializer<MemberAssignment>
}

export type DefaultPartitioner = (
message: { topic: string; partitionMetadata: PartitionMetadata[]; message: Message }
) => number

export type JavaCompatiblePartitioner = (
message: { topic: string; partitionMetadata: PartitionMetadata[]; message: Message }
) => number

export const Partitioners: {
DefaultPartitioner: DefaultPartitioner
JavaCompatiblePartitioner: JavaCompatiblePartitioner
}

export enum logLevel {
NOTHING = 0,
ERROR = 1,
Expand All @@ -341,13 +346,22 @@ export enum logLevel {
DEBUG = 5,
}

export type LogEntry = { namespace: string; level: logLevel; label: string; log: string }
export interface LogEntry {
namespace: string
level: logLevel
label: string
log: LoggerEntryContent
}

export interface LoggerEntryContent {
readonly timestamp: Date
readonly message: string
[key: string]: any
}

export type Logger = (entry: LogEntry) => void

export type logCreator = (
logLevel: string
) => (namespace: string, level: string, label: string, log: string) => void
export type logCreator = (logLevel: string) => (entry: LogEntry) => void

export type Broker = {
isConnected(): boolean
Expand Down Expand Up @@ -563,6 +577,22 @@ export interface OffsetsByTopicPartition {
topics: TopicOffsets[]
}

export interface EachMessagePayload {
topic: string
partition: number
message: KafkaMessage
}

export interface EachBatchPayload {
batch: Batch
resolveOffset(offset: string): void
heartbeat(): Promise<void>
commitOffsetsIfNecessary(offsets?: Offsets): Promise<void>
uncommittedOffsets(): Promise<OffsetsByTopicPartition>
isRunning(): boolean
isStale(): boolean
}

export type Consumer = {
connect(): Promise<void>
disconnect(): Promise<void>
Expand All @@ -574,25 +604,13 @@ export type Consumer = {
autoCommitThreshold?: number | null
eachBatchAutoResolve?: boolean
partitionsConsumedConcurrently?: number
eachBatch?: (
batch: {
batch: Batch
resolveOffset(offset: string): void
heartbeat(): Promise<void>
commitOffsetsIfNecessary(offsets?: Offsets): Promise<void>
uncommittedOffsets(): Promise<OffsetsByTopicPartition>
isRunning(): boolean
isStale(): boolean
}
) => Promise<void>
eachMessage?: (
message: { topic: string; partition: number; message: KafkaMessage }
) => Promise<void>
eachBatch?: (payload: EachBatchPayload) => Promise<void>
eachMessage?: (payload: EachMessagePayload) => Promise<void>
}): Promise<void>
seek(topicPartition: { topic: string; partition: number; offset: string }): void
describeGroup(): Promise<GroupDescription>
pause(topicPartitions: TopicPartitions[]): void
resume(topicPartitions: TopicPartitions[]): void
pause(topics: Array<{ topic: string }>): void
resume(topics: Array<{ topic: string }>): void
on(eventName: ValueOf<ConsumerEvents>, listener: (...args: any[]) => void): void
logger(): Logger
events: ConsumerEvents
Expand Down
18 changes: 10 additions & 8 deletions types/tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,15 @@ const runConsumer = async () => {
await consumer.disconnect()
}

runConsumer().catch(e => console.error(`[example/consumer] ${e.message}`, e))
runConsumer().catch(console.error)

// PRODUCER
const producer = kafka.producer({ allowAutoTopicCreation: true })

const getRandomNumber = () => Math.round(Math.random() * 1000)
const createMessage = (num: number) => ({
key: `key-${num}`,
value: `value-${num}-${new Date().toISOString()}`,
key: Buffer.from(`key-${num}`),
value: Buffer.from(`value-${num}-${new Date().toISOString()}`),
})

const sendMessage = () => {
Expand All @@ -84,7 +84,7 @@ const sendMessage = () => {
.map(_ => createMessage(getRandomNumber())),
})
.then(console.log)
.catch(e => console.error(`[example/producer] ${e.message}`, e))
.catch(console.error)
}

const runProducer = async () => {
Expand All @@ -93,7 +93,7 @@ const runProducer = async () => {
await producer.disconnect()
}

runProducer().catch(e => console.error(`[example/producer] ${e.message}`, e))
runProducer().catch(console.error)

// ADMIN
const admin = kafka.admin({ retry: { retries: 10 } })
Expand All @@ -109,17 +109,19 @@ const runAdmin = async () => {
await admin.disconnect()
}

runAdmin().catch(e => console.error(`[example/admin] ${e.message}`, e))
runAdmin().catch(console.error)

// OTHERS
;async () => {
const produceWithGZIP = async () => {
await producer.send({
topic: 'topic-name',
compression: CompressionTypes.GZIP,
messages: [{ key: 'key1', value: 'hello world!' }],
messages: [{ key: Buffer.from('key1'), value: Buffer.from('hello world!') }],
})
}

produceWithGZIP().catch(console.error)

const SnappyCodec: any = undefined
CompressionCodecs[CompressionTypes.Snappy] = SnappyCodec

Expand Down

0 comments on commit fd13841

Please sign in to comment.