NvTimLiu / spark-rapids

Spark RAPIDS plugin - accelerate Apache Spark with GPUs
https://nvidia.github.io/spark-rapids
Apache License 2.0

[BUG] delta_lake_test FAILED on "column mapping mode `id` is not supported for this Delta version" #25

Open NvTimLiu opened 1 year ago

NvTimLiu commented 1 year ago

Describe the bug

delta_lake_test FAILED on: org.apache.spark.sql.delta.ColumnMappingUnsupportedException: The column mapping mode `id` is not supported for this Delta version. Please upgrade if you want to use this mode.

The test FAILED with the Spark 3.2.x & 3.3.x shims, and is SKIPPED in the pytest runs for the other Spark shims.

Related PR: https://github.com/NVIDIA/spark-rapids/pull/9279/
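
For context, here is a minimal standalone sketch (not part of the original report) of what the failing test does: it defaults new Delta tables to column mapping mode `id` via the same spark.databricks.delta.properties.defaults.* settings that appear in the traceback below, then writes a partitioned Delta table. The app name, table path, and toy DataFrame are illustrative, and the Delta jars are assumed to be on the classpath (e.g. via --packages). On the Delta builds paired with the 3.2.x/3.3.x shims, the write should fail with the same ColumnMappingUnsupportedException.

    # Hypothetical repro sketch, mirroring the failing pytest setup.
    from pyspark.sql import SparkSession

    spark = (SparkSession.builder
             .appName("delta-column-mapping-id-repro")  # illustrative name
             .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
             .config("spark.sql.catalog.spark_catalog",
                     "org.apache.spark.sql.delta.catalog.DeltaCatalog")
             .config("spark.databricks.delta.properties.defaults.columnMapping.mode", "id")
             .config("spark.databricks.delta.properties.defaults.minReaderVersion", "2")
             .config("spark.databricks.delta.properties.defaults.minWriterVersion", "5")
             .getOrCreate())

    # Any small DataFrame works; the exception is raised while the table metadata
    # is being created, before any Parquet data is written.
    df = spark.range(10).selectExpr("id", "id % 3 AS part")
    df.write.format("delta").partitionBy("part").save("/tmp/DELTA_DATA_repro")
    # Expected on affected Delta versions:
    #   org.apache.spark.sql.delta.ColumnMappingUnsupportedException:
    #   The column mapping mode `id` is not supported for this Delta version.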

=========================== short test summary info ============================
FAILED ../../src/main/python/delta_lake_test.py::test_delta_read_column_mapping[id-{'spark.rapids.sql.format.parquet.reader.type': 'PERFILE'}]ER({'local': True}), zeToObjectExec,ShuffleExchangeExec,FileSourceScanExec,FilterExec,MapPartitionsExec,MapElementsExec,ObjectHashAggregateExec,ProjectExec,SerializeFr
FAILED ../../src/main/python/delta_lake_test.py::test_delta_read_column_mapping[id-{'spark.rapids.sql.format.parquet.reader.type': .rapids.sql.reader.multithreaded.combine.sizeBytes': '0', 'spark.rapids.sql.reader.multithreaded.read.keepOrder': True}][IGNORE_ORDER({'local': zeToObjectExec,ShuffleExchangeExec,FileSourceScanExec,FilterExec,MapPartitionsExec,MapElementsExec,ObjectHashAggregateExec,ProjectExec,SerializeFr
FAILED ../../src/main/python/delta_lake_test.py::test_delta_read_column_mapping[id-{'spark.rapids.sql.format.parquet.reader.type': 'COALESCING'}]ER({'local': True}), zeToObjectExec,ShuffleExchangeExec,FileSourceScanExec,FilterExec,MapPartitionsExec,MapElementsExec,ObjectHashAggregateExec,ProjectExec,SerializeFr
FAILED ../../src/main/python/delta_lake_test.py::test_delta_read_column_mapping[id-{'spark.rapids.sql.format.parquet.reader.type': 'COALESCING', scing.reader.numFilterParallel': '2', 'spark.rapids.sql.reader.chunked': False}][INJECT_OOM, IGNORE_ORDER({'local': True}), zeToObjectExec,ShuffleExchangeExec,FileSourceScanExec,FilterExec,MapPartitionsExec,MapElementsExec,ObjectHashAggregateExec,ProjectExec,SerializeFr
FAILED ../../src/main/python/delta_lake_test.py::test_delta_read_column_mapping[id-{'spark.rapids.sql.format.parquet.reader.type': .rapids.sql.reader.multithreaded.combine.sizeBytes': '64m', 'spark.rapids.sql.reader.multithreaded.read.keepOrder': True}][IGNORE_ORDER({'local': zeToObjectExec,ShuffleExchangeExec,FileSourceScanExec,FilterExec,MapPartitionsExec,MapElementsExec,ObjectHashAggregateExec,ProjectExec,SerializeFr
FAILED ../../src/main/python/delta_lake_test.py::test_delta_read_column_mapping[id-{'spark.rapids.sql.format.parquet.reader.type': 'MULTITHREADED', 'spark.rapids.sql.format.parquet.multithreaded.combine.sizeBytes': '64m', 'spark.rapids.sql.format.parquet.multithreaded.read.keepOrder': True}][INJECT_OOM, IGNORE_ORDER({'local': True}), ALLOW_NON_GPU(DeserializeToObjectExec,ShuffleExchangeExec,FileSourceScanExec,FilterExec,MapPartitionsExec,MapElementsExec,ObjectHashAggregateExec,ProjectExec,SerializeFromObjectExec,SortExec)]
 =================================== FAILURES ===================================
 _ test_delta_read_column_mapping[id-{'spark.rapids.sql.format.parquet.reader.type': 'PERFILE'}] _
 [gw2] linux -- Python 3.9.18 /opt/conda/bin/python

 spark_tmp_path = '/tmp/pyspark_tests//it-test-333-815-0w6zw-2hsmg-gw2-933908-153293474/'
 reader_confs = {'spark.rapids.sql.format.parquet.reader.type': 'PERFILE'}
 mapping = 'id'

     @allow_non_gpu(*delta_meta_allow)
     @delta_lake
     @ignore_order(local=True)
     @pytest.mark.parametrize("reader_confs", reader_opt_confs_no_native, ids=idfn)
     @pytest.mark.parametrize("mapping", column_mappings, ids=idfn)
     def test_delta_read_column_mapping(spark_tmp_path, reader_confs, mapping):
         data_path = spark_tmp_path + "/DELTA_DATA"
         gen_list = [("a", int_gen),
                     ("b", SetValuesGen(StringType(), ["x", "y", "z"])),
                     ("c", string_gen),
                     ("d", SetValuesGen(IntegerType(), [1, 2, 3])),
                     ("e", long_gen)]
         confs = copy_and_update(reader_confs, {
             "spark.databricks.delta.properties.defaults.columnMapping.mode": mapping,
             "spark.databricks.delta.properties.defaults.minReaderVersion": "2",
             "spark.databricks.delta.properties.defaults.minWriterVersion": "5",
             "spark.sql.parquet.fieldId.read.enabled": "true"
         })
 >       with_cpu_session(
             lambda spark: gen_df(spark, gen_list).coalesce(1).write.format("delta") \
                 .partitionBy("b", "d") \
                 .save(data_path),
             conf=confs)

 ../../src/main/python/delta_lake_test.py:117: 
 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
 ../../src/main/python/spark_session.py:116: in with_cpu_session
     return with_spark_session(func, conf=copy)
 ../../src/main/python/spark_session.py:100: in with_spark_session
     ret = func(_spark)
 ../../src/main/python/delta_lake_test.py:118: in <lambda>
     lambda spark: gen_df(spark, gen_list).coalesce(1).write.format("delta") \
 ../../../spark-3.3.3-bin-hadoop3/python/pyspark/sql/readwriter.py:968: in save
     self._jwrite.save(path)
 /home/jenkins/agent/workspace/jenkins-rapids_integration-dev-github-815-3.3.3/jars/spark-3.3.3-bin-hadoop3/python/lib/py4j-0.10.9.5-src.zip/py4j/in __call__
     return_value = get_return_value(
 ../../../spark-3.3.3-bin-hadoop3/python/pyspark/sql/utils.py:190: in deco
     return f(*a, **kw)
 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

 answer = 'xro12684'
 gateway_client = <py4j.clientserver.JavaClient object at 0x7f1f92057f40>
 target_id = 'o12683', name = 'save'

     def get_return_value(answer, gateway_client, target_id=None, name=None):
         """Converts an answer received from the Java gateway into a Python object.

         For example, string representation of integers are converted to Python
         integer, string representation of objects are converted to JavaObject
         instances, etc.

         :param answer: the string returned by the Java gateway
         :param gateway_client: the gateway client used to communicate with the Java
             Gateway. Only necessary if the answer is a reference (e.g., object,
             list, map)
         :param target_id: the name of the object from which the answer comes from
             (e.g., *object1* in `object1.hello()`). Optional.
         :param name: the name of the member from which the answer comes from
             (e.g., *hello* in `object1.hello()`). Optional.
         """
         if is_error(answer)[0]:
             if len(answer) > 1:
                 type = answer[1]
                 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
                 if answer[1] == REFERENCE_TYPE:
 >                   raise Py4JJavaError(
                         "An error occurred while calling {0}{1}{2}.\n".
                         format(target_id, ".", name), value)
 E                   py4j.protocol.Py4JJavaError: An error occurred while calling o12683.save.
 E                   : org.apache.spark.sql.delta.ColumnMappingUnsupportedException: The column mapping mode `id` is not supported for this Delta version. Please upgrade if you want to use this mode.
 E                      at org.apache.spark.sql.delta.DeltaErrorsBase.unsupportedColumnMappingMode(DeltaErrors.scala:1738)
 E                      at org.apache.spark.sql.delta.DeltaErrorsBase.unsupportedColumnMappingMode$(DeltaErrors.scala:1736)
 E                      at org.apache.spark.sql.delta.DeltaErrors$.unsupportedColumnMappingMode(DeltaErrors.scala:2293)
 E                      at org.apache.spark.sql.delta.DeltaColumnMappingBase.verifyAndUpdateMetadataChange(DeltaColumnMapping.scala:113)
 E                      at org.apache.spark.sql.delta.DeltaColumnMappingBase.verifyAndUpdateMetadataChange$(DeltaColumnMapping.scala:102)
 E                      at org.apache.spark.sql.delta.DeltaColumnMapping$.verifyAndUpdateMetadataChange(DeltaColumnMapping.scala:552)
 E                      at org.apache.spark.sql.delta.OptimisticTransactionImpl.updateMetadataInternal(OptimisticTransaction.scala:340)
 E                      at org.apache.spark.sql.delta.OptimisticTransactionImpl.updateMetadataInternal$(OptimisticTransaction.scala:320)
 E                      at org.apache.spark.sql.delta.OptimisticTransaction.updateMetadataInternal(OptimisticTransaction.scala:101)
 E                      at org.apache.spark.sql.delta.OptimisticTransactionImpl.updateMetadata(OptimisticTransaction.scala:313)
 E                      at org.apache.spark.sql.delta.OptimisticTransactionImpl.updateMetadata$(OptimisticTransaction.scala:308)
 E                      at org.apache.spark.sql.delta.OptimisticTransaction.updateMetadata(OptimisticTransaction.scala:101)
 E                      at org.apache.spark.sql.delta.schema.ImplicitMetadataOperation.updateMetadata(ImplicitMetadataOperation.scala:91)
 E                      at org.apache.spark.sql.delta.schema.ImplicitMetadataOperation.updateMetadata$(ImplicitMetadataOperation.scala:52)
 E                      at org.apache.spark.sql.delta.commands.WriteIntoDelta.updateMetadata(WriteIntoDelta.scala:71)
 E                      at org.apache.spark.sql.delta.commands.WriteIntoDelta.write(WriteIntoDelta.scala:207)
 E                      at org.apache.spark.sql.delta.commands.WriteIntoDelta.$anonfun$run$1(WriteIntoDelta.scala:98)
 E                      at org.apache.spark.sql.delta.commands.WriteIntoDelta.$anonfun$run$1$adapted(WriteIntoDelta.scala:91)
 E                      at org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:221)
 E                      at org.apache.spark.sql.delta.commands.WriteIntoDelta.run(WriteIntoDelta.scala:91)
 E                      at org.apache.spark.sql.delta.sources.DeltaDataSource.createRelation(DeltaDataSource.scala:159)
 E                      at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47)
 E                      at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
 E                      at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
 E                      at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
 E                      at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
 E                      at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
 E                      at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
 E                      at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
 E                      at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
 E                      at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
 E                      at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
 E                      at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
 E                      at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
 E                      at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
 E                      at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
 E                      at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
 E                      at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
 E                      at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
 E                      at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
 E                      at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
 E                      at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
 E                      at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
 E                      at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
 E                      at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
 E                      at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
 E                      at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
 E                      at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
 E                      at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:357)
 E                      at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
 E                      at sun.reflect.GeneratedMethodAccessor207.invoke(Unknown Source)
 E                      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 E                      at java.lang.reflect.Method.invoke(Method.java:498)
 E                      at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
 E                      at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
 E                      at py4j.Gateway.invoke(Gateway.java:282)
 E                      at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
 E                      at py4j.commands.CallCommand.execute(CallCommand.java:79)
 E                      at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
 E                      at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
 E                      at java.lang.Thread.run(Thread.java:750)

 /home/jenkins/agent/workspace/jenkins-rapids_integration-dev-github-815-3.3.3/jars/spark-3.3.3-bin-hadoop3/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py: Py4JJavaError
 _ test_delta_read_column_mapping[id-{'spark.rapids.sql.format.parquet.reader.type': 'MULTITHREADED', 'spark.rapids.sql.reader.multithreaded.combine.sizeBytes': '0', 'spark.rapids.sql.reader.multithreaded.read.keepOrder': True}] _
NvTimLiu commented 1 year ago

IllegalArgumentException

 ----------------------------- Captured stdout call -----------------------------
 ### CPU RUN ###
 ### GPU RUN ###

 std_input_path = 'hdfs:/input-path/integration_tests/src/test/resources'
 filename = 'date.csv'
 schema = StructType([StructField('ts', TimestampType(), True)])

     @allow_non_gpu('FileSourceScanExec')
     @pytest.mark.skipif(is_before_spark_340(), reason='enableDateTimeParsingFallback is supported from Spark3.4.0')
     @pytest.mark.parametrize('filename,schema',[("date.csv", _date_schema), ("date.csv", _ts_schema,),
                                                 ("ts.csv", _ts_schema)])
     def test_csv_datetime_parsing_fallback_cpu_fallback(std_input_path, filename, schema):
         data_path = std_input_path + "/" + filename
         assert_gpu_fallback_collect(
             lambda spark : spark.read.schema(schema).option('enableDateTimeParsingFallback', "true").csv(data_path),
             'FileSourceScanExec',
 >           conf=_enable_all_types_conf)

 :576: 
 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
 :430: in assert_gpu_fallback_collect
     from_gpu, gpu_df = with_gpu_session(bring_back, conf=conf)
 :133: in with_gpu_session
     return with_spark_session(func, conf=copy)
 :100: in with_spark_session
     ret = func(_spark)
 :208: in bring_back
     return (df.collect(), df)
 :1217: in collect
     sock_info = self._jdf.collectToPython()
 :1323: in __call__
     answer, self.gateway_client, self.target_id, self.name)
 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

 a = ('xro251702', <py4j.clientserver.JavaClient object at 0x7f1cedc63410>, 'o251701', 'collectToPython')
 kw = {}, converted = IllegalArgumentException()

     def deco(*a: Any, **kw: Any) -> Any:
         try:
             return f(*a, **kw)
         except Py4JJavaError as e:
             converted = convert_exception(e.java_exception)
             if not isinstance(converted, UnknownException):
                 # Hide where the exception came from that shows a non-Pythonic
                 # JVM exception message.
 >               raise converted from None

CPU/GPU output not equal

=================================== FAILURES ===================================

 spark_tmp_path = '/tmp/pyspark_tests//ip-172-31-8-237-main-10407-1322606390/'
 metadata_column = 'file_path'

     @pytest.mark.skipif(is_before_spark_330(), reason='Hidden file metadata columns are a new feature of Spark 330')
     @allow_non_gpu(any = True)
     @pytest.mark.parametrize('metadata_column', ["file_path", "file_name", "file_size", "file_modification_time"])
     def test_csv_scan_with_hidden_metadata_fallback(spark_tmp_path, metadata_column):
         data_path = spark_tmp_path + "/hidden_metadata.csv"
         with_cpu_session(lambda spark : spark.range(10) \
                          .selectExpr("id") \
                          .write \
                          .mode("overwrite") \
                          .csv(data_path))

         def do_csv_scan(spark):
             df = spark.read.csv(data_path).selectExpr("_c0", "_metadata.{}".format(metadata_column))
             return df

         assert_cpu_and_gpu_are_equal_collect_with_capture(
             do_csv_scan,
             exist_classes= "FileSourceScanExec",
 >           non_exist_classes= "GpuBatchScanExec")

 :504: 
 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
 :378: in assert_cpu_and_gpu_are_equal_collect_with_capture
     from_cpu, cpu_df = with_cpu_session(bring_back, conf=conf)
 :116: in with_cpu_session
     return with_spark_session(func, conf=copy)
 :100: in with_spark_session
     ret = func(_spark)
 :207: in bring_back
     df = limit_func(spark)
 :498: in do_csv_scan
     df = spark.read.csv(data_path).selectExpr("_c0", "_metadata.{}".format(metadata_column))
 :3077: in selectExpr
     jdf = self._jdf.selectExpr(self._jseq(expr))
 :1323: in __call__
     answer, self.gateway_client, self.target_id, self.name)
 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

 a = ('xro248260', <py4j.clientserver.JavaClient object at 0x7f1cedc63410>, 'o248257', 'selectExpr')
 kw = {}, converted = AnalysisException()

     def deco(*a: Any, **kw: Any) -> Any:
         try:
             return f(*a, **kw)
         except Py4JJavaError as e:
             converted = convert_exception(e.java_exception)
             if not isinstance(converted, UnknownException):
                 # Hide where the exception came from that shows a non-Pythonic
                 # JVM exception message.
 >               raise converted from None