sryza / spark-timeseries

A library for time series analysis on Apache Spark
Apache License 2.0

Support partitioned DateTimeIndices? #84

Open SimonOuellette35 opened 8 years ago

SimonOuellette35 commented 8 years ago

This is a follow-up to the discussion that started in https://github.com/cloudera/spark-timeseries/issues/82. The question is, first of all, whether or not we want to support individual time series whose cardinality exceeds 2^32 data points (which is not currently possible, due to the use of standard Scala Arrays).

As discussed, there are quite a few real use cases we (contributors) are facing that would require being able to handle time series with more than 2^32 data points.

My suggestion is to add support for a notion of partitioned date-time indices, which would allow us to have time series partitioned across machines along the time axis, rather than replacing the current design (for use cases where a time series fits in memory, partitioning it along the time axis would be sub-optimal anyway). @sryza I know you are hesitant about this.
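
To make the idea concrete, here is a rough sketch of what such an index could look like. Everything below is hypothetical -- none of these names exist in the library today:

```scala
import java.time.ZonedDateTime

// Purely hypothetical sketch: split the index into contiguous chunks along the
// time axis, each small enough to be backed by an ordinary JVM array, and
// address points with a Long.
case class IndexChunk(start: ZonedDateTime, numPoints: Int)

class PartitionedDateTimeIndex(val chunks: Array[IndexChunk]) {
  // Total number of points as a Long, so the index can exceed 2^31 - 1 entries.
  def size: Long = chunks.map(_.numPoints.toLong).sum

  // Map a global Long position to (chunk index, offset within that chunk).
  def locate(i: Long): (Int, Int) = {
    require(i >= 0 && i < size, s"index $i out of bounds")
    var remaining = i
    var c = 0
    while (remaining >= chunks(c).numPoints) {
      remaining -= chunks(c).numPoints
      c += 1
    }
    (c, remaining.toInt)
  }
}
```

The point is just that the index itself stays small (a handful of chunk descriptors), while the vector data it describes can be spread across partitions.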

sryza commented 8 years ago

I definitely think we'll need support for this in the long run.

This is of course a tricky one, because it requires the ability to shard individual series across nodes, meaning that all the operations that were simple map operations over vectors now become distributed operations requiring shuffles or collection to the driver.

I think a big API question is: do we want to support this under TimeSeriesRDD, or in another class that makes the notion of partitioning more explicit?
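
To illustrate the trade-off, here is a rough sketch of what the second option might look like (purely hypothetical names, not a proposal for the actual API):

```scala
import java.time.ZonedDateTime
import org.apache.spark.rdd.RDD

// Hypothetical: a separate class whose type makes time-axis partitioning explicit.
// Each RDD element holds one contiguous slice of a single series.
case class SeriesSlice(key: String, sliceStart: ZonedDateTime, values: Array[Double])

class TimePartitionedSeries(val slices: RDD[SeriesSlice]) {
  // Per-point transforms stay embarrassingly parallel...
  def mapValues(f: Double => Double): TimePartitionedSeries =
    new TimePartitionedSeries(slices.map(s => s.copy(values = s.values.map(f))))

  // ...whereas windowed operations that cross slice boundaries would need a
  // shuffle or a boundary exchange, which is exactly the cost described above.
}
```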

SimonOuellette35 commented 8 years ago

Something I just thought of: because of the prevalence of virtualization, the need to support date-time indices with more than 2^32 entries is conceptually different from the need for partitioned date-time indices.

For example, in my case, if we managed to find some sort of LongArray-like data structure (maybe there's a 3rd-party library for that somewhere?) that supports up to 2^64 items, then I'd be good to go with my very large datasets, because I could just set up enough RAM on my cloud server to support it -- and I would still benefit from Spark's parallelization across cores.
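
To illustrate the kind of structure I have in mind (purely a sketch, not an existing library -- the class and its parameters below are made up):

```scala
// Sketch of a "LongArray"-like structure: a Long-indexed array of doubles backed
// by fixed-size chunks, so the total length can exceed Int.MaxValue even though
// each underlying JVM array stays within its 2^31 - 1 element limit.
class ChunkedDoubleArray(val length: Long, chunkSize: Int = 1 << 27) {
  private val chunks: Array[Array[Double]] = {
    val numChunks = ((length + chunkSize - 1) / chunkSize).toInt
    Array.tabulate(numChunks) { c =>
      val size = math.min(chunkSize.toLong, length - c.toLong * chunkSize).toInt
      new Array[Double](size)
    }
  }

  def apply(i: Long): Double = chunks((i / chunkSize).toInt)((i % chunkSize).toInt)

  def update(i: Long, v: Double): Unit =
    chunks((i / chunkSize).toInt)((i % chunkSize).toInt) = v
}
```

Of course, memory (rather than the 2^31 array limit) becomes the binding constraint long before 2^64, which is why I'd need a big-RAM cloud server for this route.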

The need for partitioning across machines really only arises in the specific case of a cluster of machines. I don't know how frequent a use case the latter is, given how accessible virtualization/cloud computing is. I feel a lot of people use Spark as a framework for multi-threaded processing rather than as an actual cluster computing tool -- but maybe I'm way off about that. Ultimately I want my application to support both cases (virtualized AND a cluster of computers), but I could go a long way with the virtualization route alone.

@sryza do you think spark-timeseries' use case should exclusively be thought of as a cluster computing context?

In any case, if we can find a LongArray kind of data structure, perhaps we should still replace the current DateTimeIndex with it. To answer your API question above: I would definitely opt for another class that uses "longitudinal" partitioning (i.e. along the time axis), rather than the "transversal" partitioning we have now. We need both use cases. (Or one class with some kind of flag? But then it would increase the cyclomatic complexity of the code, because everywhere we'd have to check whether we're doing longitudinal or transversal partitioning.)

euandekock commented 8 years ago

I'll add my need for a time series that can be partitioned along the date axis. We currently work with a data set that includes series at 1-second intervals over an 8-year time span - that's about 250 million observations. We currently do a custom partitionBy to force our rows into daily partitions, and do the imputing and statistical work within single-day units. This solution doesn't allow us to span date boundaries or to detect whole missing days. I'd be interested in discussing how we could model a solution that allowed fluid windowing across the whole data set.

sryza commented 8 years ago

Thanks for adding your data point @euandekock. 250 million does sound pretty big to have in one place - although it should be feasible on modern hardware, right, given that 250 million * 8 bytes = 2 GB?

Mind if I ask some additional questions to help understand what the right API is for partitioning by time?

What kinds of operations do you perform on your dataset? How large are the windows you'd like to be able to deal with? How many univariate series is the dataset composed of?

euandekock commented 8 years ago

Hi Sandy,

I'd be surprised if we end up with just 8 bytes per observation, given that we'd also be storing the date at least once.

Currently we are treating each series of historic observations separately, but we actually have about 30,000 separate series, each identified by an equipment tag. The real-time side of our processes will process these together, but over a much shorter time span - 5 to 10 minutes.

Also, each observation actually has two related numbers: the actual value, and a confidence level expressed as a percentage. It would be useful to have support for this sort of setup.

We are presently using a custom partitioning class to guarantee that each partition contains a single day of data, by doing a modulus on the long epoch timestamp. We calculate the 5-minute mean, median, min and max over the partition window. I'd imagine we'd need to shuffle/broadcast all the end values to do a proper moving average and true min and max values.
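
For reference, the partitioner is roughly shaped like this (a simplified sketch of the approach, not our actual production code):

```scala
import org.apache.spark.Partitioner

// Keys are epoch-millisecond timestamps; every record from the same UTC day
// lands in the same partition, so per-day statistics can be computed without
// a further shuffle.
class DailyPartitioner(override val numPartitions: Int) extends Partitioner {
  private val millisPerDay = 24L * 60 * 60 * 1000

  override def getPartition(key: Any): Int = {
    val epochMillis = key.asInstanceOf[Long]
    val dayIndex = epochMillis / millisPerDay
    (dayIndex % numPartitions).toInt
  }
}

// Usage sketch: pairRdd.partitionBy(new DailyPartitioner(numDays)) followed by
// mapPartitions to compute the 5-minute mean/median/min/max within each day.
```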

Hope that helps,

Regards,

Euan.

sryza commented 8 years ago

Thanks @euandekock, that's definitely helpful.