Skip to content

Commit 4892551

Browse files
committed
feat(worker): add metrics for asyncworker
Signed-off-by: chang.qiangqiang <[email protected]>
1 parent 35b7e8f commit 4892551

24 files changed

+224
-87
lines changed

cmd/agent/app/agent.go

+1
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,7 @@ func run(ctx context.Context, opts *options.Options) error {
234234
crtlmetrics.Registry.MustRegister(metrics.ClusterCollectors()...)
235235
crtlmetrics.Registry.MustRegister(metrics.ResourceCollectorsForAgent()...)
236236
crtlmetrics.Registry.MustRegister(metrics.PoolCollectors()...)
237+
crtlmetrics.Registry.MustRegister(metrics.AsyncWorkerCollectors()...)
237238

238239
if err = setupControllers(controllerManager, opts, ctx.Done()); err != nil {
239240
return err

cmd/controller-manager/app/controllermanager.go

+1
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,7 @@ func Run(ctx context.Context, opts *options.Options) error {
189189
crtlmetrics.Registry.MustRegister(metrics.ClusterCollectors()...)
190190
crtlmetrics.Registry.MustRegister(metrics.ResourceCollectors()...)
191191
crtlmetrics.Registry.MustRegister(metrics.PoolCollectors()...)
192+
crtlmetrics.Registry.MustRegister(metrics.AsyncWorkerCollectors()...)
192193

193194
setupControllers(controllerManager, opts, ctx.Done())
194195

pkg/clusterdiscovery/clusterapi/clusterapi.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,11 @@ import (
3838
"github.com/karmada-io/karmada/pkg/karmadactl/options"
3939
"github.com/karmada-io/karmada/pkg/karmadactl/unjoin"
4040
"github.com/karmada-io/karmada/pkg/karmadactl/util/apiclient"
41-
"github.com/karmada-io/karmada/pkg/util"
4241
"github.com/karmada-io/karmada/pkg/util/fedinformer"
4342
"github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager"
4443
"github.com/karmada-io/karmada/pkg/util/fedinformer/keys"
4544
"github.com/karmada-io/karmada/pkg/util/helper"
45+
"github.com/karmada-io/karmada/pkg/util/worker"
4646
)
4747

4848
const (
@@ -62,7 +62,7 @@ type ClusterDetector struct {
6262
ClusterAPIClient client.Client
6363
InformerManager genericmanager.SingleClusterInformerManager
6464
EventHandler cache.ResourceEventHandler
65-
Processor util.AsyncWorker
65+
Processor worker.AsyncWorker
6666
ConcurrentReconciles int
6767

6868
stopCh <-chan struct{}
@@ -74,12 +74,12 @@ func (d *ClusterDetector) Start(ctx context.Context) error {
7474
d.stopCh = ctx.Done()
7575

7676
d.EventHandler = fedinformer.NewHandlerOnEvents(d.OnAdd, d.OnUpdate, d.OnDelete)
77-
workerOptions := util.Options{
77+
workerOptions := worker.Options{
7878
Name: "cluster-api cluster detector",
7979
KeyFunc: ClusterWideKeyFunc,
8080
ReconcileFunc: d.Reconcile,
8181
}
82-
d.Processor = util.NewAsyncWorker(workerOptions)
82+
d.Processor = worker.NewAsyncWorker(workerOptions)
8383
d.Processor.Run(d.ConcurrentReconciles, d.stopCh)
8484
d.discoveryCluster()
8585

@@ -120,7 +120,7 @@ func (d *ClusterDetector) OnDelete(obj interface{}) {
120120

121121
// Reconcile performs a full reconciliation for the object referred to by the key.
122122
// The key will be re-queued if an error is non-nil.
123-
func (d *ClusterDetector) Reconcile(key util.QueueKey) error {
123+
func (d *ClusterDetector) Reconcile(key worker.QueueKey) error {
124124
clusterWideKey, ok := key.(keys.ClusterWideKey)
125125
if !ok {
126126
klog.Errorf("Invalid key")

pkg/clusterdiscovery/clusterapi/handler.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,11 @@ limitations under the License.
1717
package clusterapi
1818

1919
import (
20-
"github.com/karmada-io/karmada/pkg/util"
2120
"github.com/karmada-io/karmada/pkg/util/fedinformer/keys"
21+
"github.com/karmada-io/karmada/pkg/util/worker"
2222
)
2323

2424
// ClusterWideKeyFunc generates a ClusterWideKey for object.
25-
func ClusterWideKeyFunc(obj interface{}) (util.QueueKey, error) {
25+
func ClusterWideKeyFunc(obj interface{}) (worker.QueueKey, error) {
2626
return keys.ClusterWideKeyFunc(obj)
2727
}

pkg/controllers/cluster/taint_manager.go

+9-9
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,9 @@ import (
3434
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
3535
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
3636
"github.com/karmada-io/karmada/pkg/features"
37-
"github.com/karmada-io/karmada/pkg/util"
3837
"github.com/karmada-io/karmada/pkg/util/fedinformer/keys"
3938
"github.com/karmada-io/karmada/pkg/util/helper"
39+
"github.com/karmada-io/karmada/pkg/util/worker"
4040
)
4141

4242
// TaintManagerName is the controller name that will be used when reporting events and metrics.
@@ -51,8 +51,8 @@ type NoExecuteTaintManager struct {
5151
ClusterTaintEvictionRetryFrequency time.Duration
5252
ConcurrentReconciles int
5353

54-
bindingEvictionWorker util.AsyncWorker
55-
clusterBindingEvictionWorker util.AsyncWorker
54+
bindingEvictionWorker worker.AsyncWorker
55+
clusterBindingEvictionWorker worker.AsyncWorker
5656
}
5757

5858
// Reconcile performs a full reconciliation for the object referred to by the Request.
@@ -117,27 +117,27 @@ func (tc *NoExecuteTaintManager) syncCluster(ctx context.Context, cluster *clust
117117

118118
// Start starts an asynchronous loop that handle evictions.
119119
func (tc *NoExecuteTaintManager) Start(ctx context.Context) error {
120-
bindingEvictionWorkerOptions := util.Options{
120+
bindingEvictionWorkerOptions := worker.Options{
121121
Name: "binding-eviction",
122122
KeyFunc: nil,
123123
ReconcileFunc: tc.syncBindingEviction,
124124
}
125-
tc.bindingEvictionWorker = util.NewAsyncWorker(bindingEvictionWorkerOptions)
125+
tc.bindingEvictionWorker = worker.NewAsyncWorker(bindingEvictionWorkerOptions)
126126
tc.bindingEvictionWorker.Run(tc.ConcurrentReconciles, ctx.Done())
127127

128-
clusterBindingEvictionWorkerOptions := util.Options{
128+
clusterBindingEvictionWorkerOptions := worker.Options{
129129
Name: "cluster-binding-eviction",
130130
KeyFunc: nil,
131131
ReconcileFunc: tc.syncClusterBindingEviction,
132132
}
133-
tc.clusterBindingEvictionWorker = util.NewAsyncWorker(clusterBindingEvictionWorkerOptions)
133+
tc.clusterBindingEvictionWorker = worker.NewAsyncWorker(clusterBindingEvictionWorkerOptions)
134134
tc.clusterBindingEvictionWorker.Run(tc.ConcurrentReconciles, ctx.Done())
135135

136136
<-ctx.Done()
137137
return nil
138138
}
139139

140-
func (tc *NoExecuteTaintManager) syncBindingEviction(key util.QueueKey) error {
140+
func (tc *NoExecuteTaintManager) syncBindingEviction(key worker.QueueKey) error {
141141
fedKey, ok := key.(keys.FederatedKey)
142142
if !ok {
143143
klog.Errorf("Failed to sync binding eviction as invalid key: %v", key)
@@ -193,7 +193,7 @@ func (tc *NoExecuteTaintManager) syncBindingEviction(key util.QueueKey) error {
193193
return nil
194194
}
195195

196-
func (tc *NoExecuteTaintManager) syncClusterBindingEviction(key util.QueueKey) error {
196+
func (tc *NoExecuteTaintManager) syncClusterBindingEviction(key worker.QueueKey) error {
197197
fedKey, ok := key.(keys.FederatedKey)
198198
if !ok {
199199
klog.Errorf("Failed to sync cluster binding eviction as invalid key: %v", key)

pkg/controllers/cluster/taint_manager_test.go

+5-4
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"github.com/karmada-io/karmada/pkg/util"
3434
"github.com/karmada-io/karmada/pkg/util/fedinformer/keys"
3535
"github.com/karmada-io/karmada/pkg/util/gclient"
36+
"github.com/karmada-io/karmada/pkg/util/worker"
3637
)
3738

3839
func newNoExecuteTaintManager() *NoExecuteTaintManager {
@@ -57,19 +58,19 @@ func newNoExecuteTaintManager() *NoExecuteTaintManager {
5758
WithIndex(&workv1alpha2.ResourceBinding{}, rbClusterKeyIndex, rbIndexerFunc).
5859
WithIndex(&workv1alpha2.ClusterResourceBinding{}, crbClusterKeyIndex, crbIndexerFunc).Build(),
5960
}
60-
bindingEvictionWorkerOptions := util.Options{
61+
bindingEvictionWorkerOptions := worker.Options{
6162
Name: "binding-eviction",
6263
KeyFunc: nil,
6364
ReconcileFunc: mgr.syncBindingEviction,
6465
}
65-
mgr.bindingEvictionWorker = util.NewAsyncWorker(bindingEvictionWorkerOptions)
66+
mgr.bindingEvictionWorker = worker.NewAsyncWorker(bindingEvictionWorkerOptions)
6667

67-
clusterBindingEvictionWorkerOptions := util.Options{
68+
clusterBindingEvictionWorkerOptions := worker.Options{
6869
Name: "cluster-binding-eviction",
6970
KeyFunc: nil,
7071
ReconcileFunc: mgr.syncClusterBindingEviction,
7172
}
72-
mgr.clusterBindingEvictionWorker = util.NewAsyncWorker(clusterBindingEvictionWorkerOptions)
73+
mgr.clusterBindingEvictionWorker = worker.NewAsyncWorker(clusterBindingEvictionWorkerOptions)
7374
return mgr
7475
}
7576

pkg/controllers/hpascaletargetmarker/hpa_scale_target_marker_controller.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import (
2525
controllerruntime "sigs.k8s.io/controller-runtime"
2626
"sigs.k8s.io/controller-runtime/pkg/builder"
2727

28-
"github.com/karmada-io/karmada/pkg/util"
28+
"github.com/karmada-io/karmada/pkg/util/worker"
2929
)
3030

3131
const (
@@ -40,16 +40,16 @@ type HpaScaleTargetMarker struct {
4040
DynamicClient dynamic.Interface
4141
RESTMapper meta.RESTMapper
4242

43-
scaleTargetWorker util.AsyncWorker
43+
scaleTargetWorker worker.AsyncWorker
4444
}
4545

4646
// SetupWithManager creates a controller and register to controller manager.
4747
func (r *HpaScaleTargetMarker) SetupWithManager(mgr controllerruntime.Manager) error {
48-
scaleTargetWorkerOptions := util.Options{
48+
scaleTargetWorkerOptions := worker.Options{
4949
Name: "scale target worker",
5050
ReconcileFunc: r.reconcileScaleRef,
5151
}
52-
r.scaleTargetWorker = util.NewAsyncWorker(scaleTargetWorkerOptions)
52+
r.scaleTargetWorker = worker.NewAsyncWorker(scaleTargetWorkerOptions)
5353
r.scaleTargetWorker.Run(scaleTargetWorkerNum, context.Background().Done())
5454

5555
return controllerruntime.NewControllerManagedBy(mgr).

pkg/controllers/hpascaletargetmarker/hpa_scale_target_marker_worker.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929

3030
"github.com/karmada-io/karmada/pkg/util"
3131
"github.com/karmada-io/karmada/pkg/util/helper"
32+
"github.com/karmada-io/karmada/pkg/util/worker"
3233
)
3334

3435
type labelEventKind int
@@ -45,7 +46,7 @@ type labelEvent struct {
4546
hpa *autoscalingv2.HorizontalPodAutoscaler
4647
}
4748

48-
func (r *HpaScaleTargetMarker) reconcileScaleRef(key util.QueueKey) (err error) {
49+
func (r *HpaScaleTargetMarker) reconcileScaleRef(key worker.QueueKey) (err error) {
4950
event, ok := key.(labelEvent)
5051
if !ok {
5152
klog.Errorf("Found invalid key when reconciling hpa scale ref: %+v", key)

pkg/controllers/mcs/service_export_controller.go

+5-4
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ import (
5353
"github.com/karmada-io/karmada/pkg/util/fedinformer/keys"
5454
"github.com/karmada-io/karmada/pkg/util/helper"
5555
"github.com/karmada-io/karmada/pkg/util/names"
56+
"github.com/karmada-io/karmada/pkg/util/worker"
5657
)
5758

5859
// ServiceExportControllerName is the controller name that will be used when reporting events and metrics.
@@ -75,7 +76,7 @@ type ServiceExportController struct {
7576
// "member1": instance of ResourceEventHandler
7677
eventHandlers sync.Map
7778
// worker process resources periodic from rateLimitingQueue.
78-
worker util.AsyncWorker
79+
worker worker.AsyncWorker
7980
}
8081

8182
var (
@@ -136,12 +137,12 @@ func (c *ServiceExportController) SetupWithManager(mgr controllerruntime.Manager
136137

137138
// RunWorkQueue initializes worker and run it, worker will process resource asynchronously.
138139
func (c *ServiceExportController) RunWorkQueue() {
139-
workerOptions := util.Options{
140+
workerOptions := worker.Options{
140141
Name: "service-export",
141142
KeyFunc: nil,
142143
ReconcileFunc: c.syncServiceExportOrEndpointSlice,
143144
}
144-
c.worker = util.NewAsyncWorker(workerOptions)
145+
c.worker = worker.NewAsyncWorker(workerOptions)
145146
c.worker.Run(c.WorkerNumber, c.StopChan)
146147

147148
go c.enqueueReportedEpsServiceExport()
@@ -191,7 +192,7 @@ func (c *ServiceExportController) enqueueReportedEpsServiceExport() {
191192
}
192193
}
193194

194-
func (c *ServiceExportController) syncServiceExportOrEndpointSlice(key util.QueueKey) error {
195+
func (c *ServiceExportController) syncServiceExportOrEndpointSlice(key worker.QueueKey) error {
195196
ctx := context.Background()
196197
fedKey, ok := key.(keys.FederatedKey)
197198
if !ok {

pkg/controllers/multiclusterservice/endpointslice_collect_controller.go

+5-4
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ import (
4949
"github.com/karmada-io/karmada/pkg/util/fedinformer/keys"
5050
"github.com/karmada-io/karmada/pkg/util/helper"
5151
"github.com/karmada-io/karmada/pkg/util/names"
52+
"github.com/karmada-io/karmada/pkg/util/worker"
5253
)
5354

5455
// EndpointSliceCollectController collects EndpointSlice from member clusters and reports them to control-plane.
@@ -64,7 +65,7 @@ type EndpointSliceCollectController struct {
6465
// Each handler takes the cluster name as key and takes the handler function as the value, e.g.
6566
// "member1": instance of ResourceEventHandler
6667
eventHandlers sync.Map
67-
worker util.AsyncWorker // worker process resources periodic from rateLimitingQueue.
68+
worker worker.AsyncWorker // worker process resources periodic from rateLimitingQueue.
6869

6970
ClusterCacheSyncTimeout metav1.Duration
7071
}
@@ -124,16 +125,16 @@ func (c *EndpointSliceCollectController) SetupWithManager(mgr controllerruntime.
124125

125126
// RunWorkQueue initializes worker and run it, worker will process resource asynchronously.
126127
func (c *EndpointSliceCollectController) RunWorkQueue() {
127-
workerOptions := util.Options{
128+
workerOptions := worker.Options{
128129
Name: "endpointslice-collect",
129130
KeyFunc: nil,
130131
ReconcileFunc: c.collectEndpointSlice,
131132
}
132-
c.worker = util.NewAsyncWorker(workerOptions)
133+
c.worker = worker.NewAsyncWorker(workerOptions)
133134
c.worker.Run(c.WorkerNumber, c.StopChan)
134135
}
135136

136-
func (c *EndpointSliceCollectController) collectEndpointSlice(key util.QueueKey) error {
137+
func (c *EndpointSliceCollectController) collectEndpointSlice(key worker.QueueKey) error {
137138
ctx := context.Background()
138139
fedKey, ok := key.(keys.FederatedKey)
139140
if !ok {

pkg/controllers/status/work_status_controller.go

+6-5
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ import (
5353
"github.com/karmada-io/karmada/pkg/util/names"
5454
"github.com/karmada-io/karmada/pkg/util/objectwatcher"
5555
"github.com/karmada-io/karmada/pkg/util/restmapper"
56+
"github.com/karmada-io/karmada/pkg/util/worker"
5657
)
5758

5859
// WorkStatusControllerName is the controller name that will be used when reporting events and metrics.
@@ -66,7 +67,7 @@ type WorkStatusController struct {
6667
InformerManager genericmanager.MultiClusterInformerManager
6768
eventHandler cache.ResourceEventHandler // eventHandler knows how to handle events from the member cluster.
6869
StopChan <-chan struct{}
69-
worker util.AsyncWorker // worker process resources periodic from rateLimitingQueue.
70+
worker worker.AsyncWorker // worker process resources periodic from rateLimitingQueue.
7071
// ConcurrentWorkStatusSyncs is the number of Work status that are allowed to sync concurrently.
7172
ConcurrentWorkStatusSyncs int
7273
ObjectWatcher objectwatcher.ObjectWatcher
@@ -142,17 +143,17 @@ func (c *WorkStatusController) getEventHandler() cache.ResourceEventHandler {
142143

143144
// RunWorkQueue initializes worker and run it, worker will process resource asynchronously.
144145
func (c *WorkStatusController) RunWorkQueue() {
145-
workerOptions := util.Options{
146+
workerOptions := worker.Options{
146147
Name: "work-status",
147148
KeyFunc: generateKey,
148149
ReconcileFunc: c.syncWorkStatus,
149150
}
150-
c.worker = util.NewAsyncWorker(workerOptions)
151+
c.worker = worker.NewAsyncWorker(workerOptions)
151152
c.worker.Run(c.ConcurrentWorkStatusSyncs, c.StopChan)
152153
}
153154

154155
// generateKey generates a key from obj, the key contains cluster, GVK, namespace and name.
155-
func generateKey(obj interface{}) (util.QueueKey, error) {
156+
func generateKey(obj interface{}) (worker.QueueKey, error) {
156157
resource := obj.(*unstructured.Unstructured)
157158
cluster, err := getClusterNameFromAnnotation(resource)
158159
if err != nil {
@@ -183,7 +184,7 @@ func getClusterNameFromAnnotation(resource *unstructured.Unstructured) (string,
183184
}
184185

185186
// syncWorkStatus will collect status of object referencing by key and update to work which holds the object.
186-
func (c *WorkStatusController) syncWorkStatus(key util.QueueKey) error {
187+
func (c *WorkStatusController) syncWorkStatus(key worker.QueueKey) error {
187188
ctx := context.Background()
188189
fedKey, ok := key.(keys.FederatedKey)
189190
if !ok {

pkg/controllers/status/work_status_controller_test.go

+5-4
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ import (
5353
"github.com/karmada-io/karmada/pkg/util/gclient"
5454
"github.com/karmada-io/karmada/pkg/util/helper"
5555
"github.com/karmada-io/karmada/pkg/util/objectwatcher"
56+
"github.com/karmada-io/karmada/pkg/util/worker"
5657
testhelper "github.com/karmada-io/karmada/test/helper"
5758
)
5859

@@ -325,7 +326,7 @@ func TestWorkStatusController_Reconcile(t *testing.T) {
325326
}
326327

327328
func TestWorkStatusController_getEventHandler(t *testing.T) {
328-
opt := util.Options{
329+
opt := worker.Options{
329330
Name: "opt",
330331
KeyFunc: nil,
331332
ReconcileFunc: nil,
@@ -339,7 +340,7 @@ func TestWorkStatusController_getEventHandler(t *testing.T) {
339340
ClusterCacheSyncTimeout: metav1.Duration{},
340341
RateLimiterOptions: ratelimiterflag.Options{},
341342
eventHandler: nil,
342-
worker: util.NewAsyncWorker(opt),
343+
worker: worker.NewAsyncWorker(opt),
343344
}
344345

345346
eventHandler := c.getEventHandler()
@@ -980,12 +981,12 @@ func TestWorkStatusController_registerInformersAndStart(t *testing.T) {
980981
defer close(stopCh)
981982
dynamicClientSet := dynamicfake.NewSimpleDynamicClient(scheme.Scheme)
982983
c := newWorkStatusController(cluster)
983-
opt := util.Options{
984+
opt := worker.Options{
984985
Name: "opt",
985986
KeyFunc: nil,
986987
ReconcileFunc: nil,
987988
}
988-
c.worker = util.NewAsyncWorker(opt)
989+
c.worker = worker.NewAsyncWorker(opt)
989990

990991
workUID := "92345678-1234-5678-1234-567812345678"
991992
raw := []byte(`{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod","namespace":"default"}}`)

0 commit comments

Comments
 (0)