Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(worker): add metrics for asyncworker #5817

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/agent/app/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ func run(ctx context.Context, opts *options.Options) error {
crtlmetrics.Registry.MustRegister(metrics.ClusterCollectors()...)
crtlmetrics.Registry.MustRegister(metrics.ResourceCollectorsForAgent()...)
crtlmetrics.Registry.MustRegister(metrics.PoolCollectors()...)
crtlmetrics.Registry.MustRegister(metrics.AsyncWorkerCollectors()...)

if err = setupControllers(controllerManager, opts, ctx.Done()); err != nil {
return err
Expand Down
1 change: 1 addition & 0 deletions cmd/controller-manager/app/controllermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ func Run(ctx context.Context, opts *options.Options) error {
crtlmetrics.Registry.MustRegister(metrics.ClusterCollectors()...)
crtlmetrics.Registry.MustRegister(metrics.ResourceCollectors()...)
crtlmetrics.Registry.MustRegister(metrics.PoolCollectors()...)
crtlmetrics.Registry.MustRegister(metrics.AsyncWorkerCollectors()...)

setupControllers(controllerManager, opts, ctx.Done())

Expand Down
10 changes: 5 additions & 5 deletions pkg/clusterdiscovery/clusterapi/clusterapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ import (
"github.com/karmada-io/karmada/pkg/karmadactl/options"
"github.com/karmada-io/karmada/pkg/karmadactl/unjoin"
"github.com/karmada-io/karmada/pkg/karmadactl/util/apiclient"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/fedinformer"
"github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager"
"github.com/karmada-io/karmada/pkg/util/fedinformer/keys"
"github.com/karmada-io/karmada/pkg/util/helper"
"github.com/karmada-io/karmada/pkg/util/worker"
)

const (
Expand All @@ -62,7 +62,7 @@ type ClusterDetector struct {
ClusterAPIClient client.Client
InformerManager genericmanager.SingleClusterInformerManager
EventHandler cache.ResourceEventHandler
Processor util.AsyncWorker
Processor worker.AsyncWorker
ConcurrentReconciles int

stopCh <-chan struct{}
Expand All @@ -74,12 +74,12 @@ func (d *ClusterDetector) Start(ctx context.Context) error {
d.stopCh = ctx.Done()

d.EventHandler = fedinformer.NewHandlerOnEvents(d.OnAdd, d.OnUpdate, d.OnDelete)
workerOptions := util.Options{
workerOptions := worker.Options{
Name: "cluster-api cluster detector",
KeyFunc: ClusterWideKeyFunc,
ReconcileFunc: d.Reconcile,
}
d.Processor = util.NewAsyncWorker(workerOptions)
d.Processor = worker.NewAsyncWorker(workerOptions)
d.Processor.Run(d.ConcurrentReconciles, d.stopCh)
d.discoveryCluster()

Expand Down Expand Up @@ -120,7 +120,7 @@ func (d *ClusterDetector) OnDelete(obj interface{}) {

// Reconcile performs a full reconciliation for the object referred to by the key.
// The key will be re-queued if an error is non-nil.
func (d *ClusterDetector) Reconcile(key util.QueueKey) error {
func (d *ClusterDetector) Reconcile(key worker.QueueKey) error {
clusterWideKey, ok := key.(keys.ClusterWideKey)
if !ok {
klog.Errorf("Invalid key")
Expand Down
4 changes: 2 additions & 2 deletions pkg/clusterdiscovery/clusterapi/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ limitations under the License.
package clusterapi

import (
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/fedinformer/keys"
"github.com/karmada-io/karmada/pkg/util/worker"
)

// ClusterWideKeyFunc generates a ClusterWideKey for object.
func ClusterWideKeyFunc(obj interface{}) (util.QueueKey, error) {
func ClusterWideKeyFunc(obj interface{}) (worker.QueueKey, error) {
return keys.ClusterWideKeyFunc(obj)
}
18 changes: 9 additions & 9 deletions pkg/controllers/cluster/taint_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ import (
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
"github.com/karmada-io/karmada/pkg/features"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/fedinformer/keys"
"github.com/karmada-io/karmada/pkg/util/helper"
"github.com/karmada-io/karmada/pkg/util/worker"
)

// TaintManagerName is the controller name that will be used when reporting events and metrics.
Expand All @@ -51,8 +51,8 @@ type NoExecuteTaintManager struct {
ClusterTaintEvictionRetryFrequency time.Duration
ConcurrentReconciles int

bindingEvictionWorker util.AsyncWorker
clusterBindingEvictionWorker util.AsyncWorker
bindingEvictionWorker worker.AsyncWorker
clusterBindingEvictionWorker worker.AsyncWorker
}

// Reconcile performs a full reconciliation for the object referred to by the Request.
Expand Down Expand Up @@ -117,27 +117,27 @@ func (tc *NoExecuteTaintManager) syncCluster(ctx context.Context, cluster *clust

// Start starts an asynchronous loop that handle evictions.
func (tc *NoExecuteTaintManager) Start(ctx context.Context) error {
bindingEvictionWorkerOptions := util.Options{
bindingEvictionWorkerOptions := worker.Options{
Name: "binding-eviction",
KeyFunc: nil,
ReconcileFunc: tc.syncBindingEviction,
}
tc.bindingEvictionWorker = util.NewAsyncWorker(bindingEvictionWorkerOptions)
tc.bindingEvictionWorker = worker.NewAsyncWorker(bindingEvictionWorkerOptions)
tc.bindingEvictionWorker.Run(tc.ConcurrentReconciles, ctx.Done())

clusterBindingEvictionWorkerOptions := util.Options{
clusterBindingEvictionWorkerOptions := worker.Options{
Name: "cluster-binding-eviction",
KeyFunc: nil,
ReconcileFunc: tc.syncClusterBindingEviction,
}
tc.clusterBindingEvictionWorker = util.NewAsyncWorker(clusterBindingEvictionWorkerOptions)
tc.clusterBindingEvictionWorker = worker.NewAsyncWorker(clusterBindingEvictionWorkerOptions)
tc.clusterBindingEvictionWorker.Run(tc.ConcurrentReconciles, ctx.Done())

<-ctx.Done()
return nil
}

func (tc *NoExecuteTaintManager) syncBindingEviction(key util.QueueKey) error {
func (tc *NoExecuteTaintManager) syncBindingEviction(key worker.QueueKey) error {
fedKey, ok := key.(keys.FederatedKey)
if !ok {
klog.Errorf("Failed to sync binding eviction as invalid key: %v", key)
Expand Down Expand Up @@ -193,7 +193,7 @@ func (tc *NoExecuteTaintManager) syncBindingEviction(key util.QueueKey) error {
return nil
}

func (tc *NoExecuteTaintManager) syncClusterBindingEviction(key util.QueueKey) error {
func (tc *NoExecuteTaintManager) syncClusterBindingEviction(key worker.QueueKey) error {
fedKey, ok := key.(keys.FederatedKey)
if !ok {
klog.Errorf("Failed to sync cluster binding eviction as invalid key: %v", key)
Expand Down
9 changes: 5 additions & 4 deletions pkg/controllers/cluster/taint_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/fedinformer/keys"
"github.com/karmada-io/karmada/pkg/util/gclient"
"github.com/karmada-io/karmada/pkg/util/worker"
)

func newNoExecuteTaintManager() *NoExecuteTaintManager {
Expand All @@ -57,19 +58,19 @@ func newNoExecuteTaintManager() *NoExecuteTaintManager {
WithIndex(&workv1alpha2.ResourceBinding{}, rbClusterKeyIndex, rbIndexerFunc).
WithIndex(&workv1alpha2.ClusterResourceBinding{}, crbClusterKeyIndex, crbIndexerFunc).Build(),
}
bindingEvictionWorkerOptions := util.Options{
bindingEvictionWorkerOptions := worker.Options{
Name: "binding-eviction",
KeyFunc: nil,
ReconcileFunc: mgr.syncBindingEviction,
}
mgr.bindingEvictionWorker = util.NewAsyncWorker(bindingEvictionWorkerOptions)
mgr.bindingEvictionWorker = worker.NewAsyncWorker(bindingEvictionWorkerOptions)

clusterBindingEvictionWorkerOptions := util.Options{
clusterBindingEvictionWorkerOptions := worker.Options{
Name: "cluster-binding-eviction",
KeyFunc: nil,
ReconcileFunc: mgr.syncClusterBindingEviction,
}
mgr.clusterBindingEvictionWorker = util.NewAsyncWorker(clusterBindingEvictionWorkerOptions)
mgr.clusterBindingEvictionWorker = worker.NewAsyncWorker(clusterBindingEvictionWorkerOptions)
return mgr
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"

"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/worker"
)

const (
Expand All @@ -40,16 +40,16 @@ type HpaScaleTargetMarker struct {
DynamicClient dynamic.Interface
RESTMapper meta.RESTMapper

scaleTargetWorker util.AsyncWorker
scaleTargetWorker worker.AsyncWorker
}

// SetupWithManager creates a controller and register to controller manager.
func (r *HpaScaleTargetMarker) SetupWithManager(mgr controllerruntime.Manager) error {
scaleTargetWorkerOptions := util.Options{
scaleTargetWorkerOptions := worker.Options{
Name: "scale target worker",
ReconcileFunc: r.reconcileScaleRef,
}
r.scaleTargetWorker = util.NewAsyncWorker(scaleTargetWorkerOptions)
r.scaleTargetWorker = worker.NewAsyncWorker(scaleTargetWorkerOptions)
r.scaleTargetWorker.Run(scaleTargetWorkerNum, context.Background().Done())

return controllerruntime.NewControllerManagedBy(mgr).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/helper"
"github.com/karmada-io/karmada/pkg/util/worker"
)

type labelEventKind int
Expand All @@ -45,7 +46,7 @@ type labelEvent struct {
hpa *autoscalingv2.HorizontalPodAutoscaler
}

func (r *HpaScaleTargetMarker) reconcileScaleRef(key util.QueueKey) (err error) {
func (r *HpaScaleTargetMarker) reconcileScaleRef(key worker.QueueKey) (err error) {
event, ok := key.(labelEvent)
if !ok {
klog.Errorf("Found invalid key when reconciling hpa scale ref: %+v", key)
Expand Down
9 changes: 5 additions & 4 deletions pkg/controllers/mcs/service_export_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
"github.com/karmada-io/karmada/pkg/util/fedinformer/keys"
"github.com/karmada-io/karmada/pkg/util/helper"
"github.com/karmada-io/karmada/pkg/util/names"
"github.com/karmada-io/karmada/pkg/util/worker"
)

// ServiceExportControllerName is the controller name that will be used when reporting events and metrics.
Expand All @@ -75,7 +76,7 @@ type ServiceExportController struct {
// "member1": instance of ResourceEventHandler
eventHandlers sync.Map
// worker process resources periodic from rateLimitingQueue.
worker util.AsyncWorker
worker worker.AsyncWorker
}

var (
Expand Down Expand Up @@ -136,12 +137,12 @@ func (c *ServiceExportController) SetupWithManager(mgr controllerruntime.Manager

// RunWorkQueue initializes worker and run it, worker will process resource asynchronously.
func (c *ServiceExportController) RunWorkQueue() {
workerOptions := util.Options{
workerOptions := worker.Options{
Name: "service-export",
KeyFunc: nil,
ReconcileFunc: c.syncServiceExportOrEndpointSlice,
}
c.worker = util.NewAsyncWorker(workerOptions)
c.worker = worker.NewAsyncWorker(workerOptions)
c.worker.Run(c.WorkerNumber, c.StopChan)

go c.enqueueReportedEpsServiceExport()
Expand Down Expand Up @@ -191,7 +192,7 @@ func (c *ServiceExportController) enqueueReportedEpsServiceExport() {
}
}

func (c *ServiceExportController) syncServiceExportOrEndpointSlice(key util.QueueKey) error {
func (c *ServiceExportController) syncServiceExportOrEndpointSlice(key worker.QueueKey) error {
ctx := context.Background()
fedKey, ok := key.(keys.FederatedKey)
if !ok {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"github.com/karmada-io/karmada/pkg/util/fedinformer/keys"
"github.com/karmada-io/karmada/pkg/util/helper"
"github.com/karmada-io/karmada/pkg/util/names"
"github.com/karmada-io/karmada/pkg/util/worker"
)

// EndpointSliceCollectController collects EndpointSlice from member clusters and reports them to control-plane.
Expand All @@ -64,7 +65,7 @@ type EndpointSliceCollectController struct {
// Each handler takes the cluster name as key and takes the handler function as the value, e.g.
// "member1": instance of ResourceEventHandler
eventHandlers sync.Map
worker util.AsyncWorker // worker process resources periodic from rateLimitingQueue.
worker worker.AsyncWorker // worker process resources periodic from rateLimitingQueue.

ClusterCacheSyncTimeout metav1.Duration
}
Expand Down Expand Up @@ -124,16 +125,16 @@ func (c *EndpointSliceCollectController) SetupWithManager(mgr controllerruntime.

// RunWorkQueue initializes worker and run it, worker will process resource asynchronously.
func (c *EndpointSliceCollectController) RunWorkQueue() {
workerOptions := util.Options{
workerOptions := worker.Options{
Name: "endpointslice-collect",
KeyFunc: nil,
ReconcileFunc: c.collectEndpointSlice,
}
c.worker = util.NewAsyncWorker(workerOptions)
c.worker = worker.NewAsyncWorker(workerOptions)
c.worker.Run(c.WorkerNumber, c.StopChan)
}

func (c *EndpointSliceCollectController) collectEndpointSlice(key util.QueueKey) error {
func (c *EndpointSliceCollectController) collectEndpointSlice(key worker.QueueKey) error {
ctx := context.Background()
fedKey, ok := key.(keys.FederatedKey)
if !ok {
Expand Down
11 changes: 6 additions & 5 deletions pkg/controllers/status/work_status_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
"github.com/karmada-io/karmada/pkg/util/names"
"github.com/karmada-io/karmada/pkg/util/objectwatcher"
"github.com/karmada-io/karmada/pkg/util/restmapper"
"github.com/karmada-io/karmada/pkg/util/worker"
)

// WorkStatusControllerName is the controller name that will be used when reporting events and metrics.
Expand All @@ -66,7 +67,7 @@ type WorkStatusController struct {
InformerManager genericmanager.MultiClusterInformerManager
eventHandler cache.ResourceEventHandler // eventHandler knows how to handle events from the member cluster.
StopChan <-chan struct{}
worker util.AsyncWorker // worker process resources periodic from rateLimitingQueue.
worker worker.AsyncWorker // worker process resources periodic from rateLimitingQueue.
// ConcurrentWorkStatusSyncs is the number of Work status that are allowed to sync concurrently.
ConcurrentWorkStatusSyncs int
ObjectWatcher objectwatcher.ObjectWatcher
Expand Down Expand Up @@ -142,17 +143,17 @@ func (c *WorkStatusController) getEventHandler() cache.ResourceEventHandler {

// RunWorkQueue initializes worker and run it, worker will process resource asynchronously.
func (c *WorkStatusController) RunWorkQueue() {
workerOptions := util.Options{
workerOptions := worker.Options{
Name: "work-status",
KeyFunc: generateKey,
ReconcileFunc: c.syncWorkStatus,
}
c.worker = util.NewAsyncWorker(workerOptions)
c.worker = worker.NewAsyncWorker(workerOptions)
c.worker.Run(c.ConcurrentWorkStatusSyncs, c.StopChan)
}

// generateKey generates a key from obj, the key contains cluster, GVK, namespace and name.
func generateKey(obj interface{}) (util.QueueKey, error) {
func generateKey(obj interface{}) (worker.QueueKey, error) {
resource := obj.(*unstructured.Unstructured)
cluster, err := getClusterNameFromAnnotation(resource)
if err != nil {
Expand Down Expand Up @@ -183,7 +184,7 @@ func getClusterNameFromAnnotation(resource *unstructured.Unstructured) (string,
}

// syncWorkStatus will collect status of object referencing by key and update to work which holds the object.
func (c *WorkStatusController) syncWorkStatus(key util.QueueKey) error {
func (c *WorkStatusController) syncWorkStatus(key worker.QueueKey) error {
ctx := context.Background()
fedKey, ok := key.(keys.FederatedKey)
if !ok {
Expand Down
9 changes: 5 additions & 4 deletions pkg/controllers/status/work_status_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
"github.com/karmada-io/karmada/pkg/util/gclient"
"github.com/karmada-io/karmada/pkg/util/helper"
"github.com/karmada-io/karmada/pkg/util/objectwatcher"
"github.com/karmada-io/karmada/pkg/util/worker"
testhelper "github.com/karmada-io/karmada/test/helper"
)

Expand Down Expand Up @@ -325,7 +326,7 @@ func TestWorkStatusController_Reconcile(t *testing.T) {
}

func TestWorkStatusController_getEventHandler(t *testing.T) {
opt := util.Options{
opt := worker.Options{
Name: "opt",
KeyFunc: nil,
ReconcileFunc: nil,
Expand All @@ -339,7 +340,7 @@ func TestWorkStatusController_getEventHandler(t *testing.T) {
ClusterCacheSyncTimeout: metav1.Duration{},
RateLimiterOptions: ratelimiterflag.Options{},
eventHandler: nil,
worker: util.NewAsyncWorker(opt),
worker: worker.NewAsyncWorker(opt),
}

eventHandler := c.getEventHandler()
Expand Down Expand Up @@ -980,12 +981,12 @@ func TestWorkStatusController_registerInformersAndStart(t *testing.T) {
defer close(stopCh)
dynamicClientSet := dynamicfake.NewSimpleDynamicClient(scheme.Scheme)
c := newWorkStatusController(cluster)
opt := util.Options{
opt := worker.Options{
Name: "opt",
KeyFunc: nil,
ReconcileFunc: nil,
}
c.worker = util.NewAsyncWorker(opt)
c.worker = worker.NewAsyncWorker(opt)

workUID := "92345678-1234-5678-1234-567812345678"
raw := []byte(`{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod","namespace":"default"}}`)
Expand Down
Loading