From 5bdf1f5be9b5c7a9d4397cb828e6fbd3dbd50ece Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Buffat?= Date: Thu, 4 Nov 2021 22:16:55 +0100 Subject: [PATCH 1/3] add SyncQueue and AsyncQueue Protocols --- janus/__init__.py | 90 +++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 87 insertions(+), 3 deletions(-) diff --git a/janus/__init__.py b/janus/__init__.py index 53f8d00..8a24fa4 100644 --- a/janus/__init__.py +++ b/janus/__init__.py @@ -7,7 +7,7 @@ from heapq import heappop, heappush from queue import Empty as SyncQueueEmpty from queue import Full as SyncQueueFull -from typing import Any, Callable, Deque, Generic, List, Optional, Set, TypeVar +from typing import Any, Callable, Deque, Generic, List, Optional, Protocol, Set, TypeVar __version__ = "0.6.2" __all__ = ("Queue", "PriorityQueue", "LifoQueue") @@ -181,7 +181,49 @@ def _check_closing(self) -> None: raise RuntimeError("Operation on the closed queue is forbidden") -class _SyncQueueProxy(Generic[T]): +class SyncQueue(Protocol[T]): + + @property + def maxsize(self) -> int: + ... + + @property + def closed(self) -> bool: + ... + + def task_done(self) -> None: + ... + + def join(self) -> None: + ... + + def qsize(self) -> int: + ... + + @property + def unfinished_tasks(self) -> int: + ... + + def empty(self) -> bool: + ... + + def full(self) -> bool: + ... + + def put(self, item: T, block: bool = True, timeout: OptFloat = None) -> None: + ... + + def get(self, block: bool = True, timeout: OptFloat = None) -> T: + ... + + def put_nowait(self, item: T) -> None: + ... + + def get_nowait(self) -> T: + ... + + +class _SyncQueueProxy(SyncQueue[T]): """Create a queue object with a given maximum size. If maxsize is <= 0, the queue size is infinite. @@ -354,7 +396,49 @@ def get_nowait(self) -> T: return self.get(block=False) -class _AsyncQueueProxy(Generic[T]): +class AsyncQueue(Protocol[T]): + + @property + def closed(self) -> bool: + ... + + def qsize(self) -> int: + ... + + @property + def unfinished_tasks(self) -> int: + ... + + @property + def maxsize(self) -> int: + ... + + def empty(self) -> bool: + ... + + def full(self) -> bool: + ... + + async def put(self, item: T) -> None: + ... + + def put_nowait(self, item: T) -> None: + ... + + async def get(self) -> T: + ... + + def get_nowait(self) -> T: + ... + + def task_done(self) -> None: + ... + + async def join(self) -> None: + ... + + +class _AsyncQueueProxy(AsyncQueue[T]): """Create a queue object with a given maximum size. If maxsize is <= 0, the queue size is infinite. From 397bacb1712b0604abc11ab3a0631ee79e44d19d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Buffat?= Date: Wed, 24 Nov 2021 11:03:00 +0100 Subject: [PATCH 2/3] generalize Protocol, update Readme, add install_requires --- README.rst | 12 +-- janus/__init__.py | 180 +++++++++++++++++++++++-------------------- pyproject.toml | 3 + requirements-dev.txt | 2 +- setup.cfg | 52 +++++++++++++ setup.py | 90 +--------------------- 6 files changed, 159 insertions(+), 180 deletions(-) create mode 100644 pyproject.toml diff --git a/README.rst b/README.rst index 8527649..67618ae 100644 --- a/README.rst +++ b/README.rst @@ -1,8 +1,8 @@ ======= janus ======= -.. image:: https://travis-ci.com/aio-libs/janus.svg?branch=master - :target: https://travis-ci.com/aio-libs/janus +.. image:: https://github.com/aio-libs/janus/actions/workflows/ci.yml/badge.svg + :target: https://github.com/aio-libs/janus/actions/workflows/ci.yml .. image:: https://codecov.io/gh/aio-libs/janus/branch/master/graph/badge.svg :target: https://codecov.io/gh/aio-libs/janus .. image:: https://img.shields.io/pypi/v/janus.svg @@ -35,21 +35,21 @@ Usage example (Python 3.7+) import janus - def threaded(sync_q): + def threaded(sync_q: janus.SyncQueue[int]) -> None: for i in range(100): sync_q.put(i) sync_q.join() - async def async_coro(async_q): + async def async_coro(async_q: janus.AsyncQueue[int]) -> None: for i in range(100): val = await async_q.get() assert val == i async_q.task_done() - async def main(): - queue = janus.Queue() + async def main() -> None: + queue: janus.Queue[int] = janus.Queue() loop = asyncio.get_running_loop() fut = loop.run_in_executor(None, threaded, queue.sync_q) await async_coro(queue.async_q) diff --git a/janus/__init__.py b/janus/__init__.py index 8a24fa4..233fb2d 100644 --- a/janus/__init__.py +++ b/janus/__init__.py @@ -7,7 +7,8 @@ from heapq import heappop, heappush from queue import Empty as SyncQueueEmpty from queue import Full as SyncQueueFull -from typing import Any, Callable, Deque, Generic, List, Optional, Protocol, Set, TypeVar +from typing import Any, Callable, Deque, Generic, List, Optional, Set, TypeVar +from typing_extensions import Protocol __version__ = "0.6.2" __all__ = ("Queue", "PriorityQueue", "LifoQueue") @@ -17,11 +18,104 @@ OptFloat = Optional[float] +class BaseQueue(Protocol[T]): + + @property + def maxsize(self) -> int: + ... + + @property + def closed(self) -> bool: + ... + + def task_done(self) -> None: + ... + + def qsize(self) -> int: + ... + + @property + def unfinished_tasks(self) -> int: + ... + + def empty(self) -> bool: + ... + + def full(self) -> bool: + ... + + def put_nowait(self, item: T) -> None: + ... + + def get_nowait(self) -> T: + ... + + +class SyncQueue(BaseQueue[T], Protocol[T]): + + @property + def maxsize(self) -> int: + ... + + @property + def closed(self) -> bool: + ... + + def task_done(self) -> None: + ... + + def qsize(self) -> int: + ... + + @property + def unfinished_tasks(self) -> int: + ... + + def empty(self) -> bool: + ... + + def full(self) -> bool: + ... + + def put_nowait(self, item: T) -> None: + ... + + def get_nowait(self) -> T: + ... + + + def put(self, item: T, block: bool = True, timeout: OptFloat = None) -> None: + ... + + def get(self, block: bool = True, timeout: OptFloat = None) -> T: + ... + + def join(self) -> None: + ... + + + +class AsyncQueue(BaseQueue[T], Protocol[T]): + + + async def put(self, item: T) -> None: + ... + + + async def get(self) -> T: + ... + + + async def join(self) -> None: + ... + + current_loop = getattr(asyncio, "get_running_loop", None) if current_loop is None: current_loop = asyncio.get_event_loop + class Queue(Generic[T]): def __init__(self, maxsize: int = 0) -> None: self._loop = current_loop() @@ -181,48 +275,6 @@ def _check_closing(self) -> None: raise RuntimeError("Operation on the closed queue is forbidden") -class SyncQueue(Protocol[T]): - - @property - def maxsize(self) -> int: - ... - - @property - def closed(self) -> bool: - ... - - def task_done(self) -> None: - ... - - def join(self) -> None: - ... - - def qsize(self) -> int: - ... - - @property - def unfinished_tasks(self) -> int: - ... - - def empty(self) -> bool: - ... - - def full(self) -> bool: - ... - - def put(self, item: T, block: bool = True, timeout: OptFloat = None) -> None: - ... - - def get(self, block: bool = True, timeout: OptFloat = None) -> T: - ... - - def put_nowait(self, item: T) -> None: - ... - - def get_nowait(self) -> T: - ... - - class _SyncQueueProxy(SyncQueue[T]): """Create a queue object with a given maximum size. @@ -396,48 +448,6 @@ def get_nowait(self) -> T: return self.get(block=False) -class AsyncQueue(Protocol[T]): - - @property - def closed(self) -> bool: - ... - - def qsize(self) -> int: - ... - - @property - def unfinished_tasks(self) -> int: - ... - - @property - def maxsize(self) -> int: - ... - - def empty(self) -> bool: - ... - - def full(self) -> bool: - ... - - async def put(self, item: T) -> None: - ... - - def put_nowait(self, item: T) -> None: - ... - - async def get(self) -> T: - ... - - def get_nowait(self) -> T: - ... - - def task_done(self) -> None: - ... - - async def join(self) -> None: - ... - - class _AsyncQueueProxy(AsyncQueue[T]): """Create a queue object with a given maximum size. diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..617caea --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,3 @@ +[build-system] +requires = ["setuptools>=51", "wheel>=0.36"] +build-backend = "setuptools.build_meta" diff --git a/requirements-dev.txt b/requirements-dev.txt index bdc11be..5c4baa9 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -9,6 +9,6 @@ pyroma==3.2 pytest-cov==3.0.0 pytest==6.2.5 pytest-asyncio==0.16.0 -isort==5.10.0 +isort==5.10.1 tox==3.24.4 wheel==0.37.0 diff --git a/setup.cfg b/setup.cfg index 3de04f7..70f4326 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,3 +1,55 @@ +[metadata] +name = janus +version = attr: janus.__version__ +url = https://github.com/aio-libs/janus +project_urls = + Chat: Gitter = https://gitter.im/aio-libs/Lobby + CI: GitHub Actions = https://github.com/aio-libs/janus/actions/workflows/ci.yml + Coverage: codecov = https://codecov.io/github/aio-libs/janus + GitHub: issues = https://github.com/aio-libs/janus/issues + GitHub: repo = https://github.com/aio-libs/janus +description = Mixed sync-async queue to interoperate between asyncio tasks and classic threads +long_description = file: README.rst +long_description_content_type = text/x-rst +author = Andrew Svetlov +author_email = andrew.svetlov@gmail.com +license = Apache 2 +license_files = LICENSE.txt +classifiers = + Development Status :: 5 - Production/Stable + + Framework :: AsyncIO + + Intended Audience :: Developers + + License :: OSI Approved :: Apache Software License + + Operating System :: POSIX + Operating System :: MacOS :: MacOS X + Operating System :: Microsoft :: Windows + + Programming Language :: Python + Programming Language :: Python :: 3 + Programming Language :: Python :: 3.6 + Programming Language :: Python :: 3.7 + Programming Language :: Python :: 3.8 + Programming Language :: Python :: 3.9 + Programming Language :: Python :: 3.10 + + Topic :: Software Development :: Libraries + +keywords= janus, queue, asyncio + +[options] +python_requires = >=3.6 +packages = find: +# https://setuptools.readthedocs.io/en/latest/setuptools.html#setting-the-zip-safe-flag +zip_safe = True +include_package_data = True + +install_requires = + typing-extensions>=3.7.4.3 + [flake8] exclude = .git,.env,__pycache__,.eggs max-line-length = 88 diff --git a/setup.py b/setup.py index dd7dedd..8bf1ba9 100644 --- a/setup.py +++ b/setup.py @@ -1,88 +1,2 @@ -import codecs -import os -import re -import sys - -from setuptools import find_packages, setup -from setuptools.command.test import test as TestCommand - - -class PyTest(TestCommand): - user_options = [("pytest-args=", "a", "Arguments to pass to py.test")] - - def initialize_options(self): - TestCommand.initialize_options(self) - self.pytest_args = [] - - def finalize_options(self): - TestCommand.finalize_options(self) - self.test_args = [] - self.test_suite = True - - def run_tests(self): - # import here, cause outside the eggs aren't loaded - import pytest - - errno = pytest.main(self.pytest_args) - sys.exit(errno) - - -with codecs.open( - os.path.join(os.path.abspath(os.path.dirname(__file__)), "janus", "__init__.py"), - "r", - "latin1", -) as fp: - try: - version = re.findall(r'^__version__ = "([^"]+)"$', fp.read(), re.M)[0] - except IndexError: - raise RuntimeError("Unable to determine version.") - - -def read(f): - return open(os.path.join(os.path.dirname(__file__), f)).read().strip() - - -install_requires = [] - -tests_require = install_requires + [ - "pytest>=5.4", - "pytest-asyncio>=0.10.0", -] -extras_require = {} - - -setup( - name="janus", - version=version, - description=( - "Mixed sync-async queue to interoperate between " - "asyncio tasks and classic threads" - ), - long_description="\n\n".join((read("README.rst"), read("CHANGES.rst"))), - classifiers=[ - "License :: OSI Approved :: Apache Software License", - "Intended Audience :: Developers", - "Programming Language :: Python", - "Programming Language :: Python :: 3", - "Programming Language :: Python :: 3.6", - "Programming Language :: Python :: 3.7", - "Programming Language :: Python :: 3.8", - "Programming Language :: Python :: 3.9", - "Programming Language :: Python :: 3.10", - "Topic :: Software Development :: Libraries", - "Framework :: AsyncIO", - ], - author="Andrew Svetlov", - author_email="andrew.svetlov@gmail.com", - url="https://github.com/aio-libs/janus/", - license="Apache 2", - packages=find_packages(), - python_requires=">=3.6", - install_requires=install_requires, - tests_require=tests_require, - cmdclass={"test": PyTest}, - include_package_data=True, - zip_safe=True, - keywords=["janus", "queue", "asyncio"], - extras_require=extras_require, -) +from setuptools import setup +setup() From 3360844ddfa7adf4e707a6dbffae2a8937c0e9f2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Buffat?= Date: Wed, 24 Nov 2021 11:07:20 +0100 Subject: [PATCH 3/3] remove accidential newlines --- janus/__init__.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/janus/__init__.py b/janus/__init__.py index 233fb2d..08c61f5 100644 --- a/janus/__init__.py +++ b/janus/__init__.py @@ -83,7 +83,6 @@ def put_nowait(self, item: T) -> None: def get_nowait(self) -> T: ... - def put(self, item: T, block: bool = True, timeout: OptFloat = None) -> None: ... @@ -94,18 +93,14 @@ def join(self) -> None: ... - class AsyncQueue(BaseQueue[T], Protocol[T]): - async def put(self, item: T) -> None: ... - async def get(self) -> T: ... - async def join(self) -> None: ... @@ -115,7 +110,6 @@ async def join(self) -> None: current_loop = asyncio.get_event_loop - class Queue(Generic[T]): def __init__(self, maxsize: int = 0) -> None: self._loop = current_loop()