spring-attic / spring-cloud-gcp

Integration for Google Cloud Platform APIs with Spring
Apache License 2.0
704 stars 694 forks source link

spring.cloud.stream.bindings.input.consumer.maxattempts and backOffInitialInterval does not work with pubsub #2560

Closed thayhoang closed 3 years ago

thayhoang commented 3 years ago

I used this setting with spring-cloud-starter-stream-rabbit and as I understand behind the scene, Spring create a RetryTemplate and retry the method until maxattempts. This doesn't work with spring-cloud-gcp-pubsub-stream-binder anymore. Here's my configuration

spring:
  cloud:
    stream:
      bindings:
        input:
          contentType: application/json
          destination: my-destination
          group: my-group
          consumer:
            concurrency: 6
            maxAttempts: 3
            backOffInitialInterval: 10000

Does spring-cloud-gcp-pubsub-stream-binder support maxattempts and backOffInitialInterval?

thayhoang commented 3 years ago

I'm looking into org.springframework.cloud.gcp.stream.binder.pubsub,PubSubMessageChannelBinder source code and it looks like it doesn't use any properties under consumer at all. Can anybody help me with this?

        @Override
    protected MessageProducer createConsumerEndpoint(ConsumerDestination destination, String group,
            ExtendedConsumerProperties<PubSubConsumerProperties> properties) {

        PubSubInboundChannelAdapter adapter = new PubSubInboundChannelAdapter(this.pubSubTemplate,
                destination.getName());

        ErrorInfrastructure errorInfrastructure = registerErrorInfrastructure(destination, group, properties);
        adapter.setErrorChannel(errorInfrastructure.getErrorChannel());

        return adapter;
    }
elefeint commented 3 years ago

@thayhoang You are correct; we do not support the additional retry options in the Pub/Sub binder.

Similarly to #2406, are you looking for:

  1. Retry if message retrieval from Pub/Sub failed or
  2. Retry if the application message processing logic failed
thayhoang commented 3 years ago

Thank you, @elefeint Is the concurrency property is respected by pubsub? Because I dont see it referenced in the Pubsub binder class

elefeint commented 3 years ago

It is not, but you may find two related Pub/Sub-native properties useful: spring.cloud.gcp.pubsub.subscriber.parallel-pull-count and spring.cloud.gcp.pubsub.subscriber.executor-threads -- the number of parallel subscribers will be parallel-pull-count times executor-threads.

meltsufin commented 3 years ago

@thayhoang I've just verified that spring.cloud.stream.bindings.input.consumer.maxAttempts is in fact used by the Pub/Sub Binder. However, you need to be using it with message polling. Note that PubSubMessageChannelBinder extends AbstractMessageChannelBinder, and that's where there is logic to create a RetryTemplate if the maxAttempts property is set. This only applies to failures on message retrieval, however, not application message processing failures.

thayhoang commented 3 years ago

It is not, but you may find two related Pub/Sub-native properties useful: spring.cloud.gcp.pubsub.subscriber.parallel-pull-count and spring.cloud.gcp.pubsub.subscriber.executor-threads -- the number of parallel subscribers will be parallel-pull-count times executor-threads.

@elefeint So these 2 settings will be applied for all subscriptions right? What if I want to have a different parallel setting for each subscription. For example I have two subscription, one for main topic, the other for dead letter topic. How can I have different parallel setting for each one?

spring:
  cloud:
    stream:
      bindings:
        input:
          contentType: application/json
          destination: my-des
          group: test
        input-dlt:
          contentType: application/json
          destination: dead-letter-topic
          group: test
elefeint commented 3 years ago

That's correct; we don't support customizing Pub/Sub settings per-stream.

What kind of issues are you seeing with global parallelization settings?

thayhoang commented 3 years ago

@elefeint It's just my old project used rabbitmq and we can set different concurrency on each stream and I'd like to keep it the same with pubsub. It's not a big deal though. About parallel-pull-count and executor-threads, What's the difference between them? Suppose I need to have concurrency of 6, should I set it parallel-pull-count = 2 and executor-threads = 3 or vice versa? Or 1 parallel-pull-count and 6 executor-threads? What's the combination of these that you recommend?

elefeint commented 3 years ago

Each unit in parallel-pull-count will create is own thread pool with executor-threads in each. Which values to set depends on how long your application takes to process messages, and the general message traffic shape.

There was a good answer on StackOverlow from @anguillanneuf describing the two properties.