Skip to content

Commit

Permalink
add SyncQueue and AsyncQueue Protocols (#374)
Browse files Browse the repository at this point in the history
Co-authored-by: René Buffat <rene@clickahoy.com>
  • Loading branch information
rbuffat and René Buffat committed Nov 24, 2021
1 parent d54ae42 commit 5389dec
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 6 deletions.
8 changes: 4 additions & 4 deletions README.rst
Expand Up @@ -35,21 +35,21 @@ Usage example (Python 3.7+)
import janus
def threaded(sync_q):
def threaded(sync_q: janus.SyncQueue[int]) -> None:
for i in range(100):
sync_q.put(i)
sync_q.join()
async def async_coro(async_q):
async def async_coro(async_q: janus.AsyncQueue[int]) -> None:
for i in range(100):
val = await async_q.get()
assert val == i
async_q.task_done()
async def main():
queue = janus.Queue()
async def main() -> None:
queue: janus.Queue[int] = janus.Queue()
loop = asyncio.get_running_loop()
fut = loop.run_in_executor(None, threaded, queue.sync_q)
await async_coro(queue.async_q)
Expand Down
92 changes: 90 additions & 2 deletions janus/__init__.py
Expand Up @@ -8,6 +8,7 @@
from queue import Empty as SyncQueueEmpty
from queue import Full as SyncQueueFull
from typing import Any, Callable, Deque, Generic, List, Optional, Set, TypeVar
from typing_extensions import Protocol

__version__ = "0.6.2"
__all__ = ("Queue", "PriorityQueue", "LifoQueue")
Expand All @@ -17,6 +18,93 @@
OptFloat = Optional[float]


class BaseQueue(Protocol[T]):

@property
def maxsize(self) -> int:
...

@property
def closed(self) -> bool:
...

def task_done(self) -> None:
...

def qsize(self) -> int:
...

@property
def unfinished_tasks(self) -> int:
...

def empty(self) -> bool:
...

def full(self) -> bool:
...

def put_nowait(self, item: T) -> None:
...

def get_nowait(self) -> T:
...


class SyncQueue(BaseQueue[T], Protocol[T]):

@property
def maxsize(self) -> int:
...

@property
def closed(self) -> bool:
...

def task_done(self) -> None:
...

def qsize(self) -> int:
...

@property
def unfinished_tasks(self) -> int:
...

def empty(self) -> bool:
...

def full(self) -> bool:
...

def put_nowait(self, item: T) -> None:
...

def get_nowait(self) -> T:
...

def put(self, item: T, block: bool = True, timeout: OptFloat = None) -> None:
...

def get(self, block: bool = True, timeout: OptFloat = None) -> T:
...

def join(self) -> None:
...


class AsyncQueue(BaseQueue[T], Protocol[T]):

async def put(self, item: T) -> None:
...

async def get(self) -> T:
...

async def join(self) -> None:
...


current_loop = getattr(asyncio, "get_running_loop", None)
if current_loop is None:
current_loop = asyncio.get_event_loop
Expand Down Expand Up @@ -181,7 +269,7 @@ def _check_closing(self) -> None:
raise RuntimeError("Operation on the closed queue is forbidden")


class _SyncQueueProxy(Generic[T]):
class _SyncQueueProxy(SyncQueue[T]):
"""Create a queue object with a given maximum size.
If maxsize is <= 0, the queue size is infinite.
Expand Down Expand Up @@ -354,7 +442,7 @@ def get_nowait(self) -> T:
return self.get(block=False)


class _AsyncQueueProxy(Generic[T]):
class _AsyncQueueProxy(AsyncQueue[T]):
"""Create a queue object with a given maximum size.
If maxsize is <= 0, the queue size is infinite.
Expand Down
1 change: 1 addition & 0 deletions setup.cfg
Expand Up @@ -48,6 +48,7 @@ zip_safe = True
include_package_data = True

install_requires =
typing-extensions>=3.7.4.3

[flake8]
exclude = .git,.env,__pycache__,.eggs
Expand Down

0 comments on commit 5389dec

Please sign in to comment.