Skip to content

Commit

Permalink
Fix incompatibility with Postgres 14.x
Browse files Browse the repository at this point in the history
Postgres v14.0 to v14.5 had an issue with functions taking a `RECORD` typed argument when within
a single transaction this type was instantiated with different actual types (tables). In
combination with my refactorings in Carbonite v0.12.0, extracting two smaller procs from the main
change tracking procedure.

This patch reverts these refactorings and restores compatibility with the affected Postgres
versions. Refactorings weren't exactly pretty, either. But the issues with the 150 line procedure
I keep copying over to new versions remain.

Fixes #112
  • Loading branch information
maltoe committed Aug 21, 2024
1 parent 315f843 commit 63bec17
Show file tree
Hide file tree
Showing 7 changed files with 193 additions and 9 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/pull.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ jobs:
# - ubuntu-latest (Ubuntu 22) mandates OTP >= 24
# - dialyxir uses `Kernel.then` and hence mandates Elixir >= 1.12
lang: [{otp: '24.3.4.13', elixir: '1.12.3'}, {otp: '26.0.2', elixir: '1.15.5'}, {otp: '26.2.3', elixir: '1.17.1'}]
postgres: ['13.12', '16.0']
postgres: ['13.12', '14.5', '16.0']
services:
postgres:
image: postgres:${{matrix.postgres}}
Expand Down
10 changes: 9 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
## Unreleased

**New migration patches:** 11

### Fixed

- Reverted refactoring from v0.12 causing an exception in Postgres 14.0 to 14.5 when inserting into or deleting from multiple tables within a single transaction.

## [0.14.1] - 2024-08-20

### Added

* Add `:order_by` option to `Carbonite.Query.changes/2` to either disable default ordering or specify the order.
- Add `:order_by` option to `Carbonite.Query.changes/2` to either disable default ordering or specify the order.

## [0.14.0] - 2024-07-25

Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ defmodule MyApp.Repo.Migrations.InstallCarbonite do
use Ecto.Migration

def up do
Carbonite.Migrations.up(1..10)
Carbonite.Migrations.up(1..11)

# For each table that you want to capture changes of, you need to install the trigger.
Carbonite.Migrations.create_trigger(:rabbits)
Expand All @@ -140,7 +140,7 @@ defmodule MyApp.Repo.Migrations.InstallCarbonite do
Carbonite.Migrations.drop_trigger(:rabbits)

