zopefoundation / zope.sqlalchemy

Integration of SQLAlchemy with transaction management

ZopeTransactionExtension breaks sqlalchemy's .merge() when no changes are made #8

Closed: yundatm closed this issue 10 years ago

yundatm commented 10 years ago

Hi there, I found a very disturbing problem in a multi-threaded application. Here is what happens:

  1. Threads 1 and 2 both read record A.
  2. Thread 1 updates record A and commits.
  3. Thread 2 commits its own (unchanged) transaction and then calls session.merge() to read the updated version of record A.
  4. BUT! When transaction.commit() is called in Thread 2, record A is written back to its previous state, essentially reverting all changes made by Thread 1.

This doesn't happen without ZopeTransactionExtension (i.e. when using only SQLAlchemy and its session.commit()). Using ZopeTransactionExtension('changed') also fixes the problem. The sample code below demonstrates the problem (see the comments in function thread_2). The only dependencies are sqlalchemy and zope.sqlalchemy, and it uses an in-memory SQLite database. The problem was observed on SQLAlchemy 0.8.1 and the latest 0.9.4 (I haven't tested any other versions), with zope.sqlalchemy 0.7.2 and the latest 0.7.4. SQLite is used in the example, but the same problem happens on PostgreSQL, too.

import transaction
from time import sleep
from sqlalchemy.engine import engine_from_config
from sqlalchemy.orm.scoping import scoped_session
from zope.sqlalchemy.datamanager import ZopeTransactionExtension
from sqlalchemy.orm.session import sessionmaker
from sqlalchemy.ext.declarative.api import declarative_base
from threading import Thread
from sqlalchemy.schema import Column, Sequence
from sqlalchemy.types import Integer, String

settings = {
    # works also on postgresql (and probably any other db server)
    #'sqlalchemy.url': "postgresql://user:pass@localhost/db-dev",
    'sqlalchemy.url': "sqlite:///memory:",
    'sqlalchemy.echo': True}

DBSession = scoped_session(sessionmaker(extension=ZopeTransactionExtension()))
# using the below definition fixes the problem (with some side-effects though)
#DBSession = scoped_session(sessionmaker(extension=ZopeTransactionExtension('changed')))
Base = declarative_base()

class TestTable(Base):
    __tablename__ = "test_table"
    test_id = Column(Integer, Sequence('test_table_id_seq'), primary_key=True)
    value = Column(String)
    unrelated = Column(String)

def thread_1(test_id):
    dbs = DBSession()
    o = dbs.query(TestTable).get(test_id)
    sleep(0.5)  # update after 0.5s -> enough time for the second thread to read from database
    o.value = '123'
    transaction.commit()

def thread_2(test_id):
    dbs = DBSession()
    o = dbs.query(TestTable).get(test_id)
    sleep(1)  # by this time thread 1 should be finished and changes committed in the db

    # if anything (unrelated) gets updated, the problem doesn't happen
    # try uncommenting the below line and see the different output
    #o.unrelated = '456'
    transaction.commit()
    dbs = DBSession()
    o = dbs.merge(o)
    print("order num after merge " + o.value)  # this should print "123", but prints "initial" <- that's wrong!
    transaction.commit()  # and then it updates value "123" back to "initial" (!!!)

if __name__ == '__main__':
    # setup sqlalchemy
    engine = engine_from_config(settings)
    Base.metadata.create_all(engine)
    DBSession.configure(bind=engine)
    Base.metadata.bind = engine

    # create one record in our test table
    dbs = DBSession()
    rec = TestTable(value='initial', unrelated='initial')
    dbs.add(rec)
    dbs.flush()
    test_id = rec.test_id
    transaction.commit()

    # now run two threads in parallel
    t1 = Thread(target=thread_1, args=(test_id,))
    t2 = Thread(target=thread_2, args=(test_id,))
    t1.start()
    t2.start()

    # wait for both of them to finish
    t1.join()
    t2.join()

lrowe commented 10 years ago

(Note that "sqlite:///memory:" is a file. Changing it to "sqlite:///:memory:" does not work, because each thread would then get its own SQLite in-memory database.)
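For context, the note above is about SQLite itself rather than SQLAlchemy: every connection to `:memory:` opens its own private database, whereas all connections to the same file path share one. The snippet below is a small illustration using Python's standard `sqlite3` module; the function name `table_visible_across_connections` is made up for this example.

```python
import os
import sqlite3
import tempfile


def table_visible_across_connections(database):
    """Create a table on one connection and report whether a second connection sees it."""
    first = sqlite3.connect(database)
    second = sqlite3.connect(database)
    try:
        first.execute("CREATE TABLE test_table (test_id INTEGER PRIMARY KEY, value TEXT)")
        first.commit()
        row = second.execute(
            "SELECT name FROM sqlite_master WHERE type = 'table' AND name = 'test_table'"
        ).fetchone()
        return row is not None
    finally:
        first.close()
        second.close()


# ":memory:" gives each connection its own empty database.
print(table_visible_across_connections(":memory:"))  # False

# A real file is shared by every connection that opens it.
with tempfile.TemporaryDirectory() as tmp:
    print(table_visible_across_connections(os.path.join(tmp, "test.db")))  # True
```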

From the README:

By default, zope.sqlalchemy puts sessions in an 'active' state when they are first used. ORM write operations automatically move the session into a 'changed' state. This avoids unnecessary database commits.

This means that the first transaction.commit() in thread_2 will result in a session.close() instead of a session.commit(). Removing the ZopeTransactionExtension and using session.close() here and session.commit() elsewhere shows the same behaviour.
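To make the README's description concrete, here is a much-simplified model of the end-of-transaction decision. It is an illustration only: `FakeSession`, `ToyDataManager`, `record_write` and `finish` are hypothetical names, not zope.sqlalchemy's API or its actual implementation.

```python
class FakeSession:
    """Stand-in for a SQLAlchemy session; only records which method was called."""

    def __init__(self):
        self.calls = []

    def commit(self):
        self.calls.append("commit")

    def close(self):
        self.calls.append("close")


class ToyDataManager:
    """Hypothetical model of the 'active' / 'changed' states described in the README."""

    def __init__(self, session, initial_state="active"):
        self.session = session
        self.state = initial_state

    def record_write(self):
        # ORM write operations move the session into the 'changed' state.
        self.state = "changed"

    def finish(self):
        # Runs when transaction.commit() is called.
        if self.state == "changed":
            self.session.commit()  # real COMMIT; with expire_on_commit=True instances get expired
        else:
            self.session.close()   # no COMMIT issued; loaded instances are NOT expired


read_only = FakeSession()
ToyDataManager(read_only).finish()
print(read_only.calls)  # ['close']

written = FakeSession()
dm = ToyDataManager(written)
dm.record_write()
dm.finish()
print(written.calls)  # ['commit']
```

In this model, starting in the 'changed' state (as `ZopeTransactionExtension('changed')` does) always takes the commit branch, which is presumably why that setting avoided the problem.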

The difference in behaviour arises because SQLAlchemy sessions have expire_on_commit=True by default. When the session is closed rather than committed, the instances are not expired. Adding a manual call to dbs.expire_all() fixes the problem:

from sqlalchemy import inspect

def thread_2(test_id):
    dbs = DBSession()
    o = dbs.query(TestTable).get(test_id)
    sleep(1)  # by this time thread 1 should be finished and changes committed in the db
    print('expired: %r' % inspect(o).expired)
    dbs.expire_all()  # must come before transaction.commit() / session.close()
    transaction.commit()
    print('expired: %r' % inspect(o).expired)
    dbs = DBSession()
    o = dbs.merge(o)
    print("order num after merge " + o.value)  # with expire_all() above, this now prints "123"
    transaction.commit()  # no changes are pending, so "123" is no longer overwritten
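The thread does not spell out why merge() writes the old value back. One plausible reading is sketched below as a pure-Python toy model. This is not SQLAlchemy code: `merge`, `expire_all` and `EXPIRED` are hypothetical stand-ins. The idea is that merge() copies every attribute still loaded on the detached instance onto the freshly loaded persistent one, so an unexpired stale value becomes a pending change. Expired attributes are skipped, and the database value survives.

```python
# Sentinel marking an expired attribute (its value would be reloaded from the database).
EXPIRED = object()


def expire_all(state):
    """Return a copy of an instance's attribute state with every attribute expired."""
    return {name: EXPIRED for name in state}


def merge(detached_state, db_row):
    """Toy model of Session.merge() with load=True.

    Start from the row as currently stored in the database, then copy over every
    attribute that is still loaded (not expired) on the detached instance.
    Returns the merged values and the names of attributes that now differ from
    the database, i.e. what the next flush would write.
    """
    merged = dict(db_row)
    dirty = set()
    for name, value in detached_state.items():
        if value is EXPIRED:
            continue  # expired: keep the freshly loaded database value
        if merged[name] != value:
            merged[name] = value
            dirty.add(name)
    return merged, dirty


# Row after thread 1 committed, and thread 2's stale copy (close() did not expire it).
db_row = {"value": "123", "unrelated": "initial"}
stale = {"value": "initial", "unrelated": "initial"}

merged, dirty = merge(stale, db_row)
print(merged["value"], dirty)  # initial {'value'}  -> the next commit writes "initial" back

merged, dirty = merge(expire_all(stale), db_row)
print(merged["value"], dirty)  # 123 set()  -> nothing to write
```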

Possibly zope.sqlalchemy should call session.expire_all() before calling session.close() when there were no changes to commit.
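The suggestion above could look roughly like the sketch below. It is hypothetical and not zope.sqlalchemy's actual code: `RecordingSession` stands in for a SQLAlchemy Session, `finish` for whatever the data manager runs at the end of a transaction, and `state` for its 'active'/'changed' flag.

```python
class RecordingSession:
    """Hypothetical stand-in for a Session that records the calls made on it."""

    def __init__(self):
        self.calls = []

    def commit(self):
        self.calls.append("commit")

    def expire_all(self):
        self.calls.append("expire_all")

    def close(self):
        self.calls.append("close")


def finish(session, state):
    """Sketch of the proposed end-of-transaction behaviour (hypothetical helper)."""
    if state == "changed":
        # A real COMMIT already expires instances when expire_on_commit=True.
        session.commit()
    else:
        # No COMMIT needed, but expire first so detached instances do not keep
        # stale values that a later merge() could copy back.
        session.expire_all()
        session.close()


s = RecordingSession()
finish(s, "active")
print(s.calls)  # ['expire_all', 'close']
```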

yundatm commented 10 years ago

Thanks for the quick response and the explanation! You're right, calling dbs.expire_all() fixes it. I do believe zope.sqlalchemy should do that automatically; otherwise it would have to be called everywhere before a commit (and only when there are no changes). Also consider the destructive consequences: it would be acceptable if the new value simply wasn't read from the db at that stage, but the problem is that the old value is actually written back to the database(!). Cheers, Martin.

yundatm commented 10 years ago

Thanks heaps, that was quick! :-)