Skip to content

Commit

Permalink
add more events for barista #3
Browse files Browse the repository at this point in the history
  • Loading branch information
thangchung committed Nov 13, 2022
1 parent e6bdb70 commit 22f8b78
Show file tree
Hide file tree
Showing 25 changed files with 1,084 additions and 477 deletions.
7 changes: 1 addition & 6 deletions cmd/barista/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,7 @@ http:
port: 5002

rabbit_mq:
url: amqp://guest:[email protected]:5672/
exchange: orders-exchange
queue: orders-queue
routing_key: orders-routing-key
consumer_tag: orders-consumer
worker_pool_size: 24
url: amqp://guest:[email protected]:5672/

logger:
log_level: 'debug'
Expand Down
7 changes: 1 addition & 6 deletions cmd/barista/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,7 @@ type (
}

RabbitMQ struct {
URL string `env-required:"true" yaml:"url" env:"RABBITMQ_URL"`
Exchange string `env-required:"true" yaml:"exchange" env:"RABBITMQ_Exchange"`
Queue string `env-required:"true" yaml:"queue" env:"RABBITMQ_Queue"`
RoutingKey string `env-required:"true" yaml:"routing_key" env:"RABBITMQ_RoutingKey"`
ConsumerTag string `env-required:"true" yaml:"consumer_tag" env:"RABBITMQ_ConsumerTag"`
WorkerPoolSize int `env-required:"true" yaml:"worker_pool_size" env:"RABBITMQ_WorkerPoolSize"`
URL string `env-required:"true" yaml:"url" env:"RABBITMQ_URL"`
}
)

Expand Down
8 changes: 2 additions & 6 deletions cmd/counter/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,10 @@ http:

postgres:
pool_max: 2
url: postgres://postgres:P@ssw0rd@172.22.174.138:5432/postgres?sslmode=disable
url: postgres://postgres:P@ssw0rd@172.25.33.104:5432/postgres?sslmode=disable

rabbit_mq:
url: amqp://guest:[email protected]:5672/
exchange: orders-exchange
queue: orders-queue
routing_key: orders-routing-key
consumer_tag: orders-consumer
url: amqp://guest:[email protected]:5672/

product_client:
url: 0.0.0.0:5001
Expand Down
6 changes: 1 addition & 5 deletions cmd/counter/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,7 @@ type (
}

RabbitMQ struct {
URL string `env-required:"true" yaml:"url" env:"RABBITMQ_URL"`
Exchange string `env-required:"true" yaml:"exchange" env:"RABBITMQ_Exchange"`
Queue string `env-required:"true" yaml:"queue" env:"RABBITMQ_Queue"`
RoutingKey string `env-required:"true" yaml:"routing_key" env:"RABBITMQ_RoutingKey"`
ConsumerTag string `env-required:"true" yaml:"consumer_tag" env:"RABBITMQ_ConsumerTag"`
URL string `env-required:"true" yaml:"url" env:"RABBITMQ_URL"`
}

