Particular / NServiceBus.Persistence.Sql

Native SQL Persistence for NServiceBus
https://docs.particular.net/persistence/sql/
Other
36 stars 27 forks source link

Add support for dependency injection in the 'ConnectionBuilder' #713

Open pardahlman opened 3 years ago

pardahlman commented 3 years ago

We are running NServiceBus with NServiceBus.Persistence.Sql and the outbox feature enabled. Our system also relies on TransactionScope for transactionality in the domain code within the handler. Rewriting this code to pass the connection from the storage session is not feasible.

As it works today, both the outbox (without TransactionScope) and UnitOfWork-feature is enabled. This is not optimal, as there is a risk that the inner transaction scope (UoW) completes while the outer (outbox) fails. This can lead to data inconsistency 🙀

When the support for TransactionScope was added to the outbox, we got thrilled for the possibility to finally stop using UnitOfWork. However...

When removing UnitOfWork and using a TransactionScope for the outbox, database reads and writes fails in the handler with the following exception

Immediate Retry is going to retry message '846ba9f8-9eab-45fd-9df7-ad1700e6ae78' because of an exception:
System.PlatformNotSupportedException: This platform does not support distributed transactions.
   at System.Transactions.Distributed.DistributedTransactionManager.GetDistributedTransactionFromTransmitterPropagationToken(Byte[] propagationToken)
   at System.Transactions.TransactionInterop.GetDistributedTransactionFromTransmitterPropagationToken(Byte[] propagationToken)
   at System.Transactions.TransactionStatePSPEOperation.PSPEPromote(InternalTransaction tx)
   at System.Transactions.TransactionStateDelegatedBase.EnterState(InternalTransaction tx)
   at System.Transactions.EnlistableStates.Promote(InternalTransaction tx)
   at System.Transactions.Transaction.Promote()

I believe this is caused by our database access API (that uses Microsoft.Data.SqlClient internally) that creates a new connection as it performs its operations. At this point, I believe that we can solve this with AsyncLocal, but that kind of solution violates at least a handful of principals of software design 😛

This is how I would want to solve this issue:

Today we are configuring the outbox like this

var persistence = endpointConfiguration.UsePersistence<SqlPersistence>();
persistence.ConnectionBuilder(() => new SqlConnection(persistenceConnectionString));

What if the ConnectionBuilder would have a callback that passes an instance of IBuilder (or I guess an IServiceProvider in NSB 8.x) so that I could resolve a service, something like IConnectionProvider, that is registered as a scooped service that spans over the handling of the message.

var persistence = endpointConfiguration.UsePersistence<SqlPersistence>();
persistence.ConnectionBuilder(builder => builder.Build<IConnectionProvider>().GetOrCreateConnection());

The same service would be injected in our database access API and return the same connection as the one in the connection builder. I do believe that this would prevent the escalation of the TransactionScope.

Looking forward hearing your thoughts on this!

dvdstelt commented 3 years ago

Hi Pär,

Thanks for your suggestion. I've forwarded it to the appropriate team and they will have a look at whether or not we can add this. If there's any update, this issue will be updated as well.

Thanks, Dennis

dvdstelt commented 3 years ago

Hi Pär,

There are several ways to use NServiceBus and your own connection, without having to escalate to a distributed transaction.

  1. The connection strings must be exactly the same (including whitespace and everything) and the connections can't be open at the same time.
  2. SQL Persistence can inject ISqlStorageSession into your handlers, but also into a custom behavior
    1. This is documented here: https://docs.particular.net/persistence/sql/accessing-data and in this sample.
    2. Here's more info on behaviors: https://docs.particular.net/nservicebus/pipeline/manipulate-with-behaviors

A colleague of mine created an example on GitHub where he used option 2, but without ISqlStorageSession and using NHibernate. You can find that here: https://github.com/ramonsmits/NServiceBus.InjectStorageContext/

What he did was

  1. Create a StorageContext to hold the connection and/or transaction.
  2. Create a behavior that fills the StorageContext
  3. Use the StorageContext in a repository.
  4. Inject the repository into the handler.
  5. Make use of the repository in the handler. :-)

I hope this provides some options to share the connection? There are other samples on our docs website that provide alternate ways of setting up/reusing the connection.

pardahlman commented 3 years ago

