Skip to content

Commit

Permalink
Merge pull request #73 from tulios/add-create-topic
Browse files Browse the repository at this point in the history
Add create topic
  • Loading branch information
tulios authored Jun 12, 2018
2 parents f225ee8 + 573be89 commit 73ddfbc
Show file tree
Hide file tree
Showing 30 changed files with 2,149 additions and 856 deletions.
51 changes: 51 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ KafkaJS is battle-tested and ready for production.
- [Custom partition assigner](#consuming-messages-custom-partition-assigner)
- [Describe group](#consuming-messages-describe-group)
- [Compression](#consuming-messages-compression)
- [Admin](#admin)
- [Create Topics](#admin-create-topics)
- [Instrumentation](#instrumentation)
- [Custom logging](#custom-logging)
- [Retry (detailed)](#configuration-default-retry-detailed)
Expand Down Expand Up @@ -635,6 +637,55 @@ const data = await consumer.describeGroup()

KafkaJS only support GZIP natively, but [other codecs can be supported](#producing-messages-compression-other).

## <a name="admin"></a> Admin

The admin client will host all the cluster operations, such as: `createTopics`, `createPartitions`, etc. Currently, only `createTopics` is available.

```javascript
const kafka = new Kafka(...)
const admin = kafka.admin() // kafka.admin({ retry: { retries: 2 } })

// remember to connect/disconnect the client
await admin.connect()
await admin.disconnect()
```

The option `retry` can be used to customize the configuration for the admin.

Take a look at [Retry](#configuration-default-retry) for more information.

### <a name="admin-create-topics"></a> Create topics

`createTopics` will resolve to `true` if the topic was created successfully or `false` if it already exists. The method will throw exceptions in case of errors.

```javascript
await admin.createTopics({
validateOnly: <boolean>,
waitForLeaders: <boolean>
timeout: <Number>,
topics: <Topic[]>,
})
```

`Topic` structure:

```javascript
{
topic: <String>,
numPartitions: <Number>, // default: 1
replicationFactor: <Number>, // default: 1
replicaAssignment: <Array>, // Example: [{ partition: 0, replicas: [0,1,2] }] - default: []
configEntries: <Array> // Example: [{ name: 'cleanup.policy', value: 'compact' }] - default: []
}
```

| property | description | default |
| -------------- | ----------- | ------- |
| topics | Topic definition | |
| validateOnly | If this is `true`, the request will be validated, but the topic won't be created. | false |
| timeout | The time in ms to wait for a topic to be completely created on the controller node | 5000 |
| waitForLeaders | If this is `true` it will wait until metadata for the new topics doesn't throw `LEADER_NOT_AVAILABLE` | true |

## <a name="instrumentation"></a> Instrumentation

> Experimental - This feature may be removed or changed in new versions of KafkaJS
Expand Down
97 changes: 97 additions & 0 deletions src/admin/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
const createRetry = require('../retry')
const waitFor = require('../utils/waitFor')
const { KafkaJSNonRetriableError } = require('../errors')

const retryOnLeaderNotAvailable = (fn, opts = {}) => {
const callback = async () => {
try {
return await fn()
} catch (e) {
console.error(e)
if (e.type !== 'LEADER_NOT_AVAILABLE') {
throw e
}
return false
}
}

return waitFor(callback, opts)
}

module.exports = ({ retry = { retries: 5 }, logger: rootLogger, cluster }) => {
const logger = rootLogger.namespace('Admin')

/**
* @returns {Promise}
*/
const connect = async () => await cluster.connect()

/**
* @return {Promise}
*/
const disconnect = async () => await cluster.disconnect()

/**
* @param {array} topics
* @param {boolean} [validateOnly=false]
* @param {number} [timeout=5000]
* @param {boolean} [waitForLeaders=true]
* @return {Promise}
*/
const createTopics = async ({ topics, validateOnly, timeout, waitForLeaders = true }) => {
if (!topics || !Array.isArray(topics)) {
throw new KafkaJSNonRetriableError(`Invalid topics array ${topics}`)
}

if (topics.filter(({ topic }) => typeof topic !== 'string').length > 0) {
throw new KafkaJSNonRetriableError(
'Invalid topics array, the topic names have to be a valid string'
)
}

const topicNames = new Set(topics.map(({ topic }) => topic))
if (topicNames.size < topics.length) {
throw new KafkaJSNonRetriableError(
'Invalid topics array, it cannot have multiple entries for the same topic'
)
}

const retrier = createRetry(retry)

return retrier(async (bail, retryCount, retryTime) => {
try {
await cluster.refreshMetadata()
const broker = await cluster.findControllerBroker()
await broker.createTopics({ topics, validateOnly, timeout })

if (waitForLeaders) {
const topicNamesArray = Array.from(topicNames.values())
await retryOnLeaderNotAvailable(async () => await broker.metadata(topicNamesArray), {
delay: 100,
timeoutMessage: 'Timed out while waiting for topic leaders',
})
}

return true
} catch (e) {
if (e.type === 'NOT_CONTROLLER') {
logger.warn('Could not create topics', { error: e.message, retryCount, retryTime })
throw e
}

if (e.type === 'TOPIC_ALREADY_EXISTS') {
logger.warn(e.message)
return false
}

bail(e)
}
})
}

return {
connect,
disconnect,
createTopics,
}
}
171 changes: 171 additions & 0 deletions src/admin/index.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
const createAdmin = require('./index')
const { KafkaJSProtocolError } = require('../errors')
const { createErrorFromCode } = require('../protocol/error')

const {
secureRandom,
sslConnectionOpts,
saslConnectionOpts,
saslSCRAM256ConnectionOpts,
saslSCRAM512ConnectionOpts,
createCluster,
sslBrokers,
saslBrokers,
newLogger,
} = require('testHelpers')

const NOT_CONTROLLER = 41
const TOPIC_ALREADY_EXISTS = 36

describe('Admin', () => {
let topicName, admin

beforeEach(() => {
topicName = `test-topic-${secureRandom()}`
})

afterEach(async () => {
await admin.disconnect()
})

test('support SSL connections', async () => {
const cluster = createCluster(sslConnectionOpts(), sslBrokers())
admin = createAdmin({ cluster, logger: newLogger() })

await admin.connect()
})

test('support SASL PLAIN connections', async () => {
const cluster = createCluster(saslConnectionOpts(), saslBrokers())
admin = createAdmin({ cluster, logger: newLogger() })
await admin.connect()
})

test('support SASL SCRAM 256 connections', async () => {
const cluster = createCluster(saslSCRAM256ConnectionOpts(), saslBrokers())
admin = createAdmin({ cluster, logger: newLogger() })
await admin.connect()
})

test('support SASL SCRAM 512 connections', async () => {
const cluster = createCluster(saslSCRAM512ConnectionOpts(), saslBrokers())
admin = createAdmin({ cluster, logger: newLogger() })
await admin.connect()
})

describe('createTopics', () => {
test('throws an error if the topics array is invalid', async () => {
admin = createAdmin({ cluster: createCluster(), logger: newLogger() })
await expect(admin.createTopics({ topics: null })).rejects.toHaveProperty(
'message',
'Invalid topics array null'
)

await expect(admin.createTopics({ topics: 'this-is-not-an-array' })).rejects.toHaveProperty(
'message',
'Invalid topics array this-is-not-an-array'
)
})

test('throws an error if the topic name is not a valid string', async () => {
admin = createAdmin({ cluster: createCluster(), logger: newLogger() })
await expect(admin.createTopics({ topics: [{ topic: 123 }] })).rejects.toHaveProperty(
'message',
'Invalid topics array, the topic names have to be a valid string'
)
})

test('throws an error if there are multiple entries for the same topic', async () => {
admin = createAdmin({ cluster: createCluster(), logger: newLogger() })
const topics = [{ topic: 'topic-123' }, { topic: 'topic-123' }]
await expect(admin.createTopics({ topics })).rejects.toHaveProperty(
'message',
'Invalid topics array, it cannot have multiple entries for the same topic'
)
})

test('create the new topics and return true', async () => {
admin = createAdmin({ cluster: createCluster(), logger: newLogger() })

await admin.connect()
await expect(
admin.createTopics({
waitForLeaders: false,
topics: [{ topic: topicName }],
})
).resolves.toEqual(true)
})

test('retries if the controller has moved', async () => {
const cluster = createCluster()
const broker = { createTopics: jest.fn(() => true) }

cluster.refreshMetadata = jest.fn()
cluster.findControllerBroker = jest
.fn()
.mockImplementationOnce(() => {
throw new KafkaJSProtocolError(createErrorFromCode(NOT_CONTROLLER))
})
.mockImplementationOnce(() => broker)

admin = createAdmin({ cluster, logger: newLogger() })
await expect(
admin.createTopics({
waitForLeaders: false,
topics: [{ topic: topicName }],
})
).resolves.toEqual(true)

expect(cluster.refreshMetadata).toHaveBeenCalledTimes(2)
expect(cluster.findControllerBroker).toHaveBeenCalledTimes(2)
expect(broker.createTopics).toHaveBeenCalledTimes(1)
})

test('ignore already created topics and return false', async () => {
const cluster = createCluster()
const broker = { createTopics: jest.fn() }

cluster.refreshMetadata = jest.fn()
cluster.findControllerBroker = jest.fn(() => broker)
broker.createTopics.mockImplementationOnce(() => {
throw new KafkaJSProtocolError(createErrorFromCode(TOPIC_ALREADY_EXISTS))
})

admin = createAdmin({ cluster, logger: newLogger() })
await expect(
admin.createTopics({
waitForLeaders: false,
topics: [{ topic: topicName }],
})
).resolves.toEqual(false)

expect(cluster.refreshMetadata).toHaveBeenCalledTimes(1)
expect(cluster.findControllerBroker).toHaveBeenCalledTimes(1)
expect(broker.createTopics).toHaveBeenCalledTimes(1)
})

test('query metadata if waitForLeaders is true', async () => {
const topic2 = `test-topic-${secureRandom()}`
const topic3 = `test-topic-${secureRandom()}`

const cluster = createCluster()
const broker = { createTopics: jest.fn(), metadata: jest.fn(() => true) }

cluster.refreshMetadata = jest.fn()
cluster.findControllerBroker = jest.fn(() => broker)

broker.createTopics.mockImplementationOnce(() => true)
admin = createAdmin({ cluster, logger: newLogger() })

await expect(
admin.createTopics({
waitForLeaders: true,
topics: [{ topic: topicName }, { topic: topic2 }, { topic: topic3 }],
})
).resolves.toEqual(true)

expect(broker.metadata).toHaveBeenCalledTimes(1)
expect(broker.metadata).toHaveBeenCalledWith([topicName, topic2, topic3])
})
})
})
Loading

0 comments on commit 73ddfbc

Please sign in to comment.