GoogleCloudPlatform / spring-cloud-gcp

New home for Spring Cloud GCP development starting with version 2.0.
Apache License 2.0
415 stars 307 forks source link

PubSubInboundChannelAdapter does not allow null payload #963

Open michalstefanext opened 2 years ago

michalstefanext commented 2 years ago

Hello, I would like to ask, if there is any possibility to receive message with empty payload via PubSubInboundChannelAdapter? It seems to me that this is impossible, however that is perfectly valid scenario in gcp pubsub.

java.lang.IllegalArgumentException: payload must not be null
    at org.springframework.util.Assert.notNull(Assert.java:201)
    at org.springframework.integration.support.MessageBuilder.<init>(MessageBuilder.java:70)
    at org.springframework.integration.support.MessageBuilder.withPayload(MessageBuilder.java:114)
    at org.springframework.integration.support.DefaultMessageBuilderFactory.withPayload(DefaultMessageBuilderFactory.java:72)
    at org.springframework.integration.support.DefaultMessageBuilderFactory.withPayload(DefaultMessageBuilderFactory.java:31)
    at com.google.cloud.spring.pubsub.integration.inbound.PubSubInboundChannelAdapter.consumeMessage(PubSubInboundChannelAdapter.java:146)
    at com.google.cloud.spring.pubsub.core.subscriber.PubSubSubscriberTemplate.lambda$subscribeAndConvert$1(PubSubSubscriberTemplate.java:173)
    at com.google.cloud.pubsub.v1.MessageDispatcher$4.run(MessageDispatcher.java:396)
    at com.google.cloud.pubsub.v1.SequentialExecutorService$AutoExecutor$1.run(SequentialExecutorService.java:134)
    at com.google.cloud.pubsub.v1.SequentialExecutorService$SequentialExecutor$1.run(SequentialExecutorService.java:112)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
meltsufin commented 2 years ago

@michalstefanext This is a validation in Spring Integration itself org.springframework.integration.support.MessageBuilder. I'm not sure why they prevent nulls for the message payload.

cc/ @artembilan

artembilan commented 2 years ago

See here for more info: https://jira.spring.io/projects/INT/issues/INT-997

But generally speaking the null payload was never supported: no data - no message. So, yo need to consider some rework in the PubSubInboundChannelAdapter to prevent such a null to be emitted or some other wrapper object, like we have with KafkaNull as a tombstone signal to send and receive null from Apache Kafka. That's for now until we really introduce an Optional wrapper support in Spring Integration.

meltsufin commented 2 years ago

Thanks @artembilan! It seems that something like a PubSubNull would be the way to go. It will require some changes to PubSubInboundChannelAdapter and our message conversion logic to ensure that PubSubNull is converted to null when needed. We'll accept it as a feature request.

elefeint commented 2 years ago

@emmileaf Take a look at this issue -- it's in the Spring Integration layer of Pub/Sub support.

emmileaf commented 2 years ago

@michalstefanext Do you happen to remember what was the type of the empty payload (string, byte array, etc.) passed to the PubSubInboundChannelAdapter, and whether it went through any custom conversions or transformations?

I am having some difficulty reproducing the exception: directly receiving a pubsub message with empty body seems to be handled as expected (as empty strings and empty byte arrays), and not resulting in null as the Spring message payload.