python-streamz / streamz

Real-time stream processing for python
https://streamz.readthedocs.io/en/latest/
BSD 3-Clause "New" or "Revised" License

Dynamically add upstreams to zip #459

mrtj closed 1 year ago

mrtj commented 1 year ago

Hello, is it possible to dynamically add upstreams to a zip after it has been created? Ideally, I am looking for something like this:

from streamz import Stream

src1 = Stream()
src2 = Stream()

zp = Stream.zip(src1, src2)
zp.sink(print)

src1.emit(1)
src2.emit(2)
# (1, 2)

src3 = Stream()
zp.add_upstream(src3) # <-- ???

src1.emit(1)
src2.emit(2)
src3.emit(3)

# (1, 2, 3)

martindurant commented 1 year ago

What you want is:

src3.connect(zp)
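
To illustrate why a call of this shape can work, here is a toy model in plain Python. It is only a sketch and is not the streamz source: `ToyStream`, `ToyZip`, and `add_upstream` are hypothetical names, and the assumption is that a zip node keeps one buffer per upstream and emits a tuple once every buffer holds a value. Under that assumption, connecting a new stream just registers one more buffer.

```python
from collections import deque


class ToyStream:
    """Hypothetical minimal source node; not the streamz Stream class."""

    def __init__(self):
        self.downstreams = []

    def connect(self, downstream):
        # Register the link in both directions.
        self.downstreams.append(downstream)
        downstream.add_upstream(self)

    def emit(self, value):
        for node in self.downstreams:
            node.update(value, who=self)


class ToyZip:
    """Hypothetical zip node that keeps one FIFO buffer per upstream."""

    def __init__(self, *upstreams, sink=print):
        self.buffers = {}  # insertion order fixes the tuple order
        self.sink = sink
        for up in upstreams:
            up.connect(self)

    def add_upstream(self, upstream):
        # A new upstream only needs its own empty buffer.
        self.buffers[upstream] = deque()

    def update(self, value, who):
        self.buffers[who].append(value)
        # Emit one tuple as soon as every upstream has a pending value.
        if all(self.buffers.values()):
            self.sink(tuple(buf.popleft() for buf in self.buffers.values()))


results = []
src1, src2 = ToyStream(), ToyStream()
zp = ToyZip(src1, src2, sink=results.append)

src1.emit(1)
src2.emit(2)

src3 = ToyStream()
src3.connect(zp)  # same call shape as the answer above

src1.emit(1)
src2.emit(2)
src3.emit(3)

print(results)  # [(1, 2), (1, 2, 3)]
```

In this model, values that are already buffered when the new upstream is connected wait until that upstream emits, so it may be simplest to connect at a point where the existing upstreams are in step.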

mrtj commented 1 year ago

This rocks, thank you!