Kafka transactional support fails after 1 week with a gap in messages #542

Open
msillence opened this issue Apr 28, 2022 · 2 comments

Comments

@msillence
Contributor

msillence commented Apr 28, 2022

Expected Behavior

Message is sent

Actual Behaviour

Sending fails, and the key error message is: "The producer attempted to use a producer id which is not currently assigned to its transactional id"

Essentially it's this error: https://stackoverflow.com/a/59421077

In short: a streaming application had no traffic for 7 days, so its producer metadata was deleted.

org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
	at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:1112) ~[kafka-clients-2.8.0.jar:?]
	at org.apache.kafka.clients.producer.internals.TransactionManager.beginTransaction(TransactionManager.java:356) ~[kafka-clients-2.8.0.jar:?]
	at org.apache.kafka.clients.producer.KafkaProducer.beginTransaction(KafkaProducer.java:619) ~[kafka-clients-2.8.0.jar:?]
	at com.fnz.kafka.figaro.valuation.sender.HoldingSender.send(HoldingSender.java:47) ~[classes/:?]
	at com.fnz.kafka.figaro.valuation.service.HoldService.processPartition(HoldService.java:54) ~[classes/:?]
	at com.fnz.kafka.figaro.valuation.service.$HoldService$Definition$Intercepted.$$access$$processPartition(Unknown Source) ~[classes/:?]
	at com.fnz.kafka.figaro.valuation.service.$HoldService$Definition$Exec.dispatch(Unknown Source) ~[classes/:?]
	at io.micronaut.context.AbstractExecutableMethodsDefinition$DispatchedExecutableMethod.invoke(AbstractExecutableMethodsDefinition.java:378) ~[micronaut-inject-3.4.1.jar:3.4.1]
	at io.micronaut.aop.chain.MethodInterceptorChain.proceed(MethodInterceptorChain.java:128) ~[micronaut-aop-3.4.1.jar:3.4.1]
	at io.micronaut.transaction.interceptor.TransactionalInterceptor.intercept(TransactionalInterceptor.java:196) ~[micronaut-data-tx-3.3.0.jar:3.3.0]
	at io.micronaut.aop.chain.MethodInterceptorChain.proceed(MethodInterceptorChain.java:137) ~[micronaut-aop-3.4.1.jar:3.4.1]
	at com.fnz.kafka.figaro.valuation.service.$HoldService$Definition$Intercepted.processPartition(Unknown Source) ~[classes/:?]
	at com.fnz.kafka.figaro.valuation.listeners.FigaroHoldListener.lambda$process$0(FigaroHoldListener.java:46) ~[classes/:?]
	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625) ~[?:?]
	at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:762) ~[?:?]
	at com.fnz.kafka.figaro.valuation.listeners.FigaroHoldListener.process(FigaroHoldListener.java:44) ~[classes/:?]
	at com.fnz.kafka.figaro.valuation.listeners.$FigaroHoldListener$Definition$Intercepted.$$access$$process(Unknown Source) ~[classes/:?]
	at com.fnz.kafka.figaro.valuation.listeners.$FigaroHoldListener$Definition$Exec.dispatch(Unknown Source) ~[classes/:?]
	at io.micronaut.context.AbstractExecutableMethodsDefinition$DispatchedExecutableMethod.invoke(AbstractExecutableMethodsDefinition.java:378) ~[micronaut-inject-3.4.1.jar:3.4.1]
	at io.micronaut.aop.chain.MethodInterceptorChain.proceed(MethodInterceptorChain.java:128) ~[micronaut-aop-3.4.1.jar:3.4.1]
	at io.micronaut.transaction.interceptor.TransactionalInterceptor.intercept(TransactionalInterceptor.java:196) ~[micronaut-data-tx-3.3.0.jar:3.3.0]
	at io.micronaut.aop.chain.MethodInterceptorChain.proceed(MethodInterceptorChain.java:137) ~[micronaut-aop-3.4.1.jar:3.4.1]
	at com.fnz.kafka.figaro.valuation.listeners.$FigaroHoldListener$Definition$Intercepted.process(Unknown Source) ~[classes/:?]
	at com.fnz.kafka.figaro.valuation.listeners.$FigaroHoldListener$Definition$Exec.dispatch(Unknown Source) ~[classes/:?]
	at io.micronaut.context.AbstractExecutableMethodsDefinition$DispatchedExecutableMethod.invoke(AbstractExecutableMethodsDefinition.java:378) ~[micronaut-inject-3.4.1.jar:3.4.1]
	at io.micronaut.inject.DelegatingExecutableMethod.invoke(DelegatingExecutableMethod.java:76) ~[micronaut-inject-3.4.1.jar:3.4.1]
	at io.micronaut.core.bind.DefaultExecutableBinder$1.invoke(DefaultExecutableBinder.java:109) ~[micronaut-core-3.4.1.jar:3.4.1]
	at io.micronaut.configuration.kafka.processor.KafkaConsumerProcessor.processConsumerRecordsAsBatch(KafkaConsumerProcessor.java:604) ~[micronaut-kafka-4.2.0.jar:4.2.0]
	at io.micronaut.configuration.kafka.processor.KafkaConsumerProcessor.createConsumerThreadPollLoop(KafkaConsumerProcessor.java:462) ~[micronaut-kafka-4.2.0.jar:4.2.0]
	at io.micronaut.configuration.kafka.processor.KafkaConsumerProcessor.lambda$submitConsumerThread$7(KafkaConsumerProcessor.java:421) ~[micronaut-kafka-4.2.0.jar:4.2.0]
	at io.micrometer.core.instrument.composite.CompositeTimer.record(CompositeTimer.java:79) ~[micrometer-core-1.8.3.jar:1.8.3]
	at io.micrometer.core.instrument.Timer.lambda$wrap$0(Timer.java:160) ~[micrometer-core-1.8.3.jar:1.8.3]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[?:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
	at java.lang.Thread.run(Thread.java:833) ~[?:?]
Caused by: org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id.

Steps To Reproduce

Add this config to reduce the expiration from the default of 1 week to 10 seconds:
KAFKA_TRANSACTIONAL_ID_EXPIRATION_MS=10000
or set the broker property transactional.id.expiration.ms=10000

Even so, the failure takes significantly longer than 10 seconds to appear; I'm testing with a 45-minute wait.
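One plausible explanation for the delay (an assumption based on documented broker defaults, not something verified in this report): the broker only removes expired transactional ids when a periodic cleanup task runs, every `transaction.remove.expired.transaction.cleanup.interval.ms`, which defaults to 3600000 ms (1 hour). The simplified timing model below is not broker code; it assumes the first cleanup run happens one interval after broker start and that the id has no transaction in progress.

```java
import java.time.Duration;

/**
 * Simplified timing model (an assumption, not broker source code) of when an
 * idle transactional.id is removed by the broker.
 */
public class TxIdExpiryModel {

    /** An idle id is eligible once transactional.id.expiration.ms has elapsed since its last update. */
    static boolean isEligible(long lastUpdateMs, long nowMs, long expirationMs) {
        return nowMs - lastUpdateMs >= expirationMs;
    }

    /**
     * Returns the time of the first cleanup run that finds the id eligible,
     * assuming runs happen every cleanupIntervalMs starting one interval after t = 0.
     */
    static long removalTimeMs(long lastUpdateMs, long expirationMs, long cleanupIntervalMs) {
        long run = cleanupIntervalMs;
        while (!isEligible(lastUpdateMs, run, expirationMs)) {
            run += cleanupIntervalMs;
        }
        return run;
    }

    public static void main(String[] args) {
        long expirationMs = 10_000;                               // transactional.id.expiration.ms from the report
        long cleanupIntervalMs = Duration.ofHours(1).toMillis();  // assumed default cleanup interval
        long lastSendMs = Duration.ofMinutes(5).toMillis();       // last message sent 5 minutes after t = 0
        long removedAtMs = removalTimeMs(lastSendMs, expirationMs, cleanupIntervalMs);
        System.out.println("id removed after "
                + Duration.ofMillis(removedAtMs - lastSendMs).toMinutes() + " min idle");
    }
}
```

With these numbers the id is only removed after 55 minutes of inactivity, even though the expiration is 10 seconds. If the model holds, the effective expiry lies somewhere between the expiration setting and the expiration plus one cleanup interval, so lowering the cleanup interval on a test broker should make the reproduction much faster.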

Using the code from:
https://micronaut-projects.github.io/micronaut-kafka/latest/guide/#kafkaClientTx

Notably, I call producer.initTransactions() in the class constructor, because calling it more than once throws an exception:
TransactionalId holding-tx: Invalid transition attempted from state READY to state INITIALIZING
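Both exceptions in this report follow from the producer's client-side transaction state machine. The following is a hypothetical, much-simplified model: the state names are modeled on Kafka's TransactionManager, but transitions are trimmed and the abortable and fatal error states are merged into one ERROR state, so it is not Kafka's real transition table. It shows why a second initTransactions() fails and why every transactional call fails once the producer-id error has occurred.

```java
import java.util.EnumSet;
import java.util.Map;

/**
 * Hypothetical, simplified model of a transactional producer's client-side states.
 * Not Kafka's actual implementation: error states are merged and transitions trimmed.
 */
public class TxStateModel {
    enum State { UNINITIALIZED, INITIALIZING, READY, IN_TRANSACTION, COMMITTING_TRANSACTION, ERROR }

    // For each target state, the states it may be entered from (simplified).
    private static final Map<State, EnumSet<State>> ALLOWED_SOURCES = Map.of(
            State.INITIALIZING, EnumSet.of(State.UNINITIALIZED),
            State.READY, EnumSet.of(State.INITIALIZING, State.COMMITTING_TRANSACTION),
            State.IN_TRANSACTION, EnumSet.of(State.READY),
            State.COMMITTING_TRANSACTION, EnumSet.of(State.IN_TRANSACTION));

    private State state = State.UNINITIALIZED;

    private void transitionTo(State target) {
        if (state == State.ERROR) {
            throw new IllegalStateException("Cannot execute transactional method because we are in an error state");
        }
        if (!ALLOWED_SOURCES.getOrDefault(target, EnumSet.noneOf(State.class)).contains(state)) {
            throw new IllegalStateException("Invalid transition attempted from state " + state + " to state " + target);
        }
        state = target;
    }

    void initTransactions()  { transitionTo(State.INITIALIZING); transitionTo(State.READY); }
    void beginTransaction()  { transitionTo(State.IN_TRANSACTION); }
    void commitTransaction() { transitionTo(State.COMMITTING_TRANSACTION); transitionTo(State.READY); }

    /** Models the broker rejecting the cached producer id after its transactional.id expired. */
    void onInvalidPidMapping() { state = State.ERROR; }

    State state() { return state; }
}
```

Under this model, calling initTransactions() exactly once per producer instance (as HoldingSender's constructor does) is the only valid pattern, and once the producer has entered the error state, beginTransaction() keeps failing until the producer instance is replaced.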

  • send a message
  • wait 45 mins
  • send another message
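
import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.context.annotation.Value;
import jakarta.inject.Singleton;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import java.util.List;
import java.util.concurrent.Future;

@Singleton
public class HoldingSender {
	@Value("${KAFKA_TOPIC_HOLDING}")
	public String holdingTopic;

	private final Producer<HoldingKey, HoldingValue> holdingProducer;

	public HoldingSender(@KafkaClient(id="holding", transactionalId="holding-sender") Producer<HoldingKey, HoldingValue> kafkaProducer) {
		this.holdingProducer = kafkaProducer;
		holdingProducer.initTransactions(); // may only be called once per producer instance
	}

	public synchronized List<Future<RecordMetadata>> send(List<ProducerRecord<HoldingKey, HoldingValue>> records) {
		try {
			holdingProducer.beginTransaction();
			List<Future<RecordMetadata>> futures = records.stream().map(record -> holdingProducer.send(record)).toList();
			holdingProducer.commitTransaction();
			return futures;
		} catch (KafkaException e) {
			// Assumed completion (the original snippet is cut off here): abort, then rethrow.
			// Note: abortTransaction() can itself throw if the producer is in a fatal error state.
			holdingProducer.abortTransaction();
			throw e;
		}
	}
}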
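Because the producer stays in an error state after the producer-id failure, one possible workaround (an assumption, not confirmed by the maintainers) is to close the failed producer and create a new one with the same transactional.id, calling initTransactions() once on the new instance. The sketch uses a hypothetical TxProducer interface in place of Kafka's Producer so it compiles without kafka-clients; with the real client, the factory would presumably build a new KafkaProducer from the same properties, which means not relying on the single producer injected via @KafkaClient.

```java
import java.util.List;
import java.util.function.Supplier;

/**
 * Hypothetical workaround sketch: after a failed transaction, discard the producer
 * and lazily build a fresh one (calling initTransactions() once) for the next batch.
 */
public class RecreatingTxSender<R> {

    /** Minimal, hypothetical subset of a transactional producer's methods. */
    public interface TxProducer<T> extends AutoCloseable {
        void initTransactions();
        void beginTransaction();
        void send(T record);
        void commitTransaction();
        @Override void close();
    }

    private final Supplier<TxProducer<R>> factory;
    private TxProducer<R> producer; // null until first use or after a failure

    public RecreatingTxSender(Supplier<TxProducer<R>> factory) {
        this.factory = factory;
    }

    public synchronized void send(List<R> records) {
        if (producer == null) {
            producer = factory.get();
            producer.initTransactions(); // exactly once per producer instance
        }
        try {
            producer.beginTransaction();
            records.forEach(producer::send);
            producer.commitTransaction();
        } catch (RuntimeException e) {
            // The producer may now be stuck in an error state: close it (ignoring
            // close failures) and let the next call create a fresh instance.
            try { producer.close(); } catch (RuntimeException ignored) { }
            producer = null;
            throw e;
        }
    }
}
```

The failed batch is rethrown rather than retried, so the caller (here, the batch listener) can decide whether to redeliver it; only the following call gets a fresh producer.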

Environment Information

Running with both Docker and Confluent Cloud.
Docker image versions:

  • debezium/zookeeper:1.7.1.Final
  • debezium/kafka:1.7.1.Final

Example Application

No response

Version

3.4.1

@dstepanov
Contributor

Can you please create a sample app with a test describing what is wrong? I don't really understand what is not correct.

@msillence
Contributor Author

Sorry, I've been a bit snowed under. There is also more information here: https://stackoverflow.com/a/52304789
The core idea is to send a message, then wait for a week (the default) or 45 minutes (with the setting above) before sending another.

I did have a test harness; I'll see if I can publish it.
