Closed jods4 closed 5 years ago
The transaction context is indeed meant to be thread safe. And as you've correctly figure out, it's scoped to the current execution context (via AsyncLocal
).
The reason it has to be thread safe, is because something like this:
await Task.WhenAll(
Enumerable.Range(0, 1000)
.Select(n => bus.Send($"This is message number {n}"))
);
can execute on many threads in a truly concurrent fashion, and it most likely will (dispatched to thread pool threads).
I found several places where code manipulating the transaction context is not thread-safe (lock-free code is quite hard to write).
If you've found places where you believe there's a race condition or another concurrency problem, then please let me know 😄
I'm closing this issue for now... if you've come across places in the code you believe can result in concurrency issues, please let me know, and I'll reopen it.
@mookid8000 I see.
When distributing work I tend to use Task.Run
or TPL etc. but indeed if the operations are truly async you can fork/join in the way you've shown and the async context will flow across all operations, possibly concurrently.
~Which is useless with DB providers (Oracle, Sql Server) since they serialize the send operation anyway (not to mention Oracle is not even async so there's no parallelism there).~
EDIT well, second thoughts: it's useless in your minimal example but of course it could happen if someone decides to fork the handling of a message. Send
is serialized internally but the actual user work could be parallel, so it's useful.
⚠️ MSMQ transport does not seem to be synchronize Send
calls:
https://github.com/rebus-org/Rebus.Msmq/blob/master/Rebus.Msmq/Msmq/MsmqTransport.cs#L173
Yet MSDN documentation says:
Only the GetAllMessages() method is thread safe.
~Is there even a transport where concurrent writes are a thing?~
⚠️ In Rebus.Oracle
, only the Send
operation uses a semaphore to serialize access to db connection. For instance, Saga storage does not. So if you concurrently manipulate a Saga and Send a message inside the same async context, it might fail.
⚠️ I did not really check, but I don't recall seeing any synchronization code when accessing the connection in SQL Server Data bus storage either (which I copied for Oracle).
The rebus transaction scope does dispose the context before restoring previous context, which means a concurrent operation could get and misuse a disposed connection: https://github.com/rebus-org/Rebus/blob/master/Rebus/Transport/RebusTransactionScope.cs#L50
OK that's a little dubious usage because there's a race condition to know if the operation is part of the transaction or not anyway.
There are other operations that are most likely user errors if called concurrently and are written in non-thread-safe way. E.g. the RebusTransactionScope
ctor can't be called concurrently.
All this leads me to the fact that it's really hard to know which structure are thread-safe or not, how they're protected, etc.
For users, I think it would be nice to have a more precise statement about what APIs you can safely call concurrently in the same async context. Docs say the bus is fully reentrant but that may not be true for all APIs when called from the same async context.
For contributors it would be nice to have a clear pattern to implement that reentrancy. Sharing non thread-safe objects in mutable lock-free structures is really hard.
I'm having a hard time wrapping my head around
ITransactionContext
wrt to threading.On one hand, it seems it's written as a thread-safe class:
Items
is aConcurrentDictionary
, all actions queues areConcurrentQueue
. https://github.com/rebus-org/Rebus/blob/master/Rebus/Transport/TransactionContext.cs#L22ConnectionWrapper
exposes a semaphore that's used to prevent concurrent sends.On the other hand:
TransactionContext
is stored in anAsyncLocal
(insideAmbientTransactionContext
). Correct me if I'm wrong but this works likeThreadLocal
except it flows accrossawait
, so in effect the context is never exposed to concurrent / parallel code.So... what is the threading model?