spring-cloud / stream-applications

Functions and Spring Cloud Stream Applications for data driven microservices
https://spring.io/projects/spring-cloud-stream-applications

Header serialization exception in aggregator processor with Redis #500

Closed CEDDM closed 11 months ago

CEDDM commented 11 months ago

When I configure my aggregator with Redis to cache JSON messages, I get a serialization exception because the object does not implement Serializable. This is a known problem with the default Redis serializer (JdkSerializationRedisSerializer).

The problem is that we don't have a real Java object when the aggregator is used in SCDF (the messages come from Kafka or RabbitMQ). The recommended workaround I found is to use GenericJackson2JsonRedisSerializer, but there is no way to do this without forking the aggregator (see the answer of @artembilan here: https://stackoverflow.com/questions/77088203/redis-serializer-properties-in-scdf).

It would be great to add a property to set the Redis serializer in the app configuration.

EDIT: According to @artembilan's analysis, the problem comes from the headers, not the payload, so the description above is wrong. Headers that are not serializable must be filtered out.

artembilan commented 11 months ago

Any explanation of how an aggregator processor receives something from the binder that is not byte[]? Well, I can understand that the JsonBytesToMap function may have an effect, since the default content-type in Spring Cloud Stream is JSON. But again: the resulting Map is Serializable.

Maybe you can share the stack trace from that serialization error with us, so we have some clue about what is going on?

Thanks

CEDDM commented 11 months ago

Sorry, I forgot to mention that I don't use the latest version of the aggregator. I use version 2021.1.2. I will give it a try with the latest version containing the JsonBytesToMap change and come back to you.

CEDDM commented 11 months ago

In fact, I think the problem happens before the aggregation, when the app is caching individual messages to Redis, so it's not a Map at this point. And yes, I don't understand why there is a serialization problem, as the message is already byte[]... Anyway, I will try with the latest version and post the stack trace.

CEDDM commented 11 months ago

I just saw that 2021.1.2 is still the latest stable version, so I couldn't try a more recent one. But my aggregation is: aggregation=#this.![@jacksonObjectMapper.readValue(payload, T(java.util.Map))] So I think it's pretty much the same as JsonBytesToMap.

Here is the stack trace:

2023-09-18T11:16:30.665905861+02:00 stdout F 2023-09-18 09:16:30.665 ERROR [test-aggregator-aggregator,5037da907fd2b327,ee76cce914935991] 1 --- [oundedElastic-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageHandlingException: error occurred in message handler [bean 'aggregator'; defined in: 'org.springframework.cloud.fn.aggregator.AggregatorFunctionConfiguration'; from source: 'public org.springframework.integration.config.AggregatorFactoryBean org.springframework.cloud.fn.aggregator.AggregatorFunctionConfiguration.aggregator(org.springframework.beans.factory.ObjectProvider,org.springframework.beans.factory.ObjectProvider,org.springframework.beans.factory.ObjectProvider,org.springframework.beans.factory.ObjectProvider,org.springframework.messaging.MessageChannel)']; nested exception is java.lang.IllegalArgumentException: If relying on the default RedisSerializer (JdkSerializationRedisSerializer) the Object must be Serializable. Either make it Serializable or provide your own implementation of RedisSerializer via 'setValueSerializer(..)', failedMessage=GenericMessage [payload=byte[48], headers={content-length=439, sequenceSize=4, target-protocol=kafka, authorization=Basic Om51bGw=, b3=5037da907fd2b327-98ac8c6d934be05e-0, host=10.152.183.167:8181, source-type=kafka, connection=Keep-Alive, correlationId=78c5121d-a229-ec2e-3201-8233291b4f10, id=4cca4fb5-180b-345e-71a3-ffe2020fe01b, contentType=application/json, kafka_receivedTimestamp=1695028590365, timestamp=1695028590574, http_requestMethod=POST, sequenceNumber=3, deliveryAttempt=1, kafka_timestampType=CREATE_TIME, kafka_receivedTopic=test-aggregator.splitter, accept=[text/plain, application/json, application/*+json, */*], nativeHeaders={}, kafka_offset=10, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@66c1b1ed, http_requestUrl=http://10.152.183.167:8181/, kafka_receivedPartitionId=0, accept-encoding=gzip,deflate, kafka_groupId=test-aggregator, user-agent=Apache-HttpClient/4.5.13 (Java/11.0.20.1)}]
2023-09-18T11:16:30.665926538+02:00 stdout F    at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191)
2023-09-18T11:16:30.665929086+02:00 stdout F    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65)
2023-09-18T11:16:30.665930856+02:00 stdout F    at org.springframework.integration.handler.ReplyProducingMessageHandlerWrapper.handleRequestMessage(ReplyProducingMessageHandlerWrapper.java:59)
2023-09-18T11:16:30.665932291+02:00 stdout F    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:136)
2023-09-18T11:16:30.665933719+02:00 stdout F    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
2023-09-18T11:16:30.665935112+02:00 stdout F    at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:88)
2023-09-18T11:16:30.665936549+02:00 stdout F    at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:37)
2023-09-18T11:16:30.66593819+02:00 stdout F     at org.springframework.integration.endpoint.ReactiveStreamsConsumer$SubscriberDecorator.hookOnNext(ReactiveStreamsConsumer.java:296)
2023-09-18T11:16:30.665939641+02:00 stdout F    at org.springframework.integration.endpoint.ReactiveStreamsConsumer$SubscriberDecorator.hookOnNext(ReactiveStreamsConsumer.java:277)
2023-09-18T11:16:30.665941166+02:00 stdout F    at reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:160)
2023-09-18T11:16:30.665942774+02:00 stdout F    at reactor.core.publisher.FluxRefCount$RefCountInner.onNext(FluxRefCount.java:200)
2023-09-18T11:16:30.66594415+02:00 stdout F     at reactor.core.publisher.FluxPublish$PublishSubscriber.drain(FluxPublish.java:477)
2023-09-18T11:16:30.665945525+02:00 stdout F    at reactor.core.publisher.FluxPublish$PublishSubscriber.onNext(FluxPublish.java:268)
2023-09-18T11:16:30.665946945+02:00 stdout F    at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:130)
2023-09-18T11:16:30.66594843+02:00 stdout F     at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:491)
2023-09-18T11:16:30.665949805+02:00 stdout F    at reactor.core.publisher.EmitterProcessor.tryEmitNext(EmitterProcessor.java:299)
2023-09-18T11:16:30.66595119+02:00 stdout F     at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:100)
2023-09-18T11:16:30.665952594+02:00 stdout F    at org.springframework.integration.channel.FluxMessageChannel.tryEmitMessage(FluxMessageChannel.java:82)
2023-09-18T11:16:30.665953988+02:00 stdout F    at org.springframework.integration.channel.FluxMessageChannel.doSend(FluxMessageChannel.java:71)
2023-09-18T11:16:30.66595724+02:00 stdout F     at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
2023-09-18T11:16:30.665958754+02:00 stdout F    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
2023-09-18T11:16:30.665960145+02:00 stdout F    at org.springframework.integration.channel.FluxMessageChannel.lambda$subscribeTo$5(FluxMessageChannel.java:123)
2023-09-18T11:16:30.665963507+02:00 stdout F    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:196)
2023-09-18T11:16:30.665964956+02:00 stdout F    at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:440)
2023-09-18T11:16:30.665966347+02:00 stdout F    at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:527)
2023-09-18T11:16:30.665967724+02:00 stdout F    at org.springframework.cloud.sleuth.instrument.reactor.ReactorSleuth.lambda$null$6(ReactorSleuth.java:324)
2023-09-18T11:16:30.6659694+02:00 stdout F  at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
2023-09-18T11:16:30.66597084+02:00 stdout F     at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
2023-09-18T11:16:30.665972266+02:00 stdout F    at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
2023-09-18T11:16:30.665973876+02:00 stdout F    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
2023-09-18T11:16:30.66597532+02:00 stdout F     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
2023-09-18T11:16:30.665976718+02:00 stdout F    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
2023-09-18T11:16:30.6659781+02:00 stdout F  at java.base/java.lang.Thread.run(Unknown Source)
2023-09-18T11:16:30.66597977+02:00 stdout F Caused by: java.lang.IllegalArgumentException: If relying on the default RedisSerializer (JdkSerializationRedisSerializer) the Object must be Serializable. Either make it Serializable or provide your own implementation of RedisSerializer via 'setValueSerializer(..)'
2023-09-18T11:16:30.665981195+02:00 stdout F    at org.springframework.integration.redis.store.RedisMessageStore.rethrowAsIllegalArgumentException(RedisMessageStore.java:188)
2023-09-18T11:16:30.665982596+02:00 stdout F    at org.springframework.integration.redis.store.RedisMessageStore.doStoreIfAbsent(RedisMessageStore.java:128)
2023-09-18T11:16:30.665983994+02:00 stdout F    at org.springframework.integration.store.AbstractKeyValueMessageStore.doAddMessage(AbstractKeyValueMessageStore.java:146)
2023-09-18T11:16:30.665985421+02:00 stdout F    at org.springframework.integration.store.AbstractKeyValueMessageStore.addMessagesToGroup(AbstractKeyValueMessageStore.java:214)
2023-09-18T11:16:30.66598687+02:00 stdout F     at org.springframework.integration.store.AbstractMessageGroupStore.addMessageToGroup(AbstractMessageGroupStore.java:189)
2023-09-18T11:16:30.665988294+02:00 stdout F    at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.store(AbstractCorrelatingMessageHandler.java:854)
2023-09-18T11:16:30.665989757+02:00 stdout F    at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.processMessageForGroup(AbstractCorrelatingMessageHandler.java:566)
2023-09-18T11:16:30.665991159+02:00 stdout F    at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.handleMessageInternal(AbstractCorrelatingMessageHandler.java:541)
2023-09-18T11:16:30.665992539+02:00 stdout F    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
2023-09-18T11:16:30.665993961+02:00 stdout F    ... 31 more
2023-09-18T11:16:30.665995666+02:00 stdout F Caused by: org.springframework.data.redis.serializer.SerializationException: Cannot serialize; nested exception is org.springframework.core.serializer.support.SerializationFailedException: Failed to serialize object using DefaultSerializer; nested exception is java.io.NotSerializableException: org.springframework.kafka.core.DefaultKafkaConsumerFactory$1
2023-09-18T11:16:30.665997091+02:00 stdout F    at org.springframework.data.redis.serializer.JdkSerializationRedisSerializer.serialize(JdkSerializationRedisSerializer.java:96)
2023-09-18T11:16:30.66599852+02:00 stdout F     at org.springframework.data.redis.core.AbstractOperations.rawValue(AbstractOperations.java:128)
2023-09-18T11:16:30.666001531+02:00 stdout F    at org.springframework.data.redis.core.DefaultValueOperations.setIfAbsent(DefaultValueOperations.java:364)
2023-09-18T11:16:30.666002961+02:00 stdout F    at org.springframework.data.redis.core.DefaultBoundValueOperations.setIfAbsent(DefaultBoundValueOperations.java:190)
2023-09-18T11:16:30.666004357+02:00 stdout F    at org.springframework.integration.redis.store.RedisMessageStore.doStoreIfAbsent(RedisMessageStore.java:121)
2023-09-18T11:16:30.666005745+02:00 stdout F    ... 38 more
2023-09-18T11:16:30.666007412+02:00 stdout F Caused by: org.springframework.core.serializer.support.SerializationFailedException: Failed to serialize object using DefaultSerializer; nested exception is java.io.NotSerializableException: org.springframework.kafka.core.DefaultKafkaConsumerFactory$1
2023-09-18T11:16:30.666008808+02:00 stdout F    at org.springframework.core.serializer.support.SerializingConverter.convert(SerializingConverter.java:64)
2023-09-18T11:16:30.666010247+02:00 stdout F    at org.springframework.core.serializer.support.SerializingConverter.convert(SerializingConverter.java:33)
2023-09-18T11:16:30.666012758+02:00 stdout F    at org.springframework.data.redis.serializer.JdkSerializationRedisSerializer.serialize(JdkSerializationRedisSerializer.java:94)
2023-09-18T11:16:30.666014248+02:00 stdout F    ... 42 more
2023-09-18T11:16:30.666015674+02:00 stdout F Caused by: java.io.NotSerializableException: org.springframework.kafka.core.DefaultKafkaConsumerFactory$1
2023-09-18T11:16:30.666017077+02:00 stdout F    at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
2023-09-18T11:16:30.666018469+02:00 stdout F    at java.base/java.io.ObjectOutputStream.defaultWriteFields(Unknown Source)
2023-09-18T11:16:30.666019849+02:00 stdout F    at java.base/java.io.ObjectOutputStream.writeSerialData(Unknown Source)
2023-09-18T11:16:30.666021262+02:00 stdout F    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
2023-09-18T11:16:30.666022639+02:00 stdout F    at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
2023-09-18T11:16:30.666024069+02:00 stdout F    at java.base/java.io.ObjectOutputStream.writeObject(Unknown Source)
2023-09-18T11:16:30.666025446+02:00 stdout F    at java.base/java.util.ArrayList.writeObject(Unknown Source)
2023-09-18T11:16:30.666026844+02:00 stdout F    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
2023-09-18T11:16:30.666028228+02:00 stdout F    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
2023-09-18T11:16:30.666029592+02:00 stdout F    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
2023-09-18T11:16:30.666030953+02:00 stdout F    at java.base/java.lang.reflect.Method.invoke(Unknown Source)
2023-09-18T11:16:30.666032331+02:00 stdout F    at java.base/java.io.ObjectStreamClass.invokeWriteObject(Unknown Source)
2023-09-18T11:16:30.666033715+02:00 stdout F    at java.base/java.io.ObjectOutputStream.writeSerialData(Unknown Source)
2023-09-18T11:16:30.666037642+02:00 stdout F    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
2023-09-18T11:16:30.666039145+02:00 stdout F    at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
2023-09-18T11:16:30.666040577+02:00 stdout F    at java.base/java.io.ObjectOutputStream.defaultWriteFields(Unknown Source)
2023-09-18T11:16:30.666041959+02:00 stdout F    at java.base/java.io.ObjectOutputStream.writeSerialData(Unknown Source)
2023-09-18T11:16:30.666043326+02:00 stdout F    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
2023-09-18T11:16:30.666044699+02:00 stdout F    at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
2023-09-18T11:16:30.66604606+02:00 stdout F     at java.base/java.io.ObjectOutputStream.defaultWriteFields(Unknown Source)
2023-09-18T11:16:30.666047441+02:00 stdout F    at java.base/java.io.ObjectOutputStream.writeSerialData(Unknown Source)
2023-09-18T11:16:30.666048889+02:00 stdout F    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
2023-09-18T11:16:30.666051926+02:00 stdout F    at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
2023-09-18T11:16:30.666053401+02:00 stdout F    at java.base/java.io.ObjectOutputStream.defaultWriteFields(Unknown Source)
2023-09-18T11:16:30.666054782+02:00 stdout F    at java.base/java.io.ObjectOutputStream.writeSerialData(Unknown Source)
2023-09-18T11:16:30.666056152+02:00 stdout F    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
2023-09-18T11:16:30.66605756+02:00 stdout F     at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
2023-09-18T11:16:30.666059016+02:00 stdout F    at java.base/java.io.ObjectOutputStream.writeObject(Unknown Source)
2023-09-18T11:16:30.666060396+02:00 stdout F    at java.base/java.util.HashMap.internalWriteEntries(Unknown Source)
2023-09-18T11:16:30.666061842+02:00 stdout F    at java.base/java.util.HashMap.writeObject(Unknown Source)
2023-09-18T11:16:30.666063224+02:00 stdout F    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
2023-09-18T11:16:30.666064608+02:00 stdout F    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
2023-09-18T11:16:30.666065982+02:00 stdout F    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
2023-09-18T11:16:30.666067382+02:00 stdout F    at java.base/java.lang.reflect.Method.invoke(Unknown Source)
2023-09-18T11:16:30.666068754+02:00 stdout F    at java.base/java.io.ObjectStreamClass.invokeWriteObject(Unknown Source)
2023-09-18T11:16:30.666070126+02:00 stdout F    at java.base/java.io.ObjectOutputStream.writeSerialData(Unknown Source)
2023-09-18T11:16:30.666071517+02:00 stdout F    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
2023-09-18T11:16:30.666073907+02:00 stdout F    at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
2023-09-18T11:16:30.666075358+02:00 stdout F    at java.base/java.io.ObjectOutputStream.defaultWriteFields(Unknown Source)
2023-09-18T11:16:30.666076745+02:00 stdout F    at java.base/java.io.ObjectOutputStream.defaultWriteObject(Unknown Source)
2023-09-18T11:16:30.66607816+02:00 stdout F     at org.springframework.messaging.MessageHeaders.writeObject(MessageHeaders.java:317)
2023-09-18T11:16:30.666079551+02:00 stdout F    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
2023-09-18T11:16:30.666080931+02:00 stdout F    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
2023-09-18T11:16:30.666082335+02:00 stdout F    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
2023-09-18T11:16:30.666083716+02:00 stdout F    at java.base/java.lang.reflect.Method.invoke(Unknown Source)
2023-09-18T11:16:30.666085169+02:00 stdout F    at java.base/java.io.ObjectStreamClass.invokeWriteObject(Unknown Source)
2023-09-18T11:16:30.666086546+02:00 stdout F    at java.base/java.io.ObjectOutputStream.writeSerialData(Unknown Source)
2023-09-18T11:16:30.666087944+02:00 stdout F    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
2023-09-18T11:16:30.666089326+02:00 stdout F    at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
2023-09-18T11:16:30.666090714+02:00 stdout F    at java.base/java.io.ObjectOutputStream.defaultWriteFields(Unknown Source)
2023-09-18T11:16:30.666092079+02:00 stdout F    at java.base/java.io.ObjectOutputStream.writeSerialData(Unknown Source)
2023-09-18T11:16:30.666094271+02:00 stdout F    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
2023-09-18T11:16:30.666095703+02:00 stdout F    at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
2023-09-18T11:16:30.66609713+02:00 stdout F     at java.base/java.io.ObjectOutputStream.defaultWriteFields(Unknown Source)
2023-09-18T11:16:30.666098505+02:00 stdout F    at java.base/java.io.ObjectOutputStream.writeSerialData(Unknown Source)
2023-09-18T11:16:30.666099947+02:00 stdout F    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
2023-09-18T11:16:30.666102962+02:00 stdout F    at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
2023-09-18T11:16:30.666104421+02:00 stdout F    at java.base/java.io.ObjectOutputStream.writeObject(Unknown Source)
2023-09-18T11:16:30.666105795+02:00 stdout F    at org.springframework.core.serializer.DefaultSerializer.serialize(DefaultSerializer.java:46)
2023-09-18T11:16:30.666107178+02:00 stdout F    at org.springframework.core.serializer.Serializer.serializeToByteArray(Serializer.java:56)
2023-09-18T11:16:30.666126366+02:00 stdout F    at org.springframework.core.serializer.support.SerializingConverter.convert(SerializingConverter.java:60)
2023-09-18T11:16:30.666129355+02:00 stdout F    ... 44 more
2023-09-18T11:16:30.666130766+02:00 stdout F 

Thanks for your help !

artembilan commented 11 months ago

OK. According to the stack trace, the payload has nothing to do with it. The problem comes from MessageHeaders serialization:

2023-09-18T11:16:30.66607816+02:00 stdout F     at org.springframework.messaging.MessageHeaders.writeObject(MessageHeaders.java:317)

And the real exception is this:

2023-09-18T11:16:30.666015674+02:00 stdout F Caused by: java.io.NotSerializableException: org.springframework.kafka.core.DefaultKafkaConsumerFactory$1

So, your original request is wrong, since we are definitely not talking about "JSON or not" for the payload: we are simply failing to serialize the message headers. We do have some logic for this in MessageHeaders:

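    private void writeObject(ObjectOutputStream out) throws IOException {
        // Collect the keys of headers whose top-level value is not Serializable
        Set<String> keysToIgnore = new HashSet<>();
        this.headers.forEach((key, value) -> {
            if (!(value instanceof Serializable)) {
                keysToIgnore.add(key);
            }
        });
        // ... (rest of the method not shown in this excerpt)
    }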

Therefore, that DefaultKafkaConsumerFactory$1 is probably nested inside some other header value that itself passes the condition above.
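To illustrate how a value can pass a top-level `instanceof Serializable` check and still fail during serialization, here is a minimal standalone sketch using only the JDK. It does not use any Spring classes; `NestedSerializableCheck`, `FakeConsumer` and `firstNonSerializableClass` are hypothetical names made up for this demo.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

final class NestedSerializableCheck {

    // Hypothetical stand-in for an object that does not implement Serializable.
    static class FakeConsumer { }

    // Tries Java serialization on the whole object graph. Returns null on success,
    // or the class name reported by NotSerializableException on failure.
    static String firstNonSerializableClass(Object value) {
        try {
            new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(value);
            return null;
        }
        catch (NotSerializableException ex) {
            return ex.getMessage();
        }
        catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }

    public static void main(String[] args) {
        // ArrayList implements Serializable, but its element does not.
        List<Object> headerValue = new ArrayList<>();
        headerValue.add(new FakeConsumer());

        // The top-level check only looks at the container type.
        System.out.println("passes instanceof check: " + (headerValue instanceof Serializable));
        // The failure only shows up when the nested element is actually written.
        System.out.println("fails on class: " + firstNonSerializableClass(headerValue));
    }
}
```

This matches the shape of the stack trace above, where the exception surfaces below `ArrayList.writeObject` rather than at the top-level header value.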

We need to investigate which headers are populated by the binders and filter them out before the message reaches the aggregator handler.

However, to trigger that process I need you to rework this ticket into a proper problem description. I wish we had had that stack trace a long time ago...
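One way such filtering could look is sketched below with plain JDK types rather than the real Spring `MessageHeaders` API: each header value is trial-serialized, and entries that fail are dropped. This is only an illustration of the idea, not code from the binder or the aggregator. `SerializableHeaderFilter`, `isDeeplySerializable` and `keepSerializable` are hypothetical names, and the `kafka_consumer` value is a fake placeholder.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class SerializableHeaderFilter {

    // True only if the whole object graph can be written with Java serialization.
    static boolean isDeeplySerializable(Object value) {
        try {
            new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(value);
            return true;
        }
        catch (IOException ex) {
            // NotSerializableException is a subclass of IOException.
            return false;
        }
    }

    // Returns a copy of the headers without the entries that cannot be serialized.
    static Map<String, Object> keepSerializable(Map<String, Object> headers) {
        Map<String, Object> result = new LinkedHashMap<>();
        headers.forEach((key, value) -> {
            if (isDeeplySerializable(value)) {
                result.put(key, value);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        // A Serializable list that wraps a non-serializable element (placeholder value).
        List<Object> fakeConsumer = new ArrayList<>();
        fakeConsumer.add(new Object());

        Map<String, Object> headers = new LinkedHashMap<>();
        headers.put("contentType", "application/json");
        headers.put("kafka_offset", 10L);
        headers.put("kafka_consumer", fakeConsumer);

        System.out.println(keepSerializable(headers).keySet()); // prints [contentType, kafka_offset]
    }
}
```

Trial serialization writes every value twice (once to test, once for real), so a production filter would more likely exclude known header names up front. The sketch only shows that a deep check catches what a top-level `instanceof` check misses.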

Thank you for understanding!

CEDDM commented 11 months ago

Thanks, I didn't see that line in the stack trace! I changed the title and added an EDIT to the description. I hope that's OK.

Here are the headers of one individual Kafka message from my example, if that helps:

{
    "content-length": "439",
    "http_requestMethod": "POST",
    "sequenceNumber": "1",
    "sequenceSize": "4",
    "target-protocol": "kafka",
    "accept": "[\"text/plain\",\"application/json\",\"application/*+json\",\"*/*\"]",
    "authorization": "Basic kshfbljgblgb=",
    "b3": "5037da907fd2b327-36bb30cb5c84cd85-0",
    "nativeHeaders": "{\"b3\":[\"5037da907fd2b327-36bb30cb5c84cd85-0\"]}",
    "spring_json_header_types": "{\"content-length\":\"java.lang.Long\",\"http_requestMethod\":\"java.lang.String\",\"sequenceNumber\":\"java.lang.Integer\",\"sequenceSize\":\"java.lang.Integer\",\"target-protocol\":\"java.lang.String\",\"accept\":\"java.util.ArrayList\",\"authorization\":\"java.lang.String\",\"b3\":\"java.lang.String\",\"nativeHeaders\":\"org.springframework.util.LinkedMultiValueMap\",\"host\":\"java.lang.String\",\"http_requestUrl\":\"java.lang.String\",\"connection\":\"java.lang.String\",\"correlationId\":\"java.util.UUID\",\"contentType\":\"java.lang.String\",\"accept-encoding\":\"java.lang.String\",\"user-agent\":\"java.lang.String\"}",
    "host": "10.152.xxx.yyy:8181",
    "http_requestUrl": "http://10.152.xxx.yyy:8181/",
    "connection": "Keep-Alive",
    "correlationId": "\"78c5121d-a229-ec2e-3201-8233291b4f10\"",
    "contentType": "application/json",
    "accept-encoding": "gzip,deflate",
    "user-agent": "Apache-HttpClient/4.5.13 (Java/11.0.20.1)"
}

artembilan commented 11 months ago

Well, those headers are OK, and they are probably part of the Kafka record before it is pulled by the consumer. The Kafka Binder consumer then adds its own headers, and that's what I'd like to investigate: which headers are added by the binder before the message reaches the target handler in the function.

@sobychacko, any chance you can quickly enlighten us about which headers are populated by the Kafka Binder on the inbound side?

I wonder what that org.springframework.kafka.core.DefaultKafkaConsumerFactory$1 is, and why it is not a top-level header that would be skipped by the Serializable filter?

Thanks

sobychacko commented 11 months ago

@CEDDM, we spent some time debugging this issue with @artembilan. The problem occurs because of the Micrometer listeners we add in the binder, which cause Spring Kafka to create a proxy object for the actual KafkaConsumer. The proxy cannot be serialized over the wire. We are thinking about a potential fix. In the meantime, as a workaround, can you try making the following change in the aggregator app and building a custom version for your use?
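As a rough analogy for why a proxy can slip past a Serializable filter, here is a standalone sketch with a plain JDK dynamic proxy. It is not the actual Spring Kafka code, and the real proxy there may be built differently; `ProxySerializationDemo`, its `Consumer` interface and the helper methods are hypothetical. The `$1` suffix is how the compiler names an anonymous class, which is the same pattern as `DefaultKafkaConsumerFactory$1` in the stack trace.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

final class ProxySerializationDemo {

    // Hypothetical interface standing in for the proxied consumer type.
    interface Consumer {
        String poll();
    }

    // Wraps the target in a JDK dynamic proxy whose handler is an anonymous class
    // that does not implement Serializable.
    static Consumer proxyFor(Consumer target) {
        InvocationHandler handler = new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                return method.invoke(target, args);
            }
        };
        return (Consumer) Proxy.newProxyInstance(
                Consumer.class.getClassLoader(), new Class<?>[] { Consumer.class }, handler);
    }

    // Returns null if the object graph serializes, else the offending class name.
    static String firstNonSerializableClass(Object value) {
        try {
            new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(value);
            return null;
        }
        catch (NotSerializableException ex) {
            return ex.getMessage();
        }
        catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }

    public static void main(String[] args) {
        Consumer proxy = proxyFor(() -> "record");

        // java.lang.reflect.Proxy implements Serializable, so the proxy itself passes the check.
        System.out.println("instanceof Serializable: " + (proxy instanceof Serializable));
        // Serialization fails on the handler held inside the proxy, not on the proxy type.
        System.out.println("fails on class: " + firstNonSerializableClass(proxy));
    }
}
```

In this sketch the proxy instance counts as Serializable while the anonymous handler inside it does not, so a top-level check lets it through and the write fails later.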

Add the following bean in your custom version.

@Bean
ClientFactoryCustomizer binderClientFactoryCustomizer() {
    return new ClientFactoryCustomizer() { };
}

When the binder detects this bean, it does not add any micrometer listeners, thus avoiding the proxy issue above.

Once you add that, you need to rebuild the app for the Kafka binder using the standard procedure for building a custom app. You will need this rebuilt, re-generated binder-based aggregator app until we develop a proper fix at the framework level.

artembilan commented 11 months ago

Here is the fix for Spring for Apache Kafka: https://github.com/spring-projects/spring-kafka/pull/2822.

Since there is nothing we can fix in this project to mitigate the problem, I'm going to close this as Invalid.

Well, it is valid, but it does not require any change in this project other than a version upgrade once it is available. Another workaround for this issue is to not use an external DB as the message store in the aggregator processor.

Thank you for the report and understanding!

CEDDM commented 11 months ago

Thanks for the fix in Spring Kafka. In some cases, we really need to store messages to avoid losing them. The latest stable version is more than a year old. Is there any ETA for the next stable version with all the recent fixes, and hopefully with the Spring Kafka fix?

artembilan commented 11 months ago

The PR for Spring for Apache Kafka has only just been opened, so the fix won't make it into a release until October. At the same time, this project might not be updated to the next Spring Boot version (to pull in that fix from Spring for Apache Kafka) until November. Therefore, you are stuck with the suggested workarounds for a while.

Sorry for the inconvenience.

Well, I can come up with a workaround for the version of Stream Applications whose release is due today, but that is already a completely new generation of this project. Would upgrading to it afterwards work for you?

artembilan commented 11 months ago

Correction: the release is postponed to next Wednesday. Reopening to provide the discussed fix as an out-of-the-box workaround for this aggregator application.

CEDDM commented 11 months ago

Thanks a lot @artembilan! That's great news!

artembilan commented 11 months ago

See the related PR. But still, sorry: even if we back-port it to the previous version, there is no telling when we would release it. So it is probably better to be ready for the upcoming 4.0.0.