Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add producer transaction API #2295

Merged
merged 2 commits into from Sep 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
224 changes: 167 additions & 57 deletions async_producer.go
Expand Up @@ -49,65 +49,27 @@ type AsyncProducer interface {
// you can set Producer.Return.Errors in your config to false, which prevents
// errors to be returned.
Errors() <-chan *ProducerError
}

// transactionManager keeps the state necessary to ensure idempotent production
type transactionManager struct {
producerID int64
producerEpoch int16
sequenceNumbers map[string]int32
mutex sync.Mutex
}

const (
noProducerID = -1
noProducerEpoch = -1
)

func (t *transactionManager) getAndIncrementSequenceNumber(topic string, partition int32) (int32, int16) {
key := fmt.Sprintf("%s-%d", topic, partition)
t.mutex.Lock()
defer t.mutex.Unlock()
sequence := t.sequenceNumbers[key]
t.sequenceNumbers[key] = sequence + 1
return sequence, t.producerEpoch
}
// IsTransactional return true when current producer is is transactional.
IsTransactional() bool

func (t *transactionManager) bumpEpoch() {
t.mutex.Lock()
defer t.mutex.Unlock()
t.producerEpoch++
for k := range t.sequenceNumbers {
t.sequenceNumbers[k] = 0
}
}
// TxnStatus return current producer transaction status.
TxnStatus() ProducerTxnStatusFlag

func (t *transactionManager) getProducerID() (int64, int16) {
t.mutex.Lock()
defer t.mutex.Unlock()
return t.producerID, t.producerEpoch
}
// BeginTxn mark current transaction as ready.
BeginTxn() error

func newTransactionManager(conf *Config, client Client) (*transactionManager, error) {
txnmgr := &transactionManager{
producerID: noProducerID,
producerEpoch: noProducerEpoch,
}
// CommitTxn commit current transaction.
CommitTxn() error

if conf.Producer.Idempotent {
initProducerIDResponse, err := client.InitProducerID()
if err != nil {
return nil, err
}
txnmgr.producerID = initProducerIDResponse.ProducerID
txnmgr.producerEpoch = initProducerIDResponse.ProducerEpoch
txnmgr.sequenceNumbers = make(map[string]int32)
txnmgr.mutex = sync.Mutex{}
// AbortTxn abort current transaction.
AbortTxn() error

Logger.Printf("Obtained a ProducerId: %d and ProducerEpoch: %d\n", txnmgr.producerID, txnmgr.producerEpoch)
}
// AddOffsetsToTxn add associated offsets to current transaction.
AddOffsetsToTxn(offsets map[string][]*PartitionOffsetMetadata, groupId string) error

return txnmgr, nil
// AddMessageToTxn add message offsets to current transaction.
AddMessageToTxn(msg *ConsumerMessage, groupId string, metadata *string) error
}

