NVIDIA / spark-rapids

Spark RAPIDS plugin - accelerate Apache Spark with GPUs
https://nvidia.github.io/spark-rapids
Apache License 2.0

[BUG] delta_lake_update_test.py::test_delta_update_partitions[['a', 'b']-False] fails with DATAGEN_SEED=1707683137 #10410

Closed GaryShen2008 closed 5 months ago

GaryShen2008 commented 5 months ago

Describe the bug
delta_lake_update_test.py::test_delta_update_partitions[['a', 'b']-False] fails with DATAGEN_SEED=1707683137:

=================================== FAILURES ===================================
________________ test_delta_update_partitions[['a', 'b']-False] ________________
[gw4] linux -- Python 3.9.18 /opt/conda/bin/python

spark_tmp_path = '/tmp/pyspark_tests//it-test-331-663-4hk89-v63bc-gw4-1515130-1140402737/'
use_cdf = False, partition_columns = ['a', 'b']

    @allow_non_gpu(*delta_meta_allow)
    @delta_lake
    @ignore_order
    @pytest.mark.parametrize("use_cdf", [True, False], ids=idfn)
    @pytest.mark.parametrize("partition_columns", [["a"], ["a", "b"]], ids=idfn)
    @pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x")
    def test_delta_update_partitions(spark_tmp_path, use_cdf, partition_columns):
        def generate_dest_data(spark):
            return three_col_df(spark,
                                SetValuesGen(IntegerType(), range(5)),
                                SetValuesGen(StringType(), "abcdefg"),
                                string_gen)
        update_sql = "UPDATE delta.`{path}` SET a = 3 WHERE b < 'c'"
>       assert_delta_sql_update_collect(spark_tmp_path, use_cdf, generate_dest_data,
                                        update_sql, partition_columns)

../../src/main/python/delta_lake_update_test.py:116: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../../src/main/python/delta_lake_update_test.py:61: in assert_delta_sql_update_collect
    delta_sql_update_test(spark_tmp_path, use_cdf, dest_table_func, update_sql, checker,
../../src/main/python/delta_lake_update_test.py:36: in delta_sql_update_test
    check_func(data_path, do_update)
../../src/main/python/delta_lake_update_test.py:60: in checker
    with_cpu_session(lambda spark: assert_gpu_and_cpu_delta_logs_equivalent(spark, data_path))
../../src/main/python/spark_session.py:147: in with_cpu_session
    return with_spark_session(func, conf=copy)
/opt/conda/lib/python3.9/contextlib.py:79: in inner
    return func(*args, **kwds)
../../src/main/python/spark_session.py:131: in with_spark_session
    ret = func(_spark)
../../src/main/python/delta_lake_update_test.py:60: in <lambda>
    with_cpu_session(lambda spark: assert_gpu_and_cpu_delta_logs_equivalent(spark, data_path))
../../src/main/python/delta_lake_utils.py:139: in assert_gpu_and_cpu_delta_logs_equivalent
    assert_delta_log_json_equivalent(file, cpu_json, gpu_json)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

filename = '00000000000000000001.json'
c_json = {'add': {'dataChange': True, 'partitionValues': {'a': '3', 'b': 'b'}, 'path': 'a=3/b=b/partsnappy.parquet', 'stats': '...x82ÛÂrº\\u000F\xad"},"maxValues":{"c":"ÿûÜÌ\x9bã±½?«¨\\u00171}mò\\u0015>Ò\\u0011f\x9a¤óRúTÄÂ0"},"nullCount":{"c":7}}'}}
g_json = {'add': {'dataChange': True, 'partitionValues': {'a': '3', 'b': 'b'}, 'path': 'a=3/b=b/partsnappy.parquet', 'stats': '...x82ÛÂrº\\u000F\xad"},"maxValues":{"c":"ÿûÜÌ\x9bã±½?«¨\\u00171}mò\\u0015>Ò\\u0011f\x9a¤óRúTÄÂ0"},"nullCount":{"c":9}}'}}

    def assert_delta_log_json_equivalent(filename, c_json, g_json):
        assert c_json.keys() == g_json.keys(), "Delta log {} has mismatched keys:\nCPU: {}\nGPU: {}".format(filename, c_json, g_json)
        def fixup_path(d):
            """Modify the 'path' value to remove random IDs in the pathname"""
            parts = d["path"].split("-")
            d["path"] = "-".join(parts[0:1]) + ".".join(parts[-1].split(".")[-2:])
        def del_keys(key_list, c_val, g_val):
            for key in key_list:
                c_val.pop(key, None)
                g_val.pop(key, None)
        for key, c_val in c_json.items():
            g_val = g_json[key]
            # Strip out the values that are expected to be different
            c_tags = c_val.get("tags", {})
            g_tags = g_val.get("tags", {})
            del_keys(["INSERTION_TIME", "MAX_INSERTION_TIME", "MIN_INSERTION_TIME"], c_tags, g_tags)
            if key == "metaData":
                assert c_val.keys() == g_val.keys(), "Delta log {} 'metaData' keys mismatch:\nCPU: {}\nGPU: {}".format(filename, c_val, g_val)
                del_keys(("createdTime", "id"), c_val, g_val)
            elif key == "add":
                assert c_val.keys() == g_val.keys(), "Delta log {} 'add' keys mismatch:\nCPU: {}\nGPU: {}".format(filename, c_val, g_val)
                del_keys(("modificationTime", "size"), c_val, g_val)
                fixup_path(c_val)
                fixup_path(g_val)
            elif key == "cdc":
                assert c_val.keys() == g_val.keys(), "Delta log {} 'cdc' keys mismatch:\nCPU: {}\nGPU: {}".format(filename, c_val, g_val)
                del_keys(("size",), c_val, g_val)
                fixup_path(c_val)
                fixup_path(g_val)
            elif key == "commitInfo":
                assert c_val.keys() == g_val.keys(), "Delta log {} 'commitInfo' keys mismatch:\nCPU: {}\nGPU: {}".format(filename, c_val, g_val)
                del_keys(("timestamp", "txnId"), c_val, g_val)
                for v in c_val, g_val:
                    _fixup_operation_metrics(v.get("operationMetrics", {}))
                    _fixup_operation_parameters(v.get("operationParameters", {}))
            elif key == "remove":
                assert c_val.keys() == g_val.keys(), "Delta log {} 'remove' keys mismatch:\nCPU: {}\nGPU: {}".format(filename, c_val, g_val)
                del_keys(("deletionTimestamp", "size"), c_val, g_val)
                fixup_path(c_val)
                fixup_path(g_val)
>           assert c_val == g_val, "Delta log {} is different at key '{}':\nCPU: {}\nGPU: {}".format(filename, key, c_val, g_val)
E           AssertionError: Delta log 00000000000000000001.json is different at key 'add':
E           CPU: {'path': 'a=3/b=b/partsnappy.parquet', 'partitionValues': {'a': '3', 'b': 'b'}, 'dataChange': True, 'stats': '{"numRecords":147,"minValues":{"c":"\\u0001\x80\\\\\\f\\u00039\x9eÜVJ³¾\x93÷æô¸è=¯ûo`\x82ÛÂrº\\u000F\xad"},"maxValues":{"c":"ÿûÜÌ\x9bã±½?«¨\\u00171}mò\\u0015>Ò\\u0011f\x9a¤óRúTÄÂ0"},"nullCount":{"c":7}}'}
E           GPU: {'path': 'a=3/b=b/partsnappy.parquet', 'partitionValues': {'a': '3', 'b': 'b'}, 'dataChange': True, 'stats': '{"numRecords":149,"minValues":{"c":"\\u0001\x80\\\\\\f\\u00039\x9eÜVJ³¾\x93÷æô¸è=¯ûo`\x82ÛÂrº\\u000F\xad"},"maxValues":{"c":"ÿûÜÌ\x9bã±½?«¨\\u00171}mò\\u0015>Ò\\u0011f\x9a¤óRúTÄÂ0"},"nullCount":{"c":9}}'}

../../src/main/python/delta_lake_utils.py:104: AssertionError

Steps/Code to reproduce bug
Run the integration tests on Spark 3.3.1 with DATAGEN_SEED=1707683137.

Expected behavior
The test case should succeed.

jlowe commented 5 months ago

I can reproduce this locally via:

SPARK_SUBMIT_FLAGS="--master local[8]" DATAGEN_SEED=1707683137 TEST_PARALLEL=0 SPARK_HOME=~/spark-3.3.3-bin-hadoop3/ PYSP_TEST_spark_jars_packages=io.delta:delta-core_2.12:2.3.0 PYSP_TEST_spark_sql_extensions=io.delta.sql.DeltaSparkSessionExtension PYSP_TEST_spark_sql_catalog_spark__catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog integration_tests/run_pyspark_from_build.sh -k "test_delta_update_partitions and False" --delta_lake

I looked into the issue, and it is a known source of non-determinism in the sort used for partitioned writes. In this case the CPU run sampled and split the data such that a null row went to task A, while in the GPU run that null row went to task B. The resulting tables are semantically equivalent, but the Delta log metadata won't line up because the individual files don't all end up with the same number of rows, and therefore carry different per-file stats such as numRecords and nullCount.
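To make that concrete, here is a small self-contained sketch (hypothetical rows, not taken from the test) of how two different row-to-file splits of the same data produce different per-file stats, which is exactly the numRecords/nullCount mismatch reported in the assertion above:

```python
# Hypothetical illustration: the same six rows split differently across two output
# files. The combined table contents are identical, but the per-file stats differ.
rows = [None, "a", "b", None, "c", None]

def file_stats(chunk):
    # Rough stand-in for the per-file 'stats' entry Delta records in each 'add' action.
    return {"numRecords": len(chunk), "nullCount": sum(1 for r in chunk if r is None)}

cpu_files = [rows[:3], rows[3:]]  # e.g. the CPU run puts one null row in the first file
gpu_files = [rows[:2], rows[2:]]  # e.g. the GPU run splits at a different boundary

print([file_stats(f) for f in cpu_files])  # [{'numRecords': 3, 'nullCount': 1}, {'numRecords': 3, 'nullCount': 2}]
print([file_stats(f) for f in gpu_files])  # [{'numRecords': 2, 'nullCount': 1}, {'numRecords': 4, 'nullCount': 2}]

# Same rows overall, so the resulting tables are equivalent...
assert sorted(cpu_files[0] + cpu_files[1], key=str) == sorted(gpu_files[0] + gpu_files[1], key=str)
# ...but a file-by-file comparison of the Delta log 'stats' fields will not line up.
```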

Will update the test to pin the data seed.
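For illustration, a rough sketch of what pinning the seed for this one test could look like. It assumes the integration-test suite provides a datagen_overrides marker that accepts seed and reason arguments; that marker name and signature are an assumption here, not something confirmed in this issue:

```python
# Sketch only: the datagen_overrides marker name/signature is assumed, not confirmed here.
# The intent is to force a fixed datagen seed for this test so the CPU/GPU Delta log
# comparison is not exposed to the nondeterministic row-to-file split described above.
@allow_non_gpu(*delta_meta_allow)
@delta_lake
@ignore_order
@datagen_overrides(seed=0, reason='https://github.com/NVIDIA/spark-rapids/issues/10410')
@pytest.mark.parametrize("use_cdf", [True, False], ids=idfn)
@pytest.mark.parametrize("partition_columns", [["a"], ["a", "b"]], ids=idfn)
@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x")
def test_delta_update_partitions(spark_tmp_path, use_cdf, partition_columns):
    ...  # body unchanged from the traceback above
```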