spring-cloud / spring-cloud-stream

Framework for building Event-Driven Microservices
http://cloud.spring.io/spring-cloud-stream
Apache License 2.0

[Feature Request] In `KafkaMessageChannelBinder` pass the `extendedConsumerProperties` as an argument to `ListenerContainerWithDlqAndRetryCustomizer` #2985

Open LukeKynaston opened 1 month ago

LukeKynaston commented 1 month ago

Dear devs,

In org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder#createConsumerEndpoint() we have the following snippet:

    if (customizer instanceof ListenerContainerWithDlqAndRetryCustomizer) {
        BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver = createDestResolver(
                extendedConsumerProperties.getExtension());
        BackOff createBackOff = extendedConsumerProperties.getMaxAttempts() > 1
                ? createBackOff(extendedConsumerProperties)
                : null;
        ((ListenerContainerWithDlqAndRetryCustomizer) customizer)
                .configure(messageListenerContainer, destination.getName(), consumerGroup, destinationResolver,
                        createBackOff);
    }
    else {
        ((ListenerContainerCustomizer<Object>) customizer)
                .configure(messageListenerContainer, destination.getName(), consumerGroup);
    }

I would like to pass the extendedConsumerProperties as an argument to an overload of ListenerContainerWithDlqAndRetryCustomizer#configure().

It may be sensible to do the same for ListenerContainerCustomizer too.

This is to get access to all of the properties available, including the bindingName, which users can use to configure their custom error-handling strategy more easily.
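To make the request concrete, here is a minimal sketch of what such an overload could look like. It deliberately uses stand-in types rather than the real Spring Cloud Stream classes: `Customizer`, `ConsumerProps`, `PerBindingCustomizer` and the four-argument `configure()` are all hypothetical names invented for illustration, and the existing method is reduced to three arguments. The idea shown is only that a default method delegating to the existing signature would keep current implementations working.

```java
public class CustomizerOverloadSketch {

    // Stand-in for the binder's extended consumer properties (hypothetical, simplified).
    static final class ConsumerProps {
        final String bindingName;
        final int maxAttempts;

        ConsumerProps(String bindingName, int maxAttempts) {
            this.bindingName = bindingName;
            this.maxAttempts = maxAttempts;
        }
    }

    // Stand-in for a customizer interface; NOT the real Spring Cloud Stream type.
    interface Customizer<C> {

        // Simplified shape of the existing call: container, destination, group.
        void configure(C container, String destinationName, String group);

        // Hypothetical overload. It delegates to the existing method by default,
        // so implementations that ignore the properties keep compiling and behaving as before.
        default void configure(C container, String destinationName, String group, ConsumerProps props) {
            configure(container, destinationName, group);
        }
    }

    // Example implementation that chooses an error-handling strategy per binding name.
    static final class PerBindingCustomizer implements Customizer<StringBuilder> {

        @Override
        public void configure(StringBuilder container, String destinationName, String group) {
            container.append("default-handler");
        }

        @Override
        public void configure(StringBuilder container, String destinationName, String group, ConsumerProps props) {
            container.append("handler-for:").append(props.bindingName);
        }
    }

    // Mimics the binder side: always calls the overload that carries the properties.
    static String describe(Customizer<StringBuilder> customizer, String destination, String group,
            ConsumerProps props) {
        StringBuilder container = new StringBuilder(); // stand-in for the listener container
        customizer.configure(container, destination, group, props);
        return container.toString();
    }

    public static void main(String[] args) {
        ConsumerProps props = new ConsumerProps("orders-in-0", 3);
        System.out.println(describe(new PerBindingCustomizer(), "orders", null, props));
    }
}
```

With this shape, a customizer that only implements the existing method is still invoked through the default delegation, while one that overrides the new overload can key its behaviour on the binding name.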

Kind regards,

Luke

olegz commented 1 month ago

If I understand you correctly, you can inject BindingServiceProperties and then call getBindings(), which gives you a map of BindingProperties.

LukeKynaston commented 1 month ago

Hi Oleg, thanks for getting back to me.

Yes, this is what we currently do. However, there are situations where it is not possible to uniquely identify a consumer based on the destination and group, e.g. when more than one consumer is configured for the same destination without a group specified (an anonymous consumer group).

While we can filter by group and destination, as described above, this approach is cumbersome and error-prone.
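The ambiguity can be illustrated with a small, self-contained sketch. It does not use the real BindingServiceProperties or BindingProperties; `Binding`, `matchingBindingNames` and the binding names are hypothetical stand-ins, and an anonymous group is modelled simply as `null`, which is an assumption made for the example rather than a description of how the binder represents it.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class BindingLookupSketch {

    // Stand-in for BindingProperties, reduced to the two fields used for matching.
    static final class Binding {
        final String destination;
        final String group; // null models an anonymous consumer group (assumption)

        Binding(String destination, String group) {
            this.destination = destination;
            this.group = group;
        }
    }

    // Returns the names of all bindings whose destination and group both match.
    static List<String> matchingBindingNames(Map<String, Binding> bindings, String destination, String group) {
        List<String> names = new ArrayList<>();
        for (Map.Entry<String, Binding> entry : bindings.entrySet()) {
            Binding binding = entry.getValue();
            if (Objects.equals(binding.destination, destination) && Objects.equals(binding.group, group)) {
                names.add(entry.getKey());
            }
        }
        return names;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps insertion order, so the printed order is stable.
        Map<String, Binding> bindings = new LinkedHashMap<>();
        bindings.put("audit-in-0", new Binding("orders", null));
        bindings.put("metrics-in-0", new Binding("orders", null));
        bindings.put("billing-in-0", new Binding("orders", "billing"));

        // Two anonymous consumers on the same destination cannot be told apart.
        System.out.println(matchingBindingNames(bindings, "orders", null));
        // A named group resolves to exactly one binding.
        System.out.println(matchingBindingNames(bindings, "orders", "billing"));
    }
}
```

Running `main` prints `[audit-in-0, metrics-in-0]` followed by `[billing-in-0]`: the lookup by destination and group is unique only when a group is set, which is why having the binding name available directly would be simpler.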

It'd be a lot easier to have the extendedConsumerProperties available in the configure() method.

olegz commented 1 month ago

We have AbstractExtendedBindingProperties.getExtendedConsumerProperties(bindingName). Is that what you are looking for?

olegz commented 1 month ago

If you have access to the binder instance there is an extendedBindingProperties field. It is currently not accessible, but we can discuss it

LukeKynaston commented 1 month ago

That's correct, I'm looking for access to the extendedBindingProperties field.

I think an overload of the ListenerContainerWithDlqAndRetryCustomizer#configure() method containing this field would be a good approach.

olegz commented 1 month ago

OK, how would you know which binder instance you are using? I mean, if we were to inject something or overload a method, we would need to know that.

LukeKynaston commented 1 month ago

The KafkaMessageChannelBinder appears to configure an error handler sequentially for every binding.

Looking through the code, we call org.springframework.cloud.stream.binding.AbstractBindingLifecycle#start(), which in turn calls this.bindables.values().forEach(this::doStartWithBindable). So we should only ever have one binding available at a time in the extendedBindingProperties field.