From f1c6ef26365a90d6ed1208c0676e1b9ae03488d8 Mon Sep 17 00:00:00 2001
From: Tom Pang
Date: Mon, 16 Feb 2026 17:33:40 +0000
Subject: [PATCH] add pg_synchronized_standby_slots_invalid

`synchronized_standby_slots` must be a subset of replication slots.
However, it's possible to configure postgres so that is not true. This
breaks replication, but worse than that, any queries that use parallel
background workers will fail to execute.

Add a `pg_synchronized_standby_slots_invalid` metric to track and detect
this. It is `0` when no slots are invalid; otherwise its value is the
number of invalid slots.

Does not include slot name as label/etc as this could cause unbounded
series growth.
---
 collector/pg_synchronized_standby_slots.go    | 107 ++++++++++++++++
 .../pg_synchronized_standby_slots_test.go     | 114 ++++++++++++++++++
 2 files changed, 221 insertions(+)
 create mode 100644 collector/pg_synchronized_standby_slots.go
 create mode 100644 collector/pg_synchronized_standby_slots_test.go

diff --git a/collector/pg_synchronized_standby_slots.go b/collector/pg_synchronized_standby_slots.go
new file mode 100644
index 000000000..f690e2322
--- /dev/null
+++ b/collector/pg_synchronized_standby_slots.go
@@ -0,0 +1,107 @@
+// Copyright 2025 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package collector
+
+import (
+	"context"
+	"database/sql"
+	"log/slog"
+
+	"github.com/blang/semver/v4"
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+const synchronizedStandbySlotsSubsystem = "synchronized_standby_slots"
+
+func init() {
+	registerCollector(synchronizedStandbySlotsSubsystem, defaultEnabled, NewPGSynchronizedStandbySlotsCollector)
+}
+
+// PGSynchronizedStandbySlotsCollector exports the number of names in the
+// synchronized_standby_slots setting that have no matching physical
+// replication slot.
+type PGSynchronizedStandbySlotsCollector struct {
+	log *slog.Logger
+}
+
+// NewPGSynchronizedStandbySlotsCollector returns a collector that logs through
+// the logger carried by config.
+func NewPGSynchronizedStandbySlotsCollector(config collectorConfig) (Collector, error) {
+	return &PGSynchronizedStandbySlotsCollector{log: config.logger}, nil
+}
+
+var (
+	// synchronizedStandbySlotsMinVersion is the lowest server version that Update
+	// queries. It is parsed once here instead of on every scrape.
+	synchronizedStandbySlotsMinVersion = semver.MustParse("17.0.0")
+
+	synchronizedStandbySlotsInvalidDesc = prometheus.NewDesc(
+		prometheus.BuildFQName(namespace, synchronizedStandbySlotsSubsystem, "invalid"),
+		"Number of slots listed in synchronized_standby_slots that do not exist as physical replication slots. Non-zero means logical replication is blocked.",
+		[]string{},
+		nil,
+	)
+
+	// synchronizedStandbySlotsQuery splits the synchronized_standby_slots
+	// setting on commas and counts the non-empty, trimmed entries for which
+	// pg_replication_slots has no physical slot of the same name.
+	//
+	// NOTE(review): entries are compared verbatim after trim(); if the setting
+	// can hold quoted or differently-cased names they would not match a slot
+	// here -- confirm whether such values need normalizing.
+	synchronizedStandbySlotsQuery = `
+SELECT count(*) AS invalid_count
+FROM unnest(string_to_array(
+    (SELECT setting FROM pg_settings WHERE name = 'synchronized_standby_slots'),
+    ','
+)) AS configured(slot_name)
+WHERE trim(configured.slot_name) != ''
+  AND NOT EXISTS(
+    SELECT 1 FROM pg_replication_slots s
+    WHERE s.slot_name = trim(configured.slot_name)
+      AND s.slot_type = 'physical'
+  )
+`
+)
+
+// Update emits the pg_synchronized_standby_slots_invalid gauge for instance.
+// On servers older than PostgreSQL 17 it sends nothing and returns nil; any
+// error from running or scanning the query is returned unchanged.
+func (c *PGSynchronizedStandbySlotsCollector) Update(ctx context.Context, instance *Instance, ch chan<- prometheus.Metric) error {
+	if instance.version.LT(synchronizedStandbySlotsMinVersion) {
+		c.log.Debug("synchronized_standby_slots collector is not available on PostgreSQL < 17, skipping")
+		return nil
+	}
+
+	db := instance.getDB()
+
+	var invalidCount sql.NullInt64
+	if err := db.QueryRowContext(ctx, synchronizedStandbySlotsQuery).Scan(&invalidCount); err != nil {
+		return err
+	}
+
+	// A NULL count leaves the gauge at 0.
+	value := 0.0
+	if invalidCount.Valid {
+		value = float64(invalidCount.Int64)
+	}
+
+	ch <- prometheus.MustNewConstMetric(
+		synchronizedStandbySlotsInvalidDesc,
+		prometheus.GaugeValue,
+		value,
+	)
+
+	return nil
+}
diff --git a/collector/pg_synchronized_standby_slots_test.go b/collector/pg_synchronized_standby_slots_test.go
new file mode 100644
index 000000000..55ff87db1
--- /dev/null
+++ b/collector/pg_synchronized_standby_slots_test.go
@@ -0,0 +1,114 @@
+// Copyright 2025 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package collector
+
+import (
+	"context"
+	"testing"
+
+	"github.com/DATA-DOG/go-sqlmock"
+	"github.com/blang/semver/v4"
+	"github.com/prometheus/client_golang/prometheus"
+	dto "github.com/prometheus/client_model/go"
+	"github.com/prometheus/common/promslog"
+	"github.com/smartystreets/goconvey/convey"
+)
+
+func newTestSyncStandbySlotsCollector() *PGSynchronizedStandbySlotsCollector {
+	return &PGSynchronizedStandbySlotsCollector{log: promslog.NewNopLogger()}
+}
+
+func TestPGSynchronizedStandbySlotsBeforePG17(t *testing.T) {
+	db, _, err := sqlmock.New()
+	if err != nil {
+		t.Fatalf("Error opening a stub database connection: %s", err)
+	}
+	defer db.Close()
+
+	inst := &Instance{db: db, version: semver.MustParse("16.4.0")}
+
+	ch := make(chan prometheus.Metric)
+	go func() {
+		defer close(ch)
+		c := newTestSyncStandbySlotsCollector()
+		if err := c.Update(context.Background(), inst, ch); err != nil {
+			t.Errorf("Error calling Update: %s", err)
+		}
+	}()
+
+	// No metrics should be emitted for PG < 17
+	for range ch {
+		t.Error("Expected no metrics for PG < 17, but got one")
+	}
+}
+
+// TestPGSynchronizedStandbySlotsInvalidCount checks that the count returned by
+// the query is exported unchanged as a gauge without labels.
+func TestPGSynchronizedStandbySlotsInvalidCount(t *testing.T) {
+	cases := []struct {
+		name         string
+		invalidCount int64
+		expected     MetricResult
+	}{
+		{
+			name:         "All slots valid",
+			invalidCount: 0,
+			expected:     MetricResult{labels: labelMap{}, value: 0, metricType: dto.MetricType_GAUGE},
+		},
+		{
+			name:         "Some slots invalid",
+			invalidCount: 2,
+			expected:     MetricResult{labels: labelMap{}, value: 2, metricType: dto.MetricType_GAUGE},
+		},
+		{
+			// The database is mocked, so an empty setting is represented
+			// only by the query returning 0.
+			name:         "Empty GUC emits 0",
+			invalidCount: 0,
+			expected:     MetricResult{labels: labelMap{}, value: 0, metricType: dto.MetricType_GAUGE},
+		},
+	}
+
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			db, mock, err := sqlmock.New()
+			if err != nil {
+				t.Fatalf("Error opening a stub database connection: %s", err)
+			}
+			defer db.Close()
+
+			inst := &Instance{db: db, version: semver.MustParse("17.0.0")}
+
+			mock.ExpectQuery(sanitizeQuery(synchronizedStandbySlotsQuery)).
+				WillReturnRows(sqlmock.NewRows([]string{"invalid_count"}).AddRow(tc.invalidCount))
+
+			ch := make(chan prometheus.Metric)
+			go func() {
+				defer close(ch)
+				c := newTestSyncStandbySlotsCollector()
+				if err := c.Update(context.Background(), inst, ch); err != nil {
+					t.Errorf("Error calling Update: %s", err)
+				}
+			}()
+
+			convey.Convey(tc.name, t, func() {
+				m := readMetric(<-ch)
+				convey.So(tc.expected, convey.ShouldResemble, m)
+			})
+			if err := mock.ExpectationsWereMet(); err != nil {
+				t.Errorf("there were unfulfilled expectations: %s", err)
+			}
+		})
+	}
+}