Skip to content

Commit

Permalink
Fix #88: Drop python 3.4 support
Browse files Browse the repository at this point in the history
  • Loading branch information
asvetlov committed Jul 28, 2018
1 parent 8dfcd30 commit 93ec36b
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 20 deletions.
34 changes: 14 additions & 20 deletions janus/__init__.py
Expand Up @@ -67,16 +67,15 @@ def close(self):
for fut in self._pending:
fut.cancel()

@asyncio.coroutine
def wait_closed(self):
async def wait_closed(self):
# should be called from loop after close().
# Nobody should put/get at this point,
# so lock acquiring is not required
if not self._closing:
raise RuntimeError("Waiting for non-closed queue")
if not self._pending:
return
yield from asyncio.wait(self._pending, loop=self._loop)
await asyncio.wait(self._pending, loop=self._loop)

@property
def maxsize(self):
Expand Down Expand Up @@ -130,9 +129,8 @@ def f():
self._pending.add(fut)

def _notify_async_not_empty(self, *, threadsafe):
@asyncio.coroutine
def f():
with (yield from self._async_mutex):
async def f():
async with self._async_mutex:
self._async_not_empty.notify()

def task_maker():
Expand All @@ -146,9 +144,8 @@ def task_maker():
self._call_soon(task_maker)

def _notify_async_not_full(self, *, threadsafe):
@asyncio.coroutine
def f():
with (yield from self._async_mutex):
async def f():
async with self._async_mutex:
self._async_not_full.notify()

def task_maker():
Expand Down Expand Up @@ -372,8 +369,7 @@ def full(self):
else:
return self.qsize() >= self._parent._maxsize

@asyncio.coroutine
def put(self, item):
async def put(self, item):
"""Put an item into the queue.
Put an item into the queue. If the queue is full, wait until a free
Expand All @@ -382,7 +378,7 @@ def put(self, item):
This method is a coroutine.
"""
self._parent._check_closing()
with (yield from self._parent._async_not_full):
async with self._parent._async_not_full:
self._parent._sync_mutex.acquire()
locked = True
try:
Expand All @@ -395,7 +391,7 @@ def put(self, item):
if do_wait:
locked = False
self._parent._sync_mutex.release()
yield from self._parent._async_not_full.wait()
await self._parent._async_not_full.wait()
self._parent._sync_mutex.acquire()
locked = True

Expand All @@ -421,16 +417,15 @@ def put_nowait(self, item):
self._parent._notify_async_not_empty(threadsafe=False)
self._parent._notify_sync_not_empty()

@asyncio.coroutine
def get(self):
async def get(self):
"""Remove and return an item from the queue.
If queue is empty, wait until an item is available.
This method is a coroutine.
"""
self._parent._check_closing()
with (yield from self._parent._async_not_empty):
async with self._parent._async_not_empty:
self._parent._sync_mutex.acquire()
locked = True
try:
Expand All @@ -441,7 +436,7 @@ def get(self):
if do_wait:
locked = False
self._parent._sync_mutex.release()
yield from self._parent._async_not_empty.wait()
await self._parent._async_not_empty.wait()
self._parent._sync_mutex.acquire()
locked = True

Expand Down Expand Up @@ -491,8 +486,7 @@ def task_done(self):
self._parent._finished.set()
self._parent._all_tasks_done.notify_all()

@asyncio.coroutine
def join(self):
async def join(self):
"""Block until all items in the queue have been gotten and processed.
The count of unfinished tasks goes up whenever an item is added to the
Expand All @@ -504,7 +498,7 @@ def join(self):
with self._parent._sync_mutex:
if self._parent._unfinished_tasks == 0:
break
yield from self._parent._finished.wait()
await self._parent._finished.wait()


class PriorityQueue(Queue):
Expand Down
1 change: 1 addition & 0 deletions setup.py
Expand Up @@ -74,6 +74,7 @@ def read(f):
url='https://github.com/aio-libs/janus/',
license='Apache 2',
packages=find_packages(),
python_requires='>=3.5.3',
install_requires=install_requires,
tests_require=tests_require,
cmdclass={'test': PyTest},
Expand Down

0 comments on commit 93ec36b

Please sign in to comment.