Closed ArvinUbhi closed 7 months ago
1000 is the default number of credits - ensure your broker re-allows credits afterward. Can you try to reformat the code in the issue as it's not clear who is consuming the messages.
@cescoffier I have created a small repo to demonstrate this and included reproduction steps in the README. The code demos a small amqp consumer (MessageProcessor class) which feeds into a second class (MessagePostProcessor class).
The MessagePostProcessor class counts the number of received messages from the amqp queue and prints the count.
As you can see, the very last print of the count is 1000.
The repo does not explain which broker you use and how you insert the 1000+ messages.
Tried with:
@Channel("sender")
MutinyEmitter<String> emitter;
public void init(@Observes StartupEvent event) {
for (int i = 0; i < 2000; i++) {
emitter.sendAndAwait("Hello");
}
}
No problem.
@cescoffier I have added more information on the issue and steps to reproduce in the README.
Just to reiterate, the issue surrounds consuming more than 1000 messages from an AMQP Artemis broker.
Regarding how to load over 1000 messages, you could use your code above to load 2000/3000 messages beforehand.
It's what I did, I added 2000 messages in a queue, and then consumed them without any issue. I used the Artemis dev service.
Can you update the reproducer with:
@cescoffier I noticed that the queue that I am consuming from is non-destructive. Messages in this queue are only removed once they expire.
Please could you try and set up a non-destructive queue in your broker and try again.
I believe the fix could be related to setting the distribution mode attribute but I am unsure how to do this in Quarkus.
I would need the proper instruction to do that (sorry, I'm not an Artemis expert).
Sure @cescoffier - how are you configuring your Artemis queue? Are you doing it through a properties file (broker properties, address properties or address configurations) or are you using the AMQP UI?
Example of AMQP UI:
I started an artemis broker, configured the queue (anycast, durable) from the UI, sent 2000 messages to the queue (the UI confirmed they were there), and then, started the consumer - it got the 2000 messages. However, I didn't find how to configure a non-destructive queue (meaning that once consumed, the messages are gone).
Both my address and queue are marked anycast.
@cescoffier Try to create a multicast nondestructive durable queue instead.
With a non-destructive queue, messages are not removed when you acknowledge them. Messages usually would have an expiry and are removed by expiring.
Here is the config that I have used (you'll have to change the address name and queue name):
- addressConfigurations.addressName.queueConfigs.queueName.routingType=MULTICAST
- addressConfigurations.addressName.queueConfigs.queueName.purgeOnNoConsumers=false
- addressConfigurations.addressName.queueConfigs.queueName.durable=true
- addressConfigurations.addressName.queueConfigs.queueName.nonDestructive=true
- addressConfigurations.addressName.queueConfigs.queueName.redeliveryDelay=25000
- addressConfigurations.addressName.queueConfigs.queueName.maxConsumers=-1
If you are doing this through a UI, you can set these settings when creating an address/ queue.
Hum, I can't setup non-destructive from the UI:
Then with the following configuration:
mp.messaging.incoming.incoming-queue.address=address-name::queue-name
mp.messaging.incoming.incoming-queue.failure-strategy=release
mp.messaging.incoming.incoming-queue.durable=true
I got everything consumed.
Pinging @gemmellr - any idea why we would be limited at 1000 in this case?
@cescoffier If you create an AMQ queue through the UI you can select non-destructive. First, select amq-broker: Then, select Operations: Scroll down and select createQueue: Add the queue details, and select the non-destructive checkbox:
This should be a multicast, durable, non-destructive queue.
You can also select autoCreateAddress if needed.
You can then load this queue with 2k/3k messages and try to reproduce the same steps.
Ok... so you need to invoke jmx action to setup that queue.
Nope this could be set up in broker properties before starting the broker using the config I provided yesterday:
- addressConfigurations.addressName.queueConfigs.queueName.routingType=MULTICAST
- addressConfigurations.addressName.queueConfigs.queueName.purgeOnNoConsumers=false
- addressConfigurations.addressName.queueConfigs.queueName.durable=true
- addressConfigurations.addressName.queueConfigs.queueName.nonDestructive=true
- addressConfigurations.addressName.queueConfigs.queueName.redeliveryDelay=25000
- addressConfigurations.addressName.queueConfigs.queueName.maxConsumers=-1
OR if you have started your broker you can do it in the way I showed in the last message.
Ok, so I did exactly this - no problem. I was able to consume the 2000 messages.
The best would be to provide a reproducer with your exact setup.
Pinging @gemmellr - any idea why we would be limited at 1000 in this case?
I dont know, no, I've never used this functionality. I'd have to guess the broker is deciding not to send more for some reason or other. Maybe its failing to account for the 'non consuming' acknowledgement properly or something.
The original post notes also using a .net consumer with copy distribution mode...i.e specifically a 'queue browser' that is known not to consume. They didnt say what happens when they dont use copy distribution with the .net consumer. If its the same it could again point to the broker, not handling 'non broswers' as is being expected (whether its meant to, im not sure). That you seem to see different behaviour could again poing to the broker as its not clear the broker versions are the same.
Closing as we never got a reproducer that demonstrates the problem in action
Describe the bug
When configuring the AMQP client and retrieving messages from the non-destructive AMQP queue using the @Incoming tag, it retrieves the first 1000 messages and then stops consuming. The amqp queue has more than 50k messages.
When trying the same mechanism in .Net, I am able to set the AMQP Source Distribution Mode to 'copy' which then allows me to retrieve all the messages from the AMQP queue.
What is the best way to retrieve all messages from AMQP and if possible, how do I set the distribution mode attribute to the AMQP client configuration to 'copy'?
Expected behavior
All messages from the queue are consumed and processed.
Actual behavior
The first 1000 messages are consumed and processed and no more messages are received.
How to Reproduce?
No response
Output of
uname -a
orver
No response
Output of
java -version
17
Quarkus version or git rev
3.4.3
Build tool (ie. output of
mvnw --version
orgradlew --version
)apache-maven-3.8.2
Additional information
No response