diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 1a8d6f762..0f2928cb4 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -853,7 +853,7 @@ func (c *Cluster) compareLogicalBackupJob(cur, new *batchv1.CronJob) (match bool newPodAnnotation := new.Spec.JobTemplate.Spec.Template.Annotations curPodAnnotation := cur.Spec.JobTemplate.Spec.Template.Annotations if changed, reason := c.compareAnnotations(curPodAnnotation, newPodAnnotation); changed { - return false, fmt.Sprintf("new job's pod template metadata annotations does not match " + reason) + return false, "new job's pod template metadata annotations do not match " + reason } newPgVersion := getPgVersion(new) diff --git a/pkg/cluster/connection_pooler.go b/pkg/cluster/connection_pooler.go index 6cd46f745..755e211e8 100644 --- a/pkg/cluster/connection_pooler.go +++ b/pkg/cluster/connection_pooler.go @@ -1037,10 +1037,50 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql syncReason = append(syncReason, specReason...) } + listOptions := metav1.ListOptions{ + LabelSelector: labels.Set(c.connectionPoolerLabels(role, true).MatchLabels).String(), + } + pods, err = c.listPoolerPods(listOptions) + // fail fast: err would otherwise be clobbered by the deployment patches below + if err != nil { + return nil, err + } newPodAnnotations := c.annotationsSet(c.generatePodAnnotations(&c.Spec)) if changed, reason := c.compareAnnotations(deployment.Spec.Template.Annotations, newPodAnnotations); changed { specSync = true syncReason = append(syncReason, []string{"new connection pooler's pod template annotations do not match the current ones: " + reason}...) 
+ + if strings.Contains(reason, "Removed") { + annotationToRemove := `{"metadata":{"annotations":{` + annotationToRemoveTemplate := `{"spec":{"template":{"metadata":{"annotations":{` + for anno := range deployment.Spec.Template.Annotations { + if _, ok := newPodAnnotations[anno]; !ok { + // template annotation was removed; skip annotations the operator is configured to ignore + isIgnored := false + for _, ignoredAnno := range c.OpConfig.IgnoredAnnotations { + isIgnored = isIgnored || anno == ignoredAnno + } + if isIgnored { continue } + annotationToRemove += fmt.Sprintf(`"%s":null,`, anno) + annotationToRemoveTemplate += fmt.Sprintf(`"%s":null,`, anno) + } + } + annotationToRemove = strings.TrimSuffix(annotationToRemove, ",") + `}}}` + annotationToRemoveTemplate = strings.TrimSuffix(annotationToRemoveTemplate, ",") + `}}}}}` + deployment, err = c.KubeClient.Deployments(c.Namespace).Patch(context.TODO(), + deployment.Name, types.StrategicMergePatchType, []byte(annotationToRemoveTemplate), metav1.PatchOptions{}, "") + if err != nil { + c.logger.Errorf("failed to remove annotations from %s connection pooler's pod template: %v", role, err) + return nil, err + } + for _, pod := range pods { + _, err = c.KubeClient.Pods(c.Namespace).Patch(context.TODO(), pod.Name, + types.StrategicMergePatchType, []byte(annotationToRemove), metav1.PatchOptions{}) + if err != nil { + c.logger.Errorf("failed to remove annotations from pod %s: %v", pod.Name, err) + return nil, err + } + } + } deployment.Spec.Template.Annotations = newPodAnnotations } @@ -1060,7 +1097,6 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql if err != nil { return syncReason, err } - c.ConnectionPooler[role].Deployment = deployment } newAnnotations := c.AnnotationsToPropagate(c.annotationsSet(nil)) // including the downscaling annotations @@ -1069,15 +1105,10 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql if err != nil { return nil, err } - c.ConnectionPooler[role].Deployment = deployment } } // check if pooler pods must be replaced due to secret update - 
listOptions := metav1.ListOptions{ - LabelSelector: labels.Set(c.connectionPoolerLabels(role, true).MatchLabels).String(), - } - pods, err = c.listPoolerPods(listOptions) if err != nil { return nil, err } @@ -1098,18 +1129,22 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql if err != nil { return nil, fmt.Errorf("could not delete pooler pod: %v", err) } - } else if changed, _ := c.compareAnnotations(pod.Annotations, deployment.Spec.Template.Annotations); changed { - patchData, err := metaAnnotationsPatch(deployment.Spec.Template.Annotations) - if err != nil { - return nil, fmt.Errorf("could not form patch for pooler's pod annotations: %v", err) - } - _, err = c.KubeClient.Pods(pod.Namespace).Patch(context.TODO(), pod.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{}) - if err != nil { - return nil, fmt.Errorf("could not patch annotations for pooler's pod %q: %v", pod.Name, err) + } else { + if changed, _ := c.compareAnnotations(pod.Annotations, deployment.Spec.Template.Annotations); changed { + patchData, err := metaAnnotationsPatch(deployment.Spec.Template.Annotations) + if err != nil { + return nil, fmt.Errorf("could not form patch for pooler's pod annotations: %v", err) + } + _, err = c.KubeClient.Pods(pod.Namespace).Patch(context.TODO(), pod.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{}) + if err != nil { + return nil, fmt.Errorf("could not patch annotations for pooler's pod %q: %v", pod.Name, err) + } } } } + c.ConnectionPooler[role].Deployment = deployment + if service, err = c.KubeClient.Services(c.Namespace).Get(context.TODO(), c.connectionPoolerName(role), metav1.GetOptions{}); err == nil { c.ConnectionPooler[role].Service = service desiredSvc := c.generateConnectionPoolerService(c.ConnectionPooler[role]) diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index d1a339001..f298a05fe 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -573,6 +573,33 @@ func (c 
*Cluster) syncStatefulSet() error { } } } + annotationToRemove := "" + for anno := range c.Statefulset.Spec.Template.Annotations { + if _, ok := desiredSts.Spec.Template.Annotations[anno]; !ok { + // template annotation was removed; skip annotations the operator is configured to ignore + isIgnored := false + for _, ignoredAnno := range c.OpConfig.IgnoredAnnotations { + isIgnored = isIgnored || anno == ignoredAnno + } + if isIgnored { continue } + if annotationToRemove == "" { + annotationToRemove = `{"metadata":{"annotations":{` + } + annotationToRemove += fmt.Sprintf(`"%s":null,`, anno) + // annotationToRemove := []byte(fmt.Sprintf(`{"metadata":{"annotations":{"%s":null}}}`, anno)) + } + } + if annotationToRemove != "" { + annotationToRemove = strings.TrimSuffix(annotationToRemove, ",") + `}}}` + for _, pod := range pods { + _, err = c.KubeClient.Pods(c.Namespace).Patch(context.Background(), pod.Name, + types.StrategicMergePatchType, []byte(annotationToRemove), metav1.PatchOptions{}) + if err != nil { + c.logger.Errorf("failed to remove annotations from pod %s: %v", pod.Name, err) + return err + } + } + } } if !cmp.match { if cmp.rollingUpdate { @@ -1594,6 +1621,27 @@ func (c *Cluster) syncLogicalBackupJob() error { if reason != "" { c.logger.Infof("reason: %s", reason) } + if strings.Contains(reason, "annotations do not match") { + annotationToRemoveTemplate := `{"spec":{"jobTemplate":{"spec":{"template":{"metadata":{"annotations":{` + for anno := range job.Spec.JobTemplate.Spec.Template.Annotations { + if _, ok := desiredJob.Spec.JobTemplate.Spec.Template.Annotations[anno]; !ok { + // template annotation was removed; skip annotations the operator is configured to ignore + isIgnored := false + for _, ignoredAnno := range c.OpConfig.IgnoredAnnotations { + isIgnored = isIgnored || anno == ignoredAnno + } + if isIgnored { continue } + annotationToRemoveTemplate += fmt.Sprintf(`"%s":null,`, anno) + } + } + annotationToRemoveTemplate = strings.TrimSuffix(annotationToRemoveTemplate, ",") + `}}}}}}}` + job, err = c.KubeClient.CronJobs(c.Namespace).Patch(context.TODO(), + jobName, types.StrategicMergePatchType, []byte(annotationToRemoveTemplate), metav1.PatchOptions{}, "") + if err != nil { + 
c.logger.Errorf("failed to remove annotations from the logical backup job %q pod template: %v", jobName, err) + return err + } + } if err = c.patchLogicalBackupJob(desiredJob); err != nil { return fmt.Errorf("could not update logical backup job to match desired state: %v", err) } diff --git a/pkg/cluster/sync_test.go b/pkg/cluster/sync_test.go index d45a193cb..e18030912 100644 --- a/pkg/cluster/sync_test.go +++ b/pkg/cluster/sync_test.go @@ -142,6 +142,163 @@ func TestSyncStatefulSetsAnnotations(t *testing.T) { } } +func TestPodAnnotationsSync(t *testing.T) { + clusterName := "acid-test-cluster-2" + namespace := "default" + podAnnotation := "no-scale-down" + podAnnotations := map[string]string{podAnnotation: "true"} + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + mockClient := mocks.NewMockHTTPClient(ctrl) + client, _ := newFakeK8sAnnotationsClient() + + pg := acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: clusterName, + Namespace: namespace, + }, + Spec: acidv1.PostgresSpec{ + Volume: acidv1.Volume{ + Size: "1Gi", + }, + EnableConnectionPooler: boolToPointer(true), + EnableLogicalBackup: true, + EnableReplicaConnectionPooler: boolToPointer(true), + PodAnnotations: podAnnotations, + NumberOfInstances: 2, + }, + } + + var cluster = New( + Config{ + OpConfig: config.Config{ + PatroniAPICheckInterval: time.Duration(1), + PatroniAPICheckTimeout: time.Duration(5), + PodManagementPolicy: "ordered_ready", + ConnectionPooler: config.ConnectionPooler{ + ConnectionPoolerDefaultCPURequest: "100m", + ConnectionPoolerDefaultCPULimit: "100m", + ConnectionPoolerDefaultMemoryRequest: "100Mi", + ConnectionPoolerDefaultMemoryLimit: "100Mi", + NumberOfInstances: k8sutil.Int32ToPointer(1), + }, + Resources: config.Resources{ + ClusterLabels: map[string]string{"application": "spilo"}, + ClusterNameLabel: "cluster-name", + DefaultCPURequest: "300m", + DefaultCPULimit: "300m", + DefaultMemoryRequest: "300Mi", + DefaultMemoryLimit: "300Mi", + PodRoleLabel: 
"spilo-role", + ResourceCheckInterval: time.Duration(3), + ResourceCheckTimeout: time.Duration(10), + }, + }, + }, client, pg, logger, eventRecorder) + + configJson := `{"postgresql": {"parameters": {"log_min_duration_statement": 200, "max_connections": 50}}}, "ttl": 20}` + response := http.Response{ + StatusCode: 200, + Body: io.NopCloser(bytes.NewReader([]byte(configJson))), + } + + mockClient.EXPECT().Do(gomock.Any()).Return(&response, nil).AnyTimes() + cluster.patroni = patroni.New(patroniLogger, mockClient) + cluster.Name = clusterName + cluster.Namespace = namespace + clusterOptions := clusterLabelsOptions(cluster) + + // create a statefulset + _, err := cluster.createStatefulSet() + assert.NoError(t, err) + // create a pods + podsList := createPods(cluster) + for _, pod := range podsList { + _, err = cluster.KubeClient.Pods(namespace).Create(context.TODO(), &pod, metav1.CreateOptions{}) + assert.NoError(t, err) + } + // create connection pooler + _, err = cluster.createConnectionPooler(mockInstallLookupFunction) + assert.NoError(t, err) + + // create cron job + err = cluster.createLogicalBackupJob() + assert.NoError(t, err) + + annotateResources(cluster) + err = cluster.Sync(&cluster.Postgresql) + assert.NoError(t, err) + + // 1. 
PodAnnotations set + stsList, err := cluster.KubeClient.StatefulSets(namespace).List(context.TODO(), clusterOptions) + assert.NoError(t, err) + for _, sts := range stsList.Items { + assert.Contains(t, sts.Spec.Template.Annotations, podAnnotation) + } + + for _, role := range []PostgresRole{Master, Replica} { + deploy, err := cluster.KubeClient.Deployments(namespace).Get(context.TODO(), cluster.connectionPoolerName(role), metav1.GetOptions{}) + assert.NoError(t, err) + assert.Contains(t, deploy.Spec.Template.Annotations, podAnnotation, + fmt.Sprintf("pooler deployment pod template %s should contain annotation %s, found %#v", + deploy.Name, podAnnotation, deploy.Spec.Template.Annotations)) + assert.NoError(t, err) + } + + podList, err := cluster.KubeClient.Pods(namespace).List(context.TODO(), clusterOptions) + assert.NoError(t, err) + for _, pod := range podList.Items { + assert.Contains(t, pod.Annotations, podAnnotation, + fmt.Sprintf("pod %s should contain annotation %s, found %#v", pod.Name, podAnnotation, pod.Annotations)) + assert.NoError(t, err) + } + + cronJobList, err := cluster.KubeClient.CronJobs(namespace).List(context.TODO(), clusterOptions) + assert.NoError(t, err) + for _, cronJob := range cronJobList.Items { + assert.Contains(t, cronJob.Spec.JobTemplate.Spec.Template.Annotations, podAnnotation, + fmt.Sprintf("logical backup cron job's pod template should contain annotation %s, found %#v", + podAnnotation, cronJob.Spec.JobTemplate.Spec.Template.Annotations)) + } + + // 2 PodAnnotations removed + newSpec := cluster.Postgresql.DeepCopy() + newSpec.Spec.PodAnnotations = nil + err = cluster.Sync(newSpec) + assert.NoError(t, err) + + stsList, err = cluster.KubeClient.StatefulSets(namespace).List(context.TODO(), clusterOptions) + assert.NoError(t, err) + for _, sts := range stsList.Items { + assert.NotContains(t, sts.Spec.Template.Annotations, podAnnotation) + } + + for _, role := range []PostgresRole{Master, Replica} { + deploy, err := 
cluster.KubeClient.Deployments(namespace).Get(context.TODO(), cluster.connectionPoolerName(role), metav1.GetOptions{}) + assert.NoError(t, err) + assert.NotContains(t, deploy.Spec.Template.Annotations, podAnnotation, + fmt.Sprintf("pooler deployment pod template %s should not contain annotation %s, found %#v", + deploy.Name, podAnnotation, deploy.Spec.Template.Annotations)) + assert.NoError(t, err) + } + + podList, err = cluster.KubeClient.Pods(namespace).List(context.TODO(), clusterOptions) + assert.NoError(t, err) + for _, pod := range podList.Items { + assert.NotContains(t, pod.Annotations, podAnnotation, + fmt.Sprintf("pod %s should not contain annotation %s, found %#v", pod.Name, podAnnotation, pod.Annotations)) + } + + cronJobList, err = cluster.KubeClient.CronJobs(namespace).List(context.TODO(), clusterOptions) + assert.NoError(t, err) + for _, cronJob := range cronJobList.Items { + assert.NotContains(t, cronJob.Spec.JobTemplate.Spec.Template.Annotations, podAnnotation, + fmt.Sprintf("logical backup cron job's pod template should not contain annotation %s, found %#v", + podAnnotation, cronJob.Spec.JobTemplate.Spec.Template.Annotations)) + } +} + func TestCheckAndSetGlobalPostgreSQLConfiguration(t *testing.T) { testName := "test config comparison" client, _ := newFakeK8sSyncClient()