CopernicaMarketingSoftware / AMQP-CPP

C++ library for asynchronous non-blocking communication with RabbitMQ
Apache License 2.0

Recall onSuccess callback never invoked #501

Closed davtir88 closed 1 year ago

davtir88 commented 1 year ago

Looking at the implementation of the DeferredRecall class, it derives from the DeferredReceiver class which in turn derives from the Deferred class. I was able to successfully install the onSuccess callback on the DeferredRecall object but unfortunately it seems that the callback is never invoked.

Below is sample code that reproduces the problem. The program periodically publishes unroutable mandatory messages to an exchange that is not bound to any queue. The onReceived callback is invoked correctly, but the onSuccess callback is not.

#include <cstdint>
#include <iostream>
#include <string>
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>

class MyTimer
{
private:
    struct ev_timer _timer;
    AMQP::TcpChannel *_channel;
    std::string _queue;
    bool _exchangeDeclared;
    uint64_t _counter;

    static void callback(struct ev_loop *loop, struct ev_timer *timer, int revents)
    {
        // retrieve the this pointer
        MyTimer *self = static_cast<MyTimer*>(timer->data);
        if (self == nullptr) { return; }

        if (!self->_exchangeDeclared)
        {
            // the exchange is declared but never bound to any queue, so every published message is unroutable
            self->_channel->declareExchange("unbound_exchange").onSuccess([self]() { self->_exchangeDeclared = true; std::cout << "Exchange declared" << "\n"; });
        }
        else
        {
            // publish a message
            self->_channel->publish("unbound_exchange", "", "Message " + std::to_string(self->_counter++), AMQP::mandatory);
        }
    }

public:
    MyTimer(struct ev_loop *loop, AMQP::TcpChannel *channel, std::string queue) : 
        _channel(channel), _queue(std::move(queue)), _exchangeDeclared(false), _counter(0)
    {
        // initialize the libev structure
        ev_timer_init(&_timer, callback, 0.005, 1.005);

        // this object is the data
        _timer.data = this;

        // and start it
        ev_timer_start(loop, &_timer);
    }
    virtual ~MyTimer()
    {
        // @todo to be implemented
    }
};

int main()
{
    // access to the event loop
    auto *loop = EV_DEFAULT;

    // handler for libev
    AMQP::LibEvHandler handler(loop);

    // make a connection
    AMQP::Address address("amqp://guest:guest@localhost/");
    AMQP::TcpConnection connection(&handler, address);

    // we need a channel too
    AMQP::TcpChannel channel(&connection);

    // construct a timer that is going to publish stuff
    auto *timer = new MyTimer(loop, &channel, "");

    channel.recall()
        .onReceived([&](const AMQP::Message& message, int16_t code, const std::string& description)
        {
            // the body is not null-terminated, so build the string from pointer and size
            std::cout << "Received unroutable message: " << std::string(message.body(), message.bodySize()) << std::endl;
        })
        .onSuccess([&]()
        {
            std::cout << "Recall successful." << std::endl;
        })
        .onError([&](const char* errmsg)
        {
            std::cout << "Recall failure. Reason was: " << errmsg << std::endl;
        });

    // run the loop
    ev_run(loop, 0);

    // done
    return 0;
}

Is onSuccess the intended way to find out whether the recall function executed successfully? Thanks in advance.

EmielBruijntjes commented 1 year ago

The onSuccess() callback is never called. DeferredRecall uses a common base class because the processing of headers, data frames, etc. is so similar, but it is not completely the same. Maybe it would have been better to use a different architecture for it; we probably were just a bit lazy.

Anyway, with recall() you can install the callback(s) to process bounced messages. With the publish() call you can pass flags to RabbitMQ to tell it that undeliverable messages may be bounced, and that's it. There is no onSuccess that is called.
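
To illustrate the point, here is a minimal, library-free sketch of the situation described above: a derived class inherits onSuccess() from a shared base, so the callback can be installed, but nothing on the bounce path ever fires it. The names MiniDeferred, MiniRecall, reportBounce, reportSuccess, DemoResult and run_demo are hypothetical stand-ins invented for this example; this is not the AMQP-CPP source and its real internals may well differ.

```cpp
#include <functional>
#include <string>
#include <utility>

// Hypothetical stand-in for a generic deferred-result base class.
class MiniDeferred
{
private:
    std::function<void()> _success;

public:
    virtual ~MiniDeferred() = default;

    // Stores the callback; it only fires if somebody calls reportSuccess().
    MiniDeferred &onSuccess(std::function<void()> callback)
    {
        _success = std::move(callback);
        return *this;
    }

    // Would be called when the server confirms an operation.
    void reportSuccess()
    {
        if (_success) _success();
    }
};

// Hypothetical stand-in for a recall object: it reuses the base class,
// but its only event is "a bounced message arrived".
class MiniRecall : public MiniDeferred
{
private:
    std::function<void(const std::string &)> _received;

public:
    MiniRecall &onReceived(std::function<void(const std::string &)> callback)
    {
        _received = std::move(callback);
        return *this;
    }

    // Simulates the arrival of a returned (unroutable) message.
    // Note that this path never calls reportSuccess().
    void reportBounce(const std::string &body)
    {
        if (_received) _received(body);
    }
};

// Counters filled in by the demo below.
struct DemoResult
{
    int received = 0;
    int successes = 0;
};

// Installs both callbacks, then simulates two bounced messages.
DemoResult run_demo()
{
    DemoResult result;
    MiniRecall recall;

    // Installing the callbacks is purely local; no server round trip is involved.
    recall.onReceived([&result](const std::string &) { ++result.received; });
    recall.onSuccess([&result]() { ++result.successes; });

    recall.reportBounce("Message 0");
    recall.reportBounce("Message 1");

    return result;
}
```

Calling run_demo() from a main() function leaves the bounce counter at two and the success counter at zero, which mirrors the behaviour reported in this issue: installing onSuccess compiles and is harmless, but only onReceived is ever triggered.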

davtir88 commented 1 year ago

So, if I understand correctly, I can safely assume that once recall() has been invoked the callback is installed, with no risk of latency or errors from the server.

Thanks for the clarifications!

EmielBruijntjes commented 1 year ago

Yes