
Flink/Azure job graph serialization fails when used with storage account shared key authentication #10245

Open ms1111 opened 2 months ago

ms1111 commented 2 months ago

Apache Iceberg version

1.5.1 (latest release)

Query engine

Flink

Please describe the bug 🐞

ADLSFileIO holds an AzureProperties object. When ADLS_SHARED_KEY_ACCOUNT_NAME or ADLS_SHARED_KEY_ACCOUNT_KEY is set, AzureProperties creates a StorageSharedKeyCredential in its constructor. StorageSharedKeyCredential is not Serializable, so Flink's job graph serialization fails at job startup.

If the storage account key is not supplied, AzureProperties falls back to DefaultAzureCredential, which tries to obtain credentials from the Azure CLI or another source such as workload identity. That path appears to work, but some environments require shared key authentication.
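A fix could keep the credential out of the serialized state by storing only the account name and key strings (which are Serializable) and constructing the StorageSharedKeyCredential on demand. A hypothetical sketch, not Iceberg's actual code (the class and method names here are made up):

import java.io.Serializable;
import com.azure.identity.DefaultAzureCredentialBuilder;
import com.azure.storage.common.StorageSharedKeyCredential;
import com.azure.storage.file.datalake.DataLakeFileSystemClientBuilder;

public class LazySharedKeyProperties implements Serializable {
    private final String accountName; // plain String, Serializable
    private final String accountKey;  // plain String, Serializable

    public LazySharedKeyProperties(String accountName, String accountKey) {
        this.accountName = accountName;
        this.accountKey = accountKey;
    }

    public void applyCredentialConfiguration(DataLakeFileSystemClientBuilder builder) {
        if (accountName != null && accountKey != null) {
            // The credential is rebuilt after deserialization on each task
            // manager, so it never has to be part of the job graph
            builder.credential(new StorageSharedKeyCredential(accountName, accountKey));
        } else {
            builder.credential(new DefaultAzureCredentialBuilder().build());
        }
    }
}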

The serialization error is below:

Caused by: java.util.concurrent.ExecutionException: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not serialize object for key serializedUDF.
    at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
    at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:322)
    ... 13 more
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not serialize object for key serializedUDF.
    at org.apache.flink.streaming.api.graph.StreamConfig.lambda$serializeAllConfigs$1(StreamConfig.java:203)
    at java.base/java.util.HashMap.forEach(HashMap.java:1421)
    at org.apache.flink.streaming.api.graph.StreamConfig.serializeAllConfigs(StreamConfig.java:197)
    at org.apache.flink.streaming.api.graph.StreamConfig.lambda$triggerSerializationAndReturnFuture$0(StreamConfig.java:174)
    at java.base/java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:718)
    at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.io.NotSerializableException: com.azure.storage.common.StorageSharedKeyCredential
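
The root cause can also be reproduced without a Flink cluster by serializing the FileIO directly. A minimal sketch (the property values are placeholders):

import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.Map;
import org.apache.iceberg.azure.AzureProperties;
import org.apache.iceberg.azure.adlsv2.ADLSFileIO;

public class SerializationRepro {
    public static void main(String[] args) throws Exception {
        Map<String, String> props = new HashMap<>();
        props.put(AzureProperties.ADLS_SHARED_KEY_ACCOUNT_NAME, "account"); // placeholder
        props.put(AzureProperties.ADLS_SHARED_KEY_ACCOUNT_KEY, "key");      // placeholder

        ADLSFileIO io = new ADLSFileIO();
        io.initialize(props); // builds AzureProperties, which builds the credential

        try (ObjectOutputStream out = new ObjectOutputStream(OutputStream.nullOutputStream())) {
            // Throws java.io.NotSerializableException: StorageSharedKeyCredential
            out.writeObject(io);
        }
    }
}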

A full sample app that triggers it through Flink is below.

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.azure.AzureProperties;
import org.apache.iceberg.azure.adlsv2.ADLSFileIO;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.types.Types;

/**
 * Run with environment variables:
 * <ul>
 *     <li>STORAGE_ACCOUNT=storage account name</li>
 *     <li>STORAGE_ACCOUNT_KEY=key</li>
 *     <li>CONTAINER=name of storage container</li>
 * </ul>
 */
public class ADLSSharedKeyAuthIssue {
    public static void main(String[] args) throws Exception {
        final String storageAccount = System.getenv("STORAGE_ACCOUNT");
        final String storageAccountKey = System.getenv("STORAGE_ACCOUNT_KEY");
        final String container = System.getenv("CONTAINER");

        Map<String, String> options = new HashMap<>();
        options.put("warehouse", "abfss://" + container + "@" + storageAccount + ".dfs.core.windows.net");
        options.put("uri", "http://localhost:19120/api/v1");
        options.put(CatalogProperties.FILE_IO_IMPL, ADLSFileIO.class.getCanonicalName());
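        // These two properties cause AzureProperties to construct a
        // StorageSharedKeyCredential, which is not Serializable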
        options.put(AzureProperties.ADLS_SHARED_KEY_ACCOUNT_NAME, storageAccount);
        options.put(AzureProperties.ADLS_SHARED_KEY_ACCOUNT_KEY, storageAccountKey);

        CatalogLoader catalogLoader = CatalogLoader.custom(
                "flink",
                options,
                new Configuration(),
                CatalogUtil.ICEBERG_CATALOG_NESSIE);

        Catalog catalog = catalogLoader.loadCatalog();

        Schema schema = new Schema(
                Types.NestedField.required(1, "id", Types.LongType.get()));

        PartitionSpec spec = PartitionSpec.builderFor(schema).build();
        Namespace namespace = Namespace.of("nsname_" + UUID.randomUUID().toString().substring(0, 4));
        ((SupportsNamespaces) catalog).createNamespace(namespace);
        TableIdentifier tableIdentifier = TableIdentifier.of(namespace, "t1");
        Table table = catalog.createTable(tableIdentifier, schema, spec);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Row> source = env.fromElements(1)
                .map(data -> {
                    Row row = new Row(1);
                    row.setField(0, data);
                    return row;
                });

        FlinkSink.forRow(source, FlinkSchemaUtil.toSchema(schema))
                .tableLoader(TableLoader.fromCatalog(catalogLoader, tableIdentifier))
                .overwrite(true)
                .append();

        env.execute();
    }
}
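
Until a fix is released, one possible workaround is a delegating FileIO that serializes only the properties map and recreates the ADLSFileIO lazily after deserialization. This is a sketch, not part of Iceberg; the class name SerializableADLSFileIO is made up, and CatalogProperties.FILE_IO_IMPL would point at it instead of ADLSFileIO:

import java.util.HashMap;
import java.util.Map;
import org.apache.iceberg.azure.adlsv2.ADLSFileIO;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;

public class SerializableADLSFileIO implements FileIO {
    private Map<String, String> properties;          // HashMap is Serializable
    private transient volatile ADLSFileIO delegate;  // rebuilt per task manager

    @Override
    public void initialize(Map<String, String> properties) {
        this.properties = new HashMap<>(properties);
    }

    // Recreate the delegate on first use after deserialization, so the
    // non-serializable credential never crosses the wire
    private ADLSFileIO delegate() {
        if (delegate == null) {
            ADLSFileIO io = new ADLSFileIO();
            io.initialize(properties);
            delegate = io;
        }
        return delegate;
    }

    @Override
    public InputFile newInputFile(String path) {
        return delegate().newInputFile(path);
    }

    @Override
    public OutputFile newOutputFile(String path) {
        return delegate().newOutputFile(path);
    }

    @Override
    public void deleteFile(String path) {
        delegate().deleteFile(path);
    }
}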

POM:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>adls-shared-key-auth-issue</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <iceberg.version>1.5.1</iceberg.version>
        <flink.version>1.18.1</flink.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-core</artifactId>
            <version>${iceberg.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-api</artifactId>
            <version>${iceberg.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-parquet</artifactId>
            <version>${iceberg.version}</version>
        </dependency>

        <!-- Needed to load the Nessie catalog -->
        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-nessie</artifactId>
            <version>${iceberg.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-azure</artifactId>
            <version>${iceberg.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-data</artifactId>
            <version>${iceberg.version}</version>
        </dependency>

        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-identity</artifactId>
        </dependency>

        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-storage-file-datalake</artifactId>
        </dependency>

        <!-- Needed to create Parquet files -->
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.11.3</version>
        </dependency>

        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.14.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-column</artifactId>
            <version>1.13.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.4.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-flink-1.18</artifactId>
            <version>${iceberg.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-metrics-dropwizard</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>com.azure</groupId>
                <artifactId>azure-sdk-bom</artifactId>
                <version>1.2.22</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
</project>
nastra commented 2 months ago

This will be fixed by https://github.com/apache/iceberg/pull/10045