Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support schema registry in kafka preset #672

Merged
merged 1 commit into from Sep 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 10 additions & 0 deletions preset/kafka/options.go
Expand Up @@ -34,3 +34,13 @@ func WithMessagesFile(files string) Option {
o.MessagesFiles = append(o.MessagesFiles, files)
}
}

// WithSchemaRegistry makes the container wait for the schema registry port to
// become available. Note that it takes longer to setup schema registry than
// the broker itself. Gnomock will not wait for the registry by default, but it
// may become available eventually.
func WithSchemaRegistry() Option {
return func(o *P) {
o.UseSchemaRegistry = true
}
}
45 changes: 34 additions & 11 deletions preset/kafka/preset.go
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"io"
"net/http"
"os"
"time"

Expand All @@ -17,16 +18,18 @@ import (

// The following ports are exposed by this preset:.
const (
BrokerPort = "broker"
ZooKeeperPort = "zookeeper"
WebPort = "web"
BrokerPort = "broker"
ZooKeeperPort = "zookeeper"
WebPort = "web"
SchemaRegistryPort = "registry"
)

const (
defaultVersion = "2.5.1-L0"
brokerPort = 49092
zookeeperPort = 2181
webPort = 3030
defaultVersion = "2.5.1-L0"
brokerPort = 49092
zookeeperPort = 2181
webPort = 3030
schemaRegistryPort = 8081
)

// Message is a single message sent to Kafka.
Expand Down Expand Up @@ -62,10 +65,11 @@ func Preset(opts ...Option) gnomock.Preset {

// P is a Gnomock Preset implementation of Kafka.
type P struct {
Version string `json:"version"`
Topics []string `json:"topics"`
Messages []Message `json:"messages"`
MessagesFiles []string `json:"messages_files"`
Version string `json:"version"`
Topics []string `json:"topics"`
Messages []Message `json:"messages"`
MessagesFiles []string `json:"messages_files"`
UseSchemaRegistry bool `json:"use_schema_registry"`
}

// Image returns an image that should be pulled to create this container.
Expand All @@ -83,6 +87,7 @@ func (p *P) Ports() gnomock.NamedPorts {

namedPorts[ZooKeeperPort] = gnomock.TCP(zookeeperPort)
namedPorts[WebPort] = gnomock.TCP(webPort)
namedPorts[SchemaRegistryPort] = gnomock.TCP(schemaRegistryPort)

return namedPorts
}
Expand Down Expand Up @@ -133,6 +138,24 @@ func (p *P) healthcheck(ctx context.Context, c *gnomock.Container) (err error) {
return fmt.Errorf("can't create topic: %w", err)
}

if p.UseSchemaRegistry {
url := "http://" + c.Address(SchemaRegistryPort)

req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return fmt.Errorf("invalid request: %w", err)
}

res, err := http.DefaultClient.Do(req)
if err != nil {
return fmt.Errorf("schema registry is not available: %w", err)
}

if err := res.Body.Close(); err != nil {
return fmt.Errorf("error closing schema registry response body: %w", err)
}
}

return nil
}

Expand Down
22 changes: 22 additions & 0 deletions preset/kafka/preset_test.go
Expand Up @@ -2,6 +2,7 @@ package kafka_test

import (
"context"
"net/http"
"testing"
"time"

Expand Down Expand Up @@ -94,3 +95,24 @@ func TestPreset_withDefaults(t *testing.T) {
require.NoError(t, err)
require.NoError(t, c.Close())
}

func TestPreset_withSchemaRegistry(t *testing.T) {
p := kafka.Preset(kafka.WithSchemaRegistry())
container, err := gnomock.Start(
p,
gnomock.WithContainerName("kafka-with-registry"),
gnomock.WithTimeout(time.Minute*10),
)
require.NoError(t, err)

defer func() { require.NoError(t, gnomock.Stop(container)) }()

c, err := kafkaclient.Dial("tcp", container.Address(kafka.BrokerPort))
require.NoError(t, err)
require.NoError(t, c.Close())

out, err := http.Get("http://" + container.Address(kafka.SchemaRegistryPort))
require.NoError(t, err)
require.Equal(t, http.StatusOK, out.StatusCode)
require.NoError(t, out.Body.Close())
}
4 changes: 4 additions & 0 deletions swagger/swagger.yaml
Expand Up @@ -1038,6 +1038,10 @@ components:
description: Kafka version.
default: 2.5.1-L0
example: latest
use_schema_registry:
type: boolean
description: Set to true to wait for schema registry on startup
default: false
description: >
This object describes a Kafka container.
Expand Down