Skip to content

Commit

Permalink
generalize Protocol, update Readme, add install_requires
Browse files Browse the repository at this point in the history
  • Loading branch information
rbuffat committed Nov 24, 2021
1 parent 5bdf1f5 commit 397bacb
Show file tree
Hide file tree
Showing 6 changed files with 159 additions and 180 deletions.
12 changes: 6 additions & 6 deletions README.rst
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
=======
janus
=======
.. image:: https://travis-ci.com/aio-libs/janus.svg?branch=master
:target: https://travis-ci.com/aio-libs/janus
.. image:: https://github.com/aio-libs/janus/actions/workflows/ci.yml/badge.svg
:target: https://github.com/aio-libs/janus/actions/workflows/ci.yml
.. image:: https://codecov.io/gh/aio-libs/janus/branch/master/graph/badge.svg
:target: https://codecov.io/gh/aio-libs/janus
.. image:: https://img.shields.io/pypi/v/janus.svg
Expand Down Expand Up @@ -35,21 +35,21 @@ Usage example (Python 3.7+)
import janus
def threaded(sync_q):
def threaded(sync_q: janus.SyncQueue[int]) -> None:
for i in range(100):
sync_q.put(i)
sync_q.join()
async def async_coro(async_q):
async def async_coro(async_q: janus.AsyncQueue[int]) -> None:
for i in range(100):
val = await async_q.get()
assert val == i
async_q.task_done()
async def main():
queue = janus.Queue()
async def main() -> None:
queue: janus.Queue[int] = janus.Queue()
loop = asyncio.get_running_loop()
fut = loop.run_in_executor(None, threaded, queue.sync_q)
await async_coro(queue.async_q)
Expand Down
180 changes: 95 additions & 85 deletions janus/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
from heapq import heappop, heappush
from queue import Empty as SyncQueueEmpty
from queue import Full as SyncQueueFull
from typing import Any, Callable, Deque, Generic, List, Optional, Protocol, Set, TypeVar
from typing import Any, Callable, Deque, Generic, List, Optional, Set, TypeVar
from typing_extensions import Protocol

__version__ = "0.6.2"
__all__ = ("Queue", "PriorityQueue", "LifoQueue")
Expand All @@ -17,11 +18,104 @@
OptFloat = Optional[float]


class BaseQueue(Protocol[T]):

@property
def maxsize(self) -> int:
...

@property
def closed(self) -> bool:
...

def task_done(self) -> None:
...

def qsize(self) -> int:
...

@property
def unfinished_tasks(self) -> int:
...

def empty(self) -> bool:
...

def full(self) -> bool:
...

def put_nowait(self, item: T) -> None:
...

def get_nowait(self) -> T:
...


class SyncQueue(BaseQueue[T], Protocol[T]):

@property
def maxsize(self) -> int:
...

@property
def closed(self) -> bool:
...

def task_done(self) -> None:
...

def qsize(self) -> int:
...

@property
def unfinished_tasks(self) -> int:
...

def empty(self) -> bool:
...

def full(self) -> bool:
...

def put_nowait(self, item: T) -> None:
...

def get_nowait(self) -> T:
...


def put(self, item: T, block: bool = True, timeout: OptFloat = None) -> None:
...

def get(self, block: bool = True, timeout: OptFloat = None) -> T:
...

def join(self) -> None:
...



class AsyncQueue(BaseQueue[T], Protocol[T]):


async def put(self, item: T) -> None:
...


async def get(self) -> T:
...


async def join(self) -> None:
...


current_loop = getattr(asyncio, "get_running_loop", None)
if current_loop is None:
current_loop = asyncio.get_event_loop



class Queue(Generic[T]):
def __init__(self, maxsize: int = 0) -> None:
self._loop = current_loop()
Expand Down Expand Up @@ -181,48 +275,6 @@ def _check_closing(self) -> None:
raise RuntimeError("Operation on the closed queue is forbidden")


class SyncQueue(Protocol[T]):

@property
def maxsize(self) -> int:
...

@property
def closed(self) -> bool:
...

def task_done(self) -> None:
...

def join(self) -> None:
...

def qsize(self) -> int:
...

@property
def unfinished_tasks(self) -> int:
...

def empty(self) -> bool:
...

def full(self) -> bool:
...

def put(self, item: T, block: bool = True, timeout: OptFloat = None) -> None:
...

def get(self, block: bool = True, timeout: OptFloat = None) -> T:
...

def put_nowait(self, item: T) -> None:
...

def get_nowait(self) -> T:
...


class _SyncQueueProxy(SyncQueue[T]):
"""Create a queue object with a given maximum size.
Expand Down Expand Up @@ -396,48 +448,6 @@ def get_nowait(self) -> T:
return self.get(block=False)


class AsyncQueue(Protocol[T]):

@property
def closed(self) -> bool:
...

def qsize(self) -> int:
...

@property
def unfinished_tasks(self) -> int:
...

@property
def maxsize(self) -> int:
...

def empty(self) -> bool:
...

def full(self) -> bool:
...

async def put(self, item: T) -> None:
...

def put_nowait(self, item: T) -> None:
...

async def get(self) -> T:
...

def get_nowait(self) -> T:
...

def task_done(self) -> None:
...

async def join(self) -> None:
...


class _AsyncQueueProxy(AsyncQueue[T]):
"""Create a queue object with a given maximum size.
Expand Down
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[build-system]
requires = ["setuptools>=51", "wheel>=0.36"]
build-backend = "setuptools.build_meta"
2 changes: 1 addition & 1 deletion requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@ pyroma==3.2
pytest-cov==3.0.0
pytest==6.2.5
pytest-asyncio==0.16.0
isort==5.10.0
isort==5.10.1
tox==3.24.4
wheel==0.37.0
52 changes: 52 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
@@ -1,3 +1,55 @@
[metadata]
name = janus
version = attr: janus.__version__
url = https://github.com/aio-libs/janus
project_urls =
Chat: Gitter = https://gitter.im/aio-libs/Lobby
CI: GitHub Actions = https://github.com/aio-libs/janus/actions/workflows/ci.yml
Coverage: codecov = https://codecov.io/github/aio-libs/janus
GitHub: issues = https://github.com/aio-libs/janus/issues
GitHub: repo = https://github.com/aio-libs/janus
description = Mixed sync-async queue to interoperate between asyncio tasks and classic threads
long_description = file: README.rst
long_description_content_type = text/x-rst
author = Andrew Svetlov <andrew.svetlov@gmail.com>
author_email = andrew.svetlov@gmail.com
license = Apache 2
license_files = LICENSE.txt
classifiers =
Development Status :: 5 - Production/Stable

Framework :: AsyncIO

Intended Audience :: Developers

License :: OSI Approved :: Apache Software License

Operating System :: POSIX
Operating System :: MacOS :: MacOS X
Operating System :: Microsoft :: Windows

Programming Language :: Python
Programming Language :: Python :: 3
Programming Language :: Python :: 3.6
Programming Language :: Python :: 3.7
Programming Language :: Python :: 3.8
Programming Language :: Python :: 3.9
Programming Language :: Python :: 3.10

Topic :: Software Development :: Libraries

keywords= janus, queue, asyncio

[options]
python_requires = >=3.6
packages = find:
# https://setuptools.readthedocs.io/en/latest/setuptools.html#setting-the-zip-safe-flag
zip_safe = True
include_package_data = True

install_requires =
typing-extensions>=3.7.4.3

[flake8]
exclude = .git,.env,__pycache__,.eggs
max-line-length = 88
Expand Down

0 comments on commit 397bacb

Please sign in to comment.