Skip to content

Commit

Permalink
MOTOR-900 Fix synchro failures on PyMongo 4.1.1 (#158)
Browse files Browse the repository at this point in the history
Co-authored-by: Shane Harvey <shnhrv@gmail.com>
  • Loading branch information
blink1073 and ShaneHarvey committed Apr 21, 2022
1 parent f5ade74 commit c16a62d
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 36 deletions.
17 changes: 17 additions & 0 deletions .github/workflows/test-python.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,23 @@ jobs:
- name: Run tests
run: |
tox
docs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/setup-python@v2
with:
python-version: 3.7
cache: 'pip'
cache-dependency-path: 'setup.py'
- name: Start MongoDB with Custom Options
run: |
mkdir data
mongod --fork --dbpath=$(pwd)/data --logpath=$PWD/mongo.log --setParameter enableTestCommands=1
- name: Install Python dependencies
run: |
python -m pip install -U pip tox
- name: Run docs
run: |
tox -e py3-sphinx-docs
Expand Down
10 changes: 10 additions & 0 deletions doc/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,16 @@ Changelog

.. currentmodule:: motor.motor_tornado

Motor 3.0
---------

Motor 3.0 adds support for PyMongo 4.0+.


Breaking Changes
~~~~~~~~~~~~~~~~
- Prevent use of :class:`~pymongo.database.Database` and :class:`~pymongo.collection.Collection` in boolean expressions.

Motor 2.5.1
-----------

Expand Down
47 changes: 41 additions & 6 deletions motor/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ def watch(
start_at_operation_time=None,
session=None,
start_after=None,
comment=None,
):
"""Watch changes on this cluster.
Expand Down Expand Up @@ -202,10 +203,15 @@ def watch(
- `start_after` (optional): The same as `resume_after` except that
`start_after` can resume notifications after an invalidate event.
This option and `resume_after` are mutually exclusive.
- `comment` (optional): A user-provided comment to attach to this
command.
:Returns:
A :class:`~MotorChangeStream`.
.. versionchanged:: 3.0
Added ``comment`` parameter.
.. versionchanged:: 2.1
Added the ``start_after`` parameter.
Expand All @@ -229,6 +235,7 @@ def watch(
start_at_operation_time,
session,
start_after,
comment,
)

def __getattr__(self, name):
Expand Down Expand Up @@ -511,6 +518,7 @@ class AgnosticDatabase(AgnosticBaseProperties):
__delegate_class__ = Database

__hash__ = DelegateMethod()
__bool__ = DelegateMethod()
command = AsyncCommand(doc=docstrings.cmd_doc)
create_collection = AsyncCommand().wrap(Collection)
dereference = AsyncRead()
Expand All @@ -531,7 +539,7 @@ def __init__(self, client, name, **kwargs):

super().__init__(delegate)

def aggregate(self, pipeline, **kwargs):
def aggregate(self, pipeline, *args, **kwargs):
"""Execute an aggregation pipeline on this database.
Introduced in MongoDB 3.6.
Expand Down Expand Up @@ -588,7 +596,11 @@ async def f():

# Latent cursor that will send initial command on first "async for".
return cursor_class(
self["$cmd.aggregate"], self._async_aggregate, pipeline, **unwrap_kwargs_session(kwargs)
self["$cmd.aggregate"],
self._async_aggregate,
pipeline,
*unwrap_args_session(args),
**unwrap_kwargs_session(kwargs)
)

def watch(
Expand All @@ -602,6 +614,7 @@ def watch(
start_at_operation_time=None,
session=None,
start_after=None,
comment=None,
):
"""Watch changes on this database.
Expand Down Expand Up @@ -642,10 +655,15 @@ def watch(
- `start_after` (optional): The same as `resume_after` except that
`start_after` can resume notifications after an invalidate event.
This option and `resume_after` are mutually exclusive.
- `comment` (optional): A user-provided comment to attach to this
command.
:Returns:
A :class:`~MotorChangeStream`.
.. versionchanged:: 3.0
Added ``comment`` parameter.
.. versionchanged:: 2.1
Added the ``start_after`` parameter.
Expand All @@ -669,6 +687,7 @@ def watch(
start_at_operation_time,
session,
start_after,
comment,
)

@property
Expand Down Expand Up @@ -728,6 +747,7 @@ class AgnosticCollection(AgnosticBaseProperties):
__delegate_class__ = Collection

__hash__ = DelegateMethod()
__bool__ = DelegateMethod()
bulk_write = AsyncCommand(doc=docstrings.bulk_write_doc)
count_documents = AsyncRead()
create_index = AsyncCommand()
Expand Down Expand Up @@ -865,7 +885,7 @@ async def get_raw():

return cursor_class(cursor, self)

def aggregate(self, pipeline, **kwargs):
def aggregate(self, pipeline, *args, **kwargs):
"""Execute an aggregation pipeline on this collection.
The aggregation can be run on a secondary if the client is connected
Expand Down Expand Up @@ -956,7 +976,13 @@ async def f():
)

# Latent cursor that will send initial command on first "async for".
return cursor_class(self, self._async_aggregate, pipeline, **unwrap_kwargs_session(kwargs))
return cursor_class(
self,
self._async_aggregate,
pipeline,
*unwrap_args_session(args),
**unwrap_kwargs_session(kwargs)
)

def aggregate_raw_batches(self, pipeline, **kwargs):
"""Perform an aggregation and retrieve batches of raw BSON.
Expand Down Expand Up @@ -999,6 +1025,7 @@ def watch(
start_at_operation_time=None,
session=None,
start_after=None,
comment=None,
):
"""Watch changes on this collection.
Expand Down Expand Up @@ -1109,12 +1136,17 @@ def main():
- `start_after` (optional): The same as `resume_after` except that
`start_after` can resume notifications after an invalidate event.
This option and `resume_after` are mutually exclusive.
- `comment` (optional): A user-provided comment to attach to this
command.
:Returns:
A :class:`~MotorChangeStream`.
See the :ref:`tornado_change_stream_example`.
.. versionchanged:: 3.0
Added ``comment`` parameter.
.. versionchanged:: 2.1
Added the ``start_after`` parameter.
Expand All @@ -1141,9 +1173,10 @@ def main():
start_at_operation_time,
session,
start_after,
comment,
)

