delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive, and APIs for Scala, Java, Rust, Ruby, and Python
https://delta.io
Apache License 2.0

[BUG][Spark][Presto] Delta lake integration with presto #1994

Open rohitvarma01 opened 1 year ago

rohitvarma01 commented 1 year ago

Hi all,

I'm using Spark version 3.2.0 along with Delta Lake version 1.2.1. I've established connectivity between Spark and the Hive metastore. Additionally, I've connected Presto to Hive using a connector. Through Spark SQL, I've created a table in Hive based on a Delta table stored in an S3 location. I can see this newly created table in both the Hive metadata (specifically in the TBLS table) and in Presto.
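For reference, the manifest that such a Hive table points at is produced from Spark with Delta's generate API; a minimal sketch, assuming the Delta table lives at the S3 path shown in the error below:

from delta.tables import DeltaTable

# Writes _symlink_format_manifest/ under the table root; these text files
# list the current Parquet data files for Presto/Hive to read.
delta_table = DeltaTable.forPath(spark, "s3a://db-postgres-data/enc_bank_account_information")
delta_table.generate("symlink_format_manifest")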

However, when I attempt to read this table using either Spark SQL or Presto, I encounter the following error:

java.lang.RuntimeException: s3a://db-postgres-data/enc_bank_account_information/_symlink_format_manifest/manifest is not a Parquet file. Expected magic number at tail, but found [117, 101, 116, 10].
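Decoded, those four bytes are plain ASCII, i.e. the tail of a text line ending in ".parquet" plus a newline, which suggests the reader is parsing the plain-text manifest itself as if it were a Parquet data file:

>>> bytes([117, 101, 116, 10]).decode("ascii")
'uet\n'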

Below is my Spark conf:

spark = (
    SparkSession.builder
    .appName("presto with Delta Lake")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.jars", "/usr/spark-3.1.2/jars/spark-hive_2.12-3.1.2.jar,/usr/spark-3.1.2/jars/hive-metastore-3.1.3.jar,/usr/spark-3.1.2/jars/postgresql-42.2.27.jre7.jar,/usr/spark-3.1.2/jars/hadoop-aws-3.2.2.jar,/usr/spark-3.1.2/jars/hadoop-common-3.3.6.jar,/usr/spark-3.1.2/jars/aws-java-sdk-bundle-1.11.563.jar,/usr/spark-3.1.2/jars/aws-java-sdk-s3-1.11.563.jar")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.databricks.delta.retentionDurationCheck.enabled", "false")
    .config("spark.executor.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true")
    .config("spark.driver.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true")
    .config("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore")
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
    .config("spark.jars.packages", spark_jars_packages)
    .config("spark.executor.memory", "8g")
    .config("spark.worker.memory", "8g")
    .config("spark.sql.warehouse.dir", "work/spark-warehouse")
    .config("hive.exec.dynamic.partition", "true")
    .config("hive.exec.dynamic.partition.mode", "nonstrict")
    .config("hive.metastore.uris", hive_metastore)
    .config("spark.hadoop.fs.s3a.multiobjectdelete.enable", "true")
    .config("spark.hadoop.fs.s3a.fast.upload", "true")
    .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
    .config("spark.history.fs.logDirectory", "s3a://storage-for-spark-logs/")
    .config("spark.sql.catalogImplementation", "hive")
    .master(master)
    .enableHiveSupport()
    .getOrCreate()
)

spark.sparkContext.setLogLevel("WARN")
spark._jsc.hadoopConfiguration().set("com.amazonaws.services.s3.enableV4", "true")
spark._jsc.hadoopConfiguration().set("fs.AbstractFileSystem.s3a.impl", "org.apache.hadoop.fs.s3a.S3A")
spark._jsc.hadoopConfiguration().set("spark.hadoop.fs.s3a.path.style.access", "true")
spark._jsc.hadoopConfiguration().set("spark.hadoop.fs.s3a.access.key", aws_access_key_id)
spark._jsc.hadoopConfiguration().set("spark.hadoop.fs.s3a.secret.key", aws_secret_access_key)
spark._jsc.hadoopConfiguration().set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
spark._jsc.hadoopConfiguration().set("spark.hadoop.fs.s3a.endpoint", "s3-eu-north-1.amazonaws.com")
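One detail worth noting about the block above: Spark strips the spark.hadoop. prefix only from entries set on the SparkConf; keys set directly on hadoopConfiguration() need the bare fs.s3a.* names, or the S3A client will never see them. A corrected sketch of the same settings:

hadoop_conf = spark._jsc.hadoopConfiguration()
# Bare keys, since this call bypasses SparkConf's spark.hadoop.* prefix stripping.
hadoop_conf.set("fs.s3a.path.style.access", "true")
hadoop_conf.set("fs.s3a.access.key", aws_access_key_id)
hadoop_conf.set("fs.s3a.secret.key", aws_secret_access_key)
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoop_conf.set("fs.s3a.endpoint", "s3-eu-north-1.amazonaws.com")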

Below is the code for creating the Hive table:

spark.sql("""
    CREATE TABLE IF NOT EXISTS bank_info (
        __v double,
        _id string,
        user_id string,
        createdAt string,
        updatedAt string,
        ifsc_code string,
        account_number_mac string,
        account_number_encryptedData string,
        _airbyte_ab_id string,
        _airbyte_emitted_at timestamp,
        _airbyte_normalized_at timestamp,
        _airbyte_bank_account_information_hashid string
    )
    ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
    STORED AS
        INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
        OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
    LOCATION 's3a://db-postgres-data/enc_bank_account_information/_symlink_format_manifest'
""")
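Worth noting: the manifest under that LOCATION reflects the Delta table only as of the last generate call, so it has to be regenerated after each write unless automatic manifest updates are enabled. A sketch of both options in Spark SQL, using the path from the report above:

# Regenerate the symlink manifest after a write...
spark.sql(
    "GENERATE symlink_format_manifest "
    "FOR TABLE delta.`s3a://db-postgres-data/enc_bank_account_information`"
)
# ...or let Delta keep the manifest up to date on every write.
spark.sql(
    "ALTER TABLE delta.`s3a://db-postgres-data/enc_bank_account_information` "
    "SET TBLPROPERTIES (delta.compatibility.symlinkFormatManifest.enabled = true)"
)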

Please help to resolve this issue.


rohitvarma01 commented 1 year ago

@tdas please help me solve this issue.

vkorukanti commented 1 year ago

@rohitvarma01 A few questions:

1) SparkSQL issue: Are you trying to query the Hive table bank_info (which points at the Delta manifest files) from Spark SQL? If so, AFAIK that is not supported in Spark (see the JIRA): when a manifest file is used, Spark supports only text files, not Parquet. If you want to query from Spark SQL, why not use the DataFrame.saveAsTable API when creating the Delta table?
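A minimal sketch of that suggestion (df here is a stand-in for whatever DataFrame produced the data, not something from the original report). saveAsTable registers the table in the metastore as a Delta table, so Spark SQL reads the Delta log directly instead of going through the manifest:

# Register the Delta table in the metastore; Spark SQL then queries it natively.
(
    df.write.format("delta")
    .option("path", "s3a://db-postgres-data/enc_bank_account_information")
    .saveAsTable("bank_info")
)
spark.sql("SELECT * FROM bank_info LIMIT 10").show()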

2) Presto issue: Presto should be able to read a manifest that points to Parquet files. Could you share the entire call stack of the error from Presto?
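(If it helps locate it: with a default Presto launcher setup, the full stack trace typically appears in the coordinator's or worker's var/log/server.log, and on the query detail page of the Presto web UI.)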