Closed JorgeCandeias closed 1 week ago
Added the MariaDB/MySQL database artefacts and updated the tests to cover them.
Notes:
`RelationalOrleansQueries_ChaosTest`: this deadlocks the connection pool itself and was making some tests hang forever, which required upgrading the driver in the test project to the latest version, v8.4.0. Community contribution is welcome for performance improvement. We don't need nor want MySQL, so this is out of scope for us and therefore a best effort to complete this feature.
The test `UnitTests.StorageTests.AdoNet.MySqlRelationalStoreTests.CancellationToken_MySql_Test` appears to be failing due to timeout. This is unrelated to this PR as far as I can tell.
Added the PostgreSQL database artefacts and updated the tests to cover them.
A few tests, which were copied from the Azure Queue implementation, appear flaky. One of these tests is skipped in the original due to such flakiness. However, neither the SQL Server nor the MySQL implementation shows flakiness for the same test; only PostgreSQL does. So perhaps something is buggy with those particular scripts. Help is welcome.
Also of note, I added a `ClearAllPools()` step to the test database setup flow. This was due to PostgreSQL pooled connections not healing fast enough after being force-disconnected upon dropping the database, causing whatever test ran next to fail. Neither SQL Server nor MySQL suffers from this issue. Not sure how to address it otherwise.
As with MySQL, this is a best effort for us.
The test `UnitTests.StorageTests.AdoNet.MySqlRelationalStoreTests.CancellationToken_MySql_Test` appears to be failing due to timeout. This is unrelated to this PR as far as I can tell.
Upgrading `MySql.Data` to 8.4.0 appears to have caused this failure. This test passes with the prior version, 8.0.31.
So we either get connection pool deadlocks on < 8.4, or no working cancellation tokens on >= 8.4.
Pick your poison I suppose. Which one do you prefer?
Edit: Trial and error shows cancellation tokens are broken since 8.0.33 (April 2023).
We have the same issue with #8931 (I think). I think it would be safer to revert to 8.0.33 for the time being. Thoughts, @ReubenBond?
@benjaminpetit agreed, but we can't stop end users from upgrading unless we set an upper bound. We will need to investigate further and fix it.
Lacking control over driver versions, we could wrap all SQL calls with `.WaitAsync(token)` as a protective measure, to guarantee responsiveness to the caller. Or even `WaitAsync(timeout, token)` if command timeouts also become buggy, who knows. We could log this event to let users know that their drivers are faulty.
After further testing, here are some funny findings:
The above queries stayed waiting for a schema lock forever in order to drop the database. The commands eventually time out on the client side and therefore newer tests start attempting to drop the database again. What was holding these, who knows, probably each other.
I suspect this happens due to a combination of buggy pooled connection recovery plus dropping the database all the time. So to reduce the likelihood of this happening I made the following code run before all tests:
Tested again x3. No deadlocks, all green.
I also observed in passing that a deadlock will happen quickly once the pool limit is reached but rarely before. Therefore I suspect the concurrency limit I added to the more aggressive tests may be keeping it at bay. And I also suspect the other tests never reach this concurrency to begin with, at least I never observed it. However that limit didn't seem to help the v8.4 drivers against MariaDB 11.3.2 in docker desktop. Neither did it help with the lack of stable connection recovery after a database drop. Only clearing the pool did that.
Anyway, whatever regression there was in the driver, the engine, or the interaction between the two doesn't appear to be fully fixed.
I'm not invested enough in MySQL to go down this rabbit hole any further so I'm happy with leaving the old versions as-is.
As discussed in dev channel, I too finally managed to take a look. Looks good, it was a good discussion too.
As for the MySQL/MariaDB observations, indeed the `RelationalStorage` implementation has special handling for the official Oracle connector, which just deadlocks on all versions. It might have been sensible to make that handling apply to any connector and be version-based, but that wasn't possible back then. One option would have been to ignore such things and tell developers "tough luck". But the aim was to make this a core piece of a software-intensive, dependable system, so it seemed a near-zero price to pay to see if such issues could be fixed. SQL Server (the old one) and Postgres have had some intermittent deadlock issues; the Oracle MySQL one seems to be persistent..?
The storage tests are also capped regarding concurrency, but mainly due to exhausting the ThreadPool in CI while banging on the DB at high concurrency, more than for other reasons.
I'll cross-reference https://github.com/dotnet/orleans/issues/634 as this long-standing dream is becoming real. :)
Lacking control over driver versions, we could wrap all SQL calls with `.WaitAsync(token)` as a protective measure, to guarantee responsiveness to the caller. Or even `WaitAsync(timeout, token)` if command timeouts also become buggy, who knows. We could log this event to let users know that their drivers are faulty.

Or we could surface a configuration option to do that for the developer (see my previous comment). Not my call to make, but I provided the rationale for why it was done in https://github.com/dotnet/orleans/blob/main/src/AdoNet/Shared/Storage/RelationalStorage.cs#L273.
@veikkoeeva Thanks a lot for reviewing. Indeed, I'm not sure what to do with MySQL either. I'll leave it to @ReubenBond to decide. It seems stable enough for now at least, with enough band-aids around it. Yet I do hope no one has the unfortunate idea of running this provider on a MySQL database, regardless of the deadlocks. The implementation is quite inefficient due to the limitations of the engine and is just asking for trouble either way.
Not necessarily for this PR, but I was looking at a way to make this provider rewindable. This would improve its ability to support projections with less fuss required in the Orleans code.
From the point of view of SQL artefacts, it looks straightforward:
1) Have another table `OrleansStreamArchive` (or `Delivered`, or `History`, etc; naming things is hard).
2) Update `FailStreamMessage` to optionally move the delivered message to that table instead of deleting it.
3) Update `GetStreamMessages` to take an `@AfterMessageId` parameter with the `MessageId` after which to get messages from. If the parameter is specified, then also include all appropriate messages from the archive table.
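For illustration only, the proposal above might look something like this in T-SQL. Everything here is hypothetical and not part of this PR, including the `DeliveredOn` column, the column types, and the `UNION ALL` read shape:

```sql
-- Hypothetical archive table: same key shape as OrleansStreamMessage,
-- plus the time at which the message was delivered.
CREATE TABLE OrleansStreamArchive
(
    ServiceId NVARCHAR(150) NOT NULL,
    ProviderId NVARCHAR(150) NOT NULL,
    QueueId NVARCHAR(150) NOT NULL,
    MessageId BIGINT NOT NULL,
    Payload VARBINARY(MAX) NULL,
    DeliveredOn DATETIME2(7) NOT NULL,

    CONSTRAINT PK_OrleansStreamArchive PRIMARY KEY CLUSTERED
    (
        ServiceId, ProviderId, QueueId, MessageId
    )
);

-- Hypothetical rewind-aware read: when @AfterMessageId is supplied,
-- include archived messages after that point as well as pending ones.
SELECT ServiceId, ProviderId, QueueId, MessageId, Payload
FROM OrleansStreamArchive
WHERE ServiceId = @ServiceId
  AND ProviderId = @ProviderId
  AND QueueId = @QueueId
  AND MessageId > @AfterMessageId
UNION ALL
SELECT ServiceId, ProviderId, QueueId, MessageId, Payload
FROM OrleansStreamMessage
WHERE ServiceId = @ServiceId
  AND ProviderId = @ProviderId
  AND QueueId = @QueueId
  AND MessageId > @AfterMessageId
ORDER BY MessageId;
```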
One issue here arises from the `StreamId`s being distributed across the `QueueId`s to support a fixed set of pulling agents. I understand why this is required, yet it gets in the way of efficient rewinding. Even if you know which `QueueId` to target, you may still potentially touch far more foreign messages than the ones that belong to the particular `StreamId` you're rewinding.

A solution here is to also store the `StreamId` as part of the clustered key. This would not change cardinality, nor affect the queue-based pulling agent, and would allow rewinding a particular `StreamId` without touching anything else.
However I'm having trouble identifying the correct bits in the streaming middleware to plug into. The only rewindable provider I see is EventHubs and its implementation looks tightly coupled to how EventHubs itself works, including having its own cache.
Do you have any suggestions on how to proceed with this without causing too much havoc? @ReubenBond maybe?
As noted on discord, this PR is now ready for formal review.
@benjaminpetit is taking a look
Many thanks!
@benjaminpetit Wow did you just merge this without comment? You're more confident than I am! 😅
Well... that's embarrassing. I merged the wrong PR 😓 But I started reviewing it a few days ago.
I still think it's a worthy addition for a preview, so let's keep it.
I will open a PR to mark this new package "prerelease" or "preview", if that's ok with you?
That's totally fine. Cheers.
This PR adds an ADO.NET Streaming provider.
This provider is intended to be a first-class sibling to the other ADO.NET providers such as Clustering, Persistence and Reminders.
Big Note
Only the Microsoft SQL Server queries are defined as of now. This causes some provider tests to fail, as they expect all RDBMS scripts to be available. My personal experience lies far more in SQL Server than the others, so any help with them is welcome.
Update: MariaDB/MySQL queries now defined.
Update: PostgreSQL queries now defined.
Approach
The idea here is to have "Azure Queue"-like behaviour on top of a database and little else. This provider does not attempt, nor is it capable of, attaining EventHub-like performance or capability. It does attempt to attain the best performance one can get from a database, by using regular objects by default and some custom SQL magic.
The easy:
The not so easy:
Design Overview
The provider uses three tables: `OrleansStreamMessage`, `OrleansStreamDeadLetter` and `OrleansStreamControl`.

- `OrleansStreamMessage` holds all transient messages. New messages are added to this table upon "queueing", marked appropriately upon "dequeuing" and deleted upon successful confirmation.
- `OrleansStreamDeadLetter` is where undelivered messages are evicted to, either through normal expiry or upon delivery failure.
- `OrleansStreamControl` helps maintain the eviction schedule so it remains independent of cluster size.

The provider maps `ClusterOptions.ServiceId` to a `ServiceId` column, which allows multiple deployments to share the same database without overlapping; the provider to a `ProviderId` column; and the Orleans `QueueId` to an appropriate `QueueId` column.

Implementation Details
Tables:
OrleansStreamMessage
This table holds all the messages waiting to be delivered.
- The clustered key is `(ServiceId, ProviderId, QueueId, MessageId)`.
- `MessageId` is implemented as a regular sequence. This technically makes the identifier above a non-candidate super-key (an undesirable anti-pattern) and allows for gaps in `MessageId` within each `(ServiceId, ProviderId, QueueId)` group. However, as the existence of gaps does not impact the implementation, there is no harm to this effect.
- `Dequeued` contains the number of times the message was dequeued. This also doubles as a message pop receipt for confirmation/deletion. The deletion request is only successful if the caller passes the same number it received.
- `VisibleOn` is updated upon dequeuing to the time in the future at which the message will become dequeuable again. When `Dequeued` reaches the `MaxAttempts` option and `VisibleOn` is also reached, the message is considered undequeuable and becomes eligible for eviction to dead letters.
- `ExpiresOn` is set upon initial queueing to the time in the future upon which the message will no longer be dequeuable, regardless of the `Dequeued` count. Messages where `ExpiresOn` is reached and `VisibleOn` is also reached become eligible for eviction to dead letters, regardless of `Dequeued`.
- `CreatedOn` and `ModifiedOn` are for troubleshooting only and have no impact on logic.
- `Payload` holds the binary serialized Orleans message container.

Important: To avoid deadlocks from concurrent batch queries against this table, all such queries are induced to lock data rows in the same order, and that order is the same order of the clustered index above.
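As an illustrative T-SQL sketch of the shape described above (column types and sizes here are assumptions; the actual script in the PR may differ):

```sql
CREATE TABLE OrleansStreamMessage
(
    ServiceId NVARCHAR(150) NOT NULL,   -- from ClusterOptions.ServiceId
    ProviderId NVARCHAR(150) NOT NULL,  -- stream provider
    QueueId NVARCHAR(150) NOT NULL,     -- mapped queue partition
    MessageId BIGINT NOT NULL,          -- from a regular sequence; gaps are harmless
    Dequeued INT NOT NULL,              -- dequeue count; doubles as the pop receipt
    VisibleOn DATETIME2(7) NOT NULL,    -- next time the message becomes dequeuable
    ExpiresOn DATETIME2(7) NOT NULL,    -- hard expiry regardless of Dequeued
    CreatedOn DATETIME2(7) NOT NULL,    -- troubleshooting only
    ModifiedOn DATETIME2(7) NOT NULL,   -- troubleshooting only
    Payload VARBINARY(MAX) NULL,        -- binary serialized Orleans message container

    -- the clustered index; batch queries lock rows in this same order
    CONSTRAINT PK_OrleansStreamMessage PRIMARY KEY CLUSTERED
    (
        ServiceId, ProviderId, QueueId, MessageId
    )
);
```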
OrleansStreamDeadLetter
This table holds messages that failed to be successfully delivered, including:
Eviction happens on two occasions:

- Opportunistically, on the schedule maintained in `OrleansStreamControl`, for messages that have expired or exhausted their delivery attempts.
- Upon delivery failure, when a move from `OrleansStreamMessage` to `OrleansStreamDeadLetter` is attempted by the stream failure component on a case-by-case basis.

Columns are copied from the original row in `OrleansStreamMessage` as-is, with two exceptions:

- `DeadOn` holds the time at which the message was evicted to dead letters.
- `RemoveOn` holds the time in the future at which the message will be deleted from the dead letters table itself.

OrleansStreamControl
This table is designed to hold synchronization variables at queue level to help ensure any opportunistic scheduled task remains stable, regardless of cluster size. For now, the only such task is message eviction.
- `EvictOn`: The time in the future after which the next opportunistic eviction task will run.

At runtime, the dequeuing queries will check this table and, when `EvictOn` is reached, will attempt to win a race to update the old value to the next schedule in the future. The query that wins that race also gets to run the eviction query. Hence, the process is opportunistic, and guaranteed to run eventually, without the need for extra background workers.

Queries / Stored Procedures
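The `EvictOn` race described above can be sketched as a guarded update in T-SQL (illustrative only; `@EvictionInterval` and the procedure signatures are assumptions):

```sql
-- Attempt to win the eviction race: the WHERE clause re-checks the schedule,
-- so only one concurrent caller can move EvictOn forward.
UPDATE OrleansStreamControl
SET EvictOn = DATEADD(SECOND, @EvictionInterval, SYSUTCDATETIME())
WHERE ServiceId = @ServiceId
  AND ProviderId = @ProviderId
  AND QueueId = @QueueId
  AND EvictOn <= SYSUTCDATETIME();

-- The winner (exactly one row updated) gets to run the eviction work.
IF @@ROWCOUNT = 1
BEGIN
    EXEC EvictStreamMessages @ServiceId, @ProviderId, @QueueId;  -- hypothetical signature
    EXEC EvictStreamDeadLetters @ServiceId, @ProviderId;         -- hypothetical signature
END
```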
QueueStreamMessage
This query adds a new message to the `OrleansStreamMessage` table with the following behaviour:

- `Dequeued` is set to zero.
- `VisibleOn`, `CreatedOn` and `ModifiedOn` are set to "now".
- `ExpiresOn` is set to "now" + `@ExpiryTimeout` (seconds).

GetStreamMessages
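To make the `QueueStreamMessage` behaviour above concrete, a minimal T-SQL sketch (illustrative only; the sequence name and parameter names are assumptions):

```sql
DECLARE @Now DATETIME2(7) = SYSUTCDATETIME();

INSERT INTO OrleansStreamMessage
(
    ServiceId, ProviderId, QueueId, MessageId,
    Dequeued, VisibleOn, ExpiresOn, CreatedOn, ModifiedOn, Payload
)
VALUES
(
    @ServiceId, @ProviderId, @QueueId,
    NEXT VALUE FOR OrleansStreamMessageSequence,  -- hypothetical sequence name
    0,                                            -- Dequeued starts at zero
    @Now,                                         -- VisibleOn = now
    DATEADD(SECOND, @ExpiryTimeout, @Now),        -- ExpiresOn = now + @ExpiryTimeout
    @Now,                                         -- CreatedOn = now
    @Now,                                         -- ModifiedOn = now
    @Payload
);
```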
This query performs a number of steps:

1. Selects a batch of messages from `OrleansStreamMessage` where:
   - `Dequeued` is under `MaxAttempts` (options).
   - `VisibleOn` has been reached.
   - `ExpiresOn` has not been reached.
2. For each selected message:
   - Increases `Dequeued` by 1.
   - Sets `VisibleOn` to now + `@VisibilityTimeout` (seconds).
   - Sets `ModifiedOn` to now.

ConfirmStreamMessages
to now.ConfirmStreamMessages
This query deletes specific messages from `OrleansStreamMessage`. A list of messages and their pop receipts (taken from the `Dequeued` column at the time of dequeueing) is passed in `@Items` in the form `1:2|3:4|5:6`, where the first number is the message identifier and the second number is the dequeue count. Row deletion only occurs if both numbers match in the table. Otherwise, the row is assumed to have been dequeued again and is therefore left alone.

FailStreamMessage
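To illustrate the `ConfirmStreamMessages` behaviour above, one possible T-SQL parse-and-delete (illustrative only; the actual script may parse `@Items` differently, and `STRING_SPLIT` requires SQL Server 2016+):

```sql
-- Parse '1:2|3:4|5:6' into (MessageId, Dequeued) pairs, then delete only
-- the rows whose pop receipt still matches.
WITH Items AS
(
    SELECT
        CAST(LEFT(value, CHARINDEX(':', value) - 1) AS BIGINT) AS MessageId,
        CAST(SUBSTRING(value, CHARINDEX(':', value) + 1, LEN(value)) AS INT) AS Dequeued
    FROM STRING_SPLIT(@Items, '|')
)
DELETE M
FROM OrleansStreamMessage AS M
INNER JOIN Items AS I
    ON M.MessageId = I.MessageId
   AND M.Dequeued = I.Dequeued  -- pop receipt must match, else leave the row alone
WHERE M.ServiceId = @ServiceId
  AND M.ProviderId = @ProviderId
  AND M.QueueId = @QueueId;
```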
This query applies failure logic to a single message in `OrleansStreamMessage`. It is called by the stream failure handler component upon message or subscription failure. The logic is:

- If `Dequeued` is under `MaxAttempts` (options) then the message is made visible again.
- Otherwise, a move to `OrleansStreamDeadLetter` is attempted.

EvictStreamMessages
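A sketch of the visibility-reset branch of `FailStreamMessage` described above (illustrative only; parameter names are assumptions, and the dead-letter branch follows the eviction move):

```sql
-- Failed delivery: if attempts remain, make the message visible again now;
-- a row that matches no longer has to wait out its visibility timeout.
UPDATE OrleansStreamMessage
SET VisibleOn = SYSUTCDATETIME(),
    ModifiedOn = SYSUTCDATETIME()
WHERE ServiceId = @ServiceId
  AND ProviderId = @ProviderId
  AND QueueId = @QueueId
  AND MessageId = @MessageId
  AND Dequeued < @MaxAttempts;
```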
This query performs opportunistic eviction of a batch of messages from `OrleansStreamMessage` to `OrleansStreamDeadLetter` if the eviction policy applies to them. This query is called by `GetStreamMessages` at regular intervals.

EvictStreamDeadLetters
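The `EvictStreamMessages` move above can be sketched as an atomic delete-and-insert in T-SQL (illustrative only; `@BatchSize`, `@RemovalTimeout` and the exact eligibility predicate are assumptions):

```sql
-- Move a batch of expired or undeliverable messages to dead letters atomically.
DELETE TOP (@BatchSize) M
OUTPUT
    deleted.ServiceId, deleted.ProviderId, deleted.QueueId, deleted.MessageId,
    deleted.Dequeued, deleted.VisibleOn, deleted.ExpiresOn,
    deleted.CreatedOn, deleted.ModifiedOn, deleted.Payload,
    SYSUTCDATETIME(),                                   -- DeadOn
    DATEADD(SECOND, @RemovalTimeout, SYSUTCDATETIME())  -- RemoveOn
INTO OrleansStreamDeadLetter
(
    ServiceId, ProviderId, QueueId, MessageId,
    Dequeued, VisibleOn, ExpiresOn, CreatedOn, ModifiedOn, Payload,
    DeadOn, RemoveOn
)
FROM OrleansStreamMessage AS M
WHERE M.ServiceId = @ServiceId
  AND M.ProviderId = @ProviderId
  AND M.VisibleOn <= SYSUTCDATETIME()
  AND (M.ExpiresOn <= SYSUTCDATETIME() OR M.Dequeued >= @MaxAttempts);
```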
This query performs opportunistic removal of dead letters from `OrleansStreamDeadLetter`. This query is called by `GetStreamMessages` at regular intervals.

Integration Artefacts
Middleware streaming artefacts are by and large copied from the Azure Queue and SQS implementations.
- `AdoNetBatchContainer`: The ADO.NET flavour of `IBatchContainer`, same as the other ones.
- `AdoNetQueueAdapter`: The ADO.NET `IQueueAdapter` implementation, which simply forwards requests to the RDBMS queries.
- `AdoNetQueueAdapterFactory`: The ADO.NET factory of `IQueueAdapter` instances. Due to the lack of an async lifetime, this class has some logic to avoid creating more than one instance of the relational queries object.
- `AdoNetQueueAdapterReceiver`: The ADO.NET implementation of `IQueueAdapterReceiver`, which simply forwards requests to the RDBMS queries.
- `AdoNetStreamFailureHandler`: The ADO.NET implementation of `IStreamFailureHandler`, which forwards failure notifications to the `FailStreamMessage` query.
- `AdoNetStreamQueueMapper`: Maps Orleans `StreamId` and `QueueId` values to the appropriate `QueueId` column in the message table.

Benchmarks (SQL Server)
To be updated as tweaks are made.
QueueStreamMessage
Message "queueing" translates to inserting a new row at the end of a clustered table. This approaches an O(1) operation in principle, albeit with the expected inefficiencies:

1) Latency correlated with payload size.
2) Concurrent operations competing to write to the transaction log.

The "queue id" abstraction appears to make no difference, given 2).
GetStreamMessages
"Dequeuing" messages is a multi-step process in a SQL query plan:

1) Identify the correct leaf list given the `(ServiceId, ProviderId, QueueId)` tuple in the clustered table. In a binary tree, this approaches O(log(K)), where K is the number of distinct keys.
2) From the linked list identified in 1), scan through the list, skipping non-visible rows and marking found rows non-visible, until enough rows are identified, up to the batch size.
The cost of 2) comprises O(S), where S is the number of skipped rows, plus O(B), where B is the batch size. In the best case, the cost is only O(B), when no rows are skipped. In the worst case, O(S) = O(N), where N is the size of the leaf list itself (or of the table itself, if only one queue is used).
In a healthy scenario, where B rows are marked and then deleted in a stable loop, this implementation will approach the best case of O(log(K)) + O(B).
In an unhealthy scenario, where rows are marked but not deleted, this implementation will march towards O(N) where N is the count of rows for the given tuple.
Given the effect above, having multiple "queue id" values may help improve dequeuing performance in a cluster demanding high throughput regardless of poison pills or consumer stability, by partitioning the leaf lists and reducing overhead from skipped rows.
Note that `QueueCount` also means `Concurrency` in the dequeuing benchmark below.
in the dequeuing benchmark below.Microsoft Reviewers: Open in CodeFlow