From 06723173757973061ff3a2c40c89ec4fa1afbf50 Mon Sep 17 00:00:00 2001 From: Tommy Brunn Date: Tue, 16 Aug 2022 10:39:53 +0200 Subject: [PATCH] Bump version (2.2.0) and version docs --- CHANGELOG.md | 11 + package.json | 2 +- website/versioned_docs/version-2.2.0/Admin.md | 743 ++++++++++++++++++ .../version-2.2.0/Configuration.md | 481 ++++++++++++ .../version-2.2.0/ConsumerExample.md | 213 +++++ .../CustomAuthenticationMechanism.md | 167 ++++ website/versions.json | 1 + 7 files changed, 1617 insertions(+), 1 deletion(-) create mode 100644 website/versioned_docs/version-2.2.0/Admin.md create mode 100644 website/versioned_docs/version-2.2.0/Configuration.md create mode 100644 website/versioned_docs/version-2.2.0/ConsumerExample.md create mode 100644 website/versioned_docs/version-2.2.0/CustomAuthenticationMechanism.md diff --git a/CHANGELOG.md b/CHANGELOG.md index 83b2264ad..bbc4f8dc1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,17 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html). + +## [2.2.0] - 2022-08-16 + +### Added + - Add the ability to inject custom authentication mechanisms #1372 + - Add admin methods `alterPartitionReassignments` & `listPartitionReassignments` #1419 + +### Fixed + - Fix deprecation warning when connecting to a broker over TLS via IP address #1425 + - Improve consumer performance when subscribed to thousands of topics #1436 + ## [2.1.0] - 2022-06-28 ### Added diff --git a/package.json b/package.json index 1fe1a21ae..55f75ff79 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "kafkajs", - "version": "2.1.0", + "version": "2.2.0", "description": "A modern Apache Kafka client for node.js", "author": "Tulio Ornelas ", "main": "index.js", diff --git a/website/versioned_docs/version-2.2.0/Admin.md b/website/versioned_docs/version-2.2.0/Admin.md new file mode 100644 index 000000000..8bf8e4319 --- /dev/null +++ b/website/versioned_docs/version-2.2.0/Admin.md @@ -0,0 +1,743 @@ +--- +id: version-2.2.0-admin +title: Admin Client +original_id: admin +--- + +The admin client hosts all the cluster operations, such as: `createTopics`, `createPartitions`, etc. + +```javascript +const kafka = new Kafka(...) +const admin = kafka.admin() + +// remember to connect and disconnect when you are done +await admin.connect() +await admin.disconnect() +``` + +The option `retry` can be used to customize the configuration for the admin. + +Take a look at [Retry](Configuration.md#default-retry) for more information. + +## List topics + +`listTopics` lists the names of all existing topics, and returns an array of strings. +The method will throw exceptions in case of errors. + +```javascript +await admin.listTopics() +// [ 'topic-1', 'topic-2', 'topic-3', ... ] +``` + +## Create topics + +`createTopics` will resolve to `true` if the topic was created successfully or `false` if it already exists. The method will throw exceptions in case of errors. 
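+
+As a concrete sketch (the topic name, partition count and replication factor below are purely illustrative), a call could look like the following; the full option signature and the `ITopicConfig` structure are listed afterwards.
+
+```javascript
+// Illustrative values: adjust the topic definition to your own cluster
+const created = await admin.createTopics({
+  waitForLeaders: true,
+  topics: [
+    {
+      topic: 'my-new-topic',
+      numPartitions: 3,
+      replicationFactor: 2,
+      configEntries: [{ name: 'cleanup.policy', value: 'compact' }],
+    },
+  ],
+})
+// `created` is false if the topic already existed
+```
+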
+ +```javascript +await admin.createTopics({ + validateOnly: , + waitForLeaders: + timeout: , + topics: , +}) +``` + +`ITopicConfig` structure: + +```javascript +{ + topic: , + numPartitions: , // default: -1 (uses broker `num.partitions` configuration) + replicationFactor: , // default: -1 (uses broker `default.replication.factor` configuration) + replicaAssignment: , // Example: [{ partition: 0, replicas: [0,1,2] }] - default: [] + configEntries: // Example: [{ name: 'cleanup.policy', value: 'compact' }] - default: [] +} +``` + +| property | description | default | +| -------------- | ----------------------------------------------------------------------------------------------------- | ------- | +| topics | Topic definition | | +| validateOnly | If this is `true`, the request will be validated, but the topic won't be created. | false | +| timeout | The time in ms to wait for a topic to be completely created on the controller node | 5000 | +| waitForLeaders | If this is `true` it will wait until metadata for the new topics doesn't throw `LEADER_NOT_AVAILABLE` | true | + +## Delete topics + +```javascript +await admin.deleteTopics({ + topics: , + timeout: , // default: 5000 +}) +``` + +Topic deletion is disabled by default in Apache Kafka versions prior to `1.0.0`. To enable it set the server config. + +```yml +delete.topic.enable=true +``` + +## Create partitions + +`createPartitions` will resolve in case of success. The method will throw exceptions in case of errors. + +```javascript +await admin.createPartitions({ + validateOnly: , + timeout: , + topicPartitions: , +}) +``` + +`TopicPartition` structure: + +```javascript +{ + topic: , + count: , // partition count + assignments: >> // Example: [[0,1],[1,2],[2,0]] +} +``` + +| property | description | default | +| -------------- | ----------------------------------------------------------------------------------------------------- | ------- | +| topicPartitions| Topic partition definition | | +| validateOnly | If this is `true`, the request will be validated, but the topic won't be created. | false | +| timeout | The time in ms to wait for a topic to be completely created on the controller node | 5000 | +| count | New partition count, mandatory | | +| assignments | Assigned brokers for each new partition | null | + +## Fetch topic metadata + +```javascript +await admin.fetchTopicMetadata({ topics: > }) +``` + +`TopicsMetadata` structure: + +```javascript +{ + topics: >, +} +``` + +`TopicMetadata` structure: + +```javascript +{ + name: , + partitions: > // default: 1 +} +``` + +`PartitionMetadata` structure: + +```javascript +{ + partitionErrorCode: , // default: 0 + partitionId: , + leader: , + replicas: >, + isr: >, +} +``` + +The admin client will throw an exception if any of the provided topics do not already exist. + +If you omit the `topics` argument the admin client will fetch metadata for all topics: + +```javascript +await admin.fetchTopicMetadata() +``` + +## Fetch topic offsets + +`fetchTopicOffsets` returns most recent offset for a topic. 
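+
+In the response shown below, `high` and `low` are the latest and earliest offsets available on each partition. As a small sketch (using only the fields of that response), an approximate count of retained messages per partition can be derived from them:
+
+```javascript
+// Sketch: derive an approximate retained-message count per partition
+const offsets = await admin.fetchTopicOffsets(topic)
+const retained = offsets.map(({ partition, high, low }) => ({
+  partition,
+  messages: Number(high) - Number(low),
+}))
+```
+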
+ +```javascript +await admin.fetchTopicOffsets(topic) +// [ +// { partition: 0, offset: '31004', high: '31004', low: '421' }, +// { partition: 1, offset: '54312', high: '54312', low: '3102' }, +// { partition: 2, offset: '32103', high: '32103', low: '518' }, +// { partition: 3, offset: '28', high: '28', low: '0' }, +// ] +``` + +## Fetch topic offsets by timestamp + +Specify a `timestamp` to get the earliest offset on each partition where the message's timestamp is greater than or equal to the given timestamp. + +```javascript +await admin.fetchTopicOffsetsByTimestamp(topic, timestamp) +// [ +// { partition: 0, offset: '3244' }, +// { partition: 1, offset: '3113' }, +// ] +``` + +## Fetch consumer group offsets + +`fetchOffsets` returns the consumer group offset for a list of topics. + +```javascript +await admin.fetchOffsets({ groupId, topics: ['topic1', 'topic2'] }) +// [ +// { +// topic: 'topic1', +// partitions: [ +// { partition: 0, offset: '31004' }, +// { partition: 1, offset: '54312' }, +// { partition: 2, offset: '32103' }, +// { partition: 3, offset: '28' }, +// ], +// }, +// { +// topic: 'topic2', +// partitions: [ +// { partition: 0, offset: '1234' }, +// { partition: 1, offset: '4567' }, +// ], +// }, +// ] +``` + +Omit `topics` altogether if you want to get the consumer group offsets for all topics with committed offsets. + +Include the optional `resolveOffsets` flag to resolve the offsets without having to start a consumer, useful when fetching directly after calling [resetOffsets](#a-name-reset-offsets-a-reset-consumer-group-offsets): + +```javascript +await admin.resetOffsets({ groupId, topic }) +await admin.fetchOffsets({ groupId, topics: [topic], resolveOffsets: false }) +// [ +// { partition: 0, offset: '-1' }, +// { partition: 1, offset: '-1' }, +// { partition: 2, offset: '-1' }, +// { partition: 3, offset: '-1' }, +// ] + +await admin.resetOffsets({ groupId, topic }) +await admin.fetchOffsets({ groupId, topics: [topic], resolveOffsets: true }) +// [ +// { partition: 0, offset: '31004' }, +// { partition: 1, offset: '54312' }, +// { partition: 2, offset: '32103' }, +// { partition: 3, offset: '28' }, +// ] +``` + +## Reset consumer group offsets + +`resetOffsets` resets the consumer group offset to the earliest or latest offset (latest by default). +The consumer group must have no running instances when performing the reset. Otherwise, the command will be rejected. + +```javascript +await admin.resetOffsets({ groupId, topic }) // latest by default +// await admin.resetOffsets({ groupId, topic, earliest: true }) +``` + +## Set consumer group offsets + +`setOffsets` allows you to set the consumer group offset to any value. + +```javascript +await admin.setOffsets({ + groupId: , + topic: , + partitions: , +}) +``` + +`SeekEntry` structure: + +```javascript +{ + partition: , + offset: , +} +``` + +Example: + +```javascript +await admin.setOffsets({ + groupId: 'my-consumer-group', + topic: 'custom-topic', + partitions: [ + { partition: 0, offset: '35' }, + { partition: 3, offset: '19' }, + ] +}) +``` + +## Reset consumer group offsets by timestamp + +Combining `fetchTopicOffsetsByTimestamp` and `setOffsets` can reset a consumer group's offsets on each partition to the earliest offset whose timestamp is greater than or equal to the given timestamp. +The consumer group must have no running instances when performing the reset. Otherwise, the command will be rejected. 
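+
+The `timestamp` argument is a Unix timestamp in milliseconds. For example, to rewind a group by 24 hours you could derive it as in the sketch below and pass it to the combined call that follows:
+
+```javascript
+// Illustrative: a point in time 24 hours in the past, as epoch milliseconds
+const timestamp = Date.now() - 24 * 60 * 60 * 1000
+```
+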
+ +```javascript +await admin.setOffsets({ groupId, topic, partitions: await admin.fetchTopicOffsetsByTimestamp(topic, timestamp) }) +``` + +## Describe cluster + +Allows you to get information about the broker cluster. This is mostly useful +for monitoring or operations, and is usually not relevant for typical event processing. + +```javascript +await admin.describeCluster() +// { +// brokers: [ +// { nodeId: 0, host: 'localhost', port: 9092 } +// ], +// controller: 0, +// clusterId: 'f8QmWTB8SQSLE6C99G4qzA' +// } +``` + +## Describe configs + +Get the configuration for the specified resources. + +```javascript +await admin.describeConfigs({ + includeSynonyms: , + resources: +}) +``` + +`ResourceConfigQuery` structure: + +```javascript +{ + type: , + name: , + configNames: +} +``` + +Returning all configs for a given resource: + +```javascript +const { ConfigResourceTypes } = require('kafkajs') + +await admin.describeConfigs({ + includeSynonyms: false, + resources: [ + { + type: ConfigResourceTypes.TOPIC, + name: 'topic-name' + } + ] +}) +``` + +Returning specific configs for a given resource: + +```javascript +const { ConfigResourceTypes } = require('kafkajs') + +await admin.describeConfigs({ + includeSynonyms: false, + resources: [ + { + type: ConfigResourceTypes.TOPIC, + name: 'topic-name', + configNames: ['cleanup.policy'] + } + ] +}) +``` + +Take a look at [configResourceTypes](https://github.com/tulios/kafkajs/blob/master/src/protocol/configResourceTypes.js) for a complete list of resources. + +Example response: + +```javascript +{ + resources: [ + { + configEntries: [{ + configName: 'cleanup.policy', + configValue: 'delete', + isDefault: true, + configSource: 5, + isSensitive: false, + readOnly: false + }], + errorCode: 0, + errorMessage: null, + resourceName: 'topic-name', + resourceType: 2 + } + ], + throttleTime: 0 +} +``` + +## Alter configs + +Update the configuration for the specified resources. + +```javascript +await admin.alterConfigs({ + validateOnly: false, + resources: +}) +``` + +`ResourceConfig` structure: + +```javascript +{ + type: , + name: , + configEntries: +} +``` + +`ResourceConfigEntry` structure: + +```javascript +{ + name: , + value: +} +``` + +Example: + +```javascript +const { ConfigResourceTypes } = require('kafkajs') + +await admin.alterConfigs({ + resources: [{ + type: ConfigResourceTypes.TOPIC, + name: 'topic-name', + configEntries: [{ name: 'cleanup.policy', value: 'compact' }] + }] +}) +``` + +Take a look at [configResourceTypes](https://github.com/tulios/kafkajs/blob/master/src/protocol/configResourceTypes.js) for a complete list of resources. + +Example response: + +```javascript +{ + resources: [{ + errorCode: 0, + errorMessage: null, + resourceName: 'topic-name', + resourceType: 2, + }], + throttleTime: 0, +} +``` + +## List groups + +List groups available on the broker. + +```javascript +await admin.listGroups() +``` + +Example response: + +```javascript +{ + groups: [ + {groupId: 'testgroup', protocolType: 'consumer'} + ] +} +``` + +## Describe groups + +Describe consumer groups by `groupId`s. This is similar to [consumer.describeGroup()](Consuming.md#describe-group), except +it allows you to describe multiple groups and does not require you to have a consumer be part of any of those groups. 
+ +```js +await admin.describeGroups([ 'testgroup' ]) +// { +// groups: [{ +// errorCode: 0, +// groupId: 'testgroup', +// members: [ +// { +// clientHost: '/172.19.0.1', +// clientId: 'test-3e93246fe1f4efa7380a', +// memberAssignment: Buffer, +// memberId: 'test-3e93246fe1f4efa7380a-ff87d06d-5c87-49b8-a1f1-c4f8e3ffe7eb', +// memberMetadata: Buffer, +// }, +// ], +// protocol: 'RoundRobinAssigner', +// protocolType: 'consumer', +// state: 'Stable', +// }] +// } +``` +Helper function to decode `memeberMetadata` and `memberAssignment` is available in `AssignerProtocol` + +Example: + +`const memberMetadata = AssignerProtocol.MemberMetadata.decode(memberMetadata)` + +`const memberAssignment = AssignerProtocol.MemberAssignment.decode(memberAssignment)` + + +## Delete groups + +Delete groups by `groupId`. + +Note that you can only delete groups with no connected consumers. + +```javascript +await admin.deleteGroups([groupId]) +``` + +Example: + +```javascript +await admin.deleteGroups(['group-test']) +``` + +Example response: + +```javascript +[ + {groupId: 'testgroup', errorCode: 'consumer'} +] +``` + +Because this method accepts multiple `groupId`s, it can fail to delete one or more of the provided groups. In case of failure, it will throw an error containing the failed groups: + +```javascript +try { + await admin.deleteGroups(['a', 'b', 'c']) +} catch (error) { + // error.name 'KafkaJSDeleteGroupsError' + // error.groups = [{ + // groupId: a + // error: KafkaJSProtocolError + // }] +} +``` + +## Delete Topic Records + +Delete records for a selected topic. This will delete all records from the earliest offset up to - but not including - the provided target offset for the given partition(s). To delete all records in a partition, use a target offset of `-1`. + +Note that you cannot delete records in an arbitrary range (it will always be from the earliest available offset) + +```javascript +await admin.deleteTopicRecords({ + topic: , + partitions: , +}) +``` + +Example: + +```javascript +await admin.deleteTopicRecords({ + topic: 'custom-topic', + partitions: [ + { partition: 0, offset: '30' }, // delete up to and including offset 29 + { partition: 3, offset: '-1' }, // delete all available records on this partition + ] +}) +``` + +## Create ACL + +```javascript +const { + AclResourceTypes, + AclOperationTypes, + AclPermissionTypes, + ResourcePatternTypes, +} = require('kafkajs') + +const acl = [ + { + resourceType: AclResourceTypes.TOPIC, + resourceName: 'topic-name', + resourcePatternType: ResourcePatternTypes.LITERAL, + principal: 'User:bob', + host: '*', + operation: AclOperationTypes.ALL, + permissionType: AclPermissionTypes.DENY, + }, + { + resourceType: AclResourceTypes.TOPIC, + resourceName: 'topic-name', + resourcePatternType: ResourcePatternTypes.LITERAL, + principal: 'User:alice', + host: '*', + operation: AclOperationTypes.ALL, + permissionType: AclPermissionTypes.ALLOW, + }, +] + +await admin.createAcls({ acl }) +``` + +Be aware that the security features might be disabled in your cluster. 
In that case, the operation will throw an error: + +```sh +KafkaJSProtocolError: Security features are disabled +``` + +## Delete ACL + +```javascript +const { + AclResourceTypes, + AclOperationTypes, + AclPermissionTypes, + ResourcePatternTypes, +} = require('kafkajs') + +const acl = { + resourceName: 'topic-name, + resourceType: AclResourceTypes.TOPIC, + host: '*', + permissionType: AclPermissionTypes.ALLOW, + operation: AclOperationTypes.ANY, + resourcePatternType: ResourcePatternTypes.LITERAL, +} + +await admin.deleteAcls({ filters: [acl] }) +// { +// filterResponses: [ +// { +// errorCode: 0, +// errorMessage: null, +// matchingAcls: [ +// { +// errorCode: 0, +// errorMessage: null, +// resourceType: AclResourceTypes.TOPIC, +// resourceName: 'topic-name', +// resourcePatternType: ResourcePatternTypes.LITERAL, +// principal: 'User:alice', +// host: '*', +// operation: AclOperationTypes.ALL, +// permissionType: AclPermissionTypes.ALLOW, +// }, +// ], +// }, +// ], +// } +``` + +Be aware that the security features might be disabled in your cluster. In that case, the operation will throw an error: + +```sh +KafkaJSProtocolError: Security features are disabled +``` + +## Describe ACL + +```javascript +const { + AclResourceTypes, + AclOperationTypes, + AclPermissionTypes, + ResourcePatternTypes, +} = require('kafkajs') + +await admin.describeAcls({ + resourceName: 'topic-name, + resourceType: AclResourceTypes.TOPIC, + host: '*', + permissionType: AclPermissionTypes.ALLOW, + operation: AclOperationTypes.ANY, + resourcePatternTypeFilter: ResourcePatternTypes.LITERAL, +}) +// { +// resources: [ +// { +// resourceType: AclResourceTypes.TOPIC, +// resourceName: 'topic-name, +// resourcePatternType: ResourcePatternTypes.LITERAL, +// acls: [ +// { +// principal: 'User:alice', +// host: '*', +// operation: AclOperationTypes.ALL, +// permissionType: AclPermissionTypes.ALLOW, +// }, +// ], +// }, +// ], +// } +``` + +Be aware that the security features might be disabled in your cluster. In that case, the operation will throw an error: + +```sh +KafkaJSProtocolError: Security features are disabled +``` + +## Alter Partition Reassignments +This is used to reassign the replicas that partitions are on. This method will throw exceptions in the case of errors. + +```typescript +await admin.alterPartitionReassignments({ + topics: , + timeout: // optional - 5000 default +}) +``` + +PartitionReassignment Structure: +```typescript +{ + topic: , + partitionAssignment: // Example: [{ partition: 0, replicas: [0,1,2] }] +} +``` + +## List Partition Reassignments +This is used to list current partition reassignments in progress. This method will throw exceptions in the case of errors and resolve to ListPartitionReassignmentsResponse on success. If a requested partition does not exist it will not be included in the response. + +```javascript +await admin.listPartitionReassignments({ + topics: , // optional, if null then all topics will be returned. 
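+  // `topics` is an array of TopicPartitions (see the structure below), e.g. [{ topic: 'topic-name', partitions: [0, 1] }]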
+ timeout: // optional - 5000 default +}) +``` + +TopicPartitions Structure: +```typescript +{ + topic: , + partitions: +} +``` + +Resulting ListPartitionReassignmentsResponse Structure: +```typescript +{ + topics: +} +``` +OngoingTopicReassignment Structure: +```typescript +{ + topic: , + partitions: +} +``` +OngoingPartitionReassignment Structure: +```typescript +{ + partitionIndex: , + replicas: , // The current replica set + addingReplicas: // The set of replicas being added + removingReplicas: // The set of replicas being removed +} +``` +**Note:** If a partition is not going through a reassignment, its AddingReplicas and RemovingReplicas fields will simply be empty. \ No newline at end of file diff --git a/website/versioned_docs/version-2.2.0/Configuration.md b/website/versioned_docs/version-2.2.0/Configuration.md new file mode 100644 index 000000000..62073b7ee --- /dev/null +++ b/website/versioned_docs/version-2.2.0/Configuration.md @@ -0,0 +1,481 @@ +--- +id: version-2.2.0-configuration +title: Client Configuration +original_id: configuration +--- + +The client must be configured with at least one broker. The brokers on the list are considered seed brokers and are only used to bootstrap the client and load initial metadata. + +```javascript +const { Kafka } = require('kafkajs') + +// Create the client with the broker list +const kafka = new Kafka({ + clientId: 'my-app', + brokers: ['kafka1:9092', 'kafka2:9092'] +}) +``` + +## Client Id +A logical identifier of an application. Can be used by brokers to apply quotas or trace requests to a specific application. Example: `booking-events-processor`. + +The [kafka documentation](https://kafka.apache.org/documentation/#design_quotasgroups) describes the `clientId` as: + +> Client-id is a logical grouping of clients with a meaningful name chosen by the client application. The tuple (user, client-id) defines a secure logical group of clients that share both user principal and client-id. Quotas can be applied to (user, client-id), user or client-id groups. + +It [also says](https://kafka.apache.org/documentation/#producerconfigs_client.id): + +> `client.id` +> +> An id string to pass to the server when making requests. The purpose of this is to be able to track the source of requests beyond just ip/port by allowing a logical application name to be included in server-side request logging. + +Therefore the `clientId` should be shared across multiple instances in a cluster or horizontally scaled application, but distinct for each application. + +## Broker discovery + +Normally KafkaJS will notice and react to broker cluster topology changes automatically, but in some circumstances you may want to be able to dynamically +fetch the seed brokers instead of using a statically configured list. 
In that case, `brokers` can be set to an async function that resolves to +a broker array: + +```javascript +const kafka = new Kafka({ + clientId: 'my-app', + brokers: async () => { + // Example getting brokers from Confluent REST Proxy + const clusterResponse = await fetch('https://kafka-rest:8082/v3/clusters', { + headers: 'application/vnd.api+json', + }).then(response => response.json()) + const clusterUrl = clusterResponse.data[0].links.self + + const brokersResponse = await fetch(`${clusterUrl}/brokers`, { + headers: 'application/vnd.api+json', + }).then(response => response.json()) + + const brokers = brokersResponse.data.map(broker => { + const { host, port } = broker.attributes + return `${host}:${port}` + }) + + return brokers + } +}) +``` + +Note that this discovery mechanism is only used to get the initial set of brokers (i.e. the seed brokers). After successfully connecting to +a broker in this list, Kafka has its own mechanism for discovering the rest of the cluster. + +## SSL + +The `ssl` option can be used to configure the TLS sockets. The options are passed directly to [`tls.connect`](https://nodejs.org/api/tls.html#tls_tls_connect_options_callback) and used to create the TLS Secure Context, all options are accepted. + +```javascript +const fs = require('fs') + +new Kafka({ + clientId: 'my-app', + brokers: ['kafka1:9092', 'kafka2:9092'], + ssl: { + rejectUnauthorized: false, + ca: [fs.readFileSync('/my/custom/ca.crt', 'utf-8')], + key: fs.readFileSync('/my/custom/client-key.pem', 'utf-8'), + cert: fs.readFileSync('/my/custom/client-cert.pem', 'utf-8') + }, +}) +``` + +Refer to [TLS create secure context](https://nodejs.org/dist/latest-v8.x/docs/api/tls.html#tls_tls_createsecurecontext_options) for more information. `NODE_EXTRA_CA_CERTS` can be used to add custom CAs. Use `ssl: true` if you don't have any extra configurations and want to enable SSL. + +## SASL + +Kafka has support for using SASL to authenticate clients. The `sasl` option can be used to configure the authentication mechanism. Currently, KafkaJS supports `PLAIN`, `SCRAM-SHA-256`, `SCRAM-SHA-512`, and `AWS` mechanisms. + +Note that the broker may be configured to reject your authentication attempt if you are not using TLS, even if the credentials themselves are valid. In particular, never authenticate without TLS when using `PLAIN` as your authentication mechanism, as that will transmit your credentials unencrypted in plain text. See [SSL](#ssl) for more information on how to enable TLS. + +### Options + +| option | description | default | +| ------------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------- | +| authenticationTimeout | Timeout in ms for authentication requests | `10000` | +| reauthenticationThreshold | When periodic reauthentication (`connections.max.reauth.ms`) is configured on the broker side, reauthenticate when `reauthenticationThreshold` milliseconds remain of session lifetime. 
| `10000` | + +### PLAIN/SCRAM Example + +```javascript +new Kafka({ + clientId: 'my-app', + brokers: ['kafka1:9092', 'kafka2:9092'], + // authenticationTimeout: 10000, + // reauthenticationThreshold: 10000, + ssl: true, + sasl: { + mechanism: 'plain', // scram-sha-256 or scram-sha-512 + username: 'my-username', + password: 'my-password' + }, +}) +``` + +### OAUTHBEARER Example + +```javascript +new Kafka({ + clientId: 'my-app', + brokers: ['kafka1:9092', 'kafka2:9092'], + // authenticationTimeout: 10000, + // reauthenticationThreshold: 10000, + ssl: true, + sasl: { + mechanism: 'oauthbearer', + oauthBearerProvider: async () => { + // Use an unsecured token... + const token = jwt.sign({ sub: 'test' }, 'abc', { algorithm: 'none' }) + + // ...or, more realistically, grab the token from some OAuth endpoint + + return { + value: token + } + } + }, +}) +``` + +The `sasl` object must include a property named `oauthBearerProvider`, an +async function that is used to return the OAuth bearer token. + +The OAuth bearer token must be an object with properties value and +(optionally) extensions, that will be sent during the SASL/OAUTHBEARER +request. + +The implementation of the oauthBearerProvider must take care that tokens are +reused and refreshed when appropriate. An example implementation using +[`simple-oauth2`](https://github.com/lelylan/simple-oauth2) would look something +like the following: + +```ts +import { AccessToken, ClientCredentials } from 'simple-oauth2' +interface OauthBearerProviderOptions { + clientId: string; + clientSecret: string; + host: string; + path: string; + refreshThresholdMs: number; +} + +const oauthBearerProvider = (options: OauthBearerProviderOptions) => { + const client = new ClientCredentials({ + client: { + id: options.clientId, + secret: options.clientSecret + }, + auth: { + tokenHost: options.host, + tokenPath: options.path + } + }); + + let tokenPromise: Promise; + let accessToken: AccessToken; + + async function refreshToken() { + try { + if (accessToken == null) { + accessToken = await client.getToken({}) + } + + if (accessToken.expired(options.refreshThresholdMs / 1000)) { + accessToken = await accessToken.refresh() + } + + const nextRefresh = accessToken.token.expires_in * 1000 - options.refreshThresholdMs; + setTimeout(() => { + tokenPromise = refreshToken() + }, nextRefresh); + + return accessToken.token.access_token; + } catch (error) { + accessToken = null; + throw error; + } + } + + tokenPromise = refreshToken(); + + return async function () { + return { + value: await tokenPromise + } + } +}; + +const kafka = new Kafka({ + // ... 
other required options + sasl: { + mechanism: 'oauthbearer', + oauthBearerProvider: oauthBearerProvider({ + clientId: 'oauth-client-id', + clientSecret: 'oauth-client-secret', + host: 'https://my-oauth-server.com', + path: '/oauth/token', + // Refresh the token 15 seconds before it expires + refreshThreshold: 15000, + }), + }, +}) +``` + +### AWS IAM Example + +```javascript +new Kafka({ + clientId: 'my-app', + brokers: ['kafka1:9092', 'kafka2:9092'], + // authenticationTimeout: 10000, + // reauthenticationThreshold: 10000, + ssl: true, + sasl: { + mechanism: 'aws', + authorizationIdentity: 'AIDAIOSFODNN7EXAMPLE', // UserId or RoleId + accessKeyId: 'AKIAIOSFODNN7EXAMPLE', + secretAccessKey: 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY', + sessionToken: 'WHArYt8i5vfQUrIU5ZbMLCbjcAiv/Eww6eL9tgQMJp6QFNEXAMPLETOKEN' // Optional + }, +}) +``` + +For more information on the basics of IAM credentials and authentication, see the +[AWS Security Credentials - Access Keys](https://docs.aws.amazon.com/general/latest/gr/aws-sec-cred-types.html#access-keys-and-secret-access-keys) page. + +Use of this functionality requires +[STACK's Kafka AWS IAM LoginModule](https://github.com/STACK-Fintech/kafka-auth-aws-iam), or a +compatible alternative to be installed on all of the target brokers. + +In the above example, the `authorizationIdentity` must be the `aws:userid` of the AWS IAM +identity. Typically, you can retrieve this value using the `aws iam get-user` or `aws iam get-role` +commands of the [AWS CLI toolkit](https://aws.amazon.com/cli/). The `aws:userid` is usually listed +as the `UserId` or `RoleId` property of the response. + +You can also programmatically retrieve the `aws:userid` for currently available credentials with the +[AWS SDK's Security Token Service](https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/STS.html). + +A complete breakdown can be found in the IAM User Guide's +[Reference on Policy Variables](https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_variables.html#policy-vars-infotouse). + +### Use Encrypted Protocols + +It is **highly recommended** that you use SSL for encryption when using `PLAIN` or `AWS`, +otherwise credentials will be transmitted in cleartext! + +### Custom Authentication Mechanisms + +If an authentication mechanism is not supported out of the box in KafkaJS, a custom authentication +mechanism can be introduced as a plugin: + +```js +{ + sasl: { + mechanism: , + authenticationProvider: ({ host, port, logger, saslAuthenticate }) => { authenticate: () => Promise } + } +} +``` + +See [Custom Authentication Mechanisms](CustomAuthenticationMechanism.md) for more information on how to implement your own +authentication mechanism. + +## Connection Timeout + +Time in milliseconds to wait for a successful connection. The default value is: `1000`. + +```javascript +new Kafka({ + clientId: 'my-app', + brokers: ['kafka1:9092', 'kafka2:9092'], + connectionTimeout: 3000 +}) +``` + +## Request Timeout + +Time in milliseconds to wait for a successful request. The default value is: `30000`. + +```javascript +new Kafka({ + clientId: 'my-app', + brokers: ['kafka1:9092', 'kafka2:9092'], + requestTimeout: 25000 +}) +``` + +The request timeout can be disabled by setting `enforceRequestTimeout` to `false`. 
+ +```javascript +new Kafka({ + clientId: 'my-app', + brokers: ['kafka1:9092', 'kafka2:9092'], + enforceRequestTimeout: false +}) +``` + +## Default Retry + +The `retry` option can be used to set the configuration of the retry mechanism, which is used to retry connections and API calls to Kafka (when using producers or consumers). + +The retry mechanism uses a randomization function that grows exponentially. +[Detailed example](RetryDetailed.md) + +If the max number of retries is exceeded the retrier will throw `KafkaJSNumberOfRetriesExceeded` and interrupt. Producers will bubble up the error to the user code; Consumers will wait the retry time attached to the exception (it will be based on the number of attempts) and perform a full restart. + +__Available options:__ + +| option | description | default | +| ------------------- | ----------------------------------------------------------------------------------------------------------------------- | ------------------- | +| maxRetryTime | Maximum wait time for a retry in milliseconds | `30000` | +| initialRetryTime | Initial value used to calculate the retry in milliseconds (This is still randomized following the randomization factor) | `300` | +| factor | Randomization factor | `0.2` | +| multiplier | Exponential factor | `2` | +| retries | Max number of retries per call | `5` | +| restartOnFailure | Only used in consumer. See [`restartOnFailure`](#restartonfailure) | `async () => true` | + +Example: + +```javascript +new Kafka({ + clientId: 'my-app', + brokers: ['kafka1:9092', 'kafka2:9092'], + retry: { + initialRetryTime: 100, + retries: 8 + } +}) +``` + +### `restartOnFailure` + +An async function that will be invoked after the consumer exhausts all retries, to decide whether or not to restart the consumer (essentially resetting `consumer.run`). This can be used to, for example, cleanly shut down resources before crashing, if that is preferred. The function will be passed the error, which allows it to decide based on the type of error whether or not to exit the application or allow it to restart. + +The function has the following signature: `(error: Error) => Promise` + +* If the promise resolves to `true`: the consumer will restart +* If the promise resolves to `false`: the consumer will **not** restart +* If the promise rejects: the consumer will restart +* If there is no `restartOnFailure` provided: the consumer will restart + +Note that the function will only ever be invoked for what KafkaJS considers retriable errors. On non-retriable errors, the consumer will not be restarted and the `restartOnFailure` function will not be invoked. [See this list](https://kafka.apache.org/protocol#protocol_error_codes) for retriable errors in the Kafka protocol, but note that some additional errors will still be considered retriable in KafkaJS, such as for example network connection errors. + +## Logging + +KafkaJS has a built-in `STDOUT` logger which outputs JSON. It also accepts a custom log creator which allows you to integrate your favorite logger library. There are 5 log levels available: `NOTHING`, `ERROR`, `WARN`, `INFO`, and `DEBUG`. `INFO` is configured by default. + +##### Log level + +```javascript +const { Kafka, logLevel } = require('kafkajs') + +const kafka = new Kafka({ + clientId: 'my-app', + brokers: ['kafka1:9092', 'kafka2:9092'], + logLevel: logLevel.ERROR +}) +``` + +To override the log level after instantiation, call `setLogLevel` on the individual logger. 
+ +```javascript +const { Kafka, logLevel } = require('kafkajs') + +const kafka = new Kafka({ + clientId: 'my-app', + brokers: ['kafka1:9092', 'kafka2:9092'], + logLevel: logLevel.ERROR +}) +kafka.logger().setLogLevel(logLevel.WARN) + +const producer = kafka.producer(...) +producer.logger().setLogLevel(logLevel.INFO) + +const consumer = kafka.consumer(...) +consumer.logger().setLogLevel(logLevel.DEBUG) + +const admin = kafka.admin(...) +admin.logger().setLogLevel(logLevel.NOTHING) +``` + +The environment variable `KAFKAJS_LOG_LEVEL` can also be used and it has precedence over the configuration in code, example: + +```sh +KAFKAJS_LOG_LEVEL=info node code.js +``` + +> Note: for more information on how to customize your logs, take a look at [Custom logging](CustomLogger.md) + +## Custom socket factory + +To allow for custom socket configurations, the client accepts an optional `socketFactory` property that will be used to construct +any socket. + +`socketFactory` should be a function that returns an object compatible with [`net.Socket`](https://nodejs.org/api/net.html#net_class_net_socket) (see the [default implementation](https://github.com/tulios/kafkajs/tree/master/src/network/socketFactory.js)). + +```javascript +const { Kafka } = require('kafkajs') + +// Example socket factory setting a custom TTL +const net = require('net') +const tls = require('tls') + +const myCustomSocketFactory = ({ host, port, ssl, onConnect }) => { + const socket = ssl + ? tls.connect( + Object.assign({ host, port }, ssl), + onConnect + ) + : net.connect( + { host, port }, + onConnect + ) + + socket.setKeepAlive(true, 30000) + + return socket +} + +const kafka = new Kafka({ + clientId: 'my-app', + brokers: ['kafka1:9092', 'kafka2:9092'], + socketFactory: myCustomSocketFactory, +}) +``` + +### Proxy support + +Support for proxying traffic can be implemented with a custom socket factory. In the example +below we are using [proxy-chain](https://github.com/apify/proxy-chain) to integrate with the +proxy server, but any other proxy library can be used as long as the socket factory interface +is implemented. + +```javascript +const tls = require('tls') +const net = require('net') +const { createTunnel, closeTunnel } = require('proxy-chain') + +const socketFactory = ({ host, port, ssl, onConnect }) => { + const socket = ssl ? new tls.TLSSocket() : new net.Socket() + + createTunnel(process.env.HTTP_PROXY, `${host}:${port}`) + .then((tunnelAddress) => { + const [tunnelHost, tunnelPort] = tunnelAddress.split(':') + socket.setKeepAlive(true, 60000) + socket.connect( + Object.assign({ host: tunnelHost, port: tunnelPort, servername: host }, ssl), + onConnect + ) + + socket.on('close', () => { + closeTunnel(tunnelServer, true) + }) + }) + .catch(error => socket.emit('error', error)) + + return socket +} +``` \ No newline at end of file diff --git a/website/versioned_docs/version-2.2.0/ConsumerExample.md b/website/versioned_docs/version-2.2.0/ConsumerExample.md new file mode 100644 index 000000000..29264b45c --- /dev/null +++ b/website/versioned_docs/version-2.2.0/ConsumerExample.md @@ -0,0 +1,213 @@ +--- +id: version-2.2.0-consumer-example +title: Consumer +original_id: consumer-example +--- + +The following example assumes that you are using the local Kafka configuration described in [Running Kafka in Development](DockerLocal.md). 
+ +```javascript +const ip = require('ip') + +const { Kafka, logLevel } = require('kafkajs') + +const host = process.env.HOST_IP || ip.address() + +const kafka = new Kafka({ + logLevel: logLevel.INFO, + brokers: [`${host}:9092`], + clientId: 'example-consumer', +}) + +const topic = 'topic-test' +const consumer = kafka.consumer({ groupId: 'test-group' }) + +const run = async () => { + await consumer.connect() + await consumer.subscribe({ topic, fromBeginning: true }) + await consumer.run({ + // eachBatch: async ({ batch }) => { + // console.log(batch) + // }, + eachMessage: async ({ topic, partition, message }) => { + const prefix = `${topic}[${partition} | ${message.offset}] / ${message.timestamp}` + console.log(`- ${prefix} ${message.key}#${message.value}`) + }, + }) +} + +run().catch(e => console.error(`[example/consumer] ${e.message}`, e)) + +const errorTypes = ['unhandledRejection', 'uncaughtException'] +const signalTraps = ['SIGTERM', 'SIGINT', 'SIGUSR2'] + +errorTypes.forEach(type => { + process.on(type, async e => { + try { + console.log(`process.on ${type}`) + console.error(e) + await consumer.disconnect() + process.exit(0) + } catch (_) { + process.exit(1) + } + }) +}) + +signalTraps.forEach(type => { + process.once(type, async () => { + try { + await consumer.disconnect() + } finally { + process.kill(process.pid, type) + } + }) +}) +``` + +## TypeScript Example + +A similar example in TypeScript + +```typescript +import { Consumer, ConsumerSubscribeTopics, EachBatchPayload, Kafka, EachMessagePayload } from 'kafkajs' + +export default class ExampleConsumer { + private kafkaConsumer: Consumer + private messageProcessor: ExampleMessageProcessor + + public constructor(messageProcessor: ExampleMessageProcessor) { + this.messageProcessor = messageProcessor + this.kafkaConsumer = this.createKafkaConsumer() + } + + public async startConsumer(): Promise { + const topic: ConsumerSubscribeTopics = { + topics: ['example-topic'], + fromBeginning: false + } + + try { + await this.kafkaConsumer.connect() + await this.kafkaConsumer.subscribe(topic) + + await this.kafkaConsumer.run({ + eachMessage: async (messagePayload: EachMessagePayload) => { + const { topic, partition, message } = messagePayload + const prefix = `${topic}[${partition} | ${message.offset}] / ${message.timestamp}` + console.log(`- ${prefix} ${message.key}#${message.value}`) + } + }) + } catch (error) { + console.log('Error: ', error) + } + } + + public async startBatchConsumer(): Promise { + const topic: ConsumerSubscribeTopics = { + topics: ['example-topic'], + fromBeginning: false + } + + try { + await this.kafkaConsumer.connect() + await this.kafkaConsumer.subscribe(topic) + await this.kafkaConsumer.run({ + eachBatch: async (eachBatchPayload: EachBatchPayload) => { + const { batch } = eachBatchPayload + for (const message of batch.messages) { + const prefix = `${batch.topic}[${batch.partition} | ${message.offset}] / ${message.timestamp}` + console.log(`- ${prefix} ${message.key}#${message.value}`) + } + } + }) + } catch (error) { + console.log('Error: ', error) + } + } + + public async shutdown(): Promise { + await this.kafkaConsumer.disconnect() + } + + private createKafkaConsumer(): Consumer { + const kafka = new Kafka({ + clientId: 'client-id', + brokers: ['example.kafka.broker:9092'] + }) + const consumer = kafka.consumer({ groupId: 'consumer-group' }) + return consumer + } +} +``` + +## SSL & SASL Authentication + +The following example assumes a valid SSL certificate and SASL authentication using the `scram-sha-256` 
mechanism. Other mechanisms are also available (see [Client Configuration](Configuration.md#sasl)). + +```javascript +const ip = require('ip') + +const { Kafka, logLevel } = require('../index') + +const host = process.env.HOST_IP || ip.address() + +const kafka = new Kafka({ + logLevel: logLevel.INFO, + brokers: [`${host}:9094`], + clientId: 'example-consumer', + ssl: { + rejectUnauthorized: true + }, + sasl: { + mechanism: 'scram-sha-256', + username: 'test', + password: 'testtest', + }, +}) + +const topic = 'topic-test' +const consumer = kafka.consumer({ groupId: 'test-group' }) + +const run = async () => { + await consumer.connect() + await consumer.subscribe({ topic, fromBeginning: true }) + await consumer.run({ + // eachBatch: async ({ batch }) => { + // console.log(batch) + // }, + eachMessage: async ({ topic, partition, message }) => { + const prefix = `${topic}[${partition} | ${message.offset}] / ${message.timestamp}` + console.log(`- ${prefix} ${message.key}#${message.value}`) + }, + }) +} + +run().catch(e => console.error(`[example/consumer] ${e.message}`, e)) + +const errorTypes = ['unhandledRejection', 'uncaughtException'] +const signalTraps = ['SIGTERM', 'SIGINT', 'SIGUSR2'] + +errorTypes.forEach(type => { + process.on(type, async e => { + try { + console.log(`process.on ${type}`) + console.error(e) + await consumer.disconnect() + process.exit(0) + } catch (_) { + process.exit(1) + } + }) +}) + +signalTraps.forEach(type => { + process.once(type, async () => { + try { + await consumer.disconnect() + } finally { + process.kill(process.pid, type) + } + }) +}) +``` diff --git a/website/versioned_docs/version-2.2.0/CustomAuthenticationMechanism.md b/website/versioned_docs/version-2.2.0/CustomAuthenticationMechanism.md new file mode 100644 index 000000000..2e563fe35 --- /dev/null +++ b/website/versioned_docs/version-2.2.0/CustomAuthenticationMechanism.md @@ -0,0 +1,167 @@ +--- +id: version-2.2.0-custom-authentication-mechanism +title: Custom Authentication Mechanisms +original_id: custom-authentication-mechanism +--- + +To use an authentication mechanism that is not supported out of the box by KafkaJS, +custom authentication mechanisms can be introduced: + +```js +{ + sasl: { + mechanism: , + authenticationProvider: ({ host, port, logger, saslAuthenticate }) => { authenticate: () => Promise } + } +} +``` + +`` needs to match the SASL mechanism configured in the `sasl.enabled.mechanisms` +property in `server.properties`. See the Kafka documentation for information on how to +configure your brokers. 
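+
+For reference, a broker that enables a custom mechanism alongside the built-in ones might carry an entry like the following in `server.properties` (the mechanism names here are placeholders; a complete SASL setup also involves listener and JAAS configuration, which is outside the scope of this guide):
+
+```yml
+sasl.enabled.mechanisms=SCRAM-SHA-512,SIMON
+```
+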
+ +## Writing a custom authentication mechanism + +A custom authentication mechanism needs to fulfill the following interface: + +```ts +type AuthenticationProviderArgs = { + host: string + port: number + logger: Logger + saslAuthenticate: ( + request: SaslAuthenticationRequest, + response?: SaslAuthenticationResponse + ) => Promise +} + +type Mechanism = { + mechanism: string + authenticationProvider: (args: AuthenticationProviderArgs) => Authenticator +} + +type Authenticator = { + authenticate(): Promise +} + +type SaslAuthenticationRequest = { + encode: () => Buffer | Promise +} + +type SaslAuthenticationResponse = { + decode: (rawResponse: Buffer) => Buffer | Promise + parse: (data: Buffer) => ParseResult +} +``` +* `host` - Hostname of the specific broker to connect to +* `port` - Port of the specific broker to connect to +* `logger` - A logger instance namespaced to the authentication mechanism +* `saslAuthenticate` - an async function to make [`SaslAuthenticate`](https://kafka.apache.org/protocol.html#The_Messages_SaslAuthenticate) +requests towards the broker. The `request` and `response` functions are used to encode the `auth_bytes` of the request, and to optionally +decode and parse the `auth_bytes` in the response. `response` can be omitted if no response `auth_bytes` are expected. +### Example +In this example we will create a custom authentication mechanism called `simon`. The general +flow will be: +1. Send a [`SaslAuthenticate`](https://kafka.apache.org/protocol.html#The_Messages_SaslAuthenticate) +request with the value of `says` as `auth_bytes`. +2. Read the response from the broker. If `says` starts with "Simon says", the response `auth_bytes` +should equal `says`, if it does not start with "Simon says", it should be an empty string. + +**This is a made up example!** + +It is a non-existent authentication mechanism just made up to show how to implement the expected interface. It is not a real authentication mechanism. + +```js +const simonAuthenticator = says = ({ host, port, logger, saslAuthenticate }) => { + const INT32_SIZE = 4 + + const request = { + /** + * Encodes the value for `auth_bytes` in SaslAuthenticate request + * @see https://kafka.apache.org/protocol.html#The_Messages_SaslAuthenticate + * + * In this example, we are just sending `says` as a string, + * with the length of the string in bytes prepended as an int32 + **/ + encode: () => { + const byteLength = Buffer.byteLength(says, 'utf8') + const buf = Buffer.alloc(INT32_SIZE + byteLength) + buf.writeUInt32BE(byteLength, 0) + buf.write(says, INT32_SIZE, byteLength, 'utf8') + return buf + }, + } + const response = { + /** + * Decodes the `auth_bytes` in SaslAuthenticate response + * @see https://kafka.apache.org/protocol.html#The_Messages_SaslAuthenticate + * + * This is essentially the reverse of `request.encode`, where + * we read the length of the string as an int32 and then read + * that many bytes + */ + decode: rawData => { + const byteLength = rawData.readInt32BE(0) + return rawData.slice(INT32_SIZE, INT32_SIZE + byteLength) + }, + /** + * The return value from `response.decode` is passed into + * this function, which is responsible for interpreting + * the data. In this case, we just turn the buffer back + * into a string + */ + parse: data => { + return data.toString() + }, + } + return { + /** + * This function is responsible for orchestrating the authentication flow. 
+ * Essentially we will send a SaslAuthenticate request with the + * value of `sasl.says` to the broker, and expect to + * get the same value back. + * + * Other authentication methods may do any other operations they + * like, but communication with the brokers goes through + * the SaslAuthenticate request. + */ + authenticate: async () => { + if (says == null) { + throw new Error('SASL Simon: Invalid "says"') + } + const broker = `${host}:${port}` + try { + logger.info('Authenticate with SASL Simon', { broker }) + const authenticateResponse = await saslAuthenticate({ request, response }) + + const saidSimon = says.startsWith("Simon says ") + const expectedResponse = saidSimon ? says : "" + if (authenticateResponse !== expectedResponse) { + throw new Error("Mismatching response from broker") + } + logger.info('SASL Simon authentication successful', { broker }) + } catch (e) { + const error = new Error( + `SASL Simon authentication failed: ${e.message}` + ) + logger.error(error.message, { broker }) + throw error + } + }, + } +} +``` + +The `response` argument to `saslAuthenticate` is optional, in case the authentication +method does not require the `auth_bytes` in the response. + +In the example above, we expect the client to be configured as such: + +```js +const config = { + sasl: { + mechanism: 'simon' + authenticationProvider: simonAuthenticator('Simon says authenticate me') + } +} +``` \ No newline at end of file diff --git a/website/versions.json b/website/versions.json index 10d529481..35c431656 100644 --- a/website/versions.json +++ b/website/versions.json @@ -1,4 +1,5 @@ [ + "2.2.0", "2.1.0", "2.0.1", "2.0.0",