Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: return partial reports without failure #45

Merged
merged 1 commit into from
Aug 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ require (
sigs.k8s.io/controller-runtime v0.12.3
)

require (
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/emicklei/go-restful/v3 v3.8.0 // indirect
Expand All @@ -27,6 +32,7 @@ require (
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/gnostic v0.6.9 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/hashicorp/go-multierror v1.1.1
github.com/imdario/mergo v0.3.13 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,10 @@ github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB7
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
Expand Down
25 changes: 25 additions & 0 deletions pkg/forwarders/rawchannelforwarder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package forwarders

import (
"github.com/kong/kubernetes-telemetry/pkg/types"
)

type rawChannelForwarder struct {
ch chan types.Report
}

// NewRawChannelForwarder creates new rawChannelForwarder.
func NewRawChannelForwarder(ch chan types.Report) *rawChannelForwarder {
return &rawChannelForwarder{
ch: ch,
}
}

func (f *rawChannelForwarder) Name() string {
return "LogForwarder"
}

func (f *rawChannelForwarder) Forward(r types.Report) error {
f.ch <- r
return nil
}
pmalek marked this conversation as resolved.
Show resolved Hide resolved
33 changes: 8 additions & 25 deletions pkg/forwarders/tlsforwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ import (
"time"

"github.com/go-logr/logr"
"github.com/sirupsen/logrus"

"github.com/kong/kubernetes-telemetry/pkg/log"
)

const (
Expand All @@ -20,32 +21,14 @@ var tlsConf = tls.Config{
MaxVersion: tls.VersionTLS13,
}

// TODO: Address logging levels and library to be used.
// See: https://github.com/Kong/kubernetes-ingress-controller/issues/1893
const (
logrusrDiff = 4

// InfoLevel is the converted logging level from logrus to go-logr for
// information level logging. Note that the logrusr middleware technically
// flattens all levels prior to this level into this level as well.
InfoLevel = int(logrus.InfoLevel) - logrusrDiff

// DebugLevel is the converted logging level from logrus to go-logr for
// debug level logging.
DebugLevel = int(logrus.DebugLevel) - logrusrDiff

// WarnLevel is the converted logrus level to go-logr for warnings.
WarnLevel = int(logrus.WarnLevel) - logrusrDiff
)

type tlsForwarder struct {
log logr.Logger
conn *tls.Conn
logger logr.Logger
conn *tls.Conn
}

// NewTLSForwarder creates a TLS forwarder which forwards received serialized reports
// to a TLS endpoint specified by the provided address.
func NewTLSForwarder(address string, log logr.Logger) *tlsForwarder {
func NewTLSForwarder(address string, logger logr.Logger) *tlsForwarder {
conn, err := tls.DialWithDialer(
&net.Dialer{
Timeout: defaultTimeout,
Expand All @@ -55,13 +38,13 @@ func NewTLSForwarder(address string, log logr.Logger) *tlsForwarder {
&tlsConf,
)
if err != nil {
log.V(DebugLevel).Info("failed to connect to reporting server", "error", err)
logger.V(log.DebugLevel).Info("failed to connect to reporting server", "error", err)
return nil
}

return &tlsForwarder{
log: log,
conn: conn,
logger: logger,
conn: conn,
}
}

Expand Down
21 changes: 21 additions & 0 deletions pkg/log/logp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package log

import "github.com/sirupsen/logrus"

// TODO: Address logging levels and library to be used.
// See: https://github.com/Kong/kubernetes-ingress-controller/issues/1893
const (
logrusrDiff = 4

// InfoLevel is the converted logging level from logrus to go-logr for
// information level logging. Note that the logrusr middleware technically
// flattens all levels prior to this level into this level as well.
InfoLevel = int(logrus.InfoLevel) - logrusrDiff

// DebugLevel is the converted logging level from logrus to go-logr for
// debug level logging.
DebugLevel = int(logrus.DebugLevel) - logrusrDiff

// WarnLevel is the converted logrus level to go-logr for warnings.
WarnLevel = int(logrus.WarnLevel) - logrusrDiff
)
65 changes: 62 additions & 3 deletions pkg/telemetry/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ type Forwarder interface {
// serialize the data and then forward it using the provided forwarder.
func NewConsumer(s Serializer, f Forwarder) *consumer {
var (
ch = make(chan types.Report)
done = make(chan struct{})
logger = defaultLogger() // TODO: allow configuration
ch = make(chan types.Report)
done = make(chan struct{})
// TODO: allow configuration: https://github.com/Kong/kubernetes-telemetry/issues/46
logger = defaultLogger()
)

go func() {
Expand Down Expand Up @@ -56,12 +57,70 @@ func NewConsumer(s Serializer, f Forwarder) *consumer {
}
}

// Intake returns a channel on which this consumer will wait for data to consume it.
func (c *consumer) Intake() chan<- types.Report {
return c.ch
}

// Close closes the consumer.
func (c *consumer) Close() {
c.once.Do(func() {
close(c.done)
})
}

type rawConsumer struct {
logger logr.Logger
once sync.Once
ch chan types.Report
done chan struct{}
}

// RawForwarder is used to forward raw, unserialized telemetry reports to configured
// destination(s).
type RawForwarder interface {
Name() string
Forward(types.Report) error
}

shaneutt marked this conversation as resolved.
Show resolved Hide resolved
// NewRawConsumer creates a new rawconsumer that will use the provided raw forwarder
// to forward received reports.
func NewRawConsumer(f RawForwarder) *rawConsumer {
var (
ch = make(chan types.Report)
done = make(chan struct{})
// TODO: allow configuration: https://github.com/Kong/kubernetes-telemetry/issues/46
logger = defaultLogger()
)

go func() {
for {
select {
case <-done:
return
case r := <-ch:
if err := f.Forward(r); err != nil {
logger.Error(err, "failed to forward report using raw forwarder: %s", f.Name())
}
}
}
}()

return &rawConsumer{
logger: logger,
ch: ch,
done: done,
}
}

// Intake returns a channel on which this consumer will wait for data to consume it.
func (c *rawConsumer) Intake() chan<- types.Report {
return c.ch
}

// Close closes rawconsumer.
func (c *rawConsumer) Close() {
c.once.Do(func() {
close(c.done)
})
}
34 changes: 20 additions & 14 deletions pkg/telemetry/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ import (
"time"

"github.com/go-logr/logr"
"github.com/pkg/errors"
"github.com/hashicorp/go-multierror"
"github.com/puzpuzpuz/xsync"

"github.com/kong/kubernetes-telemetry/pkg/provider"
"github.com/kong/kubernetes-telemetry/pkg/log"
"github.com/kong/kubernetes-telemetry/pkg/types"
)

Expand Down Expand Up @@ -165,7 +165,14 @@ func (m *manager) workflowsLoop() {
case <-ticker.C:
report, err := m.Execute(context.Background())
if err != nil {
m.logger.Error(err, "error executing workflows")
m.logger.V(log.DebugLevel).
WithValues("error", err.Error()).
Info("error executing workflows")
}

// Continue the execution even if we get an error but account for possibility
// of getting nil reports, in which case move on to the next iteration (tick).
if report == nil {
continue
}

Expand All @@ -179,28 +186,27 @@ func (m *manager) workflowsLoop() {
}

// Execute executes all configures workflows and returns an aggregated report
// from all the underying providers.
// from all the underlying providers.
func (m *manager) Execute(ctx context.Context) (types.Report, error) {
var (
err error
mErr error
report = types.Report{}
)

m.workflows.Range(func(name string, v Workflow) bool {
var r provider.Report
r, err = v.Execute(ctx)
r, err := v.Execute(ctx)
if err != nil {
err = errors.Wrapf(err, "error executing workflow %s", name)
// TODO: return true and don't abort when encountering an error.
// Better to report partial report than nothing. In order to do so
// use an error agreggator like https://github.com/hashicorp/go-multierror.
return false
mErr = multierror.Append(mErr, err)
}

// Add the report regardless if it's partial only omitting empty (nil) reports.
if r != nil {
report[v.Name()] = r
}

report[v.Name()] = r
return true
})
return report, err
return report, mErr
}

// consumerLoop loops over all configured consumers and sends the gathered telemetry
Expand Down
63 changes: 63 additions & 0 deletions pkg/telemetry/manager_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package telemetry

import (
"errors"
"fmt"
"runtime"
"testing"
"time"

"github.com/bombsimon/logrusr/v3"
"github.com/go-logr/logr"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -238,3 +241,63 @@ func TestManagerWithCatalogWorkflows(t *testing.T) {
}, report)
})
}

func TestManagerWithMultilpleWorkflowsOneReturningError(t *testing.T) {
logrusLog := logrus.New()
logrusLog.Level = logrus.DebugLevel
log := logrusr.New(logrusLog)
m, err := NewManager(
OptManagerLogger(log),
OptManagerPeriod(time.Millisecond),
)
require.NoError(t, err)

{
w := NewWorkflow("basic")
{
p, err := provider.NewFixedValueProvider("constant1", provider.Report{
"constant1": "value1",
})
require.NoError(t, err)
w.AddProvider(p)
}

m.AddWorkflow(w)
}
{
w := NewWorkflow("basic_with_error")
{
p, err := provider.NewFunctorProvider("error_provider", func() (provider.Report, error) {
return nil, errors.New("I am an error")
})
require.NoError(t, err)
w.AddProvider(p)
}
{

p, err := provider.NewFixedValueProvider("constant2", provider.Report{
"constant2": "value2",
})
require.NoError(t, err)
w.AddProvider(p)
}

m.AddWorkflow(w)
}

ch := make(chan types.Report)
consumer := NewRawConsumer(forwarders.NewRawChannelForwarder(ch))
require.NoError(t, m.AddConsumer(consumer))
require.NoError(t, m.Start())

report := <-ch
m.Stop()
require.EqualValues(t, types.Report{
"basic": provider.Report{
"constant1": "value1",
},
"basic_with_error": provider.Report{
"constant2": "value2",
},
}, report)
}
Loading