Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix #31: stop using the deprecated "extension" way of registering events. #34

Merged
merged 2 commits into from Oct 16, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Expand Up @@ -9,3 +9,5 @@ dist/
downloads/
eggs/
parts/
include/
lib/
7 changes: 7 additions & 0 deletions CHANGES.rst
Expand Up @@ -6,6 +6,13 @@ Changes

* Add support for Python 3.7

* Fix deprecation warnings for the event system. We already used it in general
but still leveraged the old extenion mechanism in some places. Fixes #31.

To make things clearer we renamed the ZopeTransactionExtension class
to ZopeTransactionEvents. Existing code using the 'register' version stays
compatible.


1.1 (2019-01-03)
----------------
Expand Down
40 changes: 11 additions & 29 deletions src/zope/sqlalchemy/README.rst
Expand Up @@ -66,7 +66,7 @@ First the necessary imports.
>>> from sqlalchemy import *
>>> from sqlalchemy.ext.declarative import declarative_base
>>> from sqlalchemy.orm import scoped_session, sessionmaker, relation
>>> from zope.sqlalchemy import ZopeTransactionExtension
>>> from zope.sqlalchemy import register
>>> import transaction

Now to define the mapper classes.
Expand All @@ -91,16 +91,17 @@ recent version of sqlite/pysqlite is required. 3.4.0 seems to be sufficient.

Now to create the session itself. As zope is a threaded web server we must use
scoped sessions. Zope and SQLAlchemy sessions are tied together by using the
ZopeTransactionExtension from this package.
register

>>> Session = scoped_session(sessionmaker(bind=engine,
... twophase=TEST_TWOPHASE, extension=ZopeTransactionExtension()))
... twophase=TEST_TWOPHASE))

Call the scoped session factory to retrieve a session. You may call this as
many times as you like within a transaction and you will always retrieve the
same session. At present there are no users in the database.

>>> session = Session()
>>> register(session)
>>> session.query(User).all()
[]

Expand Down Expand Up @@ -163,11 +164,11 @@ to the DB.
'ben'
>>> transaction.abort()

If this is a problem you may tell the extension to place the session in the
'changed' state initially.
If this is a problem you may register the events and tell them to place the
session in the 'changed' state initially.

>>> Session.remove()
>>> Session.configure(extension=ZopeTransactionExtension('changed'))
>>> register(Session, 'changed')
>>> session = Session()
>>> conn = session.connection()
>>> conn.execute(users.update(users.c.name=='ben'), name='bob')
Expand All @@ -190,13 +191,12 @@ after a commit. You can tell by trying to access an object after committing:
Traceback (most recent call last):
DetachedInstanceError: Instance <User at ...> is not bound to a Session; attribute refresh operation cannot proceed...

To support cases where a session needs to last longer than a transaction
(useful in test suites) you can specify to keep a session when creating the
transaction extension:
To support cases where a session needs to last longer than a transaction (useful
in test suites) you can specify to keep a session when registering the events:

>>> Session = scoped_session(sessionmaker(bind=engine,
... twophase=TEST_TWOPHASE, extension=ZopeTransactionExtension(keep_session=True)))

... twophase=TEST_TWOPHASE))
>>> register(Session, keep_session=True)
>>> session = Session()
>>> bob = session.query(User).all()[0]
>>> bob.name = 'bobby'
Expand All @@ -208,24 +208,6 @@ The session must then be closed manually:

>>> session.close()

Registration Using SQLAlchemy Events
====================================

The zope.sqlalchemy.register() function performs the same function as the
ZopeTransactionExtension, except makes use of the newer SQLAlchemy event system
which superseded the extension system as of SQLAlchemy 0.7. Usage is similar:

>>> from zope.sqlalchemy import register
>>> Session = scoped_session(sessionmaker(bind=engine,
... twophase=TEST_TWOPHASE))
>>> register(Session, keep_session=True)
>>> session = Session()
>>> jack = User(id=2, name='jack')
>>> session.add(jack)
>>> transaction.commit()
>>> engine.execute("select name from test_users where id=2").scalar()
u'jack'


Development version
===================
Expand Down
3 changes: 2 additions & 1 deletion src/zope/sqlalchemy/__init__.py
Expand Up @@ -12,5 +12,6 @@
#
##############################################################################

from zope.sqlalchemy.datamanager import ZopeTransactionExtension, mark_changed, register
from zope.sqlalchemy.datamanager import ZopeTransactionEvents, mark_changed, register

invalidate = mark_changed
122 changes: 78 additions & 44 deletions src/zope/sqlalchemy/datamanager.py
Expand Up @@ -20,7 +20,6 @@
from transaction._transaction import Status as ZopeStatus
from sqlalchemy.orm.exc import ConcurrentModificationError
from sqlalchemy.exc import DBAPIError
from sqlalchemy.orm.session import SessionExtension
from sqlalchemy.engine.base import Engine

