chore: More optimal IterateHierarchyV2 and iterateChildrenV2 [#600]
Closes #600

The existing (effectively v1) implementations are suboptimal because they don't construct a graph before iterating. They search for children by scanning all namespace resources and checking `isParentOf`, which can give `O(tree_size * namespace_resources_count)` time complexity. The v2 algorithms construct the graph up front and run in `O(namespace_resources_count)` time. See the linked issue for more details.
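
As a rough sketch of the difference (simplified; `visit`, `keyOf`, and `resources` are hypothetical helpers, not the actual implementation):

	// v1-style child lookup: every visited node scans all namespace resources,
	// giving O(tree_size * namespace_resources_count) overall.
	for _, candidate := range namespaceResources {
		if node.isParentOf(candidate) {
			visit(candidate)
		}
	}

	// v2-style: build an owner->children adjacency map once, then follow edges,
	// giving O(namespace_resources_count) overall.
	graph := make(map[kube.ResourceKey][]kube.ResourceKey)
	for _, child := range namespaceResources {
		for _, ownerRef := range child.OwnerRefs {
			graph[keyOf(ownerRef)] = append(graph[keyOf(ownerRef)], child.ResourceKey())
		}
	}
	for _, childKey := range graph[node.ResourceKey()] {
		visit(resources[childKey])
	}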

Signed-off-by: Andrii Korotkov <[email protected]>
andrii-korotkov-verkada committed Jul 4, 2024
1 parent a22b346 commit b0f2923
Showing 3 changed files with 243 additions and 20 deletions.
91 changes: 91 additions & 0 deletions pkg/cache/cluster.go
@@ -120,6 +120,9 @@ type ClusterCache interface {
// IterateHierarchy iterates resource tree starting from the specified top level resource and executes callback for each resource in the tree.
// The action callback returns true if iteration should continue and false otherwise.
IterateHierarchy(key kube.ResourceKey, action func(resource *Resource, namespaceResources map[kube.ResourceKey]*Resource) bool)
// IterateHierarchyV2 iterates resource tree starting from the specified top level resources and executes callback for each resource in the tree.
// The action callback returns true if iteration should continue and false otherwise.
IterateHierarchyV2(keys []kube.ResourceKey, action func(resource *Resource, namespaceResources map[kube.ResourceKey]*Resource) bool)
// IsNamespaced answers if specified group/kind is a namespaced resource API or not
IsNamespaced(gk schema.GroupKind) (bool, error)
// GetManagedLiveObjs helps finding matching live K8S resources for a given resources list.
@@ -1000,6 +1003,94 @@ func (c *clusterCache) IterateHierarchy(key kube.ResourceKey, action func(resour
}
}

// IterateHierarchyV2 iterates resource tree starting from the specified top level resources and executes callback for each resource in the tree.
func (c *clusterCache) IterateHierarchyV2(keys []kube.ResourceKey, action func(resource *Resource, namespaceResources map[kube.ResourceKey]*Resource) bool) {
c.lock.RLock()
defer c.lock.RUnlock()
keysPerNamespace := make(map[string][]kube.ResourceKey)
for _, key := range keys {
keysPerNamespace[key.Namespace] = append(keysPerNamespace[key.Namespace], key)
}
for namespace, namespaceKeys := range keysPerNamespace {
nsNodes := c.nsIndex[namespace]
// Prepare to construct a graph
nodesByUID := make(map[types.UID][]*Resource)
nodeByGraphKey := make(map[string]*Resource)
for _, node := range nsNodes {
nodesByUID[node.Ref.UID] = append(nodesByUID[node.Ref.UID], node)
// Based on what's used by isParentOf
graphKey := fmt.Sprintf("%s/%s/%s", node.Ref.Kind, node.Ref.APIVersion, node.Ref.Name)
nodeByGraphKey[graphKey] = node
}
// Construct a graph using a logic similar to isParentOf but more optimal
graph := make(map[kube.ResourceKey][]kube.ResourceKey)
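// childrenByUID groups each parent's children by child UID, so that multiple
// cache entries sharing a UID (e.g. a ReplicaSet exposed under both the apps
// and extensions groups) can be deduplicated deterministically during iteration.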
childrenByUID := make(map[kube.ResourceKey]map[types.UID][]*Resource)
for _, node := range nsNodes {
childrenByUID[node.ResourceKey()] = make(map[types.UID][]*Resource)
}
for _, node := range nsNodes {
for i, ownerRef := range node.OwnerRefs {
// backfill UID of inferred owner child references
if ownerRef.UID == "" {
graphKey := fmt.Sprintf("%s/%s/%s", ownerRef.Kind, ownerRef.APIVersion, ownerRef.Name)
graphKeyNode, ok := nodeByGraphKey[graphKey]
if ok {
ownerRef.UID = graphKeyNode.Ref.UID
node.OwnerRefs[i] = ownerRef
} else {
continue
}
}

uidNodes, ok := nodesByUID[ownerRef.UID]
if ok {
for _, uidNode := range uidNodes {
graph[uidNode.ResourceKey()] = append(graph[uidNode.ResourceKey()], node.ResourceKey())
childrenByUID[uidNode.ResourceKey()][node.Ref.UID] = append(childrenByUID[uidNode.ResourceKey()][node.Ref.UID], node)
}
}
}
}
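// visited states: 0 = not visited, 1 = visit in progress, 2 = fully processed (children included).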
visited := make(map[kube.ResourceKey]int)
for _, key := range namespaceKeys {
visited[key] = 0
}
for _, key := range namespaceKeys {
res, ok := c.resources[key]
if !ok {
continue
}
if visited[key] == 2 || !action(res, nsNodes) {
continue
}
visited[key] = 1
// make sure children have no duplicates
for _, children := range childrenByUID[key] {
if len(children) > 0 {
// The object might have multiple children with the same UID (e.g. a ReplicaSet from the apps and extensions groups). It is ok to pick any object, but we need to make sure
// we pick the same child after every refresh.
sort.Slice(children, func(i, j int) bool {
key1 := children[i].ResourceKey()
key2 := children[j].ResourceKey()
return strings.Compare(key1.String(), key2.String()) < 0
})
child := children[0]
if visited[child.ResourceKey()] == 0 && action(child, nsNodes) {
child.iterateChildrenV2(graph, nsNodes, visited, func(err error, child *Resource, namespaceResources map[kube.ResourceKey]*Resource) bool {
if err != nil {
c.log.V(2).Info(err.Error())
return false
}
return action(child, namespaceResources)
})
}
}
}
visited[key] = 2
}
}
}
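
// Example usage of IterateHierarchyV2 (a hedged sketch; deployKey and stsKey are
// hypothetical root keys, not defined in this package):
//
//	roots := []kube.ResourceKey{deployKey, stsKey}
//	var reachable []kube.ResourceKey
//	c.IterateHierarchyV2(roots, func(res *Resource, _ map[kube.ResourceKey]*Resource) bool {
//		reachable = append(reachable, res.ResourceKey())
//		return true // continue into this resource's children
//	})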

// IsNamespaced answers if specified group/kind is a namespaced resource API or not
func (c *clusterCache) IsNamespaced(gk schema.GroupKind) (bool, error) {
if isNamespaced, ok := c.namespacedResources[gk]; ok {
147 changes: 127 additions & 20 deletions pkg/cache/cluster_test.go
@@ -3,12 +3,13 @@ package cache
import (
"context"
"fmt"
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions"
"sort"
"strings"
"testing"
"time"

"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
appsv1 "k8s.io/api/apps/v1"
@@ -99,6 +100,10 @@ func newCluster(t *testing.T, objs ...runtime.Object) *clusterCache {
GroupKind: schema.GroupKind{Group: "apps", Kind: "StatefulSet"},
GroupVersionResource: schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "statefulsets"},
Meta: metav1.APIResource{Namespaced: true},
}, {
GroupKind: schema.GroupKind{Group: "extensions", Kind: "ReplicaSet"},
GroupVersionResource: schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "replicasets"},
Meta: metav1.APIResource{Namespaced: true},
}}

cache := NewClusterCache(
@@ -492,23 +497,23 @@ metadata:
func TestGetManagedLiveObjsFailedConversion(t *testing.T) {
cronTabGroup := "stable.example.com"

	testCases := []struct {
		name                         string
		localConvertFails            bool
		expectConvertToVersionCalled bool
		expectGetResourceCalled      bool
	}{
		{
			name:                         "local convert fails, so GetResource is called",
			localConvertFails:            true,
			expectConvertToVersionCalled: true,
			expectGetResourceCalled:      true,
		},
		{
			name:                         "local convert succeeds, so GetResource is not called",
			localConvertFails:            false,
			expectConvertToVersionCalled: true,
			expectGetResourceCalled:      false,
		},
	}

Expand Down Expand Up @@ -557,7 +562,6 @@ metadata:
return testCronTab(), nil
})

managedObjs, err := cluster.GetManagedLiveObjs([]*unstructured.Unstructured{targetDeploy}, func(r *Resource) bool {
return true
})
@@ -816,25 +820,25 @@ func testPod() *corev1.Pod {

func testCRD() *apiextensions.CustomResourceDefinition {
	return &apiextensions.CustomResourceDefinition{
		TypeMeta: metav1.TypeMeta{
			APIVersion: "apiextensions.k8s.io/v1",
		},
		ObjectMeta: metav1.ObjectMeta{
			Name: "crontabs.stable.example.com",
		},
		Spec: apiextensions.CustomResourceDefinitionSpec{
			Group: "stable.example.com",
			Versions: []apiextensions.CustomResourceDefinitionVersion{
				{
					Name:    "v1",
					Served:  true,
					Storage: true,
					Schema: &apiextensions.CustomResourceValidation{
						OpenAPIV3Schema: &apiextensions.JSONSchemaProps{
							Type: "object",
							Properties: map[string]apiextensions.JSONSchemaProps{
								"cronSpec": {Type: "string"},
								"image":    {Type: "string"},
								"replicas": {Type: "integer"},
							},
						},
@@ -855,14 +859,14 @@ func testCRD() *apiextensions.CustomResourceDefinition {
func testCronTab() *unstructured.Unstructured {
	return &unstructured.Unstructured{Object: map[string]interface{}{
		"apiVersion": "stable.example.com/v1",
		"kind":       "CronTab",
		"metadata": map[string]interface{}{
			"name":      "test-crontab",
			"namespace": "default",
		},
		"spec": map[string]interface{}{
			"cronSpec": "* * * * */5",
			"image":    "my-awesome-cron-image",
		},
	}}
}
@@ -1006,3 +1010,106 @@ func TestIterateHierachy(t *testing.T) {
keys)
})
}

func TestIterateHierachyV2(t *testing.T) {
cluster := newCluster(t, testPod(), testRS(), testExtensionsRS(), testDeploy())
err := cluster.EnsureSynced()
require.NoError(t, err)

t.Run("IterateAll", func(t *testing.T) {
startKeys := []kube.ResourceKey{kube.GetResourceKey(mustToUnstructured(testDeploy()))}
keys := []kube.ResourceKey{}
cluster.IterateHierarchyV2(startKeys, func(child *Resource, _ map[kube.ResourceKey]*Resource) bool {
keys = append(keys, child.ResourceKey())
return true
})

assert.ElementsMatch(t,
[]kube.ResourceKey{
kube.GetResourceKey(mustToUnstructured(testPod())),
kube.GetResourceKey(mustToUnstructured(testRS())),
kube.GetResourceKey(mustToUnstructured(testDeploy()))},
keys)
})

t.Run("ExitAtRoot", func(t *testing.T) {
startKeys := []kube.ResourceKey{kube.GetResourceKey(mustToUnstructured(testDeploy()))}
keys := []kube.ResourceKey{}
cluster.IterateHierarchyV2(startKeys, func(child *Resource, _ map[kube.ResourceKey]*Resource) bool {
keys = append(keys, child.ResourceKey())
return false
})

assert.ElementsMatch(t,
[]kube.ResourceKey{
kube.GetResourceKey(mustToUnstructured(testDeploy()))},
keys)
})

t.Run("ExitAtSecondLevelChild", func(t *testing.T) {
startKeys := []kube.ResourceKey{kube.GetResourceKey(mustToUnstructured(testDeploy()))}
keys := []kube.ResourceKey{}
cluster.IterateHierarchyV2(startKeys, func(child *Resource, _ map[kube.ResourceKey]*Resource) bool {
keys = append(keys, child.ResourceKey())
return child.ResourceKey().Kind != kube.ReplicaSetKind
})

assert.ElementsMatch(t,
[]kube.ResourceKey{
kube.GetResourceKey(mustToUnstructured(testDeploy())),
kube.GetResourceKey(mustToUnstructured(testRS())),
},
keys)
})

t.Run("ExitAtThirdLevelChild", func(t *testing.T) {
startKeys := []kube.ResourceKey{kube.GetResourceKey(mustToUnstructured(testDeploy()))}
keys := []kube.ResourceKey{}
cluster.IterateHierarchyV2(startKeys, func(child *Resource, _ map[kube.ResourceKey]*Resource) bool {
keys = append(keys, child.ResourceKey())
return child.ResourceKey().Kind != kube.PodKind
})

assert.ElementsMatch(t,
[]kube.ResourceKey{
kube.GetResourceKey(mustToUnstructured(testDeploy())),
kube.GetResourceKey(mustToUnstructured(testRS())),
kube.GetResourceKey(mustToUnstructured(testPod())),
},
keys)
})

t.Run("IterateAllStartFromMultiple", func(t *testing.T) {
startKeys := []kube.ResourceKey{
kube.GetResourceKey(mustToUnstructured(testRS())),
kube.GetResourceKey(mustToUnstructured(testDeploy())),
}
keys := []kube.ResourceKey{}
cluster.IterateHierarchyV2(startKeys, func(child *Resource, _ map[kube.ResourceKey]*Resource) bool {
keys = append(keys, child.ResourceKey())
return true
})

assert.ElementsMatch(t,
[]kube.ResourceKey{
kube.GetResourceKey(mustToUnstructured(testPod())),
kube.GetResourceKey(mustToUnstructured(testRS())),
kube.GetResourceKey(mustToUnstructured(testDeploy()))},
keys)
})

t.Run("IterateStartFromExtensionsRS", func(t *testing.T) {
startKeys := []kube.ResourceKey{kube.GetResourceKey(mustToUnstructured(testExtensionsRS()))}
keys := []kube.ResourceKey{}
cluster.IterateHierarchyV2(startKeys, func(child *Resource, _ map[kube.ResourceKey]*Resource) bool {
keys = append(keys, child.ResourceKey())
return true
})

assert.ElementsMatch(t,
[]kube.ResourceKey{
kube.GetResourceKey(mustToUnstructured(testPod())),
kube.GetResourceKey(mustToUnstructured(testExtensionsRS()))},
keys)
})
}
25 changes: 25 additions & 0 deletions pkg/cache/resource.go
@@ -99,3 +99,28 @@ func (r *Resource) iterateChildren(ns map[kube.ResourceKey]*Resource, parents ma
}
}
}

func (r *Resource) iterateChildrenV2(graph map[kube.ResourceKey][]kube.ResourceKey, ns map[kube.ResourceKey]*Resource, visited map[kube.ResourceKey]int, action func(err error, child *Resource, namespaceResources map[kube.ResourceKey]*Resource) bool) {
key := r.ResourceKey()
if visited[key] == 2 {
return
}
visited[key] = 1
defer func() {
visited[key] = 2
}()
childKeys, ok := graph[key]
if !ok || childKeys == nil {
return
}
for _, childKey := range childKeys {
child := ns[childKey]
if visited[childKey] == 1 {
_ = action(fmt.Errorf("circular dependency detected. %s is child and parent of %s", childKey.String(), key.String()), child, ns)
} else if visited[childKey] == 0 {
if action(nil, child, ns) {
child.iterateChildrenV2(graph, ns, visited, action)
}
}
}
}
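
For intuition, the visited map holds three states per key: 0 (not visited), 1 (on the current traversal path), and 2 (done). Meeting a child that is still in state 1 means the walk has looped back onto its own path. A hedged sketch (hypothetical resources rsA and rsB that own each other; not part of the test suite):

	// graph: rsA -> rsB and rsB -> rsA, so the walk from rsA reports the cycle
	// via the error callback instead of recursing forever.
	rsA.iterateChildrenV2(graph, nsNodes, map[kube.ResourceKey]int{}, func(err error, child *Resource, _ map[kube.ResourceKey]*Resource) bool {
		if err != nil {
			fmt.Println(err) // e.g. "circular dependency detected. ... is child and parent of ..."
			return false
		}
		return true
	})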
