Skip to content
70 changes: 22 additions & 48 deletions cmd/fluent-bit-output-plugin/kubernetes_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,62 +4,36 @@
package main

import (
"fmt"
"os"
"time"
"context"

"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"github.com/gardener/logging/v1/pkg/config"
"github.com/gardener/logging/v1/pkg/controller"
)

gardenerclientsetversioned "github.com/gardener/logging/v1/pkg/cluster/clientset/versioned"
gardeninternalcoreinformers "github.com/gardener/logging/v1/pkg/cluster/informers/externalversions"
var (
ctrlManager *controller.Manager
clusterController controller.Controller
)

// inClusterKubernetesClient creates a Kubernetes client using in-cluster configuration.
// It returns nil if the in-cluster config is not available (e.g., when running outside a cluster).
func inClusterKubernetesClient() (gardenerclientsetversioned.Interface, error) {
c, err := rest.InClusterConfig()
if err != nil {
return nil, fmt.Errorf("failed to get incluster config: %v", err)
// initControllerManager initializes the controller-runtime based manager for watching
// Cluster resources using the reconciler pattern.
func initControllerManager(ctx context.Context, cfg *config.Config) error {
if ctrlManager != nil {
return nil
}

return gardenerclientsetversioned.NewForConfig(c)
}

// envKubernetesClient creates a Kubernetes client using the KUBECONFIG environment variable.
// It returns an error if the KUBECONFIG env var is not set or the config file is invalid.
func envKubernetesClient() (gardenerclientsetversioned.Interface, error) {
fromFlags, err := clientcmd.BuildConfigFromFlags("", os.Getenv("KUBECONFIG"))
var err error
ctrlManager, clusterController, err = controller.NewControllerManager(
ctx,
cfg.ControllerConfig.CtlSyncTimeout,
cfg,
logger,
)
if err != nil {
return nil, fmt.Errorf("failed to get kubeconfig from env: %v", err)
return err
}

return gardenerclientsetversioned.NewForConfig(fromFlags)
}

// initClusterInformer initializes and starts the shared informer instance for Cluster resources.
// It first attempts to use in-cluster configuration, falling back to KUBECONFIG if that fails.
// The informer is used to watch for changes to Cluster resources when dynamic host paths are configured.
// This function panics if it cannot obtain a valid Kubernetes client from either source.
func initClusterInformer() {
if informer != nil && !informer.IsStopped() {
return
}

var (
err error
kubernetesClient gardenerclientsetversioned.Interface
)
if kubernetesClient, _ = inClusterKubernetesClient(); kubernetesClient == nil {
logger.Info("[flb-go] failed to get in-cluster kubernetes client, trying KUBECONFIG env variable")
kubernetesClient, err = envKubernetesClient()
if err != nil {
panic(fmt.Errorf("failed to get kubernetes client, give up: %v", err))
}
}
logger.Info("[flb-go] controller-runtime manager initialized with reconciler")

kubeInformerFactory := gardeninternalcoreinformers.NewSharedInformerFactory(kubernetesClient, time.Second*30)
informer = kubeInformerFactory.Extensions().V1alpha1().Clusters().Informer()
informerStopChan = make(chan struct{})
kubeInformerFactory.Start(informerStopChan)
return nil
}
34 changes: 24 additions & 10 deletions cmd/fluent-bit-output-plugin/output_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
)

import (
"context"
"errors"
"fmt"
"net/http"
Expand All @@ -22,7 +23,6 @@ import (
"github.com/go-logr/logr"
"github.com/prometheus/client_golang/prometheus/promhttp"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/client-go/tools/cache"
"k8s.io/component-base/version"

"github.com/gardener/logging/v1/pkg/config"
Expand All @@ -36,14 +36,15 @@ import (
var (
// registered plugin instances, required for disposal during shutdown
// Uses sync.Map for concurrent-safe access without explicit locking
plugins sync.Map // map[string]plugin.OutputPlugin
logger logr.Logger
informer cache.SharedIndexInformer
informerStopChan chan struct{}
pprofOnce sync.Once
plugins sync.Map // map[string]plugin.OutputPlugin
logger logr.Logger
pluginCtx context.Context
pluginCancel context.CancelFunc
pprofOnce sync.Once
)

func init() {
pluginCtx, pluginCancel = context.WithCancel(context.Background())
logger = log.NewLogger("info")
logger.Info("Starting fluent-bit-gardener-output-plugin",
"version", version.Get().GitVersion,
Expand Down Expand Up @@ -105,13 +106,20 @@ func FLBPluginInit(ctx unsafe.Pointer) int {
setPprofProfile()
}

// Initialize controller-runtime manager for watching Cluster resources
if len(cfg.ControllerConfig.DynamicHostPath) > 0 {
initClusterInformer()
if err := initControllerManager(pluginCtx, cfg); err != nil {
metrics.Errors.WithLabelValues(metrics.ErrorFLBPluginInit).Inc()
logger.Error(err, "[flb-go] failed to initialize controller manager")

return output.FLB_ERROR
}
}

id, _, _ := strings.Cut(string(uuid.NewUUID()), "-")

outputPlugin, err := plugin.NewPlugin(informer, cfg, log.NewLogger(cfg.PluginConfig.LogLevel))
// Pass the cluster controller to the plugin (may be nil if no dynamic host path)
outputPlugin, err := plugin.NewPlugin(clusterController, cfg, log.NewLogger(cfg.PluginConfig.LogLevel))
if err != nil {
metrics.Errors.WithLabelValues(metrics.ErrorNewPlugin).Inc()
logger.Error(err, "[flb-go] error creating output plugin", "id", id)
Expand Down Expand Up @@ -222,8 +230,14 @@ func FLBPluginExit() int {

return true
})
if informerStopChan != nil {
close(informerStopChan)

// Stop the controller manager
if ctrlManager != nil {
ctrlManager.Stop()
}

if pluginCancel != nil {
pluginCancel()
}

return output.FLB_OK
Expand Down
2 changes: 0 additions & 2 deletions pkg/client/dque_batch_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -746,8 +746,6 @@ func (p *DQueBatchProcessor) exportBatch(batch []sdklog.Record) {
ctx, cancel := context.WithTimeout(p.ctx, p.config.exportTimeout)
defer cancel()

p.logger.V(3).Info("exporting batch", "size", len(batch))

// Blocking export call (gRPC or HTTP)
if err := p.exporter.Export(ctx, batch); err != nil {
p.logger.Error(err, "failed to export batch", "size", len(batch))
Expand Down
83 changes: 0 additions & 83 deletions pkg/cluster/clientset/versioned/clientset.go

This file was deleted.

70 changes: 0 additions & 70 deletions pkg/cluster/clientset/versioned/fake/clientset_generated.go

This file was deleted.

28 changes: 0 additions & 28 deletions pkg/cluster/clientset/versioned/fake/register.go

This file was deleted.

28 changes: 0 additions & 28 deletions pkg/cluster/clientset/versioned/scheme/register.go

This file was deleted.

Loading
Loading