Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync_to_async does not find root thread inside a task #214

Closed
andrewgodwin opened this issue Nov 6, 2020 · 11 comments · Fixed by #320
Closed

sync_to_async does not find root thread inside a task #214

andrewgodwin opened this issue Nov 6, 2020 · 11 comments · Fixed by #320
Assignees

Comments

@andrewgodwin
Copy link
Member

When you use create_task (or any way of making a coroutine not through asgiref), and the root thread is synchronous, then sync_to_async in thread-sensitive mode will use the root thread outside of tasks and a single, specific new thread inside of tasks, ruining the thread-sensitive guarantee.

Instead, we should make sync_to_async always first look for an executor on the root thread and, if it finds it, use that to run code.

@fitzgeralddp
Copy link

Is there any update on this? I think I'm experiencing something related to this bug.

@fitzgeralddp
Copy link

@andrewgodwin Good afternoon. Do you have a workaround for this? This is a blocker on a project for me. Here's my attempt to write a substitute for async_to_sync:

def my_async_to_sync_wrapper(self, async_func_to_call: Awaitable):
	""" Given an async method, calls that method in the asyncio threadpool
	 from a sync context.
	 """
	loop = safe_get_event_loop()
	return loop.run_until_complete(async_func_to_call)

def safe_get_event_loop():
	try:
		loop = asyncio.get_event_loop()
		if loop is not None:
			return loop
		else:
			loop = asyncio.new_event_loop()
			asyncio.set_event_loop(loop)
			return loop
	except RuntimeError:
		loop = asyncio.new_event_loop()
		asyncio.set_event_loop(loop)
		return loop

But I think I need to set an executor. Any guidance would be much appreciated. Thank you!

@fitzgeralddp
Copy link

FYI I rolled back my usage of asgiref to version 3.2.10, and now the issue doesn't show up for me anymore and I don't have to write my own substitue as I had attempted above. So I suppose the issue was introduced sometime after 3.3.0.

@yourcelf
Copy link

yourcelf commented Mar 4, 2022

This issue also arises when attempting to use grpc.aio.server; it does something lower level with futures to call async handler functions. I spent some time trying to figure out how to identify a main thread executor from within a task, but didn't have any success yet.

This, at least, is a failing test case highlighting the problem:

def test_sync_to_async_inside_task():
    main_thread = threading.current_thread()
    sync_to_async_thread = {}
    
    @sync_to_async
    def sync_to_async_func():
        sync_to_async_thread["current"] = threading.current_thread()
        
    @async_to_sync
    async def run_task():
        task = asyncio.create_task(sync_to_async_func())
        await task
        
    run_task()
       
    assert sync_to_async_thread["current"] == main_thread

@yourcelf
Copy link

yourcelf commented Mar 5, 2022

