encode / starlette

The little ASGI framework that shines. 🌟
https://www.starlette.io/
BSD 3-Clause "New" or "Revised" License

TestClient.request does not honor stream=True #1102

Open ergoithz opened 3 years ago

ergoithz commented 3 years ago

Describe the bug

The requests.request interface exposes a stream=True option, which makes the call return without waiting for the entire body to arrive. starlette.testclient._ASGIAdapter.send does not handle stream=True properly: it unconditionally waits for the entire response to finish:

https://github.com/encode/starlette/blob/e4307065ea6dbe708fba9643d14fe7adfff06d46/starlette/testclient.py#L240

To reproduce

  1. After your ASGI application sends its headers (http.response.start) and first chunk (http.response.body), make it wait on an asyncio.Event (Event.wait()). This is easy to do inside the generator passed to starlette.responses.StreamingResponse.
  2. Set the event (Event.set()) only after making the request with stream=True.
  3. Deadlock.
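
The reproduction steps above can be sketched in plain asyncio, without Starlette or requests. In this hypothetical stand-in, make_gated_app is a raw ASGI app that sends its headers and a first chunk and then waits on an asyncio.Event, and buffered_request is a made-up driver that, like the behaviour reported here, returns only after the app has finished. A timeout replaces the real hang.

```python
import asyncio


def make_gated_app(gate: asyncio.Event):
    """Hypothetical ASGI app: headers and a first chunk, then wait on `gate`."""

    async def app(scope, receive, send):
        await send({"type": "http.response.start", "status": 200, "headers": []})
        await send({"type": "http.response.body", "body": b"first", "more_body": True})
        await gate.wait()  # blocks until the client side sets the event
        await send({"type": "http.response.body", "body": b"last", "more_body": False})

    return app


async def buffered_request(app) -> list:
    """Hypothetical driver that returns only once the app has finished."""
    messages = []

    async def receive():
        return {"type": "http.request", "body": b"", "more_body": False}

    async def send(message):
        messages.append(message)

    await app({"type": "http"}, receive, send)
    return messages


async def reproduce(timeout: float = 0.2) -> str:
    gate = asyncio.Event()
    try:
        # The caller would call gate.set() only after this returns, so it never does.
        await asyncio.wait_for(buffered_request(make_gated_app(gate)), timeout)
    except asyncio.TimeoutError:
        return "deadlock"
    gate.set()
    return "ok"


if __name__ == "__main__":
    print(asyncio.run(reproduce()))
```

Running it prints deadlock after roughly 0.2 seconds: the event that would unblock the app can only be set once buffered_request returns, and that never happens.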

Expected behavior

With stream=True, TestClient.request should return right after the first http.response.body message (the ASGI specification, published with asgiref, says a server must not send the http.response.start headers to the client until it has received the first http.response.body event).

Streaming-related response methods (like iter_content) should be implemented by awaiting further http.response.body messages until more_body is missing or False.
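
The expected behaviour above can be sketched the same way. This is a minimal illustration under stated assumptions, not Starlette's TestClient: streaming_request and StreamingResult are hypothetical names. The app runs in a task, its send messages go into an asyncio.Queue, and the request returns as soon as http.response.start and the first http.response.body have arrived; iter_body then awaits further http.response.body messages until more_body is missing or False.

```python
import asyncio
from typing import AsyncIterator


def make_gated_app(gate: asyncio.Event):
    """Hypothetical ASGI app: headers and a first chunk, then wait on `gate`."""

    async def app(scope, receive, send):
        await send({"type": "http.response.start", "status": 200, "headers": []})
        await send({"type": "http.response.body", "body": b"first", "more_body": True})
        await gate.wait()
        await send({"type": "http.response.body", "body": b"last", "more_body": False})

    return app


class StreamingResult:
    """Status code plus an async iterator over the body chunks."""

    def __init__(self, status: int, first: dict, queue: asyncio.Queue, task: asyncio.Task):
        self.status = status
        self._first = first
        self._queue = queue
        self._task = task

    async def iter_body(self) -> AsyncIterator[bytes]:
        message = self._first
        while True:
            yield message.get("body", b"")
            # Stop once more_body is missing or False, as the issue proposes.
            if not message.get("more_body", False):
                break
            message = await self._queue.get()
        await self._task  # re-raise anything the app raised


async def streaming_request(app) -> StreamingResult:
    """Run `app` in a task and return after the first http.response.body."""
    queue: asyncio.Queue = asyncio.Queue()

    async def receive():
        return {"type": "http.request", "body": b"", "more_body": False}

    task = asyncio.create_task(app({"type": "http"}, receive, queue.put))
    start = await queue.get()
    if start["type"] != "http.response.start":
        raise RuntimeError(f"unexpected message: {start['type']}")
    first = await queue.get()
    return StreamingResult(start["status"], first, queue, task)


async def demo():
    gate = asyncio.Event()
    result = await streaming_request(make_gated_app(gate))
    gate.set()  # safe: the request has already returned
    chunks = [chunk async for chunk in result.iter_body()]
    return result.status, chunks
```

Because the request returns before the app finishes, the event can be set afterwards and the remaining chunk still arrives. A fuller version would also cancel the app task (or deliver http.disconnect through receive) if the caller stops iterating early.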

Actual behavior

Deadlock.

Additional context

I suspect #533 could be related to this.

Kludex commented 2 years ago

@falkben How did you overcome this issue?

falkben commented 2 years ago

Yes, I believe we ran into this issue as well.

We found that async_asgi_testclient worked when we needed to consume a stream from a response. At the moment, though, I'm having trouble getting it to work with async generators.

See: https://github.com/encode/starlette/issues/1102#issuecomment-1276480321

adriangb commented 2 years ago

I think this is relevant now: https://github.com/encode/httpx/pull/1491

reidmeyer commented 2 years ago

Any update on this? I'm trying to test my SSE endpoint by calling it with the TestClient and passing the response to https://pypi.org/project/sseclient-py/ so I can parse the event data. But indeed, the TestClient hangs.

falkben commented 2 years ago

Starlette >= 0.19.0 seems not to work with async-asgi-testclient streaming. With version 0.18 and below, I could test the streaming response.

reidmeyer commented 2 years ago

By "test the streaming response", you mean viewing the response with iter_content, correct? I got that to work with starlette 0.20.4. This post helped me do that. Thank you.

The task I'm currently working on is taking the streaming response and processing it into an SSE object (event, data, id, retry). https://pypi.org/project/sseclient-py/ attempts to do this, but it doesn't seem to work with the response returned by async_asgi_testclient.TestClient.get. The Starlette TestClient might have worked, but it hangs (stream=True doesn't seem to work), I believe because of the issue described here.

Any thoughts on this?
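
Turning raw chunks into SSE objects (event, data, id, retry) needs a parser that copes with events split across chunk boundaries. The following is a simplified sketch loosely following the WHATWG server-sent events parsing rules. Event and SSEParser are hypothetical names, not sseclient-py's API; bare CR line endings are not handled, and id is tracked per event rather than as a persistent last-event ID.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Event:
    event: str = "message"
    data: str = ""
    id: Optional[str] = None
    retry: Optional[int] = None


class SSEParser:
    """Incremental parser: feed decoded text chunks, get complete events back.

    Sketch only: handles LF and CRLF line endings, but not bare CR.
    """

    def __init__(self) -> None:
        self._buffer = ""
        self._reset()

    def _reset(self) -> None:
        self._event = ""
        self._data: List[str] = []
        self._id: Optional[str] = None
        self._retry: Optional[int] = None

    def feed(self, chunk: str) -> List[Event]:
        self._buffer += chunk
        # Keep the trailing partial line in the buffer for the next chunk.
        *lines, self._buffer = self._buffer.split("\n")
        events = []
        for line in lines:
            if line.endswith("\r"):
                line = line[:-1]
            if not line:  # a blank line ends the current event
                if self._data:
                    events.append(
                        Event(self._event or "message", "\n".join(self._data), self._id, self._retry)
                    )
                self._reset()
                continue
            if line.startswith(":"):  # comment line, e.g. a keep-alive ping
                continue
            field, _, value = line.partition(":")
            if value.startswith(" "):
                value = value[1:]
            if field == "event":
                self._event = value
            elif field == "data":
                self._data.append(value)
            elif field == "id":
                self._id = value
            elif field == "retry" and value.isdigit():
                self._retry = int(value)
        return events
```

With async_asgi_testclient, each chunk from resp.iter_content(...) would be decoded and passed to feed(). If a multi-byte UTF-8 character can be split across chunks, decoding with codecs.getincrementaldecoder("utf-8")() is safer than calling bytes.decode on each chunk.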

falkben commented 2 years ago

You're correct, it does work with newer versions of Starlette. I had a problem in one of my middlewares that was trying to consume the client (from scope).

Also, my example above wasn't quite Starlette code. Here's a better example:

from typing import Generator
from typing import Literal

import pytest
from async_asgi_testclient import TestClient
from starlette.applications import Starlette
from starlette.responses import StreamingResponse
from starlette.routing import Route

async def get_stream_y(request) -> StreamingResponse:
    """Stream "y" up to max_lines times."""

    # stream at most 1000 characters
    max_lines = 1000

    def gen_y() -> Generator[Literal["y"], None, None]:
        i = 0
        while i < max_lines:
            yield "y"
            i += 1

    return StreamingResponse(gen_y())

# A routes list instead of the deprecated @app.route decorator.
app = Starlette(debug=True, routes=[Route("/stream_y", get_stream_y)])

@pytest.mark.asyncio
async def test_stream_y():
    max_lines = 100
    i = 0

    async with TestClient(app) as client:
        resp = await client.get("/stream_y", stream=True)
        assert resp.status_code == 200
        async for line in resp.iter_content(2):
            # Stop after max_lines chunks; the endpoint would send up to 1000.
            if i >= max_lines:
                break

            line = line.decode("utf-8").strip()
            assert line == "y"
            i += 1

As for the error in converting the response into an SSE object, that seems more like an async-asgi-testclient issue, or maybe an sseclient-py issue. What is the error you are getting?

Kludex commented 1 year ago

You can use the following reference to test SSE: https://github.com/florimondmanca/httpx-sse/blob/master/tests/test_asgi.py.

Kludex commented 1 year ago

The description mentions requests, and although the TestClient no longer uses it (it is now built on httpx), the issue is still valid.

We need the following to be implemented in httpx before acting here: https://github.com/encode/httpx/issues/2186 (if action is still needed once that is implemented there).

reidmeyer commented 1 year ago

async-asgi-testclient streaming response doesn't seem to work with my current versions. It hangs on the request. Versions: starlette 0.14.2, sse-starlette 0.9.0, fastapi 0.68.2, async_asgi_testclient 1.4.11.

Kludex commented 1 year ago

Your version is from February 2021...

reidmeyer commented 1 year ago

yup, had to downgrade fastapi/starlette because of: https://github.com/agronholm/anyio/issues/374

:(

Currently trying to find a way to test consuming the SSE stream (async GET request) with the current versions.

Kludex commented 1 year ago

Not sure how that issue is related here...

In any case, you probably downgraded from a version that is not the latest one. A release from 2 years ago is just too old.

You can check this comment: https://github.com/encode/starlette/issues/1102#issuecomment-1348144650

reidmeyer commented 1 year ago

You're suggesting I try httpx-sse, as in that example, correct? @Kludex

Kludex commented 1 year ago

yes

reidmeyer commented 1 year ago

Sadly, no luck there. It's possible it's an unrelated issue. I'll continue to investigate and update here.

Kludex commented 1 year ago

Did you find something?

Kludex commented 1 year ago

import asyncio

import httpx
import pytest
from httpx_sse import aconnect_sse
from sse_starlette.sse import EventSourceResponse
from starlette.applications import Starlette
from starlette.routing import Route

async def numbers(minimum, maximum):
    for i in range(minimum, maximum + 1):
        await asyncio.sleep(0.9)
        yield dict(data=i)

async def sse(request):
    generator = numbers(1, 5)
    return EventSourceResponse(generator)

routes = [Route("/", endpoint=sse)]

app = Starlette(debug=True, routes=routes)

@pytest.fixture()
def anyio_backend():
    return "asyncio"

@pytest.mark.anyio
async def test_sse_response():
    # Pass the app through ASGITransport; the AsyncClient(app=...) shortcut
    # is deprecated in newer httpx releases.
    transport = httpx.ASGITransport(app=app)
    async with httpx.AsyncClient(transport=transport, base_url="http://test") as client:
        # aconnect_sse takes the HTTP method and the URL.
        async with aconnect_sse(client, "GET", "/") as event_source:
            events = [sse async for sse in event_source.aiter_sse()]
            data = [event.data for event in events]
            assert data == ["1", "2", "3", "4", "5"]

reidmeyer commented 1 year ago

@Kludex, it was an unrelated issue I was having, where another part of my code was hanging. I'm currently using async_asgi_testclient, and that serves my purpose for now.

I just tried to use httpx-sse (now that I have fixed my other issue, and also since I noticed it was updated last night), but it doesn't seem to work exactly as I expect for my use case. In the example above, you are using a generator that stops after 5. In my use case, I have a generator that never stops.

I wonder if you can change your example above to use an infinite generator, say:

async def numbers():
    i = 0
    while True:
        yield dict(data=i)
        i += 1

and then have your test confirm that the first 3 messages received are 0, 1, 2.

This is the type of test I have currently implemented with the async_asgi_testclient. It looks something like this:

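resp = await async_client.get("myurl", stream=True)

async for line in resp.iter_content(100000):
    # Drop the \r from CRLF line endings.
    line = line.decode("utf-8").replace("\r", "")
    end_of_first_line = line.index("\n")
    # Skip the 7-character "event: " prefix of the first line.
    event = line[7:end_of_first_line]
    # Skip the newline plus the 6-character "data: " prefix that follows it.
    data = line[end_of_first_line + 7 :]

    assert event == expected_event
    assert data == expected_data

    # Only the first chunk is checked.
    break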

Not the cleanest, but it's working.
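
The infinite-generator case can be checked without any test client by reading a fixed number of events and then cancelling the app, since the response never ends on its own. In this hypothetical sketch, sse_app is a raw ASGI stand-in for an EventSourceResponse endpoint and first_events is a made-up helper; it assumes each http.response.body message carries exactly one complete event, which holds for this toy app but not in general.

```python
import asyncio
from typing import List


async def numbers():
    """Infinite source, as in the generator proposed above."""
    i = 0
    while True:
        yield {"data": i}
        i += 1


async def sse_app(scope, receive, send):
    """Hypothetical raw ASGI SSE endpoint that never finishes on its own."""
    await send({
        "type": "http.response.start",
        "status": 200,
        "headers": [(b"content-type", b"text/event-stream")],
    })
    async for event in numbers():
        body = f"data: {event['data']}\r\n\r\n".encode()
        await send({"type": "http.response.body", "body": body, "more_body": True})


async def first_events(app, n: int) -> List[str]:
    """Collect the data of the first `n` events, then cancel the app."""
    # maxsize=1 makes the app's send() wait until the consumer catches up.
    queue: asyncio.Queue = asyncio.Queue(maxsize=1)

    async def receive():
        await asyncio.Event().wait()  # the client never disconnects by itself

    task = asyncio.create_task(app({"type": "http"}, receive, queue.put))
    try:
        start = await queue.get()
        if start["type"] != "http.response.start":
            raise RuntimeError(f"unexpected message: {start['type']}")
        values = []
        while len(values) < n:
            message = await queue.get()
            # Assumes one complete "data: ..." event per body message.
            text = message["body"].decode()
            values.append(text.removeprefix("data: ").strip())
        return values
    finally:
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass
```

The bounded queue keeps the never-ending app from racing ahead of the test, and cancelling the task in the finally block is what stops the stream once enough events have been read. str.removeprefix needs Python 3.9 or later.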

jamesbraza commented 11 months ago

I gave this a whirl tonight. httpx-sse doesn't work for testing a StreamingResponse, because the content type header is not text/event-stream.
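
The failure above comes down to the Content-Type header in the http.response.start message. The function below is a hedged sketch of a media-type check of that kind (is_event_stream is a hypothetical helper, not httpx-sse's actual code). It compares only the media type and ignores parameters such as charset, so a StreamingResponse with another media_type, or none at all, fails it, while one created with media_type="text/event-stream" would pass.

```python
from typing import Iterable, Tuple


def is_event_stream(headers: Iterable[Tuple[bytes, bytes]]) -> bool:
    """Return True if ASGI response headers declare text/event-stream.

    Compares only the media type, ignoring parameters such as charset.
    """
    for name, value in headers:
        if name.lower() == b"content-type":
            media_type = value.split(b";", 1)[0].strip().lower()
            return media_type == b"text/event-stream"
    return False
```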

jamesbraza commented 11 months ago

Hello friends, last night I traced the issue to starlette.testclient._TestClientTransport.handle_request. I believe it happens because _TestClientTransport's portal.call fully runs the app's endpoint (including the generator) instead of returning a stream. Thanks @adriangb for pointing me to this issue.

For those in the market for a functioning workaround, I used httpx's testing fixtures to create this:

"""
Demo of properly unit testing a starlette StreamingResponse.

httpx==0.25.2
pytest==7.4.3
starlette==0.27.0
uvicorn==0.24.0.post1
"""

import asyncio
import statistics
import time
from collections.abc import Iterator
from threading import Thread

import httpx
import pytest
from starlette.responses import StreamingResponse
from uvicorn import Config, Server

# SEE: https://www.starlette.io/responses/#streamingresponse

async def slow_numbers(minimum, maximum):
    yield "<html><body><ul>"
    for number in range(minimum, maximum + 1):
        yield "<li>%d</li>" % number
        await asyncio.sleep(0.5)
    yield "</ul></body></html>"

async def app(scope, receive, send):
    assert scope["type"] == "http"
    response = StreamingResponse(slow_numbers(1, 5), media_type="text/html")
    await response(scope, receive, send)

# SEE: https://github.com/encode/httpx/blob/0.25.2/tests/conftest.py#L230-L293
# Workaround for https://github.com/encode/starlette/issues/1102

class TestServer(Server):
    __test__ = False

    @property
    def url(self) -> httpx.URL:
        protocol = "https" if self.config.is_ssl else "http"
        return httpx.URL(f"{protocol}://{self.config.host}:{self.config.port}/")

    def install_signal_handlers(self) -> None:
        # Disable the default installation of handlers for signals such as SIGTERM,
        # because it can only be done in the main thread.
        pass

    async def serve(self, sockets=None):
        self.restart_requested = asyncio.Event()

        loop = asyncio.get_running_loop()
        tasks = {
            loop.create_task(super().serve(sockets=sockets)),
            loop.create_task(self.watch_restarts()),
        }
        await asyncio.wait(tasks)

    async def restart(self) -> None:  # pragma: no cover
        # This coroutine may be called from a different thread than the one the
        # server is running on, and from an async environment that's not asyncio.
        # For this reason, we use an event to coordinate with the server
        # instead of calling shutdown()/startup() directly, and should not make
        # any asyncio-specific operations.
        self.started = False
        self.restart_requested.set()
        while not self.started:
            await asyncio.sleep(0.2)

    async def watch_restarts(self) -> None:  # pragma: no cover
        while True:
            if self.should_exit:
                return

            try:
                await asyncio.wait_for(self.restart_requested.wait(), timeout=0.1)
            except asyncio.TimeoutError:
                continue

            self.restart_requested.clear()
            await self.shutdown()
            await self.startup()

def serve_in_thread(server: TestServer) -> Iterator[TestServer]:
    thread = Thread(target=server.run)
    thread.start()
    try:
        while not server.started:
            time.sleep(1e-3)
        yield server
    finally:
        server.should_exit = True
        thread.join()

@pytest.fixture(name="server", scope="session")
def fixture_server() -> Iterator[TestServer]:
    config = Config(app=app, lifespan="off", loop="asyncio")
    server = TestServer(config=config)
    yield from serve_in_thread(server)

# The actual test

def test_streaming(server: TestServer) -> None:
    # Use the client as a context manager so its connections are closed.
    with httpx.Client(base_url=server.url) as client:
        with client.stream("GET", "/") as response:
            texts, times = [], []
            tic = time.perf_counter()
            # Record each chunk and the time elapsed since the previous one.
            for value in response.iter_bytes():
                texts.append(value.decode())
                times.append((toc := time.perf_counter()) - tic)
                tic = toc

    assert len(times) > 1, "Should be more than one chunk"
    assert times[0] < 0.6, "Perhaps you streamed everything in first chunk"
    assert statistics.mean(times) < 0.6, "Should be streaming"
    assert all(texts), "Some text was empty"
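
The diagnosis above is that portal.call runs the endpoint to completion before the synchronous caller sees anything. The sketch below shows the alternative in plain asyncio rather than anyio (the real TestClient uses an anyio blocking portal; slow_body, stream_from_loop and demo are hypothetical stand-ins). The event loop runs in a background thread, and each body message is handed to the synchronous caller through a thread-safe queue.Queue as soon as it is sent, so chunks can be consumed before the app finishes. Calling future.result() first, by contrast, would block until the whole body had been produced.

```python
import asyncio
import queue
import threading
from typing import Iterator, List


async def slow_body(send) -> None:
    """Hypothetical app body: three chunks with a pause after each."""
    for i in range(3):
        more = i < 2
        await send({"type": "http.response.body", "body": str(i).encode(), "more_body": more})
        await asyncio.sleep(0.05)


def stream_from_loop(loop: asyncio.AbstractEventLoop) -> Iterator[bytes]:
    """Yield body chunks to a synchronous caller while the loop produces them."""
    chunks: "queue.Queue[dict]" = queue.Queue()

    async def send(message: dict) -> None:
        chunks.put(message)  # thread-safe hand-off to the caller's thread

    future = asyncio.run_coroutine_threadsafe(slow_body(send), loop)
    while True:
        message = chunks.get()  # blocks this thread only, not the event loop
        yield message["body"]
        if not message.get("more_body", False):
            break
    future.result()  # wait for the app to finish and re-raise its errors


def demo() -> List[bytes]:
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()
    try:
        return list(stream_from_loop(loop))
    finally:
        loop.call_soon_threadsafe(loop.stop)
        thread.join()
        loop.close()
```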