Skip to content

Commit

Permalink
revert previous optimisation...
Browse files Browse the repository at this point in the history
  • Loading branch information
Pierre MORVAN committed Aug 4, 2022
1 parent 4e81f11 commit 3dc7a5b
Showing 1 changed file with 8 additions and 34 deletions.
42 changes: 8 additions & 34 deletions transaction_manager.go
Expand Up @@ -90,7 +90,6 @@ type transactionManager struct {

partitionInTxnLock sync.Mutex
publishTxnPartitionsLock sync.Mutex
newPartitionsInCurrentTxn topicPartitionSet
pendingPartitionsInCurrentTxn topicPartitionSet
partitionsInCurrentTxn topicPartitionSet

Expand Down Expand Up @@ -550,7 +549,6 @@ func (t *transactionManager) completeTransaction() error {

t.lastError = nil
t.epochBumpRequired = false
t.newPartitionsInCurrentTxn = topicPartitionSet{}
t.partitionsInCurrentTxn = topicPartitionSet{}
t.pendingPartitionsInCurrentTxn = topicPartitionSet{}
t.offsetsInCurrentTxn = map[string]map[string][]*PartitionOffsetMetadata{}
Expand Down Expand Up @@ -691,38 +689,20 @@ func (t *transactionManager) maybeAddPartitionToCurrentTxn(topic string, partiti
// partition is already added
return
}
if _, ok := t.pendingPartitionsInCurrentTxn[tp]; ok {
// partition is already added
return
}

t.newPartitionsInCurrentTxn[tp] = struct{}{}
t.pendingPartitionsInCurrentTxn[tp] = struct{}{}
}

// Makes a request to kafka to add a list of partitions ot the current transaction
func (t *transactionManager) publishTxnPartitions() error {
t.publishTxnPartitionsLock.Lock()
defer t.publishTxnPartitionsLock.Unlock()
t.partitionInTxnLock.Lock()
defer t.partitionInTxnLock.Unlock()

if t.currentTxnStatus()&ProducerTxnFlagInError != 0 {
return t.lastError
}

newPartitions := topicPartitionSet{}
addedPartitions := topicPartitionSet{}
func() {
t.partitionInTxnLock.Lock()
defer t.partitionInTxnLock.Unlock()
if len(t.newPartitionsInCurrentTxn) == 0 {
return
}
for k, v := range t.newPartitionsInCurrentTxn {
t.pendingPartitionsInCurrentTxn[k] = v
newPartitions[k] = v
}
t.newPartitionsInCurrentTxn = topicPartitionSet{}
}()
if len(newPartitions) > 0 {
if len(t.pendingPartitionsInCurrentTxn) == 0 {
return nil
}

Expand All @@ -732,11 +712,6 @@ func (t *transactionManager) publishTxnPartitions() error {
// aborted anyway. In this case, we must be able to continue sending the batches which are in
// retry for partitions that were successfully added.
removeAllPartitionsOnFatalOrAbortedError := func() {
t.partitionInTxnLock.Lock()
defer t.partitionInTxnLock.Unlock()
for k, v := range addedPartitions {
t.partitionsInCurrentTxn[k] = v
}
t.pendingPartitionsInCurrentTxn = topicPartitionSet{}
}

Expand Down Expand Up @@ -780,7 +755,7 @@ func (t *transactionManager) publishTxnPartitions() error {
TransactionalID: t.transactionalID,
ProducerID: t.producerID,
ProducerEpoch: t.producerEpoch,
TopicPartitions: newPartitions.toMap(),
TopicPartitions: t.pendingPartitionsInCurrentTxn.toMap(),
})

if err != nil {
Expand All @@ -801,8 +776,8 @@ func (t *transactionManager) publishTxnPartitions() error {
switch response.Err {
case ErrNoError:
// Mark partition as added to transaction
addedPartitions[tp] = struct{}{}
delete(newPartitions, tp)
t.partitionsInCurrentTxn[tp] = struct{}{}
delete(t.pendingPartitionsInCurrentTxn, tp)
continue
case ErrConsumerCoordinatorNotAvailable:
fallthrough
Expand Down Expand Up @@ -838,7 +813,7 @@ func (t *transactionManager) publishTxnPartitions() error {
}

// handle end
if len(newPartitions) == 0 {
if len(t.pendingPartitionsInCurrentTxn) == 0 {
DebugLogger.Printf("txnmgr/add-partition-to-txn [%s] successful to add partitions txn %+v\n",
t.transactionalID, addPartResponse)
return false, nil
Expand All @@ -852,7 +827,6 @@ func newTransactionManager(conf *Config, client Client) (*transactionManager, er
producerID: noProducerID,
producerEpoch: noProducerEpoch,
client: client,
newPartitionsInCurrentTxn: topicPartitionSet{},
pendingPartitionsInCurrentTxn: topicPartitionSet{},
partitionsInCurrentTxn: topicPartitionSet{},
offsetsInCurrentTxn: make(map[string]map[string][]*PartitionOffsetMetadata),
Expand Down

0 comments on commit 3dc7a5b

Please sign in to comment.