diff --git a/examples/txn_producer/main.go b/examples/txn_producer/main.go index bfda18854..08bb5630a 100644 --- a/examples/txn_producer/main.go +++ b/examples/txn_producer/main.go @@ -75,7 +75,7 @@ func main() { config.Producer.Idempotent = true config.Producer.Return.Errors = false config.Producer.RequiredAcks = sarama.WaitForAll - config.Producer.Partitioner = sarama.NewRandomPartitioner + config.Producer.Partitioner = sarama.NewRoundRobinPartitioner if transactionalProducer { config.Producer.Transaction.ID = "txn_producer" } else { @@ -141,7 +141,8 @@ func produceTestRecord(producerProvider *producerProvider) { return } } - for i := 0; i < 1000; i++ { + var i int64 + for i = 0; i < recordsNumber; i++ { producer.Input() <- &sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder("test")} } if producer.IsTransactional() { @@ -172,7 +173,8 @@ func produceTestRecord(producerProvider *producerProvider) { return } } else { - for i := 0; i < 1000; i++ { + var i int64 + for i = 0; i < recordsNumber; i++ { <-producer.Successes() } } diff --git a/transaction_manager.go b/transaction_manager.go index 5698cab29..3e233e1d5 100644 --- a/transaction_manager.go +++ b/transaction_manager.go @@ -89,6 +89,8 @@ type transactionManager struct { status ProducerTxnStatusFlag partitionInTxnLock sync.Mutex + publishTxnPartitionsLock sync.Mutex + newPartitionsInCurrentTxn topicPartitionSet pendingPartitionsInCurrentTxn topicPartitionSet partitionsInCurrentTxn topicPartitionSet @@ -548,6 +550,7 @@ func (t *transactionManager) completeTransaction() error { t.lastError = nil t.epochBumpRequired = false + t.newPartitionsInCurrentTxn = topicPartitionSet{} t.partitionsInCurrentTxn = topicPartitionSet{} t.pendingPartitionsInCurrentTxn = topicPartitionSet{} t.offsetsInCurrentTxn = map[string]map[string][]*PartitionOffsetMetadata{} @@ -676,32 +679,50 @@ func (t *transactionManager) finishTransaction(commit bool) error { } func (t *transactionManager) maybeAddPartitionToCurrentTxn(topic string, partition int32) { - t.partitionInTxnLock.Lock() - defer t.partitionInTxnLock.Unlock() - if t.currentTxnStatus()&ProducerTxnFlagInError != 0 { return } tp := topicPartition{topic: topic, partition: partition} + + t.partitionInTxnLock.Lock() + defer t.partitionInTxnLock.Unlock() if _, ok := t.partitionsInCurrentTxn[tp]; ok { // partition is already added return } + if _, ok := t.pendingPartitionsInCurrentTxn[tp]; ok { + // partition is already added + return + } - t.pendingPartitionsInCurrentTxn[tp] = struct{}{} + t.newPartitionsInCurrentTxn[tp] = struct{}{} } // Makes a request to kafka to add a list of partitions ot the current transaction func (t *transactionManager) publishTxnPartitions() error { - t.partitionInTxnLock.Lock() - defer t.partitionInTxnLock.Unlock() + t.publishTxnPartitionsLock.Lock() + defer t.publishTxnPartitionsLock.Unlock() if t.currentTxnStatus()&ProducerTxnFlagInError != 0 { return t.lastError } - if len(t.pendingPartitionsInCurrentTxn) == 0 { + newPartitions := topicPartitionSet{} + addedPartitions := topicPartitionSet{} + func() { + t.partitionInTxnLock.Lock() + defer t.partitionInTxnLock.Unlock() + if len(t.newPartitionsInCurrentTxn) == 0 { + return + } + for k, v := range t.newPartitionsInCurrentTxn { + t.pendingPartitionsInCurrentTxn[k] = v + newPartitions[k] = v + } + t.newPartitionsInCurrentTxn = topicPartitionSet{} + }() + if len(newPartitions) > 0 { return nil } @@ -711,6 +732,11 @@ func (t *transactionManager) publishTxnPartitions() error { // aborted anyway. In this case, we must be able to continue sending the batches which are in // retry for partitions that were successfully added. removeAllPartitionsOnFatalOrAbortedError := func() { + t.partitionInTxnLock.Lock() + defer t.partitionInTxnLock.Unlock() + for k, v := range addedPartitions { + t.partitionsInCurrentTxn[k] = v + } t.pendingPartitionsInCurrentTxn = topicPartitionSet{} } @@ -754,7 +780,7 @@ func (t *transactionManager) publishTxnPartitions() error { TransactionalID: t.transactionalID, ProducerID: t.producerID, ProducerEpoch: t.producerEpoch, - TopicPartitions: t.pendingPartitionsInCurrentTxn.toMap(), + TopicPartitions: newPartitions.toMap(), }) if err != nil { @@ -775,8 +801,8 @@ func (t *transactionManager) publishTxnPartitions() error { switch response.Err { case ErrNoError: // Mark partition as added to transaction - t.partitionsInCurrentTxn[tp] = struct{}{} - delete(t.pendingPartitionsInCurrentTxn, tp) + addedPartitions[tp] = struct{}{} + delete(newPartitions, tp) continue case ErrConsumerCoordinatorNotAvailable: fallthrough @@ -789,7 +815,7 @@ func (t *transactionManager) publishTxnPartitions() error { case ErrOffsetsLoadInProgress: // Retry topicPartition case ErrConcurrentTransactions: - if len(t.partitionsInCurrentTxn) == 0 { + if len(t.partitionsInCurrentTxn) == 0 && retryBackoff > addPartitionsRetryBackoff { retryBackoff = addPartitionsRetryBackoff } case ErrOperationNotAttempted: @@ -812,7 +838,7 @@ func (t *transactionManager) publishTxnPartitions() error { } // handle end - if len(t.pendingPartitionsInCurrentTxn) == 0 { + if len(newPartitions) == 0 { DebugLogger.Printf("txnmgr/add-partition-to-txn [%s] successful to add partitions txn %+v\n", t.transactionalID, addPartResponse) return false, nil @@ -826,6 +852,7 @@ func newTransactionManager(conf *Config, client Client) (*transactionManager, er producerID: noProducerID, producerEpoch: noProducerEpoch, client: client, + newPartitionsInCurrentTxn: topicPartitionSet{}, pendingPartitionsInCurrentTxn: topicPartitionSet{}, partitionsInCurrentTxn: topicPartitionSet{}, offsetsInCurrentTxn: make(map[string]map[string][]*PartitionOffsetMetadata),