delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive and APIs
https://delta.io
Apache License 2.0
7.65k stars 1.72k forks source link

[BUG]how to authenticate the adls2 in azure when I use flink connector #2032

Open sunscorch opened 1 year ago

sunscorch commented 1 year ago

Bug

Which Delta project/connector is this regarding?

Describe the problem

I try a few ways to authenticate adls2 when I use

Steps to reproduce

Please include copy-pastable code snippets if possible.

private static DataStream<RowData> createDeltaSink(
            DataStream<RowData> stream,
            String deltaTablePath,
            RowType rowType) {

        org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();
       //set the conf key in the hadoop conf
       configuration.set("fs.azure.account.key.leisun1992.dfs.core.windows.net", "<Storage account key>");
       DeltaSink<RowData> deltaSink = DeltaSink
                .forRowData(
                        new Path(deltaTablePath),
                        configuration,
                        rowType)
                .build();
        stream.sinkTo(deltaSink);
        return stream;
    }

Observed results

Exception in thread "main" org.apache.flink.util.FlinkRuntimeException: Could not create committable serializer.
    at io.delta.flink.sink.internal.DeltaSinkInternal.getGlobalCommittableSerializer(DeltaSinkInternal.java:181)
    at org.apache.flink.streaming.api.transformations.SinkV1Adapter.asSpecializedSink(SinkV1Adapter.java:88)
    at org.apache.flink.streaming.api.transformations.SinkV1Adapter.wrap(SinkV1Adapter.java:70)
    at org.apache.flink.streaming.api.datastream.DataStreamSink.forSinkV1(DataStreamSink.java:91)
    at org.apache.flink.streaming.api.datastream.DataStream.sinkTo(DataStream.java:1274)
    at org.apache.flink.streaming.api.datastream.DataStream.sinkTo(DataStream.java:1254)
    at org.example.mssqlSinkToKafka.createDeltaSink(mssqlSinkToKafka.java:107)
    at org.example.mssqlSinkToKafka.main(mssqlSinkToKafka.java:85)
Caused by: Failure to initialize configuration
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azurebfs.services.SimpleKeyProvider.getStorageAccountKey(SimpleKeyProvider.java:51)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azurebfs.AbfsConfiguration.getStorageAccountKey(AbfsConfiguration.java:586)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.initializeClient(AzureBlobFileSystemStore.java:1560)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.<init>(AzureBlobFileSystemStore.java:236)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.initialize(AzureBlobFileSystem.java:183)
    at org.apache.flink.fs.azurefs.AbstractAzureFSFactory.create(AbstractAzureFSFactory.java:79)
    at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:508)
    at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:409)
    at io.delta.flink.sink.internal.DeltaSinkBuilder.createBucketWriter(DeltaSinkBuilder.java:356)
    at io.delta.flink.sink.internal.DeltaSinkBuilder.getGlobalCommittableSerializer(DeltaSinkBuilder.java:348)
    at io.delta.flink.sink.internal.DeltaSinkInternal.getGlobalCommittableSerializer(DeltaSinkInternal.java:179)
    ... 7 more

Expected results

May I ask how to set the adls2 authentication in this code? I also try

Configuration flinkConf = new Configuration();
        flinkConf.setString("fs.azure.account.key.leisun1992.dfs.core.windows.net", "<keys>");
        env.configure(flinkConf);

It seems it does not work as well

BjarkeTornager commented 1 year ago

It should work if you create a core-site.xml file under src/main/resources in your application with the following content:

<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<configuration>
  <property>
    <name>fs.azure</name>
    <value>org.apache.hadoop.fs.azure.NativeAzureFileSystem</value>
  </property>
  <property>
    <name>fs.azure.account.key.leisun1992.dfs.core.windows.net</name>
    <value>${env.AZURE_ACCOUNT_KEY}</value>
  </property>
</configuration>

Flink should pick up the configuration and authenticate when doing the writing to ADLS Gen2.