spring-projects / spring-amqp

Spring AMQP - support for Spring programming model with AMQP, especially but not limited to RabbitMQ
https://spring.io/projects/spring-amqp
Apache License 2.0
807 stars 620 forks source link

High load messages piles up in queue and stopped/slowdown getting consumed and shows Null pointer exception. #2743

Closed sau3078 closed 3 months ago

sau3078 commented 3 months ago

#spring-boot-starter-amqp

version : 3.2.5

During peak load or high volume of data flow let say I have 10 instances running of my springboot rabbitmq application which is consuming messages from some queue. Listener don't have any concurrency configured as of now. It using default spring rabbitmq prefetch limit. Messages were stuck at some time because rate of publishing of messages were quite high in queue. In the application logs which I shared below show some Null pointer exception with spring amqp library.

 @RabbitListener(queues = "${rabbitmq.QueueName}")
  public void onMessage(String message) throws JsonProcessingException {
      SomeClass notification = objectMapper.readValue(message, SomeClass.class);
}

To Reproduce

Load high volume of rabbitmq messages in queue and run multiple instances of springboot rabbitmq java based microservice.

Expected behavior

rabbitmq messages should be able to processed with out any null pointer exception.

Sample Logs

Below is the message which NotificationListener receives in String format and then its tries to deserialize into some object. Seems received json is valid.

org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method 'public void NotificationListener.onMessage(java.lang.String) throws com.fasterxml.jackson.core.JsonProcessingException' threw exception at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:287) at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:225) at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:149) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1682) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1604) at jdk.internal.reflect.GeneratedMethodAccessor54.invoke(Unknown Source) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:568) at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:351) at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:196) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) at org.springframework.retry.interceptor.RetryOperationsInterceptor$1.doWithRetry(RetryOperationsInterceptor.java:102) at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:335) at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:227) at org.springframework.retry.interceptor.RetryOperationsInterceptor.invoke(RetryOperationsInterceptor.java:124) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:220) at org.springframework.amqp.rabbit.listener.$Proxy244.invokeListener(Unknown Source) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1592) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1583) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListenerAndHandleException(AbstractMessageListenerContainer.java:1528) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.lambda$executeListener$8(AbstractMessageListenerContainer.java:1506) at io.micrometer.observation.Observation.observe(Observation.java:499) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1506) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1084) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1020) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1422) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1324) at java.base/java.lang.Thread.run(Thread.java:833) Caused by: java.lang.NullPointerException: null

Found more logs below

org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Retry Policy Exhausted
    at org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer.recover(RejectAndDontRequeueRecoverer.java:76)
    at org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterceptorFactoryBean.recover(StatelessRetryOperationsInterceptorFactoryBean.java:78)
    at org.springframework.retry.interceptor.RetryOperationsInterceptor$ItemRecovererCallback.recover(RetryOperationsInterceptor.java:159)
    at org.springframework.retry.support.RetryTemplate.handleRetryExhausted(RetryTemplate.java:550)
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:395)
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:227)
    at org.springframework.retry.interceptor.RetryOperationsInterceptor.invoke(RetryOperationsInterceptor.java:124)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184)
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:220)
    at org.springframework.amqp.rabbit.listener.$Proxy244.invokeListener(Unknown Source)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1592)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1583)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListenerAndHandleException(AbstractMessageListenerContainer.java:1528)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.lambda$executeListener$8(AbstractMessageListenerContainer.java:1506)
    at io.micrometer.observation.Observation.observe(Observation.java:499)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1506)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1084)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1020)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1422)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1324)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.springframework.amqp.AmqpRejectAndDontRequeueException: null
    ... 21 common frames omitted
Caused by: org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method 'public void NotificationListener.onMessage(java.lang.String) throws com.fasterxml.jackson.core.JsonProcessingException' threw exception
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:287)
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:225)
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:149)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1682)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1604)
    at jdk.internal.reflect.GeneratedMethodAccessor55.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:351)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:196)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
    at org.springframework.retry.interceptor.RetryOperationsInterceptor$1.doWithRetry(RetryOperationsInterceptor.java:102)
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:335)
    ... 16 common frames omitted
Caused by: java.lang.NullPointerException: null
artembilan commented 3 months ago

@sau3078 ,

please, consider to reformat your comment for more readable form. See GitHub Markdown features: https://docs.github.com/en/get-started/writing-on-github/getting-started-with-writing-and-formatting-on-github/basic-writing-and-formatting-syntax

sau3078 commented 3 months ago

@sau3078 ,

please, consider to reformat your comment for more readable form. See GitHub Markdown features: https://docs.github.com/en/get-started/writing-on-github/getting-started-with-writing-and-formatting-on-github/basic-writing-and-formatting-syntax

sau3078 commented 3 months ago

@artembilan can you see above raised issue please

artembilan commented 3 months ago

Thanks for updating your request.

Unfortunately I don't see how that Caused by: java.lang.NullPointerException: null is relevant to Spring AMQP.

Looks like this is a concern of your NotificationListener.onMessage() method. You can simply unit test it directly without any RabbitMQ involved.

sau3078 commented 3 months ago

@artembilan Looks like this is a concern of your NotificationListener.onMessage() method regarding your statement I do agree its showing jsonProcessing exception but really its not actual problem . If it has to do something with null pointer in my code it should included class or method along with line no.

