diff --git a/mq_impl/diskq/consumer.go b/mq_impl/diskq/consumer.go index 57099ad..4a9219f 100644 --- a/mq_impl/diskq/consumer.go +++ b/mq_impl/diskq/consumer.go @@ -11,6 +11,7 @@ import ( "github.com/Ccheers/kratos-mq/mq_impl/diskq/config" "github.com/ccheers/xpkg/sync/routinepool" "github.com/go-kratos/kratos/v2/log" + "github.com/nsqio/go-diskqueue" "golang.org/x/sync/singleflight" ) @@ -24,6 +25,7 @@ type ConsumerImpl struct { mu sync.Mutex sf singleflight.Group consumerChan map[string]chan mq.Message + queueMap map[string]diskqueue.Interface pool routinepool.Pool status uint32 @@ -35,6 +37,7 @@ func NewConsumer(c *config.Config, logger log.Logger) (mq.Consumer, error) { c: c, logger: logger, consumerChan: make(map[string]chan mq.Message), + queueMap: make(map[string]diskqueue.Interface), pool: routinepool.NewPool("[diskq][Consumer]", 4, routinepool.NewConfig()), status: statusRunning, }, nil @@ -53,6 +56,7 @@ func (x *ConsumerImpl) Subscribe(ctx context.Context, topic string, channel stri return ch, nil } queue := gDiskQueueManager.NewDiskQueue(topic, x.c.DataPath, x.c.MaxBytesPerFile, x.c.MinMsgSize, x.c.MaxMsgSize, x.c.SyncEvery, time.Duration(x.c.SyncTimeout)*time.Millisecond, x.logger) + x.queueMap[topic] = queue ch := make(chan mq.Message, 1) x.pool.Go(func(ctx context.Context) { for { @@ -86,7 +90,26 @@ func (x *ConsumerImpl) Close(ctx context.Context) error { } x.mu.Lock() defer x.mu.Unlock() + //drain channel for uniKey := range x.consumerChan { + func() { + for { + select { + case msg := <-x.consumerChan[uniKey]: + body, err := msg.Marshal() + if err != nil { + _ = x.logger.Log(log.LevelError, "module", "queue.Put", "err", err, "body", string(body)) + continue + } + err = x.queueMap[uniKey].Put(body) + if err != nil { + _ = x.logger.Log(log.LevelError, "module", "queue.Put", "err", err, "body", string(body)) + } + default: + return + } + } + }() close(x.consumerChan[uniKey]) } err := gDiskQueueManager.Close(ctx)