Skip to content

Commit

Permalink
feat(diskq): 添加关闭机制
Browse files Browse the repository at this point in the history
  • Loading branch information
Ccheers committed Nov 23, 2022
1 parent f69406c commit 690225d
Showing 1 changed file with 23 additions and 0 deletions.
23 changes: 23 additions & 0 deletions mq_impl/diskq/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/Ccheers/kratos-mq/mq_impl/diskq/config"
"github.com/ccheers/xpkg/sync/routinepool"
"github.com/go-kratos/kratos/v2/log"
"github.com/nsqio/go-diskqueue"
"golang.org/x/sync/singleflight"
)

Expand All @@ -24,6 +25,7 @@ type ConsumerImpl struct {
mu sync.Mutex
sf singleflight.Group
consumerChan map[string]chan mq.Message
queueMap map[string]diskqueue.Interface
pool routinepool.Pool

status uint32
Expand All @@ -35,6 +37,7 @@ func NewConsumer(c *config.Config, logger log.Logger) (mq.Consumer, error) {
c: c,
logger: logger,
consumerChan: make(map[string]chan mq.Message),
queueMap: make(map[string]diskqueue.Interface),
pool: routinepool.NewPool("[diskq][Consumer]", 4, routinepool.NewConfig()),
status: statusRunning,
}, nil
Expand All @@ -53,6 +56,7 @@ func (x *ConsumerImpl) Subscribe(ctx context.Context, topic string, channel stri
return ch, nil
}
queue := gDiskQueueManager.NewDiskQueue(topic, x.c.DataPath, x.c.MaxBytesPerFile, x.c.MinMsgSize, x.c.MaxMsgSize, x.c.SyncEvery, time.Duration(x.c.SyncTimeout)*time.Millisecond, x.logger)
x.queueMap[topic] = queue
ch := make(chan mq.Message, 1)
x.pool.Go(func(ctx context.Context) {
for {
Expand Down Expand Up @@ -86,7 +90,26 @@ func (x *ConsumerImpl) Close(ctx context.Context) error {
}
x.mu.Lock()
defer x.mu.Unlock()
//drain channel
for uniKey := range x.consumerChan {
func() {
for {
select {
case msg := <-x.consumerChan[uniKey]:
body, err := msg.Marshal()
if err != nil {
_ = x.logger.Log(log.LevelError, "module", "queue.Put", "err", err, "body", string(body))
continue
}
err = x.queueMap[uniKey].Put(body)
if err != nil {
_ = x.logger.Log(log.LevelError, "module", "queue.Put", "err", err, "body", string(body))
}
default:
return
}
}
}()
close(x.consumerChan[uniKey])
}
err := gDiskQueueManager.Close(ctx)
Expand Down

0 comments on commit 690225d

Please sign in to comment.