Ensure podAnnotations are removed from pods if reset in the config #2826

Open · wants to merge 2 commits into base: master
2 changes: 1 addition & 1 deletion pkg/cluster/cluster.go
@@ -853,7 +853,7 @@ func (c *Cluster) compareLogicalBackupJob(cur, new *batchv1.CronJob) (match bool
newPodAnnotation := new.Spec.JobTemplate.Spec.Template.Annotations
curPodAnnotation := cur.Spec.JobTemplate.Spec.Template.Annotations
if changed, reason := c.compareAnnotations(curPodAnnotation, newPodAnnotation); changed {
return false, fmt.Sprintf("new job's pod template metadata annotations does not match " + reason)
return false, fmt.Sprint("new job's pod template metadata annotations do not match " + reason)
}

newPgVersion := getPgVersion(new)
63 changes: 49 additions & 14 deletions pkg/cluster/connection_pooler.go
@@ -1037,10 +1037,47 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql
syncReason = append(syncReason, specReason...)
}

listOptions := metav1.ListOptions{
LabelSelector: labels.Set(c.connectionPoolerLabels(role, true).MatchLabels).String(),
}
pods, err = c.listPoolerPods(listOptions)
newPodAnnotations := c.annotationsSet(c.generatePodAnnotations(&c.Spec))
if changed, reason := c.compareAnnotations(deployment.Spec.Template.Annotations, newPodAnnotations); changed {
specSync = true
syncReason = append(syncReason, "new connection pooler's pod template annotations do not match the current ones: " + reason)

if strings.Contains(reason, "Removed") {
annotationToRemove := `{"metadata":{"annotations":{`
annotationToRemoveTemplate := `{"spec":{"template":{"metadata":{"annotations":{`
for anno := range deployment.Spec.Template.Annotations {
if _, ok := newPodAnnotations[anno]; !ok {
// template annotation was removed
ignored := false
for _, ignore := range c.OpConfig.IgnoredAnnotations {
if anno == ignore {
ignored = true
break
}
}
if ignored {
continue
}
annotationToRemove += fmt.Sprintf(`"%s":null,`, anno)
annotationToRemoveTemplate += fmt.Sprintf(`"%s":null,`, anno)
}
}
annotationToRemove = strings.TrimSuffix(annotationToRemove, ",") + `}}}`
annotationToRemoveTemplate = strings.TrimSuffix(annotationToRemoveTemplate, ",") + `}}}}}`
Member: can we not use json marshalling here? This concatenating {{{ + }}} looks a bit "ugly".
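A minimal sketch of that suggestion (not code from this PR; the helper name `removedAnnotationsPatch` is made up): build the patch as nested maps and let encoding/json handle the quoting. In a strategic merge patch, a null value is what deletes the key.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// removedAnnotationsPatch (hypothetical helper) builds a strategic merge
// patch for a Deployment's pod template that sets every annotation present
// in cur but absent from desired to null; patching a key with null removes
// it from the live object.
func removedAnnotationsPatch(cur, desired map[string]string, ignored []string) ([]byte, error) {
	toRemove := map[string]interface{}{}
	for anno := range cur {
		if _, ok := desired[anno]; ok {
			continue
		}
		skip := false
		for _, ignore := range ignored {
			if anno == ignore {
				skip = true
				break
			}
		}
		if skip {
			continue
		}
		toRemove[anno] = nil // marshals as null, which deletes the key
	}
	if len(toRemove) == 0 {
		return nil, nil // nothing to patch
	}
	patch := map[string]interface{}{
		"spec": map[string]interface{}{
			"template": map[string]interface{}{
				"metadata": map[string]interface{}{
					"annotations": toRemove,
				},
			},
		},
	}
	return json.Marshal(patch)
}

func main() {
	cur := map[string]string{"keep": "true", "no-scale-down": "true"}
	desired := map[string]string{"keep": "true"}
	patch, _ := removedAnnotationsPatch(cur, desired, nil)
	fmt.Println(string(patch))
	// {"spec":{"template":{"metadata":{"annotations":{"no-scale-down":null}}}}}
}
```

The marshalled bytes could then feed the same Deployments(...).Patch call with types.StrategicMergePatchType that the diff already uses.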

deployment, err = c.KubeClient.Deployments(c.Namespace).Patch(context.TODO(),
deployment.Name, types.StrategicMergePatchType, []byte(annotationToRemoveTemplate), metav1.PatchOptions{}, "")
if err != nil {
c.logger.Errorf("failed to remove annotations from %s connection pooler's pod template: %v", role, err)
return nil, err
}
for _, pod := range pods {
_, err = c.KubeClient.Pods(c.Namespace).Patch(context.TODO(), pod.Name,
types.StrategicMergePatchType, []byte(annotationToRemove), metav1.PatchOptions{})
if err != nil {
c.logger.Errorf("failed to remove annotations from pod %s: %v", pod.Name, err)
return nil, err
}
}
}
deployment.Spec.Template.Annotations = newPodAnnotations
}

@@ -1060,7 +1097,6 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql
if err != nil {
return syncReason, err
}
c.ConnectionPooler[role].Deployment = deployment
Member: so we change to update internal deployment only if every sync step works, right? Have to think about consequences it might have.

}

newAnnotations := c.AnnotationsToPropagate(c.annotationsSet(nil)) // including the downscaling annotations
@@ -1069,15 +1105,10 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql
if err != nil {
return nil, err
}
c.ConnectionPooler[role].Deployment = deployment
}
}

// check if pooler pods must be replaced due to secret update
listOptions := metav1.ListOptions{
LabelSelector: labels.Set(c.connectionPoolerLabels(role, true).MatchLabels).String(),
}
pods, err = c.listPoolerPods(listOptions)
Member: ToDo for me: Check if we can go with a version of pods we now fetch before changing annotations? Or do we need to fetch them again for the following steps?

if err != nil {
return nil, err
}
@@ -1098,18 +1129,22 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql
if err != nil {
return nil, fmt.Errorf("could not delete pooler pod: %v", err)
}
} else if changed, _ := c.compareAnnotations(pod.Annotations, deployment.Spec.Template.Annotations); changed {
patchData, err := metaAnnotationsPatch(deployment.Spec.Template.Annotations)
if err != nil {
return nil, fmt.Errorf("could not form patch for pooler's pod annotations: %v", err)
}
_, err = c.KubeClient.Pods(pod.Namespace).Patch(context.TODO(), pod.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{})
if err != nil {
return nil, fmt.Errorf("could not patch annotations for pooler's pod %q: %v", pod.Name, err)
} else {
if changed, _ := c.compareAnnotations(pod.Annotations, deployment.Spec.Template.Annotations); changed {
patchData, err := metaAnnotationsPatch(deployment.Spec.Template.Annotations)
if err != nil {
return nil, fmt.Errorf("could not form patch for pooler's pod annotations: %v", err)
}
_, err = c.KubeClient.Pods(pod.Namespace).Patch(context.TODO(), pod.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{})
if err != nil {
return nil, fmt.Errorf("could not patch annotations for pooler's pod %q: %v", pod.Name, err)
}
}
}
}

c.ConnectionPooler[role].Deployment = deployment

if service, err = c.KubeClient.Services(c.Namespace).Get(context.TODO(), c.connectionPoolerName(role), metav1.GetOptions{}); err == nil {
c.ConnectionPooler[role].Service = service
desiredSvc := c.generateConnectionPoolerService(c.ConnectionPooler[role])
48 changes: 48 additions & 0 deletions pkg/cluster/sync.go
@@ -573,6 +573,33 @@ func (c *Cluster) syncStatefulSet() error {
}
}
}
annotationToRemove := ""
for anno := range c.Statefulset.Spec.Template.Annotations {
if _, ok := desiredSts.Spec.Template.Annotations[anno]; !ok {
// template annotation was removed
ignored := false
for _, ignore := range c.OpConfig.IgnoredAnnotations {
if anno == ignore {
ignored = true
break
}
}
if ignored {
continue
}
if annotationToRemove != "" {
annotationToRemove = `{"metadata":{"annotations":{`
}
annotationToRemove += fmt.Sprintf(`"%s":null,`, anno)
// annotationToRemove := []byte(fmt.Sprintf(`{"metadata":{"annotations":{"%s":null}}}`, anno))
}
}
if annotationToRemove != "" {
annotationToRemove = strings.TrimSuffix(annotationToRemove, ",") + `}}}`
for _, pod := range pods {
_, err = c.KubeClient.Pods(c.Namespace).Patch(context.Background(), pod.Name,
types.StrategicMergePatchType, []byte(annotationToRemove), metav1.PatchOptions{})
if err != nil {
c.logger.Errorf("failed to remove annotations from pod %s: %v", pod.Name, err)
return err
}
}
}
Member: Sorry, another thinking out loud comment: We already introduced patch code for annotations. Could it not be reused?
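Sketching what that reuse might look like, assuming a new shared helper rather than the existing metaAnnotationsPatch (which only sets keys; with a merge patch, keys absent from the body are left untouched, so removals need explicit nulls). The names `annotationRemovals` and `nest` are hypothetical:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// annotationRemovals (hypothetical) maps every annotation present in cur but
// absent from desired to nil, skipping ignored ones, so a merge patch
// deletes them.
func annotationRemovals(cur, desired map[string]string, ignored map[string]bool) map[string]interface{} {
	out := map[string]interface{}{}
	for anno := range cur {
		if _, ok := desired[anno]; !ok && !ignored[anno] {
			out[anno] = nil
		}
	}
	return out
}

// nest (hypothetical) wraps a patch fragment under the given key path, so
// one removal body can target pods, deployment templates, or cron job
// templates without hand-building the JSON envelope.
func nest(body interface{}, path ...string) interface{} {
	for i := len(path) - 1; i >= 0; i-- {
		body = map[string]interface{}{path[i]: body}
	}
	return body
}

func main() {
	removals := annotationRemovals(
		map[string]string{"no-scale-down": "true"}, // current annotations
		map[string]string{},                        // desired annotations
		nil,
	)
	podPatch, _ := json.Marshal(nest(removals, "metadata", "annotations"))
	stsPatch, _ := json.Marshal(nest(removals, "spec", "template", "metadata", "annotations"))
	cronPatch, _ := json.Marshal(nest(removals, "spec", "jobTemplate", "spec", "template", "metadata", "annotations"))
	fmt.Println(string(podPatch))  // {"metadata":{"annotations":{"no-scale-down":null}}}
	fmt.Println(string(stsPatch))  // deployment / statefulset pod template variant
	fmt.Println(string(cronPatch)) // logical backup cron job variant
}
```

One helper pair like this could replace the three hand-built string variants in connection_pooler.go, sync.go, and the logical backup sync.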

}
if !cmp.match {
if cmp.rollingUpdate {
@@ -1594,6 +1621,27 @@ func (c *Cluster) syncLogicalBackupJob() error {
if reason != "" {
c.logger.Infof("reason: %s", reason)
}
if strings.Contains(reason, "annotations do not match") {
annotationToRemoveTemplate := `{"spec":{"jobTemplate":{"spec":{"template":{"metadata":{"annotations":{`
for anno := range job.Spec.JobTemplate.Spec.Template.Annotations {
if _, ok := desiredJob.Spec.JobTemplate.Spec.Template.Annotations[anno]; !ok {
// template annotation was removed
ignored := false
for _, ignore := range c.OpConfig.IgnoredAnnotations {
if anno == ignore {
ignored = true
break
}
}
if ignored {
continue
}
annotationToRemoveTemplate += fmt.Sprintf(`"%s":null,`, anno)
}
}
annotationToRemoveTemplate = strings.TrimSuffix(annotationToRemoveTemplate, ",") + `}}}}}}}`
job, err = c.KubeClient.CronJobs(c.Namespace).Patch(context.TODO(),
jobName, types.StrategicMergePatchType, []byte(annotationToRemoveTemplate), metav1.PatchOptions{}, "")
if err != nil {
c.logger.Errorf("failed to remove annotations from the logical backup job %q pod template: %v", jobName, err)
return err
}
}
if err = c.patchLogicalBackupJob(desiredJob); err != nil {
return fmt.Errorf("could not update logical backup job to match desired state: %v", err)
}
157 changes: 157 additions & 0 deletions pkg/cluster/sync_test.go
@@ -142,6 +142,163 @@ func TestSyncStatefulSetsAnnotations(t *testing.T) {
}
}

func TestPodAnnotationsSync(t *testing.T) {
clusterName := "acid-test-cluster-2"
namespace := "default"
podAnnotation := "no-scale-down"
podAnnotations := map[string]string{podAnnotation: "true"}

ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockClient := mocks.NewMockHTTPClient(ctrl)
client, _ := newFakeK8sAnnotationsClient()

pg := acidv1.Postgresql{
ObjectMeta: metav1.ObjectMeta{
Name: clusterName,
Namespace: namespace,
},
Spec: acidv1.PostgresSpec{
Volume: acidv1.Volume{
Size: "1Gi",
},
EnableConnectionPooler: boolToPointer(true),
EnableLogicalBackup: true,
EnableReplicaConnectionPooler: boolToPointer(true),
PodAnnotations: podAnnotations,
NumberOfInstances: 2,
},
}

var cluster = New(
Config{
OpConfig: config.Config{
PatroniAPICheckInterval: time.Duration(1),
PatroniAPICheckTimeout: time.Duration(5),
PodManagementPolicy: "ordered_ready",
ConnectionPooler: config.ConnectionPooler{
ConnectionPoolerDefaultCPURequest: "100m",
ConnectionPoolerDefaultCPULimit: "100m",
ConnectionPoolerDefaultMemoryRequest: "100Mi",
ConnectionPoolerDefaultMemoryLimit: "100Mi",
NumberOfInstances: k8sutil.Int32ToPointer(1),
},
Resources: config.Resources{
ClusterLabels: map[string]string{"application": "spilo"},
ClusterNameLabel: "cluster-name",
DefaultCPURequest: "300m",
DefaultCPULimit: "300m",
DefaultMemoryRequest: "300Mi",
DefaultMemoryLimit: "300Mi",
PodRoleLabel: "spilo-role",
ResourceCheckInterval: time.Duration(3),
ResourceCheckTimeout: time.Duration(10),
},
},
}, client, pg, logger, eventRecorder)

configJson := `{"postgresql": {"parameters": {"log_min_duration_statement": 200, "max_connections": 50}}, "ttl": 20}`
response := http.Response{
StatusCode: 200,
Body: io.NopCloser(bytes.NewReader([]byte(configJson))),
}

mockClient.EXPECT().Do(gomock.Any()).Return(&response, nil).AnyTimes()
cluster.patroni = patroni.New(patroniLogger, mockClient)
cluster.Name = clusterName
cluster.Namespace = namespace
clusterOptions := clusterLabelsOptions(cluster)

// create a statefulset
_, err := cluster.createStatefulSet()
assert.NoError(t, err)
// create pods
podsList := createPods(cluster)
for _, pod := range podsList {
_, err = cluster.KubeClient.Pods(namespace).Create(context.TODO(), &pod, metav1.CreateOptions{})
assert.NoError(t, err)
}
// create connection pooler
_, err = cluster.createConnectionPooler(mockInstallLookupFunction)
assert.NoError(t, err)

// create cron job
err = cluster.createLogicalBackupJob()
assert.NoError(t, err)

annotateResources(cluster)
err = cluster.Sync(&cluster.Postgresql)
assert.NoError(t, err)

// 1. PodAnnotations set
stsList, err := cluster.KubeClient.StatefulSets(namespace).List(context.TODO(), clusterOptions)
assert.NoError(t, err)
for _, sts := range stsList.Items {
assert.Contains(t, sts.Spec.Template.Annotations, podAnnotation)
}

for _, role := range []PostgresRole{Master, Replica} {
deploy, err := cluster.KubeClient.Deployments(namespace).Get(context.TODO(), cluster.connectionPoolerName(role), metav1.GetOptions{})
assert.NoError(t, err)
assert.Contains(t, deploy.Spec.Template.Annotations, podAnnotation,
fmt.Sprintf("pooler deployment pod template %s should contain annotation %s, found %#v",
deploy.Name, podAnnotation, deploy.Spec.Template.Annotations))
assert.NoError(t, err)
}

podList, err := cluster.KubeClient.Pods(namespace).List(context.TODO(), clusterOptions)
assert.NoError(t, err)
for _, pod := range podList.Items {
assert.Contains(t, pod.Annotations, podAnnotation,
fmt.Sprintf("pod %s should contain annotation %s, found %#v", pod.Name, podAnnotation, pod.Annotations))
assert.NoError(t, err)
}

cronJobList, err := cluster.KubeClient.CronJobs(namespace).List(context.TODO(), clusterOptions)
assert.NoError(t, err)
for _, cronJob := range cronJobList.Items {
assert.Contains(t, cronJob.Spec.JobTemplate.Spec.Template.Annotations, podAnnotation,
fmt.Sprintf("logical backup cron job's pod template should contain annotation %s, found %#v",
podAnnotation, cronJob.Spec.JobTemplate.Spec.Template.Annotations))
}

// 2. PodAnnotations removed
newSpec := cluster.Postgresql.DeepCopy()
newSpec.Spec.PodAnnotations = nil
err = cluster.Sync(newSpec)
assert.NoError(t, err)

stsList, err = cluster.KubeClient.StatefulSets(namespace).List(context.TODO(), clusterOptions)
assert.NoError(t, err)
for _, sts := range stsList.Items {
assert.NotContains(t, sts.Spec.Template.Annotations, podAnnotation)
}

for _, role := range []PostgresRole{Master, Replica} {
deploy, err := cluster.KubeClient.Deployments(namespace).Get(context.TODO(), cluster.connectionPoolerName(role), metav1.GetOptions{})
assert.NoError(t, err)
assert.NotContains(t, deploy.Spec.Template.Annotations, podAnnotation,
fmt.Sprintf("pooler deployment pod template %s should not contain annotation %s, found %#v",
deploy.Name, podAnnotation, deploy.Spec.Template.Annotations))
assert.NoError(t, err)
}

podList, err = cluster.KubeClient.Pods(namespace).List(context.TODO(), clusterOptions)
assert.NoError(t, err)
for _, pod := range podList.Items {
assert.NotContains(t, pod.Annotations, podAnnotation,
fmt.Sprintf("pod %s should not contain annotation %s, found %#v", pod.Name, podAnnotation, pod.Annotations))
}

cronJobList, err = cluster.KubeClient.CronJobs(namespace).List(context.TODO(), clusterOptions)
assert.NoError(t, err)
for _, cronJob := range cronJobList.Items {
assert.NotContains(t, cronJob.Spec.JobTemplate.Spec.Template.Annotations, podAnnotation,
fmt.Sprintf("logical backup cron job's pod template should not contain annotation %s, found %#v",
podAnnotation, cronJob.Spec.JobTemplate.Spec.Template.Annotations))
}
}

func TestCheckAndSetGlobalPostgreSQLConfiguration(t *testing.T) {
testName := "test config comparison"
client, _ := newFakeK8sSyncClient()