Skip to content

Commit ad655d4

Browse files
committed
add code for postgres #2
1 parent 591ebaa commit ad655d4

File tree

16 files changed

+537
-147
lines changed

16 files changed

+537
-147
lines changed

cmd/barista/config.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ http:
77
port: 5002
88

99
rabbit_mq:
10-
url: amqp://guest:guest@172.28.191.206:5672/
10+
url: amqp://guest:guest@172.22.168.120:5672/
1111

1212
logger:
1313
log_level: 'debug'

cmd/barista/event/consumer.go

100644100755
File mode changed.

cmd/counter/config.yml

+5-1
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,12 @@ http:
66
host: '0.0.0.0'
77
port: 5002
88

9+
postgres:
10+
pool_max: 2
11+
url: postgres://postgres:P@[email protected]:5432/counterdb_dev?sslmode=disable
12+
913
rabbit_mq:
10-
url: amqp://guest:guest@172.28.191.206:5672/
14+
url: amqp://guest:guest@172.22.168.120:5672/
1115

1216
logger:
1317
log_level: 'debug'

cmd/counter/config/config.go

+6
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,15 @@ type (
1414
configs.App `yaml:"app"`
1515
configs.HTTP `yaml:"http"`
1616
configs.Log `yaml:"logger"`
17+
PG `yaml:"postgres"`
1718
RabbitMQ `yaml:"rabbit_mq"`
1819
}
1920

21+
PG struct {
22+
PoolMax int `env-required:"true" yaml:"pool_max" env:"PG_POOL_MAX"`
23+
URL string `env-required:"true" yaml:"url" env:"PG_URL"`
24+
}
25+
2026
RabbitMQ struct {
2127
URL string `env-required:"true" yaml:"url" env:"RABBITMQ_URL"`
2228
}

go.mod

+15-2
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,12 @@ module github.com/thangchung/go-coffeeshop
33
go 1.19
44

