
Support KafkaListener to fail on records and stop consuming #892

Open
frisi opened this issue Oct 9, 2023 · 4 comments

Comments

@frisi

frisi commented Oct 9, 2023

Feature description

Current behaviour

If the listener fails while processing a record at offset N, it resumes at offset N+1.

I guess this is due to the decision made in #372 to choose availability over consistency.

Expected behaviour

There should be a way to configure KafkaListener to stop consuming records for a topic-partition if a record failed.

If configured to RETRY_ON_ERROR it should re-try retryCount times. If it still fails, log an error and stop processing.

IIUC, as of micronaut-kafka 5.1.2 there is no way to tell the listener to stop consuming further records after one failed
(or to re-try indefinitely if errorStrategy RETRY_ON_ERROR is configured).
Or is there?
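For illustration, the desired semantics (retry each failing record `retryCount` times, then log and stop so no later offset is consumed) could be sketched as a plain loop. This is a hypothetical sketch, not micronaut-kafka API; `consume` and `handler` are made-up names:

```java
import java.util.List;
import java.util.function.Consumer;

public class RetryThenStop {
    /**
     * Processes records in order. Each record is attempted up to
     * 1 + retryCount times; if it still fails, log and stop, so no
     * record past the failing one is consumed. Returns the number of
     * records successfully processed.
     */
    static <T> int consume(List<T> records, int retryCount, Consumer<T> handler) {
        int processed = 0;
        for (T record : records) {
            int failures = 0;
            while (true) {
                try {
                    handler.accept(record);
                    processed++;
                    break;
                } catch (RuntimeException e) {
                    failures++;
                    if (failures > retryCount) {
                        System.err.println("Giving up on record after "
                                + failures + " attempts: " + e.getMessage());
                        // stop consuming; the "offset" stays at the failed record
                        return processed;
                    }
                }
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        // "b" always fails: with retryCount = 2 it is attempted 3 times,
        // then processing stops, so "c" is never consumed.
        int ok = consume(List.of("a", "b", "c"), 2, r -> {
            if (r.equals("b")) throw new RuntimeException("boom");
        });
        System.out.println("processed=" + ok);
    }
}
```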

@cristianbriscaru

I am having the same issue: if there is an exception when consuming a record, it is just logged as an error and the consumer moves on to the next record.
Have you managed to find a fix for this?

@frisi

frisi commented Nov 13, 2023

As a workaround you could use KafkaConsumer directly, @cristianbriscaru.

Something like this should work (Kotlin project):

consumer.subscribe(topicPattern)

consumer.use {
    while (true) {
        // poll for new records
        val records = try {
            it.poll(Duration.ofMillis(1000L))
        } catch (e: WakeupException) {
            // log and ignore: wakeup() was called to shut the consumer down
            logger.info { "Preemptive shutdown: ${e.message}" }
            break
        } catch (e: KafkaException) {
            logger.error(e) { "Kafka error: ${e.message}" }
            throw e
        }

        // process records; if this throws, we leave the loop without
        // committing, so we resume at the failed record after a restart
        records.forEach { r ->
            val key = r.key()
            val version = r.timestamp()
            val body = r.value()

            // process record here
        }

        // commit offsets manually after finishing processing
        it.commitSync()
        if (records.count() > 0) {
            logger.info { "finished batch of ${records.count()} records - committing offsets" }
        }
    }
}

@graemerocher
Contributor

@cristianbriscaru not sure I understand. The ErrorStrategy is for this https://micronaut-projects.github.io/micronaut-kafka/latest/guide/index.html#kafkaErrors

@frisi if you don't define a retry count, it will retry indefinitely. What behaviour are you after?

@cristianbriscaru

Hi @graemerocher, I am using ErrorStrategy, but when the Kafka listener is reactive the message is not retried on error:

@Slf4j
@RequiredArgsConstructor
@KafkaListener(
        value = "${kafka.consumers.ledger-company.group.id}",
        offsetReset = OffsetReset.EARLIEST,
        offsetStrategy = OffsetStrategy.SYNC,
        errorStrategy = @ErrorStrategy(
                value = ErrorStrategyValue.RETRY_ON_ERROR,
                retryDelay = "100ms",
                retryCount = 999999999
        )
)
public class CompanyListener {

    private final LedgerService ledgerService;
    
    @Topic("${kafka-topics.company}")
    public Mono<Void> receive(ConsumerRecord<String, DomainEventValue<CompanyProto>> message) {
        var eventType = message.value().getEventType();

        if (eventType.equals(CompanyEventTypes.CREATED)) {
            return ledgerService.create(LedgerExternalId.from(message.value().getEntity().get().getId()))
                    .then();
        }

        return Mono.empty();
    }
}

In this case the listener logs an "Error processing record ..." once and moves on without retrying. If I .block() the reactive stream, then the message is retried according to the ErrorStrategy.

What is the proper way to have a reactive Kafka listener with a working error strategy, without using .block()?
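The difference described above (the error only surfaces to the caller's error handling when you wait on the result) can be illustrated with a plain CompletableFuture standing in for the Mono. This is just an analogy for the reactive case under that assumption, not micronaut-kafka code:

```java
import java.util.concurrent.CompletableFuture;

public class AsyncErrorDemo {
    // stands in for a reactive handler whose work fails asynchronously
    static CompletableFuture<Void> receive() {
        return CompletableFuture.supplyAsync(() -> {
            throw new IllegalStateException("processing failed");
        });
    }

    public static void main(String[] args) {
        // Fire-and-forget: the exception stays trapped inside the future,
        // so this try/catch (playing the "error strategy") never sees it.
        try {
            receive();
            System.out.println("fire-and-forget: no exception observed");
        } catch (RuntimeException e) {
            System.out.println("fire-and-forget: caught " + e.getMessage());
        }

        // Waiting on the result (the .block() case): the failure surfaces
        // to the caller, so retry logic could react to it.
        try {
            receive().join();
            System.out.println("blocking: no exception observed");
        } catch (RuntimeException e) {
            System.out.println("blocking: caught the failure");
        }
    }
}
```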