# Drop the Carbonite tables.
Carbonite.Migrations.down(10..1)
Carbonite.Migrations.down(11..1)
end
end
```
Expand Down
2 changes: 1 addition & 1 deletion lib/carbonite/migrations.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ defmodule Carbonite.Migrations do
# --------------------------------- patch levels ---------------------------------

@initial_patch 1
@current_patch 10
@current_patch 11

@doc false
@spec initial_patch :: non_neg_integer()
Expand Down
3 changes: 2 additions & 1 deletion lib/carbonite/migrations/v10.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ defmodule Carbonite.Migrations.V10 do

@type up_option :: {:carbonite_prefix, prefix()}

defp create_capture_changes_procedure(prefix) do
@spec create_capture_changes_procedure(prefix) :: :ok
def create_capture_changes_procedure(prefix) do
"""
CREATE OR REPLACE FUNCTION #{prefix}.capture_changes() RETURNS TRIGGER AS
$body$
Expand Down
173 changes: 173 additions & 0 deletions lib/carbonite/migrations/v11.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
# SPDX-License-Identifier: Apache-2.0

defmodule Carbonite.Migrations.V11 do
@moduledoc false

use Ecto.Migration
use Carbonite.Migrations.Version
alias Carbonite.Migrations.{V8, V10}

@type prefix :: binary()

defp create_capture_changes_procedure(prefix) do
"""
CREATE OR REPLACE FUNCTION #{prefix}.capture_changes() RETURNS TRIGGER AS
$body$
DECLARE
trigger_row RECORD;
change_row #{prefix}.changes;
pk_source RECORD;
pk_col VARCHAR;
pk_col_val VARCHAR;
BEGIN
/* load trigger config */
WITH settings AS (SELECT NULLIF(current_setting('#{prefix}.override_mode', TRUE), '')::TEXT AS override_mode)
SELECT
primary_key_columns,
excluded_columns,
filtered_columns,
CASE
WHEN settings.override_mode = 'override' AND mode = 'ignore' THEN 'capture'
WHEN settings.override_mode = 'override' AND mode = 'capture' THEN 'ignore'
ELSE COALESCE(settings.override_mode, mode::text)
END AS mode,
store_changed_from
INTO trigger_row
FROM #{prefix}.triggers
JOIN settings ON TRUE
WHERE table_prefix = TG_TABLE_SCHEMA AND table_name = TG_TABLE_NAME;
/* skip if ignored */
IF (trigger_row.mode = 'ignore') THEN
RETURN NULL;
END IF;
/* instantiate change row */
change_row := ROW(
NEXTVAL('#{prefix}.changes_id_seq'),
pg_current_xact_id(),
LOWER(TG_OP::TEXT),
TG_TABLE_SCHEMA::TEXT,
TG_TABLE_NAME::TEXT,
NULL,
NULL,
'{}',
NULL,
NULL
);
/* collect table pk */
IF trigger_row.primary_key_columns != '{}' THEN
IF (TG_OP IN ('INSERT', 'UPDATE')) THEN
pk_source := NEW;
ELSIF (TG_OP = 'DELETE') THEN
pk_source := OLD;
END IF;
change_row.table_pk = '{}';
FOREACH pk_col IN ARRAY trigger_row.primary_key_columns LOOP
EXECUTE 'SELECT $1.' || quote_ident(pk_col) || '::TEXT' USING pk_source INTO pk_col_val;
change_row.table_pk := change_row.table_pk || pk_col_val;
END LOOP;
END IF;
/* collect version data */
IF (TG_OP IN ('INSERT', 'UPDATE')) THEN
SELECT to_jsonb(NEW.*) - trigger_row.excluded_columns
INTO change_row.data;
ELSIF (TG_OP = 'DELETE') THEN
SELECT to_jsonb(OLD.*) - trigger_row.excluded_columns
INTO change_row.data;
END IF;
/* change tracking for UPDATEs */
IF (TG_OP = 'UPDATE') THEN
change_row.changed_from = '{}'::JSONB;
SELECT jsonb_object_agg(before.key, before.value)
FROM jsonb_each(to_jsonb(OLD.*) - trigger_row.excluded_columns) AS before
WHERE (change_row.data->before.key)::JSONB != before.value
INTO change_row.changed_from;
SELECT ARRAY(SELECT jsonb_object_keys(change_row.changed_from))
INTO change_row.changed;
/* skip persisting this update if nothing has changed */
IF change_row.changed = '{}' THEN
RETURN NULL;
END IF;
/* persisting the old data is opt-in, discard if not configured. */
IF trigger_row.store_changed_from IS FALSE THEN
change_row.changed_from := NULL;
END IF;
END IF;
/* filtered columns */
SELECT #{prefix}.jsonb_redact_keys(change_row.data, trigger_row.filtered_columns)
INTO change_row.data;
IF change_row.changed_from IS NOT NULL THEN
SELECT #{prefix}.jsonb_redact_keys(change_row.changed_from, trigger_row.filtered_columns)
INTO change_row.changed_from;
END IF;
/* insert, fail gracefully unless transaction record present or NEXTVAL has never been called */
BEGIN
change_row.transaction_id = CURRVAL('#{prefix}.transactions_id_seq');
/* verify that xact_id matches */
IF NOT
EXISTS(
SELECT 1 FROM #{prefix}.transactions
WHERE id = change_row.transaction_id AND xact_id = change_row.transaction_xact_id
)
THEN
RAISE USING ERRCODE = 'foreign_key_violation';
END IF;
INSERT INTO #{prefix}.changes VALUES (change_row.*);
EXCEPTION WHEN foreign_key_violation OR object_not_in_prerequisite_state THEN
RAISE '(carbonite) % on table %.% without prior INSERT into #{prefix}.transactions',
TG_OP, TG_TABLE_SCHEMA, TG_TABLE_NAME USING ERRCODE = 'foreign_key_violation';
END;
RETURN NULL;
END;
$body$
LANGUAGE plpgsql;
"""
|> squish_and_execute()

:ok
end

@type up_option :: {:carbonite_prefix, prefix()}

@impl true
@spec up([up_option()]) :: :ok
def up(opts) do
prefix = Keyword.get(opts, :carbonite_prefix, default_prefix())

create_capture_changes_procedure(prefix)
execute("DROP FUNCTION #{prefix}.record_dynamic_varchar_agg;")
execute("DROP FUNCTION #{prefix}.record_dynamic_varchar;")

:ok
end

@type down_option :: {:carbonite_prefix, prefix()}

@impl true
@spec down([down_option()]) :: :ok
def down(opts) do
prefix = Keyword.get(opts, :carbonite_prefix, default_prefix())

V8.create_record_dynamic_varchar_procedure(prefix)
V8.create_record_dynamic_varchar_agg_procedure(prefix)
V10.create_capture_changes_procedure(prefix)

:ok
end
end
8 changes: 5 additions & 3 deletions lib/carbonite/migrations/v8.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ defmodule Carbonite.Migrations.V8 do

@type up_option :: {:carbonite_prefix, prefix()}

defp create_record_dynamic_varchar_procedure(prefix) do
@spec create_record_dynamic_varchar_procedure(prefix) :: :ok
def create_record_dynamic_varchar_procedure(prefix) do
"""
CREATE OR REPLACE FUNCTION #{prefix}.record_dynamic_varchar(source RECORD, col VARCHAR)
RETURNS VARCHAR AS
Expand All @@ -29,7 +30,8 @@ defmodule Carbonite.Migrations.V8 do
|> squish_and_execute()
end

defp create_record_dynamic_varchar_agg(prefix) do
@spec create_record_dynamic_varchar_agg_procedure(prefix) :: :ok
def create_record_dynamic_varchar_agg_procedure(prefix) do
"""
CREATE OR REPLACE FUNCTION #{prefix}.record_dynamic_varchar_agg(source RECORD, cols VARCHAR[])
RETURNS VARCHAR[] AS
Expand Down Expand Up @@ -202,7 +204,7 @@ defmodule Carbonite.Migrations.V8 do
prefix = Keyword.get(opts, :carbonite_prefix, default_prefix())

create_record_dynamic_varchar_procedure(prefix)
create_record_dynamic_varchar_agg(prefix)
create_record_dynamic_varchar_agg_procedure(prefix)
create_jsonb_redact_keys_procedure(prefix)
create_capture_changes_procedure(prefix)

Expand Down

0 comments on commit 63bec17

Please sign in to comment.