
PostgreSQL logical replication supports in-progress transactions, improving replication performance for large transactions by more than 2x.

Author

digoal

Date

2021-07-19

Tags

PostgreSQL, large transactions, logical replication


Background

http://amitkapila16.blogspot.com/2021/07/logical-replication-of-in-progress.html

This feature was proposed in 2017 and completed in 2020, spread across multiple features and commits: 0bead9af48, c55040ccd0, 45fdc9738b, 7259736a6e, and 464824323e.

1. On the walsender side: logical_decoding_work_mem limits the memory used for decoded changes; once the limit is exceeded, changes are written to disk (or streamed to the subscriber). This solves the problem of large transactions causing the walsender process to consume a large amount of memory.

streaming is triggered when the total amount of changes decoded from the WAL (for all in-progress transactions) exceeds the limit defined by the logical_decoding_work_mem setting.
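
As a minimal sketch of how this threshold is tuned (the 64kB value is illustrative, matching the demo further below, not a production recommendation):

```sql
-- Lower the per-walsender limit on memory for decoded changes
-- (default 64MB, minimum 64kB); beyond it, changes are spilled to disk
-- or streamed to the subscriber when streaming is enabled.
ALTER SYSTEM SET logical_decoding_work_mem = '64kB';
SELECT pg_reload_conf();

-- Verify the effective value.
SHOW logical_decoding_work_mem;
```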

2. On the walsender side: find the top-level transaction associated with each subtransaction. Instead of relying only on the XLOG_XACT_ASSIGNMENT WAL record, the assignment info is now also written into the WAL as part of the first WAL record of each subtransaction.
Before PostgreSQL-14, we build this association at XLOG_XACT_ASSIGNMENT WAL record which we normally log after 64 subtransactions or at commit time because these are the only two times when we get such an association in the WAL. To find this association as it happened, we now also write the assignment info into WAL immediately, as part of the first WAL record for each subtransaction. This is done only when wal_level=logical to minimize the overhead.
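
A SQL-level way to observe the effect is to make changes inside a subtransaction and check how they are reported; a sketch, reusing the stream_test table, the regression_slot slot, and the 64kB logical_decoding_work_mem setting from the test_decoding demo under point 4 below:

```sql
BEGIN;
SAVEPOINT s1;                        -- opens a subtransaction
INSERT INTO stream_test
    SELECT repeat('b', 6000) || g.i FROM generate_series(1, 500) g(i);
RELEASE SAVEPOINT s1;
COMMIT;

-- The streamed blocks are attributed to the top-level transaction's XID,
-- because decoding now knows the subtransaction-to-top-level association.
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL,
       'include-xids', '1', 'skip-empty-xacts', '1', 'stream-changes', '1');
```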

3. On the walsender side: process relcache invalidations in real time (conceptually similar to running DISCARD ALL when needed to refresh cached schema so that it reflects the latest catalog). For this, when wal_level=logical, we write invalidations at the command end into WAL so that decoding can use this information.
Yet another thing required for incremental decoding is processing invalidations at each command end. The basic idea of invalidations is that they bring the caches (such as the relation cache) up to date so that the next command can use the up-to-date schema. This is required to decode WAL incrementally and correctly, because decoding uses the relation attributes from the caches. The invalidations are decoded and accumulated in the top-level transaction, and then executed during replay. This obviates the need to decode the invalidations as part of a commit record.

```c
/*
 * Invalidations for standby, currently only when transactions without an
 * assigned xid commit.
 */
typedef struct xl_invalidations
{
    Oid         dbId;                   /* MyDatabaseId */
    Oid         tsId;                   /* MyDatabaseTableSpace */
    bool        relcacheInitFileInval;  /* invalidate relcache init files */
    int         nmsgs;                  /* number of shared inval msgs */
    SharedInvalidationMessage msgs[FLEXIBLE_ARRAY_MEMBER];
} xl_invalidations;
```
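
To see why command-end invalidations matter, consider a transaction that mixes DDL and DML: rows inserted after the ALTER TABLE must be decoded with the new schema. A sketch, again assuming the stream_test table and regression_slot from the demo under point 4 below:

```sql
BEGIN;
INSERT INTO stream_test VALUES ('before alter');
ALTER TABLE stream_test ADD COLUMN note text;
-- Without command-end invalidations, incremental decoding could still use the
-- stale single-column relcache entry for the row inserted below.
INSERT INTO stream_test VALUES ('after alter', 'decoded with the new column');
COMMIT;

SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL,
       'include-xids', '1', 'skip-empty-xacts', '1');
```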

4. New streaming APIs for output plugins

When streaming an in-progress transaction, the changes (and messages) are streamed in blocks demarcated by stream_start_cb and stream_stop_cb callbacks. Once all the decoded changes are transmitted, the transaction can be committed using the stream_commit_cb callback (or possibly aborted using the stream_abort_cb callback). One example sequence of streaming a transaction may look like the following:

```
/* Change logical_decoding_work_mem to 64kB in the session */
postgres=# show logical_decoding_work_mem;  
 logical_decoding_work_mem  
---------------------------  
 64kB  
(1 row)  
postgres=# CREATE TABLE stream_test(data text);  
CREATE TABLE  
postgres=# SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');  
 ?column?  
----------  
 init  
(1 row)  
postgres=# INSERT INTO stream_test SELECT repeat('a', 6000) || g.i FROM generate_series(1, 500) g(i);  
INSERT 0 500  
postgres=# SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '1', 'skip-empty-xacts', '1', 'stream-changes', '1');  
                       data  
--------------------------------------------------  
 opening a streamed block for transaction TXN 741  
 streaming change for TXN 741  
 streaming change for TXN 741  
 streaming change for TXN 741  
...  
...  
 streaming change for TXN 741  
 streaming change for TXN 741  
 streaming change for TXN 741  
 closing a streamed block for transaction TXN 741  
 opening a streamed block for transaction TXN 741  
 streaming change for TXN 741  
 streaming change for TXN 741  
 streaming change for TXN 741  
...  
...  
 streaming change for TXN 741  
 streaming change for TXN 741  
 closing a streamed block for transaction TXN 741  
 committing streamed transaction TXN 741  
(505 rows)
```

5. Changes to built-in logical replication

(a) Extend the logical replication protocol to identify in-progress transactions, and allow adding additional bits of information (e.g. XID of subtransactions). Refer to PostgreSQL docs for the protocol details.

(b) Modify the output plugin (pgoutput) to implement the new stream API callbacks, by leveraging the extended replication protocol.

(c) Modify the replication apply worker to properly handle streamed in-progress transactions by spilling the data to disk and then replaying it on commit.

(d) Provide a new option for streaming while creating a subscription.

```
Publisher node:
  
Set logical_decoding_work_mem = '64kB';  
# Set up publication with some initial data  
  
CREATE TABLE test_tab (a int primary key, b varchar);  
INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar');  
CREATE PUBLICATION tap_pub FOR TABLE test_tab;  
Subscriber node:  
  
CREATE TABLE test_tab (a int primary key, b varchar);  
CREATE SUBSCRIPTION tap_sub CONNECTION 'host=localhost port=5432 dbname=postgres' PUBLICATION tap_pub WITH (streaming = on);  
Publisher Node:  
  
# Ensure the corresponding replication slot is created on publisher node  
select slot_name, plugin, slot_type from pg_replication_slots;  
 slot_name |  plugin  | slot_type  
-----------+----------+-----------  
 tap_sub   | pgoutput | logical  
(1 row)  
  
# Confirm there is no streamed bytes yet  
postgres=# SELECT slot_name, stream_txns, stream_count, stream_bytes FROM pg_stat_replication_slots;  
 slot_name | stream_txns | stream_count | stream_bytes  
-----------+-------------+--------------+--------------  
 tap_sub   |           0 |            0 |            0  
(1 row)  
  
# Insert, update and delete enough rows to exceed the logical_decoding_work_mem (64kB) limit.  
BEGIN;  
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);  
UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;  
DELETE FROM test_tab WHERE mod(a,3) = 0;  
  
# Confirm that streaming happened  
SELECT slot_name, stream_txns, stream_count, stream_bytes FROM pg_stat_replication_slots;  
 slot_name | stream_txns | stream_count | stream_bytes  
-----------+-------------+--------------+--------------  
 tap_sub   |           1 |           22 |      1444410  
(1 row)  
  
Subscriber Node:  
# The streamed data is still not visible.  
select * from test_tab;  
 a |  b  
---+-----  
 1 | foo  
 2 | bar  
(2 rows)  
  
Publisher Node:  
# Commit the large transactions  
Commit;  
  
Subscriber Node:  
# The data must be visible on the subscriber  
select count(*) from test_tab;  
 count  
-------  
  3334  
(1 row)
```
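
Streaming can also be toggled on an existing subscription rather than only at creation time; a minimal sketch, using the tap_sub subscription created above:

```sql
-- Enable streaming of in-progress transactions for an existing subscription.
ALTER SUBSCRIPTION tap_sub SET (streaming = on);

-- substream reflects whether streaming is enabled for each subscription.
SELECT subname, substream FROM pg_subscription;
```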
