Skip to content

Commit

Permalink
Initial add transaction to producer
Browse files Browse the repository at this point in the history
Use lock to prevent transaction misuse

add exacly once functional producer test

add txn api to sync producer

Add unit tests on txn produce
  • Loading branch information
ryarnyah committed Jul 27, 2022
1 parent 06d1a62 commit ce3ade0
Show file tree
Hide file tree
Showing 12 changed files with 1,444 additions and 14 deletions.
670 changes: 657 additions & 13 deletions async_producer.go

Large diffs are not rendered by default.

286 changes: 286 additions & 0 deletions async_producer_test.go
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/fortytw2/leaktest"
"github.com/rcrowley/go-metrics"
"github.com/stretchr/testify/require"
)

const TestMessage = "ABC THE MESSAGE"
Expand Down Expand Up @@ -1661,6 +1662,291 @@ func TestProducerError(t *testing.T) {
}
}

func TestTxmngInitProducerId(t *testing.T) {
broker := NewMockBroker(t, 1)
defer broker.Close()

metadataLeader := new(MetadataResponse)
metadataLeader.Version = 1
metadataLeader.AddBroker(broker.Addr(), broker.BrokerID())
broker.Returns(metadataLeader)

config := NewTestConfig()
config.Producer.Idempotent = true
config.Version = V0_11_0_0
config.Producer.RequiredAcks = WaitForAll
config.Net.MaxOpenRequests = 1

client, err := NewClient([]string{broker.Addr()}, config)
require.NoError(t, err)

producerIdResponse := &InitProducerIDResponse{
Err: ErrNoError,
ProducerID: 1,
ProducerEpoch: 0,
}
broker.Returns(producerIdResponse)

txmng, err := newTransactionManager(config, client)
require.NoError(t, err)

require.Equal(t, int64(1), txmng.producerID)
require.Equal(t, int16(0), txmng.producerEpoch)
}

func TestTxnProduceRecordWithCommit(t *testing.T) {
broker := NewMockBroker(t, 1)
defer broker.Close()

config := NewTestConfig()
config.Producer.Idempotent = true
config.Producer.TransactionalID = "test"
config.Version = V0_11_0_0
config.Producer.RequiredAcks = WaitForAll
config.Net.MaxOpenRequests = 1

metadataLeader := new(MetadataResponse)
metadataLeader.Version = 1
metadataLeader.ControllerID = broker.brokerID
metadataLeader.AddBroker(broker.Addr(), broker.BrokerID())
metadataLeader.AddTopic("test-topic", ErrNoError)
metadataLeader.AddTopicPartition("test-topic", 0, broker.BrokerID(), nil, nil, nil, ErrNoError)
broker.Returns(metadataLeader)

client, err := NewClient([]string{broker.Addr()}, config)
require.NoError(t, err)

findCoordinatorResponse := FindCoordinatorResponse{
Coordinator: client.Brokers()[0],
Err: ErrNoError,
Version: 1,
}
broker.Returns(&findCoordinatorResponse)

producerIdResponse := &InitProducerIDResponse{
Err: ErrNoError,
ProducerID: 1,
ProducerEpoch: 0,
}
broker.Returns(producerIdResponse)

ap, err := NewAsyncProducerFromClient(client)
producer := ap.(*asyncProducer)
require.NoError(t, err)

produceResponse := new(ProduceResponse)
produceResponse.Version = 3
produceResponse.AddTopicPartition("test-topic", 0, ErrNoError)
broker.Returns(produceResponse)

addPartitionsToTxnResponse := &AddPartitionsToTxnResponse{
Errors: make(map[string][]*PartitionError),
}
broker.Returns(addPartitionsToTxnResponse)

endTxnResponse := &EndTxnResponse{
Err: ErrNoError,
}
broker.Returns(endTxnResponse)

require.Equal(t, transactionReady, producer.txnmgr.status)

err = producer.BeginTxn()
require.NoError(t, err)
require.Equal(t, inTransaction, producer.txnmgr.status)

producer.Input() <- &ProducerMessage{Topic: "test-topic", Key: nil, Value: StringEncoder(TestMessage)}
err = producer.CommitTxn()
require.NoError(t, err)
require.Equal(t, transactionReady, producer.txnmgr.status)
}

