sryza / spark-timeseries

A library for time series analysis on Apache Spark
Apache License 2.0

[PYTHON] to_pandas_series_rdd() does not work #193

Open laldonza opened 7 years ago

laldonza commented 7 years ago

When I try to use the method to_pandas_series_rdd of any TimeSeriesRDD, I get this error.

PicklingError: Could not serialize object: Py4JError: An error occurred while calling o231.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
    at py4j.Gateway.invoke(Gateway.java:272)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)

I don't know if anyone else has the same problem, but after searching for this error I realized it is caused by index().to_pandas_index(): because of lazy evaluation, a Spark expression ends up being called inside the map, and it cannot be serialized. I rewrote the method as follows, using a deep copy of the index, and I don't get the error anymore.

def to_pandas_series_rdd(self):
    """Returns an RDD of Pandas Series objects indexed with Pandas DatetimeIndexes."""
    # Build the pandas index once, outside the map, and deep-copy it so the
    # lambda below captures a plain pandas object rather than a Spark one.
    index = self.index().to_pandas_index().copy(name="index", deep=True)
    return self.map(lambda x: (x[0], pd.Series(x[1], index)))
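
For anyone who wants to see the failure mode without a Spark cluster, here is a minimal stdlib-only sketch of the explanation above. It is not the library's code: FakeJvmIndex and to_plain_index are hypothetical names, and a threading.Lock stands in for the Py4J reference that cannot be pickled. Spark has to pickle whatever the mapped function captures, so the sketch approximates that by pickling the captured value directly.

```python
import pickle
import threading


class FakeJvmIndex:
    """Hypothetical stand-in for a JVM-backed index object (not a real API)."""

    def __init__(self, labels):
        # A lock cannot be pickled; it plays the role of the Py4J reference.
        self._handle = threading.Lock()
        self._labels = list(labels)

    def to_plain_index(self):
        # Analogous in spirit to to_pandas_index(): returns plain Python data.
        return list(self._labels)


def can_pickle(obj):
    """Return True if obj survives pickling, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, TypeError, AttributeError):
        return False


jvm_index = FakeJvmIndex(["2015-01-01", "2015-01-02"])

# What the mapped function would capture in each variant.
captured_lazily = {"index": jvm_index}                    # JVM-backed object
captured_eagerly = {"index": jvm_index.to_plain_index()}  # plain data only

lazy_ok = can_pickle(captured_lazily)
eager_ok = can_pickle(captured_eagerly)
print("captures JVM-backed object:", lazy_ok)
print("captures materialized index:", eager_ok)
```

The point is the same as in the rewritten method: materialize the index before calling map, so that only plain data travels with the function.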