twosigma / flint

A Time Series Library for Apache Spark

Both `.option("timeColumn","ds")` and `.parquet(path, time_column='ds')` don't work #79

Open · eromoe opened this issue 4 years ago

eromoe commented 4 years ago

I have tried the following approaches:


df = fc.read.option("timeColumn","ds").option('isSorted', False).dataframe(spark.read.parquet('/test/SALECOUNT_OUT'))

df = fc.read.option("timeColumn","ds").option('isSorted', False).parquet('/test/SALECOUNT_OUT')

df = fc.read.option('isSorted', False).parquet('/test/SALECOUNT_OUT', time_column='ds')

All of them throw the same error:

Py4JJavaError: An error occurred while calling o91.canonizeTime.
: java.lang.IllegalArgumentException: Field "time" does not exist.
Available fields: store_id, product_id, store_product_id, sale_count, ds
        at org.apache.spark.sql.types.StructType$$anonfun$apply$1.apply(StructType.scala:274)
        at org.apache.spark.sql.types.StructType$$anonfun$apply$1.apply(StructType.scala:274)
        at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
        at scala.collection.AbstractMap.getOrElse(Map.scala:59)
        at org.apache.spark.sql.types.StructType.apply(StructType.scala:273)
        at com.twosigma.flint.timeseries.TimeSeriesRDD$.canonizeTime(TimeSeriesRDD.scala:123)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)

During handling of the above exception, another exception occurred:

IllegalArgumentException                  Traceback (most recent call last)
<ipython-input-3-95bfec85ec04> in <module>
----> 1 df = fc.read.option("timeColumn", "ds").parquet('/test/SALECOUNT_OUT')

~/.conda/envs/py3/lib/python3.7/site-packages/ts/flint/readwriter.py in parquet(self, *paths)
    399         """
    400         df = self._sqlContext.read.parquet(*paths)
--> 401         return self.dataframe(df)
    402
    403     def _reconcile_reader_args(self, begin=None, end=None, timezone='UTC',

~/.conda/envs/py3/lib/python3.7/site-packages/ts/flint/readwriter.py in dataframe(self, df, begin, end, timezone, is_sorted, time_column, unit)
    362             time_column=time_column,
    363             is_sorted=is_sorted,
--> 364             unit=self._parameters.timeUnitString())
    365
    366     def parquet(self, *paths):

~/.conda/envs/py3/lib/python3.7/site-packages/ts/flint/dataframe.py in _from_df(df, time_column, is_sorted, unit)
    248                                    time_column=time_column,
    249                                    is_sorted=is_sorted,
--> 250                                    unit=unit)
    251
    252     @staticmethod

~/.conda/envs/py3/lib/python3.7/site-packages/ts/flint/dataframe.py in __init__(self, df, sql_ctx, time_column, is_sorted, unit, tsrdd_part_info)
    133         # throw exception
    134         if time_column in df.columns:
--> 135             self._jdf = self._jpkg.TimeSeriesRDD.canonizeTime(self._jdf, self._junit)
    136
    137         if tsrdd_part_info:

/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258
   1259         for temp_arg in temp_args:

/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     77                 raise QueryExecutionException(s.split(': ', 1)[1], stackTrace)
     78             if s.startswith('java.lang.IllegalArgumentException: '):
---> 79                 raise IllegalArgumentException(s.split(': ', 1)[1], stackTrace)
     80             raise
     81     return deco

IllegalArgumentException: 'Field "time" does not exist.\nAvailable fields: store_id, product_id, store_product_id, sale_count, ds'

The `ds` column is of timestamp type.

eromoe commented 4 years ago

`timeColumnName` is hard-coded in `TimeSeriesRDD.scala`??? [screenshot of the relevant source attached in the original issue]

So, what is the `timeColumn` option actually used for?
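
From the traceback, the Python wrapper only seems to use `time_column` to decide whether to call `canonizeTime` at all; the JVM side then looks up a field literally named `time`. A toy model of that behaviour (my reading of the traceback, not the actual library code):

```python
# Toy model of the failure, based only on the traceback above; this is NOT the
# actual ts-flint / flint source code.
def mimic_canonize(columns, time_column):
    if time_column in columns:
        # The wrapper proceeds, but the JVM-side canonizeTime apparently looks
        # for a field literally named "time", regardless of time_column.
        if 'time' not in columns:
            raise ValueError('Field "time" does not exist. '
                             f'Available fields: {", ".join(columns)}')

mimic_canonize(['store_id', 'product_id', 'store_product_id', 'sale_count', 'ds'], 'ds')
# -> ValueError: Field "time" does not exist. Available fields: store_id, ...
```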

dgrnbrg commented 3 years ago

I think that `timeColumn` doesn't work, and you must always name the column `time`. That's been my experience.
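
A minimal workaround sketch along those lines, assuming a `FlintContext` named `fc` built from the usual quickstart setup (the path and column names are the ones from the report above): rename `ds` to `time` in plain Spark before handing the DataFrame to flint.

```python
from ts.flint import FlintContext

fc = FlintContext(sqlContext)  # assumes an existing sqlContext from the Spark session

# Read with plain Spark, then rename the timestamp column to the name flint
# actually looks for ("time") before wrapping the DataFrame.
sdf = (spark.read.parquet('/test/SALECOUNT_OUT')
            .withColumnRenamed('ds', 'time'))

df = fc.read.option('isSorted', False).dataframe(sdf)
```

If downstream code expects the column to be called `ds` again, another `withColumnRenamed('time', 'ds')` after the flint operations restores the original name.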