diff --git a/website/versioned_docs/version-2.2.4/Consuming.md b/website/versioned_docs/version-2.2.4/Consuming.md new file mode 100644 index 000000000..dd02f75bf --- /dev/null +++ b/website/versioned_docs/version-2.2.4/Consuming.md @@ -0,0 +1,476 @@ +--- +id: version-2.2.4-consuming +title: Consuming Messages +original_id: consuming +--- + +Consumer groups allow a group of machines or processes to coordinate access to a list of topics, distributing the load among the consumers. When a consumer fails the load is automatically distributed to other members of the group. Consumer groups __must have__ unique group ids within the cluster, from a kafka broker perspective. + +Creating the consumer: + +```javascript +const consumer = kafka.consumer({ groupId: 'my-group' }) +``` + +Subscribing to some topics: + +```javascript +await consumer.connect() + +await consumer.subscribe({ topics: ['topic-A'] }) + +// You can subscribe to multiple topics at once +await consumer.subscribe({ topics: ['topic-B', 'topic-C'] }) + +// It's possible to start from the beginning of the topic +await consumer.subscribe({ topics: ['topic-D'], fromBeginning: true }) +``` + +Alternatively, you can subscribe to any topic that matches a regular expression: + +```javascript +await consumer.connect() +await consumer.subscribe({ topics: [/topic-(eu|us)-.*/i] }) +``` + +When suppling a regular expression, the consumer will not match topics created after the subscription. If your broker has `topic-A` and `topic-B`, you subscribe to `/topic-.*/`, then `topic-C` is created, your consumer would not be automatically subscribed to `topic-C`. + +KafkaJS offers you two ways to process your data: `eachMessage` and `eachBatch` + +## eachMessage + +The `eachMessage` handler provides a convenient and easy to use API, feeding your function one message at a time. It is implemented on top of `eachBatch`, and it will automatically commit your offsets and heartbeat at the configured interval for you. If you are just looking to get started with Kafka consumers this a good place to start. + +```javascript +await consumer.run({ + eachMessage: async ({ topic, partition, message, heartbeat, pause }) => { + console.log({ + key: message.key.toString(), + value: message.value.toString(), + headers: message.headers, + }) + }, +}) +``` + +Be aware that the `eachMessage` handler should not block for longer than the configured [session timeout](#options) or else the consumer will be removed from the group. If your workload involves very slow processing times for individual messages then you should either increase the session timeout or make periodic use of the `heartbeat` function exposed in the handler payload. +The `pause` function is a convenience for `consumer.pause({ topic, partitions: [partition] })`. It will pause the current topic-partition and returns a function that allows you to resume consuming later. + +## eachBatch + +Some use cases require dealing with batches directly. This handler will feed your function batches and provide some utility functions to give your code more flexibility: `resolveOffset`, `heartbeat`, `commitOffsetsIfNecessary`, `uncommittedOffsets`, `isRunning`, `isStale`, and `pause`. All resolved offsets will be automatically committed after the function is executed. + +> Note: Be aware that using `eachBatch` directly is considered a more advanced use case as compared to using `eachMessage`, since you will have to understand how session timeouts and heartbeats are connected. 
+ +```javascript +await consumer.run({ + eachBatchAutoResolve: true, + eachBatch: async ({ + batch, + resolveOffset, + heartbeat, + commitOffsetsIfNecessary, + uncommittedOffsets, + isRunning, + isStale, + pause, + }) => { + for (let message of batch.messages) { + console.log({ + topic: batch.topic, + partition: batch.partition, + highWatermark: batch.highWatermark, + message: { + offset: message.offset, + key: message.key.toString(), + value: message.value.toString(), + headers: message.headers, + } + }) + + resolveOffset(message.offset) + await heartbeat() + } + }, +}) +``` + +* `eachBatchAutoResolve` configures auto-resolve of batch processing. If set to true, KafkaJS will automatically commit the last offset of the batch if `eachBatch` doesn't throw an error. Default: true. +* `batch.highWatermark` is the last committed offset within the topic partition. It can be useful for calculating lag. +* `resolveOffset()` is used to mark a message in the batch as processed. In case of errors, the consumer will automatically commit the resolved offsets. +* `heartbeat(): Promise` can be used to send heartbeat to the broker according to the set `heartbeatInterval` value in consumer [configuration](#options), which means if you invoke `heartbeat()` sooner than `heartbeatInterval` it will be ignored. +* `commitOffsetsIfNecessary(offsets?): Promise` is used to commit offsets based on the autoCommit configurations (`autoCommitInterval` and `autoCommitThreshold`). Note that auto commit won't happen in `eachBatch` if `commitOffsetsIfNecessary` is not invoked. Take a look at [autoCommit](#auto-commit) for more information. +* `uncommittedOffsets()` returns all offsets by topic-partition which have not yet been committed. +* `isRunning()` returns true if consumer is in running state, else it returns false. +* `isStale()` returns whether the messages in the batch have been rendered stale through some other operation and should be discarded. For example, when calling [`consumer.seek`](#seek) the messages in the batch should be discarded, as they are not at the offset we seeked to. +* `pause()` can be used to pause the consumer for the current topic-partition. All offsets resolved up to that point will be committed (subject to `eachBatchAutoResolve` and [autoCommit](#auto-commit)). Throw an error to pause in the middle of the batch without resolving the current offset. Alternatively, disable `eachBatchAutoResolve`. The returned function can be used to resume processing of the topic-partition. See [Pause & Resume](#pause-resume) for more information about this feature. + +### Example + +```javascript +consumer.run({ + eachBatchAutoResolve: false, + eachBatch: async ({ batch, resolveOffset, heartbeat, isRunning, isStale }) => { + for (let message of batch.messages) { + if (!isRunning() || isStale()) break + await processMessage(message) + resolveOffset(message.offset) + await heartbeat() + } + } +}) +``` + +In the example above, if the consumer is shutting down in the middle of the batch, the remaining messages won't be resolved and therefore not committed. This way, you can quickly shut down the consumer without losing/skipping any messages. If the batch goes stale for some other reason (like calling `consumer.seek`) none of the remaining messages are processed either. + +## Partition-aware concurrency + +By default, [`eachMessage`](Consuming.md#each-message) is invoked sequentially for each message in each partition. 
In order to concurrently process several messages per once, you can increase the `partitionsConsumedConcurrently` option: + +```javascript +consumer.run({ + partitionsConsumedConcurrently: 3, // Default: 1 + eachMessage: async ({ topic, partition, message }) => { + // This will be called up to 3 times concurrently + }, +}) +``` + +Messages in the same partition are still guaranteed to be processed in order, but messages from multiple partitions can be processed at the same time. If `eachMessage` consists of asynchronous work, such as network requests or other I/O, this can improve performance. If `eachMessage` is entirely synchronous, this will make no difference. + +The same thing applies if you are using [`eachBatch`](Consuming.md#each-batch). Given `partitionsConsumedConcurrently > 1`, you will be able to process multiple batches concurrently. + +A guideline for setting `partitionsConsumedConcurrently` would be that it should not be larger than the number of partitions consumed. Depending on whether or not your workload is CPU bound, it may also not benefit you to set it to a higher number than the number of logical CPU cores. A recommendation is to start with a low number and measure if increasing leads to higher throughput. + +## autoCommit + +The messages are always fetched in batches from Kafka, even when using the `eachMessage` handler. All resolved offsets will be committed to Kafka after processing the whole batch. + +Committing offsets periodically during a batch allows the consumer to recover from group rebalancing, stale metadata and other issues before it has completed the entire batch. However, committing more often increases network traffic and slows down processing. Auto-commit offers more flexibility when committing offsets; there are two flavors available: + +`autoCommitInterval`: The consumer will commit offsets after a given period, for example, five seconds. Value in milliseconds. Default: `null` + +```javascript +consumer.run({ + autoCommitInterval: 5000, + // ... +}) +``` + +`autoCommitThreshold`: The consumer will commit offsets after resolving a given number of messages, for example, a hundred messages. Default: `null` + +```javascript +consumer.run({ + autoCommitThreshold: 100, + // ... +}) +``` + +Having both flavors at the same time is also possible, the consumer will commit the offsets if any of the use cases (interval or number of messages) happens. + +`autoCommit`: Advanced option to disable auto committing altogether. Instead, you can [manually commit offsets](#manual-commits). Default: `true` + +## Manual committing + +When disabling [`autoCommit`](#auto-commit) you can still manually commit message offsets, in a couple of different ways: + +- By using the `commitOffsetsIfNecessary` method available in the `eachBatch` callback. The `commitOffsetsIfNecessary` method will still respect the other autoCommit options if set. +- By [sending message offsets in a transaction](Transactions.md#offsets). +- By using the `commitOffsets` method of the consumer (see below). + +The `consumer.commitOffsets` is the lowest-level option and will ignore all other auto commit settings, but in doing so allows the committed offset to be set to any offset and committing various offsets at once. This can be useful, for example, for building a processing reset tool. It can only be called after `consumer.run`. Committing offsets does not change what message we'll consume next once we've started consuming, but instead is only used to determine **from which place to start**. 
To immediately change from what offset you're consuming messages, you'll want to [seek](#seek), instead. + +```javascript +consumer.run({ + autoCommit: false, + eachMessage: async ({ topic, partition, message }) => { + // Process the message somehow + }, +}) + +consumer.commitOffsets([ + { topic: 'topic-A', partition: 0, offset: '1' }, + { topic: 'topic-A', partition: 1, offset: '3' }, + { topic: 'topic-B', partition: 0, offset: '2' } +]) +``` + +Note that you don't *have* to store consumed offsets in Kafka, but instead store it in a storage mechanism of your own choosing. That's an especially useful approach when the results of consuming a message are written to a datastore that allows atomically writing the consumed offset with it, like for example a SQL database. When possible it can make the consumption fully atomic and give "exactly once" semantics that are stronger than the default "at-least once" semantics you get with Kafka's offset commit functionality. + +The usual usage pattern for offsets stored outside of Kafka is as follows: + +- Run the consumer with `autoCommit` disabled. +- Store a message's `offset + 1` in the store together with the results of processing. `1` is added to prevent that same message from being consumed again. +- Use the externally stored offset on restart to [seek](#seek) the consumer to it. + +## fromBeginning + +The consumer group will use the latest committed offset when starting to fetch messages. If the offset is invalid or not defined, `fromBeginning` defines the behavior of the consumer group. This can be configured when subscribing to a topic: + +```javascript +await consumer.subscribe({ topics: ['test-topic'], fromBeginning: true }) +await consumer.subscribe({ topics: ['other-topic'], fromBeginning: false }) +``` + +When `fromBeginning` is `true`, the group will use the earliest offset. If set to `false`, it will use the latest offset. The default is `false`. + +## Options + +```javascript +kafka.consumer({ + groupId: , + partitionAssigners: , + sessionTimeout: , + rebalanceTimeout: , + heartbeatInterval: , + metadataMaxAge: , + allowAutoTopicCreation: , + maxBytesPerPartition: , + minBytes: , + maxBytes: , + maxWaitTimeInMs: , + retry: , + maxInFlightRequests: , + rackId: +}) +``` + +| option | description | default | +| ---------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | --------------------------------- | +| partitionAssigners | List of partition assigners | `[PartitionAssigners.roundRobin]` | +| sessionTimeout | Timeout in milliseconds used to detect failures. The consumer sends periodic heartbeats to indicate its liveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker will remove this consumer from the group and initiate a rebalance | `30000` | +| rebalanceTimeout | The maximum time that the coordinator will wait for each member to rejoin when rebalancing the group | `60000` | +| heartbeatInterval | The expected time in milliseconds between heartbeats to the consumer coordinator. Heartbeats are used to ensure that the consumer's session stays active. 
The value must be set lower than session timeout | `3000` | +| metadataMaxAge | The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions | `300000` (5 minutes) | +| allowAutoTopicCreation | Allow topic creation when querying metadata for non-existent topics | `true` | +| maxBytesPerPartition | The maximum amount of data per-partition the server will return. This size must be at least as large as the maximum message size the server allows or else it is possible for the producer to send messages larger than the consumer can fetch. If that happens, the consumer can get stuck trying to fetch a large message on a certain partition | `1048576` (1MB) | +| minBytes | Minimum amount of data the server should return for a fetch request, otherwise wait up to `maxWaitTimeInMs` for more data to accumulate. | `1` | +| maxBytes | Maximum amount of bytes to accumulate in the response. Supported by Kafka >= `0.10.1.0` | `10485760` (10MB) | +| maxWaitTimeInMs | The maximum amount of time in milliseconds the server will block before answering the fetch request if there isn’t sufficient data to immediately satisfy the requirement given by `minBytes` | `5000` | +| retry | See [retry](Configuration.md#retry) for more information | `{ retries: 5 }` | +| readUncommitted | Configures the consumer isolation level. If `false` (default), the consumer will not return any transactional messages which were not committed. | `false` | +| maxInFlightRequests | Max number of requests that may be in progress at any time. If falsey then no limit. | `null` _(no limit)_ | +| rackId | Configure the "rack" in which the consumer resides to enable [follower fetching](#follower-fetching) | `null` _(fetch from the leader always)_ | + +## Pause & Resume + +In order to pause and resume consuming from one or more topics, the `Consumer` provides the methods `pause` and `resume`. It also provides the `paused` method to get the list of all paused topics. Note that pausing a topic means that it won't be fetched in the next cycle and subsequent messages within the current batch won't be passed to an `eachMessage` handler. + +Calling `pause` with a topic that the consumer is not subscribed to is a no-op, calling `resume` with a topic that is not paused is also a no-op. + +> Note: Calling `resume` or `pause` while the consumer is not running will throw an error. + +Example: A situation where this could be useful is when an external dependency used by the consumer is under too much load. Here we want to `pause` consumption from a topic when this happens, and after a predefined interval we `resume` again: + +```javascript +await consumer.connect() +await consumer.subscribe({ topics: ['jobs'] }) + +await consumer.run({ eachMessage: async ({ topic, message }) => { + try { + await sendToDependency(message) + } catch (e) { + if (e instanceof TooManyRequestsError) { + consumer.pause([{ topic }]) + setTimeout(() => consumer.resume([{ topic }]), e.retryAfter * 1000) + } + + throw e + } +}}) +``` + +For finer-grained control, specific partitions of topics can also be paused, rather than the whole topic. The ability to pause and resume on a per-partition basis, means it can be used to isolate the consuming (and processing) of messages. 
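+The per-partition variant uses the same `pause` and `resume` methods, with each entry carrying a `partitions` array in addition to the `topic`. A minimal sketch (the `'jobs'` topic and partition number are just placeholders):
+
+```javascript
+// Pause only partition 0 of the topic; other partitions keep being fetched and processed
+consumer.pause([{ topic: 'jobs', partitions: [0] }])
+
+// Later on, resume just that topic-partition
+consumer.resume([{ topic: 'jobs', partitions: [0] }])
+```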
+ +Example: in combination with [consuming messages per partition concurrently](#concurrent-processing), it can prevent having to stop processing all partitions because of a slow process in one of the other partitions. + +```javascript +consumer.run({ + partitionsConsumedConcurrently: 3, // Default: 1 + eachMessage: async ({ topic, partition, message }) => { + // This will be called up to 3 times concurrently + try { + await sendToDependency(message) + } catch (e) { + if (e instanceof TooManyRequestsError) { + consumer.pause([{ topic, partitions: [partition] }]) + // Other partitions will keep fetching and processing, until if / when + // they also get throttled + setTimeout(() => { + consumer.resume([{ topic, partitions: [partition] }]) + // Other partitions that are paused will continue to be paused + }, e.retryAfter * 1000) + } + + throw e + } + }, +}) +``` + +As a convenience, the `eachMessage` callback provides a `pause` function to pause the specific topic-partition of the message currently being processed. + +```javascript +await consumer.connect() +await consumer.subscribe({ topics: ['jobs'] }) + +await consumer.run({ eachMessage: async ({ topic, message, pause }) => { + try { + await sendToDependency(message) + } catch (e) { + if (e instanceof TooManyRequestsError) { + const resumeThisPartition = pause() + // Other partitions that are paused will continue to be paused + setTimeout(resumeThisPartition, e.retryAfter * 1000) + } + + throw e + } +}}) +``` + +It's possible to access the list of paused topic partitions using the `paused` method. + +```javascript +const pausedTopicPartitions = consumer.paused() + +for (const topicPartitions of pausedTopicPartitions) { + const { topic, partitions } = topicPartitions + console.log({ topic, partitions }) +} +``` + +## Seek + +To move the offset position in a topic/partition the `Consumer` provides the method `seek`. This method has to be called after the consumer is initialized and is running (after consumer#run). + +```javascript +await consumer.connect() +await consumer.subscribe({ topics: ['example'] }) + +// you don't need to await consumer#run +consumer.run({ eachMessage: async ({ topic, message }) => true }) +consumer.seek({ topic: 'example', partition: 0, offset: 12384 }) +``` + +Upon seeking to an offset, any messages in active batches are marked as stale and discarded, making sure the next message read for the partition is from the offset sought to. Make sure to check `isStale()` before processing a message using [the `eachBatch` interface](#each-batch) of `consumer.run`. + +By default, the consumer will commit the offset seeked. To disable this, set the [`autoCommit`](#auto-commit) option to `false` on the consumer. + +```javascript +consumer.run({ + autoCommit: false, + eachMessage: async ({ topic, message }) => true +}) +// This will now only resolve the previous offset, not commit it +consumer.seek({ topic: 'example', partition: 0, offset: "12384" }) +``` + +## Custom partition assigner + +It's possible to configure the strategy the consumer will use to distribute partitions amongst the consumer group. KafkaJS has a round robin assigner configured by default. + +A partition assigner is a function which returns an object with the following interface: + +```javascript +const MyPartitionAssigner = ({ cluster }) => ({ + name: 'MyPartitionAssigner', + version: 1, + async assign({ members, topics }) {}, + protocol({ topics }) {} +}) +``` + +The method `assign` has to return an assignment plan with partitions per topic. 
A partition plan consists of a list of `memberId` and `memberAssignment`. The member assignment has to be encoded, use the `MemberAssignment` utility for that. Example: + +```javascript +const { AssignerProtocol: { MemberAssignment } } = require('kafkajs') + +const MyPartitionAssigner = ({ cluster }) => ({ + version: 1, + async assign({ members, topics }) { + // perform assignment + return myCustomAssignmentArray.map(memberId => ({ + memberId, + memberAssignment: MemberAssignment.encode({ + version: this.version, + assignment: assignment[memberId], + }) + })) + } +}) +``` + +The method `protocol` has to return `name` and `metadata`. Metadata has to be encoded, use the `MemberMetadata` utility for that. Example: + +```javascript +const { AssignerProtocol: { MemberMetadata } } = require('kafkajs') + +const MyPartitionAssigner = ({ cluster }) => ({ + name: 'MyPartitionAssigner', + version: 1, + protocol({ topics }) { + return { + name: this.name, + metadata: MemberMetadata.encode({ + version: this.version, + topics, + }), + } + } +}) +``` + +Your `protocol` method will probably look like the example, but it's not implemented by default because extra data can be included as `userData`. Take a look at the `MemberMetadata#encode` for more information. + +Once your assigner is done, add it to the list of assigners. It's important to keep the default assigner there to allow the old consumers to have a common ground with the new consumers when deploying. + +```javascript +const { PartitionAssigners: { roundRobin } } = require('kafkajs') + +kafka.consumer({ + groupId: 'my-group', + partitionAssigners: [ + MyPartitionAssigner, + roundRobin + ] +}) +``` + +## Describe group + +> **Experimental** - This feature may be removed or changed in new versions of KafkaJS + +Returns metadata for the configured consumer group, example: + +```javascript +const data = await consumer.describeGroup() +// { +// errorCode: 0, +// groupId: 'consumer-group-id-f104efb0e1044702e5f6', +// members: [ +// { +// clientHost: '/172.19.0.1', +// clientId: 'test-3e93246fe1f4efa7380a', +// memberAssignment: Buffer, +// memberId: 'test-3e93246fe1f4efa7380a-ff87d06d-5c87-49b8-a1f1-c4f8e3ffe7eb', +// memberMetadata: Buffer, +// }, +// ], +// protocol: 'RoundRobinAssigner', +// protocolType: 'consumer', +// state: 'Stable', +// }, +``` + +## Compression + +KafkaJS only support GZIP natively, but [other codecs can be supported](Producing.md#compression-other). + +## Follower Fetching + +KafkaJS supports "follower fetching", where the consumer tries to fetch data preferentially from a broker in the same "rack", rather than always going to the leader. This can considerably reduce operational costs if data transfer across "racks" is metered. There may also be performance benefits if the network speed between these "racks" is limited. + +The meaning of "rack" is very flexible, and can be used to model setups such as data centers, regions/availability zones, or other topologies. + +See also [this blog post](https://www.confluent.io/blog/multi-region-data-replication/) for the bigger context. 
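+As a minimal sketch, follower fetching is enabled on the consumer side by setting the `rackId` option to the consumer's own "rack". The `us-east-1a` value below is only an illustration, and the brokers also need a matching `broker.rack` and a rack-aware replica selector configured for fetches to actually be served by followers:
+
+```javascript
+const consumer = kafka.consumer({
+  groupId: 'my-group',
+  // Identifies the "rack" this consumer runs in; should line up with the brokers' broker.rack values
+  rackId: 'us-east-1a',
+})
+```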
diff --git a/website/versioned_docs/version-2.2.4/CustomAuthenticationMechanism.md b/website/versioned_docs/version-2.2.4/CustomAuthenticationMechanism.md new file mode 100644 index 000000000..5cb8e7190 --- /dev/null +++ b/website/versioned_docs/version-2.2.4/CustomAuthenticationMechanism.md @@ -0,0 +1,171 @@ +--- +id: version-2.2.4-custom-authentication-mechanism +title: Custom Authentication Mechanisms +original_id: custom-authentication-mechanism +--- + +To use an authentication mechanism that is not supported out of the box by KafkaJS, +custom authentication mechanisms can be introduced: + +```js +{ + sasl: { + mechanism: , + authenticationProvider: ({ host, port, logger, saslAuthenticate }) => { authenticate: () => Promise } + } +} +``` + +`` needs to match the SASL mechanism configured in the `sasl.enabled.mechanisms` +property in `server.properties`. See the Kafka documentation for information on how to +configure your brokers. + +## Writing a custom authentication mechanism + +A custom authentication mechanism needs to fulfill the following interface: + +```ts +type SaslAuthenticateArgs = { + request: SaslAuthenticationRequest + response?: SaslAuthenticationResponse +} + +type AuthenticationProviderArgs = { + host: string + port: number + logger: Logger + saslAuthenticate: ( + args: SaslAuthenticateArgs + ) => Promise +} + +type Mechanism = { + mechanism: string + authenticationProvider: (args: AuthenticationProviderArgs) => Authenticator +} + +type Authenticator = { + authenticate(): Promise +} + +type SaslAuthenticationRequest = { + encode: () => Buffer | Promise +} + +type SaslAuthenticationResponse = { + decode: (rawResponse: Buffer) => Buffer | Promise + parse: (data: Buffer) => ParseResult +} +``` +* `host` - Hostname of the specific broker to connect to +* `port` - Port of the specific broker to connect to +* `logger` - A logger instance namespaced to the authentication mechanism +* `saslAuthenticate` - an async function to make [`SaslAuthenticate`](https://kafka.apache.org/protocol.html#The_Messages_SaslAuthenticate) +requests towards the broker. The `request` and `response` functions are used to encode the `auth_bytes` of the request, and to optionally +decode and parse the `auth_bytes` in the response. `response` can be omitted if no response `auth_bytes` are expected. +### Example +In this example we will create a custom authentication mechanism called `simon`. The general +flow will be: +1. Send a [`SaslAuthenticate`](https://kafka.apache.org/protocol.html#The_Messages_SaslAuthenticate) +request with the value of `says` as `auth_bytes`. +2. Read the response from the broker. If `says` starts with "Simon says", the response `auth_bytes` +should equal `says`, if it does not start with "Simon says", it should be an empty string. + +**This is a made up example!** + +It is a non-existent authentication mechanism just made up to show how to implement the expected interface. It is not a real authentication mechanism. 
+ +```js +const simonAuthenticator = says = ({ host, port, logger, saslAuthenticate }) => { + const INT32_SIZE = 4 + + const request = { + /** + * Encodes the value for `auth_bytes` in SaslAuthenticate request + * @see https://kafka.apache.org/protocol.html#The_Messages_SaslAuthenticate + * + * In this example, we are just sending `says` as a string, + * with the length of the string in bytes prepended as an int32 + **/ + encode: () => { + const byteLength = Buffer.byteLength(says, 'utf8') + const buf = Buffer.alloc(INT32_SIZE + byteLength) + buf.writeUInt32BE(byteLength, 0) + buf.write(says, INT32_SIZE, byteLength, 'utf8') + return buf + }, + } + const response = { + /** + * Decodes the `auth_bytes` in SaslAuthenticate response + * @see https://kafka.apache.org/protocol.html#The_Messages_SaslAuthenticate + * + * This is essentially the reverse of `request.encode`, where + * we read the length of the string as an int32 and then read + * that many bytes + */ + decode: rawData => { + const byteLength = rawData.readInt32BE(0) + return rawData.slice(INT32_SIZE, INT32_SIZE + byteLength) + }, + /** + * The return value from `response.decode` is passed into + * this function, which is responsible for interpreting + * the data. In this case, we just turn the buffer back + * into a string + */ + parse: data => { + return data.toString() + }, + } + return { + /** + * This function is responsible for orchestrating the authentication flow. + * Essentially we will send a SaslAuthenticate request with the + * value of `sasl.says` to the broker, and expect to + * get the same value back. + * + * Other authentication methods may do any other operations they + * like, but communication with the brokers goes through + * the SaslAuthenticate request. + */ + authenticate: async () => { + if (says == null) { + throw new Error('SASL Simon: Invalid "says"') + } + const broker = `${host}:${port}` + try { + logger.info('Authenticate with SASL Simon', { broker }) + const authenticateResponse = await saslAuthenticate({ request, response }) + + const saidSimon = says.startsWith("Simon says ") + const expectedResponse = saidSimon ? says : "" + if (authenticateResponse !== expectedResponse) { + throw new Error("Mismatching response from broker") + } + logger.info('SASL Simon authentication successful', { broker }) + } catch (e) { + const error = new Error( + `SASL Simon authentication failed: ${e.message}` + ) + logger.error(error.message, { broker }) + throw error + } + }, + } +} +``` + +The `response` argument to `saslAuthenticate` is optional, in case the authentication +method does not require the `auth_bytes` in the response. + +In the example above, we expect the client to be configured as such: + +```js +const config = { + sasl: { + mechanism: 'simon' + authenticationProvider: simonAuthenticator('Simon says authenticate me') + } +} +``` \ No newline at end of file diff --git a/website/versioned_docs/version-2.2.4/CustomLogger.md b/website/versioned_docs/version-2.2.4/CustomLogger.md new file mode 100644 index 000000000..82cba7c94 --- /dev/null +++ b/website/versioned_docs/version-2.2.4/CustomLogger.md @@ -0,0 +1,99 @@ +--- +id: version-2.2.4-custom-logger +title: Custom Logger +original_id: custom-logger +--- + +The logger is customized using log creators. A log creator is a function which receives a log level and returns a log function. The log function receives namespace, level, label, and log. + +- `namespace` identifies the component which is performing the log, for example, connection or consumer. 
+- `level` is the log level of the log entry. +- `label` is a text representation of the log level, example: 'INFO'. +- `log` is an object with the following keys: `timestamp`, `logger`, `message`, and the extra keys given by the user. (`logger.info('test', { extra_data: true })`) + +```javascript +{ + level: 4, + label: 'INFO', // NOTHING, ERROR, WARN, INFO, or DEBUG + timestamp: '2017-12-29T13:39:54.575Z', + logger: 'kafkajs', + message: 'Started', + // ... any other extra key provided to the log function +} +``` + +The general structure looks like this: + +```javascript +const MyLogCreator = logLevel => ({ namespace, level, label, log }) => { + // Example: + // const { timestamp, logger, message, ...others } = log + // console.log(`${label} [${namespace}] ${message} ${JSON.stringify(others)}`) +} +``` + +Example using [Winston](https://github.com/winstonjs/winston): + +```javascript +const { logLevel } = require('kafkajs') +const winston = require('winston') +const toWinstonLogLevel = level => { + switch(level) { + case logLevel.ERROR: + case logLevel.NOTHING: + return 'error' + case logLevel.WARN: + return 'warn' + case logLevel.INFO: + return 'info' + case logLevel.DEBUG: + return 'debug' + } +} + +const WinstonLogCreator = logLevel => { + const logger = winston.createLogger({ + level: toWinstonLogLevel(logLevel), + transports: [ + new winston.transports.Console(), + new winston.transports.File({ filename: 'myapp.log' }) + ] + }) + + return ({ namespace, level, label, log }) => { + const { message, ...extra } = log + logger.log({ + level: toWinstonLogLevel(level), + message, + extra, + }) + } +} +``` + +Once you have your log creator you can use the `logCreator` option to configure the client: + +```javascript +const kafka = new Kafka({ + clientId: 'my-app', + brokers: ['kafka1:9092', 'kafka2:9092'], + logLevel: logLevel.ERROR, + logCreator: WinstonLogCreator +}) +``` + +To get access to the namespaced logger of a consumer, producer, admin or root Kafka client after instantiation, you can use the `logger` method: + +```javascript +const client = new Kafka( ... ) +client.logger().info( ... ) + +const consumer = kafka.consumer( ... ) +consumer.logger().info( ... ) + +const producer = kafka.producer( ... ) +producer.logger().info( ... ) + +const admin = kafka.admin( ... ) +admin.logger().info( ... ) +``` diff --git a/website/versioned_docs/version-2.2.4/ProducerExample.md b/website/versioned_docs/version-2.2.4/ProducerExample.md new file mode 100644 index 000000000..c0db4d8dc --- /dev/null +++ b/website/versioned_docs/version-2.2.4/ProducerExample.md @@ -0,0 +1,135 @@ +--- +id: version-2.2.4-producer-example +title: Producer +original_id: producer-example +--- + +The following example assumes that you are using the local Kafka configuration described in [Running Kafka in Development](DockerLocal.md). 
+ +```javascript +const ip = require('ip') + +const { Kafka, CompressionTypes, logLevel } = require('kafkajs') + +const host = process.env.HOST_IP || ip.address() + +const kafka = new Kafka({ + logLevel: logLevel.DEBUG, + brokers: [`${host}:9092`], + clientId: 'example-producer', +}) + +const topic = 'topic-test' +const producer = kafka.producer() + +const getRandomNumber = () => Math.round(Math.random(10) * 1000) +const createMessage = num => ({ + key: `key-${num}`, + value: `value-${num}-${new Date().toISOString()}`, +}) + +const sendMessage = () => { + return producer + .send({ + topic, + compression: CompressionTypes.GZIP, + messages: Array(getRandomNumber()) + .fill() + .map(_ => createMessage(getRandomNumber())), + }) + .then(console.log) + .catch(e => console.error(`[example/producer] ${e.message}`, e)) +} + +const run = async () => { + await producer.connect() + setInterval(sendMessage, 3000) +} + +run().catch(e => console.error(`[example/producer] ${e.message}`, e)) + +const errorTypes = ['unhandledRejection', 'uncaughtException'] +const signalTraps = ['SIGTERM', 'SIGINT', 'SIGUSR2'] + +errorTypes.forEach(type => { + process.on(type, async () => { + try { + console.log(`process.on ${type}`) + await producer.disconnect() + process.exit(0) + } catch (_) { + process.exit(1) + } + }) +}) + +signalTraps.forEach(type => { + process.once(type, async () => { + try { + await producer.disconnect() + } finally { + process.kill(process.pid, type) + } + }) +}) +``` + +## Typescript Example + +```typescript +import { Kafka, Message, Producer, ProducerBatch, TopicMessages } from 'kafkajs' + +interface CustomMessageFormat { a: string } + +export default class ProducerFactory { + private producer: Producer + + constructor() { + this.producer = this.createProducer() + } + + public async start(): Promise { + try { + await this.producer.connect() + } catch (error) { + console.log('Error connecting the producer: ', error) + } + } + + public async shutdown(): Promise { + await this.producer.disconnect() + } + + public async sendBatch(messages: Array): Promise { + const kafkaMessages: Array = messages.map((message) => { + return { + value: JSON.stringify(message) + } + }) + + const topicMessages: TopicMessages = { + topic: 'producer-topic', + messages: kafkaMessages + } + + const batch: ProducerBatch = { + topicMessages: [topicMessages] + } + + await this.producer.sendBatch(batch) + } + + private createProducer() : Producer { + const kafka = new Kafka({ + clientId: 'producer-client', + brokers: ['localhost:9092'], + }) + + return kafka.producer() + } +} +``` + +## SSL & SASL Authentication + +See the [Consumer Example](ConsumerExample.md#ssl-and-sasl-authentication). diff --git a/website/versioned_docs/version-2.2.4/Producing.md b/website/versioned_docs/version-2.2.4/Producing.md new file mode 100644 index 000000000..4b53b17e6 --- /dev/null +++ b/website/versioned_docs/version-2.2.4/Producing.md @@ -0,0 +1,351 @@ +--- +id: version-2.2.4-producing +title: Producing Messages +original_id: producing +--- + +To publish messages to Kafka you have to create a producer. 
Simply call the `producer` function of the client to create it: + +```javascript +const producer = kafka.producer() +``` + +or with options + +```javascript +const producer = kafka.producer({ + allowAutoTopicCreation: false, + transactionTimeout: 30000 +}) +``` + +## Options + +| option | description | default | +| ---------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -------------------- | +| createPartitioner | Take a look at [Custom Partitioner](#custom-partitioner) for more information | `null` | +| retry | Take a look at [Producer Retry](#retry) for more information | `null` | +| metadataMaxAge | The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions | `300000` - 5 minutes | +| allowAutoTopicCreation | Allow topic creation when querying metadata for non-existent topics | `true` | +| transactionTimeout | The maximum amount of time in ms that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction. If this value is larger than the `transaction.max.timeout.ms` setting in the __broker__, the request will fail with a `InvalidTransactionTimeout` error | `60000` | +| idempotent | _Experimental._ If enabled producer will ensure each message is written exactly once. Acks _must_ be set to -1 ("all"). Retries will default to MAX_SAFE_INTEGER. | `false` | +| maxInFlightRequests | Max number of requests that may be in progress at any time. If falsey then no limit. | `null` _(no limit)_ | + +## Producing messages + +The method `send` is used to publish messages to the Kafka cluster. + +```javascript +const producer = kafka.producer() + +await producer.connect() +await producer.send({ + topic: 'topic-name', + messages: [ + { key: 'key1', value: 'hello world' }, + { key: 'key2', value: 'hey hey!' } + ], +}) +``` + +Example with a defined partition: + +```javascript +const producer = kafka.producer() + +await producer.connect() +await producer.send({ + topic: 'topic-name', + messages: [ + { key: 'key1', value: 'hello world', partition: 0 }, + { key: 'key2', value: 'hey hey!', partition: 1 } + ], +}) +``` + +The method `send` has the following signature: + +```javascript +await producer.send({ + topic: , + messages: , + acks: , + timeout: , + compression: , +}) +``` + +| property | description | default | +| ------------------ | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | ---------------------------------- | +| topic | topic name | | +| messages | An array of objects. See [Message structure](#message-structure) for more details. Example:
`[{ key: 'my-key', value: 'my-value'}]` | | +| acks | Control the number of required acks.
__-1__ = all insync replicas must acknowledge _(default)_
__0__ = no acknowledgments
__1__ = only waits for the leader to acknowledge | `-1` all insync replicas must acknowledge | +| timeout | The time to await a response in ms | `30000` | +| compression | Compression codec | `CompressionTypes.None` | + + +### Message structure + +Messages have the following properties: + +| Property | Description | Default | +| ----------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------------| +| key | Used for partitioning. See [Key](#message-key) | | +| value | Your message content. The value can be a Buffer, a string or null. The value will always be encoded as bytes when sent to Kafka. When consumed, the consumer will need to interpret the value according to your schema. | | +| partition | Which partition to send the message to. See [Key](#message-key) for details on how the partition is decided if this property is omitted. | | +| timestamp | The timestamp of when the message was created. See [Timestamp](#message-timestamp) for details. | `Date.now()` | +| headers | Metadata to associate with your message. See [Headers](#message-headers). | | + +#### Key + +The message `key` is used to decide which partition the message will be sent to. This is important to ensure that messages relating to the same aggregate are processed in order. For example, if you use an `orderId` as the key, you can ensure that all messages regarding that order will be processed in order. + +By default, the producer is configured to distribute the messages with the following logic: + +- If a partition is specified in the message, use it +- If no partition is specified but a key is present choose a partition based on a hash (murmur2) of the key +- If no partition or key is present choose a partition in a round-robin fashion + +#### Timestamp + +Each message has a timestamp in the form of a UTC timestamp with millisecond precision as a string. If no timestamp was provided, the producer will use the current time as the timestamp. When the message is consumed, the broker may override this timestamp depending on the topic configuration: + +* If the topic is configured to use CreateTime, the timestamp from the producer's message will be used. +* If the topic is configured to use LogAppendTime, the timestamp will be overwritten by the broker with the broker local time when it appends the message to its log. + +#### Headers + +Kafka v0.11 introduces record headers, which allows your messages to carry extra metadata. To send headers with your message, include the key `headers` with the values. Example: + +```javascript +await producer.send({ + topic: 'topic-name', + messages: [{ + key: 'key1', + value: 'hello world', + headers: { + 'correlation-id': '2bfb68bb-893a-423b-a7fa-7b568cad5b67', + 'system-id': 'my-system', + } + }] +}) +``` + +A header value can be either a string or an array of strings. + +## Producing to multiple topics + +To produce to multiple topics at the same time, use `sendBatch`. This can be useful, for example, when migrating between two topics. 
+ +```javascript +const topicMessages = [ + { + topic: 'topic-a', + messages: [{ key: 'key', value: 'hello topic-a' }], + }, + { + topic: 'topic-b', + messages: [{ key: 'key', value: 'hello topic-b' }], + }, + { + topic: 'topic-c', + messages: [ + { + key: 'key', + value: 'hello topic-c', + headers: { + 'correlation-id': '2bfb68bb-893a-423b-a7fa-7b568cad5b67', + }, + } + ], + } +] +await producer.sendBatch({ topicMessages }) +``` + +`sendBatch` has the same signature as `send`, except `topic` and `messages` are replaced with `topicMessages`: + +```javascript +await producer.sendBatch({ + topicMessages: , + acks: , + timeout: , + compression: , +}) +``` + +| property | description | +| ------------- | ---------------------------------------------------------------------------------------------------------- | +| topicMessages | An array of objects with `topic` and `messages`.
`messages` is an array of the same type as for `send`. | + +## Custom partitioner + +It's possible to assign a custom partitioner to the producer. A partitioner is a function which returns another function responsible for the partition selection, something like this: + +```javascript +const MyPartitioner = () => { + return ({ topic, partitionMetadata, message }) => { + // select a partition based on some logic + // return the partition number + return 0 + } +} +``` + +`partitionMetadata` is an array of partitions with the following structure: + +`{ partitionId: , leader: }` + +Example: + +```javascript +[ + { partitionId: 1, leader: 1 }, + { partitionId: 2, leader: 2 }, + { partitionId: 0, leader: 0 } +] +``` + +To use your custom partitioner, use the option `createPartitioner` when creating the producer. + +```javascript +kafka.producer({ createPartitioner: MyPartitioner }) +``` + +### Default Partitioners + +KafkaJS ships with 2 partitioners: `DefaultPartitioner` and `LegacyPartitioner`. + +The `DefaultPartitioner` should be compatible with the default partitioner that ships with the Java Kafka client. This can be important to meet the [co-partitioning requirement](https://docs.confluent.io/current/ksql/docs/developer-guide/partition-data.html#co-partitioning-requirements) when joining multiple topics. + +> 🚨 **Important** 🚨 +> +> **The `LegacyPartitioner` was the default until v2.0.0. If you are upgrading from a version +older and want to retain the previous partitioning behavior, use the `LegacyPartitioner` +by importing it and providing it to the Producer constructor:** +> +> ```javascript +> const { Partitioners } = require('kafkajs') +> kafka.producer({ createPartitioner: Partitioners.LegacyPartitioner }) +> ``` + +## Retry + +The option `retry` can be used to customize the configuration for the producer. + +Take a look at [Retry](Configuration.md#retry) for more information. + +## Compression + +Since KafkaJS aims to have as small footprint and as few dependencies as possible, only the GZIP codec is part of the core functionality, other codecs are available as packages. + +### GZIP + +```javascript +const { CompressionTypes } = require('kafkajs') + +async () => { + await producer.send({ + topic: 'topic-name', + compression: CompressionTypes.GZIP, + messages: [ + { key: 'key1', value: 'hello world' }, + { key: 'key2', value: 'hey hey!' } + ], + }) +} +``` + +The consumers know how to decompress GZIP, so no further work is necessary. + +### Snappy + +Snappy support is provided by the package `kafkajs-snappy` + +```sh +npm install --save kafkajs-snappy +# yarn add kafkajs-snappy +``` + +```javascript +const { CompressionTypes, CompressionCodecs } = require('kafkajs') +const SnappyCodec = require('kafkajs-snappy') + +CompressionCodecs[CompressionTypes.Snappy] = SnappyCodec +``` + +Take a look at the official [readme](https://github.com/tulios/kafkajs-snappy) for more information + +### LZ4 + +LZ4 support is provided by the package `kafkajs-lz4` + +```sh +npm install --save kafkajs-lz4 +# yarn add kafkajs-lz4 +``` + +```javascript +const { CompressionTypes, CompressionCodecs } = require('kafkajs') +const LZ4 = require('kafkajs-lz4') + +CompressionCodecs[CompressionTypes.LZ4] = new LZ4().codec +``` + +The package also accepts options to granularly control LZ4 compression & decompression. Take a look at the official [readme](https://github.com/indix/kafkajs-lz4) for more information. 
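+Once the codec is registered, producing with LZ4 looks the same as with GZIP — a minimal sketch, assuming the registration above has already run:
+
+```javascript
+await producer.send({
+  topic: 'topic-name',
+  compression: CompressionTypes.LZ4,
+  messages: [
+    { key: 'key1', value: 'hello world' }
+  ],
+})
+```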
+ +### ZSTD + +Zstandard support is provided by [`@kafkajs/zstd`](https://github.com/kafkajs/zstd) + +```sh +npm install --save @kafkajs/zstd +# yarn add @kafkajs/zstd +``` + +```javascript +const { CompressionTypes, CompressionCodecs } = require('kafkajs') +const ZstdCodec = require('@kafkajs/zstd') + +CompressionCodecs[CompressionTypes.ZSTD] = ZstdCodec() +``` + +Configuration options can be passed to the factory function to control compression & decompression levels and other features. See [the official readme](https://github.com/kafkajs/zstd) for more information. + +### Other + +Any other codec can be easily implemented using existing libraries. + +A codec is an object with two `async` functions: `compress` and `decompress`. Import the libraries and define the codec object: + +```javascript +const MyCustomSnappyCodec = { + async compress(encoder) { + return someCompressFunction(encoder.buffer) + }, + + async decompress(buffer) { + return someDecompressFunction(buffer) + } +} +``` + +Now that we have the codec object, we can wrap it in a function and add it to the implementation: + +```javascript +const { CompressionTypes, CompressionCodecs } = require('kafkajs') +CompressionCodecs[CompressionTypes.Snappy] = () => MyCustomSnappyCodec +``` + +The new codec can now be used with the `send` method, example: + +```javascript +await producer.send({ + topic: 'topic-name', + compression: CompressionTypes.Snappy, + messages: [ + { key: 'key1', value: 'hello world' }, + { key: 'key2', value: 'hey hey!' } + ], +}) +``` diff --git a/website/versions.json b/website/versions.json index 35c431656..cf8cfd207 100644 --- a/website/versions.json +++ b/website/versions.json @@ -1,4 +1,5 @@ [ + "2.2.4", "2.2.0", "2.1.0", "2.0.1",