chore: Optimize usage of locking in the cluster [#602]
Closes #602
Use a read lock when getting handlers. Refactor resource updates in the cluster cache so that the write lock is acquired only for the update itself, not while calling handlers (a sketch of the pattern follows the change summary below).

Signed-off-by: Andrii Korotkov <[email protected]>
andrii-korotkov-verkada committed Jul 19, 2024
1 parent 6b2984e commit 637dca1
Showing 2 changed files with 92 additions and 12 deletions.
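
As a rough, self-contained sketch of the locking pattern adopted here for the handler registry (hypothetical handlerRegistry, subscribe, and snapshot names rather than the actual gitops-engine API): readers take RLock, so concurrent event-processing goroutines no longer serialize behind a plain Mutex just to fetch the handler list.

package main

import (
	"fmt"
	"sync"
)

// handlerRegistry is a hypothetical stand-in for the cluster cache's handler maps.
type handlerRegistry struct {
	mu       sync.RWMutex
	nextKey  uint64
	handlers map[uint64]func(msg string)
}

// subscribe mutates the map, so it needs the write lock.
func (r *handlerRegistry) subscribe(h func(msg string)) func() {
	r.mu.Lock()
	defer r.mu.Unlock()
	key := r.nextKey
	r.nextKey++
	r.handlers[key] = h
	return func() {
		r.mu.Lock()
		defer r.mu.Unlock()
		delete(r.handlers, key)
	}
}

// snapshot only reads the map, so a read lock suffices and many
// goroutines can copy the handler list concurrently.
func (r *handlerRegistry) snapshot() []func(msg string) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	out := make([]func(msg string), 0, len(r.handlers))
	for _, h := range r.handlers {
		out = append(out, h)
	}
	return out
}

func main() {
	r := &handlerRegistry{handlers: map[uint64]func(msg string){}}
	unsubscribe := r.subscribe(func(msg string) { fmt.Println("got:", msg) })
	defer unsubscribe()
	// Handlers run from a snapshot, so the registry lock is not held while they execute.
	for _, h := range r.snapshot() {
		h("resource updated")
	}
}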
58 changes: 47 additions & 11 deletions pkg/cache/cluster.go
@@ -219,7 +219,7 @@ type clusterCache struct {
 	clusterResources bool
 	settings         Settings
 
-	handlersLock                sync.Mutex
+	handlersLock                sync.RWMutex
 	handlerKey                  uint64
 	populateResourceInfoHandler OnPopulateResourceInfoHandler
 	resourceUpdatedHandlers     map[uint64]OnResourceUpdatedHandler
@@ -266,8 +266,8 @@ func (c *clusterCache) OnResourceUpdated(handler OnResourceUpdatedHandler) Unsub
 }
 
 func (c *clusterCache) getResourceUpdatedHandlers() []OnResourceUpdatedHandler {
-	c.handlersLock.Lock()
-	defer c.handlersLock.Unlock()
+	c.handlersLock.RLock()
+	defer c.handlersLock.RUnlock()
 	var handlers []OnResourceUpdatedHandler
 	for _, h := range c.resourceUpdatedHandlers {
 		handlers = append(handlers, h)
@@ -290,8 +290,8 @@ func (c *clusterCache) OnEvent(handler OnEventHandler) Unsubscribe {
 }
 
 func (c *clusterCache) getEventHandlers() []OnEventHandler {
-	c.handlersLock.Lock()
-	defer c.handlersLock.Unlock()
+	c.handlersLock.RLock()
+	defer c.handlersLock.RUnlock()
 	handlers := make([]OnEventHandler, 0, len(c.eventHandlers))
 	for _, h := range c.eventHandlers {
 		handlers = append(handlers, h)
@@ -1240,26 +1240,54 @@ func (c *clusterCache) processEvent(event watch.EventType, un *unstructured.Unst
 		return
 	}
 
+	ok, newRes, oldRes, ns := c.writeForResourceEvent(key, event, un)
+	if ok {
+		// Requesting a read lock, so that namespace resources aren't written to as they are being read from.
+		// Since each group/kind is processed by its own goroutine, resource shouldn't be updated between
+		// releasing write lock in `writeForResourceEvent` and acquiring this read lock, but namespace resources might be
+		// updated by other goroutines, resulting in a potentially fresher view of resources. However, potentially all
+		// of these variables can become stale if there's a cluster cache update. With respect to ArgoCD usage, either of
+		// these scenarios can result in triggering refresh for a wrong app in the worst case, which should be rare and
+		// doesn't hurt.
+		c.lock.RLock()
+		defer c.lock.RUnlock()
+		for _, h := range c.getResourceUpdatedHandlers() {
+			h(newRes, oldRes, ns)
+		}
+	}
+}
+
+// Encapsulates the logic of updating the resource in the cluster cache to limit the scope of locking.
+func (c *clusterCache) writeForResourceEvent(key kube.ResourceKey, event watch.EventType, un *unstructured.Unstructured) (bool, *Resource, *Resource, map[kube.ResourceKey]*Resource) {
 	c.lock.Lock()
 	defer c.lock.Unlock()
 	existingNode, exists := c.resources[key]
 	if event == watch.Deleted {
 		if exists {
-			c.onNodeRemoved(key)
+			ok, existing, ns := c.removeNode(key)
+			return ok, nil, existing, ns
+		} else {
+			return false, nil, nil, nil
 		}
-	} else if event != watch.Deleted {
-		c.onNodeUpdated(existingNode, c.newResource(un))
+	} else {
+		newRes, ns := c.updateNode(c.newResource(un))
+		return true, newRes, existingNode, ns
 	}
 }
 
-func (c *clusterCache) onNodeUpdated(oldRes *Resource, newRes *Resource) {
+func (c *clusterCache) updateNode(newRes *Resource) (*Resource, map[kube.ResourceKey]*Resource) {
 	c.setNode(newRes)
+	return newRes, c.nsIndex[newRes.Ref.Namespace]
+}
+
+func (c *clusterCache) onNodeUpdated(oldRes *Resource, newRes *Resource) {
+	_, ns := c.updateNode(newRes)
 	for _, h := range c.getResourceUpdatedHandlers() {
-		h(newRes, oldRes, c.nsIndex[newRes.Ref.Namespace])
+		h(newRes, oldRes, ns)
 	}
 }
 
-func (c *clusterCache) onNodeRemoved(key kube.ResourceKey) {
+func (c *clusterCache) removeNode(key kube.ResourceKey) (bool, *Resource, map[kube.ResourceKey]*Resource) {
 	existing, ok := c.resources[key]
 	if ok {
 		delete(c.resources, key)
@@ -1278,6 +1306,14 @@ func (c *clusterCache) onNodeRemoved(key kube.ResourceKey) {
 				}
 			}
 		}
+		return true, existing, ns
+	}
+	return false, nil, nil
+}
+
+func (c *clusterCache) onNodeRemoved(key kube.ResourceKey) {
+	ok, existing, ns := c.removeNode(key)
+	if ok {
 		for _, h := range c.getResourceUpdatedHandlers() {
 			h(nil, existing, ns)
 		}
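
The core of the processEvent change above, condensed into a self-contained sketch with hypothetical names (cache, applyEvent) rather than the real clusterCache types: the write lock covers only the map mutation, and handlers are then invoked under a read lock so slow handlers no longer block writers.

package main

import (
	"fmt"
	"sync"
)

type event struct {
	key     string
	deleted bool
	value   string
}

// cache is a hypothetical miniature of the cluster cache: a guarded map plus update handlers.
type cache struct {
	mu        sync.RWMutex
	resources map[string]string
	handlers  []func(key, newVal, oldVal string)
}

// applyEvent mirrors the shape of writeForResourceEvent: it holds the write lock only for
// the map update and returns what the handlers need, instead of calling them under the lock.
func (c *cache) applyEvent(ev event) (changed bool, newVal, oldVal string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	oldVal, exists := c.resources[ev.key]
	if ev.deleted {
		if !exists {
			return false, "", ""
		}
		delete(c.resources, ev.key)
		return true, "", oldVal
	}
	c.resources[ev.key] = ev.value
	return true, ev.value, oldVal
}

// processEvent updates the cache, then notifies handlers while holding only a read lock,
// so other writers are not blocked for the duration of the handler calls.
func (c *cache) processEvent(ev event) {
	changed, newVal, oldVal := c.applyEvent(ev)
	if !changed {
		return
	}
	c.mu.RLock()
	defer c.mu.RUnlock()
	for _, h := range c.handlers {
		h(ev.key, newVal, oldVal)
	}
}

func main() {
	c := &cache{resources: map[string]string{}}
	c.handlers = append(c.handlers, func(key, newVal, oldVal string) {
		fmt.Printf("%s: %q -> %q\n", key, oldVal, newVal)
	})
	c.processEvent(event{key: "pod/a", value: "v1"})
	c.processEvent(event{key: "pod/a", deleted: true})
}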
46 changes: 45 additions & 1 deletion pkg/cache/cluster_test.go
@@ -6,6 +6,7 @@ import (
 	"sort"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -810,7 +811,7 @@ func getResourceKey(t *testing.T, obj runtime.Object) kube.ResourceKey {
 	return kube.NewResourceKey(gvk.Group, gvk.Kind, m.GetNamespace(), m.GetName())
 }
 
-func testPod1() *corev1.Pod {
+func testPod1WithAnnotations(annotations map[string]string) *corev1.Pod {
 	return &corev1.Pod{
 		TypeMeta: metav1.TypeMeta{
 			APIVersion: "v1",
@@ -830,10 +831,15 @@ func testPod1() *corev1.Pod {
 					UID:        "2",
 				},
 			},
+			Annotations: annotations,
 		},
 	}
 }
 
+func testPod1() *corev1.Pod {
+	return testPod1WithAnnotations(nil)
+}
+
 // Similar to pod1, but owner reference lacks uid
 func testPod2() *corev1.Pod {
 	return &corev1.Pod{
@@ -1327,3 +1333,41 @@ func BenchmarkIterateHierarchyV2(b *testing.B) {
 //	})
 //	}
 //}
+
+func BenchmarkProcessEvents(b *testing.B) {
+	cluster := newCluster(b, testPod1(), testPod2(), testRS(), testExtensionsRS(), testDeploy())
+	err := cluster.EnsureSynced()
+	require.NoError(b, err)
+	counter := atomic.Int64{}
+	iterCount := 1000000
+	updatesCount := 10000
+	goroutineCount := 100
+	_ = cluster.OnResourceUpdated(func(newRes *Resource, oldRes *Resource, namespaceResources map[kube.ResourceKey]*Resource) {
+		localCounter := int64(0)
+		for i := 0; i < iterCount; i++ {
+			localCounter += 1
+		}
+		counter.Add(localCounter)
+	})
+	var wg sync.WaitGroup
+	b.ResetTimer()
+	for n := 0; n < b.N; n++ {
+		counter.Store(0)
+		for i := 0; i < goroutineCount; i++ {
+			wg.Add(1)
+			go func(i int) {
+				defer wg.Done()
+				limit := updatesCount / goroutineCount
+				if i < updatesCount%goroutineCount {
+					limit++
+				}
+				for j := 0; j < limit; j++ {
+					un := mustToUnstructured(testPod1WithAnnotations(map[string]string{"test": fmt.Sprintf("%d", goroutineCount*j+i)}))
+					cluster.processEvent(watch.Modified, un)
+				}
+			}(i)
+		}
+		wg.Wait()
+		require.Equal(b, int64(updatesCount)*int64(iterCount), counter.Load())
+	}
+}
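
To exercise the new benchmark locally, a standard go test invocation along these lines should work (package path inferred from the changed file above):

go test ./pkg/cache/ -run '^$' -bench BenchmarkProcessEvents -benchmem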
