Skip to content

Commit

Permalink
feat(detector): resource detector matched policy potimization
Browse files Browse the repository at this point in the history
Signed-off-by: chang.qiangqiang <[email protected]>
  • Loading branch information
CharlesQQ committed Nov 19, 2024
1 parent 6f138cf commit a3b488b
Show file tree
Hide file tree
Showing 9 changed files with 228 additions and 140 deletions.
5 changes: 5 additions & 0 deletions cmd/controller-manager/app/controllermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ import (
"github.com/karmada-io/karmada/pkg/dependenciesdistributor"
"github.com/karmada-io/karmada/pkg/detector"
"github.com/karmada-io/karmada/pkg/features"
generatedclientset "github.com/karmada-io/karmada/pkg/generated/clientset/versioned"
"github.com/karmada-io/karmada/pkg/karmadactl/util/apiclient"
"github.com/karmada-io/karmada/pkg/metrics"
"github.com/karmada-io/karmada/pkg/resourceinterpreter"
Expand Down Expand Up @@ -725,6 +726,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
dynamicClientSet := dynamic.NewForConfigOrDie(restConfig)
discoverClientSet := discovery.NewDiscoveryClientForConfigOrDie(restConfig)
kubeClientSet := kubeclientset.NewForConfigOrDie(restConfig)
generatedClientSet := generatedclientset.NewForConfigOrDie(restConfig)

overrideManager := overridemanager.New(mgr.GetClient(), mgr.GetEventRecorderFor(overridemanager.OverrideManagerName))
skippedResourceConfig := util.NewSkippedResourceConfig()
Expand All @@ -748,10 +750,13 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop

objectWatcher := objectwatcher.NewObjectWatcher(mgr.GetClient(), mgr.GetRESTMapper(), util.NewClusterDynamicClientSet, resourceInterpreter)

generatedInformerManager := genericmanager.NewGeneratedInformerManager(generatedClientSet, opts.ResyncPeriod.Duration, stopChan)

resourceDetector := &detector.ResourceDetector{
DiscoveryClientSet: discoverClientSet,
Client: mgr.GetClient(),
InformerManager: controlPlaneInformerManager,
GeneratedInformerManager: generatedInformerManager,
RESTMapper: mgr.GetRESTMapper(),
DynamicClient: dynamicClientSet,
SkippedResourceConfig: skippedResourceConfig,
Expand Down
8 changes: 8 additions & 0 deletions pkg/detector/compare.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ func getHighestPriorityPropagationPolicy(policies []*policyv1alpha1.PropagationP
var matchedPolicy *policyv1alpha1.PropagationPolicy

for _, policy := range policies {
if !policy.DeletionTimestamp.IsZero() {
klog.V(4).Infof("Propagation policy(%s/%s) cannot match any resource template because it's being deleted.", policy.Namespace, policy.Name)
continue
}
implicitPriority := util.ResourceMatchSelectorsPriority(resource, policy.Spec.ResourceSelectors...)
if implicitPriority <= util.PriorityMisMatch {
continue
Expand Down Expand Up @@ -70,6 +74,10 @@ func getHighestPriorityClusterPropagationPolicy(policies []*policyv1alpha1.Clust
var matchedClusterPolicy *policyv1alpha1.ClusterPropagationPolicy

for _, policy := range policies {
if !policy.DeletionTimestamp.IsZero() {
klog.V(4).Infof("Cluster propagation policy(%s) cannot match any resource template because it's being deleted.", policy.Name)
continue
}
implicitPriority := util.ResourceMatchSelectorsPriority(resource, policy.Spec.ResourceSelectors...)
if implicitPriority <= util.PriorityMisMatch {
continue
Expand Down
110 changes: 43 additions & 67 deletions pkg/detector/detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
"github.com/karmada-io/karmada/pkg/events"
"github.com/karmada-io/karmada/pkg/features"
generatedpolicylister "github.com/karmada-io/karmada/pkg/generated/listers/policy/v1alpha1"
"github.com/karmada-io/karmada/pkg/metrics"
"github.com/karmada-io/karmada/pkg/resourceinterpreter"
"github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag"
Expand All @@ -71,6 +72,7 @@ type ResourceDetector struct {
// DynamicClient used to fetch arbitrary resources.
DynamicClient dynamic.Interface
InformerManager genericmanager.SingleClusterInformerManager
GeneratedInformerManager genericmanager.GeneratedInformerManager
EventHandler cache.ResourceEventHandler
Processor util.AsyncWorker
SkippedResourceConfig *util.SkippedResourceConfig
Expand All @@ -81,12 +83,12 @@ type ResourceDetector struct {
// policyReconcileWorker maintains a rate limited queue which used to store PropagationPolicy's key and
// a reconcile function to consume the items in queue.
policyReconcileWorker util.AsyncWorker
propagationPolicyLister cache.GenericLister
propagationPolicyLister generatedpolicylister.PropagationPolicyLister

// clusterPolicyReconcileWorker maintains a rate limited queue which used to store ClusterPropagationPolicy's key and
// a reconcile function to consume the items in queue.
clusterPolicyReconcileWorker util.AsyncWorker
clusterPropagationPolicyLister cache.GenericLister
clusterPropagationPolicyLister generatedpolicylister.ClusterPropagationPolicyLister

RESTMapper meta.RESTMapper

Expand Down Expand Up @@ -132,24 +134,14 @@ func (d *ResourceDetector) Start(ctx context.Context) error {
d.clusterPolicyReconcileWorker.Run(d.ConcurrentClusterPropagationPolicySyncs, d.stopCh)

// watch and enqueue PropagationPolicy changes.
propagationPolicyGVR := schema.GroupVersionResource{
Group: policyv1alpha1.GroupVersion.Group,
Version: policyv1alpha1.GroupVersion.Version,
Resource: policyv1alpha1.ResourcePluralPropagationPolicy,
}
policyHandler := fedinformer.NewHandlerOnEvents(d.OnPropagationPolicyAdd, d.OnPropagationPolicyUpdate, nil)
d.InformerManager.ForResource(propagationPolicyGVR, policyHandler)
d.propagationPolicyLister = d.InformerManager.Lister(propagationPolicyGVR)
d.GeneratedInformerManager.ForPropagationPolicy(policyHandler)
d.propagationPolicyLister = d.GeneratedInformerManager.PropagationPolicyLister()

// watch and enqueue ClusterPropagationPolicy changes.
clusterPropagationPolicyGVR := schema.GroupVersionResource{
Group: policyv1alpha1.GroupVersion.Group,
Version: policyv1alpha1.GroupVersion.Version,
Resource: policyv1alpha1.ResourcePluralClusterPropagationPolicy,
}
clusterPolicyHandler := fedinformer.NewHandlerOnEvents(d.OnClusterPropagationPolicyAdd, d.OnClusterPropagationPolicyUpdate, nil)
d.InformerManager.ForResource(clusterPropagationPolicyGVR, clusterPolicyHandler)
d.clusterPropagationPolicyLister = d.InformerManager.Lister(clusterPropagationPolicyGVR)
d.GeneratedInformerManager.ForClusterPropagationPolicy(clusterPolicyHandler)
d.clusterPropagationPolicyLister = d.GeneratedInformerManager.ClusterPropagationPolicyLister()

detectorWorkerOptions := util.Options{
Name: "resource detector",
Expand All @@ -161,6 +153,8 @@ func (d *ResourceDetector) Start(ctx context.Context) error {
d.EventHandler = fedinformer.NewFilteringHandlerOnAllEvents(d.EventFilter, d.OnAdd, d.OnUpdate, d.OnDelete)
d.Processor = util.NewAsyncWorker(detectorWorkerOptions)
d.Processor.Run(d.ConcurrentResourceTemplateSyncs, d.stopCh)
d.GeneratedInformerManager.Start()
d.GeneratedInformerManager.WaitForCacheSync(d.stopCh)
go d.discoverResources(30 * time.Second)

<-d.stopCh
Expand Down Expand Up @@ -359,7 +353,7 @@ func (d *ResourceDetector) LookForMatchedPolicy(object *unstructured.Unstructure
}

klog.V(2).Infof("Attempts to match policy for resource(%s)", objectKey)
policyObjects, err := d.propagationPolicyLister.ByNamespace(objectKey.Namespace).List(labels.Everything())
policyObjects, err := d.propagationPolicyLister.PropagationPolicies(objectKey.Namespace).List(labels.Everything())
if err != nil {
klog.Errorf("Failed to list propagation policy: %v", err)
return nil, err
Expand All @@ -369,22 +363,7 @@ func (d *ResourceDetector) LookForMatchedPolicy(object *unstructured.Unstructure
return nil, nil
}

policyList := make([]*policyv1alpha1.PropagationPolicy, 0)
for index := range policyObjects {
policy := &policyv1alpha1.PropagationPolicy{}
if err = helper.ConvertToTypedObject(policyObjects[index], policy); err != nil {
klog.Errorf("Failed to convert PropagationPolicy from unstructured object: %v", err)
return nil, err
}

if !policy.DeletionTimestamp.IsZero() {
klog.V(4).Infof("Propagation policy(%s/%s) cannot match any resource template because it's being deleted.", policy.Namespace, policy.Name)
continue
}
policyList = append(policyList, policy)
}

return getHighestPriorityPropagationPolicy(policyList, object, objectKey), nil
return getHighestPriorityPropagationPolicy(policyObjects, object, objectKey), nil
}

// LookForMatchedClusterPolicy tries to find a ClusterPropagationPolicy for object referenced by object key.
Expand All @@ -400,22 +379,7 @@ func (d *ResourceDetector) LookForMatchedClusterPolicy(object *unstructured.Unst
return nil, nil
}

policyList := make([]*policyv1alpha1.ClusterPropagationPolicy, 0)
for index := range policyObjects {
policy := &policyv1alpha1.ClusterPropagationPolicy{}
if err = helper.ConvertToTypedObject(policyObjects[index], policy); err != nil {
klog.Errorf("Failed to convert ClusterPropagationPolicy from unstructured object: %v", err)
return nil, err
}

if !policy.DeletionTimestamp.IsZero() {
klog.V(4).Infof("Cluster propagation policy(%s) cannot match any resource template because it's being deleted.", policy.Name)
continue
}
policyList = append(policyList, policy)
}

return getHighestPriorityClusterPropagationPolicy(policyList, object, objectKey), nil
return getHighestPriorityClusterPropagationPolicy(policyObjects, object, objectKey), nil
}

// ApplyPolicy starts propagate the object referenced by object key according to PropagationPolicy.
Expand Down Expand Up @@ -849,12 +813,24 @@ func (d *ResourceDetector) GetMatching(resourceSelectors []policyv1alpha1.Resour

// OnPropagationPolicyAdd handles object add event and push the object to queue.
func (d *ResourceDetector) OnPropagationPolicyAdd(obj interface{}) {
d.policyReconcileWorker.Enqueue(obj)
policyObj := obj.(*policyv1alpha1.PropagationPolicy)
policyObj.SetGroupVersionKind(schema.GroupVersionKind{
Group: policyv1alpha1.GroupVersion.Group,
Version: policyv1alpha1.GroupVersion.Version,
Kind: policyv1alpha1.ResourceKindPropagationPolicy,
})
d.policyReconcileWorker.Enqueue(policyObj)
}

// OnPropagationPolicyUpdate handles object update event and push the object to queue.
func (d *ResourceDetector) OnPropagationPolicyUpdate(oldObj, newObj interface{}) {
d.policyReconcileWorker.Enqueue(newObj)
policyObj := newObj.(*policyv1alpha1.PropagationPolicy)
policyObj.SetGroupVersionKind(schema.GroupVersionKind{
Group: policyv1alpha1.GroupVersion.Group,
Version: policyv1alpha1.GroupVersion.Version,
Kind: policyv1alpha1.ResourceKindPropagationPolicy,
})
d.policyReconcileWorker.Enqueue(policyObj)

// Temporary solution of corner case: After the priority(.spec.priority) of
// PropagationPolicy changed from high priority (e.g. 5) to low priority(e.g. 3),
Expand Down Expand Up @@ -915,7 +891,7 @@ func (d *ResourceDetector) ReconcilePropagationPolicy(key util.QueueKey) error {
return fmt.Errorf("invalid key")
}

unstructuredObj, err := d.propagationPolicyLister.Get(ckey.NamespaceKey())
propagationObject, err := d.propagationPolicyLister.PropagationPolicies(ckey.Namespace).Get(ckey.Name)
if err != nil {
if apierrors.IsNotFound(err) {
return nil
Expand All @@ -924,12 +900,6 @@ func (d *ResourceDetector) ReconcilePropagationPolicy(key util.QueueKey) error {
return err
}

propagationObject := &policyv1alpha1.PropagationPolicy{}
if err = helper.ConvertToTypedObject(unstructuredObj, propagationObject); err != nil {
klog.Errorf("Failed to convert PropagationPolicy(%s) from unstructured object: %v", ckey.NamespaceKey(), err)
return err
}

if !propagationObject.DeletionTimestamp.IsZero() {
klog.Infof("PropagationPolicy(%s) is being deleted.", ckey.NamespaceKey())
if err = d.HandlePropagationPolicyDeletion(propagationObject.Labels[policyv1alpha1.PropagationPolicyPermanentIDLabel]); err != nil {
Expand All @@ -950,12 +920,24 @@ func (d *ResourceDetector) ReconcilePropagationPolicy(key util.QueueKey) error {

// OnClusterPropagationPolicyAdd handles object add event and push the object to queue.
func (d *ResourceDetector) OnClusterPropagationPolicyAdd(obj interface{}) {
d.clusterPolicyReconcileWorker.Enqueue(obj)
policyObj := obj.(*policyv1alpha1.ClusterPropagationPolicy)
policyObj.SetGroupVersionKind(schema.GroupVersionKind{
Group: policyv1alpha1.GroupVersion.Group,
Version: policyv1alpha1.GroupVersion.Version,
Kind: policyv1alpha1.ResourceKindClusterPropagationPolicy,
})
d.clusterPolicyReconcileWorker.Enqueue(policyObj)
}

// OnClusterPropagationPolicyUpdate handles object update event and push the object to queue.
func (d *ResourceDetector) OnClusterPropagationPolicyUpdate(oldObj, newObj interface{}) {
d.clusterPolicyReconcileWorker.Enqueue(newObj)
policyObj := newObj.(*policyv1alpha1.ClusterPropagationPolicy)
policyObj.SetGroupVersionKind(schema.GroupVersionKind{
Group: policyv1alpha1.GroupVersion.Group,
Version: policyv1alpha1.GroupVersion.Version,
Kind: policyv1alpha1.ResourceKindClusterPropagationPolicy,
})
d.clusterPolicyReconcileWorker.Enqueue(policyObj)

// Temporary solution of corner case: After the priority(.spec.priority) of
// ClusterPropagationPolicy changed from high priority (e.g. 5) to low priority(e.g. 3),
Expand Down Expand Up @@ -1016,7 +998,7 @@ func (d *ResourceDetector) ReconcileClusterPropagationPolicy(key util.QueueKey)
return fmt.Errorf("invalid key")
}

unstructuredObj, err := d.clusterPropagationPolicyLister.Get(ckey.NamespaceKey())
propagationObject, err := d.clusterPropagationPolicyLister.Get(ckey.NamespaceKey())
if err != nil {
if apierrors.IsNotFound(err) {
return nil
Expand All @@ -1026,12 +1008,6 @@ func (d *ResourceDetector) ReconcileClusterPropagationPolicy(key util.QueueKey)
return err
}

propagationObject := &policyv1alpha1.ClusterPropagationPolicy{}
if err = helper.ConvertToTypedObject(unstructuredObj, propagationObject); err != nil {
klog.Errorf("Failed to convert ClusterPropagationPolicy(%s) from unstructured object: %v", ckey.NamespaceKey(), err)
return err
}

if !propagationObject.DeletionTimestamp.IsZero() {
klog.Infof("ClusterPropagationPolicy(%s) is being deleted.", ckey.NamespaceKey())
if err = d.HandleClusterPropagationPolicyDeletion(propagationObject.Labels[policyv1alpha1.ClusterPropagationPolicyPermanentIDLabel]); err != nil {
Expand Down
36 changes: 28 additions & 8 deletions pkg/detector/detector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package detector
import (
"context"
"fmt"
"github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager"
"regexp"
"strings"
"testing"
Expand All @@ -41,6 +42,7 @@ import (
configv1alpha1 "github.com/karmada-io/karmada/pkg/apis/config/v1alpha1"
policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
fakegeneratedclientset "github.com/karmada-io/karmada/pkg/generated/clientset/versioned/fake"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/fedinformer/keys"
)
Expand Down Expand Up @@ -677,20 +679,29 @@ func TestLookForMatchedPolicy(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
scheme := setupTestScheme()
stopChan := make(chan struct{})
fakeClient := dynamicfake.NewSimpleDynamicClient(scheme)
generatedClientSet := fakegeneratedclientset.NewSimpleClientset()
generatedInformerManager := genericmanager.NewGeneratedInformerManager(generatedClientSet, time.Second, stopChan)

d := &ResourceDetector{
DynamicClient: fakeClient,
propagationPolicyLister: &mockPropagationPolicyLister{
policies: tt.policies,
},
DynamicClient: fakeClient,
propagationPolicyLister: generatedInformerManager.PropagationPolicyLister(),
}

objectKey := keys.ClusterWideKey{
Name: tt.object.GetName(),
Namespace: tt.object.GetNamespace(),
Kind: tt.object.GetKind(),
}
for _, object := range tt.policies {
_, err := generatedClientSet.PolicyV1alpha1().PropagationPolicies(object.Namespace).Create(context.TODO(), object, metav1.CreateOptions{})
if err != nil {
t.Errorf("Create PropagationPolicy failed: %v", err)
}
}
generatedInformerManager.Start()
generatedInformerManager.WaitForCacheSync(stopChan)

policy, err := d.LookForMatchedPolicy(tt.object, objectKey)

Expand Down Expand Up @@ -766,20 +777,29 @@ func TestLookForMatchedClusterPolicy(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
scheme := setupTestScheme()
stopChan := make(chan struct{})
fakeClient := dynamicfake.NewSimpleDynamicClient(scheme)
generatedClientSet := fakegeneratedclientset.NewSimpleClientset()
generatedInformerManager := genericmanager.NewGeneratedInformerManager(generatedClientSet, time.Second, stopChan)

d := &ResourceDetector{
DynamicClient: fakeClient,
clusterPropagationPolicyLister: &mockClusterPropagationPolicyLister{
policies: tt.policies,
},
DynamicClient: fakeClient,
clusterPropagationPolicyLister: generatedInformerManager.ClusterPropagationPolicyLister(),
}

objectKey := keys.ClusterWideKey{
Name: tt.object.GetName(),
Namespace: tt.object.GetNamespace(),
Kind: tt.object.GetKind(),
}
for _, object := range tt.policies {
_, err := generatedClientSet.PolicyV1alpha1().ClusterPropagationPolicies().Create(context.TODO(), object, metav1.CreateOptions{})
if err != nil {
t.Errorf("Create PropagationPolicy failed: %v", err)
}
}
generatedInformerManager.Start()
generatedInformerManager.WaitForCacheSync(stopChan)

policy, err := d.LookForMatchedClusterPolicy(tt.object, objectKey)

Expand Down
Loading

0 comments on commit a3b488b

Please sign in to comment.