
Commit 530f847

Add topologySpreadConstraints configuration to pod spec.
1 parent 37d6993 commit 530f847

9 files changed: +176 -12 lines

e2e/tests/test_e2e.py

+79 -10

@@ -556,7 +556,7 @@ def compare_config():
 
 pg_patch_config["spec"]["patroni"]["slots"][slot_to_change]["database"] = "bar"
 del pg_patch_config["spec"]["patroni"]["slots"][slot_to_remove]
-
+
 k8s.api.custom_objects_api.patch_namespaced_custom_object(
     "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_delete_slot_patch)
 
@@ -573,7 +573,7 @@ def compare_config():
 
 self.eventuallyEqual(lambda: self.query_database(leader.metadata.name, "postgres", get_slot_query%("database", slot_to_change))[0], "bar",
     "The replication slot cannot be updated", 10, 5)
-
+
 # make sure slot from Patroni didn't get deleted
 self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "postgres", get_slot_query%("slot_name", patroni_slot))), 1,
     "The replication slot from Patroni gets deleted", 10, 5)
@@ -929,7 +929,7 @@ def test_ignored_annotations(self):
 },
 }
 }
-
+
 old_sts_creation_timestamp = sts.metadata.creation_timestamp
 k8s.api.apps_v1.patch_namespaced_stateful_set(sts.metadata.name, sts.metadata.namespace, annotation_patch)
 old_svc_creation_timestamp = svc.metadata.creation_timestamp
@@ -1254,7 +1254,7 @@ def test_persistent_volume_claim_retention_policy(self):
 }
 k8s.update_config(patch_scaled_policy_retain)
 self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
-
+
 # decrease the number of instances
 k8s.api.custom_objects_api.patch_namespaced_custom_object(
     'acid.zalan.do', 'v1', 'default', 'postgresqls', 'acid-minimal-cluster', pg_patch_scale_down_instances)
@@ -1622,7 +1622,7 @@ def test_password_rotation(self):
 },
 }
 k8s.api.core_v1.patch_namespaced_secret(
-    name="foo-user.acid-minimal-cluster.credentials.postgresql.acid.zalan.do",
+    name="foo-user.acid-minimal-cluster.credentials.postgresql.acid.zalan.do",
 namespace="default",
 body=secret_fake_rotation)
 
@@ -1638,7 +1638,7 @@ def test_password_rotation(self):
 "data": {
 "enable_password_rotation": "true",
 "password_rotation_interval": "30",
-    "password_rotation_user_retention": "30", # should be set to 60
+    "password_rotation_user_retention": "30", # should be set to 60
 },
 }
 k8s.update_config(enable_password_rotation)
@@ -1691,7 +1691,7 @@ def test_password_rotation(self):
 "Unexpected username in secret of test.db_user: expected {}, got {}".format("test.db_user", secret_username))
 
 # disable password rotation for all other users (foo_user)
-    # and pick smaller intervals to see if the third fake rotation user is dropped
+    # and pick smaller intervals to see if the third fake rotation user is dropped
 enable_password_rotation = {
 "data": {
 "enable_password_rotation": "false",
@@ -2158,7 +2158,7 @@ def assert_distributed_pods(self, target_nodes, cluster_labels='cluster-name=aci
 
 # if nodes are different we can quit here
 if master_nodes[0] not in replica_nodes:
-    return True
+    return True
 
 # enable pod anti affintiy in config map which should trigger movement of replica
 patch_enable_antiaffinity = {
@@ -2182,7 +2182,7 @@ def assert_distributed_pods(self, target_nodes, cluster_labels='cluster-name=aci
 }
 k8s.update_config(patch_disable_antiaffinity, "disable antiaffinity")
 self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
-
+
 k8s.wait_for_pod_start('spilo-role=replica,' + cluster_labels)
 k8s.wait_for_running_pods(cluster_labels, 2)
 
@@ -2193,7 +2193,7 @@ def assert_distributed_pods(self, target_nodes, cluster_labels='cluster-name=aci
 # if nodes are different we can quit here
 for target_node in target_nodes:
 if (target_node not in master_nodes or target_node not in replica_nodes) and master_nodes[0] in replica_nodes:
-    print('Pods run on the same node')
+    print('Pods run on the same node')
 return False
 
 except timeout_decorator.TimeoutError:
@@ -2272,5 +2272,74 @@ def query_database_with_user(self, pod_name, db_name, query, user_name):
 
         return result_set
 
+    def test_topology_spread_constraints(self):
+        '''
+           Enable topologySpreadConstraints for pods
+        '''
+        k8s = self.k8s
+        cluster_label = 'application=spilo,cluster-name=acid-minimal-cluster'
+
+        # Verify we are in good state from potential previous tests
+        self.eventuallyEqual(lambda: k8s.count_running_pods(), 2, "No 2 pods running")
+
+        patch_node_label = {
+            "metadata": {
+                "labels": {
+                    "topology.kubernetes.io/zone": "zalando"
+                }
+            }
+        }
+
+        nodes = k8s.api.core_v1.list_node()
+        for node in nodes.items:
+            k8s.api.core_v1.patch_node(node.metadata.name, patch_node_label)
+
+        podsList = k8s.api.core_v1.list_namespaced_pod('default', label_selector=cluster_label)
+        k8s.wait_for_pod_start('spilo-role=master,' + cluster_label)
+        k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label)
+
+        patch_cordon_node = {
+            "spec": {
+                "unschedulable": True
+            }
+        }
+
+        master_nodes, replica_nodes = k8s.get_cluster_nodes()
+        self.assertNotEqual(master_nodes, [])
+        self.assertNotEqual(replica_nodes, [])
+
+        # Cordon replicas node
+        k8s.api.core_v1.patch_node(replica_nodes[0], patch_cordon_node)
+        # Delete replicas pod so it can be re-scheduled to master node
+        replicas_pod = k8s.get_cluster_replica_pod()
+        k8s.api.core_v1.delete_namespaced_pod(replicas_pod.metadata.name, 'default')
+        # Wait for replicas pod re-scheduled to master node
+        k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label)
+        # Assert master pod and replicas pod are in the same node
+        master_nodes, replica_nodes = k8s.get_cluster_nodes()
+        self.assertEqual(master_nodes[0].metadata.name, replica_nodes[0].metadata.name)
+
+        patch_uncordon_node = {
+            "spec": {
+                "unschedulable": False
+            }
+        }
+
+        # Uncordon replicas node
+        k8s.api.core_v1.patch_node(replica_nodes[0], patch_uncordon_node)
+
+        patch_enable_topology_spread_constraints = {
+            "data": {
+                "enable_postgres_topology_spread_constraints": "true"
+            }
+        }
+
+        k8s.update_config(patch_enable_topology_spread_constraints, "enable topologySpreadConstraints")
+        self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
+
+        # Assert master pod and replicas pod are spread in two diffrence nodes
+        master_nodes, replica_nodes = k8s.get_cluster_nodes()
+        self.assertNotEqual(master_nodes[0].metadata.name, replica_nodes[0].metadata.name)
+
 if __name__ == '__main__':
     unittest.main()
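The new end-to-end test depends on every node carrying the topology.kubernetes.io/zone label before the constraint can take effect. A minimal client-go sketch of that labeling step, for illustration only (not part of the commit; the default kubeconfig path and the panic-based error handling are assumptions for brevity):

// Sketch: label every node with the zone value the e2e test expects.
package main

import (
    "context"
    "fmt"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/types"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
    if err != nil {
        panic(err)
    }
    client, err := kubernetes.NewForConfig(cfg)
    if err != nil {
        panic(err)
    }

    // Same label the Python test patches onto every node before cordoning.
    patch := []byte(`{"metadata":{"labels":{"topology.kubernetes.io/zone":"zalando"}}}`)

    nodes, err := client.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
    if err != nil {
        panic(err)
    }
    for _, node := range nodes.Items {
        if _, err := client.CoreV1().Nodes().Patch(context.TODO(), node.Name,
            types.StrategicMergePatchType, patch, metav1.PatchOptions{}); err != nil {
            panic(err)
        }
        fmt.Println("labeled", node.Name)
    }
}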

manifests/postgresql.crd.yaml

+6
@@ -575,6 +575,12 @@ spec:
                - PreferNoSchedule
              tolerationSeconds:
                type: integer
+          topologySpreadConstraints:
+            type: array
+            nullable: true
+            items:
+              type: object
+              x-kubernetes-preserve-unknown-fields: true
           useLoadBalancer:
             type: boolean
             description: deprecated

pkg/apis/acid.zalan.do/v1/crds.go

+10
@@ -898,6 +898,16 @@ var PostgresCRDResourceValidation = apiextv1.CustomResourceValidation{
                 },
             },
         },
+        "topologySpreadConstraints": {
+            Type:     "array",
+            Nullable: true,
+            Items: &apiextv1.JSONSchemaPropsOrArray{
+                Schema: &apiextv1.JSONSchemaProps{
+                    Type:                   "object",
+                    XPreserveUnknownFields: util.True(),
+                },
+            },
+        },
         "useLoadBalancer": {
             Type:        "boolean",
             Description: "deprecated",

pkg/apis/acid.zalan.do/v1/operator_configuration_type.go

+1
@@ -107,6 +107,7 @@ type KubernetesMetaConfiguration struct {
     EnableReadinessProbe bool `json:"enable_readiness_probe,omitempty"`
     EnableCrossNamespaceSecret bool `json:"enable_cross_namespace_secret,omitempty"`
     EnableFinalizers *bool `json:"enable_finalizers,omitempty"`
+    EnablePostgresTopologySpreadConstraints bool `json:"enable_postgres_topology_spread_constraints,omitempty"`
 }
 
 // PostgresPodResourcesDefaults defines the spec of default resources

pkg/apis/acid.zalan.do/v1/postgresql_type.go

+2
@@ -93,6 +93,8 @@ type PostgresSpec struct {
     // deprecated json tags
     InitContainersOld []v1.Container `json:"init_containers,omitempty"`
     PodPriorityClassNameOld string `json:"pod_priority_class_name,omitempty"`
+
+    AdditionalTopologySpreadConstraints []v1.TopologySpreadConstraint `json:"additionalTopologySpreadConstraints,omitempty"`
 }
 
 // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
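Since the new field is a plain slice of core/v1 TopologySpreadConstraint, callers populate it with ordinary Kubernetes constraint values. A minimal sketch of one such value; the per-hostname spread rule and the cluster-name label are illustrative assumptions, not something this commit prescribes:

// Sketch: a constraint that could be supplied through the new
// AdditionalTopologySpreadConstraints field (example values only).
package main

import (
    "fmt"

    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
    additional := []v1.TopologySpreadConstraint{{
        MaxSkew:           1,
        TopologyKey:       "kubernetes.io/hostname",
        WhenUnsatisfiable: v1.ScheduleAnyway,
        LabelSelector: &metav1.LabelSelector{
            MatchLabels: map[string]string{"cluster-name": "acid-minimal-cluster"},
        },
    }}
    fmt.Printf("%+v\n", additional)
}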

pkg/cluster/cluster.go

+5
@@ -472,6 +472,11 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa
         needsRollUpdate = true
         reasons = append(reasons, "new statefulset's pod affinity does not match the current one")
     }
+    if !reflect.DeepEqual(c.Statefulset.Spec.Template.Spec.TopologySpreadConstraints, statefulSet.Spec.Template.Spec.TopologySpreadConstraints) {
+        needsReplace = true
+        needsRollUpdate = true
+        reasons = append(reasons, "new statefulset's pod topologySpreadConstraints does not match the current one")
+    }
     if len(c.Statefulset.Spec.Template.Spec.Tolerations) != len(statefulSet.Spec.Template.Spec.Tolerations) {
         needsReplace = true
         needsRollUpdate = true
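The check above is a straight reflect.DeepEqual over the two constraint slices, so any field difference (or a change in ordering) marks the statefulset for replacement and a rolling update. A standalone sketch of that comparison, with made-up example values:

// Sketch: how a DeepEqual mismatch on topologySpreadConstraints is detected.
package main

import (
    "fmt"
    "reflect"

    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
    selector := &metav1.LabelSelector{MatchLabels: map[string]string{"cluster-name": "acid-minimal-cluster"}}

    current := []v1.TopologySpreadConstraint{{
        MaxSkew:           1,
        TopologyKey:       "topology.kubernetes.io/zone",
        WhenUnsatisfiable: v1.DoNotSchedule,
        LabelSelector:     selector,
    }}

    // Desired spec differs only in MaxSkew; DeepEqual still reports a mismatch,
    // which is what forces the replace plus rolling update.
    desired := []v1.TopologySpreadConstraint{{
        MaxSkew:           2,
        TopologyKey:       "topology.kubernetes.io/zone",
        WhenUnsatisfiable: v1.DoNotSchedule,
        LabelSelector:     selector,
    }}

    if !reflect.DeepEqual(current, desired) {
        fmt.Println("new statefulset's pod topologySpreadConstraints does not match the current one")
    }
}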

pkg/cluster/k8sres.go

+28 -2

@@ -610,6 +610,22 @@ func generatePodAntiAffinity(podAffinityTerm v1.PodAffinityTerm, preferredDuring
     return podAntiAffinity
 }
 
+func generateTopologySpreadConstraints(labels labels.Set, additionalTopologySpreadConstraints []v1.TopologySpreadConstraint) []v1.TopologySpreadConstraint {
+    topologySpreadConstraint := v1.TopologySpreadConstraint{
+        MaxSkew:           int32(1),
+        TopologyKey:       "topology.kubernetes.io/zone",
+        WhenUnsatisfiable: v1.DoNotSchedule,
+        LabelSelector: &metav1.LabelSelector{
+            MatchLabels: labels,
+        },
+    }
+    topologySpreadConstraints := []v1.TopologySpreadConstraint{topologySpreadConstraint}
+    if len(additionalTopologySpreadConstraints) > 0 {
+        topologySpreadConstraints = append(topologySpreadConstraints, additionalTopologySpreadConstraints...)
+    }
+    return topologySpreadConstraints
+}
+
 func tolerations(tolerationsSpec *[]v1.Toleration, podToleration map[string]string) []v1.Toleration {
     // allow to override tolerations by postgresql manifest
     if len(*tolerationsSpec) > 0 {
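In effect, whenever the feature is enabled every pod template receives one built-in constraint that spreads pods across topology.kubernetes.io/zone using the cluster's own labels, and any manifest-supplied constraints are appended after it. A standalone sketch of that merge logic; the example labels are assumptions:

// Sketch: standalone equivalent of generateTopologySpreadConstraints — one
// default zone constraint, followed by whatever the manifest adds.
package main

import (
    "fmt"

    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func spreadConstraints(labels map[string]string, additional []v1.TopologySpreadConstraint) []v1.TopologySpreadConstraint {
    constraints := []v1.TopologySpreadConstraint{{
        MaxSkew:           1,
        TopologyKey:       "topology.kubernetes.io/zone",
        WhenUnsatisfiable: v1.DoNotSchedule,
        LabelSelector:     &metav1.LabelSelector{MatchLabels: labels},
    }}
    return append(constraints, additional...)
}

func main() {
    // Example cluster labels; the operator derives the real ones from labelsSet().
    labels := map[string]string{"application": "spilo", "cluster-name": "acid-minimal-cluster"}
    for _, c := range spreadConstraints(labels, nil) {
        fmt.Printf("maxSkew=%d key=%s whenUnsatisfiable=%s\n", c.MaxSkew, c.TopologyKey, c.WhenUnsatisfiable)
    }
}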
@@ -832,6 +848,8 @@ func (c *Cluster) generatePodTemplate(
     additionalSecretMount string,
     additionalSecretMountPath string,
     additionalVolumes []acidv1.AdditionalVolume,
+    topologySpreadConstraints bool,
+    additionalTopologySpreadConstraints []v1.TopologySpreadConstraint,
 ) (*v1.PodTemplateSpec, error) {
 
     terminateGracePeriodSeconds := terminateGracePeriod
@@ -884,6 +902,10 @@
         podSpec.PriorityClassName = priorityClassName
     }
 
+    if topologySpreadConstraints {
+        podSpec.TopologySpreadConstraints = generateTopologySpreadConstraints(labels, additionalTopologySpreadConstraints)
+    }
+
     if sharePgSocketWithSidecars != nil && *sharePgSocketWithSidecars {
         addVarRunVolume(&podSpec)
     }
@@ -1487,7 +1509,9 @@ func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*appsv1.Statef
         c.OpConfig.PodAntiAffinityPreferredDuringScheduling,
         c.OpConfig.AdditionalSecretMount,
         c.OpConfig.AdditionalSecretMountPath,
-        additionalVolumes)
+        additionalVolumes,
+        c.OpConfig.EnablePostgresTopologySpreadConstraints,
+        spec.AdditionalTopologySpreadConstraints)
 
     if err != nil {
         return nil, fmt.Errorf("could not generate pod template: %v", err)
@@ -2334,7 +2358,9 @@ func (c *Cluster) generateLogicalBackupJob() (*batchv1.CronJob, error) {
         false,
         c.OpConfig.AdditionalSecretMount,
         c.OpConfig.AdditionalSecretMountPath,
-        []acidv1.AdditionalVolume{}); err != nil {
+        []acidv1.AdditionalVolume{},
+        true,
+        []v1.TopologySpreadConstraint{}); err != nil {
         return nil, fmt.Errorf("could not generate pod template for logical backup pod: %v", err)
     }
 

pkg/cluster/k8sres_test.go

+44
@@ -3795,3 +3795,47 @@ func TestGenerateCapabilities(t *testing.T) {
         }
     }
 }
+
+func TestTopologySpreadConstraints(t *testing.T) {
+    clusterName := "acid-test-cluster"
+    namespace := "default"
+
+    pg := acidv1.Postgresql{
+        ObjectMeta: metav1.ObjectMeta{
+            Name:      clusterName,
+            Namespace: namespace,
+        },
+        Spec: acidv1.PostgresSpec{
+            NumberOfInstances: 1,
+            Resources: &acidv1.Resources{
+                ResourceRequests: acidv1.ResourceDescription{CPU: k8sutil.StringToPointer("1"), Memory: k8sutil.StringToPointer("10")},
+                ResourceLimits:   acidv1.ResourceDescription{CPU: k8sutil.StringToPointer("1"), Memory: k8sutil.StringToPointer("10")},
+            },
+            Volume: acidv1.Volume{
+                Size: "1G",
+            },
+        },
+    }
+
+    cluster := New(
+        Config{
+            OpConfig: config.Config{
+                PodManagementPolicy: "ordered_ready",
+                EnablePostgresTopologySpreadConstraints: true,
+            },
+        }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
+    cluster.Name = clusterName
+    cluster.Namespace = namespace
+    cluster.labelsSet(true)
+
+    s, err := cluster.generateStatefulSet(&pg.Spec)
+    assert.NoError(t, err)
+    assert.Contains(t, s.Spec.Template.Spec.TopologySpreadConstraints, v1.TopologySpreadConstraint{
+        MaxSkew:           int32(1),
+        TopologyKey:       "topology.kubernetes.io/zone",
+        WhenUnsatisfiable: v1.DoNotSchedule,
+        LabelSelector: &metav1.LabelSelector{
+            MatchLabels: cluster.labelsSet(true),
+        },
+    })
+}

pkg/util/config/config.go

+1
@@ -253,6 +253,7 @@ type Config struct {
     EnableSecretsDeletion *bool `name:"enable_secrets_deletion" default:"true"`
     EnablePersistentVolumeClaimDeletion *bool `name:"enable_persistent_volume_claim_deletion" default:"true"`
     PersistentVolumeClaimRetentionPolicy map[string]string `name:"persistent_volume_claim_retention_policy" default:"when_deleted:retain,when_scaled:retain"`
+    EnablePostgresTopologySpreadConstraints bool `json:"enable_postgres_topology_spread_constraints,omitempty"`
 }
 
 // MustMarshal marshals the config or panics
