diff --git a/docs/cn/data-pipeline/flusher/flusher-kafka_v2.md b/docs/cn/data-pipeline/flusher/flusher-kafka_v2.md index 9cbe609965..48c75bdbbf 100644 --- a/docs/cn/data-pipeline/flusher/flusher-kafka_v2.md +++ b/docs/cn/data-pipeline/flusher/flusher-kafka_v2.md @@ -47,6 +47,7 @@ | Compression | String | 否 | 压缩算法,可选值:`none`, `snappy`,`lz4`和`gzip`,默认值`none` | | CompressionLevel | Int | 否 | 压缩级别,可选值:`1~9`,默认值:`4`,设置为`0`则禁用`Compression` | | MaxMessageBytes | Int | 否 | 一个批次提交的大小限制,配置和`message.max.bytes`对应,默认值:`1000000` | +| MaxOpenRequests | Int | 否 | 一个连接允许的最大打开的请求数,默认值:`5` | | MaxRetries | Int | 否 | 提交失败重试次数,最大`3`次,默认值:`3` | | BulkMaxSize | Int | 否 | 单次请求提交事件数,默认`2048` | | BulkFlushFrequency | Int | 否 | 发送批量 Kafka 请求之前等待的时间,0标识没有时延,默认值:`0` | @@ -61,8 +62,9 @@ | ClientID | String | 否 | 写入Kafka的Client ID,默认取值:`LogtailPlugin`。 | - `Version`需要填写的是`kafka protocol version`版本号,`flusher_kafka_v2`当前支持的`kafka`版本范围:`0.8.2.x~3.3.1`。 -请根据自己的`kafka`版本号参照下面的`kafka protocol version`规则进行配置。**建议根据自己的`kafka`版本指定对应`protocol version`**, -`kafka protocol version`支持版本号如下: + 请根据自己的`kafka`版本号参照下面的`kafka protocol version`规则进行配置。**建议根据自己的`kafka` + 版本指定对应`protocol version`**, + `kafka protocol version`支持版本号如下: ```plain 0.8.2.0,0.8.2.1,0.8.2.2 @@ -103,7 +105,7 @@ inputs: FilePattern: "*.log" flushers: - Type: flusher_kafka_v2 - Brokers: + Brokers: - 192.XX.XX.1:9092 - 192.XX.XX.2:9092 - 192.XX.XX.3:9092 @@ -140,7 +142,7 @@ flushers: "time": "2022-07-20 16:55:05.415" }, "tags": { - "k8s.namespace.name":"java_app", + "k8s.namespace.name": "java_app", "host.ip": "192.168.6.128", "host.name": "master", "log.file.path": "/data/test.log" @@ -160,6 +162,7 @@ Topic: test_%{content.application} 最后`ilogtail`就自动将日志推送到`test_springboot-docker`这个`topic`中。 `topic`动态表达式规则: + - `%{content.fieldname}`。`content`代表从`contents`中取指定字段值 - `%{tag.fieldname}`,`tag`表示从`tags`中取指定字段值,例如:`%{tag.k8s.namespace.name}` - `${env_name}`, 读取系统变量绑定到动态`topic`上,`ilogtail 1.5.0`开始支持。 @@ -168,12 +171,13 @@ Topic: test_%{content.application} #### 动态topic中使用系统变量 动态`topic`绑定系统变量的两种场景: + - 将系统变量采集添加到日志的`tag`中,然后使用`%{tag.fieldname}`规则完成绑定。 -- 对系统变量无采集存储需求,只是想根据设定的系统变量将日志推送到指定的`topic`中,直接采用`${env_name}`规则完成绑定,此方式需要`1.5.0`才支持。 +- 对系统变量无采集存储需求,只是想根据设定的系统变量将日志推送到指定的`topic`中,直接采用`${env_name}` + 规则完成绑定,此方式需要`1.5.0`才支持。 由于上面提到的两种系统变量的采集绑定都需要做一些特殊配置,因此下面将分别介绍下相关的配置操作。 - **(1)将系统变量采集到日志中完成动态`topic`绑定** 将系统变量采集添加到日志中有两种方式,一种是在`ilogtail`容器`env`添加,另一种是通过`processor_add_fields` 插件添加, @@ -184,21 +188,22 @@ Topic: test_%{content.application} ```yaml env: - name: ALIYUN_LOG_ENV_TAGS # add log tags from env - value: _node_name_|_node_ip_|_app_name_ + value: _node_name_|_node_ip_|_app_name_ - name: _app_name_ # 添加自定义_app_name_变量, value: kafka ``` -自定义的变量`_app_name_`被添加到`ALIYUN_LOG_ENV_TAGS`中,日志的`tags`中会看到自定义的变量, 此时动态 `topic`采用`%{tag.fieldname}`规则配置即可。 +自定义的变量`_app_name_`被添加到`ALIYUN_LOG_ENV_TAGS`中,日志的`tags`中会看到自定义的变量, 此时动态 `topic` +采用`%{tag.fieldname}`规则配置即可。 - 使用`processor_add_fields` 插件系统变量添加到日志中,配置参考如下: ```yaml processors: - - Type: processor_add_fields - Fields: - service: ${env_name} - IgnoreIfExist: false + - Type: processor_add_fields + Fields: + service: ${env_name} + IgnoreIfExist: false ``` 这里`${env_name}`生效依赖于`ilogtail`的`enable_env_ref_in_config`配置,从`ilogtail 1.5.0`开始支持。 @@ -206,6 +211,7 @@ processors: **(2)直接采用`$`符将系统变量绑定动态`topic`中** 在`daemonset`或者`sidecar`方式部署的`ilogtail`容器`env`配置部分添加自定义的系统变量,配置参考案例如下: + ```yaml env: - name: ALIYUN_LOG_ENV_TAGS # add log tags from env @@ -213,6 +219,7 @@ env: - name: app_name # 添加自定义app_name变量, value: kafka ``` + `app_name`添加到系统变量中后,直接采用动态topic的:`${env_name}`规则即可绑定。 ```yaml @@ -223,14 +230,14 @@ inputs: FilePattern: "*.log" flushers: - Type: flusher_kafka_v2 - Brokers: + Brokers: - 192.XX.XX.1:9092 - 192.XX.XX.2:9092 - 192.XX.XX.3:9092 Topic: ilogtail_${app_name} ``` -- `${app_name}`就是我们上面添加的系统变量。 +- `${app_name}`就是我们上面添加的系统变量。 ### TagFieldsRename @@ -304,7 +311,7 @@ flushers: - Type: flusher_kafka_v2 PartitionerType: hash HashKeys: - - content.application + - content.application Brokers: - 192.XX.XX.1:9092 - 192.XX.XX.2:9092 @@ -313,11 +320,12 @@ flushers: ``` - `content.application`中表示从`contents`中取数据`application`字段数据,如果对`contents`协议字段做了重命名, -例如重名为`messege`,则应该配置为`messege.application` + 例如重名为`messege`,则应该配置为`messege.application` ### 配置Headers `iLogtail`中`Kafka`的消息头是以键值对数组的形式配置的。`header`中`value`仅支持字符串类型。 + ```yaml enable: true inputs: @@ -337,10 +345,12 @@ flushers: - key: "key2" value: "value2" ``` + ### 数据平铺 `ilogtail 1.8.0`新增数据平铺协议`custom_single_flatten`,`contents`、`tags`和`time`三个`convert`层的协议字段中数据做一级打平。 当前`convert`协议在单条数据处理仅支持`json`编码,因此`custom_single_flatten`需要配合`json`编码一起使用。 + ```yaml enable: true inputs: @@ -372,7 +382,7 @@ flushers: "@time": "2022-07-20 16:55:05.415" }, "tags": { - "k8s.namespace.name":"java_app", + "k8s.namespace.name": "java_app", "host.ip": "192.168.6.128", "host.name": "master", "log.file.path": "/data/test.log" @@ -380,20 +390,22 @@ flushers: "time": 1664435098 } ``` + 使用平铺协议后`custom_single_flatten`,`json`全部被一级平铺。 + ```json { - "class": "org.springframework.web.servlet.DispatcherServlet@initServletBean:547", - "application": "springboot-docker", - "level": "ERROR", - "message": "Completed initialization in 9 ms", - "thread": "http-nio-8080-exec-10", - "@time": "2022-07-20 16:55:05.415", - "k8s.namespace.name":"java_app", - "host.ip": "192.168.6.128", - "host.name": "master", - "log.file.path": "/data/test.log", - "time": 1664435098 + "class": "org.springframework.web.servlet.DispatcherServlet@initServletBean:547", + "application": "springboot-docker", + "level": "ERROR", + "message": "Completed initialization in 9 ms", + "thread": "http-nio-8080-exec-10", + "@time": "2022-07-20 16:55:05.415", + "k8s.namespace.name": "java_app", + "host.ip": "192.168.6.128", + "host.name": "master", + "log.file.path": "/data/test.log", + "time": 1664435098 } ``` @@ -420,7 +432,7 @@ flushers: - Type: flusher_kafka_v2 PartitionerType: hash HashKeys: - - content.application + - content.application Brokers: - 192.XX.XX.1:9092 - 192.XX.XX.2:9092 @@ -450,7 +462,7 @@ flushers: - Type: flusher_kafka_v2 PartitionerType: hash HashKeys: - - content.application + - content.application Brokers: - 192.XX.XX.1:9092 - 192.XX.XX.2:9092 diff --git a/plugins/flusher/kafkav2/flusher_kafka_v2.go b/plugins/flusher/kafkav2/flusher_kafka_v2.go index 5aabc3582c..6abb449be7 100644 --- a/plugins/flusher/kafkav2/flusher_kafka_v2.go +++ b/plugins/flusher/kafkav2/flusher_kafka_v2.go @@ -79,6 +79,9 @@ type FlusherKafka struct { // The compression level must be in the range of 1 (best speed) to 9 (best compression) // The default value is 4. CompressionLevel int + // How many outstanding requests a connection is allowed to have before + // sending on it blocks (default 5). + MaxOpenRequests int // The maximum number of events to bulk in a single Kafka request. The default is 2048. BulkMaxSize int @@ -179,6 +182,7 @@ func NewFlusherKafka() *FlusherKafka { KeepAlive: 0, MaxMessageBytes: nil, // use library default RequiredACKs: nil, // use library default + MaxOpenRequests: 5, BrokerTimeout: 10 * time.Second, Compression: "none", CompressionLevel: 4, @@ -387,6 +391,7 @@ func newSaramaConfig(config *FlusherKafka) (*sarama.Config, error) { k := sarama.NewConfig() // configure network level properties + k.Net.MaxOpenRequests = config.MaxOpenRequests timeout := config.Timeout k.Net.DialTimeout = timeout k.Net.ReadTimeout = timeout @@ -559,8 +564,7 @@ func (k *FlusherKafka) makeHeaders() []sarama.RecordHeader { } func (k *FlusherKafka) getConverter() (*converter.Converter, error) { - logger.Debug(k.context.GetRuntimeContext(), "[ilogtail data convert config] Protocol", k.Convert.Protocol, - "Encoding", k.Convert.Encoding, "TagFieldsRename", k.Convert.TagFieldsRename, "ProtocolFieldsRename", k.Convert.ProtocolFieldsRename) + logger.Debug(k.context.GetRuntimeContext(), "[ilogtail data convert config] Protocol", k.Convert.Protocol, "Encoding", k.Convert.Encoding, "TagFieldsRename", k.Convert.TagFieldsRename, "ProtocolFieldsRename", k.Convert.ProtocolFieldsRename) return converter.NewConverter(k.Convert.Protocol, k.Convert.Encoding, k.Convert.TagFieldsRename, k.Convert.ProtocolFieldsRename) }