Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bug] Cannot receive txn message after previous message be aborted #1186

Open
SmileYun opened this issue Feb 27, 2024 · 1 comment
Open

[Bug] Cannot receive txn message after previous message be aborted #1186

SmileYun opened this issue Feb 27, 2024 · 1 comment

Comments

@SmileYun
Copy link

SmileYun commented Feb 27, 2024

Expected behavior

  1. Receive committed messages
  2. Even if the producer send a normal message, consumer cannot receive an abort message

Actual behavior

  1. Start a consumer
    Output:

  2. Send 3 normal messages (seq: 0,1,2)
    Msg received: Normal msg. seq: 0
    Msg received: Normal msg. seq: 1
    Msg received: Normal msg. seq: 2

  3. Send 3 txn messages (seq: 0: commit,1: abort, 2: commit)
    Msg received: Transaction msg with commit. seq: 0

But also expect the message of seq 3 should be received.

producer log shown txn status is 4, 5, 4
2024/02/27 12:37:56 TxInfo 1 {0 64}
2024/02/27 12:37:56 Send msg 17.589083ms, 862:3:0,
2024/02/27 12:37:56 Txn be acked. 37.285584ms 4 {MostSigBits:0 LeastSigBits:64}
2024/02/27 12:37:56 Flush 4 {0 64}
2024/02/27 12:37:56 LastSequenceID 1709008676
2024/02/27 12:37:57 TxInfo 1 {0 65}
2024/02/27 12:37:57 Send msg 12.656042ms, 862:5:0,
2024/02/27 12:37:57 Txn be acked. 37.830083ms 5 {MostSigBits:0 LeastSigBits:65}
2024/02/27 12:37:57 Flush 5 {0 65}
2024/02/27 12:37:57 LastSequenceID 1709008677
2024/02/27 12:37:58 TxInfo 1 {0 66}
2024/02/27 12:37:58 Send msg 8.361125ms, 862:6:0,
2024/02/27 12:37:58 Txn be acked. 23.730334ms 4 {MostSigBits:0 LeastSigBits:66}
2024/02/27 12:37:58 Flush 4 {0 66}
2024/02/27 12:37:58 LastSequenceID 1709008678
INFO[0003] Closing producer producerID=1 producer_name=p-name topic="persistent://public/default/topic"
INFO[0003] Closed producer producerID=1 producer_name=p-name topic="persistent://public/default/topic"

  1. Send 3 normal messages
    Msg received: Transaction msg with abort. seq: 1 ** // <----- should not be received **
    Msg received: Transaction msg with commit. seq: 2
    Msg received: Normal msg. seq: 0
    Msg received: Normal msg. seq: 1
    Msg received: Normal msg. seq: 2

Steps to reproduce

// pseudo code

  • Consumer:
client := pulsar.NewClient({EnableTransaction: true})
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
		Name:                              "consumer",
		Topic:                             tp,
		SubscriptionName:                  "consumer-sub-name", // consumeArgs.SubscriptionName,
		EnableBatchIndexAcknowledgment:    true,                // consumeArgs.EnableBatchIndexAck,
		EnableAutoScaledReceiverQueueSize: false,               // consumeArgs.EnableAutoScaledReceiverQueueSize,
		Type:                              pulsar.Exclusive,    // consumeArgs.SubscriptionType,
		SubscriptionMode:                  pulsar.NonDurable,   // consumeArgs.SubscriptionMode,
		SubscriptionInitialPosition:       pulsar.SubscriptionPositionEarliest,
		AckWithResponse:                   true,
	})
for msg := range consumer.Chan() {
log.Infof("Msg received: %v", string(msg.Payload()))
} 
  • Producer:
producer, err := client.CreateProducer(pulsar.ProducerOptions{
		Name:                    "p-name",
		Topic:                   tp,                     // produceArgs.Topic,
		MaxPendingMessages:      10,                     // produceArgs.ProducerQueueSize,
		BatchingMaxPublishDelay: 500 * time.Millisecond, // time.Millisecond * time.Duration(produceArgs.BatchingTimeMillis),
		BatchingMaxSize:         uint(1024 * 1024 * 10), // produceArgs.BatchingMaxSize * 1024,
		BatchingMaxMessages:     1024,                   // produceArgs.BatchingNumMessages,
		SendTimeout:             0 * time.Second,
		CompressionType:         pulsar.LZ4,
	})
isInTx := true
ack := "commit" // // means: err = tx.Commit(context.TODO()) 
sendMsg(client, producer, isInTx, ack, "0")

ack = "abort" // means: err = tx.Abort(context.TODO()) 
sendMsg(client, producer, isInTx, ack, "1")

ack = "commit"
sendMsg(client, producer, isInTx, ack, "2")

System configuration

Pulsar version:
Current version of pulsar is: 3.2.0 (standalone)

@SmileYun SmileYun changed the title Cannot receive txn message after previous message be aborted [Bug] Cannot receive txn message after previous message be aborted Feb 27, 2024
@SmileYun
Copy link
Author

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant