Skip to content

This issue was moved to a discussion.

You can continue the conversation there. Go to discussion →

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

How to test streaming async responses with httpx AsyncClient #2006

Closed
9 tasks done
seweissman opened this issue Sep 3, 2020 · 3 comments
Closed
9 tasks done

How to test streaming async responses with httpx AsyncClient #2006

seweissman opened this issue Sep 3, 2020 · 3 comments
Labels
question Question or problem question-migrate

Comments

@seweissman
Copy link

First check

  • I added a very descriptive title to this issue.
  • I used the GitHub search to find a similar issue and didn't find it.
  • I searched the FastAPI documentation, with the integrated search.
  • I already searched in Google "How to X in FastAPI" and didn't find any information.
  • I already read and followed all the tutorial in the docs and didn't find an answer.
  • I already checked if it is not related to FastAPI but to Pydantic.
  • I already checked if it is not related to FastAPI but to Swagger UI.
  • I already checked if it is not related to FastAPI but to ReDoc.
  • After submitting this, I commit to one of:
    • Read open issues with questions until I find 2 issues where I can help someone and add a comment to help there.
    • I already hit the "watch" button in this repository to receive notifications and I commit to help at least 2 people that ask questions in the future.
    • Implement a Pull Request for a confirmed bug.

Question

I have an async endpoint that streams an arbitrarily large amount of output and I want to write a test to check a portion of its streaming response. I have tried to adapt the FastAPI async tests example to use the httpx.AsyncClient.stream method, but it appears that the call to stream is happening synchronously in the test. How can I do this correctly?

Example

The example app below defines two endpoints:

  • stream_yes_infinite - Streams an infinite stream of "y"s.
  • stream_yes_truncate - Streams a truncated stream of "y"s, length of output controlled by request parameter nlines.

There are two tests:

  • test_stream_yes_infinite - Calls stream_yes_infinite with AsyncClient.stream. Loops over the output with aiter_lines asserting that each line is "y" until we have read 1000 lines, then break out of the loop.
  • test_stream_yes_truncate - Calls stream_yes_truncate with AsyncClient.stream. Loops over the all of the output with aiter_lines asserting that each line is "n". This test should fail right away.
# stream_yes.py
import pytest
import httpx
import asyncio

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import itertools

app = FastAPI()

@app.get("/stream_yes_infinite")
async def get_stream_yes_infinite():
    """
    Returns an infinite stream of "y" followed by newline
    """
    y_gen = itertools.repeat("y\n")
    return StreamingResponse(y_gen)

@app.get("/stream_yes_truncate")
async def get_stream_yes_truncate(nlines: int):
    """
    Returns a truncated stream of "y" followed by newline.
    """
    y_gen = itertools.repeat("y\n", nlines)
    return StreamingResponse(y_gen)

# This test should pass but it hangs forever and prints no output
@pytest.mark.asyncio
async def test_stream_yes_infinite():
    """Get the first 1000 lines from the infinite stream and test that the output is always 'y' """
    max_lines = 1000
    i = 0
    async with httpx.AsyncClient(app=app, base_url="http://test") as aclient:
        async with aclient.stream("GET", "/stream_yes_infinite") as response:
            async for line in response.aiter_lines():
                if i > max_lines:
                    break
                assert line.strip() == "y"
                print(line.strip())
                i += 1

# This test fails as expected, but time to failure is dependent on the value of the nlines parameter.
@pytest.mark.asyncio
async def test_stream_yes_truncate():
    """Get the output of the truncated stream and test that the output is always 'n' """
    async with httpx.AsyncClient(app=app, base_url="http://test") as aclient:
        async with aclient.stream("GET", "/stream_yes_truncate", params={"nlines": 1000}) as response:
            async for line in response.aiter_lines():
                # The test should fail because the output of stream_yes_truncate is "y"
                assert line.strip() == "n"
                print(line.strip())

