Azure / amqpnetlite

AMQP 1.0 .NET Library
Apache License 2.0

How to implement Publish / Subscriber with Topic and Queues in AMQP 1.0 and RabbitMQ? #470

Closed PassionateDeveloper86 closed 3 years ago

PassionateDeveloper86 commented 3 years ago

Simple question I think:

I want to have a topic with the routing key "red.dog", and I want to have 3 queues:

- 1 durable queue which gets all messages (the routing key should be #, right?)
- 1 non-durable queue which gets all messages
- 1 non-durable queue which gets only red (the routing key should be red.*, right?)
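To double-check the two binding patterns, here is a minimal sketch of topic-exchange matching as I understand it: words are separated by dots, `*` stands for exactly one word, and `#` for zero or more words. The helper names `TopicMatches` and `Match` are hypothetical and not part of amqpnetlite or RabbitMQ; the sketch only illustrates the matching rule and assumes a compiler that supports C# top-level statements.

```csharp
using System;

// Hypothetical helper: does a topic binding pattern match a routing key?
// '*' matches exactly one dot-separated word, '#' matches zero or more words.
bool TopicMatches(string pattern, string routingKey)
{
    return Match(pattern.Split('.'), 0, routingKey.Split('.'), 0);
}

bool Match(string[] p, int pi, string[] k, int ki)
{
    // Pattern exhausted: a match only if the key is exhausted too.
    if (pi == p.Length) return ki == k.Length;

    if (p[pi] == "#")
    {
        // Let '#' absorb 0..n of the remaining words.
        for (int next = ki; next <= k.Length; next++)
        {
            if (Match(p, pi + 1, k, next)) return true;
        }
        return false;
    }

    // Key exhausted but pattern still expects a word.
    if (ki == k.Length) return false;

    return (p[pi] == "*" || p[pi] == k[ki]) && Match(p, pi + 1, k, ki + 1);
}

Console.WriteLine(TopicMatches("#", "red.dog"));           // True
Console.WriteLine(TopicMatches("red.*", "red.dog"));       // True
Console.WriteLine(TopicMatches("red.*", "blue.dog"));      // False
Console.WriteLine(TopicMatches("red.*", "red.dog.small")); // False
```

Under that rule, "#" and "red.*" would both match "red.dog", so the patterns themselves look plausible for the three queues.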

So I am using C#, the newest RabbitMQ and AMQP 1.0 with the amqpnetlite library.

My sender is like this:

        // Requires: using Amqp; using Amqp.Framing; using Amqp.Sasl;
        string address = "amqp://localhost:5672";
        Connection connection = new Connection(new Address(address),
                                               SaslProfile.Anonymous,
                                               new Open() { ContainerId = DEFAULT_CONTAINER_ID },
                                               null);
        Session session = new Session(connection);
        // Target address "/topic/red.dogs": topic routing key is red.dogs
        SenderLink sender = new SenderLink(session, "test-sender", "/topic/red.dogs");
        while (true)
        {
            try
            {
                Message message1 = new Message("Hello AMQP!");
                sender.Send(message1);
                // Log right after the send succeeds, then wait before the next one.
                this._logger.LogInformation("Sent!");
                Thread.Sleep(5000);
            }
            catch (Exception ex)
            {
                this._logger.LogError(ex, "ERROR: " + ex.Message);
            }
        }

This works fine. I can see messages arriving at the amq.topic exchange, and when I create a queue through the web interface and bind it to "#", it gets all messages correctly. So far so good.

Now when I try to subscribe, no messages arrive:

        // Requires: using Amqp; using Amqp.Framing; using Amqp.Sasl;
        string address = "amqp://localhost:5672";
        Connection connection = new Connection(new Address(address),
                                               SaslProfile.Anonymous,
                                               new Open() { ContainerId = DEFAULT_CONTAINER_ID },
                                               null);
        Session session = new Session(connection);
        Source source = new Source();
        source.Address = "/queue/#";
        source.Durable = 0; // 0 = non-durable, 1 = durable
        ReceiverLink rec = new ReceiverLink(session, "test-sub", source, null);
        while (true)
        {
            try
            {
                Message msg = rec.Receive(TimeSpan.FromSeconds(2));
                if (msg == null)
                {
                    this._logger.LogInformation("Nothing received");
                }
                else
                {
                    this._logger.LogInformation("Received: " + msg.Body);
                    // Settle the message so the broker does not redeliver it.
                    rec.Accept(msg);
                }
            }
            catch (Exception ex)
            {
                this._logger.LogError(ex, "ERROR: " + ex.Message);
            }
        }

I can see a queue named "#" created in the web interface, but it has no binding. It is non-durable with "Durable = 0" and durable with "Durable = 1", so that part seems to work. What I can't figure out is the correct address format to get my three desired queues running (I have tried many variants).
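The observation above suggests that "/queue/<name>" is read as a plain queue name rather than as a binding pattern. One variant that may be worth trying is sketched below. It rests on an assumption I have not verified against this setup: that the RabbitMQ AMQP 1.0 plugin (3.x address scheme) treats a source address of the form "/topic/<pattern>" as "create a temporary queue and bind it to amq.topic with that pattern". The container id "test-container" and link name "test-sub-red" are made up for illustration.

```csharp
using System;
using Amqp;
using Amqp.Framing;
using Amqp.Sasl;

// Same connection setup as in the snippets above; the container id is hypothetical.
Connection connection = new Connection(new Address("amqp://localhost:5672"),
                                       SaslProfile.Anonymous,
                                       new Open() { ContainerId = "test-container" },
                                       null);
Session session = new Session(connection);

// ASSUMPTION: "/topic/red.*" asks the broker for a temporary queue bound to
// amq.topic with the pattern "red.*". Use "/topic/#" to receive everything.
ReceiverLink receiver = new ReceiverLink(session, "test-sub-red", "/topic/red.*");

Message msg = receiver.Receive(TimeSpan.FromSeconds(10));
if (msg == null)
{
    Console.WriteLine("Nothing received");
}
else
{
    Console.WriteLine("Received: " + msg.Body);
    receiver.Accept(msg); // settle the message
}

receiver.Close();
session.Close();
connection.Close();
```

If that assumption holds, this would cover the two non-durable queues. For the durable one, a possible route is to keep the queue that was created and bound to "#" in the web interface and attach to it by name with "/queue/<queue name>", since that address form appears to refer to a queue name.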

I found this documentation about AMQP 1.0 integration: https://github.com/rabbitmq/rabbitmq-server/tree/master/deps/amqp10_client

So my /queue/# should be right, I think.

What did I do wrong?

ohad-buki commented 1 year ago

Also having the same problem.