diff --git a/src/admin/__tests__/fetchTopicMetadata.spec.js b/src/admin/__tests__/fetchTopicMetadata.spec.js index 484561c9a..18978fceb 100644 --- a/src/admin/__tests__/fetchTopicMetadata.spec.js +++ b/src/admin/__tests__/fetchTopicMetadata.spec.js @@ -101,7 +101,10 @@ describe('Admin', () => { admin.fetchTopicMetadata({ topics: [existingTopicName, newTopicName], }) - ).rejects.toHaveProperty('message', 'This server does not host this topic-partition') + ).rejects.toHaveProperty( + 'message', + `This server does not host this topic-partition [${newTopicName}]` + ) }) }) }) diff --git a/src/errors.js b/src/errors.js index 266192e19..0e1cd421c 100644 --- a/src/errors.js +++ b/src/errors.js @@ -38,6 +38,29 @@ class KafkaJSOffsetOutOfRange extends KafkaJSProtocolError { } } +/** + * Wraps 'UNKNOWN_TOPIC_OR_PARTITION' (code=3) but is only + * used in cases where it is clearly the topic that is unknown + * (e.g. a `metadata` request has no partition parameter) + */ +class KafkaJSUnknownTopic extends KafkaJSProtocolError { + constructor(e, { topic }) { + super(e, { retriable: false }) + this.topic = topic + this.name = 'KafkaJSUnknownTopic' + this.message = `${this.message} [${this.topic}]` + } +} + +class KafkaJSTopicAuthorizationFailed extends KafkaJSProtocolError { + constructor(e, { topic }) { + super(e, { retriable: false }) + this.topic = topic + this.name = 'KafkaJSTopicAuthorizationFailed' + this.message = `${this.message} [${this.topic}]` + } +} + class KafkaJSMemberIdRequired extends KafkaJSProtocolError { constructor(e, { memberId }) { super(e) @@ -304,6 +327,8 @@ module.exports = { KafkaJSFetcherRebalanceError, KafkaJSNoBrokerAvailableError, KafkaJSAlterPartitionReassignmentsError, + KafkaJSUnknownTopic, + KafkaJSTopicAuthorizationFailed, isRebalancing, isKafkaJSError, } diff --git a/src/network/connection.js b/src/network/connection.js index dded584d8..ca1e8092c 100644 --- a/src/network/connection.js +++ b/src/network/connection.js @@ -445,6 +445,7 @@ module.exports = class Connection { error: e.message, correlationId, size, + topic: e.topic, }) } diff --git a/src/protocol/error.js b/src/protocol/error.js index c070cb932..fa1a42e1f 100644 --- a/src/protocol/error.js +++ b/src/protocol/error.js @@ -592,10 +592,16 @@ const staleMetadata = e => e.type ) +const errorCodesByType = errorCodes.reduce((codesByType, error) => { + codesByType[error.type] = error.code + return codesByType +}, {}) + module.exports = { failure, errorCodes, createErrorFromCode, failIfVersionNotSupported, staleMetadata, + errorCodesByType, } diff --git a/src/protocol/requests/metadata/v0/response.js b/src/protocol/requests/metadata/v0/response.js index 3b2fb7650..c4915810e 100644 --- a/src/protocol/requests/metadata/v0/response.js +++ b/src/protocol/requests/metadata/v0/response.js @@ -1,5 +1,6 @@ const Decoder = require('../../../decoder') -const { failure, createErrorFromCode } = require('../../../error') +const { failure, createErrorFromCode, errorCodesByType } = require('../../../error') +const { KafkaJSUnknownTopic, KafkaJSTopicAuthorizationFailed } = require('../../../../errors') /** * Metadata Response (Version: 0) => [brokers] [topic_metadata] @@ -51,7 +52,17 @@ const decode = async rawData => { const parse = async data => { const topicsWithErrors = data.topicMetadata.filter(topic => failure(topic.topicErrorCode)) if (topicsWithErrors.length > 0) { - const { topicErrorCode } = topicsWithErrors[0] + const { topic, topicErrorCode } = topicsWithErrors[0] + if (topicErrorCode === errorCodesByType['UNKNOWN_TOPIC_OR_PARTITION']) { + throw new KafkaJSUnknownTopic(createErrorFromCode(topicErrorCode), { + topic, + }) + } + if (topicErrorCode === errorCodesByType['TOPIC_AUTHORIZATION_FAILED']) { + throw new KafkaJSTopicAuthorizationFailed(createErrorFromCode(topicErrorCode), { + topic, + }) + } throw createErrorFromCode(topicErrorCode) } diff --git a/src/protocol/requests/metadata/v0/response.spec.js b/src/protocol/requests/metadata/v0/response.spec.js index ea6806e99..5321b07b0 100644 --- a/src/protocol/requests/metadata/v0/response.spec.js +++ b/src/protocol/requests/metadata/v0/response.spec.js @@ -67,5 +67,29 @@ describe('Protocol > Requests > Metadata > v0', () => { createErrorFromCode(5).message ) }) + + test('when topicErrorCode is UNKNOWN_TOPIC_OR_PARTITION', async () => { + decoded.topicMetadata[0].topicErrorCode = 3 + await expect(response.parse(decoded)).rejects.toMatchObject({ + message: createErrorFromCode(3).message + ' [test-topic-1]', + retriable: false, + type: 'UNKNOWN_TOPIC_OR_PARTITION', + code: 3, + name: 'KafkaJSUnknownTopic', + topic: 'test-topic-1', + }) + }) + + test('when topicErrorCode is TOPIC_AUTHORIZATION_FAILED', async () => { + decoded.topicMetadata[0].topicErrorCode = 29 + await expect(response.parse(decoded)).rejects.toMatchObject({ + message: createErrorFromCode(29).message + ' [test-topic-1]', + retriable: false, + type: 'TOPIC_AUTHORIZATION_FAILED', + code: 29, + name: 'KafkaJSTopicAuthorizationFailed', + topic: 'test-topic-1', + }) + }) }) }) diff --git a/types/index.d.ts b/types/index.d.ts index 26bcbfea3..cbbcae53f 100644 --- a/types/index.d.ts +++ b/types/index.d.ts @@ -1159,6 +1159,16 @@ export class KafkaJSOffsetOutOfRange extends KafkaJSProtocolError { constructor(e: Error | string, metadata?: KafkaJSOffsetOutOfRangeMetadata) } +export class KafkaJSUnknownTopic extends KafkaJSProtocolError { + constructor(e: Error | string, metadata?: KafkaJSUnknownTopicMetadata) + readonly topic: string +} + +export class KafkaJSTopicAuthorizationFailed extends KafkaJSProtocolError { + constructor(e: Error | string, metadata?: KafkaJSTopicAuthorizationFailedMetadata) + readonly topic: string +} + export class KafkaJSAlterPartitionReassignmentsError extends KafkaJSProtocolError { readonly topic?: string readonly partition?: number @@ -1277,6 +1287,14 @@ export interface KafkaJSOffsetOutOfRangeMetadata { partition: number } +export interface KafkaJSUnknownTopicMetadata { + topic: string +} + +export interface KafkaJSTopicAuthorizationFailedMetadata { + topic: string +} + export interface KafkaJSNumberOfRetriesExceededMetadata { retryCount: number retryTime: number diff --git a/types/tests.ts b/types/tests.ts index f21796528..b26b72fd8 100644 --- a/types/tests.ts +++ b/types/tests.ts @@ -21,6 +21,8 @@ import { KafkaJSStaleTopicMetadataAssignment, PartitionMetadata, KafkaJSServerDoesNotSupportApiKey, + KafkaJSUnknownTopic, + KafkaJSTopicAuthorizationFailed } from './index' const { roundRobin } = PartitionAssigners @@ -338,3 +340,6 @@ new KafkaJSServerDoesNotSupportApiKey( apiName: 'Produce', } ) + +new KafkaJSUnknownTopic('This server does not host this topic-partition', { topic: 'my_topic' }) +new KafkaJSTopicAuthorizationFailed('Not authorized to access topics: [Topic authorization failed]', { topic: 'my_topic' })