From 39b992519b364d622bdde332866aec8e83c83232 Mon Sep 17 00:00:00 2001 From: Peter Lehnhardt Date: Wed, 5 Nov 2025 16:45:24 +0100 Subject: [PATCH] Make error messages from the server available Many APIs return error codes and error messages in case of errors. Previously, only error codes have been handled which were used to construct generic protocol errors with predefined error messages. Error messages from the Kafka cluster were skipped altogether. This PR makes the error messages from the Kafka cluster available inside the protocol errors if those have been provided. This way, more detailed information about the issue can be accessed. on-behalf-of: @SAP ospo@sap.com --- src/apis/admin/alter-client-quotas-v1.ts | 2 +- src/apis/admin/alter-configs-v2.ts | 5 ++-- .../admin/alter-partition-reassignments-v0.ts | 7 ++--- src/apis/admin/alter-partition-v3.ts | 4 +-- src/apis/admin/alter-replica-log-dirs-v2.ts | 2 +- .../admin/alter-user-scram-credentials-v0.ts | 2 +- src/apis/admin/consumer-group-describe-v0.ts | 5 ++-- src/apis/admin/create-acls-v3.ts | 2 +- src/apis/admin/create-delegation-token-v3.ts | 2 +- src/apis/admin/create-partitions-v1.ts | 2 +- src/apis/admin/create-partitions-v2.ts | 2 +- src/apis/admin/create-partitions-v3.ts | 2 +- src/apis/admin/create-topics-v7.ts | 2 +- src/apis/admin/delete-acls-v3.ts | 10 ++++--- src/apis/admin/delete-groups-v2.ts | 2 +- src/apis/admin/delete-records-v2.ts | 2 +- src/apis/admin/delete-topics-v6.ts | 2 +- src/apis/admin/describe-acls-v3.ts | 2 +- src/apis/admin/describe-client-quotas-v0.ts | 2 +- src/apis/admin/describe-cluster-v1.ts | 2 +- src/apis/admin/describe-configs-v2.ts | 5 ++-- src/apis/admin/describe-configs-v3.ts | 5 ++-- src/apis/admin/describe-configs-v4.ts | 5 ++-- .../admin/describe-delegation-token-v3.ts | 2 +- src/apis/admin/describe-groups-v5.ts | 2 +- src/apis/admin/describe-log-dirs-v4.ts | 4 +-- src/apis/admin/describe-producers-v0.ts | 2 +- src/apis/admin/describe-quorum-v2.ts | 7 ++--- .../admin/describe-topic-partitions-v0.ts | 4 +-- src/apis/admin/describe-transactions-v0.ts | 2 +- .../describe-user-scram-credentials-v0.ts | 10 ++++--- src/apis/admin/envelope-v0.ts | 2 +- src/apis/admin/expire-delegation-token-v2.ts | 2 +- .../admin/incremental-alter-configs-v1.ts | 5 ++-- src/apis/admin/list-groups-v4.ts | 2 +- src/apis/admin/list-groups-v5.ts | 2 +- .../admin/list-partition-reassignments-v0.ts | 2 +- src/apis/admin/list-transactions-v1.ts | 2 +- src/apis/admin/offset-delete-v0.ts | 4 +-- src/apis/admin/renew-delegation-token-v2.ts | 2 +- src/apis/admin/unregister-broker-v0.ts | 2 +- src/apis/admin/update-features-v1.ts | 7 ++--- .../consumer/consumer-group-heartbeat-v0.ts | 2 +- src/apis/consumer/fetch-v12.ts | 4 +-- src/apis/consumer/fetch-v13.ts | 4 +-- src/apis/consumer/fetch-v14.ts | 4 +-- src/apis/consumer/fetch-v15.ts | 4 +-- src/apis/consumer/fetch-v16.ts | 4 +-- src/apis/consumer/fetch-v17.ts | 4 +-- src/apis/consumer/heartbeat-v4.ts | 2 +- src/apis/consumer/join-group-v9.ts | 2 +- src/apis/consumer/leave-group-v5.ts | 4 +-- src/apis/consumer/list-offsets-v8.ts | 2 +- src/apis/consumer/list-offsets-v9.ts | 2 +- src/apis/consumer/offset-commit-v8.ts | 2 +- src/apis/consumer/offset-commit-v9.ts | 2 +- src/apis/consumer/offset-fetch-v8.ts | 4 +-- src/apis/consumer/offset-fetch-v9.ts | 4 +-- .../consumer/offset-for-leader-epoch-v4.ts | 2 +- src/apis/consumer/sync-group-v5.ts | 2 +- src/apis/definitions.ts | 3 ++- src/apis/metadata/api-versions-v3.ts | 2 +- src/apis/metadata/api-versions-v4.ts | 2 +- src/apis/metadata/find-coordinator-v4.ts | 2 +- src/apis/metadata/find-coordinator-v5.ts | 2 +- src/apis/metadata/find-coordinator-v6.ts | 2 +- src/apis/metadata/metadata-v10.ts | 4 +-- src/apis/metadata/metadata-v11.ts | 4 +-- src/apis/metadata/metadata-v12.ts | 4 +-- src/apis/metadata/metadata-v9.ts | 4 +-- src/apis/producer/add-offsets-to-txn-v4.ts | 2 +- src/apis/producer/add-partitions-to-txn-v5.ts | 4 +-- src/apis/producer/end-txn-v4.ts | 2 +- src/apis/producer/init-producer-id-v4.ts | 2 +- src/apis/producer/init-producer-id-v5.ts | 2 +- src/apis/producer/produce-v10.ts | 27 +++++++++++-------- src/apis/producer/produce-v11.ts | 27 +++++++++++-------- src/apis/producer/produce-v7.ts | 2 +- src/apis/producer/produce-v8.ts | 27 +++++++++++-------- src/apis/producer/produce-v9.ts | 27 +++++++++++-------- src/apis/producer/txn-offset-commit-v4.ts | 2 +- src/apis/security/sasl-authenticate-v2.ts | 2 +- src/apis/security/sasl-handshake-v1.ts | 6 ++--- .../get-telemetry-subscriptions-v0.ts | 2 +- .../list-client-metrics-resources-v0.ts | 2 +- src/apis/telemetry/push-telemetry-v0.ts | 2 +- src/errors.ts | 16 ++++++++--- .../consumer-consumer-group-protocol.test.ts | 2 +- test/clients/consumer/consumer.test.ts | 4 +-- test/clients/producer/producer.test.ts | 2 +- test/errors.test.ts | 12 ++++----- 91 files changed, 221 insertions(+), 177 deletions(-) diff --git a/src/apis/admin/alter-client-quotas-v1.ts b/src/apis/admin/alter-client-quotas-v1.ts index 11a9ef26..8e0b2ad6 100644 --- a/src/apis/admin/alter-client-quotas-v1.ts +++ b/src/apis/admin/alter-client-quotas-v1.ts @@ -106,7 +106,7 @@ export function parseResponse ( } if (entry.errorCode !== 0) { - errors.push([`/entries/${i}`, entry.errorCode]) + errors.push([`/entries/${i}`, [entry.errorCode, entry.errorMessage]]) } return entry diff --git a/src/apis/admin/alter-configs-v2.ts b/src/apis/admin/alter-configs-v2.ts index 66a77ed6..aaf86c3f 100644 --- a/src/apis/admin/alter-configs-v2.ts +++ b/src/apis/admin/alter-configs-v2.ts @@ -73,14 +73,15 @@ export function parseResponse ( throttleTimeMs: reader.readInt32(), responses: reader.readArray((r, i) => { const errorCode = r.readInt16() + const errorMessage = r.readNullableString() if (errorCode !== 0) { - errors.push([`/responses/${i}`, errorCode]) + errors.push([`/responses/${i}`, [errorCode, errorMessage]]) } return { errorCode, - errorMessage: r.readNullableString(), + errorMessage, resourceType: r.readInt8(), resourceName: r.readString() } diff --git a/src/apis/admin/alter-partition-reassignments-v0.ts b/src/apis/admin/alter-partition-reassignments-v0.ts index 0b9116d0..67f742b6 100644 --- a/src/apis/admin/alter-partition-reassignments-v0.ts +++ b/src/apis/admin/alter-partition-reassignments-v0.ts @@ -76,15 +76,16 @@ export function parseResponse ( const throttleTimeMs = reader.readInt32() const errorCode = reader.readInt16() + const errorMessage = reader.readNullableString() if (errorCode !== 0) { - errors.push(['', errorCode]) + errors.push(['', [errorCode, errorMessage ?? '']]) } const response: AlterPartitionReassignmentsResponse = { throttleTimeMs, errorCode, - errorMessage: reader.readNullableString(), + errorMessage, responses: reader.readArray((r, i) => { return { name: r.readString(), @@ -96,7 +97,7 @@ export function parseResponse ( } if (partition.errorCode !== 0) { - errors.push([`responses/${i}/partitions/${j}`, partition.errorCode]) + errors.push([`responses/${i}/partitions/${j}`, [partition.errorCode, partition.errorMessage]]) } return partition diff --git a/src/apis/admin/alter-partition-v3.ts b/src/apis/admin/alter-partition-v3.ts index 30ab0784..01d87def 100644 --- a/src/apis/admin/alter-partition-v3.ts +++ b/src/apis/admin/alter-partition-v3.ts @@ -104,7 +104,7 @@ export function parseResponse ( const errorCode = reader.readInt16() if (errorCode !== 0) { - errors.push(['/', errorCode]) + errors.push(['/', [errorCode, null]]) } const response: AlterPartitionResponse = { @@ -125,7 +125,7 @@ export function parseResponse ( } if (partition.errorCode !== 0) { - errors.push([`/topics/${i}/partitions/${j}`, partition.errorCode]) + errors.push([`/topics/${i}/partitions/${j}`, [partition.errorCode, null]]) } return partition diff --git a/src/apis/admin/alter-replica-log-dirs-v2.ts b/src/apis/admin/alter-replica-log-dirs-v2.ts index 2ee072f7..ced2cff0 100644 --- a/src/apis/admin/alter-replica-log-dirs-v2.ts +++ b/src/apis/admin/alter-replica-log-dirs-v2.ts @@ -76,7 +76,7 @@ export function parseResponse ( } if (partition.errorCode !== 0) { - errors.push([`/results/${i}/partitions/${j}`, partition.errorCode]) + errors.push([`/results/${i}/partitions/${j}`, [partition.errorCode, null]]) } return partition diff --git a/src/apis/admin/alter-user-scram-credentials-v0.ts b/src/apis/admin/alter-user-scram-credentials-v0.ts index b4e33323..9e373c8f 100644 --- a/src/apis/admin/alter-user-scram-credentials-v0.ts +++ b/src/apis/admin/alter-user-scram-credentials-v0.ts @@ -87,7 +87,7 @@ export function parseResponse ( } if (result.errorCode !== 0) { - errors.push([`/results/${i}`, result.errorCode]) + errors.push([`/results/${i}`, [result.errorCode, result.errorMessage]]) } return result diff --git a/src/apis/admin/consumer-group-describe-v0.ts b/src/apis/admin/consumer-group-describe-v0.ts index 25b26d34..6793f47b 100644 --- a/src/apis/admin/consumer-group-describe-v0.ts +++ b/src/apis/admin/consumer-group-describe-v0.ts @@ -101,14 +101,15 @@ export function parseResponse ( throttleTimeMs: reader.readInt32(), groups: reader.readArray((r, i) => { const errorCode = r.readInt16() + const errorMessage = r.readNullableString() if (errorCode !== 0) { - errors.push([`/groups/${i}`, errorCode]) + errors.push([`/groups/${i}`, [errorCode, errorMessage]]) } return { errorCode, - errorMessage: r.readNullableString(), + errorMessage, groupId: r.readString(), groupState: r.readString(), groupEpoch: r.readInt32(), diff --git a/src/apis/admin/create-acls-v3.ts b/src/apis/admin/create-acls-v3.ts index de0a3c41..35b682e3 100644 --- a/src/apis/admin/create-acls-v3.ts +++ b/src/apis/admin/create-acls-v3.ts @@ -75,7 +75,7 @@ export function parseResponse ( } if (result.errorCode !== 0) { - errors.push([`/results/${i}`, result.errorCode]) + errors.push([`/results/${i}`, [result.errorCode, result.errorMessage]]) } return result diff --git a/src/apis/admin/create-delegation-token-v3.ts b/src/apis/admin/create-delegation-token-v3.ts index cd418708..a892a6c4 100644 --- a/src/apis/admin/create-delegation-token-v3.ts +++ b/src/apis/admin/create-delegation-token-v3.ts @@ -83,7 +83,7 @@ export function parseResponse ( } if (response.errorCode !== 0) { - throw new ResponseError(apiKey, apiVersion, { '': response.errorCode }, response) + throw new ResponseError(apiKey, apiVersion, { '/': [response.errorCode, null] }, response) } return response diff --git a/src/apis/admin/create-partitions-v1.ts b/src/apis/admin/create-partitions-v1.ts index ed2df595..6937d5c0 100644 --- a/src/apis/admin/create-partitions-v1.ts +++ b/src/apis/admin/create-partitions-v1.ts @@ -89,7 +89,7 @@ export function parseResponse ( } if (result.errorCode !== 0) { - errors.push([`/results/${i}`, result.errorCode]) + errors.push([`/results/${i}`, [result.errorCode, result.errorMessage]]) } return result diff --git a/src/apis/admin/create-partitions-v2.ts b/src/apis/admin/create-partitions-v2.ts index 3c5e6c4c..70da5e43 100644 --- a/src/apis/admin/create-partitions-v2.ts +++ b/src/apis/admin/create-partitions-v2.ts @@ -79,7 +79,7 @@ export function parseResponse ( } if (result.errorCode !== 0) { - errors.push([`/results/${i}`, result.errorCode]) + errors.push([`/results/${i}`, [result.errorCode, result.errorMessage]]) } return result diff --git a/src/apis/admin/create-partitions-v3.ts b/src/apis/admin/create-partitions-v3.ts index 7a2f7d03..72e4efe5 100644 --- a/src/apis/admin/create-partitions-v3.ts +++ b/src/apis/admin/create-partitions-v3.ts @@ -79,7 +79,7 @@ export function parseResponse ( } if (result.errorCode !== 0) { - errors.push([`/results/${i}`, result.errorCode]) + errors.push([`/results/${i}`, [result.errorCode, result.errorMessage]]) } return result diff --git a/src/apis/admin/create-topics-v7.ts b/src/apis/admin/create-topics-v7.ts index 0469047d..4b9a3a9a 100644 --- a/src/apis/admin/create-topics-v7.ts +++ b/src/apis/admin/create-topics-v7.ts @@ -132,7 +132,7 @@ export function parseResponse ( } if (topic.errorCode !== 0) { - errors.push([`/topics/${i}`, topic.errorCode]) + errors.push([`/topics/${i}`, [topic.errorCode, topic.errorMessage]]) } return topic diff --git a/src/apis/admin/delete-acls-v3.ts b/src/apis/admin/delete-acls-v3.ts index 99f20099..a2d88175 100644 --- a/src/apis/admin/delete-acls-v3.ts +++ b/src/apis/admin/delete-acls-v3.ts @@ -91,24 +91,26 @@ export function parseResponse ( throttleTimeMs: reader.readInt32(), filterResults: reader.readArray((r, i) => { const errorCode = r.readInt16() + const errorMessage = r.readNullableString() if (errorCode !== 0) { - errors.push([`/filter_results/${i}`, errorCode]) + errors.push([`/filter_results/${i}`, [errorCode, errorMessage]]) } return { errorCode, - errorMessage: r.readNullableString(), + errorMessage, matchingAcls: r.readArray((r, j) => { const errorCode = r.readInt16() + const errorMessage = r.readNullableString() if (errorCode !== 0) { - errors.push([`/filter_results/${i}/matching_acls/${j}`, errorCode]) + errors.push([`/filter_results/${i}/matching_acls/${j}`, [errorCode, errorMessage]]) } return { errorCode, - errorMessage: r.readNullableString(), + errorMessage, resourceType: r.readInt8(), resourceName: r.readString(), patternType: r.readInt8(), diff --git a/src/apis/admin/delete-groups-v2.ts b/src/apis/admin/delete-groups-v2.ts index f3786a14..1192bb54 100644 --- a/src/apis/admin/delete-groups-v2.ts +++ b/src/apis/admin/delete-groups-v2.ts @@ -49,7 +49,7 @@ export function parseResponse ( } if (group.errorCode !== 0) { - errors.push([`/results/${i}`, group.errorCode]) + errors.push([`/results/${i}`, [group.errorCode, null]]) } return group diff --git a/src/apis/admin/delete-records-v2.ts b/src/apis/admin/delete-records-v2.ts index 416cdceb..6e11943d 100644 --- a/src/apis/admin/delete-records-v2.ts +++ b/src/apis/admin/delete-records-v2.ts @@ -82,7 +82,7 @@ export function parseResponse ( } if (partition.errorCode !== 0) { - errors.push([`topics[${i}].partitions[${j}]`, partition.errorCode]) + errors.push([`topics[${i}].partitions[${j}]`, [partition.errorCode, null]]) } return partition diff --git a/src/apis/admin/delete-topics-v6.ts b/src/apis/admin/delete-topics-v6.ts index fb18bb19..0a19fb8e 100644 --- a/src/apis/admin/delete-topics-v6.ts +++ b/src/apis/admin/delete-topics-v6.ts @@ -67,7 +67,7 @@ export function parseResponse ( } if (topicResponse.errorCode !== 0) { - errors.push([`/responses/${i}`, topicResponse.errorCode]) + errors.push([`/responses/${i}`, [topicResponse.errorCode, topicResponse.errorMessage]]) } return topicResponse diff --git a/src/apis/admin/describe-acls-v3.ts b/src/apis/admin/describe-acls-v3.ts index a3d99a22..011b0bfb 100644 --- a/src/apis/admin/describe-acls-v3.ts +++ b/src/apis/admin/describe-acls-v3.ts @@ -99,7 +99,7 @@ export function parseResponse ( } if (response.errorCode) { - throw new ResponseError(apiKey, apiVersion, { '': response.errorCode }, response) + throw new ResponseError(apiKey, apiVersion, { '/': [response.errorCode, response.errorMessage] }, response) } return response diff --git a/src/apis/admin/describe-client-quotas-v0.ts b/src/apis/admin/describe-client-quotas-v0.ts index b3f4fee5..553b8409 100644 --- a/src/apis/admin/describe-client-quotas-v0.ts +++ b/src/apis/admin/describe-client-quotas-v0.ts @@ -101,7 +101,7 @@ export function parseResponse ( } if (response.errorCode !== 0) { - throw new ResponseError(apiKey, apiVersion, { '': response.errorCode }, response) + throw new ResponseError(apiKey, apiVersion, { '/': [response.errorCode, response.errorMessage] }, response) } return response diff --git a/src/apis/admin/describe-cluster-v1.ts b/src/apis/admin/describe-cluster-v1.ts index 8d9c4924..92f347f5 100644 --- a/src/apis/admin/describe-cluster-v1.ts +++ b/src/apis/admin/describe-cluster-v1.ts @@ -73,7 +73,7 @@ export function parseResponse ( } if (response.errorCode !== 0) { - throw new ResponseError(apiKey, apiVersion, { '': response.errorCode }, response) + throw new ResponseError(apiKey, apiVersion, { '/': [response.errorCode, response.errorMessage] }, response) } return response diff --git a/src/apis/admin/describe-configs-v2.ts b/src/apis/admin/describe-configs-v2.ts index 5d34b73e..c396c0db 100644 --- a/src/apis/admin/describe-configs-v2.ts +++ b/src/apis/admin/describe-configs-v2.ts @@ -105,14 +105,15 @@ export function parseResponse ( results: reader.readArray( (r, i) => { const errorCode = r.readInt16() + const errorMessage = r.readNullableString(false) if (errorCode !== 0) { - errors.push([`/results/${i}`, errorCode]) + errors.push([`/results/${i}`, [errorCode, errorMessage]]) } return { errorCode, - errorMessage: r.readNullableString(false), + errorMessage, resourceType: r.readInt8(), resourceName: r.readString(false), configs: r.readArray( diff --git a/src/apis/admin/describe-configs-v3.ts b/src/apis/admin/describe-configs-v3.ts index a262b07b..0aade4a3 100644 --- a/src/apis/admin/describe-configs-v3.ts +++ b/src/apis/admin/describe-configs-v3.ts @@ -105,14 +105,15 @@ export function parseResponse ( results: reader.readArray( (r, i) => { const errorCode = r.readInt16() + const errorMessage = r.readNullableString(false) if (errorCode !== 0) { - errors.push([`/results/${i}`, errorCode]) + errors.push([`/results/${i}`, [errorCode, errorMessage]]) } return { errorCode, - errorMessage: r.readNullableString(false), + errorMessage, resourceType: r.readInt8(), resourceName: r.readString(false), configs: r.readArray( diff --git a/src/apis/admin/describe-configs-v4.ts b/src/apis/admin/describe-configs-v4.ts index 9c56d76c..99d973d8 100644 --- a/src/apis/admin/describe-configs-v4.ts +++ b/src/apis/admin/describe-configs-v4.ts @@ -100,14 +100,15 @@ export function parseResponse ( throttleTimeMs: reader.readInt32(), results: reader.readArray((r, i) => { const errorCode = r.readInt16() + const errorMessage = r.readNullableString() if (errorCode !== 0) { - errors.push([`/results/${i}`, errorCode]) + errors.push([`/results/${i}`, [errorCode, errorMessage]]) } return { errorCode, - errorMessage: r.readNullableString(), + errorMessage, resourceType: r.readInt8(), resourceName: r.readString(), configs: r.readArray(r => { diff --git a/src/apis/admin/describe-delegation-token-v3.ts b/src/apis/admin/describe-delegation-token-v3.ts index 6963bef3..c0fe2f79 100644 --- a/src/apis/admin/describe-delegation-token-v3.ts +++ b/src/apis/admin/describe-delegation-token-v3.ts @@ -95,7 +95,7 @@ export function parseResponse ( } if (response.errorCode !== 0) { - throw new ResponseError(apiKey, apiVersion, { '': response.errorCode }, response) + throw new ResponseError(apiKey, apiVersion, { '/': [response.errorCode, null] }, response) } return response diff --git a/src/apis/admin/describe-groups-v5.ts b/src/apis/admin/describe-groups-v5.ts index 15a43705..085e8d87 100644 --- a/src/apis/admin/describe-groups-v5.ts +++ b/src/apis/admin/describe-groups-v5.ts @@ -91,7 +91,7 @@ export function parseResponse ( } if (group.errorCode !== 0) { - errors.push([`/groups/${i}`, group.errorCode]) + errors.push([`/groups/${i}`, [group.errorCode, null]]) } return group diff --git a/src/apis/admin/describe-log-dirs-v4.ts b/src/apis/admin/describe-log-dirs-v4.ts index fd2589f1..82e4756d 100644 --- a/src/apis/admin/describe-log-dirs-v4.ts +++ b/src/apis/admin/describe-log-dirs-v4.ts @@ -79,7 +79,7 @@ export function parseResponse ( const errorCode = reader.readInt16() if (errorCode !== 0) { - errors.push(['', errorCode]) + errors.push(['/', [errorCode, null]]) } const response: DescribeLogDirsResponse = { @@ -89,7 +89,7 @@ export function parseResponse ( const errorCode = r.readInt16() if (errorCode !== 0) { - errors.push([`/results/${i}`, errorCode]) + errors.push([`/results/${i}`, [errorCode, null]]) } return { diff --git a/src/apis/admin/describe-producers-v0.ts b/src/apis/admin/describe-producers-v0.ts index f5a71726..5db820ee 100644 --- a/src/apis/admin/describe-producers-v0.ts +++ b/src/apis/admin/describe-producers-v0.ts @@ -99,7 +99,7 @@ export function parseResponse ( } if (partition.errorCode !== 0) { - errors.push([`/partitions/${i}`, partition.errorCode]) + errors.push([`/partitions/${i}`, [partition.errorCode, partition.errorMessage]]) } return partition diff --git a/src/apis/admin/describe-quorum-v2.ts b/src/apis/admin/describe-quorum-v2.ts index 6c61916a..5381dbe2 100644 --- a/src/apis/admin/describe-quorum-v2.ts +++ b/src/apis/admin/describe-quorum-v2.ts @@ -115,13 +115,14 @@ export function parseResponse ( const errors: ResponseErrorWithLocation[] = [] const errorCode = reader.readInt16() + const errorMessage = reader.readNullableString() if (errorCode !== 0) { - errors.push(['', errorCode]) + errors.push(['', [errorCode, errorMessage]]) } const response: DescribeQuorumResponse = { errorCode, - errorMessage: reader.readNullableString(), + errorMessage, topics: reader.readArray((r, i) => { return { topicName: r.readString(), @@ -154,7 +155,7 @@ export function parseResponse ( } if (partition.errorCode !== 0) { - errors.push([`/topics/${i}/partitions/${j}`, partition.errorCode]) + errors.push([`/topics/${i}/partitions/${j}`, [partition.errorCode, partition.errorMessage]]) } return partition diff --git a/src/apis/admin/describe-topic-partitions-v0.ts b/src/apis/admin/describe-topic-partitions-v0.ts index d5fd5ef6..33fb4734 100644 --- a/src/apis/admin/describe-topic-partitions-v0.ts +++ b/src/apis/admin/describe-topic-partitions-v0.ts @@ -111,7 +111,7 @@ export function parseResponse ( const errorCode = r.readInt16() if (errorCode !== 0) { - errors.push([`/topics/${i}`, errorCode]) + errors.push([`/topics/${i}`, [errorCode, null]]) } return { @@ -123,7 +123,7 @@ export function parseResponse ( const errorCode = r.readInt16() if (errorCode !== 0) { - errors.push([`/topics/${i}/partitions/${j}`, errorCode]) + errors.push([`/topics/${i}/partitions/${j}`, [errorCode, null]]) } return { diff --git a/src/apis/admin/describe-transactions-v0.ts b/src/apis/admin/describe-transactions-v0.ts index 7f3a27f3..11ac41f2 100644 --- a/src/apis/admin/describe-transactions-v0.ts +++ b/src/apis/admin/describe-transactions-v0.ts @@ -79,7 +79,7 @@ export function parseResponse ( } if (state.errorCode !== 0) { - errors.push([`/transaction_states/${i}`, state.errorCode]) + errors.push([`/transaction_states/${i}`, [state.errorCode, null]]) } return state diff --git a/src/apis/admin/describe-user-scram-credentials-v0.ts b/src/apis/admin/describe-user-scram-credentials-v0.ts index 32a4bced..92b40282 100644 --- a/src/apis/admin/describe-user-scram-credentials-v0.ts +++ b/src/apis/admin/describe-user-scram-credentials-v0.ts @@ -63,27 +63,29 @@ export function parseResponse ( const throttleTimeMs = reader.readInt32() const errorCode = reader.readInt16() + const errorMessage = reader.readNullableString() if (errorCode !== 0) { - errors.push(['', errorCode]) + errors.push(['', [errorCode, errorMessage]]) } const response: DescribeUserScramCredentialsResponse = { throttleTimeMs, errorCode, - errorMessage: reader.readNullableString(), + errorMessage, results: reader.readArray((r, i) => { const user = r.readString() const errorCode = r.readInt16() + const errorMessage = r.readNullableString() if (errorCode !== 0) { - errors.push([`/results/${i}`, errorCode]) + errors.push([`/results/${i}`, [errorCode, errorMessage]]) } return { user, errorCode, - errorMessage: r.readNullableString(), + errorMessage, credentialInfos: r.readArray(r => { return { mechanism: r.readInt8(), diff --git a/src/apis/admin/envelope-v0.ts b/src/apis/admin/envelope-v0.ts index 0ff93f61..2f20cd14 100644 --- a/src/apis/admin/envelope-v0.ts +++ b/src/apis/admin/envelope-v0.ts @@ -45,7 +45,7 @@ export function parseResponse ( } if (response.errorCode) { - throw new ResponseError(apiKey, apiVersion, { '': response.errorCode }, response) + throw new ResponseError(apiKey, apiVersion, { '/': [response.errorCode, null] }, response) } return response diff --git a/src/apis/admin/expire-delegation-token-v2.ts b/src/apis/admin/expire-delegation-token-v2.ts index 7bd1b9c9..478b0061 100644 --- a/src/apis/admin/expire-delegation-token-v2.ts +++ b/src/apis/admin/expire-delegation-token-v2.ts @@ -39,7 +39,7 @@ export function parseResponse ( } if (response.errorCode !== 0) { - throw new ResponseError(apiKey, apiVersion, { '': response.errorCode }, response) + throw new ResponseError(apiKey, apiVersion, { '/': [response.errorCode, null] }, response) } return response diff --git a/src/apis/admin/incremental-alter-configs-v1.ts b/src/apis/admin/incremental-alter-configs-v1.ts index cd34044d..59aa3a75 100644 --- a/src/apis/admin/incremental-alter-configs-v1.ts +++ b/src/apis/admin/incremental-alter-configs-v1.ts @@ -75,14 +75,15 @@ export function parseResponse ( throttleTimeMs: reader.readInt32(), responses: reader.readArray((r, i) => { const errorCode = r.readInt16() + const errorMessage = r.readNullableString() if (errorCode !== 0) { - errors.push([`/responses/${i}`, errorCode]) + errors.push([`/responses/${i}`, [errorCode, errorMessage]]) } return { errorCode, - errorMessage: r.readNullableString(), + errorMessage, resourceType: r.readInt8(), resourceName: r.readString() } diff --git a/src/apis/admin/list-groups-v4.ts b/src/apis/admin/list-groups-v4.ts index 8d5c0368..2008019a 100644 --- a/src/apis/admin/list-groups-v4.ts +++ b/src/apis/admin/list-groups-v4.ts @@ -57,7 +57,7 @@ export function parseResponse ( } if (response.errorCode !== 0) { - throw new ResponseError(apiKey, apiVersion, { '': response.errorCode }, response) + throw new ResponseError(apiKey, apiVersion, { '/': [response.errorCode, null] }, response) } return response diff --git a/src/apis/admin/list-groups-v5.ts b/src/apis/admin/list-groups-v5.ts index 8b832b87..5ad0e98e 100644 --- a/src/apis/admin/list-groups-v5.ts +++ b/src/apis/admin/list-groups-v5.ts @@ -61,7 +61,7 @@ export function parseResponse ( } if (response.errorCode !== 0) { - throw new ResponseError(apiKey, apiVersion, { '': response.errorCode }, response) + throw new ResponseError(apiKey, apiVersion, { '/': [response.errorCode, null] }, response) } return response diff --git a/src/apis/admin/list-partition-reassignments-v0.ts b/src/apis/admin/list-partition-reassignments-v0.ts index a9da363f..2e022367 100644 --- a/src/apis/admin/list-partition-reassignments-v0.ts +++ b/src/apis/admin/list-partition-reassignments-v0.ts @@ -85,7 +85,7 @@ export function parseResponse ( } if (response.errorCode !== 0) { - throw new ResponseError(apiKey, apiVersion, { '': response.errorCode }, response) + throw new ResponseError(apiKey, apiVersion, { '/': [response.errorCode, response.errorMessage] }, response) } return response diff --git a/src/apis/admin/list-transactions-v1.ts b/src/apis/admin/list-transactions-v1.ts index 7c66aa8a..3beb39fb 100644 --- a/src/apis/admin/list-transactions-v1.ts +++ b/src/apis/admin/list-transactions-v1.ts @@ -67,7 +67,7 @@ export function parseResponse ( } if (response.errorCode !== 0) { - throw new ResponseError(apiKey, apiVersion, { '': response.errorCode }, response) + throw new ResponseError(apiKey, apiVersion, { '/': [response.errorCode, null] }, response) } return response diff --git a/src/apis/admin/offset-delete-v0.ts b/src/apis/admin/offset-delete-v0.ts index 257d4564..4c282962 100644 --- a/src/apis/admin/offset-delete-v0.ts +++ b/src/apis/admin/offset-delete-v0.ts @@ -72,7 +72,7 @@ export function parseResponse ( const errorCode = reader.readInt16() if (errorCode !== 0) { - errors.push(['', errorCode]) + errors.push(['', [errorCode, null]]) } const response: OffsetDeleteResponse = { @@ -88,7 +88,7 @@ export function parseResponse ( } if (partition.errorCode !== 0) { - errors.push([`/topics/${i}/partitions/${j}`, partition.errorCode]) + errors.push([`/topics/${i}/partitions/${j}`, [partition.errorCode, null]]) } return partition diff --git a/src/apis/admin/renew-delegation-token-v2.ts b/src/apis/admin/renew-delegation-token-v2.ts index 0201e30f..a9fc96ea 100644 --- a/src/apis/admin/renew-delegation-token-v2.ts +++ b/src/apis/admin/renew-delegation-token-v2.ts @@ -39,7 +39,7 @@ export function parseResponse ( } if (response.errorCode !== 0) { - throw new ResponseError(apiKey, apiVersion, { '': response.errorCode }, response) + throw new ResponseError(apiKey, apiVersion, { '/': [response.errorCode, null] }, response) } return response diff --git a/src/apis/admin/unregister-broker-v0.ts b/src/apis/admin/unregister-broker-v0.ts index 38817d32..d50df12e 100644 --- a/src/apis/admin/unregister-broker-v0.ts +++ b/src/apis/admin/unregister-broker-v0.ts @@ -39,7 +39,7 @@ export function parseResponse ( } if (response.errorCode !== 0) { - throw new ResponseError(apiKey, apiVersion, { '': response.errorCode }, response) + throw new ResponseError(apiKey, apiVersion, { '/': [response.errorCode, response.errorMessage] }, response) } return response diff --git a/src/apis/admin/update-features-v1.ts b/src/apis/admin/update-features-v1.ts index a1832ca7..8bebe008 100644 --- a/src/apis/admin/update-features-v1.ts +++ b/src/apis/admin/update-features-v1.ts @@ -68,15 +68,16 @@ export function parseResponse ( const throttleTimeMs = reader.readInt32() const errorCode = reader.readInt16() + const errorMessage = reader.readNullableString() if (errorCode !== 0) { - errors.push(['', errorCode]) + errors.push(['', [errorCode, errorMessage]]) } const response: UpdateFeaturesResponse = { throttleTimeMs, errorCode, - errorMessage: reader.readNullableString(), + errorMessage, results: reader.readArray((r, i) => { const result = { feature: r.readString(), @@ -85,7 +86,7 @@ export function parseResponse ( } if (result.errorCode !== 0) { - errors.push([`/results/${i}`, result.errorCode]) + errors.push([`/results/${i}`, [result.errorCode, result.errorMessage]]) } return result diff --git a/src/apis/consumer/consumer-group-heartbeat-v0.ts b/src/apis/consumer/consumer-group-heartbeat-v0.ts index 7199b6c8..13c21579 100644 --- a/src/apis/consumer/consumer-group-heartbeat-v0.ts +++ b/src/apis/consumer/consumer-group-heartbeat-v0.ts @@ -114,7 +114,7 @@ export function parseResponse ( } if (response.errorCode !== 0) { - throw new ResponseError(apiKey, apiVersion, { '': response.errorCode }, response) + throw new ResponseError(apiKey, apiVersion, { '/': [response.errorCode, response.errorMessage] }, response) } return response diff --git a/src/apis/consumer/fetch-v12.ts b/src/apis/consumer/fetch-v12.ts index ba90e119..2812fdf1 100644 --- a/src/apis/consumer/fetch-v12.ts +++ b/src/apis/consumer/fetch-v12.ts @@ -150,7 +150,7 @@ export function parseResponse ( const errorCode = reader.readInt16() if (errorCode !== 0) { - errors.push(['', errorCode]) + errors.push(['/', [errorCode, null]]) } const response: FetchResponse = { @@ -177,7 +177,7 @@ export function parseResponse ( } if (partition.errorCode !== 0) { - errors.push([`/responses/${i}/partitions/${j}`, partition.errorCode]) + errors.push([`/responses/${i}/partitions/${j}`, [partition.errorCode, null]]) } // We need to reduce the size by one to follow the COMPACT_RECORDS specification. diff --git a/src/apis/consumer/fetch-v13.ts b/src/apis/consumer/fetch-v13.ts index d7c67842..a2ef49d9 100644 --- a/src/apis/consumer/fetch-v13.ts +++ b/src/apis/consumer/fetch-v13.ts @@ -150,7 +150,7 @@ export function parseResponse ( const errorCode = reader.readInt16() if (errorCode !== 0) { - errors.push(['', errorCode]) + errors.push(['/', [errorCode, null]]) } const response: FetchResponse = { @@ -177,7 +177,7 @@ export function parseResponse ( } if (partition.errorCode !== 0) { - errors.push([`/responses/${i}/partitions/${j}`, partition.errorCode]) + errors.push([`/responses/${i}/partitions/${j}`, [partition.errorCode, null]]) } // We need to reduce the size by one to follow the COMPACT_RECORDS specification. diff --git a/src/apis/consumer/fetch-v14.ts b/src/apis/consumer/fetch-v14.ts index a1fd1c54..1ea0e1f1 100644 --- a/src/apis/consumer/fetch-v14.ts +++ b/src/apis/consumer/fetch-v14.ts @@ -150,7 +150,7 @@ export function parseResponse ( const errorCode = reader.readInt16() if (errorCode !== 0) { - errors.push(['', errorCode]) + errors.push(['/', [errorCode, null]]) } const response: FetchResponse = { @@ -177,7 +177,7 @@ export function parseResponse ( } if (partition.errorCode !== 0) { - errors.push([`/responses/${i}/partitions/${j}`, partition.errorCode]) + errors.push([`/responses/${i}/partitions/${j}`, [partition.errorCode, null]]) } // We need to reduce the size by one to follow the COMPACT_RECORDS specification. diff --git a/src/apis/consumer/fetch-v15.ts b/src/apis/consumer/fetch-v15.ts index 9bbd0890..45cf22a8 100644 --- a/src/apis/consumer/fetch-v15.ts +++ b/src/apis/consumer/fetch-v15.ts @@ -148,7 +148,7 @@ export function parseResponse ( const errorCode = reader.readInt16() if (errorCode !== 0) { - errors.push(['', errorCode]) + errors.push(['', [errorCode, null]]) } const response: FetchResponse = { @@ -175,7 +175,7 @@ export function parseResponse ( } if (partition.errorCode !== 0) { - errors.push([`/responses/${i}/partitions/${j}`, partition.errorCode]) + errors.push([`/responses/${i}/partitions/${j}`, [partition.errorCode, null]]) } // We need to reduce the size by one to follow the COMPACT_RECORDS specification. diff --git a/src/apis/consumer/fetch-v16.ts b/src/apis/consumer/fetch-v16.ts index 629af3be..20ce6155 100644 --- a/src/apis/consumer/fetch-v16.ts +++ b/src/apis/consumer/fetch-v16.ts @@ -148,7 +148,7 @@ export function parseResponse ( const errorCode = reader.readInt16() if (errorCode !== 0) { - errors.push(['', errorCode]) + errors.push(['', [errorCode, null]]) } const response: FetchResponse = { @@ -175,7 +175,7 @@ export function parseResponse ( } if (partition.errorCode !== 0) { - errors.push([`/responses/${i}/partitions/${j}`, partition.errorCode]) + errors.push([`/responses/${i}/partitions/${j}`, [partition.errorCode, null]]) } // We need to reduce the size by one to follow the COMPACT_RECORDS specification. diff --git a/src/apis/consumer/fetch-v17.ts b/src/apis/consumer/fetch-v17.ts index 3f21209c..e9cad220 100644 --- a/src/apis/consumer/fetch-v17.ts +++ b/src/apis/consumer/fetch-v17.ts @@ -148,7 +148,7 @@ export function parseResponse ( const errorCode = reader.readInt16() if (errorCode !== 0) { - errors.push(['', errorCode]) + errors.push(['', [errorCode, null]]) } const response: FetchResponse = { @@ -175,7 +175,7 @@ export function parseResponse ( } if (partition.errorCode !== 0) { - errors.push([`/responses/${i}/partitions/${j}`, partition.errorCode]) + errors.push([`/responses/${i}/partitions/${j}`, [partition.errorCode, null]]) } // We need to reduce the size by one to follow the COMPACT_RECORDS specification. diff --git a/src/apis/consumer/heartbeat-v4.ts b/src/apis/consumer/heartbeat-v4.ts index 891f4509..0a5e0f1d 100644 --- a/src/apis/consumer/heartbeat-v4.ts +++ b/src/apis/consumer/heartbeat-v4.ts @@ -49,7 +49,7 @@ export function parseResponse ( } if (response.errorCode !== 0) { - throw new ResponseError(apiKey, apiVersion, { '': response.errorCode }, response) + throw new ResponseError(apiKey, apiVersion, { '/': [response.errorCode, null] }, response) } return response diff --git a/src/apis/consumer/join-group-v9.ts b/src/apis/consumer/join-group-v9.ts index 1b312b77..a77f948d 100644 --- a/src/apis/consumer/join-group-v9.ts +++ b/src/apis/consumer/join-group-v9.ts @@ -107,7 +107,7 @@ export function parseResponse ( } if (response.errorCode !== 0) { - throw new ResponseError(apiKey, apiVersion, { '': response.errorCode }, response) + throw new ResponseError(apiKey, apiVersion, { '/': [response.errorCode, null] }, response) } return response diff --git a/src/apis/consumer/leave-group-v5.ts b/src/apis/consumer/leave-group-v5.ts index 97254d52..bfcc47ba 100644 --- a/src/apis/consumer/leave-group-v5.ts +++ b/src/apis/consumer/leave-group-v5.ts @@ -63,7 +63,7 @@ export function parseResponse ( const errorCode = reader.readInt16() if (errorCode !== 0) { - errors.push(['', errorCode]) + errors.push(['', [errorCode, null]]) } const response: LeaveGroupResponse = { @@ -77,7 +77,7 @@ export function parseResponse ( } if (member.errorCode !== 0) { - errors.push([`/members/${i}`, member.errorCode]) + errors.push([`/members/${i}`, [member.errorCode, null]]) } return member diff --git a/src/apis/consumer/list-offsets-v8.ts b/src/apis/consumer/list-offsets-v8.ts index 000c55d5..9623a996 100644 --- a/src/apis/consumer/list-offsets-v8.ts +++ b/src/apis/consumer/list-offsets-v8.ts @@ -94,7 +94,7 @@ export function parseResponse ( } if (partition.errorCode !== 0) { - errors.push([`/topics/${i}/partitions/${j}`, partition.errorCode]) + errors.push([`/topics/${i}/partitions/${j}`, [partition.errorCode, null]]) } return partition diff --git a/src/apis/consumer/list-offsets-v9.ts b/src/apis/consumer/list-offsets-v9.ts index 31dff541..ed4e07e4 100644 --- a/src/apis/consumer/list-offsets-v9.ts +++ b/src/apis/consumer/list-offsets-v9.ts @@ -94,7 +94,7 @@ export function parseResponse ( } if (partition.errorCode !== 0) { - errors.push([`/topics/${i}/partitions/${j}`, partition.errorCode]) + errors.push([`/topics/${i}/partitions/${j}`, [partition.errorCode, null]]) } return partition diff --git a/src/apis/consumer/offset-commit-v8.ts b/src/apis/consumer/offset-commit-v8.ts index 60bebf33..2971a5cc 100644 --- a/src/apis/consumer/offset-commit-v8.ts +++ b/src/apis/consumer/offset-commit-v8.ts @@ -99,7 +99,7 @@ export function parseResponse ( } if (partition.errorCode !== 0) { - errors.push([`/topics/${i}/partitions/${j}`, partition.errorCode]) + errors.push([`/topics/${i}/partitions/${j}`, [partition.errorCode, null]]) } return partition diff --git a/src/apis/consumer/offset-commit-v9.ts b/src/apis/consumer/offset-commit-v9.ts index 1f2cc216..7d02925c 100644 --- a/src/apis/consumer/offset-commit-v9.ts +++ b/src/apis/consumer/offset-commit-v9.ts @@ -99,7 +99,7 @@ export function parseResponse ( } if (partition.errorCode !== 0) { - errors.push([`/topics/${i}/partitions/${j}`, partition.errorCode]) + errors.push([`/topics/${i}/partitions/${j}`, [partition.errorCode, null]]) } return partition diff --git a/src/apis/consumer/offset-fetch-v8.ts b/src/apis/consumer/offset-fetch-v8.ts index 1a8abeda..e0217339 100644 --- a/src/apis/consumer/offset-fetch-v8.ts +++ b/src/apis/consumer/offset-fetch-v8.ts @@ -105,7 +105,7 @@ export function parseResponse ( } if (partition.errorCode !== 0) { - errors.push([`/groups/${i}/topics/${j}/partitions/${k}`, partition.errorCode]) + errors.push([`/groups/${i}/topics/${j}/partitions/${k}`, [partition.errorCode, null]]) } return partition @@ -116,7 +116,7 @@ export function parseResponse ( } if (group.errorCode !== 0) { - errors.push([`/groups/${i}`, group.errorCode]) + errors.push([`/groups/${i}`, [group.errorCode, null]]) } return group diff --git a/src/apis/consumer/offset-fetch-v9.ts b/src/apis/consumer/offset-fetch-v9.ts index 3c38c169..03c81064 100644 --- a/src/apis/consumer/offset-fetch-v9.ts +++ b/src/apis/consumer/offset-fetch-v9.ts @@ -108,7 +108,7 @@ export function parseResponse ( } if (partition.errorCode !== 0) { - errors.push([`/groups/${i}/topics/${j}/partitions/${k}`, partition.errorCode]) + errors.push([`/groups/${i}/topics/${j}/partitions/${k}`, [partition.errorCode, null]]) } return partition @@ -119,7 +119,7 @@ export function parseResponse ( } if (group.errorCode !== 0) { - errors.push([`/groups/${i}`, group.errorCode]) + errors.push([`/groups/${i}`, [group.errorCode, null]]) } return group diff --git a/src/apis/consumer/offset-for-leader-epoch-v4.ts b/src/apis/consumer/offset-for-leader-epoch-v4.ts index 3c918799..7bffeef2 100644 --- a/src/apis/consumer/offset-for-leader-epoch-v4.ts +++ b/src/apis/consumer/offset-for-leader-epoch-v4.ts @@ -85,7 +85,7 @@ export function parseResponse ( const errorCode = r.readInt16() if (errorCode !== 0) { - errors.push([`/topics/${i}/partitions/${j}`, errorCode]) + errors.push([`/topics/${i}/partitions/${j}`, [errorCode, null]]) } return { diff --git a/src/apis/consumer/sync-group-v5.ts b/src/apis/consumer/sync-group-v5.ts index 7123da0a..77a362c9 100644 --- a/src/apis/consumer/sync-group-v5.ts +++ b/src/apis/consumer/sync-group-v5.ts @@ -75,7 +75,7 @@ export function parseResponse ( } if (response.errorCode !== 0) { - throw new ResponseError(apiKey, apiVersion, { '': response.errorCode }, response) + throw new ResponseError(apiKey, apiVersion, { '/': [response.errorCode, null] }, response) } return response diff --git a/src/apis/definitions.ts b/src/apis/definitions.ts index 6efc3c81..fc31ed74 100644 --- a/src/apis/definitions.ts +++ b/src/apis/definitions.ts @@ -2,6 +2,7 @@ import { promisify } from 'node:util' import { type Connection } from '../network/connection.ts' import { type Reader } from '../protocol/reader.ts' import { type Writer } from '../protocol/writer.ts' +import { type NullableString } from '../protocol/definitions.ts' export type Callback = (error: Error | null, payload: ReturnType) => void @@ -16,7 +17,7 @@ export type ResponseParser = ( reader: Reader ) => ReturnType | Promise -export type ResponseErrorWithLocation = [string, number] +export type ResponseErrorWithLocation = [string, [number, NullableString]] export type APIWithCallback, ResponseType> = ( connection: Connection, diff --git a/src/apis/metadata/api-versions-v3.ts b/src/apis/metadata/api-versions-v3.ts index d50cb353..55b79b06 100644 --- a/src/apis/metadata/api-versions-v3.ts +++ b/src/apis/metadata/api-versions-v3.ts @@ -59,7 +59,7 @@ export function parseResponse ( } if (response.errorCode !== 0) { - throw new ResponseError(apiKey, apiVersion, { '': response.errorCode }, response) + throw new ResponseError(apiKey, apiVersion, { '/': [response.errorCode, null] }, response) } return response diff --git a/src/apis/metadata/api-versions-v4.ts b/src/apis/metadata/api-versions-v4.ts index c85369d6..8c3006b0 100644 --- a/src/apis/metadata/api-versions-v4.ts +++ b/src/apis/metadata/api-versions-v4.ts @@ -59,7 +59,7 @@ export function parseResponse ( } if (response.errorCode !== 0) { - throw new ResponseError(apiKey, apiVersion, { '': response.errorCode }, response) + throw new ResponseError(apiKey, apiVersion, { '/': [response.errorCode, null] }, response) } return response diff --git a/src/apis/metadata/find-coordinator-v4.ts b/src/apis/metadata/find-coordinator-v4.ts index fe51cce1..ab42639d 100644 --- a/src/apis/metadata/find-coordinator-v4.ts +++ b/src/apis/metadata/find-coordinator-v4.ts @@ -64,7 +64,7 @@ export function parseResponse ( } if (coordinator.errorCode !== 0) { - errors.push([`/coordinators/${i}`, coordinator.errorCode]) + errors.push([`/coordinators/${i}`, [coordinator.errorCode, coordinator.errorMessage]]) } return coordinator diff --git a/src/apis/metadata/find-coordinator-v5.ts b/src/apis/metadata/find-coordinator-v5.ts index 0f5366d3..bfc16099 100644 --- a/src/apis/metadata/find-coordinator-v5.ts +++ b/src/apis/metadata/find-coordinator-v5.ts @@ -64,7 +64,7 @@ export function parseResponse ( } if (coordinator.errorCode !== 0) { - errors.push([`/coordinators/${i}`, coordinator.errorCode]) + errors.push([`/coordinators/${i}`, [coordinator.errorCode, coordinator.errorMessage]]) } return coordinator diff --git a/src/apis/metadata/find-coordinator-v6.ts b/src/apis/metadata/find-coordinator-v6.ts index e400dac5..3b219f54 100644 --- a/src/apis/metadata/find-coordinator-v6.ts +++ b/src/apis/metadata/find-coordinator-v6.ts @@ -64,7 +64,7 @@ export function parseResponse ( } if (coordinator.errorCode !== 0) { - errors.push([`/coordinators/${i}`, coordinator.errorCode]) + errors.push([`/coordinators/${i}`, [coordinator.errorCode, coordinator.errorMessage]]) } return coordinator diff --git a/src/apis/metadata/metadata-v10.ts b/src/apis/metadata/metadata-v10.ts index 8a7525c6..915058a4 100644 --- a/src/apis/metadata/metadata-v10.ts +++ b/src/apis/metadata/metadata-v10.ts @@ -112,7 +112,7 @@ export function parseResponse ( const errorCode = r.readInt16() if (errorCode !== 0) { - errors.push([`/topics/${i}`, errorCode]) + errors.push([`/topics/${i}`, [errorCode, null]]) } return { @@ -124,7 +124,7 @@ export function parseResponse ( const errorCode = r.readInt16() if (errorCode !== 0) { - errors.push([`/topics/${i}/partitions/${j}`, errorCode]) + errors.push([`/topics/${i}/partitions/${j}`, [errorCode, null]]) } return { diff --git a/src/apis/metadata/metadata-v11.ts b/src/apis/metadata/metadata-v11.ts index a1ec02ef..3d7fc76c 100644 --- a/src/apis/metadata/metadata-v11.ts +++ b/src/apis/metadata/metadata-v11.ts @@ -112,7 +112,7 @@ export function parseResponse ( const errorCode = r.readInt16() if (errorCode !== 0) { - errors.push([`/topics/${i}`, errorCode]) + errors.push([`/topics/${i}`, [errorCode, null]]) } return { @@ -124,7 +124,7 @@ export function parseResponse ( const errorCode = r.readInt16() if (errorCode !== 0) { - errors.push([`/topics/${i}/partitions/${j}`, errorCode]) + errors.push([`/topics/${i}/partitions/${j}`, [errorCode, null]]) } return { diff --git a/src/apis/metadata/metadata-v12.ts b/src/apis/metadata/metadata-v12.ts index db6347c0..c1f20928 100644 --- a/src/apis/metadata/metadata-v12.ts +++ b/src/apis/metadata/metadata-v12.ts @@ -109,7 +109,7 @@ export function parseResponse ( const errorCode = r.readInt16() if (errorCode !== 0) { - errors.push([`/topics/${i}`, errorCode]) + errors.push([`/topics/${i}`, [errorCode, null]]) } return { @@ -121,7 +121,7 @@ export function parseResponse ( const errorCode = r.readInt16() if (errorCode !== 0) { - errors.push([`/topics/${i}/partitions/${j}`, errorCode]) + errors.push([`/topics/${i}/partitions/${j}`, [errorCode, null]]) } return { diff --git a/src/apis/metadata/metadata-v9.ts b/src/apis/metadata/metadata-v9.ts index 243cef7f..0161eb35 100644 --- a/src/apis/metadata/metadata-v9.ts +++ b/src/apis/metadata/metadata-v9.ts @@ -111,7 +111,7 @@ export function parseResponse ( const errorCode = r.readInt16() if (errorCode !== 0) { - errors.push([`/topics/${i}`, errorCode]) + errors.push([`/topics/${i}`, [errorCode, null]]) } const name = r.readNullableString() @@ -125,7 +125,7 @@ export function parseResponse ( const errorCode = r.readInt16() if (errorCode !== 0) { - errors.push([`/topics/${i}/partitions/${j}`, errorCode]) + errors.push([`/topics/${i}/partitions/${j}`, [errorCode, null]]) } return { diff --git a/src/apis/producer/add-offsets-to-txn-v4.ts b/src/apis/producer/add-offsets-to-txn-v4.ts index 7ff903c1..c7c297a4 100644 --- a/src/apis/producer/add-offsets-to-txn-v4.ts +++ b/src/apis/producer/add-offsets-to-txn-v4.ts @@ -48,7 +48,7 @@ export function parseResponse ( } if (response.errorCode !== 0) { - throw new ResponseError(apiKey, apiVersion, { '': response.errorCode }, response) + throw new ResponseError(apiKey, apiVersion, { '/': [response.errorCode, null] }, response) } return response diff --git a/src/apis/producer/add-partitions-to-txn-v5.ts b/src/apis/producer/add-partitions-to-txn-v5.ts index 34159bbd..ad8e27a8 100644 --- a/src/apis/producer/add-partitions-to-txn-v5.ts +++ b/src/apis/producer/add-partitions-to-txn-v5.ts @@ -93,7 +93,7 @@ export function parseResponse ( const errorCode = reader.readInt16() if (errorCode !== 0) { - errors.push(['', errorCode]) + errors.push(['', [errorCode, null]]) } const response: AddPartitionsToTxnResponse = { @@ -114,7 +114,7 @@ export function parseResponse ( if (partition.partitionErrorCode !== 0) { errors.push([ `/results_by_transaction/${i}/topic_results/${j}/results_by_partitions/${k}`, - partition.partitionErrorCode + [partition.partitionErrorCode, null] ]) } diff --git a/src/apis/producer/end-txn-v4.ts b/src/apis/producer/end-txn-v4.ts index a44cdd31..9cb45ff0 100644 --- a/src/apis/producer/end-txn-v4.ts +++ b/src/apis/producer/end-txn-v4.ts @@ -48,7 +48,7 @@ export function parseResponse ( } if (response.errorCode !== 0) { - throw new ResponseError(apiKey, apiVersion, { '': response.errorCode }, response) + throw new ResponseError(apiKey, apiVersion, { '/': [response.errorCode, null] }, response) } return response diff --git a/src/apis/producer/init-producer-id-v4.ts b/src/apis/producer/init-producer-id-v4.ts index 7339df6e..bfa4987c 100644 --- a/src/apis/producer/init-producer-id-v4.ts +++ b/src/apis/producer/init-producer-id-v4.ts @@ -64,7 +64,7 @@ export function parseResponse ( } if (response.errorCode !== 0) { - throw new ResponseError(apiKey, apiVersion, { '': response.errorCode }, response) + throw new ResponseError(apiKey, apiVersion, { '/': [response.errorCode, null] }, response) } return response diff --git a/src/apis/producer/init-producer-id-v5.ts b/src/apis/producer/init-producer-id-v5.ts index ff503216..8e3efe60 100644 --- a/src/apis/producer/init-producer-id-v5.ts +++ b/src/apis/producer/init-producer-id-v5.ts @@ -64,7 +64,7 @@ export function parseResponse ( } if (response.errorCode !== 0) { - throw new ResponseError(apiKey, apiVersion, { '': response.errorCode }, response) + throw new ResponseError(apiKey, apiVersion, { '/': [response.errorCode, null] }, response) } return response diff --git a/src/apis/producer/produce-v10.ts b/src/apis/producer/produce-v10.ts index 9a6ca5aa..6d45ab57 100644 --- a/src/apis/producer/produce-v10.ts +++ b/src/apis/producer/produce-v10.ts @@ -117,16 +117,9 @@ export function parseResponse ( const topicResponse = { name: r.readString(), partitionResponses: r.readArray((r, j) => { - const index = r.readInt32() - const errorCode = r.readInt16() - - if (errorCode !== 0) { - errors.push([`/responses/${i}/partition_responses/${j}`, errorCode]) - } - - return { - index, - errorCode, + const partitionResponse = { + index: r.readInt32(), + errorCode: r.readInt16(), baseOffset: r.readInt64(), logAppendTimeMs: r.readInt64(), logStartOffset: r.readInt64(), @@ -137,13 +130,25 @@ export function parseResponse ( } if (recordError.batchIndexErrorMessage) { - errors.push([`/responses/${i}/partition_responses/${j}/record_errors/${k}`, -1]) + errors.push([ + `/responses/${i}/partition_responses/${j}/record_errors/${k}`, + [-1, recordError.batchIndexErrorMessage] + ]) } return recordError }), errorMessage: r.readNullableString() } + + if (partitionResponse.errorCode !== 0) { + errors.push([ + `/responses/${i}/partition_responses/${j}`, + [partitionResponse.errorCode, partitionResponse.errorMessage] + ]) + } + + return partitionResponse }) } diff --git a/src/apis/producer/produce-v11.ts b/src/apis/producer/produce-v11.ts index b5330924..d8946e79 100644 --- a/src/apis/producer/produce-v11.ts +++ b/src/apis/producer/produce-v11.ts @@ -117,16 +117,9 @@ export function parseResponse ( const topicResponse = { name: r.readString(), partitionResponses: r.readArray((r, j) => { - const index = r.readInt32() - const errorCode = r.readInt16() - - if (errorCode !== 0) { - errors.push([`/responses/${i}/partition_responses/${j}`, errorCode]) - } - - return { - index, - errorCode, + const partitionResponse = { + index: r.readInt32(), + errorCode: r.readInt16(), baseOffset: r.readInt64(), logAppendTimeMs: r.readInt64(), logStartOffset: r.readInt64(), @@ -137,13 +130,25 @@ export function parseResponse ( } if (recordError.batchIndexErrorMessage) { - errors.push([`/responses/${i}/partition_responses/${j}/record_errors/${k}`, -1]) + errors.push([ + `/responses/${i}/partition_responses/${j}/record_errors/${k}`, + [-1, recordError.batchIndexErrorMessage] + ]) } return recordError }), errorMessage: r.readNullableString() } + + if (partitionResponse.errorCode !== 0) { + errors.push([ + `/responses/${i}/partition_responses/${j}`, + [partitionResponse.errorCode, partitionResponse.errorMessage] + ]) + } + + return partitionResponse }) } diff --git a/src/apis/producer/produce-v7.ts b/src/apis/producer/produce-v7.ts index c4810221..f3bcb34c 100644 --- a/src/apis/producer/produce-v7.ts +++ b/src/apis/producer/produce-v7.ts @@ -123,7 +123,7 @@ export function parseResponse ( const errorCode = r.readInt16() if (errorCode !== 0) { - errors.push([`/responses/${i}/partition_responses/${j}`, errorCode]) + errors.push([`/responses/${i}/partition_responses/${j}`, [errorCode, null]]) } return { diff --git a/src/apis/producer/produce-v8.ts b/src/apis/producer/produce-v8.ts index 38b6fd70..9e724a75 100644 --- a/src/apis/producer/produce-v8.ts +++ b/src/apis/producer/produce-v8.ts @@ -123,16 +123,9 @@ export function parseResponse ( name: r.readString(false), partitionResponses: r.readArray( (r, j) => { - const index = r.readInt32() - const errorCode = r.readInt16() - - if (errorCode !== 0) { - errors.push([`/responses/${i}/partition_responses/${j}`, errorCode]) - } - - return { - index, - errorCode, + const partitionResponse: ProduceResponsePartition = { + index: r.readInt32(), + errorCode: r.readInt16(), baseOffset: r.readInt64(), logAppendTimeMs: r.readInt64(), logStartOffset: r.readInt64(), @@ -144,7 +137,10 @@ export function parseResponse ( } if (recordError.batchIndexErrorMessage) { - errors.push([`/responses/${i}/partition_responses/${j}/record_errors/${k}`, -1]) + errors.push([ + `/responses/${i}/partition_responses/${j}/record_errors/${k}`, + [-1, recordError.batchIndexErrorMessage] + ]) } return recordError @@ -154,6 +150,15 @@ export function parseResponse ( ), errorMessage: r.readNullableString(false) } + + if (partitionResponse.errorCode !== 0) { + errors.push([ + `/responses/${i}/partition_responses/${j}`, + [partitionResponse.errorCode, partitionResponse.errorMessage] + ]) + } + + return partitionResponse }, false, false diff --git a/src/apis/producer/produce-v9.ts b/src/apis/producer/produce-v9.ts index b85f09c7..16a3ff1c 100644 --- a/src/apis/producer/produce-v9.ts +++ b/src/apis/producer/produce-v9.ts @@ -117,16 +117,9 @@ export function parseResponse ( const topicResponse = { name: r.readString(), partitionResponses: r.readArray((r, j) => { - const index = r.readInt32() - const errorCode = r.readInt16() - - if (errorCode !== 0) { - errors.push([`/responses/${i}/partition_responses/${j}`, errorCode]) - } - - return { - index, - errorCode, + const partitionResponse = { + index: r.readInt32(), + errorCode: r.readInt16(), baseOffset: r.readInt64(), logAppendTimeMs: r.readInt64(), logStartOffset: r.readInt64(), @@ -137,13 +130,25 @@ export function parseResponse ( } if (recordError.batchIndexErrorMessage) { - errors.push([`/responses/${i}/partition_responses/${j}/record_errors/${k}`, -1]) + errors.push([ + `/responses/${i}/partition_responses/${j}/record_errors/${k}`, + [-1, recordError.batchIndexErrorMessage] + ]) } return recordError }), errorMessage: r.readNullableString() } + + if (partitionResponse.errorCode !== 0) { + errors.push([ + `/responses/${i}/partition_responses/${j}`, + [partitionResponse.errorCode, partitionResponse.errorMessage] + ]) + } + + return partitionResponse }) } diff --git a/src/apis/producer/txn-offset-commit-v4.ts b/src/apis/producer/txn-offset-commit-v4.ts index 29d3b6fa..31df6493 100644 --- a/src/apis/producer/txn-offset-commit-v4.ts +++ b/src/apis/producer/txn-offset-commit-v4.ts @@ -108,7 +108,7 @@ export function parseResponse ( } if (partition.errorCode !== 0) { - errors.push([`/topics/${i}/partitions/${j}`, partition.errorCode]) + errors.push([`/topics/${i}/partitions/${j}`, [partition.errorCode, null]]) } return partition diff --git a/src/apis/security/sasl-authenticate-v2.ts b/src/apis/security/sasl-authenticate-v2.ts index 50b4d71d..c4eff984 100644 --- a/src/apis/security/sasl-authenticate-v2.ts +++ b/src/apis/security/sasl-authenticate-v2.ts @@ -44,7 +44,7 @@ export function parseResponse ( } if (response.errorCode !== 0) { - throw new ResponseError(apiKey, apiVersion, { '': response.errorCode }, response) + throw new ResponseError(apiKey, apiVersion, { '/': [response.errorCode, response.errorMessage] }, response) } return response diff --git a/src/apis/security/sasl-handshake-v1.ts b/src/apis/security/sasl-handshake-v1.ts index e597e731..0fdc61e2 100644 --- a/src/apis/security/sasl-handshake-v1.ts +++ b/src/apis/security/sasl-handshake-v1.ts @@ -6,8 +6,8 @@ import { createAPI } from '../definitions.ts' export type SaslHandshakeRequest = Parameters export interface SaslHandshakeResponse { - errorCode?: number - mechanisms?: string[] + errorCode: number + mechanisms: string[] } /* @@ -41,7 +41,7 @@ export function parseResponse ( } if (response.errorCode !== 0) { - throw new ResponseError(apiKey, apiVersion, { '': response.errorCode! }, response) + throw new ResponseError(apiKey, apiVersion, { '/': [response.errorCode, null] }, response) } return response diff --git a/src/apis/telemetry/get-telemetry-subscriptions-v0.ts b/src/apis/telemetry/get-telemetry-subscriptions-v0.ts index 2fcdc8ce..5fefa292 100644 --- a/src/apis/telemetry/get-telemetry-subscriptions-v0.ts +++ b/src/apis/telemetry/get-telemetry-subscriptions-v0.ts @@ -50,7 +50,7 @@ export function parseResponse ( const errorCode = reader.readInt16() if (errorCode !== 0) { - errors.push(['', errorCode]) + errors.push(['', [errorCode, null]]) } const response: GetTelemetrySubscriptionsResponse = { diff --git a/src/apis/telemetry/list-client-metrics-resources-v0.ts b/src/apis/telemetry/list-client-metrics-resources-v0.ts index 063423e8..408889db 100644 --- a/src/apis/telemetry/list-client-metrics-resources-v0.ts +++ b/src/apis/telemetry/list-client-metrics-resources-v0.ts @@ -46,7 +46,7 @@ export function parseResponse ( } if (response.errorCode !== 0) { - throw new ResponseError(apiKey, apiVersion, { '': response.errorCode }, response) + throw new ResponseError(apiKey, apiVersion, { '/': [response.errorCode, null] }, response) } return response diff --git a/src/apis/telemetry/push-telemetry-v0.ts b/src/apis/telemetry/push-telemetry-v0.ts index 4ca30cb6..966d44ea 100644 --- a/src/apis/telemetry/push-telemetry-v0.ts +++ b/src/apis/telemetry/push-telemetry-v0.ts @@ -51,7 +51,7 @@ export function parseResponse ( } if (response.errorCode !== 0) { - throw new ResponseError(apiKey, apiVersion, { '': response.errorCode }, response) + throw new ResponseError(apiKey, apiVersion, { '/': [response.errorCode, null] }, response) } return response diff --git a/src/errors.ts b/src/errors.ts index 7b732c30..b378cd8e 100644 --- a/src/errors.ts +++ b/src/errors.ts @@ -1,5 +1,6 @@ import { type JoinGroupResponse } from './apis/consumer/join-group-v9.ts' import { protocolAPIsById } from './protocol/apis.ts' +import { type NullableString } from './protocol/definitions.ts' import { protocolErrors, protocolErrorsCodesById } from './protocol/errors.ts' const kGenericError = Symbol('plt.kafka.genericError') @@ -146,13 +147,19 @@ export class NetworkError extends GenericError { export class ProtocolError extends GenericError { static code: ErrorCode = 'PLT_KFK_PROTOCOL' - constructor (codeOrId: string | number, properties: ErrorProperties = {}, response: unknown = undefined) { + constructor ( + codeOrId: string | number, + serverErrorMessage: NullableString = null, + properties: ErrorProperties = {}, + response: unknown = undefined + ) { const { id, code, message, canRetry } = protocolErrors[typeof codeOrId === 'number' ? protocolErrorsCodesById[codeOrId] : codeOrId] super(ProtocolError.code, message, { apiId: id, apiCode: code, + serverErrorMessage, canRetry, hasStaleMetadata: ['UNKNOWN_TOPIC_OR_PARTITION', 'LEADER_NOT_AVAILABLE', 'NOT_LEADER_OR_FOLLOWER'].includes(id), needsRejoin: ['MEMBER_ID_REQUIRED', 'UNKNOWN_MEMBER_ID', 'REBALANCE_IN_PROGRESS'].includes(id), @@ -178,13 +185,16 @@ export class ResponseError extends MultipleErrors { constructor ( apiName: number, apiVersion: number, - errors: Record, + errors: Record, response: unknown, properties: ErrorProperties = {} ) { super( `Received response with error while executing API ${protocolAPIsById[apiName]}(v${apiVersion})`, - Object.entries(errors).map(([path, errorCode]) => new ProtocolError(errorCode as number, { path }, response)), + Object.entries(errors).map( + ([path, [errorCode, detailedErrorMessage]]) => + new ProtocolError(errorCode as number, detailedErrorMessage, { path }, response) + ), { ...properties, response diff --git a/test/clients/consumer/consumer-consumer-group-protocol.test.ts b/test/clients/consumer/consumer-consumer-group-protocol.test.ts index 770c250c..236db41f 100644 --- a/test/clients/consumer/consumer-consumer-group-protocol.test.ts +++ b/test/clients/consumer/consumer-consumer-group-protocol.test.ts @@ -121,7 +121,7 @@ test('consumer should handle fenced member epoch error', skipConsumerGroupProtoc new ResponseError( consumerGroupHeartbeatV0.api.key, 0, - { '': 110 }, + { '/': [110, null] }, { throttleTimeMs: 0, errorCode: 110, diff --git a/test/clients/consumer/consumer.test.ts b/test/clients/consumer/consumer.test.ts index d71bca37..94f0cdb6 100644 --- a/test/clients/consumer/consumer.test.ts +++ b/test/clients/consumer/consumer.test.ts @@ -3068,7 +3068,7 @@ test('joinGroup should cancel when membership has been cancelled during rejoin ( mockAPI( consumer[kConnections], syncGroupV5.api.key, - new ProtocolError('REBALANCE_IN_PROGRESS', { cancelMembership: true }) + new ProtocolError('REBALANCE_IN_PROGRESS', null, { cancelMembership: true }) ) deepStrictEqual(await consumer.joinGroup({}), undefined) @@ -3080,7 +3080,7 @@ test('joinGroup should cancel when membership has been cancelled during rejoin ( mockAPI( consumer[kConnections], syncGroupV5.api.key, - new ProtocolError('UNKNOWN_MEMBER_ID', { cancelMembership: true }) + new ProtocolError('UNKNOWN_MEMBER_ID', null, { cancelMembership: true }) ) deepStrictEqual(await consumer.joinGroup({}), undefined) diff --git a/test/clients/producer/producer.test.ts b/test/clients/producer/producer.test.ts index e11af3df..531587a3 100644 --- a/test/clients/producer/producer.test.ts +++ b/test/clients/producer/producer.test.ts @@ -997,7 +997,7 @@ test('send should repeat the operation in case of stale metadata', async t => { mockAPI( producer[kConnections], produceV11.api.key, - new ProtocolError('UNKNOWN_TOPIC_OR_PARTITION', { topic: testTopic }) + new ProtocolError('UNKNOWN_TOPIC_OR_PARTITION', null, { topic: testTopic }) ) await producer.send({ diff --git a/test/errors.test.ts b/test/errors.test.ts index e09c6f0c..2517c6a0 100644 --- a/test/errors.test.ts +++ b/test/errors.test.ts @@ -152,7 +152,7 @@ test('NetworkError', () => { }) test('ProtocolError with string code', () => { - const error = new ProtocolError('UNKNOWN_TOPIC_OR_PARTITION', { topic: 'test-topic' }) + const error = new ProtocolError('UNKNOWN_TOPIC_OR_PARTITION', null, { topic: 'test-topic' }) deepStrictEqual(error.code, 'PLT_KFK_PROTOCOL') deepStrictEqual(error.message, 'This server does not host this topic-partition.') deepStrictEqual(error.apiId, 'UNKNOWN_TOPIC_OR_PARTITION') @@ -163,7 +163,7 @@ test('ProtocolError with string code', () => { }) test('ProtocolError with numeric code', () => { - const error = new ProtocolError(3, { partition: 1 }) + const error = new ProtocolError(3, null, { partition: 1 }) deepStrictEqual(error.code, 'PLT_KFK_PROTOCOL') deepStrictEqual(error.message, 'This server does not host this topic-partition.') deepStrictEqual(error.apiId, 'UNKNOWN_TOPIC_OR_PARTITION') @@ -175,7 +175,7 @@ test('ProtocolError with numeric code', () => { test('ProtocolError with response containing memberId', () => { const response = { memberId: 'test-member-id' } - const error = new ProtocolError('REBALANCE_IN_PROGRESS', {}, response) + const error = new ProtocolError('REBALANCE_IN_PROGRESS', null, {}, response) deepStrictEqual(error.code, 'PLT_KFK_PROTOCOL') deepStrictEqual(error.apiId, 'REBALANCE_IN_PROGRESS') ok(error.rebalanceInProgress) @@ -186,9 +186,9 @@ test('ProtocolError with response containing memberId', () => { test('ResponseError', () => { const apiName = 3 // Metadata const apiVersion = 1 - const errors = { - 'topics[0]': 3, // UNKNOWN_TOPIC_OR_PARTITION - 'topics[1]': 5 // LEADER_NOT_AVAILABLE + const errors: Record = { + 'topics[0]': [3, null], // UNKNOWN_TOPIC_OR_PARTITION + 'topics[1]': [5, null] // LEADER_NOT_AVAILABLE } const response = { topics: ['topic1', 'topic2'] }