From 9e990f2838c0665d6eba54a99d50e3502fbcf767 Mon Sep 17 00:00:00 2001 From: Mike Bayer Date: Sun, 29 Sep 2013 15:17:35 -0400 Subject: [PATCH] * Add a new function zope.sqlalchemy.register(), which replaces the direct use of ZopeTransactionExtension to make use of the newer SQLAlchemy event system to establish instrumentation on the given Session instance/class/factory. Requires at least SQLAlchemy 0.7. --- CHANGES.txt | 11 +++++-- src/zope/sqlalchemy/README.txt | 20 ++++++++++++- src/zope/sqlalchemy/__init__.py | 2 +- src/zope/sqlalchemy/datamanager.py | 37 +++++++++++++++++++++++ src/zope/sqlalchemy/tests.py | 47 ++++++++++++++++++++++++++++++ 5 files changed, 113 insertions(+), 4 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index ba6cf50..7af47e5 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -4,6 +4,13 @@ Changes 0.7.4 (unreleased) ------------------ +* Add a new function zope.sqlalchemy.register(), which replaces the + direct use of ZopeTransactionExtension to make use + of the newer SQLAlchemy event system to establish instrumentation on + the given Session instance/class/factory. Requires at least + SQLAlchemy 0.7. + + 0.7.3 (2013-09-25) ------------------ @@ -56,7 +63,7 @@ Changes ---------------- * Remove redundant session.flush() / session.clear() on savepoint operations. - These were only needed with SQLAlchemy 0.4.x. + These were only needed with SQLAlchemy 0.4.x. * SQLAlchemy 0.6.x support. Require SQLAlchemy >= 0.5.1. @@ -101,7 +108,7 @@ Bugs fixed: Feature changes: -* For correctness and consistency with ZODB, renamed the function 'invalidate' +* For correctness and consistency with ZODB, renamed the function 'invalidate' to 'mark_changed' and the status 'invalidated' to 'changed'. 0.2 (2008-06-28) diff --git a/src/zope/sqlalchemy/README.txt b/src/zope/sqlalchemy/README.txt index 51338f2..26f1584 100644 --- a/src/zope/sqlalchemy/README.txt +++ b/src/zope/sqlalchemy/README.txt @@ -83,7 +83,7 @@ Now to define the mapper classes. ... email = Column('email', String(50)) ... user_id = Column('user_id', Integer, ForeignKey('test_users.id')) -Create an engine and setup the tables. Note that for this example to work a +Create an engine and setup the tables. Note that for this example to work a recent version of sqlite/pysqlite is required. 3.4.0 seems to be sufficient. >>> engine = create_engine(TEST_DSN, convert_unicode=True) @@ -208,6 +208,24 @@ The session must then be closed manually: >>> session.close() +Registration Using SQLAlchemy Events +==================================== + +The zope.sqlalchemy.register() function performs the same function as the +ZopeTransactionExtension, except makes use of the newer SQLAlchemy event system +which superseded the extension system as of SQLAlchemy 0.7. Usage is similar: + + >>> from zope.sqlalchemy import register + >>> Session = scoped_session(sessionmaker(bind=engine, + ... twophase=TEST_TWOPHASE)) + >>> register(Session, keep_session=True) + >>> session = Session() + >>> jack = User(id=2, name='jack') + >>> session.add(jack) + >>> transaction.commit() + >>> engine.execute("select name from test_users where id=2").scalar() + u'jack' + Development version =================== diff --git a/src/zope/sqlalchemy/__init__.py b/src/zope/sqlalchemy/__init__.py index 87fb4f7..0c9c690 100644 --- a/src/zope/sqlalchemy/__init__.py +++ b/src/zope/sqlalchemy/__init__.py @@ -14,5 +14,5 @@ __version__ = '0.7.4dev' -from zope.sqlalchemy.datamanager import ZopeTransactionExtension, mark_changed +from zope.sqlalchemy.datamanager import ZopeTransactionExtension, mark_changed, register invalidate = mark_changed diff --git a/src/zope/sqlalchemy/datamanager.py b/src/zope/sqlalchemy/datamanager.py index 6108233..57db5fa 100644 --- a/src/zope/sqlalchemy/datamanager.py +++ b/src/zope/sqlalchemy/datamanager.py @@ -240,3 +240,40 @@ def after_bulk_delete(self, session, query, query_context, result): def before_commit(self, session): assert self.transaction_manager.get().status == ZopeStatus.COMMITTING, "Transaction must be committed using the transaction manager" + + +def register(session, initial_state=STATUS_ACTIVE, + transaction_manager=zope_transaction.manager, keep_session=False): + """Register ZopeTransaction listener events on the + given Session or Session factory/class. + + This function requires at least SQLAlchemy 0.7 and makes use + of the newer sqlalchemy.event package in order to register event listeners + on the given Session. + + The session argument here may be a Session class or subclass, a + sessionmaker or scoped_session instance, or a specific Session instance. + Event listening will be specific to the scope of the type of argument + passed, including specificity to its subclass as well as its identity. + + """ + + from sqlalchemy import __version__ + assert tuple(int(x) for x in __version__.split(".")) >= (0, 7), \ + "SQLAlchemy version 0.7 or greater required to use register()" + + from sqlalchemy import event + + ext = ZopeTransactionExtension(initial_state=initial_state, + transaction_manager=transaction_manager, + keep_session=keep_session) + + event.listen(session, "after_begin", ext.after_begin) + event.listen(session, "after_attach", ext.after_attach) + event.listen(session, "after_flush", ext.after_flush) + event.listen(session, "after_bulk_update", ext.after_bulk_update) + event.listen(session, "after_bulk_delete", ext.after_bulk_delete) + event.listen(session, "before_commit", ext.before_commit) + + + diff --git a/src/zope/sqlalchemy/tests.py b/src/zope/sqlalchemy/tests.py index 51e7f45..257c9a9 100644 --- a/src/zope/sqlalchemy/tests.py +++ b/src/zope/sqlalchemy/tests.py @@ -114,6 +114,13 @@ def connect(dbapi_connection, connection_record): twophase=TEST_TWOPHASE, )) +EventSession = orm.scoped_session(orm.sessionmaker( + bind=engine, + twophase=TEST_TWOPHASE, +)) + +tx.register(EventSession) + metadata = sa.MetaData() # best to use unbound metadata @@ -330,6 +337,24 @@ def testTransactionJoining(self): [r for r in t._resources if isinstance(r, tx.SessionDataManager)], "Not joined transaction") + def testTransactionJoiningUsingRegister(self): + transaction.abort() # clean slate + t = transaction.get() + self.assertFalse( + [r for r in t._resources if isinstance(r, tx.SessionDataManager)], + "Joined transaction too early") + session = EventSession() + session.add(User(id=1, firstname='udo', lastname='juergens')) + t = transaction.get() + self.assertTrue( + [r for r in t._resources if isinstance(r, tx.SessionDataManager)], + "Not joined transaction") + transaction.abort() + conn = EventSession().connection() + self.assertTrue( + [r for r in t._resources if isinstance(r, tx.SessionDataManager)], + "Not joined transaction") + def testSavepoint(self): use_savepoint = not engine.url.drivername in tx.NO_SAVEPOINT_SUPPORT t = transaction.get() @@ -543,6 +568,28 @@ def testBulkUpdate(self): results = engine.connect().execute(test_users.select(test_users.c.lastname == "smith")) self.assertEqual(len(results.fetchall()), 2) + def testBulkDeleteUsingRegister(self): + session = EventSession() + session.add(User(id=1, firstname='udo', lastname='juergens')) + session.add(User(id=2, firstname='heino', lastname='n/a')) + transaction.commit() + session = EventSession() + session.query(User).delete() + transaction.commit() + results = engine.connect().execute(test_users.select()) + self.assertEqual(len(results.fetchall()), 0) + + def testBulkUpdateUsingRegister(self): + session = EventSession() + session.add(User(id=1, firstname='udo', lastname='juergens')) + session.add(User(id=2, firstname='heino', lastname='n/a')) + transaction.commit() + session = EventSession() + session.query(User).update(dict(lastname="smith")) + transaction.commit() + results = engine.connect().execute(test_users.select(test_users.c.lastname == "smith")) + self.assertEqual(len(results.fetchall()), 2) + def testFailedJoin(self): # When a join is issued while the transaction is in COMMITFAILED, the # session is never closed and the session id stays in _SESSION_STATE,