Skip to content

Commit 526f0f7

Browse files
committed
feat(diskq): 本地队列 取消 channel 机制
1 parent 922f04d commit 526f0f7

File tree

1 file changed

+10
-19
lines changed

1 file changed

+10
-19
lines changed

mq_impl/diskq/consumer.go

+10-19
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ type ConsumerImpl struct {
2222

2323
mu sync.Mutex
2424
sf singleflight.Group
25-
consumerMap map[string][]chan mq.Message
2625
consumerChan map[string]chan mq.Message
2726
pool routinepool.Pool
2827
}
@@ -33,7 +32,6 @@ func NewConsumer(c *config.Config, logger log.Logger) (mq.Consumer, error) {
3332
c: c,
3433
logger: logger,
3534
consumerChan: make(map[string]chan mq.Message),
36-
consumerMap: make(map[string][]chan mq.Message),
3735
pool: routinepool.NewPool("[diskq][Consumer]", 4, routinepool.NewConfig()),
3836
}, nil
3937
}
@@ -42,40 +40,33 @@ func (x *ConsumerImpl) Subscribe(ctx context.Context, topic string, channel stri
4240
x.mu.Lock()
4341
defer x.mu.Unlock()
4442

45-
uniKey := x.generateKey(topic, channel)
46-
if ch, ok := x.consumerChan[uniKey]; ok {
43+
if ch, ok := x.consumerChan[topic]; ok {
4744
return ch, nil
4845
}
4946

50-
_, _, _ = x.sf.Do(topic, func() (interface{}, error) {
51-
if consumer, ok := x.consumerMap[topic]; ok {
52-
return consumer, nil
47+
ch, _, _ := x.sf.Do(topic, func() (interface{}, error) {
48+
if ch, ok := x.consumerChan[topic]; ok {
49+
return ch, nil
5350
}
5451
queue := gDiskQueueManager.NewDiskQueue(topic, x.c.DataPath, x.c.MaxBytesPerFile, x.c.MinMsgSize, x.c.MaxMsgSize, x.c.SyncEvery, time.Duration(x.c.SyncTimeout)*time.Millisecond, x.logger)
52+
ch := make(chan mq.Message, 1)
5553
x.pool.Go(func(ctx context.Context) {
5654
for {
5755
select {
5856
case body := <-queue.ReadChan():
5957
msg, err := mq.NewMessageFromByte(body)
6058
if err != nil {
61-
x.logger.Log(log.LevelError, "module", "NewMessageFromByte", "err", err, "body", string(body))
62-
}
63-
for _, cch := range x.consumerMap[topic] {
64-
select {
65-
case cch <- msg:
66-
default:
67-
}
59+
_ = x.logger.Log(log.LevelError, "module", "NewMessageFromByte", "err", err, "body", string(body))
6860
}
61+
ch <- msg
6962
}
7063
}
7164
})
72-
return queue, nil
65+
return ch, nil
7366
})
7467

75-
ch := make(chan mq.Message, 1024)
76-
x.consumerMap[topic] = append(x.consumerMap[topic], ch)
77-
x.consumerChan[uniKey] = ch
78-
return ch, nil
68+
x.consumerChan[topic] = ch.(chan mq.Message)
69+
return x.consumerChan[topic], nil
7970
}
8071

8172
func (x *ConsumerImpl) Close(ctx context.Context) error {

0 commit comments

Comments
 (0)