diff --git a/canned/kafka.go b/canned/kafka.go new file mode 100644 index 0000000000..4d151bff91 --- /dev/null +++ b/canned/kafka.go @@ -0,0 +1,144 @@ +// inspired by Java Kafka testcontainers' module +//https://github.com/testcontainers/testcontainers-java/blob/master/modules/kafka/src/main/java/org/testcontainers/containers/KafkaContainer.java + +package canned + +import ( + "context" + "github.com/testcontainers/testcontainers-go" + "io/ioutil" + "os" +) + +const ( + clusterName = "kafka-cluster" + zookeeperPort = "2181" + kafkaBrokerPort = "9092" + kafkaClientPort = "9093" + zookeeperImage = "confluentinc/cp-zookeeper:5.2.1" + kafkaImage = "confluentinc/cp-kafka:5.2.1" +) + +type KafkaCluster struct { + kafkaContainer testcontainers.Container + zookeeperContainer testcontainers.Container +} + +// StartCluster starts kafka cluster +func (kc *KafkaCluster) StartCluster() { + ctx := context.Background() + + kc.zookeeperContainer.Start(ctx) + kc.kafkaContainer.Start(ctx) + kc.startKafka() +} + +// GetKafkaHost gets the kafka host:port so it can be accessed from outside the container +func (kc *KafkaCluster) GetKafkaHost() string { + ctx := context.Background() + host, err := kc.kafkaContainer.Host(ctx) + if err != nil { + panic(err) + } + port, err := kc.kafkaContainer.MappedPort(ctx, kafkaClientPort) + if err != nil { + panic(err) + } + + // returns the exposed kafka host:port + return host + ":" + port.Port() +} + +func (kc *KafkaCluster) startKafka() { + ctx := context.Background() + + kafkaStartFile, err := ioutil.TempFile("", "testcontainers_start.sh") + if err != nil { + panic(err) + } + defer os.Remove(kafkaStartFile.Name()) + + // needs to set KAFKA_ADVERTISED_LISTENERS with the exposed kafka port + exposedHost := kc.GetKafkaHost() + kafkaStartFile.WriteString("#!/bin/bash \n") + kafkaStartFile.WriteString("export KAFKA_ADVERTISED_LISTENERS='PLAINTEXT://" + exposedHost + ",BROKER://kafka:" + kafkaBrokerPort + "'\n") + kafkaStartFile.WriteString(". /etc/confluent/docker/bash-config \n") + kafkaStartFile.WriteString("/etc/confluent/docker/configure \n") + kafkaStartFile.WriteString("/etc/confluent/docker/launch \n") + + err = kc.kafkaContainer.CopyFileToContainer(ctx, kafkaStartFile.Name(), "testcontainers_start.sh", 0700) + if err != nil { + panic(err) + } +} + +func NewKafkaCluster() *KafkaCluster { + ctx := context.Background() + + // creates a network, so kafka and zookeeper can communicate directly + network, err := testcontainers.GenericNetwork(ctx, testcontainers.GenericNetworkRequest{ + NetworkRequest: testcontainers.NetworkRequest{Name: clusterName}, + }) + if err != nil { + panic(err) + } + + dockerNetwork := network.(*testcontainers.DockerNetwork) + + zookeeperContainer := createZookeeperContainer(dockerNetwork) + kafkaContainer := createKafkaContainer(dockerNetwork) + + return &KafkaCluster{ + zookeeperContainer: zookeeperContainer, + kafkaContainer: kafkaContainer, + } +} + +func createZookeeperContainer(network *testcontainers.DockerNetwork) testcontainers.Container { + ctx := context.Background() + + // creates the zookeeper container, but do not start it yet + zookeeperContainer, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ + ContainerRequest: testcontainers.ContainerRequest{ + Image: zookeeperImage, + ExposedPorts: []string{zookeeperPort}, + Env: map[string]string{"ZOOKEEPER_CLIENT_PORT": zookeeperPort, "ZOOKEEPER_TICK_TIME": "2000"}, + Networks: []string{network.Name}, + NetworkAliases: map[string][]string{network.Name: {"zookeeper"}}, + }, + }) + if err != nil { + panic(err) + } + + return zookeeperContainer +} + +func createKafkaContainer(network *testcontainers.DockerNetwork) testcontainers.Container { + ctx := context.Background() + + // creates the kafka container, but do not start it yet + kafkaContainer, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ + ContainerRequest: testcontainers.ContainerRequest{ + Image: kafkaImage, + ExposedPorts: []string{kafkaClientPort}, + Env: map[string]string{ + "KAFKA_BROKER_ID": "1", + "KAFKA_ZOOKEEPER_CONNECT": "zookeeper:" + zookeeperPort, + "KAFKA_LISTENERS": "PLAINTEXT://0.0.0.0:" + kafkaClientPort + ",BROKER://0.0.0.0:" + kafkaBrokerPort, + "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP": "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT", + "KAFKA_INTER_BROKER_LISTENER_NAME": "BROKER", + "KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR": "1", + }, + Networks: []string{network.Name}, + NetworkAliases: map[string][]string{network.Name: {"kafka"}}, + // the container only starts when it finds and run /testcontainers_start.sh + Cmd: []string{"sh", "-c", "while [ ! -f /testcontainers_start.sh ]; do sleep 0.1; done; /testcontainers_start.sh"}, + }, + }) + if err != nil { + panic(err) + } + + return kafkaContainer +} diff --git a/canned/kafka_test.go b/canned/kafka_test.go new file mode 100644 index 0000000000..948f27cc8c --- /dev/null +++ b/canned/kafka_test.go @@ -0,0 +1,101 @@ +package canned_test + +import ( + "fmt" + "github.com/testcontainers/testcontainers-go/canned" + "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka" + "reflect" + "testing" +) + +const ( + KAFKA_TOPIC = "myTopic" +) + +func TestKafkaConsumerAndProducerUsingTestContainer(t *testing.T) { + kafkaCluster := canned.NewKafkaCluster() + kafkaCluster.StartCluster() + kafkaServer := kafkaCluster.GetKafkaHost() + producedMessages := []string{"Trying", "out", "kafka", "with", "test", "containers"} + + produceKafkaMessages(kafkaServer, producedMessages) + consumedMessages := consumeKafkaMessages(kafkaServer) + + if !reflect.DeepEqual(producedMessages, consumedMessages) { + t.Fatalf("Consumed messages are not equal to produced messages. [%s] != [%s]", consumedMessages, producedMessages) + } +} + +func produceKafkaMessages(kafkaServer string, messages []string) { + kafkaProducer, err := kafka.NewProducer(&kafka.ConfigMap{ + "bootstrap.servers": kafkaServer, + }) + if err != nil { + panic(err) + } + defer kafkaProducer.Close() + + fmt.Printf("Producing messages into kafka...\n") + + topic := KAFKA_TOPIC + for _, word := range messages { + deliveryChan := make(chan kafka.Event) + + kafkaProducer.Produce(&kafka.Message{ + TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, + Value: []byte(word), + Key: []byte("key"), + }, deliveryChan) + + e := <-deliveryChan + m := e.(*kafka.Message) + + if m.TopicPartition.Error != nil { + fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error) + } else { + fmt.Printf("Delivered message [%s] to topic %s [%d] at offset %v\n", + word, *m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset) + } + } +} + +func consumeKafkaMessages(kafkaServer string) []string { + kafkaConsumer, err := kafka.NewConsumer(&kafka.ConfigMap{ + "bootstrap.servers": kafkaServer, + "group.id": "myGroup", + "auto.offset.reset": "earliest", + }) + if err != nil { + panic(err) + } + err = kafkaConsumer.SubscribeTopics([]string{KAFKA_TOPIC}, nil) + if err != nil { + panic(err) + } + defer kafkaConsumer.Close() + + fmt.Printf("Consuming messages from kafka...\n") + + var consumedMessages []string + run := true + for run == true { + select { + default: + ev := kafkaConsumer.Poll(100) + if ev == nil { + continue + } + + switch e := ev.(type) { + case *kafka.Message: + fmt.Printf("Message on %s: %s\n", e.TopicPartition, string(e.Value)) + consumedMessages = append(consumedMessages, string(e.Value)) + default: + fmt.Printf("Consumed all messges. Stopping consumer.\n") + run = false + } + } + } + + return consumedMessages +} diff --git a/go.mod b/go.mod index 690270f2f4..9e19b33f62 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.13 require ( github.com/Microsoft/hcsshim v0.8.16 // indirect github.com/cenkalti/backoff v2.2.1+incompatible + github.com/confluentinc/confluent-kafka-go v1.7.0 // indirect github.com/docker/docker v20.10.8+incompatible github.com/docker/go-connections v0.4.0 github.com/go-redis/redis v6.15.9+incompatible @@ -16,6 +17,7 @@ require ( github.com/pkg/errors v0.9.1 github.com/stretchr/testify v1.7.0 golang.org/x/sys v0.0.0-20210324051608-47abb6519492 + gopkg.in/confluentinc/confluent-kafka-go.v1 v1.7.0 gopkg.in/yaml.v2 v2.4.0 gotest.tools v2.2.0+incompatible ) diff --git a/go.sum b/go.sum index 9f42210d22..74d7c8cce9 100644 --- a/go.sum +++ b/go.sum @@ -93,6 +93,8 @@ github.com/cilium/ebpf v0.2.0/go.mod h1:To2CFviqOWL/M0gIMsvSMlqe7em/l1ALkX1PyjrX github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8= +github.com/confluentinc/confluent-kafka-go v1.7.0 h1:tXh3LWb2Ne0WiU3ng4h5qiGA9XV61rz46w60O+cq8bM= +github.com/confluentinc/confluent-kafka-go v1.7.0/go.mod h1:u2zNLny2xq+5rWeTQjFHbDzzNuba4P1vo31r9r4uAdg= github.com/containerd/aufs v0.0.0-20200908144142-dab0cbea06f4/go.mod h1:nukgQABAEopAHvB6j7cnP5zJ+/3aVcE7hCYqvIwAHyE= github.com/containerd/aufs v0.0.0-20201003224125-76a6863f2989/go.mod h1:AkGGQs9NM2vtYHaUen+NljV0/baGCAPELGm2q9ZXpWU= github.com/containerd/aufs v0.0.0-20210316121734-20793ff83c97/go.mod h1:kL5kd6KM5TzQjR79jljyi4olc1Vrx6XBlcyj3gNv2PU= @@ -820,6 +822,8 @@ gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/cheggaaa/pb.v1 v1.0.25/go.mod h1:V/YB90LKu/1FcN3WVnfiiE5oMCibMjukxqG/qStrOgw= +gopkg.in/confluentinc/confluent-kafka-go.v1 v1.7.0 h1:+RlmciBLDd/XwM1iudiG3HtCg45purnsOxEoY/+JZdQ= +gopkg.in/confluentinc/confluent-kafka-go.v1 v1.7.0/go.mod h1:ZdI3yfYmdNSLQPNCpO1y00EHyWaHG5EnQEyL/ntAegY= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=