diff --git a/admin_test.go b/admin_test.go index 53c4659f9..754003f68 100644 --- a/admin_test.go +++ b/admin_test.go @@ -16,7 +16,7 @@ func TestClusterAdmin(t *testing.T) { SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), }) - config := NewConfig() + config := NewTestConfig() config.Version = V1_0_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { @@ -38,7 +38,7 @@ func TestClusterAdminInvalidController(t *testing.T) { SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), }) - config := NewConfig() + config := NewTestConfig() config.Version = V1_0_0_0 _, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err == nil { @@ -61,7 +61,7 @@ func TestClusterAdminCreateTopic(t *testing.T) { "CreateTopicsRequest": NewMockCreateTopicsResponse(t), }) - config := NewConfig() + config := NewTestConfig() config.Version = V0_10_2_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { @@ -89,7 +89,7 @@ func TestClusterAdminCreateTopicWithInvalidTopicDetail(t *testing.T) { "CreateTopicsRequest": NewMockCreateTopicsResponse(t), }) - config := NewConfig() + config := NewTestConfig() config.Version = V0_10_2_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { @@ -117,7 +117,7 @@ func TestClusterAdminCreateTopicWithoutAuthorization(t *testing.T) { "CreateTopicsRequest": NewMockCreateTopicsResponse(t), }) - config := NewConfig() + config := NewTestConfig() config.Version = V0_11_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) @@ -148,7 +148,7 @@ func TestClusterAdminListTopics(t *testing.T) { "DescribeConfigsRequest": NewMockDescribeConfigsResponse(t), }) - config := NewConfig() + config := NewTestConfig() config.Version = V1_1_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { @@ -198,7 +198,7 @@ func TestClusterAdminDeleteTopic(t *testing.T) { "DeleteTopicsRequest": NewMockDeleteTopicsResponse(t), }) - config := NewConfig() + config := NewTestConfig() config.Version = V0_10_2_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { @@ -227,7 +227,7 @@ func TestClusterAdminDeleteEmptyTopic(t *testing.T) { "DeleteTopicsRequest": NewMockDeleteTopicsResponse(t), }) - config := NewConfig() + config := NewTestConfig() config.Version = V0_10_2_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { @@ -256,7 +256,7 @@ func TestClusterAdminCreatePartitions(t *testing.T) { "CreatePartitionsRequest": NewMockCreatePartitionsResponse(t), }) - config := NewConfig() + config := NewTestConfig() config.Version = V1_0_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { @@ -285,7 +285,7 @@ func TestClusterAdminCreatePartitionsWithDiffVersion(t *testing.T) { "CreatePartitionsRequest": NewMockCreatePartitionsResponse(t), }) - config := NewConfig() + config := NewTestConfig() config.Version = V0_10_2_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { @@ -314,7 +314,7 @@ func TestClusterAdminCreatePartitionsWithoutAuthorization(t *testing.T) { "CreatePartitionsRequest": NewMockCreatePartitionsResponse(t), }) - config := NewConfig() + config := NewTestConfig() config.Version = V1_0_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { @@ -350,7 +350,7 @@ func TestClusterAdminAlterPartitionReassignments(t *testing.T) { "AlterPartitionReassignmentsRequest": NewMockAlterPartitionReassignmentsResponse(t), }) - config := NewConfig() + config := NewTestConfig() config.Version = V2_4_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { @@ -388,7 +388,7 @@ func TestClusterAdminAlterPartitionReassignmentsWithDiffVersion(t *testing.T) { "AlterPartitionReassignmentsRequest": NewMockAlterPartitionReassignmentsResponse(t), }) - config := NewConfig() + config := NewTestConfig() config.Version = V2_3_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { @@ -427,7 +427,7 @@ func TestClusterAdminListPartitionReassignments(t *testing.T) { "ListPartitionReassignmentsRequest": NewMockListPartitionReassignmentsResponse(t), }) - config := NewConfig() + config := NewTestConfig() config.Version = V2_4_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { @@ -472,7 +472,7 @@ func TestClusterAdminListPartitionReassignmentsWithDiffVersion(t *testing.T) { "ListPartitionReassignmentsRequest": NewMockListPartitionReassignmentsResponse(t), }) - config := NewConfig() + config := NewTestConfig() config.Version = V2_3_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { @@ -508,7 +508,7 @@ func TestClusterAdminDeleteRecords(t *testing.T) { "DeleteRecordsRequest": NewMockDeleteRecordsResponse(t), }) - config := NewConfig() + config := NewTestConfig() config.Version = V1_0_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { @@ -567,7 +567,7 @@ func TestClusterAdminDeleteRecordsWithInCorrectBroker(t *testing.T) { "DeleteRecordsRequest": NewMockDeleteRecordsResponse(t), }) - config := NewConfig() + config := NewTestConfig() config.Version = V1_0_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { @@ -604,7 +604,7 @@ func TestClusterAdminDeleteRecordsWithDiffVersion(t *testing.T) { "DeleteRecordsRequest": NewMockDeleteRecordsResponse(t), }) - config := NewConfig() + config := NewTestConfig() config.Version = V0_10_2_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { @@ -660,7 +660,7 @@ func TestClusterAdminDescribeConfig(t *testing.T) { {V2_0_0_0, 2, true}, } for _, tt := range tests { - config := NewConfig() + config := NewTestConfig() config.Version = tt.saramaVersion admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { @@ -715,7 +715,7 @@ func TestClusterAdminDescribeConfigWithErrorCode(t *testing.T) { "DescribeConfigsRequest": NewMockDescribeConfigsResponseWithErrorCode(t), }) - config := NewConfig() + config := NewTestConfig() config.Version = V1_1_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { @@ -760,7 +760,7 @@ func TestClusterAdminDescribeBrokerConfig(t *testing.T) { "DescribeConfigsRequest": NewMockDescribeConfigsResponse(t), }) - config := NewConfig() + config := NewTestConfig() config.Version = V1_0_0_0 admin, err := NewClusterAdmin( []string{ @@ -800,7 +800,7 @@ func TestClusterAdminAlterConfig(t *testing.T) { "AlterConfigsRequest": NewMockAlterConfigsResponse(t), }) - config := NewConfig() + config := NewTestConfig() config.Version = V1_0_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { @@ -833,7 +833,7 @@ func TestClusterAdminAlterConfigWithErrorCode(t *testing.T) { "AlterConfigsRequest": NewMockAlterConfigsResponseWithErrorCode(t), }) - config := NewConfig() + config := NewTestConfig() config.Version = V1_0_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { @@ -873,7 +873,7 @@ func TestClusterAdminAlterBrokerConfig(t *testing.T) { "AlterConfigsRequest": NewMockAlterConfigsResponse(t), }) - config := NewConfig() + config := NewTestConfig() config.Version = V1_0_0_0 admin, err := NewClusterAdmin( []string{ @@ -918,7 +918,7 @@ func TestClusterAdminCreateAcl(t *testing.T) { "CreateAclsRequest": NewMockCreateAclsResponse(t), }) - config := NewConfig() + config := NewTestConfig() config.Version = V1_0_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { @@ -951,7 +951,7 @@ func TestClusterAdminListAcls(t *testing.T) { "CreateAclsRequest": NewMockCreateAclsResponse(t), }) - config := NewConfig() + config := NewTestConfig() config.Version = V1_0_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { @@ -997,7 +997,7 @@ func TestClusterAdminDeleteAcl(t *testing.T) { "DeleteAclsRequest": NewMockDeleteAclsResponse(t), }) - config := NewConfig() + config := NewTestConfig() config.Version = V1_0_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { @@ -1033,7 +1033,7 @@ func TestDescribeTopic(t *testing.T) { SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), }) - config := NewConfig() + config := NewTestConfig() config.Version = V1_0_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) @@ -1071,7 +1071,7 @@ func TestDescribeTopicWithVersion0_11(t *testing.T) { SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), }) - config := NewConfig() + config := NewTestConfig() config.Version = V0_11_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) @@ -1114,7 +1114,7 @@ func TestDescribeConsumerGroup(t *testing.T) { "FindCoordinatorRequest": NewMockFindCoordinatorResponse(t).SetCoordinator(CoordinatorGroup, expectedGroupID, seedBroker), }) - config := NewConfig() + config := NewTestConfig() config.Version = V1_0_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) @@ -1153,7 +1153,7 @@ func TestListConsumerGroups(t *testing.T) { AddGroup("my-group", "consumer"), }) - config := NewConfig() + config := NewTestConfig() config.Version = V1_0_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) @@ -1215,7 +1215,7 @@ func TestListConsumerGroupsMultiBroker(t *testing.T) { AddGroup(secondGroup, "consumer"), }) - config := NewConfig() + config := NewTestConfig() config.Version = V1_0_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) @@ -1267,7 +1267,7 @@ func TestListConsumerGroupOffsets(t *testing.T) { "FindCoordinatorRequest": NewMockFindCoordinatorResponse(t).SetCoordinator(CoordinatorGroup, group, seedBroker), }) - config := NewConfig() + config := NewTestConfig() config.Version = V1_0_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) @@ -1312,7 +1312,7 @@ func TestDeleteConsumerGroup(t *testing.T) { "FindCoordinatorRequest": NewMockFindCoordinatorResponse(t).SetCoordinator(CoordinatorGroup, group, seedBroker), }) - config := NewConfig() + config := NewTestConfig() config.Version = V1_1_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) @@ -1341,7 +1341,7 @@ func TestRefreshMetaDataWithDifferentController(t *testing.T) { SetBroker(seedBroker2.Addr(), seedBroker2.BrokerID()), }) - config := NewConfig() + config := NewTestConfig() config.Version = V1_1_0_0 client, err := NewClient([]string{seedBroker1.Addr()}, config) @@ -1386,7 +1386,7 @@ func TestDescribeLogDirs(t *testing.T) { SetLogDirs("/tmp/logs", map[string]int{"topic1": 2, "topic2": 2}), }) - config := NewConfig() + config := NewTestConfig() config.Version = V1_0_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) diff --git a/async_producer_test.go b/async_producer_test.go index f8c2656cf..ab762ba48 100644 --- a/async_producer_test.go +++ b/async_producer_test.go @@ -111,7 +111,7 @@ func TestAsyncProducer(t *testing.T) { prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) leader.Returns(prodSuccess) - config := NewConfig() + config := NewTestConfig() config.Producer.Flush.Messages = 10 config.Producer.Return.Successes = true producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config) @@ -162,7 +162,7 @@ func TestAsyncProducerMultipleFlushes(t *testing.T) { leader.Returns(prodSuccess) leader.Returns(prodSuccess) - config := NewConfig() + config := NewTestConfig() config.Producer.Flush.Messages = 5 config.Producer.Return.Successes = true producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config) @@ -202,7 +202,7 @@ func TestAsyncProducerMultipleBrokers(t *testing.T) { prodResponse1.AddTopicPartition("my_topic", 1, ErrNoError) leader1.Returns(prodResponse1) - config := NewConfig() + config := NewTestConfig() config.Producer.Flush.Messages = 5 config.Producer.Return.Successes = true config.Producer.Partitioner = NewRoundRobinPartitioner @@ -235,7 +235,7 @@ func TestAsyncProducerCustomPartitioner(t *testing.T) { prodResponse.AddTopicPartition("my_topic", 0, ErrNoError) leader.Returns(prodResponse) - config := NewConfig() + config := NewTestConfig() config.Producer.Flush.Messages = 2 config.Producer.Return.Successes = true config.Producer.Partitioner = func(topic string) Partitioner { @@ -274,7 +274,7 @@ func TestAsyncProducerFailureRetry(t *testing.T) { metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, nil, ErrNoError) seedBroker.Returns(metadataLeader1) - config := NewConfig() + config := NewTestConfig() config.Producer.Flush.Messages = 10 config.Producer.Return.Successes = true config.Producer.Retry.Backoff = 0 @@ -324,7 +324,7 @@ func TestAsyncProducerRecoveryWithRetriesDisabled(t *testing.T) { metadataLeader1.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, nil, ErrNoError) seedBroker.Returns(metadataLeader1) - config := NewConfig() + config := NewTestConfig() config.Producer.Flush.Messages = 2 config.Producer.Return.Successes = true config.Producer.Retry.Max = 0 // disable! @@ -388,7 +388,7 @@ func TestAsyncProducerEncoderFailures(t *testing.T) { leader.Returns(prodSuccess) leader.Returns(prodSuccess) - config := NewConfig() + config := NewTestConfig() config.Producer.Flush.Messages = 1 config.Producer.Return.Successes = true config.Producer.Partitioner = NewManualPartitioner @@ -425,7 +425,7 @@ func TestAsyncProducerBrokerBounce(t *testing.T) { prodSuccess := new(ProduceResponse) prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) - config := NewConfig() + config := NewTestConfig() config.Producer.Flush.Messages = 1 config.Producer.Return.Successes = true config.Producer.Retry.Backoff = 0 @@ -462,7 +462,7 @@ func TestAsyncProducerBrokerBounceWithStaleMetadata(t *testing.T) { metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, nil, ErrNoError) seedBroker.Returns(metadataLeader1) - config := NewConfig() + config := NewTestConfig() config.Producer.Flush.Messages = 10 config.Producer.Return.Successes = true config.Producer.Retry.Max = 3 @@ -505,7 +505,7 @@ func TestAsyncProducerMultipleRetries(t *testing.T) { metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, nil, ErrNoError) seedBroker.Returns(metadataLeader1) - config := NewConfig() + config := NewTestConfig() config.Producer.Flush.Messages = 10 config.Producer.Return.Successes = true config.Producer.Retry.Max = 4 @@ -561,7 +561,7 @@ func TestAsyncProducerMultipleRetriesWithBackoffFunc(t *testing.T) { metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, nil, ErrNoError) seedBroker.Returns(metadataLeader1) - config := NewConfig() + config := NewTestConfig() config.Producer.Flush.Messages = 1 config.Producer.Return.Successes = true config.Producer.Retry.Max = 4 @@ -629,7 +629,7 @@ func TestAsyncProducerOutOfRetries(t *testing.T) { metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError) seedBroker.Returns(metadataResponse) - config := NewConfig() + config := NewTestConfig() config.Producer.Flush.Messages = 10 config.Producer.Return.Successes = true config.Producer.Retry.Backoff = 0 @@ -686,7 +686,7 @@ func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) { metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, nil, ErrNoError) seedBroker.Returns(metadataResponse) - config := NewConfig() + config := NewTestConfig() config.Producer.Return.Successes = true config.Producer.Retry.Backoff = 0 config.Producer.Retry.Max = 1 @@ -742,7 +742,7 @@ func TestAsyncProducerFlusherRetryCondition(t *testing.T) { metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, nil, ErrNoError) seedBroker.Returns(metadataResponse) - config := NewConfig() + config := NewTestConfig() config.Producer.Flush.Messages = 5 config.Producer.Return.Successes = true config.Producer.Retry.Backoff = 0 @@ -807,7 +807,7 @@ func TestAsyncProducerRetryShutdown(t *testing.T) { metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError) seedBroker.Returns(metadataLeader) - config := NewConfig() + config := NewTestConfig() config.Producer.Flush.Messages = 10 config.Producer.Return.Successes = true config.Producer.Retry.Backoff = 0 @@ -856,7 +856,7 @@ func TestAsyncProducerNoReturns(t *testing.T) { metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError) seedBroker.Returns(metadataLeader) - config := NewConfig() + config := NewTestConfig() config.Producer.Flush.Messages = 10 config.Producer.Return.Successes = false config.Producer.Return.Errors = false @@ -905,7 +905,7 @@ func TestAsyncProducerIdempotentGoldenPath(t *testing.T) { } broker.Returns(initProducerID) - config := NewConfig() + config := NewTestConfig() config.Producer.Flush.Messages = 10 config.Producer.Return.Successes = true config.Producer.Retry.Max = 4 @@ -1049,7 +1049,7 @@ func TestAsyncProducerIdempotentRetryCheckBatch(t *testing.T) { return nil } - config := NewConfig() + config := NewTestConfig() config.Version = V0_11_0_0 config.Producer.Idempotent = true config.Net.MaxOpenRequests = 1 @@ -1100,7 +1100,7 @@ func TestAsyncProducerIdempotentErrorOnOutOfSeq(t *testing.T) { } broker.Returns(initProducerID) - config := NewConfig() + config := NewTestConfig() config.Producer.Flush.Messages = 10 config.Producer.Return.Successes = true config.Producer.Retry.Max = 400000 @@ -1150,7 +1150,7 @@ func TestAsyncProducerIdempotentEpochRollover(t *testing.T) { } broker.Returns(initProducerID) - config := NewConfig() + config := NewTestConfig() config.Producer.Flush.Messages = 10 config.Producer.Flush.Frequency = 10 * time.Millisecond config.Producer.Return.Successes = true @@ -1216,7 +1216,7 @@ func TestBrokerProducerShutdown(t *testing.T) { "my_topic", 0, mockBroker.BrokerID(), nil, nil, nil, ErrNoError) mockBroker.Returns(metadataResponse) - producer, err := NewAsyncProducer([]string{mockBroker.Addr()}, nil) + producer, err := NewAsyncProducer([]string{mockBroker.Addr()}, NewTestConfig()) if err != nil { t.Fatal(err) } @@ -1264,7 +1264,7 @@ func testProducerInterceptor( metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError) seedBroker.Returns(metadataLeader) - config := NewConfig() + config := NewTestConfig() config.Producer.Flush.Messages = 10 config.Producer.Return.Successes = true config.Producer.Interceptors = interceptors @@ -1391,7 +1391,7 @@ ProducerLoop: // for the Successes channel to be populated, you have to set // config.Producer.Return.Successes to true. func ExampleAsyncProducer_goroutines() { - config := NewConfig() + config := NewTestConfig() config.Producer.Return.Successes = true producer, err := NewAsyncProducer([]string{"localhost:9092"}, config) if err != nil { @@ -1441,3 +1441,12 @@ ProducerLoop: log.Printf("Successfully produced: %d; errors: %d\n", successes, producerErrors) } + +// NewTestConfig returns a config meant to be used by tests. +// Due to inconsistencies with the request versions the clients send using the default Kafka version +// and the response versions our mocks use, we default to the minimum Kafka version in most tests +func NewTestConfig() *Config { + config := NewConfig() + config.Version = MinVersion + return config +} diff --git a/broker_test.go b/broker_test.go index c7d548600..37064b7cb 100644 --- a/broker_test.go +++ b/broker_test.go @@ -92,7 +92,7 @@ func TestSimpleBrokerCommunication(t *testing.T) { broker := NewBroker(mb.Addr()) // Set the broker id in order to validate local broker metrics broker.id = 0 - conf := NewConfig() + conf := NewTestConfig() conf.Version = tt.version err := broker.Open(conf) if err != nil { @@ -226,7 +226,7 @@ func TestSASLOAuthBearer(t *testing.T) { broker.requestLatency = metrics.NilHistogram{} broker.requestsInFlight = metrics.NilCounter{} - conf := NewConfig() + conf := NewTestConfig() conf.Net.SASL.Mechanism = SASLTypeOAuth conf.Net.SASL.TokenProvider = test.tokProvider @@ -358,7 +358,7 @@ func TestSASLSCRAMSHAXXX(t *testing.T) { "SaslHandshakeRequest": mockSASLHandshakeResponse, }) - conf := NewConfig() + conf := NewTestConfig() conf.Net.SASL.Mechanism = SASLTypeSCRAMSHA512 conf.Net.SASL.SCRAMClientGeneratorFunc = func() SCRAMClient { return test.scramClient } @@ -464,7 +464,7 @@ func TestSASLPlainAuth(t *testing.T) { broker.requestLatency = metrics.NilHistogram{} broker.requestsInFlight = metrics.NilCounter{} - conf := NewConfig() + conf := NewTestConfig() conf.Net.SASL.Mechanism = SASLTypePlaintext conf.Net.SASL.AuthIdentity = test.authidentity conf.Net.SASL.User = "token" @@ -550,7 +550,7 @@ func TestSASLReadTimeout(t *testing.T) { broker.requestsInFlight = metrics.NilCounter{} } - conf := NewConfig() + conf := NewTestConfig() { conf.Net.ReadTimeout = time.Millisecond conf.Net.SASL.Mechanism = SASLTypePlaintext @@ -640,7 +640,7 @@ func TestGSSAPIKerberosAuth_Authorize(t *testing.T) { broker.responseRate = metrics.NilMeter{} broker.requestLatency = metrics.NilHistogram{} broker.requestsInFlight = metrics.NilCounter{} - conf := NewConfig() + conf := NewTestConfig() conf.Net.SASL.Mechanism = SASLTypeGSSAPI conf.Net.SASL.GSSAPI.ServiceName = "kafka" conf.Net.SASL.GSSAPI.KerberosConfigPath = "krb5.conf" diff --git a/client_test.go b/client_test.go index 9b42e2d99..797e2acb5 100644 --- a/client_test.go +++ b/client_test.go @@ -21,7 +21,7 @@ func TestSimpleClient(t *testing.T) { seedBroker.Returns(new(MetadataResponse)) - client, err := NewClient([]string{seedBroker.Addr()}, nil) + client, err := NewClient([]string{seedBroker.Addr()}, NewTestConfig()) if err != nil { t.Fatal(err) } @@ -42,7 +42,7 @@ func TestCachedPartitions(t *testing.T) { metadataResponse.AddTopicPartition("my_topic", 1, 2, replicas, isr, []int32{}, ErrLeaderNotAvailable) seedBroker.Returns(metadataResponse) - config := NewConfig() + config := NewTestConfig() config.Metadata.Retry.Max = 0 c, err := NewClient([]string{seedBroker.Addr()}, config) if err != nil { @@ -80,7 +80,7 @@ func TestClientDoesntCachePartitionsForTopicsWithErrors(t *testing.T) { metadataResponse.AddTopicPartition("my_topic", 2, replicas[0], replicas, replicas, []int32{}, ErrNoError) seedBroker.Returns(metadataResponse) - config := NewConfig() + config := NewTestConfig() config.Metadata.Retry.Max = 0 client, err := NewClient([]string{seedBroker.Addr()}, config) if err != nil { @@ -130,7 +130,7 @@ func TestClientSeedBrokers(t *testing.T) { metadataResponse.AddBroker("localhost:12345", 2) seedBroker.Returns(metadataResponse) - client, err := NewClient([]string{seedBroker.Addr()}, nil) + client, err := NewClient([]string{seedBroker.Addr()}, NewTestConfig()) if err != nil { t.Fatal(err) } @@ -152,7 +152,7 @@ func TestClientMetadata(t *testing.T) { metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), replicas, isr, []int32{}, ErrLeaderNotAvailable) seedBroker.Returns(metadataResponse) - config := NewConfig() + config := NewTestConfig() config.Metadata.Retry.Max = 0 client, err := NewClient([]string{seedBroker.Addr()}, config) if err != nil { @@ -230,7 +230,7 @@ func TestClientMetadataWithOfflineReplicas(t *testing.T) { seedBroker.Returns(metadataResponse) - config := NewConfig() + config := NewTestConfig() config.Version = V1_0_0_0 config.Metadata.Retry.Max = 0 client, err := NewClient([]string{seedBroker.Addr()}, config) @@ -312,7 +312,7 @@ func TestClientGetOffset(t *testing.T) { metadata.AddBroker(leaderAddr, leader.BrokerID()) seedBroker.Returns(metadata) - client, err := NewClient([]string{seedBroker.Addr()}, nil) + client, err := NewClient([]string{seedBroker.Addr()}, NewTestConfig()) if err != nil { t.Fatal(err) } @@ -358,7 +358,7 @@ func TestClientReceivingUnknownTopicWithBackoffFunc(t *testing.T) { retryCount := int32(0) - config := NewConfig() + config := NewTestConfig() config.Metadata.Retry.Max = 1 config.Metadata.Retry.BackoffFunc = func(retries, maxRetries int) time.Duration { atomic.AddInt32(&retryCount, 1) @@ -393,7 +393,7 @@ func TestClientReceivingUnknownTopic(t *testing.T) { metadataResponse1 := new(MetadataResponse) seedBroker.Returns(metadataResponse1) - config := NewConfig() + config := NewTestConfig() config.Metadata.Retry.Max = 1 config.Metadata.Retry.Backoff = 0 client, err := NewClient([]string{seedBroker.Addr()}, config) @@ -431,7 +431,7 @@ func TestClientReceivingPartialMetadata(t *testing.T) { metadataResponse1.AddBroker(leader.Addr(), leader.BrokerID()) seedBroker.Returns(metadataResponse1) - config := NewConfig() + config := NewTestConfig() config.Metadata.Retry.Max = 0 client, err := NewClient([]string{seedBroker.Addr()}, config) if err != nil { @@ -491,7 +491,7 @@ func TestClientRefreshBehaviour(t *testing.T) { metadataResponse2.AddTopicPartition("my_topic", 0xb, leader.BrokerID(), nil, nil, nil, ErrNoError) seedBroker.Returns(metadataResponse2) - client, err := NewClient([]string{seedBroker.Addr()}, nil) + client, err := NewClient([]string{seedBroker.Addr()}, NewTestConfig()) if err != nil { t.Fatal(err) } @@ -524,7 +524,7 @@ func TestClientRefreshBrokers(t *testing.T) { metadataResponse1.AddBroker(initialSeed.Addr(), initialSeed.BrokerID()) initialSeed.Returns(metadataResponse1) - c, err := NewClient([]string{initialSeed.Addr()}, nil) + c, err := NewClient([]string{initialSeed.Addr()}, NewTestConfig()) client := c.(*client) if err != nil { @@ -555,7 +555,7 @@ func TestClientRefreshMetadataBrokerOffline(t *testing.T) { metadataResponse1.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) seedBroker.Returns(metadataResponse1) - client, err := NewClient([]string{seedBroker.Addr()}, nil) + client, err := NewClient([]string{seedBroker.Addr()}, NewTestConfig()) if err != nil { t.Fatal(err) } @@ -581,7 +581,7 @@ func TestClientResurrectDeadSeeds(t *testing.T) { emptyMetadata := new(MetadataResponse) initialSeed.Returns(emptyMetadata) - conf := NewConfig() + conf := NewTestConfig() conf.Metadata.Retry.Backoff = 0 conf.Metadata.RefreshFrequency = 0 c, err := NewClient([]string{initialSeed.Addr()}, conf) @@ -648,7 +648,7 @@ func TestClientController(t *testing.T) { SetBroker(controllerBroker.Addr(), controllerBroker.BrokerID()), }) - cfg := NewConfig() + cfg := NewTestConfig() // test kafka version greater than 0.10.0.0 cfg.Version = V0_10_0_0 @@ -690,7 +690,7 @@ func TestClientMetadataTimeout(t *testing.T) { emptyMetadata := new(MetadataResponse) initialSeed.Returns(emptyMetadata) - conf := NewConfig() + conf := NewTestConfig() // Speed up the metadata request failure because of a read timeout conf.Net.ReadTimeout = 100 * time.Millisecond // Disable backoff and refresh @@ -756,7 +756,7 @@ func TestClientCoordinatorWithConsumerOffsetsTopic(t *testing.T) { metadataResponse1.AddTopicPartition("__consumer_offsets", 0, replicas[0], replicas, replicas, []int32{}, ErrNoError) seedBroker.Returns(metadataResponse1) - client, err := NewClient([]string{seedBroker.Addr()}, nil) + client, err := NewClient([]string{seedBroker.Addr()}, NewTestConfig()) if err != nil { t.Fatal(err) } @@ -830,7 +830,7 @@ func TestClientCoordinatorWithoutConsumerOffsetsTopic(t *testing.T) { metadataResponse1 := new(MetadataResponse) seedBroker.Returns(metadataResponse1) - config := NewConfig() + config := NewTestConfig() config.Metadata.Retry.Max = 1 config.Metadata.Retry.Backoff = 0 client, err := NewClient([]string{seedBroker.Addr()}, config) @@ -882,7 +882,7 @@ func TestClientAutorefreshShutdownRace(t *testing.T) { metadataResponse := new(MetadataResponse) seedBroker.Returns(metadataResponse) - conf := NewConfig() + conf := NewTestConfig() conf.Metadata.RefreshFrequency = 100 * time.Millisecond client, err := NewClient([]string{seedBroker.Addr()}, conf) if err != nil { diff --git a/client_tls_test.go b/client_tls_test.go index a825cf767..489b67f85 100644 --- a/client_tls_test.go +++ b/client_tls_test.go @@ -188,7 +188,7 @@ func doListenerTLSTest(t *testing.T, expectSuccess bool, serverConfig, clientCon seedBroker.Returns(new(MetadataResponse)) - config := NewConfig() + config := NewTestConfig() config.Net.TLS.Enable = true config.Net.TLS.Config = clientConfig diff --git a/config.go b/config.go index 9b7ce7aeb..43e739cad 100644 --- a/config.go +++ b/config.go @@ -486,7 +486,7 @@ func NewConfig() *Config { c.ClientID = defaultClientID c.ChannelBufferSize = 256 - c.Version = MinVersion + c.Version = DefaultVersion c.MetricRegistry = metrics.NewRegistry() return c diff --git a/config_test.go b/config_test.go index 6da9dbe5f..2e34ba3d1 100644 --- a/config_test.go +++ b/config_test.go @@ -8,7 +8,7 @@ import ( ) func TestDefaultConfigValidates(t *testing.T) { - config := NewConfig() + config := NewTestConfig() if err := config.Validate(); err != nil { t.Error(err) } @@ -18,7 +18,7 @@ func TestDefaultConfigValidates(t *testing.T) { } func TestInvalidClientIDConfigValidates(t *testing.T) { - config := NewConfig() + config := NewTestConfig() config.ClientID = "foo:bar" if err := config.Validate(); string(err.(ConfigurationError)) != "ClientID is invalid" { t.Error("Expected invalid ClientID, got ", err) @@ -26,7 +26,7 @@ func TestInvalidClientIDConfigValidates(t *testing.T) { } func TestEmptyClientIDConfigValidates(t *testing.T) { - config := NewConfig() + config := NewTestConfig() config.ClientID = "" if err := config.Validate(); string(err.(ConfigurationError)) != "ClientID is invalid" { t.Error("Expected invalid ClientID, got ", err) @@ -194,7 +194,7 @@ func TestNetConfigValidates(t *testing.T) { } for i, test := range tests { - c := NewConfig() + c := NewTestConfig() test.cfg(c) if err := c.Validate(); string(err.(ConfigurationError)) != test.err { t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err) @@ -227,7 +227,7 @@ func TestMetadataConfigValidates(t *testing.T) { } for i, test := range tests { - c := NewConfig() + c := NewTestConfig() test.cfg(c) if err := c.Validate(); string(err.(ConfigurationError)) != test.err { t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err) @@ -249,7 +249,7 @@ func TestAdminConfigValidates(t *testing.T) { } for i, test := range tests { - c := NewConfig() + c := NewTestConfig() test.cfg(c) if err := c.Validate(); string(err.(ConfigurationError)) != test.err { t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err) @@ -349,7 +349,7 @@ func TestProducerConfigValidates(t *testing.T) { } for i, test := range tests { - c := NewConfig() + c := NewTestConfig() test.cfg(c) if err := c.Validate(); string(err.(ConfigurationError)) != test.err { t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err) @@ -379,7 +379,7 @@ func TestConsumerConfigValidates(t *testing.T) { } for i, test := range tests { - c := NewConfig() + c := NewTestConfig() test.cfg(c) if err := c.Validate(); string(err.(ConfigurationError)) != test.err { t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err) @@ -388,7 +388,7 @@ func TestConsumerConfigValidates(t *testing.T) { } func TestLZ4ConfigValidation(t *testing.T) { - config := NewConfig() + config := NewTestConfig() config.Producer.Compression = CompressionLZ4 if err := config.Validate(); string(err.(ConfigurationError)) != "lz4 compression requires Version >= V0_10_0_0" { t.Error("Expected invalid lz4/kafka version error, got ", err) @@ -400,7 +400,7 @@ func TestLZ4ConfigValidation(t *testing.T) { } func TestZstdConfigValidation(t *testing.T) { - config := NewConfig() + config := NewTestConfig() config.Producer.Compression = CompressionZSTD if err := config.Validate(); string(err.(ConfigurationError)) != "zstd compression requires Version >= V2_1_0_0" { t.Error("Expected invalid zstd/kafka version error, got ", err) @@ -419,7 +419,7 @@ func ExampleConfig_metrics() { appGauge := metrics.GetOrRegisterGauge("m1", appMetricRegistry) appGauge.Update(1) - config := NewConfig() + config := NewTestConfig() // Use a prefix registry instead of the default local one config.MetricRegistry = metrics.NewPrefixedChildRegistry(appMetricRegistry, "sarama.") diff --git a/consumer_group_test.go b/consumer_group_test.go index bbef97488..06eb1d964 100644 --- a/consumer_group_test.go +++ b/consumer_group_test.go @@ -18,7 +18,7 @@ func (h exampleConsumerGroupHandler) ConsumeClaim(sess ConsumerGroupSession, cla } func ExampleConsumerGroup() { - config := NewConfig() + config := NewTestConfig() config.Version = V2_0_0_0 // specify appropriate version config.Consumer.Return.Errors = true diff --git a/consumer_test.go b/consumer_test.go index 2416b2c83..9b48d8cdf 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -36,7 +36,7 @@ func TestConsumerOffsetManual(t *testing.T) { }) // When - master, err := NewConsumer([]string{broker0.Addr()}, nil) + master, err := NewConsumer([]string{broker0.Addr()}, NewTestConfig()) if err != nil { t.Fatal(err) } @@ -81,7 +81,7 @@ func TestConsumerOffsetNewest(t *testing.T) { SetHighWaterMark("my_topic", 0, 14), }) - master, err := NewConsumer([]string{broker0.Addr()}, nil) + master, err := NewConsumer([]string{broker0.Addr()}, NewTestConfig()) if err != nil { t.Fatal(err) } @@ -118,7 +118,7 @@ func TestConsumerRecreate(t *testing.T) { SetMessage("my_topic", 0, 10, testMsg), }) - c, err := NewConsumer([]string{broker0.Addr()}, nil) + c, err := NewConsumer([]string{broker0.Addr()}, NewTestConfig()) if err != nil { t.Fatal(err) } @@ -158,7 +158,7 @@ func TestConsumerDuplicate(t *testing.T) { "FetchRequest": NewMockFetchResponse(t, 1), }) - config := NewConfig() + config := NewTestConfig() config.ChannelBufferSize = 0 c, err := NewConsumer([]string{broker0.Addr()}, config) if err != nil { @@ -257,7 +257,7 @@ func runConsumerLeaderRefreshErrorTestWithConfig(t *testing.T, config *Config) { // If consumer fails to refresh metadata it keeps retrying with frequency // specified by `Config.Consumer.Retry.Backoff`. func TestConsumerLeaderRefreshError(t *testing.T) { - config := NewConfig() + config := NewTestConfig() config.Net.ReadTimeout = 100 * time.Millisecond config.Consumer.Retry.Backoff = 200 * time.Millisecond config.Consumer.Return.Errors = true @@ -269,7 +269,7 @@ func TestConsumerLeaderRefreshError(t *testing.T) { func TestConsumerLeaderRefreshErrorWithBackoffFunc(t *testing.T) { var calls int32 = 0 - config := NewConfig() + config := NewTestConfig() config.Net.ReadTimeout = 100 * time.Millisecond config.Consumer.Retry.BackoffFunc = func(retries int) time.Duration { atomic.AddInt32(&calls, 1) @@ -294,7 +294,7 @@ func TestConsumerInvalidTopic(t *testing.T) { SetBroker(broker0.Addr(), broker0.BrokerID()), }) - c, err := NewConsumer([]string{broker0.Addr()}, nil) + c, err := NewConsumer([]string{broker0.Addr()}, NewTestConfig()) if err != nil { t.Fatal(err) } @@ -327,7 +327,7 @@ func TestConsumerClosePartitionWithoutLeader(t *testing.T) { SetMessage("my_topic", 0, 123, testMsg), }) - config := NewConfig() + config := NewTestConfig() config.Net.ReadTimeout = 100 * time.Millisecond config.Consumer.Retry.Backoff = 100 * time.Millisecond config.Consumer.Return.Errors = true @@ -382,7 +382,7 @@ func TestConsumerShutsDownOutOfRange(t *testing.T) { "FetchRequest": NewMockWrapper(fetchResponse), }) - master, err := NewConsumer([]string{broker0.Addr()}, nil) + master, err := NewConsumer([]string{broker0.Addr()}, NewTestConfig()) if err != nil { t.Fatal(err) } @@ -421,7 +421,7 @@ func TestConsumerExtraOffsets(t *testing.T) { newFetchResponse.SetLastStableOffset("my_topic", 0, 4) for _, fetchResponse1 := range []*FetchResponse{legacyFetchResponse, newFetchResponse} { var offsetResponseVersion int16 - cfg := NewConfig() + cfg := NewTestConfig() cfg.Consumer.Return.Errors = true if fetchResponse1.Version >= 4 { cfg.Version = V0_11_0_0 @@ -487,7 +487,7 @@ func TestConsumerReceivingFetchResponseWithTooOldRecords(t *testing.T) { fetchResponse2 := &FetchResponse{Version: 4} fetchResponse2.AddRecord("my_topic", 0, nil, testMsg, 1000000) - cfg := NewConfig() + cfg := NewTestConfig() cfg.Consumer.Return.Errors = true cfg.Version = V0_11_0_0 @@ -533,7 +533,7 @@ func TestConsumeMessageWithNewerFetchAPIVersion(t *testing.T) { fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 1) fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 2) - cfg := NewConfig() + cfg := NewTestConfig() cfg.Version = V0_11_0_0 broker0 := NewMockBroker(t, 0) @@ -576,7 +576,7 @@ func TestConsumeMessageWithSessionIDs(t *testing.T) { fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 1) fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 2) - cfg := NewConfig() + cfg := NewTestConfig() cfg.Version = V1_1_0_0 broker0 := NewMockBroker(t, 0) @@ -635,7 +635,7 @@ func TestConsumerNonSequentialOffsets(t *testing.T) { newFetchResponse.SetLastStableOffset("my_topic", 0, 11) for _, fetchResponse1 := range []*FetchResponse{legacyFetchResponse, newFetchResponse} { var offsetResponseVersion int16 - cfg := NewConfig() + cfg := NewTestConfig() if fetchResponse1.Version >= 4 { cfg.Version = V0_11_0_0 offsetResponseVersion = 1 @@ -710,7 +710,7 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) { }) // launch test goroutines - config := NewConfig() + config := NewTestConfig() config.Consumer.Retry.Backoff = 50 master, err := NewConsumer([]string{seedBroker.Addr()}, config) if err != nil { @@ -869,7 +869,7 @@ func TestConsumerInterleavedClose(t *testing.T) { SetMessage("my_topic", 1, 2000, testMsg), }) - config := NewConfig() + config := NewTestConfig() config.ChannelBufferSize = 0 master, err := NewConsumer([]string{broker0.Addr()}, config) if err != nil { @@ -930,7 +930,7 @@ func TestConsumerBounceWithReferenceOpen(t *testing.T) { "FetchRequest": mockFetchResponse, }) - config := NewConfig() + config := NewTestConfig() config.Consumer.Return.Errors = true config.Consumer.Retry.Backoff = 100 * time.Millisecond config.ChannelBufferSize = 1 @@ -1007,7 +1007,7 @@ func TestConsumerOffsetOutOfRange(t *testing.T) { SetOffset("my_topic", 0, OffsetOldest, 2345), }) - master, err := NewConsumer([]string{broker0.Addr()}, nil) + master, err := NewConsumer([]string{broker0.Addr()}, NewTestConfig()) if err != nil { t.Fatal(err) } @@ -1044,7 +1044,7 @@ func TestConsumerExpiryTicker(t *testing.T) { "FetchRequest": NewMockSequence(fetchResponse1), }) - config := NewConfig() + config := NewTestConfig() config.ChannelBufferSize = 0 config.Consumer.MaxProcessingTime = 10 * time.Millisecond master, err := NewConsumer([]string{broker0.Addr()}, config) @@ -1113,7 +1113,7 @@ func TestConsumerTimestamps(t *testing.T) { } { var fr *FetchResponse var offsetResponseVersion int16 - cfg := NewConfig() + cfg := NewTestConfig() cfg.Version = d.kversion switch { case d.kversion.IsAtLeast(V0_11_0_0): @@ -1213,7 +1213,7 @@ func TestExcludeUncommitted(t *testing.T) { "FetchRequest": NewMockWrapper(fetchResponse), }) - cfg := NewConfig() + cfg := NewTestConfig() cfg.Consumer.Return.Errors = true cfg.Version = V0_11_0_0 cfg.Consumer.IsolationLevel = ReadCommitted @@ -1257,7 +1257,7 @@ func assertMessageOffset(t *testing.T, msg *ConsumerMessage, expectedOffset int6 // This example shows how to use the consumer to read messages // from a single partition. func ExampleConsumer() { - consumer, err := NewConsumer([]string{"localhost:9092"}, nil) + consumer, err := NewConsumer([]string{"localhost:9092"}, NewTestConfig()) if err != nil { panic(err) } @@ -1366,7 +1366,7 @@ func testConsumerInterceptor( SetOffset("my_topic", 0, OffsetNewest, 0), "FetchRequest": mockFetchResponse, }) - config := NewConfig() + config := NewTestConfig() config.Consumer.Interceptors = interceptors // When master, err := NewConsumer([]string{broker0.Addr()}, config) diff --git a/functional_client_test.go b/functional_client_test.go index 513b8ee9b..bc4715dc4 100644 --- a/functional_client_test.go +++ b/functional_client_test.go @@ -15,7 +15,7 @@ func TestFuncConnectionFailure(t *testing.T) { FunctionalTestEnv.Proxies["kafka1"].Enabled = false SaveProxy(t, "kafka1") - config := NewConfig() + config := NewTestConfig() config.Metadata.Retry.Max = 1 _, err := NewClient([]string{FunctionalTestEnv.KafkaBrokerAddrs[0]}, config) @@ -28,7 +28,7 @@ func TestFuncClientMetadata(t *testing.T) { setupFunctionalTest(t) defer teardownFunctionalTest(t) - config := NewConfig() + config := NewTestConfig() config.Metadata.Retry.Max = 1 config.Metadata.Retry.Backoff = 10 * time.Millisecond client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config) @@ -72,7 +72,7 @@ func TestFuncClientCoordinator(t *testing.T) { setupFunctionalTest(t) defer teardownFunctionalTest(t) - client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, nil) + client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, NewTestConfig()) if err != nil { t.Fatal(err) } diff --git a/functional_consumer_group_test.go b/functional_consumer_group_test.go index 4d71510a8..8a25f9b52 100644 --- a/functional_consumer_group_test.go +++ b/functional_consumer_group_test.go @@ -153,7 +153,7 @@ func testFuncConsumerGroupID(t *testing.T) string { } func testFuncConsumerGroupFuzzySeed(topic string) error { - client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, nil) + client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, NewTestConfig()) if err != nil { return err } @@ -238,7 +238,7 @@ type testFuncConsumerGroupMember struct { func runTestFuncConsumerGroupMember(t *testing.T, groupID, clientID string, maxMessages int32, sink *testFuncConsumerGroupSink, topics ...string) *testFuncConsumerGroupMember { t.Helper() - config := NewConfig() + config := NewTestConfig() config.ClientID = clientID config.Version = V0_10_2_0 config.Consumer.Return.Errors = true diff --git a/functional_consumer_test.go b/functional_consumer_test.go index aca9434db..0ec993087 100644 --- a/functional_consumer_test.go +++ b/functional_consumer_test.go @@ -18,7 +18,7 @@ func TestFuncConsumerOffsetOutOfRange(t *testing.T) { setupFunctionalTest(t) defer teardownFunctionalTest(t) - consumer, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, nil) + consumer, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, NewTestConfig()) if err != nil { t.Fatal(err) } @@ -49,7 +49,7 @@ func TestConsumerHighWaterMarkOffset(t *testing.T) { t.Fatal(err) } - c, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, nil) + c, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, NewTestConfig()) if err != nil { t.Fatal(err) } @@ -141,7 +141,7 @@ func TestReadOnlyAndAllCommittedMessages(t *testing.T) { setupFunctionalTest(t) defer teardownFunctionalTest(t) - config := NewConfig() + config := NewTestConfig() config.Consumer.IsolationLevel = ReadCommitted config.Version = V0_11_0_0 @@ -195,7 +195,7 @@ func produceMsgs(t *testing.T, clientVersions []KafkaVersion, codecs []Compressi var producedMessages []*ProducerMessage for _, prodVer := range clientVersions { for _, codec := range codecs { - prodCfg := NewConfig() + prodCfg := NewTestConfig() prodCfg.Version = prodVer prodCfg.Producer.Return.Successes = true prodCfg.Producer.Return.Errors = true @@ -251,7 +251,7 @@ consumerVersionLoop: t.Logf("*** Consuming with client version %s\n", consVer) // Create a partition consumer that should start from the first produced // message. - consCfg := NewConfig() + consCfg := NewTestConfig() consCfg.Version = consVer c, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, consCfg) if err != nil { diff --git a/functional_offset_manager_test.go b/functional_offset_manager_test.go index 32e160aab..5d7f65663 100644 --- a/functional_offset_manager_test.go +++ b/functional_offset_manager_test.go @@ -11,7 +11,7 @@ func TestFuncOffsetManager(t *testing.T) { setupFunctionalTest(t) defer teardownFunctionalTest(t) - client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, nil) + client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, NewTestConfig()) if err != nil { t.Fatal(err) } diff --git a/functional_producer_test.go b/functional_producer_test.go index b83dc72e3..7c1925b06 100644 --- a/functional_producer_test.go +++ b/functional_producer_test.go @@ -18,37 +18,37 @@ import ( const TestBatchSize = 1000 func TestFuncProducing(t *testing.T) { - config := NewConfig() + config := NewTestConfig() testProducingMessages(t, config) } func TestFuncProducingGzip(t *testing.T) { - config := NewConfig() + config := NewTestConfig() config.Producer.Compression = CompressionGZIP testProducingMessages(t, config) } func TestFuncProducingSnappy(t *testing.T) { - config := NewConfig() + config := NewTestConfig() config.Producer.Compression = CompressionSnappy testProducingMessages(t, config) } func TestFuncProducingZstd(t *testing.T) { - config := NewConfig() + config := NewTestConfig() config.Version = V2_1_0_0 config.Producer.Compression = CompressionZSTD testProducingMessages(t, config) } func TestFuncProducingNoResponse(t *testing.T) { - config := NewConfig() + config := NewTestConfig() config.Producer.RequiredAcks = NoResponse testProducingMessages(t, config) } func TestFuncProducingFlushing(t *testing.T) { - config := NewConfig() + config := NewTestConfig() config.Producer.Flush.Messages = TestBatchSize / 8 config.Producer.Flush.Frequency = 250 * time.Millisecond testProducingMessages(t, config) @@ -58,7 +58,7 @@ func TestFuncMultiPartitionProduce(t *testing.T) { setupFunctionalTest(t) defer teardownFunctionalTest(t) - config := NewConfig() + config := NewTestConfig() config.ChannelBufferSize = 20 config.Producer.Flush.Frequency = 50 * time.Millisecond config.Producer.Flush.Messages = 200 @@ -111,7 +111,7 @@ func TestFuncProducingIdempotentWithBrokerFailure(t *testing.T) { setupFunctionalTest(t) defer teardownFunctionalTest(t) - config := NewConfig() + config := NewTestConfig() config.Producer.Flush.Frequency = 250 * time.Millisecond config.Producer.Idempotent = true config.Producer.Timeout = 500 * time.Millisecond @@ -185,7 +185,7 @@ func TestFuncProducingIdempotentWithBrokerFailure(t *testing.T) { } func TestInterceptors(t *testing.T) { - config := NewConfig() + config := NewTestConfig() setupFunctionalTest(t) defer teardownFunctionalTest(t) @@ -442,7 +442,7 @@ func BenchmarkProducerSmallSinglePartition(b *testing.B) { benchmarkProducer(b, nil, "test.1", ByteEncoder(make([]byte, 128))) } func BenchmarkProducerMediumSnappy(b *testing.B) { - conf := NewConfig() + conf := NewTestConfig() conf.Producer.Compression = CompressionSnappy benchmarkProducer(b, conf, "test.1", ByteEncoder(make([]byte, 1024))) } diff --git a/functional_test.go b/functional_test.go index 89dbfe2dc..f4f36b8cc 100644 --- a/functional_test.go +++ b/functional_test.go @@ -158,7 +158,7 @@ func prepareDockerTestEnvironment(ctx context.Context, env *testEnvironment) err for i := 0; i < 45 && !allBrokersUp; i++ { Logger.Println("waiting for kafka brokers to come up") time.Sleep(1 * time.Second) - config := NewConfig() + config := NewTestConfig() config.Version, err = ParseKafkaVersion(env.KafkaVersion) if err != nil { return err @@ -267,7 +267,7 @@ func prepareTestTopics(ctx context.Context, env *testEnvironment) error { } Logger.Println("Creating topics") - config := NewConfig() + config := NewTestConfig() config.Metadata.Retry.Max = 5 config.Metadata.Retry.Backoff = 10 * time.Second config.ClientID = "sarama-tests" diff --git a/kerberos_client_test.go b/kerberos_client_test.go index 003da6a3e..9cd7a21b6 100644 --- a/kerberos_client_test.go +++ b/kerberos_client_test.go @@ -17,7 +17,7 @@ import ( func TestFaildToCreateKerberosConfig(t *testing.T) { expectedErr := errors.New("configuration file could not be opened: krb5.conf open krb5.conf: no such file or directory") - clientConfig := NewConfig() + clientConfig := NewTestConfig() clientConfig.Net.SASL.Mechanism = SASLTypeGSSAPI clientConfig.Net.SASL.Enable = true clientConfig.Net.SASL.GSSAPI.ServiceName = "kafka" @@ -41,7 +41,7 @@ func TestCreateWithPassword(t *testing.T) { expectedDoman := "EXAMPLE.COM" expectedCName := "client" - clientConfig := NewConfig() + clientConfig := NewTestConfig() clientConfig.Net.SASL.Mechanism = SASLTypeGSSAPI clientConfig.Net.SASL.Enable = true clientConfig.Net.SASL.GSSAPI.ServiceName = "kafka" @@ -70,7 +70,7 @@ func TestCreateWithKeyTab(t *testing.T) { } // Expect to try to create a client with keytab and fails with "o such file or directory" error expectedErr := errors.New("open nonexist.keytab: no such file or directory") - clientConfig := NewConfig() + clientConfig := NewTestConfig() clientConfig.Net.SASL.Mechanism = SASLTypeGSSAPI clientConfig.Net.SASL.Enable = true clientConfig.Net.SASL.GSSAPI.ServiceName = "kafka" @@ -92,7 +92,7 @@ func TestCreateWithDisablePAFXFAST(t *testing.T) { } // Expect to try to create a client with keytab and fails with "o such file or directory" error expectedErr := errors.New("open nonexist.keytab: no such file or directory") - clientConfig := NewConfig() + clientConfig := NewTestConfig() clientConfig.Net.SASL.Mechanism = SASLTypeGSSAPI clientConfig.Net.SASL.Enable = true clientConfig.Net.SASL.GSSAPI.ServiceName = "kafka" diff --git a/mocks/async_producer_test.go b/mocks/async_producer_test.go index b5d92aad8..fc354232c 100644 --- a/mocks/async_producer_test.go +++ b/mocks/async_producer_test.go @@ -43,7 +43,7 @@ func TestMockAsyncProducerImplementsAsyncProducerInterface(t *testing.T) { } func TestProducerReturnsExpectationsToChannels(t *testing.T) { - config := sarama.NewConfig() + config := NewTestConfig() config.Producer.Return.Successes = true mp := NewAsyncProducer(t, config) diff --git a/mocks/consumer_test.go b/mocks/consumer_test.go index 311cfa026..e1202cc7a 100644 --- a/mocks/consumer_test.go +++ b/mocks/consumer_test.go @@ -20,7 +20,7 @@ func TestMockConsumerImplementsConsumerInterface(t *testing.T) { } func TestConsumerHandlesExpectations(t *testing.T) { - consumer := NewConsumer(t, nil) + consumer := NewConsumer(t, NewTestConfig()) defer func() { if err := consumer.Close(); err != nil { t.Error(err) @@ -65,7 +65,7 @@ func TestConsumerHandlesExpectations(t *testing.T) { } func TestConsumerReturnsNonconsumedErrorsOnClose(t *testing.T) { - consumer := NewConsumer(t, nil) + consumer := NewConsumer(t, NewTestConfig()) consumer.ExpectConsumePartition("test", 0, sarama.OffsetOldest).YieldError(sarama.ErrOutOfBrokers) consumer.ExpectConsumePartition("test", 0, sarama.OffsetOldest).YieldError(sarama.ErrOutOfBrokers) @@ -91,7 +91,7 @@ func TestConsumerReturnsNonconsumedErrorsOnClose(t *testing.T) { func TestConsumerWithoutExpectationsOnPartition(t *testing.T) { trm := newTestReporterMock() - consumer := NewConsumer(trm, nil) + consumer := NewConsumer(trm, NewTestConfig()) _, err := consumer.ConsumePartition("test", 1, sarama.OffsetOldest) if err != errOutOfExpectations { @@ -109,7 +109,7 @@ func TestConsumerWithoutExpectationsOnPartition(t *testing.T) { func TestConsumerWithExpectationsOnUnconsumedPartition(t *testing.T) { trm := newTestReporterMock() - consumer := NewConsumer(trm, nil) + consumer := NewConsumer(trm, NewTestConfig()) consumer.ExpectConsumePartition("test", 0, sarama.OffsetOldest).YieldMessage(&sarama.ConsumerMessage{Value: []byte("hello world")}) if err := consumer.Close(); err != nil { @@ -123,7 +123,7 @@ func TestConsumerWithExpectationsOnUnconsumedPartition(t *testing.T) { func TestConsumerWithWrongOffsetExpectation(t *testing.T) { trm := newTestReporterMock() - consumer := NewConsumer(trm, nil) + consumer := NewConsumer(trm, NewTestConfig()) consumer.ExpectConsumePartition("test", 0, sarama.OffsetOldest) _, err := consumer.ConsumePartition("test", 0, sarama.OffsetNewest) @@ -142,7 +142,7 @@ func TestConsumerWithWrongOffsetExpectation(t *testing.T) { func TestConsumerViolatesMessagesDrainedExpectation(t *testing.T) { trm := newTestReporterMock() - consumer := NewConsumer(trm, nil) + consumer := NewConsumer(trm, NewTestConfig()) pcmock := consumer.ExpectConsumePartition("test", 0, sarama.OffsetOldest) pcmock.YieldMessage(&sarama.ConsumerMessage{Value: []byte("hello")}) pcmock.YieldMessage(&sarama.ConsumerMessage{Value: []byte("hello")}) @@ -167,7 +167,7 @@ func TestConsumerViolatesMessagesDrainedExpectation(t *testing.T) { func TestConsumerMeetsErrorsDrainedExpectation(t *testing.T) { trm := newTestReporterMock() - consumer := NewConsumer(trm, nil) + consumer := NewConsumer(trm, NewTestConfig()) pcmock := consumer.ExpectConsumePartition("test", 0, sarama.OffsetOldest) pcmock.YieldError(sarama.ErrInvalidMessage) @@ -194,7 +194,7 @@ func TestConsumerMeetsErrorsDrainedExpectation(t *testing.T) { func TestConsumerTopicMetadata(t *testing.T) { trm := newTestReporterMock() - consumer := NewConsumer(trm, nil) + consumer := NewConsumer(trm, NewTestConfig()) consumer.SetTopicMetadata(map[string][]int32{ "test1": {0, 1, 2, 3}, @@ -237,7 +237,7 @@ func TestConsumerTopicMetadata(t *testing.T) { func TestConsumerUnexpectedTopicMetadata(t *testing.T) { trm := newTestReporterMock() - consumer := NewConsumer(trm, nil) + consumer := NewConsumer(trm, NewTestConfig()) if _, err := consumer.Topics(); err != sarama.ErrOutOfBrokers { t.Error("Expected sarama.ErrOutOfBrokers, found", err) diff --git a/mocks/mocks.go b/mocks/mocks.go index 1e1d9f1c6..5b0e4329e 100644 --- a/mocks/mocks.go +++ b/mocks/mocks.go @@ -15,6 +15,8 @@ package mocks import ( "errors" + + "github.com/Shopify/sarama" ) // ErrorReporter is a simple interface that includes the testing.T methods we use to report @@ -39,3 +41,12 @@ type producerExpectation struct { Result error CheckFunction ValueChecker } + +// NewTestConfig returns a config meant to be used by tests. +// Due to inconsistencies with the request versions the clients send using the default Kafka version +// and the response versions our mocks use, we default to the minimum Kafka version in most tests +func NewTestConfig() *sarama.Config { + config := sarama.NewConfig() + config.Version = sarama.MinVersion + return config +} diff --git a/offset_manager_test.go b/offset_manager_test.go index 5aa2ee0ff..249e53444 100644 --- a/offset_manager_test.go +++ b/offset_manager_test.go @@ -50,7 +50,7 @@ func initOffsetManagerWithBackoffFunc(t *testing.T, retention time.Duration, func initOffsetManager(t *testing.T, retention time.Duration) (om OffsetManager, testClient Client, broker, coordinator *MockBroker) { - return initOffsetManagerWithBackoffFunc(t, retention, nil, NewConfig()) + return initOffsetManagerWithBackoffFunc(t, retention, nil, NewTestConfig()) } func initPartitionOffsetManager(t *testing.T, om OffsetManager, @@ -76,7 +76,7 @@ func TestNewOffsetManager(t *testing.T) { seedBroker.Returns(new(MetadataResponse)) defer seedBroker.Close() - testClient, err := NewClient([]string{seedBroker.Addr()}, nil) + testClient, err := NewClient([]string{seedBroker.Addr()}, NewTestConfig()) if err != nil { t.Fatal(err) } @@ -120,7 +120,7 @@ func TestNewOffsetManagerOffsetsAutoCommit(t *testing.T) { // Tests to validate configuration of `Consumer.Offsets.AutoCommit.Enable` for _, tt := range offsetsautocommitTestTable { t.Run(tt.name, func(t *testing.T) { - config := NewConfig() + config := NewTestConfig() if tt.set { config.Consumer.Offsets.AutoCommit.Enable = tt.enable } @@ -171,7 +171,7 @@ func TestNewOffsetManagerOffsetsAutoCommit(t *testing.T) { func TestNewOffsetManagerOffsetsManualCommit(t *testing.T) { // Tests to validate configuration when `Consumer.Offsets.AutoCommit.Enable` is false - config := NewConfig() + config := NewTestConfig() config.Consumer.Offsets.AutoCommit.Enable = false om, testClient, broker, coordinator := initOffsetManagerWithBackoffFunc(t, 0, nil, config) @@ -278,7 +278,7 @@ func TestOffsetManagerFetchInitialLoadInProgress(t *testing.T) { atomic.AddInt32(&retryCount, 1) return 0 } - om, testClient, broker, coordinator := initOffsetManagerWithBackoffFunc(t, 0, backoff, NewConfig()) + om, testClient, broker, coordinator := initOffsetManagerWithBackoffFunc(t, 0, backoff, NewTestConfig()) // Error on first fetchInitialOffset call responseBlock := OffsetFetchResponseBlock{ diff --git a/partitioner_test.go b/partitioner_test.go index f6dde0204..9c0161122 100644 --- a/partitioner_test.go +++ b/partitioner_test.go @@ -212,7 +212,7 @@ func TestManualPartitioner(t *testing.T) { // This example shows how you can partition messages randomly, even when a key is set, // by overriding Config.Producer.Partitioner. func ExamplePartitioner_random() { - config := NewConfig() + config := NewTestConfig() config.Producer.Partitioner = NewRandomPartitioner producer, err := NewSyncProducer([]string{"localhost:9092"}, config) @@ -236,7 +236,7 @@ func ExamplePartitioner_random() { // This example shows how to assign partitions to your messages manually. func ExamplePartitioner_manual() { - config := NewConfig() + config := NewTestConfig() // First, we tell the producer that we are going to partition ourselves. config.Producer.Partitioner = NewManualPartitioner @@ -268,7 +268,7 @@ func ExamplePartitioner_manual() { // This example shows how to set a different partitioner depending on the topic. func ExamplePartitioner_per_topic() { - config := NewConfig() + config := NewTestConfig() config.Producer.Partitioner = func(topic string) Partitioner { switch topic { case "access_log", "error_log": diff --git a/produce_set_test.go b/produce_set_test.go index c0928834b..221f40b59 100644 --- a/produce_set_test.go +++ b/produce_set_test.go @@ -7,7 +7,7 @@ import ( ) func makeProduceSet() (*asyncProducer, *produceSet) { - conf := NewConfig() + conf := NewTestConfig() txnmgr, _ := newTransactionManager(conf, nil) parent := &asyncProducer{ conf: conf, @@ -285,7 +285,7 @@ func TestProduceSetIdempotentRequestBuilding(t *testing.T) { const pID = 1000 const pEpoch = 1234 - config := NewConfig() + config := NewTestConfig() config.Producer.RequiredAcks = WaitForAll config.Producer.Idempotent = true config.Version = V0_11_0_0 diff --git a/sync_producer_test.go b/sync_producer_test.go index 9c480b0b5..5f14f7be0 100644 --- a/sync_producer_test.go +++ b/sync_producer_test.go @@ -21,7 +21,9 @@ func TestSyncProducer(t *testing.T) { leader.Returns(prodSuccess) } - producer, err := NewSyncProducer([]string{seedBroker.Addr()}, nil) + config := NewTestConfig() + config.Producer.Return.Successes = true + producer, err := NewSyncProducer([]string{seedBroker.Addr()}, config) if err != nil { t.Fatal(err) } @@ -67,7 +69,7 @@ func TestSyncProducerBatch(t *testing.T) { prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) leader.Returns(prodSuccess) - config := NewConfig() + config := NewTestConfig() config.Producer.Flush.Messages = 3 config.Producer.Return.Successes = true producer, err := NewSyncProducer([]string{seedBroker.Addr()}, config) @@ -115,7 +117,7 @@ func TestConcurrentSyncProducer(t *testing.T) { prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) leader.Returns(prodSuccess) - config := NewConfig() + config := NewTestConfig() config.Producer.Flush.Messages = 100 config.Producer.Return.Successes = true producer, err := NewSyncProducer([]string{seedBroker.Addr()}, config) @@ -154,7 +156,7 @@ func TestSyncProducerToNonExistingTopic(t *testing.T) { metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, nil, ErrNoError) broker.Returns(metadataResponse) - config := NewConfig() + config := NewTestConfig() config.Metadata.Retry.Max = 0 config.Producer.Retry.Max = 0 config.Producer.Return.Successes = true @@ -187,7 +189,7 @@ func TestSyncProducerRecoveryWithRetriesDisabled(t *testing.T) { metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, nil, ErrNoError) seedBroker.Returns(metadataLeader1) - config := NewConfig() + config := NewTestConfig() config.Producer.Retry.Max = 0 // disable! config.Producer.Retry.Backoff = 0 config.Producer.Return.Successes = true diff --git a/utils.go b/utils.go index 93bdeefef..3e9dfd7ad 100644 --- a/utils.go +++ b/utils.go @@ -190,14 +190,15 @@ var ( V2_5_0_0, V2_6_0_0, } - MinVersion = V0_8_2_0 - MaxVersion = V2_6_0_0 + MinVersion = V0_8_2_0 + MaxVersion = V2_6_0_0 + DefaultVersion = V1_0_0_0 ) //ParseKafkaVersion parses and returns kafka version or error from a string func ParseKafkaVersion(s string) (KafkaVersion, error) { if len(s) < 5 { - return MinVersion, fmt.Errorf("invalid version `%s`", s) + return DefaultVersion, fmt.Errorf("invalid version `%s`", s) } var major, minor, veryMinor, patch uint var err error @@ -207,7 +208,7 @@ func ParseKafkaVersion(s string) (KafkaVersion, error) { err = scanKafkaVersion(s, `^\d+\.\d+\.\d+$`, "%d.%d.%d", [3]*uint{&major, &minor, &veryMinor}) } if err != nil { - return MinVersion, err + return DefaultVersion, err } return newKafkaVersion(major, minor, veryMinor, patch), nil }