Skip to content

Commit e58bc07

Browse files
author
Rizal Widyarta Gowandy
authored
Merge pull request #29 from rizalgowandy/arwego/feat/watermill
Add watermillx package
2 parents dbd516a + df7df1a commit e58bc07

File tree

24 files changed

+997
-6
lines changed

24 files changed

+997
-6
lines changed

docker-compose.yaml

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,6 @@ services:
33
postgres:
44
image: postgres:10.6
55
container_name: gdk-postgres
6-
# volumes:
7-
# - ./schema:/docker-entrypoint-initdb.d
8-
#- gdk_database:/var/lib/postgresql/data
96
ports:
107
- "5432:5432"
118
environment:
@@ -54,6 +51,36 @@ services:
5451
- "4171:4171"
5552
restart: unless-stopped
5653

57-
#volumes:
58-
# gdk_database:
59-
# driver: local
54+
redpanda: # supports kafka
55+
image: docker.vectorized.io/vectorized/redpanda:v21.9.5
56+
container_name: gdk-redpanda
57+
ports:
58+
- 9092:9092
59+
- 29092:29092
60+
command:
61+
- redpanda
62+
- start
63+
- --smp
64+
- '1'
65+
- --reserve-memory
66+
- 0M
67+
- --overprovisioned
68+
- --node-id
69+
- '0'
70+
- --kafka-addr
71+
- PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092
72+
- --advertise-kafka-addr
73+
- PLAINTEXT://redpanda:29092,OUTSIDE://localhost:9092
74+
restart: unless-stopped
75+
76+
kafdrop: # kafka-ui
77+
image: obsidiandynamics/kafdrop:3.27.0
78+
container_name: gdk-kafdrop
79+
ports:
80+
- 9100:9000
81+
environment:
82+
- KAFKA_BROKERCONNECT=redpanda:29092
83+
- JVM_OPTS=-Xms32M -Xmx64M
84+
depends_on:
85+
- redpanda
86+
restart: unless-stopped

go.mod

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,16 @@ module github.com/rizalgowandy/gdk
33
go 1.15
44

