python-streamz / streamz

Real-time stream processing for python
https://streamz.readthedocs.io/en/latest/
BSD 3-Clause "New" or "Revised" License

How to parametrize stream/pipeline creation? #456

Closed: anovv closed this issue 1 year ago

anovv commented 1 year ago

Hi, thanks for the great project!

I'm trying to create a custom builder interface (a wrapper around streamz) that provides predefined, preconfigured stream building blocks which can be composed together. For example:


from streamz import Stream

# _custom_acc, _custom_state, _custom_condition and _custom_map are placeholders
# for user-defined functions/state.
def _custom_stream_builder_1(input_stream):
    return input_stream.accumulate(_custom_acc, returns_state=True, start=_custom_state)

def _custom_stream_builder_2(input_stream):
    return input_stream.filter(lambda event: _custom_condition(event))

def _custom_stream_builder_3(input_stream):
    return input_stream.map(lambda event: _custom_map(event))

stream = Stream()
stream = _custom_stream_builder_1(_custom_stream_builder_2(_custom_stream_builder_3(stream)))
stream.sink(print)

for event in events:
    stream.emit(event)

However, it looks like the code inside those functions does not alter the initial stream object, and all emitted events go straight to the sink. What am I doing wrong? Can you please point me in the right direction?

Another question: what is the difference between


stream = Stream()
stream.sink(print)

for event in events:
    stream.emit(event)

and


stream = Stream()
stream = stream.sink(print) # any other function map/filter/etc. here

for event in events:
    stream.emit(event)

I feel like the answer is somewhere in this example, but I don't understand where. Thanks!

martindurant commented 1 year ago

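The problem is that you have redefined what stream is, so when you call emit() you are emitting on the last node of your stream graph, not on the input. emit() only pushes data to the nodes downstream of the node it is called on, so the map, filter and accumulate steps are skipped and the events go straight to the sink.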

If you had

from streamz import Stream

stream_in = Stream()
stream = _custom_stream_builder_1(_custom_stream_builder_2(_custom_stream_builder_3(stream_in)))
stream.sink(print)

for event in events:
    stream_in.emit(event)

you will get the behaviour you are after.

anovv commented 1 year ago

Thanks @martindurant, that worked!