Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add SyncQueue and AsyncQueue Protocols #374

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 4 additions & 4 deletions README.rst
Expand Up @@ -35,21 +35,21 @@ Usage example (Python 3.7+)
import janus


def threaded(sync_q):
def threaded(sync_q: janus.SyncQueue[int]) -> None:
for i in range(100):
sync_q.put(i)
sync_q.join()


async def async_coro(async_q):
async def async_coro(async_q: janus.AsyncQueue[int]) -> None:
for i in range(100):
val = await async_q.get()
assert val == i
async_q.task_done()


async def main():
queue = janus.Queue()
async def main() -> None:
queue: janus.Queue[int] = janus.Queue()
loop = asyncio.get_running_loop()
fut = loop.run_in_executor(None, threaded, queue.sync_q)
await async_coro(queue.async_q)
Expand Down
92 changes: 90 additions & 2 deletions janus/__init__.py
Expand Up @@ -8,6 +8,7 @@
from queue import Empty as SyncQueueEmpty
from queue import Full as SyncQueueFull
from typing import Any, Callable, Deque, Generic, List, Optional, Set, TypeVar
from typing_extensions import Protocol

__version__ = "0.6.2"
__all__ = ("Queue", "PriorityQueue", "LifoQueue")
Expand All @@ -17,6 +18,93 @@
OptFloat = Optional[float]


class BaseQueue(Protocol[T]):

@property
def maxsize(self) -> int:
...

@property
def closed(self) -> bool:
...

def task_done(self) -> None:
...

def qsize(self) -> int:
...

@property
def unfinished_tasks(self) -> int:
...

def empty(self) -> bool:
...

def full(self) -> bool:
...

def put_nowait(self, item: T) -> None:
...

def get_nowait(self) -> T:
...


class SyncQueue(BaseQueue[T], Protocol[T]):

@property
def maxsize(self) -> int:
...

@property
def closed(self) -> bool:
...

def task_done(self) -> None:
...

def qsize(self) -> int:
...

@property
def unfinished_tasks(self) -> int:
...

def empty(self) -> bool:
...

def full(self) -> bool:
...

def put_nowait(self, item: T) -> None:
...

def get_nowait(self) -> T:
...

def put(self, item: T, block: bool = True, timeout: OptFloat = None) -> None:
...

def get(self, block: bool = True, timeout: OptFloat = None) -> T:
...

def join(self) -> None:
...


class AsyncQueue(BaseQueue[T], Protocol[T]):

async def put(self, item: T) -> None:
...

async def get(self) -> T:
...

async def join(self) -> None:
...


current_loop = getattr(asyncio, "get_running_loop", None)
if current_loop is None:
current_loop = asyncio.get_event_loop
Expand Down Expand Up @@ -181,7 +269,7 @@ def _check_closing(self) -> None:
raise RuntimeError("Operation on the closed queue is forbidden")


class _SyncQueueProxy(Generic[T]):
class _SyncQueueProxy(SyncQueue[T]):
"""Create a queue object with a given maximum size.

If maxsize is <= 0, the queue size is infinite.
Expand Down Expand Up @@ -354,7 +442,7 @@ def get_nowait(self) -> T:
return self.get(block=False)


class _AsyncQueueProxy(Generic[T]):
class _AsyncQueueProxy(AsyncQueue[T]):
"""Create a queue object with a given maximum size.

If maxsize is <= 0, the queue size is infinite.
Expand Down
1 change: 1 addition & 0 deletions setup.cfg
Expand Up @@ -48,6 +48,7 @@ zip_safe = True
include_package_data = True

install_requires =
typing-extensions>=3.7.4.3

[flake8]
exclude = .git,.env,__pycache__,.eggs
Expand Down