rebus-org / Rebus.AzureServiceBus

:bus: Azure Service Bus transport for Rebus
https://mookid.dk/category/rebus

support "/" in topic path #1

Closed Meyce closed 7 years ago

Meyce commented 7 years ago

Azure Service Bus supports "/" in topic and queue paths, where everything before the "/" acts as a virtual directory. This is handy for organizing groups of topics/queues in ASB or, when coupled with a decorator, for supporting multiple environments (dev, integration, staging, etc.) per ASB namespace. The visual representation is also useful when using Service Bus Explorer (see screenshot below). The proposed changes alter the normalization of an ASB entity name so that a "/" is no longer replaced with an underscore. I don't believe this causes any issues, as a "/" should not show up in a type's assembly-qualified name.
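A minimal sketch of the normalization rule described above, assuming the transport otherwise normalizes names the same way the decorator further down does (letters and digits lower-cased, everything else replaced with an underscore). `EntityNameSketch` and `Normalize` are hypothetical names used only for illustration; this is not the transport's actual code.

```csharp
using System;
using System.Linq;

public static class EntityNameSketch
{
    // Hypothetical helper, not the transport's real implementation:
    // letters and digits are lower-cased, "/" is kept as a path separator,
    // and every other character is replaced with an underscore.
    public static string Normalize(string name)
    {
        if (name == null) throw new ArgumentNullException(nameof(name));

        return string.Concat(name.Select(c =>
            char.IsLetterOrDigit(c) ? char.ToLowerInvariant(c)
            : c == '/' ? '/'
            : '_'));
    }
}
```

With this rule, `EntityNameSketch.Normalize("dev/TestReceiver1")` yields `dev/testreceiver1`, while the separators in a type name such as `MyApp.Events.BusEvent, MyApp` still become underscores.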

Also, the determination of the subscription name is altered: it now evaluates the input queue address and takes everything after the last slash, because a slash is an invalid character in a subscription name. With these changes, organizing topics by environment can be accomplished with the following setup.

publisher

class Program
{
    static void Main(string[] args)
    {
        var activator = new BuiltinHandlerActivator();

        var bus = Configure.With(activator)
            .Transport(t => t.UseAzureServiceBusAsOneWayClient("Endpoint=sb://conn"))
            .Options(o =>
            {
                o.SimpleRetryStrategy(errorQueueAddress: "dev/errors");
                o.OrganizeSubscriptionsByEnvironment("dev");
            })
            .Start();

        bus.Publish(new BusEvent("Here is a value")).Wait();
    }
}

subscriber

class Program
{
    static void Main(string[] args)
    {
        var activator = new BuiltinHandlerActivator();
        activator.Register(() => new EventBusHandler());

        var bus = Configure.With(activator)
            .Transport(t => t.UseAzureServiceBus("Endpoint=sb://conn", "dev/testreceiver1"))
            .Options(o =>
            {
                o.SimpleRetryStrategy(errorQueueAddress: "dev/errors");
                o.OrganizeSubscriptionsByEnvironment("dev");
            })
            .Start();

        bus.Subscribe<BusEvent>();

        Console.WriteLine("Press enter to quit");
        Console.ReadLine();
    }
}

extension and decorator

// Usings assumed for this snippet
using System.Linq;
using System.Threading.Tasks;
using Rebus.Config;
using Rebus.Subscriptions;

public static class Ext
{
    public static void OrganizeSubscriptionsByEnvironment(this OptionsConfigurer configurer, string environment)
    {
        // Wrap the registered subscription storage in the environment-aware decorator
        configurer.Decorate<ISubscriptionStorage>(context =>
        {
            var ss = context.Get<ISubscriptionStorage>();
            return new EnvironmentSubscriptionDecorator(ss, environment);
        });
    }
}

public class EnvironmentSubscriptionDecorator : ISubscriptionStorage
{
    private readonly ISubscriptionStorage _toDecorate;
    private readonly string _environment;

    public EnvironmentSubscriptionDecorator(ISubscriptionStorage toDecorate, string environment)
    {
        _toDecorate = toDecorate;
        _environment = environment;
    }

    public Task<string[]> GetSubscriberAddresses(string topic)
    {
        // Lower-case letters and digits; replace every other character with an underscore
        var safeTopicName = string.Concat(topic
            .Select(c => char.IsLetterOrDigit(c) ? char.ToLower(c) : '_'));

        var returnable = $"subscription/{_environment}/{safeTopicName}";

        // Nothing is awaited here, so return a completed task instead of marking the method async
        return Task.FromResult(new[] { returnable });
    }

    // Registration is delegated to the wrapped storage, with the environment prepended to the topic
    public Task RegisterSubscriber(string topic, string subscriberAddress)
    {
        return _toDecorate.RegisterSubscriber($"{_environment}/{topic}", subscriberAddress);
    }

    public Task UnregisterSubscriber(string topic, string subscriberAddress)
    {
        return _toDecorate.UnregisterSubscriber($"{_environment}/{topic}", subscriberAddress);
    }

    public bool IsCentralized => _toDecorate.IsCentralized;
}

This produces the following effect when administering ASB through Service Bus Explorer: [image]

If the code changes aren't acceptable, I can accomplish the same by completely overriding the subscription storage with the decorator, but the proposed changes make the decorator pretty slim.
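For illustration, the subscription-name rule described earlier (take everything after the last slash of the input queue address) could look roughly like this. `SubscriptionNameSketch` and `FromInputQueueAddress` are hypothetical names, and the actual change in the pull request may differ in detail.

```csharp
using System;

public static class SubscriptionNameSketch
{
    // Hypothetical helper: returns the part of the input queue address after the last "/",
    // or the whole address when it contains no "/".
    public static string FromInputQueueAddress(string inputQueueAddress)
    {
        if (string.IsNullOrEmpty(inputQueueAddress))
            throw new ArgumentException("Input queue address must not be empty", nameof(inputQueueAddress));

        var lastSlash = inputQueueAddress.LastIndexOf('/');

        return lastSlash < 0
            ? inputQueueAddress
            : inputQueueAddress.Substring(lastSlash + 1);
    }
}
```

For the subscriber above, the input queue address `dev/testreceiver1` would give the subscription name `testreceiver1`, which contains no slash.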


Rebus is MIT-licensed. The code submitted in this pull request needs to carry the MIT license too. By leaving this text in, I hereby acknowledge that the code submitted in the pull request has the MIT license and can be merged with the Rebus codebase.

CLAassistant commented 7 years ago

CLA assistant check
All committers have signed the CLA.

mookid8000 commented 7 years ago

Whoa, cool! I will have to play around with it a little bit, but from the looks of it I cannot think of anything that would be broken.

Meyce commented 7 years ago

Sounds good. I'd be happy to write some additional tests for the changes if you would like them included, or documentation on how it can be used. Let me know how your testing goes.

mookid8000 commented 7 years ago

Hi @Meyce, I merged the PR, but I just had a thought – would it be even neater if this

Configure.With(activator)
    .Transport(t => t.UseAzureServiceBus("Endpoint=sb://conn", "dev/testreceiver1"))
    .Options(o =>
    {
        o.SimpleRetryStrategy(errorQueueAddress: "dev/errors");
        o.OrganizeSubscriptionsByEnvironment("dev");
    })
    .Start();

was simply replaced by this:

Configure.With(activator)
    .Transport(t => {
        t.UseAzureServiceBus("Endpoint=sb://conn", "testreceiver1")
            .OrganizeByEnvironment("dev");
    })
    .Start();

thus simply configuring once and for all that all queues and topics should be prefixed with dev/?

mookid8000 commented 7 years ago

And then maybe the OrganizeByEnvironment name could be changed for something better.... as in "who are we to tell people that they are organizing their queues and topics by environment?"

What we are in fact enabling is the ability to prefix all entity names with something.

Maybe a simpler more descriptive name could be something like UseEntityPrefix(...) or PrefixEntitiesWith(...) or something like that....

I don't know.

What do you think?

Meyce commented 7 years ago

I like the idea of adding a UseEntityPrefix/PrefixEntitiesWith. Anything that helps organize the long lists of topics/queues is welcome. For me, environment was the first logical step, since adding a new standard ASB namespace is $10/month. We've got multiple non-production environments for CI, integration testing, etc. that don't need a namespace of their own.

I'll look into adding a method so that prefixing can be handled by an extension of the transport.

How long does it take before the merged code hits nuget?

Meyce commented 7 years ago

@mookid8000,

I had a chance to look at this this afternoon, and I'm running into a snag related to the Rebus infrastructural queues (error/audit).

I think it is important that the infrastructural queues have the following behavior:

  1. They can opt in to having a prefix.
  2. If they opt in to having a prefix, then the name of the queue is overridden. For example, for the error queue, the SimpleRetryStrategySettings would have an error queue address that includes the specified prefix, so it would be dev/error where "dev" is the prefix.

I have been able to accomplish 2 above for the error queue by doing the following in the AzureServiceBusConfigurationExtensions.

configurer.OtherService<SimpleRetryStrategySettings>().Decorate(context =>
{
    var rs = context.Get<SimpleRetryStrategySettings>();

    if (!settings.PrefixSupportingInfrastructure)
        return rs;

    // replace the retry strategy with one that includes the prefix
    return new SimpleRetryStrategySettings(settings.EntityPrefix + rs.ErrorQueueAddress, rs.MaxDeliveryAttempts, rs.SecondLevelRetriesEnabled);
});

where settings.PrefixSupportingInfrastructure is a new bool property on the AzureServiceBusTransportSettings. I am assuming this would be an appropriate way to override the error queue name.

The audit classes, on the other hand, are internal to the core Rebus.dll assembly, and I cannot override/decorate them.

When you get an opportunity could you look over my commit to my fork, and let me know what you think? For me to proceed handling audit the same way as error, I would need to make some changes to the Rebus project as well, which I am hoping to avoid.

The simpler option would be to not handle error/audit queues in such a fashion, and the "opt-in" would be specifying the prefixes on the OptionsConfigurer.

Let me know your thoughts.

https://github.com/Meyce/Rebus.AzureServiceBus/commit/4bfcc1e34b334ed8b364d22adfe01b2d20c18c37

mookid8000 commented 7 years ago

hmmm... I'm beginning to be a little bit worried that it is not going to be that simple after all...

My initial feeling was that it could be made in a simple and "clean" way where you could

.PrefixEntitiesWith("dev")

and then all queues and topics would simply have dev/ in front of them.

This would effectively behave as a logically separate namespace, and it would even be shown as such in supporting tools (e.g. Azure Service Bus Explorer as you showed... maybe others too?)

Could it be that

.PrefixEntitiesWith("dev")

would do what I describe here, but we could detect a "rooted" entity path (i.e. one prefixed with /) and not attach a prefix to it?

This way, we could start an endpoint in a logical dev namespace like this:

Configure.With(...)
    .Transport(t => {
        t.UseAzureServiceBus(..., "my_queue")
            .PrefixEntitiesWith("dev");
    })
    .Start();

and have it send to dev/whatever like this:

await bus.Advanced.Routing.Send("whatever", someMessage);

but if you wanted to send to whatever in the "root" namespace, you would do this:

await bus.Advanced.Routing.Send("/whatever", someMessage);

thus explicitly rooting the path with a leading /.

If you wanted to forward failed messages to an error queue in the root of the namespace, you would then do it like this:

.Options(o => {
    o.SimpleRetryStrategy(errorQueueAddress: "/error");
})

instead of

.Options(o => {
    // redundant!
    o.SimpleRetryStrategy(errorQueueAddress: "error");
})

or

.Options(o => {
    // redundant!
    o.SimpleRetryStrategy(errorQueueAddress: "/dev/error");
})

which would otherwise be two redundant ways of specifying the default.
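To make the proposed rule concrete, here is a rough sketch of how a prefix and a "rooted" path might be resolved. `EntityPrefixSketch` and `Resolve` are hypothetical names, nothing here is an existing Rebus API, and dropping the leading slash from rooted paths is an assumption about how the final entity path would be formed.

```csharp
using System;

public static class EntityPrefixSketch
{
    // Hypothetical resolver for the rule discussed above:
    // - a path starting with "/" is "rooted": the leading slash is dropped and no prefix is added
    // - any other path gets "<prefix>/" put in front of it
    public static string Resolve(string prefix, string entityPath)
    {
        if (entityPath == null) throw new ArgumentNullException(nameof(entityPath));

        if (entityPath.StartsWith("/", StringComparison.Ordinal))
            return entityPath.TrimStart('/');

        return string.IsNullOrEmpty(prefix)
            ? entityPath
            : prefix.TrimEnd('/') + "/" + entityPath;
    }
}
```

Under these assumptions and with the prefix `dev`, `whatever` resolves to `dev/whatever` and `/whatever` resolves to `whatever`. Both `error` and `/dev/error` resolve to `dev/error`, which is why they are redundant, while `/error` resolves to `error` in the root.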

What do you think about that?

Meyce commented 7 years ago

I'll look this over some more this weekend and get back to you.

On another note, I haven't seen a new NuGet package with the original PR show up yet. Do you know when it will be published?

mookid8000 commented 7 years ago

ah, sorry – I have published version 2.2.0 now 😄