Azure / azure-sdk-for-java

This repository is for active development of the Azure SDK for Java. For consumers of the SDK we recommend visiting our public developer docs at https://docs.microsoft.com/java/azure/ or our versioned developer docs at https://azure.github.io/azure-sdk-for-java.
MIT License

[QUESTION] How to configure @ServiceBusListener ServiceBusReceiveMode among Spring Message #38739

Open kensinzl opened 9 months ago

kensinzl commented 9 months ago

Query/Question

My question is how to configure the ServiceBusReceiveMode in a Java Spring Boot application when using the annotation-based listener, e.g. @ServiceBusListener.

joshfree commented 9 months ago

@yiliuTo could you take a look?

/cc @Azure/azsdk-sb-java

Netyyyy commented 9 months ago

Hi @kensinzl Thank you for reporting this issue. We have received your submission and will take a look. We appreciate your input and will review this matter as soon as possible. Please feel free to provide any additional information or context that you think may be helpful. We'll keep you updated on the progress of our review.

kensinzl commented 7 months ago

Hey, is there any progress? It seems no one has picked up this issue yet.

kensinzl commented 7 months ago

@yiliuTo @Netyyyy @joshfree any progress on this ticket? Thanks.

nikoladjuran commented 6 months ago

I'm stuck with a similar issue: I want to configure the @ServiceBusListener to use PEEK_LOCK and to enable sessions on the consumer, as the subscription has sessions enabled. Properties like spring.cloud.azure.servicebus.processor.session-enabled=true and spring.cloud.azure.servicebus.processor.receive-mode=peek_lock do not seem to work with the annotated listeners.

nikoladjuran commented 6 months ago

@kensinzl FYI: Managed to figure it out... Those properties don't configure the @ServiceBusListener but seem to be for other approaches like ServiceBusClient. I've managed to configure the listeners with the following bean declaration.

@Bean
public PropertiesSupplier<ConsumerIdentifier, ProcessorProperties> propertiesSupplier() {
    return key -> {
        var processorProperties = new ProcessorProperties();
        processorProperties.setEntityType(ServiceBusEntityType.TOPIC);
        processorProperties.setReceiveMode(ServiceBusReceiveMode.PEEK_LOCK);
        processorProperties.setSessionEnabled(Boolean.TRUE);

        return processorProperties;
    };
}

kensinzl commented 6 months ago

@nikoladjuran Hey man, thanks for the information.

I still use @ServiceBusListener (Spring Messaging) to consume from the Service Bus queue. If my understanding is correct, the @ServiceBusListener source code uses ServiceBusReceiveMode.PEEK_LOCK by default and also completes (deletes) the message automatically once the handler finishes successfully.

Again, the Azure documentation is quite a mess, particularly for the Azure SDK for Java. Hopefully I am on the right track.

vitalii-fedoryshyn commented 3 months ago

I'll just jump into this old discussion and provide some more code that may be useful for developers. Normally I would not recommend doing something like this, and I try to keep everything as simple as possible, but sometimes you have very specific requirements. You can not only configure the connection with PropertiesSupplier, but also use the connection indirectly via the ServiceBusMessageHeaders.RECEIVED_MESSAGE_CONTEXT header to acknowledge message delivery manually. For example:

@Component
public class ServiceBusPropertiesSupplier implements PropertiesSupplier<ConsumerIdentifier, ProcessorProperties> {

    @Value("${app.messaging.not-auto-complete-queue-name}")
    private String notAutoCompleteQueueName;

    @Override
    public ProcessorProperties getProperties(final ConsumerIdentifier key) {
        final ProcessorProperties processorProperties = new ProcessorProperties();
        if (Objects.equals(key.getDestination(), notAutoCompleteQueueName)) {
            processorProperties.setAutoComplete(Boolean.FALSE);
        }
        return processorProperties;
    }

}

and your message consumer can do something like this:

@ServiceBusListener(destination = QUEUE_NAME_SPEL)
public void handleMessage(
        ServiceBusReceivedMessage message,
        @Header(ServiceBusMessageHeaders.RECEIVED_MESSAGE_CONTEXT) ServiceBusReceivedMessageContext context
) {
    if (messageWrongFormat(message)) {
        context.deadLetter();
    } else if (messageHandleSuccessfully(message)) {
        context.complete();
    } else {
        // We do not settle the message, so Service Bus waits until the 'Message lock duration'
        // has passed and then delivers the message to some consumer again.
        // Alternatively, call context.abandon() so that Service Bus makes the message
        // available for redelivery immediately.
    }
}
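To make the settlement options above concrete, here is a minimal sketch in plain Java. It is a toy in-memory model, not the Azure SDK: PeekLockQueueModel and all of its methods are hypothetical names made up for illustration, and lock expiry is triggered by hand instead of by a timer. It only assumes the behaviour described in the comments above: an unsettled message comes back once the lock expires, abandoning makes it available again right away, completing removes it, and dead-lettering moves it aside.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy in-memory model of peek-lock settlement. Hypothetical; NOT the Azure SDK. */
class PeekLockQueueModel {
    private final Deque<String> available = new ArrayDeque<>();
    private final List<String> deadLetters = new ArrayList<>();
    private String locked; // message currently locked by the consumer, or null

    void send(String body) {
        available.addLast(body);
    }

    /** Moves the next message into the locked state; it is not deleted yet. Returns null if empty. */
    String receive() {
        if (locked != null) {
            throw new IllegalStateException("a message is already locked");
        }
        locked = available.pollFirst();
        return locked;
    }

    /** Settles the locked message as done: it is removed for good. */
    void complete() {
        requireLocked();
        locked = null;
    }

    /** Releases the lock so the message can be received again immediately. */
    void abandon() {
        available.addFirst(requireLocked());
        locked = null;
    }

    /** Simulates the lock duration elapsing with no settlement: same outcome as abandon, just later. */
    void lockExpired() {
        abandon();
    }

    /** Moves the locked message to the dead-letter list instead of redelivering it. */
    void deadLetter() {
        deadLetters.add(requireLocked());
        locked = null;
    }

    int deadLetterCount() {
        return deadLetters.size();
    }

    private String requireLocked() {
        if (locked == null) {
            throw new IllegalStateException("no message is locked");
        }
        return locked;
    }

    public static void main(String[] args) {
        PeekLockQueueModel queue = new PeekLockQueueModel();
        queue.send("order-1");
        System.out.println("received: " + queue.receive());
        queue.abandon(); // handler failed, ask for redelivery
        System.out.println("redelivered: " + queue.receive());
        queue.complete(); // handler succeeded, message is gone
        System.out.println("next: " + queue.receive());
    }
}
```

The real service also tracks delivery counts and does not promise that an abandoned message returns to the front of the queue; the model leaves both out on purpose.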
kensinzl commented 3 months ago

Just double-checking: is there any progress on this ticket? @yiliuTo @Netyyyy @joshfree, thanks so much.

saragluna commented 3 months ago

@kensinzl, have you tried this approach? You can register a PropertiesSupplier bean in your application:

@Bean
public PropertiesSupplier<ConsumerIdentifier, ProcessorProperties> propertiesSupplier() {
    return key -> {
        var processorProperties = new ProcessorProperties();
        processorProperties.setReceiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE);

        return processorProperties;
    };
}

I understand the ideal solution would be to let you configure it in the annotation itself, but could you check whether this approach meets your requirement? Or do you expect to be able to configure it via the annotation?

kensinzl commented 3 months ago

@saragluna

Thank you for the reply.

I have it working now. Let me write down the following so that other people can refer to it if they face the same issue.

1. If you use the Spring Messaging Azure Service Bus annotation approach from the official guide (https://learn.microsoft.com/en-us/azure/developer/java/spring-framework/using-service-bus-in-spring-applications), the default mode is PeekLock, which is hardcoded in ServiceBusReceiverClientBuilder.

I tried the properties configuration, e.g. spring.cloud.azure.servicebus.consumer.receive-mode=RECEIVE_AND_DELETE or spring.cloud.azure.servicebus.processor.receive-mode=RECEIVE_AND_DELETE, but according to the logs neither of them changes the mode to RECEIVE_AND_DELETE.

2. If you want to use RECEIVE_AND_DELETE mode with the Spring Messaging Azure Service Bus annotation approach, you can register a bean like the following. Note: you need to set auto-complete to false.

    @Bean
    public PropertiesSupplier<ConsumerIdentifier, ProcessorProperties> propertiesSupplier() {
        return key -> {
            ProcessorProperties processorProperties = new ProcessorProperties();
            processorProperties.setReceiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE);
            processorProperties.setAutoComplete(false);
            return processorProperties;
        };
    }

This bean will be injected into AzureServiceBusMessagingAutoConfiguration.
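For anyone weighing the two modes, here is a rough plain-Java sketch of what the difference means when a handler fails. It is a simplified model and not the Azure SDK: ReceiveModeModel, Mode and deliverOnce are hypothetical names made up for illustration. It assumes that RECEIVE_AND_DELETE removes the message as soon as it is handed out, which would also explain why there is nothing left for auto-complete to do, and that PEEK_LOCK with auto-complete removes it only after the handler returns normally.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

/** Simplified model of the two receive modes. Hypothetical; NOT the Azure SDK. */
class ReceiveModeModel {
    enum Mode { PEEK_LOCK, RECEIVE_AND_DELETE }

    /** Delivers the head message once and returns how many messages remain in the queue afterwards. */
    static int deliverOnce(Deque<String> queue, Mode mode, Consumer<String> handler) {
        String message = queue.peekFirst();
        if (message == null) {
            return 0;
        }
        if (mode == Mode.RECEIVE_AND_DELETE) {
            queue.pollFirst(); // deleted before the handler even runs
        }
        try {
            handler.accept(message);
            if (mode == Mode.PEEK_LOCK) {
                queue.pollFirst(); // auto-complete after a successful handler
            }
        } catch (RuntimeException e) {
            // PEEK_LOCK: the message is still queued and can be redelivered.
            // RECEIVE_AND_DELETE: the message is already gone.
        }
        return queue.size();
    }

    public static void main(String[] args) {
        for (Mode mode : Mode.values()) {
            Deque<String> queue = new ArrayDeque<>();
            queue.add("order-1");
            int remaining = deliverOnce(queue, mode, m -> {
                throw new RuntimeException("handler failed");
            });
            System.out.println(mode + ": messages left after a failed handler = " + remaining);
        }
    }
}
```

In this model a failing handler loses the message under RECEIVE_AND_DELETE but keeps it under PEEK_LOCK, so the first mode only fits workloads that can tolerate dropped messages.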

Hopefully I am on the right track.