From f33a0afab27e9024c34d4d3ef4c37f8aea4d9f56 Mon Sep 17 00:00:00 2001 From: KJ Tsanaktsidis Date: Sat, 9 May 2020 18:11:56 +1000 Subject: [PATCH 1/7] Add docker-composed based functional test harness * Functional tests are skipped except when the `functional` build tag is passed to the go test command (i.e. go test -tags=functional) * If TOXIPROXY_ADDR is not set when the functional tests are being run, it will use docker-compose to automatically spin up a zookeeper/kafka/toxiproxy environment suitab le for running the tests. This requies a working Docker and for the docker-compose command line tool to be installed. * If TOXIPROXY_ADDR and KAFKA_VERSION are set, then the tests will not spin up any docker infrastructure and will instead rely on whatever kafka broker is behind the specified toxiproxy. --- .gitignore | 3 + docker-compose.yml | 134 +++++++++++ functional_client_test.go | 10 +- functional_consumer_group_test.go | 6 +- functional_consumer_test.go | 14 +- functional_offset_manager_test.go | 4 +- functional_producer_test.go | 18 +- functional_test.go | 386 ++++++++++++++++++++++++++---- 8 files changed, 504 insertions(+), 71 deletions(-) create mode 100644 docker-compose.yml diff --git a/.gitignore b/.gitignore index 6e362e4f2..eb4b19509 100644 --- a/.gitignore +++ b/.gitignore @@ -25,3 +25,6 @@ _testmain.go coverage.txt profile.out + +simplest-uncommitted-msg-0.1-jar-with-dependencies.jar + diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 000000000..03425a6f9 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,134 @@ +version: '3.7' +services: + zookeeper-1: + image: 'confluentinc/cp-zookeeper:5.5.0' + restart: always + environment: + ZOOKEEPER_SERVER_ID: '1' + ZOOKEEPER_SERVERS: 'zookeeper-1:2888:3888;zookeeper-2:2888:3888;zookeeper-3:2888:3888' + ZOOKEEPER_CLIENT_PORT: '2181' + ZOOKEEPER_PEER_PORT: '2888' + ZOOKEEPER_LEADER_PORT: '3888' + ZOOKEEPER_INIT_LIMIT: '10' + ZOOKEEPER_SYNC_LIMIT: '5' + ZOOKEEPER_MAX_CLIENT_CONNS: '0' + zookeeper-2: + image: 'confluentinc/cp-zookeeper:5.5.0' + restart: always + environment: + ZOOKEEPER_SERVER_ID: '2' + ZOOKEEPER_SERVERS: 'zookeeper-1:2888:3888;zookeeper-2:2888:3888;zookeeper-3:2888:3888' + ZOOKEEPER_CLIENT_PORT: '2181' + ZOOKEEPER_PEER_PORT: '2888' + ZOOKEEPER_LEADER_PORT: '3888' + ZOOKEEPER_INIT_LIMIT: '10' + ZOOKEEPER_SYNC_LIMIT: '5' + ZOOKEEPER_MAX_CLIENT_CONNS: '0' + zookeeper-3: + image: 'confluentinc/cp-zookeeper:5.5.0' + restart: always + environment: + ZOOKEEPER_SERVER_ID: '3' + ZOOKEEPER_SERVERS: 'zookeeper-1:2888:3888;zookeeper-2:2888:3888;zookeeper-3:2888:3888' + ZOOKEEPER_CLIENT_PORT: '2181' + ZOOKEEPER_PEER_PORT: '2888' + ZOOKEEPER_LEADER_PORT: '3888' + ZOOKEEPER_INIT_LIMIT: '10' + ZOOKEEPER_SYNC_LIMIT: '5' + ZOOKEEPER_MAX_CLIENT_CONNS: '0' + kafka-1: + image: 'confluentinc/cp-kafka:5.5.0' + restart: always + environment: + KAFKA_ZOOKEEPER_CONNECT: 'zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181' + KAFKA_LISTENERS: 'LISTENER_INTERNAL://:9091,LISTENER_LOCAL://:29091' + KAFKA_ADVERTISED_LISTENERS: 'LISTENER_INTERNAL://kafka-1:9091,LISTENER_LOCAL://localhost:29091' + KAFKA_INTER_BROKER_LISTENER_NAME: 'LISTENER_INTERNAL' + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'LISTENER_INTERNAL:PLAINTEXT,LISTENER_LOCAL:PLAINTEXT' + KAFKA_DEFAULT_REPLICATION_FACTOR: '2' + KAFKA_BROKER_ID: '1' + KAFKA_BROKER_RACK: '1' + KAFKA_ZOOKEEPER_SESSION_TIMEOUT_MS: '3000' + KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: '3000' + KAFKA_REPLICA_SELECTOR_CLASS: 'org.apache.kafka.common.replica.RackAwareReplicaSelector' + KAFKA_DELETE_TOPIC_ENABLE: 'true' + KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false' + kafka-2: + image: 'confluentinc/cp-kafka:5.5.0' + restart: always + environment: + KAFKA_ZOOKEEPER_CONNECT: 'zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181' + KAFKA_LISTENERS: 'LISTENER_INTERNAL://:9091,LISTENER_LOCAL://:29092' + KAFKA_ADVERTISED_LISTENERS: 'LISTENER_INTERNAL://kafka-2:9091,LISTENER_LOCAL://localhost:29092' + KAFKA_INTER_BROKER_LISTENER_NAME: 'LISTENER_INTERNAL' + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'LISTENER_INTERNAL:PLAINTEXT,LISTENER_LOCAL:PLAINTEXT' + KAFKA_DEFAULT_REPLICATION_FACTOR: '2' + KAFKA_BROKER_ID: '2' + KAFKA_BROKER_RACK: '2' + KAFKA_ZOOKEEPER_SESSION_TIMEOUT_MS: '3000' + KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: '3000' + KAFKA_REPLICA_SELECTOR_CLASS: 'org.apache.kafka.common.replica.RackAwareReplicaSelector' + KAFKA_DELETE_TOPIC_ENABLE: 'true' + KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false' + kafka-3: + image: 'confluentinc/cp-kafka:5.5.0' + restart: always + environment: + KAFKA_ZOOKEEPER_CONNECT: 'zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181' + KAFKA_LISTENERS: 'LISTENER_INTERNAL://:9091,LISTENER_LOCAL://:29093' + KAFKA_ADVERTISED_LISTENERS: 'LISTENER_INTERNAL://kafka-3:9091,LISTENER_LOCAL://localhost:29093' + KAFKA_INTER_BROKER_LISTENER_NAME: 'LISTENER_INTERNAL' + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'LISTENER_INTERNAL:PLAINTEXT,LISTENER_LOCAL:PLAINTEXT' + KAFKA_DEFAULT_REPLICATION_FACTOR: '2' + KAFKA_BROKER_ID: '3' + KAFKA_BROKER_RACK: '3' + KAFKA_ZOOKEEPER_SESSION_TIMEOUT_MS: '3000' + KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: '3000' + KAFKA_REPLICA_SELECTOR_CLASS: 'org.apache.kafka.common.replica.RackAwareReplicaSelector' + KAFKA_DELETE_TOPIC_ENABLE: 'true' + KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false' + kafka-4: + image: 'confluentinc/cp-kafka:5.5.0' + restart: always + environment: + KAFKA_ZOOKEEPER_CONNECT: 'zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181' + KAFKA_LISTENERS: 'LISTENER_INTERNAL://:9091,LISTENER_LOCAL://:29094' + KAFKA_ADVERTISED_LISTENERS: 'LISTENER_INTERNAL://kafka-4:9091,LISTENER_LOCAL://localhost:29094' + KAFKA_INTER_BROKER_LISTENER_NAME: 'LISTENER_INTERNAL' + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'LISTENER_INTERNAL:PLAINTEXT,LISTENER_LOCAL:PLAINTEXT' + KAFKA_DEFAULT_REPLICATION_FACTOR: '2' + KAFKA_BROKER_ID: '4' + KAFKA_BROKER_RACK: '4' + KAFKA_ZOOKEEPER_SESSION_TIMEOUT_MS: '3000' + KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: '3000' + KAFKA_REPLICA_SELECTOR_CLASS: 'org.apache.kafka.common.replica.RackAwareReplicaSelector' + KAFKA_DELETE_TOPIC_ENABLE: 'true' + KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false' + kafka-5: + image: 'confluentinc/cp-kafka:5.5.0' + restart: always + environment: + KAFKA_ZOOKEEPER_CONNECT: 'zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181' + KAFKA_LISTENERS: 'LISTENER_INTERNAL://:9091,LISTENER_LOCAL://:29095' + KAFKA_ADVERTISED_LISTENERS: 'LISTENER_INTERNAL://kafka-5:9091,LISTENER_LOCAL://localhost:29095' + KAFKA_INTER_BROKER_LISTENER_NAME: 'LISTENER_INTERNAL' + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'LISTENER_INTERNAL:PLAINTEXT,LISTENER_LOCAL:PLAINTEXT' + KAFKA_DEFAULT_REPLICATION_FACTOR: '2' + KAFKA_BROKER_ID: '5' + KAFKA_BROKER_RACK: '5' + KAFKA_ZOOKEEPER_SESSION_TIMEOUT_MS: '3000' + KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: '3000' + KAFKA_REPLICA_SELECTOR_CLASS: 'org.apache.kafka.common.replica.RackAwareReplicaSelector' + KAFKA_DELETE_TOPIC_ENABLE: 'true' + KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false' + toxiproxy: + image: 'shopify/toxiproxy:2.1.4' + ports: + # The tests themselves actually start the proies on these ports + - '29091:29091' + - '29092:29092' + - '29093:29093' + - '29094:29094' + - '29095:29095' + # This is the toxiproxy API port + - '8474:8474' diff --git a/functional_client_test.go b/functional_client_test.go index 2bf99d252..513b8ee9b 100644 --- a/functional_client_test.go +++ b/functional_client_test.go @@ -1,3 +1,5 @@ +//+build functional + package sarama import ( @@ -10,13 +12,13 @@ func TestFuncConnectionFailure(t *testing.T) { setupFunctionalTest(t) defer teardownFunctionalTest(t) - Proxies["kafka1"].Enabled = false + FunctionalTestEnv.Proxies["kafka1"].Enabled = false SaveProxy(t, "kafka1") config := NewConfig() config.Metadata.Retry.Max = 1 - _, err := NewClient([]string{kafkaBrokers[0]}, config) + _, err := NewClient([]string{FunctionalTestEnv.KafkaBrokerAddrs[0]}, config) if err != ErrOutOfBrokers { t.Fatal("Expected returned error to be ErrOutOfBrokers, but was: ", err) } @@ -29,7 +31,7 @@ func TestFuncClientMetadata(t *testing.T) { config := NewConfig() config.Metadata.Retry.Max = 1 config.Metadata.Retry.Backoff = 10 * time.Millisecond - client, err := NewClient(kafkaBrokers, config) + client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config) if err != nil { t.Fatal(err) } @@ -70,7 +72,7 @@ func TestFuncClientCoordinator(t *testing.T) { setupFunctionalTest(t) defer teardownFunctionalTest(t) - client, err := NewClient(kafkaBrokers, nil) + client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, nil) if err != nil { t.Fatal(err) } diff --git a/functional_consumer_group_test.go b/functional_consumer_group_test.go index ae376086d..4d71510a8 100644 --- a/functional_consumer_group_test.go +++ b/functional_consumer_group_test.go @@ -1,4 +1,4 @@ -// +build go1.9 +//+build functional package sarama @@ -153,7 +153,7 @@ func testFuncConsumerGroupID(t *testing.T) string { } func testFuncConsumerGroupFuzzySeed(topic string) error { - client, err := NewClient(kafkaBrokers, nil) + client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, nil) if err != nil { return err } @@ -245,7 +245,7 @@ func runTestFuncConsumerGroupMember(t *testing.T, groupID, clientID string, maxM config.Consumer.Offsets.Initial = OffsetOldest config.Consumer.Group.Rebalance.Timeout = 10 * time.Second - group, err := NewConsumerGroup(kafkaBrokers, groupID, config) + group, err := NewConsumerGroup(FunctionalTestEnv.KafkaBrokerAddrs, groupID, config) if err != nil { t.Fatal(err) return nil diff --git a/functional_consumer_test.go b/functional_consumer_test.go index 8b31b45c5..aca9434db 100644 --- a/functional_consumer_test.go +++ b/functional_consumer_test.go @@ -1,3 +1,5 @@ +//+build functional + package sarama import ( @@ -16,7 +18,7 @@ func TestFuncConsumerOffsetOutOfRange(t *testing.T) { setupFunctionalTest(t) defer teardownFunctionalTest(t) - consumer, err := NewConsumer(kafkaBrokers, nil) + consumer, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, nil) if err != nil { t.Fatal(err) } @@ -36,7 +38,7 @@ func TestConsumerHighWaterMarkOffset(t *testing.T) { setupFunctionalTest(t) defer teardownFunctionalTest(t) - p, err := NewSyncProducer(kafkaBrokers, nil) + p, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, nil) if err != nil { t.Fatal(err) } @@ -47,7 +49,7 @@ func TestConsumerHighWaterMarkOffset(t *testing.T) { t.Fatal(err) } - c, err := NewConsumer(kafkaBrokers, nil) + c, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, nil) if err != nil { t.Fatal(err) } @@ -143,7 +145,7 @@ func TestReadOnlyAndAllCommittedMessages(t *testing.T) { config.Consumer.IsolationLevel = ReadCommitted config.Version = V0_11_0_0 - consumer, err := NewConsumer(kafkaBrokers, config) + consumer, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, config) if err != nil { t.Fatal(err) } @@ -205,7 +207,7 @@ func produceMsgs(t *testing.T, clientVersions []KafkaVersion, codecs []Compressi prodCfg.Net.MaxOpenRequests = 1 } - p, err := NewSyncProducer(kafkaBrokers, prodCfg) + p, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, prodCfg) if err != nil { t.Errorf("Failed to create producer: version=%s, compression=%s, err=%v", prodVer, codec, err) continue @@ -251,7 +253,7 @@ consumerVersionLoop: // message. consCfg := NewConfig() consCfg.Version = consVer - c, err := NewConsumer(kafkaBrokers, consCfg) + c, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, consCfg) if err != nil { t.Fatal(err) } diff --git a/functional_offset_manager_test.go b/functional_offset_manager_test.go index 436f35ef4..32e160aab 100644 --- a/functional_offset_manager_test.go +++ b/functional_offset_manager_test.go @@ -1,3 +1,5 @@ +//+build functional + package sarama import ( @@ -9,7 +11,7 @@ func TestFuncOffsetManager(t *testing.T) { setupFunctionalTest(t) defer teardownFunctionalTest(t) - client, err := NewClient(kafkaBrokers, nil) + client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, nil) if err != nil { t.Fatal(err) } diff --git a/functional_producer_test.go b/functional_producer_test.go index e589a8eb8..0f86368d3 100644 --- a/functional_producer_test.go +++ b/functional_producer_test.go @@ -1,3 +1,5 @@ +//+build functional + package sarama import ( @@ -53,7 +55,7 @@ func TestFuncMultiPartitionProduce(t *testing.T) { config.Producer.Flush.Frequency = 50 * time.Millisecond config.Producer.Flush.Messages = 200 config.Producer.Return.Successes = true - producer, err := NewSyncProducer(kafkaBrokers, config) + producer, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, config) if err != nil { t.Fatal(err) } @@ -81,7 +83,7 @@ func TestFuncProducingToInvalidTopic(t *testing.T) { setupFunctionalTest(t) defer teardownFunctionalTest(t) - producer, err := NewSyncProducer(kafkaBrokers, nil) + producer, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, nil) if err != nil { t.Fatal(err) } @@ -113,7 +115,7 @@ func TestFuncProducingIdempotentWithBrokerFailure(t *testing.T) { config.Net.MaxOpenRequests = 1 config.Version = V0_11_0_0 - producer, err := NewSyncProducer(kafkaBrokers, config) + producer, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, config) if err != nil { t.Fatal(err) } @@ -131,7 +133,7 @@ func TestFuncProducingIdempotentWithBrokerFailure(t *testing.T) { } // break the brokers. - for proxyName, proxy := range Proxies { + for proxyName, proxy := range FunctionalTestEnv.Proxies { if !strings.Contains(proxyName, "kafka") { continue } @@ -152,7 +154,7 @@ func TestFuncProducingIdempotentWithBrokerFailure(t *testing.T) { } // Now bring the proxy back up - for proxyName, proxy := range Proxies { + for proxyName, proxy := range FunctionalTestEnv.Proxies { if !strings.Contains(proxyName, "kafka") { continue } @@ -179,7 +181,7 @@ func testProducingMessages(t *testing.T, config *Config) { defer teardownFunctionalTest(t) // Configure some latency in order to properly validate the request latency metric - for _, proxy := range Proxies { + for _, proxy := range FunctionalTestEnv.Proxies { if _, err := proxy.AddToxic("", "latency", "", 1, toxiproxy.Attributes{"latency": 10}); err != nil { t.Fatal("Unable to configure latency toxicity", err) } @@ -188,7 +190,7 @@ func testProducingMessages(t *testing.T, config *Config) { config.Producer.Return.Successes = true config.Consumer.Return.Errors = true - client, err := NewClient(kafkaBrokers, config) + client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config) if err != nil { t.Fatal(err) } @@ -380,7 +382,7 @@ func benchmarkProducer(b *testing.B, conf *Config, topic string, value Encoder) }() } - producer, err := NewAsyncProducer(kafkaBrokers, conf) + producer, err := NewAsyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, conf) if err != nil { b.Fatal(err) } diff --git a/functional_test.go b/functional_test.go index 778d9e055..e56182be0 100644 --- a/functional_test.go +++ b/functional_test.go @@ -1,79 +1,378 @@ +//+build functional + package sarama import ( + "context" + "fmt" + toxiproxy "github.com/Shopify/toxiproxy/client" + "io" "log" - "math/rand" "net" + "net/http" + "net/url" "os" + "os/exec" + "path/filepath" "strconv" "strings" "testing" "time" - - toxiproxy "github.com/Shopify/toxiproxy/client" ) const ( - VagrantToxiproxy = "http://192.168.100.67:8474" - VagrantKafkaPeers = "192.168.100.67:9091,192.168.100.67:9092,192.168.100.67:9093,192.168.100.67:9094,192.168.100.67:9095" - VagrantZookeeperPeers = "192.168.100.67:2181,192.168.100.67:2182,192.168.100.67:2183,192.168.100.67:2184,192.168.100.67:2185" + uncomittedMsgJar = "https://github.com/FrancoisPoinsot/simplest-uncommitted-msg/releases/download/0.1/simplest-uncommitted-msg-0.1-jar-with-dependencies.jar" ) var ( - kafkaAvailable, kafkaRequired bool - kafkaBrokers []string + testTopicDetails = map[string]*TopicDetail{ + "test.1": { + NumPartitions: 1, + ReplicationFactor: 3, + }, + "test.4": { + NumPartitions: 4, + ReplicationFactor: 3, + }, + "test.64": { + NumPartitions: 64, + ReplicationFactor: 3, + }, + "uncommitted-topic-test-4": { + NumPartitions: 1, + ReplicationFactor: 3, + }, + } - proxyClient *toxiproxy.Client - Proxies map[string]*toxiproxy.Proxy + FunctionalTestEnv *testEnvironment ) -func init() { +func TestMain(m *testing.M) { + // Functional tests for Sarama + // + // You can either set TOXIPROXY_ADDR, which points at a toxiproxy address + // already set up with 21801-21805 bound to zookeeper and 29091-29095 + // bound to kafka. Alternatively, if TOXIPROXY_ADDR is not set, we'll try + // and use Docker to bring up a 5-node zookeeper cluster & 5-node kafka + // cluster, with toxiproxy configured as above. + // + // In either case, the following topics will be deleted (if they exist) and + // then created/pre-seeded with data for the functional test run: + // * uncomitted-topic-test-4 + // * test.1 + // * test.4 + // * test.64 + os.Exit(testMain(m)) +} + +func testMain(m *testing.M) int { + ctx := context.Background() + var env testEnvironment + if os.Getenv("DEBUG") == "true" { Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags) } - seed := time.Now().UTC().UnixNano() - if tmp := os.Getenv("TEST_SEED"); tmp != "" { - seed, _ = strconv.ParseInt(tmp, 0, 64) + usingExisting, err := existingEnvironment(ctx, &env) + if err != nil { + panic(err) + } + if !usingExisting { + err := prepareDockerTestEnvironment(ctx, &env) + if err != nil { + tearDownDockerTestEnvironment(ctx, &env) + panic(err) + } + defer tearDownDockerTestEnvironment(ctx, &env) } - Logger.Println("Using random seed:", seed) - rand.Seed(seed) + if err := prepareTestTopics(ctx, &env); err != nil { + panic(err) + } + FunctionalTestEnv = &env + return m.Run() +} + +type testEnvironment struct { + ToxiproxyClient *toxiproxy.Client + Proxies map[string]*toxiproxy.Proxy + KafkaBrokerAddrs []string + KafkaVersion string +} + +func prepareDockerTestEnvironment(ctx context.Context, env *testEnvironment) error { + Logger.Println("bringing up docker-based test environment") - proxyAddr := os.Getenv("TOXIPROXY_ADDR") - if proxyAddr == "" { - proxyAddr = VagrantToxiproxy + // Always (try to) tear down first. + if err := tearDownDockerTestEnvironment(ctx, env); err != nil { + return fmt.Errorf("failed to tear down existing env: %w", err) } - proxyClient = toxiproxy.NewClient(proxyAddr) - kafkaPeers := os.Getenv("KAFKA_PEERS") - if kafkaPeers == "" { - kafkaPeers = VagrantKafkaPeers + c := exec.Command("docker-compose", "up", "-d") + c.Stdout = os.Stdout + c.Stderr = os.Stderr + err := c.Run() + if err != nil { + return fmt.Errorf("failed to run docker-compose to start test enviroment: %w", err) } - kafkaBrokers = strings.Split(kafkaPeers, ",") - if c, err := net.DialTimeout("tcp", kafkaBrokers[0], 5*time.Second); err == nil { - if err = c.Close(); err == nil { - kafkaAvailable = true + // Set up toxiproxy Proxies + env.ToxiproxyClient = toxiproxy.NewClient("localhost:8474") + env.Proxies = map[string]*toxiproxy.Proxy{} + for i := 1; i <= 5; i++ { + proxyName := fmt.Sprintf("kafka%d", i) + proxy, err := env.ToxiproxyClient.CreateProxy( + proxyName, + fmt.Sprintf("0.0.0.0:%d", 29090 + i), + fmt.Sprintf("kafka-%d:%d", i, 29090 + i), + ) + if err != nil { + return fmt.Errorf("failed to create toxiproxy: %w", err) } + env.Proxies[proxyName] = proxy + env.KafkaBrokerAddrs = append(env.KafkaBrokerAddrs, fmt.Sprintf("127.0.0.1:%d", 29090 + i)) } - kafkaRequired = os.Getenv("CI") != "" + // the mapping of confluent platform docker image vesions -> kafka versions can be + // found here: https://docs.confluent.io/current/installation/versions-interoperability.html + // We have cp-5.5.0 in the docker-compose file, so that's kafka 2.5.0. + env.KafkaVersion = "2.5.0" + + // Wait for the kafka broker to come up + allBrokersUp := false + for i := 0; i < 45 && !allBrokersUp; i++ { + Logger.Println("waiting for kafka brokers to come up") + time.Sleep(1 * time.Second) + config := NewConfig() + config.Version, err = ParseKafkaVersion(env.KafkaVersion) + if err != nil { + return err + } + config.Net.DialTimeout = 1 * time.Second + config.Net.ReadTimeout = 1 * time.Second + config.Net.WriteTimeout = 1 * time.Second + config.ClientID = "sarama-tests" + brokersOk := make([]bool, len(env.KafkaBrokerAddrs)) + retryLoop: + for j, addr := range env.KafkaBrokerAddrs { + client, err := NewClient([]string{addr},config) + if err != nil { + continue + } + err = client.RefreshMetadata() + if err != nil { + continue + } + brokers := client.Brokers() + if len(brokers) < 5 { + continue + } + for _, broker := range brokers { + err := broker.Open(client.Config()) + if err != nil { + continue retryLoop + } + connected, err := broker.Connected() + if err != nil || !connected { + continue retryLoop + } + } + brokersOk[j] = true + } + allBrokersUp = true + for _, u := range brokersOk { + allBrokersUp = allBrokersUp && u + } + } + if !allBrokersUp { + return fmt.Errorf("timed out waiting for broker to come up") + } + + return nil } -func checkKafkaAvailability(t testing.TB) { - if !kafkaAvailable { - if kafkaRequired { - t.Fatalf("Kafka broker is not available on %s. Set KAFKA_PEERS to connect to Kafka on a different location.", kafkaBrokers[0]) - } else { - t.Skipf("Kafka broker is not available on %s. Set KAFKA_PEERS to connect to Kafka on a different location.", kafkaBrokers[0]) +func existingEnvironment(ctx context.Context, env *testEnvironment) (bool, error) { + toxiproxyAddr, ok := os.LookupEnv("TOXIPROXY_ADDR") + if !ok { + return false, nil + } + toxiproxyURL, err := url.Parse(toxiproxyAddr) + if err != nil { + return false, fmt.Errorf("$TOXIPROXY_ADDR not parseable as url") + } + toxiproxyHost := toxiproxyURL.Hostname() + + env.ToxiproxyClient = toxiproxy.NewClient(toxiproxyAddr) + for i := 1; i <= 5; i++ { + proxyName := fmt.Sprintf("kafka%d", i) + proxy, err := env.ToxiproxyClient.Proxy(proxyName) + if err != nil { + return false, fmt.Errorf("no proxy kafka%d on toxiproxy: %w", i, err) + } + env.Proxies[proxyName] = proxy + // get the host:port from the proxy & toxiproxy addr, so we can do "$toxiproxy_addr:$proxy_port" + _, proxyPort, err := net.SplitHostPort(proxy.Listen) + if err != nil { + return false, fmt.Errorf("proxy.Listen not a host:port combo: %w", err) + } + env.KafkaBrokerAddrs = append(env.KafkaBrokerAddrs, fmt.Sprintf("%s:%s", toxiproxyHost, proxyPort)) + } + + env.KafkaVersion, ok = os.LookupEnv("KAFKA_VERSION") + if !ok { + return false, fmt.Errorf("KAFKA_VERSION needs to be provided with TOXIPROXY_ADDR") + } + return true, nil +} + +func tearDownDockerTestEnvironment(ctx context.Context, env *testEnvironment) error { + c := exec.Command("docker-compose", "down", "--volumes") + c.Stdout = os.Stdout + c.Stderr = os.Stderr + downErr := c.Run() + + c = exec.Command("docker-compose", "rm", "-v", "--force", "--stop") + c.Stdout = os.Stdout + c.Stderr = os.Stderr + rmErr := c.Run() + if downErr != nil { + return fmt.Errorf("failed to run docker-compose to stop test enviroment: %w", downErr) + } + if rmErr != nil { + return fmt.Errorf("failed to run docker-compose to rm test enviroment: %w", rmErr) + } + return nil +} + +func prepareTestTopics(ctx context.Context, env *testEnvironment) error { + Logger.Println("creating test topics") + var testTopicNames []string + for topic, _ := range testTopicDetails { + testTopicNames = append(testTopicNames, topic) + } + + Logger.Println("Creating topics") + config := NewConfig() + config.Metadata.Retry.Max = 5 + config.Metadata.Retry.Backoff = 10 * time.Second + config.ClientID = "sarama-tests" + var err error + config.Version, err = ParseKafkaVersion(env.KafkaVersion) + if err != nil { + return fmt.Errorf("failed to parse kafka version %s: %w", env.KafkaVersion, err) + } + + client, err := NewClient(env.KafkaBrokerAddrs, config) + if err != nil { + return fmt.Errorf("failed to connect to kafka: %w", err) + } + defer client.Close() + + controller, err := client.Controller() + if err != nil { + return fmt.Errorf("failed to connect to kafka controller: %w", err) + } + defer controller.Close() + + // Start by deleting the test topics (if they already exist) + deleteRes, err := controller.DeleteTopics(&DeleteTopicsRequest{ + Topics: testTopicNames, + Timeout: 30 * time.Second, + }) + if err != nil { + return fmt.Errorf("failed to delete test topics: %w", err) + } + for topic, topicErr := range deleteRes.TopicErrorCodes { + if !isTopicNotExistsErrorOrOk(topicErr) { + return fmt.Errorf("failed to delete topic %s: %w", topic, topicErr) + } + } + + // wait for the topics to _actually_ be gone - the delete is not guaranteed to be processed + // synchronously + var topicsOk bool + for i := 0; i < 20 && !topicsOk; i++ { + time.Sleep(1 * time.Second) + md, err := controller.GetMetadata(&MetadataRequest{ + Topics: testTopicNames, + }) + if err != nil { + return fmt.Errorf("failed to get metadata for test topics: %w", err) + } + + topicsOk = true + for _, topicsMd := range md.Topics { + if !isTopicNotExistsErrorOrOk(topicsMd.Err) { + topicsOk = false + } } } + if !topicsOk { + return fmt.Errorf("timed out waiting for test topics to be gone") + } + + // now create the topics empty + createRes, err := controller.CreateTopics(&CreateTopicsRequest{ + TopicDetails: testTopicDetails, + Timeout: 30 * time.Second, + }) + if err != nil { + return fmt.Errorf("failed to create test topics: %w", err) + } + for topic, topicErr := range createRes.TopicErrors { + if !isTopicExistsErrorOrOk(topicErr.Err) { + return fmt.Errorf("failed to create test topic %s: %w", topic, topicErr) + } + } + + // This is kind of gross, but we don't actually have support for doing transactional publishing + // with sarama, so we need to use a java-based tool to publish uncomitted messages to + // the uncommitted-topic-test-4 topic + jarName := filepath.Base(uncomittedMsgJar) + if _, err := os.Stat(jarName); err != nil { + Logger.Printf("Downloading %s\n", uncomittedMsgJar) + req, err := http.NewRequest("GET", uncomittedMsgJar, nil) + if err != nil { + return fmt.Errorf("failed creating requst for uncomitted msg jar: %w", err) + } + res, err := http.DefaultClient.Do(req) + if err != nil { + return fmt.Errorf("failed fetching the uncommitted msg jar: %w", err) + } + defer res.Body.Close() + jarFile, err := os.OpenFile(jarName, os.O_WRONLY | os.O_TRUNC | os.O_CREATE, 0644) + if err != nil { + return fmt.Errorf("failed opening the uncomitted msg jar: %w", err) + } + defer jarFile.Close() + + _, err = io.Copy(jarFile, res.Body) + if err != nil { + return fmt.Errorf("failed writing the uncomitted msg jar: %w", err) + } + } + + c := exec.Command("java", "-jar", jarName, "-b", env.KafkaBrokerAddrs[0], "-c", "4") + c.Stdout = os.Stdout + c.Stderr = os.Stderr + err = c.Run() + if err != nil { + return fmt.Errorf("failed running uncomitted msg jar: %w", err) + } + return nil +} + +func isTopicNotExistsErrorOrOk(err KError) bool { + return err == ErrUnknownTopicOrPartition || err == ErrInvalidTopic || err == ErrNoError +} + +func isTopicExistsErrorOrOk(err KError) bool { + return err == ErrTopicAlreadyExists || err == ErrNoError } func checkKafkaVersion(t testing.TB, requiredVersion string) { - kafkaVersion := os.Getenv("KAFKA_VERSION") + kafkaVersion := FunctionalTestEnv.KafkaVersion if kafkaVersion == "" { - t.Logf("No KAFKA_VERSION set. This test requires Kafka version %s or higher. Continuing...", requiredVersion) + t.Skipf("No KAFKA_VERSION set. This test requires Kafka version %s or higher. Continuing...", requiredVersion) } else { available := parseKafkaVersion(kafkaVersion) required := parseKafkaVersion(requiredVersion) @@ -84,30 +383,19 @@ func checkKafkaVersion(t testing.TB, requiredVersion string) { } func resetProxies(t testing.TB) { - if err := proxyClient.ResetState(); err != nil { + if err := FunctionalTestEnv.ToxiproxyClient.ResetState(); err != nil { t.Error(err) } - Proxies = nil -} - -func fetchProxies(t testing.TB) { - var err error - Proxies, err = proxyClient.Proxies() - if err != nil { - t.Fatal(err) - } } func SaveProxy(t *testing.T, px string) { - if err := Proxies[px].Save(); err != nil { + if err := FunctionalTestEnv.Proxies[px].Save(); err != nil { t.Fatal(err) } } func setupFunctionalTest(t testing.TB) { - checkKafkaAvailability(t) resetProxies(t) - fetchProxies(t) } func teardownFunctionalTest(t testing.TB) { From 75ff946b5267463981395104c6988cb6b3a764d7 Mon Sep 17 00:00:00 2001 From: KJ Tsanaktsidis Date: Sun, 10 May 2020 16:02:36 +1000 Subject: [PATCH 2/7] Map confluent platform -> kafka version --- Makefile | 4 ++++ docker-compose.yml | 16 ++++++++-------- functional_test.go | 21 +++++++++++++++++++++ 3 files changed, 33 insertions(+), 8 deletions(-) diff --git a/Makefile b/Makefile index c3b431a56..18981cec9 100644 --- a/Makefile +++ b/Makefile @@ -25,3 +25,7 @@ lint: test: $(GOTEST) ./... + +.PHONY: test_functional +test_functional: + $(GOTEST) -tags=functional ./... diff --git a/docker-compose.yml b/docker-compose.yml index 03425a6f9..25593fd3b 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,7 +1,7 @@ version: '3.7' services: zookeeper-1: - image: 'confluentinc/cp-zookeeper:5.5.0' + image: 'confluentinc/cp-zookeeper:${CONFLUENT_PLATFORM_VERSION:-5.5.0}' restart: always environment: ZOOKEEPER_SERVER_ID: '1' @@ -13,7 +13,7 @@ services: ZOOKEEPER_SYNC_LIMIT: '5' ZOOKEEPER_MAX_CLIENT_CONNS: '0' zookeeper-2: - image: 'confluentinc/cp-zookeeper:5.5.0' + image: 'confluentinc/cp-zookeeper:${CONFLUENT_PLATFORM_VERSION:-5.5.0}' restart: always environment: ZOOKEEPER_SERVER_ID: '2' @@ -25,7 +25,7 @@ services: ZOOKEEPER_SYNC_LIMIT: '5' ZOOKEEPER_MAX_CLIENT_CONNS: '0' zookeeper-3: - image: 'confluentinc/cp-zookeeper:5.5.0' + image: 'confluentinc/cp-zookeeper:${CONFLUENT_PLATFORM_VERSION:-5.5.0}' restart: always environment: ZOOKEEPER_SERVER_ID: '3' @@ -37,7 +37,7 @@ services: ZOOKEEPER_SYNC_LIMIT: '5' ZOOKEEPER_MAX_CLIENT_CONNS: '0' kafka-1: - image: 'confluentinc/cp-kafka:5.5.0' + image: 'confluentinc/cp-kafka:${CONFLUENT_PLATFORM_VERSION:-5.5.0}' restart: always environment: KAFKA_ZOOKEEPER_CONNECT: 'zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181' @@ -54,7 +54,7 @@ services: KAFKA_DELETE_TOPIC_ENABLE: 'true' KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false' kafka-2: - image: 'confluentinc/cp-kafka:5.5.0' + image: 'confluentinc/cp-kafka:${CONFLUENT_PLATFORM_VERSION:-5.5.0}' restart: always environment: KAFKA_ZOOKEEPER_CONNECT: 'zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181' @@ -71,7 +71,7 @@ services: KAFKA_DELETE_TOPIC_ENABLE: 'true' KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false' kafka-3: - image: 'confluentinc/cp-kafka:5.5.0' + image: 'confluentinc/cp-kafka:${CONFLUENT_PLATFORM_VERSION:-5.5.0}' restart: always environment: KAFKA_ZOOKEEPER_CONNECT: 'zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181' @@ -88,7 +88,7 @@ services: KAFKA_DELETE_TOPIC_ENABLE: 'true' KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false' kafka-4: - image: 'confluentinc/cp-kafka:5.5.0' + image: 'confluentinc/cp-kafka:${CONFLUENT_PLATFORM_VERSION:-5.5.0}' restart: always environment: KAFKA_ZOOKEEPER_CONNECT: 'zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181' @@ -105,7 +105,7 @@ services: KAFKA_DELETE_TOPIC_ENABLE: 'true' KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false' kafka-5: - image: 'confluentinc/cp-kafka:5.5.0' + image: 'confluentinc/cp-kafka:${CONFLUENT_PLATFORM_VERSION:-5.5.0}' restart: always environment: KAFKA_ZOOKEEPER_CONNECT: 'zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181' diff --git a/functional_test.go b/functional_test.go index e56182be0..0e0c9e216 100644 --- a/functional_test.go +++ b/functional_test.go @@ -107,9 +107,30 @@ func prepareDockerTestEnvironment(ctx context.Context, env *testEnvironment) err return fmt.Errorf("failed to tear down existing env: %w", err) } + if version, ok := os.LookupEnv("KAFKA_VERSION"); ok { + env.KafkaVersion = version + } else { + // We have cp-5.5.0 as the default in the docker-compose file, so that's kafka 2.5.0. + env.KafkaVersion = "2.5.0" + } + + // the mapping of confluent platform docker image versions -> kafka versions can be + // found here: https://docs.confluent.io/current/installation/versions-interoperability.html + var confluentPlatformVersion string + switch env.KafkaVersion { + case "2.5.0": + confluentPlatformVersion = "5.5.0" + case "2.4.1": + confluentPlatformVersion = "5.4.2" + default: + return fmt.Errorf("don't know what confluent platform version to use for kafka %s", env.KafkaVersion) + } + + c := exec.Command("docker-compose", "up", "-d") c.Stdout = os.Stdout c.Stderr = os.Stderr + c.Env = append(os.Environ(), fmt.Sprintf("CONFLUENT_PLATFORM_VERSION=%s", confluentPlatformVersion)) err := c.Run() if err != nil { return fmt.Errorf("failed to run docker-compose to start test enviroment: %w", err) From ddb342c2a4143262cfdde12885f86ea4fbf08847 Mon Sep 17 00:00:00 2001 From: KJ Tsanaktsidis Date: Sun, 10 May 2020 16:11:14 +1000 Subject: [PATCH 3/7] Use the built-in docker compose support to run CI tests --- .github/workflows/ci.yml | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 116deb5e5..fca6cb201 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -14,13 +14,8 @@ jobs: platform: [ubuntu-latest] env: - KAFKA_PEERS: localhost:9091,localhost:9092,localhost:9093,localhost:9094,localhost:9095 - TOXIPROXY_ADDR: http://localhost:8474 - KAFKA_INSTALL_ROOT: /home/runner/kafka - KAFKA_HOSTNAME: localhost DEBUG: true KAFKA_VERSION: ${{ matrix.kafka-version }} - KAFKA_SCALA_VERSION: 2.12 steps: - uses: actions/checkout@v1 @@ -48,16 +43,9 @@ jobs: run: | curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v1.23.6 export REPOSITORY_ROOT=${GITHUB_WORKSPACE} - vagrant/install_cluster.sh - vagrant/boot_cluster.sh - vagrant/create_topics.sh - vagrant/run_java_producer.sh - name: Run test suite - run: make test + run: make test_functional - name: Run linter run: make lint - - - name: Teardown - run: vagrant/halt_cluster.sh From 42bbd066c0c6758fb18cd612e360f9fa0581308f Mon Sep 17 00:00:00 2001 From: KJ Tsanaktsidis Date: Sun, 10 May 2020 16:13:20 +1000 Subject: [PATCH 4/7] Delete vagrant test harness setup --- vagrant/create_topics.sh | 9 ---- vagrant/halt_cluster.sh | 25 ----------- vagrant/install_cluster.sh | 86 ------------------------------------ vagrant/kafka.conf | 9 ---- vagrant/provision.sh | 17 ------- vagrant/run_java_producer.sh | 6 --- vagrant/run_toxiproxy.sh | 22 --------- vagrant/setup_services.sh | 29 ------------ vagrant/toxiproxy.conf | 6 --- vagrant/zookeeper.conf | 7 --- vagrant/zookeeper.properties | 36 --------------- 11 files changed, 252 deletions(-) delete mode 100755 vagrant/create_topics.sh delete mode 100755 vagrant/halt_cluster.sh delete mode 100755 vagrant/install_cluster.sh delete mode 100644 vagrant/kafka.conf delete mode 100755 vagrant/provision.sh delete mode 100755 vagrant/run_java_producer.sh delete mode 100755 vagrant/run_toxiproxy.sh delete mode 100755 vagrant/setup_services.sh delete mode 100644 vagrant/toxiproxy.conf delete mode 100644 vagrant/zookeeper.conf delete mode 100644 vagrant/zookeeper.properties diff --git a/vagrant/create_topics.sh b/vagrant/create_topics.sh deleted file mode 100755 index 959d3a053..000000000 --- a/vagrant/create_topics.sh +++ /dev/null @@ -1,9 +0,0 @@ -#!/bin/sh - -set -ex - -cd ${KAFKA_INSTALL_ROOT}/kafka-9092 -bin/kafka-topics.sh --create --partitions 1 --replication-factor 3 --topic test.1 --zookeeper localhost:2181 -bin/kafka-topics.sh --create --partitions 4 --replication-factor 3 --topic test.4 --zookeeper localhost:2181 -bin/kafka-topics.sh --create --partitions 64 --replication-factor 3 --topic test.64 --zookeeper localhost:2181 -bin/kafka-topics.sh --create --partitions 1 --replication-factor 3 --topic uncommitted-topic-test-4 --zookeeper localhost:2181 \ No newline at end of file diff --git a/vagrant/halt_cluster.sh b/vagrant/halt_cluster.sh deleted file mode 100755 index e671c4812..000000000 --- a/vagrant/halt_cluster.sh +++ /dev/null @@ -1,25 +0,0 @@ -#!/bin/bash - -# If the functional tests failed (or some other task) then -# we might want to look into the broker logs -if [ "$TRAVIS_TEST_RESULT" = "1" ]; then - echo "Dumping Kafka brokers server.log:" - for i in 1 2 3 4 5; do - KAFKA_PORT=`expr $i + 9090` - sed -e "s/^/kafka-${KAFKA_PORT} /" ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_PORT}/logs/server.log{.*,} - done -fi - -set -ex - -for i in 1 2 3 4 5; do - KAFKA_PORT=`expr $i + 9090` - cd ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_PORT} && bin/kafka-server-stop.sh -done - -for i in 1 2 3 4 5; do - KAFKA_PORT=`expr $i + 9090` - cd ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_PORT} && bin/zookeeper-server-stop.sh -done - -killall toxiproxy diff --git a/vagrant/install_cluster.sh b/vagrant/install_cluster.sh deleted file mode 100755 index aa22261e4..000000000 --- a/vagrant/install_cluster.sh +++ /dev/null @@ -1,86 +0,0 @@ -#!/bin/sh - -set -ex - -TOXIPROXY_VERSION=2.1.4 - -mkdir -p ${KAFKA_INSTALL_ROOT} -if [ ! -f ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_VERSION}.tgz ]; then - wget --quiet https://archive.apache.org/dist/kafka/${KAFKA_VERSION}/kafka_${KAFKA_SCALA_VERSION}-${KAFKA_VERSION}.tgz -O ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_VERSION}.tgz -fi -if [ ! -f ${KAFKA_INSTALL_ROOT}/toxiproxy-${TOXIPROXY_VERSION} ]; then - wget --quiet https://github.com/Shopify/toxiproxy/releases/download/v${TOXIPROXY_VERSION}/toxiproxy-server-linux-amd64 -O ${KAFKA_INSTALL_ROOT}/toxiproxy-${TOXIPROXY_VERSION} - chmod +x ${KAFKA_INSTALL_ROOT}/toxiproxy-${TOXIPROXY_VERSION} -fi -rm -f ${KAFKA_INSTALL_ROOT}/toxiproxy -ln -s ${KAFKA_INSTALL_ROOT}/toxiproxy-${TOXIPROXY_VERSION} ${KAFKA_INSTALL_ROOT}/toxiproxy - -for i in 1 2 3 4 5; do - ZK_PORT=$((i + 2180)) - ZK_PORT_REAL=$((i + 21800)) - KAFKA_PORT=$((i + 9090)) - KAFKA_PORT_REAL=$((i + 29090)) - - # unpack kafka - mkdir -p ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_PORT} - tar xzf ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_VERSION}.tgz -C ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_PORT} --strip-components 1 - - # broker configuration - mkdir -p "${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_PORT}/data" - - # Append to default server.properties with a small number of customisations - printf "\n\n" >> "${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_PORT}/config/server.properties" - cat << EOF >> "${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_PORT}/config/server.properties" -############################# Sarama Test Cluster ############################# - -broker.id=${KAFKA_PORT} -broker.rack=${i} - -# Listen on "real" port -listeners=PLAINTEXT://:${KAFKA_PORT_REAL} -# Advertise Toxiproxy port -advertised.listeners=PLAINTEXT://${KAFKA_HOSTNAME}:${KAFKA_PORT} - -# Connect to Zookeeper via Toxiproxy port -zookeeper.connect=127.0.0.1:${ZK_PORT} - -# Data directory -log.dirs="${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_PORT}/data" - -# Create new topics with a replication factor of 2 so failover can be tested -# more easily. -default.replication.factor=2 - -# Turn on log.retention.bytes to avoid filling up the VM's disk -log.retention.bytes=268435456 -log.segment.bytes=268435456 - -# Enable topic deletion and disable auto-creation -delete.topic.enable=true -auto.create.topics.enable=false - -# Lower the zookeeper timeouts so we don't have to wait forever for a node -# to die when we use toxiproxy to kill its zookeeper connection -zookeeper.session.timeout.ms=3000 -zookeeper.connection.timeout.ms=3000 - -# Disable broker ID length constraint -reserved.broker.max.id=10000 - -# Permit follower fetching (KIP-392) -replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector - -############################################################################### -EOF - - # zookeeper configuration - cp ${REPOSITORY_ROOT}/vagrant/zookeeper.properties ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_PORT}/config/ - sed -i s/KAFKAID/${KAFKA_PORT}/g ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_PORT}/config/zookeeper.properties - sed -i s/ZK_PORT/${ZK_PORT_REAL}/g ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_PORT}/config/zookeeper.properties - - ZK_DATADIR="${KAFKA_INSTALL_ROOT}/zookeeper-${ZK_PORT}" - mkdir -p ${ZK_DATADIR} - sed -i s#ZK_DATADIR#${ZK_DATADIR}#g ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_PORT}/config/zookeeper.properties - - echo $i > ${KAFKA_INSTALL_ROOT}/zookeeper-${ZK_PORT}/myid -done diff --git a/vagrant/kafka.conf b/vagrant/kafka.conf deleted file mode 100644 index 25101df5a..000000000 --- a/vagrant/kafka.conf +++ /dev/null @@ -1,9 +0,0 @@ -start on started zookeeper-ZK_PORT -stop on stopping zookeeper-ZK_PORT - -# Use a script instead of exec (using env stanza leaks KAFKA_HEAP_OPTS from zookeeper) -script - sleep 2 - export KAFKA_HEAP_OPTS="-Xmx320m" - exec /opt/kafka-KAFKAID/bin/kafka-server-start.sh /opt/kafka-KAFKAID/config/server.properties -end script diff --git a/vagrant/provision.sh b/vagrant/provision.sh deleted file mode 100755 index 7f10de74a..000000000 --- a/vagrant/provision.sh +++ /dev/null @@ -1,17 +0,0 @@ -#!/bin/sh - -set -ex - -apt-get update -yes | apt-get install default-jre - -export KAFKA_INSTALL_ROOT=/opt -export KAFKA_HOSTNAME=192.168.100.67 -export KAFKA_VERSION=1.0.2 -export KAFKA_SCALA_VERSION=2.11 -export REPOSITORY_ROOT=/vagrant - -sh /vagrant/vagrant/install_cluster.sh -sh /vagrant/vagrant/setup_services.sh -sh /vagrant/vagrant/create_topics.sh -sh /vagrant/vagrant/run_java_producer.sh diff --git a/vagrant/run_java_producer.sh b/vagrant/run_java_producer.sh deleted file mode 100755 index 5851b7484..000000000 --- a/vagrant/run_java_producer.sh +++ /dev/null @@ -1,6 +0,0 @@ -#!/bin/sh - -set -ex - -wget https://github.com/FrancoisPoinsot/simplest-uncommitted-msg/releases/download/0.1/simplest-uncommitted-msg-0.1-jar-with-dependencies.jar -java -jar simplest-uncommitted-msg-0.1-jar-with-dependencies.jar -b ${KAFKA_HOSTNAME}:9092 -c 4 \ No newline at end of file diff --git a/vagrant/run_toxiproxy.sh b/vagrant/run_toxiproxy.sh deleted file mode 100755 index e52c00e7b..000000000 --- a/vagrant/run_toxiproxy.sh +++ /dev/null @@ -1,22 +0,0 @@ -#!/bin/sh - -set -ex - -${KAFKA_INSTALL_ROOT}/toxiproxy -port 8474 -host 0.0.0.0 & -PID=$! - -while ! nc -q 1 localhost 8474 Date: Sun, 10 May 2020 17:41:42 +1000 Subject: [PATCH 5/7] Run the linter across functional tests as well --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 18981cec9..a05863480 100644 --- a/Makefile +++ b/Makefile @@ -21,7 +21,7 @@ fmt: gofmt -s -l -w $(FILES) $(TESTS) lint: - golangci-lint run + GOFLAGS="-tags=functional" golangci-lint run test: $(GOTEST) ./... From a9e0b4f16371e7c1c065fcc91b0fc081157361d2 Mon Sep 17 00:00:00 2001 From: KJ Tsanaktsidis Date: Sun, 10 May 2020 18:55:21 +1000 Subject: [PATCH 6/7] Make linter pass on functional tests --- functional_test.go | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/functional_test.go b/functional_test.go index 0e0c9e216..2391725a4 100644 --- a/functional_test.go +++ b/functional_test.go @@ -5,7 +5,6 @@ package sarama import ( "context" "fmt" - toxiproxy "github.com/Shopify/toxiproxy/client" "io" "log" "net" @@ -18,6 +17,8 @@ import ( "strings" "testing" "time" + + toxiproxy "github.com/Shopify/toxiproxy/client" ) const ( @@ -27,19 +28,19 @@ const ( var ( testTopicDetails = map[string]*TopicDetail{ "test.1": { - NumPartitions: 1, + NumPartitions: 1, ReplicationFactor: 3, }, "test.4": { - NumPartitions: 4, + NumPartitions: 4, ReplicationFactor: 3, }, "test.64": { - NumPartitions: 64, + NumPartitions: 64, ReplicationFactor: 3, }, "uncommitted-topic-test-4": { - NumPartitions: 1, + NumPartitions: 1, ReplicationFactor: 3, }, } @@ -80,10 +81,10 @@ func testMain(m *testing.M) int { if !usingExisting { err := prepareDockerTestEnvironment(ctx, &env) if err != nil { - tearDownDockerTestEnvironment(ctx, &env) + _ = tearDownDockerTestEnvironment(ctx, &env) panic(err) } - defer tearDownDockerTestEnvironment(ctx, &env) + defer tearDownDockerTestEnvironment(ctx, &env) // nolint:errcheck } if err := prepareTestTopics(ctx, &env); err != nil { panic(err) @@ -126,7 +127,6 @@ func prepareDockerTestEnvironment(ctx context.Context, env *testEnvironment) err return fmt.Errorf("don't know what confluent platform version to use for kafka %s", env.KafkaVersion) } - c := exec.Command("docker-compose", "up", "-d") c.Stdout = os.Stdout c.Stderr = os.Stderr @@ -143,14 +143,14 @@ func prepareDockerTestEnvironment(ctx context.Context, env *testEnvironment) err proxyName := fmt.Sprintf("kafka%d", i) proxy, err := env.ToxiproxyClient.CreateProxy( proxyName, - fmt.Sprintf("0.0.0.0:%d", 29090 + i), - fmt.Sprintf("kafka-%d:%d", i, 29090 + i), + fmt.Sprintf("0.0.0.0:%d", 29090+i), + fmt.Sprintf("kafka-%d:%d", i, 29090+i), ) if err != nil { return fmt.Errorf("failed to create toxiproxy: %w", err) } env.Proxies[proxyName] = proxy - env.KafkaBrokerAddrs = append(env.KafkaBrokerAddrs, fmt.Sprintf("127.0.0.1:%d", 29090 + i)) + env.KafkaBrokerAddrs = append(env.KafkaBrokerAddrs, fmt.Sprintf("127.0.0.1:%d", 29090+i)) } // the mapping of confluent platform docker image vesions -> kafka versions can be @@ -175,7 +175,7 @@ func prepareDockerTestEnvironment(ctx context.Context, env *testEnvironment) err brokersOk := make([]bool, len(env.KafkaBrokerAddrs)) retryLoop: for j, addr := range env.KafkaBrokerAddrs { - client, err := NewClient([]string{addr},config) + client, err := NewClient([]string{addr}, config) if err != nil { continue } @@ -267,7 +267,7 @@ func tearDownDockerTestEnvironment(ctx context.Context, env *testEnvironment) er func prepareTestTopics(ctx context.Context, env *testEnvironment) error { Logger.Println("creating test topics") var testTopicNames []string - for topic, _ := range testTopicDetails { + for topic := range testTopicDetails { testTopicNames = append(testTopicNames, topic) } @@ -296,7 +296,7 @@ func prepareTestTopics(ctx context.Context, env *testEnvironment) error { // Start by deleting the test topics (if they already exist) deleteRes, err := controller.DeleteTopics(&DeleteTopicsRequest{ - Topics: testTopicNames, + Topics: testTopicNames, Timeout: 30 * time.Second, }) if err != nil { @@ -334,7 +334,7 @@ func prepareTestTopics(ctx context.Context, env *testEnvironment) error { // now create the topics empty createRes, err := controller.CreateTopics(&CreateTopicsRequest{ TopicDetails: testTopicDetails, - Timeout: 30 * time.Second, + Timeout: 30 * time.Second, }) if err != nil { return fmt.Errorf("failed to create test topics: %w", err) @@ -360,7 +360,7 @@ func prepareTestTopics(ctx context.Context, env *testEnvironment) error { return fmt.Errorf("failed fetching the uncommitted msg jar: %w", err) } defer res.Body.Close() - jarFile, err := os.OpenFile(jarName, os.O_WRONLY | os.O_TRUNC | os.O_CREATE, 0644) + jarFile, err := os.OpenFile(jarName, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0644) if err != nil { return fmt.Errorf("failed opening the uncomitted msg jar: %w", err) } From cfd5999fa195c5c7ae785e533166b06947bae7ce Mon Sep 17 00:00:00 2001 From: Konstantinos Tsanaktsidis Date: Wed, 24 Jun 2020 13:51:24 +1000 Subject: [PATCH 7/7] Remove redundant hard-coding of env.KafkaVersion --- functional_test.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/functional_test.go b/functional_test.go index 2391725a4..b61a43daf 100644 --- a/functional_test.go +++ b/functional_test.go @@ -153,11 +153,6 @@ func prepareDockerTestEnvironment(ctx context.Context, env *testEnvironment) err env.KafkaBrokerAddrs = append(env.KafkaBrokerAddrs, fmt.Sprintf("127.0.0.1:%d", 29090+i)) } - // the mapping of confluent platform docker image vesions -> kafka versions can be - // found here: https://docs.confluent.io/current/installation/versions-interoperability.html - // We have cp-5.5.0 in the docker-compose file, so that's kafka 2.5.0. - env.KafkaVersion = "2.5.0" - // Wait for the kafka broker to come up allBrokersUp := false for i := 0; i < 45 && !allBrokersUp; i++ {