spring-cloud / spring-cloud-stream

Framework for building Event-Driven Microservices
http://cloud.spring.io/spring-cloud-stream
Apache License 2.0
988 stars 603 forks source link

Null Payloads During Deserialization with Spring Cloud Stream Kafka #2994

Open BernardoMatar opened 3 weeks ago

BernardoMatar commented 3 weeks ago

Describe the bug Issue Description:

I am using Spring Cloud Stream with Kafka Stream functions and am encountering an issue where payloads are deserialized as null. I suspect this might be due to a configuration problem or an issue I have not yet identified. Here is the detailed behavior:

When processing messages through the Kafka stream function, I receive null payloads after deserialization. Up until the SmartCompositeMessageConverter method fromMessage, the message attribute contains a GenericMessage with the complete payload according to the batch-size. However, the conversion process fails, resulting in an empty list with no exceptions or error messages. No exceptions are thrown during the deserialization process.

Sample springBootVersion=2.7.18 org.springframework.cloud:spring-cloud-starter-stream-kafka:4.1.3'

image

image

image

image

image

BernardoMatar commented 3 weeks ago

Additionally, when batch-mode=false, deserialization occurs as expected. Upon further debugging, I noticed that when batch-mode=false in the SmartCompositeMessageConverter class, within the method fromMessage(Message<?> message, Class<?> targetClass, @Nullable Object conversionHint), the code does not proceed through the iteration since the payload is not iterable. As a result, when it checks for any available converters, which also turns out not to be the case, the method ends up returning null.

However, when it does pass the if condition where the payload is recognized as iterable, it undergoes the same situation of searching for converters, and if none are found, the return value is different, becoming an empty list.

When control returns to the SimpleFunctionRegistry class in the convertInputIfNecessary method, it does not give ConversionService a chance because convertedInput is not null, but rather a list.size=0.

Outside of batch mode, the conversion is handled by the following code snippet, but in batch mode, this section is not executed. If, during batch mode, I force convertedInput to be null instead of an empty list, the expected result emerges.

if (convertedInput == null) { // give ConversionService a chance convertedInput = this.convertNonMessageInputIfNecessary(type, ((Message) input).getPayload(), false); }

Therefore, as a suggestion, instead of returning a list of size 0, consider checking if it is 0 and returning null, exactly at the point indicated in the image below.

image

olegz commented 3 weeks ago

Yes, this has been reported already and we have a solution, just need a bit more time to properly implement it as it will be a minor yet breaking change. FWIW, the reason why conversion works differently when batch is set to false is because at that time the collection is converted as a whole instead of one message at a time which is also something we can probably improve on. . . Stay tuned. . .

olegz commented 1 week ago

Related - https://github.com/spring-cloud/spring-cloud-stream/issues/2986