Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SSE are batched #484

Open
sola-ed opened this issue Apr 5, 2024 · 6 comments
Open

SSE are batched #484

sola-ed opened this issue Apr 5, 2024 · 6 comments

Comments

@sola-ed
Copy link

sola-ed commented Apr 5, 2024

Hi guys,

I've been trying to test aiohttp-sse with the EventSource component in Dash. In a previous issue raised here, I failed to make it work since I forgot to include CORS handling. That is now solved, but I notice that the events arrive buffered to Dash. You can reproduce this with the following code.

# server.py

from aiohttp import web
import json
import asyncio
from datetime import datetime
from aiohttp_sse import sse_response
import aiohttp_cors

app = web.Application()
routes = web.RouteTableDef()

cors = aiohttp_cors.setup(app, defaults={
    "*": aiohttp_cors.ResourceOptions(
        allow_credentials=True,
        expose_headers="*",
        allow_methods="*",
        allow_headers="*",
        max_age=3600
    )
})

@routes.get("/hello")
async def hello(request: web.Request) -> web.StreamResponse:
    async with sse_response(request) as resp: 
        while resp.is_connected(): 
            services = json.dumps({
                "time": f"Server Time : {datetime.now()}"
            })
            await resp.send(services)
            await asyncio.sleep(1)
    return resp

app.router.add_routes(routes)
for route in app.router.routes():
    cors.add(route)

if __name__ == "__main__":
    web.run_app(app, host='127.0.0.1', port=5000)

Now the Dash client:

# client.py

from dash_extensions import EventSource
from dash_extensions.enrich import html, dcc, Output, Input, DashProxy
from dash.exceptions import PreventUpdate
import json

# Create small example app.
app = DashProxy(__name__)
app.layout = html.Div([
    EventSource(id="sse", url="http://127.0.0.1:5000/hello"),
    html.Span('SSE'),
    html.Div(id="display")
])

@app.callback(
    Output("display", "children"), 
    Input("sse", "message"),
)
def display(msg):
    if msg is not None:
        return msg
    else:
        raise PreventUpdate()
    
if __name__ == "__main__":
    app.run_server(debug=True)

When I run these scripts, I get chucks like this in the msg variable in Dash:

'{"time": "Server Time : 2024-04-05 09:44:52.022164"}\n{"time": "Server Time : 2024-04-05 09:44:53.023039"}\n{"time": "Server Time : 2024-04-05 09:44:54.023770"}\n{"time": "Server Time : 2024-04-05 09:44:55.025389"}\n{"time": "Server Time : 2024-04-05 09:44:56.027151"}\n{"time": "Server Time : 2024-04-05 09:44:57.029044"}\n{"time": "Server Time : 2024-04-05 09:44:58.030822"}\n{"time": "Server Time : 2024-04-05 09:44:59.032468"}\n{"time": "Server Time : 2024-04-05 09:45:00.033961"}\n{"time": "Server Time : 2024-04-05 09:45:01.035243"}\n{"time": "Server Time : 2024-04-05 09:45:02.036953"}\n{"time": "Server Time : 2024-04-05 09:45:03.038641"}\n{"time": "Server Time : 2024-04-05 09:45:04.040436"}\n{"time": "Server Time : 2024-04-05 09:45:05.041850"}\n{"time": "Server Time : 2024-04-05 09:45:06.043279"}'

Running the same thing in Starlette

import asyncio
import json
from datetime import datetime
import uvicorn
from sse_starlette import EventSourceResponse
from starlette.applications import Starlette
from starlette.middleware import Middleware
from starlette.middleware.cors import CORSMiddleware

middleware = Middleware(CORSMiddleware, allow_origins=["*"], allow_headers=["*"])
server = Starlette(middleware=[middleware])

async def random_data():
    while True:
        await asyncio.sleep(1)
        yield json.dumps({
            "time": f"Server Time : {datetime.now()}"
        })

@server.route("/hello")
async def sse(request):
    generator = random_data()
    return EventSourceResponse(generator)

if __name__ == "__main__":
    uvicorn.run(server, port=5000)

gives atomic answers in the msg variable.

{"time": "Server Time : 2024-04-05 11:04:50.266653"}

Note that, when I visit http://127.0.0.1:5000/hello after running the aiohttp-sse example, the events are received atomically, as expected.

So my question is, is there something in the transmission process that Starlette is doing and aiohttp-sse is omitting?

Please note that I would like to avoid the question of where is the bug (Dash EventSource or aiohttp-sse), since this easily leads to a kind of chicken-or-egg dilemma ... since their stuff works using the Starlette example. I'm just raising this here to see if you have any hint regarding this problem. I'm still resisting to switch to Starlette (or FastAPI) just because of this issue. Thanks in advance!

@Dreamsorcerer
Copy link
Member

So my question is, is there something in the transmission process that Starlette is doing and aiohttp-sse is omitting?

In your last issue, it seemed like starlette is sending a badly formatted SSE format and that Dash doesn't actually support SSE. Has something changed or have we found evidence that is not correct?

Without investigating, it seems likely to me that it is the exact same issue that it's expecting the wrong number of line breaks (i.e. it's not buffering, it's just assuming that they are all part of one message).

since this easily leads to a kind of chicken-or-egg dilemma

If it's correct that Dash doesn't support any valid SSE stream and only works with the broken streams produced by Starlette (as suggested by the posts linked to in your last issue), then I don't see any way that Dash can claim that their implementation is correct when it doesn't work with any other library. This needs to be fixed there.

If we were to match Starlette then we'd potentially break every conforming SSE client that interacts with the library, as every message would now suddenly have a newline at the start.

@sola-ed
Copy link
Author

sola-ed commented Apr 8, 2024

The confusion comes from the link that I sent in the previous issue: it is just wrong!

