projectnessie / nessie

Nessie: Transactional Catalog for Data Lakes with Git-like semantics
https://projectnessie.org
Apache License 2.0
909 stars 119 forks

Wrong ingress/reverse-proxy configuration #8855

Closed sdasdasd1 closed 2 weeks ago

sdasdasd1 commented 2 weeks ago

What happened

I can't create or append to any table in the Nessie REST catalog (the same actions work correctly with the classic Nessie catalog). The actions described below result in an error. On createOrReplace I get this:

Py4JJavaError: An error occurred while calling o233.createOrReplace.
: java.lang.NullPointerException: Cannot read field "formatVersion" because "x0" is null
    at org.apache.iceberg.TableMetadata.access$500(TableMetadata.java:50)
    at org.apache.iceberg.TableMetadata$Builder.<init>(TableMetadata.java:936)
    at org.apache.iceberg.TableMetadata$Builder.<init>(TableMetadata.java:864)
    at org.apache.iceberg.TableMetadata.buildFrom(TableMetadata.java:857)
    at org.apache.iceberg.rest.responses.LoadTableResponse.tableMetadata(LoadTableResponse.java:64)
    at org.apache.iceberg.rest.RESTSessionCatalog$Builder.createTransaction(RESTSessionCatalog.java:701)
    at org.apache.iceberg.rest.RESTSessionCatalog$Builder.createOrReplaceTransaction(RESTSessionCatalog.java:794)
    at org.apache.iceberg.CachingCatalog$CachingTableBuilder.createOrReplaceTransaction(CachingCatalog.java:299)
    at org.apache.iceberg.spark.SparkCatalog.stageCreateOrReplace(SparkCatalog.java:302)
    at org.apache.spark.sql.connector.catalog.StagingTableCatalog.stageCreateOrReplace(StagingTableCatalog.java:188)
    at org.apache.spark.sql.execution.datasources.v2.AtomicReplaceTableAsSelectExec.run(WriteToDataSourceV2Exec.scala:204)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
    at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
    at org.apache.spark.sql.DataFrameWriterV2.runCommand(DataFrameWriterV2.scala:196)
    at org.apache.spark.sql.DataFrameWriterV2.internalReplace(DataFrameWriterV2.scala:208)
    at org.apache.spark.sql.DataFrameWriterV2.createOrReplace(DataFrameWriterV2.scala:134)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Thread.java:840)

Appending to existing tables (which were created with the classic Nessie catalog) also does not work. Spark doesn't report any issue, but nothing changes afterwards: no new commit in the Nessie UI, no new data in the table.

How to reproduce it

  1. Spark 3.5.1 + settings

    .set("spark.jars.packages", "org.apache.iceberg:iceberg-aws:1.5.0,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0,org.apache.hadoop:hadoop-aws:3.3.4,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.90.4")
    .set("spark.sql.sources.partitionOverwriteMode", "dynamic")
    .set("spark.sql.ansi.enabled", "true")
    .set("spark.sql.storeAssignmentPolicy", "ANSI")
    .set("spark.io.compression.codec", "zstd")
    
    .set("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.projectnessie.spark.extensions.NessieSparkSessionExtensions")
    .set("spark.sql.debug", "true")
    
    .set("spark.jars.packages", "org.apache.iceberg:iceberg-aws:1.5.0,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0,org.apache.hadoop:hadoop-aws:3.3.4,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.90.4")
    .set("spark.driver.userClassPathFirst", "false")
    .set("spark.executor.userClassPathFirst", "false")
    
    .set("spark.sql.catalog.iceberg_rest", "org.apache.iceberg.spark.SparkCatalog")
    .set("spark.sql.catalog.iceberg_rest.uri", "https://.../iceberg/")
    .set("spark.sql.catalog.iceberg_rest.type", "rest")
    .set("spark.sql.defaultCatalog", "iceberg_rest")
  2. Nessie REST 0.90.4 + settings:

    ...
    chart:
      spec:
        chart: nessie
        version: ${nessie_version}
        sourceRef:
          kind: HelmRepository
          name: nessiecatalog
          namespace: trino
        interval: 1m
    values:
      logLevel: DEBUG
      versionStoreType: JDBC
      postgres:
        jdbcUrl: ${nessie_jdbcUrl}
        secret:
          name: nessie-secret
          username: postgres_username
          password: postgres_password
      advancedConfig:
        nessie.catalog.default-warehouse: dwh
        nessie.catalog.warehouses.dwh.location: ${s3_warehouse_bucket}
        nessie.catalog.service.s3.endpoint: "https://obs...."
        nessie.catalog.service.s3.region: "..."
        nessie.catalog.iceberg-config-defaults.io-impl: "org.apache.iceberg.hadoop.HadoopFileIO"
      extraEnv:
        - name: NESSIE_CATALOG_SERVICE_S3_ACCESS_KEY_NAME
          valueFromSecret:
            name: obs-s3-credentials-secret
            key: accessKey
        - name: NESSIE_CATALOG_SERVICE_S3_ACCESS_KEY_SECRET
          valueFromSecret:
            name: obs-s3-credentials-secret
            key: secretKey
3. Try to create a table with DataFrameWriterV2:

from typing import List, NoReturn, Tuple

from pyspark.sql import Column, DataFrame, DataFrameWriterV2, SparkSession

DEFAULT_PROPERTIES = [
    ("write.format.default", "parquet"),
    ("write.parquet.compression-codec", "ZSTD"),
    ("write.parquet.compression-level", "6"),
    ("write.metadata.delete-after-commit.enabled", "true"),
    ("write.metadata.previous-versions-max", "100"),
    ("write.distribution-mode", "hash"),
    ("write.object-storage.enabled", "true"),
    ("write.data.path", "s3://obs-zdp-warehouse-stage-mz"),
    ("commit.retry.num-retries", "10"),
    ("gc.enabled", "true"),
    ("format-version", "2"),
]

def apply_t_properties_to_df_writer(writer: DataFrameWriterV2, properties: List[Tuple[str, str]]) -> DataFrameWriterV2:
    """
    Applies a list of table properties to a DataFrameWriterV2 instance.

    :param writer: The DataFrameWriterV2 instance to apply properties to.
    :param properties: A list of tuples, each containing a property name and value.
    :return: The modified DataFrameWriterV2 instance with applied properties.
    """
    for property, value in properties:
        writer = writer.tableProperty(property, value)
    return writer

def get_writer(df: DataFrame, table_name: str) -> DataFrameWriterV2:
    """
    Creates a DataFrameWriterV2 instance for writing a DataFrame to an Iceberg table.

    :param df: The DataFrame to be written.
    :param table_name: The name of the Iceberg table.
    :return: A DataFrameWriterV2 instance configured for the Iceberg table.
    """
    return apply_t_properties_to_df_writer(df.writeTo(table_name), DEFAULT_PROPERTIES)

df = spark.read.table("iceberg_rest.dds.f_iceberg_maintenance_expire_snapshots_log") # read or create any df

get_writer(df, "iceberg_rest.dds.f_iceberg_maintenance_expire_snapshots_log_test2").createOrReplace()



I also tried removing all table properties and keeping only the format-version property, but got the same behavior.

### Nessie server type (docker/uber-jar/built from source) and version

0.90.4

### Client type (Ex: UI/Spark/pynessie ...) and version

Spark 3.5.1

### Additional information

_No response_
snazy commented 2 weeks ago

Hm. Honestly, that stack trace indicates that the TableMetadata in the LoadTableResponse is actually null, but there's no way it can actually be null. Very strange.

sdasdasd1 commented 2 weeks ago

Yes. I can see in the Iceberg code that there is a constant (https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/TableMetadata.java#L53) it refers to. That's why I can't understand what is going on. Also, in the same Spark session everything works well if I switch to the standard Nessie catalog implementation.

sdasdasd1 commented 2 weeks ago

@snazy I just created a brand new Nessie to avoid any backward-compatibility issues (because I had just upgraded an existing one), created a simple data frame using spark.range(10), and got the same problem. I mean it's easy to reproduce. Would you try it, please?
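
For example, something like this (the namespace and table names here are just placeholders):

    df = spark.range(10)
    df.writeTo("iceberg_rest.some_namespace.some_table").createOrReplace()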

sdasdasd1 commented 2 weeks ago

Just use my packages and settings.

snazy commented 2 weeks ago

Sorry, I can't reproduce w/ your reproducer.

What I see though:

  1. Don't use iceberg-aws but iceberg-aws-bundle (see the adjusted packages line sketched after this list)
  2. Don't set write.data.path - the data location should be managed via the catalog (Nessie verifies that S3 requests happen in the right place)
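
In the SparkConf style from the report above, point 1 would amount to roughly this (a sketch keeping the 1.5.0 / 3.3.4 versions from the report; note it also leaves out the nessie-spark-extensions package and the Nessie Spark SQL extension, which the reproduction below does not use):

    .set("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0,org.apache.iceberg:iceberg-aws-bundle:1.5.0,org.apache.hadoop:hadoop-aws:3.3.4")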

I've used Minio w/ S3 request signing via Nessie.

$ pyspark \
  --conf "spark.jars.packages=org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0,org.apache.iceberg:iceberg-aws-bundle:1.5.0" \
  --conf spark.sql.sources.partitionOverwriteMode=dynamic \
  --conf spark.sql.ansi.enabled=true \
  --conf spark.sql.storeAssignmentPolicy=ANSI \
  --conf spark.io.compression.codec=zstd \
  --conf spark.hadoop.fs.s3.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
  --conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \
  --conf spark.sql.debug=true \
  --conf spark.driver.userClassPathFirst=false \
  --conf spark.executor.userClassPathFirst=false \
  --conf spark.sql.catalog.iceberg_rest=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.iceberg_rest.uri=http://127.0.0.1:19120/iceberg/ \
  --conf spark.sql.catalog.iceberg_rest.type=rest \
  --conf spark.sql.defaultCatalog=iceberg_rest
Python 3.11.6 (main, Nov 25 2023, 16:25:20) [GCC 13.2.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
24/06/18 17:50:12 WARN Utils: Your hostname, shark resolves to a loopback address: 127.0.1.1; using 192.168.5.121 instead (on interface enp14s0)
24/06/18 17:50:12 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
:: loading settings :: url = jar:file:/home/snazy/.sdkman/candidates/spark/3.5.0/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /home/snazy/.ivy2/cache
The jars for the packages stored in: /home/snazy/.ivy2/jars
org.apache.iceberg#iceberg-spark-runtime-3.5_2.12 added as a dependency
org.apache.iceberg#iceberg-aws-bundle added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-0ff88c7d-6070-48c2-a507-7ed9d9233d42;1.0
    confs: [default]
    found org.apache.iceberg#iceberg-spark-runtime-3.5_2.12;1.5.0 in central
    found org.apache.iceberg#iceberg-aws-bundle;1.5.0 in central
:: resolution report :: resolve 53ms :: artifacts dl 1ms
    :: modules in use:
    org.apache.iceberg#iceberg-aws-bundle;1.5.0 from central in [default]
    org.apache.iceberg#iceberg-spark-runtime-3.5_2.12;1.5.0 from central in [default]
    ---------------------------------------------------------------------
    |                  |            modules            ||   artifacts   |
    |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
    ---------------------------------------------------------------------
    |      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
    ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-0ff88c7d-6070-48c2-a507-7ed9d9233d42
    confs: [default]
    0 artifacts copied, 2 already retrieved (0kB/2ms)
24/06/18 17:50:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.5.0
      /_/

Using Python version 3.11.6 (main, Nov 25 2023 16:25:20)
Spark context Web UI available at http://shark.fritz.box:4040
Spark context available as 'sc' (master = local[*], app id = local-1718725813789).
SparkSession available as 'spark'.
>>> 24/06/18 17:50:24 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
>>> from typing import List, NoReturn, Tuple
>>> 
>>> from pyspark.sql import Column, DataFrame, DataFrameWriterV2, SparkSession
>>> 
>>> DEFAULT_PROPERTIES = [
...     ("write.format.default", "parquet"),
...     ("write.parquet.compression-codec", "ZSTD"),
...     ("write.parquet.compression-level", "6"),
...     ("write.metadata.delete-after-commit.enabled", "true"),
...     ("write.metadata.previous-versions-max", "100"),
...     ("write.distribution-mode", "hash"),
...     ("write.object-storage.enabled", "true"),
...     # ("write.data.path", "s3://obs-zdp-warehouse-stage-mz"),
...     ("commit.retry.num-retries", "10"),
...     ("gc.enabled", "true"),
...     ("format-version", "2"),
... ]
>>> 
>>> def apply_t_properties_to_df_writer(writer: DataFrameWriterV2, properties: List[Tuple[str, str]]) -> DataFrameWriterV2:
...     """
...     Applies a list of table properties to a DataFrameWriterV2 instance.
... 
...     :param writer: The DataFrameWriterV2 instance to apply properties to.
...     :param properties: A list of tuples, each containing a property name and value.
...     :return: The modified DataFrameWriterV2 instance with applied properties.
...     """
...     for property, value in properties:
...         writer = writer.tableProperty(property, value)
...     return writer
... 
>>> 
>>> def get_writer(df: DataFrame, table_name: str) -> DataFrameWriterV2:
...     """
...     Creates a DataFrameWriterV2 instance for writing a DataFrame to an Iceberg table.
... 
...     :param df: The DataFrame to be written.
...     :param table_name: The name of the Iceberg table.
...     :return: A DataFrameWriterV2 instance configured for the Iceberg table.
...     """
...     return apply_t_properties_to_df_writer(df.writeTo(table_name), DEFAULT_PROPERTIES)
... 
>>> df = spark.read.table("iceberg_rest.dds.f_iceberg_maintenance_expire_snapshots_log") # read or create any df

24/06/18 17:50:41 INFO CatalogUtil: Loading custom FileIO implementation: org.apache.iceberg.io.ResolvingFileIO
24/06/18 17:50:41 INFO CatalogUtil: Loading custom FileIO implementation: org.apache.iceberg.io.ResolvingFileIO
>>> 
>>> 
>>> get_writer(df, "iceberg_rest.dds.f_iceberg_maintenance_expire_snapshots_log_test2").createOrReplace()
24/06/18 17:50:41 INFO CatalogUtil: Loading custom FileIO implementation: org.apache.iceberg.io.ResolvingFileIO
24/06/18 17:50:41 INFO SnapshotScan: Scanning empty table iceberg_rest.dds.f_iceberg_maintenance_expire_snapshots_log
24/06/18 17:50:41 INFO SparkPartitioningAwareScan: Reporting UnknownPartitioning with 0 partition(s) for table iceberg_rest.dds.f_iceberg_maintenance_expire_snapshots_log
24/06/18 17:50:41 INFO SparkWrite: Requesting 0 bytes advisory partition size for table iceberg_rest.dds.f_iceberg_maintenance_expire_snapshots_log_test2
24/06/18 17:50:41 INFO SparkWrite: Requesting UnspecifiedDistribution as write distribution for table iceberg_rest.dds.f_iceberg_maintenance_expire_snapshots_log_test2
24/06/18 17:50:41 INFO SparkWrite: Requesting [] as write ordering for table iceberg_rest.dds.f_iceberg_maintenance_expire_snapshots_log_test2
24/06/18 17:50:41 INFO CatalogUtil: Loading custom FileIO implementation: org.apache.iceberg.aws.s3.S3FileIO
24/06/18 17:50:42 INFO SparkWrite: Committing append with 0 new data files to table iceberg_rest.dds.f_iceberg_maintenance_expire_snapshots_log_test2
24/06/18 17:50:42 INFO SnapshotProducer: Committed snapshot 4556427135299363466 (MergeAppend)
24/06/18 17:50:42 INFO LoggingMetricsReporter: Received metrics report: CommitReport{tableName=iceberg_rest.dds.f_iceberg_maintenance_expire_snapshots_log_test2, snapshotId=4556427135299363466, sequenceNumber=1, operation=append, commitMetrics=CommitMetricsResult{totalDuration=TimerResult{timeUnit=NANOSECONDS, totalDuration=PT0.107875494S, count=1}, attempts=CounterResult{unit=COUNT, value=1}, addedDataFiles=null, removedDataFiles=null, totalDataFiles=CounterResult{unit=COUNT, value=0}, addedDeleteFiles=null, addedEqualityDeleteFiles=null, addedPositionalDeleteFiles=null, removedDeleteFiles=null, removedEqualityDeleteFiles=null, removedPositionalDeleteFiles=null, totalDeleteFiles=CounterResult{unit=COUNT, value=0}, addedRecords=null, removedRecords=null, totalRecords=CounterResult{unit=COUNT, value=0}, addedFilesSizeInBytes=null, removedFilesSizeInBytes=null, totalFilesSizeInBytes=CounterResult{unit=BYTES, value=0}, addedPositionalDeletes=null, removedPositionalDeletes=null, totalPositionalDeletes=CounterResult{unit=COUNT, value=0}, addedEqualityDeletes=null, removedEqualityDeletes=null, totalEqualityDeletes=CounterResult{unit=COUNT, value=0}}, metadata={engine-version=3.5.0, app-id=local-1718725813789, engine-name=spark, iceberg-version=Apache Iceberg 1.5.0 (commit 2519ab43d654927802cc02e19c917ce90e8e0265)}}
24/06/18 17:50:42 INFO SparkWrite: Committed in 113 ms
>>> 
sdasdasd1 commented 2 weeks ago

@snazy Please use Spark 3.5.1 and these settings for Nessie:

advancedConfig:
  nessie.catalog.default-warehouse: dwh
  nessie.catalog.warehouses.dwh.location: ${s3_warehouse_bucket}
  nessie.catalog.service.s3.endpoint: "https://obs...."
  nessie.catalog.service.s3.region: "..."
  nessie.catalog.iceberg-config-defaults.io-impl: "org.apache.iceberg.hadoop.HadoopFileIO"
snazy commented 2 weeks ago

Still works with HadoopFileIO:

$ pyspark \
  --conf "spark.jars.packages=org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0,org.apache.iceberg:iceberg-aws-bundle:1.5.0,org.apache.hadoop:hadoop-aws:3.3.4" \
  --conf spark.sql.sources.partitionOverwriteMode=dynamic \
  --conf spark.sql.ansi.enabled=true \
  --conf spark.sql.storeAssignmentPolicy=ANSI \
  --conf spark.io.compression.codec=zstd \
  --conf spark.hadoop.fs.s3.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
  --conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \
  --conf spark.sql.debug=true \
  --conf spark.driver.userClassPathFirst=false \
  --conf spark.executor.userClassPathFirst=false \
  --conf spark.sql.catalog.iceberg_rest=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.iceberg_rest.uri=http://127.0.0.1:19120/iceberg/ \
  --conf spark.sql.catalog.iceberg_rest.type=rest \
  --conf spark.sql.defaultCatalog=iceberg_rest \
  --conf spark.sql.catalog.iceberg_rest.hadoop.fs.s3a.endpoint=http://127.0.0.1:9000/ \
  --conf spark.sql.catalog.iceberg_rest.hadoop.fs.s3a.access.key=minioadmin \
  --conf spark.sql.catalog.iceberg_rest.hadoop.fs.s3a.secret.key=minioadmin
Python 3.11.6 (main, Nov 25 2023, 16:25:20) [GCC 13.2.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
24/06/18 19:12:01 WARN Utils: Your hostname, shark resolves to a loopback address: 127.0.1.1; using 192.168.5.121 instead (on interface enp14s0)
24/06/18 19:12:01 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
:: loading settings :: url = jar:file:/home/snazy/.sdkman/candidates/spark/3.5.0/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /home/snazy/.ivy2/cache
The jars for the packages stored in: /home/snazy/.ivy2/jars
org.apache.iceberg#iceberg-spark-runtime-3.5_2.12 added as a dependency
org.apache.iceberg#iceberg-aws-bundle added as a dependency
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-a33b9e13-f73d-41f8-a58c-bf82b6831271;1.0
    confs: [default]
    found org.apache.iceberg#iceberg-spark-runtime-3.5_2.12;1.5.0 in central
    found org.apache.iceberg#iceberg-aws-bundle;1.5.0 in central
    found org.apache.hadoop#hadoop-aws;3.3.4 in local-m2-cache
    found com.amazonaws#aws-java-sdk-bundle;1.12.262 in central
    found org.wildfly.openssl#wildfly-openssl;1.0.7.Final in local-m2-cache
:: resolution report :: resolve 80ms :: artifacts dl 3ms
    :: modules in use:
    com.amazonaws#aws-java-sdk-bundle;1.12.262 from central in [default]
    org.apache.hadoop#hadoop-aws;3.3.4 from local-m2-cache in [default]
    org.apache.iceberg#iceberg-aws-bundle;1.5.0 from central in [default]
    org.apache.iceberg#iceberg-spark-runtime-3.5_2.12;1.5.0 from central in [default]
    org.wildfly.openssl#wildfly-openssl;1.0.7.Final from local-m2-cache in [default]
    ---------------------------------------------------------------------
    |                  |            modules            ||   artifacts   |
    |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
    ---------------------------------------------------------------------
    |      default     |   5   |   0   |   0   |   0   ||   5   |   0   |
    ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-a33b9e13-f73d-41f8-a58c-bf82b6831271
    confs: [default]
    0 artifacts copied, 5 already retrieved (0kB/3ms)
24/06/18 19:12:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.5.0
      /_/

Using Python version 3.11.6 (main, Nov 25 2023 16:25:20)
Spark context Web UI available at http://shark.fritz.box:4040
Spark context available as 'sc' (master = local[*], app id = local-1718730721942).
SparkSession available as 'spark'.
>>> from typing import List, NoReturn, Tuple
>>> 
>>> from pyspark.sql import Column, DataFrame, DataFrameWriterV2, SparkSession
>>> 
>>> DEFAULT_PROPERTIES = [
...     ("write.format.default", "parquet"),
...     ("write.parquet.compression-codec", "ZSTD"),
...     ("write.parquet.compression-level", "6"),
...     ("write.metadata.delete-after-commit.enabled", "true"),
...     ("write.metadata.previous-versions-max", "100"),
...     ("write.distribution-mode", "hash"),
...     ("write.object-storage.enabled", "true"),
...     # ("write.data.path", "s3://obs-zdp-warehouse-stage-mz"),
...     ("commit.retry.num-retries", "10"),
...     ("gc.enabled", "true"),
...     ("format-version", "2"),
... ]
>>> 
>>> def apply_t_properties_to_df_writer(writer: DataFrameWriterV2, properties: List[Tuple[str, str]]) -> DataFrameWriterV2:
...     """
...     Applies a list of table properties to a DataFrameWriterV2 instance.
... 
...     :param writer: The DataFrameWriterV2 instance to apply properties to.
...     :param properties: A list of tuples, each containing a property name and value.
...     :return: The modified DataFrameWriterV2 instance with applied properties.
...     """
...     for property, value in properties:
...         writer = writer.tableProperty(property, value)
...     return writer
... 
>>> 
>>> def get_writer(df: DataFrame, table_name: str) -> DataFrameWriterV2:
...     """
...     Creates a DataFrameWriterV2 instance for writing a DataFrame to an Iceberg table.
... 
...     :param df: The DataFrame to be written.
...     :param table_name: The name of the Iceberg table.
...     :return: A DataFrameWriterV2 instance configured for the Iceberg table.
...     """
...     return apply_t_properties_to_df_writer(df.writeTo(table_name), DEFAULT_PROPERTIES)
... 
>>> df = spark.read.table("iceberg_rest.dds.f_iceberg_maintenance_expire_snapshots_log") # read or create any df

24/06/18 19:12:08 INFO CatalogUtil: Loading custom FileIO implementation: org.apache.iceberg.hadoop.HadoopFileIO
24/06/18 19:12:08 INFO CatalogUtil: Loading custom FileIO implementation: org.apache.iceberg.hadoop.HadoopFileIO
>>> 
>>> 
>>> get_writer(df, "iceberg_rest.dds.f_iceberg_maintenance_expire_snapshots_log_test2").createOrReplace()
24/06/18 19:12:09 INFO CatalogUtil: Loading custom FileIO implementation: org.apache.iceberg.hadoop.HadoopFileIO
24/06/18 19:12:09 INFO SnapshotScan: Scanning empty table iceberg_rest.dds.f_iceberg_maintenance_expire_snapshots_log
24/06/18 19:12:09 INFO SparkPartitioningAwareScan: Reporting UnknownPartitioning with 0 partition(s) for table iceberg_rest.dds.f_iceberg_maintenance_expire_snapshots_log
24/06/18 19:12:09 INFO SparkWrite: Requesting 0 bytes advisory partition size for table iceberg_rest.dds.f_iceberg_maintenance_expire_snapshots_log_test2
24/06/18 19:12:09 INFO SparkWrite: Requesting UnspecifiedDistribution as write distribution for table iceberg_rest.dds.f_iceberg_maintenance_expire_snapshots_log_test2
24/06/18 19:12:09 INFO SparkWrite: Requesting [] as write ordering for table iceberg_rest.dds.f_iceberg_maintenance_expire_snapshots_log_test2
24/06/18 19:12:09 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
24/06/18 19:12:09 INFO SparkWrite: Committing append with 0 new data files to table iceberg_rest.dds.f_iceberg_maintenance_expire_snapshots_log_test2
24/06/18 19:12:09 INFO SnapshotProducer: Committed snapshot 2633891027168191032 (MergeAppend)
24/06/18 19:12:10 INFO LoggingMetricsReporter: Received metrics report: CommitReport{tableName=iceberg_rest.dds.f_iceberg_maintenance_expire_snapshots_log_test2, snapshotId=2633891027168191032, sequenceNumber=1, operation=append, commitMetrics=CommitMetricsResult{totalDuration=TimerResult{timeUnit=NANOSECONDS, totalDuration=PT0.134650139S, count=1}, attempts=CounterResult{unit=COUNT, value=1}, addedDataFiles=null, removedDataFiles=null, totalDataFiles=CounterResult{unit=COUNT, value=0}, addedDeleteFiles=null, addedEqualityDeleteFiles=null, addedPositionalDeleteFiles=null, removedDeleteFiles=null, removedEqualityDeleteFiles=null, removedPositionalDeleteFiles=null, totalDeleteFiles=CounterResult{unit=COUNT, value=0}, addedRecords=null, removedRecords=null, totalRecords=CounterResult{unit=COUNT, value=0}, addedFilesSizeInBytes=null, removedFilesSizeInBytes=null, totalFilesSizeInBytes=CounterResult{unit=BYTES, value=0}, addedPositionalDeletes=null, removedPositionalDeletes=null, totalPositionalDeletes=CounterResult{unit=COUNT, value=0}, addedEqualityDeletes=null, removedEqualityDeletes=null, totalEqualityDeletes=CounterResult{unit=COUNT, value=0}}, metadata={engine-version=3.5.0, app-id=local-1718730721942, engine-name=spark, iceberg-version=Apache Iceberg 1.5.0 (commit 2519ab43d654927802cc02e19c917ce90e8e0265)}}
24/06/18 19:12:10 INFO SparkWrite: Committed in 139 ms
>>> 

I really doubt that's specific to Spark 3.5.1.

The best option to make progress is to provide a self-contained reproducer based on Minio, as I cannot reproduce the issue you describe.

sdasdasd1 commented 2 weeks ago

@snazy You're right. I just created Nessie with version_store_type = postgresql in docker-compose and everything works as expected. But the Helm chart deployment in k8s behaves as I described above, and when I switch my metastore URL from my working local Nessie (Docker) to the remote k8s Nessie, I get this NullPointerException. Can you please try that too? My config for the Helm chart:

kind: HelmRelease
metadata:
  name: nessiecatalog
  namespace: trino
spec:
  interval: 2m30s
  releaseName: nessiecatalog
  chart:
    spec:
      chart: nessie
      version: 0.90.4
      sourceRef:
        kind: HelmRepository
        name: nessiecatalog
        namespace: trino
      interval: 1m
  values:
    logLevel: DEBUG
    versionStoreType: JDBC
    postgres:
      jdbcUrl: ${nessie_jdbcUrl}
      secret:
        name: nessie-secret
        username: postgres_username
        password: postgres_password
    advancedConfig:
      nessie.catalog.default-warehouse: dwh
      nessie.catalog.warehouses.dwh.location: ${s3_warehouse_bucket}
      nessie.catalog.service.s3.endpoint: ${s3_endpoint}
      nessie.catalog.service.s3.region: ${s3_region}
      nessie.catalog.iceberg-config-defaults.io-impl: org.apache.iceberg.hadoop.HadoopFileIO
    extraEnv:
      - name: NESSIE_CATALOG_SERVICE_S3_ACCESS_KEY_NAME
        valueFromSecret:
          name: obs-s3-credentials-secret
          key: accessKey
      - name: NESSIE_CATALOG_SERVICE_S3_ACCESS_KEY_SECRET
        valueFromSecret:
          name: obs-s3-credentials-secret
          key: secretKey
sdasdasd1 commented 2 weeks ago

@snazy And another thing about what you wrote:

Don't set write.data.path - the data location should be managed via the catalog (Nessie verifies that S3 requests happen in the right place)

I tried to set it on the Nessie side (-Dnessie.catalog.iceberg-config-defaults.write.data.path=s3a://obs-zdp-warehouse-stage-mz), but it seems it doesn't work.

It's a very important parameter in combination with the write.object-storage.enabled parameter.

Without it, the hashes are generated after the name of the NAMESPACE (dds222):

s3a://obs-zdp-warehouse-stage-mz/dds222/f_iceberg_maintenance_expire_snapshots_log_test2_bf5d58e6-ec8a-453b-b831-96640c0e9cf5/data/OvfPBw/00004-4-d7a8258b-7226-4630-9819-8fe44221bd4c-0-00001.parquet

And with that parameter set on the table side, it works as expected:

s3a://obs-zdp-warehouse-stage-mz/CkHMyw/dds222/f_iceberg_maintenance_expire_snapshots_log_test23_3ab69281-e01f-4101-af91-8bcf1e7b6fb0/00004-11-6fe08e30-85c4-4273-b12c-43a833b2b44e-0-00001.parquet

We should create these hashes in the 'root' of the bucket to avoid hotspots.

(Screenshot 2024-06-19 at 01:02:45)
sdasdasd1 commented 2 weeks ago

@snazy Can you please try with the Helm chart values I provided above?

sdasdasd1 commented 2 weeks ago

@snazy A little bit more information: in Docker I'm using HTTP, while Nessie in k8s is only accessible via HTTPS. In the Nessie logs, when I try to create a namespace, I see only GET requests but no POST (unlike in the Docker installation). And with spark.sql("CREATE NAMESPACE xxx").show() against the k8s HTTPS installation, nothing changes and there are no errors either; but there is no new namespace.

snazy commented 2 weeks ago

I see only GET but not POST

Can you elaborate on what this means?

TBH, this sounds like your ingress is doing something odd maybe?

sdasdasd1 commented 2 weeks ago

@snazy I mean that when you try to create a namespace from Spark, it generates two requests: first a GET to check whether the namespace already exists, and then a POST to create the namespace. It seems to me that for some reason the first GET request goes over the https scheme, but the second POST goes over http. That's just my assumption, because a request over http gets a 301 from our istio-envoy.

sdasdasd1 commented 2 weeks ago

Because in the Nessie k8s logs I see the GET request and that's all, no POST request after it. In the Nessie Docker setup I see both, and the namespaces are created.

snazy commented 2 weeks ago

Please try the options listed here https://quarkus.io/guides/http-reference#reverse-proxy
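
Concretely, those are the quarkus.http.proxy.* settings. A sketch of how they could be passed through the chart's advancedConfig (whether your istio/envoy actually sends the X-Forwarded-* headers these options rely on is an assumption you would have to verify for your environment):

    advancedConfig:
      quarkus.http.proxy.proxy-address-forwarding: "true"
      quarkus.http.proxy.allow-x-forwarded: "true"
      quarkus.http.proxy.enable-forwarded-host: "true"
      quarkus.http.proxy.enable-forwarded-prefix: "true"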

sdasdasd1 commented 2 weeks ago

@snazy Which options exactly?

snazy commented 2 weeks ago

@sdasdasd1 that depends on your environment including ingress/istio/envoy settings - we cannot provide configuration details for a specific environment that we do not know.

sdasdasd1 commented 2 weeks ago

@snazy It's something with Spark, because it doesn't respect the https scheme in the catalog URI and just makes http requests. istio returns a 301 redirect and then the POST is transformed into a GET. But I don't understand why Spark sends http requests...

sdasdasd1 commented 2 weeks ago

@snazy I did a little research and found that Spark first connects to Nessie via https and gets the configs, and in the configs I have http everywhere:

{
  "defaults" : {
    "io-impl" : "org.apache.iceberg.hadoop.HadoopFileIO",
    "prefix" : "main",
    "warehouse" : "s3a://{redacted}",
    "client.region" : "ru-moscow-1",
    "rest-page-size" : "200",
    "rest-metrics-reporting-enabled" : "false"
  },
  "overrides" : {
    "nessie.default-branch.name" : "main",
    "nessie.is-nessie-catalog" : "true",
    "s3.endpoint" : "https://{redacted}",
    "nessie.prefix-pattern" : "{ref}|{warehouse}",
    "nessie.core-base-uri" : "http://{redacted}/api/",
    "nessie.iceberg-base-uri" : "http://{redacted}/iceberg/",
    "nessie.catalog-base-uri" : "http://{redacted}/catalog/v1/",
    "s3.remote-signing-enabled" : "true",
    "client.region" : "ru-moscow-1",
    "uri" : "http://{redacted}/iceberg/"
  }
}

Is there a way to override these settings to https? Where can I do that?
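
(For reference: the JSON above is the response of the Iceberg REST configuration endpoint. It can be fetched directly through the proxy to check which URIs clients are being told to use, for example with curl, host redacted as above:)

    $ curl -s https://{redacted}/iceberg/v1/config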

snazy commented 2 weeks ago

That's an issue w/ your environment. Please take a look at https://projectnessie.org/guides/reverse-proxy/

sdasdasd1 commented 2 weeks ago

thank you, I finally made it work.

snazy commented 2 weeks ago

thank you, I finally made it work.

@sdasdasd1 By any chance - do you use istio/envoy? If you do, would you mind sharing your settings for this, so we can add an example to that page?