Skip to content

Commit

Permalink
Merge pull request #75 from vlanse/master
Browse files Browse the repository at this point in the history
async_q.join() sometimes never return
  • Loading branch information
jettify committed Jan 28, 2018
2 parents 69d4155 + 78cde4a commit 81c4622
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 2 deletions.
3 changes: 1 addition & 2 deletions janus/__init__.py
Expand Up @@ -277,8 +277,7 @@ def put(self, item, block=True, timeout=None):
if remaining <= 0.0:
raise SyncQueueFull
self._parent._sync_not_full.wait(remaining)
self._parent._put(item)
self._parent._unfinished_tasks += 1
self._parent._put_internal(item)
self._parent._sync_not_empty.notify()
self._parent._notify_async_not_empty(threadsafe=True)

Expand Down
22 changes: 22 additions & 0 deletions tests/test_mixed.py
Expand Up @@ -78,6 +78,28 @@ def go():
for i in range(3):
self.loop.run_until_complete(go())

def test_sync_put_async_join(self):
q = janus.Queue(loop=self.loop)

for i in range(5):
q.sync_q.put(i)

@asyncio.coroutine
def do_work():
yield from asyncio.sleep(1, loop=self.loop)
while True:
yield from q.async_q.get()
q.async_q.task_done()

task = self.loop.create_task(do_work())

@asyncio.coroutine
def wait_for_empty_queue():
yield from q.async_q.join()
task.cancel()

self.loop.run_until_complete(wait_for_empty_queue())

def test_async_put_sync_get(self):
q = janus.Queue(loop=self.loop)

Expand Down

0 comments on commit 81c4622

Please sign in to comment.