Skip to content

Commit

Permalink
Merge pull request #4 from zzzeek/master
Browse files Browse the repository at this point in the history
add new register() function to establish zope listener via SQLAlchemy events
  • Loading branch information
lrowe committed Sep 29, 2013
2 parents d4b940b + 9e990f2 commit e65bdfc
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 4 deletions.
11 changes: 9 additions & 2 deletions CHANGES.txt
Expand Up @@ -4,6 +4,13 @@ Changes
0.7.4 (unreleased)
------------------

* Add a new function zope.sqlalchemy.register(), which replaces the
direct use of ZopeTransactionExtension to make use
of the newer SQLAlchemy event system to establish instrumentation on
the given Session instance/class/factory. Requires at least
SQLAlchemy 0.7.


0.7.3 (2013-09-25)
------------------

Expand Down Expand Up @@ -56,7 +63,7 @@ Changes
----------------

* Remove redundant session.flush() / session.clear() on savepoint operations.
These were only needed with SQLAlchemy 0.4.x.
These were only needed with SQLAlchemy 0.4.x.

* SQLAlchemy 0.6.x support. Require SQLAlchemy >= 0.5.1.

Expand Down Expand Up @@ -101,7 +108,7 @@ Bugs fixed:

Feature changes:

* For correctness and consistency with ZODB, renamed the function 'invalidate'
* For correctness and consistency with ZODB, renamed the function 'invalidate'
to 'mark_changed' and the status 'invalidated' to 'changed'.

0.2 (2008-06-28)
Expand Down
20 changes: 19 additions & 1 deletion src/zope/sqlalchemy/README.txt
Expand Up @@ -83,7 +83,7 @@ Now to define the mapper classes.
... email = Column('email', String(50))
... user_id = Column('user_id', Integer, ForeignKey('test_users.id'))

Create an engine and setup the tables. Note that for this example to work a
Create an engine and setup the tables. Note that for this example to work a
recent version of sqlite/pysqlite is required. 3.4.0 seems to be sufficient.

>>> engine = create_engine(TEST_DSN, convert_unicode=True)
Expand Down Expand Up @@ -208,6 +208,24 @@ The session must then be closed manually:

>>> session.close()

Registration Using SQLAlchemy Events
====================================

The zope.sqlalchemy.register() function performs the same function as the
ZopeTransactionExtension, except makes use of the newer SQLAlchemy event system
which superseded the extension system as of SQLAlchemy 0.7. Usage is similar:

>>> from zope.sqlalchemy import register
>>> Session = scoped_session(sessionmaker(bind=engine,
... twophase=TEST_TWOPHASE))
>>> register(Session, keep_session=True)
>>> session = Session()
>>> jack = User(id=2, name='jack')
>>> session.add(jack)
>>> transaction.commit()
>>> engine.execute("select name from test_users where id=2").scalar()
u'jack'


Development version
===================
Expand Down
2 changes: 1 addition & 1 deletion src/zope/sqlalchemy/__init__.py
Expand Up @@ -14,5 +14,5 @@

__version__ = '0.7.4dev'

from zope.sqlalchemy.datamanager import ZopeTransactionExtension, mark_changed
from zope.sqlalchemy.datamanager import ZopeTransactionExtension, mark_changed, register
invalidate = mark_changed
37 changes: 37 additions & 0 deletions src/zope/sqlalchemy/datamanager.py
Expand Up @@ -240,3 +240,40 @@ def after_bulk_delete(self, session, query, query_context, result):

def before_commit(self, session):
assert self.transaction_manager.get().status == ZopeStatus.COMMITTING, "Transaction must be committed using the transaction manager"


def register(session, initial_state=STATUS_ACTIVE,
transaction_manager=zope_transaction.manager, keep_session=False):
"""Register ZopeTransaction listener events on the
given Session or Session factory/class.
This function requires at least SQLAlchemy 0.7 and makes use
of the newer sqlalchemy.event package in order to register event listeners
on the given Session.
The session argument here may be a Session class or subclass, a
sessionmaker or scoped_session instance, or a specific Session instance.
Event listening will be specific to the scope of the type of argument
passed, including specificity to its subclass as well as its identity.
"""

from sqlalchemy import __version__
assert tuple(int(x) for x in __version__.split(".")) >= (0, 7), \
"SQLAlchemy version 0.7 or greater required to use register()"

from sqlalchemy import event

ext = ZopeTransactionExtension(initial_state=initial_state,
transaction_manager=transaction_manager,
keep_session=keep_session)

event.listen(session, "after_begin", ext.after_begin)
event.listen(session, "after_attach", ext.after_attach)
event.listen(session, "after_flush", ext.after_flush)
event.listen(session, "after_bulk_update", ext.after_bulk_update)
event.listen(session, "after_bulk_delete", ext.after_bulk_delete)
event.listen(session, "before_commit", ext.before_commit)



47 changes: 47 additions & 0 deletions src/zope/sqlalchemy/tests.py
Expand Up @@ -114,6 +114,13 @@ def connect(dbapi_connection, connection_record):
twophase=TEST_TWOPHASE,
))

EventSession = orm.scoped_session(orm.sessionmaker(
bind=engine,
twophase=TEST_TWOPHASE,
))

tx.register(EventSession)

metadata = sa.MetaData() # best to use unbound metadata


Expand Down Expand Up @@ -330,6 +337,24 @@ def testTransactionJoining(self):
[r for r in t._resources if isinstance(r, tx.SessionDataManager)],
"Not joined transaction")

def testTransactionJoiningUsingRegister(self):
transaction.abort() # clean slate
t = transaction.get()
self.assertFalse(
[r for r in t._resources if isinstance(r, tx.SessionDataManager)],
"Joined transaction too early")
session = EventSession()
session.add(User(id=1, firstname='udo', lastname='juergens'))
t = transaction.get()
self.assertTrue(
[r for r in t._resources if isinstance(r, tx.SessionDataManager)],
"Not joined transaction")
transaction.abort()
conn = EventSession().connection()
self.assertTrue(
[r for r in t._resources if isinstance(r, tx.SessionDataManager)],
"Not joined transaction")

def testSavepoint(self):
use_savepoint = not engine.url.drivername in tx.NO_SAVEPOINT_SUPPORT
t = transaction.get()
Expand Down Expand Up @@ -543,6 +568,28 @@ def testBulkUpdate(self):
results = engine.connect().execute(test_users.select(test_users.c.lastname == "smith"))
self.assertEqual(len(results.fetchall()), 2)

def testBulkDeleteUsingRegister(self):
session = EventSession()
session.add(User(id=1, firstname='udo', lastname='juergens'))
session.add(User(id=2, firstname='heino', lastname='n/a'))
transaction.commit()
session = EventSession()
session.query(User).delete()
transaction.commit()
results = engine.connect().execute(test_users.select())
self.assertEqual(len(results.fetchall()), 0)

def testBulkUpdateUsingRegister(self):
session = EventSession()
session.add(User(id=1, firstname='udo', lastname='juergens'))
session.add(User(id=2, firstname='heino', lastname='n/a'))
transaction.commit()
session = EventSession()
session.query(User).update(dict(lastname="smith"))
transaction.commit()
results = engine.connect().execute(test_users.select(test_users.c.lastname == "smith"))
self.assertEqual(len(results.fetchall()), 2)

def testFailedJoin(self):
# When a join is issued while the transaction is in COMMITFAILED, the
# session is never closed and the session id stays in _SESSION_STATE,
Expand Down

0 comments on commit e65bdfc

Please sign in to comment.