55
require (
6+
github.com/ThreeDotsLabs/watermill v1.2.0-rc.6
7+
github.com/ThreeDotsLabs/watermill-amqp v1.1.4
8+
github.com/ThreeDotsLabs/watermill-kafka/v2 v2.2.1
69
github.com/cloudfoundry/gosigar v1.2.0
710
github.com/dgraph-io/ristretto v0.1.0
811
github.com/go-redis/redis/v8 v8.11.3
912
github.com/golang/mock v1.4.4
1013
github.com/gomodule/redigo v1.8.5
1114
github.com/gorilla/mux v1.8.0
15+
github.com/hashicorp/go-multierror v1.1.1
1216
github.com/imdario/mergo v0.3.12
1317
github.com/jackc/pgx/v4 v4.13.0
1418
github.com/json-iterator/go v1.1.11
@@ -17,6 +21,7 @@ require (
1721
github.com/modern-go/reflect2 v1.0.1 // indirect
1822
github.com/nsqio/go-nsq v1.0.8
1923
github.com/pingcap/go-ycsb v0.0.0-20210129115622-04d8656123e4
24+
github.com/pkg/errors v0.9.1
2025
github.com/robfig/cron/v3 v3.0.1
2126
github.com/rs/zerolog v1.21.0
2227
github.com/segmentio/ksuid v1.0.4

go.sum

Lines changed: 124 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package job
2+
3+
import (
4+
"context"
5+
6+
"github.com/ThreeDotsLabs/watermill/message"
7+
"github.com/rizalgowandy/gdk/pkg/logx"
8+
"github.com/rizalgowandy/gdk/pkg/queue/watermillx"
9+
"github.com/rizalgowandy/gdk/pkg/queue/watermillx/examples/with-amqp/topic"
10+
"github.com/segmentio/ksuid"
11+
)
12+
13+
type PayloadMessage struct {
14+
UserID string `json:"user_id"`
15+
}
16+
17+
func NewA(
18+
ctx context.Context,
19+
pub watermillx.Publisher,
20+
) message.NoPublishHandlerFunc {
21+
return func(msg *message.Message) error {
22+
ids := []string{
23+
ksuid.New().String(),
24+
}
25+
26+
for _, v := range ids {
27+
task := PayloadMessage{UserID: v}
28+
29+
msg, err := watermillx.NewMessage(ctx, task)
30+
if err != nil {
31+
continue
32+
}
33+
34+
err = pub.Publish(topic.A, msg)
35+
if err != nil {
36+
continue
37+
}
38+
39+
logx.INF(ctx, task, "success")
40+
}
41+
42+
return nil
43+
}
44+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package job
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
7+
"github.com/ThreeDotsLabs/watermill/message"
8+
"github.com/rizalgowandy/gdk/pkg/logx"
9+
)
10+
11+
func NewB(ctx context.Context) message.NoPublishHandlerFunc {
12+
return func(msg *message.Message) error {
13+
var payload PayloadMessage
14+
15+
err := json.Unmarshal(msg.Payload, &payload)
16+
if err != nil {
17+
return err
18+
}
19+
20+
logx.INF(ctx, payload, "success")
21+
22+
return nil
23+
}
24+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package main
2+
3+
import (
4+
"log"
5+
6+
"github.com/rizalgowandy/gdk/pkg/logx"
7+
"github.com/rizalgowandy/gdk/pkg/queue/watermillx"
8+
"github.com/rizalgowandy/gdk/pkg/queue/watermillx/examples/with-amqp/job"
9+
"github.com/rizalgowandy/gdk/pkg/queue/watermillx/examples/with-amqp/topic"
10+
"github.com/rizalgowandy/gdk/pkg/queue/watermillx/pubsub"
11+
)
12+
13+
const consumerName = "default"
14+
15+
func main() {
16+
ctx := logx.NewContext()
17+
18+
logger, err := watermillx.NewLogger()
19+
if err != nil {
20+
log.Fatal(err)
21+
}
22+
23+
address := "amqp://guest:guest@localhost:5672/"
24+
25+
// Create publisher.
26+
publisher, err := pubsub.NewAMQPPublisher(logger, address)
27+
if err != nil {
28+
log.Fatal(err)
29+
}
30+
31+
// Create subscriber.
32+
subscriber, err := pubsub.NewAMQPSubscriber(logger, address)
33+
if err != nil {
34+
log.Fatal(err)
35+
}
36+
37+
// Create a list of workers.
38+
handlers := []watermillx.Handler{
39+
{
40+
Topic: topic.A,
41+
Channel: consumerName,
42+
Subscriber: subscriber,
43+
Exec: job.NewA(ctx, publisher),
44+
},
45+
{
46+
Topic: topic.B,
47+
Channel: consumerName,
48+
Subscriber: subscriber,
49+
Exec: job.NewB(ctx),
50+
},
51+
}
52+
53+
// Create router with middleware.
54+
router, err := watermillx.NewRouter(ctx, logger, handlers)
55+
if err != nil {
56+
log.Fatal(err)
57+
}
58+
59+
// Run subscriber for real.
60+
// This call is blocking while the router is running.
61+
err = router.Run(ctx)
62+
if err != nil {
63+
log.Fatal(err)
64+
}
65+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package topic
2+
3+
const (
4+
A = "a"
5+
B = "b"
6+
)
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package job
2+
3+
import (
4+
"context"
5+
6+
"github.com/ThreeDotsLabs/watermill/message"
7+
"github.com/rizalgowandy/gdk/pkg/logx"
8+
"github.com/rizalgowandy/gdk/pkg/queue/watermillx"
9+
"github.com/rizalgowandy/gdk/pkg/queue/watermillx/examples/with-amqp/topic"
10+
"github.com/segmentio/ksuid"
11+
)
12+
13+
func NewA(
14+
ctx context.Context,
15+
pub watermillx.Publisher,
16+
) message.NoPublishHandlerFunc {
17+
return func(msg *message.Message) error {
18+
ids := []string{
19+
ksuid.New().String(),
20+
}
21+
22+
for _, v := range ids {
23+
task := BMessage{UserID: v}
24+
25+
msg, err := watermillx.NewMessage(ctx, task)
26+
if err != nil {
27+
continue
28+
}
29+
30+
err = pub.Publish(topic.B, msg)
31+
if err != nil {
32+
continue
33+
}
34+
35+
logx.INF(ctx, task, "success")
36+
}
37+
38+
return nil
39+
}
40+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package job
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
7+
"github.com/ThreeDotsLabs/watermill/message"
8+
"github.com/rizalgowandy/gdk/pkg/logx"
9+
)
10+
11+
type BMessage struct {
12+
UserID string `json:"user_id"`
13+
}
14+
15+
func NewB(ctx context.Context) message.NoPublishHandlerFunc {
16+
return func(msg *message.Message) error {
17+
var payload BMessage
18+
19+
err := json.Unmarshal(msg.Payload, &payload)
20+
if err != nil {
21+
return err
22+
}
23+
24+
logx.INF(ctx, payload, "success")
25+
26+
return nil
27+
}
28+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package main
2+
3+
import (
4+
"log"
5+
6+
"github.com/rizalgowandy/gdk/pkg/logx"
7+
"github.com/rizalgowandy/gdk/pkg/queue/watermillx"
8+
"github.com/rizalgowandy/gdk/pkg/queue/watermillx/examples/with-kafka/job"
9+
"github.com/rizalgowandy/gdk/pkg/queue/watermillx/examples/with-kafka/topic"
10+
"github.com/rizalgowandy/gdk/pkg/queue/watermillx/pubsub"
11+
)
12+
13+
const consumerName = "default"
14+
15+
func main() {
16+
ctx := logx.NewContext()
17+
18+
logger, err := watermillx.NewLogger()
19+
if err != nil {
20+
log.Fatal(err)
21+
}
22+
23+
address := []string{"kafka:9092"}
24+
25+
// Create publisher.
26+
publisher, err := pubsub.NewKafkaPublisher(logger, address)
27+
if err != nil {
28+
log.Fatal(err)
29+
}
30+
31+
// Create subscriber.
32+
subscriber, err := pubsub.NewKafkaSubscriber(logger, address)
33+
if err != nil {
34+
log.Fatal(err)
35+
}
36+
37+
// Create a list of workers.
38+
handlers := []watermillx.Handler{
39+
{
40+
Topic: topic.A,
41+
Channel: consumerName,
42+
Subscriber: subscriber,
43+
Exec: job.NewA(ctx, publisher),
44+
},
45+
{
46+
Topic: topic.B,
47+
Channel: consumerName,
48+
Subscriber: subscriber,
49+
Exec: job.NewB(ctx),
50+
},
51+
}
52+
53+
// Create router with middleware.
54+
router, err := watermillx.NewRouter(ctx, logger, handlers)
55+
if err != nil {
56+
log.Fatal(err)
57+
}
58+
59+
// Run subscriber for real.
60+
// This call is blocking while the router is running.
61+
err = router.Run(ctx)
62+
if err != nil {
63+
log.Fatal(err)
64+
}
65+
}

0 commit comments

Comments
 (0)