Skip to content

Commit

Permalink
chore: cr
Browse files Browse the repository at this point in the history
Signed-off-by: Bence Csati <[email protected]>
  • Loading branch information
csatib02 committed Dec 5, 2024
1 parent 9d6e895 commit 493c685
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 35 deletions.
5 changes: 2 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ Forward is the protocol used by Fluentd to route message between peers.

| Property | Default value | Type | Description |
|---|---|---|---|
| endpoint | | string | **MANDATORY** Target URL to send `Forward` log streams to |
| endpoint.tcp_addr | | string | **MANDATORY** Target URL to send `Forward` log streams to |
| endpoint.validate_tcp_resolution | false | bool | Controls whether to validate the tcp address. |
| connection_timeout | 30s | time.Duration | Maximum amount of time a dial will wait for a connect to complete |
| tls.insecure | true | bool | If set to **true**, the connexion is not secured with TLS. |
| tls.insecure_skip_verify | false | bool | Controls whether the exporter verifies the server's certificate chain and host name. If **true**, any certificate is accepted and any host name. This mode is susceptible to man-in-the-middle attacks |
Expand All @@ -31,13 +32,11 @@ Forward is the protocol used by Fluentd to route message between peers.
| tls.key_file | "" | string | Used for mTLS. Path to the client TLS key to use |
| shared_key | "" | string | A key string known by the server, used for authorization |
| require_ack| false | bool | Protocol delivery acknowledgment for log streams : true = at-least-once, false = at-most-once |
| skip_fail_on_invalid_tcp_endpoint | false | bool | Controls whether to fail if the endpoint is invalid. This is useful for cases where the collector is started before the endpoint becomes available |
| tag | "tag" | string | Fluentd tag is a string separated by '.'s (e.g. myapp.access), and is used as the directions for Fluentd's internal routing engine |
| compress_gzip | false | bool | Transparent data compression. You can use this feature to reduce the transferred payload size |
| default_labels_enabled | true | map[string]bool | If omitted then default labels will be added. If one of the labels is omitted then this label will be added |
| kubernetes_metadata.key | | string | KubernetesMetadata includes kubernetes metadata as a nested object. It leverages resources attributes provided by k8sattributesprocessor |
| kubernetes_metadata.include_pod_labels | | bool | Whether pod labels should be added to the nested object |
| connection_timeout | 30s | time.Duration | Maximum amount of time a dial will wait for a connect to complete |

See the default values in the method `createDefaultConfig()` in [factory.go](factory.go) file.

