|
| 1 | +## 重新发现PostgreSQL之美 - 7 垂帘听政 异步消息 |
| 2 | + |
| 3 | +### 作者 |
| 4 | +digoal |
| 5 | + |
| 6 | +### 日期 |
| 7 | +2021-05-30 |
| 8 | + |
| 9 | +### 标签 |
| 10 | +PostgreSQL , rule , 规则 , 异步消息 |
| 11 | + |
| 12 | +---- |
| 13 | + |
| 14 | +## 背景 |
| 15 | +视频回放: https://www.bilibili.com/video/BV1Nq4y1j79T/ |
| 16 | + |
| 17 | +场景: |
| 18 | +- 重要数据在写入、更新、删除时 实时告警或转存 |
| 19 | +- 流式数据 (公务车电子围栏、刑侦数据探针、股票数据规则探针、服务器运行情况) 实时预警或事件触发 |
| 20 | +- 危险操作 (DDL) 异步监控 |
| 21 | + |
| 22 | +规则 + 异步消息的优势: |
| 23 | +1、通过规则过滤掉不需要写入的正常数据, 由于业务正常数据通常占比在99%以上, 从而大幅减轻写入量. |
| 24 | +2、传统的利用定时器查询所有数据去发现问题, 还需要在时间、VAL、SID等层面去建立索引, 消耗大量存储, 同时索引增加写入RT, 性能下降. 规则+异步完全规避这个问题. |
| 25 | +3、可以实时发现并预警或触发其他动作 |
| 26 | + |
| 27 | +## 语法 |
| 28 | + |
| 29 | +``` |
| 30 | +postgres=# \h create rule |
| 31 | +Command: CREATE RULE |
| 32 | +Description: define a new rewrite rule |
| 33 | +Syntax: |
| 34 | +CREATE [ OR REPLACE ] RULE name AS ON event |
| 35 | + TO table_name [ WHERE condition ] |
| 36 | + DO [ ALSO | INSTEAD ] { NOTHING | command | ( command ; command ... ) } |
| 37 | + |
| 38 | +where event can be one of: |
| 39 | + |
| 40 | + SELECT | INSERT | UPDATE | DELETE |
| 41 | + |
| 42 | +URL: https://www.postgresql.org/docs/14/sql-createrule.html |
| 43 | + |
| 44 | + |
| 45 | +postgres=# \h listen |
| 46 | +Command: LISTEN |
| 47 | +Description: listen for a notification |
| 48 | +Syntax: |
| 49 | +LISTEN channel |
| 50 | + |
| 51 | +URL: https://www.postgresql.org/docs/14/sql-listen.html |
| 52 | + |
| 53 | +postgres=# \h notify |
| 54 | +Command: NOTIFY |
| 55 | +Description: generate a notification |
| 56 | +Syntax: |
| 57 | +NOTIFY channel [ , payload ] |
| 58 | + |
| 59 | +URL: https://www.postgresql.org/docs/14/sql-notify.html |
| 60 | + |
| 61 | +postgres=# \df *.*channel* |
| 62 | + List of functions |
| 63 | + Schema | Name | Result data type | Argument data types | Type |
| 64 | +------------+-----------------------+------------------+---------------------+------ |
| 65 | + pg_catalog | pg_listening_channels | SETOF text | | func |
| 66 | +(1 row) |
| 67 | + |
| 68 | +postgres=# \df *.*notify* |
| 69 | + List of functions |
| 70 | + Schema | Name | Result data type | Argument data types | Type |
| 71 | +------------+-----------+------------------+---------------------+------ |
| 72 | + pg_catalog | pg_notify | void | text, text | func |
| 73 | +(1 row) |
| 74 | +``` |
| 75 | + |
| 76 | +## 例子 |
| 77 | +机房传感器 |
| 78 | + |
| 79 | + |
| 80 | +``` |
| 81 | +create table tbl_sensor_log ( |
| 82 | +id serial8 primary key, |
| 83 | +sid int, |
| 84 | +val jsonb, |
| 85 | +crt_time timestamp |
| 86 | +); |
| 87 | +``` |
| 88 | + |
| 89 | +定义规则, 发现异常数据向alert通道发送消息 |
| 90 | + |
| 91 | +``` |
| 92 | +create or replace rule r1 as on insert |
| 93 | +to tbl_sensor_log |
| 94 | +where coalesce(val['temp']::float4,0) >= 60 |
| 95 | +or coalesce(val['cpu_perct']::float4,0) >= 80 |
| 96 | +or coalesce(val['mem_perct']::float4,0) >= 80 |
| 97 | +or coalesce(val['io_perct']::float4,0) >= 80 |
| 98 | +do also |
| 99 | +select pg_notify('alert', format('sensor: %s, ts:%s, val:%s', NEW.sid, NEW.crt_time, NEW.val)); |
| 100 | +``` |
| 101 | + |
| 102 | +定义规则(可选), 正常数据不写入 |
| 103 | + |
| 104 | +``` |
| 105 | +create or replace rule r2 as on insert |
| 106 | +to tbl_sensor_log |
| 107 | +where not (coalesce(val['temp']::float4,0) >= 60 |
| 108 | +or coalesce(val['cpu_perct']::float4,0) >= 80 |
| 109 | +or coalesce(val['mem_perct']::float4,0) >= 80 |
| 110 | +or coalesce(val['io_perct']::float4,0) >= 80) |
| 111 | +do instead NOTHING; |
| 112 | +``` |
| 113 | + |
| 114 | +``` |
| 115 | +postgres=# \d+ tbl_sensor_log; |
| 116 | + Table "public.tbl_sensor_log" |
| 117 | + Column | Type | Collation | Nullable | Default | Storage | Compression | Stats target | Description |
| 118 | +----------+-----------------------------+-----------+----------+--------------------------------------------+----------+-------------+--------------+------------- |
| 119 | + id | bigint | | not null | nextval('tbl_sensor_log_id_seq'::regclass) | plain | | | |
| 120 | + sid | integer | | | | plain | | | |
| 121 | + val | jsonb | | | | extended | pglz | | |
| 122 | + crt_time | timestamp without time zone | | | | plain | | | |
| 123 | +Indexes: |
| 124 | + "tbl_sensor_log_pkey" PRIMARY KEY, btree (id) |
| 125 | +Rules: |
| 126 | + r1 AS |
| 127 | + ON INSERT TO tbl_sensor_log |
| 128 | + WHERE COALESCE(new.val['temp'::text]::real, 0::real) >= 60::double precision OR COALESCE(new.val['cpu_perct'::text]::real, 0::real) >= 80::double precision OR COALESCE(new.val['mem_perct'::text]::real, 0::real) >= 80::double precision OR COALESCE(new.val['io_perct'::text]::real, 0::real) >= 80::double precision DO SELECT pg_notify('alert'::text, format('sensor: %s, val:%s'::text, new.sid, new.val)) AS pg_notify |
| 129 | +Access method: heap |
| 130 | +``` |
| 131 | + |
| 132 | + |
| 133 | +压测 |
| 134 | + |
| 135 | +``` |
| 136 | +CREATE TYPE sensor_js AS (temp float4, cpu_perct float4, mem_perct float4, io_perct float4); |
| 137 | + |
| 138 | +insert into tbl_sensor_log (sid,val,crt_time) |
| 139 | +values ( |
| 140 | + 1, |
| 141 | + row_to_json(row(1,80.1,2,99.11)::sensor_js)::jsonb, |
| 142 | + now() |
| 143 | +); |
| 144 | + |
| 145 | + |
| 146 | +vi test.sql |
| 147 | +\set sid random(1,1000000) |
| 148 | +\set v1 random(1,61) |
| 149 | +\set v2 random(1,81) |
| 150 | +\set v3 random(1,81) |
| 151 | +\set v4 random(1,81) |
| 152 | +insert into tbl_sensor_log (sid,val,crt_time) |
| 153 | +values (:sid, row_to_json(row(:v1,:v2,:v3,:v4)::sensor_js)::jsonb,now()); |
| 154 | + |
| 155 | + |
| 156 | +pgbench -M prepared -n -r -P 1 -f ./test.sql -c 5 -j 5 -T 120 |
| 157 | +``` |
| 158 | + |
| 159 | +开启其他会话, 监听alert这个通道的异步消息. |
| 160 | + |
| 161 | +PG 的异步消息为广播模式. 可以在多个会话监听同一个通道, 如果有多个业务希望接收同一类异步消息, 则可以这么做. |
| 162 | + |
| 163 | +``` |
| 164 | +listen alter; |
| 165 | + |
| 166 | +Asynchronous notification "alert" with payload "sensor: 459294, val:{"temp": 32, "io_perct": 81, "cpu_perct": 76, "mem_perct": 39}" received from server process with PID 1715. |
| 167 | +Asynchronous notification "alert" with payload "sensor: 788337, val:{"temp": 60, "io_perct": 34, "cpu_perct": 12, "mem_perct": 53}" received from server process with PID 1714. |
| 168 | +Asynchronous notification "alert" with payload "sensor: 421071, val:{"temp": 7, "io_perct": 81, "cpu_perct": 12, "mem_perct": 14}" received from server process with PID 1716. |
| 169 | +Asynchronous notification "alert" with payload "sensor: 523366, val:{"temp": 13, "io_perct": 45, "cpu_perct": 70, "mem_perct": 80}" received from server process with PID 1713. |
| 170 | +Asynchronous notification "alert" with payload "sensor: 94909, val:{"temp": 57, "io_perct": 1, "cpu_perct": 32, "mem_perct": 81}" received from server process with PID 1713. |
| 171 | +Asynchronous notification "alert" with payload "sensor: 13910, val:{"temp": 61, "io_perct": 39, "cpu_perct": 39, "mem_perct": 2}" received from server process with PID 1714. |
| 172 | +Asynchronous notification "alert" with payload "sensor: 252342, val:{"temp": 7, "io_perct": 31, "cpu_perct": 80, "mem_perct": 13}" received from server process with PID 1714. |
| 173 | +Asynchronous notification "alert" with payload "sensor: 222983, val:{"temp": 56, "io_perct": 76, "cpu_perct": 80, "mem_perct": 25}" received from server process with PID 1715. |
| 174 | +Asynchronous notification "alert" with payload "sensor: 913661, val:{"temp": 60, "io_perct": 23, "cpu_perct": 80, "mem_perct": 9}" received from server process with PID 1716. |
| 175 | +``` |
| 176 | + |
| 177 | + |
| 178 | +压测数据分析: |
| 179 | + |
| 180 | +1、在不开启rule时, 写入速度比开启rule快, 因为rule里面有CPU运算. 增加了RT. |
| 181 | + |
| 182 | +但是这是纯计算, 没有IO, 内存等开销. 总体效率绝对比定时器后查询快很多. |
| 183 | + |
| 184 | +``` |
| 185 | +progress: 1.0 s, 63373.9 tps, lat 0.078 ms stddev 0.066 |
| 186 | +progress: 2.0 s, 67591.2 tps, lat 0.074 ms stddev 0.044 |
| 187 | +progress: 3.0 s, 66330.3 tps, lat 0.075 ms stddev 0.039 |
| 188 | +progress: 4.0 s, 65786.8 tps, lat 0.076 ms stddev 0.038 |
| 189 | +progress: 5.0 s, 65436.3 tps, lat 0.076 ms stddev 0.043 |
| 190 | +progress: 6.0 s, 64276.1 tps, lat 0.077 ms stddev 0.042 |
| 191 | +progress: 7.0 s, 59162.6 tps, lat 0.084 ms stddev 0.045 |
| 192 | +progress: 8.0 s, 53887.5 tps, lat 0.092 ms stddev 0.048 |
| 193 | +``` |
| 194 | + |
| 195 | +``` |
| 196 | +progress: 1.0 s, 43413.8 tps, lat 0.114 ms stddev 0.084 |
| 197 | +progress: 2.0 s, 42803.5 tps, lat 0.116 ms stddev 0.040 |
| 198 | +progress: 3.0 s, 40092.0 tps, lat 0.124 ms stddev 0.176 |
| 199 | +progress: 4.0 s, 41419.0 tps, lat 0.120 ms stddev 0.046 |
| 200 | +progress: 5.0 s, 41637.6 tps, lat 0.120 ms stddev 0.040 |
| 201 | +progress: 6.0 s, 41918.2 tps, lat 0.119 ms stddev 0.040 |
| 202 | +progress: 7.0 s, 41753.3 tps, lat 0.119 ms stddev 0.038 |
| 203 | +progress: 8.0 s, 35983.6 tps, lat 0.139 ms stddev 0.042 |
| 204 | +``` |
| 205 | + |
| 206 | +在mac book pro上数据轻松破百万 |
| 207 | + |
| 208 | +``` |
| 209 | +postgres=# select count(*) from tbl_sensor_log; |
| 210 | + count |
| 211 | +--------- |
| 212 | + 2624221 |
| 213 | +(1 row) |
| 214 | +``` |
| 215 | + |
| 216 | +## 其他异步消息应用 |
| 217 | + |
| 218 | +##### 202103/20210311_03.md [《Postgres Notify for Real Time Dashboards》](../202103/20210311_03.md) |
| 219 | +##### 201807/20180716_01.md [《PostgreSQL 异步消息(LISTEN/NOTIFY)缓存多大?》](../201807/20180716_01.md) |
| 220 | +##### 201807/20180713_03.md [《PostgreSQL 流式处理应用实践 - 二手商品实时归类(异步消息notify/listen、阅后即焚)》](../201807/20180713_03.md) |
| 221 | +##### 201711/20171111_01.md [《PostgreSQL 异步消息实践 - Feed系统实时监测与响应(如 电商主动服务) - 分钟级到毫秒级的实现》](../201711/20171111_01.md) |
| 222 | +##### 201710/20171018_03.md [《[未完待续] PGQ 异步消息队列的使用》](../201710/20171018_03.md) |
| 223 | +##### 201709/20170925_02.md [《PostgreSQL 事件触发器应用 - DDL审计记录 + 异步通知(notify)》](../201709/20170925_02.md) |
| 224 | +##### 201701/20170116_01.md [《从电波表到数据库小程序之 - 数据库异步广播(notify/listen)》](../201701/20170116_01.md) |
| 225 | +##### 201111/20111122_01.md [《PostgreSQL Notify/Listen Like ESB》](../201111/20111122_01.md) |
| 226 | +##### 201701/20170113_03.md [《从微信小程序 到 数据库"小程序" , 鬼知道我经历了什么》](../201701/20170113_03.md) |
| 227 | + |
| 228 | + |
| 229 | + |
| 230 | + |
| 231 | + |
| 232 | +#### [PostgreSQL 许愿链接](https://github.com/digoal/blog/issues/76 "269ac3d1c492e938c0191101c7238216") |
| 233 | +您的愿望将传达给PG kernel hacker、数据库厂商等, 帮助提高数据库产品质量和功能, 说不定下一个PG版本就有您提出的功能点. 针对非常好的提议,奖励限量版PG文化衫、纪念品、贴纸、PG热门书籍等,奖品丰富,快来许愿。[开不开森](https://github.com/digoal/blog/issues/76 "269ac3d1c492e938c0191101c7238216"). |
| 234 | + |
| 235 | + |
| 236 | +#### [9.9元购买3个月阿里云RDS PostgreSQL实例](https://www.aliyun.com/database/postgresqlactivity "57258f76c37864c6e6d23383d05714ea") |
| 237 | + |
| 238 | + |
| 239 | +#### [PostgreSQL 解决方案集合](https://yq.aliyun.com/topic/118 "40cff096e9ed7122c512b35d8561d9c8") |
| 240 | + |
| 241 | + |
| 242 | +#### [德哥 / digoal's github - 公益是一辈子的事.](https://github.com/digoal/blog/blob/master/README.md "22709685feb7cab07d30f30387f0a9ae") |
| 243 | + |
| 244 | + |
| 245 | + |
| 246 | + |
0 commit comments