CopernicaMarketingSoftware / AMQP-CPP

C++ library for asynchronous non-blocking communication with RabbitMQ
Apache License 2.0

onNegotiate #533

Closed naderafshari closed 2 months ago

naderafshari commented 3 months ago

After implementing the onNegotiate callback

uint16_t onNegotiate(AMQP::TcpConnection *connection, uint16_t interval)

and returning a value of A seconds (the interval to be used), I see disconnects after exactly 3×A seconds. Does anyone know why?

EmielBruijntjes commented 3 months ago

Please provide the shortest possible program that demonstrates your issue. Make sure to not use threads.

naderafshari commented 3 months ago

#include "event2/event.h"
#include <amqpcpp.h>
#include <amqpcpp/libevent.h>
#include <chrono>
#include <ctime>
#include <iomanip>
#include <iostream>
#include <string>

class AmqpHandler : public AMQP::LibEventHandler
{
public:
    AmqpHandler(struct event_base *evbase, uint16_t negotiateInterval)
        : AMQP::LibEventHandler(evbase), _evbase(evbase), _negotiateInterval(negotiateInterval)
    {
    }
    virtual ~AmqpHandler() = default;

private:
    void onAttached(AMQP::TcpConnection *connection) override {}
    void onConnected(AMQP::TcpConnection *connection) override {}
    bool onSecured(AMQP::TcpConnection *connection, const SSL *ssl) override { return true; }
    void onReady(AMQP::TcpConnection *connection) override
    {
        auto now = std::chrono::system_clock::now();
        auto now_c = std::chrono::system_clock::to_time_t(now);
        std::cout << "Connection ready at " << std::put_time(std::localtime(&now_c), "%F %T") << std::endl;
    }
    void onError(AMQP::TcpConnection *connection, const char *message) override {}
    void onClosed(AMQP::TcpConnection *connection) override {}
    void onLost(AMQP::TcpConnection *connection) override {}
    void onDetached(AMQP::TcpConnection *connection) override {}
    uint16_t onNegotiate(AMQP::TcpConnection *connection, uint16_t interval) override
    {
        return _negotiateInterval;
    }

    struct event_base* _evbase;
    uint16_t _negotiateInterval;
};

int main(int argc, char* argv[])
{
    if (argc != 2) {
        std::cerr << "Usage: " << argv[0] << " <negotiate_interval>" << std::endl;
        return 1;
    }

    uint16_t negotiateInterval = std::stoi(argv[1]);

    std::string addressString = "amqp://guest:guest@rabbitBroker:5672";
    AMQP::Address address(addressString);

    auto evBase = event_base_new();

    // Initialize AMQP event handler
    AmqpHandler cbeh(evBase, negotiateInterval);

    // Connect to RabbitMQ
    AMQP::TcpConnection connection(&cbeh, address);

    std::cout << "create channel" << std::endl;
    // we need a channel too
    AMQP::TcpChannel channel(&connection);

    std::cout << "declare queue" << std::endl;
    // create a temporary queue
    channel.declareQueue(AMQP::exclusive).onSuccess(
        [&connection](const std::string &name, uint32_t messagecount, uint32_t consumercount)
        {
            // report the name of the temporary queue
            std::cout << "queue declared" << name << std::endl;
        });

    // run the loop
    event_base_dispatch(evBase);

    auto now = std::chrono::system_clock::now();
    auto now_c = std::chrono::system_clock::to_time_t(now);
    std::cout << "Event loop exited at " << std::put_time(std::localtime(&now_c), "%F %T") << std::endl;

    event_base_free(evBase);

    // done
    return 0;
}

In the above example, RabbitMQ disconnects after 30 seconds. I was expecting it to disconnect after 10 seconds.

sh-5.1# ./control-bus 10
create channel
declare queue
Connection ready at 2024-06-20 21:29:36
queue declaredamq.gen-7fmNuTbuzgkgkRj_-lkkEw
Event loop exited at 2024-06-20 21:30:06

naderafshari commented 3 months ago

Based on https://www.rabbitmq.com/docs/heartbeats#heartbeats-interval, I think this behaviour is expected.

EmielBruijntjes commented 2 months ago

Thank you for your patience. I have modified the LibEvent handlers to disable heartbeats for libevent. You should now no longer have an issue with connections that expire.

Some background info. Heartbeats are sent by RabbitMQ and by AMQP-CPP every couple of seconds to notify each other that they are still alive. For some handler implementations that come with AMQP-CPP, this heartbeat feature has been implemented in the handler base classes (LibEvHandler, LibBoostAsioHandler). When you use one of these handlers, you do not have to worry about heartbeats. The handler makes sure that server connections are killed if the server becomes unresponsive, and it also ensures that the client side gives a sign of life to the server every couple of seconds.

Implementing this is not entirely trivial, because you need to manage timers to periodically check whether the server is alive, and to periodically make sure the client sends a sign of life too. Just checking for heartbeats is not sufficient, because heartbeats are only sent if nothing else was sent during the heartbeat interval. If there was some other sort of activity, neither the client nor the server needs to send a heartbeat. It thus requires bookkeeping and network traffic interception to find out whether a server or client has been active. But none of this is your concern as long as you use LibEvHandler or LibBoostAsioHandler.
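To illustrate the kind of bookkeeping described above, here is a minimal sketch in plain C++. It is not code from AMQP-CPP: the class name HeartbeatTracker and its methods are hypothetical, and the two thresholds (send after half an interval of silence, give up on the peer after two silent intervals) are assumed policies chosen for illustration. The point is only that any traffic, not just heartbeat frames, resets the relevant timestamp.

```cpp
#include <chrono>

// Hypothetical helper (not part of AMQP-CPP): records the last traffic seen in
// each direction and answers the two questions a periodic timer has to ask.
class HeartbeatTracker
{
public:
    using Clock = std::chrono::steady_clock;
    using TimePoint = Clock::time_point;

    // interval is the negotiated heartbeat interval; 0 means heartbeats are off
    HeartbeatTracker(std::chrono::seconds interval, TimePoint start)
        : _interval(interval), _lastSent(start), _lastReceived(start) {}

    // call whenever any frame (not only a heartbeat) is written to the socket
    void onSent(TimePoint now) { _lastSent = now; }

    // call whenever any data arrives from the server
    void onReceived(TimePoint now) { _lastReceived = now; }

    // assumed policy: send a heartbeat after half an interval without outgoing traffic
    bool shouldSendHeartbeat(TimePoint now) const
    {
        return enabled() && now - _lastSent >= _interval / 2;
    }

    // assumed policy: treat the server as dead after two intervals without incoming traffic
    bool peerTimedOut(TimePoint now) const
    {
        return enabled() && now - _lastReceived >= 2 * _interval;
    }

private:
    bool enabled() const { return _interval.count() > 0; }

    std::chrono::milliseconds _interval;
    TimePoint _lastSent;
    TimePoint _lastReceived;
};
```

A periodic timer in the event loop would call shouldSendHeartbeat() and peerTimedOut() with the current time, while the send and receive paths call onSent() and onReceived().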

However, the handler that you are using is AMQP::LibEventHandler. For this handler, timer and heartbeat management has not yet been implemented. It is thus up to your application code to take care of this or, alternatively, to disable heartbeats altogether. Disabling heartbeats can be done by overriding the onNegotiate() method and making it return 0. If you return something else (like you did!), you are also responsible for keeping the connection alive. If you don't do that, the server will close the connection because of inactivity.
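The override that disables heartbeats might look like the sketch below. To keep it compilable without AMQP-CPP installed, it uses two stand-in types, TcpConnectionStub and HandlerStub, which are assumptions made for this example and not the library's real definitions. In real code you would derive from AMQP::LibEventHandler and keep the onNegotiate(AMQP::TcpConnection *, uint16_t) signature shown earlier in this thread.

```cpp
#include <cstdint>

// Stand-in for AMQP::TcpConnection (assumption, for illustration only)
struct TcpConnectionStub {};

// Stand-in for the handler base class (assumption, for illustration only)
struct HandlerStub
{
    virtual ~HandlerStub() = default;

    // stand-in default: accept the interval proposed by the server
    virtual std::uint16_t onNegotiate(TcpConnectionStub *connection, std::uint16_t interval)
    {
        (void)connection;
        return interval;
    }
};

// Handler that turns heartbeats off, whatever the server proposes
class NoHeartbeatHandler : public HandlerStub
{
public:
    std::uint16_t onNegotiate(TcpConnectionStub *connection, std::uint16_t interval) override
    {
        (void)connection;
        (void)interval;

        // returning 0 disables heartbeats, so there is nothing to keep alive
        return 0;
    }
};
```

With heartbeats disabled, the application no longer has to send periodic signs of life, at the cost of not detecting an unresponsive server through missed heartbeats.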

I think that most users of libevent have chosen to disable heartbeats, as that is the simplest approach to solve this issue. And because of this issue, I have now modified the LibEvent handler to disable heartbeats by default for libevent. So you do not even have to override onNegotiate() any longer.

I hope this answers your question.