sryza / spark-timeseries

A library for time series analysis on Apache Spark
Apache License 2.0
1.19k stars 424 forks source link

Question - Resample #191

Open sumeshmr opened 7 years ago

sumeshmr commented 7 years ago

Hi, I was trying to use the resample functionality. It is implemented in Scala, but it has not been added to the Java API (JavaTimeSeriesRDD). How can I add it?

matthjes commented 7 years ago

Hi, you can use TimeSeriesRDD instead of JavaTimeSeriesRDD. Here is an excerpt from the class I'm using to resample power data (in watts) into 5-minute and 1-hour consumption intervals (in watt-hours):

public class AggregatorTask {

    private static final Logger LOG = LoggerFactory.getLogger(AggregatorTask.class);

    /**
     * Entry point: builds a time series from a DataFrame of observations, fills gaps by linear
     * interpolation, and writes two resampled copies (5-minute and 1-hour) via
     * {@link #resampleAndSave}.
     *
     * <p>This is an excerpt: the {@code ...} markers stand for code elided by the author.
     * {@code outputDir} and {@code pathSuffix} are used below but are not defined in the visible
     * code, so they are presumably set up in the elided part.
     *
     * @param args command-line arguments (not read in the visible code)
     * @throws IOException declared by the author; the visible code does not show what raises it
     */
    public static void main(String[] args) throws IOException {
        ...
        // Create the Spark contexts. sqlContext is not referenced again in the visible code;
        // presumably the elided data-loading step uses it.
        SparkConf conf = new SparkConf().setAppName("Aggregate Time Data");
        JavaSparkContext context = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(context);

        // Timestamps below are interpreted in the JVM's default time zone.
        ZoneId zoneId = ZoneId.systemDefault();
        Dataset<Row> df = ... // load data

        // Create the DateTimeIndex (the timestamps the data is aligned to).
        // The two epoch-millisecond bounds are 86,398 seconds apart (just under one day);
        // the index is uniform with an assumed 1 s sampling interval.
        ZonedDateTime start = ZonedDateTime.ofInstant(Instant.ofEpochMilli(1496268001000L), zoneId);
        ZonedDateTime end = ZonedDateTime.ofInstant(Instant.ofEpochMilli(1496354399000L), zoneId);
        DateTimeIndex dtIndex = DateTimeIndexFactory.uniformFromInterval(start, end, new SecondFrequency(1));

        // Create the time series from the observations in df. The three strings are presumably
        // the names of the timestamp, key and value columns of df — confirm against the
        // timeSeriesRDDFromObservations signature.
        TimeSeriesRDD<String> meterDataRdd = TimeSeriesRDD.timeSeriesRDDFromObservations(dtIndex, df, "timestamp",
                "symbol", "value");

        // Cache data in memory.
        // NOTE(review): this marks the *unfilled* series for caching. The variable is reassigned
        // to the result of fill() just below, and that filled series is the one resampled twice,
        // so the interpolation itself may be recomputed for each resample — consider caching
        // after fill() instead; confirm with the Spark UI.
        meterDataRdd.cache();

        // Fill gaps using linear interpolation.
        meterDataRdd = meterDataRdd.fill("linear");

        // Resample the filled series at two frequencies, each written to its own sub-directory.
        // NOTE(review): the sub-directory names embed a literal backslash as separator, which
        // looks Windows-specific — verify behaviour on other file systems.
        resampleAndSave(meterDataRdd, start, end, new MinuteFrequency(5), outputDir, pathSuffix + "\\5m");
        resampleAndSave(meterDataRdd, start, end, new HourFrequency(1), outputDir, pathSuffix + "\\1h");
    }

    /**
     * Resamples a time series onto a uniform index of the given frequency and writes the result
     * as CSV.
     *
     * <p>The target index spans {@code start} to {@code end}. Each target interval is aggregated
     * with a new {@link PowerResampler}. The output location is {@code outputDirectory} and
     * {@code subDirectory} joined by a single backslash.
     *
     * @param meterDataRdd    series to resample
     * @param start           first instant of the target index
     * @param end             last instant of the target index
     * @param frequency       spacing of the target index
     * @param outputDirectory base directory for the CSV output
     * @param subDirectory    path appended to {@code outputDirectory}
     */
    private static void resampleAndSave(TimeSeriesRDD<String> meterDataRdd, ZonedDateTime start,
        ZonedDateTime end, Frequency frequency, String outputDirectory, String subDirectory) {
        final DateTimeIndex resampledIndex = DateTimeIndexFactory.uniformFromInterval(start, end, frequency);
        final PowerResampler integrator = new PowerResampler();

        // Both boolean flags are passed as false; presumably these are the library's
        // closedRight / stampRight options — confirm against TimeSeriesRDD.resample.
        final TimeSeriesRDD<String> aggregated =
                meterDataRdd.resample(resampledIndex, integrator, false, false);

        final String targetPath = outputDirectory + "\\" + subDirectory;
        aggregated.saveAsCsv(targetPath);
    }

/**
 * Aggregation function for resampling: integrates a window of samples with the trapezoidal rule.
 *
 * <p>Each step between neighbouring samples is weighted by {@code 1/3600}, i.e. one second
 * expressed in hours. With input in watts sampled once per second, the result is therefore in
 * watt-hours (the 1 s spacing is an assumption of this class, not something it checks).
 *
 * <p>Only {@link #apply} is implemented; {@link #curried()} and {@link #tupled()} return
 * {@code null}.
 */
public class PowerResampler implements Function3<double[], Object, Object, Object>, Serializable {

    /**
     * Integrates {@code values} over the index window {@code [startIndex, endIndex)}.
     *
     * <p>Trapezoids are formed only between samples that both lie inside the window, so a window
     * with fewer than two samples yields {@code 0.0}.
     *
     * @param values     sample array being resampled
     * @param startIndex inclusive window start; cast to {@code int}
     * @param endIndex   exclusive window end; cast to {@code int}
     * @return the integral as a boxed {@code double}
     */
    @Override
    public Object apply(double[] values, Object startIndex, Object endIndex) {
        final int from = (int) startIndex;
        final int to = (int) endIndex;

        // Width of one sampling step in hours (1 s = 1/3600 h).
        final double stepHours = 1.0 / 3600.0;
        // Last index that still has a right-hand neighbour inside the window.
        final int lastLeft = to - 1;

        double energy = 0;
        int pos = from;
        while (pos < lastLeft) {
            final double left = values[pos];
            final double right = values[pos + 1];
            energy += (left + right) / 2.0 * stepHours;
            pos++;
        }
        return energy;
    }

    /**
     * Not implemented.
     *
     * @return always {@code null}
     */
    @Override
    public Function1<double[], Function1<Object, Function1<Object, Object>>> curried() {
        return null;
    }

    /**
     * Not implemented.
     *
     * @return always {@code null}
     */
    @Override
    public Function1<Tuple3<double[], Object, Object>, Object> tupled() {
        return null;
    }
}