diff --git a/src/mapper/pkg/resolvers/helpers.go b/src/mapper/pkg/resolvers/helpers.go index eaf4654d..295593f0 100644 --- a/src/mapper/pkg/resolvers/helpers.go +++ b/src/mapper/pkg/resolvers/helpers.go @@ -26,21 +26,18 @@ func (r *mutationResolver) discoverSrcIdentity(ctx context.Context, src model.Re srcPod, err := r.kubeFinder.ResolveIPToPod(ctx, src.SrcIP) if err != nil { if errors.Is(err, kubefinder.ErrFoundMoreThanOnePod) { - logrus.WithError(err).Debugf("Ip %s belongs to more than one pod, ignoring", src.SrcIP) - return model.OtterizeServiceIdentity{}, nil + return model.OtterizeServiceIdentity{}, fmt.Errorf("IP %s belongs to more than one pod, ignoring", src.SrcIP) } return model.OtterizeServiceIdentity{}, fmt.Errorf("could not resolve %s to pod: %w", src.SrcIP, err) } if src.SrcHostname != "" && srcPod.Name != src.SrcHostname { // This could mean a new pod is reusing the same IP // TODO: Use the captured hostname to actually find the relevant pod (instead of the IP that might no longer exist or be reused) - logrus.Warnf("Found pod %s (by ip %s) doesn't match captured hostname %s, ignoring", srcPod.Name, src.SrcIP, src.SrcHostname) - return model.OtterizeServiceIdentity{}, nil + return model.OtterizeServiceIdentity{}, fmt.Errorf("found pod %s (by ip %s) doesn't match captured hostname %s, ignoring", srcPod.Name, src.SrcIP, src.SrcHostname) } if srcPod.DeletionTimestamp != nil { - logrus.Debugf("Pod %s is being deleted, ignoring", srcPod.Name) - return model.OtterizeServiceIdentity{}, nil + return model.OtterizeServiceIdentity{}, fmt.Errorf("pod %s is being deleted, ignoring", srcPod.Name) } srcService, err := r.serviceIdResolver.ResolvePodToServiceIdentity(ctx, srcPod) diff --git a/src/mapper/pkg/resolvers/resolver_test.go b/src/mapper/pkg/resolvers/resolver_test.go index 942125b1..c9262147 100644 --- a/src/mapper/pkg/resolvers/resolver_test.go +++ b/src/mapper/pkg/resolvers/resolver_test.go @@ -13,7 +13,13 @@ import ( "github.com/otterize/network-mapper/src/shared/testbase" "github.com/sirupsen/logrus" "github.com/stretchr/testify/suite" + "golang.org/x/exp/slices" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" "net/http/httptest" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "testing" "time" ) @@ -151,6 +157,339 @@ func (s *ResolverTestSuite) TestReportCaptureResults() { }) } +func (s *ResolverTestSuite) TestReportCaptureResultsHostnameMismatch() { + s.AddDeploymentWithService("service1", []string{"1.1.1.1"}, map[string]string{"app": "service1"}, "10.0.0.16") + s.AddDeploymentWithService("service2", []string{"1.1.1.2"}, map[string]string{"app": "service2"}, "10.0.0.17") + s.AddDaemonSetWithService("service3", []string{"1.1.1.3"}, map[string]string{"app": "service3"}, "10.0.0.18") + s.AddPod("pod4", "1.1.1.4", nil, nil) + s.Require().True(s.Mgr.GetCache().WaitForCacheSync(context.Background())) + + packetTime := time.Now().Add(time.Minute) + _, err := test_gql_client.ReportCaptureResults(context.Background(), s.client, test_gql_client.CaptureResults{ + Results: []test_gql_client.RecordedDestinationsForSrc{ + { + SrcIp: "1.1.1.1", + Destinations: []test_gql_client.Destination{ + { + Destination: fmt.Sprintf("svc-service2.%s.svc.cluster.local", s.TestNamespace), + LastSeen: packetTime, + }, + }, + }, + { + SrcIp: "1.1.1.3", + Destinations: []test_gql_client.Destination{ + { + Destination: fmt.Sprintf("svc-service1.%s.svc.cluster.local", s.TestNamespace), + LastSeen: packetTime, + }, + { + Destination: fmt.Sprintf("svc-service2.%s.svc.cluster.local", s.TestNamespace), + LastSeen: packetTime, + }, + }, + }, + // should be discarded - hostname mismatch + { + SrcIp: "1.1.1.4", + SrcHostname: "pod5", + Destinations: []test_gql_client.Destination{ + { + Destination: fmt.Sprintf("svc-service2.%s.svc.cluster.local", s.TestNamespace), + LastSeen: packetTime, + }, + }, + }, + }, + }) + s.Require().NoError(err) + + res, err := test_gql_client.ServiceIntents(context.Background(), s.client, nil) + s.Require().NoError(err) + s.Require().ElementsMatch(res.ServiceIntents, []test_gql_client.ServiceIntentsServiceIntents{ + { + Client: test_gql_client.ServiceIntentsServiceIntentsClientOtterizeServiceIdentity{ + Name: fmt.Sprintf("deployment-%s", "service1"), + Namespace: s.TestNamespace, + PodOwnerKind: test_gql_client.ServiceIntentsServiceIntentsClientOtterizeServiceIdentityPodOwnerKindGroupVersionKind{ + Group: "apps", + Kind: "Deployment", + Version: "v1", + }, + }, + Intents: []test_gql_client.ServiceIntentsServiceIntentsIntentsOtterizeServiceIdentity{ + { + Name: fmt.Sprintf("deployment-%s", "service2"), + Namespace: s.TestNamespace, + KubernetesService: "svc-service2", + }, + }, + }, + { + Client: test_gql_client.ServiceIntentsServiceIntentsClientOtterizeServiceIdentity{ + Name: fmt.Sprintf("daemonset-%s", "service3"), + Namespace: s.TestNamespace, + PodOwnerKind: test_gql_client.ServiceIntentsServiceIntentsClientOtterizeServiceIdentityPodOwnerKindGroupVersionKind{ + Group: "apps", + Kind: "DaemonSet", + Version: "v1", + }, + }, + Intents: []test_gql_client.ServiceIntentsServiceIntentsIntentsOtterizeServiceIdentity{ + { + Name: fmt.Sprintf("deployment-%s", "service1"), + Namespace: s.TestNamespace, + KubernetesService: "svc-service1", + }, + { + Name: fmt.Sprintf("deployment-%s", "service2"), + Namespace: s.TestNamespace, + KubernetesService: "svc-service2", + }, + }, + }, + }) +} + +func (s *ResolverTestSuite) TestReportCaptureResultsPodDeletion() { + s.AddDeploymentWithService("service1", []string{"1.1.1.1"}, map[string]string{"app": "service1"}, "10.0.0.16") + s.AddDeploymentWithService("service2", []string{"1.1.1.2"}, map[string]string{"app": "service2"}, "10.0.0.17") + s.AddDaemonSetWithService("service3", []string{"1.1.1.3"}, map[string]string{"app": "service3"}, "10.0.0.18") + pod := s.AddPod("pod4", "1.1.1.4", nil, nil) + var podToUpdate v1.Pod + err := s.Mgr.GetClient().Get(context.Background(), types.NamespacedName{Name: pod.GetName(), Namespace: pod.GetNamespace()}, &podToUpdate) + s.Require().NoError(err) + s.Require().True(controllerutil.AddFinalizer(&podToUpdate, "intents.otterize.com/finalizer-so-that-object-cant-be-deleted-for-this-test")) + err = s.Mgr.GetClient().Update(context.Background(), &podToUpdate) + s.Require().NoError(err) + s.Require().NoError(wait.PollImmediate(1*time.Second, 10*time.Second, func() (done bool, err error) { + var readPod v1.Pod + err = s.Mgr.GetClient().Get(context.Background(), types.NamespacedName{Name: pod.GetName(), Namespace: pod.GetNamespace()}, &readPod) + if errors.IsNotFound(err) { + return false, nil + } + if err != nil { + return false, err + } + + if !slices.Contains(readPod.Finalizers, "intents.otterize.com/finalizer-so-that-object-cant-be-deleted-for-this-test") { + return false, nil + } + return true, nil + })) + + err = s.Mgr.GetClient().Delete(context.Background(), pod) + s.Require().NoError(err) + + packetTime := time.Now().Add(time.Minute) + _, err = test_gql_client.ReportCaptureResults(context.Background(), s.client, test_gql_client.CaptureResults{ + Results: []test_gql_client.RecordedDestinationsForSrc{ + { + SrcIp: "1.1.1.1", + Destinations: []test_gql_client.Destination{ + { + Destination: fmt.Sprintf("svc-service2.%s.svc.cluster.local", s.TestNamespace), + LastSeen: packetTime, + }, + }, + }, + { + SrcIp: "1.1.1.3", + Destinations: []test_gql_client.Destination{ + { + Destination: fmt.Sprintf("svc-service1.%s.svc.cluster.local", s.TestNamespace), + LastSeen: packetTime, + }, + { + Destination: fmt.Sprintf("svc-service2.%s.svc.cluster.local", s.TestNamespace), + LastSeen: packetTime, + }, + }, + }, + // should be discarded - deleted pod + { + SrcIp: "1.1.1.4", + Destinations: []test_gql_client.Destination{ + { + Destination: fmt.Sprintf("svc-service2.%s.svc.cluster.local", s.TestNamespace), + LastSeen: packetTime, + }, + }, + }, + }, + }) + s.Require().NoError(err) + + res, err := test_gql_client.ServiceIntents(context.Background(), s.client, nil) + s.Require().NoError(err) + s.Require().ElementsMatch(res.ServiceIntents, []test_gql_client.ServiceIntentsServiceIntents{ + { + Client: test_gql_client.ServiceIntentsServiceIntentsClientOtterizeServiceIdentity{ + Name: fmt.Sprintf("deployment-%s", "service1"), + Namespace: s.TestNamespace, + PodOwnerKind: test_gql_client.ServiceIntentsServiceIntentsClientOtterizeServiceIdentityPodOwnerKindGroupVersionKind{ + Group: "apps", + Kind: "Deployment", + Version: "v1", + }, + }, + Intents: []test_gql_client.ServiceIntentsServiceIntentsIntentsOtterizeServiceIdentity{ + { + Name: fmt.Sprintf("deployment-%s", "service2"), + Namespace: s.TestNamespace, + KubernetesService: "svc-service2", + }, + }, + }, + { + Client: test_gql_client.ServiceIntentsServiceIntentsClientOtterizeServiceIdentity{ + Name: fmt.Sprintf("daemonset-%s", "service3"), + Namespace: s.TestNamespace, + PodOwnerKind: test_gql_client.ServiceIntentsServiceIntentsClientOtterizeServiceIdentityPodOwnerKindGroupVersionKind{ + Group: "apps", + Kind: "DaemonSet", + Version: "v1", + }, + }, + Intents: []test_gql_client.ServiceIntentsServiceIntentsIntentsOtterizeServiceIdentity{ + { + Name: fmt.Sprintf("deployment-%s", "service1"), + Namespace: s.TestNamespace, + KubernetesService: "svc-service1", + }, + { + Name: fmt.Sprintf("deployment-%s", "service2"), + Namespace: s.TestNamespace, + KubernetesService: "svc-service2", + }, + }, + }, + }) +} + +func (s *ResolverTestSuite) TestReportCaptureResultsIPReuse() { + s.AddDeploymentWithService("service1", []string{"1.1.1.1"}, map[string]string{"app": "service1"}, "10.0.0.16") + s.AddDeploymentWithService("service2", []string{"1.1.1.2"}, map[string]string{"app": "service2"}, "10.0.0.17") + s.AddDaemonSetWithService("service3", []string{"1.1.1.3"}, map[string]string{"app": "service3"}, "10.0.0.18") + s.AddPod("pod4", "1.1.1.4", nil, nil) + // intentionally reusing Pod IP + s.AddDaemonSetWithService("network-sniffer", []string{"1.1.1.5"}, map[string]string{"app": "network-sniffer"}, "10.0.0.19") + s.AddDaemonSetWithService("network-sniffer-2", []string{"1.1.1.5"}, map[string]string{"app": "network-sniffer"}, "10.0.0.20") + s.Require().True(s.Mgr.GetCache().WaitForCacheSync(context.Background())) + + packetTime := time.Now().Add(time.Minute) + _, err := test_gql_client.ReportCaptureResults(context.Background(), s.client, test_gql_client.CaptureResults{ + Results: []test_gql_client.RecordedDestinationsForSrc{ + { + SrcIp: "1.1.1.1", + Destinations: []test_gql_client.Destination{ + { + Destination: fmt.Sprintf("svc-service2.%s.svc.cluster.local", s.TestNamespace), + LastSeen: packetTime, + }, + }, + }, + { + SrcIp: "1.1.1.3", + Destinations: []test_gql_client.Destination{ + { + Destination: fmt.Sprintf("svc-service1.%s.svc.cluster.local", s.TestNamespace), + LastSeen: packetTime, + }, + { + Destination: fmt.Sprintf("svc-service2.%s.svc.cluster.local", s.TestNamespace), + LastSeen: packetTime, + }, + }, + }, + { + SrcIp: "1.1.1.4", + Destinations: []test_gql_client.Destination{ + { + Destination: fmt.Sprintf("svc-service2.%s.svc.cluster.local", s.TestNamespace), + LastSeen: packetTime, + }, + }, + }, + // should be discarded - IP belongs to more than one pod + { + SrcIp: "1.1.1.5", + Destinations: []test_gql_client.Destination{ + { + Destination: fmt.Sprintf("svc-service2.%s.svc.cluster.local", s.TestNamespace), + LastSeen: packetTime, + }, + }, + }, + }, + }) + s.Require().NoError(err) + + res, err := test_gql_client.ServiceIntents(context.Background(), s.client, nil) + s.Require().NoError(err) + s.Require().ElementsMatch(res.ServiceIntents, []test_gql_client.ServiceIntentsServiceIntents{ + { + Client: test_gql_client.ServiceIntentsServiceIntentsClientOtterizeServiceIdentity{ + Name: fmt.Sprintf("deployment-%s", "service1"), + Namespace: s.TestNamespace, + PodOwnerKind: test_gql_client.ServiceIntentsServiceIntentsClientOtterizeServiceIdentityPodOwnerKindGroupVersionKind{ + Group: "apps", + Kind: "Deployment", + Version: "v1", + }, + }, + Intents: []test_gql_client.ServiceIntentsServiceIntentsIntentsOtterizeServiceIdentity{ + { + Name: fmt.Sprintf("deployment-%s", "service2"), + Namespace: s.TestNamespace, + KubernetesService: "svc-service2", + }, + }, + }, + { + Client: test_gql_client.ServiceIntentsServiceIntentsClientOtterizeServiceIdentity{ + Name: fmt.Sprintf("daemonset-%s", "service3"), + Namespace: s.TestNamespace, + PodOwnerKind: test_gql_client.ServiceIntentsServiceIntentsClientOtterizeServiceIdentityPodOwnerKindGroupVersionKind{ + Group: "apps", + Kind: "DaemonSet", + Version: "v1", + }, + }, + Intents: []test_gql_client.ServiceIntentsServiceIntentsIntentsOtterizeServiceIdentity{ + { + Name: fmt.Sprintf("deployment-%s", "service1"), + Namespace: s.TestNamespace, + KubernetesService: "svc-service1", + }, + { + Name: fmt.Sprintf("deployment-%s", "service2"), + Namespace: s.TestNamespace, + KubernetesService: "svc-service2", + }, + }, + }, + { + Client: test_gql_client.ServiceIntentsServiceIntentsClientOtterizeServiceIdentity{ + Name: "pod4", + Namespace: s.TestNamespace, + PodOwnerKind: test_gql_client.ServiceIntentsServiceIntentsClientOtterizeServiceIdentityPodOwnerKindGroupVersionKind{ + Group: "", + Kind: "Pod", + Version: "v1", + }, + }, + Intents: []test_gql_client.ServiceIntentsServiceIntentsIntentsOtterizeServiceIdentity{ + { + Name: fmt.Sprintf("deployment-%s", "service2"), + Namespace: s.TestNamespace, + KubernetesService: "svc-service2", + }, + }, + }, + }) +} + func (s *ResolverTestSuite) TestReportCaptureResultsIgnoreOldPacket() { s.AddDeploymentWithService("service1", []string{"1.1.1.1"}, map[string]string{"app": "service1"}, "10.0.0.19") s.AddDeploymentWithService("service2", []string{"1.1.1.2"}, map[string]string{"app": "service2"}, "10.0.0.20") diff --git a/src/mapper/pkg/resolvers/schema.resolvers.go b/src/mapper/pkg/resolvers/schema.resolvers.go index c3f38eaa..2d7f21e4 100644 --- a/src/mapper/pkg/resolvers/schema.resolvers.go +++ b/src/mapper/pkg/resolvers/schema.resolvers.go @@ -35,10 +35,10 @@ func (r *mutationResolver) ReportCaptureResults(ctx context.Context, results mod continue } for _, dest := range captureItem.Destinations { + destCopy := dest destAddress := dest.Destination if !strings.HasSuffix(destAddress, viper.GetString(config.ClusterDomainKey)) { - // not a k8s service, ignore - err := r.handleDNSCaptureResultsAsExternalTraffic(ctx, dest, srcSvcIdentity) + err := r.handleDNSCaptureResultsAsExternalTraffic(ctx, destCopy, srcSvcIdentity) if err != nil { logrus.WithError(err).Error("could not handle DNS capture result as external traffic") continue @@ -47,7 +47,7 @@ func (r *mutationResolver) ReportCaptureResults(ctx context.Context, results mod continue } - err := r.handleDNSCaptureResultsAsKubernetesPods(ctx, dest, srcSvcIdentity) + err := r.handleDNSCaptureResultsAsKubernetesPods(ctx, destCopy, srcSvcIdentity) if err != nil { logrus.WithError(err).Error("could not handle DNS capture result as pod") continue @@ -64,11 +64,12 @@ func (r *mutationResolver) ReportSocketScanResults(ctx context.Context, results for _, socketScanItem := range results.Results { srcSvcIdentity, err := r.discoverSrcIdentity(ctx, socketScanItem) if err != nil { - logrus.WithError(err).Errorf("could not discover src identity for '%s'", socketScanItem.SrcIP) + logrus.WithError(err).Debugf("could not discover src identity for '%s'", socketScanItem.SrcIP) continue } for _, dest := range socketScanItem.Destinations { - isService, err := r.tryHandleSocketScanDestinationAsService(ctx, srcSvcIdentity, dest) + destCopy := dest + isService, err := r.tryHandleSocketScanDestinationAsService(ctx, srcSvcIdentity, destCopy) if err != nil { logrus.WithError(err).Errorf("failed to handle IP '%s' as service, it may or may not be a service. This error only occurs if something failed; not if the IP does not belong to a service.", dest.Destination) // Log error but don't stop handling other destinations. @@ -79,14 +80,14 @@ func (r *mutationResolver) ReportSocketScanResults(ctx context.Context, results continue // No need to try to handle IP as Pod, since IP belonged to a service. } - destPod, err := r.kubeFinder.ResolveIPToPod(ctx, dest.Destination) + destPod, err := r.kubeFinder.ResolveIPToPod(ctx, destCopy.Destination) if err != nil { logrus.WithError(err).Debugf("Could not resolve %s to pod", dest.Destination) // Log error but don't stop handling other destinations. continue } - err = r.addSocketScanPodIntent(ctx, srcSvcIdentity, dest, destPod) + err = r.addSocketScanPodIntent(ctx, srcSvcIdentity, destCopy, destPod) if err != nil { logrus.WithError(err).Errorf("failed to resolve IP '%s' to pod", dest.Destination) // Log error but don't stop handling other destinations. diff --git a/src/shared/testbase/testsuitebase.go b/src/shared/testbase/testsuitebase.go index 43531559..368aa818 100644 --- a/src/shared/testbase/testsuitebase.go +++ b/src/shared/testbase/testsuitebase.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "github.com/samber/lo" + "github.com/sirupsen/logrus" "github.com/stretchr/testify/suite" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" @@ -34,26 +35,19 @@ type ControllerManagerTestSuiteBase struct { Mgr manager.Manager } -func (s *ControllerManagerTestSuiteBase) SetupSuite() { +func (s *ControllerManagerTestSuiteBase) SetupTest() { s.testEnv = &envtest.Environment{} var err error s.cfg, err = s.testEnv.Start() s.Require().NoError(err) s.Require().NotNil(s.cfg) + logrus.SetLevel(logrus.DebugLevel) s.K8sDirectClient, err = kubernetes.NewForConfig(s.cfg) s.Require().NoError(err) s.Require().NotNil(s.K8sDirectClient) -} - -func (s *ControllerManagerTestSuiteBase) TearDownSuite() { - s.Require().NoError(s.testEnv.Stop()) -} - -func (s *ControllerManagerTestSuiteBase) SetupTest() { s.mgrCtx, s.mgrCtxCancelFunc = context.WithCancel(context.Background()) - var err error s.Mgr, err = manager.New(s.cfg, manager.Options{MetricsBindAddress: "0"}) s.Require().NoError(err) testName := s.T().Name()[strings.LastIndex(s.T().Name(), "/")+1:] @@ -79,6 +73,7 @@ func (s *ControllerManagerTestSuiteBase) TearDownTest() { s.mgrCtxCancelFunc() err := s.K8sDirectClient.CoreV1().Namespaces().Delete(context.Background(), s.TestNamespace, metav1.DeleteOptions{}) s.Require().NoError(err) + s.Require().NoError(s.testEnv.Stop()) } // waitForObjectToBeCreated tries to get an object multiple times until it is available in the k8s API server