Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/clients/admin/admin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ export class Admin extends Base<AdminOptions> {
}

this[kPerformDeduplicated](
'createTopics',
`createTopics-${options.topics.join(',')}`,
deduplicateCallback => {
this[kPerformWithRetry](
'createTopics',
Expand Down Expand Up @@ -527,7 +527,7 @@ export class Admin extends Base<AdminOptions> {

#deleteTopics (options: DeleteTopicsOptions, callback: CallbackWithPromise<void>): void {
this[kPerformDeduplicated](
'deleteTopics',
`deleteTopics-${options.topics.join(',')}`,
deduplicateCallback => {
this[kPerformWithRetry](
'deleteTopics',
Expand Down
56 changes: 56 additions & 0 deletions test/clients/admin/admin.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,25 @@ test('createTopics using assignments', async t => {
await admin.deleteTopics({ topics: [topicName] })
})

test('createTopics should not deduplicate creation of different topics', async t => {
const admin = createAdmin(t)

const topicNames = [`test-topic-${randomUUID()}`, `test-topic-${randomUUID()}`]

await Promise.all(
topicNames.map(topicName =>
admin.createTopics({
topics: [topicName]
}))
)

const topicMetadata = await admin.metadata({ topics: topicNames })
strictEqual(topicMetadata.topics.has(topicNames[0]), true)
strictEqual(topicMetadata.topics.has(topicNames[1]), true)

await admin.deleteTopics({ topics: topicNames })
})

test('createTopics should validate options in strict mode', async t => {
const admin = createAdmin(t, { strict: true })

Expand Down Expand Up @@ -617,6 +636,43 @@ test('deleteTopics should delete a topic and support diagnostic channels', async
await admin.deleteTopics({ topics: [topicName] })
})

test('deleteTopics should not deduplicate deletion of different topics', async t => {
const admin = createAdmin(t)

const topicNames = [`test-topic-${randomUUID()}`, `test-topic-${randomUUID()}`]

admin.createTopics({ topics: topicNames })

const topicMetadata = await admin.metadata({ topics: topicNames })
strictEqual(topicMetadata.topics.has(topicNames[0]), true)
strictEqual(topicMetadata.topics.has(topicNames[1]), true)

await Promise.all(
topicNames.map(topicName =>
admin.deleteTopics({
topics: [topicName]
}))
)

// Deletion needs some time to propagate, retry a few times
await retry(15, 500, async () => {
try {
await admin.metadata({ topics: [topicNames[0]] })
throw Error('Topic still exists: ' + topicNames[0])
} catch (error) {
// ApiCode 3 = UnknownTopicOrPartition
ok(error.findBy?.('apiCode', 3))
}
try {
await admin.metadata({ topics: [topicNames[1]] })
throw Error('Topic still exists: ' + topicNames[1])
} catch (error) {
// ApiCode 3 = UnknownTopicOrPartition
ok(error.findBy?.('apiCode', 3))
}
})
})

test('deleteTopics should validate options in strict mode', async t => {
const admin = createAdmin(t, { strict: true })

Expand Down