spring-cloud / spring-cloud-stream-binder-kafka

Spring Cloud Stream binders for Apache Kafka and Kafka Streams

Kafka Custom Batch Message Converter and target type of Records #1117

Closed: ferblaca closed this issue 3 years ago

ferblaca commented 3 years ago

Describe the issue

When setting up a custom batch message converter in which I need to process each record to convert it to a specific format (deserialize it), this is not possible, because the target type is always Object instead of the final type expected by the Consumer functional interface.

For example, if my Consumer functional interface is Consumer<List<ProductDTO>>, where ProductDTO is a POJO, then in BatchMessagingMessageListenerAdapter.toMessagingMessage() the result of getType() is always Object instead of List<ProductDTO>.

From what we have seen, this happens because when the batch adapter (KafkaMessageDrivenChannelAdapter) initializes its IntegrationBatchMessageListener, the superclass constructor is always invoked as super(null, null):

        IntegrationBatchMessageListener() {
            super(null, null); // NOSONAR - out of use
        }

As a result, when the BatchMessagingMessageConverter tries to convert each record with the configured recordConverter, it never can: the condition containerType(type) is always false, so the convert() method is never called.
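For reference, the relevant guard in BatchMessagingMessageConverter.toMessage() looks roughly like this (paraphrased from the Spring Kafka 2.7.x sources, not a verbatim copy):

    // Paraphrased sketch: with a null fallback type, containerType(type) is
    // false, so the recordConverter branch is never taken.
    if (this.recordConverter == null || !containerType(type)) {
        payloads.add(obj); // the raw record value is passed through unconverted
    }
    else {
        payloads.add(convert(record, type, conversionFailures));
    }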

Is this the expected behavior? How can I use the RecordConverter so that my deserializer is applied with the desired target type in BatchMessagingMessageConverter?

Version of the framework

Spring Boot 2.5.2, Spring Cloud Stream 3.1.3, Spring Kafka 2.7.4, Spring Integration 5.5.4

garyrussell commented 3 years ago

I can't reproduce your issue; this works fine...

import java.util.List;
import java.util.function.Consumer;

import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootApplication
public class Kbgh1117Application {

    public static void main(String[] args) {
        SpringApplication.run(Kbgh1117Application.class, args);
    }

    @Bean
    public Consumer<List<Foo>> input() {
        return System.out::println;
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
        return args -> {
            template.send("input-in-0", "{\"bar\":\"baz\"}".getBytes());
            template.send("input-in-0", "{\"bar\":\"qux\"}".getBytes());
        };
    }

    public static class Foo {

        private String bar;

        public Foo() {
        }

        public Foo(String bar) {
            this.bar = bar;
        }

        public String getBar() {
            return this.bar;
        }

        public void setBar(String bar) {
            this.bar = bar;
        }

        @Override
        public String toString() {
            return "Foo [bar=" + this.bar + "]";
        }

    }

}
application.properties:

spring.cloud.stream.bindings.input-in-0.consumer.batch-mode=true
spring.cloud.stream.bindings.input-in-0.group=foo

Console output:

[Foo [bar=baz], Foo [bar=qux]]

The adapter creates a message with a byte[] payload and the conversion is done higher up the stack, in org.springframework.cloud.stream.converter.ApplicationJsonMessageMarshallingConverter.convertParameterizedType().

To answer your analysis:

Passing null into the superclass is a limitation of using the listener adapter inside the KafkaMessageDrivenChannelAdapter; with @KafkaListener (in Spring for Apache Kafka), we can infer the type from the method signature.

With Spring Integration, the adapter is far away from the receiving Consumer and knows nothing about the signature.

The adapter does, however, have a mechanism to set the payload type which is passed into the adapters as the fallback type for the purposes of payload conversion.
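As a sketch of that mechanism, here is plain Spring Integration wiring (not spring-cloud-stream); the container bean, the ProductDTO type, and the channel name are assumptions for illustration:

import org.springframework.context.annotation.Bean;
import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

@Bean
public KafkaMessageDrivenChannelAdapter<String, byte[]> adapter(
        ConcurrentMessageListenerContainer<String, byte[]> container) {

    KafkaMessageDrivenChannelAdapter<String, byte[]> adapter =
            new KafkaMessageDrivenChannelAdapter<>(container,
                    KafkaMessageDrivenChannelAdapter.ListenerMode.batch);
    adapter.setPayloadType(ProductDTO.class); // the fallback type used for payload conversion
    adapter.setOutputChannelName("fromKafka"); // hypothetical channel name
    return adapter;
}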

There are (at least) three solutions:

  1. Use a JsonDeserializer in the consumer configuration together with useNativeDecoding: true.
  2. Add a custom BatchMessagingMessageConverter with an overridden extractAndConvertValue() (see the sketch after this list).
  3. Set the payloadType on the channel adapter as the fallback type (not currently exposed by spring-cloud-stream, but possible with a listener container customizer).
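For option 2, a minimal sketch, assuming JSON record values and a hypothetical ProductDTO target type (the class name and ObjectMapper wiring are illustrative, not the library's prescribed approach):

import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.reflect.Type;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.support.converter.BatchMessagingMessageConverter;

public class ProductBatchConverter extends BatchMessagingMessageConverter {

    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    protected Object extractAndConvertValue(ConsumerRecord<?, ?> record, Type type) {
        // 'type' is Object here (the limitation discussed above), so we
        // deserialize to a type known out of band instead of relying on it.
        try {
            return this.mapper.readValue((byte[]) record.value(), ProductDTO.class);
        }
        catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }

}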

However, this should not be necessary because spring-cloud-stream does its own conversion, as I showed above.

ferblaca commented 3 years ago

Thanks for replying @garyrussell.

Yes, the case you indicate above works correctly. That is, the Foo POJOs are deserialized higher up the stack by the ApplicationJsonMessageMarshallingConverter once the message reaches SimpleFunctionRegistry.convertInputMessageIfNecessary(), where the type inferred from the consumer's method signature is known. But if we wanted to perform the conversion in BatchMessagingMessageConverter, it would not be possible, since the inferred type is unknown there.

From what you say, the fact that the consumer's inferred type is not known in BatchMessagingMessageConverter.toMessage() is a limitation of using Spring Cloud Stream; is that right?

If so, converting/deserializing the messages at this point would not be possible even if we define our own custom BatchMessagingMessageConverter overriding the extractAndConvertValue() method, since we would not know the inferred target type.

So we understand that there are the following options:

  1. Use a Deserializer in the consumer configuration and useNativeDecoding: true (see the property sketch after this list).
  2. Let the deserialization of the messages be done higher up the stack, where the inferred type is known (org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.convertInputMessageIfNecessary()).
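For reference, option 1 would look roughly like this in application.properties (a sketch; the binding name input-in-0 and the com.example.ProductDTO type are assumptions):

spring.cloud.stream.bindings.input-in-0.consumer.use-native-decoding=true
spring.cloud.stream.kafka.bindings.input-in-0.consumer.configuration.value.deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.cloud.stream.kafka.bindings.input-in-0.consumer.configuration.spring.json.value.default.type=com.example.ProductDTO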

Given our use case and the kind of deserialization we want to perform, the first option is not suitable in our particular case.

On the other hand, regarding the second option, we have some doubts about how BatchMessagingMessageConverter works:

Message of Records and content-type

From what we have observed, the Message produced by BatchMessagingMessageConverter.toMessage() has no content-type header, so it defaults to application/json. This means that if the batch records were not serialized to JSON, e.g. with Avro, deserialization would fail higher up the stack when the records are converted according to the supported MimeType: because the Message defaults to content-type application/json, the ApplicationJsonMessageMarshallingConverter would be used instead of the corresponding Avro converter. Shouldn't the Message returned by BatchMessagingMessageConverter.toMessage() be assigned the content-type that the records carry?

Batch of heterogeneous records

What happens if the batch contains heterogeneous records, for example records serialized as JSON and as Avro in the same batch? If that happens and we let them be deserialized higher up the stack, everything will be converted with the same converter, which would fail. Is this a situation that should simply not occur for an SCS consumer in batch mode? In other words, must all messages in the topic read by an SCS batch-mode consumer be serialized the same way and share the same inferred type?

garyrussell commented 3 years ago

If there's a content-type header in the incoming record, it should be copied over by the converter; if it's not, please open a GitHub issue against spring-integration.

It's not clear what you mean; there can only be one inferred type per listener, which you can wire into the custom converter. If the types are in a hierarchy, you could set the inferred type to the superclass (or even Object) and down-cast in the listener.

It would be rather unusual to mix encoding types in the same topic; you could use a delegating deserializer. The JsonDeserializer has a type mapping feature where the sending side sets a header to indicate the type (either the complete class name, or a token that is mapped to the class). You could use the content-type header to select JSON vs. Avro deserializer(s), with headers to help choose which one to invoke for each record.
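As a sketch of the delegating-deserializer approach: Spring Kafka's DelegatingDeserializer picks a delegate per record based on the spring.kafka.serialization.selector header; the binding name, selector tokens, and delegate classes below are assumptions:

spring.cloud.stream.bindings.input-in-0.consumer.use-native-decoding=true
spring.cloud.stream.kafka.bindings.input-in-0.consumer.configuration.value.deserializer=org.springframework.kafka.support.serializer.DelegatingDeserializer
spring.cloud.stream.kafka.bindings.input-in-0.consumer.configuration.spring.kafka.value.serialization.selector.config=json:org.springframework.kafka.support.serializer.JsonDeserializer,avro:io.confluent.kafka.serializers.KafkaAvroDeserializer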