Custom CapacityLimiter #1724

Open
Kludex opened this issue Jul 2, 2022 · 19 comments
Labels
documentation Project documentation good first issue Good for beginners

@Kludex
Sponsor Member

Kludex commented Jul 2, 2022

Starlette is a web framework that supports both async and sync functions. The sync code runs in a threadpool.

The threadpool has a maximum of 40 threads.

https://github.com/agronholm/anyio/blob/4f3a8056a8b14dbe43c95039a0d731ede1083cb7/src/anyio/_backends/_asyncio.py#L2071-L2077

The concern in this issue is that the same threads are shared between the endpoint handlers and the background tasks.

Assume we have a simple application:

from time import sleep

from starlette.applications import Starlette
from starlette.background import BackgroundTasks
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.routing import Route

num = 0


def count_sleep():
    global num
    num += 1
    print(f"Running number {num}.")
    sleep(10)


def endpoint(request: Request) -> JSONResponse:
    tasks = BackgroundTasks()
    tasks.add_task(count_sleep)
    return JSONResponse({"message": "Hello, world!"}, background=tasks)


app = Starlette(routes=[Route("/", endpoint)])

Running it with uvicorn:

uvicorn main:app

And performing some requests (using httpie):

for run in {1..100}; do
  http :8000 &
done

We can observe that:

  1. We can see Running number 40..
  2. Wait 10 seconds...
  3. We can see Running number 80..
  4. Wait 10 seconds...
  5. We can see Running number 100..
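
The 40 / 80 / 100 pattern follows from simple arithmetic. Here is a rough model, assuming every task takes exactly 10 seconds and that only the background tasks hold threads for any meaningful time. `waves` is an illustrative helper, not part of Starlette or anyio:

```python
def waves(num_tasks: int, limit: int, task_seconds: float) -> list[tuple[float, int]]:
    """Return (start_time, tasks_started_so_far) for each wave of tasks."""
    schedule = []
    started = 0
    wave = 0
    while started < num_tasks:
        # Each wave can start at most `limit` new tasks.
        started = min(num_tasks, started + limit)
        schedule.append((wave * task_seconds, started))
        wave += 1
    return schedule


print(waves(100, 40, 10.0))   # [(0.0, 40), (10.0, 80), (20.0, 100)]
print(waves(100, 100, 10.0))  # [(0.0, 100)]
```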

I'm just bringing this up, so people are aware.

@agronholm proposed on Gitter that we create a separate CapacityLimiter dedicated only to handling the application (i.e. request_response()). This means that n threads (where n is the number of tokens we choose) would be dedicated to request_response().

@jhominal
Member

jhominal commented Jul 2, 2022

I think that having a separate capacity limiter makes a lot of sense, if only so that Starlette can handle requests without having its capacity swallowed by tasks from other applications.

Should we also have a thought about whether the usage of iterate_in_threadpool by StreamingResponse should be covered by the same or another capacity limiter? And the background tasks too?
Otherwise, it would be possible to reproduce the same issue if synchronous iterators compete with background tasks (I also note that file IO from FileResponse all happens on the threadpool with the default limiter, too.)

@adriangb
Member

adriangb commented Jul 2, 2022

UploadFile also uses threads:

await run_in_threadpool(self.file.write, data)

I think it may be hard to partition out all uses of thread pools into individual CapacityLimiters.

Aside from making individual CapacityLimiters for each "use case", it would also be nice to make these tunable by users. Maybe we can do both things: Route accepts a CapacityLimiter, Request.form() or UploadFile.write accepts a CapacityLimiter, etc., and users can decide whether they should all be the same instance or different ones. Then we also don't have to decide which use cases get grouped together; we put that in the user's hands.

So users can choose:

# Proposed API: the limiter= parameters are a suggestion, not an existing Starlette feature.
from anyio import CapacityLimiter
from starlette.responses import Response
from starlette.routing import Route


def sync_route(req):
    return Response()

# individual limiters
routes = [
    Route("/foo", sync_route, limiter=CapacityLimiter(1000)),
    Route("/bar", sync_route, limiter=CapacityLimiter(5)),
]

# shared limiter
limiter = CapacityLimiter(100)


async def async_route(req):
    files = await req.form(limiter=CapacityLimiter(10))  # used for writing to the file
    await files["file"].read(limiter=limiter)  # override the above limiter for reading
    return Response()

routes = [
    Route("/foo", async_route),
    Route("/bar", endpoint, limiter=limiter),
]

@Kludex
Sponsor Member Author

Kludex commented Jul 2, 2022

I don't think the user should have control over this.

I think Starlette should be able to figure out the optimal way internally.

@adriangb
Member

adriangb commented Jul 2, 2022

I think it will be hard to find an "optimal way", this seems super workload / business logic dependent.

@jhominal
Member

jhominal commented Jul 2, 2022

Pushing such a depth of choice to the users of Starlette is not a real solution either: it would make the API a lot more fiddly, for what I suspect is a minority of users.

Here is my attempt at categorizing all the uses of anyio.to_thread.run_sync and starlette.concurrency.{run,iterate}_in_threadpool:

  • WSGI Middleware - to be deprecated;
  • FileResponse and StaticFiles middleware - to me, these two usages are very linked. We should also note that anyio's AsyncFile currently only supports the default limiter for file operations.
  • UploadFile - automatically used to handle file I/O when receiving a multipart upload;
  • synchronous BackgroundTasks;
  • synchronous StreamingResponse iterators;
  • synchronous Callable[[Request],Response] endpoints (last but not least);

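Out of these, I would note that only the last three cases (synchronous BackgroundTasks, synchronous StreamingResponse iterators, and synchronous endpoints) run user-defined code.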

I think that, as a first step, it does not seem necessary to define a custom capacity limiter for the features that do not run user-defined code. Why? The starlette built-in usages of synchronous calls are always made as a "dip", which should multiplex as well as humanly possible with any other tasks that usually use anyio.to_thread.run_sync.

As a second step, I would suggest choosing a single number (e.g. 40), and defining a separate capacity limiter for each of the three cases that run user-defined code (BackgroundTasks, StreamingResponse iterators, and endpoints). Why?

  • If BackgroundTasks share a capacity limiter with anyone, they risk throttling the servicing of requests, which is the last thing we want to happen;
  • If synchronous StreamingResponse iterators share a capacity limiter with synchronous Callable[[Request],Response] endpoints, I have a real fear that the endpoints (which will take more individual time per call than the next calls to the iterators) would steal available threads from the synchronous iterators;
  • Why the same number? It makes it simpler to configure, and it can be explained by saying "at each of these three phases of request processing, you can have a maximum of XX concurrent synchronous requests"

@adriangb
Member

adriangb commented Jul 2, 2022

> Pushing such a depth of choice to the users of starlette is not a real solution either - it would make the API a lot more fiddly, for what I suspect are a minority of users

The fiddly API would only apply to the small minority of users that want to use the feature. The vast majority of users would only see an extra keyword-only parameter, which in my opinion is not a big issue. But that's the last I'll say on that; I don't mean to force a solution.

I do think you make an excellent point w.r.t BackgroundTasks: the last thing we want is BackgroundTasks blocking requests. So yes I think a good first step would be to give BackgroundTasks their own capacity limiter. I don't think we can use a module level CapacityLimiter, and I don't think we want one per-request cycle (otherwise threads would still grow unbounded under heavy load), so we'd somehow have to create it in the lifespan event (the only thing that happens once per application) and then share it with requests, right?

@Kludex Kludex added the refactor Refactor code label Jul 4, 2022
@adriangb
Member

adriangb commented Jul 7, 2022

Here's an example of why I think we should let users set this value: harsh8398/fastapi-dependency-issue#2. The specific issue is with FastAPI's dependency injection system, but the same thing would apply to slow sync endpoints.

I don't use sync dependencies like this but if I were tasked with fixing this in a real app, I would just pump up the capacity limit and hardware as needed until it's no longer the bottleneck. The easiest way to do that would be to let users set that value by passing a CapacityLimiter into the constructor for Route (and for FastAPI, into Depends or something...).

@aminalaee
Member

@Kludex Was this discussed again in gitter? Any links to it?
I remember a few months ago this was mentioned and suggested to be left outside of Starlette.

@Kludex
Sponsor Member Author

Kludex commented Jul 8, 2022

I think this was mentioned on FastAPI issues. I don't recall an old discussion on gitter about this.

@aminalaee
Member

I think it was mentioned here and here.

@Kludex
Sponsor Member Author

Kludex commented Jul 8, 2022

I'm going to write down here how to change the default CapacityLimiter, as it may be relevant...

Right now, you can modify the number of total_tokens on the default CapacityLimiter. Let's use the same application as described above:

from time import sleep

import anyio.to_thread
from starlette.applications import Starlette
from starlette.background import BackgroundTasks
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.routing import Route

num = 0


def count_sleep():
    global num
    num += 1
    print(f"Running number {num}.")
    sleep(10)


def endpoint(request: Request) -> JSONResponse:
    tasks = BackgroundTasks()
    tasks.add_task(count_sleep)
    return JSONResponse({"message": "Hello, world!"}, background=tasks)

# THIS IS THE ADDITION
async def startup():
    limiter = anyio.to_thread.current_default_thread_limiter()
    limiter.total_tokens = 100


app = Starlette(routes=[Route("/", endpoint)], on_startup=[startup])

You can perform the same query as mentioned:

for run in {1..100}; do
  http :8000 &
done

This time, you are NOT going to see the behavior described above:

  • We can see Running number 40..
  • Wait 10 seconds...
  • We can see Running number 80..
  • Wait 10 seconds...
  • We can see Running number 100..

The behavior now is:

  • We can see Running number 100..

No waiting time.

@adriangb
Member

adriangb commented Jul 8, 2022

That is indeed helpful information! I imagine for a lot of users currently experiencing issues, that's a viable short term solution 🎉!

That said, I think there are still conversations to be had, because there are very real situations where you might want one thing (endpoint, FastAPI dependency, background task, etc.) to have a limited number of threads to avoid exhausting memory, and another thing to have a lot more threads.

@sandys

sandys commented Jul 8, 2022

> Here's an example of why I think we should let users set this value: harsh8398/fastapi-dependency-issue#2. The specific issue is with FastAPI's dependency injection system, but the same thing would apply to slow sync endpoints.
>
> I don't use sync dependencies like this but if I were tasked with fixing this in a real app, I would just pump up the capacity limit and hardware as needed until it's no longer the bottleneck. The easiest way to do that would be to let users set that value by passing a CapacityLimiter into the constructor for Route (and for FastAPI, into Depends or something...).

This is actually how the vast majority of Flask applications are tuned, with multiple different kinds of workers and thread settings.

Gunicorn allows for each of the workers to have multiple threads.

gunicorn --workers=5 --threads=2 --worker-class=gthread main:app
The suggested maximum concurrent requests when using workers and threads is (2*CPU)+1

This is very application dependent and something that we have tuned over time. Having control over this is very important. It's OK if you are not picking it up from the gunicorn args, but this control is definitely important.
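
As a quick worked example of the (2*CPU)+1 rule of thumb quoted above, where `suggested_concurrency` is just an illustrative helper and the result is a starting point for tuning rather than a fixed answer:

```python
import os


def suggested_concurrency(cpu_count: int) -> int:
    """Apply the (2 * CPU) + 1 rule of thumb."""
    return 2 * cpu_count + 1


cpus = os.cpu_count() or 1  # os.cpu_count() may return None
print(f"{cpus} CPUs -> about {suggested_concurrency(cpus)} concurrent requests")
print(suggested_concurrency(4))  # 9
```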

@omerXfaruq

We had a similar situation in Gradio, and resolved it via this kind of approach. Wanted to share to support the issue.

@Kludex
Sponsor Member Author

Kludex commented Feb 4, 2023

First step here would be to add some documentation.

@Kludex Kludex added this to the Version 1.x milestone Feb 4, 2023
@Kludex Kludex added the good first issue Good for beginners label Jun 20, 2023
@Kludex
Sponsor Member Author

Kludex commented Jun 20, 2023

PR welcome to document this behavior, and how to overcome it.

(I don't know where is most suitable...)

EDIT:

> I don't know where is most suitable...

Either a new page, or suggest something.

@Kludex Kludex added documentation Project documentation and removed refactor Refactor code labels Sep 13, 2023
@Kludex
Sponsor Member Author

Kludex commented Sep 13, 2023

Let's only document this.

I've already talked to @agronholm some weeks ago about this, and he was cool about just documenting it.

@aminalaee
Member

aminalaee commented Sep 13, 2023

Any ideas where's the best place to document this?
I was thinking of adding a small snippet to https://www.starlette.io/background/ to explain the reason and how to set total_tokens, without the Starlette lifespan and tasks:

import anyio
from starlette.applications import Starlette
from starlette.background import BackgroundTasks
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.routing import Route

limiter = anyio.to_thread.current_default_thread_limiter()
limiter.total_tokens = 100

def do_something():
    ...

def endpoint(request: Request) -> JSONResponse:
    tasks = BackgroundTasks()
    tasks.add_task(do_something)
    return JSONResponse({"message": "Hello, world!"}, background=tasks)

app = Starlette(routes=[Route("/", endpoint)])

@Kludex
Sponsor Member Author

Kludex commented Sep 19, 2023

> Any ideas where's the best place to document this?

No.

I think you can suggest with a PR, and we can see if it fits.

@Kludex Kludex modified the milestones: Version 1.x, Version 1.0 Dec 24, 2023

6 participants