diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py index 04c6465c9..d85547002 100644 --- a/e2e/tests/test_e2e.py +++ b/e2e/tests/test_e2e.py @@ -2547,7 +2547,10 @@ def check_cluster_child_resources_owner_references(self, cluster_name, cluster_n self.assertTrue(self.has_postgresql_owner_reference(config_ep.metadata.owner_references, inverse), "config endpoint owner reference check failed") pdb = k8s.api.policy_v1.read_namespaced_pod_disruption_budget("postgres-{}-pdb".format(cluster_name), cluster_namespace) - self.assertTrue(self.has_postgresql_owner_reference(pdb.metadata.owner_references, inverse), "pod disruption owner reference check failed") + self.assertTrue(self.has_postgresql_owner_reference(pdb.metadata.owner_references, inverse), "pod disruption budget owner reference check failed") + + pdb = k8s.api.policy_v1.read_namespaced_pod_disruption_budget("postgres-{}-critical-operation-pdb".format(cluster_name), cluster_namespace) + self.assertTrue(self.has_postgresql_owner_reference(pdb.metadata.owner_references, inverse), "pod disruption budget for critical operations owner reference check failed") pg_secret = k8s.api.core_v1.read_namespaced_secret("postgres.{}.credentials.postgresql.acid.zalan.do".format(cluster_name), cluster_namespace) self.assertTrue(self.has_postgresql_owner_reference(pg_secret.metadata.owner_references, inverse), "postgres secret owner reference check failed") diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 1a8d6f762..de9972ef8 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -59,16 +59,17 @@ type Config struct { } type kubeResources struct { - Services map[PostgresRole]*v1.Service - Endpoints map[PostgresRole]*v1.Endpoints - PatroniEndpoints map[string]*v1.Endpoints - PatroniConfigMaps map[string]*v1.ConfigMap - Secrets map[types.UID]*v1.Secret - Statefulset *appsv1.StatefulSet - VolumeClaims map[types.UID]*v1.PersistentVolumeClaim - PodDisruptionBudget *policyv1.PodDisruptionBudget - LogicalBackupJob *batchv1.CronJob - Streams map[string]*zalandov1.FabricEventStream + Services map[PostgresRole]*v1.Service + Endpoints map[PostgresRole]*v1.Endpoints + PatroniEndpoints map[string]*v1.Endpoints + PatroniConfigMaps map[string]*v1.ConfigMap + Secrets map[types.UID]*v1.Secret + Statefulset *appsv1.StatefulSet + VolumeClaims map[types.UID]*v1.PersistentVolumeClaim + GeneralPodDisruptionBudget *policyv1.PodDisruptionBudget + CriticalOpPodDisruptionBudget *policyv1.PodDisruptionBudget + LogicalBackupJob *batchv1.CronJob + Streams map[string]*zalandov1.FabricEventStream //Pods are treated separately } @@ -336,14 +337,10 @@ func (c *Cluster) Create() (err error) { c.logger.Infof("secrets have been successfully created") c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Secrets", "The secrets have been successfully created") - if c.PodDisruptionBudget != nil { - return fmt.Errorf("pod disruption budget already exists in the cluster") + if err = c.createPodDisruptionBudgets(); err != nil { + return fmt.Errorf("could not create pod disruption budgets: %v", err) } - pdb, err := c.createPodDisruptionBudget() - if err != nil { - return fmt.Errorf("could not create pod disruption budget: %v", err) - } - c.logger.Infof("pod disruption budget %q has been successfully created", util.NameFromMeta(pdb.ObjectMeta)) + c.logger.Info("pod disruption budgets have been successfully created") if c.Statefulset != nil { return fmt.Errorf("statefulset already exists in the cluster") @@ -1060,9 +1057,9 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { } } - // pod disruption budget - if err := c.syncPodDisruptionBudget(true); err != nil { - c.logger.Errorf("could not sync pod disruption budget: %v", err) + // pod disruption budgets + if err := c.syncPodDisruptionBudgets(true); err != nil { + c.logger.Errorf("could not sync pod disruption budgets: %v", err) updateFailed = true } @@ -1207,10 +1204,10 @@ func (c *Cluster) Delete() error { c.logger.Info("not deleting secrets because disabled in configuration") } - if err := c.deletePodDisruptionBudget(); err != nil { + if err := c.deletePodDisruptionBudgets(); err != nil { anyErrors = true - c.logger.Warningf("could not delete pod disruption budget: %v", err) - c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not delete pod disruption budget: %v", err) + c.logger.Warningf("could not delete pod disruption budgets: %v", err) + c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not delete pod disruption budgets: %v", err) } for _, role := range []PostgresRole{Master, Replica} { @@ -1709,16 +1706,17 @@ func (c *Cluster) GetCurrentProcess() Process { // GetStatus provides status of the cluster func (c *Cluster) GetStatus() *ClusterStatus { status := &ClusterStatus{ - Cluster: c.Name, - Namespace: c.Namespace, - Team: c.Spec.TeamID, - Status: c.Status, - Spec: c.Spec, - MasterService: c.GetServiceMaster(), - ReplicaService: c.GetServiceReplica(), - StatefulSet: c.GetStatefulSet(), - PodDisruptionBudget: c.GetPodDisruptionBudget(), - CurrentProcess: c.GetCurrentProcess(), + Cluster: c.Name, + Namespace: c.Namespace, + Team: c.Spec.TeamID, + Status: c.Status, + Spec: c.Spec, + MasterService: c.GetServiceMaster(), + ReplicaService: c.GetServiceReplica(), + StatefulSet: c.GetStatefulSet(), + GeneralPodDisruptionBudget: c.GetGeneralPodDisruptionBudget(), + CriticalOpPodDisruptionBudget: c.GetCriticalOpPodDisruptionBudget(), + CurrentProcess: c.GetCurrentProcess(), Error: fmt.Errorf("error: %s", c.Error), } diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go index ff5536303..452eb21bc 100644 --- a/pkg/cluster/k8sres.go +++ b/pkg/cluster/k8sres.go @@ -109,10 +109,15 @@ func (c *Cluster) servicePort(role PostgresRole) int32 { return pgPort } -func (c *Cluster) podDisruptionBudgetName() string { +func (c *Cluster) generalPodDisruptionBudgetName() string { return c.OpConfig.PDBNameFormat.Format("cluster", c.Name) } +func (c *Cluster) criticalOpPodDisruptionBudgetName() string { + pdbTemplate := config.StringTemplate("postgres-{cluster}-critical-operation-pdb") + return pdbTemplate.Format("cluster", c.Name) +} + func makeDefaultResources(config *config.Config) acidv1.Resources { defaultRequests := acidv1.ResourceDescription{ @@ -2207,7 +2212,7 @@ func (c *Cluster) generateStandbyEnvironment(description *acidv1.StandbyDescript return result } -func (c *Cluster) generatePodDisruptionBudget() *policyv1.PodDisruptionBudget { +func (c *Cluster) generateGeneralPodDisruptionBudget() *policyv1.PodDisruptionBudget { minAvailable := intstr.FromInt(1) pdbEnabled := c.OpConfig.EnablePodDisruptionBudget pdbMasterLabelSelector := c.OpConfig.PDBMasterLabelSelector @@ -2225,7 +2230,36 @@ func (c *Cluster) generatePodDisruptionBudget() *policyv1.PodDisruptionBudget { return &policyv1.PodDisruptionBudget{ ObjectMeta: metav1.ObjectMeta{ - Name: c.podDisruptionBudgetName(), + Name: c.generalPodDisruptionBudgetName(), + Namespace: c.Namespace, + Labels: c.labelsSet(true), + Annotations: c.annotationsSet(nil), + OwnerReferences: c.ownerReferences(), + }, + Spec: policyv1.PodDisruptionBudgetSpec{ + MinAvailable: &minAvailable, + Selector: &metav1.LabelSelector{ + MatchLabels: labels, + }, + }, + } +} + +func (c *Cluster) generateCriticalOpPodDisruptionBudget() *policyv1.PodDisruptionBudget { + minAvailable := intstr.FromInt32(c.Spec.NumberOfInstances) + pdbEnabled := c.OpConfig.EnablePodDisruptionBudget + + // if PodDisruptionBudget is disabled or if there are no DB pods, set the budget to 0. + if (pdbEnabled != nil && !(*pdbEnabled)) || c.Spec.NumberOfInstances <= 0 { + minAvailable = intstr.FromInt(0) + } + + labels := c.labelsSet(false) + labels["critical-operaton"] = "true" + + return &policyv1.PodDisruptionBudget{ + ObjectMeta: metav1.ObjectMeta{ + Name: c.criticalOpPodDisruptionBudgetName(), Namespace: c.Namespace, Labels: c.labelsSet(true), Annotations: c.annotationsSet(nil), diff --git a/pkg/cluster/k8sres_test.go b/pkg/cluster/k8sres_test.go index 612e4525a..8f2e25d5d 100644 --- a/pkg/cluster/k8sres_test.go +++ b/pkg/cluster/k8sres_test.go @@ -2349,22 +2349,34 @@ func TestGeneratePodDisruptionBudget(t *testing.T) { } } - testLabelsAndSelectors := func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error { - masterLabelSelectorDisabled := cluster.OpConfig.PDBMasterLabelSelector != nil && !*cluster.OpConfig.PDBMasterLabelSelector - if podDisruptionBudget.ObjectMeta.Namespace != "myapp" { - return fmt.Errorf("Object Namespace incorrect.") - } - if !reflect.DeepEqual(podDisruptionBudget.Labels, map[string]string{"team": "myapp", "cluster-name": "myapp-database"}) { - return fmt.Errorf("Labels incorrect.") - } - if !masterLabelSelectorDisabled && - !reflect.DeepEqual(podDisruptionBudget.Spec.Selector, &metav1.LabelSelector{ - MatchLabels: map[string]string{"spilo-role": "master", "cluster-name": "myapp-database"}}) { + testLabelsAndSelectors := func(isGeneral bool) func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error { + return func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error { + masterLabelSelectorDisabled := cluster.OpConfig.PDBMasterLabelSelector != nil && !*cluster.OpConfig.PDBMasterLabelSelector + if podDisruptionBudget.ObjectMeta.Namespace != "myapp" { + return fmt.Errorf("Object Namespace incorrect.") + } + expectedLabels := map[string]string{"team": "myapp", "cluster-name": "myapp-database"} + if !reflect.DeepEqual(podDisruptionBudget.Labels, expectedLabels) { + return fmt.Errorf("Labels incorrect, got %#v, expected %#v", podDisruptionBudget.Labels, expectedLabels) + } + if !masterLabelSelectorDisabled { + if isGeneral { + expectedLabels := &metav1.LabelSelector{ + MatchLabels: map[string]string{"spilo-role": "master", "cluster-name": "myapp-database"}} + if !reflect.DeepEqual(podDisruptionBudget.Spec.Selector, expectedLabels) { + return fmt.Errorf("MatchLabels incorrect, got %#v, expected %#v", podDisruptionBudget.Spec.Selector, expectedLabels) + } + } else { + expectedLabels := &metav1.LabelSelector{ + MatchLabels: map[string]string{"cluster-name": "myapp-database", "critical-operaton": "true"}} + if !reflect.DeepEqual(podDisruptionBudget.Spec.Selector, expectedLabels) { + return fmt.Errorf("MatchLabels incorrect, got %#v, expected %#v", podDisruptionBudget.Spec.Selector, expectedLabels) + } + } + } - return fmt.Errorf("MatchLabels incorrect.") + return nil } - - return nil } testPodDisruptionBudgetOwnerReference := func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error { @@ -2400,7 +2412,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) { testPodDisruptionBudgetOwnerReference, hasName("postgres-myapp-database-pdb"), hasMinAvailable(1), - testLabelsAndSelectors, + testLabelsAndSelectors(true), }, }, { @@ -2417,7 +2429,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) { testPodDisruptionBudgetOwnerReference, hasName("postgres-myapp-database-pdb"), hasMinAvailable(0), - testLabelsAndSelectors, + testLabelsAndSelectors(true), }, }, { @@ -2434,7 +2446,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) { testPodDisruptionBudgetOwnerReference, hasName("postgres-myapp-database-pdb"), hasMinAvailable(0), - testLabelsAndSelectors, + testLabelsAndSelectors(true), }, }, { @@ -2451,7 +2463,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) { testPodDisruptionBudgetOwnerReference, hasName("postgres-myapp-database-databass-budget"), hasMinAvailable(1), - testLabelsAndSelectors, + testLabelsAndSelectors(true), }, }, { @@ -2468,7 +2480,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) { testPodDisruptionBudgetOwnerReference, hasName("postgres-myapp-database-pdb"), hasMinAvailable(1), - testLabelsAndSelectors, + testLabelsAndSelectors(true), }, }, { @@ -2485,13 +2497,99 @@ func TestGeneratePodDisruptionBudget(t *testing.T) { testPodDisruptionBudgetOwnerReference, hasName("postgres-myapp-database-pdb"), hasMinAvailable(1), - testLabelsAndSelectors, + testLabelsAndSelectors(true), }, }, } for _, tt := range tests { - result := tt.spec.generatePodDisruptionBudget() + result := tt.spec.generateGeneralPodDisruptionBudget() + for _, check := range tt.check { + err := check(tt.spec, result) + if err != nil { + t.Errorf("%s [%s]: PodDisruptionBudget spec is incorrect, %+v", + testName, tt.scenario, err) + } + } + } + + testCriticalOp := []struct { + scenario string + spec *Cluster + check []func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error + }{ + { + scenario: "With multiple instances", + spec: New( + Config{OpConfig: config.Config{Resources: config.Resources{ClusterNameLabel: "cluster-name", PodRoleLabel: "spilo-role"}, PDBNameFormat: "postgres-{cluster}-pdb"}}, + k8sutil.KubernetesClient{}, + acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{Name: "myapp-database", Namespace: "myapp"}, + Spec: acidv1.PostgresSpec{TeamID: "myapp", NumberOfInstances: 3}}, + logger, + eventRecorder), + check: []func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error{ + testPodDisruptionBudgetOwnerReference, + hasName("postgres-myapp-database-critical-operation-pdb"), + hasMinAvailable(3), + testLabelsAndSelectors(false), + }, + }, + { + scenario: "With zero instances", + spec: New( + Config{OpConfig: config.Config{Resources: config.Resources{ClusterNameLabel: "cluster-name", PodRoleLabel: "spilo-role"}, PDBNameFormat: "postgres-{cluster}-pdb"}}, + k8sutil.KubernetesClient{}, + acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{Name: "myapp-database", Namespace: "myapp"}, + Spec: acidv1.PostgresSpec{TeamID: "myapp", NumberOfInstances: 0}}, + logger, + eventRecorder), + check: []func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error{ + testPodDisruptionBudgetOwnerReference, + hasName("postgres-myapp-database-critical-operation-pdb"), + hasMinAvailable(0), + testLabelsAndSelectors(false), + }, + }, + { + scenario: "With PodDisruptionBudget disabled", + spec: New( + Config{OpConfig: config.Config{Resources: config.Resources{ClusterNameLabel: "cluster-name", PodRoleLabel: "spilo-role"}, PDBNameFormat: "postgres-{cluster}-pdb", EnablePodDisruptionBudget: util.False()}}, + k8sutil.KubernetesClient{}, + acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{Name: "myapp-database", Namespace: "myapp"}, + Spec: acidv1.PostgresSpec{TeamID: "myapp", NumberOfInstances: 3}}, + logger, + eventRecorder), + check: []func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error{ + testPodDisruptionBudgetOwnerReference, + hasName("postgres-myapp-database-critical-operation-pdb"), + hasMinAvailable(0), + testLabelsAndSelectors(false), + }, + }, + { + scenario: "With OwnerReference enabled", + spec: New( + Config{OpConfig: config.Config{Resources: config.Resources{ClusterNameLabel: "cluster-name", PodRoleLabel: "spilo-role", EnableOwnerReferences: util.True()}, PDBNameFormat: "postgres-{cluster}-pdb", EnablePodDisruptionBudget: util.True()}}, + k8sutil.KubernetesClient{}, + acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{Name: "myapp-database", Namespace: "myapp"}, + Spec: acidv1.PostgresSpec{TeamID: "myapp", NumberOfInstances: 3}}, + logger, + eventRecorder), + check: []func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error{ + testPodDisruptionBudgetOwnerReference, + hasName("postgres-myapp-database-critical-operation-pdb"), + hasMinAvailable(3), + testLabelsAndSelectors(false), + }, + }, + } + + for _, tt := range testCriticalOp { + result := tt.spec.generateCriticalOpPodDisruptionBudget() for _, check := range tt.check { err := check(tt.spec, result) if err != nil { diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index 3f47328ee..506d37730 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -23,8 +23,8 @@ const ( ) func (c *Cluster) listResources() error { - if c.PodDisruptionBudget != nil { - c.logger.Infof("found pod disruption budget: %q (uid: %q)", util.NameFromMeta(c.PodDisruptionBudget.ObjectMeta), c.PodDisruptionBudget.UID) + if c.GeneralPodDisruptionBudget != nil { + c.logger.Infof("found general pod disruption budget: %q (uid: %q)", util.NameFromMeta(c.GeneralPodDisruptionBudget.ObjectMeta), c.GeneralPodDisruptionBudget.UID) } if c.Statefulset != nil { @@ -417,59 +417,126 @@ func (c *Cluster) generateEndpointSubsets(role PostgresRole) []v1.EndpointSubset return result } -func (c *Cluster) createPodDisruptionBudget() (*policyv1.PodDisruptionBudget, error) { - podDisruptionBudgetSpec := c.generatePodDisruptionBudget() +func (c *Cluster) createGeneralPodDisruptionBudget() error { + c.logger.Debug("creating general pod disruption budget") + if c.GeneralPodDisruptionBudget != nil { + return fmt.Errorf("general pod disruption budget already exists in the cluster") + } + + podDisruptionBudgetSpec := c.generateGeneralPodDisruptionBudget() podDisruptionBudget, err := c.KubeClient. PodDisruptionBudgets(podDisruptionBudgetSpec.Namespace). Create(context.TODO(), podDisruptionBudgetSpec, metav1.CreateOptions{}) if err != nil { - return nil, err + return err + } + c.logger.Infof("pod disruption budget %q has been successfully created", util.NameFromMeta(podDisruptionBudget.ObjectMeta)) + c.GeneralPodDisruptionBudget = podDisruptionBudget + + return nil +} + +func (c *Cluster) createCriticalOpPodDisruptionBudget() error { + c.logger.Debug("creating pod disruption budget for critical operations") + if c.CriticalOpPodDisruptionBudget != nil { + return fmt.Errorf("pod disruption budget for critical operations already exists in the cluster") + } + + podDisruptionBudgetSpec := c.generateCriticalOpPodDisruptionBudget() + podDisruptionBudget, err := c.KubeClient. + PodDisruptionBudgets(podDisruptionBudgetSpec.Namespace). + Create(context.TODO(), podDisruptionBudgetSpec, metav1.CreateOptions{}) + + if err != nil { + return err + } + c.logger.Infof("pod disruption budget %q has been successfully created", util.NameFromMeta(podDisruptionBudget.ObjectMeta)) + c.CriticalOpPodDisruptionBudget = podDisruptionBudget + + return nil +} + +func (c *Cluster) createPodDisruptionBudgets() error { + errors := make([]string, 0) + + err := c.createGeneralPodDisruptionBudget() + if err != nil { + errors = append(errors, fmt.Sprintf("could not create general pod disruption budget: %v", err)) + } + + err = c.createCriticalOpPodDisruptionBudget() + if err != nil { + errors = append(errors, fmt.Sprintf("could not create pod disruption budget for critical operations: %v", err)) } - c.PodDisruptionBudget = podDisruptionBudget - return podDisruptionBudget, nil + if len(errors) > 0 { + return fmt.Errorf("%v", strings.Join(errors, `', '`)) + } + return nil } -func (c *Cluster) updatePodDisruptionBudget(pdb *policyv1.PodDisruptionBudget) error { - if c.PodDisruptionBudget == nil { - return fmt.Errorf("there is no pod disruption budget in the cluster") +func (c *Cluster) updateGeneralPodDisruptionBudget(pdb *policyv1.PodDisruptionBudget) error { + c.logger.Debug("updating general pod disruption budget") + if c.GeneralPodDisruptionBudget == nil { + return fmt.Errorf("there is no general pod disruption budget in the cluster") } - if err := c.deletePodDisruptionBudget(); err != nil { - return fmt.Errorf("could not delete pod disruption budget: %v", err) + if err := c.deleteGeneralPodDisruptionBudget(); err != nil { + return fmt.Errorf("could not delete general pod disruption budget: %v", err) } newPdb, err := c.KubeClient. PodDisruptionBudgets(pdb.Namespace). Create(context.TODO(), pdb, metav1.CreateOptions{}) if err != nil { - return fmt.Errorf("could not create pod disruption budget: %v", err) + return fmt.Errorf("could not create general pod disruption budget: %v", err) } - c.PodDisruptionBudget = newPdb + c.GeneralPodDisruptionBudget = newPdb return nil } -func (c *Cluster) deletePodDisruptionBudget() error { - c.logger.Debug("deleting pod disruption budget") - if c.PodDisruptionBudget == nil { - c.logger.Debug("there is no pod disruption budget in the cluster") +func (c *Cluster) updateCriticalOpPodDisruptionBudget(pdb *policyv1.PodDisruptionBudget) error { + c.logger.Debug("updating pod disruption budget for critical operations") + if c.CriticalOpPodDisruptionBudget == nil { + return fmt.Errorf("there is no pod disruption budget for critical operations in the cluster") + } + + if err := c.deleteCriticalOpPodDisruptionBudget(); err != nil { + return fmt.Errorf("could not delete pod disruption budget for critical operations: %v", err) + } + + newPdb, err := c.KubeClient. + PodDisruptionBudgets(pdb.Namespace). + Create(context.TODO(), pdb, metav1.CreateOptions{}) + if err != nil { + return fmt.Errorf("could not create pod disruption budget for critical operations: %v", err) + } + c.CriticalOpPodDisruptionBudget = newPdb + + return nil +} + +func (c *Cluster) deleteGeneralPodDisruptionBudget() error { + c.logger.Debug("deleting general pod disruption budget") + if c.GeneralPodDisruptionBudget == nil { + c.logger.Debug("there is no general pod disruption budget in the cluster") return nil } - pdbName := util.NameFromMeta(c.PodDisruptionBudget.ObjectMeta) + pdbName := util.NameFromMeta(c.GeneralPodDisruptionBudget.ObjectMeta) err := c.KubeClient. - PodDisruptionBudgets(c.PodDisruptionBudget.Namespace). - Delete(context.TODO(), c.PodDisruptionBudget.Name, c.deleteOptions) + PodDisruptionBudgets(c.GeneralPodDisruptionBudget.Namespace). + Delete(context.TODO(), c.GeneralPodDisruptionBudget.Name, c.deleteOptions) if k8sutil.ResourceNotFound(err) { - c.logger.Debugf("PodDisruptionBudget %q has already been deleted", util.NameFromMeta(c.PodDisruptionBudget.ObjectMeta)) + c.logger.Debugf("PodDisruptionBudget %q has already been deleted", util.NameFromMeta(c.GeneralPodDisruptionBudget.ObjectMeta)) } else if err != nil { - return fmt.Errorf("could not delete PodDisruptionBudget: %v", err) + return fmt.Errorf("could not delete general pod disruption budget: %v", err) } - c.logger.Infof("pod disruption budget %q has been deleted", util.NameFromMeta(c.PodDisruptionBudget.ObjectMeta)) - c.PodDisruptionBudget = nil + c.logger.Infof("pod disruption budget %q has been deleted", util.NameFromMeta(c.GeneralPodDisruptionBudget.ObjectMeta)) + c.GeneralPodDisruptionBudget = nil err = retryutil.Retry(c.OpConfig.ResourceCheckInterval, c.OpConfig.ResourceCheckTimeout, func() (bool, error) { @@ -483,12 +550,67 @@ func (c *Cluster) deletePodDisruptionBudget() error { return false, err2 }) if err != nil { - return fmt.Errorf("could not delete pod disruption budget: %v", err) + return fmt.Errorf("could not delete general pod disruption budget: %v", err) } return nil } +func (c *Cluster) deleteCriticalOpPodDisruptionBudget() error { + c.logger.Debug("deleting pod disruption budget for critical operations") + if c.CriticalOpPodDisruptionBudget == nil { + c.logger.Debug("there is no pod disruption budget for critical operations in the cluster") + return nil + } + + pdbName := util.NameFromMeta(c.CriticalOpPodDisruptionBudget.ObjectMeta) + err := c.KubeClient. + PodDisruptionBudgets(c.CriticalOpPodDisruptionBudget.Namespace). + Delete(context.TODO(), c.CriticalOpPodDisruptionBudget.Name, c.deleteOptions) + if k8sutil.ResourceNotFound(err) { + c.logger.Debugf("PodDisruptionBudget %q has already been deleted", util.NameFromMeta(c.CriticalOpPodDisruptionBudget.ObjectMeta)) + } else if err != nil { + return fmt.Errorf("could not delete pod disruption budget for critical operations: %v", err) + } + + c.logger.Infof("pod disruption budget %q has been deleted", util.NameFromMeta(c.CriticalOpPodDisruptionBudget.ObjectMeta)) + c.CriticalOpPodDisruptionBudget = nil + + err = retryutil.Retry(c.OpConfig.ResourceCheckInterval, c.OpConfig.ResourceCheckTimeout, + func() (bool, error) { + _, err2 := c.KubeClient.PodDisruptionBudgets(pdbName.Namespace).Get(context.TODO(), pdbName.Name, metav1.GetOptions{}) + if err2 == nil { + return false, nil + } + if k8sutil.ResourceNotFound(err2) { + return true, nil + } + return false, err2 + }) + if err != nil { + return fmt.Errorf("could not delete pod disruption budget for critical operations: %v", err) + } + + return nil +} + +func (c *Cluster) deletePodDisruptionBudgets() error { + errors := make([]string, 0) + + if err := c.deleteGeneralPodDisruptionBudget(); err != nil { + errors = append(errors, fmt.Sprintf("%v", err)) + } + + if err := c.deleteCriticalOpPodDisruptionBudget(); err != nil { + errors = append(errors, fmt.Sprintf("%v", err)) + } + + if len(errors) > 0 { + return fmt.Errorf("%v", strings.Join(errors, `', '`)) + } + return nil +} + func (c *Cluster) deleteEndpoint(role PostgresRole) error { c.setProcessName("deleting endpoint") c.logger.Debugf("deleting %s endpoint", role) @@ -705,7 +827,12 @@ func (c *Cluster) GetStatefulSet() *appsv1.StatefulSet { return c.Statefulset } -// GetPodDisruptionBudget returns cluster's kubernetes PodDisruptionBudget -func (c *Cluster) GetPodDisruptionBudget() *policyv1.PodDisruptionBudget { - return c.PodDisruptionBudget +// GetPodDisruptionBudget returns cluster's general kubernetes PodDisruptionBudget +func (c *Cluster) GetGeneralPodDisruptionBudget() *policyv1.PodDisruptionBudget { + return c.GeneralPodDisruptionBudget +} + +// GetPodDisruptionBudget returns cluster's kubernetes PodDisruptionBudget for critical operations +func (c *Cluster) GetCriticalOpPodDisruptionBudget() *policyv1.PodDisruptionBudget { + return c.CriticalOpPodDisruptionBudget } diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index d1a339001..1d8eb77a6 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -112,8 +112,8 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error { } c.logger.Debug("syncing pod disruption budgets") - if err = c.syncPodDisruptionBudget(false); err != nil { - err = fmt.Errorf("could not sync pod disruption budget: %v", err) + if err = c.syncPodDisruptionBudgets(false); err != nil { + err = fmt.Errorf("could not sync pod disruption budgets: %v", err) return err } @@ -447,22 +447,22 @@ func (c *Cluster) syncEndpoint(role PostgresRole) error { return nil } -func (c *Cluster) syncPodDisruptionBudget(isUpdate bool) error { +func (c *Cluster) syncGeneralPodDisruptionBudget(isUpdate bool) error { var ( pdb *policyv1.PodDisruptionBudget err error ) - if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(context.TODO(), c.podDisruptionBudgetName(), metav1.GetOptions{}); err == nil { - c.PodDisruptionBudget = pdb - newPDB := c.generatePodDisruptionBudget() + if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(context.TODO(), c.generalPodDisruptionBudgetName(), metav1.GetOptions{}); err == nil { + c.GeneralPodDisruptionBudget = pdb + newPDB := c.generateGeneralPodDisruptionBudget() match, reason := c.comparePodDisruptionBudget(pdb, newPDB) if !match { c.logPDBChanges(pdb, newPDB, isUpdate, reason) - if err = c.updatePodDisruptionBudget(newPDB); err != nil { + if err = c.updateGeneralPodDisruptionBudget(newPDB); err != nil { return err } } else { - c.PodDisruptionBudget = pdb + c.GeneralPodDisruptionBudget = pdb } return nil @@ -471,24 +471,83 @@ func (c *Cluster) syncPodDisruptionBudget(isUpdate bool) error { return fmt.Errorf("could not get pod disruption budget: %v", err) } // no existing pod disruption budget, create new one - c.logger.Infof("could not find the cluster's pod disruption budget") + c.logger.Infof("could not find the general pod disruption budget") - if pdb, err = c.createPodDisruptionBudget(); err != nil { + if err = c.createGeneralPodDisruptionBudget(); err != nil { if !k8sutil.ResourceAlreadyExists(err) { - return fmt.Errorf("could not create pod disruption budget: %v", err) + return fmt.Errorf("could not create general pod disruption budget: %v", err) } c.logger.Infof("pod disruption budget %q already exists", util.NameFromMeta(pdb.ObjectMeta)) - if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(context.TODO(), c.podDisruptionBudgetName(), metav1.GetOptions{}); err != nil { + if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(context.TODO(), c.generalPodDisruptionBudgetName(), metav1.GetOptions{}); err != nil { return fmt.Errorf("could not fetch existing %q pod disruption budget", util.NameFromMeta(pdb.ObjectMeta)) } } c.logger.Infof("created missing pod disruption budget %q", util.NameFromMeta(pdb.ObjectMeta)) - c.PodDisruptionBudget = pdb + c.GeneralPodDisruptionBudget = pdb return nil } +func (c *Cluster) syncCriticalOpPodDisruptionBudget(isUpdate bool) error { + var ( + pdb *policyv1.PodDisruptionBudget + err error + ) + if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(context.TODO(), c.criticalOpPodDisruptionBudgetName(), metav1.GetOptions{}); err == nil { + c.CriticalOpPodDisruptionBudget = pdb + newPDB := c.generateCriticalOpPodDisruptionBudget() + match, reason := c.comparePodDisruptionBudget(pdb, newPDB) + if !match { + c.logPDBChanges(pdb, newPDB, isUpdate, reason) + if err = c.updateCriticalOpPodDisruptionBudget(newPDB); err != nil { + return err + } + } else { + c.CriticalOpPodDisruptionBudget = pdb + } + return nil + + } + if !k8sutil.ResourceNotFound(err) { + return fmt.Errorf("could not get pod disruption budget: %v", err) + } + // no existing pod disruption budget, create new one + c.logger.Infof("could not find pod disruption budget for critical operations") + + if err = c.createCriticalOpPodDisruptionBudget(); err != nil { + if !k8sutil.ResourceAlreadyExists(err) { + return fmt.Errorf("could not create pod disruption budget for critical operations: %v", err) + } + c.logger.Infof("pod disruption budget %q already exists", util.NameFromMeta(pdb.ObjectMeta)) + if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(context.TODO(), c.criticalOpPodDisruptionBudgetName(), metav1.GetOptions{}); err != nil { + return fmt.Errorf("could not fetch existing %q pod disruption budget", util.NameFromMeta(pdb.ObjectMeta)) + } + } + + c.logger.Infof("created missing pod disruption budget %q", util.NameFromMeta(pdb.ObjectMeta)) + c.CriticalOpPodDisruptionBudget = pdb + + return nil +} + +func (c *Cluster) syncPodDisruptionBudgets(isUpdate bool) error { + errors := make([]string, 0) + + if err := c.syncGeneralPodDisruptionBudget(isUpdate); err != nil { + errors = append(errors, fmt.Sprintf("%v", err)) + } + + if err := c.syncCriticalOpPodDisruptionBudget(isUpdate); err != nil { + errors = append(errors, fmt.Sprintf("%v", err)) + } + + if len(errors) > 0 { + return fmt.Errorf("%v", strings.Join(errors, `', '`)) + } + return nil +} + func (c *Cluster) syncStatefulSet() error { var ( restartWait uint32 diff --git a/pkg/cluster/types.go b/pkg/cluster/types.go index 8e9263d49..306f728e6 100644 --- a/pkg/cluster/types.go +++ b/pkg/cluster/types.go @@ -58,15 +58,16 @@ type WorkerStatus struct { // ClusterStatus describes status of the cluster type ClusterStatus struct { - Team string - Cluster string - Namespace string - MasterService *v1.Service - ReplicaService *v1.Service - MasterEndpoint *v1.Endpoints - ReplicaEndpoint *v1.Endpoints - StatefulSet *appsv1.StatefulSet - PodDisruptionBudget *policyv1.PodDisruptionBudget + Team string + Cluster string + Namespace string + MasterService *v1.Service + ReplicaService *v1.Service + MasterEndpoint *v1.Endpoints + ReplicaEndpoint *v1.Endpoints + StatefulSet *appsv1.StatefulSet + GeneralPodDisruptionBudget *policyv1.PodDisruptionBudget + CriticalOpPodDisruptionBudget *policyv1.PodDisruptionBudget CurrentProcess Process Worker uint32 diff --git a/pkg/cluster/util_test.go b/pkg/cluster/util_test.go index 2cb755c6c..9cefa0ba9 100644 --- a/pkg/cluster/util_test.go +++ b/pkg/cluster/util_test.go @@ -329,7 +329,7 @@ func newInheritedAnnotationsCluster(client k8sutil.KubernetesClient) (*Cluster, if err != nil { return nil, err } - _, err = cluster.createPodDisruptionBudget() + err = cluster.createPodDisruptionBudgets() if err != nil { return nil, err }