You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
with tmp as (delete from tag_log where ctid = any ( array (
158
170
select ctid from tag_log order by crt_time limit 10000 -- 按时序,批量取1万条
159
171
)) returning * )
160
-
, tmp1 as (... 按时序生成合并数据 ...)
161
-
, ... 写入
162
-
, ... 更新
163
-
, ... 删除
164
-
```
165
-
166
-
阅后即焚的处理速度,每秒百万行。
167
-
172
+
, tmp1 as (select imei,
173
+
uniq(sort(array_agg(case when action='insert' then tagids else -99999999 end))) - (-99999999) AS ins_tags,
174
+
uniq(sort(array_agg(case when action='delete' then tagids else -99999999 end))) - (-99999999) AS del_tags
175
+
from (select imei, action, unnest(tagids) as tagids from tmp) t group by imei)
176
+
insert into tbl3 (imei, tagids, ins_tags, del_tags)
177
+
select imei, ins_tags-del_tags, ins_tags, del_tags from tmp1
178
+
on conflict (imei) do update set tagids=((tbl3.tagids | excluded.ins_tags) - excluded.del_tags), ins_tags=excluded.ins_tags, del_tags=excluded.del_tags;
179
+
```
180
+
181
+
并行任务,阅后即焚
182
+
183
+
```
184
+
例如开启16个并行
185
+
186
+
abs(mod(hashtext(imei), 16))=?
187
+
```
188
+
189
+
```
190
+
-- CTE语法,支持阅后即焚的批量合并方法
191
+
with tmp as (delete from tag_log where ctid = any ( array (
192
+
select ctid from tag_log where abs(mod(hashtext(imei), 16))=0 order by crt_time limit 10000 -- 按时序,批量取1万条,按HASH并行
193
+
)) returning * )
194
+
, tmp1 as (select imei,
195
+
uniq(sort(array_agg(case when action='insert' then tagids else -99999999 end))) - (-99999999) AS ins_tags,
196
+
uniq(sort(array_agg(case when action='delete' then tagids else -99999999 end))) - (-99999999) AS del_tags
197
+
from (select imei, action, unnest(tagids) as tagids from tmp) t group by imei)
198
+
insert into tbl3 (imei, tagids, ins_tags, del_tags)
199
+
select imei, ins_tags-del_tags, ins_tags, del_tags from tmp1
200
+
on conflict (imei) do update set tagids=((tbl3.tagids | excluded.ins_tags) - excluded.del_tags), ins_tags=excluded.ins_tags, del_tags=excluded.del_tags;
201
+
```
202
+
203
+
写成函数,方便调用
204
+
205
+
```
206
+
create or replace function consume_tag_log(mo int, mov int, lim int) returns void as $$
207
+
declare
208
+
begin
209
+
execute format($_$with tmp as (delete from tag_log where ctid = any ( array (
210
+
select ctid from tag_log where abs(mod(hashtext(imei), %s))=%s order by crt_time limit %s
211
+
)) returning * )
212
+
, tmp1 as (select imei,
213
+
uniq(sort(array_agg(case when action='insert' then tagids else -99999999 end))) - (-99999999) AS ins_tags,
214
+
uniq(sort(array_agg(case when action='delete' then tagids else -99999999 end))) - (-99999999) AS del_tags
215
+
from (select imei, action, unnest(tagids) as tagids from tmp) t group by imei)
216
+
insert into tbl3 (imei, tagids, ins_tags, del_tags)
217
+
select imei, ins_tags-del_tags, ins_tags, del_tags from tmp1
218
+
on conflict (imei) do update set tagids=((tbl3.tagids | excluded.ins_tags) - excluded.del_tags), ins_tags=excluded.ins_tags, del_tags=excluded.del_tags$_$,
219
+
mo, mov, lim);
220
+
end;
221
+
$$ language plpgsql strict;
222
+
223
+
224
+
select consume_tag_log(16,0,10000); -- 并行处理
225
+
select consume_tag_log(16,1,10000);
226
+
.....
227
+
select consume_tag_log(16,15,10000);
228
+
```
229
+
230
+
```
231
+
create or replace function consume_tag_log(lim int) returns void as $$
232
+
declare
233
+
begin
234
+
execute format($_$with tmp as (delete from tag_log where ctid = any ( array (
235
+
select ctid from tag_log order by crt_time limit %s
236
+
)) returning * )
237
+
, tmp1 as (select imei,
238
+
uniq(sort(array_agg(case when action='insert' then tagids else -99999999 end))) - (-99999999) AS ins_tags,
239
+
uniq(sort(array_agg(case when action='delete' then tagids else -99999999 end))) - (-99999999) AS del_tags
240
+
from (select imei, action, unnest(tagids) as tagids from tmp) t group by imei)
241
+
insert into tbl3 (imei, tagids, ins_tags, del_tags)
242
+
select imei, ins_tags-del_tags, ins_tags, del_tags from tmp1
243
+
on conflict (imei) do update set tagids=((tbl3.tagids | excluded.ins_tags) - excluded.del_tags), ins_tags=excluded.ins_tags, del_tags=excluded.del_tags$_$,
0 commit comments