Skip to content

Commit

Permalink
Merge branch 'main' into INFOPLAT-1562-dynamic-expiring-auth-headers
Browse files Browse the repository at this point in the history
  • Loading branch information
hendoxc committed Dec 23, 2024
2 parents d895532 + db7919d commit a809c1c
Show file tree
Hide file tree
Showing 13 changed files with 440 additions and 226 deletions.
19 changes: 18 additions & 1 deletion pkg/beholder/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,15 @@ func newGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro
if cfg.LogExportTimeout > 0 {
batchProcessorOpts = append(batchProcessorOpts, sdklog.WithExportTimeout(cfg.LogExportTimeout)) // Default is 30s
}
if cfg.LogExportMaxBatchSize > 0 {
batchProcessorOpts = append(batchProcessorOpts, sdklog.WithExportMaxBatchSize(cfg.LogExportMaxBatchSize)) // Default is 512, must be <= maxQueueSize
}
if cfg.LogExportInterval > 0 {
batchProcessorOpts = append(batchProcessorOpts, sdklog.WithExportInterval(cfg.LogExportInterval)) // Default is 1s
}
if cfg.LogMaxQueueSize > 0 {
batchProcessorOpts = append(batchProcessorOpts, sdklog.WithMaxQueueSize(cfg.LogMaxQueueSize)) // Default is 2048
}
loggerProcessor = sdklog.NewBatchProcessor(
sharedLogExporter,
batchProcessorOpts...,
Expand Down Expand Up @@ -164,6 +173,15 @@ func newGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro
if cfg.EmitterExportTimeout > 0 {
batchProcessorOpts = append(batchProcessorOpts, sdklog.WithExportTimeout(cfg.EmitterExportTimeout)) // Default is 30s
}
if cfg.EmitterExportMaxBatchSize > 0 {
batchProcessorOpts = append(batchProcessorOpts, sdklog.WithExportMaxBatchSize(cfg.EmitterExportMaxBatchSize)) // Default is 512, must be <= maxQueueSize
}
if cfg.EmitterExportInterval > 0 {
batchProcessorOpts = append(batchProcessorOpts, sdklog.WithExportInterval(cfg.EmitterExportInterval)) // Default is 1s
}
if cfg.EmitterMaxQueueSize > 0 {
batchProcessorOpts = append(batchProcessorOpts, sdklog.WithMaxQueueSize(cfg.EmitterMaxQueueSize)) // Default is 2048
}
messageLogProcessor = sdklog.NewBatchProcessor(
sharedLogExporter,
batchProcessorOpts...,
Expand Down Expand Up @@ -374,7 +392,6 @@ func newMeterProvider(config Config, resource *sdkresource.Resource, creds crede
if err != nil {
return nil, err
}

mp := sdkmetric.NewMeterProvider(
sdkmetric.WithReader(
sdkmetric.NewPeriodicReader(
Expand Down
35 changes: 23 additions & 12 deletions pkg/beholder/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,29 @@ type Config struct {
// OTel Resource
ResourceAttributes []otelattr.KeyValue
// Message Emitter
EmitterExportTimeout time.Duration
// Batch processing is enabled by default
// Disable it only for testing
EmitterBatchProcessor bool
EmitterExportTimeout time.Duration
EmitterExportInterval time.Duration
EmitterExportMaxBatchSize int
EmitterMaxQueueSize int
EmitterBatchProcessor bool // Enabled by default. Disable only for testing.

// OTel Trace
TraceSampleRatio float64
TraceBatchTimeout time.Duration
TraceSpanExporter sdktrace.SpanExporter // optional additional exporter
TraceRetryConfig *RetryConfig

// OTel Metric
MetricReaderInterval time.Duration
MetricRetryConfig *RetryConfig
MetricViews []sdkmetric.View

// OTel Log
LogExportTimeout time.Duration
// Batch processing is enabled by default
// Disable it only for testing
LogBatchProcessor bool
LogExportTimeout time.Duration
LogExportInterval time.Duration
LogExportMaxBatchSize int
LogMaxQueueSize int
LogBatchProcessor bool // Enabled by default. Disable only for testing.
// Retry config for shared log exporter, used by Emitter and Logger
LogRetryConfig *RetryConfig

Expand Down Expand Up @@ -82,8 +87,11 @@ func DefaultConfig() Config {
// Resource
ResourceAttributes: defaultOtelAttributes,
// Message Emitter
EmitterExportTimeout: 1 * time.Second,
EmitterBatchProcessor: true,
EmitterExportTimeout: 30 * time.Second,
EmitterExportMaxBatchSize: 512,
EmitterExportInterval: 1 * time.Second,
EmitterMaxQueueSize: 2048,
EmitterBatchProcessor: true,
// OTel message log exporter retry config
LogRetryConfig: defaultRetryConfig.Copy(),
// Trace
Expand All @@ -96,8 +104,11 @@ func DefaultConfig() Config {
// OTel metric exporter retry config
MetricRetryConfig: defaultRetryConfig.Copy(),
// Log
LogExportTimeout: 1 * time.Second,
LogBatchProcessor: true,
LogExportTimeout: 30 * time.Second,
LogExportMaxBatchSize: 512,
LogExportInterval: 1 * time.Second,
LogMaxQueueSize: 2048,
LogBatchProcessor: true,
}
}

Expand Down
16 changes: 11 additions & 5 deletions pkg/beholder/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,11 @@ func ExampleConfig() {
otelattr.String("sender", "beholderclient"),
},
// Message Emitter
EmitterExportTimeout: 1 * time.Second,
EmitterBatchProcessor: true,
EmitterExportTimeout: 1 * time.Second,
EmitterExportMaxBatchSize: 512,
EmitterExportInterval: 1 * time.Second,
EmitterMaxQueueSize: 2048,
EmitterBatchProcessor: true,
// OTel message log exporter retry config
LogRetryConfig: nil,
// Trace
Expand All @@ -39,8 +42,11 @@ func ExampleConfig() {
// OTel metric exporter retry config
MetricRetryConfig: nil,
// Log
LogExportTimeout: 1 * time.Second,
LogBatchProcessor: true,
LogExportTimeout: 1 * time.Second,
LogExportMaxBatchSize: 512,
LogExportInterval: 1 * time.Second,
LogMaxQueueSize: 2048,
LogBatchProcessor: true,
}
fmt.Printf("%+v\n", config)
config.LogRetryConfig = &beholder.RetryConfig{
Expand All @@ -50,6 +56,6 @@ func ExampleConfig() {
}
fmt.Printf("%+v\n", *config.LogRetryConfig)
// Output:
// {InsecureConnection:true CACertFile: OtelExporterGRPCEndpoint:localhost:4317 OtelExporterHTTPEndpoint:localhost:4318 ResourceAttributes:[{Key:package_name Value:{vtype:4 numeric:0 stringly:beholder slice:<nil>}} {Key:sender Value:{vtype:4 numeric:0 stringly:beholderclient slice:<nil>}}] EmitterExportTimeout:1s EmitterBatchProcessor:true TraceSampleRatio:1 TraceBatchTimeout:1s TraceSpanExporter:<nil> TraceRetryConfig:<nil> MetricReaderInterval:1s MetricRetryConfig:<nil> MetricViews:[] LogExportTimeout:1s LogBatchProcessor:true LogRetryConfig:<nil> AuthPublicKeyHex: AuthHeaders:map[] AuthHeaderProvider:<nil>}
// {InsecureConnection:true CACertFile: OtelExporterGRPCEndpoint:localhost:4317 OtelExporterHTTPEndpoint:localhost:4318 ResourceAttributes:[{Key:package_name Value:{vtype:4 numeric:0 stringly:beholder slice:<nil>}} {Key:sender Value:{vtype:4 numeric:0 stringly:beholderclient slice:<nil>}}] EmitterExportTimeout:1s EmitterExportInterval:1s EmitterExportMaxBatchSize:512 EmitterMaxQueueSize:2048 EmitterBatchProcessor:true TraceSampleRatio:1 TraceBatchTimeout:1s TraceSpanExporter:<nil> TraceRetryConfig:<nil> MetricReaderInterval:1s MetricRetryConfig:<nil> MetricViews:[] LogExportTimeout:1s LogExportInterval:1s LogExportMaxBatchSize:512 LogMaxQueueSize:2048 LogBatchProcessor:true LogRetryConfig:<nil> AuthPublicKeyHex: AuthHeaders:map[] AuthHeaderProvider:<nil>}
// {InitialInterval:5s MaxInterval:30s MaxElapsedTime:1m0s}
}
21 changes: 20 additions & 1 deletion pkg/beholder/httpclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,16 @@ func newHTTPClient(cfg Config, otlploghttpNew otlploghttpFactory) (*Client, erro
if cfg.LogExportTimeout > 0 {
batchProcessorOpts = append(batchProcessorOpts, sdklog.WithExportTimeout(cfg.LogExportTimeout)) // Default is 30s
}
if cfg.LogExportMaxBatchSize > 0 {

batchProcessorOpts = append(batchProcessorOpts, sdklog.WithExportMaxBatchSize(cfg.LogExportMaxBatchSize)) // Default is 512, must be <= maxQueueSize
}
if cfg.LogExportInterval > 0 {
batchProcessorOpts = append(batchProcessorOpts, sdklog.WithExportInterval(cfg.LogExportInterval)) // Default is 1s
}
if cfg.LogMaxQueueSize > 0 {
batchProcessorOpts = append(batchProcessorOpts, sdklog.WithMaxQueueSize(cfg.LogMaxQueueSize)) // Default is 2048
}
loggerProcessor = sdklog.NewBatchProcessor(
sharedLogExporter,
batchProcessorOpts...,
Expand Down Expand Up @@ -124,9 +134,18 @@ func newHTTPClient(cfg Config, otlploghttpNew otlploghttpFactory) (*Client, erro
if cfg.EmitterExportTimeout > 0 {
batchProcessorOpts = append(batchProcessorOpts, sdklog.WithExportTimeout(cfg.EmitterExportTimeout)) // Default is 30s
}
if cfg.EmitterExportMaxBatchSize > 0 {
batchProcessorOpts = append(batchProcessorOpts, sdklog.WithExportMaxBatchSize(cfg.EmitterExportMaxBatchSize)) // Default is 512, must be <= maxQueueSize
}
if cfg.EmitterExportInterval > 0 {
batchProcessorOpts = append(batchProcessorOpts, sdklog.WithExportInterval(cfg.EmitterExportInterval)) // Default is 1s
}
if cfg.EmitterMaxQueueSize > 0 {
batchProcessorOpts = append(batchProcessorOpts, sdklog.WithMaxQueueSize(cfg.EmitterMaxQueueSize)) // Default is 2048
}
messageLogProcessor = sdklog.NewBatchProcessor(
sharedLogExporter,
batchProcessorOpts..., // Default is 30s
batchProcessorOpts...,
)
} else {
messageLogProcessor = sdklog.NewSimpleProcessor(sharedLogExporter)
Expand Down
62 changes: 41 additions & 21 deletions pkg/loop/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,19 @@ const (
envTracingAttribute = "CL_TRACING_ATTRIBUTE_"
envTracingTLSCertPath = "CL_TRACING_TLS_CERT_PATH"

envTelemetryEnabled = "CL_TELEMETRY_ENABLED"
envTelemetryEndpoint = "CL_TELEMETRY_ENDPOINT"
envTelemetryInsecureConn = "CL_TELEMETRY_INSECURE_CONNECTION"
envTelemetryCACertFile = "CL_TELEMETRY_CA_CERT_FILE"
envTelemetryAttribute = "CL_TELEMETRY_ATTRIBUTE_"
envTelemetryTraceSampleRatio = "CL_TELEMETRY_TRACE_SAMPLE_RATIO"
envTelemetryAuthHeader = "CL_TELEMETRY_AUTH_HEADER"
envTelemetryAuthPubKeyHex = "CL_TELEMETRY_AUTH_PUB_KEY_HEX"
envTelemetryEmitterBatchProcessor = "CL_TELEMETRY_EMITTER_BATCH_PROCESSOR"
envTelemetryEmitterExportTimeout = "CL_TELEMETRY_EMITTER_EXPORT_TIMEOUT"
envTelemetryEnabled = "CL_TELEMETRY_ENABLED"
envTelemetryEndpoint = "CL_TELEMETRY_ENDPOINT"
envTelemetryInsecureConn = "CL_TELEMETRY_INSECURE_CONNECTION"
envTelemetryCACertFile = "CL_TELEMETRY_CA_CERT_FILE"
envTelemetryAttribute = "CL_TELEMETRY_ATTRIBUTE_"
envTelemetryTraceSampleRatio = "CL_TELEMETRY_TRACE_SAMPLE_RATIO"
envTelemetryAuthHeader = "CL_TELEMETRY_AUTH_HEADER"
envTelemetryAuthPubKeyHex = "CL_TELEMETRY_AUTH_PUB_KEY_HEX"
envTelemetryEmitterBatchProcessor = "CL_TELEMETRY_EMITTER_BATCH_PROCESSOR"
envTelemetryEmitterExportTimeout = "CL_TELEMETRY_EMITTER_EXPORT_TIMEOUT"
envTelemetryEmitterExportInterval = "CL_TELEMETRY_EMITTER_EXPORT_INTERVAL"
envTelemetryEmitterExportMaxBatchSize = "CL_TELEMETRY_EMITTER_EXPORT_MAX_BATCH_SIZE"
envTelemetryEmitterMaxQueueSize = "CL_TELEMETRY_EMITTER_MAX_QUEUE_SIZE"
)

// EnvConfig is the configuration between the application and the LOOP executable. The values
Expand All @@ -46,16 +49,19 @@ type EnvConfig struct {
TracingTLSCertPath string
TracingAttributes map[string]string

TelemetryEnabled bool
TelemetryEndpoint string
TelemetryInsecureConnection bool
TelemetryCACertFile string
TelemetryAttributes OtelAttributes
TelemetryTraceSampleRatio float64
TelemetryAuthHeaders map[string]string
TelemetryAuthPubKeyHex string
TelemetryEmitterBatchProcessor bool
TelemetryEmitterExportTimeout time.Duration
TelemetryEnabled bool
TelemetryEndpoint string
TelemetryInsecureConnection bool
TelemetryCACertFile string
TelemetryAttributes OtelAttributes
TelemetryTraceSampleRatio float64
TelemetryAuthHeaders map[string]string
TelemetryAuthPubKeyHex string
TelemetryEmitterBatchProcessor bool
TelemetryEmitterExportTimeout time.Duration
TelemetryEmitterExportInterval time.Duration
TelemetryEmitterExportMaxBatchSize int
TelemetryEmitterMaxQueueSize int
}

// AsCmdEnv returns a slice of environment variable key/value pairs for an exec.Cmd.
Expand Down Expand Up @@ -93,7 +99,9 @@ func (e *EnvConfig) AsCmdEnv() (env []string) {
add(envTelemetryAuthPubKeyHex, e.TelemetryAuthPubKeyHex)
add(envTelemetryEmitterBatchProcessor, strconv.FormatBool(e.TelemetryEmitterBatchProcessor))
add(envTelemetryEmitterExportTimeout, e.TelemetryEmitterExportTimeout.String())

add(envTelemetryEmitterExportInterval, e.TelemetryEmitterExportInterval.String())
add(envTelemetryEmitterExportMaxBatchSize, strconv.Itoa(e.TelemetryEmitterExportMaxBatchSize))
add(envTelemetryEmitterMaxQueueSize, strconv.Itoa(e.TelemetryEmitterMaxQueueSize))
return
}

Expand Down Expand Up @@ -150,6 +158,18 @@ func (e *EnvConfig) parse() error {
if err != nil {
return fmt.Errorf("failed to parse %s: %w", envTelemetryEmitterExportTimeout, err)
}
e.TelemetryEmitterExportInterval, err = time.ParseDuration(os.Getenv(envTelemetryEmitterExportInterval))
if err != nil {
return fmt.Errorf("failed to parse %s: %w", envTelemetryEmitterExportInterval, err)
}
e.TelemetryEmitterExportMaxBatchSize, err = strconv.Atoi(os.Getenv(envTelemetryEmitterExportMaxBatchSize))
if err != nil {
return fmt.Errorf("failed to parse %s: %w", envTelemetryEmitterExportMaxBatchSize, err)
}
e.TelemetryEmitterMaxQueueSize, err = strconv.Atoi(os.Getenv(envTelemetryEmitterMaxQueueSize))
if err != nil {
return fmt.Errorf("failed to parse %s: %w", envTelemetryEmitterMaxQueueSize, err)
}
}
return nil
}
Expand Down
Loading

0 comments on commit a809c1c

Please sign in to comment.