diff --git a/docs/cn/SUMMARY.md b/docs/cn/SUMMARY.md
index 2db8b9343e..d897d68602 100644
--- a/docs/cn/SUMMARY.md
+++ b/docs/cn/SUMMARY.md
@@ -115,6 +115,12 @@
* [Pulsar](plugins/flusher/flusher-pulsar.md)
* [HTTP](plugins/flusher/flusher-http.md)
* [Loki](plugins/flusher/loki.md)
+* [扩展](plugins/extension/README.md)
+ * [BasicAuth鉴权](plugins/extension/ext-basicauth.md)
+ * [协议解码/反序列化](plugins/extension/ext-default-decoder.md)
+ * [协议编码/序列化](plugins/extension/ext-default-encoder.md)
+ * [数据筛选](plugins/extension/ext-groupinfo-filter.md)
+ * [请求熔断](plugins/extension/ext-request-breaker.md)
## 工作原理
diff --git a/docs/cn/plugins/extension/README.md b/docs/cn/plugins/extension/README.md
new file mode 100644
index 0000000000..bbf31e717b
--- /dev/null
+++ b/docs/cn/plugins/extension/README.md
@@ -0,0 +1,3 @@
+# 扩展
+
+扩展插件用于对其它插件能力的补充(e.g. 鉴权、编解码、熔断、限流……)
\ No newline at end of file
diff --git a/docs/cn/plugins/extension/ext-default-encoder.md b/docs/cn/plugins/extension/ext-default-encoder.md
new file mode 100644
index 0000000000..b5f6df9078
--- /dev/null
+++ b/docs/cn/plugins/extension/ext-default-encoder.md
@@ -0,0 +1,37 @@
+# DefaultEncoder Encoder扩展
+
+## 简介
+
+[ext_default_encoder](https://github.com/alibaba/ilogtail/blob/main/plugins/extension/default_encoder/default_encoder.go)
+扩展,实现了 [Encoder](https://github.com/alibaba/ilogtail/blob/main/pkg/pipeline/extensions/encoder.go) 接口,可以用在
+`flusher_http` 等插件中用于序列化不同的协议数据。
+
+## 版本
+
+[Alpha](../stability-level.md)
+
+## 配置参数
+
+| 参数 | 类型 | 是否必选 | 说明 |
+|-------------|--------|------|-----------------------------------------------------------------------------------------------------------|
+| Format | String | 是 | 具体的协议,[查看支持的具体协议列表](https://github.com/alibaba/ilogtail/blob/main/pkg/protocol/encoder/common/common.go) |
+| SeriesLimit | Int | 否 | 触发序列化时序切片的最大长度,默认 1000,仅针对 Format=prometheus 时有效 |
+
+## 样例
+
+使用 `flusher_http` flusher 插件,配置发送 `prometheus` 协议数据。
+
+```yaml
+enable: true
+flushers:
+- Type: flusher_http
+ ...
+ Encoder:
+ Type: ext_default_encoder/prometheus
+ ...
+...
+extensions:
+- Type: ext_default_encoder/prometheus
+ Format: 'prometheus'
+ SeriesLimit: 1024
+```
diff --git a/docs/cn/plugins/overview.md b/docs/cn/plugins/overview.md
index f462a05692..c0ccf6a220 100644
--- a/docs/cn/plugins/overview.md
+++ b/docs/cn/plugins/overview.md
@@ -115,3 +115,9 @@
| 名称 | 提供方 | 简介 |
|----------------------------------------------------------------------------|-------------------------------------------------|-----------------------------|
 | [`ext_default_decoder`](extension/ext-default-decoder.md)<br>默认的decoder扩展 | 社区<br>[`snakorse`](https://github.com/snakorse) | 将内置支持的Format以Decoder扩展的形式封装 |
+
+### Encoder
+
+| 名称 | 提供方 | 简介 |
+|----------------------------------------------------------------------------|--------------------------------------------------------|-----------------------------|
+| [`ext_default_encoder`](extension/ext-default-encoder.md)<br>默认的encoder扩展 | 社区<br>[`yuanshuai.1900`](https://github.com/aiops1900) | 将内置支持的Format以Encoder扩展的形式封装 |
diff --git a/go.mod b/go.mod
index e6f183cc2a..483e82614b 100644
--- a/go.mod
+++ b/go.mod
@@ -5,7 +5,7 @@ go 1.19
require (
github.com/ClickHouse/clickhouse-go/v2 v2.6.0
github.com/IBM/sarama v1.42.2
- github.com/VictoriaMetrics/VictoriaMetrics v1.83.1
+ github.com/VictoriaMetrics/VictoriaMetrics v1.83.0
github.com/alibaba/ilogtail/pkg v0.0.0
github.com/apache/pulsar-client-go v0.10.0
github.com/buger/jsonparser v1.1.1
@@ -44,7 +44,7 @@ require (
github.com/prometheus/client_golang v1.14.0
github.com/prometheus/common v0.42.0
github.com/prometheus/procfs v0.8.0
- github.com/pyroscope-io/pyroscope v0.37.2
+ github.com/pyroscope-io/pyroscope v1.5.0
github.com/shirou/gopsutil v3.21.11+incompatible
github.com/sirupsen/logrus v1.8.1
github.com/smartystreets/goconvey v1.7.2
diff --git a/pkg/go.mod b/pkg/go.mod
index 0de999b532..5e9313b879 100644
--- a/pkg/go.mod
+++ b/pkg/go.mod
@@ -4,6 +4,7 @@ go 1.19
require (
github.com/Microsoft/go-winio v0.5.2
+ github.com/VictoriaMetrics/VictoriaMetrics v1.83.0
github.com/cespare/xxhash v1.1.0
github.com/cespare/xxhash/v2 v2.2.0
github.com/cihub/seelog v0.0.0-20170130134532-f561c5e57575
@@ -18,12 +19,13 @@ require (
github.com/influxdata/telegraf v1.20.0
github.com/json-iterator/go v1.1.12
github.com/mailru/easyjson v0.7.7
+ github.com/mitchellh/mapstructure v1.4.2
github.com/narqo/go-dogstatsd-parser v0.2.0
github.com/pierrec/lz4 v2.6.1+incompatible
github.com/prometheus/common v0.42.0
github.com/prometheus/prometheus v1.8.2-0.20210430082741-2a4b8e12bbf2
github.com/pyroscope-io/jfr-parser v0.6.0
- github.com/pyroscope-io/pyroscope v0.0.0-00010101000000-000000000000
+ github.com/pyroscope-io/pyroscope v1.5.0
github.com/richardartoul/molecule v1.0.0
github.com/smartystreets/goconvey v1.7.2
github.com/stretchr/testify v1.8.2
diff --git a/pkg/go.sum b/pkg/go.sum
index 2b45645951..c8cced1fc8 100644
--- a/pkg/go.sum
+++ b/pkg/go.sum
@@ -783,6 +783,8 @@ github.com/hashicorp/serf v0.9.5/go.mod h1:UWDWwZeL5cuWDJdl0C6wrvrUwEqtQ4ZKBKKEN
github.com/hetznercloud/hcloud-go v1.24.0/go.mod h1:3YmyK8yaZZ48syie6xpm3dt26rtB6s65AisBHylXYFA=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/hudl/fargo v1.3.0/go.mod h1:y3CKSmjA+wD2gak7sUSXTAoopbhU08POFhmITJgmKTg=
+github.com/iLogtail/VictoriaMetrics v1.83.4-ilogtail h1:LRDJt9eUKKhHdwPJRbC6tgtiMs/0XTjlCz1dl2pzRt0=
+github.com/iLogtail/VictoriaMetrics v1.83.4-ilogtail/go.mod h1:JagjwAO58g1WNpyr6x/lrQqMTf99d/WU/yxjADxBz8E=
github.com/iLogtail/handy v0.0.0-20230327021402-6a47ec586270/go.mod h1:6ai2R0qBm3xL13e10jwvyIf91Spxvo/yREZE9KOz7so=
github.com/iLogtail/jfr-parser v0.6.0 h1:dNaQ0Ng2BLE5uxrhUQwtx1q7O9LIQFpMthl3SV326AU=
github.com/iLogtail/jfr-parser v0.6.0/go.mod h1:ZMcbJjfDkOwElEK8CvUJbpetztRWRXszCmf5WU0erV8=
@@ -944,6 +946,8 @@ github.com/mitchellh/mapstructure v1.3.2/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RR
github.com/mitchellh/mapstructure v1.3.3/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/mitchellh/mapstructure v1.4.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/mitchellh/mapstructure v1.4.1/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
+github.com/mitchellh/mapstructure v1.4.2 h1:6h7AQ0yhTcIsmFmnAwQls75jp2Gzs4iB8W7pjMO+rqo=
+github.com/mitchellh/mapstructure v1.4.2/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/mitchellh/osext v0.0.0-20151018003038-5e2d6d41470f/go.mod h1:OkQIRizQZAeMln+1tSwduZz7+Af5oFlKirV/MSYes2A=
github.com/moby/locker v1.0.1 h1:fOXqR41zeveg4fFODix+1Ch4mj/gT0NE1XJbp/epuBg=
github.com/moby/locker v1.0.1/go.mod h1:S7SDdo5zpBK84bzzVlKr2V0hz+7x9hWbYC/kq7oQppc=
diff --git a/pkg/pipeline/extensions/encoder.go b/pkg/pipeline/extensions/encoder.go
new file mode 100644
index 0000000000..85e5fcb6a5
--- /dev/null
+++ b/pkg/pipeline/extensions/encoder.go
@@ -0,0 +1,53 @@
+// Copyright 2024 iLogtail Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package extensions
+
+import (
+ "github.com/alibaba/ilogtail/pkg/models"
+ "github.com/alibaba/ilogtail/pkg/pipeline"
+ "github.com/alibaba/ilogtail/pkg/protocol"
+)
+
+// Encoder encodes data of iLogtail data models into bytes.
+// Different drivers with different encoding protocols implement Encoder interface.
+//
+// drivers: raw, influxdb, prometheus, sls, ...
+type Encoder interface {
+ EncoderV1
+ EncoderV2
+}
+
+// EncoderV1 supports v1 pipeline plugin interface,
+// encodes data of v1 model into bytes.
+//
+// drivers: sls, influxdb, ...
+type EncoderV1 interface {
+ EncodeV1(*protocol.LogGroup) ([][]byte, error)
+ EncodeBatchV1([]*protocol.LogGroup) ([][]byte, error)
+}
+
+// EncoderV2 supports v2 pipeline plugin interface,
+// encodes data of v2 model into bytes.
+//
+// drivers: raw, influxdb, prometheus, ...
+type EncoderV2 interface {
+ EncodeV2(*models.PipelineGroupEvents) ([][]byte, error)
+ EncodeBatchV2([]*models.PipelineGroupEvents) ([][]byte, error)
+}
+
+type EncoderExtension interface {
+ Encoder
+ pipeline.Extension
+}
diff --git a/pkg/protocol/encoder/common/common.go b/pkg/protocol/encoder/common/common.go
new file mode 100644
index 0000000000..4caf8109df
--- /dev/null
+++ b/pkg/protocol/encoder/common/common.go
@@ -0,0 +1,19 @@
+// Copyright 2024 iLogtail Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package common
+
+const (
+ ProtocolPrometheus = "prometheus"
+)
diff --git a/pkg/protocol/encoder/encoder.go b/pkg/protocol/encoder/encoder.go
new file mode 100644
index 0000000000..81fc0667ce
--- /dev/null
+++ b/pkg/protocol/encoder/encoder.go
@@ -0,0 +1,40 @@
+// Copyright 2024 iLogtail Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package encoder
+
+import (
+ "fmt"
+ "strings"
+
+ "github.com/mitchellh/mapstructure"
+
+ "github.com/alibaba/ilogtail/pkg/pipeline/extensions"
+ "github.com/alibaba/ilogtail/pkg/protocol/encoder/common"
+ "github.com/alibaba/ilogtail/pkg/protocol/encoder/prometheus"
+)
+
+func NewEncoder(format string, options map[string]any) (extensions.Encoder, error) {
+ switch strings.TrimSpace(strings.ToLower(format)) {
+ case common.ProtocolPrometheus:
+ var opt prometheus.Option
+ if err := mapstructure.Decode(options, &opt); err != nil {
+ return nil, err
+ }
+ return prometheus.NewPromEncoder(opt.SeriesLimit), nil
+
+ default:
+ return nil, fmt.Errorf("not supported encode format: %s", format)
+ }
+}
diff --git a/pkg/protocol/encoder/prometheus/encoder_prometheus.go b/pkg/protocol/encoder/prometheus/encoder_prometheus.go
new file mode 100644
index 0000000000..4be1ef5792
--- /dev/null
+++ b/pkg/protocol/encoder/prometheus/encoder_prometheus.go
@@ -0,0 +1,126 @@
+// Copyright 2024 iLogtail Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package prometheus
+
+import (
+ "context"
+ "errors"
+
+ "github.com/alibaba/ilogtail/pkg/logger"
+ "github.com/alibaba/ilogtail/pkg/models"
+ "github.com/alibaba/ilogtail/pkg/pipeline/extensions"
+ "github.com/alibaba/ilogtail/pkg/protocol"
+)
+
+const defaultSeriesLimit = 1000
+
+var errNilOrZeroGroupEvents = errors.New("nil or zero group events")
+
+type Option struct {
+ SeriesLimit int // config for prometheus encoder
+}
+
+func NewPromEncoder(seriesLimit int) extensions.Encoder {
+ return newPromEncoder(seriesLimit)
+}
+
+type Encoder struct {
+ SeriesLimit int
+}
+
+func newPromEncoder(seriesLimit int) *Encoder {
+ if seriesLimit <= 0 {
+ seriesLimit = defaultSeriesLimit
+ }
+
+ return &Encoder{
+ SeriesLimit: seriesLimit,
+ }
+}
+
+func (p *Encoder) EncodeV1(logGroups *protocol.LogGroup) ([][]byte, error) {
+ // TODO implement me
+ return nil, nil
+}
+
+func (p *Encoder) EncodeBatchV1(logGroups []*protocol.LogGroup) ([][]byte, error) {
+ // TODO implement me
+ return nil, nil
+}
+
+func (p *Encoder) EncodeV2(groupEvents *models.PipelineGroupEvents) ([][]byte, error) {
+ if groupEvents == nil || len(groupEvents.Events) == 0 {
+ return nil, errNilOrZeroGroupEvents
+ }
+
+ var res [][]byte
+
+ wr := getWriteRequest(p.SeriesLimit)
+ defer putWriteRequest(wr)
+
+ for _, event := range groupEvents.Events {
+ if event == nil {
+ logger.Debugf(context.Background(), "nil event")
+ continue
+ }
+
+ if event.GetType() != models.EventTypeMetric {
+ logger.Debugf(context.Background(), "event type (%s) not metric", event.GetName())
+ continue
+ }
+
+ metricEvent, ok := event.(*models.Metric)
+ if !ok {
+ logger.Debugf(context.Background(), "assert metric event type (%s) failed", event.GetName())
+ continue
+ }
+
+ wr.Timeseries = append(wr.Timeseries, genPromRemoteWriteTimeseries(metricEvent))
+ if len(wr.Timeseries) >= p.SeriesLimit {
+ res = append(res, marshalBatchTimeseriesData(wr))
+ wr.Timeseries = wr.Timeseries[:0]
+ }
+ }
+
+ if len(wr.Timeseries) > 0 {
+ res = append(res, marshalBatchTimeseriesData(wr))
+ wr.Timeseries = wr.Timeseries[:0]
+ }
+
+ return res, nil
+}
+
+func (p *Encoder) EncodeBatchV2(groupEventsSlice []*models.PipelineGroupEvents) ([][]byte, error) {
+ if len(groupEventsSlice) == 0 {
+ return nil, errNilOrZeroGroupEvents
+ }
+
+ var res [][]byte
+
+ for _, groupEvents := range groupEventsSlice {
+ bytes, err := p.EncodeV2(groupEvents)
+ if err != nil {
+ continue
+ }
+
+ res = append(res, bytes...)
+ }
+
+ if res == nil {
+ return nil, errNilOrZeroGroupEvents
+ }
+
+ return res, nil
+}
diff --git a/pkg/protocol/encoder/prometheus/encoder_prometheus_test.go b/pkg/protocol/encoder/prometheus/encoder_prometheus_test.go
new file mode 100644
index 0000000000..2091cf5d1d
--- /dev/null
+++ b/pkg/protocol/encoder/prometheus/encoder_prometheus_test.go
@@ -0,0 +1,505 @@
+// Copyright 2024 iLogtail Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package prometheus
+
+import (
+ "errors"
+ "strconv"
+ "testing"
+
+ "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
+ pb "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
+ "github.com/prometheus/common/model"
+ "github.com/stretchr/testify/assert"
+
+ "github.com/alibaba/ilogtail/pkg/models"
+)
+
+// 场景:性能测试,确定性能基线(UT粒度)
+// 因子:所有 Event type 均为 models.EventTypeMetric
+// 因子:所有 PipelineEvent interface 的实现均为 *models.Metric
+// 预期:EncodeV2 和 EncodeBatchV2 性能相当(实现上 EncodeBatchV2 会循环调用 EncodeV2)
+// Benchmark结果(每次在具体数值上可能会有差异,但数量级相同):
+// goos: darwin
+// goarch: arm64
+// pkg: github.com/alibaba/ilogtail/pkg/protocol/encoder/prometheus
+// BenchmarkV2Encode
+// BenchmarkV2Encode/EncodeV2
+// BenchmarkV2Encode/EncodeV2-12 685 1655657 ns/op
+// BenchmarkV2Encode/BatchEncodeV2
+// BenchmarkV2Encode/BatchEncodeV2-12 716 1639491 ns/op
+// PASS
+func BenchmarkV2Encode(b *testing.B) {
+ // given
+ p := NewPromEncoder(19)
+ groupEventsSlice := genNormalPipelineGroupEventsSlice(100)
+ want := append([]*models.PipelineGroupEvents(nil), groupEventsSlice...)
+
+ b.Run("EncodeV2", func(b *testing.B) {
+ b.ResetTimer()
+
+ for i := 0; i < b.N; i++ {
+ for _, groupEvents := range groupEventsSlice {
+ p.EncodeV2(groupEvents)
+ }
+ }
+ })
+ assert.Equal(b, want, groupEventsSlice)
+
+ b.Run("BatchEncodeV2", func(b *testing.B) {
+ b.ResetTimer()
+
+ for i := 0; i < b.N; i++ {
+ p.EncodeBatchV2(groupEventsSlice)
+ }
+ })
+ assert.Equal(b, want, groupEventsSlice)
+}
+
+// 场景:V2 Encode接口功能测试
+// 说明:EncodeBatchV2 内部会调用 EncodeV2,所以也同时测试了 EncodeV2 的正常逻辑的功能
+// 因子:所有 Event type 均为 models.EventTypeMetric
+// 因子:所有 PipelineEvent interface 的实现均为「正常的*models.Metric」(而不是new(models.Metric)),
+// 具体区别在于正常的*models.Metric,其中的Tags、Value等都不是nil(如果为nil,会触发序列化的异常逻辑)
+// 预期:V2 Encode逻辑正常(正常流程都能正确处理),返回的error类型为nil,[][]byte不为nil
+func TestV2Encode_ShouldReturnNoError_GivenNormalDataOfPipelineGroupEvents(t *testing.T) {
+ // given
+ groupEventsSlice1 := genNormalPipelineGroupEventsSlice(100)
+ groupEventsSlice2 := genNormalPipelineGroupEventsSlice(100)
+ p := NewPromEncoder(19)
+
+ // when
+ // then
+ data1, err1 := p.EncodeBatchV2(groupEventsSlice1)
+ assert.NoError(t, err1)
+ data2, err2 := p.EncodeBatchV2(groupEventsSlice2)
+ assert.NoError(t, err2)
+
+ assert.Equal(t, len(data2), len(data1))
+}
+
+// 场景:V2 Encode接口功能测试(异常数据,非全nil或0值)
+// 说明:尽管 EncodeBatchV2 内部会调用 EncodeV2,但异常情况可能是 EncodeBatchV2 侧的防御,
+// 所以还需要测试异常情况下 EncodeV2 的功能
+// 因子:并非所有 Event type 均为 models.EventTypeMetric(e.g. 还可能是 models.EventTypeLogging 等)
+// 因子:PipelineEvent interface 的实现,部分是「正常的*models.Metric」,部分为 nil,部分为new(models.Metric),
+// 部分为其它(e.g. *models.Log 等)
+// 预期:Encode逻辑正常(异常流程也能正确处理),返回的error类型不为nil,[][]byte为nil
+func TestV2Encode_ShouldReturnError_GivenAbNormalDataOfPipelineGroupEvents(t *testing.T) {
+ // given
+ groupEventsSlice1 := genPipelineGroupEventsSliceIncludingAbnormalData(100)
+ groupEventsSlice2 := genPipelineGroupEventsSliceIncludingAbnormalData(100)
+ assert.Equal(t, len(groupEventsSlice1), len(groupEventsSlice2))
+ p := NewPromEncoder(19)
+
+ // when
+ // then
+ t.Run("Test EncodeV2 with abnormal data input", func(t *testing.T) {
+ for i, groupEvents := range groupEventsSlice1 {
+ data1, err1 := p.EncodeV2(groupEvents)
+ data2, err2 := p.EncodeV2(groupEventsSlice2[i])
+ if err1 != nil {
+ assert.Error(t, err2)
+ assert.Equal(t, err1, err2)
+ } else {
+ assert.NoError(t, err2)
+ assert.Equal(t, len(data2), len(data1))
+ }
+ }
+ })
+
+ t.Run("Test EncodeBatchV2 with abnormal data input", func(t *testing.T) {
+ data1, err1 := p.EncodeBatchV2(groupEventsSlice1)
+ assert.NoError(t, err1)
+ data2, err2 := p.EncodeBatchV2(groupEventsSlice2)
+ assert.NoError(t, err2)
+
+ assert.Equal(t, len(data2), len(data1))
+ })
+}
+
+// 场景:V2 Encode接口功能测试(异常数据,全nil或0值)
+// 说明:尽管 EncodeBatchV2 内部会调用 EncodeV2,但异常情况可能是 EncodeBatchV2 侧的防御,
+// 所以还需要测试异常情况下 EncodeV2 的功能
+// 因子:所有 *models.PipelineGroupEvents 及 []*models.PipelineGroupEvents 底层为 nil 或者 长度为0的切片
+// 预期:Encode逻辑正常(异常流程也能正确处理),返回的error类型不为nil,[][]byte为nil
+func TestV2Encode_ShouldReturnError_GivenNilOrZeroDataOfPipelineGroupEvents(t *testing.T) {
+ // given
+ p := NewPromEncoder(19)
+ nilOrZeroGroupEventsSlices := []*models.PipelineGroupEvents{
+ nil,
+ {}, // same as {Events: nil},
+ {Events: make([]models.PipelineEvent, 0)},
+ }
+ nilOrZeroGroupEventsSlicesEx := [][]*models.PipelineGroupEvents{
+ nil,
+ {}, // same as {nil}
+ {{Events: nil}},
+ nilOrZeroGroupEventsSlices,
+ }
+
+ // when
+ // then
+ t.Run("Test EncodeV2 with nil or zero data input", func(t *testing.T) {
+ for _, input := range nilOrZeroGroupEventsSlices {
+ data, err := p.EncodeV2(input)
+ assert.Error(t, err)
+ assert.Nil(t, data)
+ }
+ })
+
+ t.Run("Test EncodeBatchV2 with nil or zero data input", func(t *testing.T) {
+ for _, input := range nilOrZeroGroupEventsSlicesEx {
+ data, err := p.EncodeBatchV2(input)
+ assert.Error(t, err)
+ assert.Nil(t, data)
+ }
+ })
+}
+
+// 场景:V2 Encode接口功能测试
+// 说明:EncoderBatchV2 内部会调用 EncoderV2,所以也同时测试了 EncoderV2 的功能
+// 因子:所有 Event type 均为 models.EventTypeMetric
+// 因子:所有 PipelineEvent interface 的实现均为 *models.Metric
+// 因子:每个 metric event 生成的 *models.Metric.Tags 中的 tag 仅一个
+// (确保 Encode 时 range map不用考虑 range go原生map每次顺序随机,从而导致2次Encode相同数据后得到的结果不同)
+// PS:如果不这么做,就要对map做转化,先变成 range 保序的 slice,再 Encode;
+// 但测试的功能点是Encode,所以采用上述方法绕过range go原生map每次顺序随机的特点,完成功能测试
+// 预期:Encode逻辑正常(正常流程都能正确处理),返回的error类型为nil,[][]byte不为nil,且两次Encode后返回的数据相同
+func TestEncoderBatchV2_ShouldReturnNoErrorAndEqualData_GivenNormalDataOfDataOfPipelineGroupEventsWithSingleTag(t *testing.T) {
+ // given
+ groupEventsSlice1 := genPipelineGroupEventsSliceSingleTag(100)
+ groupEventsSlice2 := genPipelineGroupEventsSliceSingleTag(100)
+ p := NewPromEncoder(19)
+
+ // when
+ // then
+ data1, err1 := p.EncodeBatchV2(groupEventsSlice1)
+ assert.NoError(t, err1)
+ data2, err2 := p.EncodeBatchV2(groupEventsSlice2)
+ assert.NoError(t, err2)
+
+ assert.Equal(t, data2, data1)
+}
+
+// 场景:V2 Encode接口功能测试
+// 说明:EncoderBatchV2 内部会调用 EncoderV2,所以也同时测试了 EncoderV2 的功能
+// 因子:所有 Event type 均为 models.EventTypeMetric
+// 因子:所有 PipelineEvent interface 的实现均为 *models.Metric
+// 因子:每个 metric event 生成的 *models.Metric.Tags 中的 tag 仅一个
+// (确保 Encode 时 range map不用考虑 range go原生map每次顺序随机,从而导致2次Encode相同数据后得到的结果不同)
+// PS:如果不这么做,就要对map做转化,先变成 range 保序的 slice,再 Encode;
+// 但测试的功能点是Encode,所以采用上述方法绕过range go原生map每次顺序随机的特点,完成功能测试
+// 因子:对Encode后的数据进行Decode
+// 因子:「构造的用例数据」的长度(len([]*models.PipelineGroupEvents))未超过 series limit
+// 预期:「构造的用例数据」和「对用例数据先Encode再Decode后的数据」相等
+func TestEncoderBatchV2_ShouldDecodeSuccess_GivenNormalDataOfDataOfPipelineGroupEventsWithSingleTagNotExceedingSeriesLimit(t *testing.T) {
+ // given
+ seriesLimit := 19
+ n := seriesLimit
+ wantGroupEventsSlice := genPipelineGroupEventsSliceSingleTag(n)
+ p := NewPromEncoder(seriesLimit)
+ data, err := p.EncodeBatchV2(wantGroupEventsSlice)
+ assert.NoError(t, err)
+
+ // when
+ // then
+ gotGroupEventsSlice, err := DecodeBatchV2(data)
+ assert.NoError(t, err)
+ assert.Equal(t, wantGroupEventsSlice, gotGroupEventsSlice)
+}
+
+// 场景:V2 Encode接口功能测试
+// 说明:EncoderBatchV2 内部会调用 EncoderV2,所以也同时测试了 EncoderV2 的功能
+// 因子:所有 Event type 均为 models.EventTypeMetric
+// 因子:所有 PipelineEvent interface 的实现均为 *models.Metric
+// 因子:每个 metric event 生成的 *models.Metric.Tags 中的 tag 仅一个
+// (确保 Encode 时 range map不用考虑 range go原生map每次顺序随机,从而导致2次Encode相同数据后得到的结果不同)
+// PS:如果不这么做,就要对map做转化,先变成 range 保序的 slice,再 Encode;
+// 但测试的功能点是Encode,所以采用上述方法绕过range go原生map每次顺序随机的特点,完成功能测试
+// 因子:对Encode后的数据进行Decode
+// 因子:「构造的用例数据」的长度(len([]*models.PipelineGroupEvents))超过 series limit
+// 预期:「构造的用例数据」的长度小于「对用例数据先Encode再Decode后的数据」的长度,且用 expectedLen 计算后的长度相等
+// PS:expectedLen 的计算方法,其实是和 genPipelineGroupEventsSlice 生成用例及根据series limit确定encode批次
+// 的逻辑相关,和 Encode 本身的逻辑无关
+func TestEncoderBatchV2_ShouldDecodeSuccess_GivenNormalDataOfDataOfPipelineGroupEventsWithSingleTagExceedingSeriesLimit(t *testing.T) {
+ // given
+ seriesLimit := 19
+ n := 100
+ wantGroupEventsSlice := genPipelineGroupEventsSliceSingleTag(n)
+ assert.Equal(t, n, len(wantGroupEventsSlice))
+ p := NewPromEncoder(seriesLimit)
+ data, err := p.EncodeBatchV2(wantGroupEventsSlice)
+ assert.NoError(t, err)
+ expectedLen := func(limit, length int) int {
+ // make sure limit > 0 && length > 0
+ if limit <= 0 || length <= 0 {
+ return -1
+ }
+
+ mod := length % limit
+ mul := length / limit
+
+ res := 0
+ for i := 0; i <= mul; i++ {
+ res += i * limit
+ }
+ res += (mul + 1) * mod
+
+ return res
+ }
+
+ // when
+ gotGroupEventsSlice, err := DecodeBatchV2(data)
+ assert.NoError(t, err)
+
+ // then
+ assert.Equal(t, expectedLen(seriesLimit, n), len(gotGroupEventsSlice))
+}
+
+func genNormalPipelineGroupEventsSlice(n int) []*models.PipelineGroupEvents {
+ return genPipelineGroupEventsSlice(n, genPipelineEvent)
+}
+
+func genPipelineGroupEventsSliceIncludingAbnormalData(n int) []*models.PipelineGroupEvents {
+ return genPipelineGroupEventsSlice(n, genPipelineEventIncludingAbnormalData)
+}
+
+func genPipelineGroupEventsSliceSingleTag(n int) []*models.PipelineGroupEvents {
+ return genPipelineGroupEventsSlice(n, genPipelineEventSingleTag)
+}
+
+func genPipelineGroupEventsSlice(n int, genPipelineEventFn func(int) []models.PipelineEvent) []*models.PipelineGroupEvents {
+ res := make([]*models.PipelineGroupEvents, 0, n)
+ for i := 1; i <= n; i++ {
+ res = append(res, &models.PipelineGroupEvents{
+ Group: models.NewGroup(models.NewMetadata(), models.NewTags()),
+ Events: genPipelineEventFn(i),
+ })
+ }
+
+ return res
+}
+
+func genPipelineEvent(n int) []models.PipelineEvent {
+ res := make([]models.PipelineEvent, 0, n)
+ for i := 1; i <= n; i++ {
+ res = append(res, genMetric(i))
+ }
+
+ return res
+}
+
+func genMetric(n int) *models.Metric {
+ i := strconv.Itoa(n)
+ tags := models.NewKeyValues[string]()
+ tags.AddAll(map[string]string{
+ // range map will out of order
+ "a" + i: "A" + i,
+ "b" + i: "B" + i,
+ "c" + i: "C" + i,
+ "d" + i: "D" + i,
+ })
+
+ return &models.Metric{
+ Timestamp: 11111111 * uint64(n),
+ Tags: tags,
+ Value: &models.MetricSingleValue{Value: 1.1 * float64(n)},
+ }
+}
+
+func genPipelineEventIncludingAbnormalData(n int) []models.PipelineEvent {
+ res := make([]models.PipelineEvent, 0, n)
+ for i := 1; i <= n; i++ {
+ if i&1 == 0 { // i is even number
+ // normal data
+ res = append(res, genMetric(i))
+ continue
+ }
+
+ // i is odd number
+ // abnormal data
+ if i%3 == 0 {
+ // abnormal data: nil data
+ res = append(res, nil)
+ continue
+ }
+
+ if i%5 == 0 {
+ // abnormal data: zero data
+ // PS:
+ // 1. 这里只是从边界情况考虑,构造了这种异常值
+ // 但实际场景中,不会直接 new(models.Metric) 或者 &models.Metric{} 这样创建 zero data,
+ // 一般都是用 models.NewMetric|NewSingleValueMetric|NewMultiValuesMetric 等 构造函数(工厂模式)来创建,
+ // 上述构造函数位置:ilogtail/pkg/models/factory.go
+ // 2. 此外,也可以给 *models.Metric 的 GetTag 方法增加下 *models.Metric.Tag 为 nil 时的保护
+ // (参考其 GetValue 方法的实现),文件位置:ilogtail/pkg/models/metric.go
+ res = append(res, new(models.Metric))
+ continue
+ }
+
+ // abnormal data: other event type not models.EventTypeMetric
+ res = append(res, new(models.Log))
+ }
+
+ return res
+}
+
+func genPipelineEventSingleTag(n int) []models.PipelineEvent {
+ res := make([]models.PipelineEvent, 0, n)
+ for i := 1; i <= n; i++ {
+ res = append(res, genMetricSingleTag(i))
+ }
+
+ return res
+}
+
+func genMetricSingleTag(n int) *models.Metric {
+ metricName := "test_metric"
+ i := strconv.Itoa(n)
+ tags := models.NewTagsWithMap(map[string]string{
+ // only single tag
+ // keep range in order
+ "x" + i: "X" + i,
+ })
+
+ dataPoint := pb.Sample{Timestamp: 11111111 * int64(n), Value: 1.1 * float64(n)}
+
+ return models.NewSingleValueMetric(
+ metricName, // value of key "__name__" in prometheus
+ models.MetricTypeGauge,
+ tags,
+ model.Time(dataPoint.Timestamp).Time().UnixNano(),
+ dataPoint.Value,
+ )
+}
+
+func DecodeBatchV2(data [][]byte) ([]*models.PipelineGroupEvents, error) {
+ if len(data) == 0 {
+ return nil, errors.New("no data to decode")
+ }
+
+ var res []*models.PipelineGroupEvents
+
+ meta, commonTags := models.NewMetadata(), models.NewTags()
+ for _, d := range data {
+ groupEvents, err := convertPromRequestToPipelineGroupEvents(d, meta, commonTags)
+ if err != nil {
+ continue
+ }
+
+ res = append(res, groupEvents)
+ }
+
+ return res, nil
+}
+
+func convertPromRequestToPipelineGroupEvents(data []byte, metaInfo models.Metadata, commonTags models.Tags) (*models.PipelineGroupEvents, error) {
+ wr, err := unmarshalBatchTimeseriesData(data)
+ if err != nil {
+ return nil, err
+ }
+
+ groupEvent := &models.PipelineGroupEvents{
+ Group: models.NewGroup(metaInfo, commonTags),
+ }
+
+ for _, ts := range wr.Timeseries {
+ var metricName string
+ tags := models.NewTags()
+ for _, label := range ts.Labels {
+ if label.Name == metricNameKey {
+ metricName = label.Value
+ continue
+ }
+ tags.Add(label.Name, label.Value)
+ }
+
+ for _, dataPoint := range ts.Samples {
+ metric := models.NewSingleValueMetric(
+ metricName,
+ models.MetricTypeGauge,
+ tags,
+ // Decode (during input_prometheus stage) makes timestamp
+ // with unix milliseconds into unix nanoseconds,
+ // e.g. "model.Time(milliseconds).Time().UnixNano()".
+ model.Time(dataPoint.Timestamp).Time().UnixNano(),
+ dataPoint.Value,
+ )
+ groupEvent.Events = append(groupEvent.Events, metric)
+ }
+ }
+
+ return groupEvent, nil
+}
+
+func unmarshalBatchTimeseriesData(data []byte) (*pb.WriteRequest, error) {
+ wr := new(prompb.WriteRequest)
+ if err := wr.Unmarshal(data); err != nil {
+ return nil, err
+ }
+
+ return convertPrompbToVictoriaMetricspb(wr)
+}
+
+func convertPrompbToVictoriaMetricspb(wr *prompb.WriteRequest) (*pb.WriteRequest, error) {
+ if wr == nil || len(wr.Timeseries) == 0 {
+ return nil, errors.New("nil *prompb.WriteRequest")
+ }
+
+ res := &pb.WriteRequest{
+ Timeseries: make([]pb.TimeSeries, 0, len(wr.Timeseries)),
+ }
+ for _, tss := range wr.Timeseries {
+ res.Timeseries = append(res.Timeseries, pb.TimeSeries{
+ Labels: convertToVMLabels(tss.Labels),
+ Samples: convertToVMSamples(tss.Samples),
+ })
+ }
+
+ return res, nil
+}
+
+func convertToVMLabels(labels []prompb.Label) []pb.Label {
+ if len(labels) == 0 {
+ return nil
+ }
+
+ res := make([]pb.Label, 0, len(labels))
+ for _, label := range labels {
+ res = append(res, pb.Label{
+ Name: string(label.Name),
+ Value: string(label.Value),
+ })
+ }
+
+ return res
+}
+
+func convertToVMSamples(samples []prompb.Sample) []pb.Sample {
+ if len(samples) == 0 {
+ return nil
+ }
+
+ res := make([]pb.Sample, 0, len(samples))
+ for _, sample := range samples {
+ res = append(res, pb.Sample{
+ Value: sample.Value,
+ Timestamp: sample.Timestamp,
+ })
+ }
+
+ return res
+}
diff --git a/pkg/protocol/encoder/prometheus/utils.go b/pkg/protocol/encoder/prometheus/utils.go
new file mode 100644
index 0000000000..622b869e63
--- /dev/null
+++ b/pkg/protocol/encoder/prometheus/utils.go
@@ -0,0 +1,118 @@
+// Copyright 2024 iLogtail Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package prometheus
+
+import (
+ "context"
+ "sort"
+ "sync"
+
+ pb "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
+
+ "github.com/alibaba/ilogtail/pkg/logger"
+ "github.com/alibaba/ilogtail/pkg/models"
+)
+
+const metricNameKey = "__name__"
+
+func marshalBatchTimeseriesData(wr *pb.WriteRequest) []byte {
+ if len(wr.Timeseries) == 0 {
+ return nil
+ }
+
+ data, err := wr.Marshal()
+ if err != nil {
+ // logger.Error(context.Background(), alarmType, "pb marshal err", err)
+ return nil
+ }
+
+ return data
+}
+
+func genPromRemoteWriteTimeseries(event *models.Metric) pb.TimeSeries {
+ return pb.TimeSeries{
+ Labels: lexicographicalSort(append(convTagsToLabels(event.GetTags()), pb.Label{Name: metricNameKey, Value: event.GetName()})),
+ Samples: []pb.Sample{{
+ Value: event.GetValue().GetSingleValue(),
+
+ // Decode (during input_prometheus stage) makes timestamp
+ // with unix milliseconds into unix nanoseconds,
+ // e.g. "model.Time(milliseconds).Time().UnixNano()".
+ //
+ // Encode (during flusher_prometheus stage) conversely makes timestamp
+ // with unix nanoseconds into unix milliseconds,
+ // e.g. "int64(nanoseconds)/10^6".
+ Timestamp: int64(event.GetTimestamp()) / 1e6,
+ }},
+ }
+}
+
+func convTagsToLabels(tags models.Tags) []pb.Label {
+ if tags == nil {
+ logger.Debugf(context.Background(), "get nil models.Tags")
+ return nil
+ }
+
+ labels := make([]pb.Label, 0, tags.Len())
+ for k, v := range tags.Iterator() {
+ // MUST NOT contain any empty label names or values.
+ // reference: https://prometheus.io/docs/specs/remote_write_spec/#labels
+ if k != "" && v != "" {
+ labels = append(labels, pb.Label{Name: k, Value: v})
+ }
+ }
+
+ return labels
+}
+
+// MUST have label names sorted in lexicographical order.
+// reference: https://prometheus.io/docs/specs/remote_write_spec/#labels
+func lexicographicalSort(labels []pb.Label) []pb.Label {
+ sort.Sort(promLabels(labels))
+
+ return labels
+}
+
+type promLabels []pb.Label
+
+func (p promLabels) Len() int {
+ return len(p)
+}
+
+func (p promLabels) Less(i, j int) bool {
+ return p[i].Name < p[j].Name
+}
+
+func (p promLabels) Swap(i, j int) {
+ p[i], p[j] = p[j], p[i]
+}
+
+var wrPool sync.Pool
+
+func getWriteRequest(seriesLimit int) *pb.WriteRequest {
+ wr := wrPool.Get()
+ if wr == nil {
+ return &pb.WriteRequest{
+ Timeseries: make([]pb.TimeSeries, 0, seriesLimit),
+ }
+ }
+
+ return wr.(*pb.WriteRequest)
+}
+
+func putWriteRequest(wr *pb.WriteRequest) {
+ wr.Timeseries = wr.Timeseries[:0]
+ wrPool.Put(wr)
+}
diff --git a/pkg/protocol/encoder/prometheus/utils_test.go b/pkg/protocol/encoder/prometheus/utils_test.go
new file mode 100644
index 0000000000..c84e4ab68f
--- /dev/null
+++ b/pkg/protocol/encoder/prometheus/utils_test.go
@@ -0,0 +1,96 @@
+// Copyright 2024 iLogtail Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package prometheus
+
+import (
+ "sort"
+ "testing"
+
+ pb "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
+ "github.com/stretchr/testify/assert"
+)
+
+// Scenario: lexicographical sorting of Prometheus label names.
+// Factor: Prometheus label names given in arbitrary (unsorted) order.
+// Expectation: the label names come back sorted in lexicographical order.
+func TestLexicographicalSort_ShouldSortedInLexicographicalOrder(t *testing.T) {
+	// given
+	labels := []pb.Label{
+		{Name: "Tutorial", Value: "tutorial"},
+		{Name: "Point", Value: "point"},
+		{Name: "Java", Value: "java"},
+		{Name: "C++", Value: "c++"},
+		{Name: "Golang", Value: "golang"},
+		{Name: metricNameKey, Value: "test_metric_name"},
+	}
+	ans := []pb.Label{
+		{Name: "C++", Value: "c++"},
+		{Name: "Golang", Value: "golang"},
+		{Name: "Java", Value: "java"},
+		{Name: "Point", Value: "point"},
+		{Name: "Tutorial", Value: "tutorial"},
+		{Name: metricNameKey, Value: "test_metric_name"},
+	}
+	assert.Equal(t, len(ans), len(labels))
+
+	// when
+	got := lexicographicalSort(labels)
+
+	// then
+	assert.Equal(t, ans, got)
+}
+
+// Scenario: performance test establishing the cost of lexicographicalSort.
+// Factor: sorting Prometheus label names with lexicographicalSort (backed by sort.Sort()).
+// Expectation: lexicographicalSort performs on par with sort.Strings for
+// Prometheus label names (same order of magnitude).
+// goos: darwin
+// goarch: arm64
+// pkg: github.com/alibaba/ilogtail/pkg/protocol/encoder/prometheus
+// BenchmarkLexicographicalSort
+// BenchmarkLexicographicalSort/lexicographicalSort
+// BenchmarkLexicographicalSort/lexicographicalSort-12         	23059904	        47.51 ns/op
+// BenchmarkLexicographicalSort/sort.Strings
+// BenchmarkLexicographicalSort/sort.Strings-12                	25321753	        47.30 ns/op
+// PASS
+func BenchmarkLexicographicalSort(b *testing.B) {
+	prometheusLabels := []pb.Label{
+		{Name: "Tutorial", Value: "tutorial"},
+		{Name: "Point", Value: "point"},
+		{Name: "Java", Value: "java"},
+		{Name: "C++", Value: "c++"},
+		{Name: "Golang", Value: "golang"},
+		{Name: metricNameKey, Value: "test_metric_name"},
+	}
+	stringLabels := []string{
+		"Tutorial",
+		"Point",
+		"Java",
+		"C++",
+		"Golang",
+		metricNameKey,
+	}
+
+	b.Run("lexicographicalSort", func(b *testing.B) {
+		for i := 0; i < b.N; i++ {
+			lexicographicalSort(prometheusLabels)
+		}
+	})
+
+	b.Run("sort.Strings", func(b *testing.B) {
+		for i := 0; i < b.N; i++ {
+			sort.Strings(stringLabels)
+		}
+	})
+}
diff --git a/plugins.yml b/plugins.yml
index 167c7e1482..f09d74d16d 100644
--- a/plugins.yml
+++ b/plugins.yml
@@ -25,6 +25,7 @@ plugins:
- import: "github.com/alibaba/ilogtail/plugins/aggregator/skywalking"
- import: "github.com/alibaba/ilogtail/plugins/extension/basicauth"
- import: "github.com/alibaba/ilogtail/plugins/extension/default_decoder"
+ - import: "github.com/alibaba/ilogtail/plugins/extension/default_encoder"
- import: "github.com/alibaba/ilogtail/plugins/extension/group_info_filter"
- import: "github.com/alibaba/ilogtail/plugins/extension/request_breaker"
- import: "github.com/alibaba/ilogtail/plugins/flusher/checker"
diff --git a/plugins/extension/default_encoder/default_encoder.go b/plugins/extension/default_encoder/default_encoder.go
new file mode 100644
index 0000000000..646d4b2f16
--- /dev/null
+++ b/plugins/extension/default_encoder/default_encoder.go
@@ -0,0 +1,83 @@
+// Copyright 2024 iLogtail Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package defaultencoder
+
+import (
+ "encoding/json"
+ "errors"
+
+ "github.com/alibaba/ilogtail/pkg/logger"
+ "github.com/alibaba/ilogtail/pkg/pipeline"
+ "github.com/alibaba/ilogtail/pkg/pipeline/extensions"
+ "github.com/alibaba/ilogtail/pkg/protocol/encoder"
+)
+
+// ensure ExtensionDefaultEncoder implements the extensions.EncoderExtension interface
+var _ extensions.EncoderExtension = (*ExtensionDefaultEncoder)(nil)
+
+// ExtensionDefaultEncoder is an extension plugin that wraps a concrete
+// extensions.Encoder selected by Format; the wrapped encoder is created in
+// Init from the remaining configuration properties.
+type ExtensionDefaultEncoder struct {
+	extensions.Encoder // concrete encoder, populated in Init
+
+	Format  string         // encoding protocol name, e.g. "prometheus"
+	options map[string]any // additional properties map to here
+}
+
+// NewExtensionDefaultEncoder returns an unconfigured encoder extension;
+// configuration is applied later via UnmarshalJSON and Init.
+func NewExtensionDefaultEncoder() *ExtensionDefaultEncoder {
+	return &ExtensionDefaultEncoder{}
+}
+
+// UnmarshalJSON implements json.Unmarshaler. It captures the whole config
+// object into e.options, then extracts the mandatory "Format" field into
+// e.Format, removing it from options so that only encoder-specific
+// properties remain to be forwarded to the concrete encoder in Init.
+// Returns an error when "Format" is absent or not a string.
+func (e *ExtensionDefaultEncoder) UnmarshalJSON(bytes []byte) error {
+	err := json.Unmarshal(bytes, &e.options)
+	if err != nil {
+		return err
+	}
+
+	format, ok := e.options["Format"].(string)
+	if !ok {
+		return errors.New("field Format should be type of string")
+	}
+
+	delete(e.options, "Format")
+	e.Format = format
+
+	return nil
+}
+
+// Description returns a short human-readable summary of the plugin.
+func (e *ExtensionDefaultEncoder) Description() string {
+	return "default encoder that support builtin formats"
+}
+
+// Init instantiates the concrete encoder registered for e.Format, passing
+// the remaining config properties through, and stores it in the embedded
+// Encoder field. Returns the factory's error when the format is unsupported
+// or its options are invalid.
+func (e *ExtensionDefaultEncoder) Init(context pipeline.Context) error {
+	enc, err := encoder.NewEncoder(e.Format, e.options)
+	if err != nil {
+		return err
+	}
+
+	e.Encoder = enc
+
+	logger.Infof(context.GetRuntimeContext(), "%s init success, encoder: %s", e.Description(), e.Format)
+
+	return nil
+}
+
+// Stop is a no-op: this extension holds no resources of its own to release.
+func (e *ExtensionDefaultEncoder) Stop() error {
+	return nil
+}
+
+// init registers this extension with the pipeline under the plugin name
+// "ext_default_encoder".
+func init() {
+	pipeline.AddExtensionCreator("ext_default_encoder", func() pipeline.Extension {
+		return NewExtensionDefaultEncoder()
+	})
+}
diff --git a/plugins/extension/default_encoder/default_encoder_test.go b/plugins/extension/default_encoder/default_encoder_test.go
new file mode 100644
index 0000000000..0e7a2d1dc6
--- /dev/null
+++ b/plugins/extension/default_encoder/default_encoder_test.go
@@ -0,0 +1,141 @@
+// Copyright 2024 iLogtail Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package defaultencoder
+
+import (
+ "encoding/json"
+ "fmt"
+ "testing"
+
+ . "github.com/smartystreets/goconvey/convey"
+
+ "github.com/alibaba/ilogtail/pkg/protocol/encoder/prometheus"
+ "github.com/alibaba/ilogtail/plugins/test/mock"
+)
+
+// Scenario: plugin initialization.
+// Factor: the Format field is present.
+// Factor: the Prometheus protocol.
+// Expectation: init succeeds and the Encoder is the Prometheus Encoder.
+func TestEncoder_ShouldPassConfigToRealEncoder_GivenCorrectConfigInput(t *testing.T) {
+	Convey("Given correct config json string", t, func() {
+		e := NewExtensionDefaultEncoder()
+		So(e, ShouldNotBeNil)
+		So(e.Encoder, ShouldBeNil)
+		So(e.Format, ShouldBeEmpty)
+		So(e.options, ShouldBeNil)
+
+		encodeProtocol := "prometheus"
+		optField, optValue := "SeriesLimit", 1024
+		configJSONStr := fmt.Sprintf(`{"Format":"%s","%s":%d}`, encodeProtocol, optField, optValue)
+		// must using float64(optValue), not optValue
+		// https://github.com/smartystreets/goconvey/issues/437
+		wantOpts := map[string]any{optField: float64(optValue)}
+
+		Convey("Then should json unmarshal success", func() {
+			err := json.Unmarshal([]byte(configJSONStr), e)
+			So(err, ShouldBeNil)
+			So(e.Encoder, ShouldBeNil)
+			So(e.options, ShouldResemble, wantOpts)
+			So(e.Format, ShouldEqual, encodeProtocol)
+
+			Convey("Then should init success", func() {
+				err = e.Init(mock.NewEmptyContext("p", "l", "c"))
+				So(err, ShouldBeNil)
+				So(e.Encoder, ShouldNotBeNil)
+
+				Convey("Then encoder implement should be *prometheus.Encoder", func() {
+					promEncoder, ok := e.Encoder.(*prometheus.Encoder)
+					So(ok, ShouldBeTrue)
+					So(promEncoder, ShouldNotBeNil)
+					So(promEncoder.SeriesLimit, ShouldEqual, optValue)
+				})
+			})
+
+			Convey("Then should stop success", func() {
+				err := e.Stop()
+				So(err, ShouldBeNil)
+			})
+		})
+	})
+}
+
+// Scenario: plugin initialization.
+// Factor: the Format field is present.
+// Factor: an unsupported protocol.
+// Expectation: init fails.
+func TestEncoder_ShouldNotPassConfigToRealEncoder_GivenIncorrectConfigInput(t *testing.T) {
+	Convey("Given incorrect config json string but with Format field", t, func() {
+		e := NewExtensionDefaultEncoder()
+		So(e, ShouldNotBeNil)
+		So(e.Encoder, ShouldBeNil)
+		So(e.Format, ShouldBeEmpty)
+		So(e.options, ShouldBeNil)
+
+		encodeProtocol := "unknown"
+		configJSONStr := fmt.Sprintf(`{"Format":"%s"}`, encodeProtocol)
+
+		Convey("Then should json unmarshal success", func() {
+			err := json.Unmarshal([]byte(configJSONStr), e)
+			So(err, ShouldBeNil)
+			So(e.Encoder, ShouldBeNil)
+			So(e.Format, ShouldEqual, encodeProtocol)
+
+			Convey("Then should init failed", func() {
+				err = e.Init(mock.NewEmptyContext("p", "l", "c"))
+				So(err, ShouldNotBeNil)
+				So(e.Encoder, ShouldBeNil)
+			})
+
+			Convey("Then should stop success", func() {
+				err := e.Stop()
+				So(err, ShouldBeNil)
+			})
+		})
+	})
+}
+
+// Scenario: plugin initialization.
+// Factor: the Format field is missing.
+// Expectation: json unmarshal fails, and init fails.
+func TestEncoder_ShouldUnmarshalFailed_GivenConfigWithoutFormat(t *testing.T) {
+	Convey("Given incorrect config json string and without Format field", t, func() {
+		e := NewExtensionDefaultEncoder()
+		So(e, ShouldNotBeNil)
+		So(e.Encoder, ShouldBeNil)
+		So(e.Format, ShouldBeEmpty)
+		So(e.options, ShouldBeNil)
+
+		configJSONStr := `{"Unknown":"unknown"}`
+
+		Convey("Then should json unmarshal failed", func() {
+			err := json.Unmarshal([]byte(configJSONStr), e)
+			So(err, ShouldNotBeNil)
+			So(e.Encoder, ShouldBeNil)
+			So(e.Format, ShouldBeEmpty)
+
+			Convey("Then should init failed", func() {
+				err = e.Init(mock.NewEmptyContext("p", "l", "c"))
+				So(err, ShouldNotBeNil)
+				So(e.Encoder, ShouldBeNil)
+			})
+
+			Convey("Then should stop success", func() {
+				err := e.Stop()
+				So(err, ShouldBeNil)
+			})
+		})
+	})
+}