pardahlman / RawRabbit

A modern .NET framework for communication over RabbitMq
MIT License

Handling multiple message types on a single queue #362

Open blalonde opened 6 years ago

blalonde commented 6 years ago

This is linked to issue #261

Hi, we were also wondering whether it is possible to subscribe to multiple message types on the same queue. If I write

    await busClient.SubscribeAsync<message1>(
        async msg =>
        {
            await Task.Run(() => callback1(msg));
        },
        ctx => ctx
            .UseSubscribeConfiguration(
                cfg => cfg
                    .Consume(
                        c => c
                            .WithRoutingKey(routingKey1)
                            .WithPrefetchCount(2)
                            .WithNoLocal(false))
                    .FromDeclaredQueue(
                        q => q
                            .WithName(queue))
                    .OnDeclaredExchange(
                        e => e
                            .WithName(exchange)
                            .WithType(ExchangeType.Topic))));

    await busClient.SubscribeAsync<message2>(
        async msg =>
        {
            await Task.Run(() => callback2(msg));
        },
        ctx => ctx
            .UseSubscribeConfiguration(
                cfg => cfg
                    .Consume(
                        c => c
                            .WithRoutingKey(routingKey2)
                            .WithPrefetchCount(2)
                            .WithNoLocal(false))
                    .FromDeclaredQueue(
                        q => q
                            .WithName(queue))
                    .OnDeclaredExchange(
                        e => e
                            .WithName(exchange)
                            .WithType(ExchangeType.Topic))));

Shouldn't this call callback1 when receiving message1 and callback2 when receiving message2? Instead, when message2 arrives, we end up trying to deserialize it as message1 and calling callback1.

Note that we have a custom XML serializer. The serializer succeeds in deserializing message2 but fails when trying to convert it to type message1, since the type is returned by

    var messageType = GetMessageType(context);
    return Serializer.Deserialize(messageType, bodyBytes);

and the context in the BodyDeserializationMiddleware is the one from the first subscription...

We're trying to use the "one queue, multiple message types" pattern because we have a legacy MQ wrapper that is set up like that. We would like to keep the system as is (if possible), since we have a single exchange and over 200 message types on about 30 queues.

Perhaps our current configuration is not the recommended MQ configuration. Could you please clarify?

Thanks

blalonde commented 6 years ago

OK, I found out how the magic worked in our old wrapper and reimplemented it for RawRabbit. For the curious, here's the pattern:

1) Derive your messages from a base Event class

2) Create a BaseHandler from which all your handlers derive

    public abstract class BaseHandler<T> : IBaseHandler
        where T : class
    {
        public void Handle(Event message)
        {
            this.Handle(message as T);
        }

        public abstract void Handle(T message);
    }

3) Enable binding to multiple routing keys via a custom middleware

    public class QueueMultipleBindMiddleware : QueueBindMiddleware
    {
        private readonly QueueMultipleBindOptions _options;

        public QueueMultipleBindMiddleware(ITopologyProvider topologyProvider, QueueMultipleBindOptions options)
            : base(topologyProvider, null)
        {
            this._options = options;
        }

        // Binds the queue once per configured routing key; the single routingKey argument is ignored.
        protected override async Task BindQueueAsync(string queue, string exchange, string routingKey, IPipeContext context, CancellationToken ct)
        {
            foreach (var key in this._options.RoutingKeys)
            {
                ct.ThrowIfCancellationRequested();
                // Await each bind so the returned task completes only after every binding exists.
                // (Chaining with ContinueWith would yield a Task<Task> that completes too early.)
                await this.TopologyProvider.BindQueueAsync(queue, exchange, key);
            }
        }
    }

    public class QueueMultipleBindOptions
    {
        public List<string> RoutingKeys { get; set; }
    }

    // Extension method; it must be declared inside a static class.
    public static IPipeBuilder UseQueueMultipleBind(this IPipeBuilder builder, QueueMultipleBindOptions options)
    {
        builder.Replace<QueueBindMiddleware, QueueMultipleBindMiddleware>(null, options);
        return builder;
    }

4) Register your handlers in an IoC container

          this.For<BaseHandler<Message1>>()
                .Use<Message1Handler>();

5) Subscribe to the base Event type only, once per queue, with multiple routing keys. Something like

         client.SubscribeAsync<Event>(
                e =>
                {
                    var handlerType = typeof(BaseHandler<>).MakeGenericType(e.GetType());
                    ((IBaseHandler)container.GetInstance(handlerType)).Handle(e);
                },
                Exchange,
                ProvisioningQueue,
                new List<string>
                {
                    "messageRoutingKey1",
                    "messageRoutingKey2"
                });

        public async Task SubscribeAsync<T>(Action<T> callback, string exchange, string queue, IEnumerable<string> routingKeys)
        {
            await this._busClient.SubscribeAsync<T>(
                async msg => { await Task.Run(() => callback(msg)); },
                MultiBindPipe(routingKeys),
                ctx => ctx
                    .UseSubscribeConfiguration(
                        cfg => cfg
                            .Consume(
                                c => c
                                    .WithPrefetchCount(this._prefetchCount)
                                    .WithNoLocal(false))
                            .FromDeclaredQueue(
                                q => q
                                    .WithName(queue))
                            .OnDeclaredExchange(
                                e => e
                                    .WithName(exchange)
                                    .WithType(ExchangeType.Topic))));
        }

        private static Action<IPipeBuilder> MultiBindPipe(IEnumerable<string> routingKeys)
        {
            void Action(IPipeBuilder p)
            {
                SubscribeMessageExtension.SubscribePipe(p);

                var options = new QueueMultipleBindOptions
                {
                    RoutingKeys = routingKeys.ToList()
                };
                p.UseQueueMultipleBind(options);
            }

            return Action;
        }

        public static Task SubscribeAsync<TMessage>(
            this IBusClient client,
            Func<TMessage, Task> subscribeMethod,
            Action<IPipeBuilder> pipe,
            Action<ISubscribeContext> context = null,
            CancellationToken ct = default)
        {
            return client.SubscribeAsync<TMessage>(
                async message =>
                {
                    await subscribeMethod(message);
                    return new Ack();
                },
                pipe,
                context,
                ct);
        }

        public static Task SubscribeAsync<TMessage>(
            this IBusClient client,
            Func<TMessage, Task<Acknowledgement>> subscribeMethod,
            Action<IPipeBuilder> pipe,
            Action<ISubscribeContext> context = null,
            CancellationToken ct = default)
        {
            return client.InvokeAsync(
                pipe,
                ctx =>
                {
                    Func<object[], Task<Acknowledgement>> genericHandler = args => subscribeMethod((TMessage)args[0]);

                    ctx.Properties.TryAdd(PipeKey.MessageType, typeof(TMessage));
                    ctx.Properties.TryAdd(PipeKey.MessageHandler, genericHandler);
                    context?.Invoke(new SubscribeContext(ctx));
                },
                ct);
        }

Thus the consumer will listen to the specified message types, with the specified routing keys. The deserializer will deserialize each message dynamically (we implemented an XML serializer for legacy purposes, with the KnownType attribute pointing to all derived types of Event). The generic handler will rely on the IoC container to instantiate the right handler.
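
The dispatch step can be tried in isolation, without RabbitMQ or RawRabbit. The sketch below is only an illustration under stated assumptions: `HandlerResolver` is a hypothetical dictionary-based stand-in for the IoC container, `Message1`, `Message2` and their handlers are made-up example types, and `Handle(Event)` is implemented explicitly on the interface to keep the two overloads apart.

```csharp
using System;
using System.Collections.Generic;

// Base type for all messages (called Event in the steps above).
public abstract class Event { }
public class Message1 : Event { public string Text { get; set; } }
public class Message2 : Event { public int Number { get; set; } }

public interface IBaseHandler
{
    void Handle(Event message);
}

public abstract class BaseHandler<T> : IBaseHandler where T : Event
{
    // Untyped entry point: cast once, then forward to the typed overload.
    void IBaseHandler.Handle(Event message) => Handle((T)message);

    public abstract void Handle(T message);
}

public class Message1Handler : BaseHandler<Message1>
{
    public List<string> Seen { get; } = new List<string>();
    public override void Handle(Message1 message) => Seen.Add(message.Text);
}

public class Message2Handler : BaseHandler<Message2>
{
    public List<int> Seen { get; } = new List<int>();
    public override void Handle(Message2 message) => Seen.Add(message.Number);
}

// Hypothetical stand-in for the IoC container: maps BaseHandler<T> to an instance.
public class HandlerResolver
{
    private readonly Dictionary<Type, IBaseHandler> _handlers = new Dictionary<Type, IBaseHandler>();

    public void Register<T>(BaseHandler<T> handler) where T : Event
        => _handlers[typeof(BaseHandler<T>)] = handler;

    public void Dispatch(Event e)
    {
        // Same lookup as in step 5: close BaseHandler<> over the runtime type.
        var handlerType = typeof(BaseHandler<>).MakeGenericType(e.GetType());
        if (!_handlers.TryGetValue(handlerType, out var handler))
            throw new InvalidOperationException($"No handler registered for {e.GetType().Name}");
        handler.Handle(e);
    }
}

public static class DispatchDemo
{
    public static void Main()
    {
        var h1 = new Message1Handler();
        var h2 = new Message2Handler();
        var resolver = new HandlerResolver();
        resolver.Register(h1);
        resolver.Register(h2);

        // Both messages arrive typed as Event, as they would from a single subscription.
        resolver.Dispatch(new Message2 { Number = 42 });
        resolver.Dispatch(new Message1 { Text = "hello" });

        Console.WriteLine($"Message1Handler saw {h1.Seen.Count}, Message2Handler saw {h2.Seen.Count}");
    }
}
```

The sketch throws when no handler is registered for an incoming type, which makes a missing registration visible instead of silently dropping the message.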

blalonde commented 6 years ago

I still feel like I hacked my way into RawRabbit, notably with my custom QueueBindMiddleware. I would appreciate any insight on:

  1. The reason for the by-design one message type per queue approach
  2. Any flaws you see with my solution

pardahlman commented 6 years ago

Hi @blalonde - thanks for reporting this and posting your solution 👍. RawRabbit at its core is not opinionated, as it allows users to define a pipe to execute and comes with some general-purpose middlewares that can be leveraged when building a custom pipe.

The operations (publish, subscribe, and so on) are in fact nothing more than custom pipes that are maintained by the project. They implement some defaults for reliability (durable messages, publish confirms, etc.) and a set of naming conventions for queues, exchanges and routing keys, and they aim to have an intuitive API. The generic argument TMessage on the subscribe method is used to set up a queue and bind it to an exchange, but it also gives the user the possibility to define a strongly typed subscribe callback. Simplified, each subscriber is tightly coupled to a strongly typed callback, and the "one message type per queue" strategy guarantees that a consumer expecting type A doesn't process a message of type B.

Looking closer at your scenario, I like what I see with the QueueMultipleBindMiddleware; you are using the pipe API just as it is intended, and it looks like it solved parts of your problem with fairly little coding.

I see three viable approaches for achieving multiple message types on the same queue:

  1. Introduce a new middleware that is called as soon as a message is received. It compares the message type header with the message type of the callback; if they differ, it performs a basic.nack and the message is re-queued. The message is then delivered to the next subscriber, which does the same check, and once it reaches the right subscriber the pipe continues to execute. Two drawbacks: it might not be performant, since in the worst case the right subscriber is the last one to try, meaning that a message is delivered on the nth try, where n is the number of consumers on the queue. A more severe drawback is that if no subscriber can handle the message, the result is a nack loop.

  2. Create a custom operation, inspired by the subscribe pipe. It would need to hold some sort of singleton repo with all registered callbacks and their types, keyed by queue name, and pick one or more callbacks based on the message type in the header of the incoming message. Each user-specified callback would need to be added to the repo. This is perhaps the best suggestion, but it will take some time to implement. If you wish to create and contribute it to the project, I can help with reviews.

  3. Use the BasicConsumeAsync method from RawRabbit.Operations.Tools. It gives you access to the delivery args and you can deserialize and delegate the message.
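
To make the second approach more concrete, here is a minimal sketch of what such a callback repo might look like. Everything in it is an assumption for illustration: `CallbackRegistry`, `Register` and `DispatchAsync` are hypothetical names and not part of RawRabbit, the message type header is assumed to carry the full type name, and the wiring into a custom pipe (reading the header, deserializing the body) is left out.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical repo: (queue name, message type name) -> registered callbacks.
public class CallbackRegistry
{
    private readonly object _gate = new object();
    private readonly Dictionary<(string, string), List<Func<object, Task>>> _callbacks
        = new Dictionary<(string, string), List<Func<object, Task>>>();

    public void Register<TMessage>(string queue, Func<TMessage, Task> callback)
    {
        var key = (queue, typeof(TMessage).FullName);
        lock (_gate)
        {
            if (!_callbacks.TryGetValue(key, out var list))
                _callbacks[key] = list = new List<Func<object, Task>>();
            // Wrap the typed callback so differently typed callbacks share one list type.
            list.Add(msg => callback((TMessage)msg));
        }
    }

    // Returns false when no callback matches, so the caller can reject or
    // dead-letter the message instead of re-queueing it forever.
    public async Task<bool> DispatchAsync(string queue, string messageTypeHeader, object message)
    {
        Func<object, Task>[] matches;
        lock (_gate)
        {
            if (!_callbacks.TryGetValue((queue, messageTypeHeader), out var list))
                return false;
            matches = list.ToArray();
        }

        foreach (var callback in matches)
            await callback(message);
        return true;
    }
}

// Example message types, made up for this sketch.
public class Message1 { public string Text { get; set; } }
public class Message2 { public int Number { get; set; } }

public static class RegistryDemo
{
    public static async Task Main()
    {
        var registry = new CallbackRegistry();
        registry.Register<Message1>("provisioning", m =>
        {
            Console.WriteLine($"callback1: {m.Text}");
            return Task.CompletedTask;
        });
        registry.Register<Message2>("provisioning", m =>
        {
            Console.WriteLine($"callback2: {m.Number}");
            return Task.CompletedTask;
        });

        // In a real consumer, the type name would come from the message header
        // and the body would be deserialized to that type before dispatching.
        var handled = await registry.DispatchAsync(
            "provisioning", typeof(Message2).FullName, new Message2 { Number = 42 });
        Console.WriteLine($"handled: {handled}");
    }
}
```

Returning a flag for the "no matching callback" case is a deliberate choice in this sketch: it gives the consuming pipe a place to avoid the nack loop described in the first approach.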

Hope this helps!

blalonde commented 6 years ago

Hi, solution 2, although not fully integrated into the RawRabbit framework, is mostly what I implemented in my solution. Instead of delegating the search for the right callback to a singleton repo, I just delegated it to my IoC container.

I agree it would be a nice feature to introduce in the framework.

Unfortunately, due to architectural concerns, we decided to shelve our RawRabbit solution for now and revert to our in-house solution. That said, my solution worked, so feel free to integrate or modify it as you wish for further development.

Thanks for the feedback.