Skip to content

Commit

Permalink
Add canned kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
franklinlindemberg committed Sep 26, 2021
1 parent 19e857f commit 170e690
Show file tree
Hide file tree
Showing 4 changed files with 251 additions and 0 deletions.
144 changes: 144 additions & 0 deletions canned/kafka.go
@@ -0,0 +1,144 @@
// inspired by Java Kafka testcontainers' module
//https://github.com/testcontainers/testcontainers-java/blob/master/modules/kafka/src/main/java/org/testcontainers/containers/KafkaContainer.java

package canned

import (
"context"
"github.com/testcontainers/testcontainers-go"
"io/ioutil"
"os"
)

const (
clusterName = "kafka-cluster"
zookeeperPort = "2181"
kafkaBrokerPort = "9092"
kafkaClientPort = "9093"
zookeeperImage = "confluentinc/cp-zookeeper:5.2.1"
kafkaImage = "confluentinc/cp-kafka:5.2.1"
)

// KafkaCluster holds the two containers that make up a single-broker kafka
// setup: one zookeeper node and one kafka broker. Both are created by
// NewKafkaCluster and started together by StartCluster.
type KafkaCluster struct {
	kafkaContainer     testcontainers.Container
	zookeeperContainer testcontainers.Container
}

// StartCluster starts the zookeeper and kafka containers and then runs the
// kafka launch script inside the broker container. It panics on any startup
// failure, matching the error-handling style of the rest of this package.
func (kc *KafkaCluster) StartCluster() {
	ctx := context.Background()

	// zookeeper must be up before the broker tries to register with it
	if err := kc.zookeeperContainer.Start(ctx); err != nil {
		panic(err)
	}
	if err := kc.kafkaContainer.Start(ctx); err != nil {
		panic(err)
	}
	kc.startKafka()
}

// GetKafkaHost returns the "host:port" address under which the kafka broker
// can be reached from outside the container (i.e. the host-mapped client port).
// Panics if the container's host or mapped port cannot be resolved.
func (kc *KafkaCluster) GetKafkaHost() string {
	ctx := context.Background()

	brokerHost, err := kc.kafkaContainer.Host(ctx)
	if err != nil {
		panic(err)
	}

	brokerPort, err := kc.kafkaContainer.MappedPort(ctx, kafkaClientPort)
	if err != nil {
		panic(err)
	}

	// exposed address as seen from the docker host
	return brokerHost + ":" + brokerPort.Port()
}

// startKafka writes a start script containing the advertised listeners
// (which are only known once the container is running and its client port is
// mapped) and copies it into the broker container, where the container's Cmd
// is polling for it. Panics on any file or copy error.
func (kc *KafkaCluster) startKafka() {
	ctx := context.Background()

	kafkaStartFile, err := ioutil.TempFile("", "testcontainers_start.sh")
	if err != nil {
		panic(err)
	}
	defer os.Remove(kafkaStartFile.Name())

	// KAFKA_ADVERTISED_LISTENERS must carry the host-mapped port, so it can
	// only be set after the container has started.
	exposedHost := kc.GetKafkaHost()
	script := "#!/bin/bash \n" +
		"export KAFKA_ADVERTISED_LISTENERS='PLAINTEXT://" + exposedHost + ",BROKER://kafka:" + kafkaBrokerPort + "'\n" +
		". /etc/confluent/docker/bash-config \n" +
		"/etc/confluent/docker/configure \n" +
		"/etc/confluent/docker/launch \n"
	if _, err := kafkaStartFile.WriteString(script); err != nil {
		panic(err)
	}
	// flush and release the file before handing its path to the docker client
	if err := kafkaStartFile.Close(); err != nil {
		panic(err)
	}

	err = kc.kafkaContainer.CopyFileToContainer(ctx, kafkaStartFile.Name(), "testcontainers_start.sh", 0700)
	if err != nil {
		panic(err)
	}
}

// NewKafkaCluster wires up a zookeeper and a kafka container on a shared
// docker network so they can reach each other by alias. The containers are
// created but not started; call StartCluster to boot the cluster.
func NewKafkaCluster() *KafkaCluster {
	ctx := context.Background()

	// a dedicated network lets kafka and zookeeper communicate directly
	network, err := testcontainers.GenericNetwork(ctx, testcontainers.GenericNetworkRequest{
		NetworkRequest: testcontainers.NetworkRequest{Name: clusterName},
	})
	if err != nil {
		panic(err)
	}
	dockerNetwork := network.(*testcontainers.DockerNetwork)

	return &KafkaCluster{
		zookeeperContainer: createZookeeperContainer(dockerNetwork),
		kafkaContainer:     createKafkaContainer(dockerNetwork),
	}
}

