Skip to content

Commit

Permalink
optimize locks
Browse files Browse the repository at this point in the history
  • Loading branch information
Pierre MORVAN committed Aug 4, 2022
1 parent 16d117f commit 4e81f11
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 15 deletions.
8 changes: 5 additions & 3 deletions examples/txn_producer/main.go
Expand Up @@ -75,7 +75,7 @@ func main() {
config.Producer.Idempotent = true
config.Producer.Return.Errors = false
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Partitioner = sarama.NewRandomPartitioner
config.Producer.Partitioner = sarama.NewRoundRobinPartitioner
if transactionalProducer {
config.Producer.Transaction.ID = "txn_producer"
} else {
Expand Down Expand Up @@ -141,7 +141,8 @@ func produceTestRecord(producerProvider *producerProvider) {
return
}
}
for i := 0; i < 1000; i++ {
var i int64
for i = 0; i < recordsNumber; i++ {
producer.Input() <- &sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder("test")}
}
if producer.IsTransactional() {
Expand Down Expand Up @@ -172,7 +173,8 @@ func produceTestRecord(producerProvider *producerProvider) {
return
}
} else {
for i := 0; i < 1000; i++ {
var i int64
for i = 0; i < recordsNumber; i++ {
<-producer.Successes()
}
}
Expand Down
51 changes: 39 additions & 12 deletions transaction_manager.go
Expand Up @@ -89,6 +89,8 @@ type transactionManager struct {
status ProducerTxnStatusFlag

partitionInTxnLock sync.Mutex
publishTxnPartitionsLock sync.Mutex
newPartitionsInCurrentTxn topicPartitionSet
pendingPartitionsInCurrentTxn topicPartitionSet
partitionsInCurrentTxn topicPartitionSet

Expand Down Expand Up @@ -548,6 +550,7 @@ func (t *transactionManager) completeTransaction() error {

t.lastError = nil
t.epochBumpRequired = false
t.newPartitionsInCurrentTxn = topicPartitionSet{}
t.partitionsInCurrentTxn = topicPartitionSet{}
t.pendingPartitionsInCurrentTxn = topicPartitionSet{}
t.offsetsInCurrentTxn = map[string]map[string][]*PartitionOffsetMetadata{}
Expand Down Expand Up @@ -676,32 +679,50 @@ func (t *transactionManager) finishTransaction(commit bool) error {
}

func (t *transactionManager) maybeAddPartitionToCurrentTxn(topic string, partition int32) {
t.partitionInTxnLock.Lock()
defer t.partitionInTxnLock.Unlock()

if t.currentTxnStatus()&ProducerTxnFlagInError != 0 {
return
}

tp := topicPartition{topic: topic, partition: partition}

t.partitionInTxnLock.Lock()
defer t.partitionInTxnLock.Unlock()
if _, ok := t.partitionsInCurrentTxn[tp]; ok {
// partition is already added
return
}
if _, ok := t.pendingPartitionsInCurrentTxn[tp]; ok {
// partition is already added
return
}

t.pendingPartitionsInCurrentTxn[tp] = struct{}{}
t.newPartitionsInCurrentTxn[tp] = struct{}{}
}

// Makes a request to kafka to add a list of partitions ot the current transaction
func (t *transactionManager) publishTxnPartitions() error {
t.partitionInTxnLock.Lock()
defer t.partitionInTxnLock.Unlock()
t.publishTxnPartitionsLock.Lock()
defer t.publishTxnPartitionsLock.Unlock()

if t.currentTxnStatus()&ProducerTxnFlagInError != 0 {
return t.lastError
}

if len(t.pendingPartitionsInCurrentTxn) == 0 {
newPartitions := topicPartitionSet{}
addedPartitions := topicPartitionSet{}
func() {
t.partitionInTxnLock.Lock()
defer t.partitionInTxnLock.Unlock()
if len(t.newPartitionsInCurrentTxn) == 0 {
return
}
for k, v := range t.newPartitionsInCurrentTxn {
t.pendingPartitionsInCurrentTxn[k] = v
newPartitions[k] = v
}
t.newPartitionsInCurrentTxn = topicPartitionSet{}
}()
if len(newPartitions) > 0 {
return nil
}

Expand All @@ -711,6 +732,11 @@ func (t *transactionManager) publishTxnPartitions() error {
// aborted anyway. In this case, we must be able to continue sending the batches which are in
// retry for partitions that were successfully added.
removeAllPartitionsOnFatalOrAbortedError := func() {
t.partitionInTxnLock.Lock()
defer t.partitionInTxnLock.Unlock()
for k, v := range addedPartitions {
t.partitionsInCurrentTxn[k] = v
}
t.pendingPartitionsInCurrentTxn = topicPartitionSet{}
}

Expand Down Expand Up @@ -754,7 +780,7 @@ func (t *transactionManager) publishTxnPartitions() error {
TransactionalID: t.transactionalID,
ProducerID: t.producerID,
ProducerEpoch: t.producerEpoch,
TopicPartitions: t.pendingPartitionsInCurrentTxn.toMap(),
TopicPartitions: newPartitions.toMap(),
})

if err != nil {
Expand All @@ -775,8 +801,8 @@ func (t *transactionManager) publishTxnPartitions() error {
switch response.Err {
case ErrNoError:
// Mark partition as added to transaction
t.partitionsInCurrentTxn[tp] = struct{}{}
delete(t.pendingPartitionsInCurrentTxn, tp)
addedPartitions[tp] = struct{}{}
delete(newPartitions, tp)
continue
case ErrConsumerCoordinatorNotAvailable:
fallthrough
Expand All @@ -789,7 +815,7 @@ func (t *transactionManager) publishTxnPartitions() error {
case ErrOffsetsLoadInProgress:
// Retry topicPartition
case ErrConcurrentTransactions:
if len(t.partitionsInCurrentTxn) == 0 {
if len(t.partitionsInCurrentTxn) == 0 && retryBackoff > addPartitionsRetryBackoff {
retryBackoff = addPartitionsRetryBackoff
}
case ErrOperationNotAttempted:
Expand All @@ -812,7 +838,7 @@ func (t *transactionManager) publishTxnPartitions() error {
}

// handle end
if len(t.pendingPartitionsInCurrentTxn) == 0 {
if len(newPartitions) == 0 {
DebugLogger.Printf("txnmgr/add-partition-to-txn [%s] successful to add partitions txn %+v\n",
t.transactionalID, addPartResponse)
return false, nil
Expand All @@ -826,6 +852,7 @@ func newTransactionManager(conf *Config, client Client) (*transactionManager, er
producerID: noProducerID,
producerEpoch: noProducerEpoch,
client: client,
newPartitionsInCurrentTxn: topicPartitionSet{},
pendingPartitionsInCurrentTxn: topicPartitionSet{},
partitionsInCurrentTxn: topicPartitionSet{},
offsetsInCurrentTxn: make(map[string]map[string][]*PartitionOffsetMetadata),
Expand Down

0 comments on commit 4e81f11

Please sign in to comment.