Its really difficult to find where null pointer its actually happening.

Do you something to turn off this unwanted max retry showing in logs ? Although we may internally wanted to try 3 attempt incase we are getting errors during processing of messages and after max attempt exhausted messages will be routed to dead letter queues.

Apart from this any additional logging that can be added when received messages from the queue so that we can figure out where is the exact issue ?

Regarding testing I already tested using mockito framework its happening rarely not frequently .

artembilan commented 3 months ago

I think that NPE comes from objectMapper.readValue(). That's why we don't see any pointers to your own code.

You probably use Spring Boot, so you have some spring.rabbitmq.listener.simple.retry.enabled = true. It comes with these options by default though:

        /**
         * Maximum number of attempts to deliver a message.
         */
        private int maxAttempts = 3;

        /**
         * Duration between the first and second attempt to deliver a message.
         */
        private Duration initialInterval = Duration.ofMillis(1000);

        /**
         * Multiplier to apply to the previous retry interval.
         */
        private double multiplier = 1.0;

        /**
         * Maximum duration between attempts.
         */
        private Duration maxInterval = Duration.ofMillis(10000);

So, I'm not sure what you are asking if you have whole control for that feature in your hands.

You can add DEBUG logging level configuration for the org.springframework.amqp.rabbit.listener.adapter category. And there is going to a log like:

        if (logger.isDebugEnabled() && !projectionUsed) {
            logger.debug("Processing [" + message + "]");
        }

However this all sounds more like a Jackson diagnostics feature, which is out of this project scope. Plus I'm really not familiar what we can do on Jackson side to track your problem down...

sau3078 commented 3 months ago

@artembilan thanks for your reply.

I do have below configuration in my springboot rabbitmq application

spring.rabbitmq.listener.simple.default-requeue-rejected=true spring.rabbitmq.listener.simple.retry.max-attempts=1 spring.rabbitmq.listener.simple.retry.enabled=true spring.rabbitmq.listener.simple.prefetch=10

I have overridden max-attempts behaviour but when this happens we don't want to show in our logs so asking for your help is there any configuration so that it does not show up in the logs it has retries i.e max attempt exhausted something.... but still wanted to have above configuration for max retry.

For ObjectMapper I have below configuration :

 @Bean
  public ObjectMapper objectMapper() {
    ObjectMapper objectMapper = new ObjectMapper();
    objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    return objectMapper;
  }

In my listener class where I am reading messages from rabbitmq autowiring this objectMapper class not sure when it will NPE.

threadName : org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#8-4 in the logs where I see NPE

if (logger.isDebugEnabled() && !projectionUsed) {
            logger.debug("Processing [" + message + "]");
}

where does !projectionUsed comes from ?

artembilan commented 3 months ago

I assume you mean messages in logs like this:

                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("Retry: count=" + context.getRetryCount() + labelMessage);
                    }

and

logger.debug("Retry exhausted and recovery disabled for this throwable");

So, just configure your application.properties for this:

logging.level.org.springframework.retry=warn

Re. Processing log message. See MessagingMessageListenerAdapter:

        boolean projectionUsed = amqpMessage == null ? false : amqpMessage.getMessageProperties().isProjectionUsed();
        if (projectionUsed) {
            amqpMessage.getMessageProperties().setProjectionUsed(false);
        }
        if (logger.isDebugEnabled() && !projectionUsed) {
            logger.debug("Processing [" + message + "]");
        }

That property is set from AbstractJackson2MessageConverter:

if (inferredType != null && this.useProjectionForInterfaces && inferredType.isInterface()
                && !inferredType.getRawClass().getPackage().getName().startsWith("java.util")) { // List etc
            content = this.projectingConverter.convert(message, inferredType.getRawClass());
            properties.setProjectionUsed(true);
        }

And its Javadocs:


    /**
     * Set to true to use Spring Data projection to create the object if the inferred
     * parameter type is an interface.
     * @param useProjectionForInterfaces true to use projection.
     * @since 2.2
     */
    public void setUseProjectionForInterfaces(boolean useProjectionForInterfaces) {

Why your ObjectMapper fails is out scope of this project.

Let's concentrate here on Spring and its interaction with AMQP!

sau3078 commented 3 months ago

@artembilan we are not logging anything related to max retry incase of failures. It shown in application console or datadog for example when listener fails to process the messages. I don't need to show these errors to the users.

artembilan commented 3 months ago

Do you mean that stacktrace:

org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Retry Policy Exhausted
    at org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer.recover(RejectAndDontRequeueRecoverer.java:76)

?

So, or you catch that NPE exception in your listener and handle it somehow. Or you can provide your own MessageRecoverer implementation: https://docs.spring.io/spring-amqp/reference/amqp/resilience-recovering-from-errors-and-broker-failures.html#async-listeners

sau3078 commented 3 months ago

org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Retry Policy Exhausted at org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer.recover(RejectAndDontRequeueRecoverer.java:76)``

Yes I was talking above logs in the console how can be avoid showing?

artembilan commented 3 months ago

See docs I've mentioned before. I general, that is exception, why would one ignore it for any reason? You might can configure your Datadog to filter out such a message, since it is always going to be there with that RejectAndDontRequeueRecoverer.

sau3078 commented 3 months ago

closing this issue as I don't have enough evidence NPE is due to spring amqp libarary.