rebus-org / Rebus.AzureServiceBus

:bus: Azure Service Bus transport for Rebus
https://mookid.dk/category/rebus
Other
33 stars 20 forks source link

Add ability to change the way topic names are created #4

Closed ChristopherHaws closed 6 years ago

ChristopherHaws commented 6 years ago

It would be nice to have the ability to configure the way topics are named. The current implementation is TypeName_AssemblyName which gets weird when using tools like LINQPad which uses a new assembly every time the process is executed. This results in lots of single use topics being created in azure.

mookid8000 commented 6 years ago

Good point 😄

rosieks commented 6 years ago

Should this one be implemented here or maybe in core in these three places:

mookid8000 commented 6 years ago

well.... having thought about this for a couple of days, I think I have reached the conclusion that Rebus should not have to take into account that one might publish/subscribe to types defined in a dynamically generated assembly in LINQpad....

If you need to do that, you can always use the raw topics API and publish/subscribe to custom topics like this:

var topics = bus.Advanced.Topics;

// receive everything published to "whatever"
await topics.Subscribe("whatever");

// publish something to "whatever"
await topics.Publish(new { Text = "ლ(ಠ益ಠლ)" }, "whatever");

By using the topics API, you can easily extend Rebus with an extension method like this:

public static class BusExtensions
{
    public static Task CustomSubscribe<T>(this IBus bus)
    {
        var topic = GetTopicFrom(typeof(T));

        return bus.Advanced.Topics.Subscribe(topic);
    }

    static string GetTopicFrom(Type type) => (....) magic!
}
xela30 commented 4 years ago

@mookid8000 Could you please advise how IHandleMessages<T> will be resolved from DI container in this case? (Where T is "json-compatible" with that anonymous object published.)

mookid8000 commented 4 years ago

If I remember correctly, Rebus will dispatch the message as a Newtonsoft.Json.Linq.JObject if it cannot deserialize into a known C# type.

So if your handler implements IHandleMessages<JObject>, you should be able to handle all types of otherwise-unserializable messages.

xela30 commented 4 years ago

@mookid8000 thanks for quick turnaround! Is there a way to subscribe via custom topic but register a handler based on certain message type instead of JObject? Maybe plug in some kind of topic-to-type resolver on a subscriber side or something? As otherwise messages of different structure from different custom topics bus is subscribed to will be handled by that single one generic IHandleMessages<JObject> which is obviously not convenient at all.

mookid8000 commented 4 years ago

There's no connection between which topics you subscribe to, and which TYPES of messages you receive – you will just receive whatever is published to that topic.

When a message then arrives in your input queue, there's no difference between a published message or a sent message, it just arrives in your input queue.

When the message then is processed by Rebus, the serializer (the default JSON serializer) will check the rbs2-msg-type header, and then see if the current appdomain has an available type by that name. If that's the case, then it will attempt to deserialize into that type, otherwise it will deserialize as a JObject (which will always be possible if the payload is valid JSON).

How you make use of this is pretty much up to you 😄 if you want to affect what happens to the incoming JSON, you can easily augment the default (de)serialization by plugging in a serializer decorator of your own:

Configure.With(...)
    .(...)
    .Serialization(s => s.Decorate(c => new MySpecialSerializer(c.Get<ISerializer>())))
    .Start();

and then MySpecialSerializer could then look somewhat like this:

public class MySpecialSerializer : ISerializer
{
    readonly ISerializer serializer;

    public MySpecialSerializer(ISerializer serializer) => this.serializer = serializer;

    public Task<TransportMessage> Serialize(Message message) => serializer.Serialize(message);

    public async Task<Message> Deserialize(TransportMessage transportMessage)
    {
        // clone headers, so we don't change them
        var clone = transportMessage.Headers.Clone();

        // inspect transport message here and maybe deserialize
        // to something else:
        // return new Message(headers, ...);

        // or fall back to default behavior:
        return await serializer.Deserialize(transportMessage);
    }
}