From 6b993dac8367f0a190f8422e55d1a3a31a0811c1 Mon Sep 17 00:00:00 2001 From: thangchung Date: Mon, 14 Nov 2022 21:58:14 +0700 Subject: [PATCH] add more data code #3 --- docker-compose-full.yaml | 2 + internal/counter/app/app.go | 44 ++-- internal/counter/app/migrate.go | 23 +- internal/counter/domain/interfaces.go | 26 ++- internal/counter/domain/order.go | 36 ++- .../barista_order_updated_event_handler.go | 24 -- .../counter/features/order_fulfillment.go | 36 --- .../features/orders/command/command.go | 39 ++++ .../eventhandlers/barista_order_updated.go | 63 +++++ .../counter/features/orders/query/query.go | 30 +++ .../features/orders/repo/orders_postgres.go | 219 ++++++++++++++++++ .../repo/order_fulfillment_postgres.go | 51 ---- internal/counter/grpc/counter_server.go | 63 +++-- internal/counter/grpc/product_client.go | 19 +- pkg/utils/utils.go | 12 + proto/counter.proto | 11 +- proto/gen/counter.pb.go | 31 ++- 17 files changed, 549 insertions(+), 180 deletions(-) delete mode 100644 internal/counter/features/eventhandlers/barista_order_updated_event_handler.go delete mode 100644 internal/counter/features/order_fulfillment.go create mode 100644 internal/counter/features/orders/command/command.go create mode 100644 internal/counter/features/orders/eventhandlers/barista_order_updated.go create mode 100644 internal/counter/features/orders/query/query.go create mode 100644 internal/counter/features/orders/repo/orders_postgres.go delete mode 100644 internal/counter/features/repo/order_fulfillment_postgres.go create mode 100644 pkg/utils/utils.go diff --git a/docker-compose-full.yaml b/docker-compose-full.yaml index 29fc66d..6548d60 100755 --- a/docker-compose-full.yaml +++ b/docker-compose-full.yaml @@ -68,6 +68,7 @@ services: image: go-coffeeshop-counter environment: APP_NAME: 'counter-service in docker' + IN_DOCKER: "true" PG_URL: postgres://postgres:P@ssw0rd@postgres:5432/postgres RABBITMQ_URL: amqp://guest:guest@rabbitmq:5672/ PRODUCT_CLIENT_URL: product:5001 @@ -88,6 +89,7 @@ services: image: go-coffeeshop-barista environment: APP_NAME: 'barista-service in docker' + IN_DOCKER: "true" PG_URL: postgres://postgres:P@ssw0rd@postgres:5432/postgres RABBITMQ_URL: amqp://guest:guest@rabbitmq:5672/ depends_on: diff --git a/internal/counter/app/app.go b/internal/counter/app/app.go index 85f8a72..6b5b06b 100644 --- a/internal/counter/app/app.go +++ b/internal/counter/app/app.go @@ -10,15 +10,16 @@ import ( "github.com/rabbitmq/amqp091-go" "github.com/thangchung/go-coffeeshop/cmd/counter/config" "github.com/thangchung/go-coffeeshop/internal/counter/domain" - "github.com/thangchung/go-coffeeshop/internal/counter/features" - "github.com/thangchung/go-coffeeshop/internal/counter/features/eventhandlers" - "github.com/thangchung/go-coffeeshop/internal/counter/features/repo" + "github.com/thangchung/go-coffeeshop/internal/counter/features/orders/command" + "github.com/thangchung/go-coffeeshop/internal/counter/features/orders/eventhandlers" + "github.com/thangchung/go-coffeeshop/internal/counter/features/orders/query" + "github.com/thangchung/go-coffeeshop/internal/counter/features/orders/repo" counterGrpc "github.com/thangchung/go-coffeeshop/internal/counter/grpc" "github.com/thangchung/go-coffeeshop/pkg/event" mylogger "github.com/thangchung/go-coffeeshop/pkg/logger" "github.com/thangchung/go-coffeeshop/pkg/postgres" "github.com/thangchung/go-coffeeshop/pkg/rabbitmq" - "github.com/thangchung/go-coffeeshop/pkg/rabbitmq/consumer" + rabConsumer "github.com/thangchung/go-coffeeshop/pkg/rabbitmq/consumer" "github.com/thangchung/go-coffeeshop/pkg/rabbitmq/publisher" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -29,7 +30,7 @@ type App struct { cfg *config.Config network string address string - handler eventhandlers.BaristaOrderUpdatedEventHandler + handler domain.BaristaOrderUpdatedEventHandler } func New(log *mylogger.Logger, cfg *config.Config) *App { @@ -51,6 +52,8 @@ func (a *App) Run() error { if err != nil { a.logger.Fatal("app - Run - postgres.NewPostgres: %s", err.Error()) + cancel() + return err } defer pg.Close() @@ -60,6 +63,8 @@ func (a *App) Run() error { if err != nil { a.logger.Fatal("app - Run - rabbitmq.NewRabbitMQConn: %s", err.Error()) + cancel() + return err } defer amqpConn.Close() @@ -67,6 +72,8 @@ func (a *App) Run() error { // gRPC Client conn, err := grpc.Dial(a.cfg.ProductClient.URL, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { + cancel() + return err } defer conn.Close() @@ -81,6 +88,8 @@ func (a *App) Run() error { defer baristaOrderPub.CloseChan() if err != nil { + cancel() + return errors.Wrap(err, "counterRabbitMQ-Barista-NewOrderPublisher") } @@ -94,27 +103,31 @@ func (a *App) Run() error { defer kitchenOrderPub.CloseChan() if err != nil { + cancel() + return errors.Wrap(err, "counterRabbitMQ-Kitchen-NewOrderPublisher") } a.logger.Info("Order Publisher initialized") - var productDomainSvc domain.ProductDomainService = counterGrpc.NewProductServiceClient(ctx, conn) + // domain service + productDomainSvc := counterGrpc.NewProductDomainService(conn) - // Use case - queryOrderFulfillmentUC := features.NewQueryOrderFulfillmentUseCase(ctx, repo.NewQueryOrderFulfillmentRepo(ctx, pg)) + // CQRS components + orderQuery := query.NewOrderQuery(ctx, repo.NewOrderRepo(pg)) + orderCommand := command.NewOrderCommand(ctx, repo.NewOrderRepo(pg)) // event handlers. - a.handler = eventhandlers.NewBaristaOrderUpdatedEventHandler() + a.handler = eventhandlers.NewBaristaOrderUpdatedEventHandler(repo.NewOrderRepo(pg)) // consumers - consumer, err := consumer.NewConsumer( + consumer, err := rabConsumer.NewConsumer( amqpConn, a.logger, - consumer.ExchangeName("counter-order-exchange"), - consumer.QueueName("counter-order-queue"), - consumer.BindingKey("counter-order-routing-key"), - consumer.ConsumerTag("counter-order-consumer"), + rabConsumer.ExchangeName("counter-order-exchange"), + rabConsumer.QueueName("counter-order-queue"), + rabConsumer.BindingKey("counter-order-routing-key"), + rabConsumer.ConsumerTag("counter-order-consumer"), ) if err != nil { @@ -149,7 +162,8 @@ func (a *App) Run() error { amqpConn, a.cfg, a.logger, - queryOrderFulfillmentUC, + orderCommand, + orderQuery, productDomainSvc, *baristaOrderPub, *kitchenOrderPub, diff --git a/internal/counter/app/migrate.go b/internal/counter/app/migrate.go index 52dabba..aee67e5 100644 --- a/internal/counter/app/migrate.go +++ b/internal/counter/app/migrate.go @@ -7,6 +7,7 @@ import ( "fmt" "os" "path/filepath" + "strconv" "time" "github.com/golang-migrate/migrate/v4" @@ -18,8 +19,11 @@ import ( ) const ( - _defaultAttempts = 5 - _defaultTimeout = time.Second + _defaultAttempts = 5 + _defaultTimeout = time.Second +) + +var ( _migrationFilePath = "db/migrations" ) @@ -38,12 +42,19 @@ func init() { ) for attempts > 0 { - cur, _ := os.Getwd() - dir := filepath.Dir(cur + "/../../..") + inDocker, ok := os.LookupEnv("IN_DOCKER") + if !ok || len(inDocker) == 0 { + glog.Fatalf("migrate: environment variable not declared: IN_DOCKER") + } - glog.Infoln(fmt.Sprintf("file://%s", dir)) + dir := fmt.Sprintf("file://%s", _migrationFilePath) + if dockered, _ := strconv.ParseBool(inDocker); !dockered { + cur, _ := os.Getwd() + dir = fmt.Sprintf("file://%s/%s", filepath.Dir(cur+"/../../.."), _migrationFilePath) + } - m, err = migrate.New(fmt.Sprintf("file://%s/%s", dir, _migrationFilePath), databaseURL) + glog.Infoln(dir) + m, err = migrate.New(dir, databaseURL) if err == nil { break } diff --git a/internal/counter/domain/interfaces.go b/internal/counter/domain/interfaces.go index 23f184a..79a1552 100644 --- a/internal/counter/domain/interfaces.go +++ b/internal/counter/domain/interfaces.go @@ -1,15 +1,35 @@ package domain import ( + "context" + + "github.com/google/uuid" + "github.com/thangchung/go-coffeeshop/pkg/event" gen "github.com/thangchung/go-coffeeshop/proto/gen" ) type ( - QueryOrderFulfillmentRepo interface { - GetListOrderFulfillment() ([]gen.OrderDto, error) + OrderRepo interface { + GetAll(context.Context) ([]gen.OrderDto, error) + GetOrderByID(context.Context, uuid.UUID) (*Order, error) + Create(context.Context, *gen.OrderDto) error + Update(context.Context, *gen.OrderDto) (*gen.OrderDto, error) } ProductDomainService interface { - GetItemsByType(*gen.PlaceOrderRequest, bool) (*gen.GetItemsByTypeResponse, error) + GetItemsByType(context.Context, *gen.PlaceOrderRequest, bool) (*gen.GetItemsByTypeResponse, error) + } + + OrderCommand interface { + Create(context.Context, *gen.OrderDto) error + Update(context.Context, *gen.OrderDto) (*gen.OrderDto, error) + } + + OrderQuery interface { + GetAll(ctx context.Context) ([]gen.OrderDto, error) + } + + BaristaOrderUpdatedEventHandler interface { + Handle(context.Context, *event.BaristaOrderUpdated) error } ) diff --git a/internal/counter/domain/order.go b/internal/counter/domain/order.go index df83be8..d810327 100644 --- a/internal/counter/domain/order.go +++ b/internal/counter/domain/order.go @@ -54,7 +54,7 @@ func CreateOrderFrom( numberOfKitchenItems := len(request.KitchenItems) > 0 if numberOfBaristaItems { - itemTypesRes, err := productDomainSvc.GetItemsByType(request, true) + itemTypesRes, err := productDomainSvc.GetItemsByType(ctx, request, true) if err != nil { return nil, err } @@ -87,7 +87,7 @@ func CreateOrderFrom( } if numberOfKitchenItems { - itemTypesRes, err := productDomainSvc.GetItemsByType(request, false) + itemTypesRes, err := productDomainSvc.GetItemsByType(ctx, request, false) if err != nil { return nil, err } @@ -122,6 +122,28 @@ func CreateOrderFrom( return order, nil } +func (o *Order) Apply(event *events.BaristaOrderUpdated) error { + if len(o.LineItems) == 0 { + return nil // we dont do anything + } + + _, index, ok := lo.FindIndexOf(o.LineItems, func(i LineItem) bool { + return i.ItemType == event.ItemType + }) + + if !ok { + return errors.New("item not found") + } + + o.LineItems[index].ItemStatus = gen.Status_FULFILLED + + if checkFulfilledStatus(o.LineItems) { + o.OrderStatus = gen.Status_FULFILLED + } + + return nil +} + func publishBaristaOrderEvent( ctx context.Context, orderID uuid.UUID, @@ -173,3 +195,13 @@ func publishBaristaOrderEvent( return nil } } + +func checkFulfilledStatus(lineItems []LineItem) bool { + for _, item := range lineItems { + if item.ItemStatus != gen.Status_FULFILLED { + return false + } + } + + return true +} diff --git a/internal/counter/features/eventhandlers/barista_order_updated_event_handler.go b/internal/counter/features/eventhandlers/barista_order_updated_event_handler.go deleted file mode 100644 index ca8d4f0..0000000 --- a/internal/counter/features/eventhandlers/barista_order_updated_event_handler.go +++ /dev/null @@ -1,24 +0,0 @@ -package eventhandlers - -import ( - "context" - "fmt" - - "github.com/thangchung/go-coffeeshop/pkg/event" -) - -type BaristaOrderUpdatedEventHandler interface { - Handle(context.Context, *event.BaristaOrderUpdated) error -} - -type DefaultBaristaOrderUpdatedEventHandler struct{} - -func NewBaristaOrderUpdatedEventHandler() *DefaultBaristaOrderUpdatedEventHandler { - return &DefaultBaristaOrderUpdatedEventHandler{} -} - -func (h *DefaultBaristaOrderUpdatedEventHandler) Handle(ctx context.Context, e *event.BaristaOrderUpdated) error { - fmt.Println(e) - - return nil -} diff --git a/internal/counter/features/order_fulfillment.go b/internal/counter/features/order_fulfillment.go deleted file mode 100644 index 9dadaef..0000000 --- a/internal/counter/features/order_fulfillment.go +++ /dev/null @@ -1,36 +0,0 @@ -package features - -import ( - "context" - "fmt" - - "github.com/thangchung/go-coffeeshop/internal/counter/domain" - gen "github.com/thangchung/go-coffeeshop/proto/gen" -) - -type ( - QueryOrderFulfillmentUseCase interface { - GetListOrderFulfillment() ([]gen.OrderDto, error) - } -) - -type DefaultQueryOrderFulfillmentUseCase struct { - ctx context.Context - repo domain.QueryOrderFulfillmentRepo -} - -func NewQueryOrderFulfillmentUseCase(ctx context.Context, r domain.QueryOrderFulfillmentRepo) *DefaultQueryOrderFulfillmentUseCase { - return &DefaultQueryOrderFulfillmentUseCase{ - ctx: ctx, - repo: r, - } -} - -func (d *DefaultQueryOrderFulfillmentUseCase) GetListOrderFulfillment() ([]gen.OrderDto, error) { - entities, err := d.repo.GetListOrderFulfillment() - if err != nil { - return nil, fmt.Errorf("NewQueryOrderFulfillmentUseCase - GetListOrderFulfillment - s.repo.GetListOrderFulfillment: %w", err) - } - - return entities, nil -} diff --git a/internal/counter/features/orders/command/command.go b/internal/counter/features/orders/command/command.go new file mode 100644 index 0000000..6299ff8 --- /dev/null +++ b/internal/counter/features/orders/command/command.go @@ -0,0 +1,39 @@ +package command + +import ( + "context" + "fmt" + + "github.com/thangchung/go-coffeeshop/internal/counter/domain" + "github.com/thangchung/go-coffeeshop/proto/gen" +) + +type orderCommand struct { + repo domain.OrderRepo +} + +var _ domain.OrderCommand = (*orderCommand)(nil) + +func NewOrderCommand(ctx context.Context, r domain.OrderRepo) domain.OrderCommand { + return &orderCommand{ + repo: r, + } +} + +func (d *orderCommand) Create(ctx context.Context, orderModel *gen.OrderDto) error { + err := d.repo.Create(ctx, orderModel) + if err != nil { + return fmt.Errorf("NewOrderCommand-Create-d.repo.Create(ctx, orderModel): %w", err) + } + + return nil +} + +func (d *orderCommand) Update(ctx context.Context, orderModel *gen.OrderDto) (*gen.OrderDto, error) { + order, err := d.repo.Update(ctx, orderModel) + if err != nil { + return nil, fmt.Errorf("NewOrderCommand-Update-d.repo.Update(ctx, orderModel): %w", err) + } + + return order, nil +} diff --git a/internal/counter/features/orders/eventhandlers/barista_order_updated.go b/internal/counter/features/orders/eventhandlers/barista_order_updated.go new file mode 100644 index 0000000..398cb9c --- /dev/null +++ b/internal/counter/features/orders/eventhandlers/barista_order_updated.go @@ -0,0 +1,63 @@ +package eventhandlers + +import ( + "context" + "fmt" + + "github.com/thangchung/go-coffeeshop/internal/counter/domain" + "github.com/thangchung/go-coffeeshop/pkg/event" + "github.com/thangchung/go-coffeeshop/proto/gen" +) + +type baristaOrderUpdatedEventHandler struct { + orderRepo domain.OrderRepo +} + +var _ domain.BaristaOrderUpdatedEventHandler = (*baristaOrderUpdatedEventHandler)(nil) + +func NewBaristaOrderUpdatedEventHandler(orderRepo domain.OrderRepo) domain.BaristaOrderUpdatedEventHandler { + return &baristaOrderUpdatedEventHandler{ + orderRepo: orderRepo, + } +} + +func (h *baristaOrderUpdatedEventHandler) Handle(ctx context.Context, e *event.BaristaOrderUpdated) error { + order, err := h.orderRepo.GetOrderByID(ctx, e.OrderID) + if err != nil { + return fmt.Errorf("NewBaristaOrderUpdatedEventHandler-Handle-h.orderRepo.GetOrderByID(ctx, e.OrderID): %w", err) + } + + if err = order.Apply(e); err != nil { + return fmt.Errorf("NewBaristaOrderUpdatedEventHandler-Handle-order.Apply(e): %w", err) + } + + _, err = h.orderRepo.Update(ctx, ToDto(order)) + if err != nil { + return fmt.Errorf("NewBaristaOrderUpdatedEventHandler-Handle-h.orderRepo.Update(ctx, ToDto(order)): %w", err) + } + + return nil +} + +func ToDto(order *domain.Order) *gen.OrderDto { + orderModel := &gen.OrderDto{ + Id: order.ID.String(), + Localtion: order.Location, + OrderSource: order.OrderSource, + OrderStatus: order.OrderStatus, + LoyaltyMemberId: order.LoyaltyMemberID.String(), + } + + for _, item := range order.LineItems { + orderModel.LineItems = append(orderModel.LineItems, &gen.LineItemDto{ + Id: item.ID.String(), + Name: item.Name, + Price: float64(item.Price), + ItemType: item.ItemType, + ItemStatus: item.ItemStatus, + IsBaristaOrder: item.IsBaristaOrder, + }) + } + + return orderModel +} diff --git a/internal/counter/features/orders/query/query.go b/internal/counter/features/orders/query/query.go new file mode 100644 index 0000000..b79fd23 --- /dev/null +++ b/internal/counter/features/orders/query/query.go @@ -0,0 +1,30 @@ +package query + +import ( + "context" + "fmt" + + "github.com/thangchung/go-coffeeshop/internal/counter/domain" + "github.com/thangchung/go-coffeeshop/proto/gen" +) + +type orderQuery struct { + repo domain.OrderRepo +} + +var _ domain.OrderQuery = (*orderQuery)(nil) + +func NewOrderQuery(ctx context.Context, r domain.OrderRepo) domain.OrderQuery { + return &orderQuery{ + repo: r, + } +} + +func (d *orderQuery) GetAll(ctx context.Context) ([]gen.OrderDto, error) { + ents, err := d.repo.GetAll(ctx) + if err != nil { + return nil, fmt.Errorf("NewOrderQuery - GetAll - d.repo.GetAll(): %w", err) + } + + return ents, nil +} diff --git a/internal/counter/features/orders/repo/orders_postgres.go b/internal/counter/features/orders/repo/orders_postgres.go new file mode 100644 index 0000000..5e3949d --- /dev/null +++ b/internal/counter/features/orders/repo/orders_postgres.go @@ -0,0 +1,219 @@ +package repo + +import ( + "context" + "fmt" + "time" + + "github.com/google/uuid" + "github.com/pkg/errors" + "github.com/thangchung/go-coffeeshop/internal/counter/domain" + "github.com/thangchung/go-coffeeshop/pkg/postgres" + "github.com/thangchung/go-coffeeshop/proto/gen" +) + +const _defaultEntityCap = 64 + +type orderRepo struct { + pg *postgres.Postgres +} + +var _ domain.OrderRepo = (*orderRepo)(nil) + +func NewOrderRepo(pg *postgres.Postgres) domain.OrderRepo { + return &orderRepo{pg: pg} +} + +func (d *orderRepo) GetAll(ctx context.Context) ([]gen.OrderDto, error) { + sql, _, err := d.pg.Builder. + Select("orders.id"). + From(`"order".orders`).Join(`"order".line_items USING(id)`). + ToSql() + if err != nil { + return nil, fmt.Errorf("NewOrderRepo-GetAll-r.Builder: %w", err) + } + + rows, err := d.pg.Pool.Query(ctx, sql) + if err != nil { + return nil, fmt.Errorf("NewOrderRepo-GetAll-r.Pool.Query: %w", err) + } + defer rows.Close() + + entities := make([]gen.OrderDto, 0, _defaultEntityCap) + + for rows.Next() { + o := gen.OrderDto{} + + err = rows.Scan(&o.Id, &o.OrderSource, &o.LoyaltyMemberId, &o.OrderStatus) + if err != nil { + return nil, fmt.Errorf("NewOrderRepo-GetAll-rows.Scan: %w", err) + } + + entities = append(entities, o) + } + + return entities, nil +} + +func (d *orderRepo) GetOrderByID(ctx context.Context, id uuid.UUID) (*domain.Order, error) { + sql, args, err := d.pg.Builder. + Select("o.id, order_source, loyalty_member_id, order_status"). + From(`"order".orders o`).Join(`"order".line_items l ON o.id = l.order_id`). + Where("o.id = ?", id). + GroupBy("o.id"). + ToSql() + if err != nil { + return nil, fmt.Errorf("NewOrderRepo-GetAll-r.Builder: %w", err) + } + + rows, err := d.pg.Pool.Query(ctx, sql, args...) + if err != nil { + return nil, fmt.Errorf("NewOrderRepo-GetAll-r.Pool.Query: %w", err) + } + defer rows.Close() + + orders := make([]domain.Order, 0, _defaultEntityCap) + + for rows.Next() { + o := domain.Order{} + + err = rows.Scan(&o.ID, &o.OrderSource, &o.LoyaltyMemberID, &o.OrderStatus) + if err != nil { + return nil, fmt.Errorf("NewOrderRepo-GetAll-rows.Scan: %w", err) + } + + orders = append(orders, o) + } + + // continue to load order items + order := orders[0] + if len(orders) >= 1 { + sql, args, err := d.pg.Builder. + Select("id, item_type, name, price, item_status, is_barista_order"). + From(`"order".line_items`). + Where("order_id = ?", order.ID). + ToSql() + if err != nil { + return nil, fmt.Errorf("NewOrderRepo-GetAll-r.Builder: %w", err) + } + + rows, err = d.pg.Pool.Query(ctx, sql, args...) + if err != nil { + return nil, fmt.Errorf("NewOrderRepo-GetAll-r.Pool.Query: %w", err) + } + defer rows.Close() + + for rows.Next() { + o := domain.LineItem{} + + err = rows.Scan(&o.ID, &o.ItemType, &o.Name, &o.Price, &o.ItemStatus, &o.IsBaristaOrder) + if err != nil { + return nil, fmt.Errorf("NewOrderRepo-GetAll-rows.Scan: %w", err) + } + + order.LineItems = append(order.LineItems, o) + } + } + + return &order, nil +} + +func (d *orderRepo) Create(ctx context.Context, orderModel *gen.OrderDto) error { + tx, err := d.pg.Pool.Begin(ctx) + if err != nil { + return errors.Wrapf(err, "orderRepo-Create-d.pg.Pool.Begin(ctx)") + } + + // insert order + sql, args, err := d.pg.Builder. + Insert(`"order".orders`). + Columns("id", "order_source", "loyalty_member_id", "order_status", "updated"). + Values( + orderModel.Id, + orderModel.OrderSource, + orderModel.LoyaltyMemberId, + orderModel.OrderStatus, + time.Now(), + ). + ToSql() + if err != nil { + return tx.Rollback(ctx) + } + + _, err = d.pg.Pool.Exec(ctx, sql, args...) + if err != nil { + return tx.Rollback(ctx) + } + + // continue to insert order items + for _, item := range orderModel.LineItems { + sql, args, err = d.pg.Builder. + Insert(`"order".line_items`). + Columns("id", "item_type", "name", "price", "item_status", "is_barista_order", "order_id", "created", "updated"). + Values( + uuid.New(), + item.ItemType, + item.Name, + item.Price, + item.ItemStatus, + item.IsBaristaOrder, + orderModel.Id, + time.Now(), + time.Now(), + ). + ToSql() + if err != nil { + return tx.Rollback(ctx) + } + + _, err = d.pg.Pool.Exec(ctx, sql, args...) + if err != nil { + return tx.Rollback(ctx) + } + } + + return tx.Commit(ctx) +} + +func (d *orderRepo) Update(ctx context.Context, orderModel *gen.OrderDto) (*gen.OrderDto, error) { + tx, err := d.pg.Pool.Begin(ctx) + if err != nil { + return nil, errors.Wrapf(err, "orderRepo-Update-d.pg.Pool.Begin(ctx)") + } + + // update order + sql, args, err := d.pg.Builder. + Update(`"order".orders`). + Set("order_status", orderModel.OrderStatus). + Set("updated", time.Now()). + Where("id = ?", orderModel.Id). + ToSql() + if err != nil { + return nil, tx.Rollback(ctx) + } + + _, err = d.pg.Pool.Exec(ctx, sql, args...) + if err != nil { + return nil, tx.Rollback(ctx) + } + + // continue to update order items + for _, item := range orderModel.LineItems { + sql, args, err = d.pg.Builder. + Update(`"order".line_items`). + Set("item_status", item.ItemStatus). + Set("updated", time.Now()). + Where("id = ?", item.Id). + ToSql() + if err != nil { + return nil, tx.Rollback(ctx) + } + + _, err = d.pg.Pool.Exec(ctx, sql, args...) + if err != nil { + return nil, tx.Rollback(ctx) + } + } + + return orderModel, tx.Commit(ctx) +} diff --git a/internal/counter/features/repo/order_fulfillment_postgres.go b/internal/counter/features/repo/order_fulfillment_postgres.go deleted file mode 100644 index ea2a0af..0000000 --- a/internal/counter/features/repo/order_fulfillment_postgres.go +++ /dev/null @@ -1,51 +0,0 @@ -package repo - -import ( - "context" - "fmt" - - "github.com/thangchung/go-coffeeshop/pkg/postgres" - "github.com/thangchung/go-coffeeshop/proto/gen" -) - -const _defaultEntityCap = 64 - -type DefaultQueryOrderFulfillmentRepo struct { - ctx context.Context - pg *postgres.Postgres -} - -func NewQueryOrderFulfillmentRepo(ctx context.Context, pg *postgres.Postgres) *DefaultQueryOrderFulfillmentRepo { - return &DefaultQueryOrderFulfillmentRepo{ctx: ctx, pg: pg} -} - -func (d *DefaultQueryOrderFulfillmentRepo) GetListOrderFulfillment() ([]gen.OrderDto, error) { - sql, _, err := d.pg.Builder. - Select("orders.id"). - From(`"order".orders`).Join(`"order".line_items USING(id)`). - ToSql() - if err != nil { - return nil, fmt.Errorf("DefaultQueryOrderFulfillmentRepo - GetListOrderFulfillment - r.Builder: %w", err) - } - - rows, err := d.pg.Pool.Query(d.ctx, sql) - if err != nil { - return nil, fmt.Errorf("DefaultQueryOrderFulfillmentRepo - GetListOrderFulfillment - r.Pool.Query: %w", err) - } - defer rows.Close() - - entities := make([]gen.OrderDto, 0, _defaultEntityCap) - - for rows.Next() { - o := gen.OrderDto{} - - err = rows.Scan(&o.Id) - if err != nil { - return nil, fmt.Errorf("DefaultQueryOrderFulfillmentRepo - GetListOrderFulfillment - rows.Scan: %w", err) - } - - entities = append(entities, o) - } - - return entities, nil -} diff --git a/internal/counter/grpc/counter_server.go b/internal/counter/grpc/counter_server.go index b499fe5..a716821 100644 --- a/internal/counter/grpc/counter_server.go +++ b/internal/counter/grpc/counter_server.go @@ -8,7 +8,6 @@ import ( amqp "github.com/rabbitmq/amqp091-go" "github.com/thangchung/go-coffeeshop/cmd/counter/config" "github.com/thangchung/go-coffeeshop/internal/counter/domain" - "github.com/thangchung/go-coffeeshop/internal/counter/features" mylogger "github.com/thangchung/go-coffeeshop/pkg/logger" "github.com/thangchung/go-coffeeshop/pkg/rabbitmq/publisher" gen "github.com/thangchung/go-coffeeshop/proto/gen" @@ -18,13 +17,14 @@ import ( type CounterServiceServerImpl struct { gen.UnimplementedCounterServiceServer - logger *mylogger.Logger - amqpConn *amqp.Connection - cfg *config.Config - productDomainSvc domain.ProductDomainService - queryOrderFulfillmentUC features.QueryOrderFulfillmentUseCase - baristaOrderPub publisher.Publisher - kitchenOrderPub publisher.Publisher + logger *mylogger.Logger + amqpConn *amqp.Connection + cfg *config.Config + productDomainSvc domain.ProductDomainService + orderCommand domain.OrderCommand + orderQuery domain.OrderQuery + baristaOrderPub publisher.Publisher + kitchenOrderPub publisher.Publisher } func NewCounterServiceServerGrpc( @@ -32,19 +32,21 @@ func NewCounterServiceServerGrpc( amqpConn *amqp.Connection, cfg *config.Config, log *mylogger.Logger, - queryOrderFulfillmentUC features.QueryOrderFulfillmentUseCase, + orderCommand domain.OrderCommand, + orderQuery domain.OrderQuery, productDomainSvc domain.ProductDomainService, baristaOrderPub publisher.Publisher, kitchenOrderPub publisher.Publisher, ) { svc := CounterServiceServerImpl{ - cfg: cfg, - logger: log, - amqpConn: amqpConn, - queryOrderFulfillmentUC: queryOrderFulfillmentUC, - productDomainSvc: productDomainSvc, - baristaOrderPub: baristaOrderPub, - kitchenOrderPub: kitchenOrderPub, + cfg: cfg, + logger: log, + amqpConn: amqpConn, + orderCommand: orderCommand, + orderQuery: orderQuery, + productDomainSvc: productDomainSvc, + baristaOrderPub: baristaOrderPub, + kitchenOrderPub: kitchenOrderPub, } gen.RegisterCounterServiceServer(grpcServer, &svc) @@ -60,7 +62,7 @@ func (g *CounterServiceServerImpl) GetListOrderFulfillment( res := gen.GetListOrderFulfillmentResponse{} - entities, err := g.queryOrderFulfillmentUC.GetListOrderFulfillment() + entities, err := g.orderQuery.GetAll(ctx) if err != nil { return nil, fmt.Errorf("CounterServiceServerImpl - GetListOrderFulfillment - g.queryOrderFulfillmentUseCase.GetListOrderFulfillment: %w", err) } @@ -88,10 +90,31 @@ func (g *CounterServiceServerImpl) PlaceOrder( return nil, errors.Wrap(err, "PlaceOrder - domain.CreateOrderFrom") } - // todo: save to database - // ... + // save to database + orderModel := &gen.OrderDto{ + Id: order.ID.String(), + Localtion: order.Location, + LoyaltyMemberId: order.LoyaltyMemberID.String(), + OrderSource: order.OrderSource, + OrderStatus: order.OrderStatus, + } + + for _, item := range order.LineItems { + orderModel.LineItems = append(orderModel.LineItems, &gen.LineItemDto{ + ItemType: item.ItemType, + Name: item.Name, + Price: float64(item.Price), + ItemStatus: item.ItemStatus, + IsBaristaOrder: item.IsBaristaOrder, + }) + } + + err = g.orderCommand.Create(ctx, orderModel) + if err != nil { + return nil, errors.Wrap(err, "PlaceOrder-g.orderCommand.Create") + } - g.logger.Debug("order created: %s", *order) + g.logger.Debug("order created: %v", *order) res := gen.PlaceOrderResponse{} diff --git a/internal/counter/grpc/product_client.go b/internal/counter/grpc/product_client.go index daf049d..b520b23 100644 --- a/internal/counter/grpc/product_client.go +++ b/internal/counter/grpc/product_client.go @@ -6,23 +6,28 @@ import ( "strings" "github.com/samber/lo" + "github.com/thangchung/go-coffeeshop/internal/counter/domain" gen "github.com/thangchung/go-coffeeshop/proto/gen" "google.golang.org/grpc" ) -type ProductServiceClient struct { - ctx context.Context +type productDomainService struct { conn *grpc.ClientConn } -func NewProductServiceClient(ctx context.Context, conn *grpc.ClientConn) *ProductServiceClient { - return &ProductServiceClient{ - ctx: ctx, +var _ domain.ProductDomainService = (*productDomainService)(nil) + +func NewProductDomainService(conn *grpc.ClientConn) domain.ProductDomainService { + return &productDomainService{ conn: conn, } } -func (p *ProductServiceClient) GetItemsByType(request *gen.PlaceOrderRequest, isBarista bool) (*gen.GetItemsByTypeResponse, error) { +func (p *productDomainService) GetItemsByType( + ctx context.Context, + request *gen.PlaceOrderRequest, + isBarista bool, +) (*gen.GetItemsByTypeResponse, error) { c := gen.NewProductServiceClient(p.conn) itemTypes := "" @@ -36,5 +41,5 @@ func (p *ProductServiceClient) GetItemsByType(request *gen.PlaceOrderRequest, is }, "") } - return c.GetItemsByType(p.ctx, &gen.GetItemsByTypeRequest{ItemTypes: strings.TrimLeft(itemTypes, ",")}) + return c.GetItemsByType(ctx, &gen.GetItemsByTypeRequest{ItemTypes: strings.TrimLeft(itemTypes, ",")}) } diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go new file mode 100644 index 0000000..62e1e10 --- /dev/null +++ b/pkg/utils/utils.go @@ -0,0 +1,12 @@ +package utils + +import "os" + +// ref: https://www.thorsten-hans.com/check-if-application-is-running-in-docker-container/ +func IsRunningInContainer() bool { + if _, err := os.Stat("/.dockerenv"); err != nil { + return false + } + + return true +} diff --git a/proto/counter.proto b/proto/counter.proto index 79d6797..8f3dcc5 100644 --- a/proto/counter.proto +++ b/proto/counter.proto @@ -48,11 +48,12 @@ message OrderDto { } message LineItemDto { - common.ItemType item_type = 1; - string name = 2; - double price = 3; - common.Status item_status = 4; - bool is_barista_order = 5; + string id = 1; + common.ItemType item_type = 2; + string name = 3; + double price = 4; + common.Status item_status = 5; + bool is_barista_order = 6; } message PlaceOrderRequest { diff --git a/proto/gen/counter.pb.go b/proto/gen/counter.pb.go index f77cc48..e7298e4 100644 --- a/proto/gen/counter.pb.go +++ b/proto/gen/counter.pb.go @@ -200,11 +200,12 @@ type LineItemDto struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - ItemType ItemType `protobuf:"varint,1,opt,name=item_type,json=itemType,proto3,enum=go.coffeeshop.proto.common.ItemType" json:"item_type,omitempty"` - Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"` - Price float64 `protobuf:"fixed64,3,opt,name=price,proto3" json:"price,omitempty"` - ItemStatus Status `protobuf:"varint,4,opt,name=item_status,json=itemStatus,proto3,enum=go.coffeeshop.proto.common.Status" json:"item_status,omitempty"` - IsBaristaOrder bool `protobuf:"varint,5,opt,name=is_barista_order,json=isBaristaOrder,proto3" json:"is_barista_order,omitempty"` + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + ItemType ItemType `protobuf:"varint,2,opt,name=item_type,json=itemType,proto3,enum=go.coffeeshop.proto.common.ItemType" json:"item_type,omitempty"` + Name string `protobuf:"bytes,3,opt,name=name,proto3" json:"name,omitempty"` + Price float64 `protobuf:"fixed64,4,opt,name=price,proto3" json:"price,omitempty"` + ItemStatus Status `protobuf:"varint,5,opt,name=item_status,json=itemStatus,proto3,enum=go.coffeeshop.proto.common.Status" json:"item_status,omitempty"` + IsBaristaOrder bool `protobuf:"varint,6,opt,name=is_barista_order,json=isBaristaOrder,proto3" json:"is_barista_order,omitempty"` } func (x *LineItemDto) Reset() { @@ -239,6 +240,13 @@ func (*LineItemDto) Descriptor() ([]byte, []int) { return file_counter_proto_rawDescGZIP(), []int{3} } +func (x *LineItemDto) GetId() string { + if x != nil { + return x.Id + } + return "" +} + func (x *LineItemDto) GetItemType() ItemType { if x != nil { return x.ItemType @@ -499,20 +507,21 @@ var file_counter_proto_rawDesc = []byte{ 0x66, 0x65, 0x65, 0x73, 0x68, 0x6f, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x65, 0x72, 0x61, 0x70, 0x69, 0x2e, 0x4c, 0x69, 0x6e, 0x65, 0x49, 0x74, 0x65, 0x6d, 0x44, 0x74, 0x6f, 0x52, 0x09, 0x6c, 0x69, 0x6e, 0x65, 0x49, 0x74, 0x65, 0x6d, 0x73, 0x22, - 0xe9, 0x01, 0x0a, 0x0b, 0x4c, 0x69, 0x6e, 0x65, 0x49, 0x74, 0x65, 0x6d, 0x44, 0x74, 0x6f, 0x12, - 0x41, 0x0a, 0x09, 0x69, 0x74, 0x65, 0x6d, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, + 0xf9, 0x01, 0x0a, 0x0b, 0x4c, 0x69, 0x6e, 0x65, 0x49, 0x74, 0x65, 0x6d, 0x44, 0x74, 0x6f, 0x12, + 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, + 0x41, 0x0a, 0x09, 0x69, 0x74, 0x65, 0x6d, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x24, 0x2e, 0x67, 0x6f, 0x2e, 0x63, 0x6f, 0x66, 0x66, 0x65, 0x65, 0x73, 0x68, 0x6f, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2e, 0x49, 0x74, 0x65, 0x6d, 0x54, 0x79, 0x70, 0x65, 0x52, 0x08, 0x69, 0x74, 0x65, 0x6d, 0x54, 0x79, - 0x70, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, + 0x70, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x70, 0x72, 0x69, 0x63, 0x65, 0x18, - 0x03, 0x20, 0x01, 0x28, 0x01, 0x52, 0x05, 0x70, 0x72, 0x69, 0x63, 0x65, 0x12, 0x43, 0x0a, 0x0b, - 0x69, 0x74, 0x65, 0x6d, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, + 0x04, 0x20, 0x01, 0x28, 0x01, 0x52, 0x05, 0x70, 0x72, 0x69, 0x63, 0x65, 0x12, 0x43, 0x0a, 0x0b, + 0x69, 0x74, 0x65, 0x6d, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x22, 0x2e, 0x67, 0x6f, 0x2e, 0x63, 0x6f, 0x66, 0x66, 0x65, 0x65, 0x73, 0x68, 0x6f, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x0a, 0x69, 0x74, 0x65, 0x6d, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x28, 0x0a, 0x10, 0x69, 0x73, 0x5f, 0x62, 0x61, 0x72, 0x69, 0x73, 0x74, 0x61, 0x5f, - 0x6f, 0x72, 0x64, 0x65, 0x72, 0x18, 0x05, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0e, 0x69, 0x73, 0x42, + 0x6f, 0x72, 0x64, 0x65, 0x72, 0x18, 0x06, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0e, 0x69, 0x73, 0x42, 0x61, 0x72, 0x69, 0x73, 0x74, 0x61, 0x4f, 0x72, 0x64, 0x65, 0x72, 0x22, 0xf7, 0x03, 0x0a, 0x11, 0x50, 0x6c, 0x61, 0x63, 0x65, 0x4f, 0x72, 0x64, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x4a, 0x0a, 0x0c, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x5f, 0x74, 0x79, 0x70,