Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: '1.23'
go-version: '1.25'

- name: Test
run: make
Expand Down
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

FROM --platform=$BUILDPLATFORM golang:1.23-alpine3.21@sha256:f8113c4b13e2a8b3a168dceaee88ac27743cc84e959f43b9dbd2291e9c3f57a0 AS builder
FROM --platform=$BUILDPLATFORM golang:1.25.2-alpine3.22@sha256:06cdd34bd531b810650e47762c01e025eb9b1c7eadd191553b91c9f2d549fae8 AS builder

RUN apk add --update --no-cache ca-certificates make git curl

Expand All @@ -35,7 +35,7 @@ COPY Makefile /app/Makefile

RUN CGO_ENABLED=0 GOOS=$TARGETOS GOARCH=$TARGETARCH make build

FROM gcr.io/distroless/static-debian12@sha256:3f2b64ef97bd285e36132c684e6b2ae8f2723293d09aae046196cca64251acac
FROM gcr.io/distroless/static:latest@sha256:87bce11be0af225e4ca761c40babb06d6d559f5767fbf7dc3c47f0f1a466b92c

COPY --from=builder /app/eventrouter /app/eventrouter

Expand Down
24 changes: 1 addition & 23 deletions config.json
Original file line number Diff line number Diff line change
@@ -1,23 +1 @@
{
"kubeconfig": "/var/run/kubernetes/admin.kubeconfig",
"sink": "glog",
"kafkaBrokers": "kafka:9092",
"kafkaTopic": "topic",
"kafkaSaslUser": "user",
"kafkaSaslPwd": "password"
"httpSinkUrl": "http://localhost:8080",
"httpSinkBufferSize": 1500,
"httpSinkDiscardMessages": true,
"rocksetAPIKey": "",
"rocksetCollectionName": "",
"rocksetWorkspaceName": "",
"s3SinkAccessKeyID": "",
"s3SinkSecretAccessKey": "",
"s3SinkRegion": "ap-south-1",
"s3SinkBucket": "",
"s3SinkBucketDir": "",
"s3SinkBufferSize": 1500,
"s3SinkDiscardMessages": true,
"s3SinkOutputFormat": "flatjson",
"s3SinkUploadInterval": 120
}
{"sink": "stdout"}
142 changes: 103 additions & 39 deletions eventrouter.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,28 +137,80 @@ func (er *EventRouter) Run(stopCh <-chan struct{}) {
<-stopCh
}

// shouldProcessEvent checks if an event should be processed based on resource version
func (er *EventRouter) shouldProcessEvent(resourceVersion string) bool {
if resourceVersion == "" {
return false
}

if er.lastSeenResourceVersion == "" {
return true
}

return cast.ToInt(er.lastSeenResourceVersion) < cast.ToInt(resourceVersion)
}

