
TableOperations.locationProvider is not respected by Spark #11527

Open jamesbornholt opened 1 week ago

jamesbornholt commented 1 week ago

Apache Iceberg version

1.6.1

Query engine

Spark

Please describe the bug 🐞

I have a custom Catalog implementation that overrides TableOperations.locationProvider. But running DML queries with Spark doesn't invoke that overridden method. Instead it calls LocationProviders.locationsFor (the BaseMetastoreTableOperations implementation of locationProvider) directly:

https://github.com/apache/iceberg/blob/11d21b26ecbb30361b2b2eee0c335d6cd9560c8d/core/src/main/java/org/apache/iceberg/SerializableTable.java#L244

as shown in this stack trace when running something like spark.sql("INSERT INTO ..."):

org.apache.iceberg.LocationProviders.locationsFor(LocationProviders.java:42)
org.apache.iceberg.SerializableTable.locationProvider(SerializableTable.java:244)
org.apache.iceberg.io.OutputFileFactory$Builder.build(OutputFileFactory.java:177)
org.apache.iceberg.spark.source.SparkWrite$WriterFactory.createWriter(SparkWrite.java:681)
org.apache.iceberg.spark.source.SparkWrite$WriterFactory.createWriter(SparkWrite.java:668)
org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:441)
org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:430)
org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:496)
org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:393)

It looks like this may have broken in #9029 (@przemekd, @aokolnychyi) -- I tried reverting that change and my locationProvider is now invoked as expected.
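
For context, the override I'm describing looks roughly like this (the class names and path layout below are just placeholders, not my actual implementation):

import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.io.LocationProvider;

// Placeholder sketch, not my real catalog code.
public abstract class MyTableOperations extends BaseMetastoreTableOperations {

    @Override
    public LocationProvider locationProvider() {
        // Custom data file layout instead of the default LocationProviders.locationsFor(...).
        return new MyLocationProvider(current().location());
    }

    // Static so it doesn't drag the enclosing TableOperations into Spark task serialization.
    private static class MyLocationProvider implements LocationProvider {
        private final String tableLocation;

        MyLocationProvider(String tableLocation) {
            this.tableLocation = tableLocation;
        }

        @Override
        public String newDataLocation(String filename) {
            return tableLocation + "/custom-data/" + filename;
        }

        @Override
        public String newDataLocation(PartitionSpec spec, StructLike partitionData, String filename) {
            return tableLocation + "/custom-data/" + spec.partitionToPath(partitionData) + "/" + filename;
        }
    }
}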

Willingness to contribute

przemekd commented 1 week ago

@jamesbornholt I see. It looks like that PR fixed one issue but also introduced a new one. I wasn't aware that you can provide a custom default location provider by implementing a custom Catalog. But I would still argue that a Spark job shouldn't fail if it only reads data from tables and that custom location provider implementation isn't available. I can change the code to use:

table.locationProvider()

to get the table's location provider, but instead of letting it possibly fail right away, we could wrap that call in a custom Result class such as:

import java.io.Serializable;

/** Serializable holder for either a computed value or the exception that prevented computing it. */
public class Result<T> implements Serializable {
    private static final long serialVersionUID = 1L;

    private final T value;
    private final Exception error;

    private Result(T value, Exception error) {
        this.value = value;
        this.error = error;
    }

    public static <T> Result<T> success(T value) {
        return new Result<>(value, null);
    }

    public static <T> Result<T> failure(Exception error) {
        return new Result<>(null, error);
    }

    public boolean isSuccess() {
        return error == null;
    }

    public T getValue() {
        if (error != null) {
            throw new IllegalStateException("Cannot get value from a failed result", error);
        }
        return value;
    }

    public Exception getError() {
        if (error == null) {
            throw new IllegalStateException("No error present in a successful result");
        }
        return error;
    }
}

and it would only get unwrapped when the location provider is truly needed. Let me know what you think about that @jamesbornholt @aokolnychyi
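
The call site would then look roughly like this; it's only a sketch of the idea, and DeferredLocationProvider is a made-up name rather than anything in the current code:

import org.apache.iceberg.Table;
import org.apache.iceberg.io.LocationProvider;

// Sketch only: capture the provider, or the failure, eagerly but without throwing.
public class DeferredLocationProvider {

    private final Result<LocationProvider> captured;

    public DeferredLocationProvider(Table table) {
        Result<LocationProvider> result;
        try {
            result = Result.success(table.locationProvider());
        } catch (RuntimeException e) {
            result = Result.failure(e);
        }
        this.captured = result;
    }

    // Only tasks that actually write data files call this, so read-only jobs
    // never surface the original exception.
    public LocationProvider get() {
        return captured.getValue();
    }
}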

jamesbornholt commented 2 days ago

I agree it’s desirable to allow read queries to succeed without the custom location provider available. I like the idea of just deferring the exception like this!