Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(binding): create or update work parallel #5815

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 38 additions & 10 deletions pkg/controllers/binding/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
)

// ensureWork ensure Work to be created or updated.
func ensureWork(

Check failure on line 41 in pkg/controllers/binding/common.go

View workflow job for this annotation

GitHub Actions / lint

cyclomatic complexity 18 of func `ensureWork` is high (> 15) (gocyclo)
ctx context.Context, c client.Client, resourceInterpreter resourceinterpreter.ResourceInterpreter, workload *unstructured.Unstructured,
overrideManager overridemanager.OverrideManager, binding metav1.Object, scope apiextensionsv1.ResourceScope,
) error {
Expand Down Expand Up @@ -81,6 +81,7 @@
}
}

var createOrUpdateWorkArgs []*CreateOrUpdateWorkArg
for i := range targetClusters {
targetCluster := targetClusters[i]
clonedWorkload := workload.DeepCopy()
Expand Down Expand Up @@ -135,16 +136,24 @@
Labels: workLabel,
Annotations: annotations,
}

if err = helper.CreateOrUpdateWork(
ctx,
c,
workMeta,
clonedWorkload,
helper.WithSuspendDispatching(shouldSuspendDispatching(suspension, targetCluster)),
helper.WithPreserveResourcesOnDeletion(ptr.Deref(preserveResourcesOnDeletion, false)),
); err != nil {
return err
createOrUpdateWorkArgs = append(createOrUpdateWorkArgs, &CreateOrUpdateWorkArg{
WorkMeta: workMeta,
ClonedWorkload: clonedWorkload,
Options: []helper.WorkOption{helper.WithSuspendDispatching(shouldSuspendDispatching(suspension, targetCluster)), helper.WithPreserveResourcesOnDeletion(ptr.Deref(preserveResourcesOnDeletion, false))},
})
}
resChan := make(chan error)
for _, item := range createOrUpdateWorkArgs {
go CreateOrUpdateWorkParallel(ctx, resChan, c, item)
}
for i := 0; i < len(targetClusters); i++ {
select {
case res := <-resChan:
if res != nil {
return err
}
case <-ctx.Done():
return nil
}
}
return nil
Expand Down Expand Up @@ -293,3 +302,22 @@
}
return suspendDispatching
}

// CreateOrUpdateWorkArg create or update work args struct.
type CreateOrUpdateWorkArg struct {
WorkMeta metav1.ObjectMeta
ClonedWorkload *unstructured.Unstructured
Options []helper.WorkOption
}

// CreateOrUpdateWorkParallel creates or update work object parallel.
func CreateOrUpdateWorkParallel(ctx context.Context, resChan chan error, c client.Client, createOrUpdateWorkArg *CreateOrUpdateWorkArg) {
err := helper.CreateOrUpdateWork(
ctx,
c,
createOrUpdateWorkArg.WorkMeta,
createOrUpdateWorkArg.ClonedWorkload,
createOrUpdateWorkArg.Options...,
)
resChan <- err
}
Loading