From 95962d4e60487bbf9b7c460c456af126d8c3b4e4 Mon Sep 17 00:00:00 2001 From: Jakub Warczarek Date: Tue, 23 May 2023 11:26:38 +0200 Subject: [PATCH] feat(telemetry): introduce count for all Gateway API objects --- CHANGELOG.md | 5 +- go.mod | 4 +- go.sum | 4 +- internal/manager/telemetry/manager.go | 16 +- internal/manager/telemetry/telemetry_test.go | 430 +++++++++++++++++++ 5 files changed, 449 insertions(+), 10 deletions(-) create mode 100644 internal/manager/telemetry/telemetry_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 5746204565d..4da6706551e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -156,10 +156,13 @@ Adding a new version? You'll need three changes: `pod-ip-address.my-namespace.pod`. - `service`: will make KIC build addresses using the following template: `pod-ip-address.service-name.my-namespace.svc`. - This is known to not work on GKE becuase it uses `kube-dns` instead of coredns. + This is known to not work on GKE because it uses `kube-dns` instead of `coredns`. - Gateway's `AttachedRoutes` fields get updated with the number of routes referencing and using each listener. [#4052](https://github.com/Kong/kubernetes-ingress-controller/pull/4052) +- Telemetry reports now include a number of every Kind of Kubernetes object + that is provided by `gateway.networking.k8s.io` CRDs. + [#4058](https://github.com/Kong/kubernetes-ingress-controller/pull/4058) ### Changed diff --git a/go.mod b/go.mod index b835a9952f2..87a79d88c31 100644 --- a/go.mod +++ b/go.mod @@ -21,7 +21,7 @@ require ( github.com/jpillora/backoff v1.0.0 github.com/kong/deck v1.20.0 github.com/kong/go-kong v0.42.0 - github.com/kong/kubernetes-telemetry v0.0.4 + github.com/kong/kubernetes-telemetry v0.0.5 github.com/kong/kubernetes-testing-framework v0.31.0 github.com/lithammer/dedent v1.1.0 github.com/miekg/dns v1.1.54 @@ -65,6 +65,7 @@ require ( go.uber.org/atomic v1.10.0 // indirect go.uber.org/zap v1.24.0 // indirect golang.org/x/net v0.10.0 // indirect + k8s.io/klog/v2 v2.100.1 // indirect ) require ( @@ -187,7 +188,6 @@ require ( gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect k8s.io/cli-runtime v0.27.2 // indirect - k8s.io/klog/v2 v2.100.1 // indirect k8s.io/kube-openapi v0.0.0-20230501164219-8b0f38b5fd1f // indirect k8s.io/kubectl v0.27.2 // indirect k8s.io/utils v0.0.0-20230406110748-d93618cff8a2 // indirect diff --git a/go.sum b/go.sum index b8a52c3a1cc..93a62bbc363 100644 --- a/go.sum +++ b/go.sum @@ -250,8 +250,8 @@ github.com/kong/deck v1.20.0 h1:q8+8VBnvv0O+9mYjcPdJP5prG3KzbvR4XfePwkTx+Zc= github.com/kong/deck v1.20.0/go.mod h1:yJWEu6/xnYiaNBg2vP4EsYLtbt33J67Zsolye3JpJmI= github.com/kong/go-kong v0.42.0 h1:N0Rth32eGq6S5x33Txu+Gv9ZJ3gG5noffDjqezwutfA= github.com/kong/go-kong v0.42.0/go.mod h1:YUq7A3gcwk+9Z1ajwzVY2HnSyL/IKq/TJHsJDqT8hJM= -github.com/kong/kubernetes-telemetry v0.0.4 h1:6iDDocM4b/pIKJ/KrSSoQjvyaHBIBtBb4U9LdOqg8Js= -github.com/kong/kubernetes-telemetry v0.0.4/go.mod h1:xopN/XY+5xCXoY8kfnjHf83yT6n4ezVcWKJxA7gmJUw= +github.com/kong/kubernetes-telemetry v0.0.5 h1:FCJx1ShFXbqNb2K6N9FE3MZKwCkp8CEVAJuej5+1S9U= +github.com/kong/kubernetes-telemetry v0.0.5/go.mod h1:FrAXjZHEPSAEL4aUXnNOpPoRiA68VJvwgN8dfG/qF60= github.com/kong/kubernetes-testing-framework v0.31.0 h1:3MTlUeiD/jV3ArdZWfRLidi7kchg19HI222dXQHwasw= github.com/kong/kubernetes-testing-framework v0.31.0/go.mod h1:RKFMHJCDByNnHiw+hRLwR26eIZRgd5ImhwYjwK+yUQg= github.com/kong/semver/v4 v4.0.1 h1:DIcNR8W3gfx0KabFBADPalxxsp+q/5COwIFkkhrFQ2Y= diff --git a/internal/manager/telemetry/manager.go b/internal/manager/telemetry/manager.go index e8063866710..5ad24760bcb 100644 --- a/internal/manager/telemetry/manager.go +++ b/internal/manager/telemetry/manager.go @@ -1,6 +1,7 @@ package telemetry import ( + "crypto/tls" "fmt" "strings" "time" @@ -22,10 +23,13 @@ import ( "github.com/kong/kubernetes-ingress-controller/v2/internal/util" ) -const ( - splunkEndpoint = "kong-hf.konghq.com:61833" - telemetryPeriod = time.Hour +var ( + SplunkEndpoint = "kong-hf.konghq.com:61833" + SplunkEndpointInsecureSkipVerify = false + TelemetryPeriod = time.Hour +) +const ( prefix = "kic" SignalStart = prefix + "-start" SignalPing = prefix + "-ping" @@ -56,14 +60,16 @@ func CreateManager(restConfig *rest.Config, gatewaysCounter workflows.Discovered dyn := dynamic.New(k.Discovery().RESTClient()) m, err := createManager(k, dyn, cl, gatewaysCounter, fixedPayload, rv, - telemetry.OptManagerPeriod(telemetryPeriod), + telemetry.OptManagerPeriod(TelemetryPeriod), telemetry.OptManagerLogger(logger), ) if err != nil { return nil, err } - tf, err := forwarders.NewTLSForwarder(splunkEndpoint, logger) + tf, err := forwarders.NewTLSForwarder(SplunkEndpoint, logger, func(c *tls.Config) { + c.InsecureSkipVerify = SplunkEndpointInsecureSkipVerify + }) if err != nil { return nil, fmt.Errorf("failed to create telemetry TLSForwarder: %w", err) } diff --git a/internal/manager/telemetry/telemetry_test.go b/internal/manager/telemetry/telemetry_test.go new file mode 100644 index 00000000000..dc05ca2a9f0 --- /dev/null +++ b/internal/manager/telemetry/telemetry_test.go @@ -0,0 +1,430 @@ +//go:build envtest +// +build envtest + +package telemetry_test + +import ( + "context" + "crypto/rand" + "crypto/rsa" + "crypto/tls" + "crypto/x509" + "crypto/x509/pkix" + "errors" + "fmt" + "io" + "math/big" + "os" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + utilversion "k8s.io/apimachinery/pkg/util/version" + "k8s.io/apimachinery/pkg/version" + discoveryclient "k8s.io/client-go/discovery" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" + gatewayv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1" + gatewayclient "sigs.k8s.io/gateway-api/pkg/client/clientset/versioned" + + "github.com/kong/kubernetes-ingress-controller/v2/internal/manager" + "github.com/kong/kubernetes-ingress-controller/v2/internal/manager/featuregates" + "github.com/kong/kubernetes-ingress-controller/v2/internal/manager/telemetry" + "github.com/kong/kubernetes-ingress-controller/v2/internal/util" + "github.com/kong/kubernetes-ingress-controller/v2/pkg/clientset/scheme" + "github.com/kong/kubernetes-ingress-controller/v2/test/envtest" +) + +func TestTelemetry(t *testing.T) { + t.Log("configuring TLS listener - server for telemetry data") + cert, err := generateSelfSignedCert() + require.NoError(t, err) + listener, err := tls.Listen("tcp", "localhost:0", &tls.Config{ + Certificates: []tls.Certificate{cert}, + // The same version as the one used by TLS forwarder in the pkg telemetry. + MinVersion: tls.VersionTLS13, + MaxVersion: tls.VersionTLS13, + }) + require.NoError(t, err) + defer listener.Close() + // Run a server that will receive the report, it's expected + // to be the first connection and the payload. + reportChan := make(chan []byte) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { + // Any function return indicates that either the + // report was sent or there was nothing to send. + defer close(reportChan) + conn, err := listener.Accept() + if !assert.NoError(t, err) { + return + } + defer conn.Close() + for { + report := make([]byte, 2048) // Report is much shorter. + n, err := conn.Read(report) + if !assert.NoError(t, err) { + return + } + select { + case reportChan <- report[:n]: + case <-ctx.Done(): + return + } + } + }() + + t.Log("configuring envtest and creating K8s objects for telemetry test") + envcfg := envtest.Setup(t, scheme.Scheme) + cfg := configForEnvTestTelemetry(t, envcfg) + c, err := cfg.GetKubeconfig() + require.NoError(t, err) + createK8sObjectsForTelemetryTest(ctx, t, c) + + t.Log("starting the controller manager") + // Override the telemetry settings, to allow testing. + // Set them back to the original values at the end of the test. + set := func(ep string, skipVerify bool, dur time.Duration) { + telemetry.SplunkEndpoint = ep + telemetry.SplunkEndpointInsecureSkipVerify = skipVerify + telemetry.TelemetryPeriod = dur + } + defer set(telemetry.SplunkEndpoint, telemetry.SplunkEndpointInsecureSkipVerify, telemetry.TelemetryPeriod) + set(listener.Addr().String(), true, time.Second) // Let's have long duration due too rate limiter in K8s client. + go func() { + deprecatedLogger, _, err := manager.SetupLoggers(&cfg, io.Discard) + if !assert.NoError(t, err) { + return + } + err = manager.Run(ctx, &cfg, util.ConfigDumpDiagnostic{}, deprecatedLogger) + if !assert.NoError(t, err) { + return + } + }() + + dcl, err := discoveryclient.NewDiscoveryClientForConfig(envcfg) + require.NoError(t, err) + k8sVersion, err := dcl.ServerVersion() + require.NoError(t, err) + t.Log("verifying telemetry report") + require.EventuallyWithT(t, func(c *assert.CollectT) { + // The first report on the channel will be signal=kic-start; + // next ones will be signal=kic-ping; and on this we assert. + verifyTelemetryReport(t, c, k8sVersion, <-reportChan) + }, + 10*time.Second, + // Tick duration doesn't really matter, because reading from channel is blocking. + 100*time.Millisecond, + ) +} + +func configForEnvTestTelemetry(t *testing.T, envcfg *rest.Config) manager.Config { + t.Helper() + + cfg := manager.Config{} + cfg.FlagSet() // Just set the defaults. + + // Telemetry is enabled by default so nothing to configure here. + + // Override the APIServer. + cfg.APIServerHost = envcfg.Host + cfg.APIServerCertData = envcfg.CertData + cfg.APIServerKeyData = envcfg.KeyData + cfg.APIServerCAData = envcfg.CAData + cfg.KongAdminURLs = []string{envtest.StartAdminAPIServerMock(t).URL} + cfg.UpdateStatus = false + cfg.ProxySyncSeconds = 0.1 + + // And other settings which are irrelevant. + cfg.Konnect.ConfigSynchronizationEnabled = false + cfg.Konnect.LicenseSynchronizationEnabled = false + cfg.EnableProfiling = false + cfg.EnableConfigDumps = false + + cfg.FeatureGates = featuregates.GetFeatureGatesDefaults() + cfg.FeatureGates[featuregates.GatewayFeature] = false + + return cfg +} + +func createK8sObjectsForTelemetryTest(ctx context.Context, t *testing.T, cfg *rest.Config) { + t.Helper() + cl, err := kubernetes.NewForConfig(cfg) + require.NoError(t, err) + + gcl, err := gatewayclient.NewForConfig(cfg) + require.NoError(t, err) + + const additionalNamespace = "test-ns-1" + _, err = cl.CoreV1().Namespaces().Create( + ctx, + &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{Name: additionalNamespace}, + }, + metav1.CreateOptions{}, + ) + require.NoError(t, err) + + for i := 0; i < 2; i++ { + _, err = gcl.GatewayV1beta1().GatewayClasses().Create( + ctx, + &gatewayv1beta1.GatewayClass{ + ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("test-%d", i)}, + Spec: gatewayv1beta1.GatewayClassSpec{ + ControllerName: "test.com/gateway-controller", + }, + }, + metav1.CreateOptions{}, + ) + require.NoError(t, err) + + _, err = cl.CoreV1().Nodes().Create( + ctx, + &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("test-%d", i)}, + }, + metav1.CreateOptions{}, + ) + require.NoError(t, err) + + for _, namespace := range []string{metav1.NamespaceDefault, additionalNamespace} { + _, err := cl.CoreV1().Services(namespace).Create( + ctx, + &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("test-%d", i)}, + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ + { + Port: 443, + }, + }, + }, + }, + metav1.CreateOptions{}, + ) + require.NoError(t, err) + + _, err = cl.CoreV1().Pods(namespace).Create( + ctx, + &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("test-%d", i)}, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + {Name: "test", Image: "test"}, + }, + }, + }, + metav1.CreateOptions{}, + ) + require.NoError(t, err) + + _, err = gcl.GatewayV1beta1().Gateways(namespace).Create( + ctx, + &gatewayv1beta1.Gateway{ + ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("test-%d", i)}, + Spec: gatewayv1beta1.GatewaySpec{ + GatewayClassName: gatewayv1beta1.ObjectName("test"), + Listeners: []gatewayv1beta1.Listener{ + { + Name: "test", + Port: 443, + Protocol: "HTTP", + }, + }, + }, + }, + metav1.CreateOptions{}, + ) + require.NoError(t, err) + + _, err = gcl.GatewayV1beta1().HTTPRoutes(namespace).Create( + ctx, + &gatewayv1beta1.HTTPRoute{ + ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("test-%d", i)}, + Spec: gatewayv1beta1.HTTPRouteSpec{}, + }, + metav1.CreateOptions{}, + ) + require.NoError(t, err) + + _, err = gcl.GatewayV1alpha2().GRPCRoutes(namespace).Create( + ctx, + &gatewayv1alpha2.GRPCRoute{ + ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("test-%d", i)}, + Spec: gatewayv1alpha2.GRPCRouteSpec{}, + }, + metav1.CreateOptions{}, + ) + require.NoError(t, err) + + _, err = gcl.GatewayV1alpha2().TCPRoutes(namespace).Create( + ctx, + &gatewayv1alpha2.TCPRoute{ + ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("test-%d", i)}, + Spec: gatewayv1alpha2.TCPRouteSpec{ + Rules: []gatewayv1alpha2.TCPRouteRule{{}}, + }, + }, + metav1.CreateOptions{}, + ) + require.NoError(t, err) + + _, err = gcl.GatewayV1alpha2().UDPRoutes(namespace).Create( + ctx, + &gatewayv1alpha2.UDPRoute{ + ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("test-%d", i)}, + Spec: gatewayv1alpha2.UDPRouteSpec{ + Rules: []gatewayv1alpha2.UDPRouteRule{{}}, + }, + }, + metav1.CreateOptions{}, + ) + require.NoError(t, err) + + _, err = gcl.GatewayV1alpha2().TLSRoutes(namespace).Create( + ctx, + &gatewayv1alpha2.TLSRoute{ + ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("test-%d", i)}, + Spec: gatewayv1alpha2.TLSRouteSpec{ + Rules: []gatewayv1alpha2.TLSRouteRule{{}}, + }, + }, + metav1.CreateOptions{}, + ) + require.NoError(t, err) + + _, err = gcl.GatewayV1beta1().ReferenceGrants(namespace).Create( + ctx, + &gatewayv1beta1.ReferenceGrant{ + ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("test-%d", i)}, + Spec: gatewayv1beta1.ReferenceGrantSpec{ + From: []gatewayv1beta1.ReferenceGrantFrom{ + { + Kind: "test", + Namespace: metav1.NamespaceDefault, + }, + }, + To: []gatewayv1beta1.ReferenceGrantTo{ + { + Kind: "test", + }, + }, + }, + }, + metav1.CreateOptions{}, + ) + require.NoError(t, err) + } + } +} + +func verifyTelemetryReport(t *testing.T, c *assert.CollectT, k8sVersion *version.Info, report []byte) { + t.Helper() + hostname, err := os.Hostname() + assert.NoError(c, err) + semver, err := utilversion.ParseGeneric(k8sVersion.GitVersion) + assert.NoError(c, err) + + // Report contains stanza like: + // id=57a7a76c-25d0-4394-ab9a-954f7190e39a; + // uptime=9; + // that is not stable across runs, so we need to remove it. + reportToAssert, err := removeStanzaFromReport(string(report), "id") + assert.NoError(c, err) + reportToAssert, err = removeStanzaFromReport(reportToAssert, "uptime") + assert.NoError(c, err) + + assert.Equal( + c, + fmt.Sprintf( + "<14>"+ + "signal=kic-ping;"+ + "db=off;"+ + "feature-combinedroutes=true;"+ + "feature-combinedservices=false;"+ + "feature-expressionroutes=false;"+ + "feature-fillids=false;"+ + "feature-gateway-service-discovery=false;"+ + "feature-gateway=false;"+ + "feature-gatewayalpha=false;"+ + "feature-knative=false;"+ + "feature-konnect-sync=false;"+ + "hn=%s;"+ + "kv=3.3.0;"+ + "v=NOT_SET;"+ + "k8s_arch=%s;"+ + "k8s_provider=UNKNOWN;"+ + "k8sv=%s;"+ + "k8sv_semver=%s;"+ + "k8s_gatewayclasses_count=2;"+ + "k8s_gateways_count=4;"+ + "k8s_grpcroutes_count=4;"+ + "k8s_httproutes_count=4;"+ + "k8s_nodes_count=2;"+ + "k8s_pods_count=4;"+ + "k8s_referencegrants_count=4;"+ + "k8s_services_count=5;"+ + "k8s_tcproutes_count=4;"+ + "k8s_tlsroutes_count=4;"+ + "k8s_udproutes_count=4;"+ + "\n", + hostname, + k8sVersion.Platform, + k8sVersion.GitVersion, + "v"+semver.String(), + ), + reportToAssert, + ) +} + +func removeStanzaFromReport(report string, stanza string) (string, error) { + const idStanzaEnd = ";" + stanza = stanza + "=" + start := strings.Index(report, stanza) + if start == -1 { + return "", errors.New("stanza not found in report") + } + end := strings.Index(report[start:], idStanzaEnd) + if end == -1 { + return "", errors.New("stanza end not found in report") + } + end += start + return report[:start] + report[end+1:], nil +} + +func generateSelfSignedCert() (tls.Certificate, error) { + // Generate a new RSA private key. + privateKey, err := rsa.GenerateKey(rand.Reader, 2048) + if err != nil { + return tls.Certificate{}, fmt.Errorf("failed to generate private key: %s", err.Error()) + } + + // Create a self-signed X.509 certificate. + template := &x509.Certificate{ + SerialNumber: big.NewInt(1), + Subject: pkix.Name{ + CommonName: "localhost", + }, + NotBefore: time.Now(), + NotAfter: time.Now().AddDate(1, 0, 0), + BasicConstraintsValid: true, + } + derBytes, err := x509.CreateCertificate(rand.Reader, template, template, &privateKey.PublicKey, privateKey) + if err != nil { + return tls.Certificate{}, fmt.Errorf("failed to create certificate: %s", err.Error()) + } + + // Create a tls.Certificate from the generated private key and certificate. + certificate := tls.Certificate{ + Certificate: [][]byte{derBytes}, + PrivateKey: privateKey, + } + + return certificate, nil +}