diff --git a/functional_consumer_test.go b/functional_consumer_test.go index 2f59183f0..ee2a9e4cd 100644 --- a/functional_consumer_test.go +++ b/functional_consumer_test.go @@ -138,20 +138,141 @@ func TestVersionMatrixIdempotent(t *testing.T) { } func TestReadOnlyAndAllCommittedMessages(t *testing.T) { + prevLogger := Logger + defer func() { Logger = prevLogger }() + Logger = &testLogger{t} + checkKafkaVersion(t, "0.11.0") setupFunctionalTest(t) defer teardownFunctionalTest(t) config := NewTestConfig() + config.ClientID = t.Name() + config.Net.MaxOpenRequests = 1 config.Consumer.IsolationLevel = ReadCommitted + config.Producer.Idempotent = true + config.Producer.Return.Successes = true + config.Producer.RequiredAcks = WaitForAll config.Version = V0_11_0_0 + client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config) + if err != nil { + t.Fatal(err) + } + defer client.Close() + controller, err := client.Controller() + if err != nil { + t.Fatal(err) + } + defer controller.Close() + + transactionalID := "transactional.id config" + + var coordinator *Broker + + // wait until transactional topic is available and then connect to the + // coordinator broker + for coordinator == nil { + if _, err := client.Leader("__transaction_state", 0); err != nil { + time.Sleep(2 * time.Second) + _ = client.RefreshMetadata("__transaction_state") + continue + } + coordRes, err := controller.FindCoordinator(&FindCoordinatorRequest{ + Version: 2, + CoordinatorKey: transactionalID, + CoordinatorType: CoordinatorTransaction, + }) + if err != nil { + t.Fatal(err) + } + if coordRes.Err != ErrNoError { + continue + } + if err := coordRes.Coordinator.Open(client.Config()); err != nil { + t.Fatal(err) + } + coordinator = coordRes.Coordinator + break + } + + // produce some uncommitted messages to the topic + pidRes, err := coordinator.InitProducerID(&InitProducerIDRequest{ + TransactionalID: &transactionalID, + TransactionTimeout: 10 * time.Second, + }) + if err != nil { + t.Fatal(err) + } + _, _ = coordinator.AddPartitionsToTxn(&AddPartitionsToTxnRequest{ + TransactionalID: transactionalID, + ProducerID: pidRes.ProducerID, + ProducerEpoch: pidRes.ProducerEpoch, + TopicPartitions: map[string][]int32{ + uncommittedTopic: {0}, + }, + }) + if err != nil { + t.Fatal(err) + } + ps := &produceSet{ + msgs: make(map[string]map[int32]*partitionSet), + parent: &asyncProducer{ + conf: config, + }, + producerID: pidRes.ProducerID, + producerEpoch: pidRes.ProducerEpoch, + } + _ = ps.add(&ProducerMessage{ + Topic: uncommittedTopic, + Partition: 0, + Value: StringEncoder("uncommitted message 1"), + }) + _ = ps.add(&ProducerMessage{ + Topic: uncommittedTopic, + Partition: 0, + Value: StringEncoder("uncommitted message 2"), + }) + produceReq := ps.buildRequest() + produceReq.TransactionalID = &transactionalID + if _, err := coordinator.Produce(produceReq); err != nil { + t.Fatal(err) + } + + // now produce some committed messages to the topic + producer, err := NewAsyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, config) + if err != nil { + t.Fatal(err) + } + defer producer.Close() + + for i := 1; i <= 6; i++ { + producer.Input() <- &ProducerMessage{ + Topic: uncommittedTopic, + Partition: 0, + Value: StringEncoder(fmt.Sprintf("Committed %v", i)), + } + <-producer.Successes() + } + + // now abort the uncommitted transaction + if _, err := coordinator.EndTxn(&EndTxnRequest{ + TransactionalID: transactionalID, + ProducerID: pidRes.ProducerID, + ProducerEpoch: pidRes.ProducerEpoch, + TransactionResult: false, // aborted + }); err != nil { + t.Fatal(err) + } + + consumer, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, config) if err != nil { t.Fatal(err) } + defer consumer.Close() - pc, err := consumer.ConsumePartition("uncommitted-topic-test-4", 0, OffsetOldest) + pc, err := consumer.ConsumePartition(uncommittedTopic, 0, OffsetOldest) require.NoError(t, err) msgChannel := pc.Messages() diff --git a/functional_test.go b/functional_test.go index 95bffb964..4b66e872f 100644 --- a/functional_test.go +++ b/functional_test.go @@ -6,14 +6,11 @@ package sarama import ( "context" "fmt" - "io" "log" "net" - "net/http" "net/url" "os" "os/exec" - "path/filepath" "strconv" "strings" "testing" @@ -22,9 +19,7 @@ import ( toxiproxy "github.com/Shopify/toxiproxy/client" ) -const ( - uncomittedMsgJar = "https://github.com/FrancoisPoinsot/simplest-uncommitted-msg/releases/download/0.1/simplest-uncommitted-msg-0.1-jar-with-dependencies.jar" -) +const uncommittedTopic = "uncommitted-topic-test-4" var ( testTopicDetails = map[string]*TopicDetail{ @@ -40,7 +35,7 @@ var ( NumPartitions: 64, ReplicationFactor: 3, }, - "uncommitted-topic-test-4": { + uncommittedTopic: { NumPartitions: 1, ReplicationFactor: 3, }, @@ -341,40 +336,6 @@ func prepareTestTopics(ctx context.Context, env *testEnvironment) error { } } - // This is kind of gross, but we don't actually have support for doing transactional publishing - // with sarama, so we need to use a java-based tool to publish uncommitted messages to - // the uncommitted-topic-test-4 topic - jarName := filepath.Base(uncomittedMsgJar) - if _, err := os.Stat(jarName); err != nil { - Logger.Printf("Downloading %s\n", uncomittedMsgJar) - req, err := http.NewRequest("GET", uncomittedMsgJar, nil) - if err != nil { - return fmt.Errorf("failed creating requst for uncommitted msg jar: %w", err) - } - res, err := http.DefaultClient.Do(req) - if err != nil { - return fmt.Errorf("failed fetching the uncommitted msg jar: %w", err) - } - defer res.Body.Close() - jarFile, err := os.OpenFile(jarName, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0o644) - if err != nil { - return fmt.Errorf("failed opening the uncommitted msg jar: %w", err) - } - defer jarFile.Close() - - _, err = io.Copy(jarFile, res.Body) - if err != nil { - return fmt.Errorf("failed writing the uncommitted msg jar: %w", err) - } - } - - c := exec.Command("java", "-jar", jarName, "-b", env.KafkaBrokerAddrs[0], "-c", "4") - c.Stdout = os.Stdout - c.Stderr = os.Stderr - err = c.Run() - if err != nil { - return fmt.Errorf("failed running uncommitted msg jar: %w", err) - } return nil }