rebus-org / Rebus

:bus: Simple and lean service bus implementation for .NET
https://mookid.dk/category/rebus

Customize Message routing key #863

Closed mike-dube closed 4 years ago

mike-dube commented 4 years ago

We are currently using Rebus along with Rebus.RabbitMQ.

Currently, routing_key in RabbitMQ is the message's qualified name (ex: Assembly.Namespace.Messages.MyEvent). Is there any way to change that using Rebus? (ideally an attribute on the POCO message class itself).

We have this requirement as our .NET Core 3.1 worker listens to a routing_key generated by a Python app.

Thanks :)

mookid8000 commented 4 years ago

There's a chance that you can simply

await bus.Advanced.Topics.Subscribe("your_topic");

because this will bind the your_topic topic from the RebusTopics exchange to the bus' input queue.

If you do want to control how type names are mapped to topic names, Rebus has the ITopicNameConvention interface, which you can implement and then have Rebus use it like this:

Configure.With(...)
    .(...)
    .Options(o => o.Register<ITopicNameConvention>(c => new CustomTopicNameConvention()))
    .Start();

possibly wrapping the o.Register(...) stuff in an extension method of OptionsConfigurer, making it look pretty like this:

Configure.With(...)
    .(...)
    .Options(o => o.UseCustomTopicNameConvention())
    .Start();
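
As a minimal sketch of what such a convention might look like when the topic comes from an attribute on the message class: this assumes ITopicNameConvention exposes a single GetTopic(Type) method (worth checking against your Rebus version). RoutingKeyAttribute, AttributeTopicNameConvention, the LunchReady sample class and the fallback name format are all hypothetical names chosen for illustration, not part of Rebus.

```csharp
using System;
using System.Reflection;
using Rebus.Config;
using Rebus.Topic;

// Hypothetical attribute carrying the desired routing key for a message class.
[AttributeUsage(AttributeTargets.Class, Inherited = false)]
public sealed class RoutingKeyAttribute : Attribute
{
    public RoutingKeyAttribute(string routingKey) => RoutingKey = routingKey;

    public string RoutingKey { get; }
}

// Hypothetical convention: use the attribute value when present,
// otherwise fall back to "Namespace.Type, AssemblyName" (an assumption,
// meant to resemble the qualified-name style described in this thread).
public sealed class AttributeTopicNameConvention : ITopicNameConvention
{
    public string GetTopic(Type eventType)
    {
        var attribute = eventType.GetCustomAttribute<RoutingKeyAttribute>();

        return attribute?.RoutingKey
               ?? $"{eventType.FullName}, {eventType.Assembly.GetName().Name}";
    }
}

// Extension method wrapping the o.Register(...) call shown above.
public static class TopicNameConventionExtensions
{
    public static void UseCustomTopicNameConvention(this OptionsConfigurer configurer)
    {
        configurer.Register<ITopicNameConvention>(c => new AttributeTopicNameConvention());
    }
}

// Sample message class decorated with the hypothetical attribute.
[RoutingKey("lunch.ready")]
public sealed class LunchReady
{
}
```

With this in place, publishing or subscribing to LunchReady by type would resolve to the lunch.ready topic instead of the type's qualified name.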

Let me know if it works 🙂

mike-dube commented 4 years ago

Thanks a lot :) It works :)

We used an Attribute to decorate the Message to be able to extract a routing_key for RabbitMQ.

One last question.

 1 unhandled exceptions (Message with ID 0456d648-d844-47e7-b45b-23b22ff032af and type A.B.C.Context.IntegrationEvents.LunchReady, A.B.C could not be dispatched to any handlers (and will not be retried under the default fail-fast settings)) ---> Rebus.Exceptions.MessageCouldNotBeDispatchedToAnyHandlersException: Message with ID 0456d648-d844-47e7-b45b-23b22ff032af and type A.B.C.IntegrationEvents.LunchReady, A.B.C could not be dispatched to any handlers 

We tried disabling type name handling in Newtonsoft.Json:

.Serialization(s => s.UseNewtonsoftJson(new JsonSerializerSettings
{
    TypeNameHandling = TypeNameHandling.None
    // ALSO TRIED: MetadataPropertyHandling = MetadataPropertyHandling.Ignore
}))

How can we disable this behavior, so that we rely strictly on our AttributeTopicNameConvention?

mike-dube commented 4 years ago

Update 1 - it looks like the message uses a header, rbs2-msg-type, which contains the assembly name.

I tried await bus.Advanced.Routing.Subscribe<TEvent>(destination), but .Subscribe... does not seem to exist in package 6.0.0.

Can we disable this validation?

Update 2 - @mookid8000 So far, I have tried a custom deserializer. Is there a "simple" way to route by routing_key, rather than by the class's name, while using the IHandleMessages pattern?

mookid8000 commented 4 years ago

There are two things at play here: 1) message routing (i.e. how does a message end up in a Rebus instance's input queue), and 2) message handling (i.e. what to do with the message when it is received).

It sounds to me like you got the routing part working. You can completely bypass Rebus' type-based topics by subscribing to a topic like this:

var topics = bus.Advanced.Topics;

await topics.Subscribe("this_is_the_topic");

and then that particular subscriber will receive ANYTHING published to this_is_the_topic:

var topics = bus.Advanced.Topics;

await topics.Publish("this_is_the_topic", "this can be serializable object");

Now, when Rebus receives a message, the default serializer needs to know which type to try to deserialize the received payload into – that's why it insists on looking up a .NET type from the rbs2-msg-type header. There's no way to ignore this header, as that would make it impossible for Rebus to decide which .NET type to use.

For this reason, Rebus 6.1.0 has the built-in ability to completely customize the type name used when serializing messages, thus requiring the types to be known up-front. It can be enabled by doing something like this (combined with interoperable JSON serialization mode):

Configure.With(...)
    .(...)
    .Serialization(s => {
        s.UseNewtonsoftJson(JsonInteroperabilityMode.PureJson);
        s.UseCustomMessageTypeNames()
            .AddWithShortName<SomeMessage>()
            .AddWithShortName<AnotherMessage>()
            .AddWithShortNames(new [] { typeof(ThirdMessage), typeof(FourthMessage) })
            .AddWithCustomName<FifthMessage>("🙂");
    })
    .Start();

mike-dube commented 4 years ago

Thanks a lot @mookid8000.

I've tried your code, and made some progress :)

Publisher

Subscriber

It creates the correct binding with the topic name, as I subscribed

services.AutoRegisterHandlersFromAssemblyOf<SubscriberLunchReadyHandler>();

app.ApplicationServices.UseRebus(async r =>
{
    await r.Subscribe<SubscriberMessageLunchReady>();
});

However, when a message is received, I still get the type error: Cannot get type corresponding to the name 'Publisher.Message.LunchReady, Publisher.Message'

I've used .AddWithCustomName<SubscriberMessageLunchReady>("lunch.ready"); where lunch.ready is my routing_key, but it does not work. It seems that the method argument must be equal to the type name from the publisher (which is contained in the message's header), right? That's the only way I got it to work.

Would it be possible to map the message type (SubscriberMessageLunchReady) to the RabbitMQ routing_key? I tried to get it from TransportMessage without any success. If I could, that would let me deserialize into the proper type. Is this possible with Rebus?

mookid8000 commented 4 years ago

Did you call

s.UseCustomMessageTypeNames()
    .AddWithCustomName<SubscriberMessageLunchReady>("lunch.ready");

on both the publisher and the subscriber?

They need to agree on what to call the types for this to work. 🙂

mike-dube commented 4 years ago

Awesome, it works :) Forgot this important part!

One last question, does Rebus play nice with multiple exchanges in RabbitMQ?

Example:

Service A should be able to send messages to the exchanges restaurant.lunchs and restaurant.audit.
Service B will bind to messages in restaurant.lunchs.
Service C will bind to messages in restaurant.audit.

If so, is there any wiki article?

Thanks!

mookid8000 commented 4 years ago

does Rebus play nice with multiple exchanges in RabbitMQ?

It does, to some degree. It's been a long time since I used the RabbitMQ transport myself, so my memory might be a little off here... but I seem to recall functionality that allows for explicitly specifying the exchange to publish to by using a topic@exchange syntax, e.g.

var evt = new LunchReady("woohoo! 🕺");

await bus.Advanced.Topics.Publish("lunch.ready@restaurant.lunchs", evt);

When configuring Rebus with RabbitMQ, you can specify which exchanges are default for direct messaging (i.e. sending to specific queues – should be one specific exchange in your case) and multicast messages (i.e. pub/sub, basically namespaces for topics).

IIRC it's done with the ExchangeNames method on the builder returned from UseRabbitMq, so in your case it could look somewhat like this:

Configure.With(...)
    .Transport(t => t.UseRabbitMq(...)
        .ExchangeNames(topicExchangeName: "restaurant.lunchs"))
    .Start();

I hope that brings you a step closer to the perfect solution. 🙂

mike-dube commented 4 years ago

Thanks @mookid8000 :)

I've got all these points working perfectly now. The only thing remaining is that I need to support 2+ exchanges. Can Rebus do that? Looking at .UseRabbitMq(...).ExchangeNames(topicExchangeName: "restaurant.lunchs"), it does not seem like it?

I need to be able to send two different messages, one to restaurant.lunchs and one to restaurant.audit (they are two different types, so I need to route each to the proper exchange).

mookid8000 commented 4 years ago

When you do this .UseRabbitMq(...).ExchangeNames(topicExchangeName: "restaurant.lunchs") thing, it's just configuring the default exchange to be restaurant.lunchs.

Any bus instance can

await bus.Advanced.Topics.Publish("topic-name@exchange-name", myEvent);

and have myEvent published to the topic-name topic via the exchange-name exchange. When you specify the exchange name as part of the topic, it's like a globally addressable form of the topic, and therefore it doesn't matter which exchange was configured to be the default.
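
To route each message type to its own exchange, the topic@exchange form above could be wrapped in a small lookup. The following is only a sketch under that assumption: ExchangeRouter, its Map/GetAddress/Publish methods, and the AuditEntry type and audit.entry topic in the usage comment are hypothetical names, not part of Rebus. The only Rebus call used is bus.Advanced.Topics.Publish(topic, message).

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Rebus.Bus;

// Hypothetical helper: maps a .NET event type to a "topic@exchange" address.
public sealed class ExchangeRouter
{
    readonly Dictionary<Type, string> _addresses = new Dictionary<Type, string>();

    // Registers the topic and exchange to use for events of type TEvent.
    public ExchangeRouter Map<TEvent>(string topic, string exchange)
    {
        _addresses[typeof(TEvent)] = $"{topic}@{exchange}";
        return this;
    }

    // Returns the "topic@exchange" address, or throws if the type is unmapped.
    public string GetAddress(Type eventType)
    {
        if (_addresses.TryGetValue(eventType, out var address))
        {
            return address;
        }

        throw new InvalidOperationException($"No exchange mapping registered for {eventType}");
    }

    // Publishes the event to the address registered for its runtime type.
    public Task Publish(IBus bus, object evt)
    {
        return bus.Advanced.Topics.Publish(GetAddress(evt.GetType()), evt);
    }
}

// Usage (AuditEntry and "audit.entry" are made up for illustration):
//
//   var router = new ExchangeRouter()
//       .Map<LunchReady>("lunch.ready", "restaurant.lunchs")
//       .Map<AuditEntry>("audit.entry", "restaurant.audit");
//
//   await router.Publish(bus, new LunchReady("woohoo!"));
```

Throwing on an unmapped type is a design choice here, so that a message never silently lands on the default exchange.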

mike-dube commented 4 years ago

Awesome, thanks! @mookid8000 :)

As a last question: in a single .NET Core console app, can we subscribe to two different topics in two exchanges?

I tried subscribing to "topic-name@exchange-name", but that just created a binding with "topic-name@exchange-name" as the routing key.

mookid8000 commented 4 years ago

(...) can we subscribe to two different topics in two exchanges?

Looks like that is not possible with the way it works now. But it definitely would be nice and consistent with how topics are treated in other places if it was possible.

mike-dube commented 4 years ago

That would be awesome, is it on your roadmap?

mookid8000 commented 4 years ago

Created an issue for the feature here: https://github.com/rebus-org/Rebus.RabbitMq/issues/65