vxgmichel / aiostream

Generator-based operators for asynchronous iteration
http://aiostream.readthedocs.io

How to duplicate streams properly? #120

Open diego-ppz opened 3 months ago

diego-ppz commented 3 months ago

Hello,

Apologies in advance if opening an issue is not the right approach to ask this, but I can't find any pointer in the documentation on how to go about it. I have a pipeline consisting of a long chain of heavy-duty operations, in this fashion:

processed_xs \
            | pipe.map(map_f1) \
            | pipe.map(map_f2) \
            | pipe.filter(filter_f) \
            | pipe.flatmap(flatmap_f)
            # etc...

and I need to reuse processed_xs as:

sink1_xs = await stream.action(processed_xs, to_sink1_f)
sink2_xs = await stream.action(processed_xs, to_sink2_f)

I was able to reproduce the use case for reusing a stream as explained in the docs.

While it was easily reproducible with the source stream being a simple list (i.e. [1, 2, 3]), it never worked with my stream of real-world entities. I was able to narrow the issue down all the way to the source, which happens to be the Google BigQuery client.

The problem seems to be that with a basic list, stream.iterate() relies on:

@operator
async def from_iterable(it: Iterable[T]) -> AsyncIterator[T]:
    """Generate values from a regular iterable."""
    for item in it:
        await asyncio.sleep(0)
        yield item

With a simple list, after the first await for sink1_xs is finished, 'it' still has all the elements and can yield them again and again. The GBQ RowIterator, however, has a flag so that if it was ever started before, it throws an exception in its __iter__ method. I worked around that by monkey-patching __iter__ to bypass the check ...but the real issue is that once the items of the RowIterator have been consumed during the first pass for sink1, they are popped out of the iterator, and subsequent loops find nothing left to yield.
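
For reference, a minimal sketch of that difference, using a plain Python iterator as a stand-in for the RowIterator (the names here are just illustrative, not from my actual pipeline):

import asyncio
from aiostream import stream, pipe

async def main() -> None:
    # A plain list can be iterated any number of times, so the same
    # stream can be awaited again and again.
    xs = stream.iterate([1, 2, 3]) | pipe.list()
    print(await xs)  # [1, 2, 3]
    print(await xs)  # [1, 2, 3]

    # A one-shot iterator (standing in for the RowIterator) is exhausted
    # after the first run, so the second await finds nothing to yield.
    one_shot = iter([1, 2, 3])
    ys = stream.iterate(one_shot) | pipe.list()
    print(await ys)  # [1, 2, 3]
    print(await ys)  # []

asyncio.run(main())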

I suspect that I can't do anything about how the RowIterator works in this regard, so the only thing I can think of is to somehow duplicate my 'processed_xs' multiple times and consume the copies ...but I'm not entirely sure how to implement this properly?

Could someone please point me in the right direction? Much appreciated

diego-ppz commented 3 months ago

I attempted to write an operator to broadcast records multiple times, which works fine:

from typing import Any, AsyncIterator, TypeVar

from aiostream import pipable_operator
from aiostream.core import streamcontext

K = TypeVar("K")

@pipable_operator
async def broadcast(xs: AsyncIterator[Any], keys: list[K]) -> AsyncIterator[tuple[K, Any]]:
    """Broadcast every item of a stream once per specified key."""
    async with streamcontext(xs) as streamer:
        async for item in streamer:
            for k in keys:
                yield k, item

...but again, when I try to "split" into multiple streams:

processed_data_xs = source_data_xs \
                      | ... \
                      | pipe.action(partial(next, counter)) \
                      | broadcast.pipe([S1, S2])

sink1_xs = processed_data_xs | pipe.filter(lambda kv: kv[0] == S1) | pipe.action(logger.info)
sink2_xs = processed_data_xs | pipe.filter(lambda kv: kv[0] == S2) | pipe.action(logger.info)
await sink1_xs
await sink2_xs

'await sink1_xs' works, but when it reaches 'await sink2_xs', it crashes with an "Iterator already started" error due to the GBQ source not being re-iterable...

vxgmichel commented 3 months ago

Hi @diego-ppz,

You probably want to read this issue where another user had a similar problem: https://github.com/vxgmichel/aiostream/issues/98

In particular, you might want to check this solution with the corresponding explanation: https://github.com/vxgmichel/aiostream/issues/98#issuecomment-2107975984

Here's a similar implementation adapted to your use case. The operator is called tee because it is similar to itertools.tee:

from typing import AsyncIterable, TypeVar, AsyncIterator

from aiostream import pipable_operator, stream, pipe
from aiostream.core import streamcontext, Streamer
from aiostream.aiter_utils import AsyncExitStack

import pytest
from anyio import create_memory_object_stream, BrokenResourceError
from anyio.abc import ObjectSendStream

T = TypeVar("T")
K = TypeVar("K")

@pipable_operator
async def tee(
    source: AsyncIterable[T], keys: list[K], max_buffer_size: float = 0
) -> AsyncIterator[tuple[K, Streamer[T]]]:
    mapping: dict[K, ObjectSendStream[T]] = {}
    async with AsyncExitStack() as stack:
        async with streamcontext(source) as source:
            for key in keys:
                sender, receiver = create_memory_object_stream[T](
                    max_buffer_size=max_buffer_size
                )
                mapping[key] = await stack.enter_async_context(sender)
                yield key, streamcontext(receiver)

            async for chunk in source:
                for key in keys:
                    try:
                        await mapping[key].send(chunk)
                    except BrokenResourceError:
                        pass

@pytest.mark.asyncio
async def test_tee():

    def create_substreams(
        key: str, stream: Streamer[int], *_
    ) -> AsyncIterable[tuple[str, int]]:
        match key:
            case "sum":
                fn = lambda x, y: x + y
            case "product":
                fn = lambda x, y: x * y
        return (
            stream
            | pipe.accumulate(fn)
            | pipe.takelast(1)
            | pipe.map(lambda x, *_: (key, x))
        )

    xs = (
        stream.range(1, 10, interval=0.1)
        | tee.pipe(["sum", "product"])
        | pipe.starmap(create_substreams)
        | pipe.flatten()
        | pipe.list()
    )
    assert await xs == [("sum", 45), ("product", 362880)]

I hope this helps :)

diego-ppz commented 3 months ago

Hello there,

thanks a lot for the support and the hint. After carefully reading the above, I'm afraid it won't help... The example seems oriented towards producing a "starmap", i.e. only one stream of tuples. The problem is not that I need a stream of grouped elements:

sink_data_xs = processed_data_xs \
    | pipe.action(...) \
    | # etc...
    | tee.pipe() \
    | pipe.starmap()

... but multiple streams:

sink1_xs = processed_data_xs \
    | pipe.action(...) \
    | # etc...
self.steps[SINK1] = sink1_xs

sink2_xs = processed_data_xs \
    | pipe.action(...) \
    | # etc...
self.steps[SINK2] = sink2_xs

the problem again being what is commonly referred to as "non re-iterable" data sources: 'sink2_xs' will produce an empty stream here.

Observe that the critical thing here is that I'm never awaiting the streams. This "steps" dictionary can be consulted from different areas of the service, at different moments.

For the mere sake of progressing with the project, I'm working with:

sink1_xs = processed_data_xs \
    | pipe.action(...) \
    | # etc...
    | pipe.list()
self.steps[SINK1] = await sink1_xs

sink2_xs = stream.iterate(sink1_xs) \
    | pipe.action(...) \
    | # etc...
    | pipe.list()
self.steps[SINK2] = await sink2_xs

...but this is precisely what I can't do. I'm forbidden from collecting the streams into a list at any moment, because this is a case of big data not suitable for in-memory processing (that's my understanding of what stream.list() does, please correct me if I'm wrong). For context, the rationale for all this is precisely to produce a version of a certain connector that is able to handle data in a streaming fashion, replacing a legacy component that works with in-memory data. But both versions need to work interchangeably, so I have to comply with these "steps{}" design ports.

I need to be able to access those steps/stages from different client classes, pointing to multiple different "cold" streams. Any advice?

Much appreciated!

vxgmichel commented 3 months ago

I had little time when writing my previous comment, so I'll elaborate a bit more on the solution I proposed.

The example seems oriented towards producing a "starmap", i.e. only one stream of tuples

The tee operator implemented above produces a stream of tuples, yes, but it's a stream of (key, substream) pairs.

Then the starmap operator is used to make those substreams perform two different tasks in parallel. This way, the values produced by the initial stream are duplicated and processed by two different pipelines. The results of those two pipelines are then gathered using the flatten operator. This is shown in the following graph:

graph TD;
    A(range) --> B(tee);
    B --> C(starmap);
    C --> D(accumulate with sum);
    D --> E(takelast);
    E --> K(map);
    C --> F(accumulate with product);
    F --> G(takelast);
    G --> H(map);
    K --> I(flatten);
    H --> I;
    I --> J(list);  

the problem again being what is commonly referred to as "non re-iterable" data sources: 'sink2_xs' will produce an empty stream here.

Hmm I think I understand the issue here. In the context of aiostream, streams are re-usable in the sense that the pipeline defined by the operators can run several times, similar to a function that can be called several times. But that does not mean that the same results are going to be produced every time, due to side effects. And producing different results for each await is sometimes the expected behavior for a stream. For instance, in the example from the docs, the same stream is used once per client request, each time producing a different result: https://github.com/vxgmichel/aiostream/blob/2774043f4d80a04796e954fbcb07aebcbd8b3aef/examples/norm_server.py#L81-L89
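
To illustrate (a minimal sketch, not taken from the docs): the same stream object can be awaited several times, but a side effect makes each run produce different results.

import asyncio
import random
from aiostream import stream, pipe

async def main() -> None:
    # Each await re-runs the whole pipeline in a fresh async context,
    # but the side effect (the RNG) makes every run produce new values.
    xs = stream.range(3) | pipe.map(lambda _: random.random()) | pipe.list()
    print(await xs)  # three random floats
    print(await xs)  # three different random floats

asyncio.run(main())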

so the only thing I can think of is to somehow duplicate my 'processed_xs' multiple times and consume the copies ...but I'm not entirely sure how to implement this properly?

So it seems to me that you don't need to re-use a stream in the sense that's explained above, but you need to re-use the items produced by the stream. And as you pointed out:

I'm forbidden from collecting the streams into a list at any moment, because this is a case of big data not suitable for in-memory

So that means that you need to re-use items without collecting them in memory. In the context of aiostream, the proper way of doing that is by using higher-order operators, i.e. a stream of streams, to manage multiple pipelines concurrently. This way, you keep proper control over the back pressure.

This might seem like a convoluted solution but there's a reason for it: awaiting a stream creates a context for running the pipeline (more precisely an async context), which is used to properly manage the resources used by the different operators. When the await returns, that context has been closed and the resources have been properly freed. That's why the whole processing has to be designed to fit into a single pipeline. Once the await returns, the processing is done and can only be restarted from the beginning with a fresh async context.

But both versions need to work interchangeably, so I have to comply with these "steps{}" design ports. I need to be able to access those steps/stages from different client classes, pointing to multiple different "cold" streams.

That's tough. If the streams are cold, that means that data has to stay available somehow, so you would need memory for that and that's precisely what you're trying to avoid. Depending on your constraints, there might be solutions though. But you would have to write some adapters working outside of the aiostream semantics.

Here's a random idea: you could write an operator that feeds the incoming items into anyio memory streams. Then you could assign the receivers as your steps. Finally, you would need a background task to run the "producer" aiostream stream and deal with its hot context.
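
Here's a rough sketch of that idea, just to make it concrete. The names fan_out, consume and steps are made up for illustration, and stream.range stands in for your BigQuery-backed pipeline:

import asyncio
from typing import TypeVar

from aiostream import stream
from aiostream.core import Stream
from anyio import BrokenResourceError, create_memory_object_stream
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream

T = TypeVar("T")

async def fan_out(source: Stream[T], senders: dict[str, MemoryObjectSendStream[T]]) -> None:
    """Run the producer stream once, copying every item into each sink channel."""
    try:
        async with source.stream() as streamer:
            async for item in streamer:
                for sender in senders.values():
                    try:
                        await sender.send(item)
                    except BrokenResourceError:
                        pass  # this sink stopped listening
    finally:
        for sender in senders.values():
            await sender.aclose()

async def consume(name: str, receiver: MemoryObjectReceiveStream[int]) -> None:
    """Stand-in for a sink pipeline reading from its own receiver."""
    async with receiver:
        async for item in receiver:
            print(name, item)

async def main() -> None:
    senders: dict[str, MemoryObjectSendStream[int]] = {}
    steps: dict[str, MemoryObjectReceiveStream[int]] = {}
    for key in ("sink1", "sink2"):
        senders[key], steps[key] = create_memory_object_stream[int](max_buffer_size=1)

    producer = stream.range(5)  # stands in for the BigQuery-backed pipeline

    # The producer runs once in the background while both sinks read
    # concurrently from their own receivers, so nothing is collected in memory.
    await asyncio.gather(
        fan_out(producer, senders),
        consume("sink1", steps["sink1"]),
        consume("sink2", steps["sink2"]),
    )

asyncio.run(main())

The small buffer size provides back pressure: the producer blocks as soon as the slowest sink falls behind.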

Hope that helps.

diego-ppz commented 3 months ago

Thanks a lot for this elaborate discussion. I've been letting this run through my head for a couple of days; your response has greatly helped me to "rubber-duck" the problem and formalize it, especially since it is less a caveat of the library and more an application design problem...

awaiting a stream creates a context for running the pipeline (more precisely an async context), that is used to properly manage the resources used by the different operators. When the await returns, that context has been closed and the resources have been properly freed

My expertise is with data distribution frameworks like Apache Flink ...without presuming familiarity with it, suffice it to say that everything starts with an env = StreamingEnvironment() and ends with env.execute(), the latter separating the graph definition from kicking off the computation at runtime. I take it that this is the equivalent of using await here, which is what triggers the start of the processing... Back in the day, I faced use cases like the one you describe, with a "loop of graphs". Simplifying, the situation can only unfold in 2 ways: either I have 2 different pipelines, with their corresponding await/execute, or I devise it as only one, somehow managing to stack up all the stages together...

that data has to stay available somehow, so you would need memory for that and that's precisely what you're trying to avoid. Depending on your constraints, there might be solutions though. But you would have to write some adapters working outside of the aiostream semantics

Precisely. In the original connector, it was possible to access these ports for queryable state at any moment, and all the data was present because they were in-memory dataframes. Of the 2 constraints (the connector needing to deal with unbounded streams, and having to interchangeably switch versions), the first is non-negotiable, which makes me agree with you that the challenge is to work out a more sophisticated adapter...

What follows is pseudo-code,

# LEGACY JOB:
await legacy_service.process(source_data) \
    | pipe.action(...) \
    | # etc...
df_1 = legacy_service.steps["stage1"]
df_2 = legacy_service.steps["stage2"]
# etc...
bigquery.save(df_1)
bigquery.save(df_2)

vs.

def job():
    streaming_service.process(source_data)
    bigquery.save(legacy_service.steps["stage1"])
    bigquery.save(legacy_service.steps["stage2"])
    # no awaits...

class StreamingConnector():

    sink1_xs = processed_data_xs \
        | pipe.action(...) \
        | # etc...
    self.steps[SINK1] = sink1_xs

    sink2_xs = sink1_xs \
        | pipe.action(...) \
        | # etc...
    self.steps[SINK2] = sink2_xs
    # etc...

class BigquerySink():

    async def save(s: Stream):
        s = s | pipe.action(conn_write)
        if s.__name__ == last:
            await s

What I'd be trying to do here is to keep one single pipeline ...but split across the entrypoint/connector/sink. There would be only one await at the very end, if I detect that I'm dealing with the last stage.
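
To make the idea concrete, here is a minimal sketch of the "single pipeline, single await" approach with dummy stages (StreamingConnector, SINK1/SINK2 and conn_write are the hypothetical names from the pseudo-code above):

import asyncio
from aiostream import stream, pipe

SINK1, SINK2 = "sink1", "sink2"

class StreamingConnector:
    """One pipeline, split across connector stages, awaited only once at the end."""

    def __init__(self, source_data_xs):
        self.steps = {}
        # Stage 1: exposed as a step, but never awaited on its own.
        sink1_xs = source_data_xs | pipe.map(lambda x: x * 2)
        self.steps[SINK1] = sink1_xs
        # Stage 2 chains onto stage 1, so both belong to the same pipeline.
        sink2_xs = sink1_xs | pipe.filter(lambda x: x % 3 == 0)
        self.steps[SINK2] = sink2_xs

    async def run(self, conn_write) -> None:
        # The single await on the last stage drives the whole chain, so the
        # non re-iterable source is consumed exactly once.
        await (self.steps[SINK2] | pipe.action(conn_write))

# e.g. asyncio.run(StreamingConnector(stream.range(10)).run(print))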

Would your intuition say that something like this could possibly work, or do you rather see pitfalls ahead?

PS: thanks a lot for such an interesting design discussion