Skip to content

Commit

Permalink
Add topologySpreadConstraints configuration to pod spec.
Browse files Browse the repository at this point in the history
  • Loading branch information
laiminhtrung1997 committed Oct 20, 2024
1 parent 002d0f9 commit fbac974
Show file tree
Hide file tree
Showing 9 changed files with 158 additions and 14 deletions.
73 changes: 61 additions & 12 deletions e2e/tests/test_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,7 @@ def compare_config():

pg_patch_config["spec"]["patroni"]["slots"][slot_to_change]["database"] = "bar"
del pg_patch_config["spec"]["patroni"]["slots"][slot_to_remove]

k8s.api.custom_objects_api.patch_namespaced_custom_object(
"acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_delete_slot_patch)

Expand All @@ -577,7 +577,7 @@ def compare_config():

self.eventuallyEqual(lambda: self.query_database(leader.metadata.name, "postgres", get_slot_query%("database", slot_to_change))[0], "bar",
"The replication slot cannot be updated", 10, 5)

# make sure slot from Patroni didn't get deleted
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "postgres", get_slot_query%("slot_name", patroni_slot))), 1,
"The replication slot from Patroni gets deleted", 10, 5)
Expand Down Expand Up @@ -933,7 +933,7 @@ def test_ignored_annotations(self):
},
}
}

old_sts_creation_timestamp = sts.metadata.creation_timestamp
k8s.api.apps_v1.patch_namespaced_stateful_set(sts.metadata.name, sts.metadata.namespace, annotation_patch)
old_svc_creation_timestamp = svc.metadata.creation_timestamp
Expand Down Expand Up @@ -1370,7 +1370,7 @@ def test_persistent_volume_claim_retention_policy(self):
}
k8s.update_config(patch_scaled_policy_retain)
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")

# decrease the number of instances
k8s.api.custom_objects_api.patch_namespaced_custom_object(
'acid.zalan.do', 'v1', 'default', 'postgresqls', 'acid-minimal-cluster', pg_patch_scale_down_instances)
Expand Down Expand Up @@ -1647,7 +1647,6 @@ def test_node_readiness_label(self):
# toggle pod anti affinity to move replica away from master node
self.assert_distributed_pods(master_nodes)


@timeout_decorator.timeout(TEST_TIMEOUT_SEC)
def test_overwrite_pooler_deployment(self):
pooler_name = 'acid-minimal-cluster-pooler'
Expand Down Expand Up @@ -1796,7 +1795,7 @@ def test_password_rotation(self):
},
}
k8s.api.core_v1.patch_namespaced_secret(
name="foo-user.acid-minimal-cluster.credentials.postgresql.acid.zalan.do",
name="foo-user.acid-minimal-cluster.credentials.postgresql.acid.zalan.do",
namespace="default",
body=secret_fake_rotation)

Expand All @@ -1812,7 +1811,7 @@ def test_password_rotation(self):
"data": {
"enable_password_rotation": "true",
"password_rotation_interval": "30",
"password_rotation_user_retention": "30", # should be set to 60
"password_rotation_user_retention": "30", # should be set to 60
},
}
k8s.update_config(enable_password_rotation)
Expand Down Expand Up @@ -1865,7 +1864,7 @@ def test_password_rotation(self):
"Unexpected username in secret of test.db_user: expected {}, got {}".format("test.db_user", secret_username))

