twosigma / flint

A Time Series Library for Apache Spark

Extending Window capabilities #36

Open degloff opened 6 years ago

degloff commented 6 years ago

I would like to extend the window capabilities and would like to discuss how best to implement them. With the existing functionality we can do:

val result = priceTSRdd.addWindows(Windows.pastAbsoluteTime("1000ns"))
// time  price window_past_1000ns
// ------------------------------------------------------
// 1000L 1.0   [[1000L, 1.0]]
// 1500L 2.0   [[1000L, 1.0], [1500L, 2.0]]
// 2000L 3.0   [[1000L, 1.0], [1500L, 2.0], [2000L, 3.0]]
// 2500L 4.0   [[1500L, 2.0], [2000L, 3.0], [2500L, 4.0]]

1) Windows at predefined timestamps only. The example above creates a backward window at every row. For very "dense" time series sampled at nanosecond scale we might not want a window at every observation, but instead run some statistics or another discovery method to find the points at which to create a window.

2) Windows of varying length. In the trading world we can imagine windows of varying time length, e.g. determined by a "volume clock" (a rough sketch of the idea is at the end of this comment).

3) Windows with a fixed number of observations. I saw a count window, but it is not clear to me how to use it.

4) Two-segment windows. The first segment of a window could be used to calculate some online statistics, which are then consumed by a summary function applied over the second segment of the window (adaptive summary stats, e.g. thresholds based on an online volatility estimator).

How would these more general features best be implemented? Any advice on how to add these extensions? Happy to contribute as well.
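
To make (2) a bit more concrete, here is a rough sketch of how such a clock could be derived with plain Spark from a DataFrame that has a (hypothetical) volume column; it only illustrates the intent and is not a proposed API:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Illustration only: bucket rows by cumulative traded volume. "volume" and
// bucketSize are placeholders; each bucket would then define one
// variable-length time window. The unpartitioned ordering below does not
// scale and is only meant to show the idea.
val bucketSize = 1000000L
val withClock = df
  .withColumn("cumVolume", sum("volume").over(Window.orderBy("time")))
  .withColumn("volumeBucket", floor(col("cumVolume") / bucketSize))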

icexelloss commented 6 years ago

Hi - I believe some of these are good additions but also non-trivial to implement:

  1. There is something called a window join in KDB: a window operation over two tables that, for each row in the left table, associates it with a window of rows from the right table. It is currently not available in Flint... Another option may be to use http://spark.apache.org/docs/2.3.0/api/python/pyspark.sql.html#pyspark.sql.functions.window (a minimal sketch is included after this list).

  2. Can you explain what is a "volume clock"?

  3. https://github.com/twosigma/flint/issues/40

  4. Can you explain this a bit more? What is done in the first segment vs. the second?
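
For (1), here is a minimal sketch of that built-in Spark function, using its Scala equivalent; the time column must be timestamp-typed (Flint's long nanosecond column would need a cast) and "price" is just an illustrative column:

import org.apache.spark.sql.functions._

// Fixed 10-second tumbling windows keyed by event time, aggregated per bucket.
// Note this gives grouped aggregation per bucket rather than Flint's per-row
// trailing windows.
val bucketed = df
  .groupBy(window(col("time"), "10 seconds"))
  .agg(avg("price").as("avg_price"), count(lit(1)).as("n"))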

dgrnbrg commented 4 years ago

I would also like to use a "volume clock". Basically, the idea is that instead of a global/constant shift, there is a second column that determines the size of the shift. This makes it possible to have essentially per-row time shifts. Is this possible to do in Flint?

icexelloss commented 4 years ago

I believe this is possible. With the Python API, you can just write a udf to reset the time, e.g.:

from pyspark.sql.functions import udf

@udf('timestamp')
def set_new_time(time, other_column):
    # Compute the shifted time from the original time and the other column.
    new_time = ...
    return new_time

df = df.withColumn('time', set_new_time(df['time'], df['other_column']))

Although, after this, you need to re-sort the dataframe because time is no longer guaranteed to be in order:

df = flintContext.read.dataframe(df, is_sorted=False)

dgrnbrg commented 4 years ago

I see what you mean by resetting the times; however, what I want to calculate isn't quite like that. I want to implement a ShiftTimeWindow that isn't restricted to looking at only the time column; I want it to also look at one or more additional columns when it invokes shift().

What do you think about extending that API?
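
Roughly, the kind of interface I have in mind (purely illustrative; none of these names exist in Flint today):

// Hypothetical sketch only: a shift that can depend on values from extra
// columns carried with the row, not just on the timestamp.
trait RowAwareShiftTimeWindow {
  // Columns beyond "time" that the shift depends on (e.g. a per-row horizon).
  def shiftColumns: Seq[String]

  // Compute the shifted timestamp from the time and the values of shiftColumns.
  def shift(time: Long, columnValues: Seq[Any]): Long
}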

dgrnbrg commented 4 years ago

Also @icexelloss, have you thought about how to make a KDB-style window join? I'd be happy to help implement it.

icexelloss commented 4 years ago

I see. I think it's a useful feature, but it seems quite tricky to extend the existing API to support it. It would probably be something like Window[K1, K2], where K1 is time and K2 is some other column, and we would need to do something with the OrderedRDD[K1, V] interface to incorporate K2. Do you have some ideas?
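
To spell out the Window[K1, K2] idea (nothing like this exists yet; it is only meant to show the shape of the problem):

// Hypothetical: a window keyed by time (K1) plus a second, row-supplied key
// (K2) that influences the range of rows to collect for each row.
trait Window2[K1, K2] {
  // Given the current row's (k1, k2), return the inclusive time range [begin, end].
  def range(k1: K1, k2: K2): (K1, K1)
}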

icexelloss commented 4 years ago

Have you seen an API similar to the extended window API that you described?

dgrnbrg commented 4 years ago

I think the way to do the join would be to drive it solely from a summarizer, since you can always just collect_list if you don't want a summary join. I imagine a high-level API like a.leftWindowSummary(b, window, summarizer, key). You'd basically just run a window summarizer on b, but with the timestamps from a. Then, you could follow up with a left join on the summarized df.

I think the part I'm not sure about is how to drive a window summarizer with a different OrderedRDD's timestamps.
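
As a crude stand-in with the existing Scala API (not the semantics I actually want, since the windows end at b's own timestamps rather than a's), something like this should already work; the window length and the "price" column are placeholders:

import com.twosigma.flint.timeseries.{Summarizers, Windows}

// Approximation of a.leftWindowSummary(b, window, summarizer, key):
// summarize b over its own trailing window, then as-of join the most recent
// summary onto each row of a.
val summarizedB = b.summarizeWindows(
  Windows.pastAbsoluteTime("1000ns"),
  Summarizers.mean("price"))

val approx = a.leftJoin(summarizedB, tolerance = "1000ns")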