Sync producer erroring for repeated transactions with the same transaction ID #2859

Open
tomplarge opened this issue Apr 12, 2024 · 5 comments
Labels: needs-investigation (Issues that require followup from maintainers)


tomplarge commented Apr 12, 2024

Description

There seems to be an issue with repeatedly creating transactions with the same transaction ID. I have a toy example that creates a SyncProducer with a stable transaction ID "transaction-id" and then calls BeginTxn(), SendMessages(), and CommitTxn() in a loop until failure. With debug logging enabled, I see this error almost immediately:

txnmgr/add-partition-to-txn retrying after 20ms... (1 attempts remaining) (transaction manager: failed to send partitions to transaction: kafka server: The producer attempted to update a transaction while another concurrent operation on the same transaction was ongoing)

When I increase config.Producer.Transaction.Retry.Backoff, I see crashes happen less often, but I still see the same error from the transaction manager.
When I increase sleepTime (which introduces a delay between starting transactions), I see this issue go away entirely.

Is this an issue in sarama's handling of transaction state? Or do I have a misunderstanding?

Versions
| Sarama | Kafka | Go |
|--------|-------|--------|
| 1.43.1 | 3.6.0 | 1.21.8 |
Configuration
	config.Version = sarama.V3_6_0_0
	config.ClientID = "client-id"
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Transaction.Retry.Max = 1
	config.Producer.Return.Errors = true
	config.Producer.Return.Successes = true
	config.Producer.Idempotent = true
	config.Net.MaxOpenRequests = 1
	config.Producer.Timeout = 1 * time.Minute
	config.Producer.Transaction.ID = "transaction-id"
Logs

{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:132","message":"Initializing new client\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"client/metadata fetching metadata for all topics from broker localhost:9092\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"Connected to broker at localhost:9092 (unregistered)\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"client/brokers registered new broker #0 at localhost:9092"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:132","message":"Successfully initialized new client\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/init-producer-id [transaction-id] invoking InitProducerId with current producer ID -1 and epoch -1 in order to bump the epoch\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"client/coordinator requesting coordinator for transaction-id from localhost:9092\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"Connected to broker at localhost:9092 (registered as #0)\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"client/coordinator coordinator for transaction-id is #0 (localhost:9092)\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateUninitialized to ProducerTxnStateReady\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/init-producer-id [transaction-id] successful init producer id &{ThrottleTime:0s Err:kafka server: Not an error, why are you printing me? Version:4 ProducerID:3017 ProducerEpoch:71}\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/transaction_manager.go:919","message":"txnmgr/init-producer-id [transaction-id] obtained a ProducerId: 3017 and ProducerEpoch: 71\n"}
Starting transaction:  ProducerTxnStateReady
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateReady to ProducerTxnStateInTransaction\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/async_producer.go:914","message":"producer/broker/0 starting up\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/async_producer.go:930","message":"producer/broker/0 state change to [open] on test/0\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/add-partition-to-txn [transaction-id] successful to add partitions txn &{Version:2 ThrottleTime:0s Errors:map[test:[0x140000100d0]]}\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"producer/txnmgr [transaction-id] committing transaction\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateInTransaction to ProducerTxnStateEndTransaction|ProducerTxnStateCommittingTransaction\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/endtxn [transaction-id] successful to end txn &{Version:0 ThrottleTime:0s Err:kafka server: Not an error, why are you printing me?}\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateEndTransaction|ProducerTxnStateCommittingTransaction to ProducerTxnStateReady\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"producer/txnmgr [transaction-id] transaction committed\n"}
Committed transaction:  ProducerTxnStateReady
Starting transaction:  ProducerTxnStateReady
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateReady to ProducerTxnStateInTransaction\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/transaction_manager.go:807","message":"txnmgr/add-partition-to-txn retrying after 1ms... (1 attempts remaining) (transaction manager: failed to send partitions to transaction: kafka server: The producer attempted to update a transaction while another concurrent operation on the same transaction was ongoing)\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/add-partition-to-txn [transaction-id] successful to add partitions txn &{Version:2 ThrottleTime:0s Errors:map[test:[0x14000126928]]}\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"producer/txnmgr [transaction-id] committing transaction\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateInTransaction to ProducerTxnStateEndTransaction|ProducerTxnStateCommittingTransaction\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/endtxn [transaction-id] successful to end txn &{Version:0 ThrottleTime:0s Err:kafka server: Not an error, why are you printing me?}\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateEndTransaction|ProducerTxnStateCommittingTransaction to ProducerTxnStateReady\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"producer/txnmgr [transaction-id] transaction committed\n"}
Committed transaction:  ProducerTxnStateReady
Starting transaction:  ProducerTxnStateReady
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateReady to ProducerTxnStateInTransaction\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/transaction_manager.go:807","message":"txnmgr/add-partition-to-txn retrying after 1ms... (1 attempts remaining) (transaction manager: failed to send partitions to transaction: kafka server: The producer attempted to update a transaction while another concurrent operation on the same transaction was ongoing)\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/add-partition-to-txn [transaction-id] successful to add partitions txn &{Version:2 ThrottleTime:0s Errors:map[test:[0x14000126bc8]]}\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"producer/txnmgr [transaction-id] committing transaction\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateInTransaction to ProducerTxnStateEndTransaction|ProducerTxnStateCommittingTransaction\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/endtxn [transaction-id] successful to end txn &{Version:0 ThrottleTime:0s Err:kafka server: Not an error, why are you printing me?}\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateEndTransaction|ProducerTxnStateCommittingTransaction to ProducerTxnStateReady\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"producer/txnmgr [transaction-id] transaction committed\n"}
Committed transaction:  ProducerTxnStateReady
Starting transaction:  ProducerTxnStateReady
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateReady to ProducerTxnStateInTransaction\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/transaction_manager.go:807","message":"txnmgr/add-partition-to-txn retrying after 1ms... (1 attempts remaining) (transaction manager: failed to send partitions to transaction: kafka server: The producer attempted to update a transaction while another concurrent operation on the same transaction was ongoing)\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/add-partition-to-txn [transaction-id] successful to add partitions txn &{Version:2 ThrottleTime:0s Errors:map[test:[0x140000aade0]]}\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"producer/txnmgr [transaction-id] committing transaction\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateInTransaction to ProducerTxnStateEndTransaction|ProducerTxnStateCommittingTransaction\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/endtxn [transaction-id] successful to end txn &{Version:0 ThrottleTime:0s Err:kafka server: Not an error, why are you printing me?}\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateEndTransaction|ProducerTxnStateCommittingTransaction to ProducerTxnStateReady\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"producer/txnmgr [transaction-id] transaction committed\n"}
Committed transaction:  ProducerTxnStateReady
Starting transaction:  ProducerTxnStateReady
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateReady to ProducerTxnStateInTransaction\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/transaction_manager.go:807","message":"txnmgr/add-partition-to-txn retrying after 1ms... (1 attempts remaining) (transaction manager: failed to send partitions to transaction: kafka server: The producer attempted to update a transaction while another concurrent operation on the same transaction was ongoing)\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/add-partition-to-txn [transaction-id] successful to add partitions txn &{Version:2 ThrottleTime:0s Errors:map[test:[0x140001270d0]]}\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"producer/txnmgr [transaction-id] committing transaction\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateInTransaction to ProducerTxnStateEndTransaction|ProducerTxnStateCommittingTransaction\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/endtxn [transaction-id] successful to end txn &{Version:0 ThrottleTime:0s Err:kafka server: Not an error, why are you printing me?}\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateEndTransaction|ProducerTxnStateCommittingTransaction to ProducerTxnStateReady\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"producer/txnmgr [transaction-id] transaction committed\n"}
Committed transaction:  ProducerTxnStateReady
Starting transaction:  ProducerTxnStateReady
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateReady to ProducerTxnStateInTransaction\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/transaction_manager.go:807","message":"txnmgr/add-partition-to-txn retrying after 1ms... (1 attempts remaining) (transaction manager: failed to send partitions to transaction: kafka server: The producer attempted to update a transaction while another concurrent operation on the same transaction was ongoing)\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/add-partition-to-txn [transaction-id] successful to add partitions txn &{Version:2 ThrottleTime:0s Errors:map[test:[0x140001272e8]]}\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"producer/txnmgr [transaction-id] committing transaction\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateInTransaction to ProducerTxnStateEndTransaction|ProducerTxnStateCommittingTransaction\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/endtxn [transaction-id] successful to end txn &{Version:0 ThrottleTime:0s Err:kafka server: Not an error, why are you printing me?}\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateEndTransaction|ProducerTxnStateCommittingTransaction to ProducerTxnStateReady\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"producer/txnmgr [transaction-id] transaction committed\n"}
Committed transaction:  ProducerTxnStateReady
Starting transaction:  ProducerTxnStateReady
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateReady to ProducerTxnStateInTransaction\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/transaction_manager.go:807","message":"txnmgr/add-partition-to-txn retrying after 1ms... (1 attempts remaining) (transaction manager: failed to send partitions to transaction: kafka server: The producer attempted to update a transaction while another concurrent operation on the same transaction was ongoing)\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/transaction_manager.go:807","message":"txnmgr/add-partition-to-txn retrying after 1ms... (0 attempts remaining) (transaction manager: failed to send partitions to transaction: kafka server: The producer attempted to update a transaction while another concurrent operation on the same transaction was ongoing)\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/async_producer.go:1192","message":"producer/broker/0 state change to [closing] because transaction manager: failed to send partitions to transaction: kafka server: The producer attempted to update a transaction while another concurrent operation on the same transaction was ongoing\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/add-partition-to-txn [transaction-id] successful to add partitions txn &{Version:2 ThrottleTime:0s Errors:map[test:[0x140000ab2b0]]}\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"Closed connection to broker localhost:9092\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateInTransaction to ProducerTxnStateInError|ProducerTxnStateAbortableError\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateInError|ProducerTxnStateAbortableError to ProducerTxnStateInError|ProducerTxnStateAbortableError\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateInError|ProducerTxnStateAbortableError to ProducerTxnStateInError|ProducerTxnStateAbortableError\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateInError|ProducerTxnStateAbortableError to ProducerTxnStateInError|ProducerTxnStateAbortableError\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateInError|ProducerTxnStateAbortableError to ProducerTxnStateInError|ProducerTxnStateAbortableError\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateInError|ProducerTxnStateAbortableError to ProducerTxnStateInError|ProducerTxnStateAbortableError\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateInError|ProducerTxnStateAbortableError to ProducerTxnStateInError|ProducerTxnStateAbortableError\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateInError|ProducerTxnStateAbortableError to ProducerTxnStateInError|ProducerTxnStateAbortableError\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateInError|ProducerTxnStateAbortableError to ProducerTxnStateInError|ProducerTxnStateAbortableError\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateInError|ProducerTxnStateAbortableError to ProducerTxnStateInError|ProducerTxnStateAbortableError\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateInError|ProducerTxnStateAbortableError to ProducerTxnStateInError|ProducerTxnStateAbortableError\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateInError|ProducerTxnStateAbortableError to ProducerTxnStateInError|ProducerTxnStateAbortableError\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateInError|ProducerTxnStateAbortableError to ProducerTxnStateInError|ProducerTxnStateAbortableError\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateInError|ProducerTxnStateAbortableError to ProducerTxnStateInError|ProducerTxnStateAbortableError\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateInError|ProducerTxnStateAbortableError to ProducerTxnStateInError|ProducerTxnStateAbortableError\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateInError|ProducerTxnStateAbortableError to ProducerTxnStateInError|ProducerTxnStateAbortableError\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateInError|ProducerTxnStateAbortableError to ProducerTxnStateInError|ProducerTxnStateAbortableError\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateInError|ProducerTxnStateAbortableError to ProducerTxnStateInError|ProducerTxnStateAbortableError\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateInError|ProducerTxnStateAbortableError to ProducerTxnStateInError|ProducerTxnStateAbortableError\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateInError|ProducerTxnStateAbortableError to ProducerTxnStateInError|ProducerTxnStateAbortableError\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateInError|ProducerTxnStateAbortableError to ProducerTxnStateInError|ProducerTxnStateAbortableError\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateInError|ProducerTxnStateAbortableError to ProducerTxnStateInError|ProducerTxnStateAbortableError\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateInError|ProducerTxnStateAbortableError to ProducerTxnStateInError|ProducerTxnStateAbortableError\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateInError|ProducerTxnStateAbortableError to ProducerTxnStateInError|ProducerTxnStateAbortableError\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateInError|ProducerTxnStateAbortableError to ProducerTxnStateInError|ProducerTxnStateAbortableError\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateInError|ProducerTxnStateAbortableError to ProducerTxnStateInError|ProducerTxnStateAbortableError\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateInError|ProducerTxnStateAbortableError to ProducerTxnStateInError|ProducerTxnStateAbortableError\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateInError|ProducerTxnStateAbortableError to ProducerTxnStateInError|ProducerTxnStateAbortableError\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateInError|ProducerTxnStateAbortableError to ProducerTxnStateInError|ProducerTxnStateAbortableError\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateInError|ProducerTxnStateAbortableError to ProducerTxnStateInError|ProducerTxnStateAbortableError\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateInError|ProducerTxnStateAbortableError to ProducerTxnStateInError|ProducerTxnStateAbortableError\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateInError|ProducerTxnStateAbortableError to ProducerTxnStateInError|ProducerTxnStateAbortableError\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateInError|ProducerTxnStateAbortableError to ProducerTxnStateInError|ProducerTxnStateAbortableError\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateInError|ProducerTxnStateAbortableError to ProducerTxnStateInError|ProducerTxnStateAbortableError\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateInError|ProducerTxnStateAbortableError to ProducerTxnStateInError|ProducerTxnStateAbortableError\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateInError|ProducerTxnStateAbortableError to ProducerTxnStateInError|ProducerTxnStateAbortableError\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateInError|ProducerTxnStateAbortableError to ProducerTxnStateInError|ProducerTxnStateAbortableError\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateInError|ProducerTxnStateAbortableError to ProducerTxnStateInError|ProducerTxnStateAbortableError\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateInError|ProducerTxnStateAbortableError to ProducerTxnStateInError|ProducerTxnStateAbortableError\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateInError|ProducerTxnStateAbortableError to ProducerTxnStateInError|ProducerTxnStateAbortableError\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateInError|ProducerTxnStateAbortableError to ProducerTxnStateInError|ProducerTxnStateAbortableError\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateInError|ProducerTxnStateAbortableError to ProducerTxnStateInError|ProducerTxnStateAbortableError\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateInError|ProducerTxnStateAbortableError to ProducerTxnStateInError|ProducerTxnStateAbortableError\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateInError|ProducerTxnStateAbortableError to ProducerTxnStateInError|ProducerTxnStateAbortableError\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateInError|ProducerTxnStateAbortableError to ProducerTxnStateInError|ProducerTxnStateAbortableError\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateInError|ProducerTxnStateAbortableError to ProducerTxnStateInError|ProducerTxnStateAbortableError\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateInError|ProducerTxnStateAbortableError to ProducerTxnStateInError|ProducerTxnStateAbortableError\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateInError|ProducerTxnStateAbortableError to ProducerTxnStateInError|ProducerTxnStateAbortableError\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateInError|ProducerTxnStateAbortableError to ProducerTxnStateInError|ProducerTxnStateAbortableError\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/async_producer.go:711","message":"producer/leader/test/0 state change to [retrying-1]\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/async_producer.go:721","message":"producer/leader/test/0 abandoning broker 0\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/async_producer.go:920","message":"producer/broker/0 input chan closed\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/async_producer.go:1020","message":"producer/broker/0 shut down\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"client/metadata fetching metadata for [test] from broker localhost:9092\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"Connected to broker at localhost:9092 (registered as #0)\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/async_producer.go:914","message":"producer/broker/0 starting up\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/async_producer.go:930","message":"producer/broker/0 state change to [open] on test/0\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/async_producer.go:624","message":"producer/leader/test/0 selected broker 0\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/async_producer.go:727","message":"producer/leader/test/0 state change to [flushing-1]\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/async_producer.go:749","message":"producer/leader/test/0 state change to [normal]\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/async_producer.go:1192","message":"producer/broker/0 state change to [closing] because kafka server: The broker received an out of order sequence number\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"Closed connection to broker localhost:9092\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/async_producer.go:711","message":"producer/leader/test/0 state change to [retrying-2]\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/async_producer.go:721","message":"producer/leader/test/0 abandoning broker 0\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/async_producer.go:920","message":"producer/broker/0 input chan closed\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/async_producer.go:1020","message":"producer/broker/0 shut down\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"client/metadata fetching metadata for [test] from broker localhost:9092\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"Connected to broker at localhost:9092 (registered as #0)\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/async_producer.go:914","message":"producer/broker/0 starting up\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/async_producer.go:930","message":"producer/broker/0 state change to [open] on test/0\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/async_producer.go:624","message":"producer/leader/test/0 selected broker 0\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/async_producer.go:727","message":"producer/leader/test/0 state change to [flushing-2]\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/async_producer.go:749","message":"producer/leader/test/0 state change to [normal]\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/async_producer.go:1192","message":"producer/broker/0 state change to [closing] because kafka server: The broker received an out of order sequence number\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"Closed connection to broker localhost:9092\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/async_producer.go:711","message":"producer/leader/test/0 state change to [retrying-3]\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/async_producer.go:721","message":"producer/leader/test/0 abandoning broker 0\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/async_producer.go:920","message":"producer/broker/0 input chan closed\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/async_producer.go:1020","message":"producer/broker/0 shut down\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"client/metadata fetching metadata for [test] from broker localhost:9092\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"Connected to broker at localhost:9092 (registered as #0)\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/async_producer.go:914","message":"producer/broker/0 starting up\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/async_producer.go:930","message":"producer/broker/0 state change to [open] on test/0\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/async_producer.go:624","message":"producer/leader/test/0 selected broker 0\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/async_producer.go:727","message":"producer/leader/test/0 state change to [flushing-3]\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/async_producer.go:749","message":"producer/leader/test/0 state change to [normal]\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/async_producer.go:1192","message":"producer/broker/0 state change to [closing] because kafka server: The broker received an out of order sequence number\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"Closed connection to broker localhost:9092\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateInError|ProducerTxnStateAbortableError to ProducerTxnStateInError|ProducerTxnStateAbortableError\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"producer/txnmgr [transaction-id] aborting transaction\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateInError|ProducerTxnStateAbortableError to ProducerTxnStateEndTransaction|ProducerTxnStateAbortingTransaction\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"Connected to broker at localhost:9092 (registered as #0)\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/endtxn [transaction-id] successful to end txn &{Version:0 ThrottleTime:0s Err:kafka server: Not an error, why are you printing me?}\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateEndTransaction|ProducerTxnStateAbortingTransaction to ProducerTxnStateInitializing\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateInitializing to ProducerTxnStateInitializing\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/init-producer-id [transaction-id] invoking InitProducerId for the first time in order to acquire a producer ID\n"}
{"level":"debug","time":"2024-04-12T13:14:40-06:00","caller":"/Users/thomaslarge/sarama/vendor/github.com/IBM/sarama/sarama.go:129","message":"txnmgr/transition [transaction-id] transition from ProducerTxnStateInitializing to ProducerTxnStateInError|ProducerTxnStateFatalError\n"}
AbortTxn:  kafka server: The producer attempted to update a transaction while another concurrent operation on the same transaction was ongoing
panic: SendMessages: kafka: Failed to deliver 50 messages.

goroutine 1 [running]:
main.main()
	/Users/thomaslarge/sarama/cmd/test/main.go:57 +0x6a4
exit status 2

Additional Context

Here is the program

package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/IBM/sarama"
	"github.com/rs/zerolog/log"
)

func main() {
	logger := log.With().Caller().Logger()
	sarama.Logger = &logger

	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, newConfig())
	if err != nil {
		panic(fmt.Sprintf("NewSyncProducer: %s", err.Error()))
	}
	defer func() {
		if producer.TxnStatus()&sarama.ProducerTxnFlagAbortableError != 0 {
			err = producer.AbortTxn()
			if err != nil {
				fmt.Println("AbortTxn: ", err.Error())
			}
		}
	}()

	// SIGKILL cannot be caught by a program, so it is not listed here.
	signals := []os.Signal{syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT}
	ctx, cancel := signal.NotifyContext(context.Background(), signals...)
	defer cancel()

	messages := make([]*sarama.ProducerMessage, 0)
	for i := 0; i < 50; i++ {
		messages = append(messages, &sarama.ProducerMessage{
			Topic: "test",
		})
	}

	sleepTime := 0 * time.Millisecond
	for {
		select {
		case <-ctx.Done():
			return
		default:
			fmt.Println("Starting transaction: ", producer.TxnStatus())

			err = producer.BeginTxn()
			if err != nil {
				panic(fmt.Sprintf("BeginTxn: %s", err.Error()))
			}

			err = producer.SendMessages(messages)
			if err != nil {
				panic(fmt.Sprintf("SendMessages: %s", err.Error()))
			}

			err = producer.CommitTxn()
			if err != nil {
				panic(fmt.Sprintf("CommitTxn: %s", err.Error()))
			}

			fmt.Println("Committed transaction: ", producer.TxnStatus())
		}

		time.Sleep(sleepTime)
	}
}

func newConfig() *sarama.Config {
	config := sarama.NewConfig()
	config.Version = sarama.V3_6_0_0
	config.ClientID = "client-id"

	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Transaction.Retry.Max = 1
	config.Producer.Transaction.Retry.Backoff = 1 * time.Millisecond
	config.Producer.Return.Errors = true
	config.Producer.Return.Successes = true
	config.Producer.Idempotent = true
	config.Net.MaxOpenRequests = 1
	config.Producer.Timeout = 1 * time.Minute
	config.Producer.Transaction.ID = "transaction-id"

	return config
}
@dnwe dnwe added the needs-investigation Issues that require followup from maintainers label Apr 18, 2024
@dnwe dnwe self-assigned this Apr 18, 2024
@dnwe
Collaborator

dnwe commented Apr 18, 2024

@tomplarge thanks for raising this one. When we commit the transaction, the txnmgr/endtxn request is sent to the coordinator, which writes a PrepareCommit message to the transaction log and then returns the response to us. However, the final 'CompleteCommit' message is written asynchronously, so there is a timing window: if you immediately try to use the transaction again, you get the Errors.CONCURRENT_TRANSACTIONS response back.

The Java client just treats this as a retriable error and tries again after a backoff (see here) so we should probably make sure we're doing the same

@dnwe
Collaborator

dnwe commented Apr 18, 2024

Ah, I just spotted your Retry.Max = 1 and Retry.Backoff = 1 * time.Millisecond, so that's probably why the existing Retry code isn't working for you here

case ErrConcurrentTransactions:
// Retry
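
For illustration, a sketch of the reporter's configuration with more headroom for the transaction manager's own retries. The Retry.Max and Retry.Backoff values here are assumptions chosen for the example, not recommended or measured settings; the other fields mirror the config from the issue.

```go
package main

import (
	"fmt"
	"time"

	"github.com/IBM/sarama"
)

// newConfig mirrors the settings from the issue but allows more
// transaction-manager retries. The retry numbers are illustrative only.
func newConfig() *sarama.Config {
	config := sarama.NewConfig()
	config.Version = sarama.V3_6_0_0
	config.ClientID = "client-id"

	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Idempotent = true
	config.Net.MaxOpenRequests = 1
	config.Producer.Return.Errors = true
	config.Producer.Return.Successes = true
	config.Producer.Transaction.ID = "transaction-id"

	// Assumed values: several attempts with a longer pause, instead of
	// Retry.Max = 1 and Retry.Backoff = 1ms as in the original program.
	config.Producer.Transaction.Retry.Max = 10
	config.Producer.Transaction.Retry.Backoff = 100 * time.Millisecond

	return config
}

func main() {
	config := newConfig()
	// Validate reports inconsistent combinations of settings.
	if err := config.Validate(); err != nil {
		panic(err)
	}
	fmt.Println("transaction retries:", config.Producer.Transaction.Retry.Max,
		"backoff:", config.Producer.Transaction.Retry.Backoff)
}
```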

@tomplarge
Author

@dnwe thanks for the response! I also ran the exactly once example and noticed it had the same issue when debug logging was enabled. The exactly once example suggests that retrying in application logic is recommended (https://github.com/IBM/sarama/blob/main/examples/exactly_once/main.go#L295). So are you saying that the way to handle this is just to be more generous with Retry.Max and Retry.Backoff, and also to retry the desired operation in application logic? If so, what are reasonable values for these configurations?

As an aside, in an optimal world, would it not make more sense to enforce synchronous behavior when committing a transaction? I'm curious what your thoughts are here.

@dnwe
Collaborator

dnwe commented Apr 18, 2024

@tomplarge yeah, I agree it shouldn't be async. Ideally we wouldn't get the response from the Commit until it had 100% completed server side, but that async behaviour seems to have been a bit of a nit in the Kafka code ever since the transactional producer was introduced in 0.11.0, rather than a mistake Sarama is making, and something they were just happy to hide away in the client retries.

See KAFKA-5477, where the 'transaction in progress' retry backoff was special-cased to be shorter than the regular backoff, specifically because the client can seemingly always get the 'in progress' response on the first attempt.

@tomplarge
Author

@dnwe this is great to know, thanks for pointing out that issue entry. I'll do some reconfiguring and more flexible retry logic. The larger question I have is at what point do we actually fail on the application side? For example, during a rebalance, I have noticed this error spewing from the fenced producers: Error: Unknown error, how did this happen? Error code = 90. Error code 90 corresponds to PRODUCER_FENCED, but this error is not resolvable after any number of retries. How should this be handled? Does this scenario break any part of exactly-once guarantees?
