Skip to content

Commit

Permalink
Support schema registry in kafka preset
Browse files Browse the repository at this point in the history
Schema registry startup time is a bit longer than the broker, so
registry usage is hidden behind a flag to not make existing tests
slower.
  • Loading branch information
orlangure committed Sep 27, 2022
1 parent 764ab14 commit d7885f7
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 11 deletions.
10 changes: 10 additions & 0 deletions preset/kafka/options.go
Expand Up @@ -34,3 +34,13 @@ func WithMessagesFile(files string) Option {
o.MessagesFiles = append(o.MessagesFiles, files)
}
}

// WithSchemaRegistry makes the container wait for the schema registry port to
// become available. Note that it takes longer to setup schema registry than
// the broker itself. Gnomock will not wait for the registry by default, but it
// may become available eventually.
func WithSchemaRegistry() Option {
return func(o *P) {
o.UseSchemaRegistry = true
}
}
45 changes: 34 additions & 11 deletions preset/kafka/preset.go
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"io"
"net/http"
"os"
"time"

Expand All @@ -17,16 +18,18 @@ import (

// The following ports are exposed by this preset:.
const (
BrokerPort = "broker"
ZooKeeperPort = "zookeeper"
WebPort = "web"
BrokerPort = "broker"
ZooKeeperPort = "zookeeper"
WebPort = "web"
SchemaRegistryPort = "registry"
)

const (
defaultVersion = "2.5.1-L0"
brokerPort = 49092
zookeeperPort = 2181
webPort = 3030
defaultVersion = "2.5.1-L0"
brokerPort = 49092
zookeeperPort = 2181
webPort = 3030
schemaRegistryPort = 8081
)

// Message is a single message sent to Kafka.
Expand Down Expand Up @@ -62,10 +65,11 @@ func Preset(opts ...Option) gnomock.Preset {

// P is a Gnomock Preset implementation of Kafka.
type P struct {
Version string `json:"version"`
Topics []string `json:"topics"`
Messages []Message `json:"messages"`
MessagesFiles []string `json:"messages_files"`
Version string `json:"version"`
Topics []string `json:"topics"`
Messages []Message `json:"messages"`
MessagesFiles []string `json:"messages_files"`
UseSchemaRegistry bool `json:"use_schema_registry"`
}

// Image returns an image that should be pulled to create this container.
Expand All @@ -83,6 +87,7 @@ func (p *P) Ports() gnomock.NamedPorts {

namedPorts[ZooKeeperPort] = gnomock.TCP(zookeeperPort)
namedPorts[WebPort] = gnomock.TCP(webPort)
namedPorts[SchemaRegistryPort] = gnomock.TCP(schemaRegistryPort)

return namedPorts
}
Expand Down Expand Up @@ -133,6 +138,24 @@ func (p *P) healthcheck(ctx context.Context, c *gnomock.Container) (err error) {
return fmt.Errorf("can't create topic: %w", err)
}

if p.UseSchemaRegistry {
url := "http://" + c.Address(SchemaRegistryPort)

req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return fmt.Errorf("invalid request: %w", err)
}

res, err := http.DefaultClient.Do(req)
if err != nil {
return fmt.Errorf("schema registry is not available: %w", err)
}

if err := res.Body.Close(); err != nil {
return fmt.Errorf("error closing schema registry response body: %w", err)
}
}

return nil
}

Expand Down
22 changes: 22 additions & 0 deletions preset/kafka/preset_test.go
Expand Up @@ -2,6 +2,7 @@ package kafka_test

import (
"context"
"net/http"
"testing"
"time"

Expand Down Expand Up @@ -94,3 +95,24 @@ func TestPreset_withDefaults(t *testing.T) {
require.NoError(t, err)
require.NoError(t, c.Close())
}

func TestPreset_withSchemaRegistry(t *testing.T) {
p := kafka.Preset(kafka.WithSchemaRegistry())
container, err := gnomock.Start(
p,
gnomock.WithContainerName("kafka-with-registry"),
gnomock.WithTimeout(time.Minute*10),
)
require.NoError(t, err)

defer func() { require.NoError(t, gnomock.Stop(container)) }()

c, err := kafkaclient.Dial("tcp", container.Address(kafka.BrokerPort))
require.NoError(t, err)
require.NoError(t, c.Close())

out, err := http.Get("http://" + container.Address(kafka.SchemaRegistryPort))
require.NoError(t, err)
require.Equal(t, http.StatusOK, out.StatusCode)
require.NoError(t, out.Body.Close())
}
4 changes: 4 additions & 0 deletions swagger/swagger.yaml
Expand Up @@ -1038,6 +1038,10 @@ components:
description: Kafka version.
default: 2.5.1-L0
example: latest
use_schema_registry:
type: boolean
description: Set to true to wait for schema registry on startup
default: false
description: >
This object describes a Kafka container.
Expand Down

0 comments on commit d7885f7

Please sign in to comment.