Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transaction is not properly aborted #729

Closed
dvachaiev opened this issue May 14, 2024 · 1 comment · Fixed by #735
Closed

Transaction is not properly aborted #729

dvachaiev opened this issue May 14, 2024 · 1 comment · Fixed by #735

Comments

@dvachaiev
Copy link

Calling Client.EndTransaction(ctx, kgo.TryAbort) doesn't abort transaction on the broker and records from it can be consumed with read_committed isolation level after next committed transaction:

consumed record: topic=test, partition=0, offset=115, value=abort 0
consumed record: topic=test, partition=0, offset=116, value=abort 1
consumed record: topic=test, partition=0, offset=117, value=abort 2
consumed record: topic=test, partition=0, offset=118, value=abort 3
consumed record: topic=test, partition=0, offset=119, value=abort 4
consumed record: topic=test, partition=0, offset=120, value=abort 5
consumed record: topic=test, partition=0, offset=121, value=abort 6
consumed record: topic=test, partition=0, offset=122, value=abort 7
consumed record: topic=test, partition=0, offset=123, value=abort 8
consumed record: topic=test, partition=0, offset=124, value=abort 9
consumed record: topic=test, partition=0, offset=125, value=commit 0
consumed record: topic=test, partition=0, offset=126, value=commit 1
consumed record: topic=test, partition=0, offset=127, value=commit 2
consumed record: topic=test, partition=0, offset=128, value=commit 3
consumed record: topic=test, partition=0, offset=129, value=commit 4
consumed record: topic=test, partition=0, offset=130, value=commit 5
consumed record: topic=test, partition=0, offset=131, value=commit 6
consumed record: topic=test, partition=0, offset=132, value=commit 7
consumed record: topic=test, partition=0, offset=133, value=commit 8
consumed record: topic=test, partition=0, offset=134, value=commit 9
consumed record: topic=test, partition=0, offset=135, value=�

Messages with abort in value shouldn't be consumed and there is no control record between aborted and committed records.

To reproduce I used slightly modified eos script:

package main

import (
	"context"
	"flag"
	"fmt"
	"os"
	"strconv"
	"strings"
	"time"

	"github.com/twmb/franz-go/pkg/kerr"
	"github.com/twmb/franz-go/pkg/kgo"
)

var (
	seedBrokers = flag.String("brokers", "localhost:9092", "comma delimited list of seed brokers")
	produceTo   = flag.String("produce-to", "", "input topic to produce transactionally produce to")

	group = flag.String("group", "example-group", "group to use for consuming")

	produceTxnID = flag.String("produce-txn-id", "eos-example-input-producer", "transactional ID to use for the input producer")
)

func die(msg string, args ...any) {
	fmt.Fprintf(os.Stderr, msg, args...)
	os.Exit(1)
}

func main() {
	flag.Parse()

	if *produceTo == "" {
		die("missing -produce-to (%s)", *produceTo)
	}

	go consumer()
	go inputProducer()

	select {}
}

func inputProducer() {
	cl, err := kgo.NewClient(
		kgo.SeedBrokers(strings.Split(*seedBrokers, ",")...),
		kgo.DefaultProduceTopic(*produceTo),
		kgo.TransactionalID(*produceTxnID),
		kgo.WithLogger(kgo.BasicLogger(os.Stderr, kgo.LogLevelInfo, func() string {
			return "[input producer] "
		})),
	)
	if err != nil {
		die("unable to create input producer: %v", err)
	}

	ctx := context.Background()

	for doCommit := true; ; doCommit = !doCommit {
		if err := cl.BeginTransaction(); err != nil {
			// We are unable to start a transaction if the client
			// is not transactional or if we are already in a
			// transaction. A proper transactional loop will never
			// account either error.
			die("unable to start transaction: %v", err)
		}

		msg := "commit "
		if !doCommit {
			msg = "abort "
		}

		e := kgo.AbortingFirstErrPromise(cl)
		for i := 0; i < 10; i++ {
			cl.Produce(ctx, kgo.StringRecord(msg+strconv.Itoa(i)), e.Promise())
		}
		commit := kgo.TransactionEndTry(doCommit && e.Err() == nil)

		switch err := cl.EndTransaction(ctx, commit); err {
		case nil:
		case kerr.OperationNotAttempted:
			if err := cl.EndTransaction(ctx, kgo.TryAbort); err != nil {
				die("abort failed: %v", err)
			}
		default:
			die("commit failed: %v", err)
		}

		time.Sleep(10 * time.Second)
	}
}

func consumer() {
	sess, err := kgo.NewClient(
		kgo.SeedBrokers(strings.Split(*seedBrokers, ",")...),
		kgo.FetchIsolationLevel(kgo.ReadCommitted()),
		kgo.WithLogger(kgo.BasicLogger(os.Stderr, kgo.LogLevelInfo, func() string {
			return "[consumer] "
		})),
		kgo.ConsumerGroup(*group),
		kgo.ConsumeTopics(*produceTo),
		kgo.RequireStableFetchOffsets(),
		kgo.DisableAutoCommit(),
		kgo.ConsumeResetOffset(kgo.NewOffset().AtEnd()),
		kgo.KeepControlRecords(),
	)
	if err != nil {
		die("unable to create eos consumer/producer: %v", err)
	}
	defer sess.Close()

	ctx := context.Background()

	for {
		fetches := sess.PollFetches(ctx)

		if fetchErrs := fetches.Errors(); len(fetchErrs) > 0 {
			for _, fetchErr := range fetchErrs {
				fmt.Printf("error consuming from topic: topic=%s, partition=%d, err=%v",
					fetchErr.Topic, fetchErr.Partition, fetchErr.Err)
			}

			// The errors may be fatal for the partition (auth
			// problems), but we can still process any records if
			// there are any.
		}

		fetches.EachRecord(func(r *kgo.Record) {
			fmt.Fprintf(os.Stderr, "consumed record: topic=%s, partition=%d, offset=%d, value=%s\n", r.Topic, r.Partition, r.Offset, r.Value)
		})
	}
}
twmb added a commit that referenced this issue May 23, 2024
@twmb
Copy link
Owner

twmb commented May 23, 2024

Well, you owe me an hour back, but I suppose I owe you that time back as well because my own example is wrong. The code block above is failing because doCommit && e.Err() == nil short circuits when doCommit is false, meaning e.Err() == nil is never evaluated, meaning the AbortingFirstErrPromise never actually waits for records to be produced, meaning the transaction is actually ended before a single record is produced. The abort records actually are produced in a new transaction -- the one that actually will commit.

I spent far too long trying to figure out why this worked when I moved e.Err() out and made the code:

perr := e.Err()
commit := kgo.TransactionEndTry(doCommit && perr == nil)

I'll fixup the example.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants