From 18ba62e1f1fbc615e9ebe23392b064427c7795f5 Mon Sep 17 00:00:00 2001
From: Alexandre Gaudreault
Date: Mon, 15 Jul 2024 10:16:05 -0400
Subject: [PATCH] fix: deadlock on start missing watches (#604) (#610)

* fix: deadlock on start missing watches

* revert error

* add unit test to validate some deadlock scenarios

* test name

* clarify comment

---------

Signed-off-by: Alexandre Gaudreault
---
 pkg/cache/cluster.go       |  32 +++++----
 pkg/cache/cluster_test.go  | 138 ++++++++++++++++++++++++++++++-------
 pkg/cache/resource_test.go |  22 +++---
 3 files changed, 143 insertions(+), 49 deletions(-)

diff --git a/pkg/cache/cluster.go b/pkg/cache/cluster.go
index f1b4672df..2708d22a4 100644
--- a/pkg/cache/cluster.go
+++ b/pkg/cache/cluster.go
@@ -507,6 +507,7 @@ func (c *clusterCache) startMissingWatches() error {
 				delete(namespacedResources, api.GroupKind)
 				return nil
 			}
+
 		}
 		go c.watchEvents(ctx, api, resClient, ns, resourceVersion)
 		return nil
@@ -527,11 +528,13 @@ func runSynced(lock sync.Locker, action func() error) error {
 }
 
 // listResources creates list pager and enforces number of concurrent list requests
+// The callback should not wait on any locks that may be held by other callers.
 func (c *clusterCache) listResources(ctx context.Context, resClient dynamic.ResourceInterface, callback func(*pager.ListPager) error) (string, error) {
 	if err := c.listSemaphore.Acquire(ctx, 1); err != nil {
 		return "", err
 	}
 	defer c.listSemaphore.Release(1)
+
 	var retryCount int64 = 0
 	resourceVersion := ""
 	listPager := pager.New(func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) {
@@ -568,9 +571,9 @@ func (c *clusterCache) listResources(ctx context.Context, resClient dynamic.Reso
 }
 
 func (c *clusterCache) loadInitialState(ctx context.Context, api kube.APIResourceInfo, resClient dynamic.ResourceInterface, ns string, lock bool) (string, error) {
-	return c.listResources(ctx, resClient, func(listPager *pager.ListPager) error {
-		var items []*Resource
-		err := listPager.EachListItem(ctx, metav1.ListOptions{}, func(obj runtime.Object) error {
+	var items []*Resource
+	resourceVersion, err := c.listResources(ctx, resClient, func(listPager *pager.ListPager) error {
+		return listPager.EachListItem(ctx, metav1.ListOptions{}, func(obj runtime.Object) error {
 			if un, ok := obj.(*unstructured.Unstructured); !ok {
 				return fmt.Errorf("object %s/%s has an unexpected type", un.GroupVersionKind().String(), un.GetName())
 			} else {
@@ -578,20 +581,21 @@ func (c *clusterCache) loadInitialState(ctx context.Context, api kube.APIResourc
 			}
 			return nil
 		})
+	})
 
-		if err != nil {
-			return fmt.Errorf("failed to load initial state of resource %s: %w", api.GroupKind.String(), err)
-		}
-		if lock {
-			return runSynced(&c.lock, func() error {
-				c.replaceResourceCache(api.GroupKind, items, ns)
-				return nil
-			})
-		} else {
+	if err != nil {
+		return "", fmt.Errorf("failed to load initial state of resource %s: %w", api.GroupKind.String(), err)
+	}
+
+	if lock {
+		return resourceVersion, runSynced(&c.lock, func() error {
 			c.replaceResourceCache(api.GroupKind, items, ns)
 			return nil
-		}
-	})
+		})
+	} else {
+		c.replaceResourceCache(api.GroupKind, items, ns)
+		return resourceVersion, nil
+	}
 }
 
 func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo, resClient dynamic.ResourceInterface, ns string, resourceVersion string) {
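The deadlock fixed above is a lock-ordering inversion. Before this patch, loadInitialState called replaceResourceCache under c.lock from inside the listResources callback, i.e. while still holding the list semaphore (semaphore, then lock). Meanwhile, startMissingWatches runs while c.lock is already held and calls loadInitialState, which acquires the same semaphore (lock, then semaphore). With both paths active, each side holds the resource the other is waiting for. The standalone sketch below (illustrative names only, not gitops-engine code) reproduces that inverted ordering; under unlucky scheduling it never terminates:

```go
package main

import (
	"context"
	"sync"

	"golang.org/x/sync/semaphore"
)

func main() {
	var mu sync.Mutex               // plays the role of clusterCache.lock
	sem := semaphore.NewWeighted(1) // plays the role of listSemaphore

	var wg sync.WaitGroup
	wg.Add(2)

	// Path A, like the old loadInitialState: semaphore first, then lock.
	go func() {
		defer wg.Done()
		_ = sem.Acquire(context.Background(), 1)
		defer sem.Release(1)
		mu.Lock() // blocks forever if B holds mu and is waiting on sem
		mu.Unlock()
	}()

	// Path B, like startMissingWatches: lock first, then semaphore.
	go func() {
		defer wg.Done()
		mu.Lock()
		defer mu.Unlock()
		_ = sem.Acquire(context.Background(), 1) // blocks forever if A holds sem and is waiting on mu
		sem.Release(1)
	}()

	wg.Wait()
}
```

The patch breaks the cycle by collecting items inside the callback and calling replaceResourceCache only after listResources returns, so the semaphore is always released before c.lock is requested.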
( "context" "fmt" - "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions" "sort" "strings" + "sync" "testing" "time" + "golang.org/x/sync/semaphore" + "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" appsv1 "k8s.io/api/apps/v1" @@ -71,6 +74,16 @@ var ( ) func newCluster(t *testing.T, objs ...runtime.Object) *clusterCache { + cache := newClusterWithOptions(t, []UpdateSettingsFunc{}, objs...) + + t.Cleanup(func() { + cache.Invalidate() + }) + + return cache +} + +func newClusterWithOptions(t *testing.T, opts []UpdateSettingsFunc, objs ...runtime.Object) *clusterCache { client := fake.NewSimpleDynamicClient(scheme.Scheme, objs...) reactor := client.ReactionChain[0] client.PrependReactor("list", "*", func(action testcore.Action) (handled bool, ret runtime.Object, err error) { @@ -101,11 +114,14 @@ func newCluster(t *testing.T, objs ...runtime.Object) *clusterCache { Meta: metav1.APIResource{Namespaced: true}, }} + opts = append([]UpdateSettingsFunc{ + SetKubectl(&kubetest.MockKubectlCmd{APIResources: apiResources, DynamicClient: client}), + }, opts...) + cache := NewClusterCache( - &rest.Config{Host: "https://test"}, SetKubectl(&kubetest.MockKubectlCmd{APIResources: apiResources, DynamicClient: client})) - t.Cleanup(func() { - cache.Invalidate() - }) + &rest.Config{Host: "https://test"}, + opts..., + ) return cache } @@ -492,23 +508,23 @@ metadata: func TestGetManagedLiveObjsFailedConversion(t *testing.T) { cronTabGroup := "stable.example.com" - testCases := []struct{ - name string - localConvertFails bool + testCases := []struct { + name string + localConvertFails bool expectConvertToVersionCalled bool - expectGetResourceCalled bool + expectGetResourceCalled bool }{ { - name: "local convert fails, so GetResource is called", - localConvertFails: true, + name: "local convert fails, so GetResource is called", + localConvertFails: true, expectConvertToVersionCalled: true, - expectGetResourceCalled: true, + expectGetResourceCalled: true, }, { - name: "local convert succeeds, so GetResource is not called", - localConvertFails: false, + name: "local convert succeeds, so GetResource is not called", + localConvertFails: false, expectConvertToVersionCalled: true, - expectGetResourceCalled: false, + expectGetResourceCalled: false, }, } @@ -557,7 +573,6 @@ metadata: return testCronTab(), nil }) - managedObjs, err := cluster.GetManagedLiveObjs([]*unstructured.Unstructured{targetDeploy}, func(r *Resource) bool { return true }) @@ -816,25 +831,25 @@ func testPod() *corev1.Pod { func testCRD() *apiextensions.CustomResourceDefinition { return &apiextensions.CustomResourceDefinition{ - TypeMeta: metav1.TypeMeta{ + TypeMeta: metav1.TypeMeta{ APIVersion: "apiextensions.k8s.io/v1", }, ObjectMeta: metav1.ObjectMeta{ Name: "crontabs.stable.example.com", }, - Spec: apiextensions.CustomResourceDefinitionSpec{ + Spec: apiextensions.CustomResourceDefinitionSpec{ Group: "stable.example.com", Versions: []apiextensions.CustomResourceDefinitionVersion{ { - Name: "v1", - Served: true, + Name: "v1", + Served: true, Storage: true, Schema: &apiextensions.CustomResourceValidation{ OpenAPIV3Schema: &apiextensions.JSONSchemaProps{ Type: "object", Properties: map[string]apiextensions.JSONSchemaProps{ "cronSpec": {Type: "string"}, - "image": {Type: "string"}, + "image": {Type: "string"}, "replicas": {Type: "integer"}, }, }, @@ -855,14 +870,14 @@ func testCRD() *apiextensions.CustomResourceDefinition { func testCronTab() *unstructured.Unstructured 
@@ -1006,3 +1021,78 @@ func TestIterateHierachy(t *testing.T) {
 			keys)
 	})
 }
+
+// Test_watchEvents_Deadlock validates that starting watches will not create a deadlock
+// caused by using improper locking in various callback methods when there is a high load on the
+// system.
+func Test_watchEvents_Deadlock(t *testing.T) {
+	// The deadlock mutex simulates a user function that calls the cluster cache while holding a lock
+	// and that takes the same lock in callbacks such as OnPopulateResourceInfoHandler.
+	deadlock := sync.RWMutex{}
+
+	hasDeadlock := false
+	res1 := testPod()
+	res2 := testRS()
+
+	cluster := newClusterWithOptions(t, []UpdateSettingsFunc{
+		// Set a low semaphore weight so that list requests block each other
+		SetListSemaphore(semaphore.NewWeighted(1)),
+		// Resync watches often to use the semaphore and trigger the rate limiting behavior
+		SetResyncTimeout(500 * time.Millisecond),
+		// Use the resource info handler to run code in the list callbacks
+		SetPopulateResourceInfoHandler(func(un *unstructured.Unstructured, isRoot bool) (info interface{}, cacheManifest bool) {
+			if un.GroupVersionKind().GroupKind() == res1.GroupVersionKind().GroupKind() ||
+				un.GroupVersionKind().GroupKind() == res2.GroupVersionKind().GroupKind() {
+				// Create a bottleneck for resources holding the semaphore
+				time.Sleep(2 * time.Second)
+			}
+
+			//// Uncommenting the following code will simulate a different deadlock on purpose, caused by
+			//// client code holding a lock and trying to acquire the same lock in the event callback.
+			//// It provides an easy way to validate that the test detects deadlocks as expected.
+			//// If the test fails with this code commented out, a deadlock does exist in the codebase.
+			// deadlock.RLock()
+			// defer deadlock.RUnlock()
+
+			return
+		}),
+	}, res1, res2, testDeploy())
+	defer func() {
+		// Invalidate() is a blocking method and cannot be called safely in case of deadlock
+		if !hasDeadlock {
+			cluster.Invalidate()
+		}
+	}()
+
+	err := cluster.EnsureSynced()
+	require.NoError(t, err)
+
+	for i := 0; i < 2; i++ {
+		done := make(chan bool, 1)
+		go func() {
+			// Stop the watches, so startMissingWatches will restart them
+			cluster.stopWatching(res1.GroupVersionKind().GroupKind(), res1.Namespace)
+			cluster.stopWatching(res2.GroupVersionKind().GroupKind(), res2.Namespace)
+
+			// Call startMissingWatches to simulate that a CRD event was received
+			// TODO: how to simulate real watch events and test the full watchEvents function?
+			err = runSynced(&cluster.lock, func() error {
+				deadlock.Lock()
+				defer deadlock.Unlock()
+				return cluster.startMissingWatches()
+			})
+			require.NoError(t, err)
+			done <- true
+		}()
+		select {
+		case v := <-done:
+			require.True(t, v)
+		case <-time.After(10 * time.Second):
+			hasDeadlock = true
+			t.Errorf("timeout reached on attempt %d. It is possible that a deadlock occurred", i)
+			// Tip: to debug the deadlock, increase the timer to a value higher than X in "go test -timeout X".
+			// This will make the test panic and print the goroutine stacks.
+			t.FailNow()
+		}
+	}
+}
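The test above detects a hang rather than proving its absence: the suspect operation runs in a goroutine and races a 10-second timer via select. The same guard can be written as a small standalone helper (hypothetical, not part of the gitops-engine API):

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// runWithTimeout runs op in a goroutine and reports an error if it does not
// return within d; in the test above such a timeout is treated as a probable deadlock.
func runWithTimeout(op func(), d time.Duration) error {
	done := make(chan struct{}, 1)
	go func() {
		op()
		done <- struct{}{}
	}()
	select {
	case <-done:
		return nil
	case <-time.After(d):
		// The goroutine running op is leaked on timeout; a test process
		// tolerates this because it exits shortly afterwards.
		return errors.New("timeout reached: possible deadlock")
	}
}

func main() {
	fmt.Println(runWithTimeout(func() { time.Sleep(10 * time.Millisecond) }, time.Second)) // <nil>
	fmt.Println(runWithTimeout(func() { select {} }, 100*time.Millisecond))                // timeout
}
```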
diff --git a/pkg/cache/resource_test.go b/pkg/cache/resource_test.go
index 0562cc65c..45e597341 100644
--- a/pkg/cache/resource_test.go
+++ b/pkg/cache/resource_test.go
@@ -7,12 +7,12 @@ import (
 	"k8s.io/client-go/rest"
 )
 
-var c = NewClusterCache(&rest.Config{})
+var cacheTest = NewClusterCache(&rest.Config{})
 
 func TestIsParentOf(t *testing.T) {
-	child := c.newResource(mustToUnstructured(testPod()))
-	parent := c.newResource(mustToUnstructured(testRS()))
-	grandParent := c.newResource(mustToUnstructured(testDeploy()))
+	child := cacheTest.newResource(mustToUnstructured(testPod()))
+	parent := cacheTest.newResource(mustToUnstructured(testRS()))
+	grandParent := cacheTest.newResource(mustToUnstructured(testDeploy()))
 
 	assert.True(t, parent.isParentOf(child))
 	assert.False(t, grandParent.isParentOf(child))
@@ -22,14 +22,14 @@ func TestIsParentOfSameKindDifferentGroupAndUID(t *testing.T) {
 	rs := testRS()
 	rs.APIVersion = "somecrd.io/v1"
 	rs.SetUID("123")
-	child := c.newResource(mustToUnstructured(testPod()))
-	invalidParent := c.newResource(mustToUnstructured(rs))
+	child := cacheTest.newResource(mustToUnstructured(testPod()))
+	invalidParent := cacheTest.newResource(mustToUnstructured(rs))
 
 	assert.False(t, invalidParent.isParentOf(child))
 }
 
 func TestIsServiceParentOfEndPointWithTheSameName(t *testing.T) {
-	nonMatchingNameEndPoint := c.newResource(strToUnstructured(`
+	nonMatchingNameEndPoint := cacheTest.newResource(strToUnstructured(`
 apiVersion: v1
 kind: Endpoints
 metadata:
@@ -37,7 +37,7 @@ metadata:
   namespace: default
 `))
 
-	matchingNameEndPoint := c.newResource(strToUnstructured(`
+	matchingNameEndPoint := cacheTest.newResource(strToUnstructured(`
 apiVersion: v1
 kind: Endpoints
 metadata:
@@ -45,7 +45,7 @@ metadata:
   namespace: default
 `))
 
-	parent := c.newResource(testService)
+	parent := cacheTest.newResource(testService)
 
 	assert.True(t, parent.isParentOf(matchingNameEndPoint))
 	assert.Equal(t, parent.Ref.UID, matchingNameEndPoint.OwnerRefs[0].UID)
@@ -53,7 +53,7 @@ metadata:
 }
 
 func TestIsServiceAccountParentOfSecret(t *testing.T) {
-	serviceAccount := c.newResource(strToUnstructured(`
+	serviceAccount := cacheTest.newResource(strToUnstructured(`
 apiVersion: v1
 kind: ServiceAccount
 metadata:
@@ -63,7 +63,7 @@ metadata:
 secrets:
 - name: default-token-123
 `))
-	tokenSecret := c.newResource(strToUnstructured(`
+	tokenSecret := cacheTest.newResource(strToUnstructured(`
 apiVersion: v1
 kind: Secret
 metadata:
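The rename of the package-level `var c` to `cacheTest` most likely avoids clashing with `c`, the receiver name used throughout the clusterCache methods in the same package; inside those methods the receiver shadows the package-level variable. A toy illustration of that shadowing (hypothetical names, not the gitops-engine code):

```go
package main

import "fmt"

type cache struct{ id string }

// Package-level variable with the same name as the method receiver below.
var c = &cache{id: "package-level"}

func (c *cache) describe() {
	// Inside the method, c is the receiver and shadows the package-level c.
	fmt.Println("receiver:", c.id)
}

func main() {
	(&cache{id: "local"}).describe() // receiver: local
	fmt.Println("global:", c.id)     // global: package-level
}
```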