ZopeTransactionExtension breaks sqlalchemy's .merge() when no changes are made #8

Closed
yundatm opened this issue Jun 17, 2014 · 3 comments

Comments

@yundatm

yundatm commented Jun 17, 2014

Hi there,
I found a very disturbing problem in a multi-threaded application. What happens:

  1. Threads 1 and 2 both read record A.
  2. Thread 1 updates record A and commits
  3. Thread 2 calls session.merge() - to read an updated version of record A
  4. BUT! when transaction.commit() is called in Thread 2, record A is updated back to the previous state, essentially reverting all changes made by Thread 1.

This doesn't happen without ZopeTransactionExtension (i.e. using only SQLAlchemy and its session.commit()). Using ZopeTransactionExtension('changed') also fixes the problem.
The sample code below demonstrates the problem (see the comments in function thread_2). The only dependencies are sqlalchemy and zope.sqlalchemy; it uses in-memory sqlite.
The problem was observed on SQLAlchemy 0.8.1 and also the latest 0.9.4 (I haven't tested any other versions), with zope.sqlalchemy 0.7.2 and the latest 0.7.4.
SQLite is used in the example, but the same problem happens on postgresql, too.

import transaction
from time import sleep
from sqlalchemy.engine import engine_from_config
from sqlalchemy.orm.scoping import scoped_session
from zope.sqlalchemy.datamanager import ZopeTransactionExtension
from sqlalchemy.orm.session import sessionmaker
from sqlalchemy.ext.declarative.api import declarative_base
from threading import Thread
from sqlalchemy.schema import Column, Sequence
from sqlalchemy.types import Integer, String


settings = {
    # works also on postgresql (and probably any other db server)
    #'sqlalchemy.url': "postgresql://user:pass@localhost/db-dev",
    'sqlalchemy.url': "sqlite:///memory:",
    'sqlalchemy.echo': True}

DBSession = scoped_session(sessionmaker(extension=ZopeTransactionExtension()))
# using the below definition fixes the problem (with some side-effects though)
#DBSession = scoped_session(sessionmaker(extension=ZopeTransactionExtension('changed')))
Base = declarative_base()


class TestTable(Base):
    __tablename__ = "test_table"
    test_id = Column(Integer, Sequence('test_table_id_seq'), primary_key=True)
    value = Column(String)
    unrelated = Column(String)


def thread_1(test_id):
    dbs = DBSession()
    o = dbs.query(TestTable).get(test_id)
    sleep(0.5)  # update after 0.5s -> enough time for the second thread to read from database
    o.value = '123'
    transaction.commit()


def thread_2(test_id):
    dbs = DBSession()
    o = dbs.query(TestTable).get(test_id)
    sleep(1)  # by this time thread 1 should be finished and changes committed in the db

    # if anything (unrelated) gets updated, the problem doesn't happen
    # try uncommenting the below line and see the different output
    #o.unrelated = '456'
    transaction.commit()
    dbs = DBSession()
    o = dbs.merge(o)
    print "order num after merge " + o.value  # this should print "123", but prints "initial" <- that's wrong!
    transaction.commit()  # and then it updates value "123" back to "initial" (!!!)


if __name__ == '__main__':
    # setup sqlalchemy
    engine = engine_from_config(settings)
    Base.metadata.create_all(engine)
    DBSession.configure(bind=engine)
    Base.metadata.bind = engine

    # create one record in our test table
    dbs = DBSession()
    rec = TestTable(value='initial', unrelated='initial')
    dbs.add(rec)
    dbs.flush()
    test_id = rec.test_id
    transaction.commit()

    # now run the two threads in parallel
    t1 = Thread(target=thread_1, args=(test_id,))
    t2 = Thread(target=thread_2, args=(test_id,))
    t1.start()
    t2.start()

    # wait for both of them to finish
    t1.join()
    t2.join()
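
As a side note, the end effect of the four steps at the top of this report (a stale copy being written back over a newer commit) can be sketched without threads or an ORM. This is only an illustration using the standard-library sqlite3 module: the helper `read_value` and the variables `copy_1` and `copy_2` are hypothetical stand-ins for the two threads' in-memory objects, and the sequential execution is an assumption that replaces the sleep-based timing above. It does not reproduce the ZopeTransactionExtension mechanism itself.

```python
import sqlite3

# One in-memory database shared by both simulated workers.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE test_table (test_id INTEGER PRIMARY KEY, value TEXT)")
conn.execute("INSERT INTO test_table (test_id, value) VALUES (1, 'initial')")
conn.commit()


def read_value(test_id):
    """Hypothetical helper: fetch the current value of one record."""
    row = conn.execute(
        "SELECT value FROM test_table WHERE test_id = ?", (test_id,)
    ).fetchone()
    return row[0]


# Step 1: both workers read record A and keep their own copy.
copy_1 = read_value(1)
copy_2 = read_value(1)

# Step 2: worker 1 updates record A and commits.
conn.execute("UPDATE test_table SET value = '123' WHERE test_id = 1")
conn.commit()

# Steps 3 and 4: worker 2 never refreshed its copy, so writing it back
# overwrites worker 1's committed change.
conn.execute("UPDATE test_table SET value = ? WHERE test_id = ?", (copy_2, 1))
conn.commit()

final_value = read_value(1)
print(final_value)
```

Worker 1's update is gone at the end because worker 2's copy was taken before that update and was never refreshed.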
@lrowe
Contributor

lrowe commented Jun 17, 2014

(Note that "sqlite:///memory:" refers to a file on disk, not an in-memory database. Changing it to "sqlite:///:memory:" does not work here, because each thread then gets its own in-memory sqlite db.)

From the README:

By default, zope.sqlalchemy puts sessions in an 'active' state when they are first used. ORM write operations automatically move the session into a 'changed' state. This avoids unnecessary database commits.

This means that the first transaction.commit() in thread_2 will result in a session.close() instead of a session.commit(). Removing the ZopeTransactionExtension and using session.close() here and session.commit() elsewhere shows the same behaviour.

That difference in behaviour is because SQLAlchemy sessions by default have expire_on_commit=True. When the session is closed rather than committed the instances are not expired. Adding in a manual call to dbs.expire_all() fixes the problem:

from sqlalchemy import inspect

def thread_2(test_id):
    dbs = DBSession()
    o = dbs.query(TestTable).get(test_id)
    sleep(1)  # by this time thread 1 should be finished and changes committed in the db
    print 'expired: %r' % inspect(o).expired
    dbs.expire_all()  # must come before transaction.commit() / session.close()
    transaction.commit()
    print 'expired: %r' % inspect(o).expired
    dbs = DBSession()
    o = dbs.merge(o)
    print "order num after merge " + o.value  # with expire_all() above, this now prints "123"
    transaction.commit()  # nothing was changed by the merge, so "123" is not overwritten

Possibly zope.sqlalchemy should call session.expire_all() before calling session.close() when there were no changes to commit.
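
The close-versus-commit behaviour described above can also be sketched in a single thread with plain SQLAlchemy and no ZopeTransactionExtension. This is a minimal sketch, assuming SQLAlchemy 1.4 or later and Python 3 (so not the versions from the original report); the function `run` and its `expire_before_close` flag are hypothetical names introduced only for this illustration.

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class TestTable(Base):
    __tablename__ = "test_table"
    test_id = Column(Integer, primary_key=True)
    value = Column(String)


def run(expire_before_close):
    """Hypothetical helper: return the stored value after merging a detached object."""
    engine = create_engine("sqlite://")  # fresh in-memory database for each run
    Base.metadata.create_all(engine)
    Session = sessionmaker(bind=engine)  # expire_on_commit=True by default

    with Session() as setup:
        setup.add(TestTable(test_id=1, value="initial"))
        setup.commit()

    # "Thread 2" reads the record, then its session is closed, not committed.
    reader = Session()
    o = reader.get(TestTable, 1)
    if expire_before_close:
        reader.expire_all()  # discard loaded attribute values before detaching
    reader.close()

    # "Thread 1" updates the record and commits.
    with Session() as writer:
        writer.get(TestTable, 1).value = "123"
        writer.commit()

    # "Thread 2" merges its detached object into a new session and commits.
    with Session() as merger:
        merger.merge(o)
        merger.commit()

    with Session() as check:
        return check.get(TestTable, 1).value


print(run(expire_before_close=False))
print(run(expire_before_close=True))
```

If the explanation in this thread holds, the first call returns the stale "initial", because merge() copies the still-loaded attributes of the detached object over the freshly loaded row. The second call returns "123", because an expired object has no loaded values to copy.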

@yundatm
Author

yundatm commented Jun 17, 2014

Thanks for the quick response and for the explanation!
You're right, calling dbs.expire_all() fixes it. I do believe zope.sqlalchemy should do that automatically; otherwise it would have to be called everywhere before a commit (and only when there is no change).
This matters all the more given the destructive consequences. It would be acceptable if the new value simply wasn't read from the db at that stage. The problem is that the stale value is actually written back to the database(!).
Cheers,
Martin.

@lrowe lrowe closed this as completed in a98f951 Jun 18, 2014
@yundatm
Author

yundatm commented Jun 18, 2014

Thanks heaps, that was quick! :-)
