rebus-org / Rebus.SqlServer

:bus: Microsoft SQL Server transport and persistence for Rebus
https://mookid.dk/category/rebus

Exceptions when Sending messages #54

Closed BartHuls closed 4 years ago

BartHuls commented 4 years ago

Hi,

I have set up the following system:

var services = new ServiceCollection(); // Microsoft.Extensions.DependencyInjection

services.AddRebus(configure => configure
    .Options(o =>
    {
        o.SimpleRetryStrategy(errorQueueAddress: RebusConfig.ErrorQueueAddress);
        o.SetMaxParallelism(1);
    })
    .Transport(t => t.UseSqlServer(connectionString, RebusConfig.InputQueueNameWorker))
    .Routing(r => r.TypeBased().Map<SendNotificationMessage>(RebusConfig.DestinationAddressWeb))
    .Subscriptions(s => s.StoreInSqlServer(connectionString, RebusConfig.Subscription, true))
);

After sending a few messages with IBus.Send, I get one of the following exceptions:

-       InnerException  {System.InvalidOperationException: BeginExecuteNonQuery requires an open and available Connection. The connection's current state is open.
   at System.Data.SqlClient.SqlConnection.GetOpenTdsConnection(String method)
   at System.Data.SqlClient.SqlConnection.ValidateConnectionForExecute(String method, SqlCommand command)
   at System.Data.SqlClient.SqlCommand.ValidateCommand(Boolean async, String method)
   at System.Data.SqlClient.SqlCommand.InternalExecuteNonQuery(TaskCompletionSource`1 completion, Boolean sendToPipe, Int32 timeout, Boolean asyncWrite, String methodName)
   at System.Data.SqlClient.SqlCommand.BeginExecuteNonQuery(AsyncCallback callback, Object stateObject)
   at System.Threading.Tasks.TaskFactory`1.FromAsyncImpl(Func`3 beginMethod, Func`2 endFunction, Action`1 endAction, Object state, TaskCreationOptions creationOptions)
   at System.Data.SqlClient.SqlCommand.ExecuteNonQueryAsync(CancellationToken cancellationToken)
--- End of stack trace from previous location where exception was thrown ---
   at Rebus.SqlServer.Transport.SqlServerTransport.InnerSend(String destinationAddress, TransportMessage message, IDbConnection connection)
   at Rebus.SqlServer.Transport.SqlServerTransport.Send(String destinationAddress, TransportMessage message, ITransactionContext context)}  System.Exception {System.InvalidOperationException}

OR

+       InnerException  {System.InvalidOperationException: Invalid operation. The connection is closed.
   at System.Data.SqlClient.SqlConnection.GetOpenTdsConnection()
   at System.Data.SqlClient.SqlCommand.WaitForAsyncResults(IAsyncResult asyncResult)
   at System.Data.SqlClient.SqlCommand.EndExecuteNonQueryInternal(IAsyncResult asyncResult)
   at System.Data.SqlClient.SqlCommand.EndExecuteNonQuery(IAsyncResult asyncResult)
   at System.Threading.Tasks.TaskFactory`1.FromAsyncCoreLogic(IAsyncResult iar, Func`2 endFunction, Action`1 endAction, Task`1 promise, Boolean requiresSynchronization)
--- End of stack trace from previous location where exception was thrown ---
   at Rebus.SqlServer.Transport.SqlServerTransport.InnerSend(String destinationAddress, TransportMessage message, IDbConnection connection)
   at Rebus.SqlServer.Transport.SqlServerTransport.Send(String destinationAddress, TransportMessage message, ITransactionContext context)}  System.Exception {System.InvalidOperationException}

mookid8000 commented 4 years ago

Weird – which version of Rebus and Rebus.SqlServer are you using?

BartHuls commented 4 years ago

Rebus: 5.4.0
Rebus.ServiceProvider: 4.0.2
Rebus.SqlServer: 5.1.0

From a .NET Core 2.1 Console Application

mookid8000 commented 4 years ago

I've tried to reproduce the issue with this test, because I had a suspicion that executing many parallel inserts on the same connection could result in that error.

After having run the test without success a couple of times I went through the code, and it turned out that messages are not even sent in parallel – they're queued in an in-mem buffer on each call to Send on the transport, and then they're serially inserted when the transaction completes.
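
To illustrate the buffering behavior described above, here is a minimal sketch. It is not Rebus's actual code: FakeTransactionContext, BufferingTransport, and InsertAsync are hypothetical stand-ins, and the "insert" only appends to a list instead of talking to SQL Server. The point is that Send only enqueues, and the queued messages are written one at a time when the transaction completes.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for a transaction context; not a Rebus type.
class FakeTransactionContext
{
    readonly List<Func<Task>> commitCallbacks = new List<Func<Task>>();

    public Dictionary<string, object> Items { get; } = new Dictionary<string, object>();

    public void OnCommit(Func<Task> callback) => commitCallbacks.Add(callback);

    // Runs the registered callbacks one after another.
    public async Task Complete()
    {
        foreach (var callback in commitCallbacks)
        {
            await callback();
        }
    }
}

// Hypothetical transport that buffers outgoing messages per transaction.
class BufferingTransport
{
    const string BufferKey = "outgoing-messages";

    public List<string> Inserted { get; } = new List<string>();

    public Task Send(string destination, string body, FakeTransactionContext context)
    {
        Queue<(string Destination, string Body)> queue;

        if (context.Items.TryGetValue(BufferKey, out var existing))
        {
            queue = (Queue<(string Destination, string Body)>)existing;
        }
        else
        {
            // First Send in this transaction: create the buffer and
            // register a single callback that flushes it on commit.
            var newQueue = new Queue<(string Destination, string Body)>();
            context.Items[BufferKey] = newQueue;
            context.OnCommit(async () =>
            {
                while (newQueue.Count > 0)
                {
                    var (dest, msg) = newQueue.Dequeue();
                    await InsertAsync(dest, msg); // serial, never in parallel
                }
            });
            queue = newQueue;
        }

        queue.Enqueue((destination, body));
        return Task.CompletedTask;
    }

    // Stands in for the real INSERT against the queue table.
    async Task InsertAsync(string destination, string body)
    {
        await Task.Delay(1);
        Inserted.Add($"{destination}:{body}");
    }
}

static class Program
{
    static async Task Main()
    {
        var context = new FakeTransactionContext();
        var transport = new BufferingTransport();

        await transport.Send("web", "first", context);
        await transport.Send("web", "second", context);
        Console.WriteLine(transport.Inserted.Count); // prints 0: nothing written yet

        await context.Complete();
        Console.WriteLine(string.Join(", ", transport.Inserted)); // prints web:first, web:second
    }
}
```

With this shape, parallel calls to Send on the same transaction never touch the connection at the same time, which is why the parallel-insert theory does not explain the exception.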

Next up: Try to reproduce with full bus and Microsoft DI in the mix. That'll wait 'till tomorrow. 😄

Let me know if you have any ideas. 🙂

mookid8000 commented 4 years ago

Hi again @BartHuls, quick question: are you, by any chance, sending the messages from inside a Rebus handler?

And did you await the calls?

E.g. if you do this:

public class SomeMessageHandler : IHandleMessages<SomeMessage>
{
    readonly IBus bus;

    public SomeMessageHandler(IBus bus)
    {
        this.bus = bus;
    }

    public async Task Handle(SomeMessage message)
    {
        bus.Send(new AnotherMessage());
    }
}

then the call to bus.Send will not have completed when the method exits, and all kinds of weird stuff can happen in a very nondeterministic fashion, because the connection might be returned to the pool before the message gets sent.

The correct way would be to await the call (because it returns a Task) like this:

public class SomeMessageHandler : IHandleMessages<SomeMessage>
{
    readonly IBus bus;

    public SomeMessageHandler(IBus bus)
    {
        this.bus = bus;
    }

    public async Task Handle(SomeMessage message)
    {
        await bus.Send(new AnotherMessage());
    }
}

OR to use the synchronous bus, if you're in a place where you can't await stuff:

public class SomeMessageHandler : IHandleMessages<SomeMessage>
{
    readonly IBus bus;

    public SomeMessageHandler(IBus bus)
    {
        this.bus = bus;
    }

    public Task Handle(SomeMessage message)
    {
        SyncMethod();
        // nothing to await here, so return an already completed task
        return Task.CompletedTask;
    }

    void SyncMethod()
    {
        // this Send is synchronous
        bus.Advanced.SyncBus.Send(new AnotherMessage());
    }
}

I'm closing this issue for now – please let me know if this was not the solution to your issue.

BartHuls commented 4 years ago

Hi @mookid8000

I refactored the code and checked whether there is any chance that we send the message from within a Rebus handler.

This is not the case.

The events are handled and the "commands" are passed directly to the actor framework (Akka). After the actor has finished the command, it sends the message back to the Rebus >

Every message is awaited.

Regards, Bart