// addEvent is called when an event is created, or during the initial list
func (er *EventRouter) addEvent(obj interface{}) {
e := obj.(*v1.Event)
if cast.ToInt(er.lastSeenResourceVersion) < cast.ToInt(e.ResourceVersion) {
e, ok := obj.(*v1.Event)
if !ok {
glog.Errorf("Expected *v1.Event, got %T", obj)
return
}

if e == nil {
glog.Error("Received nil event")
return
}

if er.shouldProcessEvent(e.ResourceVersion) {
prometheusEvent(e)
er.eSink.UpdateEvents(e, nil)
er.lastResourceVersionPosition(e.ResourceVersion)
if er.eSink != nil {
er.eSink.UpdateEvents(e, nil)
} else {
glog.Error("Event sink is nil, cannot process event")
return
}
if er.lastResourceVersionPosition != nil {
er.lastResourceVersionPosition(e.ResourceVersion)
}
} else {
glog.V(5).Infof("Event had already been processed:\n%v", e)
glog.V(5).Infof("Event had already been processed: %s (resource version: %s)", e.Name, e.ResourceVersion)
}
}

// updateEvent is called any time there is an update to an existing event
func (er *EventRouter) updateEvent(objOld interface{}, objNew interface{}) {
eOld := objOld.(*v1.Event)
eNew := objNew.(*v1.Event)
if cast.ToInt(er.lastSeenResourceVersion) < cast.ToInt(eNew.ResourceVersion) {
eOld, okOld := objOld.(*v1.Event)
if !okOld {
glog.Errorf("Expected *v1.Event for old object, got %T", objOld)
return
}

eNew, okNew := objNew.(*v1.Event)
if !okNew {
glog.Errorf("Expected *v1.Event for new object, got %T", objNew)
return
}

if eNew == nil {
glog.Error("Received nil new event")
return
}

if er.shouldProcessEvent(eNew.ResourceVersion) {
prometheusEvent(eNew)
er.eSink.UpdateEvents(eNew, eOld)
er.lastResourceVersionPosition(eNew.ResourceVersion)
if er.eSink != nil {
er.eSink.UpdateEvents(eNew, eOld)
} else {
glog.Error("Event sink is nil, cannot process event update")
return
}
if er.lastResourceVersionPosition != nil {
er.lastResourceVersionPosition(eNew.ResourceVersion)
}
} else {
glog.V(5).Infof("Event had already been processed:\n%v", eNew)
glog.V(5).Infof("Event had already been processed: %s (resource version: %s)", eNew.Name, eNew.ResourceVersion)
}
}

Expand All @@ -167,56 +219,68 @@ func prometheusEvent(event *v1.Event) {
if !viper.GetBool("enable-prometheus") {
return
}

if event == nil {
glog.Error("Cannot record metrics for nil event")
return
}

// Safely get label values with defaults
safeString := func(s string) string {
if s == "" {
return "unknown"
}
return s
}

kind := safeString(event.InvolvedObject.Kind)
name := safeString(event.InvolvedObject.Name)
namespace := safeString(event.InvolvedObject.Namespace)
reason := safeString(event.Reason)
sourceHost := safeString(event.Source.Host)

var counter prometheus.Counter
var err error

switch event.Type {
case "Normal":
counter, err = kubernetesNormalEventCounterVec.GetMetricWithLabelValues(
event.InvolvedObject.Kind,
event.InvolvedObject.Name,
event.InvolvedObject.Namespace,
event.Reason,
event.Source.Host,
)
kind, name, namespace, reason, sourceHost)
case "Warning":
counter, err = kubernetesWarningEventCounterVec.GetMetricWithLabelValues(
event.InvolvedObject.Kind,
event.InvolvedObject.Name,
event.InvolvedObject.Namespace,
event.Reason,
event.Source.Host,
)
kind, name, namespace, reason, sourceHost)
case "Info":
counter, err = kubernetesInfoEventCounterVec.GetMetricWithLabelValues(
event.InvolvedObject.Kind,
event.InvolvedObject.Name,
event.InvolvedObject.Namespace,
event.Reason,
event.Source.Host,
)
kind, name, namespace, reason, sourceHost)
default:
glog.V(4).Infof("Unknown event type: %s", event.Type)
counter, err = kubernetesUnknownEventCounterVec.GetMetricWithLabelValues(
event.InvolvedObject.Kind,
event.InvolvedObject.Name,
event.InvolvedObject.Namespace,
event.Reason,
event.Source.Host,
)
kind, name, namespace, reason, sourceHost)
}

if err != nil {
// Not sure this is the right place to log this error?
glog.Warning(err)
glog.Errorf("Failed to get Prometheus counter for event %s/%s: %v", namespace, name, err)
} else {
counter.Add(1)
glog.V(6).Infof("Recorded Prometheus metric for event %s/%s (type: %s)", namespace, name, event.Type)
}
}

// deleteEvent should only occur when the system garbage collects events via TTL expiration
func (er *EventRouter) deleteEvent(obj interface{}) {
e := obj.(*v1.Event)
e, ok := obj.(*v1.Event)
if !ok {
glog.Errorf("Expected *v1.Event in deleteEvent, got %T", obj)
return
}

if e == nil {
glog.Error("Received nil event in deleteEvent")
return
}

// NOTE: This should *only* happen on TTL expiration there
// is no reason to push this to a sink
glog.V(5).Infof("Event Deleted from the system:\n%v", e)
glog.V(5).Infof("Event deleted from the system: %s/%s (reason: %s, resource version: %s)",
e.Namespace, e.Name, e.Reason, e.ResourceVersion)
}
Loading
Loading