Apache Iceberg
https://iceberg.apache.org/
Apache License 2.0
Default table properties not respected when using Spark DataFrame API #8265

Open ignaski opened 1 year ago

ignaski commented 1 year ago

Apache Iceberg version

1.3.1 (latest release)

Query engine

Spark

Please describe the bug 🐞

Creating a table via Spark SQL respects the default table properties configured on the catalog; creating one via the DataFrame API does not. The issue can be reproduced using the quickstart example.

spark-shell --packages org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.3.1 \
    --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
    --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.local.catalog-impl=org.apache.iceberg.rest.RESTCatalog \
    --conf spark.sql.catalog.local.uri=http://rest:8181 \
    --conf spark.sql.catalog.local.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
    --conf spark.sql.catalog.local.warehouse=s3a://warehouse/wh/ \
    --conf spark.sql.catalog.local.s3.endpoint=http://minio:9000 \
    --conf spark.sql.defaultCatalog=local \
    --conf spark.sql.catalog.local.table-default.write.metadata.delete-after-commit.enabled=true

Creation via Spark SQL:

scala> spark.sql("CREATE TABLE local.nyc.taxis (vendor_id bigint) PARTITIONED BY (vendor_id);")
res0: org.apache.spark.sql.DataFrame = []

scala> spark.sql("show create table local.nyc.taxis").show(truncate=false)
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|createtab_stmt                                                                                                                                                                                                                                                                                                 |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|CREATE TABLE local.nyc.taxis (\n  vendor_id BIGINT)\nUSING iceberg\nPARTITIONED BY (vendor_id)\nLOCATION 's3://warehouse/nyc/taxis'\nTBLPROPERTIES (\n  'current-snapshot-id' = 'none',\n  'format' = 'iceberg/parquet',\n  'format-version' = '1',\n  'write.metadata.delete-after-commit.enabled' = 'true')\n|
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

Creation via DataFrame API:

scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row

scala> val schema = StructType( Array(
     |     StructField("vendor_id", LongType,true)
     | ))
schema: org.apache.spark.sql.types.StructType = StructType(StructField(vendor_id,LongType,true))

scala> val df = spark.createDataFrame(spark.sparkContext.emptyRDD[Row],schema)
df: org.apache.spark.sql.DataFrame = [vendor_id: bigint]

scala> df.writeTo("local.nyc.taxis_df").create()

scala> spark.sql("show create table local.nyc.taxis_df").show(truncate=false)
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|createtab_stmt                                                                                                                                                                                                                                                                                    |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|CREATE TABLE local.nyc.taxis_df (\n  vendor_id BIGINT)\nUSING iceberg\nLOCATION 's3://warehouse/nyc/taxis_df'\nTBLPROPERTIES (\n  'created-at' = '2023-08-09T06:41:27.531135128Z',\n  'current-snapshot-id' = '6638980767440031836',\n  'format' = 'iceberg/parquet',\n  'format-version' = '1')\n|
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
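
For comparison, a minimal sketch of setting the property explicitly on the writer, meant for the same spark-shell session (it reuses `spark` and `df` from above). The table name `local.nyc.taxis_df2` is hypothetical, and it is an assumption, not something verified in this report, that a property passed through `tableProperty` is applied on this code path even though the catalog-level `table-default.*` setting is not.

```scala
// Sketch only: run in the spark-shell session above, where `spark` and `df` exist.
import org.apache.spark.sql.functions.col

// Assumption: explicitly passed properties are honored by writeTo(...).create().
df.writeTo("local.nyc.taxis_df2") // hypothetical table name
  .using("iceberg")
  // Mirrors PARTITIONED BY (vendor_id) from the Spark SQL example.
  .partitionedBy(col("vendor_id"))
  .tableProperty("write.metadata.delete-after-commit.enabled", "true")
  .create()

// List the table properties as key/value rows, which is easier to read
// than picking them out of the SHOW CREATE TABLE string.
spark.sql("SHOW TBLPROPERTIES local.nyc.taxis_df2").show(truncate = false)
```

If the property shows up here but not for `local.nyc.taxis_df`, that would suggest only the catalog-level default is being skipped on the DataFrame path.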

boushphong commented 10 months ago

Still reproducible on 1.4.2. Can I take this one? :bow: @nastra

boushphong commented 10 months ago

Actually, I don't think this is a bug. This would produce DDL like the Spark SQL API:

df.write.partitionBy("vendor_id").saveAsTable("local.nyc.taxis_df")

nastra commented 10 months ago

@boushphong yes, feel free to work on this if you're interested.

github-actions[bot] commented 5 days ago

This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.