From 311b5b6284518478675859498e515f88a7b3e4b4 Mon Sep 17 00:00:00 2001 From: thangchung Date: Wed, 2 Nov 2022 12:21:39 +0000 Subject: [PATCH] refactor namespace #2 #3 --- .devcontainer/devcontainer.json | 6 +- internal/counter/app/app.go | 88 +++++++------------ internal/counter/domain/interfaces.go | 19 ++++ .../counter/{entity => domain}/line_item.go | 2 +- internal/counter/{entity => domain}/order.go | 34 +------ internal/counter/grpc/handler.go | 20 +++-- internal/counter/grpc/product_client.go | 40 +++++++++ internal/counter/usecase/interfaces.go | 17 ---- internal/counter/usecase/order_fulfillment.go | 11 ++- .../repo/order_fulfillment_postgres.go | 13 +-- pkg/postgres/postgres.go | 2 +- pkg/rabbitmq/rabbitmq.go | 51 +++++++++++ 12 files changed, 177 insertions(+), 126 deletions(-) create mode 100644 internal/counter/domain/interfaces.go rename internal/counter/{entity => domain}/line_item.go (97%) rename internal/counter/{entity => domain}/order.go (64%) create mode 100644 internal/counter/grpc/product_client.go delete mode 100644 internal/counter/usecase/interfaces.go create mode 100644 pkg/rabbitmq/rabbitmq.go diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index 16a49fb..5756339 100755 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -11,7 +11,7 @@ // new files getting created as root, but you may need to update the USER_UID // and USER_GID in .devcontainer/Dockerfile to match your user if not 1000. "-u", - "root", + "vscode", // Mount go mod cache "-v", "coffeeshop-gomodcache:/go/pkg", @@ -96,7 +96,9 @@ "bungcip.better-toml", "eamodio.gitlens", "casualjim.gotemplate", - "davidanson.vscode-markdownlint" + "davidanson.vscode-markdownlint", + "cweijan.vscode-mysql-client2", + "bierner.markdown-mermaid" ], "postCreateCommand": "go version", "features": { diff --git a/internal/counter/app/app.go b/internal/counter/app/app.go index 599c9ab..3f0c6b9 100644 --- a/internal/counter/app/app.go +++ b/internal/counter/app/app.go @@ -2,28 +2,20 @@ package app import ( "context" - "errors" "net" - "time" - amqp "github.com/rabbitmq/amqp091-go" "github.com/thangchung/go-coffeeshop/cmd/counter/config" - counterGrpc "github.com/thangchung/go-coffeeshop/internal/counter/grpc" + "github.com/thangchung/go-coffeeshop/internal/counter/domain" + mygrpc "github.com/thangchung/go-coffeeshop/internal/counter/grpc" "github.com/thangchung/go-coffeeshop/internal/counter/usecase" "github.com/thangchung/go-coffeeshop/internal/counter/usecase/repo" mylogger "github.com/thangchung/go-coffeeshop/pkg/logger" "github.com/thangchung/go-coffeeshop/pkg/postgres" + "github.com/thangchung/go-coffeeshop/pkg/rabbitmq" "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" ) -const ( - OrderTopic = "orders_topic" - RetryTimes = 5 - BackOffSeconds = 2 -) - -var ErrCannotConnectRabbitMQ = errors.New("cannot connect to rabbit") - type App struct { logger *mylogger.Logger cfg *config.Config @@ -43,26 +35,41 @@ func New(log *mylogger.Logger, cfg *config.Config) *App { func (a *App) Run(ctx context.Context) error { a.logger.Info("Init %s %s\n", a.cfg.Name, a.cfg.Version) - // Repository - pg, err := postgres.NewPostgres(a.cfg.PG.URL, postgres.MaxPoolSize(a.cfg.PG.PoolMax)) + // PostgresDB + pg, err := postgres.NewPostgresDB(a.cfg.PG.URL, postgres.MaxPoolSize(a.cfg.PG.PoolMax)) if err != nil { - a.logger.Fatal("app - Run - postgres.NewPostgres: %w", err) + a.logger.Fatal("app - Run - postgres.NewPostgres: %s", err.Error()) + + return err } defer pg.Close() - // Use case - queryOrderFulfillmentUseCase := usecase.NewQueryOrderFulfillmentUseCase(repo.NewQueryOrderFulfillmentRepo(pg)) - // RabbitMQ - amqpConn, err := a.connectToRabbit() + amqpConn, err := rabbitmq.NewRabbitMQConn(a.cfg.RabbitMQ.URL, a.logger) if err != nil { + a.logger.Fatal("app - Run - rabbitmq.NewRabbitMQConn: %s", err.Error()) + return err } defer amqpConn.Close() + // gRPC Client + conn, err := grpc.Dial("0.0.0.0:5001", grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + return err + } + defer conn.Close() + + var productServiceClient domain.ProductServiceClient = mygrpc.NewProductServiceClient(ctx, conn) + + // Use case + queryOrderFulfillmentUseCase := usecase.NewQueryOrderFulfillmentUseCase(ctx, repo.NewQueryOrderFulfillmentRepo(ctx, pg)) + // gRPC Server l, err := net.Listen(a.network, a.address) if err != nil { + a.logger.Fatal("app - Run - net.Listener: %s", err.Error()) + return err } @@ -72,50 +79,15 @@ func (a *App) Run(ctx context.Context) error { } }() - s := grpc.NewServer() - counterGrpc.NewCounterServiceServerGrpc(s, amqpConn, queryOrderFulfillmentUseCase, a.logger) + server := grpc.NewServer() + mygrpc.NewCounterServiceServerGrpc(server, amqpConn, queryOrderFulfillmentUseCase, productServiceClient, a.logger) go func() { - defer s.GracefulStop() + defer server.GracefulStop() <-ctx.Done() }() a.logger.Info("Start server at " + a.address + " ...") - return s.Serve(l) -} - -func (a *App) connectToRabbit() (*amqp.Connection, error) { - var ( - amqpConn *amqp.Connection - counts int64 - rabbitMqURL = a.cfg.RabbitMQ.URL - ) - - for { - connection, err := amqp.Dial(rabbitMqURL) - if err != nil { - a.logger.Error("RabbitMq at %s not ready...\n", rabbitMqURL) - counts++ - } else { - amqpConn = connection - - break - } - - if counts > RetryTimes { - a.logger.LogError(err) - - return nil, ErrCannotConnectRabbitMQ - } - - a.logger.Info("Backing off for 2 seconds...") - time.Sleep(BackOffSeconds * time.Second) - - continue - } - - a.logger.Info("Connected to RabbitMQ!") - - return amqpConn, nil + return server.Serve(l) } diff --git a/internal/counter/domain/interfaces.go b/internal/counter/domain/interfaces.go new file mode 100644 index 0000000..8c41204 --- /dev/null +++ b/internal/counter/domain/interfaces.go @@ -0,0 +1,19 @@ +package domain + +import ( + gen "github.com/thangchung/go-coffeeshop/proto/gen" +) + +type ( + QueryOrderFulfillmentUseCase interface { + GetListOrderFulfillment() ([]gen.OrderDto, error) + } + + QueryOrderFulfillmentRepo interface { + GetListOrderFulfillment() ([]gen.OrderDto, error) + } + + ProductServiceClient interface { + GetItemsByType(*gen.PlaceOrderRequest, bool) (*gen.GetItemsByTypeResponse, error) + } +) diff --git a/internal/counter/entity/line_item.go b/internal/counter/domain/line_item.go similarity index 97% rename from internal/counter/entity/line_item.go rename to internal/counter/domain/line_item.go index 8b98c51..3a2b0ef 100644 --- a/internal/counter/entity/line_item.go +++ b/internal/counter/domain/line_item.go @@ -1,4 +1,4 @@ -package entity +package domain import ( "github.com/google/uuid" diff --git a/internal/counter/entity/order.go b/internal/counter/domain/order.go similarity index 64% rename from internal/counter/entity/order.go rename to internal/counter/domain/order.go index d212dfb..13b5a6d 100644 --- a/internal/counter/entity/order.go +++ b/internal/counter/domain/order.go @@ -1,16 +1,9 @@ -package entity +package domain import ( - "context" - "fmt" - "strings" - "time" - "github.com/google/uuid" "github.com/samber/lo" gen "github.com/thangchung/go-coffeeshop/proto/gen" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" ) type Order struct { @@ -32,7 +25,7 @@ func NewOrder(orderSource gen.OrderSource, loyaltyMemberID uuid.UUID, orderStatu } } -func CreateOrderFrom(request *gen.PlaceOrderRequest) (*Order, error) { +func CreateOrderFrom(request *gen.PlaceOrderRequest, productServiceClient ProductServiceClient) (*Order, error) { loyaltyMemberID, err := uuid.Parse(request.LoyaltyMemberId) if err != nil { return nil, err @@ -40,26 +33,11 @@ func CreateOrderFrom(request *gen.PlaceOrderRequest) (*Order, error) { order := NewOrder(request.OrderSource, loyaltyMemberID, gen.Status_IN_PROGRESS, request.Location) - //TODO: remove hard code URL - conn, err := grpc.Dial("0.0.0.0:5001", grpc.WithTransportCredentials(insecure.NewCredentials())) - if err != nil { - return nil, err - } - defer conn.Close() - c := gen.NewProductServiceClient(conn) - - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() - numberOfBaristaItems := len(request.BaristaItems) > 0 numberOfKitchenItems := len(request.KitchenItems) > 0 if numberOfBaristaItems { - itemTypes := lo.Reduce(request.BaristaItems, func(agg string, item *gen.CommandItem, _ int) string { - return fmt.Sprintf("%s,%s", agg, item.ItemType) - }, "") - - itemTypesRes, err := c.GetItemsByType(ctx, &gen.GetItemsByTypeRequest{ItemTypes: strings.TrimLeft(itemTypes, ",")}) + itemTypesRes, err := productServiceClient.GetItemsByType(request, true) if err != nil { return nil, err } @@ -81,11 +59,7 @@ func CreateOrderFrom(request *gen.PlaceOrderRequest) (*Order, error) { } if numberOfKitchenItems { - itemTypes := lo.Reduce(request.KitchenItems, func(agg string, item *gen.CommandItem, _ int) string { - return fmt.Sprintf("%s,%s", agg, item.ItemType) - }, "") - - itemTypesRes, err := c.GetItemsByType(ctx, &gen.GetItemsByTypeRequest{ItemTypes: strings.TrimLeft(itemTypes, ",")}) + itemTypesRes, err := productServiceClient.GetItemsByType(request, false) if err != nil { return nil, err } diff --git a/internal/counter/grpc/handler.go b/internal/counter/grpc/handler.go index be11f7d..d7a6729 100644 --- a/internal/counter/grpc/handler.go +++ b/internal/counter/grpc/handler.go @@ -7,8 +7,7 @@ import ( "github.com/google/uuid" amqp "github.com/rabbitmq/amqp091-go" - "github.com/thangchung/go-coffeeshop/internal/counter/entity" - "github.com/thangchung/go-coffeeshop/internal/counter/usecase" + "github.com/thangchung/go-coffeeshop/internal/counter/domain" events "github.com/thangchung/go-coffeeshop/pkg/event" mylogger "github.com/thangchung/go-coffeeshop/pkg/logger" gen "github.com/thangchung/go-coffeeshop/proto/gen" @@ -23,9 +22,15 @@ const ( func NewCounterServiceServerGrpc( grpcServer *grpc.Server, amqpConn *amqp.Connection, - queryOrderFulfillmentUseCase usecase.QueryOrderFulfillmentUseCase, + queryOrderFulfillmentUseCase domain.QueryOrderFulfillmentUseCase, + productServiceClient domain.ProductServiceClient, log *mylogger.Logger) { - svc := CounterServiceServerImpl{logger: log, amqpConn: amqpConn, queryOrderFulfillmentUseCase: queryOrderFulfillmentUseCase} + svc := CounterServiceServerImpl{ + logger: log, + amqpConn: amqpConn, + queryOrderFulfillmentUseCase: queryOrderFulfillmentUseCase, + productServiceClient: productServiceClient, + } gen.RegisterCounterServiceServer(grpcServer, &svc) @@ -36,7 +41,8 @@ type CounterServiceServerImpl struct { gen.UnimplementedCounterServiceServer logger *mylogger.Logger amqpConn *amqp.Connection - queryOrderFulfillmentUseCase usecase.QueryOrderFulfillmentUseCase + productServiceClient domain.ProductServiceClient + queryOrderFulfillmentUseCase domain.QueryOrderFulfillmentUseCase } func (g *CounterServiceServerImpl) GetListOrderFulfillment(ctx context.Context, request *gen.GetListOrderFulfillmentRequest) (*gen.GetListOrderFulfillmentResponse, error) { @@ -44,7 +50,7 @@ func (g *CounterServiceServerImpl) GetListOrderFulfillment(ctx context.Context, res := gen.GetListOrderFulfillmentResponse{} - entities, err := g.queryOrderFulfillmentUseCase.GetListOrderFulfillment(ctx) + entities, err := g.queryOrderFulfillmentUseCase.GetListOrderFulfillment() if err != nil { return nil, fmt.Errorf("CounterServiceServerImpl - GetListOrderFulfillment - g.queryOrderFulfillmentUseCase.GetListOrderFulfillment: %w", err) } @@ -64,7 +70,7 @@ func (g *CounterServiceServerImpl) PlaceOrder(ctx context.Context, request *gen. g.logger.Debug("request: %s", request) // add order - order, err := entity.CreateOrderFrom(request) + order, err := domain.CreateOrderFrom(request, g.productServiceClient) if err != nil { return nil, err } diff --git a/internal/counter/grpc/product_client.go b/internal/counter/grpc/product_client.go new file mode 100644 index 0000000..daf049d --- /dev/null +++ b/internal/counter/grpc/product_client.go @@ -0,0 +1,40 @@ +package grpc + +import ( + "context" + "fmt" + "strings" + + "github.com/samber/lo" + gen "github.com/thangchung/go-coffeeshop/proto/gen" + "google.golang.org/grpc" +) + +type ProductServiceClient struct { + ctx context.Context + conn *grpc.ClientConn +} + +func NewProductServiceClient(ctx context.Context, conn *grpc.ClientConn) *ProductServiceClient { + return &ProductServiceClient{ + ctx: ctx, + conn: conn, + } +} + +func (p *ProductServiceClient) GetItemsByType(request *gen.PlaceOrderRequest, isBarista bool) (*gen.GetItemsByTypeResponse, error) { + c := gen.NewProductServiceClient(p.conn) + + itemTypes := "" + if isBarista { + itemTypes = lo.Reduce(request.BaristaItems, func(agg string, item *gen.CommandItem, _ int) string { + return fmt.Sprintf("%s,%s", agg, item.ItemType) + }, "") + } else { + itemTypes = lo.Reduce(request.KitchenItems, func(agg string, item *gen.CommandItem, _ int) string { + return fmt.Sprintf("%s,%s", agg, item.ItemType) + }, "") + } + + return c.GetItemsByType(p.ctx, &gen.GetItemsByTypeRequest{ItemTypes: strings.TrimLeft(itemTypes, ",")}) +} diff --git a/internal/counter/usecase/interfaces.go b/internal/counter/usecase/interfaces.go deleted file mode 100644 index 4ff4714..0000000 --- a/internal/counter/usecase/interfaces.go +++ /dev/null @@ -1,17 +0,0 @@ -package usecase - -import ( - "context" - - gen "github.com/thangchung/go-coffeeshop/proto/gen" -) - -type ( - QueryOrderFulfillmentUseCase interface { - GetListOrderFulfillment(context.Context) ([]gen.OrderDto, error) - } - - QueryOrderFulfillmentRepo interface { - GetListOrderFulfillment(context.Context) ([]gen.OrderDto, error) - } -) diff --git a/internal/counter/usecase/order_fulfillment.go b/internal/counter/usecase/order_fulfillment.go index c0128ff..28288f4 100644 --- a/internal/counter/usecase/order_fulfillment.go +++ b/internal/counter/usecase/order_fulfillment.go @@ -4,21 +4,24 @@ import ( "context" "fmt" + "github.com/thangchung/go-coffeeshop/internal/counter/domain" gen "github.com/thangchung/go-coffeeshop/proto/gen" ) type DefaultQueryOrderFulfillmentUseCase struct { - repo QueryOrderFulfillmentRepo + ctx context.Context + repo domain.QueryOrderFulfillmentRepo } -func NewQueryOrderFulfillmentUseCase(r QueryOrderFulfillmentRepo) *DefaultQueryOrderFulfillmentUseCase { +func NewQueryOrderFulfillmentUseCase(ctx context.Context, r domain.QueryOrderFulfillmentRepo) *DefaultQueryOrderFulfillmentUseCase { return &DefaultQueryOrderFulfillmentUseCase{ + ctx: ctx, repo: r, } } -func (d DefaultQueryOrderFulfillmentUseCase) GetListOrderFulfillment(ctx context.Context) ([]gen.OrderDto, error) { - entities, err := d.repo.GetListOrderFulfillment(ctx) +func (d DefaultQueryOrderFulfillmentUseCase) GetListOrderFulfillment() ([]gen.OrderDto, error) { + entities, err := d.repo.GetListOrderFulfillment() if err != nil { return nil, fmt.Errorf("NewQueryOrderFulfillmentUseCase - GetListOrderFulfillment - s.repo.GetListOrderFulfillment: %w", err) } diff --git a/internal/counter/usecase/repo/order_fulfillment_postgres.go b/internal/counter/usecase/repo/order_fulfillment_postgres.go index 0587c24..f8cf3a9 100644 --- a/internal/counter/usecase/repo/order_fulfillment_postgres.go +++ b/internal/counter/usecase/repo/order_fulfillment_postgres.go @@ -11,15 +11,16 @@ import ( const _defaultEntityCap = 64 type DefaultQueryOrderFulfillmentRepo struct { - *postgres.Postgres + ctx context.Context + pg *postgres.Postgres } -func NewQueryOrderFulfillmentRepo(pg *postgres.Postgres) *DefaultQueryOrderFulfillmentRepo { - return &DefaultQueryOrderFulfillmentRepo{pg} +func NewQueryOrderFulfillmentRepo(ctx context.Context, pg *postgres.Postgres) *DefaultQueryOrderFulfillmentRepo { + return &DefaultQueryOrderFulfillmentRepo{ctx: ctx, pg: pg} } -func (d DefaultQueryOrderFulfillmentRepo) GetListOrderFulfillment(ctx context.Context) ([]gen.OrderDto, error) { - sql, _, err := d.Builder. +func (d DefaultQueryOrderFulfillmentRepo) GetListOrderFulfillment() ([]gen.OrderDto, error) { + sql, _, err := d.pg.Builder. Select("orders.id"). From(`"order".orders`).Join(`"order".line_items USING(id)`). ToSql() @@ -27,7 +28,7 @@ func (d DefaultQueryOrderFulfillmentRepo) GetListOrderFulfillment(ctx context.Co return nil, fmt.Errorf("DefaultQueryOrderFulfillmentRepo - GetListOrderFulfillment - r.Builder: %w", err) } - rows, err := d.Pool.Query(ctx, sql) + rows, err := d.pg.Pool.Query(d.ctx, sql) if err != nil { return nil, fmt.Errorf("DefaultQueryOrderFulfillmentRepo - GetListOrderFulfillment - r.Pool.Query: %w", err) } diff --git a/pkg/postgres/postgres.go b/pkg/postgres/postgres.go index a4e3f39..13e0187 100644 --- a/pkg/postgres/postgres.go +++ b/pkg/postgres/postgres.go @@ -25,7 +25,7 @@ type Postgres struct { Pool *pgxpool.Pool } -func NewPostgres(url string, opts ...Option) (*Postgres, error) { +func NewPostgresDB(url string, opts ...Option) (*Postgres, error) { pg := &Postgres{ maxPoolSize: _defaultMaxPoolSize, connAttempts: _defaultConnAttempts, diff --git a/pkg/rabbitmq/rabbitmq.go b/pkg/rabbitmq/rabbitmq.go new file mode 100644 index 0000000..b93e91b --- /dev/null +++ b/pkg/rabbitmq/rabbitmq.go @@ -0,0 +1,51 @@ +package rabbitmq + +import ( + "errors" + "time" + + amqp "github.com/rabbitmq/amqp091-go" + mylogger "github.com/thangchung/go-coffeeshop/pkg/logger" +) + +const ( + OrderTopic = "orders_topic" + RetryTimes = 5 + BackOffSeconds = 2 +) + +var ErrCannotConnectRabbitMQ = errors.New("cannot connect to rabbit") + +func NewRabbitMQConn(rabbitMqURL string, logger *mylogger.Logger) (*amqp.Connection, error) { + var ( + amqpConn *amqp.Connection + counts int64 + ) + + for { + connection, err := amqp.Dial(rabbitMqURL) + if err != nil { + logger.Error("RabbitMq at %s not ready...\n", rabbitMqURL) + counts++ + } else { + amqpConn = connection + + break + } + + if counts > RetryTimes { + logger.LogError(err) + + return nil, ErrCannotConnectRabbitMQ + } + + logger.Info("Backing off for 2 seconds...") + time.Sleep(BackOffSeconds * time.Second) + + continue + } + + logger.Info("Connected to RabbitMQ!") + + return amqpConn, nil +}