Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Include the offending topic when metadata request returns unknown or unauthorized #1657

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/admin/__tests__/fetchTopicMetadata.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,10 @@ describe('Admin', () => {
admin.fetchTopicMetadata({
topics: [existingTopicName, newTopicName],
})
).rejects.toHaveProperty('message', 'This server does not host this topic-partition')
).rejects.toHaveProperty(
'message',
`This server does not host this topic-partition [${newTopicName}]`
)
})
})
})
25 changes: 25 additions & 0 deletions src/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,29 @@ class KafkaJSOffsetOutOfRange extends KafkaJSProtocolError {
}
}

/**
* Wraps 'UNKNOWN_TOPIC_OR_PARTITION' (code=3) but is only
* used in cases where it is clearly the topic that is unknown
* (e.g. a `metadata` request has no partition parameter)
*/
class KafkaJSUnknownTopic extends KafkaJSProtocolError {
constructor(e, { topic }) {
super(e, { retriable: false })
this.topic = topic
this.name = 'KafkaJSUnknownTopic'
this.message = `${this.message} [${this.topic}]`
}
}

class KafkaJSTopicAuthorizationFailed extends KafkaJSProtocolError {
constructor(e, { topic }) {
super(e, { retriable: false })
this.topic = topic
this.name = 'KafkaJSTopicAuthorizationFailed'
this.message = `${this.message} [${this.topic}]`
}
}

class KafkaJSMemberIdRequired extends KafkaJSProtocolError {
constructor(e, { memberId }) {
super(e)
Expand Down Expand Up @@ -304,6 +327,8 @@ module.exports = {
KafkaJSFetcherRebalanceError,
KafkaJSNoBrokerAvailableError,
KafkaJSAlterPartitionReassignmentsError,
KafkaJSUnknownTopic,
KafkaJSTopicAuthorizationFailed,
isRebalancing,
isKafkaJSError,
}
1 change: 1 addition & 0 deletions src/network/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,7 @@ module.exports = class Connection {
error: e.message,
correlationId,
size,
topic: e.topic,
})
}

Expand Down
6 changes: 6 additions & 0 deletions src/protocol/error.js
Original file line number Diff line number Diff line change
Expand Up @@ -592,10 +592,16 @@ const staleMetadata = e =>
e.type
)

const errorCodesByType = errorCodes.reduce((codesByType, error) => {
codesByType[error.type] = error.code
return codesByType
}, {})

module.exports = {
failure,
errorCodes,
createErrorFromCode,
failIfVersionNotSupported,
staleMetadata,
errorCodesByType,
}
15 changes: 13 additions & 2 deletions src/protocol/requests/metadata/v0/response.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
const Decoder = require('../../../decoder')
const { failure, createErrorFromCode } = require('../../../error')
const { failure, createErrorFromCode, errorCodesByType } = require('../../../error')
const { KafkaJSUnknownTopic, KafkaJSTopicAuthorizationFailed } = require('../../../../errors')

/**
* Metadata Response (Version: 0) => [brokers] [topic_metadata]
Expand Down Expand Up @@ -51,7 +52,17 @@ const decode = async rawData => {
const parse = async data => {
const topicsWithErrors = data.topicMetadata.filter(topic => failure(topic.topicErrorCode))
if (topicsWithErrors.length > 0) {
const { topicErrorCode } = topicsWithErrors[0]
const { topic, topicErrorCode } = topicsWithErrors[0]
if (topicErrorCode === errorCodesByType['UNKNOWN_TOPIC_OR_PARTITION']) {
throw new KafkaJSUnknownTopic(createErrorFromCode(topicErrorCode), {
topic,
})
}
if (topicErrorCode === errorCodesByType['TOPIC_AUTHORIZATION_FAILED']) {
throw new KafkaJSTopicAuthorizationFailed(createErrorFromCode(topicErrorCode), {
topic,
})
}
throw createErrorFromCode(topicErrorCode)
}

Expand Down
24 changes: 24 additions & 0 deletions src/protocol/requests/metadata/v0/response.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,5 +67,29 @@ describe('Protocol > Requests > Metadata > v0', () => {
createErrorFromCode(5).message
)
})

test('when topicErrorCode is UNKNOWN_TOPIC_OR_PARTITION', async () => {
decoded.topicMetadata[0].topicErrorCode = 3
await expect(response.parse(decoded)).rejects.toMatchObject({
message: createErrorFromCode(3).message + ' [test-topic-1]',
retriable: false,
type: 'UNKNOWN_TOPIC_OR_PARTITION',
code: 3,
name: 'KafkaJSUnknownTopic',
topic: 'test-topic-1',
})
})

test('when topicErrorCode is TOPIC_AUTHORIZATION_FAILED', async () => {
decoded.topicMetadata[0].topicErrorCode = 29
await expect(response.parse(decoded)).rejects.toMatchObject({
message: createErrorFromCode(29).message + ' [test-topic-1]',
retriable: false,
type: 'TOPIC_AUTHORIZATION_FAILED',
code: 29,
name: 'KafkaJSTopicAuthorizationFailed',
topic: 'test-topic-1',
})
})
})
})
18 changes: 18 additions & 0 deletions types/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1159,6 +1159,16 @@ export class KafkaJSOffsetOutOfRange extends KafkaJSProtocolError {
constructor(e: Error | string, metadata?: KafkaJSOffsetOutOfRangeMetadata)
}

export class KafkaJSUnknownTopic extends KafkaJSProtocolError {
constructor(e: Error | string, metadata?: KafkaJSUnknownTopicMetadata)
readonly topic: string
}

export class KafkaJSTopicAuthorizationFailed extends KafkaJSProtocolError {
constructor(e: Error | string, metadata?: KafkaJSTopicAuthorizationFailedMetadata)
readonly topic: string
}

export class KafkaJSAlterPartitionReassignmentsError extends KafkaJSProtocolError {
readonly topic?: string
readonly partition?: number
Expand Down Expand Up @@ -1277,6 +1287,14 @@ export interface KafkaJSOffsetOutOfRangeMetadata {
partition: number
}

export interface KafkaJSUnknownTopicMetadata {
topic: string
}

export interface KafkaJSTopicAuthorizationFailedMetadata {
topic: string
}

export interface KafkaJSNumberOfRetriesExceededMetadata {
retryCount: number
retryTime: number
Expand Down
5 changes: 5 additions & 0 deletions types/tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import {
KafkaJSStaleTopicMetadataAssignment,
PartitionMetadata,
KafkaJSServerDoesNotSupportApiKey,
KafkaJSUnknownTopic,
KafkaJSTopicAuthorizationFailed
} from './index'

const { roundRobin } = PartitionAssigners
Expand Down Expand Up @@ -338,3 +340,6 @@ new KafkaJSServerDoesNotSupportApiKey(
apiName: 'Produce',
}
)

new KafkaJSUnknownTopic('This server does not host this topic-partition', { topic: 'my_topic' })
new KafkaJSTopicAuthorizationFailed('Not authorized to access topics: [Topic authorization failed]', { topic: 'my_topic' })