sysid / sse-starlette

BSD 3-Clause "New" or "Revised" License
503 stars 35 forks source link

EventSourceResponse doesn't propagate context variables #103

Open hansonw opened 5 days ago

hansonw commented 5 days ago

Thanks for making this library! We are happy users of it in production :)

I encountered a minor issue when trying to use this in conjunction with starlette-context, which allows setup of request-level ContextVars with Starlette: https://starlette-context.readthedocs.io/en/latest/quickstart.html

However, it seems that context variables are not accessible from the streaming response inside EventSourceResponse. Here's a basic repro (with FastAPI):

from starlette_context import context, request_cycle_context
from fastapi import FastAPI, Depends, HTTPException, Header

async def my_context_dependency():
    data = {"hello": "world"}
    with request_cycle_context(data):
        yield

app = FastAPI(dependencies=[Depends(my_context_dependency)])

from sse_starlette import EventSourceResponse

@app.get("/")
async def hello_sse():
    # this prints {"hello": "world"} as expected
    print(context.data) 
    return EventSourceResponse(streaming_response())

async def streaming_response():
    # now this throws a ContextDoesNotExistError
    yield context.data

I guess this is because technically the body of EventSourceResponse is occurring outside the request cycle?

Could it potentially make sense for EventSourceResponse to call contextvars.copy_context() and then rehydrate them when running stream_response()?

https://github.com/sysid/sse-starlette/blob/main/sse_starlette/sse.py#L270-L279

sysid commented 3 days ago

@hansonw , thank you for raising this interesting topic. I did some preliminary quick analysis. It looks to me like asynchronous generators when yielding are loosing the context (however, not sure about it).

Have you already tried passing the context explicitly, sth like this:

from starlette_context import context, request_cycle_context, context_var
from fastapi import FastAPI, Depends, Header
from sse_starlette import EventSourceResponse
import contextvars

app = FastAPI()

context_var = contextvars.ContextVar("context")

async def my_context_dependency():
    data = {"hello": "world"}
    with request_cycle_context(data):
        yield

@app.get("/")
async def hello_sse():
    # Capture the context data
    context_data = context.data
    return EventSourceResponse(streaming_response(context_data))

async def streaming_response(context_data):
    # Use the captured context data
    context_var.set(context_data)
    while True:
        yield f"data: {context_var.get()}\n\n"
        await asyncio.sleep(1)

Alternatively there might be a way to catpure and re-apply the context within stream_response of EventSourceResponse, but this requires some time and testing, which I cannot afford at the moment. Also I do not want to bring in starlette_context as additional dependency, so this might be a showstopper after all.

PR would be welcome, of course.