Skip to content

Commit f7d9368

Browse files
committed
refactor: improve publisher initialization
1 parent 042e70f commit f7d9368

File tree

3 files changed

+72
-24
lines changed

3 files changed

+72
-24
lines changed

amqp/amqp.go

Lines changed: 5 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"fmt"
55
"log"
66

7-
"github.com/Gate2Up/rabbitmq-go/publisher"
87
"github.com/Gate2Up/rabbitmq-go/subscriber"
98
"github.com/streadway/amqp"
109
)
@@ -38,28 +37,12 @@ func NewClient(config Config) (*AmqpClient, error) {
3837
return &AmqpClient{Connection: amqpConn, ServiceName: config.ServiceName}, nil
3938
}
4039

41-
func (a *AmqpClient) AddPublisher(publisher *publisher.PublisherConfig) {
42-
channel, err := a.Connection.Channel()
43-
if err != nil {
44-
log.Println(err.Error())
45-
return
46-
}
47-
48-
err = channel.ExchangeDeclare(
49-
publisher.TopicName,
50-
amqp.ExchangeTopic,
51-
true,
52-
false,
53-
false,
54-
false,
55-
nil,
56-
)
57-
58-
if err != nil {
59-
log.Println(`Create exchange failed: `, err.Error())
60-
}
40+
type Publisher interface {
41+
Build(client *AmqpClient)
42+
}
6143

62-
log.Println(fmt.Sprintf(`Exchange: %s created`, publisher.TopicName))
44+
func (a *AmqpClient) AddPublisher(publisher Publisher) {
45+
publisher.Build(a)
6346
}
6447

6548
func (a *AmqpClient) AddSubscriber(subscriber *subscriber.SubscriberConfig) {

amqp/amqp_test.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,15 @@ func TestNewClient(t *testing.T) {
3535

3636
func TestAddPublisher(t *testing.T) {
3737

38-
publisherConfig := publisher.NewPublisher("TEST_TOPIC", nil)
38+
publisher := publisher.NewPublisher("TEST_TOPIC", nil)
3939
conn, _ := amqp.NewClient(config)
4040

4141
// if no error this case is passed - void
42-
conn.AddPublisher(publisherConfig)
42+
conn.AddPublisher(publisher)
43+
status, err := publisher.Publish([]byte(`Hello World`))
44+
45+
assert.Equal(t, status, true)
46+
assert.Equal(t, err, nil)
4347
}
4448

4549
func TestAddSubscriber(t *testing.T) {

publisher/publisher.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,17 @@
11
package publisher
22

3+
import (
4+
"fmt"
5+
"log"
6+
7+
"github.com/Gate2Up/rabbitmq-go/amqp"
8+
amqpLegacy "github.com/streadway/amqp"
9+
)
10+
311
type PublisherConfig struct {
412
TopicName string
513
Schema interface{}
14+
Client *amqp.AmqpClient
615
}
716

817
type schemaType interface{}
@@ -11,7 +20,59 @@ func NewPublisher(topicName string, schema schemaType) *PublisherConfig {
1120
publisherConfig := PublisherConfig{
1221
TopicName: topicName,
1322
Schema: schema,
23+
Client: nil,
1424
}
1525

1626
return &publisherConfig
1727
}
28+
29+
func (p *PublisherConfig) Build(client *amqp.AmqpClient) {
30+
31+
if client == nil {
32+
log.Fatalln(`amqp client is nil`)
33+
}
34+
35+
p.Client = client
36+
37+
channel, err := client.Connection.Channel()
38+
if err != nil {
39+
log.Println(err.Error())
40+
return
41+
}
42+
43+
err = channel.ExchangeDeclare(
44+
p.TopicName,
45+
amqpLegacy.ExchangeTopic,
46+
true,
47+
false,
48+
false,
49+
false,
50+
nil,
51+
)
52+
53+
if err != nil {
54+
log.Println(`Create exchange failed: `, err.Error())
55+
}
56+
57+
log.Println(fmt.Sprintf(`Exchange: %s created`, p.TopicName))
58+
}
59+
60+
func (p *PublisherConfig) Publish(data []byte) (bool, error) {
61+
channel, err := p.Client.Connection.Channel()
62+
if err != nil {
63+
return false, err
64+
}
65+
66+
content := amqpLegacy.Publishing{
67+
ContentType: "text/plain",
68+
Body: data,
69+
}
70+
71+
err = channel.Publish(p.TopicName, "*", true, true, content)
72+
73+
if err != nil {
74+
return false, err
75+
}
76+
77+
return true, nil
78+
}

0 commit comments

Comments
 (0)