From d777c9a2f4689b39190af98cbb335e23636f1efa Mon Sep 17 00:00:00 2001 From: Andrii Korotkov Date: Thu, 4 Jul 2024 13:23:50 -0700 Subject: [PATCH 01/15] chore: More optimal IterateHierarchyV2 and iterateChildrenV2 [#600] Closes #600 The existing (effectively v1) implementations are suboptimal since they don't construct a graph before the iteration. They search for children by looking at all namespace resources and checking `isParentOf`, which can give `O(tree_size * namespace_resources_count)` time complexity. The v2 algorithms construct the graph and have `O(namespace_resources_count)` time complexity. See more details in the linked issues. Signed-off-by: Andrii Korotkov --- pkg/cache/cluster.go | 91 ++++++++++++ pkg/cache/cluster_test.go | 250 ++++++++++++++++++++++++++------ pkg/cache/mocks/ClusterCache.go | 7 +- pkg/cache/resource.go | 25 ++++ pkg/cache/resource_test.go | 4 +- 5 files changed, 326 insertions(+), 51 deletions(-) diff --git a/pkg/cache/cluster.go b/pkg/cache/cluster.go index 50791920e..290f08c58 100644 --- a/pkg/cache/cluster.go +++ b/pkg/cache/cluster.go @@ -120,6 +120,9 @@ type ClusterCache interface { // IterateHierarchy iterates resource tree starting from the specified top level resource and executes callback for each resource in the tree. // The action callback returns true if iteration should continue and false otherwise. IterateHierarchy(key kube.ResourceKey, action func(resource *Resource, namespaceResources map[kube.ResourceKey]*Resource) bool) + // IterateHierarchyV2 iterates resource tree starting from the specified top level resources and executes callback for each resource in the tree. + // The action callback returns true if iteration should continue and false otherwise. + IterateHierarchyV2(keys []kube.ResourceKey, action func(resource *Resource, namespaceResources map[kube.ResourceKey]*Resource) bool) // IsNamespaced answers if specified group/kind is a namespaced resource API or not IsNamespaced(gk schema.GroupKind) (bool, error) // GetManagedLiveObjs helps finding matching live K8S resources for a given resources list. 
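For reference, a minimal caller sketch of the new batched API added by the hunk above. This is not part of the patch: it assumes an already-synced cache, a caller-supplied rootKeys slice, and the gitops-engine module paths, and it uses only the interface method shown above.

    import (
        "github.com/argoproj/gitops-engine/pkg/cache"
        "github.com/argoproj/gitops-engine/pkg/utils/kube"
    )

    // collectTree gathers the keys of every resource reachable from rootKeys in a single
    // call, instead of invoking IterateHierarchy once per root resource.
    func collectTree(clusterCache cache.ClusterCache, rootKeys []kube.ResourceKey) []kube.ResourceKey {
        var tree []kube.ResourceKey
        clusterCache.IterateHierarchyV2(rootKeys, func(resource *cache.Resource, _ map[kube.ResourceKey]*cache.Resource) bool {
            tree = append(tree, resource.ResourceKey())
            return true // returning false would stop descending below this resource
        })
        return tree
    }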
@@ -1000,6 +1003,94 @@ func (c *clusterCache) IterateHierarchy(key kube.ResourceKey, action func(resour } } +// IterateHierarchy iterates resource tree starting from the specified top level resources and executes callback for each resource in the tree +func (c *clusterCache) IterateHierarchyV2(keys []kube.ResourceKey, action func(resource *Resource, namespaceResources map[kube.ResourceKey]*Resource) bool) { + c.lock.RLock() + defer c.lock.RUnlock() + keysPerNamespace := make(map[string][]kube.ResourceKey) + for _, key := range keys { + keysPerNamespace[key.Namespace] = append(keysPerNamespace[key.Namespace], key) + } + for namespace, namespaceKeys := range keysPerNamespace { + nsNodes := c.nsIndex[namespace] + // Prepare to construct a graph + nodesByUID := make(map[types.UID][]*Resource) + nodeByGraphKey := make(map[string]*Resource) + for _, node := range nsNodes { + nodesByUID[node.Ref.UID] = append(nodesByUID[node.Ref.UID], node) + // Based on what's used by isParentOf + graphKey := fmt.Sprintf("%s/%s/%s", node.Ref.Kind, node.Ref.APIVersion, node.Ref.Name) + nodeByGraphKey[graphKey] = node + } + // Construct a graph using a logic similar to isParentOf but more optimal + graph := make(map[kube.ResourceKey][]kube.ResourceKey) + childrenByUID := make(map[kube.ResourceKey]map[types.UID][]*Resource) + for _, node := range nsNodes { + childrenByUID[node.ResourceKey()] = make(map[types.UID][]*Resource) + } + for _, node := range nsNodes { + for i, ownerRef := range node.OwnerRefs { + // backfill UID of inferred owner child references + if ownerRef.UID == "" { + graphKey := fmt.Sprintf("%s/%s/%s", ownerRef.Kind, ownerRef.APIVersion, ownerRef.Name) + graphKeyNode, ok := nodeByGraphKey[graphKey] + if ok { + ownerRef.UID = graphKeyNode.Ref.UID + node.OwnerRefs[i] = ownerRef + } else { + continue + } + } + + uidNodes, ok := nodesByUID[ownerRef.UID] + if ok { + for _, uidNode := range uidNodes { + graph[uidNode.ResourceKey()] = append(graph[uidNode.ResourceKey()], node.ResourceKey()) + childrenByUID[uidNode.ResourceKey()][node.Ref.UID] = append(childrenByUID[uidNode.ResourceKey()][node.Ref.UID], node) + } + } + } + } + visited := make(map[kube.ResourceKey]int) + for _, key := range namespaceKeys { + visited[key] = 0 + } + for _, key := range namespaceKeys { + res, ok := c.resources[key] + if !ok { + continue + } + if visited[key] == 2 || !action(res, nsNodes) { + continue + } + visited[key] = 1 + // make sure children has no duplicates + for _, children := range childrenByUID[key] { + if len(children) > 0 { + // The object might have multiple children with the same UID (e.g. replicaset from apps and extensions group). It is ok to pick any object but we need to make sure + // we pick the same child after every refresh. 
+ sort.Slice(children, func(i, j int) bool { + key1 := children[i].ResourceKey() + key2 := children[j].ResourceKey() + return strings.Compare(key1.String(), key2.String()) < 0 + }) + child := children[0] + if visited[child.ResourceKey()] == 0 && action(child, nsNodes) { + child.iterateChildrenV2(graph, nsNodes, visited, func(err error, child *Resource, namespaceResources map[kube.ResourceKey]*Resource) bool { + if err != nil { + c.log.V(2).Info(err.Error()) + return false + } + return action(child, namespaceResources) + }) + } + } + } + visited[key] = 2 + } + } +} + // IsNamespaced answers if specified group/kind is a namespaced resource API or not func (c *clusterCache) IsNamespaced(gk schema.GroupKind) (bool, error) { if isNamespaced, ok := c.namespacedResources[gk]; ok { diff --git a/pkg/cache/cluster_test.go b/pkg/cache/cluster_test.go index 68221ab89..565a16270 100644 --- a/pkg/cache/cluster_test.go +++ b/pkg/cache/cluster_test.go @@ -3,12 +3,13 @@ package cache import ( "context" "fmt" - "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions" "sort" "strings" "testing" "time" + "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" appsv1 "k8s.io/api/apps/v1" @@ -99,6 +100,10 @@ func newCluster(t *testing.T, objs ...runtime.Object) *clusterCache { GroupKind: schema.GroupKind{Group: "apps", Kind: "StatefulSet"}, GroupVersionResource: schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "statefulsets"}, Meta: metav1.APIResource{Namespaced: true}, + }, { + GroupKind: schema.GroupKind{Group: "extensions", Kind: "ReplicaSet"}, + GroupVersionResource: schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "replicasets"}, + Meta: metav1.APIResource{Namespaced: true}, }} cache := NewClusterCache( @@ -273,7 +278,7 @@ func TestEnsureSyncedSingleNamespace(t *testing.T) { } func TestGetChildren(t *testing.T) { - cluster := newCluster(t, testPod(), testRS(), testDeploy()) + cluster := newCluster(t, testPod1(), testRS(), testDeploy()) err := cluster.EnsureSynced() require.NoError(t, err) @@ -282,7 +287,7 @@ func TestGetChildren(t *testing.T) { Ref: corev1.ObjectReference{ Kind: "Pod", Namespace: "default", - Name: "helm-guestbook-pod", + Name: "helm-guestbook-pod-1", APIVersion: "v1", UID: "1", }, @@ -316,7 +321,7 @@ func TestGetChildren(t *testing.T) { } func TestGetManagedLiveObjs(t *testing.T) { - cluster := newCluster(t, testPod(), testRS(), testDeploy()) + cluster := newCluster(t, testPod1(), testRS(), testDeploy()) cluster.Invalidate(SetPopulateResourceInfoHandler(func(un *unstructured.Unstructured, isRoot bool) (info interface{}, cacheManifest bool) { return nil, true })) @@ -342,7 +347,7 @@ metadata: } func TestGetManagedLiveObjsNamespacedModeClusterLevelResource(t *testing.T) { - cluster := newCluster(t, testPod(), testRS(), testDeploy()) + cluster := newCluster(t, testPod1(), testRS(), testDeploy()) cluster.Invalidate(SetPopulateResourceInfoHandler(func(un *unstructured.Unstructured, isRoot bool) (info interface{}, cacheManifest bool) { return nil, true })) @@ -367,7 +372,7 @@ metadata: } func TestGetManagedLiveObjsNamespacedModeClusterLevelResource_ClusterResourceEnabled(t *testing.T) { - cluster := newCluster(t, testPod(), testRS(), testDeploy()) + cluster := newCluster(t, testPod1(), testRS(), testDeploy()) cluster.Invalidate(SetPopulateResourceInfoHandler(func(un *unstructured.Unstructured, isRoot bool) (info interface{}, cacheManifest bool) { return nil, true })) @@ 
-408,7 +413,7 @@ metadata: } func TestGetManagedLiveObjsAllNamespaces(t *testing.T) { - cluster := newCluster(t, testPod(), testRS(), testDeploy()) + cluster := newCluster(t, testPod1(), testRS(), testDeploy()) cluster.Invalidate(SetPopulateResourceInfoHandler(func(un *unstructured.Unstructured, isRoot bool) (info interface{}, cacheManifest bool) { return nil, true })) @@ -436,7 +441,7 @@ metadata: } func TestGetManagedLiveObjsValidNamespace(t *testing.T) { - cluster := newCluster(t, testPod(), testRS(), testDeploy()) + cluster := newCluster(t, testPod1(), testRS(), testDeploy()) cluster.Invalidate(SetPopulateResourceInfoHandler(func(un *unstructured.Unstructured, isRoot bool) (info interface{}, cacheManifest bool) { return nil, true })) @@ -464,7 +469,7 @@ metadata: } func TestGetManagedLiveObjsInvalidNamespace(t *testing.T) { - cluster := newCluster(t, testPod(), testRS(), testDeploy()) + cluster := newCluster(t, testPod1(), testRS(), testDeploy()) cluster.Invalidate(SetPopulateResourceInfoHandler(func(un *unstructured.Unstructured, isRoot bool) (info interface{}, cacheManifest bool) { return nil, true })) @@ -492,23 +497,23 @@ metadata: func TestGetManagedLiveObjsFailedConversion(t *testing.T) { cronTabGroup := "stable.example.com" - testCases := []struct{ - name string - localConvertFails bool + testCases := []struct { + name string + localConvertFails bool expectConvertToVersionCalled bool - expectGetResourceCalled bool + expectGetResourceCalled bool }{ { - name: "local convert fails, so GetResource is called", - localConvertFails: true, + name: "local convert fails, so GetResource is called", + localConvertFails: true, expectConvertToVersionCalled: true, - expectGetResourceCalled: true, + expectGetResourceCalled: true, }, { - name: "local convert succeeds, so GetResource is not called", - localConvertFails: false, + name: "local convert succeeds, so GetResource is not called", + localConvertFails: false, expectConvertToVersionCalled: true, - expectGetResourceCalled: false, + expectGetResourceCalled: false, }, } @@ -557,7 +562,6 @@ metadata: return testCronTab(), nil }) - managedObjs, err := cluster.GetManagedLiveObjs([]*unstructured.Unstructured{targetDeploy}, func(r *Resource) bool { return true }) @@ -572,26 +576,26 @@ metadata: } func TestChildDeletedEvent(t *testing.T) { - cluster := newCluster(t, testPod(), testRS(), testDeploy()) + cluster := newCluster(t, testPod1(), testRS(), testDeploy()) err := cluster.EnsureSynced() require.NoError(t, err) - cluster.processEvent(watch.Deleted, mustToUnstructured(testPod())) + cluster.processEvent(watch.Deleted, mustToUnstructured(testPod1())) rsChildren := getChildren(cluster, mustToUnstructured(testRS())) assert.Equal(t, []*Resource{}, rsChildren) } func TestProcessNewChildEvent(t *testing.T) { - cluster := newCluster(t, testPod(), testRS(), testDeploy()) + cluster := newCluster(t, testPod1(), testRS(), testDeploy()) err := cluster.EnsureSynced() require.NoError(t, err) newPod := strToUnstructured(` apiVersion: v1 kind: Pod metadata: - uid: "4" - name: helm-guestbook-pod2 + uid: "5" + name: helm-guestbook-pod-1-new namespace: default ownerReferences: - apiVersion: apps/v1 @@ -610,7 +614,7 @@ func TestProcessNewChildEvent(t *testing.T) { Ref: corev1.ObjectReference{ Kind: "Pod", Namespace: "default", - Name: "helm-guestbook-pod", + Name: "helm-guestbook-pod-1", APIVersion: "v1", UID: "1", }, @@ -628,9 +632,9 @@ func TestProcessNewChildEvent(t *testing.T) { Ref: corev1.ObjectReference{ Kind: "Pod", Namespace: "default", - Name: 
"helm-guestbook-pod2", + Name: "helm-guestbook-pod-1-new", APIVersion: "v1", - UID: "4", + UID: "5", }, OwnerRefs: []metav1.OwnerReference{{ APIVersion: "apps/v1", @@ -643,10 +647,10 @@ func TestProcessNewChildEvent(t *testing.T) { } func TestWatchCacheUpdated(t *testing.T) { - removed := testPod() + removed := testPod1() removed.SetName(removed.GetName() + "-removed-pod") - updated := testPod() + updated := testPod1() updated.SetName(updated.GetName() + "-updated-pod") updated.SetResourceVersion("updated-pod-version") @@ -655,10 +659,10 @@ func TestWatchCacheUpdated(t *testing.T) { require.NoError(t, err) - added := testPod() + added := testPod1() added.SetName(added.GetName() + "-new-pod") - podGroupKind := testPod().GroupVersionKind().GroupKind() + podGroupKind := testPod1().GroupVersionKind().GroupKind() cluster.lock.Lock() defer cluster.lock.Unlock() @@ -669,13 +673,13 @@ func TestWatchCacheUpdated(t *testing.T) { } func TestNamespaceModeReplace(t *testing.T) { - ns1Pod := testPod() + ns1Pod := testPod1() ns1Pod.SetNamespace("ns1") ns1Pod.SetName("pod1") - ns2Pod := testPod() + ns2Pod := testPod1() ns2Pod.SetNamespace("ns2") - podGroupKind := testPod().GroupVersionKind().GroupKind() + podGroupKind := testPod1().GroupVersionKind().GroupKind() cluster := newCluster(t, ns1Pod, ns2Pod) err := cluster.EnsureSynced() @@ -790,14 +794,14 @@ func getResourceKey(t *testing.T, obj runtime.Object) kube.ResourceKey { return kube.NewResourceKey(gvk.Group, gvk.Kind, m.GetNamespace(), m.GetName()) } -func testPod() *corev1.Pod { +func testPod1() *corev1.Pod { return &corev1.Pod{ TypeMeta: metav1.TypeMeta{ APIVersion: "v1", Kind: "Pod", }, ObjectMeta: metav1.ObjectMeta{ - Name: "helm-guestbook-pod", + Name: "helm-guestbook-pod-1", Namespace: "default", UID: "1", ResourceVersion: "123", @@ -814,27 +818,51 @@ func testPod() *corev1.Pod { } } +// Similar to pod1, but owner reference lacks uid +func testPod2() *corev1.Pod { + return &corev1.Pod{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Pod", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "helm-guestbook-pod-2", + Namespace: "default", + UID: "4", + ResourceVersion: "123", + CreationTimestamp: metav1.NewTime(testCreationTime), + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "apps/v1", + Kind: "ReplicaSet", + Name: "helm-guestbook-rs", + }, + }, + }, + } +} + func testCRD() *apiextensions.CustomResourceDefinition { return &apiextensions.CustomResourceDefinition{ - TypeMeta: metav1.TypeMeta{ + TypeMeta: metav1.TypeMeta{ APIVersion: "apiextensions.k8s.io/v1", }, ObjectMeta: metav1.ObjectMeta{ Name: "crontabs.stable.example.com", }, - Spec: apiextensions.CustomResourceDefinitionSpec{ + Spec: apiextensions.CustomResourceDefinitionSpec{ Group: "stable.example.com", Versions: []apiextensions.CustomResourceDefinitionVersion{ { - Name: "v1", - Served: true, + Name: "v1", + Served: true, Storage: true, Schema: &apiextensions.CustomResourceValidation{ OpenAPIV3Schema: &apiextensions.JSONSchemaProps{ Type: "object", Properties: map[string]apiextensions.JSONSchemaProps{ "cronSpec": {Type: "string"}, - "image": {Type: "string"}, + "image": {Type: "string"}, "replicas": {Type: "integer"}, }, }, @@ -855,14 +883,14 @@ func testCRD() *apiextensions.CustomResourceDefinition { func testCronTab() *unstructured.Unstructured { return &unstructured.Unstructured{Object: map[string]interface{}{ "apiVersion": "stable.example.com/v1", - "kind": "CronTab", + "kind": "CronTab", "metadata": map[string]interface{}{ - "name": "test-crontab", + "name": 
"test-crontab", "namespace": "default", }, "spec": map[string]interface{}{ "cronSpec": "* * * * */5", - "image": "my-awesome-cron-image", + "image": "my-awesome-cron-image", }, }} } @@ -943,7 +971,7 @@ func testDeploy() *appsv1.Deployment { } func TestIterateHierachy(t *testing.T) { - cluster := newCluster(t, testPod(), testRS(), testDeploy()) + cluster := newCluster(t, testPod1(), testPod2(), testRS(), testExtensionsRS(), testDeploy()) err := cluster.EnsureSynced() require.NoError(t, err) @@ -956,7 +984,8 @@ func TestIterateHierachy(t *testing.T) { assert.ElementsMatch(t, []kube.ResourceKey{ - kube.GetResourceKey(mustToUnstructured(testPod())), + kube.GetResourceKey(mustToUnstructured(testPod1())), + kube.GetResourceKey(mustToUnstructured(testPod2())), kube.GetResourceKey(mustToUnstructured(testRS())), kube.GetResourceKey(mustToUnstructured(testDeploy()))}, keys) @@ -1001,8 +1030,133 @@ func TestIterateHierachy(t *testing.T) { []kube.ResourceKey{ kube.GetResourceKey(mustToUnstructured(testDeploy())), kube.GetResourceKey(mustToUnstructured(testRS())), - kube.GetResourceKey(mustToUnstructured(testPod())), + kube.GetResourceKey(mustToUnstructured(testPod1())), + kube.GetResourceKey(mustToUnstructured(testPod2())), + }, + keys) + }) + + // After uid is backfilled for owner of pod2, it should appear in results here as well. + t.Run("IterateStartFromExtensionsRS", func(t *testing.T) { + keys := []kube.ResourceKey{} + cluster.IterateHierarchy(kube.GetResourceKey(mustToUnstructured(testExtensionsRS())), func(child *Resource, _ map[kube.ResourceKey]*Resource) bool { + keys = append(keys, child.ResourceKey()) + return true + }) + + assert.ElementsMatch(t, + []kube.ResourceKey{ + kube.GetResourceKey(mustToUnstructured(testPod1())), + kube.GetResourceKey(mustToUnstructured(testPod2())), + kube.GetResourceKey(mustToUnstructured(testExtensionsRS()))}, + keys) + }) +} + +func TestIterateHierachyV2(t *testing.T) { + cluster := newCluster(t, testPod1(), testPod2(), testRS(), testExtensionsRS(), testDeploy()) + err := cluster.EnsureSynced() + require.NoError(t, err) + + t.Run("IterateAll", func(t *testing.T) { + startKeys := []kube.ResourceKey{kube.GetResourceKey(mustToUnstructured(testDeploy()))} + keys := []kube.ResourceKey{} + cluster.IterateHierarchyV2(startKeys, func(child *Resource, _ map[kube.ResourceKey]*Resource) bool { + keys = append(keys, child.ResourceKey()) + return true + }) + + assert.ElementsMatch(t, + []kube.ResourceKey{ + kube.GetResourceKey(mustToUnstructured(testPod1())), + kube.GetResourceKey(mustToUnstructured(testPod2())), + kube.GetResourceKey(mustToUnstructured(testRS())), + kube.GetResourceKey(mustToUnstructured(testDeploy()))}, + keys) + }) + + t.Run("ExitAtRoot", func(t *testing.T) { + startKeys := []kube.ResourceKey{kube.GetResourceKey(mustToUnstructured(testDeploy()))} + keys := []kube.ResourceKey{} + cluster.IterateHierarchyV2(startKeys, func(child *Resource, _ map[kube.ResourceKey]*Resource) bool { + keys = append(keys, child.ResourceKey()) + return false + }) + + assert.ElementsMatch(t, + []kube.ResourceKey{ + kube.GetResourceKey(mustToUnstructured(testDeploy()))}, + keys) + }) + + t.Run("ExitAtSecondLevelChild", func(t *testing.T) { + startKeys := []kube.ResourceKey{kube.GetResourceKey(mustToUnstructured(testDeploy()))} + keys := []kube.ResourceKey{} + cluster.IterateHierarchyV2(startKeys, func(child *Resource, _ map[kube.ResourceKey]*Resource) bool { + keys = append(keys, child.ResourceKey()) + return child.ResourceKey().Kind != kube.ReplicaSetKind + }) + + 
assert.ElementsMatch(t, + []kube.ResourceKey{ + kube.GetResourceKey(mustToUnstructured(testDeploy())), + kube.GetResourceKey(mustToUnstructured(testRS())), + }, + keys) + }) + + t.Run("ExitAtThirdLevelChild", func(t *testing.T) { + startKeys := []kube.ResourceKey{kube.GetResourceKey(mustToUnstructured(testDeploy()))} + keys := []kube.ResourceKey{} + cluster.IterateHierarchyV2(startKeys, func(child *Resource, _ map[kube.ResourceKey]*Resource) bool { + keys = append(keys, child.ResourceKey()) + return child.ResourceKey().Kind != kube.PodKind + }) + + assert.ElementsMatch(t, + []kube.ResourceKey{ + kube.GetResourceKey(mustToUnstructured(testDeploy())), + kube.GetResourceKey(mustToUnstructured(testRS())), + kube.GetResourceKey(mustToUnstructured(testPod1())), + kube.GetResourceKey(mustToUnstructured(testPod2())), }, keys) }) + + t.Run("IterateAllStartFromMultiple", func(t *testing.T) { + startKeys := []kube.ResourceKey{ + kube.GetResourceKey(mustToUnstructured(testRS())), + kube.GetResourceKey(mustToUnstructured(testDeploy())), + } + keys := []kube.ResourceKey{} + cluster.IterateHierarchyV2(startKeys, func(child *Resource, _ map[kube.ResourceKey]*Resource) bool { + keys = append(keys, child.ResourceKey()) + return true + }) + + assert.ElementsMatch(t, + []kube.ResourceKey{ + kube.GetResourceKey(mustToUnstructured(testPod1())), + kube.GetResourceKey(mustToUnstructured(testPod2())), + kube.GetResourceKey(mustToUnstructured(testRS())), + kube.GetResourceKey(mustToUnstructured(testDeploy()))}, + keys) + }) + + // After uid is backfilled for owner of pod2, it should appear in results here as well. + t.Run("IterateStartFromExtensionsRS", func(t *testing.T) { + startKeys := []kube.ResourceKey{kube.GetResourceKey(mustToUnstructured(testExtensionsRS()))} + keys := []kube.ResourceKey{} + cluster.IterateHierarchyV2(startKeys, func(child *Resource, _ map[kube.ResourceKey]*Resource) bool { + keys = append(keys, child.ResourceKey()) + return true + }) + + assert.ElementsMatch(t, + []kube.ResourceKey{ + kube.GetResourceKey(mustToUnstructured(testPod1())), + kube.GetResourceKey(mustToUnstructured(testPod2())), + kube.GetResourceKey(mustToUnstructured(testExtensionsRS()))}, + keys) + }) } diff --git a/pkg/cache/mocks/ClusterCache.go b/pkg/cache/mocks/ClusterCache.go index 7a1be7324..c9fbc8f9c 100644 --- a/pkg/cache/mocks/ClusterCache.go +++ b/pkg/cache/mocks/ClusterCache.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.40.2. DO NOT EDIT. +// Code generated by mockery v2.43.2. DO NOT EDIT. 
package mocks @@ -237,6 +237,11 @@ func (_m *ClusterCache) IterateHierarchy(key kube.ResourceKey, action func(*cach _m.Called(key, action) } +// IterateHierarchyV2 provides a mock function with given fields: keys, action +func (_m *ClusterCache) IterateHierarchyV2(keys []kube.ResourceKey, action func(*cache.Resource, map[kube.ResourceKey]*cache.Resource) bool) { + _m.Called(keys, action) +} + // OnEvent provides a mock function with given fields: handler func (_m *ClusterCache) OnEvent(handler cache.OnEventHandler) cache.Unsubscribe { ret := _m.Called(handler) diff --git a/pkg/cache/resource.go b/pkg/cache/resource.go index 4097f4dca..95ef8b1cf 100644 --- a/pkg/cache/resource.go +++ b/pkg/cache/resource.go @@ -99,3 +99,28 @@ func (r *Resource) iterateChildren(ns map[kube.ResourceKey]*Resource, parents ma } } } + +func (r *Resource) iterateChildrenV2(graph map[kube.ResourceKey][]kube.ResourceKey, ns map[kube.ResourceKey]*Resource, visited map[kube.ResourceKey]int, action func(err error, child *Resource, namespaceResources map[kube.ResourceKey]*Resource) bool) { + key := r.ResourceKey() + if visited[key] == 2 { + return + } + visited[key] = 1 + defer func() { + visited[key] = 2 + }() + childKeys, ok := graph[key] + if !ok || childKeys == nil { + return + } + for _, childKey := range childKeys { + child := ns[childKey] + if visited[childKey] == 1 { + _ = action(fmt.Errorf("circular dependency detected. %s is child and parent of %s", childKey.String(), key.String()), child, ns) + } else if visited[childKey] == 0 { + if action(nil, child, ns) { + child.iterateChildrenV2(graph, ns, visited, action) + } + } + } +} diff --git a/pkg/cache/resource_test.go b/pkg/cache/resource_test.go index 0562cc65c..c5cf0173b 100644 --- a/pkg/cache/resource_test.go +++ b/pkg/cache/resource_test.go @@ -10,7 +10,7 @@ import ( var c = NewClusterCache(&rest.Config{}) func TestIsParentOf(t *testing.T) { - child := c.newResource(mustToUnstructured(testPod())) + child := c.newResource(mustToUnstructured(testPod1())) parent := c.newResource(mustToUnstructured(testRS())) grandParent := c.newResource(mustToUnstructured(testDeploy())) @@ -22,7 +22,7 @@ func TestIsParentOfSameKindDifferentGroupAndUID(t *testing.T) { rs := testRS() rs.APIVersion = "somecrd.io/v1" rs.SetUID("123") - child := c.newResource(mustToUnstructured(testPod())) + child := c.newResource(mustToUnstructured(testPod1())) invalidParent := c.newResource(mustToUnstructured(rs)) assert.False(t, invalidParent.isParentOf(child)) From 39bba43d8b7b4fdd268f6537294a5a0f106b1abf Mon Sep 17 00:00:00 2001 From: Michael Crenshaw <350466+crenshaw-dev@users.noreply.github.com> Date: Tue, 16 Jul 2024 17:16:07 -0400 Subject: [PATCH 02/15] improvements to graph building Signed-off-by: Michael Crenshaw <350466+crenshaw-dev@users.noreply.github.com> --- pkg/cache/cluster.go | 88 ++++++++++++++++++++++----------------- pkg/cache/cluster_test.go | 50 ++++++++++++++++++++++ pkg/cache/resource.go | 4 ++ 3 files changed, 104 insertions(+), 38 deletions(-) diff --git a/pkg/cache/cluster.go b/pkg/cache/cluster.go index 290f08c58..af58fe6f3 100644 --- a/pkg/cache/cluster.go +++ b/pkg/cache/cluster.go @@ -1013,44 +1013,7 @@ func (c *clusterCache) IterateHierarchyV2(keys []kube.ResourceKey, action func(r } for namespace, namespaceKeys := range keysPerNamespace { nsNodes := c.nsIndex[namespace] - // Prepare to construct a graph - nodesByUID := make(map[types.UID][]*Resource) - nodeByGraphKey := make(map[string]*Resource) - for _, node := range nsNodes { - nodesByUID[node.Ref.UID] = 
append(nodesByUID[node.Ref.UID], node) - // Based on what's used by isParentOf - graphKey := fmt.Sprintf("%s/%s/%s", node.Ref.Kind, node.Ref.APIVersion, node.Ref.Name) - nodeByGraphKey[graphKey] = node - } - // Construct a graph using a logic similar to isParentOf but more optimal - graph := make(map[kube.ResourceKey][]kube.ResourceKey) - childrenByUID := make(map[kube.ResourceKey]map[types.UID][]*Resource) - for _, node := range nsNodes { - childrenByUID[node.ResourceKey()] = make(map[types.UID][]*Resource) - } - for _, node := range nsNodes { - for i, ownerRef := range node.OwnerRefs { - // backfill UID of inferred owner child references - if ownerRef.UID == "" { - graphKey := fmt.Sprintf("%s/%s/%s", ownerRef.Kind, ownerRef.APIVersion, ownerRef.Name) - graphKeyNode, ok := nodeByGraphKey[graphKey] - if ok { - ownerRef.UID = graphKeyNode.Ref.UID - node.OwnerRefs[i] = ownerRef - } else { - continue - } - } - - uidNodes, ok := nodesByUID[ownerRef.UID] - if ok { - for _, uidNode := range uidNodes { - graph[uidNode.ResourceKey()] = append(graph[uidNode.ResourceKey()], node.ResourceKey()) - childrenByUID[uidNode.ResourceKey()][node.Ref.UID] = append(childrenByUID[uidNode.ResourceKey()][node.Ref.UID], node) - } - } - } - } + graph, childrenByUID := buildGraph(nsNodes) visited := make(map[kube.ResourceKey]int) for _, key := range namespaceKeys { visited[key] = 0 @@ -1091,6 +1054,55 @@ func (c *clusterCache) IterateHierarchyV2(keys []kube.ResourceKey, action func(r } } +type graphKey struct { + kind string + apiVersion string + name string +} + +func buildGraph(nsNodes map[kube.ResourceKey]*Resource) (map[kube.ResourceKey][]kube.ResourceKey, map[kube.ResourceKey]map[types.UID][]*Resource) { + // Prepare to construct a childrenByParent + nodesByUID := make(map[types.UID][]*Resource, len(nsNodes)) + nodeByGraphKey := make(map[graphKey]*Resource, len(nsNodes)) + childrenByUID := make(map[kube.ResourceKey]map[types.UID][]*Resource, len(nsNodes)) + for _, node := range nsNodes { + nodesByUID[node.Ref.UID] = append(nodesByUID[node.Ref.UID], node) + nodeByGraphKey[graphKey{node.Ref.Kind, node.Ref.APIVersion, node.Ref.Name}] = node + childrenByUID[node.ResourceKey()] = make(map[types.UID][]*Resource) + } + + // In childrenByParent, they key is the parent and the value is a list of children. + childrenByParent := make(map[kube.ResourceKey][]kube.ResourceKey) + + // Loop through all nodes, calling each one "childNode," because we're only bothering with it if it has a parent. + for _, childNode := range nsNodes { + for i, ownerRef := range childNode.OwnerRefs { + // First, backfill UID of inferred owner child references. + if ownerRef.UID == "" { + graphKeyNode, ok := nodeByGraphKey[graphKey{ownerRef.Kind, ownerRef.APIVersion, ownerRef.Name}] + if ok { + ownerRef.UID = graphKeyNode.Ref.UID + childNode.OwnerRefs[i] = ownerRef + } else { + // No resource found with the given childrenByParent key, so move on. + continue + } + } + + // Now that we have the UID of the parent, update the childrenByParent and the childrenByUID map. + uidNodes, ok := nodesByUID[ownerRef.UID] + if ok { + for _, uidNode := range uidNodes { + // Update the childrenByParent for this owner to include the child. 
+ childrenByParent[uidNode.ResourceKey()] = append(childrenByParent[uidNode.ResourceKey()], childNode.ResourceKey()) + childrenByUID[uidNode.ResourceKey()][childNode.Ref.UID] = append(childrenByUID[uidNode.ResourceKey()][childNode.Ref.UID], childNode) + } + } + } + } + return childrenByParent, childrenByUID +} + // IsNamespaced answers if specified group/kind is a namespaced resource API or not func (c *clusterCache) IsNamespaced(gk schema.GroupKind) (bool, error) { if isNamespaced, ok := c.namespacedResources[gk]; ok { diff --git a/pkg/cache/cluster_test.go b/pkg/cache/cluster_test.go index 565a16270..aac92fb3d 100644 --- a/pkg/cache/cluster_test.go +++ b/pkg/cache/cluster_test.go @@ -3,6 +3,7 @@ package cache import ( "context" "fmt" + "github.com/google/uuid" "sort" "strings" "testing" @@ -1160,3 +1161,52 @@ func TestIterateHierachyV2(t *testing.T) { keys) }) } + +var testResources = map[kube.ResourceKey]*Resource{} + +func init() { + testResources = buildTestResourceMap() +} + +func buildTestResourceMap() map[kube.ResourceKey]*Resource { + ns := make(map[kube.ResourceKey]*Resource) + for i := 0; i < 100000; i++ { + name := fmt.Sprintf("test-%d", i) + ownerName := fmt.Sprintf("test-%d", i/10) + uid := uuid.New().String() + key := kube.ResourceKey{ + Namespace: "default", + Name: name, + Kind: "Pod", + } + resourceYaml := fmt.Sprintf(` +apiVersion: v1 +kind: Pod +metadata: + namespace: default + name: %s + uid: %s`, name, uid) + if i/10 != 0 { + owner := ns[kube.ResourceKey{ + Namespace: "default", + Name: ownerName, + Kind: "Pod", + }] + ownerUid := owner.Ref.UID + resourceYaml += fmt.Sprintf(` + ownerReferences: + - apiVersion: v1 + kind: Pod + name: %s + uid: %s`, ownerName, ownerUid) + } + ns[key] = c.newResource(strToUnstructured(resourceYaml)) + } + return ns +} + +func BenchmarkBuildGraph(b *testing.B) { + for n := 0; n < b.N; n++ { + buildGraph(testResources) + } +} diff --git a/pkg/cache/resource.go b/pkg/cache/resource.go index 95ef8b1cf..a6051c0ff 100644 --- a/pkg/cache/resource.go +++ b/pkg/cache/resource.go @@ -100,13 +100,16 @@ func (r *Resource) iterateChildren(ns map[kube.ResourceKey]*Resource, parents ma } } +// iterateChildrenV2 is a depth-first traversal of the graph of resources starting from the current resource. func (r *Resource) iterateChildrenV2(graph map[kube.ResourceKey][]kube.ResourceKey, ns map[kube.ResourceKey]*Resource, visited map[kube.ResourceKey]int, action func(err error, child *Resource, namespaceResources map[kube.ResourceKey]*Resource) bool) { key := r.ResourceKey() if visited[key] == 2 { return } + // this indicates that we've started processing this node's children visited[key] = 1 defer func() { + // this indicates that we've finished processing this node's children visited[key] = 2 }() childKeys, ok := graph[key] @@ -116,6 +119,7 @@ func (r *Resource) iterateChildrenV2(graph map[kube.ResourceKey][]kube.ResourceK for _, childKey := range childKeys { child := ns[childKey] if visited[childKey] == 1 { + // Since we encountered a node that we're currently processing, we know we have a circular dependency. _ = action(fmt.Errorf("circular dependency detected. 
%s is child and parent of %s", childKey.String(), key.String()), child, ns) } else if visited[childKey] == 0 { if action(nil, child, ns) { From 120afb4bcf5ed11bbb63b04a170e515e2ff3b83e Mon Sep 17 00:00:00 2001 From: Michael Crenshaw <350466+crenshaw-dev@users.noreply.github.com> Date: Tue, 16 Jul 2024 17:18:20 -0400 Subject: [PATCH 03/15] use old name Signed-off-by: Michael Crenshaw <350466+crenshaw-dev@users.noreply.github.com> --- pkg/cache/cluster.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/cache/cluster.go b/pkg/cache/cluster.go index af58fe6f3..086b6f7a6 100644 --- a/pkg/cache/cluster.go +++ b/pkg/cache/cluster.go @@ -1064,11 +1064,11 @@ func buildGraph(nsNodes map[kube.ResourceKey]*Resource) (map[kube.ResourceKey][] // Prepare to construct a childrenByParent nodesByUID := make(map[types.UID][]*Resource, len(nsNodes)) nodeByGraphKey := make(map[graphKey]*Resource, len(nsNodes)) - childrenByUID := make(map[kube.ResourceKey]map[types.UID][]*Resource, len(nsNodes)) + graph := make(map[kube.ResourceKey]map[types.UID][]*Resource, len(nsNodes)) for _, node := range nsNodes { nodesByUID[node.Ref.UID] = append(nodesByUID[node.Ref.UID], node) nodeByGraphKey[graphKey{node.Ref.Kind, node.Ref.APIVersion, node.Ref.Name}] = node - childrenByUID[node.ResourceKey()] = make(map[types.UID][]*Resource) + graph[node.ResourceKey()] = make(map[types.UID][]*Resource) } // In childrenByParent, they key is the parent and the value is a list of children. @@ -1089,18 +1089,18 @@ func buildGraph(nsNodes map[kube.ResourceKey]*Resource) (map[kube.ResourceKey][] } } - // Now that we have the UID of the parent, update the childrenByParent and the childrenByUID map. + // Now that we have the UID of the parent, update the childrenByParent and the graph map. uidNodes, ok := nodesByUID[ownerRef.UID] if ok { for _, uidNode := range uidNodes { // Update the childrenByParent for this owner to include the child. childrenByParent[uidNode.ResourceKey()] = append(childrenByParent[uidNode.ResourceKey()], childNode.ResourceKey()) - childrenByUID[uidNode.ResourceKey()][childNode.Ref.UID] = append(childrenByUID[uidNode.ResourceKey()][childNode.Ref.UID], childNode) + graph[uidNode.ResourceKey()][childNode.Ref.UID] = append(graph[uidNode.ResourceKey()][childNode.Ref.UID], childNode) } } } } - return childrenByParent, childrenByUID + return childrenByParent, graph } // IsNamespaced answers if specified group/kind is a namespaced resource API or not From 335ff88efe8b7b2c55cf1c08deedeaebbe013708 Mon Sep 17 00:00:00 2001 From: Andrii Korotkov Date: Thu, 4 Jul 2024 13:23:50 -0700 Subject: [PATCH 04/15] chore: More optimal IterateHierarchyV2 and iterateChildrenV2 [#600] Closes #600 The existing (effectively v1) implementations are suboptimal since they don't construct a graph before the iteration. They search for children by looking at all namespace resources and checking `isParentOf`, which can give `O(tree_size * namespace_resources_count)` time complexity. The v2 algorithms construct the graph and have `O(namespace_resources_count)` time complexity. See more details in the linked issues. 
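A simplified sketch of the graph construction this commit message describes (not the patch's actual buildGraph code: it omits the owner-reference UID backfilling and the duplicate-UID handling, and assumes the package's existing Resource, kube.ResourceKey, and types.UID types):

    // Index the namespace's resources by UID once, then resolve each owner reference with a
    // map lookup. This replaces the per-node scan over all namespace resources done by
    // isParentOf, reducing child lookup from O(tree_size * namespace_resources_count) to
    // roughly O(namespace_resources_count).
    func childrenByParentSketch(nsNodes map[kube.ResourceKey]*Resource) map[kube.ResourceKey][]kube.ResourceKey {
        nodesByUID := make(map[types.UID][]*Resource, len(nsNodes))
        for _, node := range nsNodes {
            nodesByUID[node.Ref.UID] = append(nodesByUID[node.Ref.UID], node)
        }
        graph := make(map[kube.ResourceKey][]kube.ResourceKey)
        for _, child := range nsNodes {
            for _, ownerRef := range child.OwnerRefs {
                for _, parent := range nodesByUID[ownerRef.UID] {
                    graph[parent.ResourceKey()] = append(graph[parent.ResourceKey()], child.ResourceKey())
                }
            }
        }
        return graph
    }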
Signed-off-by: Andrii Korotkov --- pkg/cache/cluster.go | 91 ++++++++++++++ pkg/cache/cluster_test.go | 212 +++++++++++++++++++++++++++----- pkg/cache/mocks/ClusterCache.go | 7 +- pkg/cache/resource.go | 25 ++++ pkg/cache/resource_test.go | 4 +- 5 files changed, 307 insertions(+), 32 deletions(-) diff --git a/pkg/cache/cluster.go b/pkg/cache/cluster.go index 96f9ebe72..f1d894311 100644 --- a/pkg/cache/cluster.go +++ b/pkg/cache/cluster.go @@ -120,6 +120,9 @@ type ClusterCache interface { // IterateHierarchy iterates resource tree starting from the specified top level resource and executes callback for each resource in the tree. // The action callback returns true if iteration should continue and false otherwise. IterateHierarchy(key kube.ResourceKey, action func(resource *Resource, namespaceResources map[kube.ResourceKey]*Resource) bool) + // IterateHierarchyV2 iterates resource tree starting from the specified top level resources and executes callback for each resource in the tree. + // The action callback returns true if iteration should continue and false otherwise. + IterateHierarchyV2(keys []kube.ResourceKey, action func(resource *Resource, namespaceResources map[kube.ResourceKey]*Resource) bool) // IsNamespaced answers if specified group/kind is a namespaced resource API or not IsNamespaced(gk schema.GroupKind) (bool, error) // GetManagedLiveObjs helps finding matching live K8S resources for a given resources list. @@ -1004,6 +1007,94 @@ func (c *clusterCache) IterateHierarchy(key kube.ResourceKey, action func(resour } } +// IterateHierarchy iterates resource tree starting from the specified top level resources and executes callback for each resource in the tree +func (c *clusterCache) IterateHierarchyV2(keys []kube.ResourceKey, action func(resource *Resource, namespaceResources map[kube.ResourceKey]*Resource) bool) { + c.lock.RLock() + defer c.lock.RUnlock() + keysPerNamespace := make(map[string][]kube.ResourceKey) + for _, key := range keys { + keysPerNamespace[key.Namespace] = append(keysPerNamespace[key.Namespace], key) + } + for namespace, namespaceKeys := range keysPerNamespace { + nsNodes := c.nsIndex[namespace] + // Prepare to construct a graph + nodesByUID := make(map[types.UID][]*Resource) + nodeByGraphKey := make(map[string]*Resource) + for _, node := range nsNodes { + nodesByUID[node.Ref.UID] = append(nodesByUID[node.Ref.UID], node) + // Based on what's used by isParentOf + graphKey := fmt.Sprintf("%s/%s/%s", node.Ref.Kind, node.Ref.APIVersion, node.Ref.Name) + nodeByGraphKey[graphKey] = node + } + // Construct a graph using a logic similar to isParentOf but more optimal + graph := make(map[kube.ResourceKey][]kube.ResourceKey) + childrenByUID := make(map[kube.ResourceKey]map[types.UID][]*Resource) + for _, node := range nsNodes { + childrenByUID[node.ResourceKey()] = make(map[types.UID][]*Resource) + } + for _, node := range nsNodes { + for i, ownerRef := range node.OwnerRefs { + // backfill UID of inferred owner child references + if ownerRef.UID == "" { + graphKey := fmt.Sprintf("%s/%s/%s", ownerRef.Kind, ownerRef.APIVersion, ownerRef.Name) + graphKeyNode, ok := nodeByGraphKey[graphKey] + if ok { + ownerRef.UID = graphKeyNode.Ref.UID + node.OwnerRefs[i] = ownerRef + } else { + continue + } + } + + uidNodes, ok := nodesByUID[ownerRef.UID] + if ok { + for _, uidNode := range uidNodes { + graph[uidNode.ResourceKey()] = append(graph[uidNode.ResourceKey()], node.ResourceKey()) + childrenByUID[uidNode.ResourceKey()][node.Ref.UID] = 
append(childrenByUID[uidNode.ResourceKey()][node.Ref.UID], node) + } + } + } + } + visited := make(map[kube.ResourceKey]int) + for _, key := range namespaceKeys { + visited[key] = 0 + } + for _, key := range namespaceKeys { + res, ok := c.resources[key] + if !ok { + continue + } + if visited[key] == 2 || !action(res, nsNodes) { + continue + } + visited[key] = 1 + // make sure children has no duplicates + for _, children := range childrenByUID[key] { + if len(children) > 0 { + // The object might have multiple children with the same UID (e.g. replicaset from apps and extensions group). It is ok to pick any object but we need to make sure + // we pick the same child after every refresh. + sort.Slice(children, func(i, j int) bool { + key1 := children[i].ResourceKey() + key2 := children[j].ResourceKey() + return strings.Compare(key1.String(), key2.String()) < 0 + }) + child := children[0] + if visited[child.ResourceKey()] == 0 && action(child, nsNodes) { + child.iterateChildrenV2(graph, nsNodes, visited, func(err error, child *Resource, namespaceResources map[kube.ResourceKey]*Resource) bool { + if err != nil { + c.log.V(2).Info(err.Error()) + return false + } + return action(child, namespaceResources) + }) + } + } + } + visited[key] = 2 + } + } +} + // IsNamespaced answers if specified group/kind is a namespaced resource API or not func (c *clusterCache) IsNamespaced(gk schema.GroupKind) (bool, error) { if isNamespaced, ok := c.namespacedResources[gk]; ok { diff --git a/pkg/cache/cluster_test.go b/pkg/cache/cluster_test.go index f5e61a065..6ffbbf4fe 100644 --- a/pkg/cache/cluster_test.go +++ b/pkg/cache/cluster_test.go @@ -112,6 +112,10 @@ func newClusterWithOptions(t *testing.T, opts []UpdateSettingsFunc, objs ...runt GroupKind: schema.GroupKind{Group: "apps", Kind: "StatefulSet"}, GroupVersionResource: schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "statefulsets"}, Meta: metav1.APIResource{Namespaced: true}, + }, { + GroupKind: schema.GroupKind{Group: "extensions", Kind: "ReplicaSet"}, + GroupVersionResource: schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "replicasets"}, + Meta: metav1.APIResource{Namespaced: true}, }} opts = append([]UpdateSettingsFunc{ @@ -289,7 +293,7 @@ func TestEnsureSyncedSingleNamespace(t *testing.T) { } func TestGetChildren(t *testing.T) { - cluster := newCluster(t, testPod(), testRS(), testDeploy()) + cluster := newCluster(t, testPod1(), testRS(), testDeploy()) err := cluster.EnsureSynced() require.NoError(t, err) @@ -298,7 +302,7 @@ func TestGetChildren(t *testing.T) { Ref: corev1.ObjectReference{ Kind: "Pod", Namespace: "default", - Name: "helm-guestbook-pod", + Name: "helm-guestbook-pod-1", APIVersion: "v1", UID: "1", }, @@ -332,7 +336,7 @@ func TestGetChildren(t *testing.T) { } func TestGetManagedLiveObjs(t *testing.T) { - cluster := newCluster(t, testPod(), testRS(), testDeploy()) + cluster := newCluster(t, testPod1(), testRS(), testDeploy()) cluster.Invalidate(SetPopulateResourceInfoHandler(func(un *unstructured.Unstructured, isRoot bool) (info interface{}, cacheManifest bool) { return nil, true })) @@ -358,7 +362,7 @@ metadata: } func TestGetManagedLiveObjsNamespacedModeClusterLevelResource(t *testing.T) { - cluster := newCluster(t, testPod(), testRS(), testDeploy()) + cluster := newCluster(t, testPod1(), testRS(), testDeploy()) cluster.Invalidate(SetPopulateResourceInfoHandler(func(un *unstructured.Unstructured, isRoot bool) (info interface{}, cacheManifest bool) { return nil, true })) @@ -383,7 +387,7 @@ 
metadata: } func TestGetManagedLiveObjsNamespacedModeClusterLevelResource_ClusterResourceEnabled(t *testing.T) { - cluster := newCluster(t, testPod(), testRS(), testDeploy()) + cluster := newCluster(t, testPod1(), testRS(), testDeploy()) cluster.Invalidate(SetPopulateResourceInfoHandler(func(un *unstructured.Unstructured, isRoot bool) (info interface{}, cacheManifest bool) { return nil, true })) @@ -424,7 +428,7 @@ metadata: } func TestGetManagedLiveObjsAllNamespaces(t *testing.T) { - cluster := newCluster(t, testPod(), testRS(), testDeploy()) + cluster := newCluster(t, testPod1(), testRS(), testDeploy()) cluster.Invalidate(SetPopulateResourceInfoHandler(func(un *unstructured.Unstructured, isRoot bool) (info interface{}, cacheManifest bool) { return nil, true })) @@ -452,7 +456,7 @@ metadata: } func TestGetManagedLiveObjsValidNamespace(t *testing.T) { - cluster := newCluster(t, testPod(), testRS(), testDeploy()) + cluster := newCluster(t, testPod1(), testRS(), testDeploy()) cluster.Invalidate(SetPopulateResourceInfoHandler(func(un *unstructured.Unstructured, isRoot bool) (info interface{}, cacheManifest bool) { return nil, true })) @@ -480,7 +484,7 @@ metadata: } func TestGetManagedLiveObjsInvalidNamespace(t *testing.T) { - cluster := newCluster(t, testPod(), testRS(), testDeploy()) + cluster := newCluster(t, testPod1(), testRS(), testDeploy()) cluster.Invalidate(SetPopulateResourceInfoHandler(func(un *unstructured.Unstructured, isRoot bool) (info interface{}, cacheManifest bool) { return nil, true })) @@ -587,26 +591,26 @@ metadata: } func TestChildDeletedEvent(t *testing.T) { - cluster := newCluster(t, testPod(), testRS(), testDeploy()) + cluster := newCluster(t, testPod1(), testRS(), testDeploy()) err := cluster.EnsureSynced() require.NoError(t, err) - cluster.processEvent(watch.Deleted, mustToUnstructured(testPod())) + cluster.processEvent(watch.Deleted, mustToUnstructured(testPod1())) rsChildren := getChildren(cluster, mustToUnstructured(testRS())) assert.Equal(t, []*Resource{}, rsChildren) } func TestProcessNewChildEvent(t *testing.T) { - cluster := newCluster(t, testPod(), testRS(), testDeploy()) + cluster := newCluster(t, testPod1(), testRS(), testDeploy()) err := cluster.EnsureSynced() require.NoError(t, err) newPod := strToUnstructured(` apiVersion: v1 kind: Pod metadata: - uid: "4" - name: helm-guestbook-pod2 + uid: "5" + name: helm-guestbook-pod-1-new namespace: default ownerReferences: - apiVersion: apps/v1 @@ -625,7 +629,7 @@ func TestProcessNewChildEvent(t *testing.T) { Ref: corev1.ObjectReference{ Kind: "Pod", Namespace: "default", - Name: "helm-guestbook-pod", + Name: "helm-guestbook-pod-1", APIVersion: "v1", UID: "1", }, @@ -643,9 +647,9 @@ func TestProcessNewChildEvent(t *testing.T) { Ref: corev1.ObjectReference{ Kind: "Pod", Namespace: "default", - Name: "helm-guestbook-pod2", + Name: "helm-guestbook-pod-1-new", APIVersion: "v1", - UID: "4", + UID: "5", }, OwnerRefs: []metav1.OwnerReference{{ APIVersion: "apps/v1", @@ -658,10 +662,10 @@ func TestProcessNewChildEvent(t *testing.T) { } func TestWatchCacheUpdated(t *testing.T) { - removed := testPod() + removed := testPod1() removed.SetName(removed.GetName() + "-removed-pod") - updated := testPod() + updated := testPod1() updated.SetName(updated.GetName() + "-updated-pod") updated.SetResourceVersion("updated-pod-version") @@ -670,10 +674,10 @@ func TestWatchCacheUpdated(t *testing.T) { require.NoError(t, err) - added := testPod() + added := testPod1() added.SetName(added.GetName() + "-new-pod") - podGroupKind := 
testPod().GroupVersionKind().GroupKind() + podGroupKind := testPod1().GroupVersionKind().GroupKind() cluster.lock.Lock() defer cluster.lock.Unlock() @@ -684,13 +688,13 @@ func TestWatchCacheUpdated(t *testing.T) { } func TestNamespaceModeReplace(t *testing.T) { - ns1Pod := testPod() + ns1Pod := testPod1() ns1Pod.SetNamespace("ns1") ns1Pod.SetName("pod1") - ns2Pod := testPod() + ns2Pod := testPod1() ns2Pod.SetNamespace("ns2") - podGroupKind := testPod().GroupVersionKind().GroupKind() + podGroupKind := testPod1().GroupVersionKind().GroupKind() cluster := newCluster(t, ns1Pod, ns2Pod) err := cluster.EnsureSynced() @@ -805,14 +809,14 @@ func getResourceKey(t *testing.T, obj runtime.Object) kube.ResourceKey { return kube.NewResourceKey(gvk.Group, gvk.Kind, m.GetNamespace(), m.GetName()) } -func testPod() *corev1.Pod { +func testPod1() *corev1.Pod { return &corev1.Pod{ TypeMeta: metav1.TypeMeta{ APIVersion: "v1", Kind: "Pod", }, ObjectMeta: metav1.ObjectMeta{ - Name: "helm-guestbook-pod", + Name: "helm-guestbook-pod-1", Namespace: "default", UID: "1", ResourceVersion: "123", @@ -829,6 +833,30 @@ func testPod() *corev1.Pod { } } +// Similar to pod1, but owner reference lacks uid +func testPod2() *corev1.Pod { + return &corev1.Pod{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Pod", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "helm-guestbook-pod-2", + Namespace: "default", + UID: "4", + ResourceVersion: "123", + CreationTimestamp: metav1.NewTime(testCreationTime), + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "apps/v1", + Kind: "ReplicaSet", + Name: "helm-guestbook-rs", + }, + }, + }, + } +} + func testCRD() *apiextensions.CustomResourceDefinition { return &apiextensions.CustomResourceDefinition{ TypeMeta: metav1.TypeMeta{ @@ -958,7 +986,7 @@ func testDeploy() *appsv1.Deployment { } func TestIterateHierachy(t *testing.T) { - cluster := newCluster(t, testPod(), testRS(), testDeploy()) + cluster := newCluster(t, testPod1(), testPod2(), testRS(), testExtensionsRS(), testDeploy()) err := cluster.EnsureSynced() require.NoError(t, err) @@ -971,7 +999,8 @@ func TestIterateHierachy(t *testing.T) { assert.ElementsMatch(t, []kube.ResourceKey{ - kube.GetResourceKey(mustToUnstructured(testPod())), + kube.GetResourceKey(mustToUnstructured(testPod1())), + kube.GetResourceKey(mustToUnstructured(testPod2())), kube.GetResourceKey(mustToUnstructured(testRS())), kube.GetResourceKey(mustToUnstructured(testDeploy()))}, keys) @@ -1016,10 +1045,135 @@ func TestIterateHierachy(t *testing.T) { []kube.ResourceKey{ kube.GetResourceKey(mustToUnstructured(testDeploy())), kube.GetResourceKey(mustToUnstructured(testRS())), - kube.GetResourceKey(mustToUnstructured(testPod())), + kube.GetResourceKey(mustToUnstructured(testPod1())), + kube.GetResourceKey(mustToUnstructured(testPod2())), }, keys) }) + + // After uid is backfilled for owner of pod2, it should appear in results here as well. 
+ t.Run("IterateStartFromExtensionsRS", func(t *testing.T) { + keys := []kube.ResourceKey{} + cluster.IterateHierarchy(kube.GetResourceKey(mustToUnstructured(testExtensionsRS())), func(child *Resource, _ map[kube.ResourceKey]*Resource) bool { + keys = append(keys, child.ResourceKey()) + return true + }) + + assert.ElementsMatch(t, + []kube.ResourceKey{ + kube.GetResourceKey(mustToUnstructured(testPod1())), + kube.GetResourceKey(mustToUnstructured(testPod2())), + kube.GetResourceKey(mustToUnstructured(testExtensionsRS()))}, + keys) + }) +} + +func TestIterateHierachyV2(t *testing.T) { + cluster := newCluster(t, testPod1(), testPod2(), testRS(), testExtensionsRS(), testDeploy()) + err := cluster.EnsureSynced() + require.NoError(t, err) + + t.Run("IterateAll", func(t *testing.T) { + startKeys := []kube.ResourceKey{kube.GetResourceKey(mustToUnstructured(testDeploy()))} + keys := []kube.ResourceKey{} + cluster.IterateHierarchyV2(startKeys, func(child *Resource, _ map[kube.ResourceKey]*Resource) bool { + keys = append(keys, child.ResourceKey()) + return true + }) + + assert.ElementsMatch(t, + []kube.ResourceKey{ + kube.GetResourceKey(mustToUnstructured(testPod1())), + kube.GetResourceKey(mustToUnstructured(testPod2())), + kube.GetResourceKey(mustToUnstructured(testRS())), + kube.GetResourceKey(mustToUnstructured(testDeploy()))}, + keys) + }) + + t.Run("ExitAtRoot", func(t *testing.T) { + startKeys := []kube.ResourceKey{kube.GetResourceKey(mustToUnstructured(testDeploy()))} + keys := []kube.ResourceKey{} + cluster.IterateHierarchyV2(startKeys, func(child *Resource, _ map[kube.ResourceKey]*Resource) bool { + keys = append(keys, child.ResourceKey()) + return false + }) + + assert.ElementsMatch(t, + []kube.ResourceKey{ + kube.GetResourceKey(mustToUnstructured(testDeploy()))}, + keys) + }) + + t.Run("ExitAtSecondLevelChild", func(t *testing.T) { + startKeys := []kube.ResourceKey{kube.GetResourceKey(mustToUnstructured(testDeploy()))} + keys := []kube.ResourceKey{} + cluster.IterateHierarchyV2(startKeys, func(child *Resource, _ map[kube.ResourceKey]*Resource) bool { + keys = append(keys, child.ResourceKey()) + return child.ResourceKey().Kind != kube.ReplicaSetKind + }) + + assert.ElementsMatch(t, + []kube.ResourceKey{ + kube.GetResourceKey(mustToUnstructured(testDeploy())), + kube.GetResourceKey(mustToUnstructured(testRS())), + }, + keys) + }) + + t.Run("ExitAtThirdLevelChild", func(t *testing.T) { + startKeys := []kube.ResourceKey{kube.GetResourceKey(mustToUnstructured(testDeploy()))} + keys := []kube.ResourceKey{} + cluster.IterateHierarchyV2(startKeys, func(child *Resource, _ map[kube.ResourceKey]*Resource) bool { + keys = append(keys, child.ResourceKey()) + return child.ResourceKey().Kind != kube.PodKind + }) + + assert.ElementsMatch(t, + []kube.ResourceKey{ + kube.GetResourceKey(mustToUnstructured(testDeploy())), + kube.GetResourceKey(mustToUnstructured(testRS())), + kube.GetResourceKey(mustToUnstructured(testPod1())), + kube.GetResourceKey(mustToUnstructured(testPod2())), + }, + keys) + }) + + t.Run("IterateAllStartFromMultiple", func(t *testing.T) { + startKeys := []kube.ResourceKey{ + kube.GetResourceKey(mustToUnstructured(testRS())), + kube.GetResourceKey(mustToUnstructured(testDeploy())), + } + keys := []kube.ResourceKey{} + cluster.IterateHierarchyV2(startKeys, func(child *Resource, _ map[kube.ResourceKey]*Resource) bool { + keys = append(keys, child.ResourceKey()) + return true + }) + + assert.ElementsMatch(t, + []kube.ResourceKey{ + kube.GetResourceKey(mustToUnstructured(testPod1())), + 
kube.GetResourceKey(mustToUnstructured(testPod2())), + kube.GetResourceKey(mustToUnstructured(testRS())), + kube.GetResourceKey(mustToUnstructured(testDeploy()))}, + keys) + }) + + // After uid is backfilled for owner of pod2, it should appear in results here as well. + t.Run("IterateStartFromExtensionsRS", func(t *testing.T) { + startKeys := []kube.ResourceKey{kube.GetResourceKey(mustToUnstructured(testExtensionsRS()))} + keys := []kube.ResourceKey{} + cluster.IterateHierarchyV2(startKeys, func(child *Resource, _ map[kube.ResourceKey]*Resource) bool { + keys = append(keys, child.ResourceKey()) + return true + }) + + assert.ElementsMatch(t, + []kube.ResourceKey{ + kube.GetResourceKey(mustToUnstructured(testPod1())), + kube.GetResourceKey(mustToUnstructured(testPod2())), + kube.GetResourceKey(mustToUnstructured(testExtensionsRS()))}, + keys) + }) } // Test_watchEvents_Deadlock validates that starting watches will not create a deadlock @@ -1031,7 +1185,7 @@ func Test_watchEvents_Deadlock(t *testing.T) { deadlock := sync.RWMutex{} hasDeadlock := false - res1 := testPod() + res1 := testPod1() res2 := testRS() cluster := newClusterWithOptions(t, []UpdateSettingsFunc{ diff --git a/pkg/cache/mocks/ClusterCache.go b/pkg/cache/mocks/ClusterCache.go index 7a1be7324..c9fbc8f9c 100644 --- a/pkg/cache/mocks/ClusterCache.go +++ b/pkg/cache/mocks/ClusterCache.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.40.2. DO NOT EDIT. +// Code generated by mockery v2.43.2. DO NOT EDIT. package mocks @@ -237,6 +237,11 @@ func (_m *ClusterCache) IterateHierarchy(key kube.ResourceKey, action func(*cach _m.Called(key, action) } +// IterateHierarchyV2 provides a mock function with given fields: keys, action +func (_m *ClusterCache) IterateHierarchyV2(keys []kube.ResourceKey, action func(*cache.Resource, map[kube.ResourceKey]*cache.Resource) bool) { + _m.Called(keys, action) +} + // OnEvent provides a mock function with given fields: handler func (_m *ClusterCache) OnEvent(handler cache.OnEventHandler) cache.Unsubscribe { ret := _m.Called(handler) diff --git a/pkg/cache/resource.go b/pkg/cache/resource.go index 4097f4dca..95ef8b1cf 100644 --- a/pkg/cache/resource.go +++ b/pkg/cache/resource.go @@ -99,3 +99,28 @@ func (r *Resource) iterateChildren(ns map[kube.ResourceKey]*Resource, parents ma } } } + +func (r *Resource) iterateChildrenV2(graph map[kube.ResourceKey][]kube.ResourceKey, ns map[kube.ResourceKey]*Resource, visited map[kube.ResourceKey]int, action func(err error, child *Resource, namespaceResources map[kube.ResourceKey]*Resource) bool) { + key := r.ResourceKey() + if visited[key] == 2 { + return + } + visited[key] = 1 + defer func() { + visited[key] = 2 + }() + childKeys, ok := graph[key] + if !ok || childKeys == nil { + return + } + for _, childKey := range childKeys { + child := ns[childKey] + if visited[childKey] == 1 { + _ = action(fmt.Errorf("circular dependency detected. 
%s is child and parent of %s", childKey.String(), key.String()), child, ns) + } else if visited[childKey] == 0 { + if action(nil, child, ns) { + child.iterateChildrenV2(graph, ns, visited, action) + } + } + } +} diff --git a/pkg/cache/resource_test.go b/pkg/cache/resource_test.go index 45e597341..a3b06a6cc 100644 --- a/pkg/cache/resource_test.go +++ b/pkg/cache/resource_test.go @@ -10,7 +10,7 @@ import ( var cacheTest = NewClusterCache(&rest.Config{}) func TestIsParentOf(t *testing.T) { - child := cacheTest.newResource(mustToUnstructured(testPod())) + child := cacheTest.newResource(mustToUnstructured(testPod1())) parent := cacheTest.newResource(mustToUnstructured(testRS())) grandParent := cacheTest.newResource(mustToUnstructured(testDeploy())) @@ -22,7 +22,7 @@ func TestIsParentOfSameKindDifferentGroupAndUID(t *testing.T) { rs := testRS() rs.APIVersion = "somecrd.io/v1" rs.SetUID("123") - child := cacheTest.newResource(mustToUnstructured(testPod())) + child := cacheTest.newResource(mustToUnstructured(testPod1())) invalidParent := cacheTest.newResource(mustToUnstructured(rs)) assert.False(t, invalidParent.isParentOf(child)) From 0fb5064fa41f72cc1366219cf3eb24825de7accd Mon Sep 17 00:00:00 2001 From: Michael Crenshaw <350466+crenshaw-dev@users.noreply.github.com> Date: Tue, 16 Jul 2024 17:33:56 -0400 Subject: [PATCH 05/15] finish merge Signed-off-by: Michael Crenshaw <350466+crenshaw-dev@users.noreply.github.com> --- pkg/cache/cluster.go | 88 --------------------------------------- pkg/cache/cluster_test.go | 2 +- 2 files changed, 1 insertion(+), 89 deletions(-) diff --git a/pkg/cache/cluster.go b/pkg/cache/cluster.go index 43ac985c6..59e4d7bd8 100644 --- a/pkg/cache/cluster.go +++ b/pkg/cache/cluster.go @@ -1107,94 +1107,6 @@ func buildGraph(nsNodes map[kube.ResourceKey]*Resource) (map[kube.ResourceKey][] return childrenByParent, graph } -// IterateHierarchy iterates resource tree starting from the specified top level resources and executes callback for each resource in the tree -func (c *clusterCache) IterateHierarchyV2(keys []kube.ResourceKey, action func(resource *Resource, namespaceResources map[kube.ResourceKey]*Resource) bool) { - c.lock.RLock() - defer c.lock.RUnlock() - keysPerNamespace := make(map[string][]kube.ResourceKey) - for _, key := range keys { - keysPerNamespace[key.Namespace] = append(keysPerNamespace[key.Namespace], key) - } - for namespace, namespaceKeys := range keysPerNamespace { - nsNodes := c.nsIndex[namespace] - // Prepare to construct a graph - nodesByUID := make(map[types.UID][]*Resource) - nodeByGraphKey := make(map[string]*Resource) - for _, node := range nsNodes { - nodesByUID[node.Ref.UID] = append(nodesByUID[node.Ref.UID], node) - // Based on what's used by isParentOf - graphKey := fmt.Sprintf("%s/%s/%s", node.Ref.Kind, node.Ref.APIVersion, node.Ref.Name) - nodeByGraphKey[graphKey] = node - } - // Construct a graph using a logic similar to isParentOf but more optimal - graph := make(map[kube.ResourceKey][]kube.ResourceKey) - childrenByUID := make(map[kube.ResourceKey]map[types.UID][]*Resource) - for _, node := range nsNodes { - childrenByUID[node.ResourceKey()] = make(map[types.UID][]*Resource) - } - for _, node := range nsNodes { - for i, ownerRef := range node.OwnerRefs { - // backfill UID of inferred owner child references - if ownerRef.UID == "" { - graphKey := fmt.Sprintf("%s/%s/%s", ownerRef.Kind, ownerRef.APIVersion, ownerRef.Name) - graphKeyNode, ok := nodeByGraphKey[graphKey] - if ok { - ownerRef.UID = graphKeyNode.Ref.UID - node.OwnerRefs[i] = 
ownerRef - } else { - continue - } - } - - uidNodes, ok := nodesByUID[ownerRef.UID] - if ok { - for _, uidNode := range uidNodes { - graph[uidNode.ResourceKey()] = append(graph[uidNode.ResourceKey()], node.ResourceKey()) - childrenByUID[uidNode.ResourceKey()][node.Ref.UID] = append(childrenByUID[uidNode.ResourceKey()][node.Ref.UID], node) - } - } - } - } - visited := make(map[kube.ResourceKey]int) - for _, key := range namespaceKeys { - visited[key] = 0 - } - for _, key := range namespaceKeys { - res, ok := c.resources[key] - if !ok { - continue - } - if visited[key] == 2 || !action(res, nsNodes) { - continue - } - visited[key] = 1 - // make sure children has no duplicates - for _, children := range childrenByUID[key] { - if len(children) > 0 { - // The object might have multiple children with the same UID (e.g. replicaset from apps and extensions group). It is ok to pick any object but we need to make sure - // we pick the same child after every refresh. - sort.Slice(children, func(i, j int) bool { - key1 := children[i].ResourceKey() - key2 := children[j].ResourceKey() - return strings.Compare(key1.String(), key2.String()) < 0 - }) - child := children[0] - if visited[child.ResourceKey()] == 0 && action(child, nsNodes) { - child.iterateChildrenV2(graph, nsNodes, visited, func(err error, child *Resource, namespaceResources map[kube.ResourceKey]*Resource) bool { - if err != nil { - c.log.V(2).Info(err.Error()) - return false - } - return action(child, namespaceResources) - }) - } - } - } - visited[key] = 2 - } - } -} - // IsNamespaced answers if specified group/kind is a namespaced resource API or not func (c *clusterCache) IsNamespaced(gk schema.GroupKind) (bool, error) { if isNamespaced, ok := c.namespacedResources[gk]; ok { diff --git a/pkg/cache/cluster_test.go b/pkg/cache/cluster_test.go index 4b76d8b5d..951801757 100644 --- a/pkg/cache/cluster_test.go +++ b/pkg/cache/cluster_test.go @@ -1290,7 +1290,7 @@ metadata: name: %s uid: %s`, ownerName, ownerUid) } - ns[key] = c.newResource(strToUnstructured(resourceYaml)) + ns[key] = cacheTest.newResource(strToUnstructured(resourceYaml)) } return ns } From af08910fd40121032f6245898acf6ce4bbe97680 Mon Sep 17 00:00:00 2001 From: Andrii Korotkov Date: Thu, 4 Jul 2024 13:23:50 -0700 Subject: [PATCH 06/15] chore: More optimal IterateHierarchyV2 and iterateChildrenV2 [#600] Closes #600 The existing (effectively v1) implementations are suboptimal since they don't construct a graph before the iteration. They search for children by looking at all namespace resources and checking `isParentOf`, which can give `O(tree_size * namespace_resources_count)` time complexity. The v2 algorithms construct the graph and have `O(namespace_resources_count)` time complexity. See more details in the linked issues. Signed-off-by: Andrii Korotkov --- pkg/cache/cluster.go | 93 ++++++++++++++ pkg/cache/cluster_test.go | 212 +++++++++++++++++++++++++++----- pkg/cache/mocks/ClusterCache.go | 7 +- pkg/cache/resource.go | 25 ++++ pkg/cache/resource_test.go | 4 +- 5 files changed, 309 insertions(+), 32 deletions(-) diff --git a/pkg/cache/cluster.go b/pkg/cache/cluster.go index 96f9ebe72..53141068b 100644 --- a/pkg/cache/cluster.go +++ b/pkg/cache/cluster.go @@ -120,6 +120,9 @@ type ClusterCache interface { // IterateHierarchy iterates resource tree starting from the specified top level resource and executes callback for each resource in the tree. // The action callback returns true if iteration should continue and false otherwise. 
IterateHierarchy(key kube.ResourceKey, action func(resource *Resource, namespaceResources map[kube.ResourceKey]*Resource) bool) + // IterateHierarchyV2 iterates resource tree starting from the specified top level resources and executes callback for each resource in the tree. + // The action callback returns true if iteration should continue and false otherwise. + IterateHierarchyV2(keys []kube.ResourceKey, action func(resource *Resource, namespaceResources map[kube.ResourceKey]*Resource) bool) // IsNamespaced answers if specified group/kind is a namespaced resource API or not IsNamespaced(gk schema.GroupKind) (bool, error) // GetManagedLiveObjs helps finding matching live K8S resources for a given resources list. @@ -1004,6 +1007,96 @@ func (c *clusterCache) IterateHierarchy(key kube.ResourceKey, action func(resour } } +// IterateHierarchy iterates resource tree starting from the specified top level resources and executes callback for each resource in the tree +func (c *clusterCache) IterateHierarchyV2(keys []kube.ResourceKey, action func(resource *Resource, namespaceResources map[kube.ResourceKey]*Resource) bool) { + c.lock.RLock() + defer c.lock.RUnlock() + keysPerNamespace := make(map[string][]kube.ResourceKey) + for _, key := range keys { + _, ok := c.resources[key] + if !ok { + continue + } + keysPerNamespace[key.Namespace] = append(keysPerNamespace[key.Namespace], key) + } + for namespace, namespaceKeys := range keysPerNamespace { + nsNodes := c.nsIndex[namespace] + // Prepare to construct a graph + nodesByUID := make(map[types.UID][]*Resource) + nodeByGraphKey := make(map[string]*Resource) + for _, node := range nsNodes { + nodesByUID[node.Ref.UID] = append(nodesByUID[node.Ref.UID], node) + // Based on what's used by isParentOf + graphKey := fmt.Sprintf("%s/%s/%s", node.Ref.Kind, node.Ref.APIVersion, node.Ref.Name) + nodeByGraphKey[graphKey] = node + } + // Construct a graph using a logic similar to isParentOf but more optimal + graph := make(map[kube.ResourceKey][]kube.ResourceKey) + childrenByUID := make(map[kube.ResourceKey]map[types.UID][]*Resource) + for _, node := range nsNodes { + childrenByUID[node.ResourceKey()] = make(map[types.UID][]*Resource) + } + for _, node := range nsNodes { + for i, ownerRef := range node.OwnerRefs { + // backfill UID of inferred owner child references + if ownerRef.UID == "" { + graphKey := fmt.Sprintf("%s/%s/%s", ownerRef.Kind, ownerRef.APIVersion, ownerRef.Name) + graphKeyNode, ok := nodeByGraphKey[graphKey] + if ok { + ownerRef.UID = graphKeyNode.Ref.UID + node.OwnerRefs[i] = ownerRef + } else { + continue + } + } + + uidNodes, ok := nodesByUID[ownerRef.UID] + if ok { + for _, uidNode := range uidNodes { + graph[uidNode.ResourceKey()] = append(graph[uidNode.ResourceKey()], node.ResourceKey()) + childrenByUID[uidNode.ResourceKey()][node.Ref.UID] = append(childrenByUID[uidNode.ResourceKey()][node.Ref.UID], node) + } + } + } + } + visited := make(map[kube.ResourceKey]int) + for _, key := range namespaceKeys { + visited[key] = 0 + } + for _, key := range namespaceKeys { + // The check for existence of key is done above. + res := c.resources[key] + if visited[key] == 2 || !action(res, nsNodes) { + continue + } + visited[key] = 1 + // make sure children has no duplicates + for _, children := range childrenByUID[key] { + if len(children) > 0 { + // The object might have multiple children with the same UID (e.g. replicaset from apps and extensions group). 
It is ok to pick any object but we need to make sure + // we pick the same child after every refresh. + sort.Slice(children, func(i, j int) bool { + key1 := children[i].ResourceKey() + key2 := children[j].ResourceKey() + return strings.Compare(key1.String(), key2.String()) < 0 + }) + child := children[0] + if visited[child.ResourceKey()] == 0 && action(child, nsNodes) { + child.iterateChildrenV2(graph, nsNodes, visited, func(err error, child *Resource, namespaceResources map[kube.ResourceKey]*Resource) bool { + if err != nil { + c.log.V(2).Info(err.Error()) + return false + } + return action(child, namespaceResources) + }) + } + } + } + visited[key] = 2 + } + } +} + // IsNamespaced answers if specified group/kind is a namespaced resource API or not func (c *clusterCache) IsNamespaced(gk schema.GroupKind) (bool, error) { if isNamespaced, ok := c.namespacedResources[gk]; ok { diff --git a/pkg/cache/cluster_test.go b/pkg/cache/cluster_test.go index f5e61a065..6ffbbf4fe 100644 --- a/pkg/cache/cluster_test.go +++ b/pkg/cache/cluster_test.go @@ -112,6 +112,10 @@ func newClusterWithOptions(t *testing.T, opts []UpdateSettingsFunc, objs ...runt GroupKind: schema.GroupKind{Group: "apps", Kind: "StatefulSet"}, GroupVersionResource: schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "statefulsets"}, Meta: metav1.APIResource{Namespaced: true}, + }, { + GroupKind: schema.GroupKind{Group: "extensions", Kind: "ReplicaSet"}, + GroupVersionResource: schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "replicasets"}, + Meta: metav1.APIResource{Namespaced: true}, }} opts = append([]UpdateSettingsFunc{ @@ -289,7 +293,7 @@ func TestEnsureSyncedSingleNamespace(t *testing.T) { } func TestGetChildren(t *testing.T) { - cluster := newCluster(t, testPod(), testRS(), testDeploy()) + cluster := newCluster(t, testPod1(), testRS(), testDeploy()) err := cluster.EnsureSynced() require.NoError(t, err) @@ -298,7 +302,7 @@ func TestGetChildren(t *testing.T) { Ref: corev1.ObjectReference{ Kind: "Pod", Namespace: "default", - Name: "helm-guestbook-pod", + Name: "helm-guestbook-pod-1", APIVersion: "v1", UID: "1", }, @@ -332,7 +336,7 @@ func TestGetChildren(t *testing.T) { } func TestGetManagedLiveObjs(t *testing.T) { - cluster := newCluster(t, testPod(), testRS(), testDeploy()) + cluster := newCluster(t, testPod1(), testRS(), testDeploy()) cluster.Invalidate(SetPopulateResourceInfoHandler(func(un *unstructured.Unstructured, isRoot bool) (info interface{}, cacheManifest bool) { return nil, true })) @@ -358,7 +362,7 @@ metadata: } func TestGetManagedLiveObjsNamespacedModeClusterLevelResource(t *testing.T) { - cluster := newCluster(t, testPod(), testRS(), testDeploy()) + cluster := newCluster(t, testPod1(), testRS(), testDeploy()) cluster.Invalidate(SetPopulateResourceInfoHandler(func(un *unstructured.Unstructured, isRoot bool) (info interface{}, cacheManifest bool) { return nil, true })) @@ -383,7 +387,7 @@ metadata: } func TestGetManagedLiveObjsNamespacedModeClusterLevelResource_ClusterResourceEnabled(t *testing.T) { - cluster := newCluster(t, testPod(), testRS(), testDeploy()) + cluster := newCluster(t, testPod1(), testRS(), testDeploy()) cluster.Invalidate(SetPopulateResourceInfoHandler(func(un *unstructured.Unstructured, isRoot bool) (info interface{}, cacheManifest bool) { return nil, true })) @@ -424,7 +428,7 @@ metadata: } func TestGetManagedLiveObjsAllNamespaces(t *testing.T) { - cluster := newCluster(t, testPod(), testRS(), testDeploy()) + cluster := newCluster(t, testPod1(), 
testRS(), testDeploy()) cluster.Invalidate(SetPopulateResourceInfoHandler(func(un *unstructured.Unstructured, isRoot bool) (info interface{}, cacheManifest bool) { return nil, true })) @@ -452,7 +456,7 @@ metadata: } func TestGetManagedLiveObjsValidNamespace(t *testing.T) { - cluster := newCluster(t, testPod(), testRS(), testDeploy()) + cluster := newCluster(t, testPod1(), testRS(), testDeploy()) cluster.Invalidate(SetPopulateResourceInfoHandler(func(un *unstructured.Unstructured, isRoot bool) (info interface{}, cacheManifest bool) { return nil, true })) @@ -480,7 +484,7 @@ metadata: } func TestGetManagedLiveObjsInvalidNamespace(t *testing.T) { - cluster := newCluster(t, testPod(), testRS(), testDeploy()) + cluster := newCluster(t, testPod1(), testRS(), testDeploy()) cluster.Invalidate(SetPopulateResourceInfoHandler(func(un *unstructured.Unstructured, isRoot bool) (info interface{}, cacheManifest bool) { return nil, true })) @@ -587,26 +591,26 @@ metadata: } func TestChildDeletedEvent(t *testing.T) { - cluster := newCluster(t, testPod(), testRS(), testDeploy()) + cluster := newCluster(t, testPod1(), testRS(), testDeploy()) err := cluster.EnsureSynced() require.NoError(t, err) - cluster.processEvent(watch.Deleted, mustToUnstructured(testPod())) + cluster.processEvent(watch.Deleted, mustToUnstructured(testPod1())) rsChildren := getChildren(cluster, mustToUnstructured(testRS())) assert.Equal(t, []*Resource{}, rsChildren) } func TestProcessNewChildEvent(t *testing.T) { - cluster := newCluster(t, testPod(), testRS(), testDeploy()) + cluster := newCluster(t, testPod1(), testRS(), testDeploy()) err := cluster.EnsureSynced() require.NoError(t, err) newPod := strToUnstructured(` apiVersion: v1 kind: Pod metadata: - uid: "4" - name: helm-guestbook-pod2 + uid: "5" + name: helm-guestbook-pod-1-new namespace: default ownerReferences: - apiVersion: apps/v1 @@ -625,7 +629,7 @@ func TestProcessNewChildEvent(t *testing.T) { Ref: corev1.ObjectReference{ Kind: "Pod", Namespace: "default", - Name: "helm-guestbook-pod", + Name: "helm-guestbook-pod-1", APIVersion: "v1", UID: "1", }, @@ -643,9 +647,9 @@ func TestProcessNewChildEvent(t *testing.T) { Ref: corev1.ObjectReference{ Kind: "Pod", Namespace: "default", - Name: "helm-guestbook-pod2", + Name: "helm-guestbook-pod-1-new", APIVersion: "v1", - UID: "4", + UID: "5", }, OwnerRefs: []metav1.OwnerReference{{ APIVersion: "apps/v1", @@ -658,10 +662,10 @@ func TestProcessNewChildEvent(t *testing.T) { } func TestWatchCacheUpdated(t *testing.T) { - removed := testPod() + removed := testPod1() removed.SetName(removed.GetName() + "-removed-pod") - updated := testPod() + updated := testPod1() updated.SetName(updated.GetName() + "-updated-pod") updated.SetResourceVersion("updated-pod-version") @@ -670,10 +674,10 @@ func TestWatchCacheUpdated(t *testing.T) { require.NoError(t, err) - added := testPod() + added := testPod1() added.SetName(added.GetName() + "-new-pod") - podGroupKind := testPod().GroupVersionKind().GroupKind() + podGroupKind := testPod1().GroupVersionKind().GroupKind() cluster.lock.Lock() defer cluster.lock.Unlock() @@ -684,13 +688,13 @@ func TestWatchCacheUpdated(t *testing.T) { } func TestNamespaceModeReplace(t *testing.T) { - ns1Pod := testPod() + ns1Pod := testPod1() ns1Pod.SetNamespace("ns1") ns1Pod.SetName("pod1") - ns2Pod := testPod() + ns2Pod := testPod1() ns2Pod.SetNamespace("ns2") - podGroupKind := testPod().GroupVersionKind().GroupKind() + podGroupKind := testPod1().GroupVersionKind().GroupKind() cluster := newCluster(t, ns1Pod, ns2Pod) err := 
cluster.EnsureSynced() @@ -805,14 +809,14 @@ func getResourceKey(t *testing.T, obj runtime.Object) kube.ResourceKey { return kube.NewResourceKey(gvk.Group, gvk.Kind, m.GetNamespace(), m.GetName()) } -func testPod() *corev1.Pod { +func testPod1() *corev1.Pod { return &corev1.Pod{ TypeMeta: metav1.TypeMeta{ APIVersion: "v1", Kind: "Pod", }, ObjectMeta: metav1.ObjectMeta{ - Name: "helm-guestbook-pod", + Name: "helm-guestbook-pod-1", Namespace: "default", UID: "1", ResourceVersion: "123", @@ -829,6 +833,30 @@ func testPod() *corev1.Pod { } } +// Similar to pod1, but owner reference lacks uid +func testPod2() *corev1.Pod { + return &corev1.Pod{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Pod", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "helm-guestbook-pod-2", + Namespace: "default", + UID: "4", + ResourceVersion: "123", + CreationTimestamp: metav1.NewTime(testCreationTime), + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "apps/v1", + Kind: "ReplicaSet", + Name: "helm-guestbook-rs", + }, + }, + }, + } +} + func testCRD() *apiextensions.CustomResourceDefinition { return &apiextensions.CustomResourceDefinition{ TypeMeta: metav1.TypeMeta{ @@ -958,7 +986,7 @@ func testDeploy() *appsv1.Deployment { } func TestIterateHierachy(t *testing.T) { - cluster := newCluster(t, testPod(), testRS(), testDeploy()) + cluster := newCluster(t, testPod1(), testPod2(), testRS(), testExtensionsRS(), testDeploy()) err := cluster.EnsureSynced() require.NoError(t, err) @@ -971,7 +999,8 @@ func TestIterateHierachy(t *testing.T) { assert.ElementsMatch(t, []kube.ResourceKey{ - kube.GetResourceKey(mustToUnstructured(testPod())), + kube.GetResourceKey(mustToUnstructured(testPod1())), + kube.GetResourceKey(mustToUnstructured(testPod2())), kube.GetResourceKey(mustToUnstructured(testRS())), kube.GetResourceKey(mustToUnstructured(testDeploy()))}, keys) @@ -1016,10 +1045,135 @@ func TestIterateHierachy(t *testing.T) { []kube.ResourceKey{ kube.GetResourceKey(mustToUnstructured(testDeploy())), kube.GetResourceKey(mustToUnstructured(testRS())), - kube.GetResourceKey(mustToUnstructured(testPod())), + kube.GetResourceKey(mustToUnstructured(testPod1())), + kube.GetResourceKey(mustToUnstructured(testPod2())), }, keys) }) + + // After uid is backfilled for owner of pod2, it should appear in results here as well. 
+ t.Run("IterateStartFromExtensionsRS", func(t *testing.T) { + keys := []kube.ResourceKey{} + cluster.IterateHierarchy(kube.GetResourceKey(mustToUnstructured(testExtensionsRS())), func(child *Resource, _ map[kube.ResourceKey]*Resource) bool { + keys = append(keys, child.ResourceKey()) + return true + }) + + assert.ElementsMatch(t, + []kube.ResourceKey{ + kube.GetResourceKey(mustToUnstructured(testPod1())), + kube.GetResourceKey(mustToUnstructured(testPod2())), + kube.GetResourceKey(mustToUnstructured(testExtensionsRS()))}, + keys) + }) +} + +func TestIterateHierachyV2(t *testing.T) { + cluster := newCluster(t, testPod1(), testPod2(), testRS(), testExtensionsRS(), testDeploy()) + err := cluster.EnsureSynced() + require.NoError(t, err) + + t.Run("IterateAll", func(t *testing.T) { + startKeys := []kube.ResourceKey{kube.GetResourceKey(mustToUnstructured(testDeploy()))} + keys := []kube.ResourceKey{} + cluster.IterateHierarchyV2(startKeys, func(child *Resource, _ map[kube.ResourceKey]*Resource) bool { + keys = append(keys, child.ResourceKey()) + return true + }) + + assert.ElementsMatch(t, + []kube.ResourceKey{ + kube.GetResourceKey(mustToUnstructured(testPod1())), + kube.GetResourceKey(mustToUnstructured(testPod2())), + kube.GetResourceKey(mustToUnstructured(testRS())), + kube.GetResourceKey(mustToUnstructured(testDeploy()))}, + keys) + }) + + t.Run("ExitAtRoot", func(t *testing.T) { + startKeys := []kube.ResourceKey{kube.GetResourceKey(mustToUnstructured(testDeploy()))} + keys := []kube.ResourceKey{} + cluster.IterateHierarchyV2(startKeys, func(child *Resource, _ map[kube.ResourceKey]*Resource) bool { + keys = append(keys, child.ResourceKey()) + return false + }) + + assert.ElementsMatch(t, + []kube.ResourceKey{ + kube.GetResourceKey(mustToUnstructured(testDeploy()))}, + keys) + }) + + t.Run("ExitAtSecondLevelChild", func(t *testing.T) { + startKeys := []kube.ResourceKey{kube.GetResourceKey(mustToUnstructured(testDeploy()))} + keys := []kube.ResourceKey{} + cluster.IterateHierarchyV2(startKeys, func(child *Resource, _ map[kube.ResourceKey]*Resource) bool { + keys = append(keys, child.ResourceKey()) + return child.ResourceKey().Kind != kube.ReplicaSetKind + }) + + assert.ElementsMatch(t, + []kube.ResourceKey{ + kube.GetResourceKey(mustToUnstructured(testDeploy())), + kube.GetResourceKey(mustToUnstructured(testRS())), + }, + keys) + }) + + t.Run("ExitAtThirdLevelChild", func(t *testing.T) { + startKeys := []kube.ResourceKey{kube.GetResourceKey(mustToUnstructured(testDeploy()))} + keys := []kube.ResourceKey{} + cluster.IterateHierarchyV2(startKeys, func(child *Resource, _ map[kube.ResourceKey]*Resource) bool { + keys = append(keys, child.ResourceKey()) + return child.ResourceKey().Kind != kube.PodKind + }) + + assert.ElementsMatch(t, + []kube.ResourceKey{ + kube.GetResourceKey(mustToUnstructured(testDeploy())), + kube.GetResourceKey(mustToUnstructured(testRS())), + kube.GetResourceKey(mustToUnstructured(testPod1())), + kube.GetResourceKey(mustToUnstructured(testPod2())), + }, + keys) + }) + + t.Run("IterateAllStartFromMultiple", func(t *testing.T) { + startKeys := []kube.ResourceKey{ + kube.GetResourceKey(mustToUnstructured(testRS())), + kube.GetResourceKey(mustToUnstructured(testDeploy())), + } + keys := []kube.ResourceKey{} + cluster.IterateHierarchyV2(startKeys, func(child *Resource, _ map[kube.ResourceKey]*Resource) bool { + keys = append(keys, child.ResourceKey()) + return true + }) + + assert.ElementsMatch(t, + []kube.ResourceKey{ + kube.GetResourceKey(mustToUnstructured(testPod1())), + 
kube.GetResourceKey(mustToUnstructured(testPod2())), + kube.GetResourceKey(mustToUnstructured(testRS())), + kube.GetResourceKey(mustToUnstructured(testDeploy()))}, + keys) + }) + + // After uid is backfilled for owner of pod2, it should appear in results here as well. + t.Run("IterateStartFromExtensionsRS", func(t *testing.T) { + startKeys := []kube.ResourceKey{kube.GetResourceKey(mustToUnstructured(testExtensionsRS()))} + keys := []kube.ResourceKey{} + cluster.IterateHierarchyV2(startKeys, func(child *Resource, _ map[kube.ResourceKey]*Resource) bool { + keys = append(keys, child.ResourceKey()) + return true + }) + + assert.ElementsMatch(t, + []kube.ResourceKey{ + kube.GetResourceKey(mustToUnstructured(testPod1())), + kube.GetResourceKey(mustToUnstructured(testPod2())), + kube.GetResourceKey(mustToUnstructured(testExtensionsRS()))}, + keys) + }) } // Test_watchEvents_Deadlock validates that starting watches will not create a deadlock @@ -1031,7 +1185,7 @@ func Test_watchEvents_Deadlock(t *testing.T) { deadlock := sync.RWMutex{} hasDeadlock := false - res1 := testPod() + res1 := testPod1() res2 := testRS() cluster := newClusterWithOptions(t, []UpdateSettingsFunc{ diff --git a/pkg/cache/mocks/ClusterCache.go b/pkg/cache/mocks/ClusterCache.go index 7a1be7324..c9fbc8f9c 100644 --- a/pkg/cache/mocks/ClusterCache.go +++ b/pkg/cache/mocks/ClusterCache.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.40.2. DO NOT EDIT. +// Code generated by mockery v2.43.2. DO NOT EDIT. package mocks @@ -237,6 +237,11 @@ func (_m *ClusterCache) IterateHierarchy(key kube.ResourceKey, action func(*cach _m.Called(key, action) } +// IterateHierarchyV2 provides a mock function with given fields: keys, action +func (_m *ClusterCache) IterateHierarchyV2(keys []kube.ResourceKey, action func(*cache.Resource, map[kube.ResourceKey]*cache.Resource) bool) { + _m.Called(keys, action) +} + // OnEvent provides a mock function with given fields: handler func (_m *ClusterCache) OnEvent(handler cache.OnEventHandler) cache.Unsubscribe { ret := _m.Called(handler) diff --git a/pkg/cache/resource.go b/pkg/cache/resource.go index 4097f4dca..95ef8b1cf 100644 --- a/pkg/cache/resource.go +++ b/pkg/cache/resource.go @@ -99,3 +99,28 @@ func (r *Resource) iterateChildren(ns map[kube.ResourceKey]*Resource, parents ma } } } + +func (r *Resource) iterateChildrenV2(graph map[kube.ResourceKey][]kube.ResourceKey, ns map[kube.ResourceKey]*Resource, visited map[kube.ResourceKey]int, action func(err error, child *Resource, namespaceResources map[kube.ResourceKey]*Resource) bool) { + key := r.ResourceKey() + if visited[key] == 2 { + return + } + visited[key] = 1 + defer func() { + visited[key] = 2 + }() + childKeys, ok := graph[key] + if !ok || childKeys == nil { + return + } + for _, childKey := range childKeys { + child := ns[childKey] + if visited[childKey] == 1 { + _ = action(fmt.Errorf("circular dependency detected. 
%s is child and parent of %s", childKey.String(), key.String()), child, ns) + } else if visited[childKey] == 0 { + if action(nil, child, ns) { + child.iterateChildrenV2(graph, ns, visited, action) + } + } + } +} diff --git a/pkg/cache/resource_test.go b/pkg/cache/resource_test.go index 45e597341..a3b06a6cc 100644 --- a/pkg/cache/resource_test.go +++ b/pkg/cache/resource_test.go @@ -10,7 +10,7 @@ import ( var cacheTest = NewClusterCache(&rest.Config{}) func TestIsParentOf(t *testing.T) { - child := cacheTest.newResource(mustToUnstructured(testPod())) + child := cacheTest.newResource(mustToUnstructured(testPod1())) parent := cacheTest.newResource(mustToUnstructured(testRS())) grandParent := cacheTest.newResource(mustToUnstructured(testDeploy())) @@ -22,7 +22,7 @@ func TestIsParentOfSameKindDifferentGroupAndUID(t *testing.T) { rs := testRS() rs.APIVersion = "somecrd.io/v1" rs.SetUID("123") - child := cacheTest.newResource(mustToUnstructured(testPod())) + child := cacheTest.newResource(mustToUnstructured(testPod1())) invalidParent := cacheTest.newResource(mustToUnstructured(rs)) assert.False(t, invalidParent.isParentOf(child)) From 703a60d189ea59c67de31640379ed1e926dbf737 Mon Sep 17 00:00:00 2001 From: Michael Crenshaw <350466+crenshaw-dev@users.noreply.github.com> Date: Tue, 16 Jul 2024 20:22:09 -0400 Subject: [PATCH 07/15] discard unneeded copies of child resources as we go Signed-off-by: Michael Crenshaw <350466+crenshaw-dev@users.noreply.github.com> --- pkg/cache/cluster.go | 62 ++++++++++++++++++++------------------- pkg/cache/cluster_test.go | 43 ++++++++++++++++++++++----- 2 files changed, 67 insertions(+), 38 deletions(-) diff --git a/pkg/cache/cluster.go b/pkg/cache/cluster.go index 3c9691461..14a1422d0 100644 --- a/pkg/cache/cluster.go +++ b/pkg/cache/cluster.go @@ -1034,25 +1034,15 @@ func (c *clusterCache) IterateHierarchyV2(keys []kube.ResourceKey, action func(r } visited[key] = 1 // make sure children has no duplicates - for _, children := range childrenByUID[key] { - if len(children) > 0 { - // The object might have multiple children with the same UID (e.g. replicaset from apps and extensions group). It is ok to pick any object but we need to make sure - // we pick the same child after every refresh. 
- sort.Slice(children, func(i, j int) bool { - key1 := children[i].ResourceKey() - key2 := children[j].ResourceKey() - return strings.Compare(key1.String(), key2.String()) < 0 + for _, child := range childrenByUID[key] { + if visited[child.ResourceKey()] == 0 && action(child, nsNodes) { + child.iterateChildrenV2(graph, nsNodes, visited, func(err error, child *Resource, namespaceResources map[kube.ResourceKey]*Resource) bool { + if err != nil { + c.log.V(2).Info(err.Error()) + return false + } + return action(child, namespaceResources) }) - child := children[0] - if visited[child.ResourceKey()] == 0 && action(child, nsNodes) { - child.iterateChildrenV2(graph, nsNodes, visited, func(err error, child *Resource, namespaceResources map[kube.ResourceKey]*Resource) bool { - if err != nil { - c.log.V(2).Info(err.Error()) - return false - } - return action(child, namespaceResources) - }) - } } } visited[key] = 2 @@ -1066,19 +1056,19 @@ type graphKey struct { name string } -func buildGraph(nsNodes map[kube.ResourceKey]*Resource) (map[kube.ResourceKey][]kube.ResourceKey, map[kube.ResourceKey]map[types.UID][]*Resource) { - // Prepare to construct a childrenByParent +func buildGraph(nsNodes map[kube.ResourceKey]*Resource) (map[kube.ResourceKey][]kube.ResourceKey, map[kube.ResourceKey]map[types.UID]*Resource) { + // Prepare to construct a graph nodesByUID := make(map[types.UID][]*Resource, len(nsNodes)) nodeByGraphKey := make(map[graphKey]*Resource, len(nsNodes)) - graph := make(map[kube.ResourceKey]map[types.UID][]*Resource, len(nsNodes)) + childrenByUID := make(map[kube.ResourceKey]map[types.UID]*Resource, len(nsNodes)) for _, node := range nsNodes { nodesByUID[node.Ref.UID] = append(nodesByUID[node.Ref.UID], node) nodeByGraphKey[graphKey{node.Ref.Kind, node.Ref.APIVersion, node.Ref.Name}] = node - graph[node.ResourceKey()] = make(map[types.UID][]*Resource) + childrenByUID[node.ResourceKey()] = make(map[types.UID]*Resource) } - // In childrenByParent, they key is the parent and the value is a list of children. - childrenByParent := make(map[kube.ResourceKey][]kube.ResourceKey) + // In graph, they key is the parent and the value is a list of children. + graph := make(map[kube.ResourceKey][]kube.ResourceKey) // Loop through all nodes, calling each one "childNode," because we're only bothering with it if it has a parent. for _, childNode := range nsNodes { @@ -1090,23 +1080,35 @@ func buildGraph(nsNodes map[kube.ResourceKey]*Resource) (map[kube.ResourceKey][] ownerRef.UID = graphKeyNode.Ref.UID childNode.OwnerRefs[i] = ownerRef } else { - // No resource found with the given childrenByParent key, so move on. + // No resource found with the given graph key, so move on. continue } } - // Now that we have the UID of the parent, update the childrenByParent and the graph map. + // Now that we have the UID of the parent, update the graph and the childrenByUID map. uidNodes, ok := nodesByUID[ownerRef.UID] if ok { for _, uidNode := range uidNodes { - // Update the childrenByParent for this owner to include the child. - childrenByParent[uidNode.ResourceKey()] = append(childrenByParent[uidNode.ResourceKey()], childNode.ResourceKey()) - graph[uidNode.ResourceKey()][childNode.Ref.UID] = append(graph[uidNode.ResourceKey()][childNode.Ref.UID], childNode) + // Update the graph for this owner to include the child. 
+ graph[uidNode.ResourceKey()] = append(graph[uidNode.ResourceKey()], childNode.ResourceKey()) + + r, ok := childrenByUID[uidNode.ResourceKey()][childNode.Ref.UID] + if !ok { + childrenByUID[uidNode.ResourceKey()][childNode.Ref.UID] = childNode + } else if r != nil { + // The object might have multiple children with the same UID (e.g. replicaset from apps and extensions group). + // It is ok to pick any object, but we need to make sure we pick the same child after every refresh. + key1 := r.ResourceKey() + key2 := childNode.ResourceKey() + if strings.Compare(key1.String(), key2.String()) > 0 { + childrenByUID[uidNode.ResourceKey()][childNode.Ref.UID] = childNode + } + } } } } } - return childrenByParent, graph + return graph, childrenByUID } // IsNamespaced answers if specified group/kind is a namespaced resource API or not diff --git a/pkg/cache/cluster_test.go b/pkg/cache/cluster_test.go index 951801757..26815c07e 100644 --- a/pkg/cache/cluster_test.go +++ b/pkg/cache/cluster_test.go @@ -74,7 +74,7 @@ var ( - hostname: localhost`, testCreationTime.UTC().Format(time.RFC3339))) ) -func newCluster(t *testing.T, objs ...runtime.Object) *clusterCache { +func newCluster(t testing.TB, objs ...runtime.Object) *clusterCache { cache := newClusterWithOptions(t, []UpdateSettingsFunc{}, objs...) t.Cleanup(func() { @@ -84,7 +84,7 @@ func newCluster(t *testing.T, objs ...runtime.Object) *clusterCache { return cache } -func newClusterWithOptions(t *testing.T, opts []UpdateSettingsFunc, objs ...runtime.Object) *clusterCache { +func newClusterWithOptions(t testing.TB, opts []UpdateSettingsFunc, objs ...runtime.Object) *clusterCache { client := fake.NewSimpleDynamicClient(scheme.Scheme, objs...) reactor := client.ReactionChain[0] client.PrependReactor("list", "*", func(action testcore.Action) (handled bool, ret runtime.Object, err error) { @@ -1252,12 +1252,6 @@ func Test_watchEvents_Deadlock(t *testing.T) { } } -var testResources = map[kube.ResourceKey]*Resource{} - -func init() { - testResources = buildTestResourceMap() -} - func buildTestResourceMap() map[kube.ResourceKey]*Resource { ns := make(map[kube.ResourceKey]*Resource) for i := 0; i < 100000; i++ { @@ -1296,7 +1290,40 @@ metadata: } func BenchmarkBuildGraph(b *testing.B) { + testResources := buildTestResourceMap() + b.ResetTimer() for n := 0; n < b.N; n++ { buildGraph(testResources) } } + +func BenchmarkIterateHierarchyV2(b *testing.B) { + cluster := newCluster(b) + testResources := buildTestResourceMap() + for _, resource := range testResources { + cluster.setNode(resource) + } + b.ResetTimer() + for n := 0; n < b.N; n++ { + cluster.IterateHierarchyV2([]kube.ResourceKey{ + {Namespace: "default", Name: "test-1", Kind: "Pod"}, + }, func(child *Resource, _ map[kube.ResourceKey]*Resource) bool { + return true + }) + } +} + +//func BenchmarkIterateHierarchy(b *testing.B) { +// cluster := newCluster(b) +// for _, resource := range testResources { +// cluster.setNode(resource) +// } +// b.ResetTimer() +// for n := 0; n < b.N; n++ { +// cluster.IterateHierarchy(kube.ResourceKey{ +// Namespace: "default", Name: "test-1", Kind: "Pod", +// }, func(child *Resource, _ map[kube.ResourceKey]*Resource) bool { +// return true +// }) +// } +//} From 19aa0bf51008e294ad31bf7a35c18db0ad7e28e0 Mon Sep 17 00:00:00 2001 From: Michael Crenshaw <350466+crenshaw-dev@users.noreply.github.com> Date: Tue, 16 Jul 2024 20:29:17 -0400 Subject: [PATCH 08/15] remove unnecessary comment Signed-off-by: Michael Crenshaw <350466+crenshaw-dev@users.noreply.github.com> --- 
pkg/cache/cluster.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/cache/cluster.go b/pkg/cache/cluster.go index 14a1422d0..815ea1617 100644 --- a/pkg/cache/cluster.go +++ b/pkg/cache/cluster.go @@ -1033,7 +1033,6 @@ func (c *clusterCache) IterateHierarchyV2(keys []kube.ResourceKey, action func(r continue } visited[key] = 1 - // make sure children has no duplicates for _, child := range childrenByUID[key] { if visited[child.ResourceKey()] == 0 && action(child, nsNodes) { child.iterateChildrenV2(graph, nsNodes, visited, func(err error, child *Resource, namespaceResources map[kube.ResourceKey]*Resource) bool { From 38701d0c2ccb044e7c33779e91a4da7703609df1 Mon Sep 17 00:00:00 2001 From: Michael Crenshaw <350466+crenshaw-dev@users.noreply.github.com> Date: Tue, 16 Jul 2024 20:41:31 -0400 Subject: [PATCH 09/15] make childrenByUID sparse Signed-off-by: Michael Crenshaw <350466+crenshaw-dev@users.noreply.github.com> --- pkg/cache/cluster.go | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/pkg/cache/cluster.go b/pkg/cache/cluster.go index 815ea1617..a5e220f0c 100644 --- a/pkg/cache/cluster.go +++ b/pkg/cache/cluster.go @@ -1033,15 +1033,17 @@ func (c *clusterCache) IterateHierarchyV2(keys []kube.ResourceKey, action func(r continue } visited[key] = 1 - for _, child := range childrenByUID[key] { - if visited[child.ResourceKey()] == 0 && action(child, nsNodes) { - child.iterateChildrenV2(graph, nsNodes, visited, func(err error, child *Resource, namespaceResources map[kube.ResourceKey]*Resource) bool { - if err != nil { - c.log.V(2).Info(err.Error()) - return false - } - return action(child, namespaceResources) - }) + if _, ok := childrenByUID[key]; ok { + for _, child := range childrenByUID[key] { + if visited[child.ResourceKey()] == 0 && action(child, nsNodes) { + child.iterateChildrenV2(graph, nsNodes, visited, func(err error, child *Resource, namespaceResources map[kube.ResourceKey]*Resource) bool { + if err != nil { + c.log.V(2).Info(err.Error()) + return false + } + return action(child, namespaceResources) + }) + } } } visited[key] = 2 @@ -1059,16 +1061,16 @@ func buildGraph(nsNodes map[kube.ResourceKey]*Resource) (map[kube.ResourceKey][] // Prepare to construct a graph nodesByUID := make(map[types.UID][]*Resource, len(nsNodes)) nodeByGraphKey := make(map[graphKey]*Resource, len(nsNodes)) - childrenByUID := make(map[kube.ResourceKey]map[types.UID]*Resource, len(nsNodes)) for _, node := range nsNodes { nodesByUID[node.Ref.UID] = append(nodesByUID[node.Ref.UID], node) nodeByGraphKey[graphKey{node.Ref.Kind, node.Ref.APIVersion, node.Ref.Name}] = node - childrenByUID[node.ResourceKey()] = make(map[types.UID]*Resource) } // In graph, they key is the parent and the value is a list of children. graph := make(map[kube.ResourceKey][]kube.ResourceKey) + childrenByUID := make(map[kube.ResourceKey]map[types.UID]*Resource) + // Loop through all nodes, calling each one "childNode," because we're only bothering with it if it has a parent. for _, childNode := range nsNodes { for i, ownerRef := range childNode.OwnerRefs { @@ -1091,6 +1093,9 @@ func buildGraph(nsNodes map[kube.ResourceKey]*Resource) (map[kube.ResourceKey][] // Update the graph for this owner to include the child. 
graph[uidNode.ResourceKey()] = append(graph[uidNode.ResourceKey()], childNode.ResourceKey()) + if _, ok := childrenByUID[uidNode.ResourceKey()]; !ok { + childrenByUID[uidNode.ResourceKey()] = make(map[types.UID]*Resource) + } r, ok := childrenByUID[uidNode.ResourceKey()][childNode.Ref.UID] if !ok { childrenByUID[uidNode.ResourceKey()][childNode.Ref.UID] = childNode From 8284fb0a5bf58eface906921dc1d83156efb0e55 Mon Sep 17 00:00:00 2001 From: Michael Crenshaw <350466+crenshaw-dev@users.noreply.github.com> Date: Tue, 16 Jul 2024 21:09:44 -0400 Subject: [PATCH 10/15] eliminate duplicate map Signed-off-by: Michael Crenshaw <350466+crenshaw-dev@users.noreply.github.com> --- pkg/cache/cluster.go | 29 ++++++++++++----------------- pkg/cache/resource.go | 10 ++++++---- 2 files changed, 18 insertions(+), 21 deletions(-) diff --git a/pkg/cache/cluster.go b/pkg/cache/cluster.go index a5e220f0c..4467c4faf 100644 --- a/pkg/cache/cluster.go +++ b/pkg/cache/cluster.go @@ -1021,7 +1021,7 @@ func (c *clusterCache) IterateHierarchyV2(keys []kube.ResourceKey, action func(r } for namespace, namespaceKeys := range keysPerNamespace { nsNodes := c.nsIndex[namespace] - graph, childrenByUID := buildGraph(nsNodes) + graph := buildGraph(nsNodes) visited := make(map[kube.ResourceKey]int) for _, key := range namespaceKeys { visited[key] = 0 @@ -1033,8 +1033,8 @@ func (c *clusterCache) IterateHierarchyV2(keys []kube.ResourceKey, action func(r continue } visited[key] = 1 - if _, ok := childrenByUID[key]; ok { - for _, child := range childrenByUID[key] { + if _, ok := graph[key]; ok { + for _, child := range graph[key] { if visited[child.ResourceKey()] == 0 && action(child, nsNodes) { child.iterateChildrenV2(graph, nsNodes, visited, func(err error, child *Resource, namespaceResources map[kube.ResourceKey]*Resource) bool { if err != nil { @@ -1057,7 +1057,7 @@ type graphKey struct { name string } -func buildGraph(nsNodes map[kube.ResourceKey]*Resource) (map[kube.ResourceKey][]kube.ResourceKey, map[kube.ResourceKey]map[types.UID]*Resource) { +func buildGraph(nsNodes map[kube.ResourceKey]*Resource) map[kube.ResourceKey]map[types.UID]*Resource { // Prepare to construct a graph nodesByUID := make(map[types.UID][]*Resource, len(nsNodes)) nodeByGraphKey := make(map[graphKey]*Resource, len(nsNodes)) @@ -1067,9 +1067,7 @@ func buildGraph(nsNodes map[kube.ResourceKey]*Resource) (map[kube.ResourceKey][] } // In graph, they key is the parent and the value is a list of children. - graph := make(map[kube.ResourceKey][]kube.ResourceKey) - - childrenByUID := make(map[kube.ResourceKey]map[types.UID]*Resource) + graph := make(map[kube.ResourceKey]map[types.UID]*Resource) // Loop through all nodes, calling each one "childNode," because we're only bothering with it if it has a parent. for _, childNode := range nsNodes { @@ -1086,33 +1084,30 @@ func buildGraph(nsNodes map[kube.ResourceKey]*Resource) (map[kube.ResourceKey][] } } - // Now that we have the UID of the parent, update the graph and the childrenByUID map. + // Now that we have the UID of the parent, update the graph and the graph map. uidNodes, ok := nodesByUID[ownerRef.UID] if ok { for _, uidNode := range uidNodes { - // Update the graph for this owner to include the child. 
- graph[uidNode.ResourceKey()] = append(graph[uidNode.ResourceKey()], childNode.ResourceKey()) - - if _, ok := childrenByUID[uidNode.ResourceKey()]; !ok { - childrenByUID[uidNode.ResourceKey()] = make(map[types.UID]*Resource) + if _, ok := graph[uidNode.ResourceKey()]; !ok { + graph[uidNode.ResourceKey()] = make(map[types.UID]*Resource) } - r, ok := childrenByUID[uidNode.ResourceKey()][childNode.Ref.UID] + r, ok := graph[uidNode.ResourceKey()][childNode.Ref.UID] if !ok { - childrenByUID[uidNode.ResourceKey()][childNode.Ref.UID] = childNode + graph[uidNode.ResourceKey()][childNode.Ref.UID] = childNode } else if r != nil { // The object might have multiple children with the same UID (e.g. replicaset from apps and extensions group). // It is ok to pick any object, but we need to make sure we pick the same child after every refresh. key1 := r.ResourceKey() key2 := childNode.ResourceKey() if strings.Compare(key1.String(), key2.String()) > 0 { - childrenByUID[uidNode.ResourceKey()][childNode.Ref.UID] = childNode + graph[uidNode.ResourceKey()][childNode.Ref.UID] = childNode } } } } } } - return graph, childrenByUID + return graph } // IsNamespaced answers if specified group/kind is a namespaced resource API or not diff --git a/pkg/cache/resource.go b/pkg/cache/resource.go index a6051c0ff..eae3d4e6e 100644 --- a/pkg/cache/resource.go +++ b/pkg/cache/resource.go @@ -2,6 +2,7 @@ package cache import ( "fmt" + "k8s.io/apimachinery/pkg/types" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -101,7 +102,7 @@ func (r *Resource) iterateChildren(ns map[kube.ResourceKey]*Resource, parents ma } // iterateChildrenV2 is a depth-first traversal of the graph of resources starting from the current resource. -func (r *Resource) iterateChildrenV2(graph map[kube.ResourceKey][]kube.ResourceKey, ns map[kube.ResourceKey]*Resource, visited map[kube.ResourceKey]int, action func(err error, child *Resource, namespaceResources map[kube.ResourceKey]*Resource) bool) { +func (r *Resource) iterateChildrenV2(graph map[kube.ResourceKey]map[types.UID]*Resource, ns map[kube.ResourceKey]*Resource, visited map[kube.ResourceKey]int, action func(err error, child *Resource, namespaceResources map[kube.ResourceKey]*Resource) bool) { key := r.ResourceKey() if visited[key] == 2 { return @@ -112,11 +113,12 @@ func (r *Resource) iterateChildrenV2(graph map[kube.ResourceKey][]kube.ResourceK // this indicates that we've finished processing this node's children visited[key] = 2 }() - childKeys, ok := graph[key] - if !ok || childKeys == nil { + children, ok := graph[key] + if !ok || children == nil { return } - for _, childKey := range childKeys { + for _, c := range children { + childKey := c.ResourceKey() child := ns[childKey] if visited[childKey] == 1 { // Since we encountered a node that we're currently processing, we know we have a circular dependency. 
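For readers following the refactors above, here is a minimal standalone Go sketch of the technique these patches converge on: build a parent-to-children index once per namespace, then walk it depth-first with a three-state visited map that deduplicates children by UID and reports cycles. The node type, buildChildIndex, and iterate below are illustrative stand-ins (plain strings instead of kube.ResourceKey and types.UID), not code from the repository.

package main

import (
	"fmt"
	"sort"
)

// node is a simplified stand-in for cache.Resource: just enough to model owner references.
type node struct {
	key    string   // stand-in for kube.ResourceKey
	uid    string   // stand-in for types.UID
	owners []string // owner UIDs, assumed already backfilled
}

// buildChildIndex builds parentKey -> (childUID -> child), keeping the
// lexicographically smallest key per UID so duplicates (e.g. the same
// ReplicaSet reported under two API groups) resolve deterministically.
func buildChildIndex(nodes map[string]*node) map[string]map[string]*node {
	byUID := make(map[string][]*node, len(nodes))
	for _, n := range nodes {
		byUID[n.uid] = append(byUID[n.uid], n)
	}
	index := make(map[string]map[string]*node)
	for _, child := range nodes {
		for _, ownerUID := range child.owners {
			for _, parent := range byUID[ownerUID] {
				if index[parent.key] == nil {
					index[parent.key] = make(map[string]*node)
				}
				if prev, ok := index[parent.key][child.uid]; !ok || child.key < prev.key {
					index[parent.key][child.uid] = child
				}
			}
		}
	}
	return index
}

// iterate walks the index depth-first from root using three visited states:
// 0 = unseen, 1 = in progress (seeing it again means a cycle), 2 = done.
func iterate(root string, index map[string]map[string]*node, visited map[string]int, visit func(key string)) {
	if visited[root] == 2 {
		return
	}
	visited[root] = 1
	defer func() { visited[root] = 2 }()
	children := make([]*node, 0, len(index[root]))
	for _, c := range index[root] {
		children = append(children, c)
	}
	sort.Slice(children, func(i, j int) bool { return children[i].key < children[j].key })
	for _, c := range children {
		switch visited[c.key] {
		case 1:
			fmt.Printf("circular dependency: %s is child and parent of %s\n", c.key, root)
		case 0:
			visit(c.key)
			iterate(c.key, index, visited, visit)
		}
	}
}

func main() {
	nodes := map[string]*node{
		"Deployment/demo": {key: "Deployment/demo", uid: "1"},
		"ReplicaSet/demo": {key: "ReplicaSet/demo", uid: "2", owners: []string{"1"}},
		"Pod/demo":        {key: "Pod/demo", uid: "3", owners: []string{"2"}},
	}
	index := buildChildIndex(nodes)
	visited := map[string]int{}
	visit := func(key string) { fmt.Println(key) }
	visit("Deployment/demo")
	iterate("Deployment/demo", index, visited, visit)
}

The deterministic tie-break on the smaller key mirrors the intent of the sort.Slice call that the earlier patches remove: when two resources share a UID, the same child is chosen after every refresh.
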
From 5c23ab5d815bbf404a57f477e55478ae62a25ab4 Mon Sep 17 00:00:00 2001 From: Michael Crenshaw <350466+crenshaw-dev@users.noreply.github.com> Date: Tue, 16 Jul 2024 21:12:49 -0400 Subject: [PATCH 11/15] fix comment Signed-off-by: Michael Crenshaw <350466+crenshaw-dev@users.noreply.github.com> --- pkg/cache/cluster.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/cache/cluster.go b/pkg/cache/cluster.go index 4467c4faf..fcbc9feae 100644 --- a/pkg/cache/cluster.go +++ b/pkg/cache/cluster.go @@ -1084,7 +1084,7 @@ func buildGraph(nsNodes map[kube.ResourceKey]*Resource) map[kube.ResourceKey]map } } - // Now that we have the UID of the parent, update the graph and the graph map. + // Now that we have the UID of the parent, update the graph. uidNodes, ok := nodesByUID[ownerRef.UID] if ok { for _, uidNode := range uidNodes { From 8dbcf053ce840392a11dbaa9de0e26dde1dbb698 Mon Sep 17 00:00:00 2001 From: Michael Crenshaw <350466+crenshaw-dev@users.noreply.github.com> Date: Tue, 16 Jul 2024 21:14:17 -0400 Subject: [PATCH 12/15] add useful comment back Signed-off-by: Michael Crenshaw <350466+crenshaw-dev@users.noreply.github.com> --- pkg/cache/cluster.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/cache/cluster.go b/pkg/cache/cluster.go index fcbc9feae..4c57beada 100644 --- a/pkg/cache/cluster.go +++ b/pkg/cache/cluster.go @@ -1088,6 +1088,7 @@ func buildGraph(nsNodes map[kube.ResourceKey]*Resource) map[kube.ResourceKey]map uidNodes, ok := nodesByUID[ownerRef.UID] if ok { for _, uidNode := range uidNodes { + // Update the graph for this owner to include the child. if _, ok := graph[uidNode.ResourceKey()]; !ok { graph[uidNode.ResourceKey()] = make(map[types.UID]*Resource) } From 3ef365182f32e2e1426e57f62829368b5effd8bb Mon Sep 17 00:00:00 2001 From: Michael Crenshaw <350466+crenshaw-dev@users.noreply.github.com> Date: Tue, 16 Jul 2024 21:47:26 -0400 Subject: [PATCH 13/15] use nsNodes instead of dupe map Signed-off-by: Michael Crenshaw <350466+crenshaw-dev@users.noreply.github.com> --- pkg/cache/cluster.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pkg/cache/cluster.go b/pkg/cache/cluster.go index 4c57beada..bdd93d919 100644 --- a/pkg/cache/cluster.go +++ b/pkg/cache/cluster.go @@ -1060,10 +1060,8 @@ type graphKey struct { func buildGraph(nsNodes map[kube.ResourceKey]*Resource) map[kube.ResourceKey]map[types.UID]*Resource { // Prepare to construct a graph nodesByUID := make(map[types.UID][]*Resource, len(nsNodes)) - nodeByGraphKey := make(map[graphKey]*Resource, len(nsNodes)) for _, node := range nsNodes { nodesByUID[node.Ref.UID] = append(nodesByUID[node.Ref.UID], node) - nodeByGraphKey[graphKey{node.Ref.Kind, node.Ref.APIVersion, node.Ref.Name}] = node } // In graph, they key is the parent and the value is a list of children. @@ -1074,7 +1072,8 @@ func buildGraph(nsNodes map[kube.ResourceKey]*Resource) map[kube.ResourceKey]map for i, ownerRef := range childNode.OwnerRefs { // First, backfill UID of inferred owner child references. 
if ownerRef.UID == "" { - graphKeyNode, ok := nodeByGraphKey[graphKey{ownerRef.Kind, ownerRef.APIVersion, ownerRef.Name}] + group, _ := schema.ParseGroupVersion(ownerRef.APIVersion) + graphKeyNode, ok := nsNodes[kube.ResourceKey{Group: group.Group, Kind: ownerRef.Kind, Namespace: childNode.Ref.Namespace, Name: ownerRef.Name}] if ok { ownerRef.UID = graphKeyNode.Ref.UID childNode.OwnerRefs[i] = ownerRef From e2fb7829264077fedbbb5f72a17eb949c329f183 Mon Sep 17 00:00:00 2001 From: Michael Crenshaw <350466+crenshaw-dev@users.noreply.github.com> Date: Thu, 18 Jul 2024 12:39:22 -0400 Subject: [PATCH 14/15] remove unused struct Signed-off-by: Michael Crenshaw <350466+crenshaw-dev@users.noreply.github.com> --- pkg/cache/cluster.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/pkg/cache/cluster.go b/pkg/cache/cluster.go index bdd93d919..aeb02df59 100644 --- a/pkg/cache/cluster.go +++ b/pkg/cache/cluster.go @@ -1051,12 +1051,6 @@ func (c *clusterCache) IterateHierarchyV2(keys []kube.ResourceKey, action func(r } } -type graphKey struct { - kind string - apiVersion string - name string -} - func buildGraph(nsNodes map[kube.ResourceKey]*Resource) map[kube.ResourceKey]map[types.UID]*Resource { // Prepare to construct a graph nodesByUID := make(map[types.UID][]*Resource, len(nsNodes)) From 0b6e366c2f7a56d7c6b6b21509e3ed018444d952 Mon Sep 17 00:00:00 2001 From: Michael Crenshaw <350466+crenshaw-dev@users.noreply.github.com> Date: Thu, 18 Jul 2024 12:41:54 -0400 Subject: [PATCH 15/15] skip invalid APIVersion Signed-off-by: Michael Crenshaw <350466+crenshaw-dev@users.noreply.github.com> --- pkg/cache/cluster.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pkg/cache/cluster.go b/pkg/cache/cluster.go index aeb02df59..d8f0a5ddd 100644 --- a/pkg/cache/cluster.go +++ b/pkg/cache/cluster.go @@ -1066,7 +1066,11 @@ func buildGraph(nsNodes map[kube.ResourceKey]*Resource) map[kube.ResourceKey]map for i, ownerRef := range childNode.OwnerRefs { // First, backfill UID of inferred owner child references. if ownerRef.UID == "" { - group, _ := schema.ParseGroupVersion(ownerRef.APIVersion) + group, err := schema.ParseGroupVersion(ownerRef.APIVersion) + if err != nil { + // APIVersion is invalid, so we couldn't find the parent. + continue + } graphKeyNode, ok := nsNodes[kube.ResourceKey{Group: group.Group, Kind: ownerRef.Kind, Namespace: childNode.Ref.Namespace, Name: ownerRef.Name}] if ok { ownerRef.UID = graphKeyNode.Ref.UID
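As a closing illustration of where the owner-reference handling lands after the last two patches, here is a hedged, standalone sketch of the UID backfill: parse the owner reference's APIVersion, build a namespace-scoped lookup key, and skip references whose APIVersion cannot be parsed. lookupKey and backfillOwnerUID are invented names for illustration only; the repository performs this lookup inline in buildGraph against the nsNodes index.

package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/types"
)

// lookupKey mirrors the kube.ResourceKey fields used for the owner lookup;
// it is a local stand-in, not the real type.
type lookupKey struct {
	Group, Kind, Namespace, Name string
}

// backfillOwnerUID resolves a UID-less owner reference by looking the owner up
// in a per-namespace index. An unparsable APIVersion means the owner cannot be
// found, so the reference is left untouched rather than failing the whole walk.
func backfillOwnerUID(ref *metav1.OwnerReference, namespace string, byKey map[lookupKey]types.UID) bool {
	if ref.UID != "" {
		return true
	}
	gv, err := schema.ParseGroupVersion(ref.APIVersion)
	if err != nil {
		return false // invalid APIVersion: skip this owner reference
	}
	uid, ok := byKey[lookupKey{Group: gv.Group, Kind: ref.Kind, Namespace: namespace, Name: ref.Name}]
	if !ok {
		return false
	}
	ref.UID = uid
	return true
}

func main() {
	index := map[lookupKey]types.UID{
		{Group: "apps", Kind: "ReplicaSet", Namespace: "default", Name: "helm-guestbook-rs"}: "2",
	}

	ref := metav1.OwnerReference{APIVersion: "apps/v1", Kind: "ReplicaSet", Name: "helm-guestbook-rs"}
	fmt.Println(backfillOwnerUID(&ref, "default", index), ref.UID) // true 2

	bad := metav1.OwnerReference{APIVersion: "a/b/c", Kind: "ReplicaSet", Name: "helm-guestbook-rs"}
	fmt.Println(backfillOwnerUID(&bad, "default", index), bad.UID) // false
}

Skipping on a parse error trades completeness for robustness: a malformed APIVersion simply leaves that owner reference unresolved instead of aborting the hierarchy iteration.
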