rebus-org / Rebus.AzureServiceBus

:bus: Azure Service Bus transport for Rebus
https://mookid.dk/category/rebus
Other
33 stars 20 forks source link

ASB Topic subscription also creates queues #77

Closed oguzhankahyaoglu closed 2 years ago

oguzhankahyaoglu commented 2 years ago

Hello,

We just trying to switch from Masstransit to this library. For queues, everything is working fine. However; our applications are also using ASB topics in both ways (publisher or publisher/subscriber, even multiple subscribers multiple nodes running same app).

Current code is as follows:

      services.AddTypedRebus<TMessage>(rebus =>
            {
                //topicName = topicName + new Random().Next(100);
                Log.Warning($"Topic name: {topicName}");
                rebus
                    .Logging(x => x.Serilog(Log.Logger))
                    .Transport(x =>
                    {
                        x.UseAzureServiceBus(serviceBusConnectionString, TopicTestMessage.QUEUE_NAME)
                            .SetMessagePayloadSizeLimit(1024 * 1024) //1024 KB
                            .SetAutoDeleteOnIdle(TimeSpan.FromDays(7))
                            .SetMessagePeekLockDuration(TimeSpan.FromMinutes(5))
                            .SetDefaultMessageTimeToLive(TimeSpan.FromDays(1))
                            ;
                    })
                    // configure serializer to serialize as pure JSONM (i.e. WITHOUT type information inside the serialized format)
                    .Serialization(s => s.UseNewtonsoftJson(JsonInteroperabilityMode.PureJson))
                    .Options(x =>
                    {
                        //x.Decorate<INameFormatter>(r => new LegacyV3NameFormatter());
                        x.SetMaxParallelism(1);
                        x.SetNumberOfWorkers(1);
                    })
                    .Routing(x=> x.TypeBased())
                    ;

                return rebus;
            });

In Startup.Configure:

         app.UseTypedRebus<TopicTestMessage>(bus =>
            {
                bus.Subscribe<TopicTestMessage>().Wait();
            });

The above code creates queues and topics simultaneously. Queue creation is unneccessary. However, published messages are not being consumed in topic subscriptions.

When we switch to ("Fernas" would be the subscription name of Topic, it would be node names in production like "Fernas-XXXXXX"):

      x.UseAzureServiceBus(serviceBusConnectionString, "Fernas")
                            .SetMessagePayloadSizeLimit(1024 * 1024) //1024 KB
                            .SetAutoDeleteOnIdle(TimeSpan.FromDays(7))
                            .SetMessagePeekLockDuration(TimeSpan.FromMinutes(5))
                            .SetDefaultMessageTimeToLive(TimeSpan.FromDays(1))
                            ;

and

           app.UseTypedRebus<TopicTestMessage>(bus =>
            {
                bus.Advanced.Topics.Subscribe("Fernas").Wait();
            });

The application crashes:

[17:01:26 INF -  ] Creating ASB queue Fernas
[17:01:27 INF -  ] Bus TopicTestMessage stopped
Application startup exception: Rebus.Injection.ResolutionException: Could not resolve Rebus.Bus.IBus with decorator depth 0 - registrations: Rebus.Injection.Injectionist+Handler
 ---> Rebus.Exceptions.RebusApplicationException: Could not get queue description for queue Fernas
 ---> Azure.Messaging.ServiceBus.ServiceBusException: Queue was not found (MessagingEntityNotFound)

I am having very hard time while solving some problems using rebus.xx libraries documentation. I've read most of the samples but could not figure it out.

The desired outcome would be as follows:

mookid8000 commented 2 years ago

When you say

Queue creation is unneccessary

what do you actually mean?

And when you say

However, published messages are not being consumed in topic subscriptions.

I maybe get a little bit of a hint that you're a little bit off in your understanding of how Rebus works with ASB's queues and topics.

Let me just try to sum it up:

Rebus works like this: Each bus instance capable of consuming messages needs an input queue.

