From 88847111bad83fc4a8051c64dba57709220503f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Patryk=20Ma=C5=82ek?= Date: Wed, 10 Aug 2022 14:59:10 +0200 Subject: [PATCH] feat: return partial reports without failure --- go.mod | 6 +++ go.sum | 4 ++ pkg/forwarders/rawchannelforwarder.go | 25 +++++++++++ pkg/forwarders/tlsforwarder.go | 23 ++-------- pkg/logp/logp.go | 21 +++++++++ pkg/telemetry/consumer.go | 57 ++++++++++++++++++++++++ pkg/telemetry/manager.go | 34 +++++++++------ pkg/telemetry/manager_test.go | 63 +++++++++++++++++++++++++++ pkg/telemetry/workflow.go | 16 ++++--- 9 files changed, 209 insertions(+), 40 deletions(-) create mode 100644 pkg/forwarders/rawchannelforwarder.go create mode 100644 pkg/logp/logp.go diff --git a/go.mod b/go.mod index 24f6a68..5a17679 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,11 @@ require ( sigs.k8s.io/controller-runtime v0.12.3 ) +require ( + github.com/hashicorp/errwrap v1.0.0 // indirect + github.com/spf13/pflag v1.0.5 // indirect +) + require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/emicklei/go-restful/v3 v3.8.0 // indirect @@ -27,6 +32,7 @@ require ( github.com/golang/protobuf v1.5.2 // indirect github.com/google/gnostic v0.6.9 // indirect github.com/google/gofuzz v1.2.0 // indirect + github.com/hashicorp/go-multierror v1.1.1 github.com/imdario/mergo v0.3.13 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect diff --git a/go.sum b/go.sum index df90006..9ae9918 100644 --- a/go.sum +++ b/go.sum @@ -197,6 +197,10 @@ github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB7 github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= +github.com/hashicorp/errwrap v1.0.0 
h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= +github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= diff --git a/pkg/forwarders/rawchannelforwarder.go b/pkg/forwarders/rawchannelforwarder.go new file mode 100644 index 0000000..e19b5b2 --- /dev/null +++ b/pkg/forwarders/rawchannelforwarder.go @@ -0,0 +1,25 @@ +package forwarders + +import ( + "github.com/kong/kubernetes-telemetry/pkg/types" +) + +type rawChannelForwarder struct { + ch chan types.Report +} + +// NewRawChannelForwarder creates new rawChannelForwarder. +func NewRawChannelForwarder(ch chan types.Report) *rawChannelForwarder { + return &rawChannelForwarder{ + ch: ch, + } +} + +func (f *rawChannelForwarder) Name() string { + return "LogForwarder" +} + +func (f *rawChannelForwarder) Forward(r types.Report) error { + f.ch <- r + return nil +} diff --git a/pkg/forwarders/tlsforwarder.go b/pkg/forwarders/tlsforwarder.go index 30f1253..9b84b26 100644 --- a/pkg/forwarders/tlsforwarder.go +++ b/pkg/forwarders/tlsforwarder.go @@ -7,7 +7,8 @@ import ( "time" "github.com/go-logr/logr" - "github.com/sirupsen/logrus" + + "github.com/kong/kubernetes-telemetry/pkg/logp" ) const ( @@ -20,24 +21,6 @@ var tlsConf = tls.Config{ MaxVersion: tls.VersionTLS13, } -// TODO: Address logging levels and library to be used. -// See: https://github.com/Kong/kubernetes-ingress-controller/issues/1893 -const ( - logrusrDiff = 4 - - // InfoLevel is the converted logging level from logrus to go-logr for - // information level logging. 
Note that the logrusr middleware technically - // flattens all levels prior to this level into this level as well. - InfoLevel = int(logrus.InfoLevel) - logrusrDiff - - // DebugLevel is the converted logging level from logrus to go-logr for - // debug level logging. - DebugLevel = int(logrus.DebugLevel) - logrusrDiff - - // WarnLevel is the converted logrus level to go-logr for warnings. - WarnLevel = int(logrus.WarnLevel) - logrusrDiff -) - type tlsForwarder struct { log logr.Logger conn *tls.Conn @@ -55,7 +38,7 @@ func NewTLSForwarder(address string, log logr.Logger) *tlsForwarder { &tlsConf, ) if err != nil { - log.V(DebugLevel).Info("failed to connect to reporting server", "error", err) + log.V(logp.DebugLevel).Info("failed to connect to reporting server", "error", err) return nil } diff --git a/pkg/logp/logp.go b/pkg/logp/logp.go new file mode 100644 index 0000000..fcdb3ad --- /dev/null +++ b/pkg/logp/logp.go @@ -0,0 +1,21 @@ +package logp + +import "github.com/sirupsen/logrus" + +// TODO: Address logging levels and library to be used. +// See: https://github.com/Kong/kubernetes-ingress-controller/issues/1893 +const ( + logrusrDiff = 4 + + // InfoLevel is the converted logging level from logrus to go-logr for + // information level logging. Note that the logrusr middleware technically + // flattens all levels prior to this level into this level as well. + InfoLevel = int(logrus.InfoLevel) - logrusrDiff + + // DebugLevel is the converted logging level from logrus to go-logr for + // debug level logging. + DebugLevel = int(logrus.DebugLevel) - logrusrDiff + + // WarnLevel is the converted logrus level to go-logr for warnings. 
+ WarnLevel = int(logrus.WarnLevel) - logrusrDiff +) diff --git a/pkg/telemetry/consumer.go b/pkg/telemetry/consumer.go index bc31d5b..8f2e623 100644 --- a/pkg/telemetry/consumer.go +++ b/pkg/telemetry/consumer.go @@ -56,12 +56,69 @@ func NewConsumer(s Serializer, f Forwarder) *consumer { } } +// Intake returns a channel on which this consumer will wait for data to consume it. func (c *consumer) Intake() chan<- types.Report { return c.ch } +// Close closes the consumer. func (c *consumer) Close() { c.once.Do(func() { close(c.done) }) } + +type rawconsumer struct { + logger logr.Logger + once sync.Once + ch chan types.Report + done chan struct{} +} + +// RawForwarder is used to forward raw, unserialized telemetry reports to configured +// destination(s). +type RawForwarder interface { + Name() string + Forward(types.Report) error +} + +// NewRawConsumer creates a new rawconsumer that will use the provided raw forwarder +// to forward received reports. +func NewRawConsumer(f RawForwarder) *rawconsumer { + var ( + ch = make(chan types.Report) + done = make(chan struct{}) + logger = defaultLogger() // TODO: allow configuration + ) + + go func() { + for { + select { + case <-done: + return + case r := <-ch: + if err := f.Forward(r); err != nil { + logger.Error(err, "failed to forward report using raw forwarder: %s", f.Name()) + } + } + } + }() + + return &rawconsumer{ + logger: logger, + ch: ch, + done: done, + } +} + +// Intake returns a channel on which this consumer will wait for data to consume it. +func (c *rawconsumer) Intake() chan<- types.Report { + return c.ch +} + +// Close closes rawconsumer. 
+func (c *rawconsumer) Close() { + c.once.Do(func() { + close(c.done) + }) +} diff --git a/pkg/telemetry/manager.go b/pkg/telemetry/manager.go index f9d3031..8175770 100644 --- a/pkg/telemetry/manager.go +++ b/pkg/telemetry/manager.go @@ -8,10 +8,10 @@ import ( "time" "github.com/go-logr/logr" - "github.com/pkg/errors" + "github.com/hashicorp/go-multierror" "github.com/puzpuzpuz/xsync" - "github.com/kong/kubernetes-telemetry/pkg/provider" + "github.com/kong/kubernetes-telemetry/pkg/logp" "github.com/kong/kubernetes-telemetry/pkg/types" ) @@ -165,7 +165,14 @@ func (m *manager) workflowsLoop() { case <-ticker.C: report, err := m.Execute(context.Background()) if err != nil { - m.logger.Error(err, "error executing workflows") + m.logger.V(logp.DebugLevel). + WithValues("error", err.Error()). + Info("error executing workflows") + } + + // Continue the execution even if we get an error but account for the possibility + // of getting nil reports, in which case move on to the next iteration (tick). + if report == nil { continue } @@ -179,28 +186,27 @@ func (m *manager) workflowsLoop() { } // Execute executes all configures workflows and returns an aggregated report -// from all the underying providers. +// from all the underlying providers. func (m *manager) Execute(ctx context.Context) (types.Report, error) { var ( - err error + mErr error report = types.Report{} ) m.workflows.Range(func(name string, v Workflow) bool { - var r provider.Report - r, err = v.Execute(ctx) + r, err := v.Execute(ctx) if err != nil { - err = errors.Wrapf(err, "error executing workflow %s", name) - // TODO: return true and don't abort when encountering an error. - // Better to report partial report than nothing. In order to do so - // use an error agreggator like https://github.com/hashicorp/go-multierror. - return false + mErr = multierror.Append(mErr, err) + } + + // Add the report even if it is partial, omitting only empty (nil) reports. 
+ if r != nil { + report[v.Name()] = r } - report[v.Name()] = r return true }) - return report, err + return report, mErr } // consumerLoop loops over all configured consumers and sends the gathered telemetry diff --git a/pkg/telemetry/manager_test.go b/pkg/telemetry/manager_test.go index 2267f97..5c2e2a3 100644 --- a/pkg/telemetry/manager_test.go +++ b/pkg/telemetry/manager_test.go @@ -1,12 +1,15 @@ package telemetry import ( + "errors" "fmt" "runtime" "testing" "time" + "github.com/bombsimon/logrusr/v3" "github.com/go-logr/logr" + "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -238,3 +241,63 @@ func TestManagerWithCatalogWorkflows(t *testing.T) { }, report) }) } + +func TestManagerWithMultilpleWorkflowsOneReturningError(t *testing.T) { + logrusLog := logrus.New() + logrusLog.Level = logrus.DebugLevel + log := logrusr.New(logrusLog) + m, err := NewManager( + OptManagerLogger(log), + OptManagerPeriod(time.Millisecond), + ) + require.NoError(t, err) + + { + w := NewWorkflow("basic") + { + p, err := provider.NewFixedValueProvider("constant1", provider.Report{ + "constant1": "value1", + }) + require.NoError(t, err) + w.AddProvider(p) + } + + m.AddWorkflow(w) + } + { + w := NewWorkflow("basic_with_error") + { + p, err := provider.NewFunctorProvider("error_provider", func() (provider.Report, error) { + return nil, errors.New("I am an error") + }) + require.NoError(t, err) + w.AddProvider(p) + } + { + + p, err := provider.NewFixedValueProvider("constant2", provider.Report{ + "constant2": "value2", + }) + require.NoError(t, err) + w.AddProvider(p) + } + + m.AddWorkflow(w) + } + + ch := make(chan types.Report) + consumer := NewRawConsumer(forwarders.NewRawChannelForwarder(ch)) + require.NoError(t, m.AddConsumer(consumer)) + require.NoError(t, m.Start()) + + report := <-ch + m.Stop() + require.EqualValues(t, types.Report{ + "basic": provider.Report{ + "constant1": "value1", + }, 
+ "basic_with_error": provider.Report{ + "constant2": "value2", + }, + }, report) +} diff --git a/pkg/telemetry/workflow.go b/pkg/telemetry/workflow.go index d247399..eb7979e 100644 --- a/pkg/telemetry/workflow.go +++ b/pkg/telemetry/workflow.go @@ -6,6 +6,8 @@ import ( "sync" "github.com/gammazero/workerpool" + "github.com/hashicorp/go-multierror" + "github.com/pkg/errors" "github.com/kong/kubernetes-telemetry/pkg/provider" ) @@ -62,17 +64,15 @@ func (w *workflow) Execute(ctx context.Context) (provider.Report, error) { ctx, cancel := context.WithCancel(ctx) defer cancel() + wg.Add(len(w.providers)) for _, provider := range w.providers { p := provider - wg.Add(1) - wp.Submit(func() { defer wg.Done() report, err := p.Provide(ctx) if err != nil { - chErr <- err - return + chErr <- errors.Wrapf(err, "problem with provider %s", p.Name()) } chReport <- report @@ -86,12 +86,16 @@ func (w *workflow) Execute(ctx context.Context) (provider.Report, error) { close(chReport) }() + var mErr error + forLoop: for { select { case err := <-chErr: if err != nil { - return nil, err + mErr = multierror.Append(mErr, + errors.Wrapf(err, "error executing workflow %s", w.Name()), + ) } case r := <-chReport: report.Merge(r) @@ -100,5 +104,5 @@ forLoop: } } - return report, nil + return report, mErr }