Skip to content

Commit

Permalink
Merge pull request #313 from tulios/make-request-timeout-optional
Browse files Browse the repository at this point in the history
Make the requestTimeout optional
  • Loading branch information
tulios authored Mar 14, 2019
2 parents 71a6aa6 + 4894237 commit 47c5fab
Show file tree
Hide file tree
Showing 11 changed files with 29 additions and 3 deletions.
1 change: 1 addition & 0 deletions src/admin/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ describe('Admin', () => {
const emitter = new InstrumentationEventEmitter()
const cluster = createCluster({
requestTimeout: 1,
enforceRequestTimeout: true,
instrumentationEmitter: emitter,
})

Expand Down
2 changes: 2 additions & 0 deletions src/cluster/connectionBuilder.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ module.exports = ({
sasl,
clientId,
requestTimeout,
enforceRequestTimeout,
connectionTimeout,
maxInFlightRequests,
retry,
Expand Down Expand Up @@ -33,6 +34,7 @@ module.exports = ({
clientId,
connectionTimeout,
requestTimeout,
enforceRequestTimeout,
maxInFlightRequests,
instrumentationEmitter,
retry,
Expand Down
2 changes: 2 additions & 0 deletions src/cluster/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ module.exports = class Cluster {
connectionTimeout,
authenticationTimeout,
requestTimeout,
enforceRequestTimeout,
metadataMaxAge,
retry,
allowExperimentalV011,
Expand All @@ -67,6 +68,7 @@ module.exports = class Cluster {
clientId,
connectionTimeout,
requestTimeout,
enforceRequestTimeout,
maxInFlightRequests,
retry,
})
Expand Down
6 changes: 5 additions & 1 deletion src/consumer/__tests__/instrumentationEvents.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,11 @@ describe('Consumer > Instrumentation Events', () => {
})

it('emits request timeout events', async () => {
cluster = createCluster({ instrumentationEmitter: emitter, requestTimeout: 1 })
cluster = createCluster({
instrumentationEmitter: emitter,
requestTimeout: 1,
enforceRequestTimeout: true,
})
consumer = createConsumer({
cluster,
groupId,
Expand Down
2 changes: 2 additions & 0 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ module.exports = class Client {
connectionTimeout,
authenticationTimeout,
requestTimeout,
enforceRequestTimeout = false,
retry,
logLevel = INFO,
logCreator = LoggerConsole,
Expand All @@ -52,6 +53,7 @@ module.exports = class Client {
connectionTimeout,
authenticationTimeout,
requestTimeout,
enforceRequestTimeout,
metadataMaxAge,
instrumentationEmitter,
allowAutoTopicCreation,
Expand Down
2 changes: 2 additions & 0 deletions src/network/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ module.exports = class Connection {
clientId = 'kafkajs',
connectionTimeout = 1000,
requestTimeout = 30000,
enforceRequestTimeout = false,
maxInFlightRequests = null,
instrumentationEmitter = null,
retry = {},
Expand All @@ -68,6 +69,7 @@ module.exports = class Connection {
instrumentationEmitter,
maxInFlightRequests,
requestTimeout,
enforceRequestTimeout,
clientId,
broker: this.broker,
logger: logger.namespace('RequestQueue'),
Expand Down
4 changes: 3 additions & 1 deletion src/network/connection.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,9 @@ describe('Network > Connection', () => {

test('respect the requestTimeout', async () => {
const protocol = apiVersions()
connection = new Connection(connectionOpts({ requestTimeout: 50 }))
connection = new Connection(
connectionOpts({ requestTimeout: 50, enforceRequestTimeout: true })
)
const originalProcessData = connection.processData

connection.processData = async data => {
Expand Down
3 changes: 3 additions & 0 deletions src/network/requestQueue/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@ module.exports = class RequestQueue {
instrumentationEmitter = null,
maxInFlightRequests,
requestTimeout,
enforceRequestTimeout,
clientId,
broker,
logger,
}) {
this.instrumentationEmitter = instrumentationEmitter
this.maxInFlightRequests = maxInFlightRequests
this.requestTimeout = requestTimeout
this.enforceRequestTimeout = enforceRequestTimeout
this.clientId = clientId
this.broker = broker
this.logger = logger
Expand Down Expand Up @@ -65,6 +67,7 @@ module.exports = class RequestQueue {
broker: this.broker,
clientId: this.clientId,
instrumentationEmitter: this.instrumentationEmitter,
enforceRequestTimeout: this.enforceRequestTimeout,
requestTimeout,
send: () => {
this.inflight.set(correlationId, socketRequest)
Expand Down
6 changes: 5 additions & 1 deletion src/network/requestQueue/socketRequest.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ module.exports = class SocketRequest {
*/
constructor({
requestTimeout,
enforceRequestTimeout,
broker,
clientId,
entry,
Expand All @@ -59,6 +60,7 @@ module.exports = class SocketRequest {
}) {
this.createdAt = Date.now()
this.requestTimeout = requestTimeout
this.enforceRequestTimeout = enforceRequestTimeout
this.broker = broker
this.clientId = clientId
this.entry = entry
Expand Down Expand Up @@ -110,7 +112,9 @@ module.exports = class SocketRequest {
})
}

this.timeoutId = setTimeout(timeoutCallback, this.requestTimeout)
if (this.enforceRequestTimeout) {
this.timeoutId = setTimeout(timeoutCallback, this.requestTimeout)
}
}

completed({ size, payload }) {
Expand Down
3 changes: 3 additions & 0 deletions src/network/requestQueue/socketRequest.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ describe('Network > SocketRequest', () => {
expect(request.pendingDuration).toEqual(null)
expect(request.timeoutId).toEqual(null)

request.enforceRequestTimeout = true
request.send()

expect(sendRequest).toHaveBeenCalled()
Expand All @@ -63,6 +64,7 @@ describe('Network > SocketRequest', () => {

it('executes the timeoutHandler when it times out', async () => {
jest.spyOn(request, 'rejected')
request.enforceRequestTimeout = true
request.send()

await sleep(requestTimeout + 1)
Expand Down Expand Up @@ -164,6 +166,7 @@ describe('Network > SocketRequest', () => {
it('emits NETWORK_REQUEST_TIMEOUT', async () => {
jest.spyOn(request, 'rejected')
emitter.addListener(events.NETWORK_REQUEST_TIMEOUT, eventCalled)
request.enforceRequestTimeout = true
request.send()

await sleep(requestTimeout + 1)
Expand Down
1 change: 1 addition & 0 deletions src/producer/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ describe('Producer', () => {
const emitter = new InstrumentationEventEmitter()
const cluster = createCluster({
requestTimeout: 1,
enforceRequestTimeout: true,
instrumentationEmitter: emitter,
})

Expand Down

0 comments on commit 47c5fab

Please sign in to comment.