ProductClient struct {
Expand Down
3 changes: 1 addition & 2 deletions cmd/counter/main.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package main

import (
"context"
"os"

"github.com/golang/glog"
Expand All @@ -19,7 +18,7 @@ func main() {
mylog := mylogger.New(cfg.Level)

a := app.New(mylog, cfg)
if err = a.Run(context.Background()); err != nil {
if err = a.Run(); err != nil {
glog.Fatal(err)
os.Exit(1)
}
Expand Down
102 changes: 102 additions & 0 deletions docker-compose-full.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
version: "3"

services:
postgres:
image: postgres:14-alpine
environment:
- POSTGRES_DB=postgres
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=P@ssw0rd
healthcheck:
test: ["CMD", "pg_isready"]
ports:
- "5432:5432"
networks:
- coffeeshop-network

rabbitmq:
image: rabbitmq:3.11-management-alpine
environment:
RABBITMQ_DEFAULT_USER: guest
RABBITMQ_DEFAULT_PASS: guest
healthcheck:
test: rabbitmq-diagnostics -q ping
interval: 30s
timeout: 30s
retries: 3
ports:
- "5672:5672"
- "15672:15672"
networks:
- coffeeshop-network

proxy:
build:
context: .
dockerfile: ./docker/Dockerfile-proxy
image: go-coffeeshop-proxy
environment:
APP_NAME: 'proxy-service in docker'
GRPC_PRODUCT_HOST: 'product'
GRPC_PRODUCT_PORT: 5001
GRPC_COUNTER_HOST: 'counter'
GRPC_COUNTER_PORT: 5002
ports:
- 5000:5000
depends_on:
- product
- counter
networks:
- coffeeshop-network

product:
build:
context: .
dockerfile: ./docker/Dockerfile-product
image: go-coffeeshop-product
environment:
APP_NAME: 'product-service in docker'
ports:
- 5001:5001
networks:
- coffeeshop-network

counter:
build:
context: .
dockerfile: ./docker/Dockerfile-counter
image: go-coffeeshop-counter
environment:
APP_NAME: 'counter-service in docker'
PG_URL: postgres://postgres:P@ssw0rd@postgres:5432/postgres
RABBITMQ_URL: amqp://guest:guest@rabbitmq:5672/
PRODUCT_CLIENT_URL: product:5001
ports:
- 5002:5002
depends_on:
postgres:
condition: service_healthy
rabbitmq:
condition: service_healthy
networks:
- coffeeshop-network

barista:
build:
context: .
dockerfile: ./docker/Dockerfile-barista
image: go-coffeeshop-barista
environment:
APP_NAME: 'barista-service in docker'
PG_URL: postgres://postgres:P@ssw0rd@postgres:5432/postgres
RABBITMQ_URL: amqp://guest:guest@rabbitmq:5672/
depends_on:
postgres:
condition: service_healthy
rabbitmq:
condition: service_healthy
networks:
- coffeeshop-network

networks:
coffeeshop-network:
68 changes: 0 additions & 68 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,74 +29,6 @@ services:
- "15672:15672"
networks:
- coffeeshop-network

proxy:
build:
context: .
dockerfile: ./docker/Dockerfile-proxy
image: go-coffeeshop-proxy
environment:
APP_NAME: 'proxy-service in docker'
GRPC_PRODUCT_HOST: 'product'
GRPC_PRODUCT_PORT: 5001
GRPC_COUNTER_HOST: 'counter'
GRPC_COUNTER_PORT: 5002
ports:
- 5000:5000
depends_on:
- product
- counter
networks:
- coffeeshop-network

product:
build:
context: .
dockerfile: ./docker/Dockerfile-product
image: go-coffeeshop-product
environment:
APP_NAME: 'product-service in docker'
ports:
- 5001:5001
networks:
- coffeeshop-network

counter:
build:
context: .
dockerfile: ./docker/Dockerfile-counter
image: go-coffeeshop-counter
environment:
APP_NAME: 'counter-service in docker'
PG_URL: postgres://postgres:P@ssw0rd@postgres:5432/postgres
RABBITMQ_URL: amqp://guest:guest@rabbitmq:5672/
PRODUCT_CLIENT_URL: product:5001
ports:
- 5002:5002
depends_on:
postgres:
condition: service_healthy
rabbitmq:
condition: service_healthy
networks:
- coffeeshop-network

barista:
build:
context: .
dockerfile: ./docker/Dockerfile-barista
image: go-coffeeshop-barista
environment:
APP_NAME: 'barista-service in docker'
PG_URL: postgres://postgres:P@ssw0rd@postgres:5432/postgres
RABBITMQ_URL: amqp://guest:guest@rabbitmq:5672/
depends_on:
postgres:
condition: service_healthy
rabbitmq:
condition: service_healthy
networks:
- coffeeshop-network

networks:
coffeeshop-network:
44 changes: 33 additions & 11 deletions internal/barista/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ import (
"os/signal"
"syscall"

"github.com/pkg/errors"
"github.com/thangchung/go-coffeeshop/cmd/barista/config"
"github.com/thangchung/go-coffeeshop/internal/barista/features/orders/eventhandlers"
baristaRabbitMQ "github.com/thangchung/go-coffeeshop/internal/barista/rabbitmq"
"github.com/thangchung/go-coffeeshop/internal/barista/rabbitmq/consumer"
"github.com/thangchung/go-coffeeshop/internal/barista/rabbitmq/publisher"
mylogger "github.com/thangchung/go-coffeeshop/pkg/logger"
"github.com/thangchung/go-coffeeshop/pkg/rabbitmq"
)
Expand Down Expand Up @@ -41,21 +43,41 @@ func (a *App) Run() error {
}
defer amqpConn.Close()

handler := eventhandlers.NewBaristaOrderedEventHandler()
consumer, err := baristaRabbitMQ.NewOrderConsumer(amqpConn, handler, a.logger)
// publishers
counterOrderPub, err := publisher.NewPublisher(
amqpConn,
a.logger,
publisher.ExchangeName("counter-order-exchange"),
publisher.BindingKey("counter-order-routing-key"),
publisher.MessageTypeName("counter-order-updated"),
)
defer counterOrderPub.CloseChan()

if err != nil {
a.logger.Fatal("app - Run - baristaRabbitMQ.NewOrderConsumer: %s", err.Error())
return errors.Wrap(err, "publisher-Counter-NewOrderPublisher")
}

// event handlers.
handler := eventhandlers.NewBaristaOrderedEventHandler(counterOrderPub)

// consumers
consumer, err := consumer.NewConsumer(
amqpConn,
handler,
a.logger,
consumer.ExchangeName("barista-order-exchange"),
consumer.QueueName("barista-order-queue"),
consumer.BindingKey("barista-order-routing-key"),
consumer.ConsumerTag("barista-order-consumer"),
consumer.MessageTypeName("barista-order-created"),
)

if err != nil {
a.logger.Fatal("app - Run - consumer.NewOrderConsumer: %s", err.Error())
}

go func() {
err := consumer.StartConsumer(
a.cfg.RabbitMQ.WorkerPoolSize,
a.cfg.RabbitMQ.Exchange,
a.cfg.RabbitMQ.Queue,
a.cfg.RabbitMQ.RoutingKey,
a.cfg.RabbitMQ.ConsumerTag,
)
err := consumer.StartConsumer()
if err != nil {
a.logger.Error("StartConsumer: %v", err)
cancel()
Expand Down
23 changes: 15 additions & 8 deletions internal/barista/app/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ package app
import (
"errors"
"fmt"
"log"
"os"
"path/filepath"
"time"

"github.com/golang-migrate/migrate/v4"
"github.com/golang/glog"

// migrate tools
_ "github.com/golang-migrate/migrate/v4/database/postgres"
_ "github.com/golang-migrate/migrate/v4/source/file"
Expand All @@ -24,7 +26,7 @@ const (
func init() {
databaseURL, ok := os.LookupEnv("PG_URL")
if !ok || len(databaseURL) == 0 {
log.Fatalf("migrate: environment variable not declared: PG_URL")
glog.Fatalf("migrate: environment variable not declared: PG_URL")
}

databaseURL += "?sslmode=disable"
Expand All @@ -36,30 +38,35 @@ func init() {
)

for attempts > 0 {
m, err = migrate.New(fmt.Sprintf("file://%s", _migrationFilePath), databaseURL)
cur, _ := os.Getwd()
dir := filepath.Dir(cur + "/../../..")

glog.Infoln(fmt.Sprintf("file://%s", dir))

m, err = migrate.New(fmt.Sprintf("file://%s/%s", dir, _migrationFilePath), databaseURL)
if err == nil {
break
}

log.Printf("Migrate: postgres is trying to connect, attempts left: %d", attempts)
glog.Infoln("Migrate: postgres is trying to connect, attempts left: %d", attempts)
time.Sleep(_defaultTimeout)
attempts--
}

if err != nil {
log.Fatalf("Migrate: postgres connect error: %s", err)
glog.Fatalf("Migrate: postgres connect error: %s", err)
}

err = m.Up()
defer m.Close()
if err != nil && !errors.Is(err, migrate.ErrNoChange) {
log.Fatalf("Migrate: up error: %s", err)
glog.Fatalf("Migrate: up error: %s", err)
}

if errors.Is(err, migrate.ErrNoChange) {
log.Printf("Migrate: no change")
glog.Infoln("Migrate: no change")
return
}

log.Printf("Migrate: up success")
glog.Infoln("Migrate: up success")
}
Loading

0 comments on commit 22f8b78

Please sign in to comment.