micronaut-projects / micronaut-kafka

Integration between Micronaut and Apache Kafka

Micronaut Kafka does not properly bind records to bean methods #889

Open · jmera opened this issue 1 year ago

jmera commented 1 year ago

Expected Behavior

When a producer publishes a record of type com.company.RecordA to mytopic, only the consumer whose receive method is concerned with records of type com.company.RecordA (the one whose parameter is declared as @MessageBody com.company.RecordA) should be invoked.

Actual Behaviour

When a producer publishes a record of type com.company.RecordA to mytopic, every consumer receive method on that topic is invoked, including consumers whose @MessageBody parameter declares a different record type.

The mismatched consumers fail with an UnsatisfiedArgumentException:

Error processing record [Optional[RecordA(...)]] for Kafka consumer [ConsumerB] produced error: Required argument [RecordB] not specified

Steps To Reproduce

Create a number of consumers on the same topic, each concerned with one message type:

@KafkaListener("consumer-group-a")
public class ConsumerA {
    @Topic("mytopic")
    public void receive(@KafkaKey com.company.KafkaKey key, @MessageBody com.company.RecordA record) {
        // ... do stuff with RecordA
    }
}

@KafkaListener("consumer-group-b")
public class ConsumerB {
    @Topic("mytopic")
    public void receive(@KafkaKey com.company.KafkaKey key, @MessageBody com.company.RecordB record) {
        // ... do stuff with RecordB
    }
}
// Consumer C, D, etc...

Produce messages of type RecordA and RecordB to the same topic.
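For illustration, a producer along these lines puts both record types on the one topic (EventProducer and its method names are hypothetical; @KafkaClient, @Topic and @KafkaKey are the standard micronaut-kafka annotations):

import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.Topic;

// Hypothetical declarative client publishing two record types to one topic.
@KafkaClient
public interface EventProducer {
    @Topic("mytopic")
    void sendA(@KafkaKey com.company.KafkaKey key, com.company.RecordA record);

    @Topic("mytopic")
    void sendB(@KafkaKey com.company.KafkaKey key, com.company.RecordB record);
}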

Environment Information

application.yaml looks something like this:

kafka:
  bootstrap.servers: localhost:9092
  client.id: ${random.uuid}
  consumers:
    default:
      key:
        deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
        subject.name.strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
      value:
        deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
        subject.name.strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
      schema.registry.url: ${SCHEMA_REGISTRY_URL}
      specific.avro.reader: true

Example Application

No response

Version

3.9.1

guillermocalvo commented 1 year ago

Thanks for reporting this issue @jmera

I understand how it might seem convenient to carelessly send all kinds of unrelated events to the same topic and let Micronaut dispatch each one to the consumer concerned with each type. However, the best practice is to use different Kafka topics for different event types. You can certainly mix different types in the same topic and get away with it, using either inheritance or composition. But at the end of the day, all parties consuming from the same topic should be able to handle all events (even if that means simply ignoring some of them).
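To sketch the composition option (EventEnvelope and its accessor names are made up for illustration, not micronaut-kafka API): producers publish a single envelope type that can carry any of the payloads, so every consumer on the topic can bind to it and pick out the payloads it cares about.

// Hypothetical envelope type: exactly one payload field is set per event.
public class EventEnvelope {
    private com.company.RecordA recordA;
    private com.company.RecordB recordB;

    public java.util.Optional<com.company.RecordA> recordA() {
        return java.util.Optional.ofNullable(recordA);
    }

    public java.util.Optional<com.company.RecordB> recordB() {
        return java.util.Optional.ofNullable(recordB);
    }
}

Consumers would then declare @MessageBody EventEnvelope and simply ignore payloads they are not interested in.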

@sdelamo I wouldn't recommend pursuing this change because 1) it doesn't promote good practices and 2) it's not backward compatible. Any thoughts before I close the issue?

jmera commented 1 year ago

@guillermocalvo first and foremost, it's rude to assume a) that this is careless for my particular use case and b) that all of these events are unrelated. In fact, it's the opposite: all of these events are related and they must be processed in order, because they represent changes occurring in our system, scoped to that particular event's KafkaKey; they are events that represent actions users take in our system.

If you have a better approach for ensuring events scoped to a particular KafkaKey are consumed in order, I'm all ears.

At my company we've implemented a custom exception handler; I was just trying to help out the OS community by reporting what I perceive to be a Micronaut bug. But if that is not the case and this is intended behavior, apologies, please close this issue. Thank you for your contributions to micronaut-kafka.
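A sketch of such a handler, assuming the binding failure surfaces as an UnsatisfiedArgumentException in the exception's cause chain (the class name is made up; DefaultKafkaListenerExceptionHandler, KafkaListenerException and @Replaces are existing Micronaut types):

import io.micronaut.configuration.kafka.exceptions.DefaultKafkaListenerExceptionHandler;
import io.micronaut.configuration.kafka.exceptions.KafkaListenerException;
import io.micronaut.context.annotation.Replaces;
import io.micronaut.core.bind.exceptions.UnsatisfiedArgumentException;
import jakarta.inject.Singleton;

// Hypothetical handler: skip records a listener does not bind to,
// delegate every other failure to the default behaviour.
@Singleton
@Replaces(DefaultKafkaListenerExceptionHandler.class)
public class UnboundRecordExceptionHandler extends DefaultKafkaListenerExceptionHandler {
    @Override
    public void handle(KafkaListenerException exception) {
        if (exception.getCause() instanceof UnsatisfiedArgumentException) {
            return; // record type not handled by this listener
        }
        super.handle(exception);
    }
}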

guillermocalvo commented 1 year ago

@jmera Sorry if I came across as rude. That was not my intention at all ☹️

> it's rude to assume a) that this is careless for my particular use case

By "carelessly" I just meant that producers get to send events without having to worry too much about the destination topic -- because there's only 1 topic for N kinds of events.

> and b) that all of these events are unrelated. In fact, it's the opposite: all of these events are related and they must be processed in order, because they represent changes occurring in our system, scoped to that particular event's KafkaKey; they are events that represent actions users take in our system.

By "unrelated" I meant that the types of the events are disjoint. Since you didn't specify that RecordA and RecordB extend a common parent RecordX, I assumed your events are not related type-wise.

> If you have a better approach for ensuring events scoped to a particular KafkaKey are consumed in order, I'm all ears.

> At my company we've implemented a custom exception handler; I was just trying to help out the OS community by reporting what I perceive to be a Micronaut bug.

We definitely appreciate your help and the time you took to raise this issue. My concern here was that, while your use case may be perfectly valid, this is probably not the way most users expect it to work.

The most common scenario is that all events sent to a topic have the same type. If an event of the wrong type is ever sent to the wrong topic, it is considered a programming error. Most users want to see that exception logged, so they can track the error down. Otherwise, the problem would go on unnoticed: those events would be silently ignored, and consumers that are receiving events from the "right" topic would never get a chance to process them. That is the kind of situation I wanted to avoid.

jmera commented 1 year ago

@guillermocalvo all is good, thank you for clarifying; it's difficult to express intent over text 🙂

In my example, each record is an Avro-generated class that extends org.apache.avro.specific.SpecificRecordBase (hence the io.confluent.kafka.serializers.KafkaAvroDeserializer deserializer). So from that perspective, yes, they are related. For what it's worth, semantically the events mean things like "customer A saved entity X to their favorites", "customer B deleted entity Y from their favorites", "customer A updated entity X in their favorites", etc.

> Inheritance: If your event types are related to each other, you can have consumers receive the common ancestor and decide what to do with them depending on their subtype.

I haven't given this a shot yet, but it's a path I think is worth exploring.

jmera commented 1 year ago

@guillermocalvo your suggestion to use inheritance turned out to be quite clean compared to my example above. I was able to consolidate consumers into one that looks something like this:

import com.company.RecordA;
import com.company.RecordB;
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.micronaut.messaging.annotation.MessageBody;
import org.apache.avro.specific.SpecificRecordBase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@KafkaListener
public class ConsolidatedConsumer {

    private static final Logger log = LoggerFactory.getLogger(ConsolidatedConsumer.class);

    @Topic("mytopic")
    public void receive(@KafkaKey com.company.KafkaKey key, @MessageBody SpecificRecordBase record) {
        // Pattern matching for switch (Java 21) dispatches on the concrete subtype.
        switch (record) {
            case RecordA recordA -> {
                // do stuff with recordA
            }
            case RecordB recordB -> {
                // do stuff with recordB
            } // ... etc
            default -> log.error("Unexpected record type: {}", record.getClass().getName());
        }
    }
}

In my opinion, it's not completely obvious that this is the suggested way to handle publishing N event types to one topic. Do you think it makes sense to add some documentation to micronaut-kafka around this particular use case?

guillermocalvo commented 1 year ago

@jmera 👍 Your example looks good! In my opinion, consumer consolidation reduces complexity and saves you from having to coordinate different consumers and their respective group IDs.

As for documenting this approach, I'm not really sure 🤔 I still think that, in general, we shouldn't encourage users to mix different types of events in the same Kafka topic.

@sdelamo What do you think?