Skip to content

Commit f69406c

Browse files
committed
feat(diskq): 添加关闭机制
1 parent 8e49455 commit f69406c

File tree

1 file changed

+7
-3
lines changed

1 file changed

+7
-3
lines changed

mq_impl/diskq/consumer.go

+7-3
Original file line numberDiff line numberDiff line change
@@ -56,15 +56,19 @@ func (x *ConsumerImpl) Subscribe(ctx context.Context, topic string, channel stri
5656
ch := make(chan mq.Message, 1)
5757
x.pool.Go(func(ctx context.Context) {
5858
for {
59-
if atomic.LoadUint32(&x.status) == statusClosed {
60-
return
61-
}
6259
select {
6360
case body := <-queue.ReadChan():
6461
msg, err := mq.NewMessageFromByte(body)
6562
if err != nil {
6663
_ = x.logger.Log(log.LevelError, "module", "NewMessageFromByte", "err", err, "body", string(body))
6764
}
65+
if atomic.LoadUint32(&x.status) == statusClosed {
66+
err = queue.Put(body)
67+
if err != nil {
68+
_ = x.logger.Log(log.LevelError, "module", "queue.Put", "err", err, "body", string(body))
69+
}
70+
return
71+
}
6872
ch <- msg
6973
}
7074
}

0 commit comments

Comments
 (0)