pudo / dataset

Easy-to-use data handling for SQL data stores with support for implicit table creation, bulk loading, and transactions.
https://dataset.readthedocs.org/
MIT License

"Caching" issue with multiple connections #325

Closed c00kiemon5ter closed 4 years ago

c00kiemon5ter commented 4 years ago

Hello, we are using dataset to connect to a MySQL database. The database gets multiple connections that can read and write simultaneously. We observed that when one connection performs a write, another connection does not see the new data until that other connection itself performs a write.

I assume there is a caching layer that is updated when a write operation takes place. This may work in single-connection environments, but not in environments where there are multiple connections to the database.

This is a small test case to illustrate the problem:

# create two connections
In [141]: db1 = dataset.connect("mysql://XXX/foo")                                    
In [142]: db2 = dataset.connect("mysql://XXX/foo")                                    

# load the table, to initialize the cache (assumption)
In [143]: db1.get_table("bar").all()                                                                                                                                                                        
Out[143]: <dataset.util.ResultIter at 0x7ff53ee4b438>
In [144]: db2.get_table("bar").all()                                                                                                                                                                        
Out[144]: <dataset.util.ResultIter at 0x7ff53f3f7c88>

# insert new data from one connection
In [145]: db1.get_table("bar").insert({})                                                                                                                                                                   
Out[145]: 156

# read the new data out from another connection
# this fails as the data have been cached and did not get updated (assumption)
In [146]: list(db2.get_table("bar").find(id=156))                                                                                                                                                           
Out[146]: []

I would expect the new data to be available in the second connection.


It is important to have read the table data first. If you do not, the first read from another connection will work, because it loads the data for the first time and therefore includes the new rows. When the process is repeated, though, the same problem as above occurs:

# create two connections
In [149]: db1 = dataset.connect("mysql://XXX/foo")                                    
In [150]: db2 = dataset.connect("mysql://XXX/foo")                                    

# insert new data from one connection
In [151]: db1.get_table("bar").insert({})                                                                                                                                                                   
Out[151]: 157

# read the new data out from another connection
# this works as this is the first time the data are read
In [152]: list(db2.get_table("bar").find(id=157))                                                                                                                                                           
Out[152]: 
[OrderedDict([('id', 157),
              ('value', None)])]

# insert new data from one connection
In [153]: db1.get_table("bar").insert({})                                                                                                                                                                   
Out[153]: 158

# read the new data out from another connection
# this fails as the data have been cached and did not get updated (assumption)
In [154]: list(db2.get_table("bar").find(id=158))                                                                                                                                                           
Out[154]: []
pudo commented 4 years ago

Thanks for filing such a nice bug report. There is caching in dataset, but it only relates to the schema metadata, not any of the actual data. SQLAlchemy (the database layer we use) has caching in its ORM layer, but I don't think the core layer, which we use, does any. To me that mainly leaves transactions as a possible culprit. Perhaps you can wrap the statements you're doing in an explicit transaction block (see the dataset docs) to see if that helps?
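
For reference, the explicit transaction block the dataset docs describe looks roughly like this (a minimal sketch, reusing the connection string and table name from the report above):

# sketch: wrap the operations in a dataset transaction block
import dataset

db = dataset.connect("mysql://XXX/foo")

# using the database object as a context manager runs the enclosed operations
# in a single transaction: it commits on success and rolls back on error
with db as tx:
    tx["bar"].insert({"value": 1})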

c00kiemon5ter commented 4 years ago

The fault was on our side. Underneath, SQLAlchemy works with Sessions, and those need to be short-lived: created and .close()d once a set of operations is done. If that isn't done, connections that were opened in parallel will not see the new data, as the data hasn't been committed until the session is closed (or .commit()ed).
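
A minimal sketch of that short-lived session pattern, assuming a plain SQLAlchemy 1.x engine and the same MySQL database as above:

import sqlalchemy
from sqlalchemy.orm.session import Session

engine = sqlalchemy.create_engine("mysql://XXX/foo")

# create a session, run one set of operations, then commit and close it
session = Session(bind=engine)
session.execute("INSERT INTO bar VALUES()")
session.commit()  # persist the write so other connections can see it
session.close()   # end the session; later work should start a fresh one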

yinheli commented 3 years ago

@c00kiemon5ter how did you solve this?

yinheli commented 3 years ago
import dataset
uri = 'mysql+pymysql://xxx:xxx@localhost:3306/test'
db1 = dataset.connect(uri)
db2 = dataset.connect(uri)

# create record
db1['bar'].insert({'v': 1})
db1['bar'].find_one(id=1)
db2['bar'].find_one(id=1)

db1['bar'].update({'v': 2, 'id': 1}, keys=['id'])
# db2 can't see the latest update
db1['bar'].find_one(id=1)
db2['bar'].find_one(id=1)
c00kiemon5ter commented 3 years ago

In general, you'd need to use .commit() or .close() to ensure that the .update() has been persisted and is visible to other connections. It's been some time, and I don't remember all of the context. We still use dataset, but we have moved from MySQL to Postgres.

When experimenting with this issue, I wrote the following tests with some comments that may be helpful to you (sorry for the long code):

import dataset

import pytest
from pytest import raises

import sqlalchemy
from sqlalchemy.orm.session import Session

from sqlalchemy_utils import create_database
from sqlalchemy_utils import database_exists
from sqlalchemy_utils import drop_database

config = {
    "user": "root",
    "pass": "root",
    "host": "localhost",
    "db_name": "foo",
}

db_conn_str = "mysql://{user}:{pass}@{host}/{db_name}".format(**config)

def setup_module(module):
    if database_exists(db_conn_str):
        drop_database(db_conn_str)
    create_database(db_conn_str)

@pytest.fixture
def cn1():
    conn = dataset.connect(db_conn_str)
    yield conn
    conn.close()

@pytest.fixture
def cn2():
    conn = dataset.connect(db_conn_str)
    yield conn
    conn.close()

# we would expect this to succeed, but it fails
# it seems like there is a cache layer that is not updated on reads
def test_read_fails(cn1, cn2):
    _ = list(cn1.get_table("bar").all())
    _ = list(cn2.get_table("bar").all())

    new_id = cn1.get_table("bar").insert({})
    r1 = list(cn1.get_table("bar").find(id=new_id))
    r2 = list(cn2.get_table("bar").find(id=new_id))

    # XXX this is the problem: r1 should be the same as r2; but they are not
    with raises(AssertionError):
        assert r1 == r2

# it seems like there is a cache layer that is updated on write
def test_read_after_write_succeeds(cn1, cn2):
    _ = list(cn1.get_table("bar").all())
    _ = list(cn2.get_table("bar").all())

    new_id = cn1.get_table("bar").insert({})
    r1 = list(cn1.get_table("bar").find(id=new_id))
    _ = cn2.get_table("bar").insert({})
    r2 = list(cn2.get_table("bar").find(id=new_id))

    assert r1 == r2

# instead of a write, let's make a transaction
# this succeeds, so, maybe,
# the begin/commit sequence syncs the underlying state
def test_tx_before_read_succeeds(cn1, cn2):
    _ = list(cn1.get_table("bar").all())
    _ = list(cn2.get_table("bar").all())

    new_id = cn1.get_table("bar").insert({})
    r1 = list(cn1.get_table("bar").find(id=new_id))

    cn2.begin()
    cn2.commit()
    r2 = list(cn2.get_table("bar").find(id=new_id))

    assert r1 == r2

# instead of a tx before the read, let's put it before we even write
# this succeeds, too! so, what is going on?
# why can we fetch the new_id now, but we could not in the first case?
def test_tx_nonsense_succeeds(cn1, cn2):
    _ = list(cn1.get_table("bar").all())
    _ = list(cn2.get_table("bar").all())

    cn2.begin()
    cn2.commit()

    new_id = cn1.get_table("bar").insert({})
    r1 = list(cn1.get_table("bar").find(id=new_id))

    r2 = list(cn2.get_table("bar").find(id=new_id))

    assert r1 == r2

# ok, this is strange; what we know so far is that
# given a tx on a connection that is not the one that did the write
# the next read of that connection will succeed
# so... let's move the tx to another random place
# this fails, as there is now a read before the important read
def test_twice_fails(cn1, cn2):
    _ = list(cn1.get_table("bar").all())

    cn2.begin()
    cn2.commit()

    _ = list(cn2.get_table("bar").all())

    new_id = cn1.get_table("bar").insert({})
    r1 = list(cn1.get_table("bar").find(id=new_id))

    r2 = list(cn2.get_table("bar").find(id=new_id))

    with raises(AssertionError):
        assert r1 == r2

# right, so, this kind of means that we have to couple an empty tx with reads
# yep, that works
def test_couple_tx_with_read_succeeds(cn1, cn2):
    _ = list(cn1.get_table("bar").all())

    cn2.begin()
    cn2.commit()
    _ = list(cn2.get_table("bar").all())

    new_id = cn1.get_table("bar").insert({})
    r1 = list(cn1.get_table("bar").find(id=new_id))

    cn2.begin()
    cn2.commit()
    r2 = list(cn2.get_table("bar").find(id=new_id))

    assert r1 == r2

# and it looks like, when the connection starts,
# it is already in the state it would be in right after a tx
def test_init_is_in_tx_state(cn1, cn2):
    new_id = cn1.get_table("bar").insert({})
    r1 = list(cn1.get_table("bar").find(id=new_id))
    r2 = list(cn2.get_table("bar").find(id=new_id))

    assert r1 == r2

# but subsequent reads need a tx
def test_after_init_need_tx_fails(cn1, cn2):
    new_id = cn1.get_table("bar").insert({})
    r1 = list(cn1.get_table("bar").find(id=new_id))
    r2 = list(cn2.get_table("bar").find(id=new_id))

    assert r1 == r2

    new_id = cn1.get_table("bar").insert({})
    r1 = list(cn1.get_table("bar").find(id=new_id))
    r2 = list(cn2.get_table("bar").find(id=new_id))

    with raises(AssertionError):
        assert r1 == r2

# but subsequent reads need a tx, in any place before the read
def test_after_init_tx_succeeds(cn1, cn2):
    new_id = cn1.get_table("bar").insert({})
    r1 = list(cn1.get_table("bar").find(id=new_id))
    r2 = list(cn2.get_table("bar").find(id=new_id))

    cn2.begin()
    cn2.commit()

    assert r1 == r2

    new_id = cn1.get_table("bar").insert({})
    r1 = list(cn1.get_table("bar").find(id=new_id))
    r2 = list(cn2.get_table("bar").find(id=new_id))

    assert r1 == r2

# it is as if a tx signals the sync mechanism to run on the next read
def test_debug():
    engine = sqlalchemy.create_engine(db_conn_str)
    t = sqlalchemy.Table("bar", sqlalchemy.MetaData(), autoload=True, autoload_with=engine)

    #qin = sqlalchemy.insert(t, [])
    qin = "INSERT INTO bar VALUES()"

    #qall = sqlalchemy.select([t])
    qall = "SELECT * FROM bar"

    cn1 = engine.connect()
    cn2 = engine.connect()

    new_id = cn1.execute(qin).lastrowid
    r1 = cn1.execute(qall).fetchall()
    r2 = cn2.execute(qall).fetchall()

    assert r1 == r2

    new_id = cn1.execute(qin).lastrowid
    r1 = cn1.execute(qall).fetchall()
    cn2.begin().commit()
    r2 = cn2.execute(qall).fetchall()

    assert r1 == r2

def test_debug_session():
    engine = sqlalchemy.create_engine(db_conn_str)
    t = sqlalchemy.Table("bar", sqlalchemy.MetaData(), autoload=True, autoload_with=engine)
    qin = sqlalchemy.insert(t, [])
    #qin = "INSERT INTO bar VALUES()"

    cn1 = Session(bind=engine)
    cn2 = Session(bind=engine)

    r1 = cn1.query(t).all()
    r2 = cn2.query(t).all()
    print("initially:", r1, r2)

    new_id = cn1.execute(qin).lastrowid
    cn1.commit() # XXX needed after write
    r1 = cn1.query(t).all()
    # XXX to make the next read on cn2 consistent we need to
    #cn2.commit() # either commit()
    cn2.close() # or close() which does commit()
    #cn2 = Session(bind=engine) # or restart the session
    r2 = cn2.query(t).all()
    print("after first addition:", r1, r2)

    assert r1 == r2

    new_id = cn1.execute(qin).lastrowid
    cn1.close() # XXX needed after write
    r1 = cn1.query(t).all()
    #cn2.commit() # either commit()
    cn2.close() # or close() which does commit()
    #cn2 = Session(bind=engine) # or restart the session
    r2 = cn2.query(t).all()

    assert r1 == r2
yinheli commented 3 years ago

Thanks for your reply.

I have solved it by adding the isolation_level parameter. (With MySQL's default REPEATABLE READ isolation, a connection keeps reading from the snapshot taken by its open transaction; READ COMMITTED takes a fresh snapshot per statement, so commits from other connections become visible.)

db = dataset.connect(
    'mysql+pymysql://xxx:xxx@localhost:3306/test',
    engine_kwargs={
        'pool_recycle': 300,
        'pool_size': 10,
        'isolation_level': 'READ COMMITTED'
    }
)