_retryable_errors = []
Expand All @@ -37,23 +36,27 @@
except ImportError:
pass
else:
_retryable_errors.append((cx_Oracle.DatabaseError, lambda e: e.args[0].code == 8177))
_retryable_errors.append(
(cx_Oracle.DatabaseError, lambda e: e.args[0].code == 8177)
)

# 1213: Deadlock found when trying to get lock; try restarting transaction
try:
import pymysql
except ImportError:
pass
else:
_retryable_errors.append((pymysql.err.OperationalError, lambda e: e.args[0] == 1213))
_retryable_errors.append(
(pymysql.err.OperationalError, lambda e: e.args[0] == 1213)
)

# The status of the session is stored on the connection info
STATUS_ACTIVE = 'active' # session joined to transaction, writes allowed.
STATUS_CHANGED = 'changed' # data has been written
STATUS_READONLY = 'readonly' # session joined to transaction, no writes allowed.
STATUS_ACTIVE = "active" # session joined to transaction, writes allowed.
STATUS_CHANGED = "changed" # data has been written
STATUS_READONLY = "readonly" # session joined to transaction, no writes allowed.
STATUS_INVALIDATED = STATUS_CHANGED # BBB

NO_SAVEPOINT_SUPPORT = set(['sqlite'])
NO_SAVEPOINT_SUPPORT = set(["sqlite"])
ctheune marked this conversation as resolved.
Show resolved Hide resolved

_SESSION_STATE = WeakKeyDictionary() # a mapping of session -> status
# This is thread safe because you are using scoped sessions
Expand All @@ -63,6 +66,7 @@
# The two variants of the DataManager.
#


@implementer(ISavepointDataManager)
class SessionDataManager(object):
"""Integrate a top level sqlalchemy session transaction into a zope transaction
Expand All @@ -75,14 +79,16 @@ def __init__(self, session, status, transaction_manager, keep_session=False):

# Support both SQLAlchemy 1.0 and 1.1
# https://github.com/zopefoundation/zope.sqlalchemy/issues/15
_iterate_parents = getattr(session.transaction, "_iterate_self_and_parents", None) \
or session.transaction._iterate_parents
_iterate_parents = (
getattr(session.transaction, "_iterate_self_and_parents", None)
or session.transaction._iterate_parents
)

self.tx = _iterate_parents()[-1]
self.session = session
transaction_manager.get().join(self)
_SESSION_STATE[session] = status
self.state = 'init'
self.state = "init"
self.keep_session = keep_session

def _finish(self, final_state):
Expand All @@ -100,7 +106,7 @@ def _finish(self, final_state):

def abort(self, trans):
if self.tx is not None: # there may have been no work to do
self._finish('aborted')
self._finish("aborted")

def tpc_begin(self, trans):
self.session.flush()
Expand All @@ -111,19 +117,19 @@ def commit(self, trans):
session = self.session
if session.expire_on_commit:
session.expire_all()
self._finish('no work')
self._finish("no work")

def tpc_vote(self, trans):
# for a one phase data manager commit last in tpc_vote
if self.tx is not None: # there may have been no work to do
self.tx.commit()
self._finish('committed')
self._finish("committed")

def tpc_finish(self, trans):
pass

def tpc_abort(self, trans):
assert self.state is not 'committed'
assert self.state is not "committed"

def sortKey(self):
# Try to sort last, so that we vote last - we may commit in tpc_vote(),
Expand All @@ -141,11 +147,12 @@ def savepoint(self):
# support savepoints but Postgres is whitelisted independent of its
# version. Possibly additional version information should be taken
# into account (ajung)
if set(engine.url.drivername
for engine in self.session.transaction._connections.keys()
if isinstance(engine, Engine)
).intersection(NO_SAVEPOINT_SUPPORT):
raise AttributeError('savepoint')
if set(
engine.url.drivername
for engine in self.session.transaction._connections.keys()
if isinstance(engine, Engine)
).intersection(NO_SAVEPOINT_SUPPORT):
raise AttributeError("savepoint")
return self._savepoint

def _savepoint(self):
Expand All @@ -167,20 +174,21 @@ def should_retry(self, error):
class TwoPhaseSessionDataManager(SessionDataManager):
"""Two phase variant.
"""

def tpc_vote(self, trans):
if self.tx is not None: # there may have been no work to do
self.tx.prepare()
self.state = 'voted'
self.state = "voted"

def tpc_finish(self, trans):
if self.tx is not None:
self.tx.commit()
self._finish('committed')
self._finish("committed")

def tpc_abort(self, trans):
if self.tx is not None: # we may not have voted, and been aborted already
self.tx.rollback()
self._finish('aborted commit')
self._finish("aborted commit")

def sortKey(self):
# Sort normally
Expand All @@ -189,7 +197,6 @@ def sortKey(self):

@implementer(IDataManagerSavepoint)
class SessionSavepoint:

def __init__(self, session):
self.session = session
self.transaction = session.begin_nested()
Expand All @@ -199,7 +206,12 @@ def rollback(self):
self.transaction.rollback()


def join_transaction(session, initial_state=STATUS_ACTIVE, transaction_manager=zope_transaction.manager, keep_session=False):
def join_transaction(
session,
initial_state=STATUS_ACTIVE,
transaction_manager=zope_transaction.manager,
keep_session=False,
):
"""Join a session to a transaction using the appropriate datamanager.

