Skip to content

Commit

Permalink
Add support for shared key
Browse files Browse the repository at this point in the history
  • Loading branch information
r0mdau committed Nov 20, 2023
1 parent 40cde1a commit 158c9a1
Show file tree
Hide file tree
Showing 9 changed files with 50 additions and 41 deletions.
32 changes: 13 additions & 19 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,21 @@ Forward is the protocol used by Fluentd to route message between peers.
- Protocol specification: [Forward protocol specification v1](https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1)
- Library used [IBM/fluent-forward-go](https://github.com/IBM/fluent-forward-go) (MIT License)

Do not use in production.

TODO:

- TLS support
- Shared key support
- Some exporter unit tests

## Getting Started

### Settings

| Property | Mandatory | Default value | Type | Description |
|---|---|---|---|---|
| endpoint | yes | | string | Target URL to send `Forward` log streams to |
| connection_timeout | no | 30s | time.Duration | Maximum amount of time a dial will wait for a connect to complete |
| tls.enabled | no | false | bool | Enable TLS for privacy and data integrity |
| tls.insecure_skip_verify | no | false | bool | Controls whether the exporter verifies the server's certificate chain and host name. If **true**, any certificate is accepted and any host name. This mode is susceptible to man-in-the-middle attacks |
| require_ack| no | false | bool | Protocol delivery acknowledgment for log streams : true = at-least-once, false = at-most-once |
| tag | no | "tag" | string | Fluentd tag is a string separated by '.'s (e.g. myapp.access), and is used as the directions for Fluentd's internal routing engine |
| compress_gzip | no | false | bool | Transparent data compression. You can use this feature to reduce the transferred payload size |
| default_labels_enabled | no | true | map[string]bool | If omitted then default labels will be added. If one of the labels is omitted then this label will be added |
| Property | Default value | Type | Description |
|---|---|---|---|
| endpoint | | string | **MANDATORY** Target URL to send `Forward` log streams to |
| connection_timeout | 30s | time.Duration | Maximum amount of time a dial will wait for a connect to complete |
| tls.enabled | false | bool | Enable TLS for privacy and data integrity |
| tls.insecure_skip_verify | false | bool | Controls whether the exporter verifies the server's certificate chain and host name. If **true**, any certificate is accepted and any host name. This mode is susceptible to man-in-the-middle attacks |
| shared_key | "" | string | A key string known by the server, used for authorization |
| require_ack| false | bool | Protocol delivery acknowledgment for log streams : true = at-least-once, false = at-most-once |
| tag | "tag" | string | Fluentd tag is a string separated by '.'s (e.g. myapp.access), and is used as the directions for Fluentd's internal routing engine |
| compress_gzip | false | bool | Transparent data compression. You can use this feature to reduce the transferred payload size |
| default_labels_enabled | true | map[string]bool | If omitted then default labels will be added. If one of the labels is omitted then this label will be added |

See the default values in the method `createDefaultConfig()` in [factory.go](factory.go) file.

Expand All @@ -58,7 +51,7 @@ exporters:
instance: false
```
Example with TLS enabled:
Example with TLS enabled and shared key:
```yaml
exporters:
Expand All @@ -67,6 +60,7 @@ exporters:
connection_timeout: 10s
tls:
enabled: true
shared_key: otelcol-dev
```
## Severity
Expand Down
8 changes: 3 additions & 5 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/exporter/exporterhelper"
)

Expand All @@ -19,10 +18,6 @@ import (
type TLSClientSetting struct {
// Enabled defines if TLS is enabled or not.
Enabled bool `mapstructure:"enabled"`
// squash ensures fields are correctly decoded in embedded struct.
configtls.TLSSetting `mapstructure:",squash"`

// These are config options specific to client connections.

// InsecureSkipVerify will enable TLS but not verify the certificate.
InsecureSkipVerify bool `mapstructure:"insecure_skip_verify"`
Expand All @@ -37,6 +32,9 @@ type TCPClientSettings struct {

// TLSSetting struct exposes TLS client configuration.
TLSSetting TLSClientSetting `mapstructure:"tls"`

// SharedKey is used for authorization with the server that knows it.
SharedKey string `mapstructure:"shared_key"`
}

// Config defines configuration for fluentforward exporter.
Expand Down
1 change: 1 addition & 0 deletions config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func TestLoadConfigNewExporter(t *testing.T) {
Enabled: true,
InsecureSkipVerify: true,
},
SharedKey: "otelcol-dev",
},
RequireAck: true,
Tag: "nginx",
Expand Down
38 changes: 30 additions & 8 deletions exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ func newExporter(config *Config, settings component.TelemetrySettings) *fluentfo
}

func (f *fluentforwardExporter) start(_ context.Context, host component.Host) error {
connOptions := fclient.ConnectionOptions{
RequireAck: f.config.RequireAck,
}

connFactory := &fclient.ConnFactory{
Address: f.config.Endpoint,
Timeout: f.config.ConnectionTimeout,
Expand All @@ -42,17 +46,17 @@ func (f *fluentforwardExporter) start(_ context.Context, host component.Host) er
InsecureSkipVerify: f.config.TLSSetting.InsecureSkipVerify,
}
}
connOptions.Factory = connFactory

client := fclient.New(fclient.ConnectionOptions{
Factory: connFactory,
RequireAck: f.config.RequireAck,
})

if err := client.Connect(); err != nil {
f.settings.Logger.Error(fmt.Sprintf("The fluentforward exporter failed to connect to its endpoint %s when starting", f.config.Endpoint))
if f.config.SharedKey != "" {
connOptions.AuthInfo = fclient.AuthInfo{
SharedKey: []byte(f.config.SharedKey),
}
}

client := fclient.New(connOptions)
f.client = client
f.connectForward()

return nil
}
Expand All @@ -62,6 +66,21 @@ func (f *fluentforwardExporter) stop(context.Context) (err error) {
return f.client.Disconnect()
}

// connectForward connects to the Fluent Forward endpoint and keep running otel even if the connection is failing
func (f *fluentforwardExporter) connectForward() {
if err := f.client.Connect(); err != nil {
f.settings.Logger.Error(fmt.Sprintf("Failed to connect to the endpoint %s", f.config.Endpoint))
}
f.settings.Logger.Info(fmt.Sprintf("Successfull connection to the endpoint %s", f.config.Endpoint))

if f.config.SharedKey != "" {
if err := f.client.Handshake(); err != nil {
f.settings.Logger.Error(fmt.Sprintf("Failed to shared key handshake with the endpoint %s", f.config.Endpoint))
}
f.settings.Logger.Info("Successfull shared key handshake with the endpoint")
}
}

func (f *fluentforwardExporter) pushLogData(ctx context.Context, ld plog.Logs) error {
// move for loops into a translator
entries := []fproto.EntryExt{}
Expand Down Expand Up @@ -108,10 +127,13 @@ type sendFunc func(string, protocol.EntryList) error

func (f *fluentforwardExporter) send(sendMethod sendFunc, entries []fproto.EntryExt) error {
err := sendMethod(f.config.Tag, entries)
// sometimes the connection is lost, we try to reconnect and send the data again
if err != nil {
if errr := f.client.Reconnect(); errr != nil {
if errr := f.client.Disconnect(); errr != nil {
return errr
}
f.settings.Logger.Warn(fmt.Sprintf("Failed to send data to the endpoint %s, trying to reconnect", f.config.Endpoint))
f.connectForward()
err = sendMethod(f.config.Tag, entries)
if err != nil {
return err
Expand Down
1 change: 1 addition & 0 deletions factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func createDefaultConfig() component.Config {
Enabled: false,
InsecureSkipVerify: false,
},
SharedKey: "",
},
RequireAck: false,
Tag: "tag",
Expand Down
1 change: 1 addition & 0 deletions factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func TestNewExporterFullConfig(t *testing.T) {
Enabled: true,
InsecureSkipVerify: false,
},
SharedKey: "otelcol-dev",
},
RequireAck: true,
Tag: "tag",
Expand Down
3 changes: 0 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ require (
github.com/cenkalti/backoff/v4 v4.2.1
github.com/stretchr/testify v1.8.4
go.opentelemetry.io/collector/component v0.89.0
go.opentelemetry.io/collector/config/configtls v0.89.0
go.opentelemetry.io/collector/confmap v0.89.0
go.opentelemetry.io/collector/exporter v0.89.0
go.opentelemetry.io/collector/pdata v1.0.0-rcv0018
Expand All @@ -16,7 +15,6 @@ require (

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/uuid v1.3.1 // indirect
Expand All @@ -36,7 +34,6 @@ require (
github.com/tinylib/msgp v1.1.6 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/collector v0.89.0 // indirect
go.opentelemetry.io/collector/config/configopaque v0.89.0 // indirect
go.opentelemetry.io/collector/config/configtelemetry v0.89.0 // indirect
go.opentelemetry.io/collector/consumer v0.89.0 // indirect
go.opentelemetry.io/collector/extension v0.89.0 // indirect
Expand Down
6 changes: 0 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymF
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA=
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
github.com/go-kit/log v0.2.1 h1:MRVx0/zhvdseW+Gza6N9rVzU/IVzaeE1SFI4raAhmBU=
github.com/go-logfmt/logfmt v0.5.1 h1:otpy5pqBCBZ1ng9RQ0dPu4PN7ba75Y/aA+UpowDyNVA=
github.com/go-logr/logr v1.3.0 h1:2y3SDp0ZXuc6/cjLSZ+Q3ir+QB9T/iG5yYRXqsagWSY=
Expand Down Expand Up @@ -113,12 +111,8 @@ go.opentelemetry.io/collector v0.89.0 h1:lzpfD9NTHh+1M+qzcoYUH+i2rOgFSox3bGQFUI5
go.opentelemetry.io/collector v0.89.0/go.mod h1:UZUtmQ3kai0CLPWvPmHKpmwqqEoo50n1bwzYYhXX0eA=
go.opentelemetry.io/collector/component v0.89.0 h1:PoQJX86BpaSZhzx0deQXHh3QMuW6XKVmolSdTKE506c=
go.opentelemetry.io/collector/component v0.89.0/go.mod h1:ZZncnMVaNs++JIbAMiemUIWLZrZ3PMEzI3S3K8pnkws=
go.opentelemetry.io/collector/config/configopaque v0.89.0 h1:Ad6yGcGBHs+J9SNjkedY68JsLZ1vBn4kKzdqKuTCRsE=
go.opentelemetry.io/collector/config/configopaque v0.89.0/go.mod h1:TPCHaU+QXiEV+JXbgyr6mSErTI9chwQyasDVMdJr3eY=
go.opentelemetry.io/collector/config/configtelemetry v0.89.0 h1:NtRknYDfMgP1r8mnByo6qQQK8IBw/lF9Qke5f7VhGZ0=
go.opentelemetry.io/collector/config/configtelemetry v0.89.0/go.mod h1:+LAXM5WFMW/UbTlAuSs6L/W72WC+q8TBJt/6z39FPOU=
go.opentelemetry.io/collector/config/configtls v0.89.0 h1:XDeUaTU7LYwnEXz/CSdjbCStJa7n0YR1q0QpK0Vtw9w=
go.opentelemetry.io/collector/config/configtls v0.89.0/go.mod h1:NlE4elqXoyFfzQvYfzgH6uOU1zNVa+5tt6EIq52TJ9Y=
go.opentelemetry.io/collector/confmap v0.89.0 h1:N5Vg1+FXEFBHHlGIPg4OSlM9uTHjCI7RlWWrKjtOzWQ=
go.opentelemetry.io/collector/confmap v0.89.0/go.mod h1:D8FMPvuihtVxwXaz/qp5q9X2lq9l97QyjfsdZD1spmc=
go.opentelemetry.io/collector/consumer v0.89.0 h1:MteKhkudX2L1ylbtdpSazO8SwyHSxl6fUEElc0rRLDQ=
Expand Down
1 change: 1 addition & 0 deletions testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ fluentforward/allsettings:
tls:
enabled: true
insecure_skip_verify: true
shared_key: "otelcol-dev"
require_ack: true
tag: nginx
compress_gzip: true
Expand Down

0 comments on commit 158c9a1

Please sign in to comment.