From 0866b65646995a7c11e964b85ba611b4cf40204f Mon Sep 17 00:00:00 2001 From: Tiago Peczenyj Date: Fri, 31 May 2024 21:39:06 +0200 Subject: [PATCH] pubsub/rabbitpubsub: add query string to set the routing key from metadata (#3433) --- pubsub/kafkapubsub/kafka.go | 4 +- pubsub/rabbitpubsub/amqp.go | 8 +--- pubsub/rabbitpubsub/fake_test.go | 9 ++-- pubsub/rabbitpubsub/rabbit.go | 69 +++++++++++++++++++++++------- pubsub/rabbitpubsub/rabbit_test.go | 8 ++-- 5 files changed, 69 insertions(+), 29 deletions(-) diff --git a/pubsub/kafkapubsub/kafka.go b/pubsub/kafkapubsub/kafka.go index 2d3743e0de..4a20fff2ba 100644 --- a/pubsub/kafkapubsub/kafka.go +++ b/pubsub/kafkapubsub/kafka.go @@ -136,7 +136,9 @@ const Scheme = "kafka" // URLOpener opens Kafka URLs like "kafka://mytopic" for topics and // "kafka://group?topic=mytopic" for subscriptions. // -// For topics, the URL's host+path is used as the topic name. +// For topics, the URL's host+path is used as the topic name, +// and the "key_name" query parameter is used to extract the routing key +// from metadata. // // For subscriptions, the URL's host+path is used as the group name, // and the "topic" query parameter(s) are used as the set of topics to diff --git a/pubsub/rabbitpubsub/amqp.go b/pubsub/rabbitpubsub/amqp.go index 5a5a174885..503509602b 100644 --- a/pubsub/rabbitpubsub/amqp.go +++ b/pubsub/rabbitpubsub/amqp.go @@ -29,10 +29,6 @@ const ( // response. We always want to wait. wait = false - // Always use the empty routing key. This driver expects to be used with topic - // exchanges, which disregard the routing key. - routingKey = "" - // If the message can't be enqueued, return it to the sender rather than silently // dropping it. mandatory = true @@ -49,7 +45,7 @@ type amqpConnection interface { // See https://pkg.go.dev/github.com/rabbitmq/amqp091-go#Channel for the documentation of these methods. type amqpChannel interface { - Publish(exchange string, msg amqp.Publishing) error + Publish(exchange, routingKey string, msg amqp.Publishing) error Consume(queue, consumer string) (<-chan amqp.Delivery, error) Ack(tag uint64) error Nack(tag uint64) error @@ -93,7 +89,7 @@ type channel struct { ch *amqp.Channel } -func (ch *channel) Publish(exchange string, msg amqp.Publishing) error { +func (ch *channel) Publish(exchange, routingKey string, msg amqp.Publishing) error { return ch.ch.Publish(exchange, routingKey, mandatory, immediate, msg) } diff --git a/pubsub/rabbitpubsub/fake_test.go b/pubsub/rabbitpubsub/fake_test.go index 97c3badd61..7a4d7dc501 100644 --- a/pubsub/rabbitpubsub/fake_test.go +++ b/pubsub/rabbitpubsub/fake_test.go @@ -152,7 +152,7 @@ func (ch *fakeChannel) QueueDeclareAndBind(queueName, exchangeName string) error return nil } -func (ch *fakeChannel) Publish(exchangeName string, pub amqp.Publishing) error { +func (ch *fakeChannel) Publish(exchangeName, routingKey string, pub amqp.Publishing) error { if ch.isClosed() { return amqp.ErrClosed } @@ -168,9 +168,10 @@ func (ch *fakeChannel) Publish(exchangeName string, pub amqp.Publishing) error { // The message is unroutable. Send a Return to all channels registered with // NotifyReturn. ret := amqp.Return{ - Exchange: exchangeName, - ReplyCode: amqp.NoRoute, - ReplyText: "NO_ROUTE: no queues bound to exchange", + Exchange: exchangeName, + ReplyCode: amqp.NoRoute, + ReplyText: "NO_ROUTE: no queues bound to exchange", + RoutingKey: routingKey, } for _, c := range ch.returnChans { select { diff --git a/pubsub/rabbitpubsub/rabbit.go b/pubsub/rabbitpubsub/rabbit.go index 3142f37c27..5d2a28ecd2 100644 --- a/pubsub/rabbitpubsub/rabbit.go +++ b/pubsub/rabbitpubsub/rabbit.go @@ -112,11 +112,22 @@ type URLOpener struct { // OpenTopicURL opens a pubsub.Topic based on u. func (o *URLOpener) OpenTopicURL(ctx context.Context, u *url.URL) (*pubsub.Topic, error) { - for param := range u.Query() { - return nil, fmt.Errorf("open topic %v: invalid query parameter %q", u, param) + opts := o.TopicOptions + for param, value := range u.Query() { + switch param { + case "key_name": + if len(value) != 1 || len(value[0]) == 0 { + return nil, fmt.Errorf("open topic %v: invalid query parameter %q", u, param) + } + + opts.KeyName = value[0] + default: + return nil, fmt.Errorf("open topic %v: invalid query parameter %q", u, param) + } } + exchangeName := path.Join(u.Host, u.Path) - return OpenTopic(o.Connection, exchangeName, &o.TopicOptions), nil + return OpenTopic(o.Connection, exchangeName, &opts), nil } // OpenSubscriptionURL opens a pubsub.Subscription based on u. @@ -147,6 +158,7 @@ func (o *URLOpener) OpenSubscriptionURL(ctx context.Context, u *url.URL) (*pubsu type topic struct { exchange string // the AMQP exchange conn amqpConnection + opts *TopicOptions mu sync.Mutex ch amqpChannel // AMQP channel used for all communication. @@ -157,11 +169,22 @@ type topic struct { // TopicOptions sets options for constructing a *pubsub.Topic backed by // RabbitMQ. -type TopicOptions struct{} +type TopicOptions struct { + // KeyName optionally sets the Message.Metadata key to use as the optional + // RabbitMQ message key. If set, and if a matching Message.Metadata key is found, + // the value for that key will be used as the routing key when sending to + // RabbitMQ, instead of being added to the message headers. + KeyName string +} // SubscriptionOptions sets options for constructing a *pubsub.Subscription // backed by RabbitMQ. type SubscriptionOptions struct { + // KeyName optionally sets the Message.Metadata key in which to store the + // RabbitMQ message key. If set, and if the RabbitMQ message key is non-empty, + // the key value will be stored in Message.Metadata under KeyName. + KeyName string + // Qos property prefetch count. Optional. PrefetchCount *int } @@ -181,13 +204,18 @@ type SubscriptionOptions struct { // The documentation of the amqp package recommends using separate connections for // publishing and subscribing. func OpenTopic(conn *amqp.Connection, name string, opts *TopicOptions) *pubsub.Topic { - return pubsub.NewTopic(newTopic(&connection{conn}, name), nil) + return pubsub.NewTopic(newTopic(&connection{conn}, name, opts), nil) } -func newTopic(conn amqpConnection, name string) *topic { +func newTopic(conn amqpConnection, name string, opts *TopicOptions) *topic { + if opts == nil { + opts = &TopicOptions{} + } + return &topic{ conn: conn, exchange: name, + opts: opts, } } @@ -271,7 +299,7 @@ func (t *topic) SendBatch(ctx context.Context, ms []*driver.Message) error { var perr error for _, m := range ms { - pub := toPublishing(m) + routingKey, pub := toRoutingKeyAndAMQPPublishing(m, t.opts) if m.BeforeSend != nil { asFunc := func(i interface{}) bool { if p, ok := i.(**amqp.Publishing); ok { @@ -284,7 +312,7 @@ func (t *topic) SendBatch(ctx context.Context, ms []*driver.Message) error { return err } } - if perr = ch.Publish(t.exchange, pub); perr != nil { + if perr = ch.Publish(t.exchange, routingKey, pub); perr != nil { cancel() break } @@ -410,16 +438,23 @@ func closeErr(closec <-chan *amqp.Error) error { } } -// toPublishing converts a driver.Message to an amqp.Publishing. -func toPublishing(m *driver.Message) amqp.Publishing { +// toRoutingKeyAndAMQPPublishing converts a driver.Message to a pair routingKey + amqp.Publishing. +func toRoutingKeyAndAMQPPublishing(m *driver.Message, opts *TopicOptions) (routingKey string, msg amqp.Publishing) { h := amqp.Table{} for k, v := range m.Metadata { - h[k] = v + if opts.KeyName == k { + routingKey = v + } else { + h[k] = v + } } - return amqp.Publishing{ + + msg = amqp.Publishing{ Headers: h, Body: m.Body, } + + return routingKey, msg } // IsRetryable implements driver.Topic.IsRetryable. @@ -665,7 +700,7 @@ func (s *subscription) ReceiveBatch(ctx context.Context, maxMessages int) ([]*dr // error. return nil, errors.New("rabbitpubsub: delivery channel closed unexpectedly") } - ms = append(ms, toMessage(d)) + ms = append(ms, toDriverMessage(d, s.opts)) if len(ms) >= maxMessages { return ms, nil } @@ -679,14 +714,18 @@ func (s *subscription) ReceiveBatch(ctx context.Context, maxMessages int) ([]*dr } } -// toMessage converts an amqp.Delivery (a received message) to a driver.Message. -func toMessage(d amqp.Delivery) *driver.Message { +// toDriverMessage converts an amqp.Delivery (a received message) to a driver.Message. +func toDriverMessage(d amqp.Delivery, opts *SubscriptionOptions) *driver.Message { // Delivery.Headers is a map[string]interface{}, so we have to // convert each value to a string. md := map[string]string{} for k, v := range d.Headers { md[k] = fmt.Sprint(v) } + // Add a metadata entry for the message routing key if appropriate. + if d.RoutingKey != "" && opts.KeyName != "" { + md[opts.KeyName] = d.RoutingKey + } loggableID := d.MessageId if loggableID == "" { loggableID = d.CorrelationId diff --git a/pubsub/rabbitpubsub/rabbit_test.go b/pubsub/rabbitpubsub/rabbit_test.go index ec25be339a..84f9ba8347 100644 --- a/pubsub/rabbitpubsub/rabbit_test.go +++ b/pubsub/rabbitpubsub/rabbit_test.go @@ -125,11 +125,11 @@ func (h *harness) CreateTopic(_ context.Context, testName string) (dt driver.Top } ch.ExchangeDelete(exchange) } - return newTopic(h.conn, exchange), cleanup, nil + return newTopic(h.conn, exchange, nil), cleanup, nil } func (h *harness) MakeNonexistentTopic(context.Context) (driver.Topic, error) { - return newTopic(h.conn, "nonexistent-topic"), nil + return newTopic(h.conn, "nonexistent-topic", nil), nil } func (h *harness) CreateSubscription(_ context.Context, dt driver.Topic, testName string) (ds driver.Subscription, cleanup func(), err error) { @@ -170,7 +170,7 @@ func TestUnroutable(t *testing.T) { if err := declareExchange(conn, "u"); err != nil { t.Fatal(err) } - topic := newTopic(conn, "u") + topic := newTopic(conn, "u", nil) msgs := []*driver.Message{ {Body: []byte("")}, {Body: []byte("")}, @@ -394,7 +394,9 @@ func TestOpenTopicFromURL(t *testing.T) { WantErr bool }{ {"valid url", "rabbit://%s", false}, + {"valid url with key name parameter", "rabbit://%s?key_name=foo", false}, {"invalid url with parameters", "rabbit://%s?param=value", true}, + {"invalid url with key name parameter", "rabbit://%s?key_name=", true}, } for _, test := range tests {