Skip to content

Commit

Permalink
improve record log formatting (#200)
Browse files Browse the repository at this point in the history
  • Loading branch information
patrickdemers6 committed Aug 27, 2024
1 parent 58eada8 commit 696f2ff
Show file tree
Hide file tree
Showing 17 changed files with 742 additions and 6 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ For ease of installation and operation, run Fleet Telemetry on Kubernetes or a s
"flush_period": int - ms flush period
}
},
"logger": {
"verbose": bool - include data types in the logs. Only applicable for records of type 'V'
},
"kafka": { // librdkafka kafka config, seen here: https://raw.githubusercontent.com/confluentinc/librdkafka/master/CONFIGURATION.md
"bootstrap.servers": "kafka:9092",
"queue.buffering.max.messages": 1000000
Expand Down
5 changes: 4 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ type Config struct {
// Monitoring defines information for metrics
Monitoring *metrics.MonitoringConfig `json:"monitoring,omitempty"`

// LoggerConfig configures the simple logger
LoggerConfig *simple.Config `json:"logger,omitempty"`

// LogLevel set the log-level
LogLevel string `json:"log_level,omitempty"`

Expand Down Expand Up @@ -250,7 +253,7 @@ func (c *Config) ConfigureProducers(airbrakeHandler *airbrake.AirbrakeHandler, l
}

producers := make(map[telemetry.Dispatcher]telemetry.Producer)
producers[telemetry.Logger] = simple.NewProtoLogger(logger)
producers[telemetry.Logger] = simple.NewProtoLogger(c.LoggerConfig, logger)

requiredDispatchers := make(map[telemetry.Dispatcher][]string)
for recordName, dispatchRules := range c.Records {
Expand Down
5 changes: 4 additions & 1 deletion config/config_initializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/sirupsen/logrus/hooks/test"

"github.com/teslamotors/fleet-telemetry/datastore/simple"
logrus "github.com/teslamotors/fleet-telemetry/logger"
"github.com/teslamotors/fleet-telemetry/metrics"
"github.com/teslamotors/fleet-telemetry/telemetry"
Expand Down Expand Up @@ -41,7 +42,9 @@ func loadApplicationConfig(configFilePath string) (*Config, error) {
return nil, err
}

config := &Config{}
config := &Config{
LoggerConfig: &simple.Config{},
}
err = json.NewDecoder(configFile).Decode(&config)
if err != nil {
return nil, err
Expand Down
3 changes: 3 additions & 0 deletions config/config_initializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ var _ = Describe("Test application config initialization", func() {
Expect(err).NotTo(HaveOccurred())

expectedConfig.MetricCollector = loadedConfig.MetricCollector
expectedConfig.LoggerConfig = loadedConfig.LoggerConfig
expectedConfig.AckChan = loadedConfig.AckChan
Expect(loadedConfig).To(Equal(expectedConfig))
})
Expand All @@ -67,6 +68,8 @@ var _ = Describe("Test application config initialization", func() {
loadedConfig, err := loadTestApplicationConfig(TestSmallConfig)
Expect(err).NotTo(HaveOccurred())

Expect(loadedConfig.LoggerConfig).ToNot(BeNil())
expectedConfig.LoggerConfig = loadedConfig.LoggerConfig
expectedConfig.MetricCollector = loadedConfig.MetricCollector
expectedConfig.AckChan = loadedConfig.AckChan
Expect(loadedConfig).To(Equal(expectedConfig))
Expand Down
45 changes: 41 additions & 4 deletions datastore/simple/logger.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,28 @@
package simple

import (
"fmt"

"github.com/teslamotors/fleet-telemetry/datastore/simple/transformers"
logrus "github.com/teslamotors/fleet-telemetry/logger"
"github.com/teslamotors/fleet-telemetry/protos"
"github.com/teslamotors/fleet-telemetry/telemetry"
)

type Config struct {
// Verbose controls whether types are explicitly shown in the logs. Only applicable for record type 'V'.
Verbose bool `json:"verbose"`
}

// ProtoLogger is a simple protobuf logger
type ProtoLogger struct {
Config *Config
logger *logrus.Logger
}

// NewProtoLogger initializes the parameters for protobuf payload logging
func NewProtoLogger(logger *logrus.Logger) telemetry.Producer {
return &ProtoLogger{logger: logger}
func NewProtoLogger(config *Config, logger *logrus.Logger) telemetry.Producer {
return &ProtoLogger{Config: config, logger: logger}
}

// SetReliableAckTxType no-op for logger datastore
Expand All @@ -21,15 +31,42 @@ func (p *ProtoLogger) ProcessReliableAck(entry *telemetry.Record) {

// Produce sends the data to the logger
func (p *ProtoLogger) Produce(entry *telemetry.Record) {
data, err := entry.GetJSONPayload()
data, err := p.recordToLogMap(entry)
if err != nil {
p.logger.ErrorLog("json_unmarshal_error", err, logrus.LogInfo{"vin": entry.Vin, "metadata": entry.Metadata()})
return
}
p.logger.ActivityLog("logger_json_unmarshal", logrus.LogInfo{"vin": entry.Vin, "metadata": entry.Metadata(), "data": string(data)})
p.logger.ActivityLog("logger_json_unmarshal", logrus.LogInfo{"vin": entry.Vin, "metadata": entry.Metadata(), "data": data})
}

// ReportError noop method
func (p *ProtoLogger) ReportError(message string, err error, logInfo logrus.LogInfo) {
return
}

// recordToLogMap converts the data of a record to a map or slice of maps
func (p *ProtoLogger) recordToLogMap(record *telemetry.Record) (interface{}, error) {
payload, err := record.GetProtoMessage()
if err != nil {
return nil, err
}

switch payload := payload.(type) {
case *protos.Payload:
return transformers.PayloadToMap(payload, p.Config.Verbose, p.logger), nil
case *protos.VehicleAlerts:
alertMaps := make([]map[string]interface{}, len(payload.Alerts))
for i, alert := range payload.Alerts {
alertMaps[i] = transformers.VehicleAlertToMap(alert)
}
return alertMaps, nil
case *protos.VehicleErrors:
errorMaps := make([]map[string]interface{}, len(payload.Errors))
for i, vehicleError := range payload.Errors {
errorMaps[i] = transformers.VehicleErrorToMap(vehicleError)
}
return errorMaps, nil
default:
return nil, fmt.Errorf("unknown txType: %s", record.TxType)
}
}
137 changes: 137 additions & 0 deletions datastore/simple/logger_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package simple_test

import (
"time"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

"github.com/teslamotors/fleet-telemetry/datastore/simple"
logrus "github.com/teslamotors/fleet-telemetry/logger"
"github.com/teslamotors/fleet-telemetry/protos"
"github.com/teslamotors/fleet-telemetry/telemetry"

"github.com/sirupsen/logrus/hooks/test"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
)

var _ = Describe("ProtoLogger", func() {
var (
protoLogger *simple.ProtoLogger
testLogger *logrus.Logger
hook *test.Hook
config *simple.Config
)

BeforeEach(func() {
testLogger, hook = logrus.NoOpLogger()
config = &simple.Config{Verbose: false}
protoLogger = simple.NewProtoLogger(config, testLogger).(*simple.ProtoLogger)
})

Describe("NewProtoLogger", func() {
It("creates a new ProtoLogger", func() {
Expect(protoLogger).NotTo(BeNil())
Expect(protoLogger.Config).To(Equal(config))
})
})

Describe("ProcessReliableAck", func() {
It("does not panic", func() {
entry := &telemetry.Record{}
Expect(func() { protoLogger.ProcessReliableAck(entry) }).NotTo(Panic())
})
})

Describe("Produce", func() {
var (
record *telemetry.Record
)

BeforeEach(func() {
payload := &protos.Payload{
Vin: "TEST123",
CreatedAt: timestamppb.New(time.Unix(0, 0)),
Data: []*protos.Datum{
{
Key: protos.Field_VehicleName,
Value: &protos.Value{
Value: &protos.Value_StringValue{StringValue: "TestVehicle"},
},
},
{
Key: protos.Field_Gear,
Value: &protos.Value{
Value: &protos.Value_ShiftStateValue{ShiftStateValue: protos.ShiftState_ShiftStateD},
},
},
},
}
payloadBytes, err := proto.Marshal(payload)
Expect(err).NotTo(HaveOccurred())

record = &telemetry.Record{
Vin: "TEST123",
PayloadBytes: payloadBytes,
TxType: "V",
}
})

It("logs data", func() {
protoLogger.Produce(record)

lastLog := hook.LastEntry()
Expect(lastLog.Message).To(Equal("logger_json_unmarshal"))
Expect(lastLog.Data).To(HaveKeyWithValue("vin", "TEST123"))
Expect(lastLog.Data).To(HaveKey("data"))

data, ok := lastLog.Data["data"].(map[string]interface{})
Expect(ok).To(BeTrue())
Expect(data).To(Equal(map[string]interface{}{
"VehicleName": "TestVehicle",
"Gear": "ShiftStateD",
"Vin": "TEST123",
"CreatedAt": "1970-01-01T00:00:00Z",
}))
})

It("logs an error when unmarshaling fails", func() {
record.PayloadBytes = []byte("invalid payload")
protoLogger.Produce(record)

lastLog := hook.LastEntry()
Expect(lastLog.Message).To(Equal("json_unmarshal_error"))
Expect(lastLog.Data).To(HaveKeyWithValue("vin", "TEST123"))
Expect(lastLog.Data).To(HaveKey("metadata"))
})

Context("when verbose set to true", func() {
BeforeEach(func() {
config.Verbose = true
protoLogger = simple.NewProtoLogger(config, testLogger).(*simple.ProtoLogger)
})

It("does not include types in the data", func() {
protoLogger.Produce(record)

data, ok := hook.LastEntry().Data["data"].(map[string]interface{})
Expect(ok).To(BeTrue())
Expect(data).To(Equal(map[string]interface{}{
"VehicleName": map[string]interface{}{"stringValue": "TestVehicle"},
"Gear": map[string]interface{}{"shiftStateValue": "ShiftStateD"},
"Vin": "TEST123",
"CreatedAt": "1970-01-01T00:00:00Z",
}))
})
})
})

Describe("ReportError", func() {
It("succeeds", func() {
Expect(func() {
protoLogger.ReportError("test error", nil, logrus.LogInfo{})
}).NotTo(Panic())
})
})
})
13 changes: 13 additions & 0 deletions datastore/simple/simple_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package simple_test

import (
"testing"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

func TestSimple(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Simple Suite Tests")
}
83 changes: 83 additions & 0 deletions datastore/simple/transformers/payload.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package transformers

import (
"time"

logrus "github.com/teslamotors/fleet-telemetry/logger"
"github.com/teslamotors/fleet-telemetry/protos"
)

func PayloadToMap(payload *protos.Payload, includeTypes bool, logger *logrus.Logger) map[string]interface{} {
convertedPayload := make(map[string]interface{}, len(payload.Data)+2)
convertedPayload["Vin"] = payload.Vin
convertedPayload["CreatedAt"] = payload.CreatedAt.AsTime().Format(time.RFC3339)

for _, datum := range payload.Data {
if datum == nil || datum.Value == nil {
logger.ActivityLog("unknown_payload_data_type", logrus.LogInfo{"vin": payload.Vin})
continue
}
name := protos.Field_name[int32(datum.Key.Number())]
value, ok := transformValue(datum.Value.Value, includeTypes)
if !ok {
logger.ActivityLog("unknown_payload_value_data_type", logrus.LogInfo{"name": name, "vin": payload.Vin})
continue
}
convertedPayload[name] = value
}

return convertedPayload
}

func transformValue(value interface{}, includeTypes bool) (interface{}, bool) {
var outputValue interface{}
var outputType string

// ordered by expected frequency
switch v := value.(type) {
case *protos.Value_StringValue:
outputType = "stringValue"
outputValue = v.StringValue
case *protos.Value_LocationValue:
outputType = "locationValue"
outputValue = map[string]float64{
"latitude": v.LocationValue.Latitude,
"longitude": v.LocationValue.Longitude,
}
case *protos.Value_FloatValue:
outputType = "floatValue"
outputValue = v.FloatValue
case *protos.Value_IntValue:
outputType = "intValue"
outputValue = v.IntValue
case *protos.Value_DoubleValue:
outputType = "doubleValue"
outputValue = v.DoubleValue
case *protos.Value_LongValue:
outputType = "longValue"
outputValue = v.LongValue
case *protos.Value_BooleanValue:
outputType = "booleanValue"
outputValue = v.BooleanValue
case *protos.Value_Invalid:
outputType = "invalid"
outputValue = "<invalid>"
if includeTypes {
outputValue = true
}
case *protos.Value_ShiftStateValue:
outputType = "shiftStateValue"
outputValue = v.ShiftStateValue.String()
case *protos.Value_ChargingValue:
outputType = "chargingValue"
outputValue = v.ChargingValue.String()
default:
return nil, false
}

if includeTypes {
return map[string]interface{}{outputType: outputValue}, true
}

return outputValue, true
}
Loading

0 comments on commit 696f2ff

Please sign in to comment.