From b73b5dea751767c363a877fd6225ceb66d181660 Mon Sep 17 00:00:00 2001 From: Yury Fedorov Date: Sun, 25 Sep 2022 14:28:27 +0300 Subject: [PATCH] Support schema registry in kafka preset Schema registry startup time is a bit longer than the broker, so registry usage is hidden behind a flag to not make existing tests slower. --- preset/kafka/options.go | 10 +++++++++ preset/kafka/preset.go | 45 ++++++++++++++++++++++++++++--------- preset/kafka/preset_test.go | 22 ++++++++++++++++++ swagger/swagger.yaml | 4 ++++ 4 files changed, 70 insertions(+), 11 deletions(-) diff --git a/preset/kafka/options.go b/preset/kafka/options.go index f1850e73..090b5adf 100644 --- a/preset/kafka/options.go +++ b/preset/kafka/options.go @@ -34,3 +34,13 @@ func WithMessagesFile(files string) Option { o.MessagesFiles = append(o.MessagesFiles, files) } } + +// WithSchemaRegistry makes the container wait for the schema registry port to +// become available. Note that it takes longer to setup schema registry than +// the broker itself. Gnomock will not wait for the registry by default, but it +// may become available eventually. +func WithSchemaRegistry() Option { + return func(o *P) { + o.UseSchemaRegistry = true + } +} diff --git a/preset/kafka/preset.go b/preset/kafka/preset.go index 1159beb6..d56a011b 100644 --- a/preset/kafka/preset.go +++ b/preset/kafka/preset.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "io" + "net/http" "os" "time" @@ -17,16 +18,18 @@ import ( // The following ports are exposed by this preset:. const ( - BrokerPort = "broker" - ZooKeeperPort = "zookeeper" - WebPort = "web" + BrokerPort = "broker" + ZooKeeperPort = "zookeeper" + WebPort = "web" + SchemaRegistryPort = "registry" ) const ( - defaultVersion = "2.5.1-L0" - brokerPort = 49092 - zookeeperPort = 2181 - webPort = 3030 + defaultVersion = "2.5.1-L0" + brokerPort = 49092 + zookeeperPort = 2181 + webPort = 3030 + schemaRegistryPort = 8081 ) // Message is a single message sent to Kafka. @@ -62,10 +65,11 @@ func Preset(opts ...Option) gnomock.Preset { // P is a Gnomock Preset implementation of Kafka. type P struct { - Version string `json:"version"` - Topics []string `json:"topics"` - Messages []Message `json:"messages"` - MessagesFiles []string `json:"messages_files"` + Version string `json:"version"` + Topics []string `json:"topics"` + Messages []Message `json:"messages"` + MessagesFiles []string `json:"messages_files"` + UseSchemaRegistry bool `json:"use_schema_registry"` } // Image returns an image that should be pulled to create this container. @@ -83,6 +87,7 @@ func (p *P) Ports() gnomock.NamedPorts { namedPorts[ZooKeeperPort] = gnomock.TCP(zookeeperPort) namedPorts[WebPort] = gnomock.TCP(webPort) + namedPorts[SchemaRegistryPort] = gnomock.TCP(schemaRegistryPort) return namedPorts } @@ -133,6 +138,24 @@ func (p *P) healthcheck(ctx context.Context, c *gnomock.Container) (err error) { return fmt.Errorf("can't create topic: %w", err) } + if p.UseSchemaRegistry { + url := "http://" + c.Address(SchemaRegistryPort) + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return fmt.Errorf("invalid request: %w", err) + } + + res, err := http.DefaultClient.Do(req) + if err != nil { + return fmt.Errorf("schema registry is not available: %w", err) + } + + if err := res.Body.Close(); err != nil { + return fmt.Errorf("error closing schema registry response body: %w", err) + } + } + return nil } diff --git a/preset/kafka/preset_test.go b/preset/kafka/preset_test.go index 89b20345..55565e26 100644 --- a/preset/kafka/preset_test.go +++ b/preset/kafka/preset_test.go @@ -2,6 +2,7 @@ package kafka_test import ( "context" + "net/http" "testing" "time" @@ -94,3 +95,24 @@ func TestPreset_withDefaults(t *testing.T) { require.NoError(t, err) require.NoError(t, c.Close()) } + +func TestPreset_withSchemaRegistry(t *testing.T) { + p := kafka.Preset(kafka.WithSchemaRegistry()) + container, err := gnomock.Start( + p, + gnomock.WithContainerName("kafka-with-registry"), + gnomock.WithTimeout(time.Minute*10), + ) + require.NoError(t, err) + + defer func() { require.NoError(t, gnomock.Stop(container)) }() + + c, err := kafkaclient.Dial("tcp", container.Address(kafka.BrokerPort)) + require.NoError(t, err) + require.NoError(t, c.Close()) + + out, err := http.Get("http://" + container.Address(kafka.SchemaRegistryPort)) + require.NoError(t, err) + require.Equal(t, http.StatusOK, out.StatusCode) + require.NoError(t, out.Body.Close()) +} diff --git a/swagger/swagger.yaml b/swagger/swagger.yaml index 7cea9dcd..8e23a942 100644 --- a/swagger/swagger.yaml +++ b/swagger/swagger.yaml @@ -1038,6 +1038,10 @@ components: description: Kafka version. default: 2.5.1-L0 example: latest + use_schema_registry: + type: boolean + description: Set to true to wait for schema registry on startup + default: false description: > This object describes a Kafka container.