if __name__ == "__main__":

    # Run: uvicorn stream_yes:app --reload

    # Check that we can stream output from the app running in uvicorn
    async def fetch_stream():
        max_lines = 1000
        i = 0
        async with httpx.AsyncClient() as aclient:
            async with aclient.stream("GET", "http://localhost:8000/stream_yes") as r:
                async for line in r.aiter_lines():
                    if i > max_lines:
                        break
                    print(line.strip())
                    i += 1

    loop = asyncio.get_event_loop()
    loop.run_until_complete(fetch_stream())

Description

Things that work as expected:

  • Run uvicorn stream_yes:app --reload
    • Open the browser and call the endpoint /stream_yes_infinite.
      • The browser successfully streams the output of the infinite stream.
    • Run python stream_yes.py.
      • The httpx client streams the response from the app when run in uvicorn (this works with the synchronous client as well).

Things that don't work as expected:

  • Run pytest -s stream_yes.py -k test_stream_yes_infinite.
    • I expected: The test to print a bunch of "y"s and pass.
    • But what happens is: The test prints no output and hangs forever.
  • Run pytest -s stream_yes.py -k test_stream_yes_truncate.
    • I expected: The test to fail right away.
    • But what happens is: The test takes more and more time to fail as the nlines parameter is increased.

Environment

  • OS: macOS
  • FastAPI Version: 0.61.0
  • Python version: 3.8.0
  • pytest version: 6.0.1
  • pytest-asyncio version: 0.14.0
  • httpx version: 0.14.3
@seweissman seweissman added the question Question or problem label Sep 3, 2020
@seweissman
Copy link
Author

seweissman commented Sep 9, 2020

Update: Following some hints in this issue I was able to get this to work with https://pypi.org/project/async-asgi-testclient/ and a conftest.py that sets an event loop test fixture. Still unable to get the stream to work with httpx.AsyncClient.

Updated example:

stream_yes.py

import pytest
from async_asgi_testclient import TestClient

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import itertools

app = FastAPI()

@app.get("/stream_yes_infinite")
async def get_stream_yes_infinite():
    """
    Returns an infinite stream of "y\n"
    """
    y_gen = itertools.repeat("y\n")
    return StreamingResponse(y_gen)

@app.get("/stream_yes_truncate")
async def get_stream_yes_truncate(nlines: int):
    """
    Returns a truncated stream of "y\n"

    """
    y_gen = itertools.repeat("y\n", nlines)
    return StreamingResponse(y_gen)

@pytest.fixture(scope="module")
def event_loop():
    loop = asyncio.get_event_loop()

    yield loop
    # If we close the loop here or use the default event_loop fixture, which also gets closed,
    # we get a bunch of errors about pending Tasks, possibly due to how the test client cleans up tasks.
    # loop.close()

@pytest.mark.asyncio
async def test_stream_yes_infinite():
    """Get the first 1000 lines from the infinite stream and test that the output is always 'y' """
    max_lines = 1000
    i = 0
    async with TestClient(app) as client:
        response = await client.get("/stream_yes_infinite", stream=True)
        async for line in response.iter_content(2):
            if i > max_lines:
                break
            line = line.decode("utf-8").strip()
            assert line == "y"
            i += 1

@pytest.mark.asyncio
async def test_stream_yes_truncate():
    """Get the output of the truncated stream and test that the output is always 'y' """
    async with TestClient(app) as client:
        response = await client.get("/stream_yes_truncate", query_string={"nlines": 100000}, stream=True)
        async for line in response.iter_content(2):
            line = line.decode("utf-8").strip()
            assert line == "n"

Running pytest -s experiments/stream_yes.py now passes the first test and fails the second test immediately, as expected.

@falkben
Copy link
Sponsor Contributor

falkben commented Dec 8, 2020

somewhat related (not httpx but if this issue is resolved it may provide another solution): encode/starlette#1102

@havardthom
Copy link

This issue should probably be opened/moved to httpx repo since it seems to be caused by httpx.AsyncClient

Repository owner locked and limited conversation to collaborators Feb 28, 2023
@tiangolo tiangolo converted this issue into discussion #9126 Feb 28, 2023

This issue was moved to a discussion.

You can continue the conversation there. Go to discussion →

Labels
question Question or problem question-migrate
Projects
None yet
Development

No branches or pull requests

4 participants