Allow multi-threaded access to KafkaConsumer #613

Open
Hc747 opened this issue Nov 11, 2022 · 4 comments
Labels
type: improvement A minor improvement to an existing feature

Comments

@Hc747
Contributor

Hc747 commented Nov 11, 2022

Feature description

Problem:
A KafkaConsumer instance can be safely shared between threads as long as all access to it is mutually exclusive. Concurrent access from more than one thread raises a ConcurrentModificationException, as per the documentation.

Micronaut's current implementation does not ensure that access to a KafkaConsumer is mutually exclusive. As a consequence, downstream code cannot perform operations on the KafkaConsumer instance from any thread other than the poll loop thread. This prevents multi-threaded code from using Micronaut's Kafka implementation and its life-cycle management.

Feature Request:
Ensure that Micronaut's use of KafkaConsumer instances is mutually exclusive by performing Kafka operations within a synchronized block.

Implementation:

  • Synchronise access to KafkaConsumer instances within the KafkaConsumerProcessor when performing Kafka operations.
  • Update documentation in ConsumerRegistry#getConsumer.
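
As a rough illustration of the first bullet, the sketch below funnels every use of a non-thread-safe client through one lock. It is not Micronaut or Kafka code: `ExclusiveAccess`, `FakeConsumer` and `ExclusiveAccessDemo` are hypothetical names, and `FakeConsumer` only stands in for a real KafkaConsumer so the example runs without any Kafka dependency.

```java
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

/**
 * Hypothetical helper (not part of Micronaut or Kafka): every use of the
 * wrapped client goes through one lock, so access is mutually exclusive.
 */
final class ExclusiveAccess<C> {
    private final C client;
    private final ReentrantLock lock = new ReentrantLock();

    ExclusiveAccess(C client) {
        this.client = client;
    }

    /** Runs the operation while holding the lock and returns its result. */
    <R> R call(Function<? super C, ? extends R> operation) {
        lock.lock();
        try {
            return operation.apply(client);
        } finally {
            lock.unlock();
        }
    }
}

/** Stand-in for a consumer (an assumption for this sketch): not thread-safe. */
final class FakeConsumer {
    private long position = 0;

    long advance() {
        return ++position; // unsynchronised read-modify-write
    }

    long position() {
        return position;
    }
}

final class ExclusiveAccessDemo {
    /** A "poll loop" thread and an application thread share one client. */
    static long run(int iterationsPerThread) {
        ExclusiveAccess<FakeConsumer> shared = new ExclusiveAccess<>(new FakeConsumer());
        Runnable work = () -> {
            for (int i = 0; i < iterationsPerThread; i++) {
                shared.call(FakeConsumer::advance);
            }
        };
        Thread pollLoop = new Thread(work, "poll-loop");
        Thread application = new Thread(work, "application");
        pollLoop.start();
        application.start();
        try {
            pollLoop.join();
            application.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return shared.call(FakeConsumer::position);
    }

    public static void main(String[] args) {
        System.out.println(run(100_000));
    }
}
```

Because both threads go through `call`, no increment is lost and the final position equals the total number of calls. The lock is held only for the duration of each operation, so the cost is mostly the uncontended lock acquisition unless one caller holds it for a long time (for example during a long poll).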
@Hc747
Contributor Author

Hc747 commented Nov 11, 2022

I was able to work around this with the following code, which bypasses Micronaut's KafkaConsumerProcessor.

    @Inject
    KafkaConsumerFactory factory;

    @Inject
    SerdeRegistry registry;

    @Inject @Named(TaskExecutors.MESSAGE_CONSUMER)
    ExecutorService executor;

    @PostConstruct
    public void start() {
        log.info("Configuration: {} - Factory: {}", configuration, factory);
        // Fall back to deserializers from the SerdeRegistry when none are configured.
        if (configuration.getKeyDeserializer().isEmpty()) {
            configuration.setKeyDeserializer(registry.pickDeserializer(Argument.of(KeyType.class)));
        }

        if (configuration.getValueDeserializer().isEmpty()) {
            configuration.setValueDeserializer(registry.pickDeserializer(Argument.of(ValueType.class)));
        }

        // Create the consumer directly instead of going through KafkaConsumerProcessor.
        final var consumer = factory.createConsumer(configuration);
        log.info("Consumer: {}", consumer);

        // Hand the consumer's work to the message-consumer executor (task body elided).
        executor.submit(() -> ...);
    }

@graemerocher
Contributor

I don't see anywhere in the referenced documentation that says a consumer can be accessed in a multi-threaded manner when all access is mutually exclusive. Introducing synchronization and locking will just slow down the implementation and introduce lock contention.

@Hc747
Contributor Author

Hc747 commented Nov 15, 2022

> I don't see anywhere in the referenced documentation that says a consumer can be accessed in a multi-threaded manner when all access is mutually exclusive. Introducing synchronization and locking will just slow down the implementation and introduce lock contention.

From the first line of the documentation linked above: "It is the responsibility of the user to ensure that multi-threaded access is properly synchronized."
https://kafka.apache.org/32/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#multithreaded

The Kafka Consumer API enforces mutually exclusive access by checking the ID of the current thread against the one recorded within the consumer, and by reference counting (see: acquire/release). This check fails fast rather than blocking. Without explicit synchronisation in application code (Micronaut, etc.), two threads can attempt to acquire the consumer at the same time, and the one that loses gets a ConcurrentModificationException. Explicit synchronisation prevents this, allows the consumer to be shared between threads, and is unlikely to add much overhead.
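
To make the mechanism concrete, here is a simplified model of the ownership check described above. It is a sketch written for this discussion, not Kafka's source: `GuardedClient` and `GuardDemo` are hypothetical names, and the real consumer's internals may differ in detail. It shows two things: overlapping access from a second thread is rejected, and the same two threads succeed once callers share a lock.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/** Simplified, hypothetical model of a thread-ID check plus reference count. */
final class GuardedClient {
    private static final long NO_OWNER = -1L;
    private final AtomicLong ownerThreadId = new AtomicLong(NO_OWNER);
    private final AtomicInteger refCount = new AtomicInteger(0);

    private void acquire() {
        long me = Thread.currentThread().getId();
        // Re-entrant for the current owner; otherwise claim ownership or fail fast.
        if (ownerThreadId.get() != me && !ownerThreadId.compareAndSet(NO_OWNER, me)) {
            throw new ConcurrentModificationException("client is not safe for multi-threaded access");
        }
        refCount.incrementAndGet();
    }

    private void release() {
        if (refCount.decrementAndGet() == 0) {
            ownerThreadId.set(NO_OWNER);
        }
    }

    /** Every operation is bracketed by acquire/release; it never blocks. */
    void operate(Runnable body) {
        acquire();
        try {
            body.run();
        } finally {
            release();
        }
    }
}

final class GuardDemo {
    private static void join(Thread t) {
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    /** True if a second thread is rejected while the first is inside operate(). */
    static boolean overlappingAccessFails() {
        GuardedClient client = new GuardedClient();
        AtomicBoolean rejected = new AtomicBoolean(false);
        client.operate(() -> {
            Thread other = new Thread(() -> {
                try {
                    client.operate(() -> { });
                } catch (ConcurrentModificationException e) {
                    rejected.set(true);
                }
            });
            other.start();
            join(other); // the calling thread still owns the client here
        });
        return rejected.get();
    }

    /** True if two threads never fail when every call is made under a shared lock. */
    static boolean synchronizedAccessSucceeds() {
        GuardedClient client = new GuardedClient();
        Object lock = new Object();
        AtomicInteger failures = new AtomicInteger();
        Runnable work = () -> {
            for (int i = 0; i < 10_000; i++) {
                synchronized (lock) {
                    try {
                        client.operate(() -> { });
                    } catch (ConcurrentModificationException e) {
                        failures.incrementAndGet();
                    }
                }
            }
        };
        Thread a = new Thread(work);
        Thread b = new Thread(work);
        a.start();
        b.start();
        join(a);
        join(b);
        return failures.get() == 0;
    }

    public static void main(String[] args) {
        System.out.println("overlapping access rejected: " + overlappingAccessFails());
        System.out.println("synchronized access succeeds: " + synchronizedAccessSucceeds());
    }
}
```

The guard only detects overlapping use; it does not serialise callers. That is why the external lock is needed if more than one thread is to use the same instance.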

@graemerocher
Contributor

ok, will take a look

graemerocher added the "type: improvement" label (A minor improvement to an existing feature) Nov 15, 2022