type asyncProducer struct {
Expand All @@ -123,6 +85,7 @@ type asyncProducer struct {
brokerLock sync.Mutex

txnmgr *transactionManager
txLock sync.Mutex

metricsRegistry metrics.Registry
}
Expand Down Expand Up @@ -179,9 +142,12 @@ func newAsyncProducer(client Client) (AsyncProducer, error) {
type flagSet int8

const (
syn flagSet = 1 << iota // first message from partitionProducer to brokerProducer
fin // final message from partitionProducer to brokerProducer and back
shutdown // start the shutdown process
syn flagSet = 1 << iota // first message from partitionProducer to brokerProducer
fin // final message from partitionProducer to brokerProducer and back
shutdown // start the shutdown process
endtxn // endtxn
committxn // endtxn
aborttxn // endtxn
)

// ProducerMessage is the collection of elements passed to the Producer in order to send a message.
Expand Down Expand Up @@ -287,6 +253,97 @@ func (pe ProducerErrors) Error() string {
return fmt.Sprintf("kafka: Failed to deliver %d messages.", len(pe))
}

func (p *asyncProducer) IsTransactional() bool {
return p.txnmgr.isTransactional()
}

func (p *asyncProducer) AddMessageToTxn(msg *ConsumerMessage, groupId string, metadata *string) error {
offsets := make(map[string][]*PartitionOffsetMetadata)
offsets[msg.Topic] = []*PartitionOffsetMetadata{
{
Partition: msg.Partition,
Offset: msg.Offset + 1,
Metadata: metadata,
},
}
return p.AddOffsetsToTxn(offsets, groupId)
}

func (p *asyncProducer) AddOffsetsToTxn(offsets map[string][]*PartitionOffsetMetadata, groupId string) error {
p.txLock.Lock()
defer p.txLock.Unlock()

if !p.IsTransactional() {
DebugLogger.Printf("producer/txnmgr [%s] attempt to call AddOffsetsToTxn on a non-transactional producer\n", p.txnmgr.transactionalID)
return ErrNonTransactedProducer
}

DebugLogger.Printf("producer/txnmgr [%s] add offsets to transaction\n", p.txnmgr.transactionalID)
return p.txnmgr.addOffsetsToTxn(offsets, groupId)
}

func (p *asyncProducer) TxnStatus() ProducerTxnStatusFlag {
return p.txnmgr.currentTxnStatus()
}

func (p *asyncProducer) BeginTxn() error {
p.txLock.Lock()
defer p.txLock.Unlock()

if !p.IsTransactional() {
DebugLogger.Println("producer/txnmgr attempt to call BeginTxn on a non-transactional producer")
return ErrNonTransactedProducer
}

return p.txnmgr.transitionTo(ProducerTxnFlagInTransaction, nil)
}

func (p *asyncProducer) CommitTxn() error {
p.txLock.Lock()
defer p.txLock.Unlock()

if !p.IsTransactional() {
DebugLogger.Printf("producer/txnmgr [%s] attempt to call CommitTxn on a non-transactional producer\n", p.txnmgr.transactionalID)
return ErrNonTransactedProducer
}

DebugLogger.Printf("producer/txnmgr [%s] committing transaction\n", p.txnmgr.transactionalID)
err := p.finishTransaction(true)
if err != nil {
return err
}
DebugLogger.Printf("producer/txnmgr [%s] transaction committed\n", p.txnmgr.transactionalID)
return nil
}

func (p *asyncProducer) AbortTxn() error {
p.txLock.Lock()
defer p.txLock.Unlock()

if !p.IsTransactional() {
DebugLogger.Printf("producer/txnmgr [%s] attempt to call AbortTxn on a non-transactional producer\n", p.txnmgr.transactionalID)
return ErrNonTransactedProducer
}
DebugLogger.Printf("producer/txnmgr [%s] aborting transaction\n", p.txnmgr.transactionalID)
err := p.finishTransaction(false)
if err != nil {
return err
}
DebugLogger.Printf("producer/txnmgr [%s] transaction aborted\n", p.txnmgr.transactionalID)
return nil
}

func (p *asyncProducer) finishTransaction(commit bool) error {
p.inFlight.Add(1)
if commit {
p.input <- &ProducerMessage{flags: endtxn | committxn}
} else {
p.input <- &ProducerMessage{flags: endtxn | aborttxn}
}
p.inFlight.Wait()
return p.txnmgr.finishTransaction(commit)
}

func (p *asyncProducer) Errors() <-chan *ProducerError {
return p.errors
}
Expand Down Expand Up @@ -340,11 +397,27 @@ func (p *asyncProducer) dispatcher() {
continue
}

if msg.flags&endtxn != 0 {
var err error
if msg.flags&committxn != 0 {
err = p.txnmgr.transitionTo(ProducerTxnFlagEndTransaction|ProducerTxnFlagCommittingTransaction, nil)
} else {
err = p.txnmgr.transitionTo(ProducerTxnFlagEndTransaction|ProducerTxnFlagAbortingTransaction, nil)
}
if err != nil {
Logger.Printf("producer/txnmgr unable to end transaction %s", err)
}
p.inFlight.Done()
continue
}

if msg.flags&shutdown != 0 {
shuttingDown = true
p.inFlight.Done()
continue
} else if msg.retries == 0 {
}

if msg.retries == 0 {
if shuttingDown {
// we can't just call returnError here because that decrements the wait group,
// which hasn't been incremented yet for this message, and shouldn't be
Expand All @@ -357,6 +430,13 @@ func (p *asyncProducer) dispatcher() {
continue
}
p.inFlight.Add(1)
// Ignore retried msg, there are already in txn.
// Can't produce new record when transaction is not started.
if p.IsTransactional() && p.txnmgr.currentTxnStatus()&ProducerTxnFlagInTransaction == 0 {
Logger.Printf("attempt to send message when transaction is not started or is in ending state, got %d, expect %d\n", p.txnmgr.currentTxnStatus(), ProducerTxnFlagInTransaction)
p.returnError(msg, ErrTransactionNotReady)
continue
}
}

for _, interceptor := range p.conf.Producer.Interceptors {
Expand Down Expand Up @@ -609,6 +689,10 @@ func (pp *partitionProducer) dispatch() {
msg.hasSequence = true
}

if pp.parent.IsTransactional() {
pp.parent.txnmgr.maybeAddPartitionToCurrentTxn(pp.topic, pp.partition)
}

pp.brokerProducer.input <- msg
}
}
Expand Down Expand Up @@ -719,6 +803,16 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
}
}(set)

if p.IsTransactional() {
// Add partition to tx before sending current batch
err := p.txnmgr.publishTxnPartitions()
if err != nil {
// Request failed to be sent
sendResponse(nil, err)
continue
}
}

// Use AsyncProduce vs Produce to not block waiting for the response
// so that we can pipeline multiple produce requests and achieve higher throughput, see:
// https://kafka.apache.org/protocol#protocol_network
Expand Down Expand Up @@ -1158,10 +1252,26 @@ func (p *asyncProducer) bumpIdempotentProducerEpoch() {
}
}

func (p *asyncProducer) maybeTransitionToErrorState(err error) error {
if errors.Is(err, ErrClusterAuthorizationFailed) ||
errors.Is(err, ErrProducerFenced) ||
errors.Is(err, ErrUnsupportedVersion) ||
errors.Is(err, ErrTransactionalIDAuthorizationFailed) {
return p.txnmgr.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagFatalError, err)
}
if p.txnmgr.coordinatorSupportsBumpingEpoch && p.txnmgr.currentTxnStatus()&ProducerTxnFlagEndTransaction == 0 {
p.txnmgr.epochBumpRequired = true
}
return p.txnmgr.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagAbortableError, err)
}

func (p *asyncProducer) returnError(msg *ProducerMessage, err error) {
if p.IsTransactional() {
_ = p.maybeTransitionToErrorState(err)
}
// We need to reset the producer ID epoch if we set a sequence number on it, because the broker
// will never see a message with this number, so we can never continue the sequence.
if msg.hasSequence {
if !p.IsTransactional() && msg.hasSequence {
Logger.Printf("producer/txnmanager rolling over epoch due to publish failure on %s/%d", msg.Topic, msg.Partition)
p.bumpIdempotentProducerEpoch()
}
Expand Down