Skip to content

Commit

Permalink
Fix propagation of custom retry configs
Browse files Browse the repository at this point in the history
  • Loading branch information
tulios committed Feb 19, 2019
1 parent 78ded51 commit a987e0d
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 4 deletions.
10 changes: 6 additions & 4 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const ISOLATION_LEVEL = require('./protocol/isolationLevel')

const PRIVATE = {
CREATE_CLUSTER: Symbol('private:Kafka:createCluster'),
CLUSTER_RETRY: Symbol('private:Kafka:clusterRetry'),
LOGGER: Symbol('private:Kafka:logger'),
OFFSETS: Symbol('private:Kafka:offsets'),
}
Expand All @@ -33,6 +34,7 @@ module.exports = class Client {
}) {
this[PRIVATE.OFFSETS] = new Map()
this[PRIVATE.LOGGER] = createLogger({ level: logLevel, logCreator })
this[PRIVATE.CLUSTER_RETRY] = retry
this[PRIVATE.CREATE_CLUSTER] = ({
metadataMaxAge = 300000,
allowAutoTopicCreation = true,
Expand All @@ -42,6 +44,7 @@ module.exports = class Client {
}) =>
new Cluster({
logger: this[PRIVATE.LOGGER],
retry: this[PRIVATE.CLUSTER_RETRY],
brokers,
ssl,
sasl,
Expand All @@ -51,7 +54,6 @@ module.exports = class Client {
requestTimeout,
metadataMaxAge,
instrumentationEmitter,
retry,
allowAutoTopicCreation,
allowExperimentalV011,
maxInFlightRequests,
Expand Down Expand Up @@ -82,7 +84,7 @@ module.exports = class Client {
})

return createProducer({
retry: { ...cluster.retry, ...retry },
retry: { ...this[PRIVATE.CLUSTER_RETRY], ...retry },
logger: this[PRIVATE.LOGGER],
cluster,
createPartitioner,
Expand Down Expand Up @@ -125,7 +127,7 @@ module.exports = class Client {
})

return createConsumer({
retry: { ...cluster.retry, retry },
retry: { ...this[PRIVATE.CLUSTER_RETRY], ...retry },
logger: this[PRIVATE.LOGGER],
cluster,
groupId,
Expand All @@ -152,7 +154,7 @@ module.exports = class Client {
})

return createAdmin({
retry: { ...cluster.retry, retry },
retry: { ...this[PRIVATE.CLUSTER_RETRY], ...retry },
logger: this[PRIVATE.LOGGER],
instrumentationEmitter,
cluster,
Expand Down
37 changes: 37 additions & 0 deletions src/index.spec.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
jest.mock('./producer')
jest.mock('./consumer')
jest.mock('./admin')
jest.mock('./cluster')

const Client = require('./index')
const createProducer = require('./producer')
const createConsumer = require('./consumer')
const createAdmin = require('./admin')
const Cluster = require('./cluster')
const ISOLATION_LEVEL = require('./protocol/isolationLevel')

Expand Down Expand Up @@ -85,4 +87,39 @@ describe('Client', () => {
)
})
})

describe('retry configurations', () => {
it('merges local producer options with the client options', () => {
const client = new Client({ retry: { initialRetryTime: 100 } })
client.producer({ retry: { multiplier: 3 } })

expect(createProducer).toHaveBeenCalledWith(
expect.objectContaining({
retry: { initialRetryTime: 100, multiplier: 3 },
})
)
})

it('merges local consumer options with the client options', () => {
const client = new Client({ retry: { initialRetryTime: 100 } })
client.consumer({ retry: { multiplier: 3 } })

expect(createConsumer).toHaveBeenCalledWith(
expect.objectContaining({
retry: { initialRetryTime: 100, multiplier: 3 },
})
)
})

it('merges local admin options with the client options', () => {
const client = new Client({ retry: { initialRetryTime: 100 } })
client.admin({ retry: { multiplier: 3 } })

expect(createAdmin).toHaveBeenCalledWith(
expect.objectContaining({
retry: { initialRetryTime: 100, multiplier: 3 },
})
)
})
})
})

0 comments on commit a987e0d

Please sign in to comment.