This issue impacts us mostly when running tests with pytest and pytest-django, since database activity happens in the main thread for test setup, but then other work happens in the single specific other task-descendent @sync_to_async thread. This presents two problems:

  1. The default transaction handling behavior in tests, where tests are run in a big transaction that is rolled back between tests to avoid database setup/cleanup cost, causes fixtures that ran in the main thread to be "invisible" to the task thread (since it has a second connection that doesn't share the transaction). Also, any database records created in the task thread don't get rolled back between tests.
  2. Database teardown at the end fails, since there is an open connection lingering from the task thread, leading to these errors:
    django.db.utils.OperationalError: database "test_database" is being accessed by other users
    DETAIL:  There is 1 other session using the database.
    
    pytest.PytestWarning: Error when trying to teardown test databases: RuntimeWarning("Normally Django will use a connection to the 'postgres' database to avoid running initialization queries against the production database when it's not needed (for example, when running tests). Django was unable to create a connection to the 'postgres' database and will use the first PostgreSQL database instead.
    

We were able to find workarounds for both issues within pytest.

  1. For the first issue: wrapping all tests that involve tasks with @pytest.mark.django_db(transaction=True) restores "normal" transaction behavior where results are actually committed, so separate threads can see them, and after tests the database is truncated. This is quite a bit slower, but it works.
  2. For the lingering connection: we found that explicitly closing the connection at the end of the test freed pytest-django to do its normal cleanup work. For that, we use this fixture:
    import asyncio
    import pytest
    from django.db import connection
    from asgiref.sync import sync_to_async, async_to_sync
    
    @async_to_sync
    async def run_in_task(fn):
        """Run `fn` as an asyncio task, and await its result"""
        return await asyncio.create_task(sync_to_async(fn)())
    
    @pytest.fixture
    def close_task_connection():
        """Close the database connection exposed to `sync_to_async` under asyncio task after test"""
        yield
        run_in_task(lambda: connection.close())

All together:

@pytest.mark.django_db(transaction=True)
def test_stuff(close_task_connection):
    ...

This is slow and a bit noodly, but it's working for now for us.

andersk added a commit to andersk/asgiref that referenced this issue Mar 18, 2022
Fixes django#214.

Signed-off-by: Anders Kaseorg <andersk@mit.edu>
@andersk
Copy link
Contributor

andersk commented Mar 18, 2022

The event loop is ours—can we just patch its create_task method? Opened #320, which seems to work for me.

andersk added a commit to andersk/asgiref that referenced this issue Mar 18, 2022
Fixes django#214.

Signed-off-by: Anders Kaseorg <andersk@mit.edu>
@yourcelf
Copy link

Patching create_task may solve that particular method, but other low-level ways of scheduling coroutines would not be covered. The grpc.aio.server case we were running into expresses this problem but does not use asyncio.create_task.

andersk added a commit to andersk/asgiref that referenced this issue Mar 18, 2022
Fixes django#214.

Signed-off-by: Anders Kaseorg <andersk@mit.edu>
andersk added a commit to andersk/asgiref that referenced this issue Mar 18, 2022
Fixes django#214.

Signed-off-by: Anders Kaseorg <andersk@mit.edu>
@andersk
Copy link
Contributor

andersk commented Mar 18, 2022

@yourcelf I’m not sure if I’m looking at the right code, but it looks like grpc.aio.server does use create_task:

https://github.com/grpc/grpc/blob/v1.44.0/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi#L996

Do you have a test case for grpc.aio.server that you can share?

@yourcelf
Copy link

You might be right -- I had previously grepped for create_task in the call stack and hadn't found it, but I may have made an error. I'll try to put together a minimal test case shortly (the test case I have for it is bound up in a large project).

@yourcelf
Copy link

@andersk here is a repo with as minimal a test case as I can manage with grpc's boilerplate: https://github.com/yourcelf/grpc-asgiref

Test file is here: https://github.com/yourcelf/grpc-asgiref/blob/main/test_grpc_sync_to_async.py, the rest of the repo is boilerplate to set up the protobuf definition and compiled files for the grpc server to use.

Sadly, #320 does not seem to address the problem -- the @sync_to_async decorated method executed by the grpc server still runs on ThreadPoolExecutor-0_0 instead of MainThread with that branch with patched create_task.

andersk added a commit to andersk/asgiref that referenced this issue Mar 19, 2022
Fixes django#214.

Signed-off-by: Anders Kaseorg <andersk@mit.edu>
andersk added a commit to andersk/asgiref that referenced this issue Mar 19, 2022
Fixes django#214.

Signed-off-by: Anders Kaseorg <andersk@mit.edu>
@andersk
Copy link
Contributor

andersk commented Mar 19, 2022

Thanks. I came up with a cleaner solution that also makes your test case pass: associate a CurrentThreadExecutor with each event loop we create, and have sync_to_async look up that CurrentThreadExecutor from the current event loop if it can’t find one from the context.

Updated #320.

andersk added a commit to andersk/asgiref that referenced this issue Mar 20, 2022
Fixes django#214.

Signed-off-by: Anders Kaseorg <andersk@mit.edu>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging a pull request may close this issue.

4 participants