spring-cloud / spring-cloud-function


Getting inconsistent payload and headers in a functional Kafka stream consumer because of SimpleFunctionRegistry #1174

Closed: ellogi closed this 1 month ago

ellogi commented 2 months ago

I use the Spring Cloud Stream Kafka binder with functions to define my Kafka consumer and consume messages in batch mode:

    @Bean
    public Consumer<Message<List<InputOrder>>> process() {
        return messages -> {
            List<Map<String, Object>> headers = messages.getHeaders()
                    .get(KafkaHeaders.BATCH_CONVERTED_HEADERS, List.class);
            var orders = messages.getPayload();
        };
    }
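
Note that in batch mode KafkaHeaders.BATCH_CONVERTED_HEADERS carries one map of headers per consumed record, positionally aligned with the payload list, so I rely on headers.get(i) belonging to orders.get(i).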

And the properties are:

    spring:
        cloud:
            stream:
                kafka:
                    binder:
                        brokers:
                            - <broker>
                        configuration:
                            max.poll.records: 20
                bindings:
                    process-in-0:
                        binder: kafka
                        destination: <topic>
                        content-type: application/json
                        group: <group>
                        consumer:
                            batch-mode: true
                default-binder: kafka

If one of the input Kafka JSON messages cannot be deserialized in org.springframework.cloud.function.context.config.JsonMessageConverter#convertFromInternal for any reason, the exception is swallowed silently and null is returned. SmartCompositeMessageConverter#fromMessage(org.springframework.messaging.Message<?>, java.lang.Class<?>, java.lang.Object) then simply does not add that message to the converted result.

So if the first item in message.payload hits this conversion error but the second one does not, convertedInput in SimpleFunctionRegistry.FunctionInvocationWrapper#convertInputMessageIfNecessary contains only the second item, with no exception or error message, yet it still carries two items of headers: convertedInput is checked only for null, and nothing verifies that the converted message count matches the input message count. As a result, my consumer function receives a message with 1 item in the payload and 2 items in the headers. Moreover, the order in the headers is wrong: the first header entry belongs to the lost message, not to the one that is present, and it is impossible to determine this.
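
To make the mismatch concrete, this is roughly what the process() consumer above observes for a batch of two records where the first one fails conversion (the snippet is illustrative):

    // Illustrative: a batch of 2 records where the first fails JSON conversion.
    // The payload silently shrinks, the batch headers do not:
    List<InputOrder> orders = messages.getPayload();                  // size() == 1
    List<Map<String, Object>> headers = messages.getHeaders()
            .get(KafkaHeaders.BATCH_CONVERTED_HEADERS, List.class);   // size() == 2
    // headers.get(0) belongs to the dropped record, so the surviving order at
    // index 0 is now paired with the wrong record's headers.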

Here, at the end, in convertedInput = MessageBuilder.withPayload(convertedInput).copyHeaders(message.getHeaders()).build(); we end up with one item in convertedInput but copy all of the original headers:

        private Object convertInputMessageIfNecessary(Message message, Type type) {
            if (type == null) {
                return null;
            }
            if (message.getPayload() instanceof Optional) {
                return message;
            }
            if (message.getPayload() instanceof Collection<?>) {
                Type itemType = FunctionTypeUtils.getImmediateGenericType(type, 0);
                if (itemType == null) {
                    itemType = type;
                }
                Type collectionType = CollectionUtils.findCommonElementType((Collection<?>) message.getPayload());
                if (collectionType == itemType) {
                    return message.getPayload();
                }
            }

            Object convertedInput = message.getPayload();

            Type itemType = this.extractActualValueTypeIfNecessary(type);
            Class<?> rawType = FunctionTypeUtils.isMessage(type)
                    ? FunctionTypeUtils.getRawType(itemType)
                    : FunctionTypeUtils.getRawType(type);
            convertedInput = this.isConversionHintRequired(type, rawType)
                    ? SimpleFunctionRegistry.this.messageConverter.fromMessage(message, rawType, itemType)
                    : SimpleFunctionRegistry.this.messageConverter.fromMessage(message, rawType);

            if (convertedInput != null && !rawType.isAssignableFrom(convertedInput.getClass())) {
                logger.warn("Failed to convert input to " + rawType + ". Will attempt to invoke function with raw type");
            }

            if (FunctionTypeUtils.isMessage(type)) {
                if (convertedInput == null) {
                    if (logger.isDebugEnabled()) {
                        /*
                         * In the event conversion was unsuccessful we simply return the original un-converted message.
                         * This will help to deal with issues like KafkaNull and others. However if this was not the intention
                         * of the developer, this would be discovered early in the development process where the
                         * additional message converter could be added to facilitate the conversion.
                         */
                        logger.debug("Input type conversion of payload " + message.getPayload() + " resulted in 'null'. "
                                + "Will use the original message as input.");
                    }

                    convertedInput = message;
                }
                else {
                    if (!(convertedInput instanceof Message)) {
                        convertedInput = MessageBuilder.withPayload(convertedInput).copyHeaders(message.getHeaders()).build();
                    }
                }
            }
            return convertedInput;
        }
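
For context, this is a simplified sketch of what SmartCompositeMessageConverter#fromMessage effectively does for a collection payload, based on my reading of the code rather than the verbatim source (converter and rawType stand in for the converter and target type in use):

    // Simplified sketch (my reading, not the verbatim framework code): each item
    // is converted on its own, a failed conversion yields null, and the item is
    // silently skipped, so `converted` can end up shorter than the payload.
    List<Object> converted = new ArrayList<>();
    for (Object item : (Collection<?>) message.getPayload()) {
        Object result = converter.fromMessage(
                MessageBuilder.withPayload(item).copyHeaders(message.getHeaders()).build(),
                rawType);
        if (result != null) {
            converted.add(result);   // dropped items leave no trace here
        }
    }
    // The caller then does copyHeaders(message.getHeaders()), so
    // BATCH_CONVERTED_HEADERS still lists headers for the dropped items.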

Is this correct behavior, meaning I must always check that the number of headers matches the number of payload items in my consumer? Or am I missing some property, some error handling, or anything else? I am also not sure that silently losing invalid input messages is correct. Thanks in advance for any feedback.
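
For now, the only guard I can think of is a size check like the sketch below (my own workaround idea, not something the framework provides; it runs inside the process() consumer above). Note that even this cannot re-align the headers once the counts differ:

    // Workaround sketch: detect the mismatch and refuse the batch rather than
    // process payload items against the wrong headers.
    List<Map<String, Object>> batchHeaders = messages.getHeaders()
            .get(KafkaHeaders.BATCH_CONVERTED_HEADERS, List.class);
    List<InputOrder> orders = messages.getPayload();
    if (batchHeaders != null && batchHeaders.size() != orders.size()) {
        // Some records were silently dropped during conversion; the header order
        // is unrecoverable, so fail loudly instead of mis-attributing headers.
        throw new IllegalStateException("Converted " + orders.size()
                + " records but got " + batchHeaders.size() + " header maps");
    }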

Affects at least version 3.2.12

olegz commented 1 month ago

This was resolved via a series of commits in spring-cloud-stream:

5d881b2ad 29a355832 058fc660a fac9eb1eb 14c10462f

As well as this commit in spring-cloud-function:

6dee668e3

It was also back-ported to 4.1.x and will be available with the next release.

LukeKynaston commented 1 day ago

@olegz Do you know when 4.1.4 will be built?