delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive, as well as language APIs
https://delta.io
Apache License 2.0

DeltaTableBuilder does not work on EMR 6.3.0 and Glue Metastore #703

Closed: jperkelens closed this issue 3 years ago

jperkelens commented 3 years ago

From what I can tell, attempting to create an entry in a Glue metastore on EMR runs into problems with the consistency of the partition fields.

When running the following code on EMR 6.3.0 (Spark 3.1.1) and Delta 1.0.0:

import io.delta.tables.DeltaTable
import org.apache.spark.sql.types._
DeltaTable.create()
   .location("s3://bucket/dataset/v1")
   .tableName("dev_database.datatable")
   .partitionedBy("ingest_date")
   .addColumns(StructType(Seq(StructField("ingest_date", DateType), StructField("id", StringType))))
   .execute()

This exception is thrown:

 diagnostics: User class threw exception: java.lang.AssertionError: assertion failed
    at scala.Predef$.assert(Predef.scala:208)
    at org.apache.spark.sql.catalyst.catalog.CatalogTable.partitionSchema(interface.scala:319)
    at org.apache.spark.sql.hive.HiveExternalCatalog.newSparkSQLSpecificMetastoreTable$1(HiveExternalCatalog.scala:355)
    at org.apache.spark.sql.hive.HiveExternalCatalog.createDataSourceTable(HiveExternalCatalog.scala:434)
    at org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$createTable$1(HiveExternalCatalog.scala:299)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:105)
    at org.apache.spark.sql.hive.HiveExternalCatalog.createTable(HiveExternalCatalog.scala:270)
    at org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.createTable(ExternalCatalogWithListener.scala:94)
    at org.apache.spark.sql.catalyst.catalog.SessionCatalog.createTable(SessionCatalog.scala:346)
    at org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog.createTable(V2SessionCatalog.scala:105)
    at org.apache.spark.sql.execution.datasources.v2.CreateTableExec.run(CreateTableExec.scala:41)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:40)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:40)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.doExecute(V2CommandExec.scala:55)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:185)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:181)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
    at io.delta.tables.DeltaTableBuilder.$anonfun$execute$4(DeltaTableBuilder.scala:356)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
    at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
    at io.delta.tables.DeltaTableBuilder.execute(DeltaTableBuilder.scala:356)

The EMR cluster is configured with the following properties:

[{"classification":"spark", "properties":{"maximizeResourceAllocation":"true"}, "configurations":[]},{"classification":"spark-defaults", "properties":{"spark.sql.extension":"io.delta.sql.DeltaSparkSessionExtension", "spark.databricks.hive.metastore.glueCatalog.enabled":"true", "spark.sql.catalon.spark_catalog":"org.apache.spark.sql.delta.catalog.DeltaCatalog"}, "configurations":[]},{"classification":"hive-site", "properties":{"hive.metastore.client.factory.class":"com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"}, "configurations":[]},{"classification":"spark-hive-site", "properties":{"hive.metastore.client.factory.class":"com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"}, "configurations":[]}]
tdas commented 3 years ago

Are you able to create tables in this setup in other formats like Parquet? And if yes, are you able to use older Spark + Delta 0.8.0 to do the same? Just trying to understand the scope of the issue.

TD
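
As a concrete way to check that first question, here is a minimal sketch (not from the thread; the database, table, and S3 path names are hypothetical placeholders) of creating a partitioned Parquet table through the same Glue-backed metastore, to see whether the failure is specific to Delta's DeltaTableBuilder path:

// Hypothetical smoke test: create a partitioned Parquet table via the Glue metastore.
// Names and paths below are placeholders, not taken from the reporter's setup.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

spark.sql("""
  CREATE TABLE IF NOT EXISTS dev_database.parquet_smoke_test (
    id STRING,
    ingest_date DATE
  )
  USING parquet
  PARTITIONED BY (ingest_date)
  LOCATION 's3://bucket/dataset/parquet_smoke_test'
""")

If this succeeds while the Delta builder fails, the problem is likely scoped to the Delta catalog configuration rather than the Glue metastore itself.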

tayler-jones commented 3 years ago

There is a typo in your configuration: spark.sql.catalon.spark_catalog should be spark.sql.catalog.spark_catalog
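
For reference, the spark-defaults classification posted above would look like this with that key corrected (a sketch based on the reporter's configuration; the extensions key is also written here as spark.sql.extensions, plural, which is the spelling the Delta documentation uses):

{
  "classification": "spark-defaults",
  "properties": {
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    "spark.databricks.hive.metastore.glueCatalog.enabled": "true",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog"
  },
  "configurations": []
}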

tdas commented 3 years ago

oh boy! that escaped me too! you have eagle eyes @tayler-jones

jperkelens commented 3 years ago

oh boy! Yeah that fixed it, sorry folks! Appreciate the help :)