python-gino / gino

GINO Is Not ORM - a Python asyncio ORM on SQLAlchemy core.
https://python-gino.org/

SELECT ... FOR UPDATE SKIP LOCKED seems to not lock rows #792

Closed idmitrievsky closed 3 years ago

idmitrievsky commented 3 years ago

Describe the bug I am trying to use SELECT ... FOR UPDATE SKIP LOCKED to prevent concurrent tasks from getting the same row. It seems like it doesn't work.

To Reproduce I tried to annotate the example below with as many comments as make sense. Let me know if something is unclear!

import asyncio
import uuid
from typing import AsyncGenerator, Iterable

import pytest
from gino import Gino

gino_db = Gino()

class ActionRelation(gino_db.Model):
    """
    A very simple table of actions.
    """
    __tablename__ = 'action'

    action_uuid = gino_db.Column(gino_db.String, primary_key=True)

async def every_action_skip_locked() -> AsyncGenerator[ActionRelation, None]:
    """
    Iterate over every action skipping locked ones.

    If two workers are running concurrently they should not be able
    to get the same row/action because `with_for_update` with `skip_locked` is used.
    """
    # get an isolated connection as per Gino docs
    async with gino_db.transaction(reuse=False, reusable=False):
        # get and lock every action that is not locked yet
        iterable: Iterable[ActionRelation] = (
            await ActionRelation.query.with_for_update(skip_locked=True).gino.all()
        )

        # the transaction is not closed
        # until this generator is exhausted
        for action in iterable:
            # update action and yield it
            # await action.update(
            #     ...
            # ).apply()
            yield action

@pytest.mark.asyncio
async def test_skip_locked():
    # init gino
    await gino_db.set_bind("postgres://postgres:postgres@localhost:5432/billing")

    # create two actions
    await ActionRelation.create(
        action_uuid=str(uuid.uuid4()),
    )
    await ActionRelation.create(
        action_uuid=str(uuid.uuid4()),
    )

    # prepare events to organize a scenario:
    # 1. first worker opens transaction and locks both rows/actions
    # 2. second worker opens transaction and select for update returns nothing because skip locked is specified
    # 3. second worker closes transaction
    # 4. first worker gets one more row/action from the generator and closes its transaction
    is_fst_worker_transaction_opened = asyncio.Event()
    is_snd_worker_transaction_closed = asyncio.Event()

    actions_accessible_by_fst_worker = []
    actions_accessible_by_snd_worker = []

    async def fst_worker():
        # when this loop starts the transaction is opened inside the generator
        async for action in every_action_skip_locked():
            is_fst_worker_transaction_opened.set()
            # first worker waits for the second worker to close its transaction before proceeding
            await is_snd_worker_transaction_closed.wait()
            actions_accessible_by_fst_worker.append(action)

    async def snd_worker():
        # second worker opens the transaction only after first worker started and opened its transaction
        await is_fst_worker_transaction_opened.wait()
        # when this loop starts the SECOND DIFFERENT transaction is opened
        async for action in every_action_skip_locked():
            # this will never execute, because all rows are locked by the first worker at this point
            actions_accessible_by_snd_worker.append(action)
        is_snd_worker_transaction_closed.set()

    fst_task = asyncio.create_task(fst_worker())
    snd_task = asyncio.create_task(snd_worker())
    await asyncio.gather(fst_task, snd_task)

    # close gino
    await gino_db.pop_bind().close()

    assert len(actions_accessible_by_fst_worker) == 2
    assert len(actions_accessible_by_snd_worker) == 0

Expected result I believe the test should pass.

Actual result The test fails because the second worker has access to every row in the table.

tmp.py::test_skip_locked FAILED                                          [100%]
tmp.py:43 (test_skip_locked)
2 != 0

Expected :0
Actual   :2

Additional context I tried to understand this behaviour myself, but couldn't. According to Gino, it actually starts two different transactions. But when I used the debugger to pause execution inside the transaction's async context manager and ran SELECT ... FOR UPDATE SKIP LOCKED as raw SQL in a terminal, I got all rows back, as if Gino had locked none of them.

idmitrievsky commented 3 years ago

I see now that the query

await ActionRelation.query.with_for_update(skip_locked=True).gino.all()

uses an implicitly acquired connection that is different from the connection acquired for the transaction (it is actually guaranteed to be different, because I specified reusable=False).
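
A minimal sketch of one way to keep the locking query on the transaction's own connection, so that the row locks live as long as the transaction does. It assumes that `gino_db.transaction()` yields a transaction object exposing a `connection` attribute, and that the connection's `all()` accepts the query. The function name and the `db` and `model` parameters are hypothetical and stand in for `gino_db` and `ActionRelation`. I have not run this against a real database.

```python
from typing import Any, AsyncGenerator


async def every_action_skip_locked_on_tx(db: Any, model: Any) -> AsyncGenerator[Any, None]:
    """
    Iterate over every action, skipping rows locked by other transactions.

    `db` is assumed to be the Gino instance and `model` the model class
    (hypothetical parameters, used so the sketch does not depend on globals).
    """
    # open an isolated transaction, as in the original example
    async with db.transaction(reuse=False, reusable=False) as tx:
        query = model.query.with_for_update(skip_locked=True)

        # run the query explicitly on the connection that owns the transaction
        # instead of `query.gino.all()`, which acquires a connection implicitly
        actions = await tx.connection.all(query)

        # the transaction (and its row locks) stays open
        # until this generator is exhausted
        for action in actions:
            yield action
```

With this shape the workers in the test would iterate with `async for action in every_action_skip_locked_on_tx(gino_db, ActionRelation)`.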

Not sure how I missed it, sorry for the trouble!