From 2e702aa4b6ae69effc2aca25d167914afe7085a7 Mon Sep 17 00:00:00 2001 From: "huangqing.zhu" Date: Fri, 20 Sep 2024 14:07:47 +0800 Subject: [PATCH] =?UTF-8?q?feat(delay):=20=E5=A2=9E=E5=8A=A0=E5=88=A0?= =?UTF-8?q?=E9=99=A4=E4=BB=BB=E5=8A=A1=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- delay.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/delay.go b/delay.go index 2b536e8..b65a97f 100644 --- a/delay.go +++ b/delay.go @@ -38,6 +38,14 @@ redis.call('ZADD', delay_set, score or 0.0, value) return {true} ` +var delDelayTaskLua = ` +local delay_set, doing_set = KEYS[1], KEYS[2] +local value = ARGV[1] +redis.call('ZREM', doing_set, value) +redis.call('ZREM', delay_set, value) +return {true} +` + var consumeDelayTaskSuccessLua = ` local doing_set = KEYS[1] local value = ARGV[1] @@ -70,6 +78,9 @@ type DelayQueue interface { // Add 添加任务 // bytes 具有唯一性,即相同的 bytes 看做相同的任务,相同的 bytes 会进行覆盖 Add(ctx context.Context, bytes []byte, seconds time.Duration) error + // Del 删除任务 + Del(ctx context.Context, bytes []byte) error + // Length 队列长度 Length(ctx context.Context) (int64, error) // Close 关闭队列 @@ -89,6 +100,7 @@ type delayQueue struct { moveScript Scripter addScript Scripter + delScript Scripter lengthScript Scripter consumeSuccessScript Scripter consumeFailedScript Scripter @@ -110,6 +122,7 @@ func newDelayQueue(c *client, name string, f func([]byte) error, opts ...DelayOp name: name, moveScript: c.CreateScript(moveDelayTaskLua), addScript: c.CreateScript(addDelayTaskLua), + delScript: c.CreateScript(delDelayTaskLua), lengthScript: c.CreateScript(delayTaskLengthLua), consumeSuccessScript: c.CreateScript(consumeDelayTaskSuccessLua), consumeFailedScript: c.CreateScript(consumeDelayTaskFailedLua), @@ -136,6 +149,10 @@ func (q *delayQueue) Add(ctx context.Context, bytes []byte, seconds time.Duratio return q.addScript.Run(ctx, q.pollKeys, bytes, now+sec).Err() } +func (q *delayQueue) Del(ctx context.Context, bytes []byte) error { + return q.delScript.Run(ctx, q.pollKeys, bytes).Err() +} + func (q *delayQueue) Length(ctx context.Context) (int64, error) { return q.lengthScript.Run(ctx, q.pollKeys).Int64() }