From 1f3e9961f74435ef35f96e3cb82ba0f9d621d980 Mon Sep 17 00:00:00 2001 From: Sergei Setin Date: Mon, 18 Mar 2024 20:33:31 +0300 Subject: [PATCH] pubsub/kafkapubsub: Allow setting config key_name via URL --- internal/website/data/examples.json | 2 +- pubsub/kafkapubsub/example_test.go | 2 +- pubsub/kafkapubsub/kafka.go | 14 ++++++++++++-- pubsub/kafkapubsub/kafka_test.go | 24 +++++++++++++++++------- 4 files changed, 31 insertions(+), 11 deletions(-) diff --git a/internal/website/data/examples.json b/internal/website/data/examples.json index 6b24a6e848..21538d650e 100644 --- a/internal/website/data/examples.json +++ b/internal/website/data/examples.json @@ -245,7 +245,7 @@ }, "gocloud.dev/pubsub/kafkapubsub.Example_openTopicFromURL": { "imports": "import (\n\t\"context\"\n\n\t\"gocloud.dev/pubsub\"\n\t_ \"gocloud.dev/pubsub/kafkapubsub\"\n)", - "code": "// pubsub.OpenTopic creates a *pubsub.Topic from a URL.\n// The host + path are the topic name to send to.\n// The set of brokers must be in an environment variable KAFKA_BROKERS.\ntopic, err := pubsub.OpenTopic(ctx, \"kafka://my-topic\")\nif err != nil {\n\treturn err\n}\ndefer topic.Shutdown(ctx)" + "code": "// pubsub.OpenTopic creates a *pubsub.Topic from a URL.\n// The host + path are the topic name to send to.\n// The set of brokers must be in an environment variable KAFKA_BROKERS.\ntopic, err := pubsub.OpenTopic(ctx, \"kafka://my-topic?key_name=x-partition-key\")\nif err != nil {\n\treturn err\n}\ndefer topic.Shutdown(ctx)" }, "gocloud.dev/pubsub/mempubsub.ExampleNewSubscription": { "imports": "import (\n\t\"context\"\n\t\"time\"\n\n\t\"gocloud.dev/pubsub/mempubsub\"\n)", diff --git a/pubsub/kafkapubsub/example_test.go b/pubsub/kafkapubsub/example_test.go index 8fb74e5434..24289805c4 100644 --- a/pubsub/kafkapubsub/example_test.go +++ b/pubsub/kafkapubsub/example_test.go @@ -69,7 +69,7 @@ func Example_openTopicFromURL() { // pubsub.OpenTopic creates a *pubsub.Topic from a URL. // The host + path are the topic name to send to. // The set of brokers must be in an environment variable KAFKA_BROKERS. - topic, err := pubsub.OpenTopic(ctx, "kafka://my-topic") + topic, err := pubsub.OpenTopic(ctx, "kafka://my-topic?key_name=x-partition-key") if err != nil { log.Fatal(err) } diff --git a/pubsub/kafkapubsub/kafka.go b/pubsub/kafkapubsub/kafka.go index 45d28e88c6..2d3743e0de 100644 --- a/pubsub/kafkapubsub/kafka.go +++ b/pubsub/kafkapubsub/kafka.go @@ -159,9 +159,19 @@ type URLOpener struct { // OpenTopicURL opens a pubsub.Topic based on u. func (o *URLOpener) OpenTopicURL(ctx context.Context, u *url.URL) (*pubsub.Topic, error) { - for param := range u.Query() { - return nil, fmt.Errorf("open topic %v: invalid query parameter %q", u, param) + for param, value := range u.Query() { + switch param { + case "key_name": + if len(value) != 1 || len(value[0]) == 0 { + return nil, fmt.Errorf("open topic %v: invalid query parameter %q", u, param) + } + + o.TopicOptions.KeyName = value[0] + default: + return nil, fmt.Errorf("open topic %v: invalid query parameter %q", u, param) + } } + topicName := path.Join(u.Host, u.Path) return OpenTopic(o.Brokers, o.Config, topicName, &o.TopicOptions) } diff --git a/pubsub/kafkapubsub/kafka_test.go b/pubsub/kafkapubsub/kafka_test.go index af8ce3ff86..9f5c617536 100644 --- a/pubsub/kafkapubsub/kafka_test.go +++ b/pubsub/kafkapubsub/kafka_test.go @@ -471,8 +471,12 @@ func TestOpenTopicFromURL(t *testing.T) { URL string WantErr bool }{ - // OK, but still error because broker doesn't exist. - {"kafka://mytopic", true}, + // OK. + {"kafka://mytopic", false}, + // OK, specifying key_name. + {"kafka://mytopic?key_name=x-partition-key", false}, + // Invalid key_name value. + {"kafka://mytopic?key_name=", true}, // Invalid parameter. {"kafka://mytopic?param=value", true}, } @@ -480,6 +484,13 @@ func TestOpenTopicFromURL(t *testing.T) { ctx := context.Background() for _, test := range tests { topic, err := pubsub.OpenTopic(ctx, test.URL) + if err != nil && errors.Is(err, sarama.ErrOutOfBrokers) { + // Since we don't have a real kafka broker to talk to, we will always get an error when + // opening a topic. This test is checking specifically for query parameter usage, so + // we treat the "no brokers" error message as a nil error. + err = nil + } + if (err != nil) != test.WantErr { t.Errorf("%s: got error %v, want error %v", test.URL, err, test.WantErr) } @@ -497,23 +508,22 @@ func TestOpenSubscriptionFromURL(t *testing.T) { URL string WantErr bool }{ - // OK, but still error because broker doesn't exist. + // OK. {"kafka://mygroup?topic=mytopic", false}, - // OK, specifying initial offset, but still error because broker doesn't exist. + // OK, specifying initial offset. {"kafka://mygroup?topic=mytopic&offset=oldest", false}, {"kafka://mygroup?topic=mytopic&offset=newest", false}, - // Invalid offset specified + // Invalid offset specified. {"kafka://mygroup?topic=mytopic&offset=value", true}, // Invalid parameter. {"kafka://mygroup?topic=mytopic¶m=value", true}, } ctx := context.Background() - const ignore = "kafka: client has run out of available brokers to talk to" for _, test := range tests { sub, err := pubsub.OpenSubscription(ctx, test.URL) - if err != nil && strings.Contains(err.Error(), ignore) { + if err != nil && errors.Is(err, sarama.ErrOutOfBrokers) { // Since we don't have a real kafka broker to talk to, we will always get an error when // opening a subscription. This test is checking specifically for query parameter usage, so // we treat the "no brokers" error message as a nil error.