Skip to content

Commit

Permalink
Add unit tests on txn produce
Browse files Browse the repository at this point in the history
  • Loading branch information
Pierre MORVAN committed Jul 27, 2022
1 parent 22d6fe3 commit 40dc6a6
Show file tree
Hide file tree
Showing 2 changed files with 222 additions and 12 deletions.
23 changes: 11 additions & 12 deletions async_producer.go
Expand Up @@ -491,17 +491,15 @@ func (t *transactionManager) finishTransaction(commit bool) (err error) {
t.mutex.Lock()
defer t.mutex.Unlock()

// Don't do anything if no records has been sent
if len(t.partitionsInCurrentTxn) > 0 {
if t.status == committingTransaction && len(t.partitionsInCurrentTxn) > 0 {
err = t.publishTxnPartitions(commit)
if err != nil {
return err
}

err = t.endTxn(commit)
if err != nil {
return err
}
}
err = t.endTxn(commit)
if err != nil {
return err
}

// reset transaction status
Expand All @@ -512,7 +510,6 @@ func (t *transactionManager) finishTransaction(commit bool) (err error) {

// Makes a request to kafka to add a list of partitions ot the current transaction
func (t *transactionManager) publishTxnPartitions(commit bool) error {
partitionsToAdd := t.partitionsInCurrentTxn
var lastError error
for i := 0; i <= t.client.Config().Producer.Retry.Max; i++ {
lastError = nil
Expand All @@ -525,7 +522,7 @@ func (t *transactionManager) publishTxnPartitions(commit bool) error {
TransactionalID: t.transactionalID,
ProducerID: t.producerID,
ProducerEpoch: t.producerEpoch,
TopicPartitions: partitionsToAdd.toMap(),
TopicPartitions: t.partitionsInCurrentTxn.toMap(),
})

if err != nil {
Expand Down Expand Up @@ -585,9 +582,9 @@ func (t *transactionManager) publishTxnPartitions(commit bool) error {
lastError = Wrap(ErrAddPartitionsToTxn, responseErrors...)
}

partitionsToAdd = failedTxn
t.partitionsInCurrentTxn = failedTxn
// handle success
if len(partitionsToAdd) == 0 {
if len(t.partitionsInCurrentTxn) == 0 {
break
}

Expand Down Expand Up @@ -946,7 +943,9 @@ func (p *asyncProducer) dispatcher() {
}

if msg.flags&endtxn != 0 {
p.txnmgr.status = committingTransaction
if p.txnmgr.status == inTransaction {
p.txnmgr.status = committingTransaction
}
p.inFlight.Done()
continue
}
Expand Down
211 changes: 211 additions & 0 deletions async_producer_test.go
Expand Up @@ -1694,6 +1694,217 @@ func TestTxmngInitProducerId(t *testing.T) {
require.Equal(t, int16(0), txmng.producerEpoch)
}

func TestTxnProduceRecordWithCommit(t *testing.T) {
broker := NewMockBroker(t, 1)
defer broker.Close()

config := NewTestConfig()
config.Producer.Idempotent = true
config.Producer.TransactionalID = "test"
config.Version = V0_11_0_0
config.Producer.RequiredAcks = WaitForAll
config.Net.MaxOpenRequests = 1

metadataLeader := new(MetadataResponse)
metadataLeader.Version = 1
metadataLeader.ControllerID = broker.brokerID
metadataLeader.AddBroker(broker.Addr(), broker.BrokerID())
metadataLeader.AddTopic("test-topic", ErrNoError)
metadataLeader.AddTopicPartition("test-topic", 0, broker.BrokerID(), nil, nil, nil, ErrNoError)
broker.Returns(metadataLeader)

client, err := NewClient([]string{broker.Addr()}, config)
require.NoError(t, err)

findCoordinatorResponse := FindCoordinatorResponse{
Coordinator: client.Brokers()[0],
Err: ErrNoError,
Version: 1,
}
broker.Returns(&findCoordinatorResponse)

producerIdResponse := &InitProducerIDResponse{
Err: ErrNoError,
ProducerID: 1,
ProducerEpoch: 0,
}
broker.Returns(producerIdResponse)

ap, err := NewAsyncProducerFromClient(client)
producer := ap.(*asyncProducer)
require.NoError(t, err)

produceResponse := new(ProduceResponse)
produceResponse.Version = 3
produceResponse.AddTopicPartition("test-topic", 0, ErrNoError)
broker.Returns(produceResponse)

addPartitionsToTxnResponse := &AddPartitionsToTxnResponse{
Errors: make(map[string][]*PartitionError),
}
broker.Returns(addPartitionsToTxnResponse)

endTxnResponse := &EndTxnResponse{
Err: ErrNoError,
}
broker.Returns(endTxnResponse)

require.Equal(t, transactionReady, producer.txnmgr.status)

err = producer.BeginTxn()
require.NoError(t, err)
require.Equal(t, inTransaction, producer.txnmgr.status)

producer.Input() <- &ProducerMessage{Topic: "test-topic", Key: nil, Value: StringEncoder(TestMessage)}
err = producer.CommitTxn()
require.NoError(t, err)
require.Equal(t, transactionReady, producer.txnmgr.status)
}

func TestTxnProduceRecordWithAbort(t *testing.T) {
broker := NewMockBroker(t, 1)
defer broker.Close()

config := NewTestConfig()
config.Producer.Idempotent = true
config.Producer.TransactionalID = "test"
config.Version = V0_11_0_0
config.Producer.RequiredAcks = WaitForAll
config.Net.MaxOpenRequests = 1

metadataLeader := new(MetadataResponse)
metadataLeader.Version = 1
metadataLeader.ControllerID = broker.brokerID
metadataLeader.AddBroker(broker.Addr(), broker.BrokerID())
metadataLeader.AddTopic("test-topic", ErrNoError)
metadataLeader.AddTopicPartition("test-topic", 0, broker.BrokerID(), nil, nil, nil, ErrNoError)
broker.Returns(metadataLeader)

client, err := NewClient([]string{broker.Addr()}, config)
require.NoError(t, err)

findCoordinatorResponse := FindCoordinatorResponse{
Coordinator: client.Brokers()[0],
Err: ErrNoError,
Version: 1,
}
broker.Returns(&findCoordinatorResponse)

producerIdResponse := &InitProducerIDResponse{
Err: ErrNoError,
ProducerID: 1,
ProducerEpoch: 0,
}
broker.Returns(producerIdResponse)

ap, err := NewAsyncProducerFromClient(client)
producer := ap.(*asyncProducer)
require.NoError(t, err)

produceResponse := new(ProduceResponse)
produceResponse.Version = 3
produceResponse.AddTopicPartition("test-topic", 0, ErrNoError)
broker.Returns(produceResponse)

addPartitionsToTxnResponse := &AddPartitionsToTxnResponse{
Errors: make(map[string][]*PartitionError),
}
broker.Returns(addPartitionsToTxnResponse)

endTxnResponse := &EndTxnResponse{
Err: ErrNoError,
}
broker.Returns(endTxnResponse)

require.Equal(t, transactionReady, producer.txnmgr.status)

err = producer.BeginTxn()
require.NoError(t, err)
require.Equal(t, inTransaction, producer.txnmgr.status)

producer.Input() <- &ProducerMessage{Topic: "test-topic", Key: nil, Value: StringEncoder(TestMessage)}
err = producer.AbortTxn()
require.NoError(t, err)
require.Equal(t, transactionReady, producer.txnmgr.status)
}

func TestTxnCanAbort(t *testing.T) {
broker := NewMockBroker(t, 1)
defer broker.Close()

config := NewTestConfig()
config.Producer.Idempotent = true
config.Producer.TransactionalID = "test"
config.Version = V0_11_0_0
config.Producer.RequiredAcks = WaitForAll
config.Net.MaxOpenRequests = 1

metadataLeader := new(MetadataResponse)
metadataLeader.Version = 1
metadataLeader.ControllerID = broker.brokerID
metadataLeader.AddBroker(broker.Addr(), broker.BrokerID())
metadataLeader.AddTopic("test-topic", ErrNoError)
metadataLeader.AddTopicPartition("test-topic", 0, broker.BrokerID(), nil, nil, nil, ErrNoError)
broker.Returns(metadataLeader)

client, err := NewClient([]string{broker.Addr()}, config)
require.NoError(t, err)

findCoordinatorResponse := FindCoordinatorResponse{
Coordinator: client.Brokers()[0],
Err: ErrNoError,
Version: 1,
}
broker.Returns(&findCoordinatorResponse)

producerIdResponse := &InitProducerIDResponse{
Err: ErrNoError,
ProducerID: 1,
ProducerEpoch: 0,
}
broker.Returns(producerIdResponse)

ap, err := NewAsyncProducerFromClient(client)
producer := ap.(*asyncProducer)
require.NoError(t, err)

produceResponse := new(ProduceResponse)
produceResponse.Version = 3
produceResponse.AddTopicPartition("test-topic", 0, ErrNoError)
broker.Returns(produceResponse)

addPartitionsToTxnResponse := &AddPartitionsToTxnResponse{
Errors: map[string][]*PartitionError{
"test-topic": {
{
Partition: 0,
Err: ErrTopicAuthorizationFailed,
},
},
},
}
broker.Returns(addPartitionsToTxnResponse)

endTxnResponse := &EndTxnResponse{
Err: ErrNoError,
}
broker.Returns(endTxnResponse)

require.Equal(t, transactionReady, producer.txnmgr.status)

err = producer.BeginTxn()
require.NoError(t, err)
require.Equal(t, inTransaction, producer.txnmgr.status)

producer.Input() <- &ProducerMessage{Topic: "test-topic", Key: nil, Value: StringEncoder(TestMessage)}
err = producer.CommitTxn()
require.Error(t, err)
require.Equal(t, abortableError, producer.txnmgr.status)

err = producer.AbortTxn()
require.NoError(t, err)
}

func TestTxmngInitProducerIdTxn(t *testing.T) {
broker := NewMockBroker(t, 1)
defer broker.Close()
Expand Down

0 comments on commit 40dc6a6

Please sign in to comment.