It is safe to call this multiple times, if the session is already joined
Expand All @@ -210,43 +222,57 @@ def join_transaction(session, initial_state=STATUS_ACTIVE, transaction_manager=z
If using the default initial status of STATUS_ACTIVE, you must ensure that
mark_changed(session) is called when data is written to the database.

The ZopeTransactionExtesion SessionExtension can be used to ensure that this is
The ZopeTransactionEvents can be used to ensure that this is
called automatically after session write operations.
"""
if _SESSION_STATE.get(session, None) is None:
if session.twophase:
DataManager = TwoPhaseSessionDataManager
else:
DataManager = SessionDataManager
DataManager(session, initial_state, transaction_manager, keep_session=keep_session)
DataManager(
session, initial_state, transaction_manager, keep_session=keep_session
)


def mark_changed(session, transaction_manager=zope_transaction.manager, keep_session=False):
def mark_changed(
session, transaction_manager=zope_transaction.manager, keep_session=False
):
"""Mark a session as needing to be committed.
"""
assert _SESSION_STATE.get(session, None) is not STATUS_READONLY, "Session already registered as read only"
assert (
_SESSION_STATE.get(session, None) is not STATUS_READONLY
), "Session already registered as read only"
join_transaction(session, STATUS_CHANGED, transaction_manager, keep_session)
_SESSION_STATE[session] = STATUS_CHANGED


class ZopeTransactionExtension(SessionExtension):
class ZopeTransactionEvents(object):
"""Record that a flush has occurred on a session's connection. This allows
the DataManager to rollback rather than commit on read only transactions.
"""

def __init__(self, initial_state=STATUS_ACTIVE, transaction_manager=zope_transaction.manager, keep_session=False):
if initial_state == 'invalidated':
def __init__(
self,
initial_state=STATUS_ACTIVE,
transaction_manager=zope_transaction.manager,
keep_session=False,
):
if initial_state == "invalidated":
initial_state = STATUS_CHANGED # BBB
SessionExtension.__init__(self)
self.initial_state = initial_state
self.transaction_manager = transaction_manager
self.keep_session = keep_session

def after_begin(self, session, transaction, connection):
join_transaction(session, self.initial_state, self.transaction_manager, self.keep_session)
join_transaction(
session, self.initial_state, self.transaction_manager, self.keep_session
)

def after_attach(self, session, instance):
join_transaction(session, self.initial_state, self.transaction_manager, self.keep_session)
join_transaction(
session, self.initial_state, self.transaction_manager, self.keep_session
)

def after_flush(self, session, flush_context):
mark_changed(session, self.transaction_manager, self.keep_session)
Expand All @@ -258,13 +284,18 @@ def after_bulk_delete(self, session, query, query_context, result):
mark_changed(session, self.transaction_manager, self.keep_session)

def before_commit(self, session):
assert session.transaction.nested or \
self.transaction_manager.get().status == ZopeStatus.COMMITTING, \
"Transaction must be committed using the transaction manager"


def register(session, initial_state=STATUS_ACTIVE,
transaction_manager=zope_transaction.manager, keep_session=False):
assert (
session.transaction.nested
or self.transaction_manager.get().status == ZopeStatus.COMMITTING
), "Transaction must be committed using the transaction manager"


def register(
session,
initial_state=STATUS_ACTIVE,
transaction_manager=zope_transaction.manager,
keep_session=False,
):
"""Register ZopeTransaction listener events on the
given Session or Session factory/class.

Expand All @@ -280,12 +311,15 @@ def register(session, initial_state=STATUS_ACTIVE,
"""

from sqlalchemy import __version__
assert tuple(int(x) for x in __version__.split(".")[:2]) >= (0, 7), \
"SQLAlchemy version 0.7 or greater required to use register()"

assert tuple(int(x) for x in __version__.split(".")[:2]) >= (
0,
7,
), "SQLAlchemy version 0.7 or greater required to use register()"
ctheune marked this conversation as resolved.
Show resolved Hide resolved

from sqlalchemy import event

ext = ZopeTransactionExtension(
ext = ZopeTransactionEvents(
initial_state=initial_state,
transaction_manager=transaction_manager,
keep_session=keep_session,
Expand Down