# disable password rotation for all other users (foo_user)
# and pick smaller intervals to see if the third fake rotation user is dropped
# and pick smaller intervals to see if the third fake rotation user is dropped
enable_password_rotation = {
"data": {
"enable_password_rotation": "false",
Expand Down Expand Up @@ -2363,6 +2362,56 @@ def test_taint_based_eviction(self):
# toggle pod anti affinity to move replica away from master node
self.assert_distributed_pods(master_nodes)

@timeout_decorator.timeout(TEST_TIMEOUT_SEC)
def test_topology_spread_constraints(self):
'''
Enable topologySpreadConstraints for pods
'''
k8s = self.k8s
cluster_labels = "application=spilo,cluster-name=acid-minimal-cluster"

# Verify we are in good state from potential previous tests
self.eventuallyEqual(lambda: k8s.count_running_pods(), 2, "No 2 pods running")

master_nodes, replica_nodes = k8s.get_cluster_nodes()
self.assertNotEqual(master_nodes, [])
self.assertNotEqual(replica_nodes, [])

# Patch label to nodes for topologySpreadConstraints
patch_node_label = {
"metadata": {
"labels": {
"topology.kubernetes.io/zone": "zalando"
}
}
}
k8s.api.core_v1.patch_node(master_nodes[0], patch_node_label)
k8s.api.core_v1.patch_node(replica_nodes[0], patch_node_label)

# Scale-out postgresql pods
k8s.api.custom_objects_api.patch_namespaced_custom_object("acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster",
{"spec": {"numberOfInstances": 6}})
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
self.eventuallyEqual(lambda: k8s.count_pods_with_label(cluster_labels), 6, "Postgresql StatefulSet are scale to 6")
self.eventuallyEqual(lambda: k8s.count_running_pods(), 6, "All pods are running")

worker_node_1 = 0
worker_node_2 = 0
pods = k8s.api.core_v1.list_namespaced_pod('default', label_selector=cluster_labels)
for pod in pods.items:
if pod.spec.node_name == 'postgres-operator-e2e-tests-worker':
worker_node_1 += 1
elif pod.spec.node_name == 'postgres-operator-e2e-tests-worker2':
worker_node_2 += 1

self.assertEqual(worker_node_1, worker_node_2)
self.assertEqual(worker_node_1, 3)
self.assertEqual(worker_node_2, 3)

# Scale-it postgresql pods to previous replicas
k8s.api.custom_objects_api.patch_namespaced_custom_object("acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster",
{"spec": {"numberOfInstances": 2}})

@timeout_decorator.timeout(TEST_TIMEOUT_SEC)
def test_zz_cluster_deletion(self):
'''
Expand Down Expand Up @@ -2438,7 +2487,7 @@ def test_zz_cluster_deletion(self):
self.eventuallyEqual(lambda: k8s.count_deployments_with_label(cluster_label), 0, "Deployments not deleted")
self.eventuallyEqual(lambda: k8s.count_pdbs_with_label(cluster_label), 0, "Pod disruption budget not deleted")
self.eventuallyEqual(lambda: k8s.count_secrets_with_label(cluster_label), 8, "Secrets were deleted although disabled in config")
self.eventuallyEqual(lambda: k8s.count_pvcs_with_label(cluster_label), 3, "PVCs were deleted although disabled in config")
self.eventuallyEqual(lambda: k8s.count_pvcs_with_label(cluster_label), 6, "PVCs were deleted although disabled in config")

except timeout_decorator.TimeoutError:
print('Operator log: {}'.format(k8s.get_operator_log()))
Expand Down Expand Up @@ -2480,7 +2529,7 @@ def assert_distributed_pods(self, target_nodes, cluster_labels='cluster-name=aci

# if nodes are different we can quit here
if master_nodes[0] not in replica_nodes:
return True
return True

# enable pod anti affintiy in config map which should trigger movement of replica
patch_enable_antiaffinity = {
Expand All @@ -2504,7 +2553,7 @@ def assert_distributed_pods(self, target_nodes, cluster_labels='cluster-name=aci
}
k8s.update_config(patch_disable_antiaffinity, "disable antiaffinity")
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")

k8s.wait_for_pod_start('spilo-role=replica,' + cluster_labels)
k8s.wait_for_running_pods(cluster_labels, 2)

Expand All @@ -2515,7 +2564,7 @@ def assert_distributed_pods(self, target_nodes, cluster_labels='cluster-name=aci
# if nodes are different we can quit here
for target_node in target_nodes:
if (target_node not in master_nodes or target_node not in replica_nodes) and master_nodes[0] in replica_nodes:
print('Pods run on the same node')
print('Pods run on the same node')
return False

except timeout_decorator.TimeoutError:
Expand Down
6 changes: 6 additions & 0 deletions manifests/postgresql.crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,12 @@ spec:
- PreferNoSchedule
tolerationSeconds:
type: integer
topologySpreadConstraints:
type: array
nullable: true
items:
type: object
x-kubernetes-preserve-unknown-fields: true
useLoadBalancer:
type: boolean
description: deprecated
Expand Down
10 changes: 10 additions & 0 deletions pkg/apis/acid.zalan.do/v1/crds.go
Original file line number Diff line number Diff line change
Expand Up @@ -895,6 +895,16 @@ var PostgresCRDResourceValidation = apiextv1.CustomResourceValidation{
},
},
},
"topologySpreadConstraints": {
Type: "array",
Nullable: true,
Items: &apiextv1.JSONSchemaPropsOrArray{
Schema: &apiextv1.JSONSchemaProps{
Type: "object",
XPreserveUnknownFields: util.True(),
},
},
},
"useLoadBalancer": {
Type: "boolean",
Description: "deprecated",
Expand Down
1 change: 1 addition & 0 deletions pkg/apis/acid.zalan.do/v1/operator_configuration_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ type KubernetesMetaConfiguration struct {
EnableReadinessProbe bool `json:"enable_readiness_probe,omitempty"`
EnableCrossNamespaceSecret bool `json:"enable_cross_namespace_secret,omitempty"`
EnableFinalizers *bool `json:"enable_finalizers,omitempty"`
EnablePostgresTopologySpreadConstraints bool `json:"enable_postgres_topology_spread_constraints,omitempty"`
}

// PostgresPodResourcesDefaults defines the spec of default resources
Expand Down
2 changes: 2 additions & 0 deletions pkg/apis/acid.zalan.do/v1/postgresql_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ type PostgresSpec struct {
// deprecated json tags
InitContainersOld []v1.Container `json:"init_containers,omitempty"`
PodPriorityClassNameOld string `json:"pod_priority_class_name,omitempty"`

AdditionalTopologySpreadConstraints []v1.TopologySpreadConstraint `json:"additionalTopologySpreadConstraints,omitempty"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
Expand Down
5 changes: 5 additions & 0 deletions pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,11 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa
needsRollUpdate = true
reasons = append(reasons, "new statefulset's pod affinity does not match the current one")
}
if !reflect.DeepEqual(c.Statefulset.Spec.Template.Spec.TopologySpreadConstraints, statefulSet.Spec.Template.Spec.TopologySpreadConstraints) {
needsReplace = true
needsRollUpdate = true
reasons = append(reasons, "new statefulset's pod topologySpreadConstraints does not match the current one")
}
if len(c.Statefulset.Spec.Template.Spec.Tolerations) != len(statefulSet.Spec.Template.Spec.Tolerations) {
needsReplace = true
needsRollUpdate = true
Expand Down
30 changes: 28 additions & 2 deletions pkg/cluster/k8sres.go
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,22 @@ func generatePodAntiAffinity(podAffinityTerm v1.PodAffinityTerm, preferredDuring
return podAntiAffinity
}

func generateTopologySpreadConstraints(labels labels.Set, additionalTopologySpreadConstraints []v1.TopologySpreadConstraint) []v1.TopologySpreadConstraint {
topologySpreadConstraint := v1.TopologySpreadConstraint{
MaxSkew: int32(1),
TopologyKey: "topology.kubernetes.io/zone",
WhenUnsatisfiable: v1.DoNotSchedule,
LabelSelector: &metav1.LabelSelector{
MatchLabels: labels,
},
}
topologySpreadConstraints := []v1.TopologySpreadConstraint{topologySpreadConstraint}
if len(additionalTopologySpreadConstraints) > 0 {
topologySpreadConstraints = append(topologySpreadConstraints, additionalTopologySpreadConstraints...)
}
return topologySpreadConstraints
}

func tolerations(tolerationsSpec *[]v1.Toleration, podToleration map[string]string) []v1.Toleration {
// allow to override tolerations by postgresql manifest
if len(*tolerationsSpec) > 0 {
Expand Down Expand Up @@ -821,6 +837,8 @@ func (c *Cluster) generatePodTemplate(
additionalSecretMount string,
additionalSecretMountPath string,
additionalVolumes []acidv1.AdditionalVolume,
topologySpreadConstraints bool,
additionalTopologySpreadConstraints []v1.TopologySpreadConstraint,
) (*v1.PodTemplateSpec, error) {

terminateGracePeriodSeconds := terminateGracePeriod
Expand Down Expand Up @@ -873,6 +891,10 @@ func (c *Cluster) generatePodTemplate(
podSpec.PriorityClassName = priorityClassName
}

if topologySpreadConstraints {
podSpec.TopologySpreadConstraints = generateTopologySpreadConstraints(labels, additionalTopologySpreadConstraints)
}

if sharePgSocketWithSidecars != nil && *sharePgSocketWithSidecars {
addVarRunVolume(&podSpec)
}
Expand Down Expand Up @@ -1476,7 +1498,9 @@ func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*appsv1.Statef
c.OpConfig.PodAntiAffinityPreferredDuringScheduling,
c.OpConfig.AdditionalSecretMount,
c.OpConfig.AdditionalSecretMountPath,
additionalVolumes)
additionalVolumes,
c.OpConfig.EnablePostgresTopologySpreadConstraints,
spec.AdditionalTopologySpreadConstraints)

if err != nil {
return nil, fmt.Errorf("could not generate pod template: %v", err)
Expand Down Expand Up @@ -2334,7 +2358,9 @@ func (c *Cluster) generateLogicalBackupJob() (*batchv1.CronJob, error) {
false,
c.OpConfig.AdditionalSecretMount,
c.OpConfig.AdditionalSecretMountPath,
[]acidv1.AdditionalVolume{}); err != nil {
[]acidv1.AdditionalVolume{},
true,
[]v1.TopologySpreadConstraint{}); err != nil {
return nil, fmt.Errorf("could not generate pod template for logical backup pod: %v", err)
}

Expand Down
44 changes: 44 additions & 0 deletions pkg/cluster/k8sres_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3886,3 +3886,47 @@ func TestGenerateCapabilities(t *testing.T) {
}
}
}

func TestTopologySpreadConstraints(t *testing.T) {
clusterName := "acid-test-cluster"
namespace := "default"

pg := acidv1.Postgresql{
ObjectMeta: metav1.ObjectMeta{
Name: clusterName,
Namespace: namespace,
},
Spec: acidv1.PostgresSpec{
NumberOfInstances: 1,
Resources: &acidv1.Resources{
ResourceRequests: acidv1.ResourceDescription{CPU: k8sutil.StringToPointer("1"), Memory: k8sutil.StringToPointer("10")},
ResourceLimits: acidv1.ResourceDescription{CPU: k8sutil.StringToPointer("1"), Memory: k8sutil.StringToPointer("10")},
},
Volume: acidv1.Volume{
Size: "1G",
},
},
}

cluster := New(
Config{
OpConfig: config.Config{
PodManagementPolicy: "ordered_ready",
EnablePostgresTopologySpreadConstraints: true,
},
}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
cluster.Name = clusterName
cluster.Namespace = namespace
cluster.labelsSet(true)

s, err := cluster.generateStatefulSet(&pg.Spec)
assert.NoError(t, err)
assert.Contains(t, s.Spec.Template.Spec.TopologySpreadConstraints, v1.TopologySpreadConstraint{
MaxSkew: int32(1),
TopologyKey: "topology.kubernetes.io/zone",
WhenUnsatisfiable: v1.DoNotSchedule,
LabelSelector: &metav1.LabelSelector{
MatchLabels: cluster.labelsSet(true),
},
})
}
1 change: 1 addition & 0 deletions pkg/util/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ type Config struct {
EnableSecretsDeletion *bool `name:"enable_secrets_deletion" default:"true"`
EnablePersistentVolumeClaimDeletion *bool `name:"enable_persistent_volume_claim_deletion" default:"true"`
PersistentVolumeClaimRetentionPolicy map[string]string `name:"persistent_volume_claim_retention_policy" default:"when_deleted:retain,when_scaled:retain"`
EnablePostgresTopologySpreadConstraints bool `json:"enable_postgres_topology_spread_constraints,omitempty"`
}

// MustMarshal marshals the config or panics
Expand Down

0 comments on commit fbac974

Please sign in to comment.