python-streamz / streamz

Real-time stream processing for python
https://streamz.readthedocs.io/en/latest/
BSD 3-Clause "New" or "Revised" License

Is it possible to use event time/synthetic time rather than system time? #465

Open anovv opened 1 year ago

anovv commented 1 year ago

I want to use streamz for offline processing of historical, timestamped data. Looking through the source code, the time-related streams (e.g. timed_window, delay, rate_limit) call time() to obtain the event time. Is it possible to feed an existing value (e.g. event['timestamp']) to these streams so I can do offline processing? If not, what would be the best way to go about this?

martindurant commented 1 year ago

You could always have a sink that saves events along with a timestamp. You can also pass events into a stream by reading from a file, or any function that repeatedly calls emit().

However, if you have all the events in hand, you don't actually need any stream processing, right? If the purpose is to exactly simulate how the streamz pipeline will react, you could make a source which uses the event loop call_at method to emit events, or some simple polling - but it seems like extra and potentially fragile work!
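The call_at idea mentioned above could look roughly like the following. This is only a sketch using the standard asyncio loop, not a streamz feature: `replay`, `emit` and `speedup` are hypothetical names, events are assumed to be dicts with a numeric `timestamp` in seconds, and `emit` stands in for any callable that pushes an event into a pipeline (for example the `emit` method of a streamz source).

```python
import asyncio


async def replay(events, emit, speedup=1.0):
    """Call emit(event) for each event, spaced like the recorded timestamps.

    `speedup` (hypothetical parameter) compresses the recorded gaps,
    e.g. speedup=10 replays ten times faster than real time.
    """
    if not events:
        return
    loop = asyncio.get_running_loop()
    start = loop.time()              # the loop's monotonic clock, not wall time
    first_ts = events[0]["timestamp"]
    finished = loop.create_future()
    remaining = len(events)

    def fire(event):
        nonlocal remaining
        try:
            emit(event)
        finally:
            remaining -= 1
            if remaining == 0 and not finished.done():
                finished.set_result(None)

    for event in events:
        offset = (event["timestamp"] - first_ts) / speedup
        loop.call_at(start + offset, fire, event)

    await finished                   # return once every event has been emitted


# Usage: three events recorded 10 s apart, replayed 1000x faster.
events = [
    {"timestamp": 0.0, "value": 1},
    {"timestamp": 10.0, "value": 2},
    {"timestamp": 20.0, "value": 3},
]
received = []
asyncio.run(replay(events, received.append, speedup=1000))
print([e["value"] for e in received])
```

Note that this still runs against a real clock, only faster, so any time-based node downstream would see the compressed gaps rather than the recorded timestamps.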

anovv commented 1 year ago

> If the purpose is to exactly simulate how the streamz pipeline will react

Yes, the goal is to backtest/simulate the existing stream as closely to the real scenario as possible, using recorded historical data. Another point is that a lot of my offline processing logic can't be vectorized, so I need an iterative approach (i.e. iterating through a dataframe's rows) in both the offline and online cases. streamz comes in handy here, giving declarative consistency between online and offline processing logic (yes, I'm aware of the performance cost of iterating over a Pandas dataframe's rows).

> You could make a source which uses the event loop call_at method to emit events, or some simple polling

Can you please give a code example on this?

Let's say I have a list (or a dataframe) of events, each with a timestamp field. I want to run all of them through a rate_limit stream and get an output (or possibly pass it downstream, or have some custom logic triggered when a new element pops out). I want to do this without waiting for the stream to change its state based on real time, instead using an artificial/synthetic clock that advances with each new event emitted.
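To make the request concrete, here is a minimal sketch of a rate limit driven purely by event timestamps. It does not use streamz or patch its clock: `simulate_rate_limit` is a hypothetical standalone helper, and it assumes the intended behaviour is "hold each element back until at least `interval` seconds of event time have passed since the previous output", with events given in timestamp order.

```python
def simulate_rate_limit(events, interval):
    """Yield (release_time, event) pairs on a synthetic clock.

    The clock is never read from the system; it advances only from the
    events' own "timestamp" fields. Events are assumed sorted by timestamp.
    """
    next_free = float("-inf")        # earliest event time the next output may leave
    for event in events:
        # An event leaves when it arrives, or when the limiter frees up,
        # whichever is later.
        release = max(event["timestamp"], next_free)
        yield release, event
        next_free = release + interval


# Usage: a burst of three events followed by a late one, limited to one
# output per second of event time. The whole run completes immediately.
events = [
    {"timestamp": 0.0, "value": "a"},
    {"timestamp": 0.2, "value": "b"},
    {"timestamp": 0.5, "value": "c"},
    {"timestamp": 3.0, "value": "d"},
]
for release, event in simulate_rate_limit(events, interval=1.0):
    print(release, event["value"])
```

Because the helper is a generator, each released event could be passed on to downstream logic (for example by calling a source's `emit`) as soon as it is yielded, with `release` carried along as the simulated output time.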