func TestTxnProduceRecordWithAbort(t *testing.T) {
broker := NewMockBroker(t, 1)
defer broker.Close()

config := NewTestConfig()
config.Producer.Idempotent = true
config.Producer.TransactionalID = "test"
config.Version = V0_11_0_0
config.Producer.RequiredAcks = WaitForAll
config.Net.MaxOpenRequests = 1

metadataLeader := new(MetadataResponse)
metadataLeader.Version = 1
metadataLeader.ControllerID = broker.brokerID
metadataLeader.AddBroker(broker.Addr(), broker.BrokerID())
metadataLeader.AddTopic("test-topic", ErrNoError)
metadataLeader.AddTopicPartition("test-topic", 0, broker.BrokerID(), nil, nil, nil, ErrNoError)
broker.Returns(metadataLeader)

client, err := NewClient([]string{broker.Addr()}, config)
require.NoError(t, err)

findCoordinatorResponse := FindCoordinatorResponse{
Coordinator: client.Brokers()[0],
Err: ErrNoError,
Version: 1,
}
broker.Returns(&findCoordinatorResponse)

producerIdResponse := &InitProducerIDResponse{
Err: ErrNoError,
ProducerID: 1,
ProducerEpoch: 0,
}
broker.Returns(producerIdResponse)

ap, err := NewAsyncProducerFromClient(client)
producer := ap.(*asyncProducer)
require.NoError(t, err)

produceResponse := new(ProduceResponse)
produceResponse.Version = 3
produceResponse.AddTopicPartition("test-topic", 0, ErrNoError)
broker.Returns(produceResponse)

addPartitionsToTxnResponse := &AddPartitionsToTxnResponse{
Errors: make(map[string][]*PartitionError),
}
broker.Returns(addPartitionsToTxnResponse)

endTxnResponse := &EndTxnResponse{
Err: ErrNoError,
}
broker.Returns(endTxnResponse)

require.Equal(t, transactionReady, producer.txnmgr.status)

err = producer.BeginTxn()
require.NoError(t, err)
require.Equal(t, inTransaction, producer.txnmgr.status)

producer.Input() <- &ProducerMessage{Topic: "test-topic", Key: nil, Value: StringEncoder(TestMessage)}
err = producer.AbortTxn()
require.NoError(t, err)
require.Equal(t, transactionReady, producer.txnmgr.status)
}

func TestTxnCanAbort(t *testing.T) {
broker := NewMockBroker(t, 1)
defer broker.Close()

config := NewTestConfig()
config.Producer.Idempotent = true
config.Producer.TransactionalID = "test"
config.Version = V0_11_0_0
config.Producer.RequiredAcks = WaitForAll
config.Net.MaxOpenRequests = 1

metadataLeader := new(MetadataResponse)
metadataLeader.Version = 1
metadataLeader.ControllerID = broker.brokerID
metadataLeader.AddBroker(broker.Addr(), broker.BrokerID())
metadataLeader.AddTopic("test-topic", ErrNoError)
metadataLeader.AddTopicPartition("test-topic", 0, broker.BrokerID(), nil, nil, nil, ErrNoError)
broker.Returns(metadataLeader)

client, err := NewClient([]string{broker.Addr()}, config)
require.NoError(t, err)

findCoordinatorResponse := FindCoordinatorResponse{
Coordinator: client.Brokers()[0],
Err: ErrNoError,
Version: 1,
}
broker.Returns(&findCoordinatorResponse)

producerIdResponse := &InitProducerIDResponse{
Err: ErrNoError,
ProducerID: 1,
ProducerEpoch: 0,
}
broker.Returns(producerIdResponse)

ap, err := NewAsyncProducerFromClient(client)
producer := ap.(*asyncProducer)
require.NoError(t, err)

produceResponse := new(ProduceResponse)
produceResponse.Version = 3
produceResponse.AddTopicPartition("test-topic", 0, ErrNoError)
broker.Returns(produceResponse)

addPartitionsToTxnResponse := &AddPartitionsToTxnResponse{
Errors: map[string][]*PartitionError{
"test-topic": {
{
Partition: 0,
Err: ErrTopicAuthorizationFailed,
},
},
},
}
broker.Returns(addPartitionsToTxnResponse)

endTxnResponse := &EndTxnResponse{
Err: ErrNoError,
}
broker.Returns(endTxnResponse)

require.Equal(t, transactionReady, producer.txnmgr.status)

err = producer.BeginTxn()
require.NoError(t, err)
require.Equal(t, inTransaction, producer.txnmgr.status)

producer.Input() <- &ProducerMessage{Topic: "test-topic", Key: nil, Value: StringEncoder(TestMessage)}
err = producer.CommitTxn()
require.Error(t, err)
require.Equal(t, abortableError, producer.txnmgr.status)

err = producer.AbortTxn()
require.NoError(t, err)
}

