zevisert closed this 1 year ago.
Welcome and thanks for contributing! For some context, I'm a member of encode but not a maintainer of this project. I'm reading this out of curiosity and just have a couple of notes about proper usage of the context var. Someone else with more context on the project will have to review the changes too.
Thanks for the review. I'm eager to move this forward. We use databases in one of our products, and under load testing this change certainly fixes a concurrency issue. Even your two comments were pretty insightful, so if another encode member is interested in taking a look, that would be great.
I'll also try the module-level context var solution you helped me discover in our codebase, to see if that can still fix the issue. As it stands, Databases was (and still is) leaking memory from its usage of non-module-level contextvars, but if I can fix that here too then that's great!
Also related: #155
Do you have a minimal executable example I could use to explore this?
I think #155 should be close enough, but I'll try and get a dedicated MCVE up here today
@madkinsz Here's a minimal example as a test - turns out I could get it pretty small. This fails on master, but passes perfectly fine here:
```python
# file: tests/test_concurrent_tasks.py
import asyncio

import pytest

from databases import Database
from tests.test_databases import DATABASE_URLS, async_adapter


@pytest.mark.parametrize("database_url", DATABASE_URLS)
@async_adapter
async def test_concurrent_tasks(database_url: str):
    """
    Test concurrent tasks.
    """
    async with Database(database_url) as database:
        # This could be, e.g., a FastAPI / Starlette endpoint
        @database.transaction()
        async def read_items() -> dict:
            return dict(await database.fetch_one(query="SELECT 1 AS value"))

        # Run 10 concurrent **tasks**
        tasks = (asyncio.create_task(read_items()) for _ in range(10))
        responses = await asyncio.gather(*tasks, return_exceptions=True)

        # Check the responses
        assert all(isinstance(response, dict) for response in responses)
        assert all(response["value"] == 1 for response in responses)
```
Run this with docker compose and pytest.

Rebased to sign commits; all new and old tests are passing. I switched to module-level contextvars so that the garbage collector can clean up unreferenced objects.
Edit: And now all linting checks pass too
Hey! I took some time to poke around at this today. It seems a little wild to use a ContextVar to store a dictionary that includes the asyncio.Task in it (the whole point of them is that they handle task locality for you). I'm not sure what we gain by using a context variable in this case; we can just use an instance dictionary and remove a lot of complexity.
For example, I've implemented this at https://github.com/encode/databases/compare/master...madkinsz:example%2Finstance-safe and it passes the MRE you provided (I only checked SQLite because I didn't want to deal with docker-compose)
Curious for your thoughts
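The instance-dictionary idea can be sketched without a real database. This is a minimal sketch, not the code on the linked branch: `FakeConnection` and the `connection()` method are hypothetical stand-ins, and the dictionary key is simply whatever `asyncio.current_task()` returns.

```python
import asyncio


class FakeConnection:
    """Hypothetical stand-in for a backend connection."""


class Database:
    def __init__(self) -> None:
        # Per-task state lives on the instance, keyed by the running task,
        # so no ContextVar is involved at all.
        self._connections = {}

    def connection(self) -> FakeConnection:
        task = asyncio.current_task()
        if task not in self._connections:
            self._connections[task] = FakeConnection()
        return self._connections[task]


async def main() -> tuple:
    db = Database()
    # Two calls from the same task share one connection.
    same_task = db.connection() is db.connection()

    async def grab() -> FakeConnection:
        return db.connection()

    # gather() wraps each coroutine in its own task, so each gets its own connection.
    first, second = await asyncio.gather(grab(), grab())
    return same_task, first is not second


same_task, separate_tasks = asyncio.run(main())
```

One caveat with a plain dict: it keeps strong references to finished tasks, which is the same garbage-collection concern raised about instance-level ContextVars; a `weakref.WeakKeyDictionary` keyed by task avoids that.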
we can just use an instance dictionary and remove a lot of complexity
I like that take too. Looking back, I was a little too committed to getting the ContextVars to work properly. With the constraint of only using context variables in the module scope, and the possibility of multiple database instances with connections to different DBs, I needed to introduce the current task back into the context variable to keep things separated correctly. I think what you're showing, with the state stored on the instance and using asyncio.current_task instead of a ContextVar altogether, should work just fine!
I'll give it a go with the other databases right now, and in the project where we initially ran into this problem.
Yeah, this seems great. I've adopted/merged your changes into this MR to accompany the new tests I'm contributing here. When resolving conflicts I kept a few of the assertions I had added, since I saw a few of them in the existing code. Let me know what encode's stance is on assert, as well as del some_dict[some_key] vs some_dict.pop(some_key), as it seems like we have different styles there too.
Thanks for your help on this; I'm excited for Sentry to finally stop notifying us about the errors from this issue!
Glad you think it makes sense too!
I think my last concern is the event-loop compatibility bit (e.g. trio).
@tomchristie Do you think you'd be willing to give this a look soon or point me to a better reviewer?
I don't think Databases supports other event loop implementations like trio, curio, etc. I see an old PR #357 that was working on adding anyio support, but it's in draft still.
I'm trying not to demand your time, @tomchristie; I know how brutal open source can be for that. But if you have a minute, it would be great if you or someone else from encode could take a look at this. My team would really like to ship this! Thanks!
For me, it fixes the old concurrent transaction issues. But during test teardown, I am getting errors like:
Added these changes to test it more https://github.com/ToGoBananas/databases/commit/5150e407d2dc53b7d4302101a84543142b4ebd6b
Also, this PR breaks code with nested transactions if the second transaction in the stack is executed via gather.
@ToGoBananas can you provide a reproducible example? Existing tests are all passing for me locally, and here in CI as well. I'm happy to explore, but that screenshot doesn't show what you were trying to do. If you can provide something to cause this error I'll try to correct it and add new tests for it.
```python
import asyncio

from db import get_database  # just returns global instance of databases.Database (singleton)


async def run_update_from_another_task():
    results = await get_database().fetch_one("update project set name = 'test2' where id = 10 RETURNING id")
    assert results  # we can't update newly created row


async def run():
    await get_database().connect()
    await get_database().execute("delete from project where id = 10")
    async with get_database().transaction():
        await get_database().execute("insert into project(id, name) values(10, 'test')")
        results = await get_database().fetch_one("update project set name = 'test2' where id = 10 RETURNING id")
        assert results  # we can update newly created row
        await asyncio.gather(run_update_from_another_task())


asyncio.run(run())
```
Gotcha, thanks for the example - that helps. Here's what I ended up running to test this more, and I have what I think is a solution for you:
Based on the documentation for this project, I'd argue that what you're showing here was not intended behavior, and comes as a side effect of the broken concurrency model I'm trying to fix. Here's the line I'm referencing:
Transaction blocks are managed as task-local state. Nested transactions are fully supported, and are implemented using database savepoints.
To me, this means the effects of transactions should be restricted to the task they are running in. Nested transactions, as far as I'm reading it here, just means that you can nest transaction blocks. The documentation does not state that nested transactions are expected to work across tasks. @tomchristie or @Kludex might be able to clarify. In code, I read that to mean that this is supported:
```python
import os
import databases


async def example():
    db = databases.Database(os.getenv("DATABASE_URL"))
    await db.connect()
    async with db.transaction():
        async with db.transaction():  # nested transaction (a savepoint)
            ...
```
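The quoted documentation says nested transactions are implemented using database savepoints. As a rough illustration of what that means at the SQL level, here is a sketch that uses Python's built-in sqlite3 module instead of databases itself (a substitution made only to keep the example dependency-free): rolling back the inner savepoint undoes only the inner work, while the outer transaction still commits.

```python
import sqlite3

# isolation_level=None puts sqlite3 in autocommit mode, so we issue BEGIN/SAVEPOINT ourselves.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE items (name TEXT)")

conn.execute("BEGIN")  # outer transaction
conn.execute("INSERT INTO items VALUES ('outer')")
conn.execute("SAVEPOINT inner_tx")  # nested "transaction"
conn.execute("INSERT INTO items VALUES ('inner')")
conn.execute("ROLLBACK TO SAVEPOINT inner_tx")  # undo only the inner work
conn.execute("RELEASE SAVEPOINT inner_tx")
conn.execute("COMMIT")

rows = [row[0] for row in conn.execute("SELECT name FROM items")]
```

Both levels run on the same connection, which is why nesting works but concurrent transactions on one connection do not.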
Hm, it does seem like tasks should inherit the state of the parent tasks if feasible. If run_update_from_another_task was created outside of the transaction but ran during it, I would expect it to fail. If it's created from within the transaction, it probably ought to work.
it does seem like tasks should inherit the state of the parent tasks if feasible.
That brings us back to context variables then.
If run_update_from_another_task was created outside of the transaction but ran during it, I would expect it to fail. If it's created from within the transaction, it probably ought to work.
Your model of how context works with tasks is slightly off, I think. Context is not transferred based on where a coroutine function is defined; it's based on where it is executed. For example, the function signature asyncio.create_task(coro, *, name=None, context=None) shows that create_task is responsible for defining the context that the child task runs within. The default value of None means the task inherits a copy of the current context; it's not based on where you defined whatever coroutine is being passed as the coro argument.
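A small stdlib-only sketch of that behaviour (the variable name `current_tx` is made up for illustration): the context is captured when `create_task` is called, not when the coroutine object is created, and a child's `set()` only affects the child's own copy.

```python
from __future__ import annotations

import asyncio
import contextvars

# Module-level, as the contextvars docs recommend (hypothetical name).
current_tx: contextvars.ContextVar[str] = contextvars.ContextVar("current_tx", default="none")


async def report() -> str:
    return current_tx.get()


async def child_sets() -> str:
    current_tx.set("inner")
    return current_tx.get()


async def main() -> list[str]:
    coro = report()  # coroutine object created *before* the variable is set
    current_tx.set("outer")
    # create_task() copies the context at the moment it is called, so the child
    # sees "outer" even though its coroutine object was created earlier.
    seen_by_child = await asyncio.create_task(coro)
    # A child's set() only changes the child's own copy of the context.
    inner = await asyncio.create_task(child_sets())
    after = current_tx.get()
    return [seen_by_child, inner, after]


results = asyncio.run(main())
```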
Edit: Most of this is out of date. We are now back to ContextVar to support inheritance to child tasks.
I don't mind the switch back to ContextVars, but the consequence is that databases may need to be clearer about what is supported. The documentation for context variables says:

Important: ContextVars should be created at the top module level and never in closures. Context objects hold strong references to context variables which prevents context variables from being properly garbage collected.
I understand "closure" in that warning to also include instances. I can't find much discussion of that warning on the PR that added it to the docs or on the implementation itself, but I don't think it's memory safe to add ContextVars as instance attributes.
This is all to say that since ContextVars should be declared at module level but we want them to track instance-level properties (connection and transaction), the immediate consequence I'm foreseeing is that there can only ever be one databases.Database instance per Python program. That's because the global context variables used to track instance properties have no native mechanism for keeping track of which of possibly many databases.Database instances the connection or transaction belongs to. That circles back to an earlier version of this PR where I had context variables containing dictionaries of connections or transactions. It's possible to manage all of this, but it was initially seen as too complex/ugly, and even then I had an asyncio.Task as part of the key into that mapping, so I'd have to check (not certain) whether that would actually work with @ToGoBananas's example.
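To make the single-instance consequence concrete: with one module-level ContextVar and no per-instance mapping, two instances overwrite each other's state. `Database`, `use_connection`, and `current_connection` here are hypothetical names for illustration, not the databases API.

```python
import contextvars

# One module-level variable (hypothetical name) meant to hold "the" connection.
_current_connection = contextvars.ContextVar("current_connection", default=None)


class Database:
    def __init__(self, url: str) -> None:
        self.url = url

    def use_connection(self, conn: str) -> None:
        _current_connection.set(conn)

    def current_connection(self):
        return _current_connection.get()


users_db = Database("postgresql://localhost/users")
orders_db = Database("postgresql://localhost/orders")
users_db.use_connection("users-conn")
orders_db.use_connection("orders-conn")

# Both instances read the same variable, so users_db now sees orders_db's connection.
seen_by_users_db = users_db.current_connection()
```

Storing a mapping keyed by instance inside the ContextVar is one way around this, at the cost of the extra bookkeeping described above.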
Personally, I think that the majority of databases users are not spawning new tasks within transactions, so I'd like to solve this with documentation. The implementation we have right now is simple and maintainable, and still allows child tasks to influence a parent transaction if you pass them the connection that's used in the parent's task (like I showed above). FWIW, the good old days of threading did not have this idea of inherited context: threading.local variables did not pass copies of themselves to child threads, and it was up to the programmer to pass in context explicitly.
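For comparison, a minimal stdlib sketch of the threading behaviour described above: a child thread does not see the parent's `threading.local` value, so the value has to be passed in explicitly. The names `read_implicit` and `read_explicit` are illustrative only.

```python
import threading

local = threading.local()
local.connection = "parent-conn"


def read_implicit(out: list) -> None:
    # A new thread starts with an empty threading.local namespace.
    out.append(getattr(local, "connection", None))


def read_explicit(conn: str, out: list) -> None:
    # The programmer passes the context in by hand.
    out.append(conn)


implicit, explicit = [], []
for target, args in ((read_implicit, (implicit,)), (read_explicit, (local.connection, explicit))):
    thread = threading.Thread(target=target, args=args)
    thread.start()
    thread.join()
```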
Context is not transferred based on where a coroutine function is defined, it's based on where it is executed.
We're definitely on the same page about this; I just wasn't clear in my comment.
I think this pull request would be a breaking change given that example. Gathering asynchronous tasks is pretty common.
Perhaps it's possible to use context variables more cleanly. From my previous comment:
It seems a little wild to use a ContextVar to store a dictionary that includes the asyncio.Task in it (the whole point of them is that they handle task locality for you).
If we exclude the tasks from the context, it might make more sense. I can try taking a look at that implementation again too.
Cool, glad to hear it. I'll dig in a bit more as well.
I agree that this would be a breaking change as it stands right now. I didn't see that as a big deal since databases has yet to become stable (v0.7 right now), so under semantic versioning any minor version change is also to be considered breaking. Considering how subtle and problematic this concurrency bug affecting my project is, I kind of anticipated a breaking change when I came here to start fixing it. I think that some things enabled by the previous concurrency model might be removed once we land on an implementation that corrects the concurrency issues I've been referencing above.
@zevisert https://github.com/encode/databases/commit/dccb47dc57a18fe00d801d0bcdfbf9150bc0c9ae works for both examples; I converted @ToGoBananas's example into an actual test.
Summary of the latest changes:
- ContextVars to support transaction (and connection) inheritance for tasks that are started within a context where a transaction or connection is already established. Thanks to @ToGoBananas for joining in to point out the flaw with tracking by asyncio.Task!
- ContextVars should be module scoped, and because we should support multiple databases instances in the same program, we're storing a Mapping type in the ContextVar. This allows us to make sure we're associating the right state with the right instance. This is essentially the core of what this PR is proposing. Thank you @madkinsz for great communication and inspiration along this path.
- WeakKeyDictionary as the Mapping type stored by the ContextVars. Now the garbage collector is free to clean up unused resources in these cases, since we no longer store strong references to objects involved in concurrent operations. Thank you @circlingthesun for picking up on that so quickly!
- @property getters and setters to centralize the logic for the above, which really helps to reduce the code changes made to databases/core.py when compared to master.
- weakref along with some of the other points that have been made along the way.

Earlier I suggested that this PR be considered a breaking change, but with this latest approach I don't think that's the case anymore. @madkinsz may have more mature thinking about this than I do, considering their work on poetry-relax. I read some of the supporting articles there and generally agree that SemVer can be a bit unwieldy. This PR now aims to make no breaking changes, but I can't say for sure that these corrections to concurrency won't break someone's code.
This PR has less impact now than it had at an earlier stage, given that inheritance to child tasks is supported and working correctly now.
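A condensed sketch of the approach summarized above: a module-level ContextVar holding a WeakKeyDictionary keyed by instance, wrapped in @property getters and setters. The names `_connection_map` and `Database._connection` are illustrative assumptions, not the exact code from the PR; the setter copies the mapping instead of mutating it, so other contexts that share the old mapping are unaffected.

```python
from __future__ import annotations

import contextvars
import gc
import weakref

# Module-level, per the contextvars docs (hypothetical name).
_connection_map: contextvars.ContextVar[weakref.WeakKeyDictionary] = contextvars.ContextVar(
    "connection_map"
)


class Database:
    @property
    def _connection(self):
        return _connection_map.get(weakref.WeakKeyDictionary()).get(self)

    @_connection.setter
    def _connection(self, value) -> None:
        # Copy, then set(): never mutate a mapping that other contexts may share.
        updated = weakref.WeakKeyDictionary(_connection_map.get(weakref.WeakKeyDictionary()))
        updated[self] = value
        _connection_map.set(updated)


db_a, db_b = Database(), Database()
db_a._connection = "conn-a"
db_b._connection = "conn-b"
both = (db_a._connection, db_b._connection)

del db_b
gc.collect()
# The entry for the collected instance disappears from the weak mapping.
remaining = len(_connection_map.get())
```

Each instance reads only its own entry, and because the keys are weak references the mapping does not keep a discarded `Database` alive.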
Can you update the documentation to note that these things are task-local but inherited by child tasks?
Please take a look - feel free to edit / suggest further. I'm not sure if these updated examples are too verbose or not
Done, but see the comment below, as I now think another change is warranted before merge.
Heading to draft for a moment - the MVE in #134 seems really closely related to this issue, but is still failing in the same way as some of the other issues I've been able to resolve with this.
Okay thanks to @reclosedev's issue #134, I've circled back around to thinking that users probably don't want implicit connection inheritance across tasks, BUT do want transaction inheritance if they explicitly reuse the same connection among descendant tasks.
Their example boils down to this:
```python
import asyncio
import os

import databases

database = databases.Database(os.environ["DB_URL"])


async def worker():
    async with database.transaction():
        await database.execute("SELECT 1 + 1")


async def main():
    await database.connect()
    await database.execute("SELECT 1 + 1")
    await asyncio.gather(*[worker() for _ in range(5)])


asyncio.run(main())
```
That is: create a Database and call .connect to set up a connection pool with the database backend (.connect is, and always has been, lazy; actual connections are established by database.connection()), then run several workers concurrently with asyncio.gather, each opening its own transaction.

In the current version of this PR (and on master), this example is still broken because we use ContextVars in a way that makes the current connection inherited by all descendant tasks. The descendant tasks cannot do parallel work within transactions, because they all share the same connection, and a single backend connection can only support nested transactions, not concurrent ones.
This all makes me think that the best solution is for each database instance to have an attribute that tracks connections in use per task, via something like what I had been playing with in 8370299b7b8bd4ed791fbdcf2805321ea12aac9f:
```python
class Database:
    # Connections tracked per task (WeakKeyDictionary from weakref)
    _active_connections: WeakKeyDictionary[asyncio.Task, Connection]
```
This change would make connections task-local with no inheritance. Each new task acquires a new connection from the backend's connection pool unless a connection is explicitly provided to the task. The implicit connection inheritance to descendant tasks is what seems to be the cause of most user frustration with concurrent code. #230 explicitly calls this out.
TransactionBackends, on the other hand, should still be tracked with ContextVars so that transactions nest when the same connection is used; that's the whole point of Connection._transaction_stack anyway. I roughly suggested this pattern earlier when responding to @ToGoBananas's example: at the time we had no context variables at all, so child tasks did not inherit connections, and he was looking for support for the case where a child task needed to influence the state of a transaction in the parent. My suggestion was to explicitly pass the connection with the open transaction to the descendant task. Passing the connection brings the state of any active transactions on that connection along with it.
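The task-local connection tracking proposed above (a `WeakKeyDictionary[asyncio.Task, Connection]` on each instance, with no inheritance) might look roughly like this. `FakeConnection` is a hypothetical stand-in for acquiring a connection from the backend pool, and `connection()` is a simplified illustration rather than the real `Database.connection()`.

```python
from __future__ import annotations

import asyncio
import itertools
import weakref


class FakeConnection:
    """Hypothetical stand-in for a connection acquired from the backend pool."""

    _ids = itertools.count()

    def __init__(self) -> None:
        self.id = next(FakeConnection._ids)


class Database:
    def __init__(self) -> None:
        # Task-local, no inheritance: only the currently running task is the key.
        # Weak keys let finished tasks (and their entries) be garbage collected.
        self._active_connections: weakref.WeakKeyDictionary[asyncio.Task, FakeConnection] = (
            weakref.WeakKeyDictionary()
        )

    def connection(self) -> FakeConnection:
        task = asyncio.current_task()
        if task not in self._active_connections:
            # Stand-in for acquiring a fresh connection from the pool.
            self._active_connections[task] = FakeConnection()
        return self._active_connections[task]


async def main() -> tuple[int, list[int]]:
    db = Database()
    parent = db.connection()

    async def child() -> FakeConnection:
        return db.connection()

    children = await asyncio.gather(child(), child())
    return parent.id, [conn.id for conn in children]


parent_id, child_ids = asyncio.run(main())
```

Under this model the parent and each child end up with distinct connections, so the workers in the #134 example could each run their own transaction; sharing a transaction with a child would mean passing the connection explicitly.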
Oh and another thing I found just now with the current implementation - context variables undergo a shallow clone when context is copied for new tasks. That means that we had been undoing any isolation that context variables had been providing by mutating the WeakKeyDictionary if it had been created before any descendant tasks started. Simple fix though, just need to treat those dictionaries as immutable.
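A stdlib-only sketch of the shallow-copy pitfall and the "treat it as immutable" fix (the variable name `state` is illustrative): mutating the dict in a child task leaks back to the parent, while building a new dict and calling `set()` stays local to the child.

```python
from __future__ import annotations

import asyncio
import contextvars

# Module-level variable holding a mutable dict (hypothetical name).
state: contextvars.ContextVar[dict] = contextvars.ContextVar("state")


async def child_mutates() -> None:
    # The child's context is a shallow copy: it points at the *same* dict object.
    state.get()["conn"] = "child-conn"


async def child_replaces() -> None:
    # Treat the dict as immutable: build a new one and set() it in this context only.
    state.set({**state.get(), "conn": "child-conn"})


async def main() -> tuple[str, str]:
    state.set({"conn": "parent-conn"})
    await asyncio.create_task(child_mutates())
    after_mutation = state.get()["conn"]

    state.set({"conn": "parent-conn"})
    await asyncio.create_task(child_replaces())
    after_replacement = state.get()["conn"]
    return after_mutation, after_replacement


after_mutation, after_replacement = asyncio.run(main())
```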
Anything more I can do for anyone here to get this reviewed and released?
Thanks for your patience!
Thanks for the merge! Could we get @Kludex or @aminalaee to cut a new release so that we can actually have this bug corrected in our codebases? Is there a changelog or blog article I can help write for this? Is there a release cadence or policy that my team can plan from?
@zevisert as far as I can tell, I can make a release. I do not think anyone else is available to maintain this project.
I'd definitely appreciate if you opened a pull request for a changelog entry at https://github.com/encode/databases/blob/master/CHANGELOG.md
There is no planned release cadence for this project. I'd like to release this change and some other fixes but there is little active development here.
This commit should fix an issue with shared state with transactions.
The main issue is very well described by @circlingthesun here: https://github.com/encode/databases/issues/123#issuecomment-1028486810 - we also reproduced it most reliably when load testing with locust, but did hit it a few times in normal usage.
The big problem is that using the @db.transaction decorator creates a new Databases.Transaction instance that, when called, creates a context manager for a transaction. In concurrent frameworks, such as ASGI servers like Starlette / FastAPI, it's possible to have two concurrent tasks running. Since the Transaction instance is shared between tasks, its self variable is comparable to shared global state, and concurrent code can't trust that no race condition has occurred without some helper mechanism like a ContextVar. Here's a mermaid diagram trying to show this:
All I've done with this PR is move the shared global state of self._connection and self._transaction into local variables or contextvars instead of instance variables.

Related issues are: #123 #134
Possibly related: #340
Related meta issue: #456
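The race described above can be reproduced without a database. A minimal sketch, assuming two hypothetical classes that are not the actual Databases.Transaction: `SharedStateTx` stashes per-call state on `self` (like `self._connection`), while `ContextVarTx` keeps it in a module-level ContextVar.

```python
from __future__ import annotations

import asyncio
import contextvars


class SharedStateTx:
    """Keeps per-call state on the instance, shared by every task using it."""

    def __init__(self) -> None:
        self._connection = None

    async def run(self, conn: str) -> str:
        self._connection = conn
        await asyncio.sleep(0)  # yield so another task can overwrite self._connection
        return self._connection


_tx_connection: contextvars.ContextVar[str] = contextvars.ContextVar("tx_connection")


class ContextVarTx:
    """Keeps per-call state in a module-level ContextVar instead."""

    async def run(self, conn: str) -> str:
        _tx_connection.set(conn)
        await asyncio.sleep(0)
        return _tx_connection.get()


async def main():
    shared, ctx = SharedStateTx(), ContextVarTx()
    racy = await asyncio.gather(*(shared.run(f"conn-{i}") for i in range(3)))
    safe = await asyncio.gather(*(ctx.run(f"conn-{i}") for i in range(3)))
    return racy, safe


racy, safe = asyncio.run(main())
```

With the shared instance, every task reads back the connection set by the last task to run (`conn-2`); with the ContextVar, each task sees its own value because `gather` runs each coroutine in a task with its own copy of the context.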