Skip to content

Commit d15d024

Browse files
feat(syncwaves): add DAG ordering for syncwaves
Signed-off-by: SebastienFelix <[email protected]>
1 parent b74cf45 commit d15d024

File tree

12 files changed

+526
-58
lines changed

12 files changed

+526
-58
lines changed

controller/sync.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -571,7 +571,7 @@ func hasSharedResourceCondition(app *v1alpha1.Application) (bool, string) {
571571
// Note, this is not foolproof, since a proper fix would require the CRD record
572572
// status.observedGeneration coupled with a health.lua that verifies
573573
// status.observedGeneration == metadata.generation
574-
func delayBetweenSyncWaves(_ common.SyncPhase, _ int, finalWave bool) error {
574+
func delayBetweenSyncWaves(_ []common.SyncIdentity, finalWave bool) error {
575575
if !finalWave {
576576
delaySec := 2
577577
if delaySecStr := os.Getenv(EnvVarSyncWaveDelay); delaySecStr != "" {

docs/user-guide/sync-waves.md

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,14 @@ It repeats this process until all phases and waves are in-sync and healthy.
8080

8181
Because an application can have resources that are unhealthy in the first wave, it may be that the app can never get to healthy.
8282

83+
## How Sync Waves Groups work?
84+
85+
On top of Sync waves, Argo CD offers a way to group resources belonging to a same component (examples : Kafka, UI, Database, MyCusonComponent, ...). These are sync wave groups. They are defined by the argocd.argoproj.io/sync-wave-group annotation. The value is an integer that defines the component of which the resource belongs. Sync Wave groups behave like apps within the main app. Resources within a same Sync Wave group will be synced according to their Sync wave's values.
86+
87+
It is possible to define dependencies between Sync Wave groups. These are sync wave group dependencies. They are defined at resource level by the argocd.argoproj.io/sync-wave-group-dependencies annotation. The value is a list of integers, separated by commas. These integers define the Sync Wave groups that need to be synced before the resource in which this annotation is defined.
88+
89+
Note that in order to avoid circular dependencies, values defined in argocd.argoproj.io/sync-wave-group-dependencies will only be taken into account if they are strictly less than the Sync Wave group value.
90+
8391
## How Do I Configure Phases?
8492

8593
Pre-sync and post-sync can only contain hooks. Apply the hook annotation:
@@ -100,9 +108,11 @@ Specify the wave using the following annotation:
100108
metadata:
101109
annotations:
102110
argocd.argoproj.io/sync-wave: "5"
111+
argocd.argoproj.io/sync-wave-group: "2"
112+
argocd.argoproj.io/sync-wave-group-dependencies: "0,1"
103113
```
104114
105-
Hooks and resources are assigned to wave zero by default. The wave can be negative, so you can create a wave that runs before all other resources.
115+
Hooks and resources are assigned to wave zero and wave goup zero by default. The wave can be negative, so you can create a wave that runs before all other resources.
106116
107117
## Examples
108118

gitops-engine/pkg/sync/common/types.go

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,10 @@ const (
1212
AnnotationSyncOptions = "argocd.argoproj.io/sync-options"
1313
// AnnotationSyncWave indicates which wave of the sync the resource or hook should be in
1414
AnnotationSyncWave = "argocd.argoproj.io/sync-wave"
15+
// AnnotationSyncWaveGroup indicates which wave of the sync the resource or hook should be in
16+
AnnotationSyncWaveGroup = "argocd.argoproj.io/sync-wave-group"
17+
// AnnotationSyncWaveGroupDependencies indicates which wave of the sync the resource or hook should be in
18+
AnnotationSyncWaveGroupDependencies = "argocd.argoproj.io/sync-wave-group-dependencies"
1519
// AnnotationKeyHook contains the hook type of a resource
1620
AnnotationKeyHook = "argocd.argoproj.io/hook"
1721
// AnnotationKeyHookDeletePolicy is the policy of deleting a hook
@@ -59,10 +63,33 @@ type PermissionValidator func(un *unstructured.Unstructured, res *metav1.APIReso
5963

6064
type SyncPhase string
6165

66+
type SyncIdentity struct {
67+
Phase SyncPhase
68+
Wave int
69+
WaveGroup int
70+
}
71+
72+
// Will be used when using Dependency graph
73+
type GroupIdentity struct {
74+
Phase SyncPhase
75+
WaveGroup int
76+
}
77+
78+
// Will be used when using Dependency graph
79+
type WaveDependency struct {
80+
Origin GroupIdentity
81+
Destination GroupIdentity
82+
}
83+
84+
// Will be used when using Dependency graph
85+
type WaveDependencyGraph struct {
86+
Dependencies []WaveDependency
87+
}
88+
6289
// SyncWaveHook is a callback function which will be invoked after each sync wave is successfully
6390
// applied during a sync operation. The callback indicates which phase and wave it had just
6491
// executed, and whether or not that wave was the final one.
65-
type SyncWaveHook func(phase SyncPhase, wave int, final bool) error
92+
type SyncWaveHook func(t []SyncIdentity, final bool) error
6693

6794
const (
6895
SyncPhasePreSync = "PreSync"

gitops-engine/pkg/sync/doc.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,30 @@ that runs before all other resources. The `argocd.argoproj.io/sync-wave` annotat
7575
annotations:
7676
argocd.argoproj.io/sync-wave: "5"
7777
78+
# Sync Waves Groups & Dependencies
79+
80+
The wave groups allow to group resources belonging to a same component (example: Kafka, MongoDB, FrontEnd, ...). Hooks and
81+
resources are assigned to wave group "Default" by default. The `argocd.argoproj.io/sync-wave-group` annotation assign resource to a
82+
wave group:
83+
84+
metadata:
85+
annotations:
86+
argocd.argoproj.io/sync-wave: "5"
87+
argocd.argoproj.io/sync-wave-group: "2"
88+
89+
It is also possible to define dependencies between waves groups, if you want a group of resources to be synced before another group.
90+
By default, hooks and resources have no dependencies. The `argocd.argoproj.io/sync-wave-group-dependencies` annotation defines the
91+
wave groups that need to be synced before a resource can:
92+
93+
metadata:
94+
annotations:
95+
argocd.argoproj.io/sync-wave: "5"
96+
argocd.argoproj.io/sync-wave-group: "2"
97+
argocd.argoproj.io/sync-wave-group-dependencies "0,1"
98+
99+
In the previous example, a resource belonging to wave group "MyCustomComponent" is defined. This resource will be synced only when
100+
all resources from wave groups "Kafka" and "FrontEnd" has been synced.
101+
78102
# Sync Options
79103
80104
The sync options allows customizing the synchronization of selected resources. The options are specified using the
@@ -89,6 +113,7 @@ How Does It Work Together?
89113
Syncing process orders the resources in the following precedence:
90114
91115
- The phase
116+
- The group with respect to group dependencies
92117
- The wave they are in (lower values first)
93118
- By kind (e.g. namespaces first)
94119
- By name

gitops-engine/pkg/sync/sync_context.go

Lines changed: 71 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"encoding/json"
66
"fmt"
7+
"slices"
78
"sort"
89
"strings"
910
"sync"
@@ -416,6 +417,23 @@ func (sc *syncContext) setRunningPhase(tasks []*syncTask, isPendingDeletion bool
416417
func (sc *syncContext) Sync() {
417418
sc.log.WithValues("skipHooks", sc.skipHooks, "started", sc.started()).Info("Syncing")
418419
tasks, ok := sc.getSyncTasks()
420+
421+
// dependencyGraph will be used to detect circular dependencies and allow for direct dependencies definition
422+
// in argocd.argoproj.io/sync-wave-group-dependencies
423+
dependencyGraph := common.WaveDependencyGraph{Dependencies: make([]common.WaveDependency, 0)}
424+
for _, task := range tasks {
425+
if task.targetObj != nil {
426+
origin := common.GroupIdentity{Phase: task.phase, WaveGroup: task.wave()}
427+
for dependency := range task.waveGroupDependencies() {
428+
destination := common.GroupIdentity{Phase: task.phase, WaveGroup: dependency}
429+
waveDependency := &common.WaveDependency{Origin: origin, Destination: destination}
430+
if !slices.Contains(dependencyGraph.Dependencies, *waveDependency) {
431+
dependencyGraph.Dependencies = append(dependencyGraph.Dependencies, *waveDependency)
432+
}
433+
}
434+
}
435+
}
436+
419437
if !ok {
420438
sc.setOperationPhase(common.OperationFailed, "one or more synchronization tasks are not valid")
421439
return
@@ -560,26 +578,29 @@ func (sc *syncContext) Sync() {
560578
return
561579
}
562580

563-
// remove any tasks not in this wave
581+
// remove any tasks which have unsynced dependencies
564582
phase := tasks.phase()
565-
wave := tasks.wave()
566-
finalWave := phase == tasks.lastPhase() && wave == tasks.lastWave()
583+
independantSyncIdentities := tasks.independantSyncIdentities()
584+
allSyncIdentities := tasks.syncIdentities()
567585

568586
// if it is the last phase/wave and the only remaining tasks are non-hooks, the we are successful
569587
// EVEN if those objects subsequently degraded
570588
// This handles the common case where neither hooks or waves are used and a sync equates to simply an (asynchronous) kubectl apply of manifests, which succeeds immediately.
571-
remainingTasks := tasks.Filter(func(t *syncTask) bool { return t.phase != phase || wave != t.wave() || t.isHook() })
589+
remainingTasks := tasks.Filter(func(t *syncTask) bool {
590+
return !slices.Contains(independantSyncIdentities, t.identity()) || t.isHook()
591+
})
572592

573-
sc.log.WithValues("phase", phase, "wave", wave, "tasks", tasks, "syncFailTasks", syncFailTasks).V(1).Info("Filtering tasks in correct phase and wave")
574-
tasks = tasks.Filter(func(t *syncTask) bool { return t.phase == phase && t.wave() == wave })
593+
sc.log.WithValues("phase", phase, "independantSyncIdentities", independantSyncIdentities, "tasks", tasks, "syncFailTasks", syncFailTasks).V(1).Info("Filtering tasks in correct phase and wave")
594+
tasks = tasks.Filter(func(t *syncTask) bool { return slices.Contains(independantSyncIdentities, t.identity()) })
575595

576596
sc.setOperationPhase(common.OperationRunning, "one or more tasks are running")
577597

578598
sc.log.WithValues("tasks", tasks).V(1).Info("Wet-run")
579599
runState := sc.runTasks(tasks, false)
580600

581601
if sc.syncWaveHook != nil && runState != failed {
582-
err := sc.syncWaveHook(phase, wave, finalWave)
602+
finalWave := phase == tasks.lastPhase() && len(independantSyncIdentities) == len(allSyncIdentities)
603+
err := sc.syncWaveHook(independantSyncIdentities, finalWave)
583604
if err != nil {
584605
sc.deleteHooks(hooksPendingDeletionFailed)
585606
sc.setOperationPhase(common.OperationFailed, fmt.Sprintf("SyncWaveHook failed: %v", err))
@@ -909,52 +930,61 @@ func (sc *syncContext) getSyncTasks() (_ syncTasks, successful bool) {
909930
}
910931

911932
// for prune tasks, modify the waves for proper cleanup i.e reverse of sync wave (creation order)
912-
pruneTasks := make(map[int][]*syncTask)
933+
934+
tasksByWaveGroup := make(map[int][]*syncTask)
913935
for _, task := range tasks {
914-
if task.isPrune() {
915-
pruneTasks[task.wave()] = append(pruneTasks[task.wave()], task)
916-
}
936+
tasksByWaveGroup[task.waveGroup()] = append(tasksByWaveGroup[task.waveGroup()], task)
917937
}
938+
for waveGroup := range tasksByWaveGroup {
939+
pruneTasks := make(map[int][]*syncTask)
940+
for _, task := range tasksByWaveGroup[waveGroup] {
941+
if task.isPrune() {
942+
pruneTasks[task.wave()] = append(pruneTasks[task.wave()], task)
943+
}
944+
}
918945

919-
var uniquePruneWaves []int
920-
for k := range pruneTasks {
921-
uniquePruneWaves = append(uniquePruneWaves, k)
922-
}
923-
sort.Ints(uniquePruneWaves)
946+
var uniquePruneWaves []int
947+
for k := range pruneTasks {
948+
uniquePruneWaves = append(uniquePruneWaves, k)
949+
}
950+
sort.Ints(uniquePruneWaves)
924951

925-
// reorder waves for pruning tasks using symmetric swap on prune waves
926-
n := len(uniquePruneWaves)
927-
for i := 0; i < n/2; i++ {
928-
// waves to swap
929-
startWave := uniquePruneWaves[i]
930-
endWave := uniquePruneWaves[n-1-i]
952+
// reorder waves for pruning tasks using symmetric swap on prune waves
953+
n := len(uniquePruneWaves)
954+
for j := 0; j < n/2; j++ {
955+
// waves to swap
956+
startWave := uniquePruneWaves[j]
957+
endWave := uniquePruneWaves[n-1-j]
931958

932-
for _, task := range pruneTasks[startWave] {
933-
task.waveOverride = &endWave
934-
}
959+
for _, task := range pruneTasks[startWave] {
960+
task.waveOverride = &endWave
961+
}
935962

936-
for _, task := range pruneTasks[endWave] {
937-
task.waveOverride = &startWave
963+
for _, task := range pruneTasks[endWave] {
964+
task.waveOverride = &startWave
965+
}
938966
}
939-
}
940967

941-
// for pruneLast tasks, modify the wave to sync phase last wave of tasks + 1
942-
// to ensure proper cleanup, syncPhaseLastWave should also consider prune tasks to determine last wave
943-
syncPhaseLastWave := 0
944-
for _, task := range tasks {
945-
if task.phase == common.SyncPhaseSync {
946-
if task.wave() > syncPhaseLastWave {
947-
syncPhaseLastWave = task.wave()
968+
// for pruneLast tasks, modify the wave to sync phase last wave of tasks + 1
969+
// to ensure proper cleanup, syncPhaseLastWave should also consider prune tasks to determine last wave
970+
971+
syncPhaseLastWave := 0
972+
for _, task := range tasksByWaveGroup[waveGroup] {
973+
if task.phase == common.SyncPhaseSync {
974+
if task.wave() > syncPhaseLastWave {
975+
syncPhaseLastWave = task.wave()
976+
}
948977
}
949978
}
950-
}
951-
syncPhaseLastWave = syncPhaseLastWave + 1
979+
syncPhaseLastWave = syncPhaseLastWave + 1
952980

953-
for _, task := range tasks {
954-
if task.isPrune() &&
955-
(sc.pruneLast || resourceutil.HasAnnotationOption(task.liveObj, common.AnnotationSyncOptions, common.SyncOptionPruneLast)) {
956-
task.waveOverride = &syncPhaseLastWave
981+
for _, task := range tasksByWaveGroup[waveGroup] {
982+
if task.isPrune() &&
983+
(sc.pruneLast || resourceutil.HasAnnotationOption(task.liveObj, common.AnnotationSyncOptions, common.SyncOptionPruneLast)) {
984+
task.waveOverride = &syncPhaseLastWave
985+
}
957986
}
987+
958988
}
959989

960990
tasks.Sort()

0 commit comments

Comments
 (0)