blue-yonder / tsfresh

Automatic extraction of relevant features from time series:
http://tsfresh.readthedocs.io
MIT License
8.45k stars 1.21k forks source link

how can rolling_time_series deal with big data ? #771

Open lk1983823 opened 4 years ago

lk1983823 commented 4 years ago

I have a 7m rows dataframe to deal with and want to use rolling_time_series in tsfresh. Whether using dask or not, I find it impossible to make it available. Because when using dask, inputs to the rolling_time_series function should using the form of df.compute(), thereby occupying a lot of memories during running. Are there any helpful suggestions ? Thanks!

madpower2000 commented 4 years ago

The easiest way order compute instance with a lot of memory on Google cloud or AWS ;)

lk1983823 commented 4 years ago

The easiest way order compute instance with a lot of memory on Google cloud or AWS ;) But I want to make it run in my machine.

nils-braun commented 4 years ago

What do you mean with the input needs df.compute()?

If you use dask as input to the rolling function, you can also hand over a non-evaluated dask data frame (not a pandas data frame). If you pass this directly to the extract features function, you in principle never need to go outside of dask.

However - of course - that is still a lot of data and I assume the feature extraction on 7M time series will also take a long time...

lk1983823 commented 4 years ago

What do you mean with the input needs df.compute()?

If you use dask as input to the rolling function, you can also hand over a non-evaluated dask data frame (not a pandas data frame). If you pass this directly to the extract features function, you in principle never need to go outside of dask.

However - of course - that is still a lot of data and I assume the feature extraction on 7M time series will also take a long time...

here is my code below:

windowSize = 100
dfSH05FillNan = dd.read_parquet('./testdf.parquet.gzip')
dfSH05FillNanDaskRolled = roll_time_series(dfSH05FillNan, 
                                              column_id="timeContinuity", column_sort="timestamp", chunksize=50,
                                              max_timeshift=windowSize-1, min_timeshift=windowSize-1)

it shows the error as follows:

Traceback (most recent call last):
  File "tsProcess.py", line 169, in <module>
    max_timeshift=windowSize-1, min_timeshift=windowSize-1)
  File "/home/lk/.pyenv/versions/anaconda3-2020.02/envs/ai/lib/python3.7/site-packages/tsfresh/utilities/dataframe_functions.py", line 576, in roll_time_series
    if df[column_sort].isnull().any():
  File "/home/lk/.pyenv/versions/anaconda3-2020.02/envs/ai/lib/python3.7/site-packages/dask/dataframe/core.py", line 199, in __bool__
    "a conditional statement.".format(self)
TypeError: Trying to convert dd.Scalar<series-..., dtype=bool> to a boolean value. Because Dask objects are lazily evaluated, they cannot be converted to a boolean value or used in boolean conditions like if statements. Try calling .compute() to force computation prior to converting to a boolean value or using in a conditional statement.

My tsfresh version is 0.17.0

nils-braun commented 4 years ago

Oh - I am very sorry. I somehow assumed #731 is already finished and implemented but have just remembered that this is not the case. Hm, well then you are completely correct - so far the rolling only accepts pandas data.

I think you can either wait for the PR to be finished (although it is kind of stuck currently) and/or help there - or you could try to do a staged approach where you roll only the first part of your data, then extract features and store it, then the second part... Maybe you can automate this with e.g. luigi.

If you want, I guess working on #731 would be very nice.

arnabbiswas1 commented 3 years ago

Dask support for rolling_time_series() will be highly appreciated.

With larger than memory data, I assume following would be an ideal workflow:

  1. Using Dask, read data for different ids (large number of CSVs or parquet files) stored in hard drive or cloud storage.
  2. rolling_time_series() accepts a Dask DataFrame (source) and outputs the rolled data in another Dask DataFrame. This resultant Dask DataFrame, if computed will be larger than the source DataFrame. Hence, should be passed as it is (as Dask DataFrame without computation) to next step.
  3. dask_feature_extraction_on_chunk() accepts the output from the previous step (in some shape and form). Computes the fetaures for different ids in the form of Dask DataFrame.
  4. Features computed at step 3 can be stored back to the hard drive/cloud storage segregated based on ids. Or the Dask DataFrame from 3 can be passed as it is to Machine Learning libraries for training or prediction directly.
vladjohnson commented 4 months ago

+1 @arnabbiswas1

Can we please get Dask support for rolling_time_series? Thanks!

nils-braun commented 3 months ago

Hi! I am happy for any contribution on this! If someone needs help getting started, feel free to reach out.