vxgmichel / aiostream

Generator-based operators for asynchronous iteration
http://aiostream.readthedocs.io
GNU General Public License v3.0

Preserve an aiohttp streaming response. #68

Closed tiptop96 closed 3 years ago

tiptop96 commented 3 years ago

I am looking to preserve an aiohttp streaming response through their async iterator.

The idea is to stream a response and check for a specific criterion that will appear somewhere in the first 5-20 lines of the response. If it is not matched, we would close the stream; if it is matched, we would pipe the response into a blob storage service.

However, I cannot seem to preserve the stream. I have tried this a few ways, but I cannot get it right.

At first I figured that it would be something as simple as this:

import asyncio

import aiohttp
from aiostream import stream


async def main():
    async with aiohttp.ClientSession() as sesh:
        async with sesh.get("https://jsonplaceholder.typicode.com/todos/1") as resp:
            async with stream.preserve(resp.content).stream() as streamer:
                # This loop runs
                async for line in streamer:
                    print(line)
                    await asyncio.sleep(1)
            # This loop doesn't
            async for line in resp.content:
                print(line)
                await asyncio.sleep(1)

As the comments say, only one loop runs.

I also tried wrapping the stream in an aiostream.stream.operator, but it gave me an exception:

content = operator(resp.content)
Traceback (most recent call last):
  File "test_preserve_http.py", line 29, in <module>
    loop.run_until_complete(main())
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py", line 484, in run_until_complete
    return future.result()
  File "test_preserve_http.py", line 10, in main
    content = operator(resp.content)
  File "/Users/tiptop96/concepts/.env/lib/python3.6/site-packages/aiostream/core.py", line 368, in operator
    return decorator if func is None else decorator(func)
  File "/Users/tiptop96/concepts/.env/lib/python3.6/site-packages/aiostream/core.py", line 273, in decorator
    name = func.__name__

As a side note, I also receive a warning as follows: [image]

If I remove the stream() call, it also raises an exception.

Any guidance on how to do this or if it is even possible is much appreciated. 👍

vxgmichel commented 3 years ago

Hi @tiptop96,

The preserve operator does not preserve the content of the stream; it prevents the stream from being closed when exiting the streaming context. In order to preserve the content, you can add the data to a list and later chain this list with the preserved iterator:

import asyncio

import aiohttp
from aiostream import stream


async def main():
    async with aiohttp.ClientSession() as sesh:
        async with sesh.get("https://jsonplaceholder.typicode.com/todos/1") as resp:
            async with stream.preserve(resp.content).stream() as streamer:
                # Buffer every line read while looking for the criteria
                stack = []
                async for line in streamer:
                    stack.append(line)
                    await asyncio.sleep(1)
                    # Some criteria in the middle of content
                    if b"title" in line:
                        break
            # Chain the buffered lines with the rest of the still-open response
            preserved_content = stream.iterate(stack) + resp.content
            async with preserved_content.stream() as streamer:
                async for line in streamer:
                    print(line)
                    await asyncio.sleep(1)
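
The buffering idea above does not depend on aiostream itself. As a minimal stdlib-only sketch (assuming Python 3.7+ for asyncio.run), the following reads from an async iterator until a criterion matches, then replays the buffered head followed by the remainder. The names fake_lines, peek_until and replay are hypothetical; fake_lines stands in for resp.content so the example runs without network access.

```python
import asyncio

# Hypothetical sample data standing in for the lines of an HTTP response body.
LINES = [b"{\n", b'  "id": 1,\n', b'  "title": "x",\n', b'  "done": false\n', b"}\n"]


async def fake_lines():
    # Stand-in for resp.content: an async iterator that can only be read once.
    for line in LINES:
        await asyncio.sleep(0)
        yield line


async def peek_until(source, predicate):
    """Read from source until predicate matches; return (matched, buffered)."""
    buffered = []
    async for item in source:
        buffered.append(item)
        if predicate(item):
            # Leaving the loop does not close the source.
            return True, buffered
    return False, buffered


async def replay(buffered, source):
    """Yield the buffered items first, then whatever is left in source."""
    for item in buffered:
        yield item
    async for item in source:
        yield item


async def demo():
    source = fake_lines()
    matched, head = await peek_until(source, lambda line: b"title" in line)
    if not matched:
        # Criterion not found: close the source and give up.
        await source.aclose()
        return []
    # Criterion found: the full content is the buffered head plus the rest.
    return [line async for line in replay(head, source)]


result = asyncio.run(demo())
```

The key point is that both functions iterate over the same iterator object, so the second pass resumes where the first one stopped and the buffer supplies what was already consumed.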

You can also factorize this logic into a dedicated operator:

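import asyncio
import aiohttp
from aiostream import operator, stream

@operator(pipable=True)
async def preserve_content(source, items):
    # Replay the items recorded by earlier passes
    for item in items:
        yield item
    # Then keep reading the source, recording each item; preserve keeps it open
    async with stream.preserve(source).stream() as streamer:
        async for item in streamer:
            items.append(item)
            yield item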

async def main():
    async with aiohttp.ClientSession() as sesh:
        async with sesh.get("https://jsonplaceholder.typicode.com/todos/1") as resp:
            items = []
            preserved = preserve_content(resp.content, items)
            async with preserved.stream() as streamer:
                async for line in streamer:
                    await asyncio.sleep(1)
                    # Some criteria in the middle of content
                    if b"title" in line:
                        break
            async with preserved.stream() as streamer:
                async for line in streamer:
                    print(line)
                    await asyncio.sleep(1)
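
For comparison, the same record-and-replay behaviour can be sketched without aiostream as a small wrapper class. This is an illustration under assumptions, not how aiostream implements its operators: ReplayableSource and fake_lines are hypothetical names, and fake_lines stands in for resp.content so the example runs offline (Python 3.7+ assumed).

```python
import asyncio

# Hypothetical sample data standing in for the lines of an HTTP response body.
LINES = [b"{\n", b'  "id": 1,\n', b'  "title": "x",\n', b'  "done": false\n', b"}\n"]


async def fake_lines():
    # Stand-in for resp.content: an async iterator that can only be read once.
    for line in LINES:
        await asyncio.sleep(0)
        yield line


class ReplayableSource:
    """Records every item it hands out so that a later pass can replay them."""

    def __init__(self, source):
        self._iterator = source.__aiter__()
        self._seen = []

    async def __aiter__(self):
        # Replay a snapshot of what earlier passes already consumed.
        for item in list(self._seen):
            yield item
        # Then continue pulling from the shared underlying iterator.
        async for item in self._iterator:
            self._seen.append(item)
            yield item


async def demo():
    lines = ReplayableSource(fake_lines())
    first_pass = []
    async for line in lines:
        first_pass.append(line)
        # Some criteria in the middle of content
        if b"title" in line:
            break
    # A second pass starts from the beginning again.
    second_pass = [line async for line in lines]
    return first_pass, second_pass


first_pass, second_pass = asyncio.run(demo())
```

Each pass over the wrapper starts with the recorded items, so breaking out of the first loop loses nothing. The trade-off is memory: everything read so far stays in the buffer.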

As a side note, I also receive a warning as follows: [...]

The stream operators are classes that are created dynamically, so I'm not surprised that linters might struggle to detect their interface. There might be a way to help them figure out the right info, I'll try to look into it later.

Hope it helps :)

tiptop96 commented 3 years ago

Ahh, my bad for misunderstanding the operator! But this is awesome, thanks so much for the thorough response and the awesome lib! 🔥

vxgmichel commented 3 years ago

No problem :)