Skip to content

Commit

Permalink
Merge pull request #7 from authzed/queue-done
Browse files Browse the repository at this point in the history
remove current key from queue after sync
  • Loading branch information
ecordell committed Aug 31, 2022
2 parents 9aca32d + 854c8ff commit 23f5716
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 2 deletions.
3 changes: 1 addition & 2 deletions manager/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,7 @@ func (c *OwnedResourceController) processNext(ctx context.Context) bool {
ctx = c.OperationsContext.WithValue(ctx, queue.NewOperations(done, requeue))

c.sync(ctx, *gvr, namespace, name)

cancel()
done()
<-ctx.Done()

return true
Expand Down
42 changes: 42 additions & 0 deletions manager/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,18 @@ package manager

import (
"context"
"fmt"
"testing"
"time"

"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/kubernetes/fake"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
ctrlmanageropts "k8s.io/controller-manager/options"

"github.com/authzed/controller-idioms/cachekeys"
"github.com/authzed/controller-idioms/queue"
"github.com/authzed/controller-idioms/typed"
)
Expand Down Expand Up @@ -37,3 +41,41 @@ func ExampleNewOwnedResourceController() {
_ = mgr.Start(ctx, controller)
// Output:
}

func TestControllerQueueDone(t *testing.T) {
gvr := schema.GroupVersionResource{
Group: "example.com",
Version: "v1",
Resource: "mytypes",
}
CtxQueue := queue.NewQueueOperationsCtx()
registry := typed.NewRegistry()
broadcaster := record.NewBroadcaster()
eventSink := &typedcorev1.EventSinkImpl{Interface: fake.NewSimpleClientset().CoreV1().Events("")}

controller := NewOwnedResourceController("my-controller", gvr, CtxQueue, registry, broadcaster, func(ctx context.Context, gvr schema.GroupVersionResource, namespace, name string) {
})

mgr := NewManager(ctrlmanageropts.RecommendedDebuggingOptions().DebuggingConfiguration, ":", broadcaster, eventSink)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
_ = mgr.Start(ctx, controller)
}()

// add many keys
for i := 0; i < 10; i++ {
controller.Queue.Add(cachekeys.GVRMetaNamespaceKeyer(gvr, fmt.Sprintf("test/%d", i)))
}
require.Eventually(t, func() bool {
return controller.Queue.Len() == 0
}, 1*time.Second, 1*time.Millisecond)

// add the same key many times
for i := 0; i < 10; i++ {
controller.Queue.Add(cachekeys.GVRMetaNamespaceKeyer(gvr, "test/a"))
}
require.Eventually(t, func() bool {
return controller.Queue.Len() == 0
}, 1*time.Second, 1*time.Millisecond)
}

0 comments on commit 23f5716

Please sign in to comment.