feat: add kafka net config MaxOpenRequests #1224

Merged · 4 commits · Nov 14, 2023
72 changes: 42 additions & 30 deletions docs/cn/data-pipeline/flusher/flusher-kafka_v2.md
@@ -47,6 +47,7 @@
| Compression | String | No | Compression algorithm; supported values: `none`, `snappy`, `lz4`, and `gzip`; default: `none` |
| CompressionLevel | Int | No | Compression level; valid range: `1~9`; default: `4`; setting it to `0` disables `Compression` |
| MaxMessageBytes | Int | No | Size limit of a single committed batch, corresponding to `message.max.bytes`; default: `1000000` |
| MaxOpenRequests | Int | No | Maximum number of open requests allowed on a single connection; default: `5` |
| MaxRetries | Int | No | Number of retries for a failed commit, at most `3`; default: `3` |
| BulkMaxSize | Int | No | Number of events committed in a single request; default: `2048` |
| BulkFlushFrequency | Int | No | Time to wait before sending a bulk Kafka request; `0` means no delay; default: `0` |
@@ -61,8 +62,9 @@
| ClientID | String | No | Client ID used for writing to Kafka; default: `LogtailPlugin`. |
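
For illustration only, a minimal `flusher_kafka_v2` block exercising the new parameter might look like the sketch below; the broker address and topic name are placeholders, and `MaxOpenRequests: 1` is just one plausible choice (a lower in-flight cap trades throughput for stricter ordering):

```yaml
flushers:
  - Type: flusher_kafka_v2
    Brokers:
      - 192.XX.XX.1:9092
    Topic: access-log        # placeholder topic name
    MaxOpenRequests: 1       # cap on in-flight requests per connection (default 5)
```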

- `Version` takes a `kafka protocol version` number; `flusher_kafka_v2` currently supports `kafka` versions in the range `0.8.2.x~3.3.1`.
  Configure it according to your own `kafka` version using the `kafka protocol version` rules below. **It is recommended to pin the `protocol version` that matches your `kafka` version.**
  The supported `kafka protocol version` numbers are:

```plain
0.8.2.0,0.8.2.1,0.8.2.2
@@ -103,7 +105,7 @@ inputs:
    FilePattern: "*.log"
flushers:
  - Type: flusher_kafka_v2
    Brokers:
      - 192.XX.XX.1:9092
      - 192.XX.XX.2:9092
      - 192.XX.XX.3:9092
@@ -140,7 +142,7 @@ flushers:
"time": "2022-07-20 16:55:05.415"
},
"tags": {
"k8s.namespace.name":"java_app",
"k8s.namespace.name": "java_app",
"host.ip": "192.168.6.128",
"host.name": "master",
"log.file.path": "/data/test.log"
@@ -160,6 +162,7 @@ Topic: test_%{content.application}

Finally, `ilogtail` automatically pushes the logs into the `test_springboot-docker` topic.
Dynamic `topic` expression rules (see the sketch after this list):

- `%{content.fieldname}`: `content` means the value of the named field is taken from `contents`.
- `%{tag.fieldname}`: `tag` means the value of the named field is taken from `tags`, for example `%{tag.k8s.namespace.name}`.
- `${env_name}`: reads a system environment variable and binds it to the dynamic `topic`; supported since `ilogtail 1.5.0`.
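
For instance, binding the namespace tag from the sample log above could look like this sketch (the `test_` prefix is arbitrary):

```yaml
flushers:
  - Type: flusher_kafka_v2
    Brokers:
      - 192.XX.XX.1:9092
    Topic: test_%{tag.k8s.namespace.name}   # resolves to test_java_app for the sample log
```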
@@ -168,12 +171,13 @@ Topic: test_%{content.application}
#### Using system variables in a dynamic topic

There are two scenarios for binding system variables to a dynamic `topic`:

- Collect the system variable into the log's `tag`, then complete the binding with the `%{tag.fieldname}` rule.
- If the system variable does not need to be collected or stored and you only want to route logs to a given `topic` based on it, bind it directly with the `${env_name}` rule; this requires `1.5.0` or later.

Since both binding approaches above require some special configuration, the related setup steps are described separately below.

**(1) Collect the system variable into the log to complete dynamic `topic` binding**

There are two ways to add system variables to the log: one is adding them in the `ilogtail` container's `env`, the other is via the `processor_add_fields` plugin,
@@ -184,35 +188,38 @@ Topic: test_%{content.application}
```yaml
env:
  - name: ALIYUN_LOG_ENV_TAGS # add log tags from env
    value: _node_name_|_node_ip_|_app_name_
  - name: _app_name_ # add a custom _app_name_ variable
    value: kafka
```

The custom variable `_app_name_` is appended to `ALIYUN_LOG_ENV_TAGS`, so it will show up in the log's `tags`; the dynamic `topic` can then be configured with the `%{tag.fieldname}` rule, as in the sketch below.
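
A minimal sketch of that binding, assuming `_app_name_` is exported to `tags` as configured above:

```yaml
flushers:
  - Type: flusher_kafka_v2
    Brokers:
      - 192.XX.XX.1:9092
    Topic: ilogtail_%{tag._app_name_}   # resolves to ilogtail_kafka here
```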

- Add the system variable to the log with the `processor_add_fields` plugin; a reference configuration follows:

```yaml
processors:
  - Type: processor_add_fields
    Fields:
      service: ${env_name}
    IgnoreIfExist: false
```

Here `${env_name}` only takes effect when `ilogtail`'s `enable_env_ref_in_config` option is enabled, which is supported since `ilogtail 1.5.0`.

**(2) Bind the system variable to the dynamic `topic` directly with the `$` sign**

Add a custom system variable in the `env` section of an `ilogtail` container deployed as a `daemonset` or `sidecar`; a reference configuration follows:

```yaml
env:
  - name: ALIYUN_LOG_ENV_TAGS # add log tags from env
    value: _node_name_|_node_ip_
  - name: app_name # add a custom app_name variable
    value: kafka
```

Once `app_name` is added as a system variable, it can be bound directly with the dynamic topic's `${env_name}` rule.

@@ -223,14 +230,14 @@ inputs:

```yaml
    FilePattern: "*.log"
flushers:
  - Type: flusher_kafka_v2
    Brokers:
      - 192.XX.XX.1:9092
      - 192.XX.XX.2:9092
      - 192.XX.XX.3:9092
    Topic: ilogtail_${app_name}
```

- `${app_name}` is the system variable we added above.

### TagFieldsRename
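
As a rough, hypothetical sketch only (assuming the `Convert.TagFieldsRename` map visible in the plugin's `getConverter` code further down), renaming the `host.name` tag might look like:

```yaml
flushers:
  - Type: flusher_kafka_v2
    Convert:
      TagFieldsRename:
        host.name: hostname   # hypothetical rename: host.name -> hostname
    Brokers:
      - 192.XX.XX.1:9092
    Topic: test-topic         # placeholder topic name
```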

@@ -304,7 +311,7 @@ flushers:
  - Type: flusher_kafka_v2
    PartitionerType: hash
    HashKeys:
      - content.application
    Brokers:
      - 192.XX.XX.1:9092
      - 192.XX.XX.2:9092
@@ -313,11 +320,12 @@
```

- `content.application` means the `application` field is taken from `contents`; if the `contents` protocol field has been renamed, for example to `messege`, configure `messege.application` instead, as sketched below.
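
An illustrative sketch only, assuming the rename is done through the `Convert.ProtocolFieldsRename` map visible in the plugin code:

```yaml
flushers:
  - Type: flusher_kafka_v2
    Convert:
      ProtocolFieldsRename:
        contents: messege     # hypothetical rename of the contents field
    PartitionerType: hash
    HashKeys:
      - messege.application   # hash key follows the renamed field
    Brokers:
      - 192.XX.XX.1:9092
```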

### Configuring Headers

In `iLogtail`, `Kafka` message headers are configured as an array of key-value pairs; the `value` of a `header` only supports the string type.

```yaml
enable: true
inputs:
@@ -337,10 +345,12 @@ flushers:
- key: "key2"
value: "value2"
```

### Data flattening

`ilogtail 1.8.0` adds the data-flattening protocol `custom_single_flatten`, which flattens the data in the three `convert`-layer protocol fields `contents`, `tags`, and `time` by one level.
The `convert` protocol currently only supports `json` encoding for single-record processing, so `custom_single_flatten` must be used together with `json` encoding.

```yaml
enable: true
inputs:
```

@@ -372,28 +382,30 @@ flushers:

```json
"@time": "2022-07-20 16:55:05.415"
},
"tags": {
"k8s.namespace.name":"java_app",
"k8s.namespace.name": "java_app",
"host.ip": "192.168.6.128",
"host.name": "master",
"log.file.path": "/data/test.log"
},
"time": 1664435098
}
```

After applying the flattening protocol `custom_single_flatten`, the `json` is flattened entirely by one level.

```json
{
  "class": "org.springframework.web.servlet.DispatcherServlet@initServletBean:547",
  "application": "springboot-docker",
  "level": "ERROR",
  "message": "Completed initialization in 9 ms",
  "thread": "http-nio-8080-exec-10",
  "@time": "2022-07-20 16:55:05.415",
  "k8s.namespace.name": "java_app",
  "host.ip": "192.168.6.128",
  "host.name": "master",
  "log.file.path": "/data/test.log",
  "time": 1664435098
}
```

@@ -420,7 +432,7 @@ flushers:
  - Type: flusher_kafka_v2
    PartitionerType: hash
    HashKeys:
      - content.application
    Brokers:
      - 192.XX.XX.1:9092
      - 192.XX.XX.2:9092
@@ -450,7 +462,7 @@ flushers:
  - Type: flusher_kafka_v2
    PartitionerType: hash
    HashKeys:
      - content.application
    Brokers:
      - 192.XX.XX.1:9092
      - 192.XX.XX.2:9092
8 changes: 6 additions & 2 deletions plugins/flusher/kafkav2/flusher_kafka_v2.go
@@ -79,6 +79,9 @@ type FlusherKafka struct {
    // The compression level must be in the range of 1 (best speed) to 9 (best compression)
    // The default value is 4.
    CompressionLevel int
    // How many outstanding requests a connection is allowed to have before
    // sending on it blocks (default 5).
    MaxOpenRequests int

    // The maximum number of events to bulk in a single Kafka request. The default is 2048.
    BulkMaxSize int
@@ -179,6 +182,7 @@ func NewFlusherKafka() *FlusherKafka {
        KeepAlive:        0,
        MaxMessageBytes:  nil, // use library default
        RequiredACKs:     nil, // use library default
        MaxOpenRequests:  5,
        BrokerTimeout:    10 * time.Second,
        Compression:      "none",
        CompressionLevel: 4,
@@ -387,6 +391,7 @@ func newSaramaConfig(config *FlusherKafka) (*sarama.Config, error) {
    k := sarama.NewConfig()

    // configure network level properties
    k.Net.MaxOpenRequests = config.MaxOpenRequests
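    // MaxOpenRequests plays the role of the Java client's
    // max.in.flight.requests.per.connection: values above 1 allow request
    // pipelining but can reorder messages when retries occur.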
    timeout := config.Timeout
    k.Net.DialTimeout = timeout
    k.Net.ReadTimeout = timeout
@@ -559,8 +564,7 @@ func (k *FlusherKafka) makeHeaders() []sarama.RecordHeader {
}

func (k *FlusherKafka) getConverter() (*converter.Converter, error) {
    logger.Debug(k.context.GetRuntimeContext(), "[ilogtail data convert config] Protocol", k.Convert.Protocol, "Encoding", k.Convert.Encoding, "TagFieldsRename", k.Convert.TagFieldsRename, "ProtocolFieldsRename", k.Convert.ProtocolFieldsRename)
    return converter.NewConverter(k.Convert.Protocol, k.Convert.Encoding, k.Convert.TagFieldsRename, k.Convert.ProtocolFieldsRename)
}
