Skip to content

Commit

Permalink
Merge pull request #255 from nokia/single-event-kafka-input
Browse files Browse the repository at this point in the history
autodetect if kafka input is receiving an event or a list of events
  • Loading branch information
karimra authored Oct 25, 2023
2 parents 345c505 + a1a4dc8 commit 1c3d90f
Showing 1 changed file with 15 additions and 2 deletions.
17 changes: 15 additions & 2 deletions inputs/kafka_input/kafka_input.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package kafka_input

import (
"bytes"
"context"
"encoding/json"
"errors"
Expand All @@ -22,12 +23,13 @@ import (
"github.com/Shopify/sarama"
"github.com/damiannolan/sasl/oauthbearer"
"github.com/google/uuid"
"google.golang.org/protobuf/proto"

"github.com/openconfig/gnmic/formatters"
"github.com/openconfig/gnmic/inputs"
"github.com/openconfig/gnmic/outputs"
"github.com/openconfig/gnmic/types"
"github.com/openconfig/gnmic/utils"
"google.golang.org/protobuf/proto"
)

const (
Expand All @@ -44,6 +46,9 @@ const (

var defaultVersion = sarama.V2_5_0_0

var openSquareBracket = []byte("[")
var openCurlyBrace = []byte("{")

func init() {
inputs.Register("kafka", func() inputs.Input {
return &KafkaInput{
Expand Down Expand Up @@ -162,8 +167,16 @@ START:
}
switch k.Cfg.Format {
case "event":
m.Value = bytes.TrimSpace(m.Value)
evMsgs := make([]*formatters.EventMsg, 1)
err = json.Unmarshal(m.Value, &evMsgs)
switch {
case len(m.Value) == 0:
continue
case m.Value[0] == openSquareBracket[0]:
err = json.Unmarshal(m.Value, &evMsgs)
case m.Value[0] == openCurlyBrace[0]:
err = json.Unmarshal(m.Value, evMsgs[0])
}
if err != nil {
if k.Cfg.Debug {
k.logger.Printf("%s failed to unmarshal event msg: %v", workerLogPrefix, err)
Expand Down

0 comments on commit 1c3d90f

Please sign in to comment.