Skip to content

Commit

Permalink
add more data code #3
Browse files Browse the repository at this point in the history
  • Loading branch information
thangchung committed Nov 14, 2022
1 parent c7334d6 commit 6b993da
Show file tree
Hide file tree
Showing 17 changed files with 549 additions and 180 deletions.
2 changes: 2 additions & 0 deletions docker-compose-full.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ services:
image: go-coffeeshop-counter
environment:
APP_NAME: 'counter-service in docker'
IN_DOCKER: "true"
PG_URL: postgres://postgres:P@ssw0rd@postgres:5432/postgres
RABBITMQ_URL: amqp://guest:guest@rabbitmq:5672/
PRODUCT_CLIENT_URL: product:5001
Expand All @@ -88,6 +89,7 @@ services:
image: go-coffeeshop-barista
environment:
APP_NAME: 'barista-service in docker'
IN_DOCKER: "true"
PG_URL: postgres://postgres:P@ssw0rd@postgres:5432/postgres
RABBITMQ_URL: amqp://guest:guest@rabbitmq:5672/
depends_on:
Expand Down
44 changes: 29 additions & 15 deletions internal/counter/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,16 @@ import (
"github.com/rabbitmq/amqp091-go"
"github.com/thangchung/go-coffeeshop/cmd/counter/config"
"github.com/thangchung/go-coffeeshop/internal/counter/domain"
"github.com/thangchung/go-coffeeshop/internal/counter/features"
"github.com/thangchung/go-coffeeshop/internal/counter/features/eventhandlers"
"github.com/thangchung/go-coffeeshop/internal/counter/features/repo"
"github.com/thangchung/go-coffeeshop/internal/counter/features/orders/command"
"github.com/thangchung/go-coffeeshop/internal/counter/features/orders/eventhandlers"
"github.com/thangchung/go-coffeeshop/internal/counter/features/orders/query"
"github.com/thangchung/go-coffeeshop/internal/counter/features/orders/repo"
counterGrpc "github.com/thangchung/go-coffeeshop/internal/counter/grpc"
"github.com/thangchung/go-coffeeshop/pkg/event"
mylogger "github.com/thangchung/go-coffeeshop/pkg/logger"
"github.com/thangchung/go-coffeeshop/pkg/postgres"
"github.com/thangchung/go-coffeeshop/pkg/rabbitmq"
"github.com/thangchung/go-coffeeshop/pkg/rabbitmq/consumer"
rabConsumer "github.com/thangchung/go-coffeeshop/pkg/rabbitmq/consumer"
"github.com/thangchung/go-coffeeshop/pkg/rabbitmq/publisher"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
Expand All @@ -29,7 +30,7 @@ type App struct {
cfg *config.Config
network string
address string
handler eventhandlers.BaristaOrderUpdatedEventHandler
handler domain.BaristaOrderUpdatedEventHandler
}

func New(log *mylogger.Logger, cfg *config.Config) *App {
Expand All @@ -51,6 +52,8 @@ func (a *App) Run() error {
if err != nil {
a.logger.Fatal("app - Run - postgres.NewPostgres: %s", err.Error())

cancel()

return err
}
defer pg.Close()
Expand All @@ -60,13 +63,17 @@ func (a *App) Run() error {
if err != nil {
a.logger.Fatal("app - Run - rabbitmq.NewRabbitMQConn: %s", err.Error())

cancel()

return err
}
defer amqpConn.Close()

// gRPC Client
conn, err := grpc.Dial(a.cfg.ProductClient.URL, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
cancel()

return err
}
defer conn.Close()
Expand All @@ -81,6 +88,8 @@ func (a *App) Run() error {
defer baristaOrderPub.CloseChan()

if err != nil {
cancel()

return errors.Wrap(err, "counterRabbitMQ-Barista-NewOrderPublisher")
}

Expand All @@ -94,27 +103,31 @@ func (a *App) Run() error {
defer kitchenOrderPub.CloseChan()

if err != nil {
cancel()

return errors.Wrap(err, "counterRabbitMQ-Kitchen-NewOrderPublisher")
}

a.logger.Info("Order Publisher initialized")

var productDomainSvc domain.ProductDomainService = counterGrpc.NewProductServiceClient(ctx, conn)
// domain service
productDomainSvc := counterGrpc.NewProductDomainService(conn)

// Use case
queryOrderFulfillmentUC := features.NewQueryOrderFulfillmentUseCase(ctx, repo.NewQueryOrderFulfillmentRepo(ctx, pg))
// CQRS components
orderQuery := query.NewOrderQuery(ctx, repo.NewOrderRepo(pg))
orderCommand := command.NewOrderCommand(ctx, repo.NewOrderRepo(pg))

// event handlers.
a.handler = eventhandlers.NewBaristaOrderUpdatedEventHandler()
a.handler = eventhandlers.NewBaristaOrderUpdatedEventHandler(repo.NewOrderRepo(pg))

// consumers
consumer, err := consumer.NewConsumer(
consumer, err := rabConsumer.NewConsumer(
amqpConn,
a.logger,
consumer.ExchangeName("counter-order-exchange"),
consumer.QueueName("counter-order-queue"),
consumer.BindingKey("counter-order-routing-key"),
consumer.ConsumerTag("counter-order-consumer"),
rabConsumer.ExchangeName("counter-order-exchange"),
rabConsumer.QueueName("counter-order-queue"),
rabConsumer.BindingKey("counter-order-routing-key"),
rabConsumer.ConsumerTag("counter-order-consumer"),
)

if err != nil {
Expand Down Expand Up @@ -149,7 +162,8 @@ func (a *App) Run() error {
amqpConn,
a.cfg,
a.logger,
queryOrderFulfillmentUC,
orderCommand,
orderQuery,
productDomainSvc,
*baristaOrderPub,
*kitchenOrderPub,
Expand Down
23 changes: 17 additions & 6 deletions internal/counter/app/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"os"
"path/filepath"
"strconv"
"time"

"github.com/golang-migrate/migrate/v4"
Expand All @@ -18,8 +19,11 @@ import (
)

const (
_defaultAttempts = 5
_defaultTimeout = time.Second
_defaultAttempts = 5
_defaultTimeout = time.Second
)

var (
_migrationFilePath = "db/migrations"
)

Expand All @@ -38,12 +42,19 @@ func init() {
)

for attempts > 0 {
cur, _ := os.Getwd()
dir := filepath.Dir(cur + "/../../..")
inDocker, ok := os.LookupEnv("IN_DOCKER")
if !ok || len(inDocker) == 0 {
glog.Fatalf("migrate: environment variable not declared: IN_DOCKER")
}

glog.Infoln(fmt.Sprintf("file://%s", dir))
dir := fmt.Sprintf("file://%s", _migrationFilePath)
if dockered, _ := strconv.ParseBool(inDocker); !dockered {
cur, _ := os.Getwd()
dir = fmt.Sprintf("file://%s/%s", filepath.Dir(cur+"/../../.."), _migrationFilePath)
}

m, err = migrate.New(fmt.Sprintf("file://%s/%s", dir, _migrationFilePath), databaseURL)
glog.Infoln(dir)
m, err = migrate.New(dir, databaseURL)
if err == nil {
break
}
Expand Down
26 changes: 23 additions & 3 deletions internal/counter/domain/interfaces.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,35 @@
package domain

import (
"context"

"github.com/google/uuid"
"github.com/thangchung/go-coffeeshop/pkg/event"
gen "github.com/thangchung/go-coffeeshop/proto/gen"
)

type (
QueryOrderFulfillmentRepo interface {
GetListOrderFulfillment() ([]gen.OrderDto, error)
OrderRepo interface {
GetAll(context.Context) ([]gen.OrderDto, error)
GetOrderByID(context.Context, uuid.UUID) (*Order, error)
Create(context.Context, *gen.OrderDto) error
Update(context.Context, *gen.OrderDto) (*gen.OrderDto, error)
}

ProductDomainService interface {
GetItemsByType(*gen.PlaceOrderRequest, bool) (*gen.GetItemsByTypeResponse, error)
GetItemsByType(context.Context, *gen.PlaceOrderRequest, bool) (*gen.GetItemsByTypeResponse, error)
}

OrderCommand interface {
Create(context.Context, *gen.OrderDto) error
Update(context.Context, *gen.OrderDto) (*gen.OrderDto, error)
}

OrderQuery interface {
GetAll(ctx context.Context) ([]gen.OrderDto, error)
}

BaristaOrderUpdatedEventHandler interface {
Handle(context.Context, *event.BaristaOrderUpdated) error
}
)
36 changes: 34 additions & 2 deletions internal/counter/domain/order.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func CreateOrderFrom(
numberOfKitchenItems := len(request.KitchenItems) > 0

if numberOfBaristaItems {
itemTypesRes, err := productDomainSvc.GetItemsByType(request, true)
itemTypesRes, err := productDomainSvc.GetItemsByType(ctx, request, true)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -87,7 +87,7 @@ func CreateOrderFrom(
}

if numberOfKitchenItems {
itemTypesRes, err := productDomainSvc.GetItemsByType(request, false)
itemTypesRes, err := productDomainSvc.GetItemsByType(ctx, request, false)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -122,6 +122,28 @@ func CreateOrderFrom(
return order, nil
}

func (o *Order) Apply(event *events.BaristaOrderUpdated) error {
if len(o.LineItems) == 0 {
return nil // we dont do anything
}

_, index, ok := lo.FindIndexOf(o.LineItems, func(i LineItem) bool {
return i.ItemType == event.ItemType
})

if !ok {
return errors.New("item not found")
}

o.LineItems[index].ItemStatus = gen.Status_FULFILLED

if checkFulfilledStatus(o.LineItems) {
o.OrderStatus = gen.Status_FULFILLED
}

return nil
}

func publishBaristaOrderEvent(
ctx context.Context,
orderID uuid.UUID,
Expand Down Expand Up @@ -173,3 +195,13 @@ func publishBaristaOrderEvent(
return nil
}
}

func checkFulfilledStatus(lineItems []LineItem) bool {
for _, item := range lineItems {
if item.ItemStatus != gen.Status_FULFILLED {
return false
}
}

return true
}

This file was deleted.

36 changes: 0 additions & 36 deletions internal/counter/features/order_fulfillment.go

This file was deleted.

39 changes: 39 additions & 0 deletions internal/counter/features/orders/command/command.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package command

import (
"context"
"fmt"

"github.com/thangchung/go-coffeeshop/internal/counter/domain"
"github.com/thangchung/go-coffeeshop/proto/gen"
)

type orderCommand struct {
repo domain.OrderRepo
}

var _ domain.OrderCommand = (*orderCommand)(nil)

func NewOrderCommand(ctx context.Context, r domain.OrderRepo) domain.OrderCommand {
return &orderCommand{
repo: r,
}
}

func (d *orderCommand) Create(ctx context.Context, orderModel *gen.OrderDto) error {
err := d.repo.Create(ctx, orderModel)
if err != nil {
return fmt.Errorf("NewOrderCommand-Create-d.repo.Create(ctx, orderModel): %w", err)
}

return nil
}

func (d *orderCommand) Update(ctx context.Context, orderModel *gen.OrderDto) (*gen.OrderDto, error) {
order, err := d.repo.Update(ctx, orderModel)
if err != nil {
return nil, fmt.Errorf("NewOrderCommand-Update-d.repo.Update(ctx, orderModel): %w", err)
}

return order, nil
}
Loading

0 comments on commit 6b993da

Please sign in to comment.