From f5036fed79640a182fe2b039a0d8dd9e15e1c3b1 Mon Sep 17 00:00:00 2001 From: Greg Foltz Date: Sat, 6 Jan 2024 13:20:42 -0800 Subject: [PATCH 1/5] Log topic name for UnknownTopic and TopicAuthorizationFailed errors --- src/errors.js | 17 +++++++++++++++++ src/network/connection.js | 1 + src/protocol/error.js | 6 ++++++ src/protocol/requests/metadata/v0/response.js | 15 +++++++++++++-- 4 files changed, 37 insertions(+), 2 deletions(-) diff --git a/src/errors.js b/src/errors.js index 266192e19..874e8e7f1 100644 --- a/src/errors.js +++ b/src/errors.js @@ -38,6 +38,21 @@ class KafkaJSOffsetOutOfRange extends KafkaJSProtocolError { } } +class KafkaJSUnknownTopic extends KafkaJSProtocolError { + constructor(e, { topic }) { + super(e, { retriable: false }) + this.topic = topic + this.name = 'KafkaJSUnknownTopic' + } +} + +class KafkaJSTopicAuthorizationFailed extends KafkaJSProtocolError { + constructor(e, { topic }) { + super(e, { retriable: false }) + this.topic = topic + this.name = 'KafkaJSTopicAuthorizationFailed' + } +} class KafkaJSMemberIdRequired extends KafkaJSProtocolError { constructor(e, { memberId }) { super(e) @@ -304,6 +319,8 @@ module.exports = { KafkaJSFetcherRebalanceError, KafkaJSNoBrokerAvailableError, KafkaJSAlterPartitionReassignmentsError, + KafkaJSUnknownTopic, + KafkaJSTopicAuthorizationFailed, isRebalancing, isKafkaJSError, } diff --git a/src/network/connection.js b/src/network/connection.js index dded584d8..ca1e8092c 100644 --- a/src/network/connection.js +++ b/src/network/connection.js @@ -445,6 +445,7 @@ module.exports = class Connection { error: e.message, correlationId, size, + topic: e.topic, }) } diff --git a/src/protocol/error.js b/src/protocol/error.js index c070cb932..fa1a42e1f 100644 --- a/src/protocol/error.js +++ b/src/protocol/error.js @@ -592,10 +592,16 @@ const staleMetadata = e => e.type ) +const errorCodesByType = errorCodes.reduce((codesByType, error) => { + codesByType[error.type] = error.code + return codesByType +}, {}) + module.exports = { failure, errorCodes, createErrorFromCode, failIfVersionNotSupported, staleMetadata, + errorCodesByType, } diff --git a/src/protocol/requests/metadata/v0/response.js b/src/protocol/requests/metadata/v0/response.js index 3b2fb7650..c4915810e 100644 --- a/src/protocol/requests/metadata/v0/response.js +++ b/src/protocol/requests/metadata/v0/response.js @@ -1,5 +1,6 @@ const Decoder = require('../../../decoder') -const { failure, createErrorFromCode } = require('../../../error') +const { failure, createErrorFromCode, errorCodesByType } = require('../../../error') +const { KafkaJSUnknownTopic, KafkaJSTopicAuthorizationFailed } = require('../../../../errors') /** * Metadata Response (Version: 0) => [brokers] [topic_metadata] @@ -51,7 +52,17 @@ const decode = async rawData => { const parse = async data => { const topicsWithErrors = data.topicMetadata.filter(topic => failure(topic.topicErrorCode)) if (topicsWithErrors.length > 0) { - const { topicErrorCode } = topicsWithErrors[0] + const { topic, topicErrorCode } = topicsWithErrors[0] + if (topicErrorCode === errorCodesByType['UNKNOWN_TOPIC_OR_PARTITION']) { + throw new KafkaJSUnknownTopic(createErrorFromCode(topicErrorCode), { + topic, + }) + } + if (topicErrorCode === errorCodesByType['TOPIC_AUTHORIZATION_FAILED']) { + throw new KafkaJSTopicAuthorizationFailed(createErrorFromCode(topicErrorCode), { + topic, + }) + } throw createErrorFromCode(topicErrorCode) } From 83e23d10a9d1f89bb5ea73f8813ff1ce9555893f Mon Sep 17 00:00:00 2001 From: Greg Foltz Date: Sat, 6 Jan 2024 13:47:03 -0800 Subject: [PATCH 2/5] metadata/v0/response unit tests --- .../requests/metadata/v0/response.spec.js | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/src/protocol/requests/metadata/v0/response.spec.js b/src/protocol/requests/metadata/v0/response.spec.js index ea6806e99..0d7d9d42e 100644 --- a/src/protocol/requests/metadata/v0/response.spec.js +++ b/src/protocol/requests/metadata/v0/response.spec.js @@ -67,5 +67,29 @@ describe('Protocol > Requests > Metadata > v0', () => { createErrorFromCode(5).message ) }) + + test('when topicErrorCode is UNKNOWN_TOPIC_OR_PARTITION', async () => { + decoded.topicMetadata[0].topicErrorCode = 3 + await expect(response.parse(decoded)).rejects.toMatchObject({ + message: createErrorFromCode(3).message, + retriable: false, + type: 'UNKNOWN_TOPIC_OR_PARTITION', + code: 3, + name: 'KafkaJSUnknownTopic', + topic: 'test-topic-1', + }) + }) + + test('when topicErrorCode is TOPIC_AUTHORIZATION_FAILED', async () => { + decoded.topicMetadata[0].topicErrorCode = 29 + await expect(response.parse(decoded)).rejects.toMatchObject({ + message: createErrorFromCode(29).message, + retriable: false, + type: 'TOPIC_AUTHORIZATION_FAILED', + code: 29, + name: 'KafkaJSTopicAuthorizationFailed', + topic: 'test-topic-1', + }) + }) }) }) From fad3e16ab6155617927515b5a10c7d5e1d6d3885 Mon Sep 17 00:00:00 2001 From: Greg Foltz Date: Sat, 6 Jan 2024 14:00:29 -0800 Subject: [PATCH 3/5] Add Errors to types --- types/index.d.ts | 17 +++++++++++++++++ types/tests.ts | 5 +++++ 2 files changed, 22 insertions(+) diff --git a/types/index.d.ts b/types/index.d.ts index 26bcbfea3..05531eec9 100644 --- a/types/index.d.ts +++ b/types/index.d.ts @@ -1159,6 +1159,16 @@ export class KafkaJSOffsetOutOfRange extends KafkaJSProtocolError { constructor(e: Error | string, metadata?: KafkaJSOffsetOutOfRangeMetadata) } +export class KafkaJSUnknownTopic extends KafkaJSProtocolError { + constructor(e: Error | string, metadata?: KafkaJSUnknownTopicMetadata) + readonly topic: string +} + +export class KafkaJSTopicAuthorizationFailed extends KafkaJSProtocolError { + constructor(e: Error | string, metadata?: KafkaJSTopicAuthorizationFailedMetadata) + readonly topic: string +} + export class KafkaJSAlterPartitionReassignmentsError extends KafkaJSProtocolError { readonly topic?: string readonly partition?: number @@ -1277,6 +1287,13 @@ export interface KafkaJSOffsetOutOfRangeMetadata { partition: number } +export interface KafkaJSUnknownTopicMetadata { + topic: string +} +export interface KafkaJSTopicAuthorizationFailedMetadata { + topic: string +} + export interface KafkaJSNumberOfRetriesExceededMetadata { retryCount: number retryTime: number diff --git a/types/tests.ts b/types/tests.ts index f21796528..22b6e329f 100644 --- a/types/tests.ts +++ b/types/tests.ts @@ -21,6 +21,8 @@ import { KafkaJSStaleTopicMetadataAssignment, PartitionMetadata, KafkaJSServerDoesNotSupportApiKey, + KafkaJSUnknownTopic, + KafkaJSTopicAuthorizationFailed } from './index' const { roundRobin } = PartitionAssigners @@ -338,3 +340,6 @@ new KafkaJSServerDoesNotSupportApiKey( apiName: 'Produce', } ) + +new KafkaJSUnknownTopic('This server does not host this topic-partition', { topic: 'my_topic' }) +new KafkaJSTopicAuthorizationFailed('Not authorized to access topics: [Topic authorization failed]', { topic: 'my_topic' }) \ No newline at end of file From 1ce4ce6014dd75c7df269a8c430d8c6b1d354ca3 Mon Sep 17 00:00:00 2001 From: Greg Foltz Date: Sat, 6 Jan 2024 17:58:46 -0800 Subject: [PATCH 4/5] Include topic in error message --- src/errors.js | 4 +++- src/protocol/error.js | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/errors.js b/src/errors.js index 874e8e7f1..15ec77c83 100644 --- a/src/errors.js +++ b/src/errors.js @@ -40,9 +40,10 @@ class KafkaJSOffsetOutOfRange extends KafkaJSProtocolError { class KafkaJSUnknownTopic extends KafkaJSProtocolError { constructor(e, { topic }) { - super(e, { retriable: false }) + super(e) this.topic = topic this.name = 'KafkaJSUnknownTopic' + this.message = `${this.message} [${this.topic}]` } } @@ -51,6 +52,7 @@ class KafkaJSTopicAuthorizationFailed extends KafkaJSProtocolError { super(e, { retriable: false }) this.topic = topic this.name = 'KafkaJSTopicAuthorizationFailed' + this.message = `${this.message} [${this.topic})]` } } class KafkaJSMemberIdRequired extends KafkaJSProtocolError { diff --git a/src/protocol/error.js b/src/protocol/error.js index fa1a42e1f..b212abaa8 100644 --- a/src/protocol/error.js +++ b/src/protocol/error.js @@ -24,7 +24,7 @@ const errorCodes = [ { type: 'UNKNOWN_TOPIC_OR_PARTITION', code: 3, - retriable: true, + retriable: false, message: 'This server does not host this topic-partition', }, { From 07084ef80885f33dc5069619de6f541a26c0be5c Mon Sep 17 00:00:00 2001 From: Greg Foltz Date: Sat, 6 Jan 2024 19:30:33 -0800 Subject: [PATCH 5/5] Fix tests --- src/admin/__tests__/fetchTopicMetadata.spec.js | 5 ++++- src/errors.js | 10 ++++++++-- src/protocol/error.js | 2 +- src/protocol/requests/metadata/v0/response.spec.js | 4 ++-- types/index.d.ts | 1 + types/tests.ts | 2 +- 6 files changed, 17 insertions(+), 7 deletions(-) diff --git a/src/admin/__tests__/fetchTopicMetadata.spec.js b/src/admin/__tests__/fetchTopicMetadata.spec.js index 484561c9a..18978fceb 100644 --- a/src/admin/__tests__/fetchTopicMetadata.spec.js +++ b/src/admin/__tests__/fetchTopicMetadata.spec.js @@ -101,7 +101,10 @@ describe('Admin', () => { admin.fetchTopicMetadata({ topics: [existingTopicName, newTopicName], }) - ).rejects.toHaveProperty('message', 'This server does not host this topic-partition') + ).rejects.toHaveProperty( + 'message', + `This server does not host this topic-partition [${newTopicName}]` + ) }) }) }) diff --git a/src/errors.js b/src/errors.js index 15ec77c83..0e1cd421c 100644 --- a/src/errors.js +++ b/src/errors.js @@ -38,9 +38,14 @@ class KafkaJSOffsetOutOfRange extends KafkaJSProtocolError { } } +/** + * Wraps 'UNKNOWN_TOPIC_OR_PARTITION' (code=3) but is only + * used in cases where it is clearly the topic that is unknown + * (e.g. a `metadata` request has no partition parameter) + */ class KafkaJSUnknownTopic extends KafkaJSProtocolError { constructor(e, { topic }) { - super(e) + super(e, { retriable: false }) this.topic = topic this.name = 'KafkaJSUnknownTopic' this.message = `${this.message} [${this.topic}]` @@ -52,9 +57,10 @@ class KafkaJSTopicAuthorizationFailed extends KafkaJSProtocolError { super(e, { retriable: false }) this.topic = topic this.name = 'KafkaJSTopicAuthorizationFailed' - this.message = `${this.message} [${this.topic})]` + this.message = `${this.message} [${this.topic}]` } } + class KafkaJSMemberIdRequired extends KafkaJSProtocolError { constructor(e, { memberId }) { super(e) diff --git a/src/protocol/error.js b/src/protocol/error.js index b212abaa8..fa1a42e1f 100644 --- a/src/protocol/error.js +++ b/src/protocol/error.js @@ -24,7 +24,7 @@ const errorCodes = [ { type: 'UNKNOWN_TOPIC_OR_PARTITION', code: 3, - retriable: false, + retriable: true, message: 'This server does not host this topic-partition', }, { diff --git a/src/protocol/requests/metadata/v0/response.spec.js b/src/protocol/requests/metadata/v0/response.spec.js index 0d7d9d42e..5321b07b0 100644 --- a/src/protocol/requests/metadata/v0/response.spec.js +++ b/src/protocol/requests/metadata/v0/response.spec.js @@ -71,7 +71,7 @@ describe('Protocol > Requests > Metadata > v0', () => { test('when topicErrorCode is UNKNOWN_TOPIC_OR_PARTITION', async () => { decoded.topicMetadata[0].topicErrorCode = 3 await expect(response.parse(decoded)).rejects.toMatchObject({ - message: createErrorFromCode(3).message, + message: createErrorFromCode(3).message + ' [test-topic-1]', retriable: false, type: 'UNKNOWN_TOPIC_OR_PARTITION', code: 3, @@ -83,7 +83,7 @@ describe('Protocol > Requests > Metadata > v0', () => { test('when topicErrorCode is TOPIC_AUTHORIZATION_FAILED', async () => { decoded.topicMetadata[0].topicErrorCode = 29 await expect(response.parse(decoded)).rejects.toMatchObject({ - message: createErrorFromCode(29).message, + message: createErrorFromCode(29).message + ' [test-topic-1]', retriable: false, type: 'TOPIC_AUTHORIZATION_FAILED', code: 29, diff --git a/types/index.d.ts b/types/index.d.ts index 05531eec9..cbbcae53f 100644 --- a/types/index.d.ts +++ b/types/index.d.ts @@ -1290,6 +1290,7 @@ export interface KafkaJSOffsetOutOfRangeMetadata { export interface KafkaJSUnknownTopicMetadata { topic: string } + export interface KafkaJSTopicAuthorizationFailedMetadata { topic: string } diff --git a/types/tests.ts b/types/tests.ts index 22b6e329f..b26b72fd8 100644 --- a/types/tests.ts +++ b/types/tests.ts @@ -342,4 +342,4 @@ new KafkaJSServerDoesNotSupportApiKey( ) new KafkaJSUnknownTopic('This server does not host this topic-partition', { topic: 'my_topic' }) -new KafkaJSTopicAuthorizationFailed('Not authorized to access topics: [Topic authorization failed]', { topic: 'my_topic' }) \ No newline at end of file +new KafkaJSTopicAuthorizationFailed('Not authorized to access topics: [Topic authorization failed]', { topic: 'my_topic' })