python-streamz / streamz

Real-time stream processing for python
https://streamz.readthedocs.io/en/latest/
BSD 3-Clause "New" or "Revised" License

Simple example of stream with Asyncio operations #429

Open · massyah opened this issue 3 years ago

massyah commented 3 years ago

Hi,

I've been following the docs and reading through the tests, and I cannot get streamz working with Asyncio :/

Here's a very minimal example of a stream consisting of two async operations and one sync operation:

  1. we retrieve content via an aiohttp call
  2. return content length
  3. simulate DB write with an Asyncio sleep
  4. sink to stdout
import asyncio

import aiohttp
from streamz import Stream

async def fetch(url):
    print(f"fetching url {url}")
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            print(resp.status)
            body = await resp.text()
    print(f"Finished w/ url {url}")
    return body

def count(x):
    print("I", x)
    return len(x)

async def write(x):
    await asyncio.sleep(0.2)
    print("O", x)
    return x

async def f():
    print("Starting stream")
    source = Stream(asynchronous=True)
    source.map(fetch).map(count).map(write).sink(print)
    urls = [
        'https://httpstatus.io/?i=1',
        'https://httpstatus.io/?i=2',
        'https://httpstatus.io/?i=3',
        'https://httpstatus.io/?i=4',
        'https://httpstatus.io/?i=5',
        'https://httpstatus.io/?i=6',
    ]
    for u in urls:
        await source.emit(u)

if __name__ == '__main__':
    asyncio.run(f())

I've tried a lot of combinations using the tornado event loop, etc., but didn't manage to get anything working. Is this supposed to be possible, or is asyncio support still lagging behind? Am I missing something obvious?

Thanks for the help
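A likely reason the pipeline above breaks, assuming `Stream.map` calls its function and passes the return value downstream without awaiting it: `fetch` is a coroutine function, so `count` receives an un-awaited coroutine object instead of the page body. The sketch below reproduces that without streamz or network access; `naive_map` is a hypothetical helper and the `fetch` here is a stand-in for the aiohttp call.

```python
import asyncio

async def fetch(url):
    # Stand-in for the aiohttp request in the issue (assumption: no network here)
    await asyncio.sleep(0)
    return "body of " + url

def count(x):
    return len(x)

def naive_map(func, x):
    # Hypothetical helper: calls func the way a map node would, but never awaits it
    return func(x)

result = naive_map(fetch, "https://httpstatus.io/?i=1")
print(type(result).__name__)  # the "body" is really an un-awaited coroutine
try:
    count(result)
except TypeError as exc:
    print("TypeError:", exc)
finally:
    result.close()  # discard the coroutine to avoid a "never awaited" warning

# Awaiting first gives count() the string it expects
body = asyncio.run(fetch("https://httpstatus.io/?i=1"))
print(count(body))
```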

martindurant commented 3 years ago

Streamz uses asyncio/coroutines as a way to manage backpressure, i.e., the emitting process must wait until there is space in the pipeline before adding more data. Your model is the opposite: you wait on some async process as a way to put data into the pipeline. We could very well have a source that does what you want, something like

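import asyncio

from streamz.sources import Source

class from_coroutines(Source):
    def __init__(self, coroutines, **kwargs):
        self.coros = coroutines
        super().__init__(**kwargs)

    async def _run(self):
        # Emit each result as soon as its coroutine finishes (completion order)
        for fut in asyncio.as_completed(self.coros):
            res = await fut
            # _emit returns downstream futures; awaiting them applies backpressure
            await asyncio.gather(*self._emit(res))
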
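The same completion-order idea can be tried without streamz. Below is a minimal plain-asyncio sketch; `emit_in_completion_order`, `fake_fetch`, and the `emit` callback are hypothetical names standing in for the source, the network call, and the downstream pipeline. Results are handed on as each coroutine finishes, and the loop waits for `emit` before taking the next result, which mirrors the backpressure described above.

```python
import asyncio

async def fake_fetch(value, delay):
    # Hypothetical stand-in for a network call that takes `delay` seconds
    await asyncio.sleep(delay)
    return value

async def emit_in_completion_order(coros, emit):
    # as_completed yields awaitables in the order the coroutines finish
    for fut in asyncio.as_completed(coros):
        res = await fut
        # Wait for the downstream step before taking the next result
        await emit(res)

async def main():
    seen = []

    async def emit(x):
        # Hypothetical downstream stage: a short simulated write
        await asyncio.sleep(0.01)
        seen.append(x)

    coros = [fake_fetch(1, 0.15), fake_fetch(2, 0.05), fake_fetch(3, 0.10)]
    await emit_in_completion_order(coros, emit)
    return seen

if __name__ == "__main__":
    print(asyncio.run(main()))
```

Because the fetches run concurrently, the fastest one (value 2) reaches `emit` first, regardless of the order in the input list.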
martindurant commented 2 years ago

@massyah, did you have a chance to try my suggestion? It would make a nice example for the docs, I think, although you don't really need streamz for this particular linear workflow.
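For a linear fetch → count → write workflow like the one in the issue, plain asyncio may be enough. A minimal sketch, assuming each URL can be processed independently; `fetch` is a hypothetical stand-in for the aiohttp call (no network access), and `process` chains the three steps for one URL.

```python
import asyncio

async def fetch(url):
    # Hypothetical stand-in for the aiohttp request (no network in this sketch)
    await asyncio.sleep(0.01)
    return "<html>" + url + "</html>"

def count(body):
    return len(body)

async def write(n):
    await asyncio.sleep(0.01)  # simulated DB write, as in the issue
    return n

async def process(url):
    # The whole per-URL pipeline: await the async steps, call the sync one
    body = await fetch(url)
    n = count(body)
    return await write(n)

async def main(urls):
    # Run all per-URL pipelines concurrently; gather returns results in input order
    return await asyncio.gather(*(process(u) for u in urls))

if __name__ == "__main__":
    urls = [f"https://httpstatus.io/?i={i}" for i in range(1, 7)]
    for n in asyncio.run(main(urls)):
        print(n)
```

This starts every request at once; with many URLs, an `asyncio.Semaphore` inside `process` would be one way to cap concurrency.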