Thanks for getting back on this. I don't believe the first option is feasible when using the outbox with TransactionScope: The outbox creates a connection that is opened throughout the execution of the IMessageHandler<>, so even if I use the exact same connection string and open a new connection, it will be opened at the same time as the outbox connection and the transaction will escalate.

As for the second options - that is interesting! I've been trying it out, and it works very will in our normal message handling scenarios! Thanks for pointing me to the documentation 🙏 However, there's one issue with injecting ISqlStorageSession: it can only be resolved within a "message handler context", e.g. it does not work if I want to use the repository elsewhere, like a IHostedService in Microsoft's generic host.

In fact, if I try to resolve the ISqlStorageSession in a IHostedService, it fails with a NullReferenceException, at the resolve callback. For us, it would be great if we could either detect if the ISqlStorageSession is available or not. On suggestion is that instead of throwing an exception, return default (or a session without a connection?) to signal that there is no storage session present. A behavior like that could be used for us to fallback to opening a new connection if the session is not present.

sebboss commented 1 year ago

Hi

I would like to catch up on this although i don't have exaclty the same problem as the opener, but i would like to use dependency injection in the connection-builder too.

My scenario is the following, we would like to use azure managed identity to access our azure sql server as persistence. What i found so far is, that you can do this like that: https://discuss.particular.net/t/outbox-with-dapper-orm/3038/2

var accessToken = FetchAccessToken(); // Must fetch the Azure Managed Identity access token 

var persistence = endpointConfiguration.UsePersistence<SqlPersistence>();
persistence.ConnectionBuilder(
    connectionBuilder: () =>
    {
        var conn = new SqlConnection("Data Source=<AZURE-SQL-SERVERNAME>; Initial Catalog=<DATABASE>;");
        conn.AccessToken = accessToken; // Assign token in addition to creating the connection.
        return conn;
    });

The problem in this example is, that the token above is static and will not be refreshed until the instance is restarting. But we have expiring tokens and must refresh the token regularly. We have some kind of token-provider class which fetches a new token if the old token is expired or will expire soon. New tokens are cached in a memory cache until it is replaced with a new token.

However, this token-provider class is registered in the DI-container and the connection-builder cannot inject it.

My workaround is to instantiate the token-provoder class manually while configuering the persistence of the endpoint for NSB, but it is quite ugly.

So i would like to ask if there is another way to refresh a access token for a sql connection regularly for NSB or are there any other ways i missed?

I already tried Behavior in the pipelines to set the AccessToken of the connection using ISqlStorageSession, unfortunately, the connection is already open and setting the AccessToken ends with an error "Not allowed to change the 'AccessToken' property. The connection's current state is open.".

Thanks in adanvce and let me know if i have to open a new thread.

mikeminutillo commented 1 year ago

@sebboss Do you ever use the endpoint outside of the context of a message handler? If not, you might be able to encapsulate your access token in a static wrapper around an AsyncLocal and inject it via a behavior at an early point in the pipeline. Something like this:

static class AccessTokenHolder
{
  private static AsyncLocal<string> accessToken = new AsyncLocal<string>();

  public static string AccessToken
  {
    get { return accessToken.Value; }
    set { accessToken.Value = value; }
  }
}

var persistence = endpointConfiguration.UsePersistence<SqlPersistence>();
persistence.ConnectionBuilder(
  connectionBuilder: () =>
  {
    var conn = new SqlConnection(...);
    conn.AccessToken = AccessTokenHolder.AccessToken;
    return conn;
  });

class SetupAccessTokenBehavior : Behavior<ITransportReceiveContext>
{
  private readonly IAccessTokenGenerator accessTokenGenerator;

  public SetupAccessTokenBehavior(IAccessTokenGenerator accessTokenGenerator)
  {
    this.accessTokenGenerator = accessTokenGenerator;
  }

  public override async Task Invoke(ITransportReceiveContext context, Func<Task> next)
  {
    AccessTokenHolder.AccessToken = accessTokenGenerator.GenerateToken();
    await next().ConfigureAwait(false);
  }
}

When a message is received, the behavior will load the access token from your generator into the async local state. When the pipeline gets to the point of trying to open a connection the access token is already there. If it does try and open a connection outside of the context of a message handler, the access token will be missing.

sebboss commented 1 year ago

Thanks @mikeminutillo!, this seems to work as you explained, but yes, we use the endpoints for some scenarios outside of a message handler, so i'll keep my solution. Stilll would be great to support DI in connection builder.

Thank you for your effort!