def list_indexes(self, session=None):
def list_indexes(self, session=None, **kwargs):
"""Get a cursor over the index documents for this collection. ::
async def print_indexes():
Expand All @@ -1159,7 +1192,7 @@ async def print_indexes():
)

# Latent cursor that will send initial command on first "async for".
return cursor_class(self, self._async_list_indexes, session=session)
return cursor_class(self, self._async_list_indexes, session=session, **kwargs)

def wrap(self, obj):
if obj.__class__ is Collection:
Expand Down Expand Up @@ -1732,6 +1765,7 @@ def __init__(
start_at_operation_time,
session,
start_after,
comment,
):
super().__init__(delegate=None)
# The "target" object is a client, database, or collection.
Expand All @@ -1746,6 +1780,7 @@ def __init__(
"start_at_operation_time": start_at_operation_time,
"session": session,
"start_after": start_after,
"comment": comment,
}

def _lazy_init(self):
Expand Down
21 changes: 15 additions & 6 deletions synchro/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
from pymongo.encryption_options import *
from pymongo.encryption_options import _HAVE_PYMONGOCRYPT
from pymongo.errors import *
from pymongo.event_loggers import *
from pymongo.helpers import _check_command_response
from pymongo.message import (
_COMMAND_OVERHEAD,
Expand Down Expand Up @@ -334,6 +335,8 @@ class MongoClient(Synchro):
max_pool_size = SynchroProperty()
start_session = Sync()
watch = WrapOutgoing()
__iter__ = None # PYTHON-3084
__next__ = Sync()

def __init__(self, host=None, port=None, *args, **kwargs):
# So that TestClient.test_constants and test_types work.
Expand Down Expand Up @@ -368,12 +371,15 @@ def __getitem__(self, name):

# For PyMongo tests that access client internals.
_MongoClient__all_credentials = SynchroProperty()
_MongoClient__kill_cursors_queue = SynchroProperty()
_MongoClient__options = SynchroProperty()
_cache_credentials = SynchroProperty()
_close_cursor_now = SynchroProperty()
_get_topology = SynchroProperty()
_topology = SynchroProperty()
_kill_cursors_executor = SynchroProperty()
_topology_settings = SynchroProperty()
_process_periodic_tasks = SynchroProperty()


class _SynchroTransactionContext(Synchro):
Expand Down Expand Up @@ -425,14 +431,15 @@ class Database(Synchro):
get_collection = WrapOutgoing()
watch = WrapOutgoing()
aggregate = WrapOutgoing()
__bool__ = Sync()

def __init__(self, client, name, **kwargs):
assert isinstance(client, MongoClient), "Expected MongoClient, got %s" % repr(client)

self._client = client
self.delegate = kwargs.get("delegate") or motor.MotorDatabase(
client.delegate, name, **kwargs
)
self.delegate = kwargs.get("delegate")
if self.delegate is None:
self.delegate = motor.MotorDatabase(client.delegate, name, **kwargs)

assert isinstance(
self.delegate, motor.MotorDatabase
Expand All @@ -458,6 +465,7 @@ class Collection(Synchro):
aggregate_raw_batches = WrapOutgoing()
list_indexes = WrapOutgoing()
watch = WrapOutgoing()
__bool__ = WrapOutgoing()

def __init__(self, database, name, **kwargs):
if not isinstance(database, Database):
Expand All @@ -467,9 +475,9 @@ def __init__(self, database, name, **kwargs):
)

self.database = database
self.delegate = kwargs.get("delegate") or motor.MotorCollection(
self.database.delegate, name, **kwargs
)
self.delegate = kwargs.get("delegate")
if self.delegate is None:
self.delegate = motor.MotorCollection(self.database.delegate, name, **kwargs)

if not isinstance(self.delegate, motor.MotorCollection):
raise TypeError(
Expand Down Expand Up @@ -711,6 +719,7 @@ def __setattr__(self, key, value):
"upload_date",
"aliases",
"metadata",
"md5",
):
raise AttributeError()

Expand Down

0 comments on commit c16a62d

Please sign in to comment.