Azure / azure-sdk-for-java

This repository is for active development of the Azure SDK for Java. For consumers of the SDK we recommend visiting our public developer docs at https://docs.microsoft.com/java/azure/ or our versioned developer docs at https://azure.github.io/azure-sdk-for-java.

[BUG][azure-spring-boot-starter-servicebus-jms][qpid] Message is put to DLQ after MaxDeliveryCountExceeded even when message is processed without error/exception #24304

Closed. bilak closed this issue 2 years ago.

bilak commented 3 years ago

This issue is related to Azure support tickets TrackingID#2109220050000347 and Ticket-number #11091792; detailed logs are attached to the first ticket.

Describe the bug We are consuming messages from Service Bus (SB). As long as there are no spikes in SB, everything works well. However, when there is a spike (a lot of incoming messages, msgcount > 10k), messages are retried and then moved to the DLQ with reason MaxDeliveryCountExceeded. It seems that the message commit is not processed correctly in Service Bus and/or the commit is not delivered to SB, and we see no error in the client library.

Exception or Stack Trace No stack trace, no exception.

To Reproduce Steps to reproduce the behavior: hard to say exactly (that is really a question for you), but most likely put > 10k messages into SB and try to process them.

Code Snippet In archivingUsageTrackingService below we are sending the message to another topic.

  @JmsListener(destination = "${azure.servicebus.queues.usageActivity}", containerFactory = JMS_FACTORY_NAME)
  public void consume(UsageMessage usageMessage) {
    log.info("UsageMessage received: {}", usageMessage);
    usageTrackingService.processUsageMessage(usageMessage);
    archivingUsageTrackingService.trackErsUsage(usageMessage);
  }

The following is the configuration for the queue/topic factories.

  @Bean
  public JmsListenerContainerFactory<?> fiscalJmsFactory(ConnectionFactory connectionFactory,
      DefaultJmsListenerContainerFactoryConfigurer configurer) {
    if (connectionFactory instanceof CachingConnectionFactory) {
      CachingConnectionFactory cachingConnectionFactory = (CachingConnectionFactory) connectionFactory;
      cachingConnectionFactory.setCacheProducers(false);
    }
    DefaultJmsListenerContainerFactory factory = new DfDefaultJmsListenerContainerFactory();
    factory.setErrorHandler(t -> log.error("Unable to process JMS message", t));
    factory.setExceptionListener(t -> log.error("Unable to process JMS message", t));
    factory.setMessageConverter(jacksonJmsMessageConverter());
    configurer.configure(factory, connectionFactory);
    return factory;
  }

  @Bean
  @ConditionalOnProperty(prefix = "df.azure.servicebus", name = "enableJmsTopic", havingValue = "true")
  public JmsListenerContainerFactory<?> fiscalJmsTopicFactory(ConnectionFactory connectionFactory,
      DefaultJmsListenerContainerFactoryConfigurer configurer) {
    if (connectionFactory instanceof CachingConnectionFactory) {
      CachingConnectionFactory cachingConnectionFactory = (CachingConnectionFactory) connectionFactory;
      cachingConnectionFactory.setCacheProducers(false);
    }
    DefaultJmsListenerContainerFactory factory = new DfDefaultJmsListenerContainerFactory();
    factory.setErrorHandler(t -> log.error("Unable to process JMS message", t));
    factory.setExceptionListener(t -> log.error("Unable to process JMS message", t));
    factory.setMessageConverter(jacksonJmsMessageConverter());
    factory.setSubscriptionDurable(true);
    configurer.configure(factory, connectionFactory);
    return factory;
  }

and this is our customized JmsListenerContainerFactory

@Getter
@Setter
public class DfDefaultJmsListenerContainerFactory extends DefaultJmsListenerContainerFactory {

  private Boolean exposeListenerSession = Boolean.FALSE;

  @Override
  protected void initializeContainer(DefaultMessageListenerContainer container) {
    super.initializeContainer(container);
    if (this.exposeListenerSession != null) {
      container.setExposeListenerSession(this.exposeListenerSession);
    }
  }

}

Expected behavior Whenever a message is processed without error, it should be removed from the SB queue and not moved to the DLQ. Alternatively, when a message cannot be removed from the queue for any reason, we should see an error in the client library.

Screenshots Spike: here we can see some client errors, but no error/exception is reported by the underlying library. (screenshot attached)

Setup (please complete the following information):

Information Checklist Kindly make sure that you have added all the required information above and checked off the required fields; otherwise we will treat the issue as an incomplete report.

Saljack commented 3 years ago

(screenshot attached) And here you can see that there is some correlation between the number of incoming messages and client errors. But our microservice always processes only one message at a time, and we have at most 7 instances of this microservice, so we process at most 7 messages concurrently. Our application does not show any high resource usage during these peaks. When we resend these messages into the same queue, they are processed normally on the first attempt. And as you can see, there are almost no user errors except during these peaks. Our application is also "pretty" fast: it is able to process all messages 10 times over within 10 minutes, yet it is not able to acknowledge them.

yiliuTo commented 2 years ago

Hi @Saljack and @bilak, thanks for your report. This is due to the default prefetch count of 1000 being used; with your configuration of max-delivery-count and lock-duration, the prefetched messages cannot all be consumed before their locks expire, so they are dead-lettered. We will fix this soon by providing a configuration option for prefetch.

Saljack commented 2 years ago

@yiliuTo Can you give us more details? I do not understand what you mean by prefetch. If I understand correctly, the issue is in your library, am I right? Is the issue in qpid-proton-j, qpid-jms, spring-jms, a Service Bus library, or directly in Service Bus? Is there any workaround? We have to resend thousands of messages every day, so this is not acceptable for us. Increasing the max delivery count does not help; only restarting our application helped to process some messages. I have already tested this on our testing environment. (screenshot attached)

Can you provide any date/milestone/plan for when it should be fixed? And why isn't there any error in the logs?

yiliuTo commented 2 years ago

@Saljack do you use the Service Bus Standard tier? I can reproduce your issue in the standard tier but not the premium tier. That is because in the JmsConnectionFactory for the standard tier we use the default prefetch count of 1000, which is defined in Client configuration - Apache Qpid™; that value controls how many messages the remote peer can send to the client to be held in a prefetch buffer for each consumer instance. In your case, during the spike the consumer cannot process all 1000 prefetched messages within the lock duration, so the locks expire and the messages are dead-lettered after being retried more than max-delivery-count times.

So this issue can be resolved by configuring the prefetch count as you need; for example, setting it to 0 means a pull consumer. However, our library does not currently support setting this. As a quick workaround, you can modify our jmsConnectionFactory bean with a bean post processor and set its prefetch policy. For example,

import org.apache.qpid.jms.JmsConnectionFactory;
import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.stereotype.Component;

@Component
public class AzureServiceBusBeanPostProcessor implements BeanPostProcessor {
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        if (bean instanceof JmsConnectionFactory) {
            JmsConnectionFactory jmsConnectionFactory = (JmsConnectionFactory) bean;
            JmsDefaultPrefetchPolicy policy = new JmsDefaultPrefetchPolicy();
            policy.setAll(0); // adjust this number to the consuming capability of your application
            jmsConnectionFactory.setPrefetchPolicy(policy);
        }
        return bean;
    }
}

We plan to fix this before our release next month.

Saljack commented 2 years ago

@yiliuTo Yes, we use the standard tier. I will test it; we do not need to wait for your fix because we use our own modified connection factory anyway. You could implement it as a general customizer that customizes the JmsConnectionFactory in the factory bean method (see the sketch below). The strange thing is that it starts when there are more than 10,000 messages, and then all communication with the queue in Azure Service Bus is completely deadlocked until some messages are moved to the DLQ. You can see in my graph that when it reached this limit, it was not able to acknowledge any message for 6 hours (max delivery count was set to 500). I still do not understand why this issue does not appear when there are fewer than 10k messages, and I can see it whether I have 7 or 3 client applications. It looks like there is some limitation directly in Azure Service Bus. Does the premium tier use a completely different library or API (I know it supports JMS 2.0)?
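For illustration, here is a minimal sketch of the "general customizer" idea mentioned above. The JmsConnectionFactoryCustomizer interface and the servicebus.remote-uri property are hypothetical names I made up for this sketch; nothing like this exists in the starter at the time of this discussion. It only shows how the factory bean method could apply user-supplied customizers such as a prefetch override.

import java.util.List;

import javax.jms.ConnectionFactory;

import org.apache.qpid.jms.JmsConnectionFactory;
import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical callback interface: lets applications adjust the JmsConnectionFactory
// instead of relying on a BeanPostProcessor as in the workaround above.
@FunctionalInterface
interface JmsConnectionFactoryCustomizer {
    void customize(JmsConnectionFactory factory);
}

@Configuration
class JmsConnectionFactoryCustomizerConfig {

    // Factory bean method: builds the connection factory and applies every registered customizer.
    @Bean
    public ConnectionFactory jmsConnectionFactory(@Value("${servicebus.remote-uri}") String remoteUri,
            List<JmsConnectionFactoryCustomizer> customizers) {
        JmsConnectionFactory factory = new JmsConnectionFactory(remoteUri);
        customizers.forEach(customizer -> customizer.customize(factory));
        return factory;
    }

    // Example customizer: sets the prefetch policy, mirroring the BeanPostProcessor workaround above.
    @Bean
    public JmsConnectionFactoryCustomizer prefetchPolicyCustomizer() {
        return factory -> {
            JmsDefaultPrefetchPolicy prefetchPolicy = new JmsDefaultPrefetchPolicy();
            prefetchPolicy.setAll(0);
            factory.setPrefetchPolicy(prefetchPolicy);
        };
    }
}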

yiliuTo commented 2 years ago

@Saljack for the premium tier we rely on another library, com.microsoft.azure:azure-servicebus-jms:0.0.7, for the connection factory, and it sets the default prefetch value to 0.

As for the phenomenon in your case, could you share your configuration of the lock duration? And how long does it take for your client to consume one message? Also, in the picture you provided, how many client applications were there at 6pm, and when did you adjust the max delivery count?

Saljack commented 2 years ago

@yiliuTo There were 2-3 instances, so I would say 3 because it scales fast. The lock duration is 1 min. And one message is processed in a few milliseconds (I checked a few messages at random and it is around 4 ms; a second delivery would be even faster). So if I count correctly, 10 ms (per message) x 1000 messages = 10 s, which should be fine and leaves huge headroom within the 1 min lock. I increased the max delivery count at around 6:50pm (I have already marked it in my graph). You can see that from around 7pm, once it reached the limit, no message was acknowledged until 3am. But if you set prefetch to 0 it will be pretty slow; I already had to disable producer caching, so this is another slowdown with Azure Service Bus.

yiliuTo commented 2 years ago

@Saljack One possible reason for the deadlocking is that a single message takes longer than the lock duration (1 min) to consume. That causes roughly consuming-time * max-delivery-count of blocking, because your client keeps fetching that message from SB and consuming it, up to 500 times, until it is moved to the DLQ, and only then can the next message be consumed. For example, with a 1 min lock duration and a max-delivery-count of 500, one such message alone could block the consumer for up to about 500 minutes. You could check whether that is the cause by looking at the longest consuming time across all messages and seeing whether any exceed your lock duration. If so, perhaps you could add custom consuming logic for those messages instead of handling them directly. Just increasing max-delivery-count might not be a good solution, since it can amplify small issues.

Saljack commented 2 years ago

@yiliuTo No, that is impossible, because for testing I sent exactly the same message (I change only the created date) 20,000 times, so the logic does the same thing every time.

Saljack commented 2 years ago

@yiliuTo Changing the prefetch policy to 0 works like a charm in my tests 👍. Can you make it the default value?

yiliuTo commented 2 years ago

@Saljack we will consider setting it to 0 by default, given that according to the official doc Azure Service Bus Prefetch, prefetch is not the default option.

malsdorf commented 2 years ago

@yiliuTo Could you ping me internally, thanks

backwind1233 commented 2 years ago


Hi @Saljack and @bilak, can you use the command ulimit -a to see the file descriptor limit on the machine? I think it may be because of the limit on the number of file descriptors; if so, you can increase it.

Also, I will change the code to set the default prefetch count to 0, and if there is enough time I will also expose a parameter to set the prefetch count, and then I will release the library this month.

Saljack commented 2 years ago

@backwind1233 ulimit -a gives me unlimited, so this should not be a problem. I have already checked it and we have around 290 open files, which I would say is not a high number.

backwind1233 commented 2 years ago

@Saljack We have created and merged the PR and will do the Spring release today; after the release, you can use the latest version to configure the prefetch count.

backwind1233 commented 2 years ago

@Saljack we have released azure-spring-boot-starter-servicebus-jms 3.10.0; the parameter list is here.
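For readers landing here later, a hedged illustration of what that configuration might look like in application.properties. The property name below is an assumption on my part and should be verified against the parameter list linked above.

# Assumed property name - please confirm against the released parameter list.
# 0 turns off prefetching so the consumer pulls messages one at a time.
spring.jms.servicebus.prefetch-policy.all=0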