Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Infinite loop in io.micronaut.configuration.kafka.processor.ConsumerState #995

Open
henriquelsmti opened this issue Mar 21, 2024 · 2 comments

Comments

@henriquelsmti
Copy link
Contributor

Expected Behavior

Ensure that the close method does not enter into an infinite loop.

Actual Behaviour

I have four projects using Micronaut Kafka, each with several @KafkaListener annotations. In one project where I am upgrading from Micronaut 3 to Micronaut 4, I'm encountering some integration tests that never finalize the context. After analyzing it, I noticed that the ConsumerState.close() method in one of my @KafkaListener annotations was entering an infinite loop, waiting for the closedState to be different from POLLING. There's nothing special about this specific @KafkaListener; below is a summary of its configuration.

@Singleton
@CompileStatic
@KafkaListener(
		offsetStrategy = OffsetStrategy.ASYNC_PER_RECORD,
		offsetReset = OffsetReset.EARLIEST,
		properties = [
				@Property(name = ConsumerConfig.MAX_POLL_RECORDS_CONFIG, value = '100'),
		]
)
class LeituraEnvioRecursoGuiaBackgroundService {
    @Topic('${micronaut.application.name}.fila.leitura.envio.recurso.guia')
    void consomeFilaGuiasParaLeitura(@KafkaKey Long registroProcessamentoId, GuiaParaProcessamento processamento) {
    }
}

My workaround was to downgrade the micronaut-kafka to version 5.1.2.

implementation("io.micronaut.kafka:micronaut-kafka") version {
    strictly '5.1.2'
}

Steps To Reproduce

Unfortunately, I cannot create a test case. I have four projects using Micronaut Kafka 5.3.0, and only one of them, within one of the @KafkaListener annotations, presented this issue. This project is the largest among them.

Environment Information

Linux Mint 23.3
JDK correto 17, 21
Micronaut 4.3.3

Example Application

No response

Version

5.2.0, 5.3.0

@yanlobianchi
Copy link

+1

@rorueda
Copy link
Contributor

rorueda commented May 7, 2024

I had the same issue during tests. The problem seems to be the logic used to set the state of the kafka consumer: the catch block below in ConsumerState class will not catch anything if the code that polls and processes the kafka records catches Exception (and at least when batching, which is my case, there is such a catch block).

void threadPollLoop() {
    try (kafkaConsumer) {
        //noinspection InfiniteLoopStatement
        while (true) { //NOSONAR
            refreshAssignmentsPollAndProcessRecords();
        }
    } catch (WakeupException e) {
        closedState = ConsumerCloseState.CLOSED;
    }
}

I see two options:

  • Modify the state logic to not rely on this exception;
  • Guarantee the WakeupException is not swallowed, both in micronaut and user code;

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants