
This issue was moved to a discussion.

You can continue the conversation there. Go to discussion →

Context vars set in an async dependency are visible for sync routes but not for sync functions run inside ThreadPoolExecutor #2776

Closed
tuukkamustonen opened this issue Feb 9, 2021 · 10 comments
Labels
question Question or problem question-migrate

Comments

@tuukkamustonen

tuukkamustonen commented Feb 9, 2021

Potentially related tickets #953 and #2619.

Example

from concurrent.futures.thread import ThreadPoolExecutor
from contextvars import ContextVar

import uvicorn
from fastapi import FastAPI
from fastapi.params import Security

ctx = ContextVar('', default='N/A')
app = FastAPI()


async def auth():
    ctx.set('test')


def sync_task():
    return ctx.get()


@app.get('/sync1', dependencies=[Security(auth)])
def sync1():
    with ThreadPoolExecutor() as tp:
        return {'hello': tp.submit(sync_task).result()}


if __name__ == '__main__':
    uvicorn.run(app)

Description

  • Curl /sync1 and you'll get {"hello": "N/A"}.
  • Should get {"hello": "test"} instead.

So, context vars are not visible within futures run via ThreadPoolExecutor. Why is that? What needs to be done to make them visible? Where is it documented?

Attempting to run it via contextvars' run() makes no difference (but I'm a noob here):

import contextvars

@app.get('/sync2', dependencies=[Security(auth)])
def sync2():
    def wrapper():
        with ThreadPoolExecutor() as tp:
            return {'hello': tp.submit(sync_task).result()}
    context = contextvars.copy_context()
    return context.run(wrapper)

The result is the same.
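A likely reason, sketched without FastAPI: in /sync2, context.run(wrapper) only changes the context of the calling thread, while the worker thread that runs sync_task still starts with its own context. Assuming a standard (GIL) CPython build and that the goal is simply for sync_task to see the caller's values, passing context.run itself to submit() enters the copied context on the worker thread instead:

```python
import contextvars
from concurrent.futures import ThreadPoolExecutor
from contextvars import ContextVar

ctx = ContextVar("ctx", default="N/A")


def sync_task():
    return ctx.get()


def run_wrapped_executor():
    # Like /sync2: the copied context wraps the executor, not the task itself
    def wrapper():
        with ThreadPoolExecutor() as tp:
            return tp.submit(sync_task).result()
    return contextvars.copy_context().run(wrapper)


def run_task_in_copied_context():
    # Copy the caller's context and enter it on the worker thread
    context = contextvars.copy_context()
    with ThreadPoolExecutor() as tp:
        return tp.submit(context.run, sync_task).result()


ctx.set("test")  # stands in for the auth() dependency
print(run_wrapped_executor())        # N/A
print(run_task_in_copied_context())  # test
```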

However, a direct call works just fine:

@app.get('/sync1', dependencies=[Security(auth)])
def sync1():
    return {'hello': sync_task()}

This returns {"hello": "test"}.

Async routes always work:

from starlette.concurrency import run_in_threadpool


async def async_task():
    return ctx.get()

@app.get('/async1', dependencies=[Security(auth)])
async def async1():
    return {'hello': await async_task()}

@app.get('/async2', dependencies=[Security(auth)])
async def async2():
    return {'hello': await run_in_threadpool(sync_task)}

These return {"hello": "test"}.

Environment

  • OS: Linux
  • FastAPI version: 0.61.1
  • Starlette version: 0.13.6
  • Uvicorn version: 0.12.2
  • Python version: 3.7.9
@tuukkamustonen tuukkamustonen added the question Question or problem label Feb 9, 2021
@ycd
Contributor

ycd commented Feb 9, 2021

This is a known Python issue rather than a FastAPI one.

See: python/cpython#9688

I'd love to open a PR to fix this on our side. For example, this implementation of ThreadPoolExecutor fixes your case:

import contextvars
import functools
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable


class ContextSafeThreadPoolExecutor(ThreadPoolExecutor):
    def submit(self, fn: Callable, *args, **kwargs) -> Future:
        # Snapshot the submitting thread's context at submit() time
        ctx = contextvars.copy_context()
        # Run fn(*args, **kwargs) inside that snapshot on the worker thread
        return super().submit(functools.partial(ctx.run, functools.partial(fn, *args, **kwargs)))
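A usage sketch comparing a plain ThreadPoolExecutor with the subclass, assuming a standard CPython build where a new worker thread starts with an empty context (request_id, read_var and compare are hypothetical names):

```python
import contextvars
import functools
from concurrent.futures import Future, ThreadPoolExecutor
from contextvars import ContextVar
from typing import Callable

request_id = ContextVar("request_id", default="N/A")  # hypothetical variable


class ContextSafeThreadPoolExecutor(ThreadPoolExecutor):
    def submit(self, fn: Callable, *args, **kwargs) -> Future:
        # Snapshot the submitting thread's context at submit() time...
        ctx = contextvars.copy_context()
        # ...and run fn inside that snapshot on the worker thread.
        return super().submit(functools.partial(ctx.run, fn, *args, **kwargs))


def read_var():
    return request_id.get()


def compare():
    request_id.set("abc")
    with ThreadPoolExecutor() as plain, ContextSafeThreadPoolExecutor() as safe:
        return plain.submit(read_var).result(), safe.submit(read_var).result()


print(compare())  # ('N/A', 'abc')
```

Each submit() takes its own snapshot, so values set after submitting are not seen by that task, and anything the task sets stays inside the snapshot rather than flowing back to the caller.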

@tuukkamustonen
Author

tuukkamustonen commented Feb 9, 2021

Hey, thanks for the insight!

As far as I can tell, the linked ticket and its (somewhat confusing) discussion are about loop.run_in_executor(). But I'm not sure we should use that inside an already spawned thread?

I mean, if I'm using a sync route, FastAPI/Starlette already spawns a thread. A context var set in an async dependency is visible within that thread (the /sync1 endpoint in the example). Only when I spawn a thread from within that thread do I lose the context var.

But, maybe you just meant that if the linked fix ever gets merged, then we could use loop.run_in_executor() instead of run_in_threadpool()? Or at least that run_in_threadpool() would become simpler?

However, we would still need the sub-classed ContextSafeThreadPoolExecutor, when spawning threads within a sync route?

I'd love to open PR to fix this on our side

What's the API / utility function that you'd add?

@ycd
Contributor

ycd commented Feb 10, 2021

As far as I can tell, the linked ticket and (the somewhat confusing) discussion is about loop.run_in_executor(). But I'm not sure we should use that inside a thread already spawned?

The problem here is that, as you mentioned, a thread is already spawned and you are creating another thread inside it.

Here, when you spawn another thread inside a thread, you are doing something like this:
[diagram]

As I've shown in the diagram, the auth coroutine never changes your original context. asyncio is natively context-safe, but threads are not: by default, a new thread does not inherit the context of the thread that spawned it.
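A small stdlib-only illustration of that last point, assuming a standard CPython build (user, record and snapshot are hypothetical names): a plain thread sees the default, while a thread that enters a copy of the spawning thread's context sees the value.

```python
import contextvars
import threading
from contextvars import ContextVar

user = ContextVar("user", default="anonymous")  # hypothetical variable
results = {}


def record(key):
    # Store what this thread sees for `user`
    results[key] = user.get()


user.set("alice")

# A plain thread starts with its own, empty context
t1 = threading.Thread(target=record, args=("plain",))
t1.start()
t1.join()

# Enter a copy of the spawning thread's context inside the new thread
snapshot = contextvars.copy_context()
t2 = threading.Thread(target=snapshot.run, args=(record, "copied"))
t2.start()
t2.join()

print(results)  # {'plain': 'anonymous', 'copied': 'alice'}
```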

Since Python 3.9 we have asyncio.to_thread

async def to_thread(func, /, *args, **kwargs):
    """Asynchronously run function *func* in a separate thread.

    Any *args and **kwargs supplied for this function are directly passed
    to *func*. Also, the current :class:`contextvars.Context` is propagated,
    allowing context variables from the main thread to be accessed in the
    separate thread.

    Return a coroutine that can be awaited to get the eventual result of *func*.
    """
    loop = events.get_running_loop()
    ctx = contextvars.copy_context()
    func_call = functools.partial(ctx.run, func, *args, **kwargs)
    return await loop.run_in_executor(None, func_call)
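A minimal usage sketch of asyncio.to_thread (Python 3.9+; tenant, blocking_read and main are hypothetical names). The value set in the coroutine is visible in the worker thread, and since asyncio.run() executes main() in a copy of the context, the outer thread's value is left untouched:

```python
import asyncio
from contextvars import ContextVar

tenant = ContextVar("tenant", default="none")  # hypothetical variable


def blocking_read():
    # Runs in the default executor's worker thread
    return tenant.get()


async def main():
    tenant.set("acme")
    # to_thread copies the current context before handing off to the thread
    return await asyncio.to_thread(blocking_read)


print(asyncio.run(main()))  # acme
print(tenant.get())         # none
```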

This is pretty similar to what Starlette is doing right now:

async def run_in_threadpool(
    func: typing.Callable, *args: typing.Any, **kwargs: typing.Any
) -> typing.Any:
    loop = asyncio.get_event_loop()
    if contextvars is not None:  # pragma: no cover
        # Ensure we run in the same context
        child = functools.partial(func, *args, **kwargs)
        context = contextvars.copy_context()  # <------------------ here
        func = context.run
        args = (child,)
    elif kwargs:  # pragma: no cover
        # loop.run_in_executor doesn't accept 'kwargs', so bind them in here
        func = functools.partial(func, **kwargs)
    return await loop.run_in_executor(None, func, *args)

So Starlette has done its part right so far. I don't believe it can do better than that.

the linked ticket and (the somewhat confusing) discussion is about loop.run_in_executor()

So, coming back to this: currently, you cannot retain context changes made inside run_in_executor.

run_in_executor basically wraps the function and submits it to the executor using executor.submit, which looks like this:

return futures.wrap_future(executor.submit(runner), loop=self)
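To see why the explicit copy matters, here is a small comparison of loop.run_in_executor with and without the ctx.run wrapper, assuming a standard CPython build (trace_id and main are hypothetical names):

```python
import asyncio
import functools
from contextvars import ContextVar, copy_context

trace_id = ContextVar("trace_id", default="unset")  # hypothetical variable


async def main():
    trace_id.set("t-123")
    loop = asyncio.get_running_loop()
    # Plain run_in_executor: the worker thread has its own context
    plain = await loop.run_in_executor(None, trace_id.get)
    # Wrapped in ctx.run, similar to what Starlette's run_in_threadpool does
    ctx = copy_context()
    wrapped = await loop.run_in_executor(None, functools.partial(ctx.run, trace_id.get))
    return plain, wrapped


print(asyncio.run(main()))  # ('unset', 't-123')
```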

But, maybe you just meant that if the linked fix ever gets merged, then we could use loop.run_in_executor() instead of run_in_threadpool()? Or at least that run_in_threadpool() would become simpler?

However, we would still need the sub-classed ContextSafeThreadPoolExecutor, when spawning threads within a sync route?

No. If this gets merged (I don't think it will happen in the near future), we won't need to subclass it anymore. In more depth: threads have their own stack, and when you run a function with run_in_threadpool it goes through these steps:

  1. Your function goes into run_in_threadpool(func, *args, **kwargs) -> T.
  2. run_in_threadpool copies the current context.
    1-) It wraps the function to run inside that context: functools.partial(context.run, func, *args, **kwargs)
    2-) It then uses run_in_executor to run the wrapped function: await loop.run_in_executor(None, func)
  3. run_in_executor uses executor.submit to submit the function to a thread.
    • There is nothing fancy in executor.submit: it takes two locks (plain threading.Lock objects), one on global state and one on shared state, then puts the function on a queue.SimpleQueue.
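The steps above can be sketched as follows. run_in_threadpool_sketch, flag, read_then_change and main are hypothetical names, this is not Starlette's actual code, and it uses get_running_loop rather than get_event_loop. It also shows that a change made inside the worker stays in the copied context and is not visible to the caller afterwards:

```python
import asyncio
import contextvars
import functools
from contextvars import ContextVar

flag = ContextVar("flag", default="initial")  # hypothetical variable


async def run_in_threadpool_sketch(func, *args, **kwargs):
    # Step 2: copy the current context
    context = contextvars.copy_context()
    # Step 2, 1-): wrap func (kwargs included) so it runs inside that copy
    call = functools.partial(context.run, func, *args, **kwargs)
    # Step 2, 2-) and 3: hand it to the default executor (executor.submit)
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, call)


def read_then_change(new_value):
    seen = flag.get()
    flag.set(new_value)  # only changes the copied context
    return seen


async def main():
    flag.set("from-caller")
    seen = await run_in_threadpool_sketch(read_then_change, new_value="from-worker")
    return seen, flag.get()


print(asyncio.run(main()))  # ('from-caller', 'from-caller')
```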

This is the most interesting part: run_in_executor doesn't copy the current context. There has been a lot of discussion on this topic.

Let's start with Python bug tracker issue 34014:
* Issue context: PEP 567 supports copying context into other threads, but for asyncio users, each call to run_in_executor requires explicit context propagation.

What's the API / utility function that you'd add?

I'm not sure we can do anything about this; all we do is copy the current context and share it with the worker thread. But in your example, you are creating your own executor, a ThreadPoolExecutor (a subclass of Executor).

Someone said in the earlier discussion in #953 that "logically dependency should have the same context as route handler." That isn't right either.

This will work, and so will the same route without making it a coroutine (def sync1()):

@app.get('/sync1', dependencies=[Depends(auth)])
async def sync1():
    return {'hello':sync_task()}

But we can do nothing about the code below; that is entirely up to the Python interpreter.

    with ThreadPoolExecutor() as tp:
        return {'hello': tp.submit(sync_task).result()}

As far as I've seen from the discussions, this is also a won't-fix feature request, and there are plenty of reasons that make this understandable.

Yury Selivanov said:

I considered enabling that, but in the end decided not to. The reason is that it's possible to use a ProcessPoolExecutor with run_in_executor(), and context vars currently don't support pickling (and probably never will). We can't have a single API that sometimes works with contextvars and sometimes doesn't.

So this is a "won't fix".

@tuukkamustonen
Author

tuukkamustonen commented Feb 10, 2021

Impressive diagram! :) Thanks for sketching that out.

I would have placed Event Loop directly under Uvicorn/FastAPI but you clearly know this better. Actually, pardon me if I got something wrong, as I don't know this stuff by heart:

In short, if I understand right, asyncio.to_thread() is nice, a more "standard" way to do run_in_threadpool() (though there are subtle differences). The loop.run_in_executor() fix (to copy context) is not really needed, as both run_in_threadpool() and asyncio.to_thread() already have the necessary boilerplate to run inside the context.

None of these actually retain changes made to context vars inside the thread, but that is not the topic here: the problem is seeing the context vars at all. Propagating changes back is indeed #953.

However, these are solutions for spawning threads from an event loop (async) context, where you can await. What I really need is to make some sync calls concurrently in sync parts of the code, which means I cannot use await asyncio.to_thread(), await loop.run_in_executor() or await run_in_threadpool(). So I guess that leaves me with your ContextSafeThreadPoolExecutor. Right?

@ycd
Contributor

ycd commented Feb 13, 2021

I would have placed Event Loop directly under Uvicorn/FastAPI but you clearly know this better. Actually, pardon me if I got something wrong, as I don't know this stuff by heart

That's right, that part is not clear enough, but as you mentioned, it uses asyncio.get_event_loop to get the current thread's event loop. I tried to separate the function and the coroutine there, oops.

In short, if I understand right, asyncio.to_thread() is nice, a more "standard" way to do run_in_threadpool() (though there are subtle differences)

It is actually the same thing, but unlike asyncio.to_thread, Starlette's run_in_threadpool first checks whether contextvars is available, since the module was only added in Python 3.7:

try:
    import contextvars  # Python 3.7+ only or via contextvars backport.
except ImportError:  # pragma: no cover
    contextvars = None  # type: ignore

The loop.run_in_executor() fix (to copy context) is not really needed, as both run_in_threadpool() and asyncio.to_thread() already have the necessary boilerplate to run inside the context.

None of these actually retain the changes made to context vars within. Which is not the topic here, as the problem is seeing the context vars. Propagating back, is indeed #953.

Exactly. In your case, if anything is going to change, it should be in the standard library.

However, these are solutions for spawning threads from event loop (async) context, where you await. My need is to really do some sync calls concurrently in sync parts of code. Which means that I cannot do await asyncio.to_thread(), await loop.run_in_executor() or await run_in_threadpool(), so I guess that leaves me your ContextSafeThreadPoolExecutor. Right?

Yup. I'd love to see whether any other solutions are available, because I had a similar situation before and this is what we came up with.

Which is not the topic here, as the problem is seeing the context vars. Propagating back, is indeed #953.

Yup, Django's _restore_context looks promising, though there are plenty of abstractions in Django's codebase that I really don't enjoy reading 😆
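For the propagate-back side (#953), here is a rough sketch in the spirit of _restore_context, not Django's actual code: run the task in a copied context, then write every value from that copy back into the caller's context. restore_context, run_and_propagate, counter and bump are hypothetical names.

```python
import contextvars
from concurrent.futures import ThreadPoolExecutor
from contextvars import ContextVar

counter = ContextVar("counter", default=0)  # hypothetical variable


def restore_context(context):
    # Copy every value from `context` into the caller's current context
    for var, value in context.items():
        var.set(value)


def run_and_propagate(executor, fn, *args):
    context = contextvars.copy_context()
    result = executor.submit(context.run, fn, *args).result()
    restore_context(context)  # hypothetical helper
    return result


def bump():
    counter.set(counter.get() + 1)
    return "done"


with ThreadPoolExecutor() as tp:
    run_and_propagate(tp, bump)
    run_and_propagate(tp, bump)

print(counter.get())  # 2
```

This blindly overwrites every variable in the caller's context with the copy's value, including ones the task never touched; a more careful version would only set values that actually changed.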

That being said, let's keep this one open, since the context and use case are different from #953.

@dimaqq
Contributor

dimaqq commented Dec 8, 2021

Additionally, it seems that a yield dependency runs its tail (the code after yield) in a different context:

from contextvars import ContextVar

DB = ContextVar('DB')


async def get_db():
    token = DB.set(42)
    try:
        yield
    finally:
        DB.reset(token)

Sprinkling in logging.warn("xx %r", id(asyncio.current_task())) shows that:

  • task1 is used to set up the db
  • task1 is used to process the request
  • task2 is used to cleanup the db 🙈

task1 details:

<Task pending
  name='starlette.middleware.base.BaseHTTPMiddleware.__call__.<locals>.call_next.<locals>.coro'
  coro=<BaseHTTPMiddleware.__call__.<locals>.call_next.<locals>.coro()
    running at /.../starlette/middleware/base.py:30>
  cb=[TaskGroup._spawn.<locals>.task_done() at /.../anyio/_backends/_asyncio.py:629]>

task2 details:

<Task pending
  name='Task-23'
  coro=<RequestResponseCycle.run_asgi()
    running at /.../uvicorn/protocols/http/h11_impl.py:373>
  cb=[set.discard()]>

@Kludex
Sponsor Collaborator

Kludex commented Dec 8, 2021

We now depend on anyio, and its latest release (3.4.0) should have fixed this (even though the original issue was reported before we used anyio).

https://anyio.readthedocs.io/en/stable/versionhistory.html

@tiangolo
Owner

Thanks for the discussion everyone!

As @Kludex says, indeed, the original issue should be solved by the new AnyIO version. It is discussed here: agronholm/anyio#363 , the fix was here: agronholm/anyio#390

About @dimaqq's comment, that's another independent issue that I've been working on for several days. 😅 It is fixed by the just released FastAPI 0.74.0. 🎉

More details in the PR: #4575

@tuukkamustonen
Author

At a quick glance, would the AnyIO improvement also fix #953?

@github-actions github-actions bot removed the answered label Feb 21, 2022
@godiedelrio

Hi, I reran the test @tuukkamustonen posted in the OP, minimally modified to run with pytest, and it still fails.
I'm using anyio==3.5.0, fastapi==0.75.2 and Python 3.9.11.

from concurrent.futures.thread import ThreadPoolExecutor
from contextvars import ContextVar

from fastapi import FastAPI
from fastapi.params import Security
from fastapi.testclient import TestClient

ctx = ContextVar('', default='N/A')
app = FastAPI()


async def auth():
    ctx.set('test')


def sync_task():
    return ctx.get()


@app.get('/sync1', dependencies=[Security(auth)])
def sync1():
    with ThreadPoolExecutor() as tp:
        result = tp.submit(sync_task).result()
    return {'hello': result}


client = TestClient(app)


def test_context_var_propagation():
    response = client.get("/sync1")
    assert response.status_code == 200
    assert response.json() == {"hello": "test"}
/Users/diego/.pyenv/versions/python-3.9/bin/python "/Applications/PyCharm CE.app/Contents/plugins/python-ce/helpers/pycharm/_jb_pytest_runner.py" --path "/Users/diego/Library/Application Support/JetBrains/PyCharmCE2021.3/scratches/scratch_2.py"
Testing started at 17:48 ...
Launching pytest with arguments /Users/diego/Library/Application Support/JetBrains/PyCharmCE2021.3/scratches/scratch_2.py --no-header --no-summary -q in /Users/diego/Library/Application Support/JetBrains/PyCharmCE2021.3/scratches

============================= test session starts ==============================
collecting ... collected 1 item

scratch_2.py::test_context_var_propagation FAILED                        [100%]
scratch_2.py:29 (test_context_var_propagation)
{'hello': 'N/A'} != {'hello': 'test'}

Expected :{'hello': 'test'}
Actual   :{'hello': 'N/A'}

I was able to make the test pass by using ThreadPoolExecutor's initializer argument as a workaround:

from concurrent.futures.thread import ThreadPoolExecutor
from contextvars import ContextVar

from fastapi import FastAPI
from fastapi.params import Security
from fastapi.testclient import TestClient

ctx = ContextVar('', default='N/A')
app = FastAPI()


async def auth():
    ctx.set('test')


def sync_task():
    return ctx.get()


@app.get('/sync1', dependencies=[Security(auth)])
def sync1():
    current_ctx = ctx.get()
    with ThreadPoolExecutor(initializer=lambda: ctx.set(current_ctx)) as tp:
        result = tp.submit(sync_task).result()
    return {'hello': result}


client = TestClient(app)


def test_context_var_propagation():
    response = client.get("/sync1")
    assert response.status_code == 200
    assert response.json() == {"hello": "test"}
/Users/diego/.pyenv/versions/python-3.9/bin/python "/Applications/PyCharm CE.app/Contents/plugins/python-ce/helpers/pycharm/_jb_pytest_runner.py" --path "/Users/diego/Library/Application Support/JetBrains/PyCharmCE2021.3/scratches/scratch_2.py"
Testing started at 17:52 ...
Launching pytest with arguments /Users/diego/Library/Application Support/JetBrains/PyCharmCE2021.3/scratches/scratch_2.py --no-header --no-summary -q in /Users/diego/Library/Application Support/JetBrains/PyCharmCE2021.3/scratches

============================= test session starts ==============================
collecting ... collected 1 item

scratch_2.py::test_context_var_propagation PASSED                        [100%]

============================== 1 passed in 0.25s ===============================

Process finished with exit code 0
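The initializer trick above copies one variable. A hedged generalization, assuming you want every context var the request has set (copy_all_into_worker and other are hypothetical names), is to pass a snapshot through initargs and replay it once in each worker thread. The snapshot is taken when the executor is created, so this suits a per-request executor like the one above better than a long-lived shared pool, which would hand stale values to later tasks.

```python
import contextvars
from concurrent.futures import ThreadPoolExecutor
from contextvars import ContextVar

ctx = ContextVar("ctx", default="N/A")
other = ContextVar("other", default="N/A")  # hypothetical second variable


def copy_all_into_worker(snapshot):
    # Runs once in each worker thread: set every captured var in that thread
    for var, value in snapshot.items():
        var.set(value)


def sync_task():
    return ctx.get(), other.get()


ctx.set("test")
other.set("also-test")
snapshot = contextvars.copy_context()
with ThreadPoolExecutor(initializer=copy_all_into_worker, initargs=(snapshot,)) as tp:
    result = tp.submit(sync_task).result()

print(result)  # ('test', 'also-test')
```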

EltonChou added a commit to FigureHook/hook_api that referenced this issue Sep 9, 2022
Repository owner locked and limited conversation to collaborators Feb 28, 2023
@tiangolo tiangolo converted this issue into discussion #9006 Feb 28, 2023
