twosigma / flint

A Time Series Library for Apache Spark
Apache License 2.0

"joinLeft" is not working when joining a clock with a parque-based DataFrame #57

Open LeoDashTM opened 5 years ago

LeoDashTM commented 5 years ago

@icexelloss

Even though the "timeColumn" argument error can be bypassed by renaming the column in question to time, leftJoin is not working for me:

# sc, sqlContext, df, and tm are defined earlier in the notebook:
# df is read from Parquet, and tm holds the name of the time column ('time').
print( sc.version )
print( tm )

n = df.filter( df['Container'] == 'dbc94d4e3af6' ).select( tm, 'MemPercentG', 'CpuPercentG' )

from ts.flint import FlintContext, clocks
from ts.flint import utils

fc = FlintContext( sqlContext )

# Wrap the plain DataFrame as a TimeSeriesDataFrame.
r = fc.read \
    .option('isSorted', False) \
    .option('timeUnit', 's') \
    .dataframe( n )

r.show(truncate=False)
print( r )
r.printSchema()

# Build a uniform 30-second clock covering the same time range.
l = clocks.uniform(fc, '30s', begin_date_time='2018-8-1 5:55:35', end_date_time='2018-08-01 05:59:05')
print( type( l ) )
print( l )
l.printSchema()
# l.show(truncate=False)
j = l.leftJoin( r )

With the output being:

2.3.1
time
+-------------------+---------------+------------+
|time               |MemPercentG    |CpuPercentG |
+-------------------+---------------+------------+
|2018-08-01 05:55:35|0.0030517578125|0.002331024 |
|2018-08-01 05:58:05|0.0030517578125|0.0031538776|
|2018-08-01 05:59:05|0.0030517578125|0.0030176123|
+-------------------+---------------+------------+

TimeSeriesDataFrame[time: timestamp, MemPercentG: double, CpuPercentG: float]
root
 |-- time: timestamp (nullable = true)
 |-- MemPercentG: double (nullable = true)
 |-- CpuPercentG: float (nullable = true)

<class 'ts.flint.dataframe.TimeSeriesDataFrame'>
TimeSeriesDataFrame[time: timestamp]
root
 |-- time: timestamp (nullable = true)

java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.expressions.codegen.ExprCode.value()Ljava/lang/String;
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<command-911439891027714> in <module>()
     22 l.printSchema()
     23 # l.show(truncate=False)
---> 24 j = l.leftJoin( r )
     25 
     26 

/databricks/python/lib/python3.5/site-packages/ts/flint/dataframe.py in leftJoin(self, right, tolerance, key, left_alias, right_alias)
    606         tolerance = self._timedelta_ns('tolerance', tolerance, default='0ns')
    607         scala_key = utils.list_to_seq(self._sc, key)
--> 608         tsrdd = self.timeSeriesRDD.leftJoin(right.timeSeriesRDD, tolerance, scala_key, left_alias, right_alias)
    609         return TimeSeriesDataFrame._from_tsrdd(tsrdd, self.sql_ctx)
    610 

/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o3324.leftJoin.
: java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.expressions.codegen.ExprCode.value()Ljava/lang/String;
    at org.apache.spark.sql.TimestampCast$class.doGenCode(TimestampCast.scala:77)
    at org.apache.spark.sql.NanosToTimestamp.doGenCode(TimestampCast.scala:31)
    at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:111)
    at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:108)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:108)
    at org.apache.spark.sql.catalyst.expressions.Alias.genCode(namedExpressions.scala:143)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext$$anonfun$24$$anonfun$apply$5.apply(CodeGenerator.scala:1367)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext$$anonfun$24$$anonfun$apply$5.apply(CodeGenerator.scala:1366)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.immutable.List.map(List.scala:285)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext$$anonfun$24.apply(CodeGenerator.scala:1366)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext$$anonfun$24.apply(CodeGenerator.scala:1366)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.withSubExprEliminationExprs(CodeGenerator.scala:1227)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.generateExpressionsForWholeStageWithCSE(CodeGenerator.scala:1365)
    at org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:67)
    at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:181)
    at org.apache.spark.sql.execution.RDDScanExec.consume(ExistingRDD.scala:176)
    at org.apache.spark.sql.execution.RDDScanExec.doProduce(ExistingRDD.scala:221)
    at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:88)
    at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$5.apply(SparkPlan.scala:190)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:187)
    at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:83)
    at org.apache.spark.sql.execution.RDDScanExec.produce(ExistingRDD.scala:176)
    at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:50)
    at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:88)
    at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$5.apply(SparkPlan.scala:190)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:187)
    at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:83)
    at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:40)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:530)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:582)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:150)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:138)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$5.apply(SparkPlan.scala:190)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:187)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:108)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:108)
    at com.twosigma.flint.timeseries.NormalizedDataFrameStore.toOrderedRdd(TimeSeriesStore.scala:252)
    at com.twosigma.flint.timeseries.NormalizedDataFrameStore.orderedRdd(TimeSeriesStore.scala:237)
    at com.twosigma.flint.timeseries.TimeSeriesRDDImpl.orderedRdd(TimeSeriesRDD.scala:1346)
    at com.twosigma.flint.timeseries.TimeSeriesRDDImpl.leftJoin(TimeSeriesRDD.scala:1515)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
    at py4j.Gateway.invoke(Gateway.java:295)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:251)
    at java.lang.Thread.run(Thread.java:748)

By the way, how does one even display the contents of the clock DataFrame? The second-to-last, commented-out line (the .show call) errors out, so I don't understand how TimeSeriesDataFrame inherits from a regular DataFrame, for which that method is available... The display method also fails.
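For what it's worth, TimeSeriesDataFrame does subclass pyspark.sql.DataFrame, so the usual actions should all exist on the clock; presumably they go down the same codegen path and fail the same way, but for reference, a sketch:

# Standard actions inherited from pyspark.sql.DataFrame; if show()
# trips the codegen bug above, these will most likely trip it too.
for row in l.collect():   # materialize the clock ticks as Row objects
    print(row['time'])

print(l.toPandas())       # or pull the ticks into a pandas DataFrame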

Anyway, what is wrong with the leftJoin here? The clock is on the left, as you indicated it should be. Swapping the left and right data frames also does not help.

Is this reproducible for you?

Please advise whether I'm calling it incorrectly or whether it's a bug.

I installed the Flint libraries (both the Scala and the Python ones) on Databricks via its UI, from the respective online repos, which might be dated. I can try installing the latest builds from the freshest source code if you think that will help.

Thanks.

LeoDashTM commented 5 years ago

Update: dir() does show that leftJoin is present on both l and r. I can even print the two methods, but invoking it on one with the other as the argument does not work for some reason... Any ideas?

LeoDashTM commented 5 years ago

Some googling suggests there may be an incompatibility between the Spark and library versions, but your front page does say that the library works with Spark 2.3, and the lib's version (per Python) is 0.6.0. Is there a more thorough way to figure out whether the framework and the library are compatible with one another, @icexelloss? Thanks.
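For anyone trying to pin down a version mismatch, a quick runtime check using only public PySpark APIs (sc is the active SparkContext):

# Print the client-side PySpark version and the version of the Spark
# cluster actually running the job; a mismatch here would be suspicious.
import pyspark
print(pyspark.__version__)
print(sc.version)

# List any extra jars shipped with the application; the Flint assembly
# should show up here if it was attached correctly.
print(sc.getConf().get('spark.jars', '(none)'))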

LeoDashTM commented 5 years ago

The error trace clearly passes through this line: https://github.com/apache/spark/blob/master/python/pyspark/sql/utils.py#L60

icexelloss commented 5 years ago

Are you using the Databricks jar? The Apache jar doesn't work on the Databricks runtime for some reason.

LeoDashTM commented 5 years ago

Are you using the Databricks jar? The Apache jar doesn't work on the Databricks runtime for some reason.

Li Jin, I appreciate your reply. I'm not sure what you mean by the "Databricks jar".

I downloaded the Flint jar file that is currently loaded onto Databricks; its size is 2171677 bytes, and here is its manifest file:

Manifest-Version: 1.0
Implementation-Title: flint
Implementation-Version: 0.6.0
Specification-Vendor: com.twosigma
Specification-Title: flint
Implementation-Vendor-Id: com.twosigma
Specification-Version: 0.6.0
Implementation-URL: https://github.com/twosigma/flint
Implementation-Vendor: com.twosigma
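
(Side note: the manifest can be dumped straight from Python, since a jar is just a zip archive; 'flint.jar' below is a placeholder for whichever jar is attached.)

# Print a jar's manifest; a jar is just a zip file.
import zipfile

with zipfile.ZipFile('flint.jar') as jar:
    print(jar.read('META-INF/MANIFEST.MF').decode('utf-8'))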

As I stated previously, I installed the Flint libraries (the Scala and the Python ones) on Databricks via its UI, from the respective online repos, which might be dated. I can try installing the latest builds from the freshest source code if you think that will help. Or, if you could provide me with a jar that you think should work, I could try installing and using it instead.

Please assist further! Thanks.

LeoDashTM commented 5 years ago

I built from source (the master branch); the resulting jar is 8075230 bytes (flint-assembly-0.6.0-SNAPSHOT.jar).

icexelloss commented 5 years ago

There are issues with using the standard Flint jar on the Databricks platform, so they built a specific jar of Flint 0.6 for their platform, here:

https://github.com/databricks/databricks-accelerators/tree/master/projects/databricks-flint

LeoDashTM commented 5 years ago

There are issues with using the standard Flint jar on the Databricks platform, so they built a specific jar of Flint 0.6 for their platform, here:

https://github.com/databricks/databricks-accelerators/tree/master/projects/databricks-flint

I downloaded the one from Databricks; its size is 2252415 bytes and its name is flint_0_6_0_databricks.jar. Still the same error... What gives?

icexelloss commented 5 years ago

@LeoDashTM Could you please try this in a local PySpark notebook with --packages?

LeoDashTM commented 5 years ago

@LeoDashTM Could you please try this in a local PySpark notebook with --packages?

I'm not too familiar with local PySpark yet, but see the second comment of the issue I opened with Databricks: https://github.com/databricks/databricks-accelerators/issues/1 (I used the --jars argument there).

OK, actually, I just tried spark-submit --packages com.twosigma:flint:0.6.0 test.py, and that did seem to produce the results I want! So is it because locally I'm running 2.3.2 and it's 2.3.1 on Databricks? I'm not sure I can upgrade the Databricks instances from 2.3.1 to 2.3.2, since 2.3.1 is the highest stable version of Spark available on DB at the moment...
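For completeness, a sketch of what such a test.py might look like (it mirrors the notebook repro above; the Parquet path 'metrics.parquet' is a placeholder):

# test.py -- standalone version of the notebook repro; run with:
#   spark-submit --packages com.twosigma:flint:0.6.0 test.py
from pyspark.sql import SparkSession, SQLContext
from ts.flint import FlintContext, clocks

spark = SparkSession.builder.appName('flint-leftJoin-repro').getOrCreate()
fc = FlintContext(SQLContext(spark.sparkContext))

df = spark.read.parquet('metrics.parquet')  # placeholder path
n = df.filter(df['Container'] == 'dbc94d4e3af6') \
      .select('time', 'MemPercentG', 'CpuPercentG')

r = fc.read.option('isSorted', False).option('timeUnit', 's').dataframe(n)
l = clocks.uniform(fc, '30s',
                   begin_date_time='2018-08-01 05:55:35',
                   end_date_time='2018-08-01 05:59:05')

l.leftJoin(r).show(truncate=False)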

The data file is attached to the first comment, and both comments contain the reproducible code (the first one for DB and the second one for a local installation). Are you able to run either of them against Spark 2.3.1 and see the issue? I really need it to work against that version, you see.

Thanks.

LeoDashTM commented 5 years ago

Update: I just installed Spark 2.3.1 locally, and running pyspark --packages com.twosigma:flint:0.6.0 and then executing each line by hand seems to work correctly. So the problem is indeed with the flavor of Spark installed on Databricks, then? Is that the right conclusion?

icexelloss commented 5 years ago

It sounds like it's a Databricks-specific issue. I think the way they handle third-party packages is different from the open-source one. That's why the Databricks jar exists in the first place.

LeoDashTM commented 5 years ago

With Databricks' latest commit (https://github.com/databricks/databricks-accelerators/commit/d23782aa5036b04be0102f6fbe778690eae54d3a), both printing the clock and the join seem to be working.

However, the results of creating and printing a clock are a bit unexpected to me:

>>> l = clocks.uniform(fc, '30s', begin_date_time='2018-8-1 5:55:35', end_date_time='2018-08-01 05:56:05')
>>> l.show( truncate = False )
+-----------------------+
|time                   |
+-----------------------+
|50552-02-05 14:23:200.0|
|50552-02-05 22:43:200.0|
+-----------------------+

Are these results expected, @icexelloss? Are you getting the exact same thing on your setup? If this is as expected, then how do I create a clock with the range I specified via begin_date_time and end_date_time? If not, what is the deal? Any idea? Thanks.
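One speculative observation (my own back-of-the-envelope math, not taken from the library source): the garbled dates are consistent with nanosecond epoch values being misread as microseconds:

# 2018-08-01 05:55:35 UTC expressed as nanoseconds since the Unix epoch.
from datetime import datetime, timezone

dt = datetime(2018, 8, 1, 5, 55, 35, tzinfo=timezone.utc)
ns = int(dt.timestamp() * 1e9)   # ~1.533e18

# If that long is misread as *microseconds*, the implied date lands
# around the year 50551 -- close to the 50552-02-05 rows above.
seconds = ns / 1e6
print(round(1970 + seconds / (365.25 * 24 * 3600)))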

LeoDashTM commented 5 years ago

OK, I tried with Two Sigma's official latest build, and the data frame is displayed as expected, so it's a Databricks issue again...

LeoDashTM commented 5 years ago

Is this the same issue as https://github.com/twosigma/flint/issues/58, @icexelloss?