delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive and APIs
https://delta.io
Apache License 2.0

[BUG][Spark] Unable to use saveAsTable in append mode with concurrent.futures.ThreadPoolExecutor in python #2564

Open sangeethsasidharan opened 10 months ago

sangeethsasidharan commented 10 months ago

Bug

Describe the problem

My simplified use case is to read from one location and append the data in batches to a Delta Lake table registered in a Hive Metastore. I have to do this for several tables concurrently, so I use a Python ThreadPoolExecutor. Each thread executes the append operation described above for a different table.

But when I use append mode with saveAsTable, the second batch append fails with this error:

The column number of the existing table spark_catalog.test_schema.delta_dummy (struct<>) doesn't match the data schema (struct<id:int,_c1:string>).

Somehow it is not able to get the target table's current schema.

I tried the same with Parquet and CSV as the target format, and it worked as expected in those cases.

With Delta Lake, overwrite mode does not hit this issue either.

With Delta Lake, writing to S3 paths instead of to a table does not hit the issue either.

I only get the above error when saving as a table with the Delta Lake format in append mode.

I also observed that without Python future threads, append works fine.
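To check whether the empty struct<> is really what a worker thread sees for the table, a small diagnostic along these lines might help. This is only a sketch: schema_seen_by and compare_schema_across_threads are hypothetical helper names, and it assumes an existing spark session and an existing table. It does not explain the cause, it only compares the two views.

```python
import concurrent.futures


def schema_seen_by(spark, table_name):
    """Return the table schema as a string such as 'struct<id:int,_c1:string>'."""
    return spark.table(table_name).schema.simpleString()


def compare_schema_across_threads(spark, table_name):
    """Read the same table's schema on the calling thread and on a worker thread."""
    main_view = schema_seen_by(spark, table_name)
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        # result() blocks until the worker is done and re-raises its exception, if any
        worker_view = executor.submit(schema_seen_by, spark, table_name).result()
    return {"main": main_view, "worker": worker_view}
```

Calling compare_schema_across_threads(spark, "test_schema.delta_dummy") after the first append and printing the result would show whether the two views differ.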

Steps to reproduce

`
from pyspark.sql import SparkSession
from delta.tables import *

import concurrent.futures

AWS_ACCESS_KEY_ID = "AWS_ACCESS_KEY_ID"
AWS_SECRET_ACCESS_KEY = "AWS_SECRET_ACCESS_KEY"
dep_packages = 'io.delta:delta-spark_2.12:3.0.0,'\
    'org.apache.spark:spark-avro_2.12:3.5.0,'\
    'org.apache.hadoop:hadoop-aws:3.3.1,'\
    'com.amazonaws:aws-java-sdk-bundle:1.11.901'

spark = SparkSession\
    .builder\
    .appName("pyspark-notebook")\
    .config("spark.jars.packages", dep_packages)\
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")\
    .config("hive.metastore.uris", "thrift://localhost:9085")\
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")\
    .config("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")\
    .config("spark.hadoop.fs.s3a.access.key", AWS_ACCESS_KEY_ID)\
    .config("spark.hadoop.fs.s3a.secret.key", AWS_SECRET_ACCESS_KEY)\
    .config("spark.sql.warehouse.dir", "s3a://dw_path/delta_db")\
    .enableHiveSupport()\
    .getOrCreate()

def dummy_check_for_new_changes(logic_func):
    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        executor.submit(logic_func)

def insert_data():
    try:
        df = spark.read.format("csv")\
            .option("header", "true")\
            .option("inferSchema", "true")\
            .load('test.csv')
        df.show()
        df\
            .write\
            .saveAsTable("test_schema.delta_dummy", format='delta', mode='append')
        print("inserted data")
    except Exception as e:
        print(e)
        raise e

def execute_code():
    insert_data()
    insert_data()

dummy_check_for_new_changes(execute_code)
`
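A side note on the reproduction: dummy_check_for_new_changes discards the Future returned by executor.submit, so the exception re-raised in insert_data is only visible through the print(e) call. A variant that keeps the future and calls result() lets the failure propagate to the caller. This is a minimal sketch; run_in_thread is a hypothetical name and not part of the original script.

```python
import concurrent.futures


def run_in_thread(logic_func, max_workers=10):
    """Run logic_func on a worker thread and surface its result or exception."""
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        future = executor.submit(logic_func)
        # result() blocks until the task finishes and re-raises any exception
        # that was raised inside the worker thread
        return future.result()
```

With this helper, run_in_thread(execute_code) would raise the schema mismatch error in the main thread instead of dropping it.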

Observed results

Getting the below error on the second append operation:

The column number of the existing table spark_catalog.rbl_aura_ledger.hold_history_delta_dummy1 (struct<>) doesn't match the data schema (struct<id:int,_c1:string>).

Expected results

It is supposed to append the data to the table.

Further details

Environment information

danricbede commented 9 months ago

I am also running into this issue, but using a DeltaCatalog.

Copy/Paste code to reproduce:

from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql import SparkSession
from pyspark.sql import functions as sf
import concurrent.futures

def write_table(spark, table_name):
    spark.sql("SELECT 'def' as name").write.mode("append").saveAsTable(table_name)

def test_simple(spark: SparkSession):
    spark.sql("DROP TABLE IF EXISTS test_table")
    spark.sql("DROP TABLE IF EXISTS test_table_2")

    empty_df = spark.createDataFrame([], StructType([StructField("name", StringType())]))
    empty_df.write.saveAsTable("test_table")
    empty_df.write.saveAsTable("test_table_2")

    # OK
    write_table(spark, "test_table")

    # Not OK
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = executor.submit(write_table, spark, "test_table_2")
        futures.result()

test_simple(spark.getActiveSession())

Relevant Spark Config:

.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalogImplementation", "in-memory") 
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

Have tried variations of this config with no success.

Environment info:

The code actually runs ok on Databricks 14.3 LTS.

danricbede commented 5 months ago

The issue can be worked around by using the DataFrameWriterV2 methods instead:

spark.sql("SELECT 'def' as name").writeTo(table_name).append()
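For the multi-table case from the original report, this workaround could be combined with a thread pool roughly as follows. This is a sketch under assumptions: append_v2 and append_all are hypothetical helper names, spark is an existing session, and the target tables already exist, since writeTo(...).append() fails on a missing table.

```python
import concurrent.futures


def append_v2(spark, table_name, query):
    """Append the result of `query` to an existing table via DataFrameWriterV2."""
    spark.sql(query).writeTo(table_name).append()
    return table_name


def append_all(spark, queries_by_table, max_workers=4):
    """Run one append per table in a thread pool and return the finished table names."""
    done = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            executor.submit(append_v2, spark, table_name, query)
            for table_name, query in queries_by_table.items()
        ]
        for future in concurrent.futures.as_completed(futures):
            # result() re-raises any exception raised inside the worker thread
            done.append(future.result())
    return done
```

For example, append_all(spark, {"test_table": "SELECT 'def' as name", "test_table_2": "SELECT 'def' as name"}) would append one row to each table, with each append running on its own worker thread.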