SolaceProducts / solace-spring-cloud

An umbrella project containing all Solace projects for Spring Cloud
Apache License 2.0
22 stars 15 forks source link

How to handle the InterruptedException properly #232

Open IagoSchlisting opened 1 year ago

IagoSchlisting commented 1 year ago

Hi colleagues,

I would like to know if there is a way to reprocess consumed messages that fails because of the InterruptedException error.

More details about our scenario: Our application scales automatically. Frequently, when it descales, the InterruptedException is thrown since the instance is shut down during the processing of one or more messages. The big problem is that these messages are not reprocessed automatically as it should and we lose it. Do you have any idea on how we can reprocess messages in this scenario?

Log error:

Caused by: com.solacesystems.jcsmp.JCSMPInterruptedException: Message enqueue interrupted
    at com.solacesystems.jcsmp.impl.PubADManager.enqueueMsgWithIdUpdateWithThrows(PubADManager.java:939)
    at com.solacesystems.jcsmp.impl.JCSMPXMLMessageProducer.updateAdMsgIDandEnqueueToWindow(JCSMPXMLMessageProducer.java:1475)
    at com.solacesystems.jcsmp.impl.JCSMPXMLMessageProducer.sendMsgOnce(JCSMPXMLMessageProducer.java:1247)
    at com.solacesystems.jcsmp.impl.JCSMPXMLMessageProducer.sendImpl(JCSMPXMLMessageProducer.java:1139)
    at com.solacesystems.jcsmp.impl.JCSMPXMLMessageProducer.sendToDestination(JCSMPXMLMessageProducer.java:932)
    at com.solacesystems.jcsmp.impl.JCSMPXMLMessageProducer.sendToDestination(JCSMPXMLMessageProducer.java:869)
    at com.solacesystems.jcsmp.impl.JCSMPXMLMessageProducer.send(JCSMPXMLMessageProducer.java:687)
    at com.solace.spring.cloud.stream.binder.outbound.JCSMPOutboundMessageHandler.handleMessage(JCSMPOutboundMessageHandler.java:102)
    ... 61 more
Caused by: java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1220)
    at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335)
    at com.solacesystems.jcsmp.impl.queues.ConditionalBoundedMessageQueue.queueMsgWithIdUpdate(ConditionalBoundedMessageQueue.java:254)
    at com.solacesystems.jcsmp.impl.PubADManager.enqueueMsgWithIdUpdateWithThrows(PubADManager.java:934)
    ... 68 more

I'm looking forward to your return. Regards,

IagoSchlisting commented 1 year ago

If it can help you, this is our current consumer queue configuration:

spring.cloud.stream.bindings.inputChannel.binder=solace
spring.cloud.stream.bindings.inputChannel.destination=<destination_name>
spring.cloud.stream.bindings.inputChannel.group=<group_name>
spring.cloud.stream.bindings.inputChannel.consumer.max-attempts=1
spring.cloud.stream.solace.bindings.inputChannel.consumer.queueNameExpression=destination.trim()
spring.cloud.stream.solace.bindings.inputChannel.consumer.errorQueueNameExpression=destination.trim() + '.error-queue'
spring.cloud.stream.bindings.inputChannel.consumer.concurrency=50
spring.cloud.stream.solace.bindings.inputChannel.consumer.autoBindErrorQueue=true
spring.cloud.stream.solace.bindings.inputChannel.consumer.provisionDurableQueue=false
spring.cloud.stream.solace.bindings.inputChannel.consumer.addDestinationAsSubscriptionToQueue=false
spring.cloud.stream.solace.bindings.inputChannel.consumer.errorMsgDmqEligible=true
spring.cloud.stream.solace.bindings.inputChannel.consumer.provisionErrorQueue=false