From cf623d81524e1e21a235037c410d97d7c5d1e4a4 Mon Sep 17 00:00:00 2001 From: Peter Lehnhardt Date: Fri, 31 Oct 2025 10:07:08 +0100 Subject: [PATCH] Do not deduplicate topic creations and deletions As of now creating and deleting multiple topics in a row will ignore most of the topics except the first one. This is due to deduplication that is done based on operation alone without considering the target topics. This change incorporates topic data into the deduplication to avoid the aforementioned issue. on-behalf-of: @SAP ospo@sap.com --- src/clients/admin/admin.ts | 4 +-- test/clients/admin/admin.test.ts | 56 ++++++++++++++++++++++++++++++++ 2 files changed, 58 insertions(+), 2 deletions(-) diff --git a/src/clients/admin/admin.ts b/src/clients/admin/admin.ts index 1bddabdf..53849540 100644 --- a/src/clients/admin/admin.ts +++ b/src/clients/admin/admin.ts @@ -465,7 +465,7 @@ export class Admin extends Base { } this[kPerformDeduplicated]( - 'createTopics', + `createTopics-${options.topics.join(',')}`, deduplicateCallback => { this[kPerformWithRetry]( 'createTopics', @@ -527,7 +527,7 @@ export class Admin extends Base { #deleteTopics (options: DeleteTopicsOptions, callback: CallbackWithPromise): void { this[kPerformDeduplicated]( - 'deleteTopics', + `deleteTopics-${options.topics.join(',')}`, deduplicateCallback => { this[kPerformWithRetry]( 'deleteTopics', diff --git a/test/clients/admin/admin.test.ts b/test/clients/admin/admin.test.ts index 65287791..f8975d3c 100644 --- a/test/clients/admin/admin.test.ts +++ b/test/clients/admin/admin.test.ts @@ -458,6 +458,25 @@ test('createTopics using assignments', async t => { await admin.deleteTopics({ topics: [topicName] }) }) +test('createTopics should not deduplicate creation of different topics', async t => { + const admin = createAdmin(t) + + const topicNames = [`test-topic-${randomUUID()}`, `test-topic-${randomUUID()}`] + + await Promise.all( + topicNames.map(topicName => + admin.createTopics({ + topics: [topicName] + })) + ) + + const topicMetadata = await admin.metadata({ topics: topicNames }) + strictEqual(topicMetadata.topics.has(topicNames[0]), true) + strictEqual(topicMetadata.topics.has(topicNames[1]), true) + + await admin.deleteTopics({ topics: topicNames }) +}) + test('createTopics should validate options in strict mode', async t => { const admin = createAdmin(t, { strict: true }) @@ -617,6 +636,43 @@ test('deleteTopics should delete a topic and support diagnostic channels', async await admin.deleteTopics({ topics: [topicName] }) }) +test('deleteTopics should not deduplicate deletion of different topics', async t => { + const admin = createAdmin(t) + + const topicNames = [`test-topic-${randomUUID()}`, `test-topic-${randomUUID()}`] + + admin.createTopics({ topics: topicNames }) + + const topicMetadata = await admin.metadata({ topics: topicNames }) + strictEqual(topicMetadata.topics.has(topicNames[0]), true) + strictEqual(topicMetadata.topics.has(topicNames[1]), true) + + await Promise.all( + topicNames.map(topicName => + admin.deleteTopics({ + topics: [topicName] + })) + ) + + // Deletion needs some time to propagate, retry a few times + await retry(15, 500, async () => { + try { + await admin.metadata({ topics: [topicNames[0]] }) + throw Error('Topic still exists: ' + topicNames[0]) + } catch (error) { + // ApiCode 3 = UnknownTopicOrPartition + ok(error.findBy?.('apiCode', 3)) + } + try { + await admin.metadata({ topics: [topicNames[1]] }) + throw Error('Topic still exists: ' + topicNames[1]) + } catch (error) { + // ApiCode 3 = UnknownTopicOrPartition + ok(error.findBy?.('apiCode', 3)) + } + }) +}) + test('deleteTopics should validate options in strict mode', async t => { const admin = createAdmin(t, { strict: true })