55
require (
6+
github.com/Masterminds/squirrel v1.5.3
67
github.com/golang/glog v1.0.0
78
github.com/google/uuid v1.3.0
89
github.com/grpc-ecosystem/grpc-gateway/v2 v2.12.0
910
github.com/ilyakaznacheev/cleanenv v1.3.0
11+
github.com/jackc/pgx/v4 v4.17.2
1012
github.com/rabbitmq/amqp091-go v1.5.0
1113
github.com/samber/lo v1.33.0
1214
github.com/sirupsen/logrus v1.9.0
@@ -18,11 +20,22 @@ require (
1820
require (
1921
github.com/BurntSushi/toml v1.1.0 // indirect
2022
github.com/golang/protobuf v1.5.2 // indirect
23+
github.com/jackc/chunkreader/v2 v2.0.1 // indirect
24+
github.com/jackc/pgconn v1.13.0 // indirect
25+
github.com/jackc/pgio v1.0.0 // indirect
26+
github.com/jackc/pgpassfile v1.0.0 // indirect
27+
github.com/jackc/pgproto3/v2 v2.3.1 // indirect
28+
github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect
29+
github.com/jackc/pgtype v1.12.0 // indirect
30+
github.com/jackc/puddle v1.3.0 // indirect
2131
github.com/joho/godotenv v1.4.0 // indirect
32+
github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 // indirect
33+
github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 // indirect
34+
golang.org/x/crypto v0.1.0 // indirect
2235
golang.org/x/exp v0.0.0-20221026153819-32f3d567a233 // indirect
23-
golang.org/x/net v0.0.0-20220909164309-bea034e7d591 // indirect
36+
golang.org/x/net v0.1.0 // indirect
2437
golang.org/x/sys v0.1.0 // indirect
25-
golang.org/x/text v0.3.8 // indirect
38+
golang.org/x/text v0.4.0 // indirect
2639
gopkg.in/yaml.v3 v3.0.1 // indirect
2740
olympos.io/encoding/edn v0.0.0-20201019073823-d3554ca0b0a3 // indirect
2841
)

go.sum

+160-4
Large diffs are not rendered by default.

internal/counter/app/app.go

+16-123
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,17 @@ package app
22

33
import (
44
"context"
5-
"encoding/json"
65
"errors"
76
"net"
87
"time"
98

10-
"github.com/google/uuid"
119
amqp "github.com/rabbitmq/amqp091-go"
1210
"github.com/thangchung/go-coffeeshop/cmd/counter/config"
13-
"github.com/thangchung/go-coffeeshop/internal/counter/entity"
14-
events "github.com/thangchung/go-coffeeshop/pkg/event"
11+
counterGrpc "github.com/thangchung/go-coffeeshop/internal/counter/grpc"
12+
"github.com/thangchung/go-coffeeshop/internal/counter/usecase"
13+
"github.com/thangchung/go-coffeeshop/internal/counter/usecase/repo"
1514
mylogger "github.com/thangchung/go-coffeeshop/pkg/logger"
16-
gen "github.com/thangchung/go-coffeeshop/proto/gen"
15+
"github.com/thangchung/go-coffeeshop/pkg/postgres"
1716
"google.golang.org/grpc"
1817
)
1918

@@ -32,116 +31,6 @@ type App struct {
3231
address string
3332
}
3433

35-
type CounterServiceServerImpl struct {
36-
gen.UnimplementedCounterServiceServer
37-
logger *mylogger.Logger
38-
rabbitConn *amqp.Connection
39-
}
40-
41-
type Payload struct {
42-
Name string `json:"name"`
43-
}
44-
45-
func (g *CounterServiceServerImpl) GetListOrderFulfillment(ctx context.Context, request *gen.GetListOrderFulfillmentRequest) (*gen.GetListOrderFulfillmentResponse, error) {
46-
g.logger.Info("GET: GetListOrderFulfillment")
47-
48-
ch, err := g.rabbitConn.Channel()
49-
if err != nil {
50-
panic(err)
51-
}
52-
defer ch.Close()
53-
54-
event := Payload{
55-
Name: "drink_made",
56-
}
57-
58-
eventBytes, err := json.Marshal(event)
59-
if err != nil {
60-
g.logger.LogError(err)
61-
}
62-
63-
err = ch.PublishWithContext(
64-
ctx,
65-
OrderTopic,
66-
"log.INFO",
67-
false,
68-
false,
69-
amqp.Publishing{
70-
ContentType: "text/plain",
71-
Body: eventBytes,
72-
},
73-
)
74-
75-
if err != nil {
76-
g.logger.LogError(err)
77-
78-
return nil, err
79-
}
80-
81-
g.logger.Info("Sending message: %s -> %s", event, "orders_topic")
82-
83-
res := gen.GetListOrderFulfillmentResponse{}
84-
85-
return &res, nil
86-
}
87-
88-
func (g *CounterServiceServerImpl) PlaceOrder(ctx context.Context, request *gen.PlaceOrderRequest) (*gen.PlaceOrderResponse, error) {
89-
g.logger.Info("POST: PlaceOrder")
90-
91-
g.logger.Debug("request: %s", request)
92-
93-
// add order
94-
order, err := entity.CreateOrderFrom(request)
95-
if err != nil {
96-
return nil, err
97-
}
98-
99-
g.logger.Debug("order created: %s", *order)
100-
101-
// publish order events
102-
ch, err := g.rabbitConn.Channel()
103-
if err != nil {
104-
panic(err)
105-
}
106-
defer ch.Close()
107-
108-
event := events.BaristaOrdered{
109-
OrderID: order.ID,
110-
ItemLineID: uuid.New(), //todo
111-
ItemType: 1, //todo
112-
}
113-
114-
eventBytes, err := json.Marshal(event)
115-
if err != nil {
116-
g.logger.LogError(err)
117-
}
118-
119-
err = ch.PublishWithContext(
120-
ctx,
121-
OrderTopic,
122-
"log.INFO",
123-
false,
124-
false,
125-
amqp.Publishing{
126-
ContentType: "text/plain",
127-
Type: "barista.ordered",
128-
Body: eventBytes,
129-
},
130-
)
131-
132-
if err != nil {
133-
g.logger.LogError(err)
134-
135-
return nil, err
136-
}
137-
138-
g.logger.Info("Sending message: %s -> %s", event, "orders_topic")
139-
140-
res := gen.PlaceOrderResponse{}
141-
142-
return &res, nil
143-
}
144-
14534
func New(log *mylogger.Logger, cfg *config.Config) *App {
14635
return &App{
14736
logger: log,
@@ -155,17 +44,21 @@ func (a *App) Run(ctx context.Context) error {
15544
a.logger.Info("Init %s %s\n", a.cfg.Name, a.cfg.Version)
15645

15746
// Repository
158-
// ...
47+
pg, err := postgres.NewPostgres(a.cfg.PG.URL, postgres.MaxPoolSize(a.cfg.PG.PoolMax))
48+
if err != nil {
49+
a.logger.Fatal("app - Run - postgres.NewPostgres: %w", err)
50+
}
51+
defer pg.Close()
15952

16053
// Use case
161-
// ...
54+
queryOrderFulfillmentUseCase := usecase.NewQueryOrderFulfillmentUseCase(repo.NewQueryOrderFulfillmentRepo(pg))
16255

16356
// RabbitMQ
164-
conn, err := a.connectToRabbit()
57+
amqpConn, err := a.connectToRabbit()
16558
if err != nil {
16659
return err
16760
}
168-
defer conn.Close()
61+
defer amqpConn.Close()
16962

17063
// gRPC Server
17164
l, err := net.Listen(a.network, a.address)
@@ -180,7 +73,7 @@ func (a *App) Run(ctx context.Context) error {
18073
}()
18174

18275
s := grpc.NewServer()
183-
gen.RegisterCounterServiceServer(s, &CounterServiceServerImpl{logger: a.logger, rabbitConn: conn})
76+
counterGrpc.NewCounterServiceServerGrpc(s, amqpConn, queryOrderFulfillmentUseCase, a.logger)
18477

18578
go func() {
18679
defer s.GracefulStop()
@@ -194,7 +87,7 @@ func (a *App) Run(ctx context.Context) error {
19487

19588
func (a *App) connectToRabbit() (*amqp.Connection, error) {
19689
var (
197-
rabbitConn *amqp.Connection
90+
amqpConn *amqp.Connection
19891
counts int64
19992
rabbitMqURL = a.cfg.RabbitMQ.URL
20093
)
@@ -205,7 +98,7 @@ func (a *App) connectToRabbit() (*amqp.Connection, error) {
20598
a.logger.Error("RabbitMq at %s not ready...\n", rabbitMqURL)
20699
counts++
207100
} else {
208-
rabbitConn = connection
101+
amqpConn = connection
209102

210103
break
211104
}
@@ -224,5 +117,5 @@ func (a *App) connectToRabbit() (*amqp.Connection, error) {
224117

225118
a.logger.Info("Connected to RabbitMQ!")
226119

227-
return rabbitConn, nil
120+
return amqpConn, nil
228121
}

internal/counter/grpc/handler.go

+116
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
package grpc
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
8+
"github.com/google/uuid"
9+
amqp "github.com/rabbitmq/amqp091-go"
10+
"github.com/thangchung/go-coffeeshop/internal/counter/entity"
11+
"github.com/thangchung/go-coffeeshop/internal/counter/usecase"
12+
events "github.com/thangchung/go-coffeeshop/pkg/event"
13+
mylogger "github.com/thangchung/go-coffeeshop/pkg/logger"
14+
gen "github.com/thangchung/go-coffeeshop/proto/gen"
15+
"google.golang.org/grpc"
16+
"google.golang.org/grpc/reflection"
17+
)
18+
19+
const (
20+
OrderTopic = "orders_topic"
21+
)
22+
23+
func NewCounterServiceServerGrpc(
24+
grpcServer *grpc.Server,
25+
amqpConn *amqp.Connection,
26+
queryOrderFulfillmentUseCase usecase.QueryOrderFulfillmentUseCase,
27+
log *mylogger.Logger) {
28+
svc := CounterServiceServerImpl{logger: log, amqpConn: amqpConn, queryOrderFulfillmentUseCase: queryOrderFulfillmentUseCase}
29+
30+
gen.RegisterCounterServiceServer(grpcServer, &svc)
31+
32+
reflection.Register(grpcServer)
33+
}
34+
35+
type CounterServiceServerImpl struct {
36+
gen.UnimplementedCounterServiceServer
37+
logger *mylogger.Logger
38+
amqpConn *amqp.Connection
39+
queryOrderFulfillmentUseCase usecase.QueryOrderFulfillmentUseCase
40+
}
41+
42+
func (g *CounterServiceServerImpl) GetListOrderFulfillment(ctx context.Context, request *gen.GetListOrderFulfillmentRequest) (*gen.GetListOrderFulfillmentResponse, error) {
43+
g.logger.Info("GET: GetListOrderFulfillment")
44+
45+
res := gen.GetListOrderFulfillmentResponse{}
46+
47+
entities, err := g.queryOrderFulfillmentUseCase.GetListOrderFulfillment(ctx)
48+
if err != nil {
49+
return nil, fmt.Errorf("CounterServiceServerImpl - GetListOrderFulfillment - g.queryOrderFulfillmentUseCase.GetListOrderFulfillment: %w", err)
50+
}
51+
52+
for _, entity := range entities {
53+
res.Orders = append(res.Orders, &gen.OrderDto{
54+
Id: entity.Id,
55+
})
56+
}
57+
58+
return &res, nil
59+
}
60+
61+
func (g *CounterServiceServerImpl) PlaceOrder(ctx context.Context, request *gen.PlaceOrderRequest) (*gen.PlaceOrderResponse, error) {
62+
g.logger.Info("POST: PlaceOrder")
63+
64+
g.logger.Debug("request: %s", request)
65+
66+
// add order
67+
order, err := entity.CreateOrderFrom(request)
68+
if err != nil {
69+
return nil, err
70+
}
71+
72+
g.logger.Debug("order created: %s", *order)
73+
74+
// publish order events
75+
ch, err := g.amqpConn.Channel()
76+
if err != nil {
77+
panic(err)
78+
}
79+
defer ch.Close()
80+
81+
event := events.BaristaOrdered{
82+
OrderID: order.ID,
83+
ItemLineID: uuid.New(), //todo
84+
ItemType: 1, //todo
85+
}
86+
87+
eventBytes, err := json.Marshal(event)
88+
if err != nil {
89+
g.logger.LogError(err)
90+
}
91+
92+
err = ch.PublishWithContext(
93+
ctx,
94+
OrderTopic,
95+
"log.INFO",
96+
false,
97+
false,
98+
amqp.Publishing{
99+
ContentType: "text/plain",
100+
Type: "barista.ordered",
101+
Body: eventBytes,
102+
},
103+
)
104+
105+
if err != nil {
106+
g.logger.LogError(err)
107+
108+
return nil, err
109+
}
110+
111+
g.logger.Info("Sending message: %s -> %s", event, OrderTopic)
112+
113+
res := gen.PlaceOrderResponse{}
114+
115+
return &res, nil
116+
}

0 commit comments

Comments
 (0)