Expand Down
23 changes: 13 additions & 10 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ import (

// TCPClientSettings defines common settings for a TCP client.
type TCPClientSettings struct {
// The target endpoint URI to send data to (e.g.: some.url:24224).
Endpoint string `mapstructure:"endpoint"`
// Endpoint to send logs to.
Endpoint `mapstructure:"endpoint"`

// Connection Timeout parameter configures `net.Dialer`.
ConnectionTimeout time.Duration `mapstructure:"connection_timeout"`
Expand All @@ -33,10 +33,6 @@ type TCPClientSettings struct {
type Config struct {
TCPClientSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.

// SkipFailOnInvalidTCPEndpoint controls whether to fail if the endpoint is invalid.
// This is useful for cases where the collector is started before the endpoint becomes available.
SkipFailOnInvalidTCPEndpoint bool `mapstructure:"skip_fail_on_invalid_tcp_endpoint"`

// RequireAck enables the acknowledgement feature.
RequireAck bool `mapstructure:"require_ack"`

Expand Down Expand Up @@ -76,6 +72,13 @@ type Config struct {
configretry.BackOffConfig `mapstructure:"retry_on_failure"`
}

type Endpoint struct {
// TCPAddr is the address of the server to connect to.
TCPAddr string `mapstructure:"tcp_addr"`
// Controls whether to validate the tcp address.
ValidateTCPResolution bool `mapstructure:"validate_tcp_resolution"`
}

type KubernetesMetadata struct {
Key string `mapstructure:"key"`
IncludePodLabels bool `mapstructure:"include_pod_labels"`
Expand All @@ -89,10 +92,10 @@ func (config *Config) Validate() error {
return fmt.Errorf("queue settings has invalid configuration: %w", err)
}

// Resolve TCP address just to ensure that it is a valid one. It is better
// to fail here than at when the exporter is started.
if !config.SkipFailOnInvalidTCPEndpoint {
if _, err := net.ResolveTCPAddr("tcp", config.Endpoint); err != nil {
if config.TCPClientSettings.Endpoint.ValidateTCPResolution {
// Resolve TCP address just to ensure that it is a valid one. It is better
// to fail here than at when the exporter is started.
if _, err := net.ResolveTCPAddr("tcp", config.Endpoint.TCPAddr); err != nil {
return fmt.Errorf("exporter has an invalid TCP endpoint: %w", err)
}
}
Expand Down
31 changes: 23 additions & 8 deletions config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ func TestLoadConfigNewExporter(t *testing.T) {
id: component.NewIDWithName(metadata.Type, "allsettings"),
expected: &Config{
TCPClientSettings: TCPClientSettings{
Endpoint: validEndpoint,
Endpoint: Endpoint{
TCPAddr: validEndpoint,
},
ConnectionTimeout: time.Second * 30,
ClientConfig: configtls.ClientConfig{
Insecure: false,
Expand Down Expand Up @@ -97,35 +99,48 @@ func TestConfigValidate(t *testing.T) {
}{
{
desc: "QueueSettings are invalid",
cfg: &Config{QueueConfig: exporterhelper.QueueConfig{QueueSize: -1, Enabled: true}},
err: fmt.Errorf("queue settings has invalid configuration"),
cfg: &Config{
QueueConfig: exporterhelper.QueueConfig{
QueueSize: -1,
Enabled: true,
},
},
err: fmt.Errorf("queue settings has invalid configuration"),
},
{
desc: "Endpoint is invalid",
cfg: &Config{
TCPClientSettings: TCPClientSettings{
Endpoint: "http://localhost:24224",
Endpoint: Endpoint{
TCPAddr: "http://localhost:24224",
ValidateTCPResolution: true,
},
ConnectionTimeout: time.Second * 30,
},
},
err: fmt.Errorf("exporter has an invalid TCP endpoint: address http://localhost:24224: too many colons in address"),
},
{
desc: "Endpoint is invalid but SkipFailOnInvalidTCPEndpoint is false",
desc: "Endpoint is invalid but ValidateTCPResolution is false",
cfg: &Config{
TCPClientSettings: TCPClientSettings{
Endpoint: "http://localhost:24224",
Endpoint: Endpoint{
TCPAddr: "http://localhost:24224",
ValidateTCPResolution: false,
},
ConnectionTimeout: time.Second * 30,
},
SkipFailOnInvalidTCPEndpoint: true,
},
err: nil,
},
{
desc: "Config is valid",
cfg: &Config{
TCPClientSettings: TCPClientSettings{
Endpoint: validEndpoint,
Endpoint: Endpoint{
TCPAddr: validEndpoint,
ValidateTCPResolution: true,
},
ConnectionTimeout: time.Second * 30,
},
},
Expand Down
10 changes: 5 additions & 5 deletions exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (f *fluentforwardExporter) start(ctx context.Context, host component.Host)
return err
}
connFactory := &fclient.ConnFactory{
Address: f.config.Endpoint,
Address: f.config.Endpoint.TCPAddr,
Timeout: f.config.ConnectionTimeout,
TLSConfig: tlsConfig,
}
Expand Down Expand Up @@ -70,14 +70,14 @@ func (f *fluentforwardExporter) stop(context.Context) (err error) {
// connectForward connects to the Fluent Forward endpoint and keep running otel even if the connection is failing
func (f *fluentforwardExporter) connectForward() {
if err := f.client.Connect(); err != nil {
f.settings.Logger.Error(fmt.Sprintf("Failed to connect to the endpoint %s", f.config.Endpoint))
f.settings.Logger.Error(fmt.Sprintf("Failed to connect to the endpoint %s", f.config.Endpoint.TCPAddr))
return
}
f.settings.Logger.Info(fmt.Sprintf("Successfull connection to the endpoint %s", f.config.Endpoint))
f.settings.Logger.Info(fmt.Sprintf("Successfull connection to the endpoint %s", f.config.Endpoint.TCPAddr))

if f.config.SharedKey != "" {
if err := f.client.Handshake(); err != nil {
f.settings.Logger.Error(fmt.Sprintf("Failed shared key handshake with the endpoint %s", f.config.Endpoint))
f.settings.Logger.Error(fmt.Sprintf("Failed shared key handshake with the endpoint %s", f.config.Endpoint.TCPAddr))
return
}
f.settings.Logger.Info("Successfull shared key handshake with the endpoint")
Expand Down Expand Up @@ -182,7 +182,7 @@ func (f *fluentforwardExporter) send(sendMethod sendFunc, entries []protocol.Ent
if errr := f.client.Disconnect(); errr != nil {
return errr
}
f.settings.Logger.Warn(fmt.Sprintf("Failed to send data to the endpoint %s, trying to reconnect", f.config.Endpoint))
f.settings.Logger.Warn(fmt.Sprintf("Failed to send data to the endpoint %s, trying to reconnect", f.config.Endpoint.TCPAddr))
f.connectForward()
err = sendMethod(f.config.Tag, entries)
if err != nil {
Expand Down
15 changes: 12 additions & 3 deletions exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ import (
func TestNewExporter(t *testing.T) {
config := &Config{
TCPClientSettings: TCPClientSettings{
Endpoint: validEndpoint,
Endpoint: Endpoint{
TCPAddr: validEndpoint,
ValidateTCPResolution: true,
},
ConnectionTimeout: time.Second * 30,
},
}
Expand All @@ -37,7 +40,10 @@ func TestNewExporter(t *testing.T) {
func TestStart(t *testing.T) {
config := &Config{
TCPClientSettings: TCPClientSettings{
Endpoint: validEndpoint,
Endpoint: Endpoint{
TCPAddr: validEndpoint,
ValidateTCPResolution: true,
},
ConnectionTimeout: time.Second * 30,
},
}
Expand All @@ -62,7 +68,10 @@ func TestStartInvalidEndpointErrorLog(t *testing.T) {

config := &Config{
TCPClientSettings: TCPClientSettings{
Endpoint: "invalidEndpoint",
Endpoint: Endpoint{
TCPAddr: "invalidEndpoint",
ValidateTCPResolution: true,
},
ConnectionTimeout: time.Second * 30,
},
}
Expand Down
5 changes: 4 additions & 1 deletion factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ func NewFactory() exporter.Factory {
func createDefaultConfig() component.Config {
return &Config{
TCPClientSettings: TCPClientSettings{
Endpoint: "localhost:24224",
Endpoint: Endpoint{
TCPAddr: "localhost:24224",
ValidateTCPResolution: false,
},
ConnectionTimeout: time.Second * 30,
ClientConfig: configtls.ClientConfig{
Insecure: true,
Expand Down
20 changes: 16 additions & 4 deletions factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ func TestNewExporterMinimalConfig(t *testing.T) {
t.Run("with valid config", func(t *testing.T) {
config := &Config{
TCPClientSettings: TCPClientSettings{
Endpoint: validEndpoint,
Endpoint: Endpoint{
TCPAddr: validEndpoint,
ValidateTCPResolution: true,
},
ConnectionTimeout: time.Second * 30,
},
}
Expand All @@ -43,7 +46,10 @@ func TestNewExporterFullConfig(t *testing.T) {
t.Run("with valid config", func(t *testing.T) {
config := &Config{
TCPClientSettings: TCPClientSettings{
Endpoint: validEndpoint,
Endpoint: Endpoint{
TCPAddr: validEndpoint,
ValidateTCPResolution: true,
},
ConnectionTimeout: time.Second * 30,
ClientConfig: configtls.ClientConfig{
Insecure: true,
Expand Down Expand Up @@ -82,7 +88,10 @@ func TestNewExporterFullConfig(t *testing.T) {
func TestStartAlwaysReturnsNil(t *testing.T) {
config := &Config{
TCPClientSettings: TCPClientSettings{
Endpoint: validEndpoint,
Endpoint: Endpoint{
TCPAddr: validEndpoint,
ValidateTCPResolution: true,
},
ConnectionTimeout: time.Second * 30,
},
}
Expand All @@ -94,7 +103,10 @@ func TestStartAlwaysReturnsNil(t *testing.T) {
func TestStopAlwaysReturnsNil(t *testing.T) {
config := &Config{
TCPClientSettings: TCPClientSettings{
Endpoint: validEndpoint,
Endpoint: Endpoint{
TCPAddr: validEndpoint,
ValidateTCPResolution: true,
},
ConnectionTimeout: time.Second * 30,
},
}
Expand Down
4 changes: 3 additions & 1 deletion testdata/config.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
fluentforward:
endpoint: "localhost:24224"
fluentforward/allsettings:
endpoint: "localhost:24224"
endpoint:
tcp_addr: "localhost:24224"
validate_tcp_resolution: false
connection_timeout: 30s
tls:
insecure: false
Expand Down

0 comments on commit 493c685

Please sign in to comment.