diff --git a/src/index.js b/src/index.js index 78e2b2265..613b50f98 100644 --- a/src/index.js +++ b/src/index.js @@ -13,6 +13,7 @@ const ISOLATION_LEVEL = require('./protocol/isolationLevel') const PRIVATE = { CREATE_CLUSTER: Symbol('private:Kafka:createCluster'), + CLUSTER_RETRY: Symbol('private:Kafka:clusterRetry'), LOGGER: Symbol('private:Kafka:logger'), OFFSETS: Symbol('private:Kafka:offsets'), } @@ -33,6 +34,7 @@ module.exports = class Client { }) { this[PRIVATE.OFFSETS] = new Map() this[PRIVATE.LOGGER] = createLogger({ level: logLevel, logCreator }) + this[PRIVATE.CLUSTER_RETRY] = retry this[PRIVATE.CREATE_CLUSTER] = ({ metadataMaxAge = 300000, allowAutoTopicCreation = true, @@ -42,6 +44,7 @@ module.exports = class Client { }) => new Cluster({ logger: this[PRIVATE.LOGGER], + retry: this[PRIVATE.CLUSTER_RETRY], brokers, ssl, sasl, @@ -51,7 +54,6 @@ module.exports = class Client { requestTimeout, metadataMaxAge, instrumentationEmitter, - retry, allowAutoTopicCreation, allowExperimentalV011, maxInFlightRequests, @@ -82,7 +84,7 @@ module.exports = class Client { }) return createProducer({ - retry: { ...cluster.retry, ...retry }, + retry: { ...this[PRIVATE.CLUSTER_RETRY], ...retry }, logger: this[PRIVATE.LOGGER], cluster, createPartitioner, @@ -125,7 +127,7 @@ module.exports = class Client { }) return createConsumer({ - retry: { ...cluster.retry, retry }, + retry: { ...this[PRIVATE.CLUSTER_RETRY], ...retry }, logger: this[PRIVATE.LOGGER], cluster, groupId, @@ -152,7 +154,7 @@ module.exports = class Client { }) return createAdmin({ - retry: { ...cluster.retry, retry }, + retry: { ...this[PRIVATE.CLUSTER_RETRY], ...retry }, logger: this[PRIVATE.LOGGER], instrumentationEmitter, cluster, diff --git a/src/index.spec.js b/src/index.spec.js index fcc58542c..e4d0c8dcc 100644 --- a/src/index.spec.js +++ b/src/index.spec.js @@ -1,10 +1,12 @@ jest.mock('./producer') jest.mock('./consumer') +jest.mock('./admin') jest.mock('./cluster') const Client = require('./index') const createProducer = require('./producer') const createConsumer = require('./consumer') +const createAdmin = require('./admin') const Cluster = require('./cluster') const ISOLATION_LEVEL = require('./protocol/isolationLevel') @@ -85,4 +87,39 @@ describe('Client', () => { ) }) }) + + describe('retry configurations', () => { + it('merges local producer options with the client options', () => { + const client = new Client({ retry: { initialRetryTime: 100 } }) + client.producer({ retry: { multiplier: 3 } }) + + expect(createProducer).toHaveBeenCalledWith( + expect.objectContaining({ + retry: { initialRetryTime: 100, multiplier: 3 }, + }) + ) + }) + + it('merges local consumer options with the client options', () => { + const client = new Client({ retry: { initialRetryTime: 100 } }) + client.consumer({ retry: { multiplier: 3 } }) + + expect(createConsumer).toHaveBeenCalledWith( + expect.objectContaining({ + retry: { initialRetryTime: 100, multiplier: 3 }, + }) + ) + }) + + it('merges local admin options with the client options', () => { + const client = new Client({ retry: { initialRetryTime: 100 } }) + client.admin({ retry: { multiplier: 3 } }) + + expect(createAdmin).toHaveBeenCalledWith( + expect.objectContaining({ + retry: { initialRetryTime: 100, multiplier: 3 }, + }) + ) + }) + }) })