From 6cbf0eb14be2a1183d56290f71486c8a60cf66c4 Mon Sep 17 00:00:00 2001 From: Christian Theune Date: Sat, 12 Oct 2019 10:29:12 +0200 Subject: [PATCH 1/2] Fix #31: stop using the deprecated "extension" way of registering events. --- .gitignore | 2 + CHANGES.rst | 7 + src/zope/sqlalchemy/README.rst | 40 ++-- src/zope/sqlalchemy/__init__.py | 3 +- src/zope/sqlalchemy/datamanager.py | 122 +++++++----- src/zope/sqlalchemy/tests.py | 301 ++++++++++++++++------------- 6 files changed, 266 insertions(+), 209 deletions(-) diff --git a/.gitignore b/.gitignore index e68e44e..a73be6d 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,5 @@ dist/ downloads/ eggs/ parts/ +include/ +lib/ diff --git a/CHANGES.rst b/CHANGES.rst index 4e72379..5fd2acf 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -6,6 +6,13 @@ Changes * Add support for Python 3.7 +* Fix deprecation warnings for the event system. We already used it in general + but still leveraged the old extenion mechanism in some places. Fixes #31. + + To make things clearer we renamed the ZopeTransactionExtension class + to ZopeTransactionEvents. Existing code using the 'register' version stays + compatible. + 1.1 (2019-01-03) ---------------- diff --git a/src/zope/sqlalchemy/README.rst b/src/zope/sqlalchemy/README.rst index f4dcbb1..818bd3e 100644 --- a/src/zope/sqlalchemy/README.rst +++ b/src/zope/sqlalchemy/README.rst @@ -66,7 +66,7 @@ First the necessary imports. >>> from sqlalchemy import * >>> from sqlalchemy.ext.declarative import declarative_base >>> from sqlalchemy.orm import scoped_session, sessionmaker, relation - >>> from zope.sqlalchemy import ZopeTransactionExtension + >>> from zope.sqlalchemy import register >>> import transaction Now to define the mapper classes. @@ -91,16 +91,17 @@ recent version of sqlite/pysqlite is required. 3.4.0 seems to be sufficient. Now to create the session itself. As zope is a threaded web server we must use scoped sessions. Zope and SQLAlchemy sessions are tied together by using the -ZopeTransactionExtension from this package. +register >>> Session = scoped_session(sessionmaker(bind=engine, - ... twophase=TEST_TWOPHASE, extension=ZopeTransactionExtension())) + ... twophase=TEST_TWOPHASE)) Call the scoped session factory to retrieve a session. You may call this as many times as you like within a transaction and you will always retrieve the same session. At present there are no users in the database. >>> session = Session() + >>> register(session) >>> session.query(User).all() [] @@ -163,11 +164,11 @@ to the DB. 'ben' >>> transaction.abort() -If this is a problem you may tell the extension to place the session in the -'changed' state initially. +If this is a problem you may register the events and tell them to place the +session in the 'changed' state initially. >>> Session.remove() - >>> Session.configure(extension=ZopeTransactionExtension('changed')) + >>> register(Session, 'changed') >>> session = Session() >>> conn = session.connection() >>> conn.execute(users.update(users.c.name=='ben'), name='bob') @@ -190,13 +191,12 @@ after a commit. You can tell by trying to access an object after committing: Traceback (most recent call last): DetachedInstanceError: Instance is not bound to a Session; attribute refresh operation cannot proceed... -To support cases where a session needs to last longer than a transaction -(useful in test suites) you can specify to keep a session when creating the -transaction extension: +To support cases where a session needs to last longer than a transaction (useful +in test suites) you can specify to keep a session when registering the events: >>> Session = scoped_session(sessionmaker(bind=engine, - ... twophase=TEST_TWOPHASE, extension=ZopeTransactionExtension(keep_session=True))) - + ... twophase=TEST_TWOPHASE)) + >>> register(Session, keep_session=True) >>> session = Session() >>> bob = session.query(User).all()[0] >>> bob.name = 'bobby' @@ -208,24 +208,6 @@ The session must then be closed manually: >>> session.close() -Registration Using SQLAlchemy Events -==================================== - -The zope.sqlalchemy.register() function performs the same function as the -ZopeTransactionExtension, except makes use of the newer SQLAlchemy event system -which superseded the extension system as of SQLAlchemy 0.7. Usage is similar: - - >>> from zope.sqlalchemy import register - >>> Session = scoped_session(sessionmaker(bind=engine, - ... twophase=TEST_TWOPHASE)) - >>> register(Session, keep_session=True) - >>> session = Session() - >>> jack = User(id=2, name='jack') - >>> session.add(jack) - >>> transaction.commit() - >>> engine.execute("select name from test_users where id=2").scalar() - u'jack' - Development version =================== diff --git a/src/zope/sqlalchemy/__init__.py b/src/zope/sqlalchemy/__init__.py index 0a606fb..34ccb16 100644 --- a/src/zope/sqlalchemy/__init__.py +++ b/src/zope/sqlalchemy/__init__.py @@ -12,5 +12,6 @@ # ############################################################################## -from zope.sqlalchemy.datamanager import ZopeTransactionExtension, mark_changed, register +from zope.sqlalchemy.datamanager import ZopeTransactionEvents, mark_changed, register + invalidate = mark_changed diff --git a/src/zope/sqlalchemy/datamanager.py b/src/zope/sqlalchemy/datamanager.py index c9006d3..39a407f 100644 --- a/src/zope/sqlalchemy/datamanager.py +++ b/src/zope/sqlalchemy/datamanager.py @@ -20,7 +20,6 @@ from transaction._transaction import Status as ZopeStatus from sqlalchemy.orm.exc import ConcurrentModificationError from sqlalchemy.exc import DBAPIError -from sqlalchemy.orm.session import SessionExtension from sqlalchemy.engine.base import Engine _retryable_errors = [] @@ -37,7 +36,9 @@ except ImportError: pass else: - _retryable_errors.append((cx_Oracle.DatabaseError, lambda e: e.args[0].code == 8177)) + _retryable_errors.append( + (cx_Oracle.DatabaseError, lambda e: e.args[0].code == 8177) + ) # 1213: Deadlock found when trying to get lock; try restarting transaction try: @@ -45,15 +46,17 @@ except ImportError: pass else: - _retryable_errors.append((pymysql.err.OperationalError, lambda e: e.args[0] == 1213)) + _retryable_errors.append( + (pymysql.err.OperationalError, lambda e: e.args[0] == 1213) + ) # The status of the session is stored on the connection info -STATUS_ACTIVE = 'active' # session joined to transaction, writes allowed. -STATUS_CHANGED = 'changed' # data has been written -STATUS_READONLY = 'readonly' # session joined to transaction, no writes allowed. +STATUS_ACTIVE = "active" # session joined to transaction, writes allowed. +STATUS_CHANGED = "changed" # data has been written +STATUS_READONLY = "readonly" # session joined to transaction, no writes allowed. STATUS_INVALIDATED = STATUS_CHANGED # BBB -NO_SAVEPOINT_SUPPORT = set(['sqlite']) +NO_SAVEPOINT_SUPPORT = set(["sqlite"]) _SESSION_STATE = WeakKeyDictionary() # a mapping of session -> status # This is thread safe because you are using scoped sessions @@ -63,6 +66,7 @@ # The two variants of the DataManager. # + @implementer(ISavepointDataManager) class SessionDataManager(object): """Integrate a top level sqlalchemy session transaction into a zope transaction @@ -75,14 +79,16 @@ def __init__(self, session, status, transaction_manager, keep_session=False): # Support both SQLAlchemy 1.0 and 1.1 # https://github.com/zopefoundation/zope.sqlalchemy/issues/15 - _iterate_parents = getattr(session.transaction, "_iterate_self_and_parents", None) \ - or session.transaction._iterate_parents + _iterate_parents = ( + getattr(session.transaction, "_iterate_self_and_parents", None) + or session.transaction._iterate_parents + ) self.tx = _iterate_parents()[-1] self.session = session transaction_manager.get().join(self) _SESSION_STATE[session] = status - self.state = 'init' + self.state = "init" self.keep_session = keep_session def _finish(self, final_state): @@ -100,7 +106,7 @@ def _finish(self, final_state): def abort(self, trans): if self.tx is not None: # there may have been no work to do - self._finish('aborted') + self._finish("aborted") def tpc_begin(self, trans): self.session.flush() @@ -111,19 +117,19 @@ def commit(self, trans): session = self.session if session.expire_on_commit: session.expire_all() - self._finish('no work') + self._finish("no work") def tpc_vote(self, trans): # for a one phase data manager commit last in tpc_vote if self.tx is not None: # there may have been no work to do self.tx.commit() - self._finish('committed') + self._finish("committed") def tpc_finish(self, trans): pass def tpc_abort(self, trans): - assert self.state is not 'committed' + assert self.state is not "committed" def sortKey(self): # Try to sort last, so that we vote last - we may commit in tpc_vote(), @@ -141,11 +147,12 @@ def savepoint(self): # support savepoints but Postgres is whitelisted independent of its # version. Possibly additional version information should be taken # into account (ajung) - if set(engine.url.drivername - for engine in self.session.transaction._connections.keys() - if isinstance(engine, Engine) - ).intersection(NO_SAVEPOINT_SUPPORT): - raise AttributeError('savepoint') + if set( + engine.url.drivername + for engine in self.session.transaction._connections.keys() + if isinstance(engine, Engine) + ).intersection(NO_SAVEPOINT_SUPPORT): + raise AttributeError("savepoint") return self._savepoint def _savepoint(self): @@ -167,20 +174,21 @@ def should_retry(self, error): class TwoPhaseSessionDataManager(SessionDataManager): """Two phase variant. """ + def tpc_vote(self, trans): if self.tx is not None: # there may have been no work to do self.tx.prepare() - self.state = 'voted' + self.state = "voted" def tpc_finish(self, trans): if self.tx is not None: self.tx.commit() - self._finish('committed') + self._finish("committed") def tpc_abort(self, trans): if self.tx is not None: # we may not have voted, and been aborted already self.tx.rollback() - self._finish('aborted commit') + self._finish("aborted commit") def sortKey(self): # Sort normally @@ -189,7 +197,6 @@ def sortKey(self): @implementer(IDataManagerSavepoint) class SessionSavepoint: - def __init__(self, session): self.session = session self.transaction = session.begin_nested() @@ -199,7 +206,12 @@ def rollback(self): self.transaction.rollback() -def join_transaction(session, initial_state=STATUS_ACTIVE, transaction_manager=zope_transaction.manager, keep_session=False): +def join_transaction( + session, + initial_state=STATUS_ACTIVE, + transaction_manager=zope_transaction.manager, + keep_session=False, +): """Join a session to a transaction using the appropriate datamanager. It is safe to call this multiple times, if the session is already joined @@ -210,7 +222,7 @@ def join_transaction(session, initial_state=STATUS_ACTIVE, transaction_manager=z If using the default initial status of STATUS_ACTIVE, you must ensure that mark_changed(session) is called when data is written to the database. - The ZopeTransactionExtesion SessionExtension can be used to ensure that this is + The ZopeTransactionEvents can be used to ensure that this is called automatically after session write operations. """ if _SESSION_STATE.get(session, None) is None: @@ -218,35 +230,49 @@ def join_transaction(session, initial_state=STATUS_ACTIVE, transaction_manager=z DataManager = TwoPhaseSessionDataManager else: DataManager = SessionDataManager - DataManager(session, initial_state, transaction_manager, keep_session=keep_session) + DataManager( + session, initial_state, transaction_manager, keep_session=keep_session + ) -def mark_changed(session, transaction_manager=zope_transaction.manager, keep_session=False): +def mark_changed( + session, transaction_manager=zope_transaction.manager, keep_session=False +): """Mark a session as needing to be committed. """ - assert _SESSION_STATE.get(session, None) is not STATUS_READONLY, "Session already registered as read only" + assert ( + _SESSION_STATE.get(session, None) is not STATUS_READONLY + ), "Session already registered as read only" join_transaction(session, STATUS_CHANGED, transaction_manager, keep_session) _SESSION_STATE[session] = STATUS_CHANGED -class ZopeTransactionExtension(SessionExtension): +class ZopeTransactionEvents(object): """Record that a flush has occurred on a session's connection. This allows the DataManager to rollback rather than commit on read only transactions. """ - def __init__(self, initial_state=STATUS_ACTIVE, transaction_manager=zope_transaction.manager, keep_session=False): - if initial_state == 'invalidated': + def __init__( + self, + initial_state=STATUS_ACTIVE, + transaction_manager=zope_transaction.manager, + keep_session=False, + ): + if initial_state == "invalidated": initial_state = STATUS_CHANGED # BBB - SessionExtension.__init__(self) self.initial_state = initial_state self.transaction_manager = transaction_manager self.keep_session = keep_session def after_begin(self, session, transaction, connection): - join_transaction(session, self.initial_state, self.transaction_manager, self.keep_session) + join_transaction( + session, self.initial_state, self.transaction_manager, self.keep_session + ) def after_attach(self, session, instance): - join_transaction(session, self.initial_state, self.transaction_manager, self.keep_session) + join_transaction( + session, self.initial_state, self.transaction_manager, self.keep_session + ) def after_flush(self, session, flush_context): mark_changed(session, self.transaction_manager, self.keep_session) @@ -258,13 +284,18 @@ def after_bulk_delete(self, session, query, query_context, result): mark_changed(session, self.transaction_manager, self.keep_session) def before_commit(self, session): - assert session.transaction.nested or \ - self.transaction_manager.get().status == ZopeStatus.COMMITTING, \ - "Transaction must be committed using the transaction manager" - - -def register(session, initial_state=STATUS_ACTIVE, - transaction_manager=zope_transaction.manager, keep_session=False): + assert ( + session.transaction.nested + or self.transaction_manager.get().status == ZopeStatus.COMMITTING + ), "Transaction must be committed using the transaction manager" + + +def register( + session, + initial_state=STATUS_ACTIVE, + transaction_manager=zope_transaction.manager, + keep_session=False, +): """Register ZopeTransaction listener events on the given Session or Session factory/class. @@ -280,12 +311,15 @@ def register(session, initial_state=STATUS_ACTIVE, """ from sqlalchemy import __version__ - assert tuple(int(x) for x in __version__.split(".")[:2]) >= (0, 7), \ - "SQLAlchemy version 0.7 or greater required to use register()" + + assert tuple(int(x) for x in __version__.split(".")[:2]) >= ( + 0, + 7, + ), "SQLAlchemy version 0.7 or greater required to use register()" from sqlalchemy import event - ext = ZopeTransactionExtension( + ext = ZopeTransactionEvents( initial_state=initial_state, transaction_manager=transaction_manager, keep_session=keep_session, diff --git a/src/zope/sqlalchemy/tests.py b/src/zope/sqlalchemy/tests.py index 667b85f..a9ad080 100644 --- a/src/zope/sqlalchemy/tests.py +++ b/src/zope/sqlalchemy/tests.py @@ -40,8 +40,8 @@ from zope.sqlalchemy import mark_changed from zope.testing.renormalizing import RENormalizing -TEST_TWOPHASE = bool(os.environ.get('TEST_TWOPHASE')) -TEST_DSN = os.environ.get('TEST_DSN', 'sqlite:///:memory:') +TEST_TWOPHASE = bool(os.environ.get("TEST_TWOPHASE")) +TEST_DSN = os.environ.get("TEST_DSN", "sqlite:///:memory:") class SimpleModel(object): @@ -50,7 +50,7 @@ def __init__(self, **kw): setattr(self, k, v) def asDict(self): - return dict((k, v) for k, v in self.__dict__.items() if not k.startswith('_')) + return dict((k, v) for k, v in self.__dict__.items() if not k.startswith("_")) class User(SimpleModel): @@ -66,74 +66,68 @@ class Skill(SimpleModel): # See https://code.google.com/p/pysqlite-static-env/ HAS_PATCHED_PYSQLITE = False -if engine.url.drivername == 'sqlite': +if engine.url.drivername == "sqlite": try: from pysqlite2.dbapi2 import Connection except ImportError: pass else: - if hasattr(Connection, 'operation_needs_transaction_callback'): + if hasattr(Connection, "operation_needs_transaction_callback"): HAS_PATCHED_PYSQLITE = True if HAS_PATCHED_PYSQLITE: from sqlalchemy import event from zope.sqlalchemy.datamanager import NO_SAVEPOINT_SUPPORT - NO_SAVEPOINT_SUPPORT.remove('sqlite') - @event.listens_for(engine, 'connect') + NO_SAVEPOINT_SUPPORT.remove("sqlite") + + @event.listens_for(engine, "connect") def connect(dbapi_connection, connection_record): dbapi_connection.operation_needs_transaction_callback = lambda x: True -Session = orm.scoped_session(orm.sessionmaker( - bind=engine, - extension=tx.ZopeTransactionExtension(), - twophase=TEST_TWOPHASE, -)) +Session = orm.scoped_session(orm.sessionmaker(bind=engine, twophase=TEST_TWOPHASE)) +tx.register(Session) -UnboundSession = orm.scoped_session(orm.sessionmaker( - extension=tx.ZopeTransactionExtension(), - twophase=TEST_TWOPHASE, -)) +UnboundSession = orm.scoped_session(orm.sessionmaker(twophase=TEST_TWOPHASE)) +tx.register(UnboundSession) -EventSession = orm.scoped_session(orm.sessionmaker( - bind=engine, - twophase=TEST_TWOPHASE, -)) +EventSession = orm.scoped_session(orm.sessionmaker(bind=engine, twophase=TEST_TWOPHASE)) +tx.register(EventSession) -KeepSession = orm.scoped_session(orm.sessionmaker( - bind=engine, - extension=tx.ZopeTransactionExtension(keep_session=True), - twophase=TEST_TWOPHASE, -)) +KeepSession = orm.scoped_session(orm.sessionmaker(bind=engine, twophase=TEST_TWOPHASE)) +tx.register(KeepSession, keep_session=True) -tx.register(EventSession) metadata = sa.MetaData() # best to use unbound metadata test_users = sa.Table( - 'test_users', + "test_users", metadata, - sa.Column('id', sa.Integer, primary_key=True), - sa.Column('firstname', sa.VARCHAR(255)), # mssql cannot do equality on a text type - sa.Column('lastname', sa.VARCHAR(255)), + sa.Column("id", sa.Integer, primary_key=True), + sa.Column("firstname", sa.VARCHAR(255)), # mssql cannot do equality on a text type + sa.Column("lastname", sa.VARCHAR(255)), ) test_skills = sa.Table( - 'test_skills', + "test_skills", metadata, - sa.Column('id', sa.Integer, primary_key=True), - sa.Column('user_id', sa.Integer), - sa.Column('name', sa.VARCHAR(255)), - sa.ForeignKeyConstraint(('user_id',), ('test_users.id',)), + sa.Column("id", sa.Integer, primary_key=True), + sa.Column("user_id", sa.Integer), + sa.Column("name", sa.VARCHAR(255)), + sa.ForeignKeyConstraint(("user_id",), ("test_users.id",)), ) bound_metadata1 = sa.MetaData(engine) bound_metadata2 = sa.MetaData(engine2) -test_one = sa.Table('test_one', bound_metadata1, sa.Column('id', sa.Integer, primary_key=True)) -test_two = sa.Table('test_two', bound_metadata2, sa.Column('id', sa.Integer, primary_key=True)) +test_one = sa.Table( + "test_one", bound_metadata1, sa.Column("id", sa.Integer, primary_key=True) +) +test_two = sa.Table( + "test_two", bound_metadata2, sa.Column("id", sa.Integer, primary_key=True) +) class TestOne(SimpleModel): @@ -151,10 +145,13 @@ def setup_mappers(): m1 = orm.mapper( User, test_users, - properties={'skills': orm.relation( - Skill, - primaryjoin=test_users.columns['id'] == test_skills.columns['user_id']), - }) + properties={ + "skills": orm.relation( + Skill, + primaryjoin=test_users.columns["id"] == test_skills.columns["user_id"], + ) + }, + ) m2 = orm.mapper(Skill, test_skills) m3 = orm.mapper(TestOne, test_one) @@ -198,7 +195,7 @@ def tpc_vote(self, trans): raise DummyTargetRaised(e) raise DummyTargetResult(result) else: - raise DummyException('DummyDataManager cannot commit') + raise DummyException("DummyDataManager cannot commit") def tpc_finish(self, trans): pass @@ -211,7 +208,6 @@ def sortKey(self): class ZopeSQLAlchemyTests(unittest.TestCase): - def setUp(self): self.mappers = setup_mappers() metadata.drop_all(engine) @@ -224,14 +220,15 @@ def tearDown(self): def testMarkUnknownSession(self): import zope.sqlalchemy.datamanager - dummy = DummyDataManager(key='dummy.first') + + dummy = DummyDataManager(key="dummy.first") session = Session() mark_changed(session) self.assertTrue(session in zope.sqlalchemy.datamanager._SESSION_STATE) def testAbortBeforeCommit(self): # Simulate what happens in a conflict error - dummy = DummyDataManager(key='dummy.first') + dummy = DummyDataManager(key="dummy.first") session = Session() conn = session.connection() mark_changed(session) @@ -282,28 +279,30 @@ def testSimplePopulation(self): rows = query.all() self.assertEqual(len(rows), 0) - session.add(User(id=1, firstname='udo', lastname='juergens')) - session.add(User(id=2, firstname='heino', lastname='n/a')) + session.add(User(id=1, firstname="udo", lastname="juergens")) + session.add(User(id=2, firstname="heino", lastname="n/a")) session.flush() rows = query.order_by(User.id).all() self.assertEqual(len(rows), 2) row1 = rows[0] d = row1.asDict() - self.assertEqual(d, {'firstname': 'udo', 'lastname': 'juergens', 'id': 1}) + self.assertEqual(d, {"firstname": "udo", "lastname": "juergens", "id": 1}) # bypass the session machinary - stmt = sql.select(test_users.columns).order_by('id') + stmt = sql.select(test_users.columns).order_by("id") conn = session.connection() results = conn.execute(stmt) - self.assertEqual(results.fetchall(), [(1, 'udo', 'juergens'), (2, 'heino', 'n/a')]) + self.assertEqual( + results.fetchall(), [(1, "udo", "juergens"), (2, "heino", "n/a")] + ) def testRelations(self): session = Session() - session.add(User(id=1, firstname='foo', lastname='bar')) + session.add(User(id=1, firstname="foo", lastname="bar")) - user = session.query(User).filter_by(firstname='foo')[0] - user.skills.append(Skill(id=1, name='Zope')) + user = session.query(User).filter_by(firstname="foo")[0] + user.skills.append(Skill(id=1, name="Zope")) session.flush() def testTransactionJoining(self): @@ -311,27 +310,31 @@ def testTransactionJoining(self): t = transaction.get() self.assertFalse( [r for r in t._resources if isinstance(r, tx.SessionDataManager)], - "Joined transaction too early") + "Joined transaction too early", + ) session = Session() - session.add(User(id=1, firstname='udo', lastname='juergens')) + session.add(User(id=1, firstname="udo", lastname="juergens")) t = transaction.get() # Expect this to fail with SQLAlchemy 0.4 self.assertTrue( [r for r in t._resources if isinstance(r, tx.SessionDataManager)], - "Not joined transaction") + "Not joined transaction", + ) def testTransactionJoiningUsingRegister(self): transaction.abort() # clean slate t = transaction.get() self.assertFalse( [r for r in t._resources if isinstance(r, tx.SessionDataManager)], - "Joined transaction too early") + "Joined transaction too early", + ) session = EventSession() - session.add(User(id=1, firstname='udo', lastname='juergens')) + session.add(User(id=1, firstname="udo", lastname="juergens")) t = transaction.get() self.assertTrue( [r for r in t._resources if isinstance(r, tx.SessionDataManager)], - "Not joined transaction") + "Not joined transaction", + ) def testSavepoint(self): use_savepoint = not engine.url.drivername in tx.NO_SAVEPOINT_SUPPORT @@ -347,12 +350,12 @@ def testSavepoint(self): return # sqlite databases do not support savepoints s1 = t.savepoint() - session.add(User(id=1, firstname='udo', lastname='juergens')) + session.add(User(id=1, firstname="udo", lastname="juergens")) session.flush() self.assertTrue(len(query.all()) == 1, "Users table should have one row") s2 = t.savepoint() - session.add(User(id=2, firstname='heino', lastname='n/a')) + session.add(User(id=2, firstname="heino", lastname="n/a")) session.flush() self.assertTrue(len(query.all()) == 2, "Users table should have two rows") @@ -373,15 +376,19 @@ def testRollbackAttributes(self): self.assertFalse(query.all(), "Users table should be empty") s1 = t.savepoint() - user = User(id=1, firstname='udo', lastname='juergens') + user = User(id=1, firstname="udo", lastname="juergens") session.add(user) session.flush() s2 = t.savepoint() - user.firstname = 'heino' + user.firstname = "heino" session.flush() s2.rollback() - self.assertEqual(user.firstname, 'udo', "User firstname attribute should have been rolled back") + self.assertEqual( + user.firstname, + "udo", + "User firstname attribute should have been rolled back", + ) def testCommit(self): session = Session() @@ -396,8 +403,8 @@ def testCommit(self): session = Session() query = session.query(User) - session.add(User(id=1, firstname='udo', lastname='juergens')) - session.add(User(id=2, firstname='heino', lastname='n/a')) + session.add(User(id=1, firstname="udo", lastname="juergens")) + session.add(User(id=2, firstname="heino", lastname="n/a")) session.flush() rows = query.order_by(User.id).all() @@ -409,13 +416,13 @@ def testCommit(self): rows = query.order_by(User.id).all() self.assertEqual(len(rows), 0) - session.add(User(id=1, firstname='udo', lastname='juergens')) - session.add(User(id=2, firstname='heino', lastname='n/a')) + session.add(User(id=1, firstname="udo", lastname="juergens")) + session.add(User(id=2, firstname="heino", lastname="n/a")) session.flush() rows = query.order_by(User.id).all() row1 = rows[0] d = row1.asDict() - self.assertEqual(d, {'firstname': 'udo', 'lastname': 'juergens', 'id': 1}) + self.assertEqual(d, {"firstname": "udo", "lastname": "juergens", "id": 1}) transaction.commit() @@ -423,7 +430,7 @@ def testCommit(self): self.assertEqual(len(rows), 2) row1 = rows[0] d = row1.asDict() - self.assertEqual(d, {'firstname': 'udo', 'lastname': 'juergens', 'id': 1}) + self.assertEqual(d, {"firstname": "udo", "lastname": "juergens", "id": 1}) # bypass the session (and transaction) machinary results = engine.connect().execute(test_users.select()) @@ -433,8 +440,8 @@ def testCommitWithSavepoint(self): if engine.url.drivername in tx.NO_SAVEPOINT_SUPPORT: return session = Session() - session.add(User(id=1, firstname='udo', lastname='juergens')) - session.add(User(id=2, firstname='heino', lastname='n/a')) + session.add(User(id=1, firstname="udo", lastname="juergens")) + session.add(User(id=2, firstname="heino", lastname="n/a")) session.flush() transaction.commit() @@ -458,23 +465,23 @@ def testNestedSessionCommitAllowed(self): if engine.url.drivername in tx.NO_SAVEPOINT_SUPPORT: return session = Session() - session.add(User(id=1, firstname='udo', lastname='juergens')) + session.add(User(id=1, firstname="udo", lastname="juergens")) session.begin_nested() - session.add(User(id=2, firstname='heino', lastname='n/a')) + session.add(User(id=2, firstname="heino", lastname="n/a")) session.commit() transaction.commit() def testSessionCommitDisallowed(self): session = Session() - session.add(User(id=1, firstname='udo', lastname='juergens')) + session.add(User(id=1, firstname="udo", lastname="juergens")) self.assertRaises(AssertionError, session.commit) def testTwoPhase(self): session = Session() if not session.twophase: return - session.add(User(id=1, firstname='udo', lastname='juergens')) - session.add(User(id=2, firstname='heino', lastname='n/a')) + session.add(User(id=1, firstname="udo", lastname="juergens")) + session.add(User(id=2, firstname="heino", lastname="n/a")) session.flush() transaction.commit() @@ -484,7 +491,7 @@ def testTwoPhase(self): def target(): return engine.connect().recover_twophase() - dummy = DummyDataManager(key='~~~dummy.last', target=target) + dummy = DummyDataManager(key="~~~dummy.last", target=target) t.join(dummy) session = Session() query = session.query(User) @@ -499,11 +506,19 @@ def target(): except DummyTargetRaised as e: raise e.args[0] - self.assertEqual(len(result), 1, "Should have been one prepared transaction when dummy aborted") + self.assertEqual( + len(result), + 1, + "Should have been one prepared transaction when dummy aborted", + ) transaction.begin() - self.assertEqual(len(engine.connect().recover_twophase()), 0, "Test no outstanding prepared transactions") + self.assertEqual( + len(engine.connect().recover_twophase()), + 0, + "Test no outstanding prepared transactions", + ) def testThread(self): transaction.abort() @@ -520,15 +535,17 @@ def target(): rows = query.all() self.assertEqual(len(rows), 0) - session.add(User(id=1, firstname='udo', lastname='juergens')) - session.add(User(id=2, firstname='heino', lastname='n/a')) + session.add(User(id=1, firstname="udo", lastname="juergens")) + session.add(User(id=2, firstname="heino", lastname="n/a")) session.flush() rows = query.order_by(User.id).all() self.assertEqual(len(rows), 2) row1 = rows[0] d = row1.asDict() - self.assertEqual(d, {'firstname': 'udo', 'lastname': 'juergens', 'id': 1}) + self.assertEqual( + d, {"firstname": "udo", "lastname": "juergens", "id": 1} + ) except Exception as err: global thread_error thread_error = err @@ -542,8 +559,8 @@ def target(): def testBulkDelete(self): session = Session() - session.add(User(id=1, firstname='udo', lastname='juergens')) - session.add(User(id=2, firstname='heino', lastname='n/a')) + session.add(User(id=1, firstname="udo", lastname="juergens")) + session.add(User(id=2, firstname="heino", lastname="n/a")) transaction.commit() session = Session() session.query(User).delete() @@ -553,19 +570,21 @@ def testBulkDelete(self): def testBulkUpdate(self): session = Session() - session.add(User(id=1, firstname='udo', lastname='juergens')) - session.add(User(id=2, firstname='heino', lastname='n/a')) + session.add(User(id=1, firstname="udo", lastname="juergens")) + session.add(User(id=2, firstname="heino", lastname="n/a")) transaction.commit() session = Session() session.query(User).update(dict(lastname="smith")) transaction.commit() - results = engine.connect().execute(test_users.select(test_users.c.lastname == "smith")) + results = engine.connect().execute( + test_users.select(test_users.c.lastname == "smith") + ) self.assertEqual(len(results.fetchall()), 2) def testBulkDeleteUsingRegister(self): session = EventSession() - session.add(User(id=1, firstname='udo', lastname='juergens')) - session.add(User(id=2, firstname='heino', lastname='n/a')) + session.add(User(id=1, firstname="udo", lastname="juergens")) + session.add(User(id=2, firstname="heino", lastname="n/a")) transaction.commit() session = EventSession() session.query(User).delete() @@ -575,13 +594,15 @@ def testBulkDeleteUsingRegister(self): def testBulkUpdateUsingRegister(self): session = EventSession() - session.add(User(id=1, firstname='udo', lastname='juergens')) - session.add(User(id=2, firstname='heino', lastname='n/a')) + session.add(User(id=1, firstname="udo", lastname="juergens")) + session.add(User(id=2, firstname="heino", lastname="n/a")) transaction.commit() session = EventSession() session.query(User).update(dict(lastname="smith")) transaction.commit() - results = engine.connect().execute(test_users.select(test_users.c.lastname == "smith")) + results = engine.connect().execute( + test_users.select(test_users.c.lastname == "smith") + ) self.assertEqual(len(results.fetchall()), 2) def testFailedJoin(self): @@ -590,7 +611,7 @@ def testFailedJoin(self): # which means the session won't be joined in the future either. This # causes the session to stay open forever, potentially accumulating # data, but never issuing a commit. - dummy = DummyDataManager(key='dummy.first') + dummy = DummyDataManager(key="dummy.first") transaction.get().join(dummy) try: transaction.commit() @@ -602,15 +623,14 @@ def testFailedJoin(self): session = Session() # try to interact with the session while the transaction is still # in COMMITFAILED - self.assertRaises(TransactionFailedError, - session.query(User).all) + self.assertRaises(TransactionFailedError, session.query(User).all) transaction.abort() # start a new transaction everything should be ok now transaction.begin() session = Session() self.assertEqual([], session.query(User).all()) - session.add(User(id=1, firstname='udo', lastname='juergens')) + session.add(User(id=1, firstname="udo", lastname="juergens")) # abort transaction, session should be closed without commit transaction.abort() @@ -621,18 +641,18 @@ def testKeepSession(self): try: with transaction.manager: - session.add(User(id=1, firstname='foo', lastname='bar')) + session.add(User(id=1, firstname="foo", lastname="bar")) user = session.query(User).get(1) # if the keep_session works correctly, this transaction will not close # the session after commit with transaction.manager: - user.firstname = 'super' + user.firstname = "super" session.flush() # make sure the session is still attached to user - self.assertEqual(user.firstname, 'super') + self.assertEqual(user.firstname, "super") finally: # KeepSession does not rollback on transaction abort @@ -640,7 +660,7 @@ def testKeepSession(self): def testExpireAll(self): session = Session() - session.add(User(id=1, firstname='udo', lastname='juergens')) + session.add(User(id=1, firstname="udo", lastname="juergens")) transaction.commit() session = Session() @@ -651,7 +671,6 @@ def testExpireAll(self): class RetryTests(unittest.TestCase): - def setUp(self): self.mappers = setup_mappers() metadata.drop_all(engine) @@ -662,18 +681,12 @@ def setUp(self): # unfortunately that is not supported by cx_Oracle. e1 = sa.create_engine(TEST_DSN) e2 = sa.create_engine(TEST_DSN) - self.s1 = orm.sessionmaker( - bind=e1, - extension=tx.ZopeTransactionExtension(transaction_manager=self.tm1), - twophase=TEST_TWOPHASE, - )() - self.s2 = orm.sessionmaker( - bind=e2, - extension=tx.ZopeTransactionExtension(transaction_manager=self.tm2), - twophase=TEST_TWOPHASE, - )() + self.s1 = orm.sessionmaker(bind=e1, twophase=TEST_TWOPHASE)() + tx.register(self.s1, transaction_manager=self.tm1) + self.s2 = orm.sessionmaker(bind=e2, twophase=TEST_TWOPHASE)() + tx.register(self.s2, transaction_manager=self.tm2) self.tm1.begin() - self.s1.add(User(id=1, firstname='udo', lastname='juergens')) + self.s1.add(User(id=1, firstname="udo", lastname="juergens")) self.tm1.commit() def tearDown(self): @@ -687,12 +700,16 @@ def testRetry(self): tm1, tm2, s1, s2 = self.tm1, self.tm2, self.s1, self.s2 # make sure we actually start a session. tm1.begin() - self.assertTrue(len(s1.query(User).all()) == 1, "Users table should have one row") + self.assertTrue( + len(s1.query(User).all()) == 1, "Users table should have one row" + ) tm2.begin() - self.assertTrue(len(s2.query(User).all()) == 1, "Users table should have one row") + self.assertTrue( + len(s2.query(User).all()) == 1, "Users table should have one row" + ) s1.query(User).delete() user = s2.query(User).get(1) - user.lastname = u'smith' + user.lastname = u"smith" tm1.commit() raised = False try: @@ -707,10 +724,14 @@ def testRetryThread(self): tm1, tm2, s1, s2 = self.tm1, self.tm2, self.s1, self.s2 # make sure we actually start a session. tm1.begin() - self.assertTrue(len(s1.query(User).all()) == 1, "Users table should have one row") + self.assertTrue( + len(s1.query(User).all()) == 1, "Users table should have one row" + ) tm2.begin() s2.connection().execute("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE") - self.assertTrue(len(s2.query(User).all()) == 1, "Users table should have one row") + self.assertTrue( + len(s2.query(User).all()) == 1, "Users table should have one row" + ) s1.query(User).delete() raised = False @@ -721,7 +742,7 @@ def target(): thread = threading.Thread(target=target) thread.start() try: - user = s2.query(User).with_lockmode('update').get(1) + user = s2.query(User).with_lockmode("update").get(1) except exc.DBAPIError as e: # This error wraps the underlying DBAPI module error, some of which are retryable raised = True @@ -732,7 +753,6 @@ def target(): class MultipleEngineTests(unittest.TestCase): - def setUp(self): self.mappers = setup_mappers() bound_metadata1.drop_all() @@ -760,30 +780,41 @@ def testTwoEngines(self): def tearDownReadMe(test): - Base = test.globs['Base'] - engine = test.globs['engine'] + Base = test.globs["Base"] + engine = test.globs["engine"] Base.metadata.drop_all(engine) def test_suite(): from unittest import TestSuite, makeSuite import doctest + optionflags = doctest.NORMALIZE_WHITESPACE | doctest.ELLIPSIS - checker = RENormalizing([ - # Python 3 includes module name in exceptions - (re.compile(r"sqlalchemy.orm.exc.DetachedInstanceError:"), - "DetachedInstanceError:"), - # Python 3 drops the u'' prefix on unicode strings - (re.compile(r"u('[^']*')"), r"\1"), - # PyPy includes __builtin__ in front of classes defined in doctests - (re.compile(r"__builtin__[.]Address"), "Address"), - ]) + checker = RENormalizing( + [ + # Python 3 includes module name in exceptions + ( + re.compile(r"sqlalchemy.orm.exc.DetachedInstanceError:"), + "DetachedInstanceError:", + ), + # Python 3 drops the u'' prefix on unicode strings + (re.compile(r"u('[^']*')"), r"\1"), + # PyPy includes __builtin__ in front of classes defined in doctests + (re.compile(r"__builtin__[.]Address"), "Address"), + ] + ) suite = TestSuite() suite.addTest(makeSuite(ZopeSQLAlchemyTests)) suite.addTest(makeSuite(MultipleEngineTests)) - if TEST_DSN.startswith('postgres') or TEST_DSN.startswith('oracle'): + if TEST_DSN.startswith("postgres") or TEST_DSN.startswith("oracle"): suite.addTest(makeSuite(RetryTests)) - suite.addTest(doctest.DocFileSuite('README.rst', optionflags=optionflags, - checker=checker, tearDown=tearDownReadMe, - globs={'TEST_DSN': TEST_DSN, 'TEST_TWOPHASE': TEST_TWOPHASE})) + suite.addTest( + doctest.DocFileSuite( + "README.rst", + optionflags=optionflags, + checker=checker, + tearDown=tearDownReadMe, + globs={"TEST_DSN": TEST_DSN, "TEST_TWOPHASE": TEST_TWOPHASE}, + ) + ) return suite From fd191b468877cc7ce108bf82a00d81937f6e0e04 Mon Sep 17 00:00:00 2001 From: Christian Theune Date: Wed, 16 Oct 2019 08:44:00 +0200 Subject: [PATCH 2/2] Require SQLAlchemy > 0.7 and remove compatibility assertion for register() Update set([]) to -> {} syntax as suggested by mgedmin. --- setup.py | 2 +- src/zope/sqlalchemy/datamanager.py | 10 +--------- 2 files changed, 2 insertions(+), 10 deletions(-) diff --git a/setup.py b/setup.py index ecedbb5..babebc2 100644 --- a/setup.py +++ b/setup.py @@ -40,7 +40,7 @@ install_requires=[ 'setuptools', - 'SQLAlchemy>=0.5.1', + 'SQLAlchemy>=0.7', 'transaction>=1.6.0', 'zope.interface>=3.6.0', ], diff --git a/src/zope/sqlalchemy/datamanager.py b/src/zope/sqlalchemy/datamanager.py index 39a407f..c95aa05 100644 --- a/src/zope/sqlalchemy/datamanager.py +++ b/src/zope/sqlalchemy/datamanager.py @@ -56,7 +56,7 @@ STATUS_READONLY = "readonly" # session joined to transaction, no writes allowed. STATUS_INVALIDATED = STATUS_CHANGED # BBB -NO_SAVEPOINT_SUPPORT = set(["sqlite"]) +NO_SAVEPOINT_SUPPORT = {"sqlite"} _SESSION_STATE = WeakKeyDictionary() # a mapping of session -> status # This is thread safe because you are using scoped sessions @@ -309,14 +309,6 @@ def register( passed, including specificity to its subclass as well as its identity. """ - - from sqlalchemy import __version__ - - assert tuple(int(x) for x in __version__.split(".")[:2]) >= ( - 0, - 7, - ), "SQLAlchemy version 0.7 or greater required to use register()" - from sqlalchemy import event ext = ZopeTransactionEvents(