
Micronaut Kafka does not properly bind records to bean methods #889

Open
jmera opened this issue Oct 3, 2023 · 6 comments

jmera commented Oct 3, 2023

Expected Behavior

When a producer publishes a record of type com.company.RecordA to mytopic, only the consumer whose receive method is concerned with records of type com.company.RecordA (i.e. the one whose @MessageBody parameter is declared as com.company.RecordA) should be invoked.

Actual Behaviour

When a producer publishes a record of type com.company.RecordA to mytopic, the receive methods of all consumers are invoked, including consumers that are not concerned with that record type.

Each consumer that is not concerned with the record type fails with an UnsatisfiedArgumentException:

Error processing record [Optional[RecordA(...)]] for Kafka consumer [ConsumerB] produced error: Required argument [RecordB] not specified

Steps To Reproduce

Create a number of consumers on the same topic, each concerned with one message type:

@KafkaListener("consumer-group-a")
public class ConsumerA {
    @Topic("mytopic")
    public void receive(@KafkaKey com.company.KafkaKey key, @MessageBody com.company.RecordA record) {
        // ... do stuff with RecordA
    }
}

@KafkaListener("consumer-group-b")
public class ConsumerB {
    @Topic("mytopic")
    public void receive(@KafkaKey com.company.KafkaKey key, @MessageBody com.company.RecordB record) {
        // ... do stuff with RecordB
    }
}
// Consumer C, D, etc...

Produce messages of type RecordA and RecordB to the same topic.
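
For illustration, a producer for this setup might look like the following sketch (the EventProducer interface and its method names are assumptions for the example, not part of the original report):

import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.Topic;

@KafkaClient
public interface EventProducer {
    // Both methods publish to the same topic, with different Avro record types
    @Topic("mytopic")
    void sendRecordA(@KafkaKey com.company.KafkaKey key, com.company.RecordA record);

    @Topic("mytopic")
    void sendRecordB(@KafkaKey com.company.KafkaKey key, com.company.RecordB record);
}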

Environment Information

application.yaml looks something like this:

kafka:
  bootstrap.servers: localhost:9092
  client.id: ${random.uuid}
  consumers:
    default:
      key:
        deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
        subject.name.strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
      value:
        deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
        subject.name.strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
      schema.registry.url: ${SCHEMA_REGISTRY_URL}
      specific.avro.reader: true
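
(Note: subject.name.strategy is set to io.confluent.kafka.serializers.subject.RecordNameStrategy, which derives schema subjects from the record's fully-qualified class name instead of the topic name; this is what allows several Avro record types to be produced to the same topic in the first place.)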

Example Application

No response

Version

3.9.1

@guillermocalvo guillermocalvo self-assigned this Oct 5, 2023
guillermocalvo (Contributor) commented

Thanks for reporting this issue @jmera

I understand how it might seem convenient to carelessly send all kinds of unrelated events to the same topic and let Micronaut dispatch each one to the consumer concerned with each type. However, the best practice is to use different Kafka topics for different event types. You can certainly mix different types in the same topic and get away with it, using either inheritance or composition. But at the end of the day, all parties consuming from the same topic should be able to handle all events (even if that means simply ignoring some of them).

@sdelamo I wouldn't recommend pursuing this change because 1) it doesn't promote good practices and 2) it's not backward compatible. Any thoughts before I close the issue?

jmera (Author) commented Oct 5, 2023

@guillermocalvo first and foremost, it's rude to assume a) that this is careless for my particular use case and b) that all of these events are unrelated. In fact, it's the opposite: all of these events are related and they must be processed in order. Reason being, they represent changes occurring in our system, scoped to that particular event's KafkaKey; they are events that represent actions users take in our system.

If you have a better approach for ensuring events scoped to a particular KafkaKey are consumed in order, I'm all ears. (Kafka only guarantees ordering within a partition, and records that share a key always land in the same partition of a given topic, which is why all of these events go through one topic.)

At my company we've implemented a custom exception handler; I was just trying to help out the open-source community by reporting what I perceive to be a Micronaut bug. But if that is not the case and this is intended behavior, apologies, please close this issue. Thank you for your contributions to micronaut-kafka.

guillermocalvo (Contributor) commented Oct 5, 2023

@jmera Sorry if I came across as rude. That was not my intention at all ☹️

> it's rude to assume a) that this is careless for my particular use case

By "carelessly" I just meant that producers get to send events without having to worry too much about the destination topic -- because there's only 1 topic for N kinds of events.

> and b) that all of these events are unrelated. In fact, it's the opposite: all of these events are related and they must be processed in order. Reason being, they represent changes occurring in our system, scoped to that particular event's KafkaKey; they are events that represent actions users take in our system.

By "unrelated" I meant that the types of the events are disjoint. Since you didn't specify that RecordA and RecordB extend a common parent RecordX, I assumed your events are not related type-wise.

> If you have a better approach for ensuring events scoped to a particular KafkaKey are consumed in order, I'm all ears.

  • Inheritance: If your event types are related to each other, you can have consumers receive the common ancestor and decide what to do with them depending on their subtype.
  • Composition: If they're not, you can use an event type that contains instances of those events. Again, your consumers can do different things depending on what's inside your "container" event (see the sketch below).
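
A minimal sketch of what the composition approach could look like (ContainerEvent and its fields are hypothetical names, not an existing API; with Avro this would typically be modeled as a union field in the schema):

import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.micronaut.messaging.annotation.MessageBody;

// Hypothetical "container" event that can carry any of the concrete payloads
public class ContainerEvent {
    public com.company.RecordA recordA; // set when the event wraps a RecordA
    public com.company.RecordB recordB; // set when the event wraps a RecordB
}

@KafkaListener("consumer-group")
public class ContainerConsumer {
    @Topic("mytopic")
    public void receive(@KafkaKey com.company.KafkaKey key, @MessageBody ContainerEvent event) {
        // Branch on whichever payload is present
        if (event.recordA != null) {
            // handle RecordA
        } else if (event.recordB != null) {
            // handle RecordB
        }
    }
}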

> At my company we've implemented a custom exception handler; I was just trying to help out the open-source community by reporting what I perceive to be a Micronaut bug.

We definitely appreciate your help and the time you took to raise this issue. My concern here was that, while your use case may be perfectly valid, this is probably not the way most users expect it to work.

The most common scenario is that all events sent to a topic have the same type. If an event of the wrong type is ever sent to the wrong topic, it is considered a programming error. Most users want to see that exception logged, so they can track the error down. Otherwise, the problem would go on unnoticed: those events would be silently ignored, and consumers that are receiving events from the "right" topic would never get a chance to process them. That is the kind of situation I wanted to avoid.

jmera (Author) commented Oct 5, 2023

@guillermocalvo all is good, thank you for clarifying; it's difficult to express intent over text 🙂

In my example, each record is an Avro-generated class that extends org.apache.avro.specific.SpecificRecordBase (thus the deserializer io.confluent.kafka.serializers.KafkaAvroDeserializer). So from that perspective yes they are related. For what it's worth, semantically the events mean things like: "customer A saved entity X to their favorites" or "customer B deleted entity Y from their favorites", "customer A updated entity X in their favorites", etc.

> Inheritance: If your event types are related to each other, you can have consumers receive the common ancestor and decide what to do with them depending on their subtype.

I haven't given this a shot, but it's a path I think is worth exploring.

jmera (Author) commented Oct 5, 2023

@guillermocalvo your suggestion to use inheritance turned out to be quite clean compared to my example above. I was able to consolidate consumers into one that looks something like this:

@KafkaListener
public class ConsolidatedConsumer {
    @Topic("mytopic")
    public void receive(@KafkaKey com.company.KafkaKey key, @MessageBody SpecificRecordBase record) {
        switch (record) {
            case RecordA recordA -> {
                // do stuff with recordA
            }
            case RecordB recordB -> {
                // do stuff with recordB
            } // ... etc
            default -> {
                log.error("Some error message");
            }
        }
    }
}
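
(Note: the pattern-matching switch used here was finalized in Java 21; on Java 17 through 20 it was only available as a preview feature.)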

In my opinion, it's not completely obvious this is the suggested way to handle publishing N event types to 1 topic. Do you think it makes sense to add some sort of documentation to micronaut-kafka around this particular use case?

guillermocalvo (Contributor) commented

@jmera 👍 Your example looks good! In my opinion, consumer consolidation reduces complexity and saves you from having to coordinate different consumers and their respective group IDs.

As for documenting this approach, I'm not really sure 🤔 I still think that, in general, we shouldn't encourage users to mix different types of events in the same Kafka topic.

@sdelamo What do you think?
