amqp / rhea

A reactive messaging library based on the AMQP protocol
Apache License 2.0

Receiver stops processing messages #232

Open · ulisesbocchio opened this issue 5 years ago

ulisesbocchio commented 5 years ago

After some time of working fine, my receiver stops consuming even though it still has credit. Options:

credit_window: 10
autoaccept: true
autosettle: true
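
(For context, a minimal rhea receiver using these options might look like the sketch below; the host, port, and queue name are placeholders, not the real setup.)

// Minimal sketch, assuming a local broker and a queue named 'jobs'.
const container = require('rhea');

container.on('message', (context) => {
    // With autoaccept/autosettle on, rhea accepts and settles each
    // delivery automatically once this handler returns.
    console.log('received:', context.message.body);
});

const connection = container.connect({ host: 'localhost', port: 5672 });
connection.open_receiver({
    source: 'jobs',     // placeholder queue name
    credit_window: 10,  // rhea tops credit back up to 10 as messages are settled
    autoaccept: true,
    autosettle: true
});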

When I go to the ActiveMQ console, look up the queue, and check the active consumers, I see the single consumer that should be consuming these messages [screenshot]. It also shows a strange Dispatched Queue: 10, whereas on other queues that work "normally" that number is always 0. Queue stats: [screenshot]

I also see that the DLQ is most likely getting these messages, and each message shows: [screenshot]

I've tried the same settings with a simple example running locally and it runs fine. This happens in a real environment running a single-node ActiveMQ with about 15 producers/consumers and dozens of queues.

Any help as to what could be causing this issue would be great!

grs commented 5 years ago

From some googling on that error, it sounds like it is usually caused either by the messages having a ttl (set by the producers) that expires before they are consumed, or by the consumer itself being marked slow based on some setting from http://activemq.apache.org/slow-consumer-handling.html.

Could either of these apply in your case?

ulisesbocchio commented 5 years ago

Thank you @grs. I did find the same advice online and went ahead and disabled producer flow control:

<policyEntry topic=">" producerFlowControl="false"/>
<policyEntry queue=">" producerFlowControl="false"/>

I had noticed that some of the producers had status blocked=true; with these settings they are no longer blocked, but I see the same behavior. I didn't see any of the consumers marked as "slow" either. The nodes in my environment are all producers/consumers. Are there any gotchas with them sharing the same connection? For example, if ActiveMQ blocks a producer, does it block the entire connection? And is there a limit on how many messages are prefetched for a given connection? What I'm basically implementing is a job queue:

What I see is the status queue (virtual and individual for each worker) not being consumed; it only consumes up to a point. In normal situations everything works, but when I try to scale the job rate to the point where jobs really start queueing, every consumer just hangs.

grs commented 5 years ago

Just to confirm, the published jobs do not have a ttl?

The producer flow control is a separate mechanism from the slow consumer handling, so I wouldn't expect the change you mention to have any effect.

It does seem to me, considering the observation that the rate is key, that it is either a ttl on the messages or the broker deciding the consumer is too slow. I'm not an expert on ActiveMQ though.

How are you checking whether a consumer is marked as slow? Is that through JMX? I did find another link: https://access.redhat.com/documentation/en-us/red_hat_jboss_a-mq/6.2/html/tuning_guide/tuningslowconsumers

You could improve the consumer rate by increasing the credit window.
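
Alternatively, you can take over flow control entirely by setting credit_window to 0 and granting credit yourself with add_credit; a rough sketch (queue name and handler are illustrative):

// Manual credit management sketch; 'jobs' and handleJob are placeholders.
const receiver = connection.open_receiver({ source: 'jobs', credit_window: 0 });
receiver.add_credit(10); // grant an initial batch of credit

container.on('message', (context) => {
    handleJob(context.message);     // hypothetical application handler
    context.receiver.add_credit(1); // replace the credit consumed by this delivery
});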

ulisesbocchio commented 5 years ago

That is correct, TTL is not set on the outgoing messages, just { body: Buffer.from(jsonString) }. Thanks, I will check those links and get back to you if I find anything. The sender also seems to be blocked: when I submit a job through an API endpoint that pushes messages to the job queues, I don't see new messages showing up in the queues. The queue stats show all consumers/producers connected properly, though.
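
(For reference, a ttl in rhea has to be set explicitly per message, e.g. something like the line below; the value is illustrative and we never set it.)

sender.send({ body: Buffer.from(jsonString), ttl: 60000 }); // 60s ttl; not set in our code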

grs commented 5 years ago

> when I submit a job through an API endpoint that pushes messages to the job queues I don't see new messages showing up in the queues

Are the messages going straight to the DLQ?

ulisesbocchio commented 5 years ago

No messages in the DLQ; they never leave the workers. I suspect the sender is not sendable. We internally keep the messages in a buffer and flush when sendable() is true:

send(body) {
    // Lazily create the sender on first use.
    if (!this.sender) {
        this._initSender();
    }

    // Buffer the message, then attempt to flush immediately.
    this.buffer.push({
        body: Buffer.from(JSON.stringify(body)),
    });
    this._flush();
}

_initSender() {
    this.sender = this.connection.open_sender({ target: this.producerName, autosettle: true });
    // 'sendable' fires whenever the broker grants credit; drain the buffer then.
    this.sender.on('sendable', () => {
        this._flush();
    });
}

_flush() {
    // this.connected is assumed to be maintained elsewhere in the class.
    if (!this.connected) {
        return;
    }

    // Send while there are buffered messages and the sender has credit.
    while (this.buffer.length && this.sender.sendable()) {
        const message = this.buffer.shift();
        this.sender.send(message);
    }
}
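
(One way to check whether the broker is granting credit at all is rhea's frame-level tracing via the debug module, together with logging the sender's flow events; a sketch, with the log messages being illustrative:)

// Run with: DEBUG=rhea:frames node app.js
// 'rhea:frames' logs every AMQP frame, including the flow frames that
// carry credit, so you can see whether the broker ever grants any.
this.sender.on('sendable', () => console.log('credit granted, sender is sendable'));
this.sender.on('accepted', () => console.log('delivery accepted by broker'));
this.sender.on('sender_error', (context) => console.error('sender error:', context.sender.error));
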
ulisesbocchio commented 5 years ago

Quick question: what broker do you usually test this library against?

grs commented 5 years ago

For routine testing I most often use qpidd, the Qpid C++ broker, as I find it the easiest (I should point out I also worked on that!). However, I and others have also used it extensively with the Qpid Dispatch Router and with ActiveMQ Artemis. There are libraries built on it that are used against Azure Service Bus, and there are plenty of people who use it against RabbitMQ, and indeed ActiveMQ 5 as well.

That said, different use cases can flush out issues that need to be fixed. Though I can't be an expert in all the possible brokers, I do try to ensure that bugs are fixed no matter what broker they are observed against.

ulisesbocchio commented 5 years ago

thanks for the details :)

ulisesbocchio commented 5 years ago

Another interesting fact: if I bounce the ActiveMQ server, I see the messages that had been kept in the sender's buffer start flowing into the broker, meaning that after reconnect the sender is sendable again.

grs commented 5 years ago

Consulting with a colleague with expertise on ActiveMQ 5, the suggestion was that you have the default pending message limit strategy in effect [1], which you could alter by setting it to 0 or by removing the line. Note, however, that if you do that and the producer rate remains faster than the consumer rate, messages will build up until memory is exhausted. You could also just increase the value if the consumer rate only fails to keep up temporarily.

[1] https://github.com/apache/activemq/blob/dc56fa3f6ea753b692b4b3a9ffacc4f82de6af74/assembly/src/release/conf/activemq.xml#L55
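
For reference, the relevant entry in that default activemq.xml looks like this (quoted from the linked file; it limits how many messages are kept around for slow topic consumers):

<policyEntry topic=">">
    <!-- Messages beyond this count are dropped for slow consumers -->
    <pendingMessageLimitStrategy>
        <constantPendingMessageLimitStrategy limit="1000"/>
    </pendingMessageLimitStrategy>
</policyEntry>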

ulisesbocchio commented 5 years ago

Thank you @grs, I appreciate the time you're putting into helping me. I tried a different configuration yesterday that worked:

<destinationPolicy>
    <policyMap>
        <policyEntries>
            <policyEntry topic=">" reduceMemoryFootprint="true" producerFlowControl="false" topicPrefetch="1000" />
            <policyEntry queue=">" reduceMemoryFootprint="true" producerFlowControl="false" prioritizedMessages="true" useCache="false" expireMessagesPeriod="0" queuePrefetch="1000" />
        </policyEntries>
    </policyMap>
</destinationPolicy>

We don't have that constantPendingMessageLimitStrategy setting at all, so we should be good, right? Or do I need to explicitly set it to 0?

I think the reduceMemoryFootprint is definitely helping, since it supposedly keeps the payload of the messages on disk at all times, so the memory consumption threshold isn't reached. I do have a few questions, though:

grs commented 5 years ago

> Is there a way to pass connection URL settings?

I don't fully understand the question. Do you want those settings to be applied on the client? If so then no, it does not use a URL for configuration. You have to set up the options on the receiver, e.g. for prefetch use the credit_window option.

> what are the possible values for durable in container.open_receiver/open_sender?

0 (nothing is durable), 1 (only the configuration is durable), 2 (messages and their state are also durable)
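
For example, a durable subscription is typically opened with a stable link name and a durable source; a sketch (the link and topic names are hypothetical):

connection.open_receiver({
    name: 'my-durable-sub',        // stable link name identifies the subscription
    source: {
        address: 'notifications',  // hypothetical topic
        durable: 2,                // configuration, messages and state survive restart
        expiry_policy: 'never'     // keep the subscription after the link detaches
    }
});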

> is it possible that the broker is sending some sort of "block" message to the connection that is causing the entire connection to come to a stop? Could it be that the broker is trying to stop a specific consumer/producer but this library is stopping the entire connection?

I don't think so. The protocol has no way of doing that. The broker would have to be explicitly refusing to give credit to any sender from the connection.

ulisesbocchio commented 5 years ago

Thank you once more for the detailed response. What we see is definitely the connection coming to a stop: the connection itself is there, and the broker shows the producers/consumers as connected, but there's no data flow. That matches your description of the broker refusing credit to everyone (senders and consumers) on a given connection. I'm going to try to create a reproducer app with all the details so you can play with it.