Skip to content

Commit

Permalink
Merge pull request #256 from tulios/crash-consumer-on-codec-not-imple…
Browse files Browse the repository at this point in the history
…mented-error

Crash consumer on codec not implemented error
  • Loading branch information
tulios committed Feb 18, 2019
1 parent d641a16 commit 39743fa
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 1 deletion.
12 changes: 11 additions & 1 deletion src/consumer/__tests__/runner.spec.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
const Runner = require('../runner')
const Batch = require('../batch')
const { KafkaJSProtocolError } = require('../../errors')
const { KafkaJSProtocolError, KafkaJSNotImplemented } = require('../../errors')
const { createErrorFromCode } = require('../../protocol/error')
const InstrumentationEventEmitter = require('../../instrumentation/emitter')
const { newLogger } = require('testHelpers')
Expand Down Expand Up @@ -90,4 +90,14 @@ describe('Consumer > Runner', () => {
expect(runner.scheduleFetch).not.toHaveBeenCalled()
expect(onCrash).toHaveBeenCalledWith(unknowError)
})

it('crashes on KafkaJSNotImplemented errors', async () => {
const notImplementedError = new KafkaJSNotImplemented('not implemented')
consumerGroup.fetch.mockImplementationOnce(() => {
throw notImplementedError
})

await runner.start()
expect(onCrash).toHaveBeenCalledWith(notImplementedError)
})
})
4 changes: 4 additions & 0 deletions src/consumer/runner.js
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,10 @@ module.exports = class Runner {
return
}

if (e.name === 'KafkaJSNotImplemented') {
return bail(e)
}

this.logger.debug('Error while fetching data, trying again...', {
groupId: this.consumerGroup.groupId,
memberId: this.consumerGroup.memberId,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"type":"Buffer","data":[0,0,0,0,0,0,0,1,0,10,116,111,112,105,99,45,116,101,115,116,0,0,0,2,0,0,0,3,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,110,0,0,0,0,0,0,0,0,0,0,0,98,0,0,0,0,2,151,239,110,170,0,2,0,0,0,0,0,0,1,104,185,18,144,177,0,0,1,104,185,18,144,177,255,255,255,255,255,255,255,255,0,0,0,0,0,0,0,0,0,1,48,68,94,0,0,0,14,107,101,121,45,49,52,57,68,118,97,108,117,101,1,10,100,45,50,48,49,57,45,48,50,45,48,52,84,49,53,58,49,51,58,52,56,46,49,56,55,90,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]}
2 changes: 2 additions & 0 deletions src/protocol/requests/fetch/v4/response.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ const decodeMessages = async decoder => {
if (e.name === 'KafkaJSPartialMessageError') {
break
}

throw e
}
}

Expand Down
9 changes: 9 additions & 0 deletions src/protocol/requests/fetch/v4/response.spec.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
const { decode, parse } = require('./response')
const { KafkaJSNotImplemented } = require('../../../../errors')

describe('Protocol > Requests > Fetch > v4', () => {
test('response', async () => {
Expand Down Expand Up @@ -262,4 +263,12 @@ describe('Protocol > Requests > Fetch > v4', () => {
expect(new Set(messagesMagicBytes)).toEqual(new Set([1]))
})
})

describe('response with an unconfigured compression codec (snappy)', () => {
test('throws KafkaJSNotImplemented error', async () => {
await expect(
decode(Buffer.from(require('../fixtures/v4_response_snappy.json')))
).rejects.toThrow(KafkaJSNotImplemented)
})
})
})

0 comments on commit 39743fa

Please sign in to comment.