Skip to content

Commit

Permalink
pubsub/kafkapubsub: Allow setting config key_name via URL
Browse files Browse the repository at this point in the history
  • Loading branch information
ssetin committed Mar 18, 2024
1 parent 9f34166 commit 1f3e996
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 11 deletions.
2 changes: 1 addition & 1 deletion internal/website/data/examples.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pubsub/kafkapubsub/example_test.go
Expand Up @@ -69,7 +69,7 @@ func Example_openTopicFromURL() {
// pubsub.OpenTopic creates a *pubsub.Topic from a URL.
// The host + path are the topic name to send to.
// The set of brokers must be in an environment variable KAFKA_BROKERS.
topic, err := pubsub.OpenTopic(ctx, "kafka://my-topic")
topic, err := pubsub.OpenTopic(ctx, "kafka://my-topic?key_name=x-partition-key")
if err != nil {
log.Fatal(err)
}
Expand Down
14 changes: 12 additions & 2 deletions pubsub/kafkapubsub/kafka.go
Expand Up @@ -159,9 +159,19 @@ type URLOpener struct {

// OpenTopicURL opens a pubsub.Topic based on u.
func (o *URLOpener) OpenTopicURL(ctx context.Context, u *url.URL) (*pubsub.Topic, error) {
for param := range u.Query() {
return nil, fmt.Errorf("open topic %v: invalid query parameter %q", u, param)
for param, value := range u.Query() {
switch param {
case "key_name":
if len(value) != 1 || len(value[0]) == 0 {
return nil, fmt.Errorf("open topic %v: invalid query parameter %q", u, param)
}

o.TopicOptions.KeyName = value[0]
default:
return nil, fmt.Errorf("open topic %v: invalid query parameter %q", u, param)
}
}

topicName := path.Join(u.Host, u.Path)
return OpenTopic(o.Brokers, o.Config, topicName, &o.TopicOptions)
}
Expand Down
24 changes: 17 additions & 7 deletions pubsub/kafkapubsub/kafka_test.go
Expand Up @@ -471,15 +471,26 @@ func TestOpenTopicFromURL(t *testing.T) {
URL string
WantErr bool
}{
// OK, but still error because broker doesn't exist.
{"kafka://mytopic", true},
// OK.
{"kafka://mytopic", false},
// OK, specifying key_name.
{"kafka://mytopic?key_name=x-partition-key", false},
// Invalid key_name value.
{"kafka://mytopic?key_name=", true},
// Invalid parameter.
{"kafka://mytopic?param=value", true},
}

ctx := context.Background()
for _, test := range tests {
topic, err := pubsub.OpenTopic(ctx, test.URL)
if err != nil && errors.Is(err, sarama.ErrOutOfBrokers) {
// Since we don't have a real kafka broker to talk to, we will always get an error when
// opening a topic. This test is checking specifically for query parameter usage, so
// we treat the "no brokers" error message as a nil error.
err = nil
}

if (err != nil) != test.WantErr {
t.Errorf("%s: got error %v, want error %v", test.URL, err, test.WantErr)
}
Expand All @@ -497,23 +508,22 @@ func TestOpenSubscriptionFromURL(t *testing.T) {
URL string
WantErr bool
}{
// OK, but still error because broker doesn't exist.
// OK.
{"kafka://mygroup?topic=mytopic", false},
// OK, specifying initial offset, but still error because broker doesn't exist.
// OK, specifying initial offset.
{"kafka://mygroup?topic=mytopic&offset=oldest", false},
{"kafka://mygroup?topic=mytopic&offset=newest", false},
// Invalid offset specified
// Invalid offset specified.
{"kafka://mygroup?topic=mytopic&offset=value", true},
// Invalid parameter.
{"kafka://mygroup?topic=mytopic&param=value", true},
}

ctx := context.Background()
const ignore = "kafka: client has run out of available brokers to talk to"

for _, test := range tests {
sub, err := pubsub.OpenSubscription(ctx, test.URL)
if err != nil && strings.Contains(err.Error(), ignore) {
if err != nil && errors.Is(err, sarama.ErrOutOfBrokers) {
// Since we don't have a real kafka broker to talk to, we will always get an error when
// opening a subscription. This test is checking specifically for query parameter usage, so
// we treat the "no brokers" error message as a nil error.
Expand Down

0 comments on commit 1f3e996

Please sign in to comment.