Skip to content

Commit

Permalink
feat: return partial reports without failure
Browse files Browse the repository at this point in the history
  • Loading branch information
pmalek committed Aug 10, 2022
1 parent 7c03ed4 commit 8884711
Show file tree
Hide file tree
Showing 9 changed files with 209 additions and 40 deletions.
6 changes: 6 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ require (
sigs.k8s.io/controller-runtime v0.12.3
)

require (
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/emicklei/go-restful/v3 v3.8.0 // indirect
Expand All @@ -27,6 +32,7 @@ require (
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/gnostic v0.6.9 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/hashicorp/go-multierror v1.1.1
github.com/imdario/mergo v0.3.13 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,10 @@ github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB7
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
Expand Down
25 changes: 25 additions & 0 deletions pkg/forwarders/rawchannelforwarder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package forwarders

import (
"github.com/kong/kubernetes-telemetry/pkg/types"
)

// rawChannelForwarder is a forwarder that delivers reports by sending them,
// as-is, on a Go channel supplied at construction time.
type rawChannelForwarder struct {
// ch is the channel every forwarded report is sent on.
ch chan types.Report
}

// NewRawChannelForwarder builds a rawChannelForwarder that sends every report
// it is asked to forward on ch.
func NewRawChannelForwarder(ch chan types.Report) *rawChannelForwarder {
	return &rawChannelForwarder{ch: ch}
}

// Name returns the name identifying this forwarder (e.g. in log messages).
func (f *rawChannelForwarder) Name() string {
	// Previously returned "LogForwarder", which mislabelled this forwarder.
	return "RawChannelForwarder"
}

// Forward sends r on the forwarder's channel and always returns nil.
// It is a plain channel send, so it blocks until the channel accepts the
// report (i.e. it is received, or buffered if the channel has capacity).
func (f *rawChannelForwarder) Forward(r types.Report) error {
f.ch <- r
return nil
}
23 changes: 3 additions & 20 deletions pkg/forwarders/tlsforwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ import (
"time"

"github.com/go-logr/logr"
"github.com/sirupsen/logrus"

"github.com/kong/kubernetes-telemetry/pkg/logp"
)

const (
Expand All @@ -20,24 +21,6 @@ var tlsConf = tls.Config{
MaxVersion: tls.VersionTLS13,
}

// TODO: Address logging levels and library to be used.
// See: https://github.com/Kong/kubernetes-ingress-controller/issues/1893
const (
// logrusrDiff is the offset subtracted from a logrus level to obtain the
// go-logr level exposed by the constants below.
logrusrDiff = 4

// InfoLevel is the converted logging level from logrus to go-logr for
// information level logging. Note that the logrusr middleware technically
// flattens all levels prior to this level into this level as well.
InfoLevel = int(logrus.InfoLevel) - logrusrDiff

// DebugLevel is the converted logging level from logrus to go-logr for
// debug level logging.
DebugLevel = int(logrus.DebugLevel) - logrusrDiff

// WarnLevel is the converted logrus level to go-logr for warnings.
WarnLevel = int(logrus.WarnLevel) - logrusrDiff
)

type tlsForwarder struct {
log logr.Logger
conn *tls.Conn
Expand All @@ -55,7 +38,7 @@ func NewTLSForwarder(address string, log logr.Logger) *tlsForwarder {
&tlsConf,
)
if err != nil {
log.V(DebugLevel).Info("failed to connect to reporting server", "error", err)
log.V(logp.DebugLevel).Info("failed to connect to reporting server", "error", err)
return nil
}

Expand Down
21 changes: 21 additions & 0 deletions pkg/logp/logp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package logp

import "github.com/sirupsen/logrus"

// TODO: Address logging levels and library to be used.
// See: https://github.com/Kong/kubernetes-ingress-controller/issues/1893
const (
// logrusrDiff is the offset subtracted from a logrus level to obtain the
// go-logr level exposed by the constants below.
logrusrDiff = 4

// InfoLevel is the converted logging level from logrus to go-logr for
// information level logging. Note that the logrusr middleware technically
// flattens all levels prior to this level into this level as well.
InfoLevel = int(logrus.InfoLevel) - logrusrDiff

// DebugLevel is the converted logging level from logrus to go-logr for
// debug level logging.
DebugLevel = int(logrus.DebugLevel) - logrusrDiff

// WarnLevel is the converted logrus level to go-logr for warnings.
WarnLevel = int(logrus.WarnLevel) - logrusrDiff
)
57 changes: 57 additions & 0 deletions pkg/telemetry/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,69 @@ func NewConsumer(s Serializer, f Forwarder) *consumer {
}
}

// Intake returns the send-only channel on which this consumer waits for
// reports to consume.
func (c *consumer) Intake() chan<- types.Report {
return c.ch
}

// Close shuts the consumer down by closing its done channel. The close is
// guarded by a sync.Once, so calling Close more than once is safe.
func (c *consumer) Close() {
	c.once.Do(func() { close(c.done) })
}

// rawconsumer receives reports on its intake channel and hands them,
// unserialized, to a RawForwarder (see NewRawConsumer).
type rawconsumer struct {
// logger is the logger obtained when the consumer was created.
logger logr.Logger
// once guards the closing of done so that Close can be called repeatedly.
once sync.Once
// ch is the intake channel returned by Intake.
ch chan types.Report
// done is closed by Close to stop the forwarding goroutine.
done chan struct{}
}

// RawForwarder is used to forward raw, unserialized telemetry reports to configured
// destination(s).
type RawForwarder interface {
// Name returns the forwarder's name; it is used to identify the forwarder
// when logging forwarding failures.
Name() string
// Forward forwards the given report, returning a non-nil error on failure.
Forward(types.Report) error
}

// NewRawConsumer creates a new rawconsumer that will use the provided raw forwarder
// to forward received reports.
//
// It starts a goroutine that passes every report received on the intake
// channel to f.Forward. Forwarding errors are logged and otherwise ignored.
// The goroutine returns once the consumer's done channel is closed (see Close).
func NewRawConsumer(f RawForwarder) *rawconsumer {
	var (
		ch     = make(chan types.Report)
		done   = make(chan struct{})
		logger = defaultLogger() // TODO: allow configuration
	)

	go func() {
		for {
			select {
			case <-done:
				return
			case r := <-ch:
				if err := f.Forward(r); err != nil {
					// logr does not interpret printf verbs in the message and
					// treats trailing arguments as key/value pairs, so the
					// forwarder name is passed as a structured field.
					logger.Error(err, "failed to forward report using raw forwarder", "forwarder", f.Name())
				}
			}
		}
	}()

	return &rawconsumer{
		logger: logger,
		ch:     ch,
		done:   done,
	}
}

// Intake returns the send-only channel on which this consumer waits for
// reports to consume.
func (c *rawconsumer) Intake() chan<- types.Report {
return c.ch
}

// Close shuts the rawconsumer down by closing its done channel, which stops
// the forwarding goroutine. The close is guarded by a sync.Once, so calling
// Close more than once is safe.
func (c *rawconsumer) Close() {
	c.once.Do(func() { close(c.done) })
}
34 changes: 20 additions & 14 deletions pkg/telemetry/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ import (
"time"

"github.com/go-logr/logr"
"github.com/pkg/errors"
"github.com/hashicorp/go-multierror"
"github.com/puzpuzpuz/xsync"

"github.com/kong/kubernetes-telemetry/pkg/provider"
"github.com/kong/kubernetes-telemetry/pkg/logp"
"github.com/kong/kubernetes-telemetry/pkg/types"
)

Expand Down Expand Up @@ -165,7 +165,14 @@ func (m *manager) workflowsLoop() {
case <-ticker.C:
report, err := m.Execute(context.Background())
if err != nil {
m.logger.Error(err, "error executing workflows")
m.logger.V(logp.DebugLevel).
WithValues("error", err.Error()).
Info("error executing workflows")
}

// Continue the execution even if we get an error but account for possibility
// of getting nil reports, in which case move on to the next iteration (tick).
if report == nil {
continue
}

Expand All @@ -179,28 +186,27 @@ func (m *manager) workflowsLoop() {
}

// Execute executes all configured workflows and returns an aggregated report
// from all the underying providers.
// from all the underlying providers.
func (m *manager) Execute(ctx context.Context) (types.Report, error) {
var (
err error
mErr error
report = types.Report{}
)

m.workflows.Range(func(name string, v Workflow) bool {
var r provider.Report
r, err = v.Execute(ctx)
r, err := v.Execute(ctx)
if err != nil {
err = errors.Wrapf(err, "error executing workflow %s", name)
// TODO: return true and don't abort when encountering an error.
// Better to report partial report than nothing. In order to do so
// use an error aggregator like https://github.com/hashicorp/go-multierror.
return false
mErr = multierror.Append(mErr, err)
}

// Add the report regardless if it's partial only omitting empty (nil) reports.
if r != nil {
report[v.Name()] = r
}

report[v.Name()] = r
return true
})
return report, err
return report, mErr
}

// consumerLoop loops over all configured consumers and sends the gathered telemetry
Expand Down
63 changes: 63 additions & 0 deletions pkg/telemetry/manager_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package telemetry

import (
"errors"
"fmt"
"runtime"
"testing"
"time"

"github.com/bombsimon/logrusr/v3"
"github.com/go-logr/logr"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -238,3 +241,63 @@ func TestManagerWithCatalogWorkflows(t *testing.T) {
}, report)
})
}

// TestManagerWithMultilpleWorkflowsOneReturningError verifies that the manager
// still delivers a partial report to its consumers when one provider in one of
// its workflows returns an error.
func TestManagerWithMultilpleWorkflowsOneReturningError(t *testing.T) {
	logrusLog := logrus.New()
	logrusLog.Level = logrus.DebugLevel
	log := logrusr.New(logrusLog)
	m, err := NewManager(
		OptManagerLogger(log),
		OptManagerPeriod(time.Millisecond),
	)
	require.NoError(t, err)

	{
		w := NewWorkflow("basic")
		{
			p, err := provider.NewFixedValueProvider("constant1", provider.Report{
				"constant1": "value1",
			})
			require.NoError(t, err)
			w.AddProvider(p)
		}

		m.AddWorkflow(w)
	}
	{
		w := NewWorkflow("basic_with_error")
		{
			p, err := provider.NewFunctorProvider("error_provider", func() (provider.Report, error) {
				return nil, errors.New("I am an error")
			})
			require.NoError(t, err)
			w.AddProvider(p)
		}
		{
			p, err := provider.NewFixedValueProvider("constant2", provider.Report{
				"constant2": "value2",
			})
			require.NoError(t, err)
			w.AddProvider(p)
		}

		m.AddWorkflow(w)
	}

	ch := make(chan types.Report)
	consumer := NewRawConsumer(forwarders.NewRawChannelForwarder(ch))
	require.NoError(t, m.AddConsumer(consumer))
	require.NoError(t, m.Start())

	// Bound the wait so a manager that never emits a report fails the test
	// quickly instead of blocking until the global test timeout.
	var report types.Report
	select {
	case report = <-ch:
	case <-time.After(10 * time.Second):
		t.Fatal("timed out waiting for a report from the manager")
	}
	m.Stop()

	// The failing provider contributes nothing; everything else must be present.
	require.EqualValues(t, types.Report{
		"basic": provider.Report{
			"constant1": "value1",
		},
		"basic_with_error": provider.Report{
			"constant2": "value2",
		},
	}, report)
}
16 changes: 10 additions & 6 deletions pkg/telemetry/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"sync"

"github.com/gammazero/workerpool"
"github.com/hashicorp/go-multierror"
"github.com/pkg/errors"

"github.com/kong/kubernetes-telemetry/pkg/provider"
)
Expand Down Expand Up @@ -62,17 +64,15 @@ func (w *workflow) Execute(ctx context.Context) (provider.Report, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

wg.Add(len(w.providers))
for _, provider := range w.providers {
p := provider
wg.Add(1)

wp.Submit(func() {
defer wg.Done()

report, err := p.Provide(ctx)
if err != nil {
chErr <- err
return
chErr <- errors.Wrapf(err, "problem with provider %s", p.Name())
}

chReport <- report
Expand All @@ -86,12 +86,16 @@ func (w *workflow) Execute(ctx context.Context) (provider.Report, error) {
close(chReport)
}()

var mErr error

forLoop:
for {
select {
case err := <-chErr:
if err != nil {
return nil, err
mErr = multierror.Append(mErr,
errors.Wrapf(err, "error executing workflow %s", w.Name()),
)
}
case r := <-chReport:
report.Merge(r)
Expand All @@ -100,5 +104,5 @@ forLoop:
}
}

return report, nil
return report, mErr
}

0 comments on commit 8884711

Please sign in to comment.