spring-attic / spring-cloud-gcp

Integration for Google Cloud Platform APIs with Spring
Apache License 2.0

With MessageHistory enabled, sending a PubSub message and consuming it in a subscription fails due to IllegalArgumentException #2562

Closed. tvirtualw closed this issue 3 years ago.

tvirtualw commented 3 years ago

I have enabled Spring Integration message history with the @EnableMessageHistory annotation.

After that, sending a message to a PubSub topic via PubSubMessageHandler and then consuming it again from a PubSub subscription fails with the following exception.

2020-10-22 11:29:27.107  WARN 6060 --- [sub-subscriber2] .s.c.g.p.i.i.PubSubInboundChannelAdapter : Sending Spring message [1] failed; message nacked automatically.

java.lang.IllegalArgumentException: Incorrect type specified for header 'history'. Expected [class org.springframework.integration.history.MessageHistory] but actual type is [class java.lang.String]
    at org.springframework.messaging.MessageHeaders.get(MessageHeaders.java:215)
    at org.springframework.integration.history.MessageHistory.write(MessageHistory.java:93)
    at org.springframework.integration.endpoint.MessageProducerSupport.trackMessageIfAny(MessageProducerSupport.java:292)
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:206)
    at org.springframework.cloud.gcp.pubsub.integration.inbound.PubSubInboundChannelAdapter.consumeMessage(PubSubInboundChannelAdapter.java:148)
    at org.springframework.cloud.gcp.pubsub.core.subscriber.PubSubSubscriberTemplate.lambda$subscribeAndConvert$1(PubSubSubscriberTemplate.java:191)
    at com.google.cloud.pubsub.v1.MessageDispatcher$4.run(MessageDispatcher.java:395)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
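
The type mismatch in the exception above can be reproduced without Pub/Sub. The following is a minimal sketch, assuming that header values are flattened to strings on the way out and looked up by type on the way in. `TypedHeaderLookupSketch`, `FakeHistory`, `get`, and `roundTripAsStrings` are hypothetical names that only imitate the check reported in the stack trace; they are not the Spring classes themselves.

```java
import java.util.HashMap;
import java.util.Map;

class TypedHeaderLookupSketch {

    // Hypothetical stand-in for a structured history object kept in a header.
    static final class FakeHistory {
        @Override
        public String toString() {
            return "inputChannel,outputChannel";
        }
    }

    // Imitates a typed header lookup: returns the value if it has the
    // requested type and throws IllegalArgumentException otherwise.
    static <T> T get(Map<String, Object> headers, String key, Class<T> type) {
        Object value = headers.get(key);
        if (value == null) {
            return null;
        }
        if (!type.isInstance(value)) {
            throw new IllegalArgumentException(
                    "Incorrect type specified for header '" + key + "'. Expected ["
                            + type + "] but actual type is [" + value.getClass() + "]");
        }
        return type.cast(value);
    }

    // Simulates transport over a string-only attribute map: every header
    // value is flattened with toString(), so the original type is lost.
    static Map<String, Object> roundTripAsStrings(Map<String, Object> headers) {
        Map<String, Object> received = new HashMap<>();
        headers.forEach((name, value) -> received.put(name, value.toString()));
        return received;
    }

    public static void main(String[] args) {
        Map<String, Object> sent = new HashMap<>();
        sent.put("history", new FakeHistory());

        // After the simulated round trip the header is a plain String,
        // so the typed lookup is rejected.
        Map<String, Object> received = roundTripAsStrings(sent);
        try {
            get(received, "history", FakeHistory.class);
        } catch (IllegalArgumentException e) {
            System.out.println("Lookup failed: " + e.getMessage());
        }
    }
}
```

In this sketch the lookup succeeds on the sending side and fails only after the simulated round trip, which matches the observation that the message is published without error and then nacked by the subscriber.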

I don't know much about the framework internals, but I believe that adding "!" + MessageHistory.HEADER_NAME to org.springframework.cloud.gcp.pubsub.integration.PubSubHeaderMapper#outboundHeaderPatterns would fix this.

Adding this pattern manually, as shown below, makes the exception disappear.

        PubSubMessageHandler handler = new PubSubMessageHandler(pubSubTemplate, gcpSettings.getSystemIdTopicName());
        PubSubHeaderMapper mapper = new PubSubHeaderMapper();
        mapper.setOutboundHeaderPatterns(
                "!" + MessageHistory.HEADER_NAME, //
                "!" + MessageHeaders.ID, //
                "!" + MessageHeaders.TIMESTAMP, //
                "!" + GcpPubSubHeaders.ACKNOWLEDGEMENT, //
                "!" + GcpPubSubHeaders.ORIGINAL_MESSAGE, //
                "!" + NativeMessageHeaderAccessor.NATIVE_HEADERS, //
                "*" //
        );
        handler.setHeaderMapper(mapper);

Unfortunately, the default patterns cannot be easily extended and need to be duplicated, which could lead to issues during version upgrades.
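
To make the effect of the negated pattern concrete, here is a small sketch of pattern-based header filtering, assuming that patterns are checked in order and the first match decides. `HeaderPatternSketch`, `simpleMatch`, `shouldMap`, and `toAttributes` are hypothetical names. This is not the PubSubHeaderMapper implementation, only an illustration of why "!history" has to come before "*".

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

class HeaderPatternSketch {

    // Matches a header name against a pattern in which "*" stands for any
    // run of characters; all other characters are matched literally.
    static boolean simpleMatch(String pattern, String name) {
        String regex = Arrays.stream(pattern.split("\\*", -1))
                .map(Pattern::quote)
                .collect(Collectors.joining(".*"));
        return name.matches(regex);
    }

    // Walks the patterns in order. The first pattern that matches decides:
    // a "!" prefix excludes the header, a plain pattern includes it.
    // A header that matches no pattern is not mapped.
    static boolean shouldMap(String name, List<String> patterns) {
        for (String pattern : patterns) {
            boolean negated = pattern.startsWith("!");
            String body = negated ? pattern.substring(1) : pattern;
            if (simpleMatch(body, name)) {
                return !negated;
            }
        }
        return false;
    }

    // Builds a string-only attribute map from the headers that pass the filter.
    static Map<String, String> toAttributes(Map<String, Object> headers, List<String> patterns) {
        Map<String, String> attributes = new LinkedHashMap<>();
        headers.forEach((name, value) -> {
            if (shouldMap(name, patterns)) {
                attributes.put(name, String.valueOf(value));
            }
        });
        return attributes;
    }

    public static void main(String[] args) {
        List<String> patterns = List.of("!history", "!id", "!timestamp", "*");

        Map<String, Object> headers = new LinkedHashMap<>();
        headers.put("history", new Object());
        headers.put("id", "some-id");
        headers.put("orderNumber", 42);

        // Prints the names of the headers that would be sent.
        System.out.println(toAttributes(headers, patterns).keySet());
    }
}
```

Under this first-match rule, placing "*" ahead of "!history" would let the history header through, so the order of the patterns matters as much as their presence.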

Spring Boot 2.3.4 with Spring Integration 5.3.2 and spring-cloud-gcp-starter-pubsub 1.2.5

meltsufin commented 3 years ago

@tvirtualw Are you expecting this message history to be carried with the message to the consumer? If you filter out the header, the exception may disappear, but you also lose the header propagation. cc/ @artembilan

artembilan commented 3 years ago

I agree. The history header has to be excluded from what is sent over Pub/Sub. Message history is not a cloud trace but a local trace for the current application, and it should not be carried over the network.

meltsufin commented 3 years ago

Thanks, @artembilan!