From 480a5481830f1c33a54d01b7d5f42b764b19f4bc Mon Sep 17 00:00:00 2001 From: Dominic Evans Date: Fri, 7 May 2021 17:51:01 +0100 Subject: [PATCH] fix(test): use Sarama transactional producer One of the pieces of #1695 was to use a small jar for publishing uncommited messages as part of the functional test. Replacing that with the native Sarama transactional producer Fixes #1733 Co-authored-by: KJTsanaktsidis --- functional_consumer_test.go | 113 +++++++++++++++++++++++++++++++++++- functional_test.go | 100 +++++++++++++++---------------- 2 files changed, 158 insertions(+), 55 deletions(-) diff --git a/functional_consumer_test.go b/functional_consumer_test.go index 2f59183f07..cf95974588 100644 --- a/functional_consumer_test.go +++ b/functional_consumer_test.go @@ -8,6 +8,7 @@ import ( "math" "os" "sort" + "strconv" "sync" "testing" "time" @@ -143,15 +144,125 @@ func TestReadOnlyAndAllCommittedMessages(t *testing.T) { defer teardownFunctionalTest(t) config := NewTestConfig() + config.ClientID = t.Name() + config.Net.MaxOpenRequests = 1 config.Consumer.IsolationLevel = ReadCommitted + config.Producer.Idempotent = true + config.Producer.Return.Successes = true + config.Producer.RequiredAcks = WaitForAll config.Version = V0_11_0_0 + client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config) + if err != nil { + t.Fatal(err) + } + defer client.Close() + controller, err := client.Controller() + if err != nil { + t.Fatal(err) + } + defer controller.Close() + + transactionalID := strconv.FormatInt(time.Now().UnixNano()/(1<<22), 10) + + var coordinator *Broker + + // find the transaction coordinator + for { + coordRes, err := controller.FindCoordinator(&FindCoordinatorRequest{ + Version: 2, + CoordinatorKey: transactionalID, + CoordinatorType: CoordinatorTransaction, + }) + if err != nil { + t.Fatal(err) + } + if coordRes.Err != ErrNoError { + continue + } + if err := coordRes.Coordinator.Open(client.Config()); err != nil { + t.Fatal(err) + } + coordinator = coordRes.Coordinator + break + } + + // produce some uncommitted messages to the topic + pidRes, err := coordinator.InitProducerID(&InitProducerIDRequest{ + TransactionalID: &transactionalID, + TransactionTimeout: 10 * time.Second, + }) + if err != nil { + t.Fatal(err) + } + _, _ = coordinator.AddPartitionsToTxn(&AddPartitionsToTxnRequest{ + TransactionalID: transactionalID, + ProducerID: pidRes.ProducerID, + ProducerEpoch: pidRes.ProducerEpoch, + TopicPartitions: map[string][]int32{ + uncommittedTopic: {0}, + }, + }) + if err != nil { + t.Fatal(err) + } + ps := &produceSet{ + msgs: make(map[string]map[int32]*partitionSet), + parent: &asyncProducer{ + conf: config, + }, + producerID: pidRes.ProducerID, + producerEpoch: pidRes.ProducerEpoch, + } + _ = ps.add(&ProducerMessage{ + Topic: uncommittedTopic, + Partition: 0, + Value: StringEncoder("uncommitted message 1"), + }) + _ = ps.add(&ProducerMessage{ + Topic: uncommittedTopic, + Partition: 0, + Value: StringEncoder("uncommitted message 2"), + }) + produceReq := ps.buildRequest() + produceReq.TransactionalID = &transactionalID + if _, err := coordinator.Produce(produceReq); err != nil { + t.Fatal(err) + } + + // now produce some committed messages to the topic + producer, err := NewAsyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, config) + if err != nil { + t.Fatal(err) + } + defer producer.Close() + + for i := 1; i <= 6; i++ { + producer.Input() <- &ProducerMessage{ + Topic: uncommittedTopic, + Partition: 0, + Value: StringEncoder(fmt.Sprintf("Committed %v", i)), + } + <-producer.Successes() + } + + // now abort the uncommitted transaction + if _, err := coordinator.EndTxn(&EndTxnRequest{ + TransactionalID: transactionalID, + ProducerID: pidRes.ProducerID, + ProducerEpoch: pidRes.ProducerEpoch, + TransactionResult: false, // aborted + }); err != nil { + t.Fatal(err) + } + consumer, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, config) if err != nil { t.Fatal(err) } + defer consumer.Close() - pc, err := consumer.ConsumePartition("uncommitted-topic-test-4", 0, OffsetOldest) + pc, err := consumer.ConsumePartition(uncommittedTopic, 0, OffsetOldest) require.NoError(t, err) msgChannel := pc.Messages() diff --git a/functional_test.go b/functional_test.go index 4f6cfc9d86..6597b63dfb 100644 --- a/functional_test.go +++ b/functional_test.go @@ -6,13 +6,10 @@ package sarama import ( "context" "fmt" - "io" "log" - "net/http" "net/url" "os" "os/exec" - "path/filepath" "strconv" "strings" "testing" @@ -21,9 +18,7 @@ import ( toxiproxy "github.com/Shopify/toxiproxy/v2/client" ) -const ( - uncomittedMsgJar = "https://github.com/FrancoisPoinsot/simplest-uncommitted-msg/releases/download/0.1/simplest-uncommitted-msg-0.1-jar-with-dependencies.jar" -) +const uncommittedTopic = "uncommitted-topic-test-4" var ( testTopicDetails = map[string]*TopicDetail{ @@ -39,7 +34,7 @@ var ( NumPartitions: 64, ReplicationFactor: 3, }, - "uncommitted-topic-test-4": { + uncommittedTopic: { NumPartitions: 1, ReplicationFactor: 3, }, @@ -277,7 +272,7 @@ func prepareTestTopics(ctx context.Context, env *testEnvironment) error { config := NewTestConfig() config.Metadata.Retry.Max = 5 config.Metadata.Retry.Backoff = 10 * time.Second - config.ClientID = "sarama-tests" + config.ClientID = "sarama-prepareTestTopics" var err error config.Version, err = ParseKafkaVersion(env.KafkaVersion) if err != nil { @@ -312,25 +307,29 @@ func prepareTestTopics(ctx context.Context, env *testEnvironment) error { // wait for the topics to _actually_ be gone - the delete is not guaranteed to be processed // synchronously - var topicsOk bool - for i := 0; i < 20 && !topicsOk; i++ { - time.Sleep(1 * time.Second) - md, err := controller.GetMetadata(&MetadataRequest{ - Topics: testTopicNames, - }) - if err != nil { - return fmt.Errorf("failed to get metadata for test topics: %w", err) - } + { + var topicsOk bool + for i := 0; i < 30 && !topicsOk; i++ { + time.Sleep(1 * time.Second) + md, err := controller.GetMetadata(&MetadataRequest{ + Topics: testTopicNames, + }) + if err != nil { + return fmt.Errorf("failed to get metadata for test topics: %w", err) + } - topicsOk = true - for _, topicsMd := range md.Topics { - if !isTopicNotExistsErrorOrOk(topicsMd.Err) { - topicsOk = false + if len(md.Topics) == len(testTopicNames) { + topicsOk = true + for _, topicsMd := range md.Topics { + if !isTopicNotExistsErrorOrOk(topicsMd.Err) { + topicsOk = false + } + } } } - } - if !topicsOk { - return fmt.Errorf("timed out waiting for test topics to be gone") + if !topicsOk { + return fmt.Errorf("timed out waiting for test topics to be gone") + } } // now create the topics empty @@ -347,40 +346,33 @@ func prepareTestTopics(ctx context.Context, env *testEnvironment) error { } } - // This is kind of gross, but we don't actually have support for doing transactional publishing - // with sarama, so we need to use a java-based tool to publish uncommitted messages to - // the uncommitted-topic-test-4 topic - jarName := filepath.Base(uncomittedMsgJar) - if _, err := os.Stat(jarName); err != nil { - Logger.Printf("Downloading %s\n", uncomittedMsgJar) - req, err := http.NewRequest("GET", uncomittedMsgJar, nil) - if err != nil { - return fmt.Errorf("failed creating requst for uncommitted msg jar: %w", err) - } - res, err := http.DefaultClient.Do(req) - if err != nil { - return fmt.Errorf("failed fetching the uncommitted msg jar: %w", err) - } - defer res.Body.Close() - jarFile, err := os.OpenFile(jarName, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0o644) - if err != nil { - return fmt.Errorf("failed opening the uncommitted msg jar: %w", err) - } - defer jarFile.Close() + // wait for the topics to _actually_ exist - the creates are not guaranteed to be processed + // synchronously + { + var topicsOk bool + for i := 0; i < 30 && !topicsOk; i++ { + time.Sleep(1 * time.Second) + md, err := controller.GetMetadata(&MetadataRequest{ + Topics: testTopicNames, + }) + if err != nil { + return fmt.Errorf("failed to get metadata for test topics: %w", err) + } - _, err = io.Copy(jarFile, res.Body) - if err != nil { - return fmt.Errorf("failed writing the uncommitted msg jar: %w", err) + if len(md.Topics) == len(testTopicNames) { + topicsOk = true + for _, topicsMd := range md.Topics { + if topicsMd.Err != ErrNoError { + topicsOk = false + } + } + } + } + if !topicsOk { + return fmt.Errorf("timed out waiting for test topics to be created") } } - c := exec.Command("java", "-jar", jarName, "-b", env.KafkaBrokerAddrs[0], "-c", "4") - c.Stdout = os.Stdout - c.Stderr = os.Stderr - err = c.Run() - if err != nil { - return fmt.Errorf("failed running uncommitted msg jar: %w", err) - } return nil }