datasette / datasette-enrichments

Tools for running enrichments against data stored in Datasette
https://enrichments.datasette.io
Apache License 2.0
16 stars 0 forks source link

Test failure: database table is locked: _enrichment_jobs #50

Open simonw opened 2 months ago

simonw commented 2 months ago

Getting this intermittent test failure in CI: https://github.com/datasette/datasette-enrichments-gpt/actions/runs/8863259601/job/24337099138

/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/datasette_enrichments/utils.py:42: in wait_for_job
    await db.execute("select status from _enrichment_jobs where id = ?", (job_id,))
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/datasette/database.py:267: in execute
    results = await self.execute_fn(sql_operation_in_thread)
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/datasette/database.py:213: in execute_fn
    return await asyncio.get_event_loop().run_in_executor(
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/concurrent/futures/thread.py:58: in run
    result = self.fn(*self.args, **self.kwargs)
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/datasette/database.py:211: in in_thread
    return fn(conn)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

conn = <sqlite3.Connection object at 0x7f0d7af1ed40>

    def sql_operation_in_thread(conn):
        time_limit_ms = self.ds.sql_time_limit_ms
        if custom_time_limit and custom_time_limit < time_limit_ms:
            time_limit_ms = custom_time_limit

        with sqlite_timelimit(conn, time_limit_ms):
            try:
                cursor = conn.cursor()
>               cursor.execute(sql, params if params is not None else {})
E               sqlite3.OperationalError: database table is locked: _enrichment_jobs

/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/datasette/database.py:237: OperationalError
----------------------------- Captured stderr call -----------------------------
ERROR: conn=<sqlite3.Connection object at 0x7f0d7af1ed40>, sql = 'select status from _enrichment_jobs where id = ?', params = (1,): database table is locked: _enrichment_jobs
=============================== warnings summary ===============================
tests/test_enrichments_gpt.py::test_enrichments_gpt
tests/test_enrichments_gpt.py::test_enrichments_gpt
tests/test_enrichments_gpt.py::test_enrichments_gpt
tests/test_enrichments_gpt.py::test_enrichments_gpt
tests/test_enrichments_gpt.py::test_enrichments_gpt
tests/test_enrichments_gpt.py::test_enrichments_gpt
  /opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/httpx/_client.py:1426: DeprecationWarning: The 'app' shortcut is now deprecated. Use the explicit style 'transport=ASGITransport(app=...)' instead.
    warnings.warn(message, DeprecationWarning)

tests/test_enrichments_gpt.py::test_enrichments_gpt
tests/test_enrichments_gpt.py::test_enrichments_gpt
  /opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/httpx/_client.py:1559: DeprecationWarning: Setting per-request cookies=<...> is being deprecated, because the expected behaviour on cookie persistence is ambiguous. Set cookies directly on the client instance instead.
    warnings.warn(message, DeprecationWarning)

-- Docs: https://docs.pytest.org/en/stable/how-to/capture-warnings.html
=========================== short test summary info ============================
FAILED tests/test_enrichments_gpt.py::test_enrichments_gpt - sqlite3.OperationalError: database table is locked: _enrichment_jobs
======================== 1 failed, 8 warnings in 0.91s =========================
Task was destroyed but it is pending!
task: <Task pending name='Task-33' coro=<Enrichment.start_enrichment_in_process.<locals>.run_enrichment() running at /opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/datasette_enrichments/__init__.py:323> wait_for=<Future pending cb=[_chain_future.<locals>._call_check_cancel() at /opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/asyncio/futures.py:387, Task.task_wakeup()]>>

Looks like it happens in the code that polls to see if the enrichment has finished: https://github.com/datasette/datasette-enrichments-gpt/blob/e5f5da1ccc97ade51f3c0b3f6958dcf2998cf772/tests/test_enrichments_gpt.py#L38-L47

Which calls this: https://github.com/datasette/datasette-enrichments/blob/ad61b35b541272cda10e25fcc7b2c4b5e39d2e72/datasette_enrichments/utils.py#L18-L57

Most relevant code snippet:

    # Otherwise wait for it to complete
    event = datasette._enrichment_completed_events.get((db.name, job_id))
    if not event:
        event = asyncio.Event()
        datasette._enrichment_completed_events[(db.name, job_id)] = event
    if timeout is None:
        await event.wait()
    else:
        await asyncio.wait_for(event.wait(), timeout)
simonw commented 2 months ago

Most of the writes to _enrichment_jobs happen in this task in datasette-enrichments core:

https://github.com/datasette/datasette-enrichments/blob/ad61b35b541272cda10e25fcc7b2c4b5e39d2e72/datasette_enrichments/__init__.py#L303-L378

simonw commented 2 months ago

I'm going to move this to the datasette-enrichments repository. I think it's a bug there.

simonw commented 2 months ago

My best guess is that this is happening because we are running db.execute_write() calls from inside a task that was kicked off like this:

https://github.com/datasette/datasette-enrichments/blob/ad61b35b541272cda10e25fcc7b2c4b5e39d2e72/datasette_enrichments/__init__.py#L378

That's not a pattern I have much experience with, especially within the Datasette ecosystem.

How could that result in a sqlite3.OperationalError: database table is locked error though?

simonw commented 2 months ago

A few options:

  1. Catch those errors, wait a moment and retry
  2. Teach Datasette itself to use some kind of asyncio protecting lock around these operations, to ensure that no concurrent asyncio task ever attempts to access the database when it should be locked
  3. Could I do something clever with db.execute_write_fn()? That should ensure the operation runs on the write thread.

I'll try option 3 first.

simonw commented 2 months ago

I'm trying to reliably replicate this. Here's my first attempt, but the test passed just fine:

@pytest.mark.asyncio
async def test_async_tasks_do_not_crash():
    ds = Datasette()
    db = ds.add_memory_database("test_async_tasks_do_not_crash")
    await db.execute_write("create table counter (id integer primary key, n integer)")
    await db.execute_write("insert into counter (n) values (0)")

    # We are going to spin off a bunch of tasks that all write to the counter
    async def increment():
        for i in range(10):
            await db.execute_write("update counter set n = n + 1 where id = 1")

    loop = asyncio.get_event_loop()
    for i in range(20):
        loop.create_task(increment())
    # Wait 1s
    await asyncio.sleep(1)
    # Check the counter
    counter = await db.execute("select n from counter where id = 1")
    assert counter.first()[0] == 200
simonw commented 2 months ago

Tried it against an on-disk database too, still passed on my laptop:

@pytest.mark.asyncio
async def test_async_tasks_do_not_crash(tmp_path_factory):
    tmpdir = tmp_path_factory.mktemp("test_async_tasks_do_not_crash")
    db_path = str(tmpdir / "test.db")
    ds = Datasette([db_path])
    db = ds.get_database("test")
    await db.execute_write("create table counter (id integer primary key, n integer)")
    await db.execute_write("insert into counter (n) values (0)")

    # We are going to spin off a bunch of tasks that all write to the counter
    async def increment():
        for i in range(10):
            await db.execute_write("update counter set n = n + 1 where id = 1")

    loop = asyncio.get_event_loop()
    for i in range(100):
        loop.create_task(increment())
    # Wait 1s
    await asyncio.sleep(1)
    # Check the counter
    counter = await db.execute("select n from counter where id = 1")
    assert counter.first()[0] == 1000

But maybe it would fail in CI?

simonw commented 2 months ago

That failed with errors like this:

        counter = await db.execute("select n from counter where id = 1")
>       assert counter.first()[0] == 1000
E       assert 862 == 1000

So clearly that 1s wasn't long enough for the tasks to all complete.

simonw commented 2 months ago

I've failed to replicate the error I'm seeing.

simonw commented 2 months ago

I'm going to stick that new test in the Datasette test suite anyway, as an illustration of loop.create_task().