spring-projects / spring-kafka

Provides Familiar Spring Abstractions for Apache Kafka
https://projects.spring.io/spring-kafka
Apache License 2.0
2.12k stars 1.52k forks source link

Support customer-defined MessagConverter in @KafkaListener annotated method. #3346

Open rujche opened 2 weeks ago

rujche commented 2 weeks ago

Steps:

  1. Create a POJO by avro maven plugin: 1.1. Add avro-maven-plugin in pom.xml.
    <build>
    <plugins>
        <plugin>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro-maven-plugin</artifactId>
            <version>1.11.0</version>
            <executions>
                <execution>
                    <phase>generate-sources</phase>
                    <goals>
                        <goal>schema</goal>
                    </goals>
                    <configuration>
                        <sourceDirectory>${project.basedir}/src/main/resources/schema-registry</sourceDirectory>
                        <outputDirectory>${project.basedir}/src/main/java</outputDirectory>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
    </build>

    1.2. Add avro file (SampleMessage.avsc) in src/main/resources folder.

    {
    "namespace": "rujche.sample.avsc.generated",
    "type": "record",
    "name": "SampleMessage",
    "fields": [
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "description",
      "type": "string"
    }
    ]
    }

    1.3. Run mvn clean install to generate code (SampleMessage.java).

    
    /**
    * Autogenerated by Avro
    *
    * DO NOT EDIT DIRECTLY
    */
    package rujche.sample.avsc.generated;

import org.apache.avro.generic.GenericArray; import org.apache.avro.specific.SpecificData; import org.apache.avro.util.Utf8; import org.apache.avro.message.BinaryMessageEncoder; import org.apache.avro.message.BinaryMessageDecoder; import org.apache.avro.message.SchemaStore;

@org.apache.avro.specific.AvroGenerated public class SampleMessage extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord { private static final long serialVersionUID = -4620837119995760321L;

public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"SampleMessage\",\"namespace\":\"rujche.sample.avsc.generated\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"description\",\"type\":\"string\"}]}"); public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }

private static final SpecificData MODEL$ = new SpecificData();

private static final BinaryMessageEncoder ENCODER = new BinaryMessageEncoder(MODEL$, SCHEMA$);

private static final BinaryMessageDecoder DECODER = new BinaryMessageDecoder(MODEL$, SCHEMA$);

... }


2. Write message receive method.
```java
@Configuration
public class KafkaConfiguration {
    @KafkaListener(id = "sample-message-group", topics = "${eventhubs.eventhub}")
    public void listen(SampleMessage message) {
        LOGGER.info("Received message. topic = {}, message = {}", properties.getEventhub(), message);
    }
}
  1. Run application.

Expected behavior: message converted to SampleMessage successfully. Current Behavior: throw exception like this:

Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [org.apache.avro.generic.GenericData$Record] to [rujche.sample.avsc.generated.SampleMessage] for GenericMessage [payload={"name": "test-message", "description": "Message from SampleController."}, headers={kafka_offset=5, kafka_consumer=org.springframework.kafka.core.DefaultKafkaConsumerFactory$ExtendedKafkaConsumer@37d70054, kafka_timestampType=LOG_APPEND_TIME, content-type=[B@4ca22ade, kafka_receivedPartitionId=0, kafka_receivedTopic=rujche24070501eh, kafka_receivedTimestamp=1720257701209, kafka_groupId=sample-message-group}]
    at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java:151) ~[spring-messaging-6.1.10.jar:6.1.10]
    at org.springframework.kafka.listener.adapter.KafkaNullAwarePayloadArgumentResolver.resolveArgument(KafkaNullAwarePayloadArgumentResolver.java:48) ~[spring-kafka-3.2.1.jar:3.2.1]
    at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:118) ~[spring-messaging-6.1.10.jar:6.1.10]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:147) ~[spring-messaging-6.1.10.jar:6.1.10]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:115) ~[spring-messaging-6.1.10.jar:6.1.10]
    at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:70) ~[spring-kafka-3.2.1.jar:3.2.1]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:420) ~[spring-kafka-3.2.1.jar:3.2.1]
    ... 16 common frames omitted

Workaround: Use GenericMessage<Object> instead of SampleMessage:

    @KafkaListener(id = "sample-message-group", topics = "${eventhubs.eventhub}")
    public void listen(GenericMessage<Object> message) {
        LOGGER.info("Received message. topic = {}, message = {}", properties.getEventhub(), message);
    }

Related codes: https://github.com/spring-projects/spring-kafka/blob/98957e0c6c97a42830624eef6de3a6f2eecb521a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java#L1204-L1215

artembilan commented 1 week ago

There is respective option to set such a converter:

    /**
     * Set the bean name of a
     * {@link org.springframework.messaging.converter.SmartMessageConverter} (such as the
     * {@link org.springframework.messaging.converter.CompositeMessageConverter}) to use
     * in conjunction with the
     * {@link org.springframework.messaging.MessageHeaders#CONTENT_TYPE} header to perform
     * the conversion to the required type. If a SpEL expression is provided
     * ({@code #{...}}), the expression can either evaluate to a
     * {@link org.springframework.messaging.converter.SmartMessageConverter} instance or a
     * bean name.
     * @return the bean name.
     * @since 2.7.1
     */
    String contentTypeConverter() default "";

However I'm not sure what converter implementation you can use for Avro. We don't have one out-of-the-box.