Azure / amqpnetlite

AMQP 1.0 .NET Library
Apache License 2.0

Callback function in ReceiverLink.Start is not called #521

Closed timbrando closed 2 years ago

timbrando commented 2 years ago

Hi,

I am developing a client for the HoloLens 2 (UWP) that communicates via RabbitMQ, and I use AmqpNetLite because it seems to be the only library that works on UWP. Currently, I am doing some dry runs with two plain C# console applications (AmqpSender and AmqpReceiver) as described in the Hello AMQP tutorial. So far, everything works as expected with the blocking ReceiverLink.Receive(): I send the message and receive it. My next step was to use ReceiverLink.Start(...), but I can't get the MessageCallback function to be called. My programs are as follows:

AmqpSender:

using System;
using Amqp;

namespace AmqpSender {
    class Program {
        static void Main(string[] args) {
            Address address = new Address("amqp://guest:guest@localhost:5672");
            Connection connection = new Connection(address);
            Session session = new Session(connection);

            Message message = new Message("Hello AMQP!");
            SenderLink sender = new SenderLink(session, "sender-link", "/exchange/topic_logs/hello");
            sender.Send(message);
            Console.WriteLine("Sent Hello AMQP!"); //will be received by receiver.Receive()
            sender.Send(message);
            Console.WriteLine("Sent Hello AMQP!"); //not received by receiver.Start()

            sender.Close();
            session.Close();
            connection.Close();
        }
    }
}

AmqpReceiver:

using System;
using Amqp;

namespace AmqpReceiver
{
    class Program
    {
        static void Main(string[] args)
        {
            Address address = new Address("amqp://guest:guest@localhost:5672");
            Connection connection = new Connection(address);
            Session session = new Session(connection);
            ReceiverLink receiver = new ReceiverLink(session, "receiver-link", "/exchange/topic_logs/hello");

            Console.WriteLine("Receiver connected to broker.");
            Message message = receiver.Receive(); //works totally fine
            Console.WriteLine("Received " + message.Body.ToString());
            receiver.Accept(message);

            receiver.Start(20, (receiver, msg) => { Console.WriteLine("Received " + msg.Body.ToString()); }); //callback function never gets called

            receiver.AddClosedCallback((sender, e) => { Console.WriteLine("Receiver closed!"); }); //this callback works, however
            receiver.Close();
            session.Close();
            connection.Close();
        }
    }
}

I am not much of a C# developer, so please excuse any stupid mistakes. A different issue is that the ReceiverLink seems to create a durable queue in RabbitMQ. Is there any way to have the queue automatically unbound when the connection (or link) closes?

Thanks for the support!

Kind regards

xinchen10 commented 2 years ago

This is a limitation of the implementation when Receive and the OnMessage callback are used on the same link. This usage is not intended. The expected usage pattern is either Receive calls in a loop or a Start call with a callback.

In your case the message is not given to the callback because it is kept in a local queue in the receiver and is waiting for the next Receive call to pick it up. This behavior is also related to the default prefetch mode of the receiver. When you call Receive, the receiver link issues a certain number of credits (# of messages) to the broker to prefetch messages so that subsequent Receive calls will get messages from its local buffer queue.

To avoid this issue, you could use either Receive or Start with callback, but not both.
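A minimal sketch of the callback-only pattern, reusing the address and link names from the receiver program above. The class name CallbackOnlyReceiver and the Console.ReadLine() wait are assumptions added for illustration; the wait only keeps the process alive so that the callback has a chance to run before the link is closed.

```csharp
using System;
using Amqp;

class CallbackOnlyReceiver
{
    static void Main()
    {
        Address address = new Address("amqp://guest:guest@localhost:5672");
        Connection connection = new Connection(address);
        Session session = new Session(connection);
        ReceiverLink receiver = new ReceiverLink(session, "receiver-link", "/exchange/topic_logs/hello");

        // Receive() is never called on this link, so no message ends up
        // in the local queue waiting for a Receive call.
        receiver.Start(20, (link, msg) =>
        {
            Console.WriteLine("Received " + msg.Body);
            link.Accept(msg);
        });

        // Assumption: block the main thread until the user presses Enter.
        Console.WriteLine("Press Enter to stop.");
        Console.ReadLine();

        receiver.Close();
        session.Close();
        connection.Close();
    }
}
```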

The other option is to control the credit mode by calling the ReceiverLink.SetCredit method before calling Receive. If you use the Manual mode and set the credit to 1, then your code should work.
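A rough sketch of the manual-credit variant, assuming a library version that provides the SetCredit(int, CreditMode) overload and the CreditMode.Manual value. The class name ManualCreditReceiver, the null check, and the Console.ReadLine() wait are illustrative assumptions. With a single manual credit, the first Receive should not prefetch further messages into the local queue, so later messages can reach the callback.

```csharp
using System;
using Amqp;

class ManualCreditReceiver
{
    static void Main()
    {
        Address address = new Address("amqp://guest:guest@localhost:5672");
        Connection connection = new Connection(address);
        Session session = new Session(connection);
        ReceiverLink receiver = new ReceiverLink(session, "receiver-link", "/exchange/topic_logs/hello");

        // Issue exactly one credit and do not let the link replenish it.
        receiver.SetCredit(1, CreditMode.Manual);

        // Receive() returns null if no message arrives before the timeout.
        Message first = receiver.Receive();
        if (first != null)
        {
            Console.WriteLine("Received " + first.Body);
            receiver.Accept(first);
        }

        // Switch to callback delivery for everything that follows.
        receiver.Start(20, (link, msg) =>
        {
            Console.WriteLine("Received " + msg.Body);
            link.Accept(msg);
        });

        // Assumption: keep the process alive until the user presses Enter.
        Console.ReadLine();

        receiver.Close();
        session.Close();
        connection.Close();
    }
}
```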

I agree that this is not obvious so we will see if there is anything that could be done to improve user experience in this case.

timbrando commented 2 years ago

Hi,

Thanks for your quick answer! 👍 My example was a bit misleading: I did not actually intend to use both on the same link, I only wanted to show that Receive() worked and Start() did not. Thank you for the clarification; both of your suggestions worked. The next step would be to use the ConnectionFactory, but for now I will use the synchronous connection creation.

Also:

Is there any way to have the queue automatically unbound when the connection (or link) closes?

Currently, every queue that is created remains bound.

Feel free to close this issue. I will leave the issue open because it might indeed be one.