rebus-org / Rebus.Oracle

:bus: Oracle transport for Rebus
https://mookid.dk/category/rebus

Feature: Enlist in OracleTransaction #13

Closed jods4 closed 4 years ago

jods4 commented 5 years ago

It would be nice to add an extension method EnlistRebus to OracleTransaction.

Function

It would do the same as Rebus.TransactionScope, i.e. Rebus operations would be part of an outer Oracle transaction.

Motivation

For the same reasons Rebus.TransactionScope exists, but optimized: TransactionScope implies a distributed transaction, which is not needed if your transport and your work use the same storage (here: Oracle).

One use case would be to emulate an Outbox in this lucky same-db situation:

using (var tx = oracleConnection.BeginTransaction())
using (var rebusScope = tx.EnlistRebus())
{
  // Do transactional stuff in DB with tx
  await bus.SendLocal("Hello");  // Delivery is atomic with DB work
  tx.Commit();
}

Implementation

Create and return a Rebus transaction scope. Put the transaction's connection into the context.

But there is a problem with ConnectionWrapper! When disposed, it disposes both the semaphore (which it should) and the connection (which it shouldn't in this case, as it doesn't own it). Suggested fix: add a connectionOwned flag to ConnectionWrapper and dispose the connection only when the flag is set.

Some code:

// Extension methods must be static (and live in a static class)
public static RebusTransactionScope EnlistRebus(this OracleTransaction tx)
{
  var scope = new RebusTransactionScope();
  var context = scope.TransactionContext;
  var wrapper = new ConnectionWrapper(tx.Connection, owned: false);
  context.OnDispose(wrapper.Dispose);
  // Note: return value ignored. Add can't fail because the transaction scope is an AsyncLocal and it can't have been shared by this thread yet.
  context.Items.TryAdd(CurrentConnectionKey, wrapper);
  return scope;
}
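To illustrate the suggested connectionOwned fix, here is a minimal, self-contained sketch. It is not the library's actual ConnectionWrapper: the class names OwnershipAwareWrapper and FakeConnection are hypothetical stand-ins, and the connection is modelled as a plain IDisposable so the example runs without an Oracle driver.

```csharp
using System;
using System.Threading;

// Hypothetical stand-in for the library's private ConnectionWrapper.
public sealed class OwnershipAwareWrapper : IDisposable
{
    readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);
    readonly IDisposable _connection;
    readonly bool _owned;
    bool _disposed;

    public OwnershipAwareWrapper(IDisposable connection, bool owned)
    {
        _connection = connection ?? throw new ArgumentNullException(nameof(connection));
        _owned = owned;
    }

    public void Dispose()
    {
        if (_disposed) return;
        _disposed = true;

        // The semaphore always belongs to the wrapper.
        _semaphore.Dispose();

        // The connection is disposed only if the wrapper created it.
        if (_owned) _connection.Dispose();
    }
}

// Fake connection that only records whether it was disposed.
public sealed class FakeConnection : IDisposable
{
    public bool IsDisposed { get; private set; }
    public void Dispose() => IsDisposed = true;
}

public static class Program
{
    public static void Main()
    {
        var borrowed = new FakeConnection();
        new OwnershipAwareWrapper(borrowed, owned: false).Dispose();
        Console.WriteLine(borrowed.IsDisposed); // False: the caller still owns it

        var owned = new FakeConnection();
        new OwnershipAwareWrapper(owned, owned: true).Dispose();
        Console.WriteLine(owned.IsDisposed);    // True
    }
}
```

With owned: false, the outer code that called BeginTransaction() stays responsible for closing the connection, which is what EnlistRebus needs.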

Alternatives

This can't be done from outside this library because ConnectionWrapper is private (CurrentConnectionKey is private too, but it's just a string).

jods4 commented 5 years ago

@mookid8000 so I'm working on connection/transaction management, and I want to add this (i.e. being able to enlist in an existing Oracle transaction w/o DTC).

I don't understand how you're supposed to escape the ambient Rebus transaction, e.g. if you want to send a message outside of the current transaction? You can create a new transaction scope, but that's not thread-safe so it's only an option if you're not currently forking async workers, which is a supported scenario per our previous discussions. Can you give me pointers please?

mookid8000 commented 5 years ago

You can easily escape the ambient Rebus transaction by doing this:

var transactionContext = AmbientTransactionContext.Current;
AmbientTransactionContext.SetCurrent(null);
try
{
    // there's no context in here
}
finally
{
    AmbientTransactionContext.SetCurrent(transactionContext);
}

and it's "thread safe" in the sense that the context is stored in an AsyncLocal (or a named logical data slot on the call context on .NET 4.5), so it flows to spawned tasks as it should.

That was Rebus' transaction context – but you're talking about DTC, so isn't it more about .NET's TransactionScope? Since .NET 4.5.1, one of its constructors accepts a TransactionScopeAsyncFlowOption parameter, which you need to set to TransactionScopeAsyncFlowOption.Enabled – then it will flow to spawned tasks as it should.

I hope that provided pointers 🙂

jods4 commented 5 years ago

@mookid8000 That's what I guessed but I believe it's not thread-safe.

As we discussed previously, you may have other threads running concurrently using the same async context; for example if you forked your worker.

If you are in this situation, the threads running in parallel will lose their transaction during the code you've shown above. This is bad.

jods4 commented 5 years ago

@mookid8000 is my analysis above correct or am I missing something? ☝️

Also, I'm not sure I understand the Receive code: https://github.com/rebus-org/Rebus.Oracle/blob/master/Rebus.Oracle/Oracle/Transport/OracleTransport.cs#L156

It's protected by an AsyncBottleneck(20). My understanding is that this is just a semaphore that will let 20 threads in. But what is its use?

It seems to me Rebus (core) ensures each Receive is called in its own transaction scope (so each one has its own connection, no concurrency problem); and Rebus also controls the number of worker threads (parallelism).

So what is the receive bottleneck for? Is it meant to allow a large number of worker threads while limiting the number of threads currently "actively" dequeuing? That can make sense, but then shouldn't the number of concurrent dequeues be much lower than 20? A number like 2-4 seems more reasonable?

mookid8000 commented 5 years ago

If you are in this situation, the threads running in parallel will lose their transaction during the code you've shown above. This is bad.

hmm, I am not entirely sure about this.... my understanding is that once the ambient context is removed, it's removed from the current execution context and thus from all clones of it (which get passed to Tasks spawned from it), but I would not expect it to be able to mutate the execution context from which it itself was spawned.

mookid8000 commented 5 years ago

My understanding is that this is just a semaphore that will let 20 threads in.

yes, that's what it is 🙂 it came around when I originally made the MSSQL transport, which could easily drain the SQL connection pool (which defaults to have 100 connections, I think) if no such limitation was built in

The number 20 just turned out to be a pretty sensible limit, because then several Rebus instances could be hosted in the same process sharing the same SQL connection pool, and they would not deplete it.

And yes, it's just a wrapper around a SemaphoreSlim.
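As a rough illustration of such a wrapper, here is a minimal sketch. It is not the actual AsyncBottleneck source: the Bottleneck and BottleneckDemo classes and the EnterAsync method are hypothetical names, and the dequeue is simulated with a short delay.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical re-creation of a semaphore-backed bottleneck; the real class may differ.
public sealed class Bottleneck : IDisposable
{
    readonly SemaphoreSlim _semaphore;

    public Bottleneck(int maxParallel) =>
        _semaphore = new SemaphoreSlim(maxParallel, maxParallel);

    // Waits for a free slot and returns a token that frees the slot when disposed.
    public async Task<IDisposable> EnterAsync(CancellationToken ct = default)
    {
        await _semaphore.WaitAsync(ct).ConfigureAwait(false);
        return new Releaser(_semaphore);
    }

    public void Dispose() => _semaphore.Dispose();

    sealed class Releaser : IDisposable
    {
        SemaphoreSlim _semaphore;
        public Releaser(SemaphoreSlim semaphore) => _semaphore = semaphore;

        // Interlocked.Exchange makes a double Dispose release the slot only once.
        public void Dispose() => Interlocked.Exchange(ref _semaphore, null)?.Release();
    }
}

public static class BottleneckDemo
{
    // Pushes `callers` simulated receives through the bottleneck and
    // returns the highest number that were inside it at the same time.
    public static async Task<int> MeasurePeakAsync(int callers, int limit)
    {
        using (var bottleneck = new Bottleneck(limit))
        {
            int current = 0, peak = 0;
            var gate = new object();

            async Task ReceiveAsync()
            {
                using (await bottleneck.EnterAsync())
                {
                    lock (gate) { current++; peak = Math.Max(peak, current); }
                    await Task.Delay(20); // simulated dequeue
                    lock (gate) { current--; }
                }
            }

            var tasks = new Task[callers];
            for (var i = 0; i < callers; i++) tasks[i] = ReceiveAsync();
            await Task.WhenAll(tasks);
            return peak;
        }
    }

    public static async Task Main()
    {
        var peak = await MeasurePeakAsync(callers: 100, limit: 20);
        Console.WriteLine($"Peak concurrent receives: {peak}"); // never more than 20
    }
}
```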

jods4 commented 5 years ago

hmm, I am not entirely sure about this.... my understanding is that once the ambient context is removed, it's removed from the current execution context and thus from all clones of it (which get passed to Tasks spawned from it), but I would not expect it to be able to mutate the execution context from which it itself was spawned.

You are correct. 👍 AsyncLocal is a bit fuzzily defined but what actually happens is that a copy-on-write is established whenever you enter a new async method or start a task w/ Task.Run and friends.

So if I receive a message and fork work to multiple tasks or async methods, they all share the same transaction context. If one task decides to do new RebusTransactionScope() (either a child or a parent task) then it assigns a new value to AsyncLocal, which is only visible to itself, by virtue of copy-on-write. Neither its parent, siblings nor child tasks see the new value (forked tasks after the change would, of course).

So that works.
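The copy-on-write behaviour described above can be checked with a small stand-alone program. This is only a sketch: a plain AsyncLocal<string> named Ambient stands in for Rebus' ambient transaction context, and the AsyncLocalDemo class is a hypothetical name.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncLocalDemo
{
    // Stand-in for the ambient transaction context (not the Rebus type).
    static readonly AsyncLocal<string> Ambient = new AsyncLocal<string>();

    public static async Task<(string Parent, string ChangedChild, string Sibling)> RunAsync()
    {
        Ambient.Value = "tx-1";

        var childChanged = new TaskCompletionSource<bool>(
            TaskCreationOptions.RunContinuationsAsynchronously);

        // This child "escapes" the ambient value inside its own flow.
        var changedChild = Task.Run(() =>
        {
            Ambient.Value = null;
            var seen = Ambient.Value ?? "(none)";
            childChanged.SetResult(true);
            return seen;
        });

        // This sibling reads the value only after the other child has changed it.
        var sibling = Task.Run(async () =>
        {
            await childChanged.Task;
            return Ambient.Value;
        });

        var changedSeen = await changedChild;
        var siblingSeen = await sibling;

        // The parent reads its own value last.
        return (Ambient.Value, changedSeen, siblingSeen);
    }

    public static async Task Main()
    {
        var (parent, changed, sibling) = await RunAsync();
        Console.WriteLine($"parent={parent}, changedChild={changed}, sibling={sibling}");
        // prints "parent=tx-1, changedChild=(none), sibling=tx-1"
    }
}
```

Only the task that assigned a new value sees the change; the parent and the sibling keep the value captured when they were forked.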

jods4 commented 5 years ago

I am not sure I understand everything 100% regarding my 2nd question.

My current understanding:

Next one is not 100% clear to me, can you confirm I got it right?

Am I correct?

If I got everything right, a few remarks:

💡 Shouldn't Rebus itself limit the number of workers actively trying to receive messages? It would simplify transports + it could be more efficient (as in no message -> blocked receivers shouldn't attempt to receive at all).

💡 20 seems like a lot of concurrent receivers. Hopefully processing time is significant compared to dequeuing time, which is quite fast. Under that assumption we could limit the number of receivers a lot more.

❓ How does that bottleneck actually prevent connection pool exhaustion? If you have fewer than 20 workers -> it's useless. If you have more than 20 workers, then workers that are "processing" all have an open connection, right? Because processing a message is done under a transaction? So if you configure 50 workers and want to max them out, you'd have 20 workers polling and 30 processing and that would still be 50 open connections?

mookid8000 commented 5 years ago

Am I correct?

Yes 😄

With the MSSQL implementation the bottleneck turned out to be important, because one could have a high parallelism setting

.Options(o => o.SetMaxParallelism(100))

and then, because of the async receive, it would completely deplete the connection pool, starving other bits of code in need of a connection to that database.

Shouldn't Rebus itself limit the number of workers actively trying to receive messages? It would simplify transports + it could be more efficient (as in no message -> blocked receivers shouldn't attempt to receive at all).

I actually did a little experiment of sorts with something like that – it's called Rebus.AutoScaling. It works by slowly adding/removing workers, depending on the level of activity.

20 seems like a lot of concurrent receivers. Hopefully processing time is significant compared to dequeuing time, which is quite fast. Under that assumption we could limit the number of receivers a lot more.

You're right. Maybe it would make sense to make it configurable?

How does that bottleneck actually prevent connection pool exhaustion? If you have fewer than 20 workers -> it's useless. If you have more than 20 workers, then workers that are "processing" all have an open connection, right? Because processing a message is done under a transaction? So if you configure 50 workers and want to max them out, you'd have 20 workers polling and 30 processing and that would still be 50 open connections?

Good question!

I think I realize now that the bottleneck solution came from when I originally realized that the connection pool could be depleted, and I found that out by running some artificial tests where several Rebus instances would all receive lots of messages, without actually doing any processing...

So you're right – since the connection and transaction are stored as part of the transaction context, and thus last for the full duration of handling the message, then it doesn't actually limit the number of connections....

There's a caveat to this though: The MSSQL transport has a "lease mode" where messages are received by acquiring a lease (with a timeout) on them. This means that once the receive operation is over, the connection could be returned to the connection pool for others to use (but it's not, I just checked.... 😐 )
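The 50-worker scenario from the question can be simulated without a database. The sketch below is an illustration under stated assumptions, not the transport's code: a counter stands in for open connections, each simulated worker opens its "connection" inside the receive bottleneck and keeps it until its handler finishes, and the handlers are deliberately made to overlap (the worst case). ConnectionHoldDemo is a hypothetical name.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ConnectionHoldDemo
{
    // Returns the peak number of simultaneously "open connections" when the
    // bottleneck covers only the receive step, not the handling step.
    public static async Task<int> PeakOpenConnectionsAsync(int workers, int receiveLimit)
    {
        using (var bottleneck = new SemaphoreSlim(receiveLimit, receiveLimit))
        {
            var allHandling = new TaskCompletionSource<bool>(
                TaskCreationOptions.RunContinuationsAsynchronously);
            int open = 0, peak = 0;
            var gate = new object();

            async Task WorkerAsync()
            {
                await bottleneck.WaitAsync();
                try
                {
                    lock (gate)
                    {
                        open++; // connection opened during receive
                        peak = Math.Max(peak, open);
                        if (open == workers) allHandling.TrySetResult(true);
                    }
                    await Task.Delay(5); // simulated dequeue
                }
                finally
                {
                    bottleneck.Release(); // the receive slot is freed here...
                }

                // ...but the connection stays open while the message is handled.
                // Handlers wait for each other so that they all overlap.
                await allHandling.Task;
                lock (gate) { open--; } // connection closed with the transaction context
            }

            var tasks = new Task[workers];
            for (var i = 0; i < workers; i++) tasks[i] = WorkerAsync();
            await Task.WhenAll(tasks);
            return peak;
        }
    }

    public static async Task Main()
    {
        var peak = await PeakOpenConnectionsAsync(workers: 50, receiveLimit: 20);
        Console.WriteLine($"Peak open connections: {peak}"); // prints "Peak open connections: 50"
    }
}
```

At most 20 workers dequeue at once, yet all 50 connections end up open together, which matches the conclusion that the bottleneck alone does not cap pool usage.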

mookid8000 commented 4 years ago

Hi @jods4 , how is it going? 😁 I'm just going through old issues, and I came by this one... how did it go?

jods4 commented 4 years ago

Hi @mookid8000 sorry for the silence, I was away (traveling) for the past 2 months.

It's still something that I need for our projects, so I'm gonna do it at some point as time/work allows (prob. early next year).

mookid8000 commented 4 years ago

Ah, cool! I'll leave this issue open for now then 🙂

jods4 commented 4 years ago

@mookid8000 sorry for being silent! I am overwhelmed by work + the corona situation makes stuff difficult.

Did you close this because you implemented it or because of inactivity?

If it's the latter I understand, it's been a long time. I'm working on other stuff right now but I'm pretty sure I'll come back to this later (maybe next year <_<). It's still on my radar, I'll open a new issue or PR when I do.

Take care!

mookid8000 commented 4 years ago

I closed it because of inactivity 🙂

(..) It's still on my radar (..)

Excellent! Let's just continue, when you're ready.

Stay safe 😺