Skip to content

Commit d88ce8e

Browse files
committed
feat(binding): create or update work parallel
Signed-off-by: chang.qiangqiang <[email protected]>
1 parent 2f80476 commit d88ce8e

File tree

1 file changed

+38
-10
lines changed

1 file changed

+38
-10
lines changed

pkg/controllers/binding/common.go

+38-10
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ func ensureWork(
8181
}
8282
}
8383

84+
var createOrUpdateWorkArgs []*CreateOrUpdateWorkArg
8485
for i := range targetClusters {
8586
targetCluster := targetClusters[i]
8687
clonedWorkload := workload.DeepCopy()
@@ -135,16 +136,24 @@ func ensureWork(
135136
Labels: workLabel,
136137
Annotations: annotations,
137138
}
138-
139-
if err = helper.CreateOrUpdateWork(
140-
ctx,
141-
c,
142-
workMeta,
143-
clonedWorkload,
144-
helper.WithSuspendDispatching(shouldSuspendDispatching(suspension, targetCluster)),
145-
helper.WithPreserveResourcesOnDeletion(ptr.Deref(preserveResourcesOnDeletion, false)),
146-
); err != nil {
147-
return err
139+
createOrUpdateWorkArgs = append(createOrUpdateWorkArgs, &CreateOrUpdateWorkArg{
140+
WorkMeta: workMeta,
141+
ClonedWorkload: clonedWorkload,
142+
Options: []helper.WorkOption{helper.WithSuspendDispatching(shouldSuspendDispatching(suspension, targetCluster)), helper.WithPreserveResourcesOnDeletion(ptr.Deref(preserveResourcesOnDeletion, false))},
143+
})
144+
}
145+
resChan := make(chan error)
146+
for _, item := range createOrUpdateWorkArgs {
147+
go CreateOrUpdateWorkParallel(ctx, resChan, c, item)
148+
}
149+
for i := 0; i < len(targetClusters); i++ {
150+
select {
151+
case res := <-resChan:
152+
if res != nil {
153+
return err
154+
}
155+
case <-ctx.Done():
156+
return nil
148157
}
149158
}
150159
return nil
@@ -293,3 +302,22 @@ func shouldSuspendDispatching(suspension *policyv1alpha1.Suspension, targetClust
293302
}
294303
return suspendDispatching
295304
}
305+
306+
// CreateOrUpdateWorkArg create or update work args struct.
307+
type CreateOrUpdateWorkArg struct {
308+
WorkMeta metav1.ObjectMeta
309+
ClonedWorkload *unstructured.Unstructured
310+
Options []helper.WorkOption
311+
}
312+
313+
// CreateOrUpdateWorkParallel creates or update work object parallel.
314+
func CreateOrUpdateWorkParallel(ctx context.Context, resChan chan error, c client.Client, createOrUpdateWorkArg *CreateOrUpdateWorkArg) {
315+
err := helper.CreateOrUpdateWork(
316+
ctx,
317+
c,
318+
createOrUpdateWorkArg.WorkMeta,
319+
createOrUpdateWorkArg.ClonedWorkload,
320+
createOrUpdateWorkArg.Options...,
321+
)
322+
resChan <- err
323+
}

0 commit comments

Comments
 (0)