h2oai / sparkling-water

Sparkling Water provides H2O functionality inside a Spark cluster
https://docs.h2o.ai/sparkling-water/3.3/latest-stable/doc/index.html
Apache License 2.0

org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection has grown past JVM limit of 0xFFFF #485

Closed: Tagar closed this issue 6 years ago

Tagar commented 6 years ago

We started getting this error on wide datasets after upgrading to the latest SW 2.2.3. It was not happening on the previous SW release, 2.2.2.

executor 16): java.lang.RuntimeException: Error while encoding: org.codehaus.janino.JaninoRuntimeException: failed to compile: org.codehaus.janino.JaninoRuntimeException: Constant pool for class org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection has grown past JVM limit of 0xFFFF
/* 001 */ public java.lang.Object generate(Object[] references) {
/* 002 */   return new SpecificUnsafeProjection(references);
/* 003 */ }
/* 004 */
/* 005 */ class SpecificUnsafeProjection extends org.apache.spark.sql.catalyst.expressions.UnsafeProjection {
/* 006 */
/* 007 */   private Object[] references;
/* 008 */   private int argValue;
/* 009 */   private java.lang.String argValue1;
/* 010 */   private boolean isNull11;
/* 011 */   private boolean value11;
/* 012 */   private boolean isNull12;
...

Code:

new_df = df.drop('FILE_CODE', 'ZIP_CODE', 'ZIP_PLUS_4', 'ADDRESS_KEY', 'HOUSEHOLD_KEY', 'AGILITY_ADDRESS', 'AGILITY_HOUSEHOLD')
print "Drop vars"
skippy_binary = hc.as_h2o_frame(new_df,framename='skippy_binary')
skippy_binary["SEGMENT"] = skippy_binary["SEGMENT"].asfactor()
print "H2O Frame Created"

This error happens on a dataframe with ~3k variables, but doesn't happen on a dataframe with ~800 columns, for example. But again, SW 2.2.2 didn't have this problem on the same data and the same code.

Tagar commented 6 years ago

This seems to be https://issues.apache.org/jira/browse/SPARK-18016? I wonder why we didn't hit this on 2.2.2.

Tagar commented 6 years ago

@jakubhava I opened a support case with Cloudera. They asked whether we could reproduce this outside of H2O/Sparkling Water, using just Spark/PySpark. Could you please point me to the code that hc.as_h2o_frame() runs internally, so we can try to reproduce this outside of SW? Thank you.

jakubhava commented 6 years ago

Hi @Tagar, thanks for the investigation!

The logic used for converting a Spark DataFrame into an H2OFrame lives right here: https://github.com/h2oai/sparkling-water/blob/master/core/src/main/scala/org/apache/spark/h2o/converters/SparkDataFrameConverter.scala
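
For reference, a minimal standalone PySpark sketch of the kind of repro Cloudera asked for. The data and column names here are made up, it assumes an existing SparkSession named spark, and whether it actually trips the 0xFFFF constant-pool limit depends on the Spark build and the generated plan:

from pyspark.sql.types import StructType, StructField, DoubleType

# Build a very wide schema; ~3k columns is where the SW conversion failed.
n_cols = 3000
schema = StructType([StructField("c%d" % i, DoubleType()) for i in range(n_cols)])
row = tuple(float(i) for i in range(n_cols))

# Converting an RDD of rows into a DataFrame exercises Spark's row-encoding
# path, which is where classes like the SpecificUnsafeProjection from the
# error above get generated.
df = spark.createDataFrame(spark.sparkContext.parallelize([row]), schema)
df.count()  # evaluation is what triggers compilation of the generated class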

mmalohlava commented 6 years ago

@Tagar @jakubhava it is an interesting problem. I tracked the differences between 2.2.2 and 2.2.3 but did not find any reasonable explanation. There are several potential changes, like this one, but I do not see a reason for them to trigger https://issues.apache.org/jira/browse/SPARK-18016. @jakubhava WDYT?

Tagar commented 6 years ago

Thank you for looking into this @jakubhava and @mmalohlava .

Cloudera Support confirms it is directly related to SPARK-18016, although again it's strange we didn't face this problem before the upgrade. We also tried upgrading to 2.2.4 (from 2.2.3), and users confirm today they still have the issue on very wide datasets (broken on 3k- and 13k-column datasets, for example, but fine on 800-column datasets).

I also asked users to try setting spark.sql.codegen.wholeStage to false, since this seems somehow related to code generation on the Spark side, but that didn't change the behavior.
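
For reference, the flag was set like this (a sketch, assuming it is applied at session creation):

from pyspark.sql import SparkSession

# Disable whole-stage code generation. It did not help here, plausibly
# because the failing SpecificUnsafeProjection is produced by the row
# encoder rather than by whole-stage codegen.
spark = SparkSession.builder \
    .config("spark.sql.codegen.wholeStage", "false") \
    .getOrCreate()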

axiomoixa commented 6 years ago

@Tagar Could you try Spark 2.2.1? Apparently a lot of those 64KB JVM bytecode limit bugs have been fixed there: https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315420&version=12340470

The column limit you experienced sounds a lot like what I experienced with MLlib: a GLM with 500 variables ran fine, but when it got up to 2k variables, the GLM errored out. See https://issues.apache.org/jira/browse/SPARK-22761, which is one of those 64KB bytecode limit bugs that is apparently fixed in Spark 2.2.1.
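
As an illustration, a hypothetical sketch of that MLlib scenario (made-up column names and data, assuming an existing SparkSession named spark; the fit itself is left commented out, as it is the step where the failure reportedly surfaced):

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import GeneralizedLinearRegression

# ~2k feature columns, the width at which the 64KB codegen failures appeared.
n_features = 2000
cols = ["f%d" % i for i in range(n_features)]
rows = [tuple(float(i + j) for i in range(n_features)) + (float(j),)
        for j in range(3)]
df = spark.createDataFrame(rows, cols + ["label"])

# Assembling thousands of columns already exercises codegen over the wide row.
assembler = VectorAssembler(inputCols=cols, outputCol="features")
assembled = assembler.transform(df)
assembled.select("features").head()

# glm = GeneralizedLinearRegression(labelCol="label", featuresCol="features")
# model = glm.fit(assembled)  # the GLM fit is where SPARK-22761 surfaced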

Tagar commented 6 years ago

@axiomoixa We use Cloudera's Spark 2.2 build - they sometimes remove certain patches and, on the other hand, backport certain other fixes. I have updated my Cloudera case and asked whether those 2.2.1 fixes for the "64KB JVM bytecode limit" made their way into Cloudera's build - thank you for pointing that out.

jakubhava commented 6 years ago

I just had a quick look and I couldn't find any particular change in Sparkling Water which would explain such a dramatic change in the supported column count.

axiomoixa commented 6 years ago

@Tagar I must unfortunately correct myself. Apparently, those 64KB bytecode fixes are on the master branch, to be released with Spark 2.3.0, and not in 2.2.1 yet.

jakubhava commented 6 years ago

@axiomoixa I can see some 64KB bytecode fixes already in 2.2.1; at least that is what the release notes state - https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315420&version=12340470

jakubhava commented 6 years ago

@Tagar @axiomoixa I think it was caused by this change in Sparkling Water - https://0xdata.atlassian.net/browse/SW-499. It is a good change, though.

Previously, BinaryType was simply ignored (no error thrown), but now it is handled properly: when the Spark DataFrame contains, for example, an Array[Byte] column, it is expanded into a lot of new columns, which is probably the cause of the exception.
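
To illustrate (a hypothetical schema; the expansion factor is just an example):

from pyspark.sql.types import StructType, StructField, StringType, BinaryType

# A BinaryType column carries a whole Array[Byte] in a single Spark column.
schema = StructType([
    StructField("id", StringType()),
    StructField("payload", BinaryType()),  # Array[Byte] on the Scala side
])
# If the conversion expands 'payload' element-wise, a 1,000-byte value
# becomes ~1,000 extra columns, so a modest-looking schema can blow
# past the generated-code limits.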

Could you please share the schema of the data you are converting, @Tagar? Or at least share whether any of the fields is BinaryType (or Array[Byte])?

Tagar commented 6 years ago

@jakubhava thank you for this information. Yes it seems to be a good improvement.

Previously, BinaryType was simply ignored (no error thrown), but now it is handled properly: when the Spark DataFrame contains, for example, an Array[Byte] column, it is expanded into a lot of new columns, which is probably the cause of the exception.

What would be an example of such a datatype? Does it mean SW-499 creates an enum-like structure for categorical features? We primarily use PySpark. Is Array[Byte] just a string datatype in the Spark world, or do you mean a nested collection of elements?

Could you please share the schema of the data you are converting, @Tagar? Or at least share whether any of the fields is BinaryType (or Array[Byte])?

I will upload the schema to H2O ticket https://support.h2o.ai/support/tickets/91559 if that's okay with you, as I can't share the schema in the public domain.

Tagar commented 6 years ago

@jakubhava I updated the H2O case with the complete schema. That dataframe has only 'double' and 'string' data types:

>>> print set(t for n,t in df.dtypes)
set(['double', 'string'])

So I'm not sure where an Array[Byte] or a BinaryType would be coming from?

Thank you.

jakubhava commented 6 years ago

@Tagar BinaryType is the Spark SQL type used to represent Array[Byte].

If the dataframe has only simple types such as string and double, then the SW-499 change does not affect this call, so I'm still not sure why it started behaving this way.

jakubhava commented 6 years ago

My last candidate is this change - https://github.com/h2oai/sparkling-water/pull/429 - particularly this line: https://github.com/h2oai/sparkling-water/blob/0fa5510a3a5080d6e378b55199574eb6d2756184/core/src/main/scala/org/apache/spark/h2o/utils/H2OSchemaUtils.scala#L111

Since 2.2.3, we newly call this during each conversion to create a new dataframe with possibly renamed columns.

Spark, however, internally calls this method with needsConversion set to true. It therefore creates a projection and then creates a Dataset out of the converted data. The projection might be the reason for triggering the exception above.

private[sql] def createDataFrame(
    rowRDD: RDD[Row],
    schema: StructType,
    needsConversion: Boolean) = {
  // TODO: use MutableProjection when rowRDD is another DataFrame and the applied
  // schema differs from the existing schema on any field data type.
  val catalystRows = if (needsConversion) {
    // RowEncoder generates a SpecificUnsafeProjection for the whole schema;
    // with thousands of fields its constant pool can exceed the JVM's
    // 0xFFFF limit (SPARK-18016).
    val encoder = RowEncoder(schema)
    rowRDD.map(encoder.toRow)
  } else {
    rowRDD.map{r: Row => InternalRow.fromSeq(r.toSeq)}
  }
  val logicalPlan = LogicalRDD(schema.toAttributes, catalystRows)(self)
  Dataset.ofRows(self, logicalPlan)
}

Kuba

jakubhava commented 6 years ago

@Tagar I think that this change might actually help in your case: https://github.com/h2oai/sparkling-water/pull/497/files. However, I still need to test it. If you know how to build Sparkling Water and want to give it a try as well, feel free to build it from that PR: https://github.com/h2oai/sparkling-water/pull/497/files

jakubhava commented 6 years ago

Closing this issue as it is fixed by https://github.com/h2oai/sparkling-water/pull/497.

However, please note that this is just an optimisation of our code so that it does not create additional dataframes/columns. The original issue still exists in Spark and can be reproduced with a really large number of columns; without upgrading Spark, there is currently not much we can do.

Tagar commented 6 years ago

Users confirm this issue is fixed now, so we're back to the pre-upgrade state.

Also, the root cause - https://issues.apache.org/jira/browse/SPARK-18016 - was fixed and committed to Spark 2.3 today.

Thanks a lot.

jakubhava commented 6 years ago

Hi @Tagar, a new Sparkling Water release is out with this fix as well as additional fixes.

Tagar commented 6 years ago

Thank you @jakubhava! We will upgrade to 2.2.6 tonight.