// createZookeeperContainer builds (but does not start) the zookeeper
// container, attached to the given network under the alias "zookeeper".
// Panics if the container cannot be created.
func createZookeeperContainer(network *testcontainers.DockerNetwork) testcontainers.Container {
	ctx := context.Background()

	req := testcontainers.ContainerRequest{
		Image:        zookeeperImage,
		ExposedPorts: []string{zookeeperPort},
		Env: map[string]string{
			"ZOOKEEPER_CLIENT_PORT": zookeeperPort,
			"ZOOKEEPER_TICK_TIME":   "2000",
		},
		Networks:       []string{network.Name},
		NetworkAliases: map[string][]string{network.Name: {"zookeeper"}},
	}

	container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
		ContainerRequest: req,
	})
	if err != nil {
		panic(err)
	}
	return container
}

// createKafkaContainer builds (but does not start) the kafka broker
// container, attached to the given network under the alias "kafka". The
// broker listens on two listeners: PLAINTEXT for external clients and BROKER
// for in-network traffic. Panics if the container cannot be created.
func createKafkaContainer(network *testcontainers.DockerNetwork) testcontainers.Container {
	ctx := context.Background()

	env := map[string]string{
		"KAFKA_BROKER_ID":                        "1",
		"KAFKA_ZOOKEEPER_CONNECT":                "zookeeper:" + zookeeperPort,
		"KAFKA_LISTENERS":                        "PLAINTEXT://0.0.0.0:" + kafkaClientPort + ",BROKER://0.0.0.0:" + kafkaBrokerPort,
		"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP":   "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT",
		"KAFKA_INTER_BROKER_LISTENER_NAME":       "BROKER",
		"KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR": "1",
	}

	req := testcontainers.ContainerRequest{
		Image:          kafkaImage,
		ExposedPorts:   []string{kafkaClientPort},
		Env:            env,
		Networks:       []string{network.Name},
		NetworkAliases: map[string][]string{network.Name: {"kafka"}},
		// the container only starts when it finds and run /testcontainers_start.sh
		Cmd: []string{"sh", "-c", "while [ ! -f /testcontainers_start.sh ]; do sleep 0.1; done; /testcontainers_start.sh"},
	}

	container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
		ContainerRequest: req,
	})
	if err != nil {
		panic(err)
	}
	return container
}
101 changes: 101 additions & 0 deletions canned/kafka_test.go
@@ -0,0 +1,101 @@
package canned_test

import (
"fmt"
"github.com/testcontainers/testcontainers-go/canned"
"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
"reflect"
"testing"
)

const (
	// KAFKA_TOPIC is the topic the round-trip test produces to and consumes from.
	KAFKA_TOPIC = "myTopic"
)

// TestKafkaConsumerAndProducerUsingTestContainer spins up a canned kafka
// cluster, produces a fixed sequence of messages and verifies the consumer
// reads back exactly the same sequence in the same order.
func TestKafkaConsumerAndProducerUsingTestContainer(t *testing.T) {
	cluster := canned.NewKafkaCluster()
	cluster.StartCluster()
	broker := cluster.GetKafkaHost()

	expected := []string{"Trying", "out", "kafka", "with", "test", "containers"}
	produceKafkaMessages(broker, expected)
	actual := consumeKafkaMessages(broker)

	if !reflect.DeepEqual(expected, actual) {
		t.Fatalf("Consumed messages are not equal to produced messages. [%s] != [%s]", actual, expected)
	}
}

// produceKafkaMessages publishes each entry of messages to KAFKA_TOPIC on the
// given broker and waits for an individual delivery report per message.
// Panics when the producer cannot be created or a produce call fails.
func produceKafkaMessages(kafkaServer string, messages []string) {
	kafkaProducer, err := kafka.NewProducer(&kafka.ConfigMap{
		"bootstrap.servers": kafkaServer,
	})
	if err != nil {
		panic(err)
	}
	defer kafkaProducer.Close()

	fmt.Printf("Producing messages into kafka...\n")

	topic := KAFKA_TOPIC
	for _, word := range messages {
		deliveryChan := make(chan kafka.Event)

		// Produce is asynchronous; an error here means the message was never
		// enqueued, so no delivery report will ever arrive on deliveryChan.
		// Fail fast instead of blocking forever on the channel receive below.
		err := kafkaProducer.Produce(&kafka.Message{
			TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
			Value:          []byte(word),
			Key:            []byte("key"),
		}, deliveryChan)
		if err != nil {
			panic(err)
		}

		e := <-deliveryChan
		m := e.(*kafka.Message)

		if m.TopicPartition.Error != nil {
			fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error)
		} else {
			fmt.Printf("Delivered message [%s] to topic %s [%d] at offset %v\n",
				word, *m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
		}
	}
}

