CopernicaMarketingSoftware / AMQP-CPP

C++ library for asynchronous non-blocking communication with RabbitMQ
Apache License 2.0

Cancelling a consume call does not work as expected #506

Open robertlangenberg opened 1 year ago

robertlangenberg commented 1 year ago

Consuming a queue works, as does publishing. I don't use exchanges or anything, just one consumer on one queue. When attempting to cancel the consumer, I still receive messages after the DeferredCancel::onSuccess callback has executed. Also, the callback's argument (std::string consumer) is empty; should this be the consumerTag again? I'm using the Qt5 event loop and the most recent RabbitMQ release.

Here's what I'm observing:

// first publish many messages to "queueName"

m_channel->consume("queueName", "hardcodedConsumerTag").onReceived([](const AMQP::Message &m, uint64_t deliveryTag, bool redelivered)
        {std::cout << m.body() << std::endl;});

m_channel->cancel("hardcodedConsumerTag").onSuccess([](std::string consumer)
        {std::cout << "should have stopped consuming for: " << consumer << std::endl;});

output:

message 1
should have stopped consuming for: (here is just an empty string)
message 2
message 3
... until ALL messages have been delivered

I would have expected the messages to stop after the output "should have stopped consuming" is printed.

When attempting to start consuming under hardcodedConsumerTag again several seconds later, RabbitMQ returns "error - consumerTag already in use", which I interpret to mean the previous consumer is still registered. RabbitMQ also continues to deliver messages to the original consumer until I manually exit the application.

EmielBruijntjes commented 1 year ago

Yes, that is strange. What is the consumer tag reported by the application if you also install an onSuccess callback:

m_channel->consume("queueName", "hardcodedConsumerTag").onSuccess([](const std::string &tag) {
    std::cout << "started consumer " << tag << std::endl;
}).onReceived([](const AMQP::Message &m, uint64_t deliveryTag, bool redelivered)
        {std::cout.write(m.body(), m.bodySize()) << std::endl;});

Also watch out that m.body() might not be null-terminated.
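The null-termination caveat can be illustrated without a broker. The following is a minimal sketch, assuming a hypothetical `FakeMessage` stand-in that only mimics the `body()` and `bodySize()` accessors used above; it is not the AMQP-CPP class itself, and `bodyAsString` is an illustrative helper name rather than part of the library.

```cpp
#include <cstddef>
#include <cstdint>
#include <string>

// Hypothetical stand-in for a received message: a pointer into a larger
// buffer plus an explicit size, with no terminating '\0' after the payload.
struct FakeMessage
{
    const char *data;
    uint64_t size;

    const char *body() const { return data; }
    uint64_t bodySize() const { return size; }
};

// Copy exactly bodySize() bytes instead of relying on a terminator.
// Works for any type that offers body() and bodySize() accessors.
template <typename Message>
std::string bodyAsString(const Message &message)
{
    return std::string(message.body(), static_cast<std::size_t>(message.bodySize()));
}
```

Streaming the returned `std::string` to `std::cout` then prints only the payload bytes, whereas streaming the raw `body()` pointer would keep reading until it happens to hit a zero byte.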

robertlangenberg commented 1 year ago

Oh wow, that was already a good question. It turns out that was a bug on my part, introduced while attempting to solve the problem with automatically assigned tags. Long story short, the DeferredConsume::onSuccess() and DeferredCancel::onSuccess() callbacks do get the correct consumerTag passed.

Unfortunately, consumption still continues after the cancel, but printing in the callbacks has shown something else: I'm cancelling the consumer before getting the DeferredConsume::onSuccess() callback. AMQP-CPP reports success for the cancellation of the (potentially not yet existing?) consumer. Could this be what's happening? And if so, should DeferredCancel::onSuccess() be executed when a non-existing consumer has been cancelled? Is there a way to make the consume and cancel calls blocking with AMQP-CPP? Nesting everything within the DeferredConsume::onSuccess() callback would not be ideal, as the nesting is already pretty deep and the consume call is made elsewhere in the code.
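If the early cancel does turn out to be the cause, one way to avoid deeper nesting is to record the cancel request and only send it once the consumer has been confirmed. The following is a rough sketch under that assumption; `PendingCancel`, `consumeStarted`, `requestCancel` and the injected `sendCancel` function are hypothetical names for illustration and are not part of AMQP-CPP.

```cpp
#include <functional>
#include <string>
#include <utility>

// Hypothetical helper: remembers a cancel request that arrives before the
// consumer has been confirmed, and issues it once the confirmation is in.
class PendingCancel
{
public:
    // sendCancel is whatever actually cancels the consumer, for example a
    // lambda wrapping m_channel->cancel(tag) (assumed wiring, not shown).
    explicit PendingCancel(std::function<void(const std::string &)> sendCancel)
        : m_sendCancel(std::move(sendCancel)) {}

    // Call this from the consume onSuccess callback.
    void consumeStarted(const std::string &tag)
    {
        m_tag = tag;
        m_started = true;
        if (m_cancelRequested) flush();
    }

    // Call this from anywhere in the application; it is safe to call
    // before consumeStarted() has run.
    void requestCancel()
    {
        m_cancelRequested = true;
        if (m_started) flush();
    }

private:
    void flush()
    {
        m_cancelRequested = false;
        m_started = false; // the consumer is treated as gone from here on
        m_sendCancel(m_tag);
    }

    std::function<void(const std::string &)> m_sendCancel;
    std::string m_tag;
    bool m_started = false;
    bool m_cancelRequested = false;
};
```

With this shape, the code that starts the consumer only has to call `consumeStarted(tag)` in its success callback, and the code that wants to stop consuming calls `requestCancel()` without needing to know whether the confirmation has arrived yet.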

I might have found a(nother?) bug while trying to fix this: rejecting a message doesn't seem to requeue it in the same place. Another consumer will get subsequent messages, even when setting m_channel->setQos(1,true):

m_channel->setQos(1,true);
m_queueCancelled = false;
m_channel->consume("queueName", "hardcodedConsumerTag").onReceived([&](const AMQP::Message &message,
                                   uint64_t deliveryTag, bool redelivered)
        {if (m_queueCancelled){
            std::cout << "queue already cancelled, rejecting: " << message.body() << std::endl;
            m_channel->reject(deliveryTag, AMQP::requeue);
        } else { 
            //process message
            std::cout << "consuming normally: " << message.body() << std::endl;
            m_channel->ack(deliveryTag);
        }
        }).onSuccess([](){std::cout << "consuming started" << std::endl;});

// everything below is nested in the cancel().onSuccess()
m_channel->cancel("hardcodedConsumerTag").onSuccess([&](std::string consumer)
        {m_queueCancelled = true;
        std::cout << "should have stopped consuming for: " << consumer << std::endl;

        m_channel->get("queueName").onSuccess([&](const AMQP::Message &message, uint64_t deliveryTag,
            bool redelivered) {
                std::cout << "printing in get: " << message.body() << std::endl;
                m_channel->ack(deliveryTag);
                callGetUntilEmpty(); // emits a Qt signal that calls m_channel->get with the same arguments recursively until DeferredGet::onEmpty() is called
            }).onEmpty([&](){std::cout << "queue empty" << std::endl;
                                    m_queueCancelled = false;
                                    m_channel->consume("queueName", "hardcodedConsumerTag");});
});

and the output is


should have stopped consuming for: hardcodedConsumerTag
consuming started
queue already cancelled, rejecting: message1
printing in get: message2
printing in get: message1
queue already cancelled, rejecting: message3
queue empty
consuming normally: message3
consume error:  NOT_ALLOWED - attempt to reuse consumer tag 'hardcodedConsumerTag' // I installed DeferredConsume::onError for this to be printed

I need to preserve the message order, so having rejected messages not return to the same place is really inconvenient. I guess this might be RabbitMQ configuration, but I'm all ears if you happen to know how to achieve that :)

Note that when I neither acknowledge nor reject the message in the supposedly cancelled consume callback, m_channel->get() will still get message2 and message3; I thought this would not happen with setQos(1,true).