Use docker-compose to run the functional tests #1695

Merged
merged 7 commits on Jun 24, 2020
14 changes: 1 addition & 13 deletions .github/workflows/ci.yml
@@ -14,13 +14,8 @@ jobs:
platform: [ubuntu-latest]

env:
KAFKA_PEERS: localhost:9091,localhost:9092,localhost:9093,localhost:9094,localhost:9095
TOXIPROXY_ADDR: http://localhost:8474
KAFKA_INSTALL_ROOT: /home/runner/kafka
KAFKA_HOSTNAME: localhost
DEBUG: true
KAFKA_VERSION: ${{ matrix.kafka-version }}
KAFKA_SCALA_VERSION: 2.12

steps:
- uses: actions/checkout@v1
@@ -48,16 +43,9 @@ jobs:
run: |
curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v1.23.6
export REPOSITORY_ROOT=${GITHUB_WORKSPACE}
vagrant/install_cluster.sh
vagrant/boot_cluster.sh
vagrant/create_topics.sh
vagrant/run_java_producer.sh

- name: Run test suite
run: make test
run: make test_functional

- name: Run linter
run: make lint

- name: Teardown
run: vagrant/halt_cluster.sh
3 changes: 3 additions & 0 deletions .gitignore
@@ -25,3 +25,6 @@ _testmain.go

coverage.txt
profile.out

simplest-uncommitted-msg-0.1-jar-with-dependencies.jar

6 changes: 5 additions & 1 deletion Makefile
@@ -21,7 +21,11 @@ fmt:
gofmt -s -l -w $(FILES) $(TESTS)

lint:
golangci-lint run
GOFLAGS="-tags=functional" golangci-lint run

test:
$(GOTEST) ./...

.PHONY: test_functional
test_functional:
$(GOTEST) -tags=functional ./...
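
With these targets, the functional suite can presumably be run locally against the compose cluster defined below: `docker-compose up -d` to start the ZooKeeper, Kafka, and toxiproxy containers, then `make test_functional`, and `docker-compose down` to tear everything back down. The plain `make test` target continues to skip the functional tests because they are now guarded by the `functional` build tag (the `//+build functional` lines added to each functional_*_test.go file), which only `test_functional` and the lint target enable via `-tags=functional`. Any additional environment variables the harness may expect (broker list, toxiproxy address) are not shown in this diff and are left as an assumption here.
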
134 changes: 134 additions & 0 deletions docker-compose.yml
@@ -0,0 +1,134 @@
version: '3.7'
services:
zookeeper-1:
image: 'confluentinc/cp-zookeeper:${CONFLUENT_PLATFORM_VERSION:-5.5.0}'
restart: always
environment:
ZOOKEEPER_SERVER_ID: '1'
ZOOKEEPER_SERVERS: 'zookeeper-1:2888:3888;zookeeper-2:2888:3888;zookeeper-3:2888:3888'
ZOOKEEPER_CLIENT_PORT: '2181'
ZOOKEEPER_PEER_PORT: '2888'
ZOOKEEPER_LEADER_PORT: '3888'
ZOOKEEPER_INIT_LIMIT: '10'
ZOOKEEPER_SYNC_LIMIT: '5'
ZOOKEEPER_MAX_CLIENT_CONNS: '0'
zookeeper-2:
image: 'confluentinc/cp-zookeeper:${CONFLUENT_PLATFORM_VERSION:-5.5.0}'
restart: always
environment:
ZOOKEEPER_SERVER_ID: '2'
ZOOKEEPER_SERVERS: 'zookeeper-1:2888:3888;zookeeper-2:2888:3888;zookeeper-3:2888:3888'
ZOOKEEPER_CLIENT_PORT: '2181'
ZOOKEEPER_PEER_PORT: '2888'
ZOOKEEPER_LEADER_PORT: '3888'
ZOOKEEPER_INIT_LIMIT: '10'
ZOOKEEPER_SYNC_LIMIT: '5'
ZOOKEEPER_MAX_CLIENT_CONNS: '0'
zookeeper-3:
image: 'confluentinc/cp-zookeeper:${CONFLUENT_PLATFORM_VERSION:-5.5.0}'
restart: always
environment:
ZOOKEEPER_SERVER_ID: '3'
ZOOKEEPER_SERVERS: 'zookeeper-1:2888:3888;zookeeper-2:2888:3888;zookeeper-3:2888:3888'
ZOOKEEPER_CLIENT_PORT: '2181'
ZOOKEEPER_PEER_PORT: '2888'
ZOOKEEPER_LEADER_PORT: '3888'
ZOOKEEPER_INIT_LIMIT: '10'
ZOOKEEPER_SYNC_LIMIT: '5'
ZOOKEEPER_MAX_CLIENT_CONNS: '0'
kafka-1:
image: 'confluentinc/cp-kafka:${CONFLUENT_PLATFORM_VERSION:-5.5.0}'
restart: always
environment:
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181'
KAFKA_LISTENERS: 'LISTENER_INTERNAL://:9091,LISTENER_LOCAL://:29091'
KAFKA_ADVERTISED_LISTENERS: 'LISTENER_INTERNAL://kafka-1:9091,LISTENER_LOCAL://localhost:29091'
KAFKA_INTER_BROKER_LISTENER_NAME: 'LISTENER_INTERNAL'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'LISTENER_INTERNAL:PLAINTEXT,LISTENER_LOCAL:PLAINTEXT'
KAFKA_DEFAULT_REPLICATION_FACTOR: '2'
KAFKA_BROKER_ID: '1'
KAFKA_BROKER_RACK: '1'
KAFKA_ZOOKEEPER_SESSION_TIMEOUT_MS: '3000'
KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: '3000'
KAFKA_REPLICA_SELECTOR_CLASS: 'org.apache.kafka.common.replica.RackAwareReplicaSelector'
KAFKA_DELETE_TOPIC_ENABLE: 'true'
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
kafka-2:
image: 'confluentinc/cp-kafka:${CONFLUENT_PLATFORM_VERSION:-5.5.0}'
restart: always
environment:
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181'
KAFKA_LISTENERS: 'LISTENER_INTERNAL://:9091,LISTENER_LOCAL://:29092'
KAFKA_ADVERTISED_LISTENERS: 'LISTENER_INTERNAL://kafka-2:9091,LISTENER_LOCAL://localhost:29092'
KAFKA_INTER_BROKER_LISTENER_NAME: 'LISTENER_INTERNAL'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'LISTENER_INTERNAL:PLAINTEXT,LISTENER_LOCAL:PLAINTEXT'
KAFKA_DEFAULT_REPLICATION_FACTOR: '2'
KAFKA_BROKER_ID: '2'
KAFKA_BROKER_RACK: '2'
KAFKA_ZOOKEEPER_SESSION_TIMEOUT_MS: '3000'
KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: '3000'
KAFKA_REPLICA_SELECTOR_CLASS: 'org.apache.kafka.common.replica.RackAwareReplicaSelector'
KAFKA_DELETE_TOPIC_ENABLE: 'true'
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
kafka-3:
image: 'confluentinc/cp-kafka:${CONFLUENT_PLATFORM_VERSION:-5.5.0}'
restart: always
environment:
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181'
KAFKA_LISTENERS: 'LISTENER_INTERNAL://:9091,LISTENER_LOCAL://:29093'
KAFKA_ADVERTISED_LISTENERS: 'LISTENER_INTERNAL://kafka-3:9091,LISTENER_LOCAL://localhost:29093'
KAFKA_INTER_BROKER_LISTENER_NAME: 'LISTENER_INTERNAL'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'LISTENER_INTERNAL:PLAINTEXT,LISTENER_LOCAL:PLAINTEXT'
KAFKA_DEFAULT_REPLICATION_FACTOR: '2'
KAFKA_BROKER_ID: '3'
KAFKA_BROKER_RACK: '3'
KAFKA_ZOOKEEPER_SESSION_TIMEOUT_MS: '3000'
KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: '3000'
KAFKA_REPLICA_SELECTOR_CLASS: 'org.apache.kafka.common.replica.RackAwareReplicaSelector'
KAFKA_DELETE_TOPIC_ENABLE: 'true'
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
kafka-4:
image: 'confluentinc/cp-kafka:${CONFLUENT_PLATFORM_VERSION:-5.5.0}'
restart: always
environment:
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181'
KAFKA_LISTENERS: 'LISTENER_INTERNAL://:9091,LISTENER_LOCAL://:29094'
KAFKA_ADVERTISED_LISTENERS: 'LISTENER_INTERNAL://kafka-4:9091,LISTENER_LOCAL://localhost:29094'
KAFKA_INTER_BROKER_LISTENER_NAME: 'LISTENER_INTERNAL'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'LISTENER_INTERNAL:PLAINTEXT,LISTENER_LOCAL:PLAINTEXT'
KAFKA_DEFAULT_REPLICATION_FACTOR: '2'
KAFKA_BROKER_ID: '4'
KAFKA_BROKER_RACK: '4'
KAFKA_ZOOKEEPER_SESSION_TIMEOUT_MS: '3000'
KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: '3000'
KAFKA_REPLICA_SELECTOR_CLASS: 'org.apache.kafka.common.replica.RackAwareReplicaSelector'
KAFKA_DELETE_TOPIC_ENABLE: 'true'
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
kafka-5:
image: 'confluentinc/cp-kafka:${CONFLUENT_PLATFORM_VERSION:-5.5.0}'
restart: always
environment:
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181'
KAFKA_LISTENERS: 'LISTENER_INTERNAL://:9091,LISTENER_LOCAL://:29095'
KAFKA_ADVERTISED_LISTENERS: 'LISTENER_INTERNAL://kafka-5:9091,LISTENER_LOCAL://localhost:29095'
KAFKA_INTER_BROKER_LISTENER_NAME: 'LISTENER_INTERNAL'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'LISTENER_INTERNAL:PLAINTEXT,LISTENER_LOCAL:PLAINTEXT'
KAFKA_DEFAULT_REPLICATION_FACTOR: '2'
KAFKA_BROKER_ID: '5'
KAFKA_BROKER_RACK: '5'
KAFKA_ZOOKEEPER_SESSION_TIMEOUT_MS: '3000'
KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: '3000'
KAFKA_REPLICA_SELECTOR_CLASS: 'org.apache.kafka.common.replica.RackAwareReplicaSelector'
KAFKA_DELETE_TOPIC_ENABLE: 'true'
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
toxiproxy:
image: 'shopify/toxiproxy:2.1.4'
ports:
# The tests themselves actually start the proxies on these ports
- '29091:29091'
- '29092:29092'
- '29093:29093'
- '29094:29094'
- '29095:29095'
# This is the toxiproxy API port
- '8474:8474'
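
Because the tests create the proxies themselves, the harness presumably drives the toxiproxy HTTP API published on 8474 and registers one proxy per broker, forwarding each host-published 2909x port to the matching broker's internal listener. A minimal sketch of that interaction with the Shopify toxiproxy Go client follows; the proxy name, listen address, and upstream shown are illustrative assumptions, not values taken from this diff:

package main

import (
	"log"

	toxiproxy "github.com/Shopify/toxiproxy/client"
)

func main() {
	// The compose file above publishes toxiproxy's API on localhost:8474.
	client := toxiproxy.NewClient("localhost:8474")

	// Assumed mapping: listen on the host-published port and forward to the
	// broker's internal listener inside the compose network.
	proxy, err := client.CreateProxy("kafka1", "0.0.0.0:29091", "kafka-1:9091")
	if err != nil {
		log.Fatalf("creating proxy: %v", err)
	}

	// Tests such as TestFuncConnectionFailure can then flip Enabled to
	// simulate a broker outage and persist the change via Save.
	proxy.Enabled = false
	if err := proxy.Save(); err != nil {
		log.Fatalf("disabling proxy: %v", err)
	}
}
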
10 changes: 6 additions & 4 deletions functional_client_test.go
@@ -1,3 +1,5 @@
//+build functional

package sarama

import (
@@ -10,13 +12,13 @@ func TestFuncConnectionFailure(t *testing.T) {
setupFunctionalTest(t)
defer teardownFunctionalTest(t)

Proxies["kafka1"].Enabled = false
FunctionalTestEnv.Proxies["kafka1"].Enabled = false
SaveProxy(t, "kafka1")

config := NewConfig()
config.Metadata.Retry.Max = 1

_, err := NewClient([]string{kafkaBrokers[0]}, config)
_, err := NewClient([]string{FunctionalTestEnv.KafkaBrokerAddrs[0]}, config)
if err != ErrOutOfBrokers {
t.Fatal("Expected returned error to be ErrOutOfBrokers, but was: ", err)
}
@@ -29,7 +31,7 @@ func TestFuncClientMetadata(t *testing.T) {
config := NewConfig()
config.Metadata.Retry.Max = 1
config.Metadata.Retry.Backoff = 10 * time.Millisecond
client, err := NewClient(kafkaBrokers, config)
client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config)
if err != nil {
t.Fatal(err)
}
@@ -70,7 +72,7 @@ func TestFuncClientCoordinator(t *testing.T) {
setupFunctionalTest(t)
defer teardownFunctionalTest(t)

client, err := NewClient(kafkaBrokers, nil)
client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, nil)
if err != nil {
t.Fatal(err)
}
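
The FunctionalTestEnv references above replace the old package-level kafkaBrokers and Proxies variables with a single shared environment value. Its definition lives in the test harness rather than in this diff; inferred only from the fields used here, it likely looks something like the following sketch (field types and any extra fields are assumptions):

// Sketch inferred from usages in this diff: FunctionalTestEnv.KafkaBrokerAddrs
// feeds NewClient/NewConsumer, and FunctionalTestEnv.Proxies holds per-broker
// toxiproxy handles keyed by names such as "kafka1".
type testEnvironment struct {
	KafkaBrokerAddrs []string
	Proxies          map[string]*toxiproxy.Proxy
}

var FunctionalTestEnv *testEnvironment
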
6 changes: 3 additions & 3 deletions functional_consumer_group_test.go
@@ -1,4 +1,4 @@
// +build go1.9
//+build functional

package sarama

@@ -153,7 +153,7 @@ func testFuncConsumerGroupID(t *testing.T) string {
}

func testFuncConsumerGroupFuzzySeed(topic string) error {
client, err := NewClient(kafkaBrokers, nil)
client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, nil)
if err != nil {
return err
}
@@ -245,7 +245,7 @@ func runTestFuncConsumerGroupMember(t *testing.T, groupID, clientID string, maxM
config.Consumer.Offsets.Initial = OffsetOldest
config.Consumer.Group.Rebalance.Timeout = 10 * time.Second

group, err := NewConsumerGroup(kafkaBrokers, groupID, config)
group, err := NewConsumerGroup(FunctionalTestEnv.KafkaBrokerAddrs, groupID, config)
if err != nil {
t.Fatal(err)
return nil
14 changes: 8 additions & 6 deletions functional_consumer_test.go
@@ -1,3 +1,5 @@
//+build functional

package sarama

import (
@@ -16,7 +18,7 @@ func TestFuncConsumerOffsetOutOfRange(t *testing.T) {
setupFunctionalTest(t)
defer teardownFunctionalTest(t)

consumer, err := NewConsumer(kafkaBrokers, nil)
consumer, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, nil)
if err != nil {
t.Fatal(err)
}
@@ -36,7 +38,7 @@ func TestConsumerHighWaterMarkOffset(t *testing.T) {
setupFunctionalTest(t)
defer teardownFunctionalTest(t)

p, err := NewSyncProducer(kafkaBrokers, nil)
p, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, nil)
if err != nil {
t.Fatal(err)
}
@@ -47,7 +49,7 @@ func TestConsumerHighWaterMarkOffset(t *testing.T) {
t.Fatal(err)
}

c, err := NewConsumer(kafkaBrokers, nil)
c, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, nil)
if err != nil {
t.Fatal(err)
}
@@ -143,7 +145,7 @@ func TestReadOnlyAndAllCommittedMessages(t *testing.T) {
config.Consumer.IsolationLevel = ReadCommitted
config.Version = V0_11_0_0

consumer, err := NewConsumer(kafkaBrokers, config)
consumer, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, config)
if err != nil {
t.Fatal(err)
}
@@ -205,7 +207,7 @@ func produceMsgs(t *testing.T, clientVersions []KafkaVersion, codecs []Compressi
prodCfg.Net.MaxOpenRequests = 1
}

p, err := NewSyncProducer(kafkaBrokers, prodCfg)
p, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, prodCfg)
if err != nil {
t.Errorf("Failed to create producer: version=%s, compression=%s, err=%v", prodVer, codec, err)
continue
@@ -251,7 +253,7 @@ consumerVersionLoop:
// message.
consCfg := NewConfig()
consCfg.Version = consVer
c, err := NewConsumer(kafkaBrokers, consCfg)
c, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, consCfg)
if err != nil {
t.Fatal(err)
}
4 changes: 3 additions & 1 deletion functional_offset_manager_test.go
@@ -1,3 +1,5 @@
//+build functional

package sarama

import (
Expand All @@ -9,7 +11,7 @@ func TestFuncOffsetManager(t *testing.T) {
setupFunctionalTest(t)
defer teardownFunctionalTest(t)

client, err := NewClient(kafkaBrokers, nil)
client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, nil)
if err != nil {
t.Fatal(err)
}