// consumeKafkaMessages subscribes to KAFKA_TOPIC on the given broker and
// polls until the first non-message event (e.g. a partition EOF or error
// event), collecting each message payload in arrival order. Panics when the
// consumer cannot be created or the subscription fails.
func consumeKafkaMessages(kafkaServer string) []string {
	kafkaConsumer, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": kafkaServer,
		"group.id":          "myGroup",
		"auto.offset.reset": "earliest",
	})
	if err != nil {
		panic(err)
	}
	defer kafkaConsumer.Close()

	if err := kafkaConsumer.SubscribeTopics([]string{KAFKA_TOPIC}, nil); err != nil {
		panic(err)
	}

	fmt.Printf("Consuming messages from kafka...\n")

	// The original select{default:...} wrapper was a no-op; a plain loop
	// expresses the same poll-until-non-message behavior directly.
	var consumedMessages []string
	for {
		ev := kafkaConsumer.Poll(100)
		if ev == nil {
			continue
		}

		switch e := ev.(type) {
		case *kafka.Message:
			fmt.Printf("Message on %s: %s\n", e.TopicPartition, string(e.Value))
			consumedMessages = append(consumedMessages, string(e.Value))
		default:
			// any non-message event is treated as "nothing more to read"
			fmt.Printf("Consumed all messages. Stopping consumer.\n")
			return consumedMessages
		}
	}
}
2 changes: 2 additions & 0 deletions go.mod
Expand Up @@ -5,6 +5,7 @@ go 1.13
require (
github.com/Microsoft/hcsshim v0.8.16 // indirect
github.com/cenkalti/backoff v2.2.1+incompatible
github.com/confluentinc/confluent-kafka-go v1.7.0 // indirect
github.com/docker/docker v20.10.8+incompatible
github.com/docker/go-connections v0.4.0
github.com/go-redis/redis v6.15.9+incompatible
Expand All @@ -16,6 +17,7 @@ require (
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.7.0
golang.org/x/sys v0.0.0-20210324051608-47abb6519492
gopkg.in/confluentinc/confluent-kafka-go.v1 v1.7.0
gopkg.in/yaml.v2 v2.4.0
gotest.tools v2.2.0+incompatible
)
4 changes: 4 additions & 0 deletions go.sum
Expand Up @@ -93,6 +93,8 @@ github.com/cilium/ebpf v0.2.0/go.mod h1:To2CFviqOWL/M0gIMsvSMlqe7em/l1ALkX1PyjrX
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8=
github.com/confluentinc/confluent-kafka-go v1.7.0 h1:tXh3LWb2Ne0WiU3ng4h5qiGA9XV61rz46w60O+cq8bM=
github.com/confluentinc/confluent-kafka-go v1.7.0/go.mod h1:u2zNLny2xq+5rWeTQjFHbDzzNuba4P1vo31r9r4uAdg=
github.com/containerd/aufs v0.0.0-20200908144142-dab0cbea06f4/go.mod h1:nukgQABAEopAHvB6j7cnP5zJ+/3aVcE7hCYqvIwAHyE=
github.com/containerd/aufs v0.0.0-20201003224125-76a6863f2989/go.mod h1:AkGGQs9NM2vtYHaUen+NljV0/baGCAPELGm2q9ZXpWU=
github.com/containerd/aufs v0.0.0-20210316121734-20793ff83c97/go.mod h1:kL5kd6KM5TzQjR79jljyi4olc1Vrx6XBlcyj3gNv2PU=
Expand Down Expand Up @@ -820,6 +822,8 @@ gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/cheggaaa/pb.v1 v1.0.25/go.mod h1:V/YB90LKu/1FcN3WVnfiiE5oMCibMjukxqG/qStrOgw=
gopkg.in/confluentinc/confluent-kafka-go.v1 v1.7.0 h1:+RlmciBLDd/XwM1iudiG3HtCg45purnsOxEoY/+JZdQ=
gopkg.in/confluentinc/confluent-kafka-go.v1 v1.7.0/go.mod h1:ZdI3yfYmdNSLQPNCpO1y00EHyWaHG5EnQEyL/ntAegY=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
Expand Down

0 comments on commit 170e690

Please sign in to comment.