func TestTxmngInitProducerIdTxn(t *testing.T) {
broker := NewMockBroker(t, 1)
defer broker.Close()

metadataLeader := new(MetadataResponse)
metadataLeader.Version = 1
metadataLeader.ControllerID = broker.brokerID
metadataLeader.AddBroker(broker.Addr(), broker.BrokerID())
broker.Returns(metadataLeader)

config := NewTestConfig()
config.Producer.Idempotent = true
config.Producer.TransactionalID = "test"
config.Version = V0_11_0_0
config.Producer.RequiredAcks = WaitForAll
config.Net.MaxOpenRequests = 1

client, err := NewClient([]string{broker.Addr()}, config)
require.NoError(t, err)

findCoordinatorResponse := FindCoordinatorResponse{
Coordinator: client.Brokers()[0],
Err: ErrNoError,
Version: 1,
}
broker.Returns(&findCoordinatorResponse)

producerIdResponse := &InitProducerIDResponse{
Err: ErrNoError,
ProducerID: 1,
ProducerEpoch: 0,
}
broker.Returns(producerIdResponse)

txmng, err := newTransactionManager(config, client)
require.NoError(t, err)

require.Equal(t, int64(1), txmng.producerID)
require.Equal(t, int16(0), txmng.producerEpoch)
require.Equal(t, transactionReady, txmng.status)
}

// This example shows how to use the producer while simultaneously
// reading the Errors channel to know about any failures.
func ExampleAsyncProducer_select() {
Expand Down
7 changes: 7 additions & 0 deletions broker.go
Expand Up @@ -270,6 +270,13 @@ func (b *Broker) Open(conf *Config) error {
return nil
}

func (b *Broker) ResponseSize() int {
b.lock.Lock()
defer b.lock.Unlock()

return len(b.responses)
}

// Connected returns true if the broker is connected and false otherwise. If the broker is not
// connected but it had tried to connect, the error from that connection attempt is also returned.
func (b *Broker) Connected() (bool, error) {
Expand Down
28 changes: 28 additions & 0 deletions client.go
Expand Up @@ -2,6 +2,7 @@ package sarama

import (
"errors"
"math"
"math/rand"
"sort"
"sync"
Expand Down Expand Up @@ -90,6 +91,9 @@ type Client interface {
// InitProducerID retrieves information required for Idempotent Producer
InitProducerID() (*InitProducerIDResponse, error)

// LeastLoadedBroker retrieves broker that has the least responses pending
LeastLoadedBroker() *Broker

// Close shuts down all broker connections managed by this client. It is required
// to call this function before a client object passes out of scope, as it will
// otherwise leak memory. You must close any Producers or Consumers using a client
Expand Down Expand Up @@ -709,6 +713,30 @@ func (client *client) anyBroker() *Broker {
return nil
}

func (client *client) LeastLoadedBroker() *Broker {
client.lock.RLock()
defer client.lock.RUnlock()

if len(client.seedBrokers) > 0 {
_ = client.seedBrokers[0].Open(client.conf)
return client.seedBrokers[0]
}

var leastLoadedBroker *Broker
pendingRequests := math.MaxInt
for _, broker := range client.brokers {
if pendingRequests > broker.ResponseSize() {
pendingRequests = broker.ResponseSize()
leastLoadedBroker = broker
}
}

if leastLoadedBroker != nil {
_ = leastLoadedBroker.Open(client.conf)
}
return leastLoadedBroker
}

// private caching/lazy metadata helpers

type partitionType int
Expand Down
10 changes: 10 additions & 0 deletions config.go
Expand Up @@ -188,6 +188,11 @@ type Config struct {
// If enabled, the producer will ensure that exactly one copy of each message is
// written.
Idempotent bool
// Used in transactions to identify an instance of a producer through restarts
TransactionalID string
// Amount of time a transaction can remain unresolved (neither committed nor aborted)
// default is 1 min
TransactionTimeout time.Duration

// Return specifies what channels will be populated. If they are set to true,
// you must read from the respective channels to prevent deadlock. If,
Expand Down Expand Up @@ -481,6 +486,7 @@ func NewConfig() *Config {
c.Producer.Retry.Backoff = 100 * time.Millisecond
c.Producer.Return.Errors = true
c.Producer.CompressionLevel = CompressionLevelDefault
c.Producer.TransactionTimeout = 1 * time.Minute

c.Consumer.Fetch.Min = 1
c.Consumer.Fetch.Default = 1024 * 1024
Expand Down Expand Up @@ -706,6 +712,10 @@ func (c *Config) Validate() error {
}
}

if c.Producer.TransactionalID != "" && !c.Producer.Idempotent {
return ConfigurationError("Transactional producer requires Idempotent to be true")
}

// validate the Consumer values
switch {
case c.Consumer.Fetch.Min <= 0:
Expand Down

0 comments on commit ce3ade0

Please sign in to comment.