From 921cf9c0761efbde58b450edb81e67ee07498b6e Mon Sep 17 00:00:00 2001 From: Stanislav Kozlovski Date: Wed, 26 Aug 2020 18:03:13 +0100 Subject: [PATCH 1/7] Bump default version to 1.0.0 The current default, 0.8.2, is a 5 year-old version. The problem with defaulting to older versions when the Kafka cluster is a newer version is manifold: - requires Kafka to downconvert consumer response messages (e.g if a newly-configured client produced messages and a default Sarama client is reading) - this has implications on the performance of Kafka, because it uses additional CPU. Especially so if compression is enabled, at which point the broker would need to decompress, downconvert and compress the message back. Downconversion also blocks zerocopy, because your fetch response now needs to be copied into memory - requires Kafka to upconvert producer requests' messages - in general is hard to support. While the latest Kafka currently supports the oldest versions, it is in the best interest of the project to deprecate and eventually drop support for legacy versions. Otherwise it becomes hard to maintain, the test matrix grows and new features need to work around old version limitations (no idempotency, exactly once). It is easier for Kafka to deprecate/drop support when its ecosystem has done so already 0.11 is the minimum we should default at as it enables Kafka's v2 message format, avoiding expensive upconversion/downconversion in the Kafka broker. It is also required for correctness in some cases (e.g KIP-101) 1.0.0 is a 3-year old version at this point and a reasonable default --- config.go | 2 +- utils.go | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/config.go b/config.go index 9b7ce7aeb..43e739cad 100644 --- a/config.go +++ b/config.go @@ -486,7 +486,7 @@ func NewConfig() *Config { c.ClientID = defaultClientID c.ChannelBufferSize = 256 - c.Version = MinVersion + c.Version = DefaultVersion c.MetricRegistry = metrics.NewRegistry() return c diff --git a/utils.go b/utils.go index 93bdeefef..e109d45f8 100644 --- a/utils.go +++ b/utils.go @@ -192,12 +192,13 @@ var ( } MinVersion = V0_8_2_0 MaxVersion = V2_6_0_0 + DefaultVersion = V1_0_0_0 ) //ParseKafkaVersion parses and returns kafka version or error from a string func ParseKafkaVersion(s string) (KafkaVersion, error) { if len(s) < 5 { - return MinVersion, fmt.Errorf("invalid version `%s`", s) + return DefaultVersion, fmt.Errorf("invalid version `%s`", s) } var major, minor, veryMinor, patch uint var err error @@ -207,7 +208,7 @@ func ParseKafkaVersion(s string) (KafkaVersion, error) { err = scanKafkaVersion(s, `^\d+\.\d+\.\d+$`, "%d.%d.%d", [3]*uint{&major, &minor, &veryMinor}) } if err != nil { - return MinVersion, err + return DefaultVersion, err } return newKafkaVersion(major, minor, veryMinor, patch), nil } From 91e853d6ea45af16ffcb23788ec42a4bde7882c5 Mon Sep 17 00:00:00 2001 From: Stanislav Kozlovski Date: Thu, 27 Aug 2020 15:54:10 +0100 Subject: [PATCH 2/7] Use MetadataResponse V5 in tests --- async_producer_test.go | 44 +++++++++++++++---------------- client_test.go | 54 +++++++++++++++++++-------------------- client_tls_test.go | 2 +- metadata_response_test.go | 6 +++++ offset_manager_test.go | 4 +-- sync_producer_test.go | 14 +++++----- 6 files changed, 65 insertions(+), 59 deletions(-) diff --git a/async_producer_test.go b/async_producer_test.go index f8c2656cf..1ca625c9c 100644 --- a/async_producer_test.go +++ b/async_producer_test.go @@ -102,7 +102,7 @@ func TestAsyncProducer(t *testing.T) { seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 2) - metadataResponse := new(MetadataResponse) + metadataResponse := newMetadataResponse() metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError) seedBroker.Returns(metadataResponse) @@ -151,7 +151,7 @@ func TestAsyncProducerMultipleFlushes(t *testing.T) { seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 2) - metadataResponse := new(MetadataResponse) + metadataResponse := newMetadataResponse() metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError) seedBroker.Returns(metadataResponse) @@ -187,7 +187,7 @@ func TestAsyncProducerMultipleBrokers(t *testing.T) { leader0 := NewMockBroker(t, 2) leader1 := NewMockBroker(t, 3) - metadataResponse := new(MetadataResponse) + metadataResponse := newMetadataResponse() metadataResponse.AddBroker(leader0.Addr(), leader0.BrokerID()) metadataResponse.AddBroker(leader1.Addr(), leader1.BrokerID()) metadataResponse.AddTopicPartition("my_topic", 0, leader0.BrokerID(), nil, nil, nil, ErrNoError) @@ -226,7 +226,7 @@ func TestAsyncProducerCustomPartitioner(t *testing.T) { seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 2) - metadataResponse := new(MetadataResponse) + metadataResponse := newMetadataResponse() metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError) seedBroker.Returns(metadataResponse) @@ -269,7 +269,7 @@ func TestAsyncProducerFailureRetry(t *testing.T) { leader1 := NewMockBroker(t, 2) leader2 := NewMockBroker(t, 3) - metadataLeader1 := new(MetadataResponse) + metadataLeader1 := newMetadataResponse() metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID()) metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, nil, ErrNoError) seedBroker.Returns(metadataLeader1) @@ -291,7 +291,7 @@ func TestAsyncProducerFailureRetry(t *testing.T) { prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition) leader1.Returns(prodNotLeader) - metadataLeader2 := new(MetadataResponse) + metadataLeader2 := newMetadataResponse() metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID()) metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, nil, ErrNoError) leader1.Returns(metadataLeader2) @@ -318,7 +318,7 @@ func TestAsyncProducerRecoveryWithRetriesDisabled(t *testing.T) { leader1 := NewMockBroker(t, 2) leader2 := NewMockBroker(t, 3) - metadataLeader1 := new(MetadataResponse) + metadataLeader1 := newMetadataResponse() metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID()) metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, nil, ErrNoError) metadataLeader1.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, nil, ErrNoError) @@ -344,7 +344,7 @@ func TestAsyncProducerRecoveryWithRetriesDisabled(t *testing.T) { expectResults(t, producer, 0, 2) producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0} - metadataLeader2 := new(MetadataResponse) + metadataLeader2 := newMetadataResponse() metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID()) metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, nil, ErrNoError) metadataLeader2.AddTopicPartition("my_topic", 1, leader2.BrokerID(), nil, nil, nil, ErrNoError) @@ -377,7 +377,7 @@ func TestAsyncProducerEncoderFailures(t *testing.T) { seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 2) - metadataResponse := new(MetadataResponse) + metadataResponse := newMetadataResponse() metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError) seedBroker.Returns(metadataResponse) @@ -417,7 +417,7 @@ func TestAsyncProducerBrokerBounce(t *testing.T) { leader := NewMockBroker(t, 2) leaderAddr := leader.Addr() - metadataResponse := new(MetadataResponse) + metadataResponse := newMetadataResponse() metadataResponse.AddBroker(leaderAddr, leader.BrokerID()) metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError) seedBroker.Returns(metadataResponse) @@ -457,7 +457,7 @@ func TestAsyncProducerBrokerBounceWithStaleMetadata(t *testing.T) { leader1 := NewMockBroker(t, 2) leader2 := NewMockBroker(t, 3) - metadataLeader1 := new(MetadataResponse) + metadataLeader1 := newMetadataResponse() metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID()) metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, nil, ErrNoError) seedBroker.Returns(metadataLeader1) @@ -480,7 +480,7 @@ func TestAsyncProducerBrokerBounceWithStaleMetadata(t *testing.T) { seedBroker.Returns(metadataLeader1) // tell it to go to leader1 again even though it's still down // ok fine, tell it to go to leader2 finally - metadataLeader2 := new(MetadataResponse) + metadataLeader2 := newMetadataResponse() metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID()) metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, nil, ErrNoError) seedBroker.Returns(metadataLeader2) @@ -500,7 +500,7 @@ func TestAsyncProducerMultipleRetries(t *testing.T) { leader1 := NewMockBroker(t, 2) leader2 := NewMockBroker(t, 3) - metadataLeader1 := new(MetadataResponse) + metadataLeader1 := newMetadataResponse() metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID()) metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, nil, ErrNoError) seedBroker.Returns(metadataLeader1) @@ -522,7 +522,7 @@ func TestAsyncProducerMultipleRetries(t *testing.T) { prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition) leader1.Returns(prodNotLeader) - metadataLeader2 := new(MetadataResponse) + metadataLeader2 := newMetadataResponse() metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID()) metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, nil, ErrNoError) @@ -556,7 +556,7 @@ func TestAsyncProducerMultipleRetriesWithBackoffFunc(t *testing.T) { leader1 := NewMockBroker(t, 2) leader2 := NewMockBroker(t, 3) - metadataLeader1 := new(MetadataResponse) + metadataLeader1 := newMetadataResponse() metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID()) metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, nil, ErrNoError) seedBroker.Returns(metadataLeader1) @@ -583,7 +583,7 @@ func TestAsyncProducerMultipleRetriesWithBackoffFunc(t *testing.T) { prodSuccess := new(ProduceResponse) prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) - metadataLeader2 := new(MetadataResponse) + metadataLeader2 := newMetadataResponse() metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID()) metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, nil, ErrNoError) @@ -624,7 +624,7 @@ func TestAsyncProducerOutOfRetries(t *testing.T) { seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 2) - metadataResponse := new(MetadataResponse) + metadataResponse := newMetadataResponse() metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError) seedBroker.Returns(metadataResponse) @@ -680,7 +680,7 @@ func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) { leader := NewMockBroker(t, 2) leaderAddr := leader.Addr() - metadataResponse := new(MetadataResponse) + metadataResponse := newMetadataResponse() metadataResponse.AddBroker(leaderAddr, leader.BrokerID()) metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError) metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, nil, ErrNoError) @@ -736,7 +736,7 @@ func TestAsyncProducerFlusherRetryCondition(t *testing.T) { seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 2) - metadataResponse := new(MetadataResponse) + metadataResponse := newMetadataResponse() metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError) metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, nil, ErrNoError) @@ -802,7 +802,7 @@ func TestAsyncProducerRetryShutdown(t *testing.T) { seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 2) - metadataLeader := new(MetadataResponse) + metadataLeader := newMetadataResponse() metadataLeader.AddBroker(leader.Addr(), leader.BrokerID()) metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError) seedBroker.Returns(metadataLeader) @@ -851,7 +851,7 @@ func TestAsyncProducerNoReturns(t *testing.T) { seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 2) - metadataLeader := new(MetadataResponse) + metadataLeader := newMetadataResponse() metadataLeader.AddBroker(leader.Addr(), leader.BrokerID()) metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError) seedBroker.Returns(metadataLeader) @@ -1259,7 +1259,7 @@ func testProducerInterceptor( ) { seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 2) - metadataLeader := new(MetadataResponse) + metadataLeader := newMetadataResponse() metadataLeader.AddBroker(leader.Addr(), leader.BrokerID()) metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError) seedBroker.Returns(metadataLeader) diff --git a/client_test.go b/client_test.go index 9b42e2d99..d0efaf666 100644 --- a/client_test.go +++ b/client_test.go @@ -19,7 +19,7 @@ func safeClose(t testing.TB, c io.Closer) { func TestSimpleClient(t *testing.T) { seedBroker := NewMockBroker(t, 1) - seedBroker.Returns(new(MetadataResponse)) + seedBroker.Returns(newMetadataResponse()) client, err := NewClient([]string{seedBroker.Addr()}, nil) if err != nil { @@ -36,7 +36,7 @@ func TestCachedPartitions(t *testing.T) { replicas := []int32{3, 1, 5} isr := []int32{5, 1} - metadataResponse := new(MetadataResponse) + metadataResponse := newMetadataResponse() metadataResponse.AddBroker("localhost:12345", 2) metadataResponse.AddTopicPartition("my_topic", 0, 2, replicas, isr, []int32{}, ErrNoError) metadataResponse.AddTopicPartition("my_topic", 1, 2, replicas, isr, []int32{}, ErrLeaderNotAvailable) @@ -74,7 +74,7 @@ func TestClientDoesntCachePartitionsForTopicsWithErrors(t *testing.T) { replicas := []int32{seedBroker.BrokerID()} - metadataResponse := new(MetadataResponse) + metadataResponse := newMetadataResponse() metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) metadataResponse.AddTopicPartition("my_topic", 1, replicas[0], replicas, replicas, []int32{}, ErrNoError) metadataResponse.AddTopicPartition("my_topic", 2, replicas[0], replicas, replicas, []int32{}, ErrNoError) @@ -87,7 +87,7 @@ func TestClientDoesntCachePartitionsForTopicsWithErrors(t *testing.T) { t.Fatal(err) } - metadataResponse = new(MetadataResponse) + metadataResponse = newMetadataResponse() metadataResponse.AddTopic("unknown", ErrUnknownTopicOrPartition) seedBroker.Returns(metadataResponse) @@ -106,7 +106,7 @@ func TestClientDoesntCachePartitionsForTopicsWithErrors(t *testing.T) { t.Errorf("Expected no error, found %v", err) } - metadataResponse = new(MetadataResponse) + metadataResponse = newMetadataResponse() metadataResponse.AddTopic("unknown", ErrUnknownTopicOrPartition) seedBroker.Returns(metadataResponse) @@ -126,7 +126,7 @@ func TestClientDoesntCachePartitionsForTopicsWithErrors(t *testing.T) { func TestClientSeedBrokers(t *testing.T) { seedBroker := NewMockBroker(t, 1) - metadataResponse := new(MetadataResponse) + metadataResponse := newMetadataResponse() metadataResponse.AddBroker("localhost:12345", 2) seedBroker.Returns(metadataResponse) @@ -146,7 +146,7 @@ func TestClientMetadata(t *testing.T) { replicas := []int32{3, 1, 5} isr := []int32{5, 1} - metadataResponse := new(MetadataResponse) + metadataResponse := newMetadataResponse() metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), replicas, isr, []int32{}, ErrNoError) metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), replicas, isr, []int32{}, ErrLeaderNotAvailable) @@ -222,7 +222,7 @@ func TestClientMetadataWithOfflineReplicas(t *testing.T) { isr := []int32{1, 2} offlineReplicas := []int32{3} - metadataResponse := new(MetadataResponse) + metadataResponse := newMetadataResponse() metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), replicas, isr, offlineReplicas, ErrNoError) metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), replicas, isr, []int32{}, ErrNoError) @@ -307,7 +307,7 @@ func TestClientGetOffset(t *testing.T) { leader := NewMockBroker(t, 2) leaderAddr := leader.Addr() - metadata := new(MetadataResponse) + metadata := newMetadataResponse() metadata.AddTopicPartition("foo", 0, leader.BrokerID(), nil, nil, nil, ErrNoError) metadata.AddBroker(leaderAddr, leader.BrokerID()) seedBroker.Returns(metadata) @@ -353,7 +353,7 @@ func TestClientGetOffset(t *testing.T) { func TestClientReceivingUnknownTopicWithBackoffFunc(t *testing.T) { seedBroker := NewMockBroker(t, 1) - metadataResponse1 := new(MetadataResponse) + metadataResponse1 := newMetadataResponse() seedBroker.Returns(metadataResponse1) retryCount := int32(0) @@ -369,7 +369,7 @@ func TestClientReceivingUnknownTopicWithBackoffFunc(t *testing.T) { t.Fatal(err) } - metadataUnknownTopic := new(MetadataResponse) + metadataUnknownTopic := newMetadataResponse() metadataUnknownTopic.AddTopic("new_topic", ErrUnknownTopicOrPartition) seedBroker.Returns(metadataUnknownTopic) seedBroker.Returns(metadataUnknownTopic) @@ -390,7 +390,7 @@ func TestClientReceivingUnknownTopicWithBackoffFunc(t *testing.T) { func TestClientReceivingUnknownTopic(t *testing.T) { seedBroker := NewMockBroker(t, 1) - metadataResponse1 := new(MetadataResponse) + metadataResponse1 := newMetadataResponse() seedBroker.Returns(metadataResponse1) config := NewConfig() @@ -401,7 +401,7 @@ func TestClientReceivingUnknownTopic(t *testing.T) { t.Fatal(err) } - metadataUnknownTopic := new(MetadataResponse) + metadataUnknownTopic := newMetadataResponse() metadataUnknownTopic.AddTopic("new_topic", ErrUnknownTopicOrPartition) seedBroker.Returns(metadataUnknownTopic) seedBroker.Returns(metadataUnknownTopic) @@ -427,7 +427,7 @@ func TestClientReceivingPartialMetadata(t *testing.T) { seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 5) - metadataResponse1 := new(MetadataResponse) + metadataResponse1 := newMetadataResponse() metadataResponse1.AddBroker(leader.Addr(), leader.BrokerID()) seedBroker.Returns(metadataResponse1) @@ -440,7 +440,7 @@ func TestClientReceivingPartialMetadata(t *testing.T) { replicas := []int32{leader.BrokerID(), seedBroker.BrokerID()} - metadataPartial := new(MetadataResponse) + metadataPartial := newMetadataResponse() metadataPartial.AddBroker(seedBroker.Addr(), 1) metadataPartial.AddBroker(leader.Addr(), 5) metadataPartial.AddTopic("new_topic", ErrLeaderNotAvailable) @@ -482,11 +482,11 @@ func TestClientRefreshBehaviour(t *testing.T) { seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 5) - metadataResponse1 := new(MetadataResponse) + metadataResponse1 := newMetadataResponse() metadataResponse1.AddBroker(leader.Addr(), leader.BrokerID()) seedBroker.Returns(metadataResponse1) - metadataResponse2 := new(MetadataResponse) + metadataResponse2 := newMetadataResponse() metadataResponse2.AddBroker(leader.Addr(), leader.BrokerID()) metadataResponse2.AddTopicPartition("my_topic", 0xb, leader.BrokerID(), nil, nil, nil, ErrNoError) seedBroker.Returns(metadataResponse2) @@ -519,7 +519,7 @@ func TestClientRefreshBrokers(t *testing.T) { initialSeed := NewMockBroker(t, 0) leader := NewMockBroker(t, 5) - metadataResponse1 := new(MetadataResponse) + metadataResponse1 := newMetadataResponse() metadataResponse1.AddBroker(leader.Addr(), leader.BrokerID()) metadataResponse1.AddBroker(initialSeed.Addr(), initialSeed.BrokerID()) initialSeed.Returns(metadataResponse1) @@ -550,7 +550,7 @@ func TestClientRefreshMetadataBrokerOffline(t *testing.T) { seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 5) - metadataResponse1 := new(MetadataResponse) + metadataResponse1 := newMetadataResponse() metadataResponse1.AddBroker(leader.Addr(), leader.BrokerID()) metadataResponse1.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) seedBroker.Returns(metadataResponse1) @@ -564,7 +564,7 @@ func TestClientRefreshMetadataBrokerOffline(t *testing.T) { t.Error("Meta broker is not 2") } - metadataResponse2 := new(MetadataResponse) + metadataResponse2 := newMetadataResponse() metadataResponse2.AddBroker(leader.Addr(), leader.BrokerID()) seedBroker.Returns(metadataResponse2) @@ -578,7 +578,7 @@ func TestClientRefreshMetadataBrokerOffline(t *testing.T) { func TestClientResurrectDeadSeeds(t *testing.T) { initialSeed := NewMockBroker(t, 0) - emptyMetadata := new(MetadataResponse) + emptyMetadata := newMetadataResponse() initialSeed.Returns(emptyMetadata) conf := NewConfig() @@ -687,7 +687,7 @@ func TestClientMetadataTimeout(t *testing.T) { t.Run(fmt.Sprintf("timeout=%v", timeout), func(t *testing.T) { // Use a responsive broker to create a working client initialSeed := NewMockBroker(t, 0) - emptyMetadata := new(MetadataResponse) + emptyMetadata := newMetadataResponse() initialSeed.Returns(emptyMetadata) conf := NewConfig() @@ -750,7 +750,7 @@ func TestClientCoordinatorWithConsumerOffsetsTopic(t *testing.T) { freshCoordinator := NewMockBroker(t, 3) replicas := []int32{staleCoordinator.BrokerID(), freshCoordinator.BrokerID()} - metadataResponse1 := new(MetadataResponse) + metadataResponse1 := newMetadataResponse() metadataResponse1.AddBroker(staleCoordinator.Addr(), staleCoordinator.BrokerID()) metadataResponse1.AddBroker(freshCoordinator.Addr(), freshCoordinator.BrokerID()) metadataResponse1.AddTopicPartition("__consumer_offsets", 0, replicas[0], replicas, replicas, []int32{}, ErrNoError) @@ -827,7 +827,7 @@ func TestClientCoordinatorWithoutConsumerOffsetsTopic(t *testing.T) { seedBroker := NewMockBroker(t, 1) coordinator := NewMockBroker(t, 2) - metadataResponse1 := new(MetadataResponse) + metadataResponse1 := newMetadataResponse() seedBroker.Returns(metadataResponse1) config := NewConfig() @@ -842,12 +842,12 @@ func TestClientCoordinatorWithoutConsumerOffsetsTopic(t *testing.T) { coordinatorResponse1.Err = ErrConsumerCoordinatorNotAvailable seedBroker.Returns(coordinatorResponse1) - metadataResponse2 := new(MetadataResponse) + metadataResponse2 := newMetadataResponse() metadataResponse2.AddTopic("__consumer_offsets", ErrUnknownTopicOrPartition) seedBroker.Returns(metadataResponse2) replicas := []int32{coordinator.BrokerID()} - metadataResponse3 := new(MetadataResponse) + metadataResponse3 := newMetadataResponse() metadataResponse3.AddTopicPartition("__consumer_offsets", 0, replicas[0], replicas, replicas, []int32{}, ErrNoError) seedBroker.Returns(metadataResponse3) @@ -879,7 +879,7 @@ func TestClientCoordinatorWithoutConsumerOffsetsTopic(t *testing.T) { func TestClientAutorefreshShutdownRace(t *testing.T) { seedBroker := NewMockBroker(t, 1) - metadataResponse := new(MetadataResponse) + metadataResponse := newMetadataResponse() seedBroker.Returns(metadataResponse) conf := NewConfig() diff --git a/client_tls_test.go b/client_tls_test.go index a825cf767..eab50434f 100644 --- a/client_tls_test.go +++ b/client_tls_test.go @@ -186,7 +186,7 @@ func doListenerTLSTest(t *testing.T, expectSuccess bool, serverConfig, clientCon seedBroker := NewMockBrokerListener(childT, 1, seedListener) defer seedBroker.Close() - seedBroker.Returns(new(MetadataResponse)) + seedBroker.Returns(newMetadataResponse()) config := NewConfig() config.Net.TLS.Enable = true diff --git a/metadata_response_test.go b/metadata_response_test.go index 04a4ce7fc..f67dbdcb2 100644 --- a/metadata_response_test.go +++ b/metadata_response_test.go @@ -102,6 +102,12 @@ var ( } ) +func newMetadataResponse() *MetadataResponse { + resp := new(MetadataResponse) + resp.Version = 5 + return resp +} + func TestEmptyMetadataResponseV0(t *testing.T) { response := MetadataResponse{} diff --git a/offset_manager_test.go b/offset_manager_test.go index 5aa2ee0ff..1d54bce52 100644 --- a/offset_manager_test.go +++ b/offset_manager_test.go @@ -22,7 +22,7 @@ func initOffsetManagerWithBackoffFunc(t *testing.T, retention time.Duration, broker = NewMockBroker(t, 1) coordinator = NewMockBroker(t, 2) - seedMeta := new(MetadataResponse) + seedMeta := newMetadataResponse() seedMeta.AddBroker(coordinator.Addr(), coordinator.BrokerID()) seedMeta.AddTopicPartition("my_topic", 0, 1, []int32{}, []int32{}, []int32{}, ErrNoError) seedMeta.AddTopicPartition("my_topic", 1, 1, []int32{}, []int32{}, []int32{}, ErrNoError) @@ -73,7 +73,7 @@ func initPartitionOffsetManager(t *testing.T, om OffsetManager, func TestNewOffsetManager(t *testing.T) { seedBroker := NewMockBroker(t, 1) - seedBroker.Returns(new(MetadataResponse)) + seedBroker.Returns(newMetadataResponse()) defer seedBroker.Close() testClient, err := NewClient([]string{seedBroker.Addr()}, nil) diff --git a/sync_producer_test.go b/sync_producer_test.go index 9c480b0b5..abf1ef77f 100644 --- a/sync_producer_test.go +++ b/sync_producer_test.go @@ -10,7 +10,7 @@ func TestSyncProducer(t *testing.T) { seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 2) - metadataResponse := new(MetadataResponse) + metadataResponse := newMetadataResponse() metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError) seedBroker.Returns(metadataResponse) @@ -58,7 +58,7 @@ func TestSyncProducerBatch(t *testing.T) { seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 2) - metadataResponse := new(MetadataResponse) + metadataResponse := newMetadataResponse() metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError) seedBroker.Returns(metadataResponse) @@ -106,7 +106,7 @@ func TestConcurrentSyncProducer(t *testing.T) { seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 2) - metadataResponse := new(MetadataResponse) + metadataResponse := newMetadataResponse() metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError) seedBroker.Returns(metadataResponse) @@ -149,7 +149,7 @@ func TestConcurrentSyncProducer(t *testing.T) { func TestSyncProducerToNonExistingTopic(t *testing.T) { broker := NewMockBroker(t, 1) - metadataResponse := new(MetadataResponse) + metadataResponse := newMetadataResponse() metadataResponse.AddBroker(broker.Addr(), broker.BrokerID()) metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, nil, ErrNoError) broker.Returns(metadataResponse) @@ -164,7 +164,7 @@ func TestSyncProducerToNonExistingTopic(t *testing.T) { t.Fatal(err) } - metadataResponse = new(MetadataResponse) + metadataResponse = newMetadataResponse() metadataResponse.AddTopic("unknown", ErrUnknownTopicOrPartition) broker.Returns(metadataResponse) @@ -182,7 +182,7 @@ func TestSyncProducerRecoveryWithRetriesDisabled(t *testing.T) { leader1 := NewMockBroker(t, 2) leader2 := NewMockBroker(t, 3) - metadataLeader1 := new(MetadataResponse) + metadataLeader1 := newMetadataResponse() metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID()) metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, nil, ErrNoError) seedBroker.Returns(metadataLeader1) @@ -205,7 +205,7 @@ func TestSyncProducerRecoveryWithRetriesDisabled(t *testing.T) { t.Fatal(err) } - metadataLeader2 := new(MetadataResponse) + metadataLeader2 := newMetadataResponse() metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID()) metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, nil, ErrNoError) leader1.Returns(metadataLeader2) From d3a58e434ef66023d610961198ea1e80763b63ac Mon Sep 17 00:00:00 2001 From: Stanislav Kozlovski Date: Thu, 27 Aug 2020 16:50:57 +0100 Subject: [PATCH 3/7] Revert "Use MetadataResponse V5 in tests" This reverts commit 91e853d6ea45af16ffcb23788ec42a4bde7882c5. --- async_producer_test.go | 44 +++++++++++++++---------------- client_test.go | 54 +++++++++++++++++++-------------------- client_tls_test.go | 2 +- metadata_response_test.go | 6 ----- offset_manager_test.go | 4 +-- sync_producer_test.go | 14 +++++----- 6 files changed, 59 insertions(+), 65 deletions(-) diff --git a/async_producer_test.go b/async_producer_test.go index 1ca625c9c..f8c2656cf 100644 --- a/async_producer_test.go +++ b/async_producer_test.go @@ -102,7 +102,7 @@ func TestAsyncProducer(t *testing.T) { seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 2) - metadataResponse := newMetadataResponse() + metadataResponse := new(MetadataResponse) metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError) seedBroker.Returns(metadataResponse) @@ -151,7 +151,7 @@ func TestAsyncProducerMultipleFlushes(t *testing.T) { seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 2) - metadataResponse := newMetadataResponse() + metadataResponse := new(MetadataResponse) metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError) seedBroker.Returns(metadataResponse) @@ -187,7 +187,7 @@ func TestAsyncProducerMultipleBrokers(t *testing.T) { leader0 := NewMockBroker(t, 2) leader1 := NewMockBroker(t, 3) - metadataResponse := newMetadataResponse() + metadataResponse := new(MetadataResponse) metadataResponse.AddBroker(leader0.Addr(), leader0.BrokerID()) metadataResponse.AddBroker(leader1.Addr(), leader1.BrokerID()) metadataResponse.AddTopicPartition("my_topic", 0, leader0.BrokerID(), nil, nil, nil, ErrNoError) @@ -226,7 +226,7 @@ func TestAsyncProducerCustomPartitioner(t *testing.T) { seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 2) - metadataResponse := newMetadataResponse() + metadataResponse := new(MetadataResponse) metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError) seedBroker.Returns(metadataResponse) @@ -269,7 +269,7 @@ func TestAsyncProducerFailureRetry(t *testing.T) { leader1 := NewMockBroker(t, 2) leader2 := NewMockBroker(t, 3) - metadataLeader1 := newMetadataResponse() + metadataLeader1 := new(MetadataResponse) metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID()) metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, nil, ErrNoError) seedBroker.Returns(metadataLeader1) @@ -291,7 +291,7 @@ func TestAsyncProducerFailureRetry(t *testing.T) { prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition) leader1.Returns(prodNotLeader) - metadataLeader2 := newMetadataResponse() + metadataLeader2 := new(MetadataResponse) metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID()) metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, nil, ErrNoError) leader1.Returns(metadataLeader2) @@ -318,7 +318,7 @@ func TestAsyncProducerRecoveryWithRetriesDisabled(t *testing.T) { leader1 := NewMockBroker(t, 2) leader2 := NewMockBroker(t, 3) - metadataLeader1 := newMetadataResponse() + metadataLeader1 := new(MetadataResponse) metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID()) metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, nil, ErrNoError) metadataLeader1.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, nil, ErrNoError) @@ -344,7 +344,7 @@ func TestAsyncProducerRecoveryWithRetriesDisabled(t *testing.T) { expectResults(t, producer, 0, 2) producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0} - metadataLeader2 := newMetadataResponse() + metadataLeader2 := new(MetadataResponse) metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID()) metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, nil, ErrNoError) metadataLeader2.AddTopicPartition("my_topic", 1, leader2.BrokerID(), nil, nil, nil, ErrNoError) @@ -377,7 +377,7 @@ func TestAsyncProducerEncoderFailures(t *testing.T) { seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 2) - metadataResponse := newMetadataResponse() + metadataResponse := new(MetadataResponse) metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError) seedBroker.Returns(metadataResponse) @@ -417,7 +417,7 @@ func TestAsyncProducerBrokerBounce(t *testing.T) { leader := NewMockBroker(t, 2) leaderAddr := leader.Addr() - metadataResponse := newMetadataResponse() + metadataResponse := new(MetadataResponse) metadataResponse.AddBroker(leaderAddr, leader.BrokerID()) metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError) seedBroker.Returns(metadataResponse) @@ -457,7 +457,7 @@ func TestAsyncProducerBrokerBounceWithStaleMetadata(t *testing.T) { leader1 := NewMockBroker(t, 2) leader2 := NewMockBroker(t, 3) - metadataLeader1 := newMetadataResponse() + metadataLeader1 := new(MetadataResponse) metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID()) metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, nil, ErrNoError) seedBroker.Returns(metadataLeader1) @@ -480,7 +480,7 @@ func TestAsyncProducerBrokerBounceWithStaleMetadata(t *testing.T) { seedBroker.Returns(metadataLeader1) // tell it to go to leader1 again even though it's still down // ok fine, tell it to go to leader2 finally - metadataLeader2 := newMetadataResponse() + metadataLeader2 := new(MetadataResponse) metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID()) metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, nil, ErrNoError) seedBroker.Returns(metadataLeader2) @@ -500,7 +500,7 @@ func TestAsyncProducerMultipleRetries(t *testing.T) { leader1 := NewMockBroker(t, 2) leader2 := NewMockBroker(t, 3) - metadataLeader1 := newMetadataResponse() + metadataLeader1 := new(MetadataResponse) metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID()) metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, nil, ErrNoError) seedBroker.Returns(metadataLeader1) @@ -522,7 +522,7 @@ func TestAsyncProducerMultipleRetries(t *testing.T) { prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition) leader1.Returns(prodNotLeader) - metadataLeader2 := newMetadataResponse() + metadataLeader2 := new(MetadataResponse) metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID()) metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, nil, ErrNoError) @@ -556,7 +556,7 @@ func TestAsyncProducerMultipleRetriesWithBackoffFunc(t *testing.T) { leader1 := NewMockBroker(t, 2) leader2 := NewMockBroker(t, 3) - metadataLeader1 := newMetadataResponse() + metadataLeader1 := new(MetadataResponse) metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID()) metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, nil, ErrNoError) seedBroker.Returns(metadataLeader1) @@ -583,7 +583,7 @@ func TestAsyncProducerMultipleRetriesWithBackoffFunc(t *testing.T) { prodSuccess := new(ProduceResponse) prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) - metadataLeader2 := newMetadataResponse() + metadataLeader2 := new(MetadataResponse) metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID()) metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, nil, ErrNoError) @@ -624,7 +624,7 @@ func TestAsyncProducerOutOfRetries(t *testing.T) { seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 2) - metadataResponse := newMetadataResponse() + metadataResponse := new(MetadataResponse) metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError) seedBroker.Returns(metadataResponse) @@ -680,7 +680,7 @@ func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) { leader := NewMockBroker(t, 2) leaderAddr := leader.Addr() - metadataResponse := newMetadataResponse() + metadataResponse := new(MetadataResponse) metadataResponse.AddBroker(leaderAddr, leader.BrokerID()) metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError) metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, nil, ErrNoError) @@ -736,7 +736,7 @@ func TestAsyncProducerFlusherRetryCondition(t *testing.T) { seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 2) - metadataResponse := newMetadataResponse() + metadataResponse := new(MetadataResponse) metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError) metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, nil, ErrNoError) @@ -802,7 +802,7 @@ func TestAsyncProducerRetryShutdown(t *testing.T) { seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 2) - metadataLeader := newMetadataResponse() + metadataLeader := new(MetadataResponse) metadataLeader.AddBroker(leader.Addr(), leader.BrokerID()) metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError) seedBroker.Returns(metadataLeader) @@ -851,7 +851,7 @@ func TestAsyncProducerNoReturns(t *testing.T) { seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 2) - metadataLeader := newMetadataResponse() + metadataLeader := new(MetadataResponse) metadataLeader.AddBroker(leader.Addr(), leader.BrokerID()) metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError) seedBroker.Returns(metadataLeader) @@ -1259,7 +1259,7 @@ func testProducerInterceptor( ) { seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 2) - metadataLeader := newMetadataResponse() + metadataLeader := new(MetadataResponse) metadataLeader.AddBroker(leader.Addr(), leader.BrokerID()) metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError) seedBroker.Returns(metadataLeader) diff --git a/client_test.go b/client_test.go index d0efaf666..9b42e2d99 100644 --- a/client_test.go +++ b/client_test.go @@ -19,7 +19,7 @@ func safeClose(t testing.TB, c io.Closer) { func TestSimpleClient(t *testing.T) { seedBroker := NewMockBroker(t, 1) - seedBroker.Returns(newMetadataResponse()) + seedBroker.Returns(new(MetadataResponse)) client, err := NewClient([]string{seedBroker.Addr()}, nil) if err != nil { @@ -36,7 +36,7 @@ func TestCachedPartitions(t *testing.T) { replicas := []int32{3, 1, 5} isr := []int32{5, 1} - metadataResponse := newMetadataResponse() + metadataResponse := new(MetadataResponse) metadataResponse.AddBroker("localhost:12345", 2) metadataResponse.AddTopicPartition("my_topic", 0, 2, replicas, isr, []int32{}, ErrNoError) metadataResponse.AddTopicPartition("my_topic", 1, 2, replicas, isr, []int32{}, ErrLeaderNotAvailable) @@ -74,7 +74,7 @@ func TestClientDoesntCachePartitionsForTopicsWithErrors(t *testing.T) { replicas := []int32{seedBroker.BrokerID()} - metadataResponse := newMetadataResponse() + metadataResponse := new(MetadataResponse) metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) metadataResponse.AddTopicPartition("my_topic", 1, replicas[0], replicas, replicas, []int32{}, ErrNoError) metadataResponse.AddTopicPartition("my_topic", 2, replicas[0], replicas, replicas, []int32{}, ErrNoError) @@ -87,7 +87,7 @@ func TestClientDoesntCachePartitionsForTopicsWithErrors(t *testing.T) { t.Fatal(err) } - metadataResponse = newMetadataResponse() + metadataResponse = new(MetadataResponse) metadataResponse.AddTopic("unknown", ErrUnknownTopicOrPartition) seedBroker.Returns(metadataResponse) @@ -106,7 +106,7 @@ func TestClientDoesntCachePartitionsForTopicsWithErrors(t *testing.T) { t.Errorf("Expected no error, found %v", err) } - metadataResponse = newMetadataResponse() + metadataResponse = new(MetadataResponse) metadataResponse.AddTopic("unknown", ErrUnknownTopicOrPartition) seedBroker.Returns(metadataResponse) @@ -126,7 +126,7 @@ func TestClientDoesntCachePartitionsForTopicsWithErrors(t *testing.T) { func TestClientSeedBrokers(t *testing.T) { seedBroker := NewMockBroker(t, 1) - metadataResponse := newMetadataResponse() + metadataResponse := new(MetadataResponse) metadataResponse.AddBroker("localhost:12345", 2) seedBroker.Returns(metadataResponse) @@ -146,7 +146,7 @@ func TestClientMetadata(t *testing.T) { replicas := []int32{3, 1, 5} isr := []int32{5, 1} - metadataResponse := newMetadataResponse() + metadataResponse := new(MetadataResponse) metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), replicas, isr, []int32{}, ErrNoError) metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), replicas, isr, []int32{}, ErrLeaderNotAvailable) @@ -222,7 +222,7 @@ func TestClientMetadataWithOfflineReplicas(t *testing.T) { isr := []int32{1, 2} offlineReplicas := []int32{3} - metadataResponse := newMetadataResponse() + metadataResponse := new(MetadataResponse) metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), replicas, isr, offlineReplicas, ErrNoError) metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), replicas, isr, []int32{}, ErrNoError) @@ -307,7 +307,7 @@ func TestClientGetOffset(t *testing.T) { leader := NewMockBroker(t, 2) leaderAddr := leader.Addr() - metadata := newMetadataResponse() + metadata := new(MetadataResponse) metadata.AddTopicPartition("foo", 0, leader.BrokerID(), nil, nil, nil, ErrNoError) metadata.AddBroker(leaderAddr, leader.BrokerID()) seedBroker.Returns(metadata) @@ -353,7 +353,7 @@ func TestClientGetOffset(t *testing.T) { func TestClientReceivingUnknownTopicWithBackoffFunc(t *testing.T) { seedBroker := NewMockBroker(t, 1) - metadataResponse1 := newMetadataResponse() + metadataResponse1 := new(MetadataResponse) seedBroker.Returns(metadataResponse1) retryCount := int32(0) @@ -369,7 +369,7 @@ func TestClientReceivingUnknownTopicWithBackoffFunc(t *testing.T) { t.Fatal(err) } - metadataUnknownTopic := newMetadataResponse() + metadataUnknownTopic := new(MetadataResponse) metadataUnknownTopic.AddTopic("new_topic", ErrUnknownTopicOrPartition) seedBroker.Returns(metadataUnknownTopic) seedBroker.Returns(metadataUnknownTopic) @@ -390,7 +390,7 @@ func TestClientReceivingUnknownTopicWithBackoffFunc(t *testing.T) { func TestClientReceivingUnknownTopic(t *testing.T) { seedBroker := NewMockBroker(t, 1) - metadataResponse1 := newMetadataResponse() + metadataResponse1 := new(MetadataResponse) seedBroker.Returns(metadataResponse1) config := NewConfig() @@ -401,7 +401,7 @@ func TestClientReceivingUnknownTopic(t *testing.T) { t.Fatal(err) } - metadataUnknownTopic := newMetadataResponse() + metadataUnknownTopic := new(MetadataResponse) metadataUnknownTopic.AddTopic("new_topic", ErrUnknownTopicOrPartition) seedBroker.Returns(metadataUnknownTopic) seedBroker.Returns(metadataUnknownTopic) @@ -427,7 +427,7 @@ func TestClientReceivingPartialMetadata(t *testing.T) { seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 5) - metadataResponse1 := newMetadataResponse() + metadataResponse1 := new(MetadataResponse) metadataResponse1.AddBroker(leader.Addr(), leader.BrokerID()) seedBroker.Returns(metadataResponse1) @@ -440,7 +440,7 @@ func TestClientReceivingPartialMetadata(t *testing.T) { replicas := []int32{leader.BrokerID(), seedBroker.BrokerID()} - metadataPartial := newMetadataResponse() + metadataPartial := new(MetadataResponse) metadataPartial.AddBroker(seedBroker.Addr(), 1) metadataPartial.AddBroker(leader.Addr(), 5) metadataPartial.AddTopic("new_topic", ErrLeaderNotAvailable) @@ -482,11 +482,11 @@ func TestClientRefreshBehaviour(t *testing.T) { seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 5) - metadataResponse1 := newMetadataResponse() + metadataResponse1 := new(MetadataResponse) metadataResponse1.AddBroker(leader.Addr(), leader.BrokerID()) seedBroker.Returns(metadataResponse1) - metadataResponse2 := newMetadataResponse() + metadataResponse2 := new(MetadataResponse) metadataResponse2.AddBroker(leader.Addr(), leader.BrokerID()) metadataResponse2.AddTopicPartition("my_topic", 0xb, leader.BrokerID(), nil, nil, nil, ErrNoError) seedBroker.Returns(metadataResponse2) @@ -519,7 +519,7 @@ func TestClientRefreshBrokers(t *testing.T) { initialSeed := NewMockBroker(t, 0) leader := NewMockBroker(t, 5) - metadataResponse1 := newMetadataResponse() + metadataResponse1 := new(MetadataResponse) metadataResponse1.AddBroker(leader.Addr(), leader.BrokerID()) metadataResponse1.AddBroker(initialSeed.Addr(), initialSeed.BrokerID()) initialSeed.Returns(metadataResponse1) @@ -550,7 +550,7 @@ func TestClientRefreshMetadataBrokerOffline(t *testing.T) { seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 5) - metadataResponse1 := newMetadataResponse() + metadataResponse1 := new(MetadataResponse) metadataResponse1.AddBroker(leader.Addr(), leader.BrokerID()) metadataResponse1.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) seedBroker.Returns(metadataResponse1) @@ -564,7 +564,7 @@ func TestClientRefreshMetadataBrokerOffline(t *testing.T) { t.Error("Meta broker is not 2") } - metadataResponse2 := newMetadataResponse() + metadataResponse2 := new(MetadataResponse) metadataResponse2.AddBroker(leader.Addr(), leader.BrokerID()) seedBroker.Returns(metadataResponse2) @@ -578,7 +578,7 @@ func TestClientRefreshMetadataBrokerOffline(t *testing.T) { func TestClientResurrectDeadSeeds(t *testing.T) { initialSeed := NewMockBroker(t, 0) - emptyMetadata := newMetadataResponse() + emptyMetadata := new(MetadataResponse) initialSeed.Returns(emptyMetadata) conf := NewConfig() @@ -687,7 +687,7 @@ func TestClientMetadataTimeout(t *testing.T) { t.Run(fmt.Sprintf("timeout=%v", timeout), func(t *testing.T) { // Use a responsive broker to create a working client initialSeed := NewMockBroker(t, 0) - emptyMetadata := newMetadataResponse() + emptyMetadata := new(MetadataResponse) initialSeed.Returns(emptyMetadata) conf := NewConfig() @@ -750,7 +750,7 @@ func TestClientCoordinatorWithConsumerOffsetsTopic(t *testing.T) { freshCoordinator := NewMockBroker(t, 3) replicas := []int32{staleCoordinator.BrokerID(), freshCoordinator.BrokerID()} - metadataResponse1 := newMetadataResponse() + metadataResponse1 := new(MetadataResponse) metadataResponse1.AddBroker(staleCoordinator.Addr(), staleCoordinator.BrokerID()) metadataResponse1.AddBroker(freshCoordinator.Addr(), freshCoordinator.BrokerID()) metadataResponse1.AddTopicPartition("__consumer_offsets", 0, replicas[0], replicas, replicas, []int32{}, ErrNoError) @@ -827,7 +827,7 @@ func TestClientCoordinatorWithoutConsumerOffsetsTopic(t *testing.T) { seedBroker := NewMockBroker(t, 1) coordinator := NewMockBroker(t, 2) - metadataResponse1 := newMetadataResponse() + metadataResponse1 := new(MetadataResponse) seedBroker.Returns(metadataResponse1) config := NewConfig() @@ -842,12 +842,12 @@ func TestClientCoordinatorWithoutConsumerOffsetsTopic(t *testing.T) { coordinatorResponse1.Err = ErrConsumerCoordinatorNotAvailable seedBroker.Returns(coordinatorResponse1) - metadataResponse2 := newMetadataResponse() + metadataResponse2 := new(MetadataResponse) metadataResponse2.AddTopic("__consumer_offsets", ErrUnknownTopicOrPartition) seedBroker.Returns(metadataResponse2) replicas := []int32{coordinator.BrokerID()} - metadataResponse3 := newMetadataResponse() + metadataResponse3 := new(MetadataResponse) metadataResponse3.AddTopicPartition("__consumer_offsets", 0, replicas[0], replicas, replicas, []int32{}, ErrNoError) seedBroker.Returns(metadataResponse3) @@ -879,7 +879,7 @@ func TestClientCoordinatorWithoutConsumerOffsetsTopic(t *testing.T) { func TestClientAutorefreshShutdownRace(t *testing.T) { seedBroker := NewMockBroker(t, 1) - metadataResponse := newMetadataResponse() + metadataResponse := new(MetadataResponse) seedBroker.Returns(metadataResponse) conf := NewConfig() diff --git a/client_tls_test.go b/client_tls_test.go index eab50434f..a825cf767 100644 --- a/client_tls_test.go +++ b/client_tls_test.go @@ -186,7 +186,7 @@ func doListenerTLSTest(t *testing.T, expectSuccess bool, serverConfig, clientCon seedBroker := NewMockBrokerListener(childT, 1, seedListener) defer seedBroker.Close() - seedBroker.Returns(newMetadataResponse()) + seedBroker.Returns(new(MetadataResponse)) config := NewConfig() config.Net.TLS.Enable = true diff --git a/metadata_response_test.go b/metadata_response_test.go index f67dbdcb2..04a4ce7fc 100644 --- a/metadata_response_test.go +++ b/metadata_response_test.go @@ -102,12 +102,6 @@ var ( } ) -func newMetadataResponse() *MetadataResponse { - resp := new(MetadataResponse) - resp.Version = 5 - return resp -} - func TestEmptyMetadataResponseV0(t *testing.T) { response := MetadataResponse{} diff --git a/offset_manager_test.go b/offset_manager_test.go index 1d54bce52..5aa2ee0ff 100644 --- a/offset_manager_test.go +++ b/offset_manager_test.go @@ -22,7 +22,7 @@ func initOffsetManagerWithBackoffFunc(t *testing.T, retention time.Duration, broker = NewMockBroker(t, 1) coordinator = NewMockBroker(t, 2) - seedMeta := newMetadataResponse() + seedMeta := new(MetadataResponse) seedMeta.AddBroker(coordinator.Addr(), coordinator.BrokerID()) seedMeta.AddTopicPartition("my_topic", 0, 1, []int32{}, []int32{}, []int32{}, ErrNoError) seedMeta.AddTopicPartition("my_topic", 1, 1, []int32{}, []int32{}, []int32{}, ErrNoError) @@ -73,7 +73,7 @@ func initPartitionOffsetManager(t *testing.T, om OffsetManager, func TestNewOffsetManager(t *testing.T) { seedBroker := NewMockBroker(t, 1) - seedBroker.Returns(newMetadataResponse()) + seedBroker.Returns(new(MetadataResponse)) defer seedBroker.Close() testClient, err := NewClient([]string{seedBroker.Addr()}, nil) diff --git a/sync_producer_test.go b/sync_producer_test.go index abf1ef77f..9c480b0b5 100644 --- a/sync_producer_test.go +++ b/sync_producer_test.go @@ -10,7 +10,7 @@ func TestSyncProducer(t *testing.T) { seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 2) - metadataResponse := newMetadataResponse() + metadataResponse := new(MetadataResponse) metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError) seedBroker.Returns(metadataResponse) @@ -58,7 +58,7 @@ func TestSyncProducerBatch(t *testing.T) { seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 2) - metadataResponse := newMetadataResponse() + metadataResponse := new(MetadataResponse) metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError) seedBroker.Returns(metadataResponse) @@ -106,7 +106,7 @@ func TestConcurrentSyncProducer(t *testing.T) { seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 2) - metadataResponse := newMetadataResponse() + metadataResponse := new(MetadataResponse) metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError) seedBroker.Returns(metadataResponse) @@ -149,7 +149,7 @@ func TestConcurrentSyncProducer(t *testing.T) { func TestSyncProducerToNonExistingTopic(t *testing.T) { broker := NewMockBroker(t, 1) - metadataResponse := newMetadataResponse() + metadataResponse := new(MetadataResponse) metadataResponse.AddBroker(broker.Addr(), broker.BrokerID()) metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, nil, ErrNoError) broker.Returns(metadataResponse) @@ -164,7 +164,7 @@ func TestSyncProducerToNonExistingTopic(t *testing.T) { t.Fatal(err) } - metadataResponse = newMetadataResponse() + metadataResponse = new(MetadataResponse) metadataResponse.AddTopic("unknown", ErrUnknownTopicOrPartition) broker.Returns(metadataResponse) @@ -182,7 +182,7 @@ func TestSyncProducerRecoveryWithRetriesDisabled(t *testing.T) { leader1 := NewMockBroker(t, 2) leader2 := NewMockBroker(t, 3) - metadataLeader1 := newMetadataResponse() + metadataLeader1 := new(MetadataResponse) metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID()) metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, nil, ErrNoError) seedBroker.Returns(metadataLeader1) @@ -205,7 +205,7 @@ func TestSyncProducerRecoveryWithRetriesDisabled(t *testing.T) { t.Fatal(err) } - metadataLeader2 := newMetadataResponse() + metadataLeader2 := new(MetadataResponse) metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID()) metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, nil, ErrNoError) leader1.Returns(metadataLeader2) From df30c626825924cd165f687358127efbdd6ba589 Mon Sep 17 00:00:00 2001 From: Stanislav Kozlovski Date: Thu, 27 Aug 2020 18:25:46 +0100 Subject: [PATCH 4/7] Set Kafka version to 0.8 in tests --- admin_test.go | 72 +++++++++++++++---------------- async_producer_test.go | 55 +++++++++++++---------- broker_test.go | 12 +++--- client_test.go | 38 ++++++++-------- client_tls_test.go | 2 +- config_test.go | 22 +++++----- consumer_group_test.go | 2 +- consumer_test.go | 46 ++++++++++---------- functional_client_test.go | 6 +-- functional_consumer_group_test.go | 4 +- functional_consumer_test.go | 10 ++--- functional_offset_manager_test.go | 2 +- functional_producer_test.go | 20 ++++----- functional_test.go | 4 +- kerberos_client_test.go | 8 ++-- mocks/async_producer_test.go | 2 +- mocks/consumer_test.go | 18 ++++---- mocks/mocks.go | 10 +++++ offset_manager_test.go | 10 ++--- partitioner_test.go | 6 +-- produce_set_test.go | 4 +- sync_producer_test.go | 12 +++--- 22 files changed, 193 insertions(+), 172 deletions(-) diff --git a/admin_test.go b/admin_test.go index 53c4659f9..754003f68 100644 --- a/admin_test.go +++ b/admin_test.go @@ -16,7 +16,7 @@ func TestClusterAdmin(t *testing.T) { SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), }) - config := NewConfig() + config := NewTestConfig() config.Version = V1_0_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { @@ -38,7 +38,7 @@ func TestClusterAdminInvalidController(t *testing.T) { SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), }) - config := NewConfig() + config := NewTestConfig() config.Version = V1_0_0_0 _, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err == nil { @@ -61,7 +61,7 @@ func TestClusterAdminCreateTopic(t *testing.T) { "CreateTopicsRequest": NewMockCreateTopicsResponse(t), }) - config := NewConfig() + config := NewTestConfig() config.Version = V0_10_2_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { @@ -89,7 +89,7 @@ func TestClusterAdminCreateTopicWithInvalidTopicDetail(t *testing.T) { "CreateTopicsRequest": NewMockCreateTopicsResponse(t), }) - config := NewConfig() + config := NewTestConfig() config.Version = V0_10_2_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { @@ -117,7 +117,7 @@ func TestClusterAdminCreateTopicWithoutAuthorization(t *testing.T) { "CreateTopicsRequest": NewMockCreateTopicsResponse(t), }) - config := NewConfig() + config := NewTestConfig() config.Version = V0_11_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) @@ -148,7 +148,7 @@ func TestClusterAdminListTopics(t *testing.T) { "DescribeConfigsRequest": NewMockDescribeConfigsResponse(t), }) - config := NewConfig() + config := NewTestConfig() config.Version = V1_1_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { @@ -198,7 +198,7 @@ func TestClusterAdminDeleteTopic(t *testing.T) { "DeleteTopicsRequest": NewMockDeleteTopicsResponse(t), }) - config := NewConfig() + config := NewTestConfig() config.Version = V0_10_2_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { @@ -227,7 +227,7 @@ func TestClusterAdminDeleteEmptyTopic(t *testing.T) { "DeleteTopicsRequest": NewMockDeleteTopicsResponse(t), }) - config := NewConfig() + config := NewTestConfig() config.Version = V0_10_2_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { @@ -256,7 +256,7 @@ func TestClusterAdminCreatePartitions(t *testing.T) { "CreatePartitionsRequest": NewMockCreatePartitionsResponse(t), }) - config := NewConfig() + config := NewTestConfig() config.Version = V1_0_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { @@ -285,7 +285,7 @@ func TestClusterAdminCreatePartitionsWithDiffVersion(t *testing.T) { "CreatePartitionsRequest": NewMockCreatePartitionsResponse(t), }) - config := NewConfig() + config := NewTestConfig() config.Version = V0_10_2_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { @@ -314,7 +314,7 @@ func TestClusterAdminCreatePartitionsWithoutAuthorization(t *testing.T) { "CreatePartitionsRequest": NewMockCreatePartitionsResponse(t), }) - config := NewConfig() + config := NewTestConfig() config.Version = V1_0_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { @@ -350,7 +350,7 @@ func TestClusterAdminAlterPartitionReassignments(t *testing.T) { "AlterPartitionReassignmentsRequest": NewMockAlterPartitionReassignmentsResponse(t), }) - config := NewConfig() + config := NewTestConfig() config.Version = V2_4_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { @@ -388,7 +388,7 @@ func TestClusterAdminAlterPartitionReassignmentsWithDiffVersion(t *testing.T) { "AlterPartitionReassignmentsRequest": NewMockAlterPartitionReassignmentsResponse(t), }) - config := NewConfig() + config := NewTestConfig() config.Version = V2_3_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { @@ -427,7 +427,7 @@ func TestClusterAdminListPartitionReassignments(t *testing.T) { "ListPartitionReassignmentsRequest": NewMockListPartitionReassignmentsResponse(t), }) - config := NewConfig() + config := NewTestConfig() config.Version = V2_4_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { @@ -472,7 +472,7 @@ func TestClusterAdminListPartitionReassignmentsWithDiffVersion(t *testing.T) { "ListPartitionReassignmentsRequest": NewMockListPartitionReassignmentsResponse(t), }) - config := NewConfig() + config := NewTestConfig() config.Version = V2_3_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { @@ -508,7 +508,7 @@ func TestClusterAdminDeleteRecords(t *testing.T) { "DeleteRecordsRequest": NewMockDeleteRecordsResponse(t), }) - config := NewConfig() + config := NewTestConfig() config.Version = V1_0_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { @@ -567,7 +567,7 @@ func TestClusterAdminDeleteRecordsWithInCorrectBroker(t *testing.T) { "DeleteRecordsRequest": NewMockDeleteRecordsResponse(t), }) - config := NewConfig() + config := NewTestConfig() config.Version = V1_0_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { @@ -604,7 +604,7 @@ func TestClusterAdminDeleteRecordsWithDiffVersion(t *testing.T) { "DeleteRecordsRequest": NewMockDeleteRecordsResponse(t), }) - config := NewConfig() + config := NewTestConfig() config.Version = V0_10_2_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { @@ -660,7 +660,7 @@ func TestClusterAdminDescribeConfig(t *testing.T) { {V2_0_0_0, 2, true}, } for _, tt := range tests { - config := NewConfig() + config := NewTestConfig() config.Version = tt.saramaVersion admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { @@ -715,7 +715,7 @@ func TestClusterAdminDescribeConfigWithErrorCode(t *testing.T) { "DescribeConfigsRequest": NewMockDescribeConfigsResponseWithErrorCode(t), }) - config := NewConfig() + config := NewTestConfig() config.Version = V1_1_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { @@ -760,7 +760,7 @@ func TestClusterAdminDescribeBrokerConfig(t *testing.T) { "DescribeConfigsRequest": NewMockDescribeConfigsResponse(t), }) - config := NewConfig() + config := NewTestConfig() config.Version = V1_0_0_0 admin, err := NewClusterAdmin( []string{ @@ -800,7 +800,7 @@ func TestClusterAdminAlterConfig(t *testing.T) { "AlterConfigsRequest": NewMockAlterConfigsResponse(t), }) - config := NewConfig() + config := NewTestConfig() config.Version = V1_0_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { @@ -833,7 +833,7 @@ func TestClusterAdminAlterConfigWithErrorCode(t *testing.T) { "AlterConfigsRequest": NewMockAlterConfigsResponseWithErrorCode(t), }) - config := NewConfig() + config := NewTestConfig() config.Version = V1_0_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { @@ -873,7 +873,7 @@ func TestClusterAdminAlterBrokerConfig(t *testing.T) { "AlterConfigsRequest": NewMockAlterConfigsResponse(t), }) - config := NewConfig() + config := NewTestConfig() config.Version = V1_0_0_0 admin, err := NewClusterAdmin( []string{ @@ -918,7 +918,7 @@ func TestClusterAdminCreateAcl(t *testing.T) { "CreateAclsRequest": NewMockCreateAclsResponse(t), }) - config := NewConfig() + config := NewTestConfig() config.Version = V1_0_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { @@ -951,7 +951,7 @@ func TestClusterAdminListAcls(t *testing.T) { "CreateAclsRequest": NewMockCreateAclsResponse(t), }) - config := NewConfig() + config := NewTestConfig() config.Version = V1_0_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { @@ -997,7 +997,7 @@ func TestClusterAdminDeleteAcl(t *testing.T) { "DeleteAclsRequest": NewMockDeleteAclsResponse(t), }) - config := NewConfig() + config := NewTestConfig() config.Version = V1_0_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { @@ -1033,7 +1033,7 @@ func TestDescribeTopic(t *testing.T) { SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), }) - config := NewConfig() + config := NewTestConfig() config.Version = V1_0_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) @@ -1071,7 +1071,7 @@ func TestDescribeTopicWithVersion0_11(t *testing.T) { SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), }) - config := NewConfig() + config := NewTestConfig() config.Version = V0_11_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) @@ -1114,7 +1114,7 @@ func TestDescribeConsumerGroup(t *testing.T) { "FindCoordinatorRequest": NewMockFindCoordinatorResponse(t).SetCoordinator(CoordinatorGroup, expectedGroupID, seedBroker), }) - config := NewConfig() + config := NewTestConfig() config.Version = V1_0_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) @@ -1153,7 +1153,7 @@ func TestListConsumerGroups(t *testing.T) { AddGroup("my-group", "consumer"), }) - config := NewConfig() + config := NewTestConfig() config.Version = V1_0_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) @@ -1215,7 +1215,7 @@ func TestListConsumerGroupsMultiBroker(t *testing.T) { AddGroup(secondGroup, "consumer"), }) - config := NewConfig() + config := NewTestConfig() config.Version = V1_0_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) @@ -1267,7 +1267,7 @@ func TestListConsumerGroupOffsets(t *testing.T) { "FindCoordinatorRequest": NewMockFindCoordinatorResponse(t).SetCoordinator(CoordinatorGroup, group, seedBroker), }) - config := NewConfig() + config := NewTestConfig() config.Version = V1_0_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) @@ -1312,7 +1312,7 @@ func TestDeleteConsumerGroup(t *testing.T) { "FindCoordinatorRequest": NewMockFindCoordinatorResponse(t).SetCoordinator(CoordinatorGroup, group, seedBroker), }) - config := NewConfig() + config := NewTestConfig() config.Version = V1_1_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) @@ -1341,7 +1341,7 @@ func TestRefreshMetaDataWithDifferentController(t *testing.T) { SetBroker(seedBroker2.Addr(), seedBroker2.BrokerID()), }) - config := NewConfig() + config := NewTestConfig() config.Version = V1_1_0_0 client, err := NewClient([]string{seedBroker1.Addr()}, config) @@ -1386,7 +1386,7 @@ func TestDescribeLogDirs(t *testing.T) { SetLogDirs("/tmp/logs", map[string]int{"topic1": 2, "topic2": 2}), }) - config := NewConfig() + config := NewTestConfig() config.Version = V1_0_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) diff --git a/async_producer_test.go b/async_producer_test.go index f8c2656cf..ab762ba48 100644 --- a/async_producer_test.go +++ b/async_producer_test.go @@ -111,7 +111,7 @@ func TestAsyncProducer(t *testing.T) { prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) leader.Returns(prodSuccess) - config := NewConfig() + config := NewTestConfig() config.Producer.Flush.Messages = 10 config.Producer.Return.Successes = true producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config) @@ -162,7 +162,7 @@ func TestAsyncProducerMultipleFlushes(t *testing.T) { leader.Returns(prodSuccess) leader.Returns(prodSuccess) - config := NewConfig() + config := NewTestConfig() config.Producer.Flush.Messages = 5 config.Producer.Return.Successes = true producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config) @@ -202,7 +202,7 @@ func TestAsyncProducerMultipleBrokers(t *testing.T) { prodResponse1.AddTopicPartition("my_topic", 1, ErrNoError) leader1.Returns(prodResponse1) - config := NewConfig() + config := NewTestConfig() config.Producer.Flush.Messages = 5 config.Producer.Return.Successes = true config.Producer.Partitioner = NewRoundRobinPartitioner @@ -235,7 +235,7 @@ func TestAsyncProducerCustomPartitioner(t *testing.T) { prodResponse.AddTopicPartition("my_topic", 0, ErrNoError) leader.Returns(prodResponse) - config := NewConfig() + config := NewTestConfig() config.Producer.Flush.Messages = 2 config.Producer.Return.Successes = true config.Producer.Partitioner = func(topic string) Partitioner { @@ -274,7 +274,7 @@ func TestAsyncProducerFailureRetry(t *testing.T) { metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, nil, ErrNoError) seedBroker.Returns(metadataLeader1) - config := NewConfig() + config := NewTestConfig() config.Producer.Flush.Messages = 10 config.Producer.Return.Successes = true config.Producer.Retry.Backoff = 0 @@ -324,7 +324,7 @@ func TestAsyncProducerRecoveryWithRetriesDisabled(t *testing.T) { metadataLeader1.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, nil, ErrNoError) seedBroker.Returns(metadataLeader1) - config := NewConfig() + config := NewTestConfig() config.Producer.Flush.Messages = 2 config.Producer.Return.Successes = true config.Producer.Retry.Max = 0 // disable! @@ -388,7 +388,7 @@ func TestAsyncProducerEncoderFailures(t *testing.T) { leader.Returns(prodSuccess) leader.Returns(prodSuccess) - config := NewConfig() + config := NewTestConfig() config.Producer.Flush.Messages = 1 config.Producer.Return.Successes = true config.Producer.Partitioner = NewManualPartitioner @@ -425,7 +425,7 @@ func TestAsyncProducerBrokerBounce(t *testing.T) { prodSuccess := new(ProduceResponse) prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) - config := NewConfig() + config := NewTestConfig() config.Producer.Flush.Messages = 1 config.Producer.Return.Successes = true config.Producer.Retry.Backoff = 0 @@ -462,7 +462,7 @@ func TestAsyncProducerBrokerBounceWithStaleMetadata(t *testing.T) { metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, nil, ErrNoError) seedBroker.Returns(metadataLeader1) - config := NewConfig() + config := NewTestConfig() config.Producer.Flush.Messages = 10 config.Producer.Return.Successes = true config.Producer.Retry.Max = 3 @@ -505,7 +505,7 @@ func TestAsyncProducerMultipleRetries(t *testing.T) { metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, nil, ErrNoError) seedBroker.Returns(metadataLeader1) - config := NewConfig() + config := NewTestConfig() config.Producer.Flush.Messages = 10 config.Producer.Return.Successes = true config.Producer.Retry.Max = 4 @@ -561,7 +561,7 @@ func TestAsyncProducerMultipleRetriesWithBackoffFunc(t *testing.T) { metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, nil, ErrNoError) seedBroker.Returns(metadataLeader1) - config := NewConfig() + config := NewTestConfig() config.Producer.Flush.Messages = 1 config.Producer.Return.Successes = true config.Producer.Retry.Max = 4 @@ -629,7 +629,7 @@ func TestAsyncProducerOutOfRetries(t *testing.T) { metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError) seedBroker.Returns(metadataResponse) - config := NewConfig() + config := NewTestConfig() config.Producer.Flush.Messages = 10 config.Producer.Return.Successes = true config.Producer.Retry.Backoff = 0 @@ -686,7 +686,7 @@ func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) { metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, nil, ErrNoError) seedBroker.Returns(metadataResponse) - config := NewConfig() + config := NewTestConfig() config.Producer.Return.Successes = true config.Producer.Retry.Backoff = 0 config.Producer.Retry.Max = 1 @@ -742,7 +742,7 @@ func TestAsyncProducerFlusherRetryCondition(t *testing.T) { metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, nil, ErrNoError) seedBroker.Returns(metadataResponse) - config := NewConfig() + config := NewTestConfig() config.Producer.Flush.Messages = 5 config.Producer.Return.Successes = true config.Producer.Retry.Backoff = 0 @@ -807,7 +807,7 @@ func TestAsyncProducerRetryShutdown(t *testing.T) { metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError) seedBroker.Returns(metadataLeader) - config := NewConfig() + config := NewTestConfig() config.Producer.Flush.Messages = 10 config.Producer.Return.Successes = true config.Producer.Retry.Backoff = 0 @@ -856,7 +856,7 @@ func TestAsyncProducerNoReturns(t *testing.T) { metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError) seedBroker.Returns(metadataLeader) - config := NewConfig() + config := NewTestConfig() config.Producer.Flush.Messages = 10 config.Producer.Return.Successes = false config.Producer.Return.Errors = false @@ -905,7 +905,7 @@ func TestAsyncProducerIdempotentGoldenPath(t *testing.T) { } broker.Returns(initProducerID) - config := NewConfig() + config := NewTestConfig() config.Producer.Flush.Messages = 10 config.Producer.Return.Successes = true config.Producer.Retry.Max = 4 @@ -1049,7 +1049,7 @@ func TestAsyncProducerIdempotentRetryCheckBatch(t *testing.T) { return nil } - config := NewConfig() + config := NewTestConfig() config.Version = V0_11_0_0 config.Producer.Idempotent = true config.Net.MaxOpenRequests = 1 @@ -1100,7 +1100,7 @@ func TestAsyncProducerIdempotentErrorOnOutOfSeq(t *testing.T) { } broker.Returns(initProducerID) - config := NewConfig() + config := NewTestConfig() config.Producer.Flush.Messages = 10 config.Producer.Return.Successes = true config.Producer.Retry.Max = 400000 @@ -1150,7 +1150,7 @@ func TestAsyncProducerIdempotentEpochRollover(t *testing.T) { } broker.Returns(initProducerID) - config := NewConfig() + config := NewTestConfig() config.Producer.Flush.Messages = 10 config.Producer.Flush.Frequency = 10 * time.Millisecond config.Producer.Return.Successes = true @@ -1216,7 +1216,7 @@ func TestBrokerProducerShutdown(t *testing.T) { "my_topic", 0, mockBroker.BrokerID(), nil, nil, nil, ErrNoError) mockBroker.Returns(metadataResponse) - producer, err := NewAsyncProducer([]string{mockBroker.Addr()}, nil) + producer, err := NewAsyncProducer([]string{mockBroker.Addr()}, NewTestConfig()) if err != nil { t.Fatal(err) } @@ -1264,7 +1264,7 @@ func testProducerInterceptor( metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError) seedBroker.Returns(metadataLeader) - config := NewConfig() + config := NewTestConfig() config.Producer.Flush.Messages = 10 config.Producer.Return.Successes = true config.Producer.Interceptors = interceptors @@ -1391,7 +1391,7 @@ ProducerLoop: // for the Successes channel to be populated, you have to set // config.Producer.Return.Successes to true. func ExampleAsyncProducer_goroutines() { - config := NewConfig() + config := NewTestConfig() config.Producer.Return.Successes = true producer, err := NewAsyncProducer([]string{"localhost:9092"}, config) if err != nil { @@ -1441,3 +1441,12 @@ ProducerLoop: log.Printf("Successfully produced: %d; errors: %d\n", successes, producerErrors) } + +// NewTestConfig returns a config meant to be used by tests. +// Due to inconsistencies with the request versions the clients send using the default Kafka version +// and the response versions our mocks use, we default to the minimum Kafka version in most tests +func NewTestConfig() *Config { + config := NewConfig() + config.Version = MinVersion + return config +} diff --git a/broker_test.go b/broker_test.go index c7d548600..37064b7cb 100644 --- a/broker_test.go +++ b/broker_test.go @@ -92,7 +92,7 @@ func TestSimpleBrokerCommunication(t *testing.T) { broker := NewBroker(mb.Addr()) // Set the broker id in order to validate local broker metrics broker.id = 0 - conf := NewConfig() + conf := NewTestConfig() conf.Version = tt.version err := broker.Open(conf) if err != nil { @@ -226,7 +226,7 @@ func TestSASLOAuthBearer(t *testing.T) { broker.requestLatency = metrics.NilHistogram{} broker.requestsInFlight = metrics.NilCounter{} - conf := NewConfig() + conf := NewTestConfig() conf.Net.SASL.Mechanism = SASLTypeOAuth conf.Net.SASL.TokenProvider = test.tokProvider @@ -358,7 +358,7 @@ func TestSASLSCRAMSHAXXX(t *testing.T) { "SaslHandshakeRequest": mockSASLHandshakeResponse, }) - conf := NewConfig() + conf := NewTestConfig() conf.Net.SASL.Mechanism = SASLTypeSCRAMSHA512 conf.Net.SASL.SCRAMClientGeneratorFunc = func() SCRAMClient { return test.scramClient } @@ -464,7 +464,7 @@ func TestSASLPlainAuth(t *testing.T) { broker.requestLatency = metrics.NilHistogram{} broker.requestsInFlight = metrics.NilCounter{} - conf := NewConfig() + conf := NewTestConfig() conf.Net.SASL.Mechanism = SASLTypePlaintext conf.Net.SASL.AuthIdentity = test.authidentity conf.Net.SASL.User = "token" @@ -550,7 +550,7 @@ func TestSASLReadTimeout(t *testing.T) { broker.requestsInFlight = metrics.NilCounter{} } - conf := NewConfig() + conf := NewTestConfig() { conf.Net.ReadTimeout = time.Millisecond conf.Net.SASL.Mechanism = SASLTypePlaintext @@ -640,7 +640,7 @@ func TestGSSAPIKerberosAuth_Authorize(t *testing.T) { broker.responseRate = metrics.NilMeter{} broker.requestLatency = metrics.NilHistogram{} broker.requestsInFlight = metrics.NilCounter{} - conf := NewConfig() + conf := NewTestConfig() conf.Net.SASL.Mechanism = SASLTypeGSSAPI conf.Net.SASL.GSSAPI.ServiceName = "kafka" conf.Net.SASL.GSSAPI.KerberosConfigPath = "krb5.conf" diff --git a/client_test.go b/client_test.go index 9b42e2d99..797e2acb5 100644 --- a/client_test.go +++ b/client_test.go @@ -21,7 +21,7 @@ func TestSimpleClient(t *testing.T) { seedBroker.Returns(new(MetadataResponse)) - client, err := NewClient([]string{seedBroker.Addr()}, nil) + client, err := NewClient([]string{seedBroker.Addr()}, NewTestConfig()) if err != nil { t.Fatal(err) } @@ -42,7 +42,7 @@ func TestCachedPartitions(t *testing.T) { metadataResponse.AddTopicPartition("my_topic", 1, 2, replicas, isr, []int32{}, ErrLeaderNotAvailable) seedBroker.Returns(metadataResponse) - config := NewConfig() + config := NewTestConfig() config.Metadata.Retry.Max = 0 c, err := NewClient([]string{seedBroker.Addr()}, config) if err != nil { @@ -80,7 +80,7 @@ func TestClientDoesntCachePartitionsForTopicsWithErrors(t *testing.T) { metadataResponse.AddTopicPartition("my_topic", 2, replicas[0], replicas, replicas, []int32{}, ErrNoError) seedBroker.Returns(metadataResponse) - config := NewConfig() + config := NewTestConfig() config.Metadata.Retry.Max = 0 client, err := NewClient([]string{seedBroker.Addr()}, config) if err != nil { @@ -130,7 +130,7 @@ func TestClientSeedBrokers(t *testing.T) { metadataResponse.AddBroker("localhost:12345", 2) seedBroker.Returns(metadataResponse) - client, err := NewClient([]string{seedBroker.Addr()}, nil) + client, err := NewClient([]string{seedBroker.Addr()}, NewTestConfig()) if err != nil { t.Fatal(err) } @@ -152,7 +152,7 @@ func TestClientMetadata(t *testing.T) { metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), replicas, isr, []int32{}, ErrLeaderNotAvailable) seedBroker.Returns(metadataResponse) - config := NewConfig() + config := NewTestConfig() config.Metadata.Retry.Max = 0 client, err := NewClient([]string{seedBroker.Addr()}, config) if err != nil { @@ -230,7 +230,7 @@ func TestClientMetadataWithOfflineReplicas(t *testing.T) { seedBroker.Returns(metadataResponse) - config := NewConfig() + config := NewTestConfig() config.Version = V1_0_0_0 config.Metadata.Retry.Max = 0 client, err := NewClient([]string{seedBroker.Addr()}, config) @@ -312,7 +312,7 @@ func TestClientGetOffset(t *testing.T) { metadata.AddBroker(leaderAddr, leader.BrokerID()) seedBroker.Returns(metadata) - client, err := NewClient([]string{seedBroker.Addr()}, nil) + client, err := NewClient([]string{seedBroker.Addr()}, NewTestConfig()) if err != nil { t.Fatal(err) } @@ -358,7 +358,7 @@ func TestClientReceivingUnknownTopicWithBackoffFunc(t *testing.T) { retryCount := int32(0) - config := NewConfig() + config := NewTestConfig() config.Metadata.Retry.Max = 1 config.Metadata.Retry.BackoffFunc = func(retries, maxRetries int) time.Duration { atomic.AddInt32(&retryCount, 1) @@ -393,7 +393,7 @@ func TestClientReceivingUnknownTopic(t *testing.T) { metadataResponse1 := new(MetadataResponse) seedBroker.Returns(metadataResponse1) - config := NewConfig() + config := NewTestConfig() config.Metadata.Retry.Max = 1 config.Metadata.Retry.Backoff = 0 client, err := NewClient([]string{seedBroker.Addr()}, config) @@ -431,7 +431,7 @@ func TestClientReceivingPartialMetadata(t *testing.T) { metadataResponse1.AddBroker(leader.Addr(), leader.BrokerID()) seedBroker.Returns(metadataResponse1) - config := NewConfig() + config := NewTestConfig() config.Metadata.Retry.Max = 0 client, err := NewClient([]string{seedBroker.Addr()}, config) if err != nil { @@ -491,7 +491,7 @@ func TestClientRefreshBehaviour(t *testing.T) { metadataResponse2.AddTopicPartition("my_topic", 0xb, leader.BrokerID(), nil, nil, nil, ErrNoError) seedBroker.Returns(metadataResponse2) - client, err := NewClient([]string{seedBroker.Addr()}, nil) + client, err := NewClient([]string{seedBroker.Addr()}, NewTestConfig()) if err != nil { t.Fatal(err) } @@ -524,7 +524,7 @@ func TestClientRefreshBrokers(t *testing.T) { metadataResponse1.AddBroker(initialSeed.Addr(), initialSeed.BrokerID()) initialSeed.Returns(metadataResponse1) - c, err := NewClient([]string{initialSeed.Addr()}, nil) + c, err := NewClient([]string{initialSeed.Addr()}, NewTestConfig()) client := c.(*client) if err != nil { @@ -555,7 +555,7 @@ func TestClientRefreshMetadataBrokerOffline(t *testing.T) { metadataResponse1.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) seedBroker.Returns(metadataResponse1) - client, err := NewClient([]string{seedBroker.Addr()}, nil) + client, err := NewClient([]string{seedBroker.Addr()}, NewTestConfig()) if err != nil { t.Fatal(err) } @@ -581,7 +581,7 @@ func TestClientResurrectDeadSeeds(t *testing.T) { emptyMetadata := new(MetadataResponse) initialSeed.Returns(emptyMetadata) - conf := NewConfig() + conf := NewTestConfig() conf.Metadata.Retry.Backoff = 0 conf.Metadata.RefreshFrequency = 0 c, err := NewClient([]string{initialSeed.Addr()}, conf) @@ -648,7 +648,7 @@ func TestClientController(t *testing.T) { SetBroker(controllerBroker.Addr(), controllerBroker.BrokerID()), }) - cfg := NewConfig() + cfg := NewTestConfig() // test kafka version greater than 0.10.0.0 cfg.Version = V0_10_0_0 @@ -690,7 +690,7 @@ func TestClientMetadataTimeout(t *testing.T) { emptyMetadata := new(MetadataResponse) initialSeed.Returns(emptyMetadata) - conf := NewConfig() + conf := NewTestConfig() // Speed up the metadata request failure because of a read timeout conf.Net.ReadTimeout = 100 * time.Millisecond // Disable backoff and refresh @@ -756,7 +756,7 @@ func TestClientCoordinatorWithConsumerOffsetsTopic(t *testing.T) { metadataResponse1.AddTopicPartition("__consumer_offsets", 0, replicas[0], replicas, replicas, []int32{}, ErrNoError) seedBroker.Returns(metadataResponse1) - client, err := NewClient([]string{seedBroker.Addr()}, nil) + client, err := NewClient([]string{seedBroker.Addr()}, NewTestConfig()) if err != nil { t.Fatal(err) } @@ -830,7 +830,7 @@ func TestClientCoordinatorWithoutConsumerOffsetsTopic(t *testing.T) { metadataResponse1 := new(MetadataResponse) seedBroker.Returns(metadataResponse1) - config := NewConfig() + config := NewTestConfig() config.Metadata.Retry.Max = 1 config.Metadata.Retry.Backoff = 0 client, err := NewClient([]string{seedBroker.Addr()}, config) @@ -882,7 +882,7 @@ func TestClientAutorefreshShutdownRace(t *testing.T) { metadataResponse := new(MetadataResponse) seedBroker.Returns(metadataResponse) - conf := NewConfig() + conf := NewTestConfig() conf.Metadata.RefreshFrequency = 100 * time.Millisecond client, err := NewClient([]string{seedBroker.Addr()}, conf) if err != nil { diff --git a/client_tls_test.go b/client_tls_test.go index a825cf767..489b67f85 100644 --- a/client_tls_test.go +++ b/client_tls_test.go @@ -188,7 +188,7 @@ func doListenerTLSTest(t *testing.T, expectSuccess bool, serverConfig, clientCon seedBroker.Returns(new(MetadataResponse)) - config := NewConfig() + config := NewTestConfig() config.Net.TLS.Enable = true config.Net.TLS.Config = clientConfig diff --git a/config_test.go b/config_test.go index 6da9dbe5f..2e34ba3d1 100644 --- a/config_test.go +++ b/config_test.go @@ -8,7 +8,7 @@ import ( ) func TestDefaultConfigValidates(t *testing.T) { - config := NewConfig() + config := NewTestConfig() if err := config.Validate(); err != nil { t.Error(err) } @@ -18,7 +18,7 @@ func TestDefaultConfigValidates(t *testing.T) { } func TestInvalidClientIDConfigValidates(t *testing.T) { - config := NewConfig() + config := NewTestConfig() config.ClientID = "foo:bar" if err := config.Validate(); string(err.(ConfigurationError)) != "ClientID is invalid" { t.Error("Expected invalid ClientID, got ", err) @@ -26,7 +26,7 @@ func TestInvalidClientIDConfigValidates(t *testing.T) { } func TestEmptyClientIDConfigValidates(t *testing.T) { - config := NewConfig() + config := NewTestConfig() config.ClientID = "" if err := config.Validate(); string(err.(ConfigurationError)) != "ClientID is invalid" { t.Error("Expected invalid ClientID, got ", err) @@ -194,7 +194,7 @@ func TestNetConfigValidates(t *testing.T) { } for i, test := range tests { - c := NewConfig() + c := NewTestConfig() test.cfg(c) if err := c.Validate(); string(err.(ConfigurationError)) != test.err { t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err) @@ -227,7 +227,7 @@ func TestMetadataConfigValidates(t *testing.T) { } for i, test := range tests { - c := NewConfig() + c := NewTestConfig() test.cfg(c) if err := c.Validate(); string(err.(ConfigurationError)) != test.err { t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err) @@ -249,7 +249,7 @@ func TestAdminConfigValidates(t *testing.T) { } for i, test := range tests { - c := NewConfig() + c := NewTestConfig() test.cfg(c) if err := c.Validate(); string(err.(ConfigurationError)) != test.err { t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err) @@ -349,7 +349,7 @@ func TestProducerConfigValidates(t *testing.T) { } for i, test := range tests { - c := NewConfig() + c := NewTestConfig() test.cfg(c) if err := c.Validate(); string(err.(ConfigurationError)) != test.err { t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err) @@ -379,7 +379,7 @@ func TestConsumerConfigValidates(t *testing.T) { } for i, test := range tests { - c := NewConfig() + c := NewTestConfig() test.cfg(c) if err := c.Validate(); string(err.(ConfigurationError)) != test.err { t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err) @@ -388,7 +388,7 @@ func TestConsumerConfigValidates(t *testing.T) { } func TestLZ4ConfigValidation(t *testing.T) { - config := NewConfig() + config := NewTestConfig() config.Producer.Compression = CompressionLZ4 if err := config.Validate(); string(err.(ConfigurationError)) != "lz4 compression requires Version >= V0_10_0_0" { t.Error("Expected invalid lz4/kafka version error, got ", err) @@ -400,7 +400,7 @@ func TestLZ4ConfigValidation(t *testing.T) { } func TestZstdConfigValidation(t *testing.T) { - config := NewConfig() + config := NewTestConfig() config.Producer.Compression = CompressionZSTD if err := config.Validate(); string(err.(ConfigurationError)) != "zstd compression requires Version >= V2_1_0_0" { t.Error("Expected invalid zstd/kafka version error, got ", err) @@ -419,7 +419,7 @@ func ExampleConfig_metrics() { appGauge := metrics.GetOrRegisterGauge("m1", appMetricRegistry) appGauge.Update(1) - config := NewConfig() + config := NewTestConfig() // Use a prefix registry instead of the default local one config.MetricRegistry = metrics.NewPrefixedChildRegistry(appMetricRegistry, "sarama.") diff --git a/consumer_group_test.go b/consumer_group_test.go index bbef97488..06eb1d964 100644 --- a/consumer_group_test.go +++ b/consumer_group_test.go @@ -18,7 +18,7 @@ func (h exampleConsumerGroupHandler) ConsumeClaim(sess ConsumerGroupSession, cla } func ExampleConsumerGroup() { - config := NewConfig() + config := NewTestConfig() config.Version = V2_0_0_0 // specify appropriate version config.Consumer.Return.Errors = true diff --git a/consumer_test.go b/consumer_test.go index 2416b2c83..9b48d8cdf 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -36,7 +36,7 @@ func TestConsumerOffsetManual(t *testing.T) { }) // When - master, err := NewConsumer([]string{broker0.Addr()}, nil) + master, err := NewConsumer([]string{broker0.Addr()}, NewTestConfig()) if err != nil { t.Fatal(err) } @@ -81,7 +81,7 @@ func TestConsumerOffsetNewest(t *testing.T) { SetHighWaterMark("my_topic", 0, 14), }) - master, err := NewConsumer([]string{broker0.Addr()}, nil) + master, err := NewConsumer([]string{broker0.Addr()}, NewTestConfig()) if err != nil { t.Fatal(err) } @@ -118,7 +118,7 @@ func TestConsumerRecreate(t *testing.T) { SetMessage("my_topic", 0, 10, testMsg), }) - c, err := NewConsumer([]string{broker0.Addr()}, nil) + c, err := NewConsumer([]string{broker0.Addr()}, NewTestConfig()) if err != nil { t.Fatal(err) } @@ -158,7 +158,7 @@ func TestConsumerDuplicate(t *testing.T) { "FetchRequest": NewMockFetchResponse(t, 1), }) - config := NewConfig() + config := NewTestConfig() config.ChannelBufferSize = 0 c, err := NewConsumer([]string{broker0.Addr()}, config) if err != nil { @@ -257,7 +257,7 @@ func runConsumerLeaderRefreshErrorTestWithConfig(t *testing.T, config *Config) { // If consumer fails to refresh metadata it keeps retrying with frequency // specified by `Config.Consumer.Retry.Backoff`. func TestConsumerLeaderRefreshError(t *testing.T) { - config := NewConfig() + config := NewTestConfig() config.Net.ReadTimeout = 100 * time.Millisecond config.Consumer.Retry.Backoff = 200 * time.Millisecond config.Consumer.Return.Errors = true @@ -269,7 +269,7 @@ func TestConsumerLeaderRefreshError(t *testing.T) { func TestConsumerLeaderRefreshErrorWithBackoffFunc(t *testing.T) { var calls int32 = 0 - config := NewConfig() + config := NewTestConfig() config.Net.ReadTimeout = 100 * time.Millisecond config.Consumer.Retry.BackoffFunc = func(retries int) time.Duration { atomic.AddInt32(&calls, 1) @@ -294,7 +294,7 @@ func TestConsumerInvalidTopic(t *testing.T) { SetBroker(broker0.Addr(), broker0.BrokerID()), }) - c, err := NewConsumer([]string{broker0.Addr()}, nil) + c, err := NewConsumer([]string{broker0.Addr()}, NewTestConfig()) if err != nil { t.Fatal(err) } @@ -327,7 +327,7 @@ func TestConsumerClosePartitionWithoutLeader(t *testing.T) { SetMessage("my_topic", 0, 123, testMsg), }) - config := NewConfig() + config := NewTestConfig() config.Net.ReadTimeout = 100 * time.Millisecond config.Consumer.Retry.Backoff = 100 * time.Millisecond config.Consumer.Return.Errors = true @@ -382,7 +382,7 @@ func TestConsumerShutsDownOutOfRange(t *testing.T) { "FetchRequest": NewMockWrapper(fetchResponse), }) - master, err := NewConsumer([]string{broker0.Addr()}, nil) + master, err := NewConsumer([]string{broker0.Addr()}, NewTestConfig()) if err != nil { t.Fatal(err) } @@ -421,7 +421,7 @@ func TestConsumerExtraOffsets(t *testing.T) { newFetchResponse.SetLastStableOffset("my_topic", 0, 4) for _, fetchResponse1 := range []*FetchResponse{legacyFetchResponse, newFetchResponse} { var offsetResponseVersion int16 - cfg := NewConfig() + cfg := NewTestConfig() cfg.Consumer.Return.Errors = true if fetchResponse1.Version >= 4 { cfg.Version = V0_11_0_0 @@ -487,7 +487,7 @@ func TestConsumerReceivingFetchResponseWithTooOldRecords(t *testing.T) { fetchResponse2 := &FetchResponse{Version: 4} fetchResponse2.AddRecord("my_topic", 0, nil, testMsg, 1000000) - cfg := NewConfig() + cfg := NewTestConfig() cfg.Consumer.Return.Errors = true cfg.Version = V0_11_0_0 @@ -533,7 +533,7 @@ func TestConsumeMessageWithNewerFetchAPIVersion(t *testing.T) { fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 1) fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 2) - cfg := NewConfig() + cfg := NewTestConfig() cfg.Version = V0_11_0_0 broker0 := NewMockBroker(t, 0) @@ -576,7 +576,7 @@ func TestConsumeMessageWithSessionIDs(t *testing.T) { fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 1) fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 2) - cfg := NewConfig() + cfg := NewTestConfig() cfg.Version = V1_1_0_0 broker0 := NewMockBroker(t, 0) @@ -635,7 +635,7 @@ func TestConsumerNonSequentialOffsets(t *testing.T) { newFetchResponse.SetLastStableOffset("my_topic", 0, 11) for _, fetchResponse1 := range []*FetchResponse{legacyFetchResponse, newFetchResponse} { var offsetResponseVersion int16 - cfg := NewConfig() + cfg := NewTestConfig() if fetchResponse1.Version >= 4 { cfg.Version = V0_11_0_0 offsetResponseVersion = 1 @@ -710,7 +710,7 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) { }) // launch test goroutines - config := NewConfig() + config := NewTestConfig() config.Consumer.Retry.Backoff = 50 master, err := NewConsumer([]string{seedBroker.Addr()}, config) if err != nil { @@ -869,7 +869,7 @@ func TestConsumerInterleavedClose(t *testing.T) { SetMessage("my_topic", 1, 2000, testMsg), }) - config := NewConfig() + config := NewTestConfig() config.ChannelBufferSize = 0 master, err := NewConsumer([]string{broker0.Addr()}, config) if err != nil { @@ -930,7 +930,7 @@ func TestConsumerBounceWithReferenceOpen(t *testing.T) { "FetchRequest": mockFetchResponse, }) - config := NewConfig() + config := NewTestConfig() config.Consumer.Return.Errors = true config.Consumer.Retry.Backoff = 100 * time.Millisecond config.ChannelBufferSize = 1 @@ -1007,7 +1007,7 @@ func TestConsumerOffsetOutOfRange(t *testing.T) { SetOffset("my_topic", 0, OffsetOldest, 2345), }) - master, err := NewConsumer([]string{broker0.Addr()}, nil) + master, err := NewConsumer([]string{broker0.Addr()}, NewTestConfig()) if err != nil { t.Fatal(err) } @@ -1044,7 +1044,7 @@ func TestConsumerExpiryTicker(t *testing.T) { "FetchRequest": NewMockSequence(fetchResponse1), }) - config := NewConfig() + config := NewTestConfig() config.ChannelBufferSize = 0 config.Consumer.MaxProcessingTime = 10 * time.Millisecond master, err := NewConsumer([]string{broker0.Addr()}, config) @@ -1113,7 +1113,7 @@ func TestConsumerTimestamps(t *testing.T) { } { var fr *FetchResponse var offsetResponseVersion int16 - cfg := NewConfig() + cfg := NewTestConfig() cfg.Version = d.kversion switch { case d.kversion.IsAtLeast(V0_11_0_0): @@ -1213,7 +1213,7 @@ func TestExcludeUncommitted(t *testing.T) { "FetchRequest": NewMockWrapper(fetchResponse), }) - cfg := NewConfig() + cfg := NewTestConfig() cfg.Consumer.Return.Errors = true cfg.Version = V0_11_0_0 cfg.Consumer.IsolationLevel = ReadCommitted @@ -1257,7 +1257,7 @@ func assertMessageOffset(t *testing.T, msg *ConsumerMessage, expectedOffset int6 // This example shows how to use the consumer to read messages // from a single partition. func ExampleConsumer() { - consumer, err := NewConsumer([]string{"localhost:9092"}, nil) + consumer, err := NewConsumer([]string{"localhost:9092"}, NewTestConfig()) if err != nil { panic(err) } @@ -1366,7 +1366,7 @@ func testConsumerInterceptor( SetOffset("my_topic", 0, OffsetNewest, 0), "FetchRequest": mockFetchResponse, }) - config := NewConfig() + config := NewTestConfig() config.Consumer.Interceptors = interceptors // When master, err := NewConsumer([]string{broker0.Addr()}, config) diff --git a/functional_client_test.go b/functional_client_test.go index 513b8ee9b..bc4715dc4 100644 --- a/functional_client_test.go +++ b/functional_client_test.go @@ -15,7 +15,7 @@ func TestFuncConnectionFailure(t *testing.T) { FunctionalTestEnv.Proxies["kafka1"].Enabled = false SaveProxy(t, "kafka1") - config := NewConfig() + config := NewTestConfig() config.Metadata.Retry.Max = 1 _, err := NewClient([]string{FunctionalTestEnv.KafkaBrokerAddrs[0]}, config) @@ -28,7 +28,7 @@ func TestFuncClientMetadata(t *testing.T) { setupFunctionalTest(t) defer teardownFunctionalTest(t) - config := NewConfig() + config := NewTestConfig() config.Metadata.Retry.Max = 1 config.Metadata.Retry.Backoff = 10 * time.Millisecond client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config) @@ -72,7 +72,7 @@ func TestFuncClientCoordinator(t *testing.T) { setupFunctionalTest(t) defer teardownFunctionalTest(t) - client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, nil) + client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, NewTestConfig()) if err != nil { t.Fatal(err) } diff --git a/functional_consumer_group_test.go b/functional_consumer_group_test.go index 4d71510a8..8a25f9b52 100644 --- a/functional_consumer_group_test.go +++ b/functional_consumer_group_test.go @@ -153,7 +153,7 @@ func testFuncConsumerGroupID(t *testing.T) string { } func testFuncConsumerGroupFuzzySeed(topic string) error { - client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, nil) + client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, NewTestConfig()) if err != nil { return err } @@ -238,7 +238,7 @@ type testFuncConsumerGroupMember struct { func runTestFuncConsumerGroupMember(t *testing.T, groupID, clientID string, maxMessages int32, sink *testFuncConsumerGroupSink, topics ...string) *testFuncConsumerGroupMember { t.Helper() - config := NewConfig() + config := NewTestConfig() config.ClientID = clientID config.Version = V0_10_2_0 config.Consumer.Return.Errors = true diff --git a/functional_consumer_test.go b/functional_consumer_test.go index aca9434db..0ec993087 100644 --- a/functional_consumer_test.go +++ b/functional_consumer_test.go @@ -18,7 +18,7 @@ func TestFuncConsumerOffsetOutOfRange(t *testing.T) { setupFunctionalTest(t) defer teardownFunctionalTest(t) - consumer, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, nil) + consumer, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, NewTestConfig()) if err != nil { t.Fatal(err) } @@ -49,7 +49,7 @@ func TestConsumerHighWaterMarkOffset(t *testing.T) { t.Fatal(err) } - c, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, nil) + c, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, NewTestConfig()) if err != nil { t.Fatal(err) } @@ -141,7 +141,7 @@ func TestReadOnlyAndAllCommittedMessages(t *testing.T) { setupFunctionalTest(t) defer teardownFunctionalTest(t) - config := NewConfig() + config := NewTestConfig() config.Consumer.IsolationLevel = ReadCommitted config.Version = V0_11_0_0 @@ -195,7 +195,7 @@ func produceMsgs(t *testing.T, clientVersions []KafkaVersion, codecs []Compressi var producedMessages []*ProducerMessage for _, prodVer := range clientVersions { for _, codec := range codecs { - prodCfg := NewConfig() + prodCfg := NewTestConfig() prodCfg.Version = prodVer prodCfg.Producer.Return.Successes = true prodCfg.Producer.Return.Errors = true @@ -251,7 +251,7 @@ consumerVersionLoop: t.Logf("*** Consuming with client version %s\n", consVer) // Create a partition consumer that should start from the first produced // message. - consCfg := NewConfig() + consCfg := NewTestConfig() consCfg.Version = consVer c, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, consCfg) if err != nil { diff --git a/functional_offset_manager_test.go b/functional_offset_manager_test.go index 32e160aab..5d7f65663 100644 --- a/functional_offset_manager_test.go +++ b/functional_offset_manager_test.go @@ -11,7 +11,7 @@ func TestFuncOffsetManager(t *testing.T) { setupFunctionalTest(t) defer teardownFunctionalTest(t) - client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, nil) + client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, NewTestConfig()) if err != nil { t.Fatal(err) } diff --git a/functional_producer_test.go b/functional_producer_test.go index b83dc72e3..7c1925b06 100644 --- a/functional_producer_test.go +++ b/functional_producer_test.go @@ -18,37 +18,37 @@ import ( const TestBatchSize = 1000 func TestFuncProducing(t *testing.T) { - config := NewConfig() + config := NewTestConfig() testProducingMessages(t, config) } func TestFuncProducingGzip(t *testing.T) { - config := NewConfig() + config := NewTestConfig() config.Producer.Compression = CompressionGZIP testProducingMessages(t, config) } func TestFuncProducingSnappy(t *testing.T) { - config := NewConfig() + config := NewTestConfig() config.Producer.Compression = CompressionSnappy testProducingMessages(t, config) } func TestFuncProducingZstd(t *testing.T) { - config := NewConfig() + config := NewTestConfig() config.Version = V2_1_0_0 config.Producer.Compression = CompressionZSTD testProducingMessages(t, config) } func TestFuncProducingNoResponse(t *testing.T) { - config := NewConfig() + config := NewTestConfig() config.Producer.RequiredAcks = NoResponse testProducingMessages(t, config) } func TestFuncProducingFlushing(t *testing.T) { - config := NewConfig() + config := NewTestConfig() config.Producer.Flush.Messages = TestBatchSize / 8 config.Producer.Flush.Frequency = 250 * time.Millisecond testProducingMessages(t, config) @@ -58,7 +58,7 @@ func TestFuncMultiPartitionProduce(t *testing.T) { setupFunctionalTest(t) defer teardownFunctionalTest(t) - config := NewConfig() + config := NewTestConfig() config.ChannelBufferSize = 20 config.Producer.Flush.Frequency = 50 * time.Millisecond config.Producer.Flush.Messages = 200 @@ -111,7 +111,7 @@ func TestFuncProducingIdempotentWithBrokerFailure(t *testing.T) { setupFunctionalTest(t) defer teardownFunctionalTest(t) - config := NewConfig() + config := NewTestConfig() config.Producer.Flush.Frequency = 250 * time.Millisecond config.Producer.Idempotent = true config.Producer.Timeout = 500 * time.Millisecond @@ -185,7 +185,7 @@ func TestFuncProducingIdempotentWithBrokerFailure(t *testing.T) { } func TestInterceptors(t *testing.T) { - config := NewConfig() + config := NewTestConfig() setupFunctionalTest(t) defer teardownFunctionalTest(t) @@ -442,7 +442,7 @@ func BenchmarkProducerSmallSinglePartition(b *testing.B) { benchmarkProducer(b, nil, "test.1", ByteEncoder(make([]byte, 128))) } func BenchmarkProducerMediumSnappy(b *testing.B) { - conf := NewConfig() + conf := NewTestConfig() conf.Producer.Compression = CompressionSnappy benchmarkProducer(b, conf, "test.1", ByteEncoder(make([]byte, 1024))) } diff --git a/functional_test.go b/functional_test.go index 89dbfe2dc..f4f36b8cc 100644 --- a/functional_test.go +++ b/functional_test.go @@ -158,7 +158,7 @@ func prepareDockerTestEnvironment(ctx context.Context, env *testEnvironment) err for i := 0; i < 45 && !allBrokersUp; i++ { Logger.Println("waiting for kafka brokers to come up") time.Sleep(1 * time.Second) - config := NewConfig() + config := NewTestConfig() config.Version, err = ParseKafkaVersion(env.KafkaVersion) if err != nil { return err @@ -267,7 +267,7 @@ func prepareTestTopics(ctx context.Context, env *testEnvironment) error { } Logger.Println("Creating topics") - config := NewConfig() + config := NewTestConfig() config.Metadata.Retry.Max = 5 config.Metadata.Retry.Backoff = 10 * time.Second config.ClientID = "sarama-tests" diff --git a/kerberos_client_test.go b/kerberos_client_test.go index 003da6a3e..9cd7a21b6 100644 --- a/kerberos_client_test.go +++ b/kerberos_client_test.go @@ -17,7 +17,7 @@ import ( func TestFaildToCreateKerberosConfig(t *testing.T) { expectedErr := errors.New("configuration file could not be opened: krb5.conf open krb5.conf: no such file or directory") - clientConfig := NewConfig() + clientConfig := NewTestConfig() clientConfig.Net.SASL.Mechanism = SASLTypeGSSAPI clientConfig.Net.SASL.Enable = true clientConfig.Net.SASL.GSSAPI.ServiceName = "kafka" @@ -41,7 +41,7 @@ func TestCreateWithPassword(t *testing.T) { expectedDoman := "EXAMPLE.COM" expectedCName := "client" - clientConfig := NewConfig() + clientConfig := NewTestConfig() clientConfig.Net.SASL.Mechanism = SASLTypeGSSAPI clientConfig.Net.SASL.Enable = true clientConfig.Net.SASL.GSSAPI.ServiceName = "kafka" @@ -70,7 +70,7 @@ func TestCreateWithKeyTab(t *testing.T) { } // Expect to try to create a client with keytab and fails with "o such file or directory" error expectedErr := errors.New("open nonexist.keytab: no such file or directory") - clientConfig := NewConfig() + clientConfig := NewTestConfig() clientConfig.Net.SASL.Mechanism = SASLTypeGSSAPI clientConfig.Net.SASL.Enable = true clientConfig.Net.SASL.GSSAPI.ServiceName = "kafka" @@ -92,7 +92,7 @@ func TestCreateWithDisablePAFXFAST(t *testing.T) { } // Expect to try to create a client with keytab and fails with "o such file or directory" error expectedErr := errors.New("open nonexist.keytab: no such file or directory") - clientConfig := NewConfig() + clientConfig := NewTestConfig() clientConfig.Net.SASL.Mechanism = SASLTypeGSSAPI clientConfig.Net.SASL.Enable = true clientConfig.Net.SASL.GSSAPI.ServiceName = "kafka" diff --git a/mocks/async_producer_test.go b/mocks/async_producer_test.go index b5d92aad8..c89effbda 100644 --- a/mocks/async_producer_test.go +++ b/mocks/async_producer_test.go @@ -43,7 +43,7 @@ func TestMockAsyncProducerImplementsAsyncProducerInterface(t *testing.T) { } func TestProducerReturnsExpectationsToChannels(t *testing.T) { - config := sarama.NewConfig() + config := sarama.NewTestConfig() config.Producer.Return.Successes = true mp := NewAsyncProducer(t, config) diff --git a/mocks/consumer_test.go b/mocks/consumer_test.go index 311cfa026..41283663a 100644 --- a/mocks/consumer_test.go +++ b/mocks/consumer_test.go @@ -20,7 +20,7 @@ func TestMockConsumerImplementsConsumerInterface(t *testing.T) { } func TestConsumerHandlesExpectations(t *testing.T) { - consumer := NewConsumer(t, nil) + consumer := NewConsumer(t, NewTestConfig()) defer func() { if err := consumer.Close(); err != nil { t.Error(err) @@ -65,7 +65,7 @@ func TestConsumerHandlesExpectations(t *testing.T) { } func TestConsumerReturnsNonconsumedErrorsOnClose(t *testing.T) { - consumer := NewConsumer(t, nil) + consumer := NewConsumer(t, NewTestConfig()) consumer.ExpectConsumePartition("test", 0, sarama.OffsetOldest).YieldError(sarama.ErrOutOfBrokers) consumer.ExpectConsumePartition("test", 0, sarama.OffsetOldest).YieldError(sarama.ErrOutOfBrokers) @@ -91,7 +91,7 @@ func TestConsumerReturnsNonconsumedErrorsOnClose(t *testing.T) { func TestConsumerWithoutExpectationsOnPartition(t *testing.T) { trm := newTestReporterMock() - consumer := NewConsumer(trm, nil) + consumer := NewConsumer(trm, NewTestConfig()) _, err := consumer.ConsumePartition("test", 1, sarama.OffsetOldest) if err != errOutOfExpectations { @@ -109,7 +109,7 @@ func TestConsumerWithoutExpectationsOnPartition(t *testing.T) { func TestConsumerWithExpectationsOnUnconsumedPartition(t *testing.T) { trm := newTestReporterMock() - consumer := NewConsumer(trm, nil) + consumer := NewConsumer(trm, NewTestConfig()) consumer.ExpectConsumePartition("test", 0, sarama.OffsetOldest).YieldMessage(&sarama.ConsumerMessage{Value: []byte("hello world")}) if err := consumer.Close(); err != nil { @@ -123,7 +123,7 @@ func TestConsumerWithExpectationsOnUnconsumedPartition(t *testing.T) { func TestConsumerWithWrongOffsetExpectation(t *testing.T) { trm := newTestReporterMock() - consumer := NewConsumer(trm, nil) + consumer := NewConsumer(trm, NewTestConfig()) consumer.ExpectConsumePartition("test", 0, sarama.OffsetOldest) _, err := consumer.ConsumePartition("test", 0, sarama.OffsetNewest) @@ -142,7 +142,7 @@ func TestConsumerWithWrongOffsetExpectation(t *testing.T) { func TestConsumerViolatesMessagesDrainedExpectation(t *testing.T) { trm := newTestReporterMock() - consumer := NewConsumer(trm, nil) + consumer := NewConsumer(trm, NewTestConfig()) pcmock := consumer.ExpectConsumePartition("test", 0, sarama.OffsetOldest) pcmock.YieldMessage(&sarama.ConsumerMessage{Value: []byte("hello")}) pcmock.YieldMessage(&sarama.ConsumerMessage{Value: []byte("hello")}) @@ -167,7 +167,7 @@ func TestConsumerViolatesMessagesDrainedExpectation(t *testing.T) { func TestConsumerMeetsErrorsDrainedExpectation(t *testing.T) { trm := newTestReporterMock() - consumer := NewConsumer(trm, nil) + consumer := NewConsumer(trm, NewTestConfig()) pcmock := consumer.ExpectConsumePartition("test", 0, sarama.OffsetOldest) pcmock.YieldError(sarama.ErrInvalidMessage) @@ -194,7 +194,7 @@ func TestConsumerMeetsErrorsDrainedExpectation(t *testing.T) { func TestConsumerTopicMetadata(t *testing.T) { trm := newTestReporterMock() - consumer := NewConsumer(trm, nil) + consumer := NewConsumer(trm, NewTestConfig()) consumer.SetTopicMetadata(map[string][]int32{ "test1": {0, 1, 2, 3}, @@ -237,7 +237,7 @@ func TestConsumerTopicMetadata(t *testing.T) { func TestConsumerUnexpectedTopicMetadata(t *testing.T) { trm := newTestReporterMock() - consumer := NewConsumer(trm, nil) + consumer := NewConsumer(trm, sarama.NewTestConfig()) if _, err := consumer.Topics(); err != sarama.ErrOutOfBrokers { t.Error("Expected sarama.ErrOutOfBrokers, found", err) diff --git a/mocks/mocks.go b/mocks/mocks.go index 1e1d9f1c6..a32f12ef0 100644 --- a/mocks/mocks.go +++ b/mocks/mocks.go @@ -15,6 +15,7 @@ package mocks import ( "errors" + "github.com/Shopify/sarama" ) // ErrorReporter is a simple interface that includes the testing.T methods we use to report @@ -39,3 +40,12 @@ type producerExpectation struct { Result error CheckFunction ValueChecker } + +// NewTestConfig returns a config meant to be used by tests. +// Due to inconsistencies with the request versions the clients send using the default Kafka version +// and the response versions our mocks use, we default to the minimum Kafka version in most tests +func NewTestConfig() *sarama.Config { + config := sarama.NewConfig() + config.Version = sarama.MinVersion + return config +} diff --git a/offset_manager_test.go b/offset_manager_test.go index 5aa2ee0ff..249e53444 100644 --- a/offset_manager_test.go +++ b/offset_manager_test.go @@ -50,7 +50,7 @@ func initOffsetManagerWithBackoffFunc(t *testing.T, retention time.Duration, func initOffsetManager(t *testing.T, retention time.Duration) (om OffsetManager, testClient Client, broker, coordinator *MockBroker) { - return initOffsetManagerWithBackoffFunc(t, retention, nil, NewConfig()) + return initOffsetManagerWithBackoffFunc(t, retention, nil, NewTestConfig()) } func initPartitionOffsetManager(t *testing.T, om OffsetManager, @@ -76,7 +76,7 @@ func TestNewOffsetManager(t *testing.T) { seedBroker.Returns(new(MetadataResponse)) defer seedBroker.Close() - testClient, err := NewClient([]string{seedBroker.Addr()}, nil) + testClient, err := NewClient([]string{seedBroker.Addr()}, NewTestConfig()) if err != nil { t.Fatal(err) } @@ -120,7 +120,7 @@ func TestNewOffsetManagerOffsetsAutoCommit(t *testing.T) { // Tests to validate configuration of `Consumer.Offsets.AutoCommit.Enable` for _, tt := range offsetsautocommitTestTable { t.Run(tt.name, func(t *testing.T) { - config := NewConfig() + config := NewTestConfig() if tt.set { config.Consumer.Offsets.AutoCommit.Enable = tt.enable } @@ -171,7 +171,7 @@ func TestNewOffsetManagerOffsetsAutoCommit(t *testing.T) { func TestNewOffsetManagerOffsetsManualCommit(t *testing.T) { // Tests to validate configuration when `Consumer.Offsets.AutoCommit.Enable` is false - config := NewConfig() + config := NewTestConfig() config.Consumer.Offsets.AutoCommit.Enable = false om, testClient, broker, coordinator := initOffsetManagerWithBackoffFunc(t, 0, nil, config) @@ -278,7 +278,7 @@ func TestOffsetManagerFetchInitialLoadInProgress(t *testing.T) { atomic.AddInt32(&retryCount, 1) return 0 } - om, testClient, broker, coordinator := initOffsetManagerWithBackoffFunc(t, 0, backoff, NewConfig()) + om, testClient, broker, coordinator := initOffsetManagerWithBackoffFunc(t, 0, backoff, NewTestConfig()) // Error on first fetchInitialOffset call responseBlock := OffsetFetchResponseBlock{ diff --git a/partitioner_test.go b/partitioner_test.go index f6dde0204..9c0161122 100644 --- a/partitioner_test.go +++ b/partitioner_test.go @@ -212,7 +212,7 @@ func TestManualPartitioner(t *testing.T) { // This example shows how you can partition messages randomly, even when a key is set, // by overriding Config.Producer.Partitioner. func ExamplePartitioner_random() { - config := NewConfig() + config := NewTestConfig() config.Producer.Partitioner = NewRandomPartitioner producer, err := NewSyncProducer([]string{"localhost:9092"}, config) @@ -236,7 +236,7 @@ func ExamplePartitioner_random() { // This example shows how to assign partitions to your messages manually. func ExamplePartitioner_manual() { - config := NewConfig() + config := NewTestConfig() // First, we tell the producer that we are going to partition ourselves. config.Producer.Partitioner = NewManualPartitioner @@ -268,7 +268,7 @@ func ExamplePartitioner_manual() { // This example shows how to set a different partitioner depending on the topic. func ExamplePartitioner_per_topic() { - config := NewConfig() + config := NewTestConfig() config.Producer.Partitioner = func(topic string) Partitioner { switch topic { case "access_log", "error_log": diff --git a/produce_set_test.go b/produce_set_test.go index c0928834b..221f40b59 100644 --- a/produce_set_test.go +++ b/produce_set_test.go @@ -7,7 +7,7 @@ import ( ) func makeProduceSet() (*asyncProducer, *produceSet) { - conf := NewConfig() + conf := NewTestConfig() txnmgr, _ := newTransactionManager(conf, nil) parent := &asyncProducer{ conf: conf, @@ -285,7 +285,7 @@ func TestProduceSetIdempotentRequestBuilding(t *testing.T) { const pID = 1000 const pEpoch = 1234 - config := NewConfig() + config := NewTestConfig() config.Producer.RequiredAcks = WaitForAll config.Producer.Idempotent = true config.Version = V0_11_0_0 diff --git a/sync_producer_test.go b/sync_producer_test.go index 9c480b0b5..5f14f7be0 100644 --- a/sync_producer_test.go +++ b/sync_producer_test.go @@ -21,7 +21,9 @@ func TestSyncProducer(t *testing.T) { leader.Returns(prodSuccess) } - producer, err := NewSyncProducer([]string{seedBroker.Addr()}, nil) + config := NewTestConfig() + config.Producer.Return.Successes = true + producer, err := NewSyncProducer([]string{seedBroker.Addr()}, config) if err != nil { t.Fatal(err) } @@ -67,7 +69,7 @@ func TestSyncProducerBatch(t *testing.T) { prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) leader.Returns(prodSuccess) - config := NewConfig() + config := NewTestConfig() config.Producer.Flush.Messages = 3 config.Producer.Return.Successes = true producer, err := NewSyncProducer([]string{seedBroker.Addr()}, config) @@ -115,7 +117,7 @@ func TestConcurrentSyncProducer(t *testing.T) { prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) leader.Returns(prodSuccess) - config := NewConfig() + config := NewTestConfig() config.Producer.Flush.Messages = 100 config.Producer.Return.Successes = true producer, err := NewSyncProducer([]string{seedBroker.Addr()}, config) @@ -154,7 +156,7 @@ func TestSyncProducerToNonExistingTopic(t *testing.T) { metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, nil, ErrNoError) broker.Returns(metadataResponse) - config := NewConfig() + config := NewTestConfig() config.Metadata.Retry.Max = 0 config.Producer.Retry.Max = 0 config.Producer.Return.Successes = true @@ -187,7 +189,7 @@ func TestSyncProducerRecoveryWithRetriesDisabled(t *testing.T) { metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, nil, ErrNoError) seedBroker.Returns(metadataLeader1) - config := NewConfig() + config := NewTestConfig() config.Producer.Retry.Max = 0 // disable! config.Producer.Retry.Backoff = 0 config.Producer.Return.Successes = true From 583203dd230500feb9e48e3a020833d4b9b239b5 Mon Sep 17 00:00:00 2001 From: Stanislav Kozlovski Date: Tue, 1 Sep 2020 17:24:19 +0100 Subject: [PATCH 5/7] Fix compilation issues in functional tests --- mocks/async_producer_test.go | 2 +- mocks/consumer_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/mocks/async_producer_test.go b/mocks/async_producer_test.go index c89effbda..fc354232c 100644 --- a/mocks/async_producer_test.go +++ b/mocks/async_producer_test.go @@ -43,7 +43,7 @@ func TestMockAsyncProducerImplementsAsyncProducerInterface(t *testing.T) { } func TestProducerReturnsExpectationsToChannels(t *testing.T) { - config := sarama.NewTestConfig() + config := NewTestConfig() config.Producer.Return.Successes = true mp := NewAsyncProducer(t, config) diff --git a/mocks/consumer_test.go b/mocks/consumer_test.go index 41283663a..e1202cc7a 100644 --- a/mocks/consumer_test.go +++ b/mocks/consumer_test.go @@ -237,7 +237,7 @@ func TestConsumerTopicMetadata(t *testing.T) { func TestConsumerUnexpectedTopicMetadata(t *testing.T) { trm := newTestReporterMock() - consumer := NewConsumer(trm, sarama.NewTestConfig()) + consumer := NewConsumer(trm, NewTestConfig()) if _, err := consumer.Topics(); err != sarama.ErrOutOfBrokers { t.Error("Expected sarama.ErrOutOfBrokers, found", err) From c61d5b6300bb5426e1dcbb8e8a28e5a90ced29a4 Mon Sep 17 00:00:00 2001 From: Stanislav Kozlovski Date: Tue, 1 Sep 2020 17:57:03 +0100 Subject: [PATCH 6/7] gofmt --- utils.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/utils.go b/utils.go index e109d45f8..3e9dfd7ad 100644 --- a/utils.go +++ b/utils.go @@ -190,8 +190,8 @@ var ( V2_5_0_0, V2_6_0_0, } - MinVersion = V0_8_2_0 - MaxVersion = V2_6_0_0 + MinVersion = V0_8_2_0 + MaxVersion = V2_6_0_0 DefaultVersion = V1_0_0_0 ) From 5ab6809aecc5a50ee523d27983762d0abdcc0105 Mon Sep 17 00:00:00 2001 From: Stanislav Kozlovski Date: Thu, 3 Sep 2020 16:08:03 +0100 Subject: [PATCH 7/7] gofmt --- mocks/mocks.go | 1 + 1 file changed, 1 insertion(+) diff --git a/mocks/mocks.go b/mocks/mocks.go index a32f12ef0..5b0e4329e 100644 --- a/mocks/mocks.go +++ b/mocks/mocks.go @@ -15,6 +15,7 @@ package mocks import ( "errors" + "github.com/Shopify/sarama" )