Skip to content

Commit

Permalink
expose GetRetryCount in rabbitmq pkg
Browse files Browse the repository at this point in the history
  • Loading branch information
shubhang.balkundi committed Feb 17, 2022
1 parent 6454b5c commit 6286e8a
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 14 deletions.
2 changes: 1 addition & 1 deletion mw/rabbitmq/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func startConsumer(ctx context.Context, d *amqpextra.Dialer, c QueueConfig, h zi
pfc = c.ConsumerPrefetchCount
}

qname := fmt.Sprintf("%s_%s_%s", c.QueueKey, QueueInstant, "queue")
qname := fmt.Sprintf("%s_%s_%s", c.QueueKey, QueueTypeInstant, "queue")
consumerName := fmt.Sprintf("%s_consumer", c.QueueKey)
cons, err := d.Consumer(
consumer.WithContext(ctx),
Expand Down
2 changes: 1 addition & 1 deletion mw/rabbitmq/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (

const KeyRetryCount = "rabbitmqAutoRetryCount"

func getRetryCount(e *ziggurat.Event) int {
func GetRetryCount(e *ziggurat.Event) int {
if e.Metadata == nil {
return 0
}
Expand Down
2 changes: 1 addition & 1 deletion mw/rabbitmq/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func Test_getRetryCount(t *testing.T) {
}}
for _, c := range tests {
t.Run(c.name, func(t *testing.T) {
count := getRetryCount(c.input)
count := GetRetryCount(c.input)
if count != c.expected {
t.Errorf("expected count to be %d got %d", c.expected, count)
}
Expand Down
6 changes: 3 additions & 3 deletions mw/rabbitmq/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ func publishInternal(p *publisher.Publisher, queue string, retryCount int, delay
event.Metadata = map[string]interface{}{KeyRetryCount: 0}
}

newCount := getRetryCount(event) + 1
newCount := GetRetryCount(event) + 1
exchange := fmt.Sprintf("%s_%s", queue, "exchange")
routingKey := QueueDelay
routingKey := QueueTypeDelay

if newCount > retryCount {
event.Metadata[KeyRetryCount] = retryCount
routingKey = QueueDL
routingKey = QueueTypeDL
expiration = ""
} else {
event.Metadata[KeyRetryCount] = newCount
Expand Down
4 changes: 2 additions & 2 deletions mw/rabbitmq/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/streadway/amqp"
)

var queueTypes = []string{QueueDelay, QueueInstant, QueueDL}
var queueTypes = []string{QueueTypeDelay, QueueTypeInstant, QueueTypeDL}

func createAndBindQueue(ch *amqp.Channel, queueName string, queueType string, args amqp.Table) error {
queueWithType := fmt.Sprintf("%s_%s_%s", queueName, queueType, "queue")
Expand All @@ -30,7 +30,7 @@ func createQueuesAndExchanges(ch *amqp.Channel, queueName string, logger ziggura
if qt == "delay" {
args = amqp.Table{
"x-dead-letter-exchange": fmt.Sprintf("%s_%s", queueName, "exchange"),
"x-dead-letter-routing-key": QueueInstant,
"x-dead-letter-routing-key": QueueTypeInstant,
}
}
if err := createAndBindQueue(ch, queueName, qt, args); err != nil {
Expand Down
22 changes: 16 additions & 6 deletions mw/rabbitmq/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ var ErrPublisherNotInit = errors.New("auto retry publish error: publisher not in
var ErrCleanShutdown = errors.New("clean shutdown of rabbitmq streams")

const (
QueueDL = "dlq"
QueueInstant = "instant"
QueueDelay = "delay"
QueueTypeDL = "dlq"
QueueTypeInstant = "instant"
QueueTypeDelay = "delay"
)

type dsViewResp struct {
Expand Down Expand Up @@ -129,6 +129,16 @@ func (r *ARetry) Publish(ctx context.Context, event *ziggurat.Event, queueKey st
return p.Publish(msg)
}

func (r *ARetry) Retry(ctx context.Context, event *ziggurat.Event, queueKey string) error {
r.once.Do(func() {
err := r.InitPublishers(ctx)
if err != nil {
panic(fmt.Sprintf("could not start RabbitMQ publishers:%v", err))
}
})
return r.publish(ctx, event, queueKey)
}

func (r *ARetry) Wrap(f ziggurat.HandlerFunc, queueKey string) ziggurat.HandlerFunc {
hf := func(ctx context.Context, event *ziggurat.Event) error {
// start the publishers once only
Expand Down Expand Up @@ -281,7 +291,7 @@ func (r *ARetry) DSViewHandler(ctx context.Context) http.Handler {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
qn := fmt.Sprintf("%s_%s_%s", qname, QueueDL, "queue")
qn := fmt.Sprintf("%s_%s_%s", qname, QueueTypeDL, "queue")
events, err := r.view(ctx, qn, count, false)
if err != nil {
http.Error(w, fmt.Sprintf("couldn't view messages from dlq: %v", err), http.StatusInternalServerError)
Expand Down Expand Up @@ -333,7 +343,7 @@ func (r *ARetry) replay(ctx context.Context, queue string, count int) (int, erro
return replayCount, fmt.Errorf("error getting channel:%w", err)
}
defer ch.Close()
srcQueue := fmt.Sprintf("%s_%s_%s", queue, QueueDL, "queue")
srcQueue := fmt.Sprintf("%s_%s_%s", queue, QueueTypeDL, "queue")
q, err := ch.QueueInspect(srcQueue)
if err != nil {
return replayCount, fmt.Errorf("error inspecting queue:%w", err)
Expand All @@ -354,7 +364,7 @@ func (r *ARetry) replay(ctx context.Context, queue string, count int) (int, erro
return replayCount, fmt.Errorf("error getting message from queue:%w", err)
}
err = p.Publish(publisher.Message{
Key: QueueInstant,
Key: QueueTypeInstant,
Exchange: fmt.Sprintf("%s_%s", queue, "exchange"),
Publishing: amqp.Publishing{
Body: m.Body,
Expand Down

0 comments on commit 6286e8a

Please sign in to comment.