fix: deadlock on start missing watches (#604) (#610)
* fix: deadlock on start missing watches



* revert error



* add unit test to validate some deadlock scenarios



* test name



* clarify comment



---------

Signed-off-by: Alexandre Gaudreault <[email protected]>
agaudreault authored Jul 15, 2024
1 parent 4d911d2 commit 18ba62e
Showing 3 changed files with 143 additions and 49 deletions.
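Before the file-by-file diff, a note on the failure mode being fixed. Based on the changes below, the deadlock appears to come from acquiring the cluster lock and the list semaphore in opposite orders: one goroutine holds the cluster lock (for example while restarting missing watches) and waits for the list semaphore, while a list callback holds the semaphore and waits for the same lock. The following standalone sketch is illustrative only — clusterLock and listSem are stand-ins, not the gitops-engine API — and reproduces that ordering hazard:

package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"golang.org/x/sync/semaphore"
)

func main() {
	var clusterLock sync.Mutex          // stand-in for the cluster cache lock
	listSem := semaphore.NewWeighted(1) // stand-in for the list semaphore

	done := make(chan struct{})
	go func() {
		defer close(done)
		var wg sync.WaitGroup
		wg.Add(2)

		// Goroutine A: takes the cluster lock first, then needs the semaphore to list.
		go func() {
			defer wg.Done()
			clusterLock.Lock()
			defer clusterLock.Unlock()
			time.Sleep(200 * time.Millisecond)           // ensure B already holds the semaphore
			_ = listSem.Acquire(context.Background(), 1) // blocks forever: B holds it
			listSem.Release(1)
		}()

		// Goroutine B: takes the semaphore first, then waits for the cluster lock
		// from inside its "list callback" to publish results.
		go func() {
			defer wg.Done()
			_ = listSem.Acquire(context.Background(), 1)
			defer listSem.Release(1)
			time.Sleep(200 * time.Millisecond) // ensure A already holds the lock
			clusterLock.Lock()                 // blocks forever: A holds it
			clusterLock.Unlock()
		}()

		wg.Wait()
	}()

	select {
	case <-done:
		fmt.Println("finished without deadlock")
	case <-time.After(2 * time.Second):
		fmt.Println("deadlocked: lock and semaphore were acquired in opposite orders")
	}
}

Running this prints the deadlock message once the two-second guard expires. The changes below break that cycle by never waiting on the cluster lock while the list semaphore is held.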
32 changes: 18 additions & 14 deletions pkg/cache/cluster.go
@@ -507,6 +507,7 @@ func (c *clusterCache) startMissingWatches() error {
delete(namespacedResources, api.GroupKind)
return nil
}

}
go c.watchEvents(ctx, api, resClient, ns, resourceVersion)
return nil
@@ -527,11 +528,13 @@ func runSynced(lock sync.Locker, action func() error) error {
}

// listResources creates list pager and enforces number of concurrent list requests
// The callback should not wait on any locks that may be held by other callers.
func (c *clusterCache) listResources(ctx context.Context, resClient dynamic.ResourceInterface, callback func(*pager.ListPager) error) (string, error) {
if err := c.listSemaphore.Acquire(ctx, 1); err != nil {
return "", err
}
defer c.listSemaphore.Release(1)

var retryCount int64 = 0
resourceVersion := ""
listPager := pager.New(func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) {
@@ -568,30 +571,31 @@ func (c *clusterCache) listResources(ctx context.Context, resClient dynamic.Reso
}

func (c *clusterCache) loadInitialState(ctx context.Context, api kube.APIResourceInfo, resClient dynamic.ResourceInterface, ns string, lock bool) (string, error) {
return c.listResources(ctx, resClient, func(listPager *pager.ListPager) error {
var items []*Resource
err := listPager.EachListItem(ctx, metav1.ListOptions{}, func(obj runtime.Object) error {
var items []*Resource
resourceVersion, err := c.listResources(ctx, resClient, func(listPager *pager.ListPager) error {
return listPager.EachListItem(ctx, metav1.ListOptions{}, func(obj runtime.Object) error {
if un, ok := obj.(*unstructured.Unstructured); !ok {
return fmt.Errorf("object %s/%s has an unexpected type", un.GroupVersionKind().String(), un.GetName())
} else {
items = append(items, c.newResource(un))
}
return nil
})
})

if err != nil {
return fmt.Errorf("failed to load initial state of resource %s: %w", api.GroupKind.String(), err)
}
if lock {
return runSynced(&c.lock, func() error {
c.replaceResourceCache(api.GroupKind, items, ns)
return nil
})
} else {
if err != nil {
return "", fmt.Errorf("failed to load initial state of resource %s: %w", api.GroupKind.String(), err)
}

if lock {
return resourceVersion, runSynced(&c.lock, func() error {
c.replaceResourceCache(api.GroupKind, items, ns)
return nil
}
})
})
} else {
c.replaceResourceCache(api.GroupKind, items, ns)
return resourceVersion, nil
}
}

func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo, resClient dynamic.ResourceInterface, ns string, resourceVersion string) {
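The reshaped loadInitialState above collects items inside the list callback and only takes the cluster lock after listResources has returned, i.e. after the list semaphore has been released. A minimal sketch of that two-phase ordering (simplified signatures, not the actual clusterCache API) might look like this:

package cache // hypothetical package name, for illustration only

import (
	"context"
	"fmt"
	"sync"

	"golang.org/x/sync/semaphore"
)

// loadAndPublish sketches the post-fix ordering: list while holding only the
// semaphore, release it, and only then publish the results under the shared lock.
func loadAndPublish(
	ctx context.Context,
	sem *semaphore.Weighted,
	mu sync.Locker,
	list func(context.Context) ([]string, error),
	publish func([]string),
) error {
	// Phase 1: list while holding only the list semaphore.
	if err := sem.Acquire(ctx, 1); err != nil {
		return err
	}
	items, err := list(ctx)
	sem.Release(1) // release before any attempt to take the shared lock
	if err != nil {
		return fmt.Errorf("failed to load initial state: %w", err)
	}

	// Phase 2: publish the collected items under the lock, with the semaphore free.
	mu.Lock()
	defer mu.Unlock()
	publish(items)
	return nil
}

A callback that waits on a lock held by other listers would reintroduce the cycle, which is what the new comment on listResources warns against.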
138 changes: 114 additions & 24 deletions pkg/cache/cluster_test.go
@@ -3,12 +3,15 @@ package cache
import (
"context"
"fmt"
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions"
"sort"
"strings"
"sync"
"testing"
"time"

"golang.org/x/sync/semaphore"
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
appsv1 "k8s.io/api/apps/v1"
@@ -71,6 +74,16 @@ var (
)

func newCluster(t *testing.T, objs ...runtime.Object) *clusterCache {
cache := newClusterWithOptions(t, []UpdateSettingsFunc{}, objs...)

t.Cleanup(func() {
cache.Invalidate()
})

return cache
}

func newClusterWithOptions(t *testing.T, opts []UpdateSettingsFunc, objs ...runtime.Object) *clusterCache {
client := fake.NewSimpleDynamicClient(scheme.Scheme, objs...)
reactor := client.ReactionChain[0]
client.PrependReactor("list", "*", func(action testcore.Action) (handled bool, ret runtime.Object, err error) {
@@ -101,11 +114,14 @@ func newCluster(t *testing.T, objs ...runtime.Object) *clusterCache {
Meta: metav1.APIResource{Namespaced: true},
}}

opts = append([]UpdateSettingsFunc{
SetKubectl(&kubetest.MockKubectlCmd{APIResources: apiResources, DynamicClient: client}),
}, opts...)

cache := NewClusterCache(
&rest.Config{Host: "https://test"}, SetKubectl(&kubetest.MockKubectlCmd{APIResources: apiResources, DynamicClient: client}))
t.Cleanup(func() {
cache.Invalidate()
})
&rest.Config{Host: "https://test"},
opts...,
)
return cache
}

@@ -492,23 +508,23 @@ metadata:
func TestGetManagedLiveObjsFailedConversion(t *testing.T) {
cronTabGroup := "stable.example.com"

testCases := []struct{
name string
localConvertFails bool
testCases := []struct {
name string
localConvertFails bool
expectConvertToVersionCalled bool
expectGetResourceCalled bool
expectGetResourceCalled bool
}{
{
name: "local convert fails, so GetResource is called",
localConvertFails: true,
name: "local convert fails, so GetResource is called",
localConvertFails: true,
expectConvertToVersionCalled: true,
expectGetResourceCalled: true,
expectGetResourceCalled: true,
},
{
name: "local convert succeeds, so GetResource is not called",
localConvertFails: false,
name: "local convert succeeds, so GetResource is not called",
localConvertFails: false,
expectConvertToVersionCalled: true,
expectGetResourceCalled: false,
expectGetResourceCalled: false,
},
}

@@ -557,7 +573,6 @@ metadata:
return testCronTab(), nil
})


managedObjs, err := cluster.GetManagedLiveObjs([]*unstructured.Unstructured{targetDeploy}, func(r *Resource) bool {
return true
})
@@ -816,25 +831,25 @@ func testPod() *corev1.Pod {

func testCRD() *apiextensions.CustomResourceDefinition {
return &apiextensions.CustomResourceDefinition{
TypeMeta: metav1.TypeMeta{
TypeMeta: metav1.TypeMeta{
APIVersion: "apiextensions.k8s.io/v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "crontabs.stable.example.com",
},
Spec: apiextensions.CustomResourceDefinitionSpec{
Spec: apiextensions.CustomResourceDefinitionSpec{
Group: "stable.example.com",
Versions: []apiextensions.CustomResourceDefinitionVersion{
{
Name: "v1",
Served: true,
Name: "v1",
Served: true,
Storage: true,
Schema: &apiextensions.CustomResourceValidation{
OpenAPIV3Schema: &apiextensions.JSONSchemaProps{
Type: "object",
Properties: map[string]apiextensions.JSONSchemaProps{
"cronSpec": {Type: "string"},
"image": {Type: "string"},
"image": {Type: "string"},
"replicas": {Type: "integer"},
},
},
@@ -855,14 +870,14 @@ func testCRD() *apiextensions.CustomResourceDefinition {
func testCronTab() *unstructured.Unstructured {
return &unstructured.Unstructured{Object: map[string]interface{}{
"apiVersion": "stable.example.com/v1",
"kind": "CronTab",
"kind": "CronTab",
"metadata": map[string]interface{}{
"name": "test-crontab",
"name": "test-crontab",
"namespace": "default",
},
"spec": map[string]interface{}{
"cronSpec": "* * * * */5",
"image": "my-awesome-cron-image",
"image": "my-awesome-cron-image",
},
}}
}
@@ -1006,3 +1021,78 @@ func TestIterateHierachy(t *testing.T) {
keys)
})
}

// Test_watchEvents_Deadlock validates that starting watches will not create a deadlock
// caused by using improper locking in various callback methods when there is a high load on the
// system.
func Test_watchEvents_Deadlock(t *testing.T) {
// deadlock lock is used to simulate a user function calling the cluster cache while holding a lock
// and using this lock in callbacks such as OnPopulateResourceInfoHandler.
deadlock := sync.RWMutex{}

hasDeadlock := false
res1 := testPod()
res2 := testRS()

cluster := newClusterWithOptions(t, []UpdateSettingsFunc{
// Set low blocking semaphore
SetListSemaphore(semaphore.NewWeighted(1)),
// Resync watches often to use the semaphore and trigger the rate limiting behavior
SetResyncTimeout(500 * time.Millisecond),
// Use new resource handler to run code in the list callbacks
SetPopulateResourceInfoHandler(func(un *unstructured.Unstructured, isRoot bool) (info interface{}, cacheManifest bool) {
if un.GroupVersionKind().GroupKind() == res1.GroupVersionKind().GroupKind() ||
un.GroupVersionKind().GroupKind() == res2.GroupVersionKind().GroupKind() {
// Create a bottleneck for resources holding the semaphore
time.Sleep(2 * time.Second)
}

//// Uncommenting the following code will deliberately simulate a different deadlock, caused by
//// client code holding a lock and trying to acquire the same lock in the event callback.
//// It provides an easy way to validate that the test detects deadlocks as expected.
//// If the test fails with this code commented out, a deadlock does exist in the codebase.
// deadlock.RLock()
// defer deadlock.RUnlock()

return
}),
}, res1, res2, testDeploy())
defer func() {
// Invalidate() is a blocking method and cannot be called safely in case of deadlock
if !hasDeadlock {
cluster.Invalidate()
}
}()

err := cluster.EnsureSynced()
require.NoError(t, err)

for i := 0; i < 2; i++ {
done := make(chan bool, 1)
go func() {
// Stop the watches, so startMissingWatches will restart them
cluster.stopWatching(res1.GroupVersionKind().GroupKind(), res1.Namespace)
cluster.stopWatching(res2.GroupVersionKind().GroupKind(), res2.Namespace)

// calling startMissingWatches to simulate that a CRD event was received
// TODO: how to simulate real watch events and test the full watchEvents function?
err = runSynced(&cluster.lock, func() error {
deadlock.Lock()
defer deadlock.Unlock()
return cluster.startMissingWatches()
})
require.NoError(t, err)
done <- true
}()
select {
case v := <-done:
require.True(t, v)
case <-time.After(10 * time.Second):
hasDeadlock = true
t.Errorf("timeout reached on attempt %d. It is possible that a deadlock occurred", i)
// Tip: to debug the deadlock, increase the timer to a value higher than X in "go test -timeout X"
// This will make the test panic with the goroutines information
t.FailNow()
}
}
}
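The new test bounds each stop/restart cycle with a 10-second select rather than relying on Go's runtime deadlock detector, which only fires when every goroutine in the process is blocked. A small, hypothetical helper in the same spirit — not part of this change — could factor that guard out:

package cache // hypothetical; shown for illustration, not part of this change

import (
	"testing"
	"time"
)

// runWithDeadlockGuard runs fn in a goroutine and fails the test if it has not
// returned within timeout, mirroring the select/time.After guard in the test above.
func runWithDeadlockGuard(t *testing.T, timeout time.Duration, fn func()) {
	t.Helper()
	done := make(chan struct{})
	go func() {
		defer close(done)
		fn()
	}()
	select {
	case <-done:
	case <-time.After(timeout):
		t.Fatalf("possible deadlock: operation did not complete within %s", timeout)
	}
}

As in the test above, a timed-out goroutine is simply leaked; if something is truly stuck, the process still ends only via go test's own -timeout.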
22 changes: 11 additions & 11 deletions pkg/cache/resource_test.go
@@ -7,12 +7,12 @@ import (
"k8s.io/client-go/rest"
)

var c = NewClusterCache(&rest.Config{})
var cacheTest = NewClusterCache(&rest.Config{})

func TestIsParentOf(t *testing.T) {
child := c.newResource(mustToUnstructured(testPod()))
parent := c.newResource(mustToUnstructured(testRS()))
grandParent := c.newResource(mustToUnstructured(testDeploy()))
child := cacheTest.newResource(mustToUnstructured(testPod()))
parent := cacheTest.newResource(mustToUnstructured(testRS()))
grandParent := cacheTest.newResource(mustToUnstructured(testDeploy()))

assert.True(t, parent.isParentOf(child))
assert.False(t, grandParent.isParentOf(child))
@@ -22,38 +22,38 @@ func TestIsParentOfSameKindDifferentGroupAndUID(t *testing.T) {
rs := testRS()
rs.APIVersion = "somecrd.io/v1"
rs.SetUID("123")
child := c.newResource(mustToUnstructured(testPod()))
invalidParent := c.newResource(mustToUnstructured(rs))
child := cacheTest.newResource(mustToUnstructured(testPod()))
invalidParent := cacheTest.newResource(mustToUnstructured(rs))

assert.False(t, invalidParent.isParentOf(child))
}

func TestIsServiceParentOfEndPointWithTheSameName(t *testing.T) {
nonMatchingNameEndPoint := c.newResource(strToUnstructured(`
nonMatchingNameEndPoint := cacheTest.newResource(strToUnstructured(`
apiVersion: v1
kind: Endpoints
metadata:
name: not-matching-name
namespace: default
`))

matchingNameEndPoint := c.newResource(strToUnstructured(`
matchingNameEndPoint := cacheTest.newResource(strToUnstructured(`
apiVersion: v1
kind: Endpoints
metadata:
name: helm-guestbook
namespace: default
`))

parent := c.newResource(testService)
parent := cacheTest.newResource(testService)

assert.True(t, parent.isParentOf(matchingNameEndPoint))
assert.Equal(t, parent.Ref.UID, matchingNameEndPoint.OwnerRefs[0].UID)
assert.False(t, parent.isParentOf(nonMatchingNameEndPoint))
}

func TestIsServiceAccountParentOfSecret(t *testing.T) {
serviceAccount := c.newResource(strToUnstructured(`
serviceAccount := cacheTest.newResource(strToUnstructured(`
apiVersion: v1
kind: ServiceAccount
metadata:
Expand All @@ -63,7 +63,7 @@ metadata:
secrets:
- name: default-token-123
`))
tokenSecret := c.newResource(strToUnstructured(`
tokenSecret := cacheTest.newResource(strToUnstructured(`
apiVersion: v1
kind: Secret
metadata:
