Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add SyncQueue and AsyncQueue Protocols #374

Merged
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
90 changes: 87 additions & 3 deletions janus/__init__.py
Expand Up @@ -7,7 +7,7 @@
from heapq import heappop, heappush
from queue import Empty as SyncQueueEmpty
from queue import Full as SyncQueueFull
from typing import Any, Callable, Deque, Generic, List, Optional, Set, TypeVar
from typing import Any, Callable, Deque, Generic, List, Optional, Protocol, Set, TypeVar
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please import Protocol from typing_extensions module; add typing-extensions>=3.7.4.3 here


__version__ = "0.6.2"
__all__ = ("Queue", "PriorityQueue", "LifoQueue")
Expand Down Expand Up @@ -181,7 +181,49 @@ def _check_closing(self) -> None:
raise RuntimeError("Operation on the closed queue is forbidden")


class _SyncQueueProxy(Generic[T]):
class SyncQueue(Protocol[T]):

@property
def maxsize(self) -> int:
...

@property
def closed(self) -> bool:
...

def task_done(self) -> None:
...

def join(self) -> None:
...

def qsize(self) -> int:
...

@property
def unfinished_tasks(self) -> int:
...

def empty(self) -> bool:
...

def full(self) -> bool:
...

def put(self, item: T, block: bool = True, timeout: OptFloat = None) -> None:
...

def get(self, block: bool = True, timeout: OptFloat = None) -> T:
...

def put_nowait(self, item: T) -> None:
...

def get_nowait(self) -> T:
...


class _SyncQueueProxy(SyncQueue[T]):
"""Create a queue object with a given maximum size.

If maxsize is <= 0, the queue size is infinite.
Expand Down Expand Up @@ -354,7 +396,49 @@ def get_nowait(self) -> T:
return self.get(block=False)


class _AsyncQueueProxy(Generic[T]):
class AsyncQueue(Protocol[T]):

@property
def closed(self) -> bool:
...

def qsize(self) -> int:
...

@property
def unfinished_tasks(self) -> int:
...

@property
def maxsize(self) -> int:
...

def empty(self) -> bool:
...

def full(self) -> bool:
...

async def put(self, item: T) -> None:
...

def put_nowait(self, item: T) -> None:
...

async def get(self) -> T:
...

def get_nowait(self) -> T:
...

def task_done(self) -> None:
...

async def join(self) -> None:
...


class _AsyncQueueProxy(AsyncQueue[T]):
"""Create a queue object with a given maximum size.

If maxsize is <= 0, the queue size is infinite.
Expand Down