Skip to content

Commit f93057e

Browse files
authored
pubsub/rabbitpubsub: wrap pubsub rabbitmq errors (#3437)
1 parent 07f7aa0 commit f93057e

File tree

2 files changed

+24
-18
lines changed

2 files changed

+24
-18
lines changed

pubsub/rabbitpubsub/amqp.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -143,10 +143,10 @@ func (ch *channel) NotifyClose(c chan *amqp.Error) chan *amqp.Error {
143143

144144
func (ch *channel) ExchangeDeclare(name string) error {
145145
return ch.ch.ExchangeDeclare(name,
146-
"fanout", // kind
147-
false, // durable
148-
false, // delete when unused
149-
false, // internal
146+
amqp.ExchangeFanout, // kind
147+
false, // durable
148+
false, // delete when unused
149+
false, // internal
150150
wait,
151151
nil) // args
152152
}

pubsub/rabbitpubsub/rabbit.go

+20-14
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ func (o *defaultDialer) defaultConn(ctx context.Context) (*URLOpener, error) {
6464
}
6565
conn, err := amqp.Dial(serverURL)
6666
if err != nil {
67-
return nil, fmt.Errorf("failed to dial RABBIT_SERVER_URL %q: %v", serverURL, err)
67+
return nil, fmt.Errorf("failed to dial RABBIT_SERVER_URL %q: %w", serverURL, err)
6868
}
6969
o.conn = conn
7070
o.opener = &URLOpener{Connection: conn}
@@ -74,15 +74,15 @@ func (o *defaultDialer) defaultConn(ctx context.Context) (*URLOpener, error) {
7474
func (o *defaultDialer) OpenTopicURL(ctx context.Context, u *url.URL) (*pubsub.Topic, error) {
7575
opener, err := o.defaultConn(ctx)
7676
if err != nil {
77-
return nil, fmt.Errorf("open topic %v: failed to open default connection: %v", u, err)
77+
return nil, fmt.Errorf("open topic %v: failed to open default connection: %w", u, err)
7878
}
7979
return opener.OpenTopicURL(ctx, u)
8080
}
8181

8282
func (o *defaultDialer) OpenSubscriptionURL(ctx context.Context, u *url.URL) (*pubsub.Subscription, error) {
8383
opener, err := o.defaultConn(ctx)
8484
if err != nil {
85-
return nil, fmt.Errorf("open subscription %v: failed to open default connection: %v", u, err)
85+
return nil, fmt.Errorf("open subscription %v: failed to open default connection: %w", u, err)
8686
}
8787
return opener.OpenSubscriptionURL(ctx, u)
8888
}
@@ -335,7 +335,8 @@ func (t *topic) SendBatch(ctx context.Context, ms []*driver.Message) error {
335335
}
336336
// If there is only one error, return it rather than a MultiError. That
337337
// will work better with ErrorCode and ErrorAs.
338-
if merr, ok := err.(MultiError); ok && len(merr) == 1 {
338+
var merr MultiError
339+
if errors.As(err, &merr) && len(merr) == 1 {
339340
return merr[0]
340341
}
341342
return err
@@ -478,8 +479,8 @@ var errorCodes = map[int]gcerrors.ErrorCode{
478479
}
479480

480481
func errorCode(err error) gcerrors.ErrorCode {
481-
aerr, ok := err.(*amqp.Error)
482-
if !ok {
482+
var aerr *amqp.Error
483+
if !errors.As(err, &aerr) {
483484
return gcerrors.Unknown
484485
}
485486
if ec, ok := errorCodes[aerr.Code]; ok {
@@ -489,8 +490,8 @@ func errorCode(err error) gcerrors.ErrorCode {
489490
}
490491

491492
func isRetryable(err error) bool {
492-
aerr, ok := err.(*amqp.Error)
493-
if !ok {
493+
var aerr *amqp.Error
494+
if !errors.As(err, &aerr) {
494495
return false
495496
}
496497
// amqp.Error has a Recover field which sounds like it should mean "retryable".
@@ -540,18 +541,22 @@ func (*topic) ErrorAs(err error, i interface{}) bool {
540541
}
541542

542543
func errorAs(err error, i interface{}) bool {
543-
switch e := err.(type) {
544-
case *amqp.Error:
544+
var aerr *amqp.Error
545+
if errors.As(err, &aerr) {
545546
if p, ok := i.(**amqp.Error); ok {
546-
*p = e
547+
*p = aerr
547548
return true
548549
}
549-
case MultiError:
550+
}
551+
552+
var merr MultiError
553+
if errors.As(err, &merr) {
550554
if p, ok := i.(*MultiError); ok {
551-
*p = e
555+
*p = merr
552556
return true
553557
}
554558
}
559+
555560
return false
556561
}
557562

@@ -691,7 +696,8 @@ func (s *subscription) ReceiveBatch(ctx context.Context, maxMessages int) ([]*dr
691696
if err := closeErr(s.closec); err != nil {
692697
// PreconditionFailed can happen if we send an Ack or Nack for a
693698
// message that has already been acked/nacked. Ignore those errors.
694-
if aerr, ok := err.(*amqp.Error); ok && aerr.Code == amqp.PreconditionFailed {
699+
var aerr *amqp.Error
700+
if errors.As(err, &aerr) && aerr.Code == amqp.PreconditionFailed {
695701
return nil, nil
696702
}
697703
return nil, err

0 commit comments

Comments
 (0)