apache / iceberg

Apache Iceberg
https://iceberg.apache.org/
Apache License 2.0

[Bug]: Can't create/update tables in REST nessie catalog via Spark. Iceberg Cannot read field "formatVersion" because "x0" is null #10533

Closed: sdasdasd1 closed this issue 4 months ago

sdasdasd1 commented 4 months ago

Apache Iceberg version

1.5.0

Query engine

Spark

Please describe the bug 🐞

What happened

I can't create or append to any table in the REST Nessie catalog (the same actions work correctly in the classic Nessie catalog). The error appears after the actions I describe below. On createOrReplace I get this:

Py4JJavaError: An error occurred while calling o233.createOrReplace.
: java.lang.NullPointerException: Cannot read field "formatVersion" because "x0" is null
    at org.apache.iceberg.TableMetadata.access$500(TableMetadata.java:50)
    at org.apache.iceberg.TableMetadata$Builder.<init>(TableMetadata.java:936)
    at org.apache.iceberg.TableMetadata$Builder.<init>(TableMetadata.java:864)
    at org.apache.iceberg.TableMetadata.buildFrom(TableMetadata.java:857)
    at org.apache.iceberg.rest.responses.LoadTableResponse.tableMetadata(LoadTableResponse.java:64)
    at org.apache.iceberg.rest.RESTSessionCatalog$Builder.createTransaction(RESTSessionCatalog.java:701)
    at org.apache.iceberg.rest.RESTSessionCatalog$Builder.createOrReplaceTransaction(RESTSessionCatalog.java:794)
    at org.apache.iceberg.CachingCatalog$CachingTableBuilder.createOrReplaceTransaction(CachingCatalog.java:299)
    at org.apache.iceberg.spark.SparkCatalog.stageCreateOrReplace(SparkCatalog.java:302)
    at org.apache.spark.sql.connector.catalog.StagingTableCatalog.stageCreateOrReplace(StagingTableCatalog.java:188)
    at org.apache.spark.sql.execution.datasources.v2.AtomicReplaceTableAsSelectExec.run(WriteToDataSourceV2Exec.scala:204)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
    at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
    at org.apache.spark.sql.DataFrameWriterV2.runCommand(DataFrameWriterV2.scala:196)
    at org.apache.spark.sql.DataFrameWriterV2.internalReplace(DataFrameWriterV2.scala:208)
    at org.apache.spark.sql.DataFrameWriterV2.createOrReplace(DataFrameWriterV2.scala:134)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Thread.java:840)

Appending to existing tables (which were created with the classic Nessie catalog) also doesn't work: Spark doesn't throw any error, but nothing changes afterwards. There is no new commit in the Nessie UI and no new data in the table.
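
To illustrate, the silent append is nothing more exotic than the standard DataFrameWriterV2 call; a minimal sketch (the table name is just an example from this setup, and df is any DataFrame with a matching schema):

# Minimal sketch of the silent append. The target table already exists and was
# originally created through the classic Nessie catalog.
df = spark.read.table("iceberg_rest.dds.f_iceberg_maintenance_expire_snapshots_log")

# Returns without any exception, but afterwards there is no new commit in the
# Nessie UI and no new data in the table.
df.writeTo("iceberg_rest.dds.f_iceberg_maintenance_expire_snapshots_log").append()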

How to reproduce it

  1. Spark 3.5.1 + settings (a sketch of how these are assembled into a session follows step 3)

    .set("spark.jars.packages", "org.apache.iceberg:iceberg-aws:1.5.0,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0,org.apache.hadoop:hadoop-aws:3.3.4,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.90.4")
    .set("spark.sql.sources.partitionOverwriteMode", "dynamic")
    .set("spark.sql.ansi.enabled", "true")
    .set("spark.sql.storeAssignmentPolicy", "ANSI")
    .set("spark.io.compression.codec", "zstd")
    
    .set("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.projectnessie.spark.extensions.NessieSparkSessionExtensions")
    .set("spark.sql.debug", "true")
    
    .set("spark.jars.packages", "org.apache.iceberg:iceberg-aws:1.5.0,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0,org.apache.hadoop:hadoop-aws:3.3.4,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.90.4")
    .set("spark.driver.userClassPathFirst", "false")
    .set("spark.executor.userClassPathFirst", "false")
    
    .set("spark.sql.catalog.iceberg_rest", "org.apache.iceberg.spark.SparkCatalog")
    .set("spark.sql.catalog.iceberg_rest.uri", "https://.../iceberg/")
    .set("spark.sql.catalog.iceberg_rest.type", "rest")
    .set("spark.sql.defaultCatalog", "iceberg_rest")
  2. Nessie Rest 0.90.4 + settings:

    ...
    chart:
      spec:
        chart: nessie
        version: ${nessie_version}
        sourceRef:
          kind: HelmRepository
          name: nessiecatalog
          namespace: trino
        interval: 1m
    values:
      logLevel: DEBUG
      versionStoreType: JDBC
      postgres:
        jdbcUrl: ${nessie_jdbcUrl}
        secret:
          name: nessie-secret
          username: postgres_username
          password: postgres_password
      advancedConfig:
        nessie.catalog.default-warehouse: dwh
        nessie.catalog.warehouses.dwh.location: ${s3_warehouse_bucket}
        nessie.catalog.service.s3.endpoint: "https://obs...."
        nessie.catalog.service.s3.region: "..."
        nessie.catalog.iceberg-config-defaults.io-impl: "org.apache.iceberg.hadoop.HadoopFileIO"
      extraEnv:
        - name: NESSIE_CATALOG_SERVICE_S3_ACCESS_KEY_NAME
          valueFromSecret:
            name: obs-s3-credentials-secret
            key: accessKey
        - name: NESSIE_CATALOG_SERVICE_S3_ACCESS_KEY_SECRET
          valueFromSecret:
            name: obs-s3-credentials-secret
            key: secretKey
3. Try to create a table with DataFrameWriterV2:

from typing import List, Tuple

from pyspark.sql import DataFrame, DataFrameWriterV2

DEFAULT_PROPERTIES = [
    ("write.format.default", "parquet"),
    ("write.parquet.compression-codec", "ZSTD"),
    ("write.parquet.compression-level", "6"),
    ("write.metadata.delete-after-commit.enabled", "true"),
    ("write.metadata.previous-versions-max", "100"),
    ("write.distribution-mode", "hash"),
    ("write.object-storage.enabled", "true"),
    ("write.data.path", "s3://obs-zdp-warehouse-stage-mz"),
    ("commit.retry.num-retries", "10"),
    ("gc.enabled", "true"),
    ("format-version", "2"),
]

def apply_t_properties_to_df_writer(writer: DataFrameWriterV2, properties: List[Tuple[str, str]]) -> DataFrameWriterV2:
    """
    Applies a list of table properties to a DataFrameWriterV2 instance.

    :param writer: The DataFrameWriterV2 instance to apply properties to.
    :param properties: A list of tuples, each containing a property name and value.
    :return: The modified DataFrameWriterV2 instance with applied properties.
    """
    for property, value in properties:
        writer = writer.tableProperty(property, value)
    return writer

def get_writer(df: DataFrame, table_name: str) -> DataFrameWriterV2:
    """
    Creates a DataFrameWriterV2 instance for writing a DataFrame to an Iceberg table.

    :param df: The DataFrame to be written.
    :param table_name: The name of the Iceberg table.
    :return: A DataFrameWriterV2 instance configured for the Iceberg table.
    """
    return apply_t_properties_to_df_writer(df.writeTo(table_name), DEFAULT_PROPERTIES)

df = spark.read.table("iceberg_rest.dds.f_iceberg_maintenance_expire_snapshots_log")  # read or create any df

get_writer(df, "iceberg_rest.dds.f_iceberg_maintenance_expire_snapshots_log_test2").createOrReplace()
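
For completeness, this is roughly how the step-1 settings are turned into a session, a minimal sketch assuming a plain SparkConf / SparkSession.builder setup (only the catalog-related keys are repeated; the truncated URI is left exactly as above):

from pyspark import SparkConf
from pyspark.sql import SparkSession

# Only the REST-catalog settings from step 1 are repeated here; the remaining
# .set(...) calls from the report are applied in exactly the same way.
conf = (
    SparkConf()
    .set("spark.sql.extensions",
         "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,"
         "org.projectnessie.spark.extensions.NessieSparkSessionExtensions")
    .set("spark.sql.catalog.iceberg_rest", "org.apache.iceberg.spark.SparkCatalog")
    .set("spark.sql.catalog.iceberg_rest.type", "rest")
    .set("spark.sql.catalog.iceberg_rest.uri", "https://.../iceberg/")  # truncated URI kept as in the report
    .set("spark.sql.defaultCatalog", "iceberg_rest")
)

spark = SparkSession.builder.config(conf=conf).getOrCreate()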



I also tried removing all table properties and keeping only the format-version property, but got the same behavior.
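
For reference, the stripped-down attempt looked roughly like this (an illustrative sketch rather than the exact notebook cell):

# Stripped-down variant: only format-version is set; createOrReplace still fails
# with the same NullPointerException.
(
    df.writeTo("iceberg_rest.dds.f_iceberg_maintenance_expire_snapshots_log_test2")
    .tableProperty("format-version", "2")
    .createOrReplace()
)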

Nessie server type (docker/uber-jar/built from source) and version

0.90.4

Client type (Ex: UI/Spark/pynessie ...) and version

Spark 3.5.1

Additional information

_No response_
Fokko commented 4 months ago

@ajantha-bhat do you have time to dig into this?

ajantha-bhat commented 4 months ago

yeah. We will check it.

cc: @dimas-b, @snazy

ajantha-bhat commented 4 months ago

Closing this as a duplicate, since the same issue was opened on the Nessie side and @sdasdasd1 has confirmed that it is resolved:

https://github.com/projectnessie/nessie/issues/8855#issuecomment-2182385103