SzymonPobiega / NServiceBus.Router

Cross-transport, cross-site and possibly cross-cloud router component for NServiceBus
MIT License

Native subscription logic not called after enabling the router (SQL-server transport) #7

Closed lovaere closed 5 years ago

lovaere commented 5 years ago

I was testing a scenario where the NServiceBus.Router package is used as a bridge between an 'island' that uses SQL Server as its transport and another island that uses Azure Service Bus. I've noticed that the subscriptions on the SQL transport island are no longer created once the router is enabled. In other words, it looks like the default SQL transport subscription behaviour of NServiceBus is broken after enabling the router.

Steps to reproduce

I have created a small sample solution on GitHub that contains a few console applications to validate this scenario.

Subscribe messages are sent to the publisher (router connector disabled)

For the SQL transport, the subscriber should send subscribe messages to the publisher. In the sample, the SqlAsbBridge.SqlHandler sends these to the SqlAsbBridge.SqlPublisher. This works if we disable the router connector, so to skip the router connector we need to comment out the following lines of code:

// Add routing logic to ASB:
var router = routing.ConnectToRouter("PoCRouter");
router.RegisterPublisher(typeof(SqlActionCompletedEvent), publisherEndpointName: "SqlPublisher");

In that case the MessageDrivenSubscribeTerminator of NServiceBus takes care of the subscribe behaviour and everything works.

Clear all existing subscriptions and subscribe messages from the queues

As we want to validate whether the subscribe logic on the SQL transport side also works when the router is enabled, we clear the subscriptions table and remove any subscribe messages from the queues to get a clean start.

Enable the router connector and restart all apps

If we now enable the router connector and restart the SqlAsbBridge.SqlHandler with the extra lines of code, the subscribe messages for the messages on the SQL island no longer appear in the queues. The subscribe messages for messages that use the router (in this case the ones for the ASB island) are still sent correctly.

What I've learned from debugging the code

Debugging showed me that the RouterConnectionFeature inside NServiceBus.Router.Connector registers a new Behavior<ISubscribeContext>. That's the RouterSubscribeBehavior in the following snippet:

// Snippet from RouterConnectionFeature
context.Pipeline.Register(new ForwardSiteMessagesToRouterBehavior(settings.RouterAddress), "Routes messages sent to sites to the bridge.");
context.Pipeline.Register(new RoutingHeadersBehavior(settings.SendRouteTable), "Sets the ultimate destination endpoint on the outgoing messages.");
context.Pipeline.Register(b => new RouterSubscribeBehavior(subscriberAddress, context.Settings.EndpointName(), settings.RouterAddress, b.Build<IDispatchMessages>(), settings.PublisherTable, nativePubSub), "Dispatches the subscribe request via a router.");
context.Pipeline.Register(b => new RouterUnsubscribeBehavior(subscriberAddress, context.Settings.EndpointName(), settings.RouterAddress, b.Build<IDispatchMessages>(), settings.PublisherTable, nativePubSub), 

This RouterSubscribeBehavior takes care of the subscribe logic for messages that need the router. That's fine, but whether the existing MessageDrivenSubscribeTerminator of NServiceBus itself is called depends on the invokeTerminator flag:

public override async Task Invoke(ISubscribeContext context, Func<Task> next)
{
    var eventType = context.EventType;
    if (publisherTable.TryGetValue(eventType, out var publisherEndpoint))
    {

        Logger.Debug($"Sending subscribe request for {eventType.AssemblyQualifiedName} to router queue {routerAddress} to be forwarded to {publisherEndpoint}");

        var subscriptionMessage = ControlMessageFactory.Create(MessageIntentEnum.Subscribe);

        subscriptionMessage.Headers[Headers.SubscriptionMessageType] = eventType.AssemblyQualifiedName;
        subscriptionMessage.Headers[Headers.ReplyToAddress] = subscriberAddress;
        subscriptionMessage.Headers[Headers.SubscriberTransportAddress] = subscriberAddress;
        subscriptionMessage.Headers[Headers.SubscriberEndpoint] = subscriberEndpoint;
        subscriptionMessage.Headers["NServiceBus.Bridge.DestinationEndpoint"] = publisherEndpoint;
        subscriptionMessage.Headers[Headers.TimeSent] = DateTimeExtensions.ToWireFormattedString(DateTime.UtcNow);
        subscriptionMessage.Headers[Headers.NServiceBusVersion] = "6.3.1"; //The code has been copied from 6.3.1

        var transportOperation = new TransportOperation(subscriptionMessage, new UnicastAddressTag(routerAddress));
        var transportTransaction = context.Extensions.GetOrCreate<TransportTransaction>();
        await dispatcher.Dispatch(new TransportOperations(transportOperation), transportTransaction, context.Extensions).ConfigureAwait(false);
    }

    if (invokeTerminator) // This flag is false for SQL transport and so the terminator is not invoked
    {
        await next().ConfigureAwait(false);
    }
}

As this flag is false for the SQL-transport, the NServiceBus code is not called and the subscriptions for the SQL island itself are not added.

Should we invoke this terminator or is there a specific reason why we don't invoke it?

Thanks in advance for your help!

lovaere commented 5 years ago

I'm now validating the following version of the Invoke method for the RouterSubscribeBehavior:

public override async Task Invoke(ISubscribeContext context, Func<Task> next)
{
    var eventType = context.EventType;
    if (publisherTable.TryGetValue(eventType, out var publisherEndpoint))
    {

        Logger.Debug($"Sending subscribe request for {eventType.AssemblyQualifiedName} to router queue {routerAddress} to be forwarded to {publisherEndpoint}");

        var subscriptionMessage = ControlMessageFactory.Create(MessageIntentEnum.Subscribe);

        subscriptionMessage.Headers[Headers.SubscriptionMessageType] = eventType.AssemblyQualifiedName;
        subscriptionMessage.Headers[Headers.ReplyToAddress] = subscriberAddress;
        subscriptionMessage.Headers[Headers.SubscriberTransportAddress] = subscriberAddress;
        subscriptionMessage.Headers[Headers.SubscriberEndpoint] = subscriberEndpoint;
        subscriptionMessage.Headers["NServiceBus.Bridge.DestinationEndpoint"] = publisherEndpoint;
        subscriptionMessage.Headers[Headers.TimeSent] = DateTimeExtensions.ToWireFormattedString(DateTime.UtcNow);
        subscriptionMessage.Headers[Headers.NServiceBusVersion] = "6.3.1"; //The code has been copied from 6.3.1

        var transportOperation = new TransportOperation(subscriptionMessage, new UnicastAddressTag(routerAddress));
        var transportTransaction = context.Extensions.GetOrCreate<TransportTransaction>();
        await dispatcher.Dispatch(new TransportOperations(transportOperation), transportTransaction, context.Extensions).ConfigureAwait(false);

        if (invokeTerminator) // Only for native pub sub
        {
            await next().ConfigureAwait(false);
        }
    }
    else // Not handled by the router: always invoke the terminator
    {
        await next().ConfigureAwait(false);
    }
}

That way we always trigger the MessageDrivenSubscribeTerminator of NServiceBus when the event type is not handled by the router (not in publisherTable). When the router does handle it, we check the invokeTerminator flag, because invoking the terminator in that case would cause errors for message-driven transports.
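
To make the difference between the two versions explicit, here is a minimal, dependency-free sketch that only models the decision "does next() (the terminator) get called?". It does not use NServiceBus at all; the class and method names (SubscribeDecision, Original, Proposed) are made up for illustration, and the two booleans stand in for the publisherTable lookup and the invokeTerminator flag from the snippets above.

```csharp
using System;

// Hypothetical helper, not part of NServiceBus or NServiceBus.Router.
// It models only whether next() is awaited in each version of Invoke.
public static class SubscribeDecision
{
    // Original version: next() is guarded by the flag alone,
    // whether or not the event type is in publisherTable.
    public static bool Original(bool inPublisherTable, bool invokeTerminator)
        => invokeTerminator;

    // Proposed version: event types the router does not handle always
    // reach the terminator; handled ones reach it only if the flag is set.
    public static bool Proposed(bool inPublisherTable, bool invokeTerminator)
        => !inPublisherTable || invokeTerminator;

    public static void Main()
    {
        // Print all four combinations of the two inputs.
        foreach (var inTable in new[] { false, true })
        foreach (var flag in new[] { false, true })
        {
            Console.WriteLine(
                $"inPublisherTable={inTable}, invokeTerminator={flag}: " +
                $"original={Original(inTable, flag)}, proposed={Proposed(inTable, flag)}");
        }
    }
}
```

The two versions differ in exactly one combination: the event type is not in publisherTable and invokeTerminator is false. That is the SQL island case described above, where the subscribe message was silently dropped.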

SzymonPobiega commented 5 years ago

Hi @lovaere

Sorry for the late reply. Somehow GitHub unsubscribed me from my own repositories and I didn't get any notifications. Let me read through your issue now.

SzymonPobiega commented 5 years ago

@lovaere :bow: Thanks for reporting this and finding the root cause. I've just pushed 3.1.4 which fixes it using the approach you provided in https://github.com/SzymonPobiega/NServiceBus.Router/issues/7#issuecomment-451446801.

lovaere commented 5 years ago

Glad to hear that, thanks @SzymonPobiega !