Particular / NServiceBus.Transport.Msmq

MSMQ transport for NServiceBus
https://docs.particular.net/nservicebus/msmq/

Cannot delay messages using the dispatcher API #621

Closed timbussmann closed 3 months ago

timbussmann commented 9 months ago

Describe the bug

Description

When sending delayed messages using the IDispatchMessages (v7) transport seam API, the configured delayed delivery settings are not applied and instead the message is immediately dispatched.

Expected behavior

The delivery delay specified via the DeliveryConstraint setting should be applied, and the message should be delayed by the configured value.
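As a rough illustration of the expected behavior, the sketch below resolves a due time from a list of constraints. The types `Constraint`, `DelayBy`, and `DueTimeResolver` are hypothetical stand-ins invented for this example; they are not the NServiceBus types and do not reflect how the transport is actually implemented.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for a delivery constraint (not an NServiceBus type).
public abstract class Constraint { }

// Hypothetical stand-in for a "delay delivery by this long" constraint.
public sealed class DelayBy : Constraint
{
    public DelayBy(TimeSpan delay) { Delay = delay; }
    public TimeSpan Delay { get; }
}

public static class DueTimeResolver
{
    // Returns the time at which the message should become available to the receiver.
    // Without a delay constraint the message is due immediately.
    public static DateTime Resolve(IEnumerable<Constraint> constraints, DateTime nowUtc)
    {
        var delay = constraints.OfType<DelayBy>().FirstOrDefault();
        return delay == null ? nowUtc : nowUtc + delay.Delay;
    }
}

public static class Demo
{
    public static void Main()
    {
        var now = new DateTime(2024, 1, 1, 12, 0, 0, DateTimeKind.Utc);
        var constraints = new List<Constraint> { new DelayBy(TimeSpan.FromSeconds(10)) };

        var due = DueTimeResolver.Resolve(constraints, now);
        Console.WriteLine(due - now); // 00:00:10
    }
}
```

The behavior reported in this issue corresponds to a dispatcher that always takes the "due immediately" branch, regardless of the constraints passed in.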

Actual behavior

The settings are not applied and the message is dispatched immediately. For example, here is the output from a simple repro sample that first sends a message using the regular IMessageSession API (using options.DelayDeliveryWith(delay)) and then dispatches a second message using the dispatcher API (using new DelayDeliveryWith(delay)):

Message delay time is 00:00:10
Press [s] to send a message, [d] to dispatch a message, or [ESC] to exit
s
Message sent at 12:34:58.0250817
Received message at 12:35:09.0754242
d
Message dispatched at 12:35:10.0403641
Received message at 12:35:10.0403641
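To make the difference explicit, the observed delays can be computed from the timestamps in the output above. This is a small standalone sketch; the `DelayCheck` helper is a name made up for this example and is not part of the repro sample.

```csharp
using System;
using System.Globalization;

public static class DelayCheck
{
    // Parses two time-of-day stamps (the format printed by DateTime.Now.TimeOfDay)
    // and returns the elapsed time between them.
    public static TimeSpan Observed(string start, string end)
    {
        var from = TimeSpan.Parse(start, CultureInfo.InvariantCulture);
        var to = TimeSpan.Parse(end, CultureInfo.InvariantCulture);
        return to - from;
    }

    public static void Main()
    {
        var configured = TimeSpan.FromSeconds(10);

        // Timestamps copied from the sample output in this issue.
        var viaSend = Observed("12:34:58.0250817", "12:35:09.0754242");
        var viaDispatch = Observed("12:35:10.0403641", "12:35:10.0403641");

        Console.WriteLine($"Send:     {viaSend} (delay honored: {viaSend >= configured})");
        Console.WriteLine($"Dispatch: {viaDispatch} (delay honored: {viaDispatch >= configured})");
    }
}
```

With the values above, the send path shows a delay of roughly 11 seconds, while the dispatch path shows no measurable delay.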

Versions

Verified on 1.2.2 / NSB v7

Versions 2 (NSB v8) and later don't seem to be affected.

Steps to reproduce

Dispatch a message using the message dispatcher, providing a delayed delivery constraint:

using System;
using System.Collections.Generic;
using System.Data.SqlClient;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using NServiceBus;
using NServiceBus.DelayedDelivery;
using NServiceBus.DeliveryConstraints;
using NServiceBus.Extensibility;
using NServiceBus.Features;
using NServiceBus.Routing;
using NServiceBus.Transport;

public class Program
{
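    const string EndpointName = "MsmqReproV1";
    // Placeholder (not part of the original report): replace with a connection string for your SQL Server instance.
    const string SqlConnectionString = "<your SQL Server connection string>";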

    public static async Task Main()
    {
        var config = new EndpointConfiguration(EndpointName);
        config.EnableInstallers();
        var transport = config.UseTransport<MsmqTransport>();
        transport.Transactions(TransportTransactionMode.ReceiveOnly);
        config.SendFailedMessagesTo("error");

        ConfigureMsmqNativeDelayedDelivery(transport, config);
        //ConfigureTimeoutManager(config);

        var serviceCollection = new ServiceCollection();
        var startableEndpoint = EndpointWithExternallyManagedServiceProvider.Create(config, serviceCollection);
        var serviceProvider = serviceCollection.BuildServiceProvider();
        var instance = await startableEndpoint.Start(serviceProvider);

        var dispatcher = serviceProvider.GetRequiredService<IDispatchMessages>();
        var delay = TimeSpan.FromSeconds(10);

        Console.WriteLine($"Message delay time is {delay}");
        Console.WriteLine("Press [s] to send a message, [d] to dispatch a message, or [ESC] to exit");
        while (true)
        {
            var key = Console.ReadKey();
            Console.WriteLine();
            switch (key.Key)
            {
                case ConsoleKey.S:
                    SendOptions options = new SendOptions();
                    options.RouteToThisEndpoint();
                    options.DelayDeliveryWith(delay);
                    options.RequireImmediateDispatch();
                    await instance.Send(new SomeMessage(), options);
                    Console.WriteLine($"Message sent at {DateTime.Now.TimeOfDay}");
                    break;
                case ConsoleKey.D:
                    var transportOperation = new TransportOperation(
                        new OutgoingMessage(
                            Guid.NewGuid().ToString(),
                            new Dictionary<string, string>()
                            {
                                {
                                    Headers.EnclosedMessageTypes,
                                    typeof(SomeMessage).AssemblyQualifiedName
                                }
                            },
                            Encoding.UTF8.GetBytes("<SomeMessage></SomeMessage>")),
                        new UnicastAddressTag(EndpointName),
                        DispatchConsistency.Isolated,
                        //DispatchConsistency.Default,
                        new List<DeliveryConstraint>
                        {
                            new DelayDeliveryWith(delay)
                        });
                    await dispatcher.Dispatch(new TransportOperations(transportOperation), new TransportTransaction(), new ContextBag());
                    Console.WriteLine($"Message dispatched at {DateTime.Now.TimeOfDay}");
                    break;
                case ConsoleKey.Escape:
                    await instance.Stop();
                    return;
            }
        }
    }

    static void ConfigureTimeoutManager(EndpointConfiguration config)
    {
        var persistence = config.UsePersistence<SqlPersistence>();
        persistence.SqlDialect<SqlDialect.MsSqlServer>();
        persistence.ConnectionBuilder(() => new SqlConnection(SqlConnectionString));
        persistence.SubscriptionSettings().DisableCache();
    }

    static void ConfigureMsmqNativeDelayedDelivery(TransportExtensions<MsmqTransport> transport, EndpointConfiguration config)
    {
        // MSMQ native timeout storage:
        transport.NativeDelayedDelivery(new SqlServerDelayedMessageStore(SqlConnectionString));
        config.DisableFeature<TimeoutManager>();
        transport.DisablePublishing();

        config.UsePersistence<LearningPersistence>();
    }
}

public class SomeMessageHandler : IHandleMessages<SomeMessage>
{
    public Task Handle(SomeMessage message, IMessageHandlerContext context)
    {
        Console.WriteLine($"Received message at {DateTime.Now.TimeOfDay}");
        return Task.CompletedTask;
    }
}

public class SomeMessage : IMessage
{
}

Relevant log output

No response

Additional Information

This bug leads to issues when using the TransactionalSession feature with the MSMQ transport. The transactional session delays control messages to deal with certain expected race conditions. With this bug, however, the control messages aren't actually delayed, which significantly shortens the grace period the transactional session has to settle those race conditions. This can lead to two observable behaviors:

E.g., the following error message can occur when using NHibernate as the outbox storage:

NHibernate.Exceptions.GenericADOException: could not insert: [NServiceBus.Outbox.NHibernate.OutboxRecord][SQL: INSERT INTO OutboxRecord (MessageId, Dispatched, DispatchedAt, TransportOperations) VALUES (?, ?, ?, ?); select SCOPE_IDENTITY()] ---> System.Data.SqlClient.SqlException: Violation of UNIQUE KEY constraint 'UQ__OutboxRe__C87C0C9D45FA671D'. Cannot insert duplicate key in object 'dbo.OutboxRecord'. The duplicate key value is (Samples.TransactionalSession.Frontend/9ebc2a09-10a6-44fb-82f6-5b17901f5436).

Workarounds

Possible solutions

Additional information

tmasternak commented 3 months ago

This has been solved in