From 22f8b7897fa2e76b6a53b05850c2d43ac93514fd Mon Sep 17 00:00:00 2001 From: thangchung Date: Sun, 13 Nov 2022 22:51:16 +0700 Subject: [PATCH] add more events for barista #3 --- cmd/barista/config.yml | 7 +- cmd/barista/config/config.go | 7 +- cmd/counter/config.yml | 8 +- cmd/counter/config/config.go | 6 +- cmd/counter/main.go | 3 +- docker-compose-full.yaml | 102 +++++++++ docker-compose.yaml | 68 ------ internal/barista/app/app.go | 44 +++- internal/barista/app/migrate.go | 23 +- .../barista_ordered_event_handler.go | 54 ++++- internal/barista/rabbitmq/consumer.go | 193 ---------------- .../barista/rabbitmq/consumer/consumer.go | 215 ++++++++++++++++++ internal/barista/rabbitmq/consumer/options.go | 39 ++++ .../barista/rabbitmq/publisher/options.go | 21 ++ .../barista/rabbitmq/publisher/publisher.go | 89 ++++++++ internal/counter/app/app.go | 71 +++++- internal/counter/app/migrate.go | 23 +- internal/counter/domain/order.go | 38 +++- .../barista_order_updated_event_handler.go | 24 ++ internal/counter/grpc/counter_server.go | 85 ++----- .../counter/rabbitmq/consumer/consumer.go | 215 ++++++++++++++++++ internal/counter/rabbitmq/consumer/options.go | 39 ++++ internal/counter/rabbitmq/publisher.go | 77 ------- .../counter/rabbitmq/publisher/options.go | 21 ++ .../counter/rabbitmq/publisher/publisher.go | 89 ++++++++ 25 files changed, 1084 insertions(+), 477 deletions(-) create mode 100755 docker-compose-full.yaml delete mode 100644 internal/barista/rabbitmq/consumer.go create mode 100644 internal/barista/rabbitmq/consumer/consumer.go create mode 100644 internal/barista/rabbitmq/consumer/options.go create mode 100644 internal/barista/rabbitmq/publisher/options.go create mode 100644 internal/barista/rabbitmq/publisher/publisher.go create mode 100644 internal/counter/features/barista/eventhandlers/barista_order_updated_event_handler.go create mode 100644 internal/counter/rabbitmq/consumer/consumer.go create mode 100644 internal/counter/rabbitmq/consumer/options.go delete mode 100644 internal/counter/rabbitmq/publisher.go create mode 100644 internal/counter/rabbitmq/publisher/options.go create mode 100644 internal/counter/rabbitmq/publisher/publisher.go diff --git a/cmd/barista/config.yml b/cmd/barista/config.yml index 6f95f07..9c89362 100755 --- a/cmd/barista/config.yml +++ b/cmd/barista/config.yml @@ -7,12 +7,7 @@ http: port: 5002 rabbit_mq: - url: amqp://guest:guest@172.22.174.138:5672/ - exchange: orders-exchange - queue: orders-queue - routing_key: orders-routing-key - consumer_tag: orders-consumer - worker_pool_size: 24 + url: amqp://guest:guest@172.25.33.104:5672/ logger: log_level: 'debug' diff --git a/cmd/barista/config/config.go b/cmd/barista/config/config.go index 1196eb2..53832da 100755 --- a/cmd/barista/config/config.go +++ b/cmd/barista/config/config.go @@ -18,12 +18,7 @@ type ( } RabbitMQ struct { - URL string `env-required:"true" yaml:"url" env:"RABBITMQ_URL"` - Exchange string `env-required:"true" yaml:"exchange" env:"RABBITMQ_Exchange"` - Queue string `env-required:"true" yaml:"queue" env:"RABBITMQ_Queue"` - RoutingKey string `env-required:"true" yaml:"routing_key" env:"RABBITMQ_RoutingKey"` - ConsumerTag string `env-required:"true" yaml:"consumer_tag" env:"RABBITMQ_ConsumerTag"` - WorkerPoolSize int `env-required:"true" yaml:"worker_pool_size" env:"RABBITMQ_WorkerPoolSize"` + URL string `env-required:"true" yaml:"url" env:"RABBITMQ_URL"` } ) diff --git a/cmd/counter/config.yml b/cmd/counter/config.yml index 82f11ea..97b8233 100755 --- a/cmd/counter/config.yml +++ b/cmd/counter/config.yml @@ -8,14 +8,10 @@ http: postgres: pool_max: 2 - url: postgres://postgres:P@ssw0rd@172.22.174.138:5432/postgres?sslmode=disable + url: postgres://postgres:P@ssw0rd@172.25.33.104:5432/postgres?sslmode=disable rabbit_mq: - url: amqp://guest:guest@172.22.174.138:5672/ - exchange: orders-exchange - queue: orders-queue - routing_key: orders-routing-key - consumer_tag: orders-consumer + url: amqp://guest:guest@172.25.33.104:5672/ product_client: url: 0.0.0.0:5001 diff --git a/cmd/counter/config/config.go b/cmd/counter/config/config.go index 123707f..0acfb77 100755 --- a/cmd/counter/config/config.go +++ b/cmd/counter/config/config.go @@ -25,11 +25,7 @@ type ( } RabbitMQ struct { - URL string `env-required:"true" yaml:"url" env:"RABBITMQ_URL"` - Exchange string `env-required:"true" yaml:"exchange" env:"RABBITMQ_Exchange"` - Queue string `env-required:"true" yaml:"queue" env:"RABBITMQ_Queue"` - RoutingKey string `env-required:"true" yaml:"routing_key" env:"RABBITMQ_RoutingKey"` - ConsumerTag string `env-required:"true" yaml:"consumer_tag" env:"RABBITMQ_ConsumerTag"` + URL string `env-required:"true" yaml:"url" env:"RABBITMQ_URL"` } ProductClient struct { diff --git a/cmd/counter/main.go b/cmd/counter/main.go index c41d9b0..0e190dc 100755 --- a/cmd/counter/main.go +++ b/cmd/counter/main.go @@ -1,7 +1,6 @@ package main import ( - "context" "os" "github.com/golang/glog" @@ -19,7 +18,7 @@ func main() { mylog := mylogger.New(cfg.Level) a := app.New(mylog, cfg) - if err = a.Run(context.Background()); err != nil { + if err = a.Run(); err != nil { glog.Fatal(err) os.Exit(1) } diff --git a/docker-compose-full.yaml b/docker-compose-full.yaml new file mode 100755 index 0000000..29fc66d --- /dev/null +++ b/docker-compose-full.yaml @@ -0,0 +1,102 @@ +version: "3" + +services: + postgres: + image: postgres:14-alpine + environment: + - POSTGRES_DB=postgres + - POSTGRES_USER=postgres + - POSTGRES_PASSWORD=P@ssw0rd + healthcheck: + test: ["CMD", "pg_isready"] + ports: + - "5432:5432" + networks: + - coffeeshop-network + + rabbitmq: + image: rabbitmq:3.11-management-alpine + environment: + RABBITMQ_DEFAULT_USER: guest + RABBITMQ_DEFAULT_PASS: guest + healthcheck: + test: rabbitmq-diagnostics -q ping + interval: 30s + timeout: 30s + retries: 3 + ports: + - "5672:5672" + - "15672:15672" + networks: + - coffeeshop-network + + proxy: + build: + context: . + dockerfile: ./docker/Dockerfile-proxy + image: go-coffeeshop-proxy + environment: + APP_NAME: 'proxy-service in docker' + GRPC_PRODUCT_HOST: 'product' + GRPC_PRODUCT_PORT: 5001 + GRPC_COUNTER_HOST: 'counter' + GRPC_COUNTER_PORT: 5002 + ports: + - 5000:5000 + depends_on: + - product + - counter + networks: + - coffeeshop-network + + product: + build: + context: . + dockerfile: ./docker/Dockerfile-product + image: go-coffeeshop-product + environment: + APP_NAME: 'product-service in docker' + ports: + - 5001:5001 + networks: + - coffeeshop-network + + counter: + build: + context: . + dockerfile: ./docker/Dockerfile-counter + image: go-coffeeshop-counter + environment: + APP_NAME: 'counter-service in docker' + PG_URL: postgres://postgres:P@ssw0rd@postgres:5432/postgres + RABBITMQ_URL: amqp://guest:guest@rabbitmq:5672/ + PRODUCT_CLIENT_URL: product:5001 + ports: + - 5002:5002 + depends_on: + postgres: + condition: service_healthy + rabbitmq: + condition: service_healthy + networks: + - coffeeshop-network + + barista: + build: + context: . + dockerfile: ./docker/Dockerfile-barista + image: go-coffeeshop-barista + environment: + APP_NAME: 'barista-service in docker' + PG_URL: postgres://postgres:P@ssw0rd@postgres:5432/postgres + RABBITMQ_URL: amqp://guest:guest@rabbitmq:5672/ + depends_on: + postgres: + condition: service_healthy + rabbitmq: + condition: service_healthy + networks: + - coffeeshop-network + +networks: + coffeeshop-network: diff --git a/docker-compose.yaml b/docker-compose.yaml index 29fc66d..ee6e502 100755 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -29,74 +29,6 @@ services: - "15672:15672" networks: - coffeeshop-network - - proxy: - build: - context: . - dockerfile: ./docker/Dockerfile-proxy - image: go-coffeeshop-proxy - environment: - APP_NAME: 'proxy-service in docker' - GRPC_PRODUCT_HOST: 'product' - GRPC_PRODUCT_PORT: 5001 - GRPC_COUNTER_HOST: 'counter' - GRPC_COUNTER_PORT: 5002 - ports: - - 5000:5000 - depends_on: - - product - - counter - networks: - - coffeeshop-network - - product: - build: - context: . - dockerfile: ./docker/Dockerfile-product - image: go-coffeeshop-product - environment: - APP_NAME: 'product-service in docker' - ports: - - 5001:5001 - networks: - - coffeeshop-network - - counter: - build: - context: . - dockerfile: ./docker/Dockerfile-counter - image: go-coffeeshop-counter - environment: - APP_NAME: 'counter-service in docker' - PG_URL: postgres://postgres:P@ssw0rd@postgres:5432/postgres - RABBITMQ_URL: amqp://guest:guest@rabbitmq:5672/ - PRODUCT_CLIENT_URL: product:5001 - ports: - - 5002:5002 - depends_on: - postgres: - condition: service_healthy - rabbitmq: - condition: service_healthy - networks: - - coffeeshop-network - - barista: - build: - context: . - dockerfile: ./docker/Dockerfile-barista - image: go-coffeeshop-barista - environment: - APP_NAME: 'barista-service in docker' - PG_URL: postgres://postgres:P@ssw0rd@postgres:5432/postgres - RABBITMQ_URL: amqp://guest:guest@rabbitmq:5672/ - depends_on: - postgres: - condition: service_healthy - rabbitmq: - condition: service_healthy - networks: - - coffeeshop-network networks: coffeeshop-network: diff --git a/internal/barista/app/app.go b/internal/barista/app/app.go index 243ed7b..811d5de 100644 --- a/internal/barista/app/app.go +++ b/internal/barista/app/app.go @@ -7,9 +7,11 @@ import ( "os/signal" "syscall" + "github.com/pkg/errors" "github.com/thangchung/go-coffeeshop/cmd/barista/config" "github.com/thangchung/go-coffeeshop/internal/barista/features/orders/eventhandlers" - baristaRabbitMQ "github.com/thangchung/go-coffeeshop/internal/barista/rabbitmq" + "github.com/thangchung/go-coffeeshop/internal/barista/rabbitmq/consumer" + "github.com/thangchung/go-coffeeshop/internal/barista/rabbitmq/publisher" mylogger "github.com/thangchung/go-coffeeshop/pkg/logger" "github.com/thangchung/go-coffeeshop/pkg/rabbitmq" ) @@ -41,21 +43,41 @@ func (a *App) Run() error { } defer amqpConn.Close() - handler := eventhandlers.NewBaristaOrderedEventHandler() - consumer, err := baristaRabbitMQ.NewOrderConsumer(amqpConn, handler, a.logger) + // publishers + counterOrderPub, err := publisher.NewPublisher( + amqpConn, + a.logger, + publisher.ExchangeName("counter-order-exchange"), + publisher.BindingKey("counter-order-routing-key"), + publisher.MessageTypeName("counter-order-updated"), + ) + defer counterOrderPub.CloseChan() if err != nil { - a.logger.Fatal("app - Run - baristaRabbitMQ.NewOrderConsumer: %s", err.Error()) + return errors.Wrap(err, "publisher-Counter-NewOrderPublisher") + } + + // event handlers. + handler := eventhandlers.NewBaristaOrderedEventHandler(counterOrderPub) + + // consumers + consumer, err := consumer.NewConsumer( + amqpConn, + handler, + a.logger, + consumer.ExchangeName("barista-order-exchange"), + consumer.QueueName("barista-order-queue"), + consumer.BindingKey("barista-order-routing-key"), + consumer.ConsumerTag("barista-order-consumer"), + consumer.MessageTypeName("barista-order-created"), + ) + + if err != nil { + a.logger.Fatal("app - Run - consumer.NewOrderConsumer: %s", err.Error()) } go func() { - err := consumer.StartConsumer( - a.cfg.RabbitMQ.WorkerPoolSize, - a.cfg.RabbitMQ.Exchange, - a.cfg.RabbitMQ.Queue, - a.cfg.RabbitMQ.RoutingKey, - a.cfg.RabbitMQ.ConsumerTag, - ) + err := consumer.StartConsumer() if err != nil { a.logger.Error("StartConsumer: %v", err) cancel() diff --git a/internal/barista/app/migrate.go b/internal/barista/app/migrate.go index 0299399..52dabba 100644 --- a/internal/barista/app/migrate.go +++ b/internal/barista/app/migrate.go @@ -5,11 +5,13 @@ package app import ( "errors" "fmt" - "log" "os" + "path/filepath" "time" "github.com/golang-migrate/migrate/v4" + "github.com/golang/glog" + // migrate tools _ "github.com/golang-migrate/migrate/v4/database/postgres" _ "github.com/golang-migrate/migrate/v4/source/file" @@ -24,7 +26,7 @@ const ( func init() { databaseURL, ok := os.LookupEnv("PG_URL") if !ok || len(databaseURL) == 0 { - log.Fatalf("migrate: environment variable not declared: PG_URL") + glog.Fatalf("migrate: environment variable not declared: PG_URL") } databaseURL += "?sslmode=disable" @@ -36,30 +38,35 @@ func init() { ) for attempts > 0 { - m, err = migrate.New(fmt.Sprintf("file://%s", _migrationFilePath), databaseURL) + cur, _ := os.Getwd() + dir := filepath.Dir(cur + "/../../..") + + glog.Infoln(fmt.Sprintf("file://%s", dir)) + + m, err = migrate.New(fmt.Sprintf("file://%s/%s", dir, _migrationFilePath), databaseURL) if err == nil { break } - log.Printf("Migrate: postgres is trying to connect, attempts left: %d", attempts) + glog.Infoln("Migrate: postgres is trying to connect, attempts left: %d", attempts) time.Sleep(_defaultTimeout) attempts-- } if err != nil { - log.Fatalf("Migrate: postgres connect error: %s", err) + glog.Fatalf("Migrate: postgres connect error: %s", err) } err = m.Up() defer m.Close() if err != nil && !errors.Is(err, migrate.ErrNoChange) { - log.Fatalf("Migrate: up error: %s", err) + glog.Fatalf("Migrate: up error: %s", err) } if errors.Is(err, migrate.ErrNoChange) { - log.Printf("Migrate: no change") + glog.Infoln("Migrate: no change") return } - log.Printf("Migrate: up success") + glog.Infoln("Migrate: up success") } diff --git a/internal/barista/features/orders/eventhandlers/barista_ordered_event_handler.go b/internal/barista/features/orders/eventhandlers/barista_ordered_event_handler.go index c76e680..10639cc 100644 --- a/internal/barista/features/orders/eventhandlers/barista_ordered_event_handler.go +++ b/internal/barista/features/orders/eventhandlers/barista_ordered_event_handler.go @@ -2,23 +2,71 @@ package eventhandlers import ( "context" + "encoding/json" "fmt" + "time" + "github.com/pkg/errors" + "github.com/thangchung/go-coffeeshop/internal/barista/rabbitmq/publisher" "github.com/thangchung/go-coffeeshop/pkg/event" + "github.com/thangchung/go-coffeeshop/proto/gen" ) type BaristaOrderedEventHandler interface { Handle(context.Context, *event.BaristaOrdered) error } -type DefaultBaristaOrderedEventHandler struct{} +type DefaultBaristaOrderedEventHandler struct { + counterPub *publisher.Publisher +} -func NewBaristaOrderedEventHandler() *DefaultBaristaOrderedEventHandler { - return &DefaultBaristaOrderedEventHandler{} +func NewBaristaOrderedEventHandler(counterPub *publisher.Publisher) *DefaultBaristaOrderedEventHandler { + return &DefaultBaristaOrderedEventHandler{ + counterPub: counterPub, + } } func (h *DefaultBaristaOrderedEventHandler) Handle(ctx context.Context, e *event.BaristaOrdered) error { fmt.Println(e) + delay := calculateDelay(e.ItemType) + time.Sleep(delay) + + message := event.BaristaOrderUpdated{ + OrderID: e.OrderID, + ItemLineID: e.ItemLineID, + Name: e.ItemType.String(), + ItemType: e.ItemType, + MadeBy: "teesee", + TimeIn: time.Now(), + TimeUp: time.Now().Add(5 * time.Minute), + } + + eventBytes, err := json.Marshal(message) + if err != nil { + return errors.Wrap(err, "json.Marshal - events.BaristaOrderUpdated") + } + + if err := h.counterPub.Publish(ctx, eventBytes, "text/plain"); err != nil { + return errors.Wrap(err, "BaristaOrderedEventHandler - Publish") + } + return nil } + +func calculateDelay(itemType gen.ItemType) time.Duration { + switch itemType { + case gen.ItemType_COFFEE_BLACK: + return 5 * time.Second + case gen.ItemType_COFFEE_WITH_ROOM: + return 5 * time.Second + case gen.ItemType_ESPRESSO: + return 7 * time.Second + case gen.ItemType_ESPRESSO_DOUBLE: + return 7 * time.Second + case gen.ItemType_CAPPUCCINO: + return 10 * time.Second + default: + return 3 * time.Second + } +} diff --git a/internal/barista/rabbitmq/consumer.go b/internal/barista/rabbitmq/consumer.go deleted file mode 100644 index 89797f1..0000000 --- a/internal/barista/rabbitmq/consumer.go +++ /dev/null @@ -1,193 +0,0 @@ -package rabbitmq - -import ( - "context" - "encoding/json" - "fmt" - - "github.com/pkg/errors" - amqp "github.com/rabbitmq/amqp091-go" - "github.com/thangchung/go-coffeeshop/internal/barista/features/orders/eventhandlers" - "github.com/thangchung/go-coffeeshop/pkg/event" - log "github.com/thangchung/go-coffeeshop/pkg/logger" -) - -const ( - exchangeKind = "direct" - exchangeDurable = true - exchangeAutoDelete = false - exchangeInternal = false - exchangeNoWait = false - - queueDurable = true - queueAutoDelete = false - queueExclusive = false - queueNoWait = false - - prefetchCount = 1 - prefetchSize = 0 - prefetchGlobal = false - - consumeAutoAck = false - consumeExclusive = false - consumeNoLocal = false - consumeNoWait = false -) - -type OrderConsumer struct { - amqpConn *amqp.Connection - logger *log.Logger - handler eventhandlers.BaristaOrderedEventHandler -} - -func NewOrderConsumer(amqpConn *amqp.Connection, handler eventhandlers.BaristaOrderedEventHandler, logger *log.Logger) (*OrderConsumer, error) { - // ch, err := amqpConn.Channel() - // if err != nil { - // panic(err) - // } - // defer ch.Close() - - return &OrderConsumer{ - amqpConn: amqpConn, - logger: logger, - handler: handler, - }, nil -} - -// CreateChannel Consume messages -func (c *OrderConsumer) CreateChannel(exchangeName, queueName, bindingKey, consumerTag string) (*amqp.Channel, error) { - ch, err := c.amqpConn.Channel() - if err != nil { - return nil, errors.Wrap(err, "Error amqpConn.Channel") - } - - c.logger.Info("Declaring exchange: %s", exchangeName) - err = ch.ExchangeDeclare( - exchangeName, - exchangeKind, - exchangeDurable, - exchangeAutoDelete, - exchangeInternal, - exchangeNoWait, - nil, - ) - if err != nil { - return nil, errors.Wrap(err, "Error ch.ExchangeDeclare") - } - - queue, err := ch.QueueDeclare( - queueName, - queueDurable, - queueAutoDelete, - queueExclusive, - queueNoWait, - nil, - ) - if err != nil { - return nil, errors.Wrap(err, "Error ch.QueueDeclare") - } - - c.logger.Info("Declared queue, binding it to exchange: Queue: %v, messagesCount: %v, "+ - "consumerCount: %v, exchange: %v, bindingKey: %v", - queue.Name, - queue.Messages, - queue.Consumers, - exchangeName, - bindingKey, - ) - - err = ch.QueueBind( - queue.Name, - bindingKey, - exchangeName, - queueNoWait, - nil, - ) - if err != nil { - return nil, errors.Wrap(err, "Error ch.QueueBind") - } - - c.logger.Info("Queue bound to exchange, starting to consume from queue, consumerTag: %v", consumerTag) - - err = ch.Qos( - prefetchCount, // prefetch count - prefetchSize, // prefetch size - prefetchGlobal, // global - ) - if err != nil { - return nil, errors.Wrap(err, "Error ch.Qos") - } - - return ch, nil -} - -func (c *OrderConsumer) worker(ctx context.Context, messages <-chan amqp.Delivery) { - for delivery := range messages { - c.logger.Info("processDeliveries deliveryTag% v", delivery.DeliveryTag) - - switch delivery.Type { - case "barista.ordered": - var payload event.BaristaOrdered - err := json.Unmarshal(delivery.Body, &payload) - - if err != nil { - c.logger.LogError(err) - } - - err = c.handler.Handle(ctx, &payload) - - if err != nil { - if err = delivery.Reject(false); err != nil { - c.logger.Error("Err delivery.Reject: %v", err) - } - c.logger.Error("Failed to process delivery: %v", err) - } else { - err = delivery.Ack(false) - if err != nil { - c.logger.Error("Failed to acknowledge delivery: %v", err) - } - } - default: - fmt.Println("default") - } - } - - c.logger.Info("Deliveries channel closed") -} - -// StartConsumer Start new rabbitmq consumer -func (c *OrderConsumer) StartConsumer(workerPoolSize int, exchange, queueName, bindingKey, consumerTag string) error { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - ch, err := c.CreateChannel(exchange, queueName, bindingKey, consumerTag) - if err != nil { - return errors.Wrap(err, "CreateChannel") - } - defer ch.Close() - - deliveries, err := ch.Consume( - queueName, - consumerTag, - consumeAutoAck, - consumeExclusive, - consumeNoLocal, - consumeNoWait, - nil, - ) - if err != nil { - return errors.Wrap(err, "Consume") - } - - forever := make(chan bool) - - for i := 0; i < workerPoolSize; i++ { - go c.worker(ctx, deliveries) - } - - chanErr := <-ch.NotifyClose(make(chan *amqp.Error)) - c.logger.Error("ch.NotifyClose: %v", chanErr) - <-forever - - return chanErr -} diff --git a/internal/barista/rabbitmq/consumer/consumer.go b/internal/barista/rabbitmq/consumer/consumer.go new file mode 100644 index 0000000..718be14 --- /dev/null +++ b/internal/barista/rabbitmq/consumer/consumer.go @@ -0,0 +1,215 @@ +package consumer + +import ( + "context" + "encoding/json" + + "github.com/pkg/errors" + amqp "github.com/rabbitmq/amqp091-go" + "github.com/thangchung/go-coffeeshop/internal/barista/features/orders/eventhandlers" + "github.com/thangchung/go-coffeeshop/pkg/event" + log "github.com/thangchung/go-coffeeshop/pkg/logger" +) + +const ( + _exchangeKind = "direct" + _exchangeDurable = true + _exchangeAutoDelete = false + _exchangeInternal = false + _exchangeNoWait = false + + _queueDurable = true + _queueAutoDelete = false + _queueExclusive = false + _queueNoWait = false + + _prefetchCount = 1 + _prefetchSize = 0 + _prefetchGlobal = false + + _consumeAutoAck = false + _consumeExclusive = false + _consumeNoLocal = false + _consumeNoWait = false + + _exchangeName = "orders-exchange" + _queueName = "orders-queue" + _bindingKey = "orders-routing-key" + _consumerTag = "orders-consumer" + _messageTypeName = "ordered" + _workerPoolSize = 24 +) + +type Consumer struct { + exchangeName, queueName, bindingKey, consumerTag string + messageTypeName string + workerPoolSize int + amqpConn *amqp.Connection + logger *log.Logger + handler eventhandlers.BaristaOrderedEventHandler +} + +func NewConsumer( + amqpConn *amqp.Connection, + handler eventhandlers.BaristaOrderedEventHandler, + logger *log.Logger, + opts ...Option, +) (*Consumer, error) { + sub := &Consumer{ + amqpConn: amqpConn, + logger: logger, + handler: handler, + exchangeName: _exchangeName, + queueName: _queueName, + bindingKey: _bindingKey, + consumerTag: _consumerTag, + messageTypeName: _messageTypeName, + workerPoolSize: _workerPoolSize, + } + + for _, opt := range opts { + opt(sub) + } + + return sub, nil +} + +// CreateChannel Consume messages. +func (c *Consumer) CreateChannel() (*amqp.Channel, error) { + ch, err := c.amqpConn.Channel() + if err != nil { + return nil, errors.Wrap(err, "Error amqpConn.Channel") + } + + c.logger.Info("Declaring exchange: %s", c.exchangeName) + err = ch.ExchangeDeclare( + c.exchangeName, + _exchangeKind, + _exchangeDurable, + _exchangeAutoDelete, + _exchangeInternal, + _exchangeNoWait, + nil, + ) + + if err != nil { + return nil, errors.Wrap(err, "Error ch.ExchangeDeclare") + } + + queue, err := ch.QueueDeclare( + c.queueName, + _queueDurable, + _queueAutoDelete, + _queueExclusive, + _queueNoWait, + nil, + ) + if err != nil { + return nil, errors.Wrap(err, "Error ch.QueueDeclare") + } + + c.logger.Info("Declared queue, binding it to exchange: Queue: %v, messagesCount: %v, "+ + "consumerCount: %v, exchange: %v, bindingKey: %v", + queue.Name, + queue.Messages, + queue.Consumers, + c.exchangeName, + c.bindingKey, + ) + + err = ch.QueueBind( + queue.Name, + c.bindingKey, + c.exchangeName, + _queueNoWait, + nil, + ) + if err != nil { + return nil, errors.Wrap(err, "Error ch.QueueBind") + } + + c.logger.Info("Queue bound to exchange, starting to consume from queue, consumerTag: %v", c.consumerTag) + + err = ch.Qos( + _prefetchCount, // prefetch count + _prefetchSize, // prefetch size + _prefetchGlobal, // global + ) + if err != nil { + return nil, errors.Wrap(err, "Error ch.Qos") + } + + return ch, nil +} + +func (c *Consumer) worker(ctx context.Context, messages <-chan amqp.Delivery) { + for delivery := range messages { + c.logger.Info("processDeliveries deliveryTag% v", delivery.DeliveryTag) + + switch delivery.Type { + case c.messageTypeName: + var payload event.BaristaOrdered + err := json.Unmarshal(delivery.Body, &payload) + + if err != nil { + c.logger.LogError(err) + } + + err = c.handler.Handle(ctx, &payload) + + if err != nil { + if err = delivery.Reject(false); err != nil { + c.logger.Error("Err delivery.Reject: %v", err) + } + + c.logger.Error("Failed to process delivery: %v", err) + } else { + err = delivery.Ack(false) + if err != nil { + c.logger.Error("Failed to acknowledge delivery: %v", err) + } + } + default: + c.logger.Info("default") + } + } + + c.logger.Info("Deliveries channel closed") +} + +// StartConsumer Start new rabbitmq consumer. +func (c *Consumer) StartConsumer() error { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ch, err := c.CreateChannel() + if err != nil { + return errors.Wrap(err, "CreateChannel") + } + defer ch.Close() + + deliveries, err := ch.Consume( + c.queueName, + c.consumerTag, + _consumeAutoAck, + _consumeExclusive, + _consumeNoLocal, + _consumeNoWait, + nil, + ) + if err != nil { + return errors.Wrap(err, "Consume") + } + + forever := make(chan bool) + + for i := 0; i < c.workerPoolSize; i++ { + go c.worker(ctx, deliveries) + } + + chanErr := <-ch.NotifyClose(make(chan *amqp.Error)) + c.logger.Error("ch.NotifyClose: %v", chanErr) + <-forever + + return chanErr +} diff --git a/internal/barista/rabbitmq/consumer/options.go b/internal/barista/rabbitmq/consumer/options.go new file mode 100644 index 0000000..03e33c1 --- /dev/null +++ b/internal/barista/rabbitmq/consumer/options.go @@ -0,0 +1,39 @@ +package consumer + +type Option func(*Consumer) + +func ExchangeName(exchangeName string) Option { + return func(p *Consumer) { + p.exchangeName = exchangeName + } +} + +func QueueName(queueName string) Option { + return func(p *Consumer) { + p.queueName = queueName + } +} + +func BindingKey(bindingKey string) Option { + return func(p *Consumer) { + p.bindingKey = bindingKey + } +} + +func ConsumerTag(consumerTag string) Option { + return func(p *Consumer) { + p.consumerTag = consumerTag + } +} + +func MessageTypeName(messageTypeName string) Option { + return func(p *Consumer) { + p.messageTypeName = messageTypeName + } +} + +func WorkerPoolSize(workerPoolSize int) Option { + return func(p *Consumer) { + p.workerPoolSize = workerPoolSize + } +} diff --git a/internal/barista/rabbitmq/publisher/options.go b/internal/barista/rabbitmq/publisher/options.go new file mode 100644 index 0000000..665820d --- /dev/null +++ b/internal/barista/rabbitmq/publisher/options.go @@ -0,0 +1,21 @@ +package publisher + +type Option func(*Publisher) + +func ExchangeName(exchangeName string) Option { + return func(p *Publisher) { + p.exchangeName = exchangeName + } +} + +func BindingKey(bindingKey string) Option { + return func(p *Publisher) { + p.bindingKey = bindingKey + } +} + +func MessageTypeName(messageTypeName string) Option { + return func(p *Publisher) { + p.messageTypeName = messageTypeName + } +} diff --git a/internal/barista/rabbitmq/publisher/publisher.go b/internal/barista/rabbitmq/publisher/publisher.go new file mode 100644 index 0000000..77ae59f --- /dev/null +++ b/internal/barista/rabbitmq/publisher/publisher.go @@ -0,0 +1,89 @@ +package publisher + +import ( + "context" + "time" + + "github.com/google/uuid" + "github.com/pkg/errors" + amqp "github.com/rabbitmq/amqp091-go" + log "github.com/thangchung/go-coffeeshop/pkg/logger" +) + +const ( + _publishMandatory = false + _publishImmediate = false + + _exchangeName = "orders-exchange" + _bindingKey = "orders-routing-key" + _messageTypeName = "ordered" +) + +type Publisher struct { + exchangeName, bindingKey string + messageTypeName string + amqpChan *amqp.Channel + amqpConn *amqp.Connection + logger *log.Logger +} + +func NewPublisher(amqpConn *amqp.Connection, logger *log.Logger, opts ...Option) (*Publisher, error) { + ch, err := amqpConn.Channel() + if err != nil { + panic(err) + } + defer ch.Close() + + pub := &Publisher{ + amqpConn: amqpConn, + amqpChan: ch, + logger: logger, + exchangeName: _exchangeName, + bindingKey: _bindingKey, + messageTypeName: _messageTypeName, + } + + for _, opt := range opts { + opt(pub) + } + + return pub, nil +} + +// CloseChan Close messages chan. +func (p *Publisher) CloseChan() { + if err := p.amqpChan.Close(); err != nil { + p.logger.Error("Publisher CloseChan: %v", err) + } +} + +// Publish message. +func (p *Publisher) Publish(ctx context.Context, body []byte, contentType string) error { + ch, err := p.amqpConn.Channel() + if err != nil { + return errors.Wrap(err, "CreateChannel") + } + defer ch.Close() + + p.logger.Info("Publishing message Exchange: %s, RoutingKey: %s", p.exchangeName, p.bindingKey) + + if err := ch.PublishWithContext( + ctx, + p.exchangeName, + p.bindingKey, + _publishMandatory, + _publishImmediate, + amqp.Publishing{ + ContentType: contentType, + DeliveryMode: amqp.Persistent, + MessageId: uuid.New().String(), + Timestamp: time.Now(), + Body: body, + Type: p.messageTypeName, + }, + ); err != nil { + return errors.Wrap(err, "ch.Publish") + } + + return nil +} diff --git a/internal/counter/app/app.go b/internal/counter/app/app.go index e37c188..4b39bfb 100644 --- a/internal/counter/app/app.go +++ b/internal/counter/app/app.go @@ -8,8 +8,10 @@ import ( "github.com/pkg/errors" "github.com/thangchung/go-coffeeshop/cmd/counter/config" "github.com/thangchung/go-coffeeshop/internal/counter/domain" + "github.com/thangchung/go-coffeeshop/internal/counter/features/barista/eventhandlers" counterGrpc "github.com/thangchung/go-coffeeshop/internal/counter/grpc" - counterRabbitMQ "github.com/thangchung/go-coffeeshop/internal/counter/rabbitmq" + "github.com/thangchung/go-coffeeshop/internal/counter/rabbitmq/consumer" + counterPublisher "github.com/thangchung/go-coffeeshop/internal/counter/rabbitmq/publisher" "github.com/thangchung/go-coffeeshop/internal/counter/usecase" "github.com/thangchung/go-coffeeshop/internal/counter/usecase/repo" mylogger "github.com/thangchung/go-coffeeshop/pkg/logger" @@ -35,9 +37,11 @@ func New(log *mylogger.Logger, cfg *config.Config) *App { } } -func (a *App) Run(ctx context.Context) error { +func (a *App) Run() error { a.logger.Info("Init %s %s\n", a.cfg.Name, a.cfg.Version) + ctx, cancel := context.WithCancel(context.Background()) + // PostgresDB pg, err := postgres.NewPostgresDB(a.cfg.PG.URL, postgres.MaxPoolSize(a.cfg.PG.PoolMax)) if err != nil { @@ -63,19 +67,65 @@ func (a *App) Run(ctx context.Context) error { } defer conn.Close() - orderPublisher, err := counterRabbitMQ.NewOrderPublisher(amqpConn, a.cfg, a.logger) + baristaOrderPub, err := counterPublisher.NewPublisher( + amqpConn, + a.logger, + counterPublisher.ExchangeName("barista-order-exchange"), + counterPublisher.BindingKey("barista-order-routing-key"), + counterPublisher.MessageTypeName("barista-order-created"), + ) + defer baristaOrderPub.CloseChan() + if err != nil { - return errors.Wrap(err, "counterRabbitMQ-NewOrderPublisher") + return errors.Wrap(err, "counterRabbitMQ-Barista-NewOrderPublisher") } - defer orderPublisher.CloseChan() + kitchenOrderPub, err := counterPublisher.NewPublisher( + amqpConn, + a.logger, + counterPublisher.ExchangeName("kitchen-order-exchange"), + counterPublisher.BindingKey("kitchen-order-routing-key"), + counterPublisher.MessageTypeName("kitchen-order-created"), + ) + defer kitchenOrderPub.CloseChan() + + if err != nil { + return errors.Wrap(err, "counterRabbitMQ-Kitchen-NewOrderPublisher") + } a.logger.Info("Order Publisher initialized") - var productDomainService domain.ProductDomainService = counterGrpc.NewProductServiceClient(ctx, conn) + var productDomainSvc domain.ProductDomainService = counterGrpc.NewProductServiceClient(ctx, conn) // Use case - queryOrderFulfillmentUseCase := usecase.NewQueryOrderFulfillmentUseCase(ctx, repo.NewQueryOrderFulfillmentRepo(ctx, pg)) + queryOrderFulfillmentUC := usecase.NewQueryOrderFulfillmentUseCase(ctx, repo.NewQueryOrderFulfillmentRepo(ctx, pg)) + + // event handlers. + handler := eventhandlers.NewBaristaOrderUpdatedEventHandler() + + // consumers + consumer, err := consumer.NewConsumer( + amqpConn, + handler, + a.logger, + consumer.ExchangeName("counter-order-exchange"), + consumer.QueueName("counter-order-queue"), + consumer.BindingKey("counter-order-routing-key"), + consumer.ConsumerTag("counter-order-consumer"), + consumer.MessageTypeName("counter-order-updated"), + ) + + if err != nil { + a.logger.Fatal("app - Run - consumer.NewOrderConsumer: %s", err.Error()) + } + + go func() { + err = consumer.StartConsumer() + if err != nil { + a.logger.Error("StartConsumer: %v", err) + cancel() + } + }() // gRPC Server l, err := net.Listen(a.network, a.address) @@ -97,9 +147,10 @@ func (a *App) Run(ctx context.Context) error { amqpConn, a.cfg, a.logger, - queryOrderFulfillmentUseCase, - productDomainService, - *orderPublisher, + queryOrderFulfillmentUC, + productDomainSvc, + *baristaOrderPub, + *kitchenOrderPub, ) go func() { diff --git a/internal/counter/app/migrate.go b/internal/counter/app/migrate.go index 0299399..52dabba 100644 --- a/internal/counter/app/migrate.go +++ b/internal/counter/app/migrate.go @@ -5,11 +5,13 @@ package app import ( "errors" "fmt" - "log" "os" + "path/filepath" "time" "github.com/golang-migrate/migrate/v4" + "github.com/golang/glog" + // migrate tools _ "github.com/golang-migrate/migrate/v4/database/postgres" _ "github.com/golang-migrate/migrate/v4/source/file" @@ -24,7 +26,7 @@ const ( func init() { databaseURL, ok := os.LookupEnv("PG_URL") if !ok || len(databaseURL) == 0 { - log.Fatalf("migrate: environment variable not declared: PG_URL") + glog.Fatalf("migrate: environment variable not declared: PG_URL") } databaseURL += "?sslmode=disable" @@ -36,30 +38,35 @@ func init() { ) for attempts > 0 { - m, err = migrate.New(fmt.Sprintf("file://%s", _migrationFilePath), databaseURL) + cur, _ := os.Getwd() + dir := filepath.Dir(cur + "/../../..") + + glog.Infoln(fmt.Sprintf("file://%s", dir)) + + m, err = migrate.New(fmt.Sprintf("file://%s/%s", dir, _migrationFilePath), databaseURL) if err == nil { break } - log.Printf("Migrate: postgres is trying to connect, attempts left: %d", attempts) + glog.Infoln("Migrate: postgres is trying to connect, attempts left: %d", attempts) time.Sleep(_defaultTimeout) attempts-- } if err != nil { - log.Fatalf("Migrate: postgres connect error: %s", err) + glog.Fatalf("Migrate: postgres connect error: %s", err) } err = m.Up() defer m.Close() if err != nil && !errors.Is(err, migrate.ErrNoChange) { - log.Fatalf("Migrate: up error: %s", err) + glog.Fatalf("Migrate: up error: %s", err) } if errors.Is(err, migrate.ErrNoChange) { - log.Printf("Migrate: no change") + glog.Infoln("Migrate: no change") return } - log.Printf("Migrate: up success") + glog.Infoln("Migrate: up success") } diff --git a/internal/counter/domain/order.go b/internal/counter/domain/order.go index 6b9a8c3..d3e6329 100644 --- a/internal/counter/domain/order.go +++ b/internal/counter/domain/order.go @@ -7,7 +7,7 @@ import ( "github.com/google/uuid" "github.com/pkg/errors" "github.com/samber/lo" - counterRabbitMQ "github.com/thangchung/go-coffeeshop/internal/counter/rabbitmq" + "github.com/thangchung/go-coffeeshop/internal/counter/rabbitmq/publisher" events "github.com/thangchung/go-coffeeshop/pkg/event" gen "github.com/thangchung/go-coffeeshop/proto/gen" ) @@ -39,8 +39,9 @@ func NewOrder( func CreateOrderFrom( ctx context.Context, request *gen.PlaceOrderRequest, - productDomainService ProductDomainService, - orderPublisher counterRabbitMQ.OrderPublisher, + productDomainSvc ProductDomainService, + baristaOrderPub publisher.Publisher, + kitchenOrderPub publisher.Publisher, ) (*Order, error) { loyaltyMemberID, err := uuid.Parse(request.LoyaltyMemberId) if err != nil { @@ -53,7 +54,7 @@ func CreateOrderFrom( numberOfKitchenItems := len(request.KitchenItems) > 0 if numberOfBaristaItems { - itemTypesRes, err := productDomainService.GetItemsByType(request, true) + itemTypesRes, err := productDomainSvc.GetItemsByType(request, true) if err != nil { return nil, err } @@ -66,7 +67,15 @@ func CreateOrderFrom( if ok { lineItem := NewLineItem(item.ItemType, item.ItemType.String(), float32(find.Price), gen.Status_IN_PROGRESS, true) - err = publishBaristaOrderEvent(ctx, order.ID, lineItem.ID, lineItem.ItemType, orderPublisher, true) + err = publishBaristaOrderEvent( + ctx, + order.ID, + lineItem.ID, + lineItem.ItemType, + baristaOrderPub, + kitchenOrderPub, + true, + ) order.LineItems = append(order.LineItems, *lineItem) } @@ -78,7 +87,7 @@ func CreateOrderFrom( } if numberOfKitchenItems { - itemTypesRes, err := productDomainService.GetItemsByType(request, false) + itemTypesRes, err := productDomainSvc.GetItemsByType(request, false) if err != nil { return nil, err } @@ -91,7 +100,15 @@ func CreateOrderFrom( if ok { lineItem := NewLineItem(item.ItemType, item.ItemType.String(), float32(find.Price), gen.Status_IN_PROGRESS, false) - err = publishBaristaOrderEvent(ctx, order.ID, lineItem.ID, lineItem.ItemType, orderPublisher, false) + err = publishBaristaOrderEvent( + ctx, + order.ID, + lineItem.ID, + lineItem.ItemType, + baristaOrderPub, + kitchenOrderPub, + false, + ) order.LineItems = append(order.LineItems, *lineItem) } @@ -110,7 +127,8 @@ func publishBaristaOrderEvent( orderID uuid.UUID, lineItemID uuid.UUID, itemType gen.ItemType, - publisher counterRabbitMQ.OrderPublisher, + baristaOrderPub publisher.Publisher, + kitchenOrderPub publisher.Publisher, isBarista bool, ) error { if isBarista { @@ -127,7 +145,7 @@ func publishBaristaOrderEvent( return errors.Wrap(err, "json.Marshal - events.BaristaOrdered") } - err = publisher.Publish(ctx, eventBytes, "text/plain") + err = baristaOrderPub.Publish(ctx, eventBytes, "text/plain") if err != nil { return errors.Wrap(err, "orderPublisher - Publish") } @@ -147,7 +165,7 @@ func publishBaristaOrderEvent( return errors.Wrap(err, "json.Marshal - events.BaristaOrdered") } - err = publisher.Publish(ctx, eventBytes, "text/plain") + err = kitchenOrderPub.Publish(ctx, eventBytes, "text/plain") if err != nil { return errors.Wrap(err, "orderPublisher - Publish") } diff --git a/internal/counter/features/barista/eventhandlers/barista_order_updated_event_handler.go b/internal/counter/features/barista/eventhandlers/barista_order_updated_event_handler.go new file mode 100644 index 0000000..ca8d4f0 --- /dev/null +++ b/internal/counter/features/barista/eventhandlers/barista_order_updated_event_handler.go @@ -0,0 +1,24 @@ +package eventhandlers + +import ( + "context" + "fmt" + + "github.com/thangchung/go-coffeeshop/pkg/event" +) + +type BaristaOrderUpdatedEventHandler interface { + Handle(context.Context, *event.BaristaOrderUpdated) error +} + +type DefaultBaristaOrderUpdatedEventHandler struct{} + +func NewBaristaOrderUpdatedEventHandler() *DefaultBaristaOrderUpdatedEventHandler { + return &DefaultBaristaOrderUpdatedEventHandler{} +} + +func (h *DefaultBaristaOrderUpdatedEventHandler) Handle(ctx context.Context, e *event.BaristaOrderUpdated) error { + fmt.Println(e) + + return nil +} diff --git a/internal/counter/grpc/counter_server.go b/internal/counter/grpc/counter_server.go index 93fe177..7443739 100644 --- a/internal/counter/grpc/counter_server.go +++ b/internal/counter/grpc/counter_server.go @@ -8,7 +8,7 @@ import ( amqp "github.com/rabbitmq/amqp091-go" "github.com/thangchung/go-coffeeshop/cmd/counter/config" "github.com/thangchung/go-coffeeshop/internal/counter/domain" - counterRabbitMQ "github.com/thangchung/go-coffeeshop/internal/counter/rabbitmq" + "github.com/thangchung/go-coffeeshop/internal/counter/rabbitmq/publisher" "github.com/thangchung/go-coffeeshop/internal/counter/usecase" mylogger "github.com/thangchung/go-coffeeshop/pkg/logger" gen "github.com/thangchung/go-coffeeshop/proto/gen" @@ -18,12 +18,13 @@ import ( type CounterServiceServerImpl struct { gen.UnimplementedCounterServiceServer - logger *mylogger.Logger - amqpConn *amqp.Connection - cfg *config.Config - productDomainService domain.ProductDomainService - queryOrderFulfillmentUseCase usecase.QueryOrderFulfillmentUseCase - orderPublisher counterRabbitMQ.OrderPublisher + logger *mylogger.Logger + amqpConn *amqp.Connection + cfg *config.Config + productDomainSvc domain.ProductDomainService + queryOrderFulfillmentUC usecase.QueryOrderFulfillmentUseCase + baristaOrderPub publisher.Publisher + kitchenOrderPub publisher.Publisher } func NewCounterServiceServerGrpc( @@ -31,17 +32,19 @@ func NewCounterServiceServerGrpc( amqpConn *amqp.Connection, cfg *config.Config, log *mylogger.Logger, - queryOrderFulfillmentUseCase usecase.QueryOrderFulfillmentUseCase, - productDomainService domain.ProductDomainService, - orderPublisher counterRabbitMQ.OrderPublisher, + queryOrderFulfillmentUC usecase.QueryOrderFulfillmentUseCase, + productDomainSvc domain.ProductDomainService, + baristaOrderPub publisher.Publisher, + kitchenOrderPub publisher.Publisher, ) { svc := CounterServiceServerImpl{ - cfg: cfg, - logger: log, - amqpConn: amqpConn, - queryOrderFulfillmentUseCase: queryOrderFulfillmentUseCase, - productDomainService: productDomainService, - orderPublisher: orderPublisher, + cfg: cfg, + logger: log, + amqpConn: amqpConn, + queryOrderFulfillmentUC: queryOrderFulfillmentUC, + productDomainSvc: productDomainSvc, + baristaOrderPub: baristaOrderPub, + kitchenOrderPub: kitchenOrderPub, } gen.RegisterCounterServiceServer(grpcServer, &svc) @@ -57,7 +60,7 @@ func (g *CounterServiceServerImpl) GetListOrderFulfillment( res := gen.GetListOrderFulfillmentResponse{} - entities, err := g.queryOrderFulfillmentUseCase.GetListOrderFulfillment() + entities, err := g.queryOrderFulfillmentUC.GetListOrderFulfillment() if err != nil { return nil, fmt.Errorf("CounterServiceServerImpl - GetListOrderFulfillment - g.queryOrderFulfillmentUseCase.GetListOrderFulfillment: %w", err) } @@ -80,7 +83,7 @@ func (g *CounterServiceServerImpl) PlaceOrder( g.logger.Debug("request: %s", request) // add order - order, err := domain.CreateOrderFrom(ctx, request, g.productDomainService, g.orderPublisher) + order, err := domain.CreateOrderFrom(ctx, request, g.productDomainSvc, g.baristaOrderPub, g.kitchenOrderPub) if err != nil { return nil, errors.Wrap(err, "PlaceOrder - domain.CreateOrderFrom") } @@ -90,52 +93,6 @@ func (g *CounterServiceServerImpl) PlaceOrder( g.logger.Debug("order created: %s", *order) - // publish order events - // ch, err := g.amqpConn.Channel() - // if err != nil { - // panic(err) - // } - // defer ch.Close() - - // event := events.BaristaOrdered{ - // OrderID: order.ID, - // ItemLineID: uuid.New(), //todo - // ItemType: 1, //todo - // } - - // eventBytes, err := json.Marshal(event) - // if err != nil { - // g.logger.LogError(err) - // } - - // err = g.orderPublisher.Publish(ctx, eventBytes, "text/plain") - // if err != nil { - // g.logger.LogError(err) - - // return nil, errors.Wrap(err, "orderPublisher - Publish") - // } - - // err = ch.PublishWithContext( - // ctx, - // OrderTopic, - // "log.INFO", - // false, - // false, - // amqp.Publishing{ - // ContentType: "text/plain", - // Type: "barista.ordered", - // Body: eventBytes, - // }, - // ) - - // if err != nil { - // g.logger.LogError(err) - - // return nil, err - // } - - // g.logger.Info("Sending message: %s -> %s", event, g.cfg.RabbitMQ.Exchange) - res := gen.PlaceOrderResponse{} return &res, nil diff --git a/internal/counter/rabbitmq/consumer/consumer.go b/internal/counter/rabbitmq/consumer/consumer.go new file mode 100644 index 0000000..ffdb9a9 --- /dev/null +++ b/internal/counter/rabbitmq/consumer/consumer.go @@ -0,0 +1,215 @@ +package consumer + +import ( + "context" + "encoding/json" + + "github.com/pkg/errors" + amqp "github.com/rabbitmq/amqp091-go" + "github.com/thangchung/go-coffeeshop/internal/counter/features/barista/eventhandlers" + "github.com/thangchung/go-coffeeshop/pkg/event" + log "github.com/thangchung/go-coffeeshop/pkg/logger" +) + +const ( + _exchangeKind = "direct" + _exchangeDurable = true + _exchangeAutoDelete = false + _exchangeInternal = false + _exchangeNoWait = false + + _queueDurable = true + _queueAutoDelete = false + _queueExclusive = false + _queueNoWait = false + + _prefetchCount = 1 + _prefetchSize = 0 + _prefetchGlobal = false + + _consumeAutoAck = false + _consumeExclusive = false + _consumeNoLocal = false + _consumeNoWait = false + + _exchangeName = "orders-exchange" + _queueName = "orders-queue" + _bindingKey = "orders-routing-key" + _consumerTag = "orders-consumer" + _messageTypeName = "ordered" + _workerPoolSize = 24 +) + +type Consumer struct { + exchangeName, queueName, bindingKey, consumerTag string + messageTypeName string + workerPoolSize int + amqpConn *amqp.Connection + logger *log.Logger + handler eventhandlers.BaristaOrderUpdatedEventHandler +} + +func NewConsumer( + amqpConn *amqp.Connection, + handler eventhandlers.BaristaOrderUpdatedEventHandler, + logger *log.Logger, + opts ...Option, +) (*Consumer, error) { + sub := &Consumer{ + amqpConn: amqpConn, + logger: logger, + handler: handler, + exchangeName: _exchangeName, + queueName: _queueName, + bindingKey: _bindingKey, + consumerTag: _consumerTag, + messageTypeName: _messageTypeName, + workerPoolSize: _workerPoolSize, + } + + for _, opt := range opts { + opt(sub) + } + + return sub, nil +} + +// CreateChannel Consume messages. +func (c *Consumer) CreateChannel() (*amqp.Channel, error) { + ch, err := c.amqpConn.Channel() + if err != nil { + return nil, errors.Wrap(err, "Error amqpConn.Channel") + } + + c.logger.Info("Declaring exchange: %s", c.exchangeName) + err = ch.ExchangeDeclare( + c.exchangeName, + _exchangeKind, + _exchangeDurable, + _exchangeAutoDelete, + _exchangeInternal, + _exchangeNoWait, + nil, + ) + + if err != nil { + return nil, errors.Wrap(err, "Error ch.ExchangeDeclare") + } + + queue, err := ch.QueueDeclare( + c.queueName, + _queueDurable, + _queueAutoDelete, + _queueExclusive, + _queueNoWait, + nil, + ) + if err != nil { + return nil, errors.Wrap(err, "Error ch.QueueDeclare") + } + + c.logger.Info("Declared queue, binding it to exchange: Queue: %v, messagesCount: %v, "+ + "consumerCount: %v, exchange: %v, bindingKey: %v", + queue.Name, + queue.Messages, + queue.Consumers, + c.exchangeName, + c.bindingKey, + ) + + err = ch.QueueBind( + queue.Name, + c.bindingKey, + c.exchangeName, + _queueNoWait, + nil, + ) + if err != nil { + return nil, errors.Wrap(err, "Error ch.QueueBind") + } + + c.logger.Info("Queue bound to exchange, starting to consume from queue, consumerTag: %v", c.consumerTag) + + err = ch.Qos( + _prefetchCount, // prefetch count + _prefetchSize, // prefetch size + _prefetchGlobal, // global + ) + if err != nil { + return nil, errors.Wrap(err, "Error ch.Qos") + } + + return ch, nil +} + +func (c *Consumer) worker(ctx context.Context, messages <-chan amqp.Delivery) { + for delivery := range messages { + c.logger.Info("processDeliveries deliveryTag% v", delivery.DeliveryTag) + + switch delivery.Type { + case c.messageTypeName: + var payload event.BaristaOrderUpdated + err := json.Unmarshal(delivery.Body, &payload) + + if err != nil { + c.logger.LogError(err) + } + + err = c.handler.Handle(ctx, &payload) + + if err != nil { + if err = delivery.Reject(false); err != nil { + c.logger.Error("Err delivery.Reject: %v", err) + } + + c.logger.Error("Failed to process delivery: %v", err) + } else { + err = delivery.Ack(false) + if err != nil { + c.logger.Error("Failed to acknowledge delivery: %v", err) + } + } + default: + c.logger.Info("default") + } + } + + c.logger.Info("Deliveries channel closed") +} + +// StartConsumer Start new rabbitmq consumer. +func (c *Consumer) StartConsumer() error { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ch, err := c.CreateChannel() + if err != nil { + return errors.Wrap(err, "CreateChannel") + } + defer ch.Close() + + deliveries, err := ch.Consume( + c.queueName, + c.consumerTag, + _consumeAutoAck, + _consumeExclusive, + _consumeNoLocal, + _consumeNoWait, + nil, + ) + if err != nil { + return errors.Wrap(err, "Consume") + } + + forever := make(chan bool) + + for i := 0; i < c.workerPoolSize; i++ { + go c.worker(ctx, deliveries) + } + + chanErr := <-ch.NotifyClose(make(chan *amqp.Error)) + c.logger.Error("ch.NotifyClose: %v", chanErr) + <-forever + + return chanErr +} diff --git a/internal/counter/rabbitmq/consumer/options.go b/internal/counter/rabbitmq/consumer/options.go new file mode 100644 index 0000000..03e33c1 --- /dev/null +++ b/internal/counter/rabbitmq/consumer/options.go @@ -0,0 +1,39 @@ +package consumer + +type Option func(*Consumer) + +func ExchangeName(exchangeName string) Option { + return func(p *Consumer) { + p.exchangeName = exchangeName + } +} + +func QueueName(queueName string) Option { + return func(p *Consumer) { + p.queueName = queueName + } +} + +func BindingKey(bindingKey string) Option { + return func(p *Consumer) { + p.bindingKey = bindingKey + } +} + +func ConsumerTag(consumerTag string) Option { + return func(p *Consumer) { + p.consumerTag = consumerTag + } +} + +func MessageTypeName(messageTypeName string) Option { + return func(p *Consumer) { + p.messageTypeName = messageTypeName + } +} + +func WorkerPoolSize(workerPoolSize int) Option { + return func(p *Consumer) { + p.workerPoolSize = workerPoolSize + } +} diff --git a/internal/counter/rabbitmq/publisher.go b/internal/counter/rabbitmq/publisher.go deleted file mode 100644 index c88bc53..0000000 --- a/internal/counter/rabbitmq/publisher.go +++ /dev/null @@ -1,77 +0,0 @@ -package rabbitmq - -import ( - "context" - "time" - - "github.com/google/uuid" - "github.com/pkg/errors" - amqp "github.com/rabbitmq/amqp091-go" - "github.com/thangchung/go-coffeeshop/cmd/counter/config" - log "github.com/thangchung/go-coffeeshop/pkg/logger" -) - -const ( - publishMandatory = false - publishImmediate = false -) - -type OrderPublisher struct { - amqpChan *amqp.Channel - amqpConn *amqp.Connection - cfg *config.Config - logger *log.Logger -} - -func NewOrderPublisher(amqpConn *amqp.Connection, cfg *config.Config, logger *log.Logger) (*OrderPublisher, error) { - ch, err := amqpConn.Channel() - if err != nil { - panic(err) - } - defer ch.Close() - - return &OrderPublisher{ - amqpConn: amqpConn, - amqpChan: ch, - logger: logger, - cfg: cfg, - }, nil -} - -// CloseChan Close messages chan. -func (p *OrderPublisher) CloseChan() { - if err := p.amqpChan.Close(); err != nil { - p.logger.Error("OrderPublisher CloseChan: %v", err) - } -} - -// Publish message. -func (p *OrderPublisher) Publish(ctx context.Context, body []byte, contentType string) error { - ch, err := p.amqpConn.Channel() - if err != nil { - return errors.Wrap(err, "CreateChannel") - } - defer ch.Close() - - p.logger.Info("Publishing message Exchange: %s, RoutingKey: %s", p.cfg.RabbitMQ.Exchange, p.cfg.RabbitMQ.RoutingKey) - - if err := ch.PublishWithContext( - ctx, - p.cfg.RabbitMQ.Exchange, - p.cfg.RabbitMQ.RoutingKey, - publishMandatory, - publishImmediate, - amqp.Publishing{ - ContentType: contentType, - DeliveryMode: amqp.Persistent, - MessageId: uuid.New().String(), - Timestamp: time.Now(), - Body: body, - Type: "barista.ordered", //todo - }, - ); err != nil { - return errors.Wrap(err, "ch.Publish") - } - - return nil -} diff --git a/internal/counter/rabbitmq/publisher/options.go b/internal/counter/rabbitmq/publisher/options.go new file mode 100644 index 0000000..665820d --- /dev/null +++ b/internal/counter/rabbitmq/publisher/options.go @@ -0,0 +1,21 @@ +package publisher + +type Option func(*Publisher) + +func ExchangeName(exchangeName string) Option { + return func(p *Publisher) { + p.exchangeName = exchangeName + } +} + +func BindingKey(bindingKey string) Option { + return func(p *Publisher) { + p.bindingKey = bindingKey + } +} + +func MessageTypeName(messageTypeName string) Option { + return func(p *Publisher) { + p.messageTypeName = messageTypeName + } +} diff --git a/internal/counter/rabbitmq/publisher/publisher.go b/internal/counter/rabbitmq/publisher/publisher.go new file mode 100644 index 0000000..cacc1f7 --- /dev/null +++ b/internal/counter/rabbitmq/publisher/publisher.go @@ -0,0 +1,89 @@ +package publisher + +import ( + "context" + "time" + + "github.com/google/uuid" + "github.com/pkg/errors" + amqp "github.com/rabbitmq/amqp091-go" + log "github.com/thangchung/go-coffeeshop/pkg/logger" +) + +const ( + _publishMandatory = false + _publishImmediate = false + + _exchangeName = "orders-exchange" + _bindingKey = "orders-routing-key" + _messageTypeName = "ordered" +) + +type Publisher struct { + exchangeName, bindingKey string + messageTypeName string + amqpChan *amqp.Channel + amqpConn *amqp.Connection + logger *log.Logger +} + +func NewPublisher(amqpConn *amqp.Connection, logger *log.Logger, opts ...Option) (*Publisher, error) { + ch, err := amqpConn.Channel() + if err != nil { + panic(err) + } + defer ch.Close() + + pub := &Publisher{ + amqpConn: amqpConn, + amqpChan: ch, + logger: logger, + exchangeName: _exchangeName, + bindingKey: _bindingKey, + messageTypeName: _messageTypeName, + } + + for _, opt := range opts { + opt(pub) + } + + return pub, nil +} + +// CloseChan Close messages chan. +func (p *Publisher) CloseChan() { + if err := p.amqpChan.Close(); err != nil { + p.logger.Error("Publisher CloseChan: %v", err) + } +} + +// Publish message. +func (p *Publisher) Publish(ctx context.Context, body []byte, contentType string) error { + ch, err := p.amqpConn.Channel() + if err != nil { + return errors.Wrap(err, "CreateChannel") + } + defer ch.Close() + + p.logger.Info("Publishing message Exchange: %s, RoutingKey: %s", p.exchangeName, p.bindingKey) + + if err := ch.PublishWithContext( + ctx, + p.exchangeName, + p.bindingKey, + _publishMandatory, + _publishImmediate, + amqp.Publishing{ + ContentType: contentType, + DeliveryMode: amqp.Persistent, + MessageId: uuid.New().String(), + Timestamp: time.Now(), + Body: body, + Type: p.messageTypeName, //"barista.ordered", + }, + ); err != nil { + return errors.Wrap(err, "ch.Publish") + } + + return nil +}