diff --git a/manager/controller.go b/manager/controller.go index 82f0eb3..3a30c53 100644 --- a/manager/controller.go +++ b/manager/controller.go @@ -163,8 +163,7 @@ func (c *OwnedResourceController) processNext(ctx context.Context) bool { ctx = c.OperationsContext.WithValue(ctx, queue.NewOperations(done, requeue)) c.sync(ctx, *gvr, namespace, name) - - cancel() + done() <-ctx.Done() return true diff --git a/manager/controller_test.go b/manager/controller_test.go index ffd34e6..d3f3f2a 100644 --- a/manager/controller_test.go +++ b/manager/controller_test.go @@ -2,14 +2,18 @@ package manager import ( "context" + "fmt" + "testing" "time" + "github.com/stretchr/testify/require" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/kubernetes/fake" typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/tools/record" ctrlmanageropts "k8s.io/controller-manager/options" + "github.com/authzed/controller-idioms/cachekeys" "github.com/authzed/controller-idioms/queue" "github.com/authzed/controller-idioms/typed" ) @@ -37,3 +41,41 @@ func ExampleNewOwnedResourceController() { _ = mgr.Start(ctx, controller) // Output: } + +func TestControllerQueueDone(t *testing.T) { + gvr := schema.GroupVersionResource{ + Group: "example.com", + Version: "v1", + Resource: "mytypes", + } + CtxQueue := queue.NewQueueOperationsCtx() + registry := typed.NewRegistry() + broadcaster := record.NewBroadcaster() + eventSink := &typedcorev1.EventSinkImpl{Interface: fake.NewSimpleClientset().CoreV1().Events("")} + + controller := NewOwnedResourceController("my-controller", gvr, CtxQueue, registry, broadcaster, func(ctx context.Context, gvr schema.GroupVersionResource, namespace, name string) { + }) + + mgr := NewManager(ctrlmanageropts.RecommendedDebuggingOptions().DebuggingConfiguration, ":", broadcaster, eventSink) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { + _ = mgr.Start(ctx, controller) + }() + + // add many keys + for i := 0; i < 10; i++ { + controller.Queue.Add(cachekeys.GVRMetaNamespaceKeyer(gvr, fmt.Sprintf("test/%d", i))) + } + require.Eventually(t, func() bool { + return controller.Queue.Len() == 0 + }, 1*time.Second, 1*time.Millisecond) + + // add the same key many times + for i := 0; i < 10; i++ { + controller.Queue.Add(cachekeys.GVRMetaNamespaceKeyer(gvr, "test/a")) + } + require.Eventually(t, func() bool { + return controller.Queue.Len() == 0 + }, 1*time.Second, 1*time.Millisecond) +}