spring-attic / spring-cloud-gcp

Integration for Google Cloud Platform APIs with Spring
Apache License 2.0
704 stars 694 forks source link

Spring cloud stream gcp pubsub automatically ack message when shutting down application #2615

Closed thayhoang closed 3 years ago

thayhoang commented 3 years ago

I'm using spring cloud stream google pubsub. Here my consumer class

   @StreamListener(MessageSource.INPUT)
    void handleFileReady(String message) {
        try {

            System.out.println("Receive : " + message);
            process(message);

        } catch (Exception e) {
            handleFailure(e);
        }
    }

    private void process(String message) {
        try {
            Thread.sleep(100000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        int i = 1/0;
    }

    private void handleFailure(Exception e) {
        throw new RuntimeException("testt");
    }

my configuration

spring:
  cloud:
    stream:
      bindings:
        inputChannel:
          contentType: application/json
          destination: my-destination
          group: my-group
          default:
            consumer:
              auto-create-resources: true
            producer:
              auto-create-resources: true
    gcp:
      pubsub:
        subscriber:
          max-ack-extension-period: 30000
          parallel-pull-count: 1
          executor-threads: 5

When my application sleeping Thread.sleep(100000), I'm shutting down the application (using IntelliJ). I got the exeption

java.lang.InterruptedException: sleep interrupted
    at java.base/java.lang.Thread.sleep(Native Method)
    at com.hoangmn.pubsubdemo.service.Consumer.process(Consumer.java:25)
    at com.hoangmn.pubsubdemo.service.Consumer.handleFileReady(Consumer.java:16)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
    at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:55)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:127)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:177)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:403)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:198)
    at org.springframework.cloud.gcp.pubsub.integration.inbound.PubSubInboundChannelAdapter.consumeMessage(PubSubInboundChannelAdapter.java:149)
    at org.springframework.cloud.gcp.pubsub.core.subscriber.PubSubSubscriberTemplate.lambda$subscribeAndConvert$1(PubSubSubscriberTemplate.java:154)
    at com.google.cloud.pubsub.v1.MessageDispatcher$4.run(MessageDispatcher.java:379)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)

At this point, I'd expect the message been nack and when I start the application again it can start consume it again. But the message disappeared. I've also check the google console and see it has been acked. Is this a bug?

image

My code: https://github.com/thayhoang/pubsub-demo

meltsufin commented 3 years ago

I think it should be neither acked on nacked, according to our documentation. Which version of the project are you using? cc/ @elefeint

thayhoang commented 3 years ago

I think it should be neither acked on nacked, according to our documentation. Which version of the project are you using? cc/ @elefeint

"I think it should be neither acked on nacked" -> I think it should be too, then the message passed the acknowledgement deadline and my app can start consume it again. But it disappeared. I'm using spring boot version 2.2.4.RELEASE. You can check out my source code: https://github.com/thayhoang/pubsub-demo

elefeint commented 3 years ago

@thayhoang AUTO_ACK mode is default, so the adapter acknowledges the message as long as it was successfully handed off to a Spring Integration channel. The behavior you are looking for is more consistent with MANUAL mode; here is how to configure it: https://github.com/spring-cloud/spring-cloud-gcp/blob/master/docs/src/main/asciidoc/spring-stream.adoc#consumer-destination-configuration

thayhoang commented 3 years ago

@elefeint image

Correct me if I'm wrong. The documentation mention the default mode is AUTO. which should nack the message when there's an exception.

elefeint commented 3 years ago

I was wrong that the adapter should ACK upon handing off to Spring Integration -- the logic is synchronous, so any exceptions in the handler should result in nack().

@thayhoang I can reproduce the outstanding messages count going down when the demo app is shut down. This is very odd, and should not be happening; I will look into it more.

thayhoang commented 3 years ago

@elefeint When will this fix be available for production?

elefeint commented 3 years ago

There is no date for Hoxton.SR10 release yet (SR9 was released mid-November, so likely some time in January), but we will be releasing the new major version from the new repo first thing in January.

The workaround will be forward-ported by then.

Thank you for identifying the issue!