Skip to content

Commit

Permalink
Merge pull request #92 from kube-logging/feat/stabilize-operator-beha…
Browse files Browse the repository at this point in the history
…viour

fix: stabilize operator behaviour
  • Loading branch information
csatib02 authored Nov 11, 2024
2 parents 9e3cf30 + 591a7d2 commit 4441da9
Show file tree
Hide file tree
Showing 12 changed files with 307 additions and 187 deletions.
2 changes: 2 additions & 0 deletions api/telemetry/v1alpha1/collector_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,14 @@ func (c CollectorSpec) GetMemoryLimit() *resource.Quantity {
// CollectorStatus defines the observed state of Collector
type CollectorStatus struct {
Tenants []string `json:"tenants,omitempty"`
State State `json:"state,omitempty"`
}

//+kubebuilder:object:root=true
//+kubebuilder:resource:scope=Cluster,categories=telemetry-all
//+kubebuilder:subresource:status
//+kubebuilder:printcolumn:name="Tenants",type=string,JSONPath=`.status.tenants`
//+kubebuilder:printcolumn:name="State",type=string,JSONPath=`.status.state`

// Collector is the Schema for the collectors API
type Collector struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ spec:
- jsonPath: .status.tenants
name: Tenants
type: string
- jsonPath: .status.state
name: State
type: string
name: v1alpha1
schema:
openAPIV3Schema:
Expand Down Expand Up @@ -8071,6 +8074,8 @@ spec:
status:
description: CollectorStatus defines the observed state of Collector
properties:
state:
type: string
tenants:
items:
type: string
Expand Down
5 changes: 5 additions & 0 deletions config/crd/bases/telemetry.kube-logging.dev_collectors.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ spec:
- jsonPath: .status.tenants
name: Tenants
type: string
- jsonPath: .status.state
name: State
type: string
name: v1alpha1
schema:
openAPIV3Schema:
Expand Down Expand Up @@ -8071,6 +8074,8 @@ spec:
status:
description: CollectorStatus defines the observed state of Collector
properties:
state:
type: string
tenants:
items:
type: string
Expand Down
35 changes: 0 additions & 35 deletions docs/examples/tenant-to-tenant-routing/pipeline.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,35 +24,6 @@ spec:
logSourceNamespaceSelectors:
- matchLabels:
tenant: shared
subscriptionNamespaceSelectors:
- matchLabels:
tenant: shared
---
apiVersion: telemetry.kube-logging.dev/v1alpha1
kind: Subscription
metadata:
name: shared
namespace: shared
spec:
condition: "true"
outputs:
- name: openobserve-shared
namespace: shared
---
apiVersion: telemetry.kube-logging.dev/v1alpha1
kind: Output
metadata:
name: openobserve-shared
namespace: shared
spec:
otlp:
endpoint: openobserve-otlp-grpc.openobserve.svc.cluster.local:5081
headers:
Authorization: "Basic <TOKEN>"
organization: default
stream-name: shared
tls:
insecure: true
---
# A tenant that consumes logs from the shared tenant using a bridge
apiVersion: telemetry.kube-logging.dev/v1alpha1
Expand All @@ -62,9 +33,6 @@ metadata:
collector: cluster
name: database
spec:
logSourceNamespaceSelectors:
- matchLabels:
tenant: database
subscriptionNamespaceSelectors:
- matchLabels:
tenant: database
Expand Down Expand Up @@ -112,9 +80,6 @@ metadata:
collector: cluster
name: web
spec:
logSourceNamespaceSelectors:
- matchLabels:
tenant: web
subscriptionNamespaceSelectors:
- matchLabels:
tenant: web
Expand Down
31 changes: 0 additions & 31 deletions e2e/testdata/tenants_with_bridges/tenants_with_bridges.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,31 +50,6 @@ spec:
logSourceNamespaceSelectors:
- matchLabels:
tenant: shared
subscriptionNamespaceSelectors:
- matchLabels:
tenant: shared
---
apiVersion: telemetry.kube-logging.dev/v1alpha1
kind: Subscription
metadata:
name: shared
namespace: shared
spec:
condition: "true"
outputs:
- name: otlp-test-output-shared
namespace: collector
---
apiVersion: telemetry.kube-logging.dev/v1alpha1
kind: Output
metadata:
name: otlp-test-output-shared
namespace: collector
spec:
otlp:
endpoint: receiver-collector.telemetry-controller-system.svc.cluster.local:4317
tls:
insecure: true
---
# A tenant that consumes logs from the shared tenant using a bridge
apiVersion: telemetry.kube-logging.dev/v1alpha1
Expand All @@ -84,9 +59,6 @@ metadata:
collector: cluster
name: database
spec:
logSourceNamespaceSelectors:
- matchLabels:
tenant: database
subscriptionNamespaceSelectors:
- matchLabels:
tenant: database
Expand Down Expand Up @@ -130,9 +102,6 @@ metadata:
collector: cluster
name: web
spec:
logSourceNamespaceSelectors:
- matchLabels:
tenant: web
subscriptionNamespaceSelectors:
- matchLabels:
tenant: web
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@ require (
k8s.io/client-go v0.30.2
sigs.k8s.io/controller-runtime v0.18.4
sigs.k8s.io/yaml v1.4.0
github.com/hashicorp/go-multierror v1.1.1
)

require (
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/briandowns/spinner v1.23.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
Expand Down
5 changes: 5 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ github.com/google/pprof v0.0.0-20240424215950-a892ee059fd6 h1:k7nVchz72niMH6YLQN
github.com/google/pprof v0.0.0-20240424215950-a892ee059fd6/go.mod h1:kf6iHlnVGwgKolg33glAes7Yg/8iWP8ukqeldJSO7jw=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/iancoleman/orderedmap v0.3.0 h1:5cbR2grmZR/DiVt+VJopEhtVs9YGInGIxAoMJn+Ichc=
github.com/iancoleman/orderedmap v0.3.0/go.mod h1:XuLcCUkdL5owUCQeF2Ue9uuw1EptkJDkXXS7VoV7XGE=
github.com/imdario/mergo v0.3.16 h1:wwQJbIsHYGMUyLSPrEq1CT16AhnhNJQ51+4fdHUnCl4=
Expand Down
26 changes: 23 additions & 3 deletions internal/controller/telemetry/collector_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
otelv1beta1 "github.com/open-telemetry/opentelemetry-operator/apis/v1beta1"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -178,14 +179,17 @@ func (r *CollectorReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
logger := log.FromContext(ctx, "collector", req.Name)

collector := &v1alpha1.Collector{}
logger.Info("Reconciling collector")
logger.Info(fmt.Sprintf("getting collector: %q", req.Name))

if err := r.Get(ctx, req.NamespacedName, collector); client.IgnoreNotFound(err) != nil {
logger.Error(errors.New("failed getting collector, possible API server error"), "failed getting collector, possible API server error",
"collector", req.Name)
return ctrl.Result{}, err
}

collector.Spec.SetDefaults()
originalCollectorStatus := collector.Status.DeepCopy()
originalCollectorStatus := collector.Status
logger.Info(fmt.Sprintf("reconciling collector: %q", collector.Name))

otelConfigInput, err := r.buildConfigInputForCollector(ctx, collector)
if err != nil {
Expand All @@ -195,6 +199,15 @@ func (r *CollectorReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
return ctrl.Result{}, err
}

// NOTE: This might be revised or removed in the future, but good enough for now to avoid
// deploying the collector that would immediately error due to configuration errors.
// Might also be a good place to add a validation webhook to validate the collector spec
if err := otelConfigInput.ValidateConfig(); err != nil {
collector.Status.State = v1alpha1.StateFailed
logger.Error(errors.WithStack(err), "failed validating otel config input")
return ctrl.Result{}, err
}

otelConfig := otelConfigInput.AssembleConfig(ctx)

saName, err := r.reconcileRBAC(ctx, collector)
Expand Down Expand Up @@ -264,7 +277,11 @@ func (r *CollectorReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
resourceReconciler := reconciler.NewReconcilerWith(r.Client, reconciler.WithLog(logger))
_, err = resourceReconciler.ReconcileResource(&otelCollector, reconciler.StatePresent)
if err != nil {
return ctrl.Result{}, err
collector.Status.State = v1alpha1.StateFailed
if apierrors.IsConflict(err) {
logger.Info("conflict while creating otel collector, retrying")
return ctrl.Result{Requeue: true}, nil
}
}

tenantNames := []string{}
Expand All @@ -273,9 +290,12 @@ func (r *CollectorReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
}
collector.Status.Tenants = normalizeStringSlice(tenantNames)

collector.Status.State = v1alpha1.StateReady
if !reflect.DeepEqual(originalCollectorStatus, collector.Status) {
logger.Info("collector status changed")
if err = r.Client.Status().Update(ctx, collector); err != nil {
logger.Error(errors.WithStack(err), "failed updating collector status")
return ctrl.Result{}, err
}
}

Expand Down
92 changes: 84 additions & 8 deletions internal/controller/telemetry/otel_conf_gen/otel_conf_gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ package otel_conf_gen

import (
"context"
"errors"
"fmt"
"slices"

otelv1beta1 "github.com/open-telemetry/opentelemetry-operator/apis/v1beta1"
"golang.org/x/exp/maps"

"github.com/hashicorp/go-multierror"
"github.com/kube-logging/telemetry-controller/api/telemetry/v1alpha1"
"github.com/kube-logging/telemetry-controller/internal/controller/telemetry/pipeline"
"github.com/kube-logging/telemetry-controller/internal/controller/telemetry/pipeline/components"
Expand Down Expand Up @@ -107,9 +109,11 @@ func (cfgInput *OtelColConfigInput) generateReceivers() map[string]any {
if tenantIdx := slices.IndexFunc(cfgInput.Tenants, func(t v1alpha1.Tenant) bool {
return tenantName == t.Name
}); tenantIdx != -1 {
k8sReceiverName := fmt.Sprintf("filelog/%s", tenantName)
namespaces := cfgInput.Tenants[tenantIdx].Status.LogSourceNamespaces
receivers[k8sReceiverName] = receiver.GenerateDefaultKubernetesReceiver(namespaces)
// Generate filelog receiver for the tenant if it has any logsource namespaces
if len(namespaces) > 0 {
receivers[fmt.Sprintf("filelog/%s", tenantName)] = receiver.GenerateDefaultKubernetesReceiver(namespaces)
}
}
}

Expand All @@ -121,13 +125,19 @@ func (cfgInput *OtelColConfigInput) generateConnectors() map[string]any {
maps.Copy(connectors, connector.GenerateCountConnectors())

for _, tenant := range cfgInput.Tenants {
rc := connector.GenerateRoutingConnectorForTenantsSubscriptions(tenant.Name, tenant.Spec.RouteConfig, cfgInput.TenantSubscriptionMap[tenant.Name], cfgInput.Subscriptions)
connectors[rc.Name] = rc
// Generate routing connector for the tenant's subscription if it has any
if len(cfgInput.TenantSubscriptionMap[tenant.Name]) > 0 {
rc := connector.GenerateRoutingConnectorForTenantsSubscriptions(tenant.Name, tenant.Spec.RouteConfig, cfgInput.TenantSubscriptionMap[tenant.Name], cfgInput.Subscriptions)
connectors[rc.Name] = rc
}
}

for _, subscription := range cfgInput.Subscriptions {
rc := connector.GenerateRoutingConnectorForSubscriptionsOutputs(subscription.NamespacedName(), cfgInput.SubscriptionOutputMap[subscription.NamespacedName()])
connectors[rc.Name] = rc
// Generate routing connector for the subscription's outputs if it has any
if len(cfgInput.SubscriptionOutputMap[subscription.NamespacedName()]) > 0 {
rc := connector.GenerateRoutingConnectorForSubscriptionsOutputs(subscription.NamespacedName(), cfgInput.SubscriptionOutputMap[subscription.NamespacedName()])
connectors[rc.Name] = rc
}
}

for _, bridge := range cfgInput.Bridges {
Expand All @@ -144,7 +154,7 @@ func (cfgInput *OtelColConfigInput) generateNamedPipelines() map[string]*otelv1b
var namedPipelines = make(map[string]*otelv1beta1.Pipeline)
tenants := []string{}
for tenant := range cfgInput.TenantSubscriptionMap {
namedPipelines[fmt.Sprintf("logs/tenant_%s", tenant)] = pipeline.GenerateRootPipeline(tenant)
namedPipelines[fmt.Sprintf("logs/tenant_%s", tenant)] = pipeline.GenerateRootPipeline(cfgInput.Tenants, tenant)
tenants = append(tenants, tenant)
}

Expand All @@ -153,7 +163,7 @@ func (cfgInput *OtelColConfigInput) generateNamedPipelines() map[string]*otelv1b
for _, tenant := range tenants {
// Generate a pipeline for the tenant
tenantRootPipeline := fmt.Sprintf("logs/tenant_%s", tenant)
namedPipelines[tenantRootPipeline] = pipeline.GenerateRootPipeline(tenant)
namedPipelines[tenantRootPipeline] = pipeline.GenerateRootPipeline(cfgInput.Tenants, tenant)

connector.GenerateRoutingConnectorForBridgesTenantPipeline(tenant, namedPipelines[tenantRootPipeline], cfgInput.Bridges)
processor.GenerateTransformProcessorForTenantPipeline(tenant, namedPipelines[tenantRootPipeline], cfgInput.Tenants)
Expand Down Expand Up @@ -255,3 +265,69 @@ func (cfgInput *OtelColConfigInput) AssembleConfig(ctx context.Context) otelv1be
},
}
}

func validateTenants(tenants *[]v1alpha1.Tenant) error {
var result *multierror.Error

if len(*tenants) == 0 {
return errors.New("no tenants provided, at least one tenant must be provided")
}

for _, tenant := range *tenants {
if len(tenant.Spec.SubscriptionNamespaceSelectors) == 0 && len(tenant.Spec.LogSourceNamespaceSelectors) == 0 {
result = multierror.Append(result, fmt.Errorf("tenant must have at least one subscription or logsource namespace selector, tenant: %s has neither", tenant.Name))
}
}

return result.ErrorOrNil()
}

func validateSubscriptionsAndBridges(tenants *[]v1alpha1.Tenant, subscriptions *map[v1alpha1.NamespacedName]v1alpha1.Subscription, bridges *[]v1alpha1.Bridge) error {
var result *multierror.Error

hasSubs := len(*subscriptions) > 0
hasBridges := len(*bridges) > 0
if !hasSubs && !hasBridges {
return errors.New("no subscriptions or bridges provided, at least one subscription or bridge must be provided")
}

if hasSubs {
for _, subscription := range *subscriptions {
if len(subscription.Spec.Outputs) == 0 {
result = multierror.Append(result, fmt.Errorf("subscription %s has no outputs", subscription.Name))
}
}
}

if hasBridges {
tenantMap := make(map[string]struct{})
for _, tenant := range *tenants {
tenantMap[tenant.Name] = struct{}{}
}

for _, bridge := range *bridges {
if _, sourceFound := tenantMap[bridge.Spec.SourceTenant]; !sourceFound {
result = multierror.Append(result, fmt.Errorf("bridge: %s has a source tenant: %s that does not exist", bridge.Name, bridge.Spec.SourceTenant))
}
if _, targetFound := tenantMap[bridge.Spec.TargetTenant]; !targetFound {
result = multierror.Append(result, fmt.Errorf("bridge: %s has a target tenant: %s that does not exist", bridge.Name, bridge.Spec.TargetTenant))
}
}
}

return result.ErrorOrNil()
}

func (cfgInput *OtelColConfigInput) ValidateConfig() error {
var result *multierror.Error

if err := validateTenants(&cfgInput.Tenants); err != nil {
result = multierror.Append(result, err)
}

if err := validateSubscriptionsAndBridges(&cfgInput.Tenants, &cfgInput.Subscriptions, &cfgInput.Bridges); err != nil {
result = multierror.Append(result, err)
}

return result.ErrorOrNil()
}
Loading

0 comments on commit 4441da9

Please sign in to comment.