Skip to content

Commit

Permalink
Merge pull request #1542 from tulios/v2.2.4-docs
Browse files Browse the repository at this point in the history
Versioned docs for 2.2.4
  • Loading branch information
Nevon authored Feb 27, 2023
2 parents c084728 + 8c3c52f commit ff3b111
Show file tree
Hide file tree
Showing 6 changed files with 1,233 additions and 0 deletions.
476 changes: 476 additions & 0 deletions website/versioned_docs/version-2.2.4/Consuming.md

Large diffs are not rendered by default.

171 changes: 171 additions & 0 deletions website/versioned_docs/version-2.2.4/CustomAuthenticationMechanism.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
---
id: version-2.2.4-custom-authentication-mechanism
title: Custom Authentication Mechanisms
original_id: custom-authentication-mechanism
---

To use an authentication mechanism that is not supported out of the box by KafkaJS,
custom authentication mechanisms can be introduced:

```js
{
sasl: {
mechanism: <mechanism name>,
authenticationProvider: ({ host, port, logger, saslAuthenticate }) => { authenticate: () => Promise<void> }
}
}
```

`<mechanism name>` needs to match the SASL mechanism configured in the `sasl.enabled.mechanisms`
property in `server.properties`. See the Kafka documentation for information on how to
configure your brokers.

## Writing a custom authentication mechanism

A custom authentication mechanism needs to fulfill the following interface:

```ts
type SaslAuthenticateArgs<ParseResult> = {
request: SaslAuthenticationRequest
response?: SaslAuthenticationResponse<ParseResult>
}

type AuthenticationProviderArgs = {
host: string
port: number
logger: Logger
saslAuthenticate: <ParseResult>(
args: SaslAuthenticateArgs<ParseResult>
) => Promise<ParseResult | void>
}

type Mechanism = {
mechanism: string
authenticationProvider: (args: AuthenticationProviderArgs) => Authenticator
}

type Authenticator = {
authenticate(): Promise<void>
}

type SaslAuthenticationRequest = {
encode: () => Buffer | Promise<Buffer>
}

type SaslAuthenticationResponse<ParseResult> = {
decode: (rawResponse: Buffer) => Buffer | Promise<Buffer>
parse: (data: Buffer) => ParseResult
}
```
* `host` - Hostname of the specific broker to connect to
* `port` - Port of the specific broker to connect to
* `logger` - A logger instance namespaced to the authentication mechanism
* `saslAuthenticate` - an async function to make [`SaslAuthenticate`](https://kafka.apache.org/protocol.html#The_Messages_SaslAuthenticate)
requests towards the broker. The `request` and `response` functions are used to encode the `auth_bytes` of the request, and to optionally
decode and parse the `auth_bytes` in the response. `response` can be omitted if no response `auth_bytes` are expected.
### Example
In this example we will create a custom authentication mechanism called `simon`. The general
flow will be:
1. Send a [`SaslAuthenticate`](https://kafka.apache.org/protocol.html#The_Messages_SaslAuthenticate)
request with the value of `says` as `auth_bytes`.
2. Read the response from the broker. If `says` starts with "Simon says", the response `auth_bytes`
should equal `says`, if it does not start with "Simon says", it should be an empty string.
**This is a made up example!**
It is a non-existent authentication mechanism just made up to show how to implement the expected interface. It is not a real authentication mechanism.
```js
const simonAuthenticator = says = ({ host, port, logger, saslAuthenticate }) => {
const INT32_SIZE = 4

const request = {
/**
* Encodes the value for `auth_bytes` in SaslAuthenticate request
* @see https://kafka.apache.org/protocol.html#The_Messages_SaslAuthenticate
*
* In this example, we are just sending `says` as a string,
* with the length of the string in bytes prepended as an int32
**/
encode: () => {
const byteLength = Buffer.byteLength(says, 'utf8')
const buf = Buffer.alloc(INT32_SIZE + byteLength)
buf.writeUInt32BE(byteLength, 0)
buf.write(says, INT32_SIZE, byteLength, 'utf8')
return buf
},
}
const response = {
/**
* Decodes the `auth_bytes` in SaslAuthenticate response
* @see https://kafka.apache.org/protocol.html#The_Messages_SaslAuthenticate
*
* This is essentially the reverse of `request.encode`, where
* we read the length of the string as an int32 and then read
* that many bytes
*/
decode: rawData => {
const byteLength = rawData.readInt32BE(0)
return rawData.slice(INT32_SIZE, INT32_SIZE + byteLength)
},
/**
* The return value from `response.decode` is passed into
* this function, which is responsible for interpreting
* the data. In this case, we just turn the buffer back
* into a string
*/
parse: data => {
return data.toString()
},
}
return {
/**
* This function is responsible for orchestrating the authentication flow.
* Essentially we will send a SaslAuthenticate request with the
* value of `sasl.says` to the broker, and expect to
* get the same value back.
*
* Other authentication methods may do any other operations they
* like, but communication with the brokers goes through
* the SaslAuthenticate request.
*/
authenticate: async () => {
if (says == null) {
throw new Error('SASL Simon: Invalid "says"')
}
const broker = `${host}:${port}`
try {
logger.info('Authenticate with SASL Simon', { broker })
const authenticateResponse = await saslAuthenticate({ request, response })

const saidSimon = says.startsWith("Simon says ")
const expectedResponse = saidSimon ? says : ""
if (authenticateResponse !== expectedResponse) {
throw new Error("Mismatching response from broker")
}
logger.info('SASL Simon authentication successful', { broker })
} catch (e) {
const error = new Error(
`SASL Simon authentication failed: ${e.message}`
)
logger.error(error.message, { broker })
throw error
}
},
}
}
```

The `response` argument to `saslAuthenticate` is optional, in case the authentication
method does not require the `auth_bytes` in the response.

In the example above, we expect the client to be configured as such:

```js
const config = {
sasl: {
mechanism: 'simon'
authenticationProvider: simonAuthenticator('Simon says authenticate me')
}
}
```
99 changes: 99 additions & 0 deletions website/versioned_docs/version-2.2.4/CustomLogger.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
---
id: version-2.2.4-custom-logger
title: Custom Logger
original_id: custom-logger
---

The logger is customized using log creators. A log creator is a function which receives a log level and returns a log function. The log function receives namespace, level, label, and log.

- `namespace` identifies the component which is performing the log, for example, connection or consumer.
- `level` is the log level of the log entry.
- `label` is a text representation of the log level, example: 'INFO'.
- `log` is an object with the following keys: `timestamp`, `logger`, `message`, and the extra keys given by the user. (`logger.info('test', { extra_data: true })`)

```javascript
{
level: 4,
label: 'INFO', // NOTHING, ERROR, WARN, INFO, or DEBUG
timestamp: '2017-12-29T13:39:54.575Z',
logger: 'kafkajs',
message: 'Started',
// ... any other extra key provided to the log function
}
```

The general structure looks like this:

```javascript
const MyLogCreator = logLevel => ({ namespace, level, label, log }) => {
// Example:
// const { timestamp, logger, message, ...others } = log
// console.log(`${label} [${namespace}] ${message} ${JSON.stringify(others)}`)
}
```

Example using [Winston](https://github.com/winstonjs/winston):

```javascript
const { logLevel } = require('kafkajs')
const winston = require('winston')
const toWinstonLogLevel = level => {
switch(level) {
case logLevel.ERROR:
case logLevel.NOTHING:
return 'error'
case logLevel.WARN:
return 'warn'
case logLevel.INFO:
return 'info'
case logLevel.DEBUG:
return 'debug'
}
}

const WinstonLogCreator = logLevel => {
const logger = winston.createLogger({
level: toWinstonLogLevel(logLevel),
transports: [
new winston.transports.Console(),
new winston.transports.File({ filename: 'myapp.log' })
]
})

return ({ namespace, level, label, log }) => {
const { message, ...extra } = log
logger.log({
level: toWinstonLogLevel(level),
message,
extra,
})
}
}
```

Once you have your log creator you can use the `logCreator` option to configure the client:

```javascript
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['kafka1:9092', 'kafka2:9092'],
logLevel: logLevel.ERROR,
logCreator: WinstonLogCreator
})
```

To get access to the namespaced logger of a consumer, producer, admin or root Kafka client after instantiation, you can use the `logger` method:

```javascript
const client = new Kafka( ... )
client.logger().info( ... )

const consumer = kafka.consumer( ... )
consumer.logger().info( ... )

const producer = kafka.producer( ... )
producer.logger().info( ... )

const admin = kafka.admin( ... )
admin.logger().info( ... )
```
Loading

0 comments on commit ff3b111

Please sign in to comment.