diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b5e890c88..062b76c7d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -14,13 +14,8 @@ jobs: platform: [ubuntu-latest] env: - KAFKA_PEERS: localhost:9091,localhost:9092,localhost:9093,localhost:9094,localhost:9095 - TOXIPROXY_ADDR: http://localhost:8474 - KAFKA_INSTALL_ROOT: /home/runner/kafka - KAFKA_HOSTNAME: localhost DEBUG: true KAFKA_VERSION: ${{ matrix.kafka-version }} - KAFKA_SCALA_VERSION: 2.12 steps: - uses: actions/checkout@v1 @@ -48,16 +43,9 @@ jobs: run: | curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v1.27.0 export REPOSITORY_ROOT=${GITHUB_WORKSPACE} - vagrant/install_cluster.sh - vagrant/boot_cluster.sh - vagrant/create_topics.sh - vagrant/run_java_producer.sh - name: Run test suite - run: make test + run: make test_functional - name: Run linter run: make lint - - - name: Teardown - run: vagrant/halt_cluster.sh diff --git a/.gitignore b/.gitignore index 6e362e4f2..eb4b19509 100644 --- a/.gitignore +++ b/.gitignore @@ -25,3 +25,6 @@ _testmain.go coverage.txt profile.out + +simplest-uncommitted-msg-0.1-jar-with-dependencies.jar + diff --git a/Makefile b/Makefile index c3b431a56..a05863480 100644 --- a/Makefile +++ b/Makefile @@ -21,7 +21,11 @@ fmt: gofmt -s -l -w $(FILES) $(TESTS) lint: - golangci-lint run + GOFLAGS="-tags=functional" golangci-lint run test: $(GOTEST) ./... + +.PHONY: test_functional +test_functional: + $(GOTEST) -tags=functional ./... diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 000000000..25593fd3b --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,134 @@ +version: '3.7' +services: + zookeeper-1: + image: 'confluentinc/cp-zookeeper:${CONFLUENT_PLATFORM_VERSION:-5.5.0}' + restart: always + environment: + ZOOKEEPER_SERVER_ID: '1' + ZOOKEEPER_SERVERS: 'zookeeper-1:2888:3888;zookeeper-2:2888:3888;zookeeper-3:2888:3888' + ZOOKEEPER_CLIENT_PORT: '2181' + ZOOKEEPER_PEER_PORT: '2888' + ZOOKEEPER_LEADER_PORT: '3888' + ZOOKEEPER_INIT_LIMIT: '10' + ZOOKEEPER_SYNC_LIMIT: '5' + ZOOKEEPER_MAX_CLIENT_CONNS: '0' + zookeeper-2: + image: 'confluentinc/cp-zookeeper:${CONFLUENT_PLATFORM_VERSION:-5.5.0}' + restart: always + environment: + ZOOKEEPER_SERVER_ID: '2' + ZOOKEEPER_SERVERS: 'zookeeper-1:2888:3888;zookeeper-2:2888:3888;zookeeper-3:2888:3888' + ZOOKEEPER_CLIENT_PORT: '2181' + ZOOKEEPER_PEER_PORT: '2888' + ZOOKEEPER_LEADER_PORT: '3888' + ZOOKEEPER_INIT_LIMIT: '10' + ZOOKEEPER_SYNC_LIMIT: '5' + ZOOKEEPER_MAX_CLIENT_CONNS: '0' + zookeeper-3: + image: 'confluentinc/cp-zookeeper:${CONFLUENT_PLATFORM_VERSION:-5.5.0}' + restart: always + environment: + ZOOKEEPER_SERVER_ID: '3' + ZOOKEEPER_SERVERS: 'zookeeper-1:2888:3888;zookeeper-2:2888:3888;zookeeper-3:2888:3888' + ZOOKEEPER_CLIENT_PORT: '2181' + ZOOKEEPER_PEER_PORT: '2888' + ZOOKEEPER_LEADER_PORT: '3888' + ZOOKEEPER_INIT_LIMIT: '10' + ZOOKEEPER_SYNC_LIMIT: '5' + ZOOKEEPER_MAX_CLIENT_CONNS: '0' + kafka-1: + image: 'confluentinc/cp-kafka:${CONFLUENT_PLATFORM_VERSION:-5.5.0}' + restart: always + environment: + KAFKA_ZOOKEEPER_CONNECT: 'zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181' + KAFKA_LISTENERS: 'LISTENER_INTERNAL://:9091,LISTENER_LOCAL://:29091' + KAFKA_ADVERTISED_LISTENERS: 'LISTENER_INTERNAL://kafka-1:9091,LISTENER_LOCAL://localhost:29091' + KAFKA_INTER_BROKER_LISTENER_NAME: 'LISTENER_INTERNAL' + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'LISTENER_INTERNAL:PLAINTEXT,LISTENER_LOCAL:PLAINTEXT' + 
KAFKA_DEFAULT_REPLICATION_FACTOR: '2' + KAFKA_BROKER_ID: '1' + KAFKA_BROKER_RACK: '1' + KAFKA_ZOOKEEPER_SESSION_TIMEOUT_MS: '3000' + KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: '3000' + KAFKA_REPLICA_SELECTOR_CLASS: 'org.apache.kafka.common.replica.RackAwareReplicaSelector' + KAFKA_DELETE_TOPIC_ENABLE: 'true' + KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false' + kafka-2: + image: 'confluentinc/cp-kafka:${CONFLUENT_PLATFORM_VERSION:-5.5.0}' + restart: always + environment: + KAFKA_ZOOKEEPER_CONNECT: 'zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181' + KAFKA_LISTENERS: 'LISTENER_INTERNAL://:9091,LISTENER_LOCAL://:29092' + KAFKA_ADVERTISED_LISTENERS: 'LISTENER_INTERNAL://kafka-2:9091,LISTENER_LOCAL://localhost:29092' + KAFKA_INTER_BROKER_LISTENER_NAME: 'LISTENER_INTERNAL' + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'LISTENER_INTERNAL:PLAINTEXT,LISTENER_LOCAL:PLAINTEXT' + KAFKA_DEFAULT_REPLICATION_FACTOR: '2' + KAFKA_BROKER_ID: '2' + KAFKA_BROKER_RACK: '2' + KAFKA_ZOOKEEPER_SESSION_TIMEOUT_MS: '3000' + KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: '3000' + KAFKA_REPLICA_SELECTOR_CLASS: 'org.apache.kafka.common.replica.RackAwareReplicaSelector' + KAFKA_DELETE_TOPIC_ENABLE: 'true' + KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false' + kafka-3: + image: 'confluentinc/cp-kafka:${CONFLUENT_PLATFORM_VERSION:-5.5.0}' + restart: always + environment: + KAFKA_ZOOKEEPER_CONNECT: 'zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181' + KAFKA_LISTENERS: 'LISTENER_INTERNAL://:9091,LISTENER_LOCAL://:29093' + KAFKA_ADVERTISED_LISTENERS: 'LISTENER_INTERNAL://kafka-3:9091,LISTENER_LOCAL://localhost:29093' + KAFKA_INTER_BROKER_LISTENER_NAME: 'LISTENER_INTERNAL' + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'LISTENER_INTERNAL:PLAINTEXT,LISTENER_LOCAL:PLAINTEXT' + KAFKA_DEFAULT_REPLICATION_FACTOR: '2' + KAFKA_BROKER_ID: '3' + KAFKA_BROKER_RACK: '3' + KAFKA_ZOOKEEPER_SESSION_TIMEOUT_MS: '3000' + KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: '3000' + KAFKA_REPLICA_SELECTOR_CLASS: 'org.apache.kafka.common.replica.RackAwareReplicaSelector' + KAFKA_DELETE_TOPIC_ENABLE: 'true' + KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false' + kafka-4: + image: 'confluentinc/cp-kafka:${CONFLUENT_PLATFORM_VERSION:-5.5.0}' + restart: always + environment: + KAFKA_ZOOKEEPER_CONNECT: 'zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181' + KAFKA_LISTENERS: 'LISTENER_INTERNAL://:9091,LISTENER_LOCAL://:29094' + KAFKA_ADVERTISED_LISTENERS: 'LISTENER_INTERNAL://kafka-4:9091,LISTENER_LOCAL://localhost:29094' + KAFKA_INTER_BROKER_LISTENER_NAME: 'LISTENER_INTERNAL' + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'LISTENER_INTERNAL:PLAINTEXT,LISTENER_LOCAL:PLAINTEXT' + KAFKA_DEFAULT_REPLICATION_FACTOR: '2' + KAFKA_BROKER_ID: '4' + KAFKA_BROKER_RACK: '4' + KAFKA_ZOOKEEPER_SESSION_TIMEOUT_MS: '3000' + KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: '3000' + KAFKA_REPLICA_SELECTOR_CLASS: 'org.apache.kafka.common.replica.RackAwareReplicaSelector' + KAFKA_DELETE_TOPIC_ENABLE: 'true' + KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false' + kafka-5: + image: 'confluentinc/cp-kafka:${CONFLUENT_PLATFORM_VERSION:-5.5.0}' + restart: always + environment: + KAFKA_ZOOKEEPER_CONNECT: 'zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181' + KAFKA_LISTENERS: 'LISTENER_INTERNAL://:9091,LISTENER_LOCAL://:29095' + KAFKA_ADVERTISED_LISTENERS: 'LISTENER_INTERNAL://kafka-5:9091,LISTENER_LOCAL://localhost:29095' + KAFKA_INTER_BROKER_LISTENER_NAME: 'LISTENER_INTERNAL' + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'LISTENER_INTERNAL:PLAINTEXT,LISTENER_LOCAL:PLAINTEXT' + KAFKA_DEFAULT_REPLICATION_FACTOR: '2' + KAFKA_BROKER_ID: '5' + KAFKA_BROKER_RACK: '5' + 
KAFKA_ZOOKEEPER_SESSION_TIMEOUT_MS: '3000' + KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: '3000' + KAFKA_REPLICA_SELECTOR_CLASS: 'org.apache.kafka.common.replica.RackAwareReplicaSelector' + KAFKA_DELETE_TOPIC_ENABLE: 'true' + KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false' + toxiproxy: + image: 'shopify/toxiproxy:2.1.4' + ports: + # The tests themselves actually start the proxies on these ports + - '29091:29091' + - '29092:29092' + - '29093:29093' + - '29094:29094' + - '29095:29095' + # This is the toxiproxy API port + - '8474:8474' diff --git a/functional_client_test.go b/functional_client_test.go index 2bf99d252..513b8ee9b 100644 --- a/functional_client_test.go +++ b/functional_client_test.go @@ -1,3 +1,5 @@ +//+build functional + package sarama import ( @@ -10,13 +12,13 @@ func TestFuncConnectionFailure(t *testing.T) { setupFunctionalTest(t) defer teardownFunctionalTest(t) - Proxies["kafka1"].Enabled = false + FunctionalTestEnv.Proxies["kafka1"].Enabled = false SaveProxy(t, "kafka1") config := NewConfig() config.Metadata.Retry.Max = 1 - _, err := NewClient([]string{kafkaBrokers[0]}, config) + _, err := NewClient([]string{FunctionalTestEnv.KafkaBrokerAddrs[0]}, config) if err != ErrOutOfBrokers { t.Fatal("Expected returned error to be ErrOutOfBrokers, but was: ", err) } @@ -29,7 +31,7 @@ func TestFuncClientMetadata(t *testing.T) { config := NewConfig() config.Metadata.Retry.Max = 1 config.Metadata.Retry.Backoff = 10 * time.Millisecond - client, err := NewClient(kafkaBrokers, config) + client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config) if err != nil { t.Fatal(err) } @@ -70,7 +72,7 @@ func TestFuncClientCoordinator(t *testing.T) { setupFunctionalTest(t) defer teardownFunctionalTest(t) - client, err := NewClient(kafkaBrokers, nil) + client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, nil) if err != nil { t.Fatal(err) } diff --git a/functional_consumer_group_test.go b/functional_consumer_group_test.go index ae376086d..4d71510a8 100644 --- a/functional_consumer_group_test.go +++ b/functional_consumer_group_test.go @@ -1,4 +1,4 @@ -// +build go1.9 +//+build functional package sarama @@ -153,7 +153,7 @@ func testFuncConsumerGroupID(t *testing.T) string { } func testFuncConsumerGroupFuzzySeed(topic string) error { - client, err := NewClient(kafkaBrokers, nil) + client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, nil) if err != nil { return err } @@ -245,7 +245,7 @@ func runTestFuncConsumerGroupMember(t *testing.T, groupID, clientID string, maxM config.Consumer.Offsets.Initial = OffsetOldest config.Consumer.Group.Rebalance.Timeout = 10 * time.Second - group, err := NewConsumerGroup(kafkaBrokers, groupID, config) + group, err := NewConsumerGroup(FunctionalTestEnv.KafkaBrokerAddrs, groupID, config) if err != nil { t.Fatal(err) return nil } diff --git a/functional_consumer_test.go b/functional_consumer_test.go index 8b31b45c5..aca9434db 100644 --- a/functional_consumer_test.go +++ b/functional_consumer_test.go @@ -1,3 +1,5 @@ +//+build functional + package sarama import ( @@ -16,7 +18,7 @@ func TestFuncConsumerOffsetOutOfRange(t *testing.T) { setupFunctionalTest(t) defer teardownFunctionalTest(t) - consumer, err := NewConsumer(kafkaBrokers, nil) + consumer, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, nil) if err != nil { t.Fatal(err) } @@ -36,7 +38,7 @@ func TestConsumerHighWaterMarkOffset(t *testing.T) { setupFunctionalTest(t) defer teardownFunctionalTest(t) - p, err := NewSyncProducer(kafkaBrokers, nil) + p, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, 
nil) if err != nil { t.Fatal(err) } @@ -47,7 +49,7 @@ func TestConsumerHighWaterMarkOffset(t *testing.T) { t.Fatal(err) } - c, err := NewConsumer(kafkaBrokers, nil) + c, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, nil) if err != nil { t.Fatal(err) } @@ -143,7 +145,7 @@ func TestReadOnlyAndAllCommittedMessages(t *testing.T) { config.Consumer.IsolationLevel = ReadCommitted config.Version = V0_11_0_0 - consumer, err := NewConsumer(kafkaBrokers, config) + consumer, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, config) if err != nil { t.Fatal(err) } @@ -205,7 +207,7 @@ func produceMsgs(t *testing.T, clientVersions []KafkaVersion, codecs []Compressi prodCfg.Net.MaxOpenRequests = 1 } - p, err := NewSyncProducer(kafkaBrokers, prodCfg) + p, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, prodCfg) if err != nil { t.Errorf("Failed to create producer: version=%s, compression=%s, err=%v", prodVer, codec, err) continue @@ -251,7 +253,7 @@ consumerVersionLoop: // message. consCfg := NewConfig() consCfg.Version = consVer - c, err := NewConsumer(kafkaBrokers, consCfg) + c, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, consCfg) if err != nil { t.Fatal(err) } diff --git a/functional_offset_manager_test.go b/functional_offset_manager_test.go index 436f35ef4..32e160aab 100644 --- a/functional_offset_manager_test.go +++ b/functional_offset_manager_test.go @@ -1,3 +1,5 @@ +//+build functional + package sarama import ( @@ -9,7 +11,7 @@ func TestFuncOffsetManager(t *testing.T) { setupFunctionalTest(t) defer teardownFunctionalTest(t) - client, err := NewClient(kafkaBrokers, nil) + client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, nil) if err != nil { t.Fatal(err) } diff --git a/functional_producer_test.go b/functional_producer_test.go index 9e153b0d9..a4ab5a1b3 100644 --- a/functional_producer_test.go +++ b/functional_producer_test.go @@ -1,3 +1,5 @@ +//+build functional + package sarama import ( @@ -60,7 +62,7 @@ func TestFuncMultiPartitionProduce(t *testing.T) { config.Producer.Flush.Frequency = 50 * time.Millisecond config.Producer.Flush.Messages = 200 config.Producer.Return.Successes = true - producer, err := NewSyncProducer(kafkaBrokers, config) + producer, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, config) if err != nil { t.Fatal(err) } @@ -88,7 +90,7 @@ func TestFuncProducingToInvalidTopic(t *testing.T) { setupFunctionalTest(t) defer teardownFunctionalTest(t) - producer, err := NewSyncProducer(kafkaBrokers, nil) + producer, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, nil) if err != nil { t.Fatal(err) } @@ -120,7 +122,7 @@ func TestFuncProducingIdempotentWithBrokerFailure(t *testing.T) { config.Net.MaxOpenRequests = 1 config.Version = V0_11_0_0 - producer, err := NewSyncProducer(kafkaBrokers, config) + producer, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, config) if err != nil { t.Fatal(err) } @@ -138,7 +140,7 @@ func TestFuncProducingIdempotentWithBrokerFailure(t *testing.T) { } // break the brokers. 
- for proxyName, proxy := range Proxies { + for proxyName, proxy := range FunctionalTestEnv.Proxies { if !strings.Contains(proxyName, "kafka") { continue } @@ -159,7 +161,7 @@ func TestFuncProducingIdempotentWithBrokerFailure(t *testing.T) { } // Now bring the proxy back up - for proxyName, proxy := range Proxies { + for proxyName, proxy := range FunctionalTestEnv.Proxies { if !strings.Contains(proxyName, "kafka") { continue } @@ -186,7 +188,7 @@ func testProducingMessages(t *testing.T, config *Config) { defer teardownFunctionalTest(t) // Configure some latency in order to properly validate the request latency metric - for _, proxy := range Proxies { + for _, proxy := range FunctionalTestEnv.Proxies { if _, err := proxy.AddToxic("", "latency", "", 1, toxiproxy.Attributes{"latency": 10}); err != nil { t.Fatal("Unable to configure latency toxicity", err) } @@ -195,7 +197,7 @@ func testProducingMessages(t *testing.T, config *Config) { config.Producer.Return.Successes = true config.Consumer.Return.Errors = true - client, err := NewClient(kafkaBrokers, config) + client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config) if err != nil { t.Fatal(err) } @@ -387,7 +389,7 @@ func benchmarkProducer(b *testing.B, conf *Config, topic string, value Encoder) }() } - producer, err := NewAsyncProducer(kafkaBrokers, conf) + producer, err := NewAsyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, conf) if err != nil { b.Fatal(err) } diff --git a/functional_test.go b/functional_test.go index 778d9e055..b61a43daf 100644 --- a/functional_test.go +++ b/functional_test.go @@ -1,10 +1,18 @@ +//+build functional + package sarama import ( + "context" + "fmt" + "io" "log" - "math/rand" "net" + "net/http" + "net/url" "os" + "os/exec" + "path/filepath" "strconv" "strings" "testing" @@ -14,66 +22,373 @@ import ( ) const ( - VagrantToxiproxy = "http://192.168.100.67:8474" - VagrantKafkaPeers = "192.168.100.67:9091,192.168.100.67:9092,192.168.100.67:9093,192.168.100.67:9094,192.168.100.67:9095" - VagrantZookeeperPeers = "192.168.100.67:2181,192.168.100.67:2182,192.168.100.67:2183,192.168.100.67:2184,192.168.100.67:2185" + uncomittedMsgJar = "https://github.com/FrancoisPoinsot/simplest-uncommitted-msg/releases/download/0.1/simplest-uncommitted-msg-0.1-jar-with-dependencies.jar" ) var ( - kafkaAvailable, kafkaRequired bool - kafkaBrokers []string + testTopicDetails = map[string]*TopicDetail{ + "test.1": { + NumPartitions: 1, + ReplicationFactor: 3, + }, + "test.4": { + NumPartitions: 4, + ReplicationFactor: 3, + }, + "test.64": { + NumPartitions: 64, + ReplicationFactor: 3, + }, + "uncommitted-topic-test-4": { + NumPartitions: 1, + ReplicationFactor: 3, + }, + } - proxyClient *toxiproxy.Client - Proxies map[string]*toxiproxy.Proxy + FunctionalTestEnv *testEnvironment ) -func init() { +func TestMain(m *testing.M) { + // Functional tests for Sarama + // + // You can either set TOXIPROXY_ADDR, which points at a toxiproxy address + // already set up with 21801-21805 bound to zookeeper and 29091-29095 + // bound to kafka. Alternatively, if TOXIPROXY_ADDR is not set, we'll try + // and use Docker to bring up a 3-node zookeeper cluster & 5-node kafka + // cluster, with toxiproxy configured as above. 
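+ // + // For example (illustrative invocations; substitute your own addresses): + // TOXIPROXY_ADDR=http://localhost:8474 KAFKA_VERSION=2.5.0 make test_functional + // or, with TOXIPROXY_ADDR unset, let the harness drive docker-compose itself: + // make test_functional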
+ // + // In either case, the following topics will be deleted (if they exist) and + // then created/pre-seeded with data for the functional test run: + // * uncommitted-topic-test-4 + // * test.1 + // * test.4 + // * test.64 + os.Exit(testMain(m)) +} + +func testMain(m *testing.M) int { + ctx := context.Background() + var env testEnvironment + if os.Getenv("DEBUG") == "true" { Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags) } - seed := time.Now().UTC().UnixNano() - if tmp := os.Getenv("TEST_SEED"); tmp != "" { - seed, _ = strconv.ParseInt(tmp, 0, 64) + usingExisting, err := existingEnvironment(ctx, &env) + if err != nil { + panic(err) + } + if !usingExisting { + err := prepareDockerTestEnvironment(ctx, &env) + if err != nil { + _ = tearDownDockerTestEnvironment(ctx, &env) + panic(err) + } + defer tearDownDockerTestEnvironment(ctx, &env) // nolint:errcheck + } + if err := prepareTestTopics(ctx, &env); err != nil { + panic(err) + } + FunctionalTestEnv = &env + return m.Run() +} + +type testEnvironment struct { + ToxiproxyClient *toxiproxy.Client + Proxies map[string]*toxiproxy.Proxy + KafkaBrokerAddrs []string + KafkaVersion string +} + 
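+ // prepareDockerTestEnvironment tears down any compose stack left over from a + // previous run, brings up zookeeper/kafka via docker-compose (pinning + // CONFLUENT_PLATFORM_VERSION to match KAFKA_VERSION), creates one toxiproxy + // proxy per broker on ports 29091-29095, and polls until all five brokers + // are reachable.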
+func prepareDockerTestEnvironment(ctx context.Context, env *testEnvironment) error { + Logger.Println("bringing up docker-based test environment") + + // Always (try to) tear down first. + if err := tearDownDockerTestEnvironment(ctx, env); err != nil { + return fmt.Errorf("failed to tear down existing env: %w", err) } - Logger.Println("Using random seed:", seed) - rand.Seed(seed) - proxyAddr := os.Getenv("TOXIPROXY_ADDR") - if proxyAddr == "" { - proxyAddr = VagrantToxiproxy + if version, ok := os.LookupEnv("KAFKA_VERSION"); ok { + env.KafkaVersion = version + } else { + // We have cp-5.5.0 as the default in the docker-compose file, so that's kafka 2.5.0. + env.KafkaVersion = "2.5.0" + } - proxyClient = toxiproxy.NewClient(proxyAddr) - kafkaPeers := os.Getenv("KAFKA_PEERS") - if kafkaPeers == "" { - kafkaPeers = VagrantKafkaPeers + // the mapping of confluent platform docker image versions -> kafka versions can be + // found here: https://docs.confluent.io/current/installation/versions-interoperability.html + var confluentPlatformVersion string + switch env.KafkaVersion { + case "2.5.0": + confluentPlatformVersion = "5.5.0" + case "2.4.1": + confluentPlatformVersion = "5.4.2" + default: + return fmt.Errorf("don't know what confluent platform version to use for kafka %s", env.KafkaVersion) + } + + c := exec.Command("docker-compose", "up", "-d") + c.Stdout = os.Stdout + c.Stderr = os.Stderr + c.Env = append(os.Environ(), fmt.Sprintf("CONFLUENT_PLATFORM_VERSION=%s", confluentPlatformVersion)) + err := c.Run() + if err != nil { + return fmt.Errorf("failed to run docker-compose to start test environment: %w", err) + } + + // Set up toxiproxy Proxies + env.ToxiproxyClient = toxiproxy.NewClient("localhost:8474") + env.Proxies = map[string]*toxiproxy.Proxy{} + for i := 1; i <= 5; i++ { + proxyName := fmt.Sprintf("kafka%d", i) + proxy, err := env.ToxiproxyClient.CreateProxy( + proxyName, + fmt.Sprintf("0.0.0.0:%d", 29090+i), + fmt.Sprintf("kafka-%d:%d", i, 29090+i), + ) + if err != nil { + return fmt.Errorf("failed to create toxiproxy: %w", err) + } + env.Proxies[proxyName] = proxy + env.KafkaBrokerAddrs = append(env.KafkaBrokerAddrs, fmt.Sprintf("127.0.0.1:%d", 29090+i)) + } - kafkaBrokers = strings.Split(kafkaPeers, ",") - if c, err := net.DialTimeout("tcp", kafkaBrokers[0], 5*time.Second); err == nil { - if err = c.Close(); err == nil { - kafkaAvailable = true + // Wait for the kafka brokers to come up + allBrokersUp := false + for i := 0; i < 45 && !allBrokersUp; i++ { + Logger.Println("waiting for kafka brokers to come up") + time.Sleep(1 * time.Second) + config := NewConfig() + config.Version, err = ParseKafkaVersion(env.KafkaVersion) + if err != nil { + return err } + config.Net.DialTimeout = 1 * time.Second + config.Net.ReadTimeout = 1 * time.Second + config.Net.WriteTimeout = 1 * time.Second + config.ClientID = "sarama-tests" + brokersOk := make([]bool, len(env.KafkaBrokerAddrs)) + retryLoop: + for j, addr := range env.KafkaBrokerAddrs { + client, err := NewClient([]string{addr}, config) + if err != nil { + continue + } + err = client.RefreshMetadata() + if err != nil { + continue + } + brokers := client.Brokers() + if len(brokers) < 5 { + continue + } + for _, broker := range brokers { + err := broker.Open(client.Config()) + if err != nil { + continue retryLoop + } + connected, err := broker.Connected() + if err != nil || !connected { + continue retryLoop + } + } + brokersOk[j] = true + } + allBrokersUp = true + for _, u := range brokersOk { + allBrokersUp = allBrokersUp && u + } + } + if !allBrokersUp { + return fmt.Errorf("timed out waiting for brokers to come up") } - kafkaRequired = os.Getenv("CI") != "" + return nil } -func checkKafkaAvailability(t testing.TB) { - if !kafkaAvailable { - if kafkaRequired { - t.Fatalf("Kafka broker is not available on %s. Set KAFKA_PEERS to connect to Kafka on a different location.", kafkaBrokers[0]) - } else { - t.Skipf("Kafka broker is not available on %s. Set KAFKA_PEERS to connect to Kafka on a different location.", kafkaBrokers[0]) - } - } -}
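+ // existingEnvironment is the TOXIPROXY_ADDR path: it expects proxies named + // kafka1..kafka5 to already exist on that toxiproxy instance, derives each + // broker address from the proxy's listen port, and requires KAFKA_VERSION + // to be set alongside TOXIPROXY_ADDR.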
+func existingEnvironment(ctx context.Context, env *testEnvironment) (bool, error) { + toxiproxyAddr, ok := os.LookupEnv("TOXIPROXY_ADDR") + if !ok { + return false, nil + } + toxiproxyURL, err := url.Parse(toxiproxyAddr) + if err != nil { + return false, fmt.Errorf("$TOXIPROXY_ADDR not parseable as url") + } + toxiproxyHost := toxiproxyURL.Hostname() + + env.ToxiproxyClient = toxiproxy.NewClient(toxiproxyAddr) + env.Proxies = map[string]*toxiproxy.Proxy{} // initialise the map before assigning into it below + for i := 1; i <= 5; i++ { + proxyName := fmt.Sprintf("kafka%d", i) + proxy, err := env.ToxiproxyClient.Proxy(proxyName) + if err != nil { + return false, fmt.Errorf("no proxy kafka%d on toxiproxy: %w", i, err) + } + env.Proxies[proxyName] = proxy + // get the host:port from the proxy & toxiproxy addr, so we can do "$toxiproxy_addr:$proxy_port" + _, proxyPort, err := net.SplitHostPort(proxy.Listen) + if err != nil { + return false, fmt.Errorf("proxy.Listen not a host:port combo: %w", err) } + env.KafkaBrokerAddrs = append(env.KafkaBrokerAddrs, fmt.Sprintf("%s:%s", toxiproxyHost, proxyPort)) + } + + env.KafkaVersion, ok = os.LookupEnv("KAFKA_VERSION") + if !ok { + return false, fmt.Errorf("KAFKA_VERSION needs to be provided with TOXIPROXY_ADDR") } + return true, nil +} + +func tearDownDockerTestEnvironment(ctx context.Context, env *testEnvironment) error { + c := exec.Command("docker-compose", "down", "--volumes") + c.Stdout = os.Stdout + c.Stderr = os.Stderr + downErr := c.Run() + + c = exec.Command("docker-compose", "rm", "-v", "--force", "--stop") + c.Stdout = os.Stdout + c.Stderr = os.Stderr + rmErr := c.Run() + if downErr != nil { + return fmt.Errorf("failed to run docker-compose to stop test environment: %w", downErr) + } + if rmErr != nil { + return fmt.Errorf("failed to run docker-compose to rm test environment: %w", rmErr) + } + return nil +} + 
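+ // prepareTestTopics deletes any stale test topics, waits for the deletes to + // actually take effect (topic deletion is asynchronous on the broker side), + // recreates the topics from testTopicDetails, and finally seeds + // uncommitted-topic-test-4 with transactional messages via the helper jar.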
+func prepareTestTopics(ctx context.Context, env *testEnvironment) error { + Logger.Println("creating test topics") + var testTopicNames []string + for topic := range testTopicDetails { + testTopicNames = append(testTopicNames, topic) + } + + Logger.Println("Creating topics") + config := NewConfig() + config.Metadata.Retry.Max = 5 + config.Metadata.Retry.Backoff = 10 * time.Second + config.ClientID = "sarama-tests" + var err error + config.Version, err = ParseKafkaVersion(env.KafkaVersion) + if err != nil { + return fmt.Errorf("failed to parse kafka version %s: %w", env.KafkaVersion, err) + } + + client, err := NewClient(env.KafkaBrokerAddrs, config) + if err != nil { + return fmt.Errorf("failed to connect to kafka: %w", err) + } + defer client.Close() + + controller, err := client.Controller() + if err != nil { + return fmt.Errorf("failed to connect to kafka controller: %w", err) + } + defer controller.Close() + + // Start by deleting the test topics (if they already exist) + deleteRes, err := controller.DeleteTopics(&DeleteTopicsRequest{ + Topics: testTopicNames, + Timeout: 30 * time.Second, + }) + if err != nil { + return fmt.Errorf("failed to delete test topics: %w", err) + } + for topic, topicErr := range deleteRes.TopicErrorCodes { + if !isTopicNotExistsErrorOrOk(topicErr) { + return fmt.Errorf("failed to delete topic %s: %w", topic, topicErr) + } + } + + // wait for the topics to _actually_ be gone - the delete is not guaranteed to be processed + // synchronously + var topicsOk bool + for i := 0; i < 20 && !topicsOk; i++ { + time.Sleep(1 * time.Second) + md, err := controller.GetMetadata(&MetadataRequest{ + Topics: testTopicNames, + }) + if err != nil { + return fmt.Errorf("failed to get metadata for test topics: %w", err) + } + + topicsOk = true + for _, topicsMd := range md.Topics { + if !isTopicNotExistsErrorOrOk(topicsMd.Err) { + topicsOk = false + } + } + } + if !topicsOk { + return fmt.Errorf("timed out waiting for test topics to be gone") + } + + // now create the topics empty + createRes, err := controller.CreateTopics(&CreateTopicsRequest{ + TopicDetails: testTopicDetails, + Timeout: 30 * time.Second, + }) + if err != nil { + return fmt.Errorf("failed to create test topics: %w", err) + } + for topic, topicErr := range createRes.TopicErrors { + if !isTopicExistsErrorOrOk(topicErr.Err) { + return fmt.Errorf("failed to create test topic %s: %w", topic, topicErr) + } + } + + // This is kind of gross, but we don't actually have support for doing transactional publishing + // with sarama, so we need to use a java-based tool to publish uncommitted messages to + // the uncommitted-topic-test-4 topic + jarName := filepath.Base(uncomittedMsgJar) + if _, err := os.Stat(jarName); err != nil { + Logger.Printf("Downloading %s\n", uncomittedMsgJar) + req, err := http.NewRequest("GET", uncomittedMsgJar, nil) + if err != nil { + return fmt.Errorf("failed creating request for uncommitted msg jar: %w", err) + } + res, err := http.DefaultClient.Do(req) + if err != nil { + return fmt.Errorf("failed fetching the uncommitted msg jar: %w", err) + } + defer res.Body.Close() + jarFile, err := os.OpenFile(jarName, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0644) + if err != nil { + return fmt.Errorf("failed opening the uncommitted msg jar: %w", err) + } + defer jarFile.Close() + + _, err = io.Copy(jarFile, res.Body) + if err != nil { + return fmt.Errorf("failed writing the uncommitted msg jar: %w", err) + } + } + + c := exec.Command("java", "-jar", jarName, "-b", env.KafkaBrokerAddrs[0], "-c", "4") + c.Stdout = os.Stdout + c.Stderr = os.Stderr + err = c.Run() + if err != nil { + return fmt.Errorf("failed running uncommitted msg jar: %w", err) + } + return nil +} + +func isTopicNotExistsErrorOrOk(err KError) bool { + return err == ErrUnknownTopicOrPartition || err == ErrInvalidTopic || err == ErrNoError +} + +func isTopicExistsErrorOrOk(err KError) bool { + return err == ErrTopicAlreadyExists || err == ErrNoError +}
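+ // checkKafkaVersion gates a test on a minimum broker version, skipping it + // when the requirement cannot be met; a typical (illustrative) call is + // checkKafkaVersion(t, "0.11.0.0").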
func checkKafkaVersion(t testing.TB, requiredVersion string) { - kafkaVersion := os.Getenv("KAFKA_VERSION") + kafkaVersion := FunctionalTestEnv.KafkaVersion if kafkaVersion == "" { - t.Logf("No KAFKA_VERSION set. This test requires Kafka version %s or higher. Continuing...", requiredVersion) + t.Skipf("No KAFKA_VERSION set. This test requires Kafka version %s or higher. Skipping...", requiredVersion) } else { available := parseKafkaVersion(kafkaVersion) required := parseKafkaVersion(requiredVersion) @@ -84,30 +399,19 @@ func checkKafkaVersion(t testing.TB, requiredVersion string) { } func resetProxies(t testing.TB) { - if err := proxyClient.ResetState(); err != nil { + if err := FunctionalTestEnv.ToxiproxyClient.ResetState(); err != nil { t.Error(err) } - Proxies = nil -} - -func fetchProxies(t testing.TB) { - var err error - Proxies, err = proxyClient.Proxies() - if err != nil { - t.Fatal(err) - } } func SaveProxy(t *testing.T, px string) { - if err := Proxies[px].Save(); err != nil { + if err := FunctionalTestEnv.Proxies[px].Save(); err != nil { t.Fatal(err) } } func setupFunctionalTest(t testing.TB) { - checkKafkaAvailability(t) resetProxies(t) - fetchProxies(t) } func teardownFunctionalTest(t testing.TB) { diff --git a/vagrant/create_topics.sh b/vagrant/create_topics.sh deleted file mode 100755 index 959d3a053..000000000 --- a/vagrant/create_topics.sh +++ /dev/null @@ -1,9 +0,0 @@ -#!/bin/sh - -set -ex - -cd ${KAFKA_INSTALL_ROOT}/kafka-9092 -bin/kafka-topics.sh --create --partitions 1 --replication-factor 3 --topic test.1 --zookeeper localhost:2181 -bin/kafka-topics.sh --create --partitions 4 --replication-factor 3 --topic test.4 --zookeeper localhost:2181 -bin/kafka-topics.sh --create --partitions 64 --replication-factor 3 --topic test.64 --zookeeper localhost:2181 -bin/kafka-topics.sh --create --partitions 1 --replication-factor 3 --topic uncommitted-topic-test-4 --zookeeper localhost:2181 \ No newline at end of file diff --git a/vagrant/halt_cluster.sh b/vagrant/halt_cluster.sh deleted file mode 100755 index e671c4812..000000000 --- a/vagrant/halt_cluster.sh +++ /dev/null @@ -1,25 +0,0 @@ -#!/bin/bash - -# If the functional tests failed (or some other task) then -# we might want to look into the broker logs if [ "$TRAVIS_TEST_RESULT" = "1" ]; then - echo "Dumping Kafka brokers server.log:" - for i in 1 2 3 4 5; do - KAFKA_PORT=`expr $i + 9090` - sed -e "s/^/kafka-${KAFKA_PORT} /" ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_PORT}/logs/server.log{.*,} - done -fi - -set -ex - -for i in 1 2 3 4 5; do - KAFKA_PORT=`expr $i + 9090` - cd ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_PORT} && bin/kafka-server-stop.sh -done - -for i in 1 2 3 4 5; do - KAFKA_PORT=`expr $i + 9090` - cd ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_PORT} && bin/zookeeper-server-stop.sh -done - -killall toxiproxy diff --git a/vagrant/install_cluster.sh b/vagrant/install_cluster.sh deleted file mode 100755 index aa22261e4..000000000 --- a/vagrant/install_cluster.sh +++ /dev/null @@ -1,86 +0,0 @@ -#!/bin/sh - -set -ex - -TOXIPROXY_VERSION=2.1.4 - -mkdir -p ${KAFKA_INSTALL_ROOT} -if [ ! -f ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_VERSION}.tgz ]; then - wget --quiet https://archive.apache.org/dist/kafka/${KAFKA_VERSION}/kafka_${KAFKA_SCALA_VERSION}-${KAFKA_VERSION}.tgz -O ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_VERSION}.tgz -fi -if [ ! 
-f ${KAFKA_INSTALL_ROOT}/toxiproxy-${TOXIPROXY_VERSION} ]; then - wget --quiet https://github.com/Shopify/toxiproxy/releases/download/v${TOXIPROXY_VERSION}/toxiproxy-server-linux-amd64 -O ${KAFKA_INSTALL_ROOT}/toxiproxy-${TOXIPROXY_VERSION} - chmod +x ${KAFKA_INSTALL_ROOT}/toxiproxy-${TOXIPROXY_VERSION} -fi -rm -f ${KAFKA_INSTALL_ROOT}/toxiproxy -ln -s ${KAFKA_INSTALL_ROOT}/toxiproxy-${TOXIPROXY_VERSION} ${KAFKA_INSTALL_ROOT}/toxiproxy - -for i in 1 2 3 4 5; do - ZK_PORT=$((i + 2180)) - ZK_PORT_REAL=$((i + 21800)) - KAFKA_PORT=$((i + 9090)) - KAFKA_PORT_REAL=$((i + 29090)) - - # unpack kafka - mkdir -p ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_PORT} - tar xzf ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_VERSION}.tgz -C ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_PORT} --strip-components 1 - - # broker configuration - mkdir -p "${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_PORT}/data" - - # Append to default server.properties with a small number of customisations - printf "\n\n" >> "${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_PORT}/config/server.properties" - cat << EOF >> "${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_PORT}/config/server.properties" -############################# Sarama Test Cluster ############################# - -broker.id=${KAFKA_PORT} -broker.rack=${i} - -# Listen on "real" port -listeners=PLAINTEXT://:${KAFKA_PORT_REAL} -# Advertise Toxiproxy port -advertised.listeners=PLAINTEXT://${KAFKA_HOSTNAME}:${KAFKA_PORT} - -# Connect to Zookeeper via Toxiproxy port -zookeeper.connect=127.0.0.1:${ZK_PORT} - -# Data directory -log.dirs="${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_PORT}/data" - -# Create new topics with a replication factor of 2 so failover can be tested -# more easily. -default.replication.factor=2 - -# Turn on log.retention.bytes to avoid filling up the VM's disk -log.retention.bytes=268435456 -log.segment.bytes=268435456 - -# Enable topic deletion and disable auto-creation -delete.topic.enable=true -auto.create.topics.enable=false - -# Lower the zookeeper timeouts so we don't have to wait forever for a node -# to die when we use toxiproxy to kill its zookeeper connection -zookeeper.session.timeout.ms=3000 -zookeeper.connection.timeout.ms=3000 - -# Disable broker ID length constraint -reserved.broker.max.id=10000 - -# Permit follower fetching (KIP-392) -replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector - -############################################################################### -EOF - - # zookeeper configuration - cp ${REPOSITORY_ROOT}/vagrant/zookeeper.properties ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_PORT}/config/ - sed -i s/KAFKAID/${KAFKA_PORT}/g ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_PORT}/config/zookeeper.properties - sed -i s/ZK_PORT/${ZK_PORT_REAL}/g ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_PORT}/config/zookeeper.properties - - ZK_DATADIR="${KAFKA_INSTALL_ROOT}/zookeeper-${ZK_PORT}" - mkdir -p ${ZK_DATADIR} - sed -i s#ZK_DATADIR#${ZK_DATADIR}#g ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_PORT}/config/zookeeper.properties - - echo $i > ${KAFKA_INSTALL_ROOT}/zookeeper-${ZK_PORT}/myid -done diff --git a/vagrant/kafka.conf b/vagrant/kafka.conf deleted file mode 100644 index 25101df5a..000000000 --- a/vagrant/kafka.conf +++ /dev/null @@ -1,9 +0,0 @@ -start on started zookeeper-ZK_PORT -stop on stopping zookeeper-ZK_PORT - -# Use a script instead of exec (using env stanza leaks KAFKA_HEAP_OPTS from zookeeper) -script - sleep 2 - export KAFKA_HEAP_OPTS="-Xmx320m" - exec /opt/kafka-KAFKAID/bin/kafka-server-start.sh /opt/kafka-KAFKAID/config/server.properties -end script diff --git 
a/vagrant/provision.sh b/vagrant/provision.sh deleted file mode 100755 index 7f10de74a..000000000 --- a/vagrant/provision.sh +++ /dev/null @@ -1,17 +0,0 @@ -#!/bin/sh - -set -ex - -apt-get update -yes | apt-get install default-jre - -export KAFKA_INSTALL_ROOT=/opt -export KAFKA_HOSTNAME=192.168.100.67 -export KAFKA_VERSION=1.0.2 -export KAFKA_SCALA_VERSION=2.11 -export REPOSITORY_ROOT=/vagrant - -sh /vagrant/vagrant/install_cluster.sh -sh /vagrant/vagrant/setup_services.sh -sh /vagrant/vagrant/create_topics.sh -sh /vagrant/vagrant/run_java_producer.sh diff --git a/vagrant/run_java_producer.sh b/vagrant/run_java_producer.sh deleted file mode 100755 index 5851b7484..000000000 --- a/vagrant/run_java_producer.sh +++ /dev/null @@ -1,6 +0,0 @@ -#!/bin/sh - -set -ex - -wget https://github.com/FrancoisPoinsot/simplest-uncommitted-msg/releases/download/0.1/simplest-uncommitted-msg-0.1-jar-with-dependencies.jar -java -jar simplest-uncommitted-msg-0.1-jar-with-dependencies.jar -b ${KAFKA_HOSTNAME}:9092 -c 4 \ No newline at end of file diff --git a/vagrant/run_toxiproxy.sh b/vagrant/run_toxiproxy.sh deleted file mode 100755 index e52c00e7b..000000000 --- a/vagrant/run_toxiproxy.sh +++ /dev/null @@ -1,22 +0,0 @@ -#!/bin/sh - -set -ex - -${KAFKA_INSTALL_ROOT}/toxiproxy -port 8474 -host 0.0.0.0 & -PID=$! - -while ! nc -q 1 localhost 8474