Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions event-exporter/event_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,21 @@ func (e *eventExporter) Run(stopCh <-chan struct{}) {
utils.RunConcurrentlyUntil(stopCh, e.sink.Run, e.watcher.Run)
}

func newEventExporter(client kubernetes.Interface, sink sinks.Sink, resyncPeriod time.Duration, eventLabelSelector labels.Selector, listerWatcherOptionsLimit int64, storageType watchers.StorageType) *eventExporter {
func newEventExporter(client kubernetes.Interface, sink sinks.Sink, resyncPeriod time.Duration, eventLabelSelector labels.Selector, listerWatcherOptionsLimit int64, listerWatcherEnableStreaming bool, storageType watchers.StorageType) *eventExporter {
return &eventExporter{
sink: sink,
watcher: createWatcher(client, sink, resyncPeriod, eventLabelSelector, listerWatcherOptionsLimit, storageType),
watcher: createWatcher(client, sink, resyncPeriod, eventLabelSelector, listerWatcherOptionsLimit, listerWatcherEnableStreaming, storageType),
}
}

func createWatcher(client kubernetes.Interface, sink sinks.Sink, resyncPeriod time.Duration, eventLabelSelector labels.Selector, listerWatcherOptionsLimit int64, storageType watchers.StorageType) watchers.Watcher {
func createWatcher(client kubernetes.Interface, sink sinks.Sink, resyncPeriod time.Duration, eventLabelSelector labels.Selector, listerWatcherOptionsLimit int64, listerWatcherEnableStreaming bool, storageType watchers.StorageType) watchers.Watcher {
return events.NewEventWatcher(client, &events.EventWatcherConfig{
OnList: sink.OnList,
ResyncPeriod: resyncPeriod,
Handler: sink,
EventLabelSelector: eventLabelSelector,
ListerWatcherOptionsLimit: listerWatcherOptionsLimit,
StorageType: storageType,
OnList: sink.OnList,
ResyncPeriod: resyncPeriod,
Handler: sink,
EventLabelSelector: eventLabelSelector,
ListerWatcherOptionsLimit: listerWatcherOptionsLimit,
ListerWatcherEnableStreaming: listerWatcherEnableStreaming,
StorageType: storageType,
})
}
105 changes: 89 additions & 16 deletions event-exporter/kubernetes/watchers/events/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package events

import (
"context"
"errors"
"time"

corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -59,12 +60,13 @@ type EventWatcherConfig struct {
// there can be many, e.g. because of network problems. Note also, that
// items in the List response WILL NOT trigger OnAdd method in handler,
// instead Store contents will be completely replaced.
OnList OnListFunc
ResyncPeriod time.Duration
Handler EventHandler
EventLabelSelector labels.Selector
ListerWatcherOptionsLimit int64
StorageType watchers.StorageType
OnList OnListFunc
ResyncPeriod time.Duration
Handler EventHandler
EventLabelSelector labels.Selector
ListerWatcherOptionsLimit int64
ListerWatcherEnableStreaming bool
StorageType watchers.StorageType
}

// NewEventWatcher create a new watcher that only watches the events resource.
Expand All @@ -73,18 +75,21 @@ func NewEventWatcher(client kubernetes.Interface, config *EventWatcherConfig) wa
// List and watch events in all namespaces.
ListerWatcher: &cache.ListWatch{
ListFunc: func(options meta_v1.ListOptions) (runtime.Object, error) {
if config.ListerWatcherOptionsLimit > 0 {
options.Limit = config.ListerWatcherOptionsLimit
if config.ListerWatcherEnableStreaming {
return streamingListEvents(client, config, options)
} else {
if config.ListerWatcherOptionsLimit > 0 {
options.Limit = config.ListerWatcherOptionsLimit
}
options.LabelSelector = config.EventLabelSelector.String()
list, err := client.CoreV1().Events(meta_v1.NamespaceAll).List(context.TODO(), options)
if err == nil {
config.OnList(list)
}
return list, err
}
options.LabelSelector = config.EventLabelSelector.String()
list, err := client.CoreV1().Events(meta_v1.NamespaceAll).List(context.TODO(), options)
if err == nil {
config.OnList(list)
// Clear items to prevent Reflector from buffering them in memeory.
list.Items = []corev1.Event{}
}
return list, err
},

WatchFunc: func(options meta_v1.ListOptions) (watch.Interface, error) {
options.LabelSelector = config.EventLabelSelector.String()
return client.CoreV1().Events(meta_v1.NamespaceAll).Watch(context.TODO(), options)
Expand All @@ -101,3 +106,71 @@ func NewEventWatcher(client kubernetes.Interface, config *EventWatcherConfig) wa
WatchListPageSize: eventWatchListPageSize,
})
}

// streamingListEvents uses Streaming List (SendInitialEvents=true) to avoid buffering.
// This allows us to process initial events incrementally.
func streamingListEvents(client kubernetes.Interface, config *EventWatcherConfig, options meta_v1.ListOptions) (runtime.Object, error) {
sendInitialEvents := true
options.SendInitialEvents = &sendInitialEvents
options.ResourceVersionMatch = meta_v1.ResourceVersionMatchNotOlderThan
options.Watch = true
options.LabelSelector = config.EventLabelSelector.String()
options.AllowWatchBookmarks = true

// Perform the streaming list (actually a Watch)
watcher, err := client.CoreV1().Events(meta_v1.NamespaceAll).Watch(context.TODO(), options)
if err != nil {
return nil, err
}
defer watcher.Stop()

// Call OnList once to start the sink (it just logs "Started watching")
config.OnList(&corev1.EventList{})

lastRV := ""
bookmarkReceived := false

eventLoop:
for event := range watcher.ResultChan() {
if meta, ok := event.Object.(meta_v1.Object); ok {
lastRV = meta.GetResourceVersion()
}

switch event.Type {
case watch.Added:
if e, ok := event.Object.(*corev1.Event); ok {
// Manually pass to handler since we bypass Reflector's store
config.Handler.OnAdd(e)
}
case watch.Bookmark:
// Check for the annotation that signals the initial list is done.
if m, ok := event.Object.(meta_v1.Object); ok {
if val, ok := m.GetAnnotations()["k8s.io/initial-events-end"]; ok && val == "true" {
// Close the channel and break the loop
bookmarkReceived = true
break eventLoop
}
}
case watch.Error:
// If we get an error, Reflector will retry ListFunc anyway.
// We can return the error here to trigger that retry.
if status, ok := event.Object.(*meta_v1.Status); ok {
return nil, errors.New(status.Message)
}
}
}

if !bookmarkReceived {
// If we exited the loop without receiving the bookmark, something went wrong.
return nil, errors.New("streaming list ended without receiving initial-events-end bookmark")
}

// Return an empty list with the correct ResourceVersion.
// Reflector will then start Watching from this version.
return &corev1.EventList{
ListMeta: meta_v1.ListMeta{
ResourceVersion: lastRV,
},
Items: []corev1.Event{},
}, nil
}
11 changes: 6 additions & 5 deletions event-exporter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,11 @@ var (
systemNamespaces = flag.String("system-namespaces", "kube-system,gke-connect", "Comma "+
"separated list of system namespaces to skip the owner label collection")

enablePodOwnerLabel = flag.Bool("enable-pod-owner-label", true, "Whether to enable the pod label collector to add pod owner labels to log entries")
eventLabelSelector = flag.String("event-label-selector", "", "Export events only if they match the given label selector. Same syntax as kubectl label")
listerWatcherOptionsLimit = flag.Int64("lister-watcher-options-limit", 100, "Maximum number of responses to return for a list call on events watch. Larger the number, higher the memory event-exporter will consume. No limits when set to 0.")
storageType = flag.String("storage-type", "DeltaFIFOStorage", "What storage should be used as a cache for the watcher. Supported sotrage type: SimpleStorage, TTLStorage and DeltaFIFOStorage.")
enablePodOwnerLabel = flag.Bool("enable-pod-owner-label", true, "Whether to enable the pod label collector to add pod owner labels to log entries")
eventLabelSelector = flag.String("event-label-selector", "", "Export events only if they match the given label selector. Same syntax as kubectl label")
listerWatcherOptionsLimit = flag.Int64("lister-watcher-options-limit", 100, "Maximum number of responses to return for a list call on events watch. Larger the number, higher the memory event-exporter will consume. No limits when set to 0.")
listerWatcherEnableStreaming = flag.Bool("lister-watcher-enable-streaming", false, "Enable watch streaming for lister watcher to prevent all the unhandled events get loaded into memory at once. Instead, events will be processed one by one. If this flag is set to true, lister-watcher-options-limit will be ignored.")
storageType = flag.String("storage-type", "DeltaFIFOStorage", "What storage should be used as a cache for the watcher. Supported sotrage type: SimpleStorage, TTLStorage and DeltaFIFOStorage.")
)

func newSystemStopChannel() chan struct{} {
Expand Down Expand Up @@ -118,7 +119,7 @@ func main() {
glog.Fatalf("Unsupported storage type:%v.", *storageType)
}

eventExporter := newEventExporter(client, sink, *resyncPeriod, parsedLabelSelector, *listerWatcherOptionsLimit, st)
eventExporter := newEventExporter(client, sink, *resyncPeriod, parsedLabelSelector, *listerWatcherOptionsLimit, *listerWatcherEnableStreaming, st)

// Expose the Prometheus http endpoint
go func() {
Expand Down
Loading