
[BUG] Delta Lake MERGE INTO will hang Spark session without result (10000 columns). #3118

Open · DevKretov opened this issue 3 months ago

DevKretov commented 3 months ago

Bug

Which Delta project/connector is this regarding?

Spark (delta-spark). I am working with Delta Lake 3.1.0 and Spark 3.5.1.

Describe the problem

I am experimenting with Delta Lake as the primary storage solution for my tabular data, which is updated daily. I tried to mimic the basic use case: an existing target table is updated with new data that may modify existing rows, i.e. upserts. I am using a MERGE INTO operation, where the target table is my Delta table and the table with updates is simply saved as a Parquet file.
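For context, the daily flow I am trying to mimic looks roughly like the sketch below. The paths and the id column are placeholders rather than my real schema, it assumes the spark session created in the repro further down, and the actual repro uses generated CSV data instead of Parquet.

# Rough sketch of the intended daily upsert (placeholder paths and 'id' column):
# the target is a Delta table, the updates arrive as a plain Parquet file.
from delta.tables import DeltaTable

target = DeltaTable.forPath(spark, '/data/delta/features')
updates = spark.read.parquet('/data/incoming/features_updates.parquet')

(
    target.alias('target')
    .merge(updates.alias('updates'), 'target.id = updates.id')
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)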

My tables are unusual in their number of columns: there are up to 10000 columns, most of them binary. One column contains a row identifier, represented as a string hash, which is used in the matching condition of the merge operation.

I am experimenting with a small main table of 5000 rows, which is 10 MB in a single Parquet file on disk, and the same table stored as plain Parquet, which consists of several small 3.5 MB files.

My merge operation takes extremely long and appears to be stuck without doing any computation. What am I missing? I haven't tried partitioning, since the whole table is only 10 MB; I expected this operation to be extremely fast even on a non-optimized table.

I would appreciate your help, thank you!

Update: added the code to reproduce the issue below.

Spark version: 3.5.1. Delta Lake (delta-spark) version: 3.1.0. Instance: r5n.16xlarge
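For reference, this is roughly how I can confirm the versions from inside the session (a sketch assuming the spark session from the repro below; delta-spark is the PyPI distribution name):

# Sketch: print the Spark and delta-spark versions visible to this environment.
from importlib.metadata import version

print(spark.version)            # expected: 3.5.1
print(version('delta-spark'))   # expected: 3.1.0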

The only output I get is:

24/05/20 14:18:21 WARN DAGScheduler: Broadcasting large task binary with size 1094.0 KiB 
24/05/20 14:18:21 WARN DAGScheduler: Broadcasting large task binary with size 1097.6 KiB

Then the Spark session hangs and does nothing. The code does not finish.
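While it hangs, nothing beyond the two warnings is printed. A quick check I can run from a second cell (sketch below, using the standard PySpark status tracker) would show whether any job or stage is active at all, or whether the driver is still stuck in planning:

# Sketch: ask Spark whether anything is actually running while the merge appears stuck.
tracker = spark.sparkContext.statusTracker()
print(tracker.getActiveJobsIds())    # an empty list would suggest the driver is stuck in planning
print(tracker.getActiveStageIds())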

Steps to reproduce

import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from delta.tables import DeltaTable


def get_spark():
    builder = (
        SparkSession.builder.master("local[4]")
        .appName('SparkDelta')
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        .config("spark.driver.memory", "32g")
        .config(
            "spark.jars.packages",
            "io.delta:delta-spark_2.12:3.1.0,io.delta:delta-storage:3.1.0",
        )
    )
    return builder.getOrCreate()

spark = get_spark()

### GENERATE SOME FAKE DATA

import pandas as pd
import numpy as np

num_rows = 5000
num_cols = 10000
array = np.random.rand(num_rows, num_cols)

df = pd.DataFrame(array)
# reset_index() adds an 'index' column that acts as the row identifier
df = df.reset_index()
# to_csv() also writes the header row and the pandas index as the first column
df.to_csv('features.csv')

### DATA TO DELTA LAKE TABLE

# Spark reads the CSV without a header, so columns are named _c0, _c1, ...:
# _c0 is the pandas index, _c1 is the 'index' identifier, _c2 is the first feature.
df_spark = spark.read.format('csv').load('features.csv')
df_spark.write.format('delta').mode("overwrite").save('deltalake_features')

# Creating data to merge - just the first 100 rows (identifier + first feature column)
df_slice = df.iloc[:100, 0:2]
df_slice.to_csv('features_slice_100.csv')

df_spark_slice = spark.read.format('csv').load('features_slice_100.csv')

### MERGE INTO code

target_delta_table = DeltaTable.forPath(spark, 'deltalake_features')
df_updates = df_spark_slice
df_target = target_delta_table.toDF()  # not used below, kept for completeness

(
    target_delta_table.alias('target')
    .merge(
        df_updates.alias('updates'),
        F.col('target._c1') == F.col('updates._c1')
    )
    .whenMatchedUpdate(set={
        '_c2': F.lit(42)
    })
    .execute()
)
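To narrow this down, a parameterized variant of the same repro (sketch below; the helper name, the smaller row count, and the column counts are arbitrary choices, not part of the original run) can show how the merge time grows with the number of columns:

import time

# Sketch: rerun generate -> write Delta -> merge with increasing column counts
# to see where the merge starts to hang. Not part of the original report.
def run_repro(num_cols, path_prefix='deltalake_features_probe'):
    pdf = pd.DataFrame(
        np.random.rand(1000, num_cols),
        columns=[f'c{i}' for i in range(num_cols)],
    ).reset_index()
    sdf = spark.createDataFrame(pdf)
    path = f'{path_prefix}_{num_cols}'
    sdf.write.format('delta').mode('overwrite').save(path)

    target = DeltaTable.forPath(spark, path)
    updates = sdf.limit(100)
    (
        target.alias('t')
        .merge(updates.alias('u'), 't.index = u.index')
        .whenMatchedUpdate(set={'c0': F.lit(42)})
        .execute()
    )

for n in (100, 1000, 5000, 10000):
    start = time.time()
    run_repro(n)
    print(f'{n} columns -> {time.time() - start:.1f} s')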

Observed results

The Spark session hangs with no visible progress and the merge never returns.

Expected results

I expect the merge to finish successfully.

Further details

If I do a simple inner join on the same key instead of the merge, the operation takes just 1 minute to complete, so I'd expect the merge to run at a comparable speed; roughly what I ran is sketched below.
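For comparison, the inner join I timed was essentially the following (same tables, same _c1 key; the count() is just there to force execution):

# Sketch of the join comparison: full Delta table joined to the 100-row slice on _c1.
df_target = spark.read.format('delta').load('deltalake_features')
joined = df_target.join(df_spark_slice, on='_c1', how='inner')
print(joined.count())   # finishes in about a minute on the same instance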

Environment information