Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix up Python 3.8 loop argument warnings #246

Merged
merged 8 commits into from Apr 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 5 additions & 1 deletion .gitignore
Expand Up @@ -58,4 +58,8 @@ target/

coverage

.pytest_cache
.pytest_cache/
.mypy_cache/

# pyenv
.python-version
2 changes: 2 additions & 0 deletions .travis.yml
Expand Up @@ -3,6 +3,8 @@ language: python
python:
- "3.5"
- "3.6"
- "3.7"
- "3.8"


install:
Expand Down
5 changes: 5 additions & 0 deletions CHANGES.rst
@@ -1,6 +1,11 @@
Changes
=======

to be released
--------------

- Remove explicit loop arguments and forbid creating queues outside event loops #246

0.4.0 (2018-07-28)
------------------

Expand Down
55 changes: 49 additions & 6 deletions README.rst
Expand Up @@ -26,16 +26,50 @@ Synchronous is fully compatible with `standard queue
follows `asyncio queue design
<https://docs.python.org/3/library/asyncio-queue.html>`_.

Usage example
=============
Usage example (Python 3.7+)
===========================

.. code:: python
import asyncio
import janus
def threaded(sync_q):
for i in range(100):
sync_q.put(i)
sync_q.join()
async def async_coro(async_q):
for i in range(100):
val = await async_q.get()
assert val == i
async_q.task_done()
async def main():
queue = janus.Queue()
loop = asyncio.get_running_loop()
fut = loop.run_in_executor(None, threaded, queue.sync_q)
await async_coro(queue.async_q)
await fut
queue.close()
await queue.wait_closed()
asyncio.run(main())
Usage example (Python 3.5 and 3.6)
==================================

.. code:: python
import asyncio
import janus
loop = asyncio.get_event_loop()
queue = janus.Queue(loop=loop)
def threaded(sync_q):
Expand All @@ -51,9 +85,18 @@ Usage example
async_q.task_done()
fut = loop.run_in_executor(None, threaded, queue.sync_q)
loop.run_until_complete(async_coro(queue.async_q))
loop.run_until_complete(fut)
async def main():
queue = janus.Queue()
fut = loop.run_in_executor(None, threaded, queue.sync_q)
await async_coro(queue.async_q)
await fut
queue.close()
await queue.wait_closed()
try:
loop.run_until_complete(main())
finally:
loop.close()
Communication channels
Expand Down
34 changes: 19 additions & 15 deletions janus/__init__.py
Expand Up @@ -16,16 +16,17 @@


T = TypeVar('T')
OptLoop = Optional[asyncio.AbstractEventLoop]
OptInt = Optional[int]


class Queue(Generic[T]):
def __init__(self, maxsize: int = 0, *, loop: OptLoop = None) -> None:
if loop is None:
loop = asyncio.get_event_loop()
current_loop = getattr(asyncio, 'get_running_loop', None)
if current_loop is None:
current_loop = asyncio.get_event_loop


self._loop = loop # type: asyncio.AbstractEventLoop
class Queue(Generic[T]):
def __init__(self, maxsize: int = 0) -> None:
self._loop = current_loop()
self._maxsize = maxsize

self._init(maxsize)
Expand All @@ -37,12 +38,10 @@ def __init__(self, maxsize: int = 0, *, loop: OptLoop = None) -> None:
self._sync_not_full = threading.Condition(self._sync_mutex)
self._all_tasks_done = threading.Condition(self._sync_mutex)

self._async_mutex = asyncio.Lock(loop=self._loop)
self._async_not_empty = asyncio.Condition(
self._async_mutex, loop=self._loop)
self._async_not_full = asyncio.Condition(
self._async_mutex, loop=self._loop)
self._finished = asyncio.Event(loop=self._loop)
self._async_mutex = asyncio.Lock()
self._async_not_empty = asyncio.Condition(self._async_mutex)
self._async_not_full = asyncio.Condition(self._async_mutex)
self._finished = asyncio.Event()
self._finished.set()

self._closing = False
Expand Down Expand Up @@ -78,9 +77,14 @@ async def wait_closed(self) -> None:
# so lock acquiring is not required
if not self._closing:
raise RuntimeError("Waiting for non-closed queue")
# give execution chances for the task-done callbacks
# of async tasks created inside
# _notify_async_not_empty, _notify_async_not_full
# methods.
await asyncio.sleep(0)
if not self._pending:
return
await asyncio.wait(self._pending, loop=self._loop)
await asyncio.wait(self._pending)

@property
def closed(self) -> bool:
Expand Down Expand Up @@ -143,7 +147,7 @@ async def f() -> None:
self._async_not_empty.notify()

def task_maker() -> None:
task = asyncio.ensure_future(f(), loop=self._loop)
task = self._loop.create_task(f())
task.add_done_callback(self._pending.discard)
self._pending.add(task)

Expand All @@ -158,7 +162,7 @@ async def f() -> None:
self._async_not_full.notify()

def task_maker() -> None:
task = asyncio.ensure_future(f(), loop=self._loop)
task = self._loop.create_task(f())
task.add_done_callback(self._pending.discard)
self._pending.add(task)

Expand Down
1 change: 1 addition & 0 deletions requirements-dev.txt
Expand Up @@ -7,5 +7,6 @@ mypy==0.770
pyroma==2.6
pytest-cov==2.8.1
pytest==5.4.1
pytest-asyncio==0.10.0
tox==3.14.6
wheel==0.34.2
15 changes: 5 additions & 10 deletions setup.py
Expand Up @@ -6,9 +6,6 @@

from setuptools.command.test import test as TestCommand

PY_33 = sys.version_info < (3, 4)
PY_35 = sys.version_info >= (3, 5)


class PyTest(TestCommand):
user_options = [('pytest-args=', 'a', "Arguments to pass to py.test")]
Expand Down Expand Up @@ -43,13 +40,10 @@ def read(f):

install_requires = []

if PY_33:
install_requires.append('asyncio')

# if not PY_35:
# install_requires.append('typing')

tests_require = install_requires + ['pytest']
tests_require = install_requires + [
'pytest>=5.4',
'pytest-asyncio>=0.10.0',
]
extras_require = {}


Expand All @@ -66,6 +60,7 @@ def read(f):
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Topic :: Software Development :: Libraries',
'Framework :: AsyncIO',
],
Expand Down