diff --git a/charts/postgres-operator/crds/postgresqls.yaml b/charts/postgres-operator/crds/postgresqls.yaml index 89f25cf2..2b85bd1e 100644 --- a/charts/postgres-operator/crds/postgresqls.yaml +++ b/charts/postgres-operator/crds/postgresqls.yaml @@ -133,6 +133,39 @@ spec: uid: format: uuid type: string + pgbackrest: + type: object + properties: + configuration: + type: object + properties: + secret: + type: string + options: + type: object + additionalProperties: + type: string + repo: + type: object + properties: + storage: + type: string + enum: + - "s3" + - "gcs" + - "azure" + - "pvc" + resource: + type: string + endpoint: + type: string + region: + type: string + required: + - storage + - resource + required: + - repo connectionPooler: type: object properties: diff --git a/charts/postgres-operator/values.yaml b/charts/postgres-operator/values.yaml index 8962567b..e1a9837a 100644 --- a/charts/postgres-operator/values.yaml +++ b/charts/postgres-operator/values.yaml @@ -440,9 +440,11 @@ configMultisite: # Must be unique for each site. site: "" # IP address or hostname of shared etcd cluster used for multicluster operation. - etcd_host: "" - etcd_user: "" - etcd_password: "" + etcd: + hosts: + user: "" + password: "" + protocol: http # Timeout for cross site failover, and timeout for demoting to read only when accessing shared etcd cluster fails. # There should be adequate safety margin between the two to allow for demotion to take place. 
#ttl: 90 diff --git a/manifests/postgresql.crd.yaml b/manifests/postgresql.crd.yaml index b7d7ead1..20b32fe8 100644 --- a/manifests/postgresql.crd.yaml +++ b/manifests/postgresql.crd.yaml @@ -131,6 +131,39 @@ spec: uid: format: uuid type: string + pgbackrest: + type: object + properties: + configuration: + type: object + properties: + secret: + type: string + options: + type: object + additionalProperties: + type: string + repo: + type: object + properties: + storage: + type: string + enum: + - "s3" + - "gcs" + - "azure" + - "pvc" + resource: + type: string + endpoint: + type: string + region: + type: string + required: + - storage + - resource + required: + - repo connectionPooler: type: object properties: diff --git a/pkg/apis/cpo.opensource.cybertec.at/v1/crds.go b/pkg/apis/cpo.opensource.cybertec.at/v1/crds.go index 0e9da3d2..d85a0a84 100644 --- a/pkg/apis/cpo.opensource.cybertec.at/v1/crds.go +++ b/pkg/apis/cpo.opensource.cybertec.at/v1/crds.go @@ -213,6 +213,68 @@ var PostgresCRDResourceValidation = apiextv1.CustomResourceValidation{ Type: "string", Format: "uuid", }, + "pgbackrest": { + Type: "object", + Properties: map[string]apiextv1.JSONSchemaProps{ + "configuration": { + Type: "object", + Properties: map[string]apiextv1.JSONSchemaProps{ + "secret": { + Type: "string", + }, + }, + }, + "options": { + Type: "object", + AdditionalProperties: &apiextv1.JSONSchemaPropsOrBool{ + Schema: &apiextv1.JSONSchemaProps{ + Type: "string", + XPreserveUnknownFields: util.True(), + }, + }, + }, + "repo": { + Type: "object", + Properties: map[string]apiextv1.JSONSchemaProps{ + "storage": { + Type: "string", + Enum: []apiextv1.JSON{ + { + Raw: []byte(`"s3"`), + }, + { + Raw: []byte(`"gcs"`), + }, + { + Raw: []byte(`"azure"`), + }, + { + Raw: []byte(`"pvc"`), + }, + }, + }, + "resource": { + Type: "string", + }, + "endpoint": { + Type: "string", + }, + "region": { + Type: "string", + }, + "account": { + Type: "string", + }, + "key": { + Type: "string", + }, + "keyType": { + 
Type: "string", + }, + }, + }, + }, + }, }, }, "connectionPooler": { diff --git a/pkg/apis/cpo.opensource.cybertec.at/v1/postgresql_type.go b/pkg/apis/cpo.opensource.cybertec.at/v1/postgresql_type.go index 66c94e0c..4070e1e8 100644 --- a/pkg/apis/cpo.opensource.cybertec.at/v1/postgresql_type.go +++ b/pkg/apis/cpo.opensource.cybertec.at/v1/postgresql_type.go @@ -203,14 +203,15 @@ type TLSDescription struct { // CloneDescription describes which cluster the new should clone and up to which point in time type CloneDescription struct { - ClusterName string `json:"cluster,omitempty"` - UID string `json:"uid,omitempty"` - EndTimestamp string `json:"timestamp,omitempty"` - S3WalPath string `json:"s3_wal_path,omitempty"` - S3Endpoint string `json:"s3_endpoint,omitempty"` - S3AccessKeyId string `json:"s3_access_key_id,omitempty"` - S3SecretAccessKey string `json:"s3_secret_access_key,omitempty"` - S3ForcePathStyle *bool `json:"s3_force_path_style,omitempty" defaults:"false"` + ClusterName string `json:"cluster,omitempty"` + UID string `json:"uid,omitempty"` + EndTimestamp string `json:"timestamp,omitempty"` + S3WalPath string `json:"s3_wal_path,omitempty"` + S3Endpoint string `json:"s3_endpoint,omitempty"` + S3AccessKeyId string `json:"s3_access_key_id,omitempty"` + S3SecretAccessKey string `json:"s3_secret_access_key,omitempty"` + S3ForcePathStyle *bool `json:"s3_force_path_style,omitempty" defaults:"false"` + Pgbackrest *PgbackrestClone `json:"pgbackrest,omitempty"` } // Sidecar defines a container to be run in the same pod as the Postgres container. 
@@ -283,6 +284,12 @@ type Pgbackrest struct { Resources *Resources `json:"resources,omitempty"` } +type PgbackrestClone struct { + Repo Repo `json:"repo"` + Options map[string]string `json:"options"` + Configuration Configuration `json:"configuration"` +} + type Repo struct { Name string `json:"name"` Storage string `json:"storage"` diff --git a/pkg/apis/cpo.opensource.cybertec.at/v1/zz_generated.deepcopy.go b/pkg/apis/cpo.opensource.cybertec.at/v1/zz_generated.deepcopy.go index a98323e2..8d5b34be 100644 --- a/pkg/apis/cpo.opensource.cybertec.at/v1/zz_generated.deepcopy.go +++ b/pkg/apis/cpo.opensource.cybertec.at/v1/zz_generated.deepcopy.go @@ -101,6 +101,11 @@ func (in *CloneDescription) DeepCopyInto(out *CloneDescription) { *out = new(bool) **out = **in } + if in.Pgbackrest != nil { + in, out := &in.Pgbackrest, &out.Pgbackrest + *out = new(PgbackrestClone) + (*in).DeepCopyInto(*out) + } return } @@ -770,6 +775,31 @@ func (in *Pgbackrest) DeepCopy() *Pgbackrest { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PgbackrestClone) DeepCopyInto(out *PgbackrestClone) { + *out = *in + in.Repo.DeepCopyInto(&out.Repo) + if in.Options != nil { + in, out := &in.Options, &out.Options + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } + out.Configuration = in.Configuration + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PgbackrestClone. +func (in *PgbackrestClone) DeepCopy() *PgbackrestClone { + if in == nil { + return nil + } + out := new(PgbackrestClone) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *PostgresPodResourcesDefaults) DeepCopyInto(out *PostgresPodResourcesDefaults) { *out = *in diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 6e4b0398..615df9d6 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -366,6 +366,12 @@ func (c *Cluster) Create() (err error) { c.logger.Info("a monitoring secret was successfully created") } + if specHasPgbackrestClone(&c.Postgresql.Spec) { + if err := c.createPgbackrestCloneConfig(); err != nil { + return fmt.Errorf("could not create pgbackrest clone config: %v", err) + } + } + if c.multisiteEnabled() { c.logger.Infof("waiting for load balancer IP to be assigned") c.waitForPrimaryLoadBalancerIp() @@ -1067,6 +1073,28 @@ func (c *Cluster) Update(oldSpec, newSpec *cpov1.Postgresql) error { } }() + // Clone configmap for pgbackrest + func() { + if specHasPgbackrestClone(&oldSpec.Spec) { + if specHasPgbackrestClone(&newSpec.Spec) { + // TODO: if we know cluster state and it has been initialized, then should ignore this + if err := c.updatePgbackrestCloneConfig(); err != nil { + c.logger.Warningf("could not update pgbackrest clone config: %v", err) + updateFailed = true + } + } else { + if err := c.deletePgbackrestCloneConfig(); err != nil { + c.logger.Warningf("could not delete pgbackrest clone config: %v", err) + } + } + } else if specHasPgbackrestClone(&newSpec.Spec) { + c.logger.Warningf("Can't add a clone specification after cluster has been initialized") + updateFailed = true + } else { + // TODO: try to delete just in case? 
+ } + }() + // Statefulset func() { oldSs, err := c.generateStatefulSet(&oldSpec.Spec) @@ -1225,6 +1253,10 @@ func specHasPgbackrestPVCRepo(newSpec *cpov1.PostgresSpec) bool { return false } +func specHasPgbackrestClone(newSpec *cpov1.PostgresSpec) bool { + return newSpec.Clone != nil && newSpec.Clone.Pgbackrest != nil +} + func syncResources(a, b *v1.ResourceRequirements) bool { for _, res := range []v1.ResourceName{ v1.ResourceCPU, @@ -1291,6 +1323,10 @@ func (c *Cluster) Delete() { c.logger.Warningf("could not delete pod disruption budget: %v", err) } + if err := c.deletePgbackrestCloneConfig(); err != nil { + c.logger.Warningf("could not delete pgbackrest clone config: %v", err) + } + for _, role := range []PostgresRole{Master, Replica, ClusterPods} { if !c.patroniKubernetesUseConfigMaps() { diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go index 2a0a04b7..ed3c36da 100644 --- a/pkg/cluster/k8sres.go +++ b/pkg/cluster/k8sres.go @@ -1,6 +1,7 @@ package cluster import ( + "bytes" "context" "encoding/json" "fmt" @@ -9,6 +10,7 @@ import ( "sort" "strconv" "strings" + "text/template" "github.com/pkg/errors" "github.com/sirupsen/logrus" @@ -1043,7 +1045,7 @@ func (c *Cluster) generateSpiloPodEnvVars( envVars = appendEnvVars(envVars, spec.Env...) } - if spec.Clone != nil && spec.Clone.ClusterName != "" { + if spec.Clone != nil && (spec.Clone.ClusterName != "" || spec.Clone.Pgbackrest != nil) { envVars = append(envVars, c.generateCloneEnvironment(spec.Clone)...) } @@ -1522,6 +1524,10 @@ func (c *Cluster) generateStatefulSet(spec *cpov1.PostgresSpec) (*appsv1.Statefu } } + if specHasPgbackrestClone(spec) { + additionalVolumes = append(additionalVolumes, c.generatePgbackrestCloneConfigVolumes(spec.Clone)...) 
+ } + // generate pod template for the statefulset, based on the spilo container and sidecars podTemplate, err = c.generatePodTemplate( c.Namespace, @@ -1951,6 +1957,53 @@ func (c *Cluster) generatePgbackrestConfigVolume(backrestSpec *cpov1.Pgbackrest, } } +func (c *Cluster) generatePgbackrestCloneConfigVolumes(description *cpov1.CloneDescription) []cpov1.AdditionalVolume { + defaultMode := int32(0640) + + projections := []v1.VolumeProjection{{ + ConfigMap: &v1.ConfigMapProjection{ + LocalObjectReference: v1.LocalObjectReference{Name: c.getPgbackrestCloneConfigmapName()}, + }, + }} + + if description.Pgbackrest.Configuration.Secret != "" { + projections = append(projections, v1.VolumeProjection{ + Secret: &v1.SecretProjection{ + LocalObjectReference: v1.LocalObjectReference{Name: description.Pgbackrest.Configuration.Secret}, + }, + }) + } + + volumes := []cpov1.AdditionalVolume{ + { + Name: "pgbackrest-clone", + MountPath: "/etc/pgbackrest/clone-conf.d", + VolumeSource: v1.VolumeSource{ + Projected: &v1.ProjectedVolumeSource{ + DefaultMode: &defaultMode, + Sources: projections, + }, + }, + }, + } + + if description.Pgbackrest.Repo.Storage == "pvc" && description.ClusterName != "" { + // Cloning from another cluster, mount that clusters certs + volumes = append(volumes, cpov1.AdditionalVolume{ + Name: "pgbackrest-clone-certs", + MountPath: "/etc/pgbackrest/clone-certs", + VolumeSource: v1.VolumeSource{ + Secret: &v1.SecretVolumeSource{ + SecretName: description.ClusterName + "-pgbackrest-cert", // TODO: refactor name generation + DefaultMode: &defaultMode, + }, + }, + }) + } + + return volumes +} + func (c *Cluster) generateCertSecretVolume() cpov1.AdditionalVolume { defaultMode := int32(0640) @@ -2468,6 +2521,16 @@ func (c *Cluster) generateEndpoint(role PostgresRole, subsets []v1.EndpointSubse func (c *Cluster) generateCloneEnvironment(description *cpov1.CloneDescription) []v1.EnvVar { result := make([]v1.EnvVar, 0) + if description.Pgbackrest != nil { + result = 
append(result, v1.EnvVar{Name: "CLONE_METHOD", Value: "CLONE_WITH_PGBACKREST"}) + result = append(result, v1.EnvVar{Name: "CLONE_PGBACKREST_CONFIG", Value: "/etc/pgbackrest/clone-conf.d"}) + if description.EndTimestamp != "" { + result = append(result, v1.EnvVar{Name: "CLONE_TARGET_TIME", Value: description.EndTimestamp}) + } + + return result + } + if description.ClusterName == "" { return result } @@ -2869,6 +2932,10 @@ func (c *Cluster) getPgbackrestRepoHostConfigmapName() (jobName string) { return fmt.Sprintf("%s-pgbackrest-repohost-config", c.Name) } +func (c *Cluster) getPgbackrestCloneConfigmapName() (jobName string) { + return fmt.Sprintf("%s-pgbackrest-clone-config", c.Name) +} + func (c *Cluster) getTDESecretName() string { return fmt.Sprintf("%s-tde", c.Name) } @@ -3057,6 +3124,103 @@ func (c *Cluster) generatePgbackrestRepoHostConfigmap() (*v1.ConfigMap, error) { return configmap, nil } +func (c *Cluster) generatePgbackrestCloneConfigmap(clone *cpov1.CloneDescription) (*v1.ConfigMap, error) { + config := map[string]map[string]string{ + "db": { + "pg1-path": "/home/postgres/pgdata/pgroot/data", + "pg1-port": "5432", + "pg1-socket-path": "/var/run/postgresql/", + }, + "global": { + "log-path": "/home/postgres/pgdata/pgbackrest/log", + "spool-path": "/home/postgres/pgdata/pgbackrest/spool-path", + }, + } + + if clone.Pgbackrest.Options != nil { + maps.Copy(config["global"], clone.Pgbackrest.Options) + } + + repo := clone.Pgbackrest.Repo + repoName := "repo1" + repoConf := func(conf map[string]string) { + for k, v := range conf { + config["global"][repoName+"-"+k] = v + } + } + + switch repo.Storage { + case "pvc": + // TODO: enable Cluster.serviceName to ask for other clusters services + serviceName := fmt.Sprintf("%s-%s", clone.ClusterName, "clusterpods") + // TODO: allow for cross namespace cloning + repoConf(map[string]string{ + "host": clone.ClusterName + "-pgbackrest-repo-host-0." + serviceName + "." + c.Namespace + ".svc." 
+ c.OpConfig.ClusterDomain, + "host-ca-file": "/etc/pgbackrest/clone-certs/pgbackrest.ca-roots", + "host-cert-file": "/etc/pgbackrest/clone-certs/pgbackrest-client.crt", + "host-key-file": "/etc/pgbackrest/clone-certs/pgbackrest-client.key", + "host-type": "tls", + "host-user": "postgres", + }) + case "s3": + repoConf(map[string]string{ + "type": "s3", + "s3-bucket": repo.Resource, + "s3-endpoint": repo.Endpoint, + "s3-region": repo.Region, + }) + case "gcs": + repoConf(map[string]string{ + "type": "gcs", + "gcs-bucket": repo.Resource, + "gcs-key": fmt.Sprintf("/etc/pgbackrest/conf.d/%s", repo.Key), + "gcs-key-type": repo.KeyType, + }) + case "azure": + repoConf(map[string]string{ + "type": "azure", + "azure-container": repo.Resource, + "azure-endpoint": repo.Endpoint, + "azure-key": repo.Key, + "azure-account": repo.Account, + }) + default: + return nil, fmt.Errorf("Invalid repository storage %s", repo.Storage) + } + + confStr, err := renderPgbackrestConfig(config) + if err != nil { + return nil, fmt.Errorf("Error rendering pgbackrest config: %v", err) + } + configmap := &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: c.Namespace, + Name: c.getPgbackrestCloneConfigmapName(), + }, + Data: map[string]string{ + "pgbackrest_clone.conf": confStr, + }, + } + return configmap, nil +} + +func renderPgbackrestConfig(config map[string]map[string]string) (string, error) { + var out bytes.Buffer + tpl := template.Must(template.New("pgbackrest_instance.conf").Parse(` +{{- range $section, $config := . }} +[{{ $section }}] + +{{- range $key, $value := . 
}} +{{ $key }} = {{ $value }} +{{ end -}} +{{ end -}} +`)) + if err := tpl.Execute(&out, config); err != nil { + return "", err + } + return out.String(), nil +} + func (c *Cluster) generatePgbackrestJob(backup *cpov1.Pgbackrest, repo *cpov1.Repo, backupType string, schedule string) (*batchv1.CronJob, error) { var ( diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index 24fc7a58..b40e35da 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -786,6 +786,66 @@ func (c *Cluster) updatePgbackrestRepoHostConfig() (err error) { return nil } +func (c *Cluster) createPgbackrestCloneConfig() (err error) { + + c.setProcessName("creating a configmap for pgbackrest clone") + + pgbackrestCloneConfigmapSpec, err := c.generatePgbackrestCloneConfigmap(c.Spec.Clone) + if err != nil { + return fmt.Errorf("could not generate pgbackrest clone configmap spec: %v", err) + } + c.logger.Debugf("Generated clone configmapSpec: %v", pgbackrestCloneConfigmapSpec) + + _, err = c.KubeClient.ConfigMaps(c.Namespace).Create(context.TODO(), pgbackrestCloneConfigmapSpec, metav1.CreateOptions{}) + if err != nil { + return fmt.Errorf("could not create pgbackrest clone config: %v", err) + } + + return nil +} + +func (c *Cluster) deletePgbackrestCloneConfig() error { + c.setProcessName("deleting pgbackrest configmap clone") + c.logger.Debugln("deleting pgbackrest configmap clone") + + err := c.KubeClient.ConfigMaps(c.Namespace).Delete(context.TODO(), c.getPgbackrestCloneConfigmapName(), c.deleteOptions) + if err != nil { + return err + } + c.logger.Infof("configmap %q has been deleted", c.getPgbackrestCloneConfigmapName()) + + return nil +} + +func (c *Cluster) updatePgbackrestCloneConfig() (err error) { + + c.setProcessName("patching configmap for pgbackrest clone") + + pgbackrestCloneConfigmapSpec, err := c.generatePgbackrestCloneConfigmap(c.Spec.Clone) + if err != nil { + return fmt.Errorf("could not generate pgbackrest clone configmap spec: %v", err) + } + 
c.logger.Debugf("Generated pgbackrest clone configmapSpec: %v", pgbackrestCloneConfigmapSpec) + patchData, err := dataPatch(pgbackrestCloneConfigmapSpec.Data) + if err != nil { + return fmt.Errorf("could not form patch for the pgbackrest clone configmap: %v", err) + } + + // update the pgbackrest clone configmap + _, err = c.KubeClient.ConfigMaps(c.Namespace).Patch( + context.TODO(), + c.getPgbackrestCloneConfigmapName(), + types.MergePatchType, + patchData, + metav1.PatchOptions{}, + "") + if err != nil { + return fmt.Errorf("could not patch pgbackrest config: %v", err) + } + + return nil +} + func (c *Cluster) createPgbackrestJob(pgbackrestJobSpec *batchv1.CronJob) (err error) { c.setProcessName("creating a k8s cron job for pgbackrest backups")