This means that each Rebus instance has exactly 1 input queue (or 0 if it's a one-way client). Messages are ALWAYS received from the consumer's input queue, and only from that.

You can then subscribe to topics by having the bus instance call

await bus.Advanced.Topics.Subscribe("custom-topic");

which will ensure a topic called "custom-topic" exists, and then create a subscription underneath it (named after the subscriber's input queue), which is configured to forward messages to the subscriber's input queue, this way acting as a BINDING.

So subscribe/unsubscribe will bind/unbind topics to queues, and the binding will be implemented by creating a subscription with "ForwardTo = <subscriber's input queue>".

Does that make sense?

Oh, and if you've encountered something in the documentation that you think is off or does not explain things enough, please let me know 🙂

oguzhankahyaoglu commented 2 years ago

https://github.com/rebus-org/Rebus/wiki/Azure-Service-Bus-transport For the documentation side, in this link, there is no information how ASB topics or queues are configured seperately. Also, Rebus configures queue's properties like SetAutoDeleteOnIdle, SetMessagePayloadSizeLimit etc. However, it does not respect to the configuration code when it comes to Topics.

Queue creation is unneccessary when it comes to work with Topics. When we need topic configuration, Rebus also creates queue with the same name and does not follow names used while creating Topics.

Using the code samples above, Rebus created topic with name of fernas.events~fernas.business.servicebus.topictestmessage and with a subscriber of fernas-test-topic-message-11 which was the name provided in inputQueue parameter.

When we send messages, it would route to queues I am aware of it. But when it comes to Topic configuration (which should be done in a way, seperated from queue configuration), the library should be only able to publish messages and maybe throw exceptions on send trials.

Hence, we should be able to specify the Topic name, Subscription name and it should not create a queue with the same name.

mookid8000 commented 2 years ago

For the documentation side, in this link, there is no information how ASB topics or queues are configured seperately.

Thanks for pointing that out. I've added an explanation now 🙂

Also, Rebus configures queue's properties like SetAutoDeleteOnIdle, SetMessagePayloadSizeLimit etc. However, it does not respect to the configuration code when it comes to Topics.

It's because Rebus has a pretty simple and rigid way of working with queues, topics, and subscriptions, which means that the Rebus instance only "owns" its input queue, and therefore it should not e.g. configure a topic to disappear when idle.

Queue creation is unneccessary when it comes to work with Topics. When we need topic configuration, Rebus also creates queue with the same name and does not follow names used while creating Topics.

If you go over the explanation I added to the wiki, you'll see why queues are important to Rebus. Also, Rebus names topics after the name you pass as "topic" when you publish/subscribe, so

await bus.Advanced.Topics.Publish("something_happened", new SomethingHappened());

and

await bus.Advanced.Topics.Subscribe("something_happened");

will result in the creation of a topic named "something_happened".

Most of the time though, it's easier to let Rebus automatically name the topics from the type of the event (it'll do so via its ITopicNameConvention), so

await bus.Publish(new SomethingHappened());

and

await bus.Subscribe<SomethingHappened>();

will work with a topic named after SomethingHappened, which defaults to be something like namespace_something_happened__assembly or something like that.

oguzhankahyaoglu commented 2 years ago

Thank you for detailed explanation of how Rebus is working. However, it does not suit our requirements. I will create a fork from the library and change it that would fit to our needs. Topics in rebus is far away from what we need actually. We strictly need to control both queue/topic/subscriber properties and names. I would be gladful if you have a look at my fork once its done.

oguzhankahyaoglu commented 2 years ago

Thats a crucial point of how Rebus handles/redirects topic subscription messages to seperate queues I just saw it btw:

                // if it looks fine, just skip it
                if (subscription.ForwardTo == inputQueuePath) return;

                subscription.ForwardTo = inputQueuePath;
mookid8000 commented 2 years ago

However, it does not suit our requirements. (...) Topics in rebus is far away from what we need actually. We strictly need to control both queue/topic/subscriber properties and names.

Ok, I'm sorry to hear that.

But please note that Rebus does what it does in that particular way, not because I thought it was a good idea, but because it's based on how RabbitMQ topic exchanges work, which I consider a little bit of a "reference implementation" on how topic-based routing should work.

This also means that if you can fit the design of your topic-based communication into that model, it's aligned with how lots of other systems work out there, and so it's easier to understand for new developers, easier to maintain, etc.

But I am curious to know which kind of control you need to exert over your topics and subscriptions, since Rebus' way of using them is not fit for your purpose. Could you maybe tell me a little bit more about this?

mookid8000 commented 2 years ago

(btw I just closed this issue as I consider it "done", but let's keep the discussion going 🙂 )