From d83a5f2daae8ab38c42d786e5e00f0a082ea9be8 Mon Sep 17 00:00:00 2001 From: thangchung Date: Mon, 21 Nov 2022 13:57:14 +0700 Subject: [PATCH] add more codes for kitchen-svc #3 --- cmd/barista/config.yml | 6 ++- cmd/barista/config/config.go | 6 +++ cmd/counter/config.yml | 4 +- cmd/kitchen/config.yml | 6 ++- cmd/kitchen/config/config.go | 6 +++ internal/barista/app/app.go | 19 ++++++- internal/barista/domain/interfaces.go | 11 ++++ internal/barista/domain/order.go | 17 +++++++ .../orders/eventhandlers/barista_ordered.go | 26 ++++++++-- .../features/orders/repo/orders_postgres.go | 50 ++++++++++++++++++ internal/counter/domain/order.go | 2 - .../features/orders/repo/orders_postgres.go | 1 + internal/counter/grpc/counter_server.go | 1 + internal/kitchen/app/app.go | 19 ++++++- internal/kitchen/domain/interfaces.go | 11 ++++ internal/kitchen/domain/order.go | 18 +++++++ .../orders/eventhandlers/kitchen_ordered.go | 27 ++++++++-- .../features/orders/repo/orders_postgres.go | 51 +++++++++++++++++++ 18 files changed, 263 insertions(+), 18 deletions(-) create mode 100644 internal/barista/domain/interfaces.go create mode 100644 internal/barista/domain/order.go create mode 100644 internal/barista/features/orders/repo/orders_postgres.go create mode 100644 internal/kitchen/domain/interfaces.go create mode 100644 internal/kitchen/domain/order.go create mode 100644 internal/kitchen/features/orders/repo/orders_postgres.go diff --git a/cmd/barista/config.yml b/cmd/barista/config.yml index 4b4665d..cdc63fa 100755 --- a/cmd/barista/config.yml +++ b/cmd/barista/config.yml @@ -6,8 +6,12 @@ http: host: '0.0.0.0' port: 5002 +postgres: + pool_max: 2 + url: postgres://postgres:P@ssw0rd@172.28.240.102:5432/postgres?sslmode=disable + rabbit_mq: - url: amqp://guest:guest@172.28.255.101:5672/ + url: amqp://guest:guest@172.28.240.102:5672/ logger: log_level: 'debug' diff --git a/cmd/barista/config/config.go b/cmd/barista/config/config.go index 53832da..9d999d7 100755 --- a/cmd/barista/config/config.go +++ b/cmd/barista/config/config.go @@ -14,9 +14,15 @@ type ( configs.App `yaml:"app"` configs.HTTP `yaml:"http"` configs.Log `yaml:"logger"` + PG `yaml:"postgres"` RabbitMQ `yaml:"rabbit_mq"` } + PG struct { + PoolMax int `env-required:"true" yaml:"pool_max" env:"PG_POOL_MAX"` + URL string `env-required:"true" yaml:"url" env:"PG_URL"` + } + RabbitMQ struct { URL string `env-required:"true" yaml:"url" env:"RABBITMQ_URL"` } diff --git a/cmd/counter/config.yml b/cmd/counter/config.yml index 99904e7..15e5a50 100755 --- a/cmd/counter/config.yml +++ b/cmd/counter/config.yml @@ -8,10 +8,10 @@ http: postgres: pool_max: 2 - url: postgres://postgres:P@ssw0rd@172.28.255.101:5432/postgres?sslmode=disable + url: postgres://postgres:P@ssw0rd@172.28.240.102:5432/postgres?sslmode=disable rabbit_mq: - url: amqp://guest:guest@172.28.255.101:5672/ + url: amqp://guest:guest@172.28.240.102:5672/ product_client: url: 0.0.0.0:5001 diff --git a/cmd/kitchen/config.yml b/cmd/kitchen/config.yml index c884bbf..4faf26b 100755 --- a/cmd/kitchen/config.yml +++ b/cmd/kitchen/config.yml @@ -6,8 +6,12 @@ http: host: '0.0.0.0' port: 5004 +postgres: + pool_max: 2 + url: postgres://postgres:P@ssw0rd@172.28.240.102:5432/postgres?sslmode=disable + rabbit_mq: - url: amqp://guest:guest@172.28.255.101:5672/ + url: amqp://guest:guest@172.28.240.102:5672/ logger: log_level: 'debug' diff --git a/cmd/kitchen/config/config.go b/cmd/kitchen/config/config.go index 53832da..9d999d7 100755 --- a/cmd/kitchen/config/config.go +++ b/cmd/kitchen/config/config.go @@ -14,9 +14,15 @@ type ( configs.App `yaml:"app"` configs.HTTP `yaml:"http"` configs.Log `yaml:"logger"` + PG `yaml:"postgres"` RabbitMQ `yaml:"rabbit_mq"` } + PG struct { + PoolMax int `env-required:"true" yaml:"pool_max" env:"PG_POOL_MAX"` + URL string `env-required:"true" yaml:"url" env:"PG_URL"` + } + RabbitMQ struct { URL string `env-required:"true" yaml:"url" env:"RABBITMQ_URL"` } diff --git a/internal/barista/app/app.go b/internal/barista/app/app.go index 0498a11..1fc79e7 100644 --- a/internal/barista/app/app.go +++ b/internal/barista/app/app.go @@ -12,8 +12,10 @@ import ( "github.com/rabbitmq/amqp091-go" "github.com/thangchung/go-coffeeshop/cmd/barista/config" "github.com/thangchung/go-coffeeshop/internal/barista/features/orders/eventhandlers" + "github.com/thangchung/go-coffeeshop/internal/barista/features/orders/repo" "github.com/thangchung/go-coffeeshop/pkg/event" mylogger "github.com/thangchung/go-coffeeshop/pkg/logger" + "github.com/thangchung/go-coffeeshop/pkg/postgres" "github.com/thangchung/go-coffeeshop/pkg/rabbitmq" "github.com/thangchung/go-coffeeshop/pkg/rabbitmq/consumer" "github.com/thangchung/go-coffeeshop/pkg/rabbitmq/publisher" @@ -41,6 +43,18 @@ func (a *App) Run() error { ctx, cancel := context.WithCancel(context.Background()) + // PostgresDB + pg, err := postgres.NewPostgresDB(a.cfg.PG.URL, postgres.MaxPoolSize(a.cfg.PG.PoolMax)) + if err != nil { + a.logger.Fatal("app - Run - postgres.NewPostgres: %s", err.Error()) + + cancel() + + return err + } + defer pg.Close() + + // rabbitmq amqpConn, err := rabbitmq.NewRabbitMQConn(a.cfg.RabbitMQ.URL, a.logger) if err != nil { cancel() @@ -65,8 +79,11 @@ func (a *App) Run() error { return errors.Wrap(err, "publisher-Counter-NewOrderPublisher") } + // repository + orderRepo := repo.NewOrderRepo(pg) + // event handlers. - a.handler = eventhandlers.NewBaristaOrderedEventHandler(counterOrderPub) + a.handler = eventhandlers.NewBaristaOrderedEventHandler(orderRepo, counterOrderPub) // consumers consumer, err := consumer.NewConsumer( diff --git a/internal/barista/domain/interfaces.go b/internal/barista/domain/interfaces.go new file mode 100644 index 0000000..b7b56ca --- /dev/null +++ b/internal/barista/domain/interfaces.go @@ -0,0 +1,11 @@ +package domain + +import ( + "context" +) + +type ( + OrderRepo interface { + Create(context.Context, *BaristaOrder) error + } +) diff --git a/internal/barista/domain/order.go b/internal/barista/domain/order.go new file mode 100644 index 0000000..809d009 --- /dev/null +++ b/internal/barista/domain/order.go @@ -0,0 +1,17 @@ +package domain + +import ( + "time" + + "github.com/google/uuid" + "github.com/thangchung/go-coffeeshop/proto/gen" +) + +type BaristaOrder struct { + ID uuid.UUID `json:"id" db:"id"` + ItemName string `json:"itemName" db:"item_name"` + ItemType gen.ItemType `json:"itemType" db:"item_type"` + TimeUp time.Time `json:"timeUp" db:"time_up"` + Created time.Time `json:"created" db:"created"` + Updated time.Time `json:"updated" db:"updated"` +} diff --git a/internal/barista/features/orders/eventhandlers/barista_ordered.go b/internal/barista/features/orders/eventhandlers/barista_ordered.go index 516f863..b2074cb 100644 --- a/internal/barista/features/orders/eventhandlers/barista_ordered.go +++ b/internal/barista/features/orders/eventhandlers/barista_ordered.go @@ -7,6 +7,7 @@ import ( "time" "github.com/pkg/errors" + "github.com/thangchung/go-coffeeshop/internal/barista/domain" "github.com/thangchung/go-coffeeshop/pkg/event" "github.com/thangchung/go-coffeeshop/pkg/rabbitmq/publisher" "github.com/thangchung/go-coffeeshop/proto/gen" @@ -19,11 +20,13 @@ type BaristaOrderedEventHandler interface { var _ BaristaOrderedEventHandler = (*baristaOrderedEventHandler)(nil) type baristaOrderedEventHandler struct { + repo domain.OrderRepo counterPub *publisher.Publisher } -func NewBaristaOrderedEventHandler(counterPub *publisher.Publisher) BaristaOrderedEventHandler { +func NewBaristaOrderedEventHandler(repo domain.OrderRepo, counterPub *publisher.Publisher) BaristaOrderedEventHandler { return &baristaOrderedEventHandler{ + repo: repo, counterPub: counterPub, } } @@ -31,11 +34,24 @@ func NewBaristaOrderedEventHandler(counterPub *publisher.Publisher) BaristaOrder func (h *baristaOrderedEventHandler) Handle(ctx context.Context, e *event.BaristaOrdered) error { fmt.Println(e) + timeIn := time.Now() + delay := calculateDelay(e.ItemType) time.Sleep(delay) - // todo: save to db - // ... + timeUp := time.Now().Add(delay) + + err := h.repo.Create(ctx, &domain.BaristaOrder{ + ID: e.ItemLineID, + ItemType: e.ItemType, + ItemName: e.ItemType.String(), + TimeUp: timeUp, + Created: time.Now(), + Updated: time.Now(), + }) + if err != nil { + return errors.Wrap(err, "baristaOrderedEventHandler-h.repo.Create") + } message := event.BaristaOrderUpdated{ OrderID: e.OrderID, @@ -43,8 +59,8 @@ func (h *baristaOrderedEventHandler) Handle(ctx context.Context, e *event.Barist Name: e.ItemType.String(), ItemType: e.ItemType, MadeBy: "teesee", - TimeIn: time.Now(), - TimeUp: time.Now().Add(5 * time.Minute), + TimeIn: timeIn, + TimeUp: timeUp, } eventBytes, err := json.Marshal(message) diff --git a/internal/barista/features/orders/repo/orders_postgres.go b/internal/barista/features/orders/repo/orders_postgres.go new file mode 100644 index 0000000..414acbf --- /dev/null +++ b/internal/barista/features/orders/repo/orders_postgres.go @@ -0,0 +1,50 @@ +package repo + +import ( + "context" + + "github.com/pkg/errors" + "github.com/thangchung/go-coffeeshop/internal/barista/domain" + "github.com/thangchung/go-coffeeshop/pkg/postgres" +) + +var _ domain.OrderRepo = (*orderRepo)(nil) + +type orderRepo struct { + pg *postgres.Postgres +} + +func NewOrderRepo(pg *postgres.Postgres) domain.OrderRepo { + return &orderRepo{pg: pg} +} + +func (d *orderRepo) Create(ctx context.Context, baristaOrder *domain.BaristaOrder) error { + tx, err := d.pg.Pool.Begin(ctx) + if err != nil { + return errors.Wrapf(err, "orderRepo-Create-d.pg.Pool.Begin(ctx)") + } + + // insert order + sql, args, err := d.pg.Builder. + Insert(`"barista".barista_orders`). + Columns("id", "item_type", "item_name", "time_up", "created", "updated"). + Values( + baristaOrder.ID, + baristaOrder.ItemType, + baristaOrder.ItemName, + baristaOrder.TimeUp, + baristaOrder.Created, + baristaOrder.Updated, + ). + ToSql() + if err != nil { + return tx.Rollback(ctx) + } + + _, err = d.pg.Pool.Exec(ctx, sql, args...) + if err != nil { + return tx.Rollback(ctx) + } + + return tx.Commit(ctx) +} diff --git a/internal/counter/domain/order.go b/internal/counter/domain/order.go index 4286399..d28643e 100644 --- a/internal/counter/domain/order.go +++ b/internal/counter/domain/order.go @@ -155,7 +155,6 @@ func publishBaristaOrderEvent( ) error { if isBarista { // todo: refactor to event domain dispatcher - // ... event := events.BaristaOrdered{ OrderID: orderID, ItemLineID: lineItemID, @@ -175,7 +174,6 @@ func publishBaristaOrderEvent( return nil } else { // todo: refactor to event domain dispatcher - // ... event := events.KitchenOrdered{ OrderID: orderID, ItemLineID: lineItemID, diff --git a/internal/counter/features/orders/repo/orders_postgres.go b/internal/counter/features/orders/repo/orders_postgres.go index d98a45e..d2a5cd1 100644 --- a/internal/counter/features/orders/repo/orders_postgres.go +++ b/internal/counter/features/orders/repo/orders_postgres.go @@ -117,6 +117,7 @@ func (d *orderRepo) GetAll(ctx context.Context) ([]*domain.Order, error) { Price: ol.Price, ItemStatus: ol.ItemStatus, IsBaristaOrder: ol.IsBaristaOrder, + OrderID: ol.OrderID, }) } diff --git a/internal/counter/grpc/counter_server.go b/internal/counter/grpc/counter_server.go index 0745d38..8c6d573 100644 --- a/internal/counter/grpc/counter_server.go +++ b/internal/counter/grpc/counter_server.go @@ -74,6 +74,7 @@ func (g *CounterServiceServerImpl) GetListOrderFulfillment( LoyaltyMemberId: entity.LoyaltyMemberID.String(), LineItems: lo.Map(entity.LineItems, func(item *domain.LineItem, _ int) *gen.LineItemDto { return &gen.LineItemDto{ + Id: item.ID.String(), ItemType: item.ItemType, Name: item.Name, Price: float64(item.Price), diff --git a/internal/kitchen/app/app.go b/internal/kitchen/app/app.go index dd1795f..bc49d83 100644 --- a/internal/kitchen/app/app.go +++ b/internal/kitchen/app/app.go @@ -12,8 +12,10 @@ import ( "github.com/rabbitmq/amqp091-go" "github.com/thangchung/go-coffeeshop/cmd/kitchen/config" "github.com/thangchung/go-coffeeshop/internal/kitchen/features/orders/eventhandlers" + "github.com/thangchung/go-coffeeshop/internal/kitchen/features/orders/repo" "github.com/thangchung/go-coffeeshop/pkg/event" mylogger "github.com/thangchung/go-coffeeshop/pkg/logger" + "github.com/thangchung/go-coffeeshop/pkg/postgres" "github.com/thangchung/go-coffeeshop/pkg/rabbitmq" "github.com/thangchung/go-coffeeshop/pkg/rabbitmq/consumer" "github.com/thangchung/go-coffeeshop/pkg/rabbitmq/publisher" @@ -41,6 +43,18 @@ func (a *App) Run() error { ctx, cancel := context.WithCancel(context.Background()) + // PostgresDB + pg, err := postgres.NewPostgresDB(a.cfg.PG.URL, postgres.MaxPoolSize(a.cfg.PG.PoolMax)) + if err != nil { + a.logger.Fatal("app - Run - postgres.NewPostgres: %s", err.Error()) + + cancel() + + return err + } + defer pg.Close() + + // rabbitmq amqpConn, err := rabbitmq.NewRabbitMQConn(a.cfg.RabbitMQ.URL, a.logger) if err != nil { cancel() @@ -65,8 +79,11 @@ func (a *App) Run() error { return errors.Wrap(err, "publisher-Counter-NewOrderPublisher") } + // repository + orderRepo := repo.NewOrderRepo(pg) + // event handlers. - a.handler = eventhandlers.NewKitchenOrderedEventHandler(counterOrderPub) + a.handler = eventhandlers.NewKitchenOrderedEventHandler(orderRepo, counterOrderPub) // consumers consumer, err := consumer.NewConsumer( diff --git a/internal/kitchen/domain/interfaces.go b/internal/kitchen/domain/interfaces.go new file mode 100644 index 0000000..16101cb --- /dev/null +++ b/internal/kitchen/domain/interfaces.go @@ -0,0 +1,11 @@ +package domain + +import ( + "context" +) + +type ( + OrderRepo interface { + Create(context.Context, *KitchenOrder) error + } +) diff --git a/internal/kitchen/domain/order.go b/internal/kitchen/domain/order.go new file mode 100644 index 0000000..da7b8ad --- /dev/null +++ b/internal/kitchen/domain/order.go @@ -0,0 +1,18 @@ +package domain + +import ( + "time" + + "github.com/google/uuid" + "github.com/thangchung/go-coffeeshop/proto/gen" +) + +type KitchenOrder struct { + ID uuid.UUID `json:"id" db:"id"` + OrderID uuid.UUID `json:"orderId" db:"order_id"` + ItemName string `json:"itemName" db:"item_name"` + ItemType gen.ItemType `json:"itemType" db:"item_type"` + TimeUp time.Time `json:"timeUp" db:"time_up"` + Created time.Time `json:"created" db:"created"` + Updated time.Time `json:"updated" db:"updated"` +} diff --git a/internal/kitchen/features/orders/eventhandlers/kitchen_ordered.go b/internal/kitchen/features/orders/eventhandlers/kitchen_ordered.go index 3a725a0..8b362fc 100644 --- a/internal/kitchen/features/orders/eventhandlers/kitchen_ordered.go +++ b/internal/kitchen/features/orders/eventhandlers/kitchen_ordered.go @@ -7,6 +7,7 @@ import ( "time" "github.com/pkg/errors" + "github.com/thangchung/go-coffeeshop/internal/kitchen/domain" "github.com/thangchung/go-coffeeshop/pkg/event" "github.com/thangchung/go-coffeeshop/pkg/rabbitmq/publisher" "github.com/thangchung/go-coffeeshop/proto/gen" @@ -19,11 +20,13 @@ type KitchenOrderedEventHandler interface { var _ KitchenOrderedEventHandler = (*kitchenOrderedEventHandler)(nil) type kitchenOrderedEventHandler struct { + repo domain.OrderRepo counterPub *publisher.Publisher } -func NewKitchenOrderedEventHandler(counterPub *publisher.Publisher) KitchenOrderedEventHandler { +func NewKitchenOrderedEventHandler(repo domain.OrderRepo, counterPub *publisher.Publisher) KitchenOrderedEventHandler { return &kitchenOrderedEventHandler{ + repo: repo, counterPub: counterPub, } } @@ -31,11 +34,25 @@ func NewKitchenOrderedEventHandler(counterPub *publisher.Publisher) KitchenOrder func (h *kitchenOrderedEventHandler) Handle(ctx context.Context, e *event.KitchenOrdered) error { fmt.Println(e) + timeIn := time.Now() + delay := calculateDelay(e.ItemType) time.Sleep(delay) - // todo: save to db - // ... + timeUp := time.Now().Add(delay) + + err := h.repo.Create(ctx, &domain.KitchenOrder{ + ID: e.ItemLineID, + OrderID: e.OrderID, + ItemType: e.ItemType, + ItemName: e.ItemType.String(), + TimeUp: timeUp, + Created: time.Now(), + Updated: time.Now(), + }) + if err != nil { + return errors.Wrap(err, "kitchenOrderedEventHandler-h.repo.Create") + } message := event.KitchenOrderUpdated{ OrderID: e.OrderID, @@ -43,8 +60,8 @@ func (h *kitchenOrderedEventHandler) Handle(ctx context.Context, e *event.Kitche Name: e.ItemType.String(), ItemType: e.ItemType, MadeBy: "teesee", - TimeIn: time.Now(), - TimeUp: time.Now().Add(5 * time.Minute), + TimeIn: timeIn, + TimeUp: timeUp, } eventBytes, err := json.Marshal(message) diff --git a/internal/kitchen/features/orders/repo/orders_postgres.go b/internal/kitchen/features/orders/repo/orders_postgres.go new file mode 100644 index 0000000..573fcda --- /dev/null +++ b/internal/kitchen/features/orders/repo/orders_postgres.go @@ -0,0 +1,51 @@ +package repo + +import ( + "context" + + "github.com/pkg/errors" + "github.com/thangchung/go-coffeeshop/internal/kitchen/domain" + "github.com/thangchung/go-coffeeshop/pkg/postgres" +) + +var _ domain.OrderRepo = (*orderRepo)(nil) + +type orderRepo struct { + pg *postgres.Postgres +} + +func NewOrderRepo(pg *postgres.Postgres) domain.OrderRepo { + return &orderRepo{pg: pg} +} + +func (d *orderRepo) Create(ctx context.Context, kitchenOrder *domain.KitchenOrder) error { + tx, err := d.pg.Pool.Begin(ctx) + if err != nil { + return errors.Wrapf(err, "orderRepo-Create-d.pg.Pool.Begin(ctx)") + } + + // insert order + sql, args, err := d.pg.Builder. + Insert(`"kitchen".kitchen_orders`). + Columns("id", "order_id", "item_type", "item_name", "time_up", "created", "updated"). + Values( + kitchenOrder.ID, + kitchenOrder.OrderID, + kitchenOrder.ItemType, + kitchenOrder.ItemName, + kitchenOrder.TimeUp, + kitchenOrder.Created, + kitchenOrder.Updated, + ). + ToSql() + if err != nil { + return tx.Rollback(ctx) + } + + _, err = d.pg.Pool.Exec(ctx, sql, args...) + if err != nil { + return tx.Rollback(ctx) + } + + return tx.Commit(ctx) +}