From 5389dec41aedb1d85243f146d655d123ed348ef0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Buffat?= Date: Wed, 24 Nov 2021 13:25:44 +0100 Subject: [PATCH] add SyncQueue and AsyncQueue Protocols (#374) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: René Buffat --- README.rst | 8 ++--- janus/__init__.py | 92 +++++++++++++++++++++++++++++++++++++++++++++-- setup.cfg | 1 + 3 files changed, 95 insertions(+), 6 deletions(-) diff --git a/README.rst b/README.rst index 54a7c00..67618ae 100644 --- a/README.rst +++ b/README.rst @@ -35,21 +35,21 @@ Usage example (Python 3.7+) import janus - def threaded(sync_q): + def threaded(sync_q: janus.SyncQueue[int]) -> None: for i in range(100): sync_q.put(i) sync_q.join() - async def async_coro(async_q): + async def async_coro(async_q: janus.AsyncQueue[int]) -> None: for i in range(100): val = await async_q.get() assert val == i async_q.task_done() - async def main(): - queue = janus.Queue() + async def main() -> None: + queue: janus.Queue[int] = janus.Queue() loop = asyncio.get_running_loop() fut = loop.run_in_executor(None, threaded, queue.sync_q) await async_coro(queue.async_q) diff --git a/janus/__init__.py b/janus/__init__.py index 53f8d00..08c61f5 100644 --- a/janus/__init__.py +++ b/janus/__init__.py @@ -8,6 +8,7 @@ from queue import Empty as SyncQueueEmpty from queue import Full as SyncQueueFull from typing import Any, Callable, Deque, Generic, List, Optional, Set, TypeVar +from typing_extensions import Protocol __version__ = "0.6.2" __all__ = ("Queue", "PriorityQueue", "LifoQueue") @@ -17,6 +18,93 @@ OptFloat = Optional[float] +class BaseQueue(Protocol[T]): + + @property + def maxsize(self) -> int: + ... + + @property + def closed(self) -> bool: + ... + + def task_done(self) -> None: + ... + + def qsize(self) -> int: + ... + + @property + def unfinished_tasks(self) -> int: + ... + + def empty(self) -> bool: + ... + + def full(self) -> bool: + ... + + def put_nowait(self, item: T) -> None: + ... + + def get_nowait(self) -> T: + ... + + +class SyncQueue(BaseQueue[T], Protocol[T]): + + @property + def maxsize(self) -> int: + ... + + @property + def closed(self) -> bool: + ... + + def task_done(self) -> None: + ... + + def qsize(self) -> int: + ... + + @property + def unfinished_tasks(self) -> int: + ... + + def empty(self) -> bool: + ... + + def full(self) -> bool: + ... + + def put_nowait(self, item: T) -> None: + ... + + def get_nowait(self) -> T: + ... + + def put(self, item: T, block: bool = True, timeout: OptFloat = None) -> None: + ... + + def get(self, block: bool = True, timeout: OptFloat = None) -> T: + ... + + def join(self) -> None: + ... + + +class AsyncQueue(BaseQueue[T], Protocol[T]): + + async def put(self, item: T) -> None: + ... + + async def get(self) -> T: + ... + + async def join(self) -> None: + ... + + current_loop = getattr(asyncio, "get_running_loop", None) if current_loop is None: current_loop = asyncio.get_event_loop @@ -181,7 +269,7 @@ def _check_closing(self) -> None: raise RuntimeError("Operation on the closed queue is forbidden") -class _SyncQueueProxy(Generic[T]): +class _SyncQueueProxy(SyncQueue[T]): """Create a queue object with a given maximum size. If maxsize is <= 0, the queue size is infinite. @@ -354,7 +442,7 @@ def get_nowait(self) -> T: return self.get(block=False) -class _AsyncQueueProxy(Generic[T]): +class _AsyncQueueProxy(AsyncQueue[T]): """Create a queue object with a given maximum size. If maxsize is <= 0, the queue size is infinite. diff --git a/setup.cfg b/setup.cfg index 829f520..70f4326 100644 --- a/setup.cfg +++ b/setup.cfg @@ -48,6 +48,7 @@ zip_safe = True include_package_data = True install_requires = + typing-extensions>=3.7.4.3 [flake8] exclude = .git,.env,__pycache__,.eggs