diff --git a/src/consumer/__tests__/runner.spec.js b/src/consumer/__tests__/runner.spec.js index c7dd60e91..4ad464538 100644 --- a/src/consumer/__tests__/runner.spec.js +++ b/src/consumer/__tests__/runner.spec.js @@ -1,6 +1,6 @@ const Runner = require('../runner') const Batch = require('../batch') -const { KafkaJSProtocolError } = require('../../errors') +const { KafkaJSProtocolError, KafkaJSNotImplemented } = require('../../errors') const { createErrorFromCode } = require('../../protocol/error') const InstrumentationEventEmitter = require('../../instrumentation/emitter') const { newLogger } = require('testHelpers') @@ -90,4 +90,14 @@ describe('Consumer > Runner', () => { expect(runner.scheduleFetch).not.toHaveBeenCalled() expect(onCrash).toHaveBeenCalledWith(unknowError) }) + + it('crashes on KafkaJSNotImplemented errors', async () => { + const notImplementedError = new KafkaJSNotImplemented('not implemented') + consumerGroup.fetch.mockImplementationOnce(() => { + throw notImplementedError + }) + + await runner.start() + expect(onCrash).toHaveBeenCalledWith(notImplementedError) + }) }) diff --git a/src/consumer/runner.js b/src/consumer/runner.js index 631c22f2a..caa8ed635 100644 --- a/src/consumer/runner.js +++ b/src/consumer/runner.js @@ -307,6 +307,10 @@ module.exports = class Runner { return } + if (e.name === 'KafkaJSNotImplemented') { + return bail(e) + } + this.logger.debug('Error while fetching data, trying again...', { groupId: this.consumerGroup.groupId, memberId: this.consumerGroup.memberId, diff --git a/src/protocol/requests/fetch/fixtures/v4_response_snappy.json b/src/protocol/requests/fetch/fixtures/v4_response_snappy.json new file mode 100644 index 000000000..aea1b6814 --- /dev/null +++ b/src/protocol/requests/fetch/fixtures/v4_response_snappy.json @@ -0,0 +1 @@ +{"type":"Buffer","data":[0,0,0,0,0,0,0,1,0,10,116,111,112,105,99,45,116,101,115,116,0,0,0,2,0,0,0,3,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,110,0,0,0,0,0,0,0,0,0,0,0,98,0,0,0,0,2,151,239,110,170,0,2,0,0,0,0,0,0,1,104,185,18,144,177,0,0,1,104,185,18,144,177,255,255,255,255,255,255,255,255,0,0,0,0,0,0,0,0,0,1,48,68,94,0,0,0,14,107,101,121,45,49,52,57,68,118,97,108,117,101,1,10,100,45,50,48,49,57,45,48,50,45,48,52,84,49,53,58,49,51,58,52,56,46,49,56,55,90,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]} diff --git a/src/protocol/requests/fetch/v4/response.js b/src/protocol/requests/fetch/v4/response.js index 912101caa..65925c5b7 100644 --- a/src/protocol/requests/fetch/v4/response.js +++ b/src/protocol/requests/fetch/v4/response.js @@ -50,6 +50,8 @@ const decodeMessages = async decoder => { if (e.name === 'KafkaJSPartialMessageError') { break } + + throw e } } diff --git a/src/protocol/requests/fetch/v4/response.spec.js b/src/protocol/requests/fetch/v4/response.spec.js index aec217608..5d0a8f58d 100644 --- a/src/protocol/requests/fetch/v4/response.spec.js +++ b/src/protocol/requests/fetch/v4/response.spec.js @@ -1,4 +1,5 @@ const { decode, parse } = require('./response') +const { KafkaJSNotImplemented } = require('../../../../errors') describe('Protocol > Requests > Fetch > v4', () => { test('response', async () => { @@ -262,4 +263,12 @@ describe('Protocol > Requests > Fetch > v4', () => { expect(new Set(messagesMagicBytes)).toEqual(new Set([1])) }) }) + + describe('response with an unconfigured compression codec (snappy)', () => { + test('throws KafkaJSNotImplemented error', async () => { + await expect( + decode(Buffer.from(require('../fixtures/v4_response_snappy.json'))) + ).rejects.toThrow(KafkaJSNotImplemented) + }) + }) })