Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow background tasks to run with custom BaseHTTPMiddleware's #1441

Closed
wants to merge 9 commits into from

Conversation

Kludex
Copy link
Sponsor Member

@Kludex Kludex commented Jan 28, 2022

This is an old bug. There are some issues that talks about it.

I was debugging #1438, so I'm going to start with that one. Let me show you an example of this issue, retrieved from the mentioned issue:

import traceback

import anyio
from starlette.applications import Starlette
from starlette.background import BackgroundTasks
from starlette.middleware import Middleware
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import Response
from starlette.routing import Route


async def passthrough(request, call_next):
    return await call_next(request)


async def _sleep(identifier, delay):
    print(identifier, "started")
    try:
        await anyio.sleep(delay)
        print(identifier, "completed")
    except BaseException:
        print(identifier, "error")
        traceback.print_exc()
        raise


async def response_with_sleeps(request):
    background_tasks = BackgroundTasks()
    background_tasks.add_task(_sleep, "background task 1", 2)
    return Response(background=background_tasks)


app = Starlette(
    middleware=[
        Middleware(BaseHTTPMiddleware, dispatch=passthrough),
    ],
    routes=[
        Route("/", response_with_sleeps),
    ],
)

Running the above code, you'll be able to see:

Traceback (most recent call last):
  File "/home/marcelo/Development/./main.py", line 18, in _sleep
    await anyio.sleep(delay)
  File "/home/marcelo/anaconda3/envs/classify/lib/python3.8/site-packages/anyio/_core/_eventloop.py", line 69, in sleep
    return await get_asynclib().sleep(delay)
  File "/home/marcelo/anaconda3/envs/starlette3.8/lib/python3.8/asyncio/tasks.py", line 659, in sleep
    return await future
asyncio.exceptions.CancelledError

Looking at that, it led me to the culprit:

task_group.cancel_scope.cancel()

I've removed that cancellation, and then I understood why it was there. We have a test that handles this case:

def test_fully_evaluated_response(test_client_factory):
# Test for https://github.com/encode/starlette/issues/1022
class CustomMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request, call_next):
await call_next(request)
return PlainTextResponse("Custom")
app = Starlette()
app.add_middleware(CustomMiddleware)
client = test_client_factory(app)
response = client.get("/does_not_exist")
assert response.text == "Custom"

The test is basically ignoring the StreamingResponse object returned by call_next. The problem here is that our application uses the send_stream (as you can see below), but there's no one to receive that stream, so it blocks. The idea of the above cancellation was to prevent this block.

send_stream, recv_stream = anyio.create_memory_object_stream()
async def coro() -> None:
nonlocal app_exc
async with send_stream:
try:
await self.app(scope, request.receive, send_stream.send)
except Exception as exc:
app_exc = exc
task_group.start_soon(coro)

Ok. Now we solved an issue, but it led to another. What I thought:

  • As we're discarding a Response object, I'm going to consume the stream from that object.

And that is what you see on this PR.

Some references:

A question so I can test this:

  • How do I simulate the client disconnecting itself after receiving the response using the TestClient?

Comment on lines 68 to 71
if call_next_response and response is not call_next_response:
async with recv_stream:
async for _ in recv_stream:
...
Copy link
Sponsor Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm just discarding the message coro has sent here.

@Kludex Kludex marked this pull request as draft January 28, 2022 14:46
Comment on lines +69 to +71
async with recv_stream:
async for _ in recv_stream:
... # pragma: no cover
Copy link
Sponsor Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can create a function that performs this logic, if wanted.

@Kludex Kludex marked this pull request as ready for review January 28, 2022 16:45
Comment on lines +68 to 72
if call_next_response and response is not call_next_response:
async with recv_stream:
async for _ in recv_stream:
... # pragma: no cover
await response(scope, receive, send)
Copy link
Member

@jhominal jhominal Jan 30, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems to me that we will wait for app to make all of its calls to send_stream.send before actually processing the response returned by dispatch_func. If that is the case, and given that all the messages from app are being discarded, I think it would be preferable, either to call await response(scope, receive, send) before draining recv_stream, or to put draining of recv_stream in another task of task_group.

Of course, I do not know the relevant technologies (ASGI/the server implementations/Starlette) as well as you do, so there could very well be a reason that either my analysis is wrong, or that my suggestions are unworkable.

Suggested change
if call_next_response and response is not call_next_response:
async with recv_stream:
async for _ in recv_stream:
... # pragma: no cover
await response(scope, receive, send)
if call_next_response and response is not call_next_response:
async def drain_stream():
async with recv_stream:
async for _ in recv_stream:
... # pragma: no cover
task_group.start_soon(drain_stream)
await response(scope, receive, send)

@adriangb adriangb added the bug Something isn't working label Feb 2, 2022
@Kludex
Copy link
Sponsor Member Author

Kludex commented May 19, 2022

Unfortunately, it looks like this PR introduces a regression.

I'll need to check what. For now, I'm going to close it while I investigate...

@Kludex
Copy link
Sponsor Member Author

Kludex commented Jun 21, 2022

Unfortunately, it looks like this PR introduces a regression.

I'll need to check what. For now, I'm going to close it while I investigate...

I don't remember what was the supposed regression... 😞

@Kludex Kludex reopened this Jun 21, 2022
@jhominal
Copy link
Member

I suspect that the change, as written, would fail in the case where someone returned a Response object that wraps the streaming content from call_next_response - e.g. if I wrap the body_iterator for debug purposes:

async def debug_streaming_middleware(request, call_next):
    call_next_response = await call_next(request)
    return StreamingResponse(
         status_code=call_next_response.status_code,
         headers=call_next_response.headers,
         content=log_content_length(call_next_response.body_iterator),
    )

async def log_content_length(bytes_iterator: AsyncIterator[bytes]):
     total_length = 0
     async for chunk in bytes_iterator:
          total_length += len(chunk)
          yield chunk
     logger.info(f"Body length: {total_length}")

In such a case, call_next_response.body_iterator would be consumed both by the new response, and by drain_stream.

I think that, in order to fix issues related to background tasks (#919 and #1438), it would be better to think about shielding background tasks from cancellation in order to ensure that they are run even if a client closes the connection.

@adriangb
Copy link
Member

adriangb commented Jun 21, 2022

it would be better to think about shielding background tasks from cancellation in order to ensure that they are run even if a client closes the connection.

This has been proposed in #1654. It's not great to try to fix BaseHTTPMiddleware by adding code somewhere else, and shielding from any cancellation seems like it could open up the door for other issues down the road.

I think that a similar but better solution would be to run background tasks outside of the request/response cycle, e.g. by creating an anyio.TaskGroup on application startup, passing that task group down into the request/response and scheduling the BackgroundTask's in that TaskGroup. Of course this is something that users could implement themselves, maybe a writeup of how to do it somewhere would help?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
3 participants