Skip to content

Commit

Permalink
feat(detector): resource detector matched policy potimization
Browse files Browse the repository at this point in the history
Signed-off-by: chang.qiangqiang <[email protected]>
  • Loading branch information
CharlesQQ committed Nov 15, 2024
1 parent 6f138cf commit 8e1d81b
Show file tree
Hide file tree
Showing 9 changed files with 200 additions and 136 deletions.
5 changes: 5 additions & 0 deletions cmd/controller-manager/app/controllermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ import (
"github.com/karmada-io/karmada/pkg/dependenciesdistributor"
"github.com/karmada-io/karmada/pkg/detector"
"github.com/karmada-io/karmada/pkg/features"
generatedclientset "github.com/karmada-io/karmada/pkg/generated/clientset/versioned"
"github.com/karmada-io/karmada/pkg/karmadactl/util/apiclient"
"github.com/karmada-io/karmada/pkg/metrics"
"github.com/karmada-io/karmada/pkg/resourceinterpreter"
Expand Down Expand Up @@ -725,6 +726,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
dynamicClientSet := dynamic.NewForConfigOrDie(restConfig)
discoverClientSet := discovery.NewDiscoveryClientForConfigOrDie(restConfig)
kubeClientSet := kubeclientset.NewForConfigOrDie(restConfig)
generatedClientSet := generatedclientset.NewForConfigOrDie(restConfig)

overrideManager := overridemanager.New(mgr.GetClient(), mgr.GetEventRecorderFor(overridemanager.OverrideManagerName))
skippedResourceConfig := util.NewSkippedResourceConfig()
Expand All @@ -748,10 +750,13 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop

objectWatcher := objectwatcher.NewObjectWatcher(mgr.GetClient(), mgr.GetRESTMapper(), util.NewClusterDynamicClientSet, resourceInterpreter)

generatedInformerManager := genericmanager.NewGeneratedInformerManager(generatedClientSet, opts.ResyncPeriod.Duration, stopChan)

resourceDetector := &detector.ResourceDetector{
DiscoveryClientSet: discoverClientSet,
Client: mgr.GetClient(),
InformerManager: controlPlaneInformerManager,
GeneratedInformerManager: generatedInformerManager,
RESTMapper: mgr.GetRESTMapper(),
DynamicClient: dynamicClientSet,
SkippedResourceConfig: skippedResourceConfig,
Expand Down
8 changes: 8 additions & 0 deletions pkg/detector/compare.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ func getHighestPriorityPropagationPolicy(policies []*policyv1alpha1.PropagationP
var matchedPolicy *policyv1alpha1.PropagationPolicy

for _, policy := range policies {
if !policy.DeletionTimestamp.IsZero() {
klog.V(4).Infof("Propagation policy(%s/%s) cannot match any resource template because it's being deleted.", policy.Namespace, policy.Name)
continue
}
implicitPriority := util.ResourceMatchSelectorsPriority(resource, policy.Spec.ResourceSelectors...)
if implicitPriority <= util.PriorityMisMatch {
continue
Expand Down Expand Up @@ -70,6 +74,10 @@ func getHighestPriorityClusterPropagationPolicy(policies []*policyv1alpha1.Clust
var matchedClusterPolicy *policyv1alpha1.ClusterPropagationPolicy

for _, policy := range policies {
if !policy.DeletionTimestamp.IsZero() {
klog.V(4).Infof("Cluster propagation policy(%s) cannot match any resource template because it's being deleted.", policy.Name)
continue
}
implicitPriority := util.ResourceMatchSelectorsPriority(resource, policy.Spec.ResourceSelectors...)
if implicitPriority <= util.PriorityMisMatch {
continue
Expand Down
78 changes: 15 additions & 63 deletions pkg/detector/detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
"github.com/karmada-io/karmada/pkg/events"
"github.com/karmada-io/karmada/pkg/features"
generatedpolicylister "github.com/karmada-io/karmada/pkg/generated/listers/policy/v1alpha1"
"github.com/karmada-io/karmada/pkg/metrics"
"github.com/karmada-io/karmada/pkg/resourceinterpreter"
"github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag"
Expand All @@ -71,6 +72,7 @@ type ResourceDetector struct {
// DynamicClient used to fetch arbitrary resources.
DynamicClient dynamic.Interface
InformerManager genericmanager.SingleClusterInformerManager
GeneratedInformerManager genericmanager.GeneratedInformerManager
EventHandler cache.ResourceEventHandler
Processor util.AsyncWorker
SkippedResourceConfig *util.SkippedResourceConfig
Expand All @@ -81,12 +83,12 @@ type ResourceDetector struct {
// policyReconcileWorker maintains a rate limited queue which used to store PropagationPolicy's key and
// a reconcile function to consume the items in queue.
policyReconcileWorker util.AsyncWorker
propagationPolicyLister cache.GenericLister
propagationPolicyLister generatedpolicylister.PropagationPolicyLister

// clusterPolicyReconcileWorker maintains a rate limited queue which used to store ClusterPropagationPolicy's key and
// a reconcile function to consume the items in queue.
clusterPolicyReconcileWorker util.AsyncWorker
clusterPropagationPolicyLister cache.GenericLister
clusterPropagationPolicyLister generatedpolicylister.ClusterPropagationPolicyLister

RESTMapper meta.RESTMapper

Expand Down Expand Up @@ -132,24 +134,14 @@ func (d *ResourceDetector) Start(ctx context.Context) error {
d.clusterPolicyReconcileWorker.Run(d.ConcurrentClusterPropagationPolicySyncs, d.stopCh)

// watch and enqueue PropagationPolicy changes.
propagationPolicyGVR := schema.GroupVersionResource{
Group: policyv1alpha1.GroupVersion.Group,
Version: policyv1alpha1.GroupVersion.Version,
Resource: policyv1alpha1.ResourcePluralPropagationPolicy,
}
policyHandler := fedinformer.NewHandlerOnEvents(d.OnPropagationPolicyAdd, d.OnPropagationPolicyUpdate, nil)
d.InformerManager.ForResource(propagationPolicyGVR, policyHandler)
d.propagationPolicyLister = d.InformerManager.Lister(propagationPolicyGVR)
d.GeneratedInformerManager.ForPropagationPolicy(policyHandler)
d.propagationPolicyLister = d.GeneratedInformerManager.PropagationPolicyLister()

// watch and enqueue ClusterPropagationPolicy changes.
clusterPropagationPolicyGVR := schema.GroupVersionResource{
Group: policyv1alpha1.GroupVersion.Group,
Version: policyv1alpha1.GroupVersion.Version,
Resource: policyv1alpha1.ResourcePluralClusterPropagationPolicy,
}
clusterPolicyHandler := fedinformer.NewHandlerOnEvents(d.OnClusterPropagationPolicyAdd, d.OnClusterPropagationPolicyUpdate, nil)
d.InformerManager.ForResource(clusterPropagationPolicyGVR, clusterPolicyHandler)
d.clusterPropagationPolicyLister = d.InformerManager.Lister(clusterPropagationPolicyGVR)
d.GeneratedInformerManager.ForClusterPropagationPolicy(clusterPolicyHandler)
d.clusterPropagationPolicyLister = d.GeneratedInformerManager.ClusterPropagationPolicyLister()

detectorWorkerOptions := util.Options{
Name: "resource detector",
Expand All @@ -161,6 +153,8 @@ func (d *ResourceDetector) Start(ctx context.Context) error {
d.EventHandler = fedinformer.NewFilteringHandlerOnAllEvents(d.EventFilter, d.OnAdd, d.OnUpdate, d.OnDelete)
d.Processor = util.NewAsyncWorker(detectorWorkerOptions)
d.Processor.Run(d.ConcurrentResourceTemplateSyncs, d.stopCh)
d.GeneratedInformerManager.Start()
d.GeneratedInformerManager.WaitForCacheSync(d.stopCh)
go d.discoverResources(30 * time.Second)

<-d.stopCh
Expand Down Expand Up @@ -359,7 +353,7 @@ func (d *ResourceDetector) LookForMatchedPolicy(object *unstructured.Unstructure
}

klog.V(2).Infof("Attempts to match policy for resource(%s)", objectKey)
policyObjects, err := d.propagationPolicyLister.ByNamespace(objectKey.Namespace).List(labels.Everything())
policyObjects, err := d.propagationPolicyLister.PropagationPolicies(objectKey.Namespace).List(labels.Everything())
if err != nil {
klog.Errorf("Failed to list propagation policy: %v", err)
return nil, err
Expand All @@ -369,22 +363,7 @@ func (d *ResourceDetector) LookForMatchedPolicy(object *unstructured.Unstructure
return nil, nil
}

policyList := make([]*policyv1alpha1.PropagationPolicy, 0)
for index := range policyObjects {
policy := &policyv1alpha1.PropagationPolicy{}
if err = helper.ConvertToTypedObject(policyObjects[index], policy); err != nil {
klog.Errorf("Failed to convert PropagationPolicy from unstructured object: %v", err)
return nil, err
}

if !policy.DeletionTimestamp.IsZero() {
klog.V(4).Infof("Propagation policy(%s/%s) cannot match any resource template because it's being deleted.", policy.Namespace, policy.Name)
continue
}
policyList = append(policyList, policy)
}

return getHighestPriorityPropagationPolicy(policyList, object, objectKey), nil
return getHighestPriorityPropagationPolicy(policyObjects, object, objectKey), nil
}

// LookForMatchedClusterPolicy tries to find a ClusterPropagationPolicy for object referenced by object key.
Expand All @@ -400,22 +379,7 @@ func (d *ResourceDetector) LookForMatchedClusterPolicy(object *unstructured.Unst
return nil, nil
}

policyList := make([]*policyv1alpha1.ClusterPropagationPolicy, 0)
for index := range policyObjects {
policy := &policyv1alpha1.ClusterPropagationPolicy{}
if err = helper.ConvertToTypedObject(policyObjects[index], policy); err != nil {
klog.Errorf("Failed to convert ClusterPropagationPolicy from unstructured object: %v", err)
return nil, err
}

if !policy.DeletionTimestamp.IsZero() {
klog.V(4).Infof("Cluster propagation policy(%s) cannot match any resource template because it's being deleted.", policy.Name)
continue
}
policyList = append(policyList, policy)
}

return getHighestPriorityClusterPropagationPolicy(policyList, object, objectKey), nil
return getHighestPriorityClusterPropagationPolicy(policyObjects, object, objectKey), nil
}

// ApplyPolicy starts propagate the object referenced by object key according to PropagationPolicy.
Expand Down Expand Up @@ -915,7 +879,7 @@ func (d *ResourceDetector) ReconcilePropagationPolicy(key util.QueueKey) error {
return fmt.Errorf("invalid key")
}

unstructuredObj, err := d.propagationPolicyLister.Get(ckey.NamespaceKey())
propagationObject, err := d.propagationPolicyLister.PropagationPolicies(ckey.Namespace).Get(ckey.Name)
if err != nil {
if apierrors.IsNotFound(err) {
return nil
Expand All @@ -924,12 +888,6 @@ func (d *ResourceDetector) ReconcilePropagationPolicy(key util.QueueKey) error {
return err
}

propagationObject := &policyv1alpha1.PropagationPolicy{}
if err = helper.ConvertToTypedObject(unstructuredObj, propagationObject); err != nil {
klog.Errorf("Failed to convert PropagationPolicy(%s) from unstructured object: %v", ckey.NamespaceKey(), err)
return err
}

if !propagationObject.DeletionTimestamp.IsZero() {
klog.Infof("PropagationPolicy(%s) is being deleted.", ckey.NamespaceKey())
if err = d.HandlePropagationPolicyDeletion(propagationObject.Labels[policyv1alpha1.PropagationPolicyPermanentIDLabel]); err != nil {
Expand Down Expand Up @@ -1016,7 +974,7 @@ func (d *ResourceDetector) ReconcileClusterPropagationPolicy(key util.QueueKey)
return fmt.Errorf("invalid key")
}

unstructuredObj, err := d.clusterPropagationPolicyLister.Get(ckey.NamespaceKey())
propagationObject, err := d.clusterPropagationPolicyLister.Get(ckey.NamespaceKey())
if err != nil {
if apierrors.IsNotFound(err) {
return nil
Expand All @@ -1026,12 +984,6 @@ func (d *ResourceDetector) ReconcileClusterPropagationPolicy(key util.QueueKey)
return err
}

propagationObject := &policyv1alpha1.ClusterPropagationPolicy{}
if err = helper.ConvertToTypedObject(unstructuredObj, propagationObject); err != nil {
klog.Errorf("Failed to convert ClusterPropagationPolicy(%s) from unstructured object: %v", ckey.NamespaceKey(), err)
return err
}

if !propagationObject.DeletionTimestamp.IsZero() {
klog.Infof("ClusterPropagationPolicy(%s) is being deleted.", ckey.NamespaceKey())
if err = d.HandleClusterPropagationPolicyDeletion(propagationObject.Labels[policyv1alpha1.ClusterPropagationPolicyPermanentIDLabel]); err != nil {
Expand Down
36 changes: 28 additions & 8 deletions pkg/detector/detector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package detector
import (
"context"
"fmt"
"github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager"
"regexp"
"strings"
"testing"
Expand All @@ -41,6 +42,7 @@ import (
configv1alpha1 "github.com/karmada-io/karmada/pkg/apis/config/v1alpha1"
policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
fakegeneratedclientset "github.com/karmada-io/karmada/pkg/generated/clientset/versioned/fake"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/fedinformer/keys"
)
Expand Down Expand Up @@ -677,20 +679,29 @@ func TestLookForMatchedPolicy(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
scheme := setupTestScheme()
stopChan := make(chan struct{})
fakeClient := dynamicfake.NewSimpleDynamicClient(scheme)
generatedClientSet := fakegeneratedclientset.NewSimpleClientset()
generatedInformerManager := genericmanager.NewGeneratedInformerManager(generatedClientSet, time.Second, stopChan)

d := &ResourceDetector{
DynamicClient: fakeClient,
propagationPolicyLister: &mockPropagationPolicyLister{
policies: tt.policies,
},
DynamicClient: fakeClient,
propagationPolicyLister: generatedInformerManager.PropagationPolicyLister(),
}

objectKey := keys.ClusterWideKey{
Name: tt.object.GetName(),
Namespace: tt.object.GetNamespace(),
Kind: tt.object.GetKind(),
}
for _, object := range tt.policies {
_, err := generatedClientSet.PolicyV1alpha1().PropagationPolicies(object.Namespace).Create(context.TODO(), object, metav1.CreateOptions{})
if err != nil {
t.Errorf("Create PropagationPolicy failed: %v", err)
}
}
generatedInformerManager.Start()
generatedInformerManager.WaitForCacheSync(stopChan)

policy, err := d.LookForMatchedPolicy(tt.object, objectKey)

Expand Down Expand Up @@ -766,20 +777,29 @@ func TestLookForMatchedClusterPolicy(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
scheme := setupTestScheme()
stopChan := make(chan struct{})
fakeClient := dynamicfake.NewSimpleDynamicClient(scheme)
generatedClientSet := fakegeneratedclientset.NewSimpleClientset()
generatedInformerManager := genericmanager.NewGeneratedInformerManager(generatedClientSet, time.Second, stopChan)

d := &ResourceDetector{
DynamicClient: fakeClient,
clusterPropagationPolicyLister: &mockClusterPropagationPolicyLister{
policies: tt.policies,
},
DynamicClient: fakeClient,
clusterPropagationPolicyLister: generatedInformerManager.ClusterPropagationPolicyLister(),
}

objectKey := keys.ClusterWideKey{
Name: tt.object.GetName(),
Namespace: tt.object.GetNamespace(),
Kind: tt.object.GetKind(),
}
for _, object := range tt.policies {
_, err := generatedClientSet.PolicyV1alpha1().ClusterPropagationPolicies().Create(context.TODO(), object, metav1.CreateOptions{})
if err != nil {
t.Errorf("Create PropagationPolicy failed: %v", err)
}
}
generatedInformerManager.Start()
generatedInformerManager.WaitForCacheSync(stopChan)

policy, err := d.LookForMatchedClusterPolicy(tt.object, objectKey)

Expand Down
26 changes: 7 additions & 19 deletions pkg/detector/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (d *ResourceDetector) propagateResource(object *unstructured.Unstructured,

func (d *ResourceDetector) getAndApplyPolicy(object *unstructured.Unstructured, objectKey keys.ClusterWideKey,
resourceChangeByKarmada bool, policyNamespace, policyName, claimedID string) error {
policyObject, err := d.propagationPolicyLister.ByNamespace(policyNamespace).Get(policyName)
policyObject, err := d.propagationPolicyLister.PropagationPolicies(policyNamespace).Get(policyName)
if err != nil {
if apierrors.IsNotFound(err) {
klog.V(4).Infof("PropagationPolicy(%s/%s) has been removed.", policyNamespace, policyName)
Expand All @@ -117,30 +117,24 @@ func (d *ResourceDetector) getAndApplyPolicy(object *unstructured.Unstructured,
return err
}

matchedPropagationPolicy := &policyv1alpha1.PropagationPolicy{}
if err = helper.ConvertToTypedObject(policyObject, matchedPropagationPolicy); err != nil {
klog.Errorf("Failed to convert PropagationPolicy from unstructured object: %v", err)
return err
}

// Some resources are available in more than one group in the same kubernetes version.
// Therefore, the following scenarios occurs:
// In v1.21 kubernetes cluster, Ingress are available in both networking.k8s.io and extensions groups.
// When user creates an Ingress(networking.k8s.io/v1) and specifies a PropagationPolicy to propagate it
// to the member clusters, the detector will listen two resource creation events:
// Ingress(networking.k8s.io/v1) and Ingress(extensions/v1beta1). In order to prevent
// Ingress(extensions/v1beta1) from being propagated, we need to ignore it.
if !util.ResourceMatchSelectors(object, matchedPropagationPolicy.Spec.ResourceSelectors...) {
if !util.ResourceMatchSelectors(object, policyObject.Spec.ResourceSelectors...) {
return nil
}

// return err when dependents not present, that we can retry at next reconcile.
if present, err := helper.IsDependentOverridesPresent(d.Client, matchedPropagationPolicy); err != nil || !present {
if present, err := helper.IsDependentOverridesPresent(d.Client, policyObject); err != nil || !present {
klog.Infof("Waiting for dependent overrides present for policy(%s/%s)", policyNamespace, policyName)
return fmt.Errorf("waiting for dependent overrides")
}

return d.ApplyPolicy(object, objectKey, resourceChangeByKarmada, matchedPropagationPolicy)
return d.ApplyPolicy(object, objectKey, resourceChangeByKarmada, policyObject)
}

func (d *ResourceDetector) getAndApplyClusterPolicy(object *unstructured.Unstructured, objectKey keys.ClusterWideKey,
Expand All @@ -156,30 +150,24 @@ func (d *ResourceDetector) getAndApplyClusterPolicy(object *unstructured.Unstruc
return err
}

matchedClusterPropagationPolicy := &policyv1alpha1.ClusterPropagationPolicy{}
if err = helper.ConvertToTypedObject(policyObject, matchedClusterPropagationPolicy); err != nil {
klog.Errorf("Failed to convert ClusterPropagationPolicy from unstructured object: %v", err)
return err
}

// Some resources are available in more than one group in the same kubernetes version.
// Therefore, the following scenarios occurs:
// In v1.21 kubernetes cluster, Ingress are available in both networking.k8s.io and extensions groups.
// When user creates an Ingress(networking.k8s.io/v1) and specifies a ClusterPropagationPolicy to
// propagate it to the member clusters, the detector will listen two resource creation events:
// Ingress(networking.k8s.io/v1) and Ingress(extensions/v1beta1). In order to prevent
// Ingress(extensions/v1beta1) from being propagated, we need to ignore it.
if !util.ResourceMatchSelectors(object, matchedClusterPropagationPolicy.Spec.ResourceSelectors...) {
if !util.ResourceMatchSelectors(object, policyObject.Spec.ResourceSelectors...) {
return nil
}

// return err when dependents not present, that we can retry at next reconcile.
if present, err := helper.IsDependentClusterOverridesPresent(d.Client, matchedClusterPropagationPolicy); err != nil || !present {
if present, err := helper.IsDependentClusterOverridesPresent(d.Client, policyObject); err != nil || !present {
klog.Infof("Waiting for dependent overrides present for policy(%s)", policyName)
return fmt.Errorf("waiting for dependent overrides")
}

return d.ApplyClusterPolicy(object, objectKey, resourceChangeByKarmada, matchedClusterPropagationPolicy)
return d.ApplyClusterPolicy(object, objectKey, resourceChangeByKarmada, policyObject)
}

func (d *ResourceDetector) cleanPPUnmatchedRBs(policyID, policyNamespace, policyName string, selectors []policyv1alpha1.ResourceSelector) error {
Expand Down
Loading

0 comments on commit 8e1d81b

Please sign in to comment.