I have gone through some details and both starlette (left) and aiohttp-sse (right) use the same separators and protocol

sse_sep

That only 2x \r\n is used by starlette can be checked by breaking into its stream_response function

starlette_stream

A sample from the chunk variable is

b'data: {"time": "Server Time : 2024-04-08 10:59:30.249653"}\r\n\r\n'

which immediately appears in the browser (Dash app) after passing through the send method. On the other hand, doing the same with aiohttp-sse:

aiohttp_stream

one notices the first time that the chunk variable is

'{"time": "Server Time : 2024-04-08 11:29:29.951909"}'

but it is not immediately seen in the browser after finishing executing the send. It is kinda stuck for a while. Then the error

ConnectionResetError: Cannot write to closing transport

appears, and the whole batch

{"time": "Server Time : 2024-04-08 11:30:22.047700"} {"time": "Server Time : 2024-04-08 11:30:30.146010"} {"time": "Server Time : 2024-04-08 11:30:31.147563"} {"time": "Server Time : 2024-04-08 11:30:32.148499"} {"time": "Server Time : 2024-04-08 11:30:33.150090"} {"time": "Server Time : 2024-04-08 11:30:34.151646"} {"time": "Server Time : 2024-04-08 11:30:35.152449"} {"time": "Server Time : 2024-04-08 11:30:36.153865"} {"time": "Server Time : 2024-04-08 11:30:37.155267"} {"time": "Server Time : 2024-04-08 11:30:38.156443"} {"time": "Server Time : 2024-04-08 11:30:39.157877"} {"time": "Server Time : 2024-04-08 11:30:40.159344"} {"time": "Server Time : 2024-04-08 11:30:41.160452"} {"time": "Server Time : 2024-04-08 11:30:42.161835"} {"time": "Server Time : 2024-04-08 11:30:43.163273"}

is observed in the browser afterward. So there is definitely something worth taking a look at within aiohttp-sse. Is there some parameter that should be set in the aiohttp-sse API to avoid the batching?

@Olegt0rr
Copy link
Collaborator

Olegt0rr commented Apr 8, 2024

Can't reproduce.

As soon as EventSourceResponse.send() become awaited, I can see new event in the browser.

Screen.Recording.2024-04-08.at.17.08.34.mp4

@Dreamsorcerer
Copy link
Member

I have gone through some details and both starlette (left) and aiohttp-sse (right) use the same separators and protocol

Thanks for digging into this, but have you confirmed with a simple client the exact bytes received? It'd be really useful if you can create a reproducer in isolation.

If you look in our test file, we have tests that just read the body with aiohttp and verify what messages are received:
https://github.com/aio-libs/aiohttp-sse/blob/master/tests/test_sse.py

So, if you can figure out a reproducer in the form of a test (and create a PR with it), then we can certainly figure out what's happening from there. I'd suggest initially comparing responses to a starlette app to verify they are exactly the same bytes. If it is actually buffering, then you'd need to tweak those tests to use the streaming API for the testing (instead of resp.text() which reads the full body).

@Dreamsorcerer
Copy link
Member

Also, it might be useful to know what OS, event loop etc. you are running on.
I've just found the code in Dash, and it looks to me like it's just using the regular browser API:
https://github.com/emilhe/dash-extensions/blob/master/src/lib/components/EventSource.react.js#L9

So, surely you'd be able to produce this without Dash if that's the case?

@sola-ed
Copy link
Author

sola-ed commented Apr 10, 2024

Hey guys,

This problem is hard to reproduce, it happens kinda randomly. @Olegt0rr , you won't see it if you run the server and client in the same app. I tried this, and everything works as expected. @Dreamsorcerer , surely I have seen this even without using Dash.

# server
from aiohttp import web
import json
import asyncio
from datetime import datetime
from aiohttp_sse import sse_response
import aiohttp_cors

app = web.Application()
routes = web.RouteTableDef()

cors = aiohttp_cors.setup(app, defaults={
    "*": aiohttp_cors.ResourceOptions(
        allow_credentials=True,
        expose_headers="*",
        allow_methods="*",
        allow_headers="*",
        max_age=3600
    )
})

@routes.get("/hello")
async def hello(request: web.Request) -> web.StreamResponse:
    async with sse_response(request) as resp: 
        while resp.is_connected(): 
            data = json.dumps({
                "time": f"Server Time : {datetime.now()}"
            })
            await resp.send(data)
            await asyncio.sleep(1)
    return resp

app.router.add_routes(routes)
for route in app.router.routes():
    cors.add(route)

if __name__ == "__main__":
    web.run_app(app, host='127.0.0.1', port=5000)

and then

# client
from aiohttp import web


async def index(_request: web.Request) -> web.StreamResponse:
    html = """
        <html>
            <body>
                <script>
                    var eventSource = new EventSource("http://127.0.0.1:5000/hello");
                    eventSource.addEventListener("message", event => {
                        document.getElementById("response").innerText = event.data;
                    });
                </script>
                <h1>Response from server:</h1>
                <div id="response"></div>
            </body>
        </html>
    """
    return web.Response(text=html, content_type="text/html")


app = web.Application()
app.router.add_route("GET", "/", index)
web.run_app(app, host="127.0.0.1", port=8080)

My observations:

  1. Sometimes by just running the scripts in separate terminals and checking the browser, things work as expected.
  2. In another trial, I put a breakpoint in the await resp.send(data) line in the server and step line by line in the while loop, and then I observed the batched response in the browser.
  3. In another trial, I did 1) again, and observed the batched data in the browser.
  4. With Starlette, batched data is never seen.

I'm curious to see if you find this as well. I am running this with aiohttp-sse=2.1.0, in a machine with Ubuntu 20.04.6 LTS and python Python 3.10.13.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants