delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive and APIs
https://delta.io
Apache License 2.0

[BUG][Standalone] Java connections against a non-secured HDFS fail after obtaining a DeltaLog object that queries a secured HDFS #2631

Open meluchoMZ opened 7 months ago

meluchoMZ commented 7 months ago

Bug

Which Delta project/connector is this regarding?

Standalone

Describe the problem

Steps to reproduce

Suppose two different HDFS clusters: one secured via Kerberos and the other not secured. Execute the following class to reproduce the issue:

package org.example;

import java.net.URI;
import java.util.HashMap;

import javax.security.auth.Subject;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.security.UserGroupInformation;

import com.sun.security.auth.module.Krb5LoginModule;

import io.delta.standalone.DeltaLog;
import io.delta.standalone.types.StructField;
import io.delta.standalone.types.StructType;

public class DeltaKerberosTest {
    public static final String PARAM_KERBEROS_DEBUG = "debug";
    public static final String PARAM_KERBEROS_DONOTPROMPT = "doNotPrompt";
    public static final String PARAM_KERBEROS_RENEWTGT = "renewTGT";
    public static final String PARAM_KERBEROS_STOREKEY = "storeKey";
    public static final String PARAM_KERBEROS_USEFIRSTPASS = "useFirstPass";
    public static final String PARAM_KERBEROS_USEKEYTAB = "useKeyTab";
    public static final String PARAM_KERBEROS_USETICKETCACHE = "useTicketCache";
    public static final String PARAM_KERBEROS_REFRESH_CONF_FILE = "refreshKrb5Config";
    public static final String PARAM_KERBEROS_DISABLE_REFERRALS = "disableReferrals";

    public static void main(String[] args) throws Exception {
        System.out.println("LISTING A DIRECTORY FROM A NON KERBERIZED");
        querySimpleAuth();
        System.out.println("OBTAINING SCHEMA OF DELTA TABLE FROM KERBERIZED HDFS");
        queryKerberosDeltaTable();
        System.out.println("LISTING A DIRECTORY TO A NON KERBERIZED HFDS AFTERWARDS");
        querySimpleAuth();
    }

    private static void querySimpleAuth() throws Exception {
        final String simpleAuthUri = "hdfs://<host>:<port>/user/hdfs/";
        Configuration conf = new Configuration();
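        // Disable FileSystem caching and explicitly allow falling back to SIMPLE auth.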
        conf.set("fs.hdfs.impl.disable.cache", "true");
        conf.set("ipc.client.fallback-to-simple-auth-allowed", "true");
        try (FileSystem fs = FileSystem.get(new URI(simpleAuthUri), conf)) {
            RemoteIterator<LocatedFileStatus> directoryIterator = fs.listLocatedStatus(new Path(simpleAuthUri));
            while (directoryIterator.hasNext()) {
                LocatedFileStatus status = directoryIterator.next();
                System.out.println(status.getPath().toString());
            }
        }
    }
    private static void queryKerberosDeltaTable() throws Exception {
        final String krbUri = "hdfs://<host>:<port>/user/hdfs/delta/test_table";
        Configuration conf = new Configuration();
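        // Require Kerberos authentication and point the client at the secured cluster.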
        conf.set("dfs.datanode.kerberos.principal", "hdfs/<host>@<krb-domain>");
        conf.set("dfs.namenode.kerberos.principal.pattern", "hdfs/*@<krb-domain>");
        conf.set("fs.defaultFS", "hdfs://<host>:<port>/user/hdfs");
        conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
        conf.set("fs.hdfs.impl.disable.cache", "true");
        conf.set("hadoop.security.authentication", "kerberos");
        conf.set("hadoop.security.authorization", "true");
        Subject subject = new Subject();
        HashMap<String, Object> sharedState = new HashMap<>();
        sharedState.put("javax.security.auth.login.name", "<client-principal>@<krb-domain>");
        // the same error is reproducible via keytab
        sharedState.put("javax.security.auth.login.password", "<password>".toCharArray());
        HashMap<String, String> options = new HashMap<>();
        options.put(PARAM_KERBEROS_DEBUG, "true");
        options.put(PARAM_KERBEROS_DONOTPROMPT, "false");
        options.put(PARAM_KERBEROS_RENEWTGT, "false");
        options.put(PARAM_KERBEROS_STOREKEY, "false");
        options.put(PARAM_KERBEROS_USEFIRSTPASS, "true");
        options.put(PARAM_KERBEROS_USEKEYTAB, "false");
        options.put(PARAM_KERBEROS_USETICKETCACHE, "false");
        options.put(PARAM_KERBEROS_REFRESH_CONF_FILE, "true");
        options.put(PARAM_KERBEROS_DISABLE_REFERRALS, "false");
        Krb5LoginModule module = new Krb5LoginModule();
        module.initialize(subject, null, sharedState, options);
        module.login();
        module.commit();
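        // Hand the authenticated Subject over to Hadoop's process-wide UGI state.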
        UserGroupInformation.setConfiguration(conf);
        UserGroupInformation.loginUserFromSubject(subject);
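        // Obtain the DeltaLog and print the table schema from the secured HDFS.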
        DeltaLog deltaLog = DeltaLog.forTable(conf, new Path(krbUri));
        StructType type = deltaLog.update().getMetadata().getSchema();
        for (StructField field : type.getFields()) {
            System.out.println(field.getName() + ": " + field.getDataType().getTypeName());
        }
    }
}

This is the output obtained from the test:

LISTING A DIRECTORY FROM A NON KERBERIZED HDFS
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/C:/Users/mblanco/.m2/repository/org/slf4j/slf4j-reload4j/1.7.36/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/C:/Users/mblanco/.m2/repository/org/slf4j/slf4j-log4j12/1.7.25/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Reload4jLoggerFactory]
log4j:WARN No appenders could be found for logger (org.apache.hadoop.fs.FileSystem).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
hdfs://<host>:<port>/user/hdfs/table_1
hdfs://<host>:<port>/user/hdfs/table_2
hdfs://<host>:<port>/user/hdfs/table_3
hdfs://<host>:<port>/user/hdfs/table_4
hdfs://<host>:<port>/user/hdfs/delta
hdfs://<host>:<port>/user/hdfs/table_5
OBTAINING SCHEMA OF DELTA TABLE FROM KERBERIZED HDFS
Debug is  true storeKey false useTicketCache false useKeyTab false doNotPrompt false ticketCache is null isInitiator true KeyTab is null refreshKrb5Config is true principal is null tryFirstPass is false useFirstPass is true storePass is false clearPass is false
Refreshing Kerberos configuration
username from shared state is <client-principal>@<krb-domain>

username from shared state is <client-principal>@<krb-domain>

password is <password>
principal is <client-principal>@<krb-domain>
Commit Succeeded 

intsample: integer
longsample: long
floatsample: double
doublesample: double
textsample: string
booleansample: boolean
timesample: timestamp
LISTING A DIRECTORY TO A NON KERBERIZED HDFS AFTERWARDS
Exception in thread "main" org.apache.hadoop.security.AccessControlException: Call From <host>/<ip> to <target-hdfs>:<port> failed: Server asks us to fall back to SIMPLE auth, but this client is configured to only allow secure connections.
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
    at org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:930)
    at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:895)
    at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1588)
    at org.apache.hadoop.ipc.Client.call(Client.java:1530)
    at org.apache.hadoop.ipc.Client.call(Client.java:1427)
    at org.apache.hadoop.ipc.ProtobufRpcEngine2$Invoker.invoke(ProtobufRpcEngine2.java:258)
    at org.apache.hadoop.ipc.ProtobufRpcEngine2$Invoker.invoke(ProtobufRpcEngine2.java:139)
    at com.sun.proxy.$Proxy12.getListing(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getListing(ClientNamenodeProtocolTranslatorPB.java:689)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:433)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:166)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:158)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:96)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:362)
    at com.sun.proxy.$Proxy13.getListing(Unknown Source)
    at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1702)
    at org.apache.hadoop.hdfs.DistributedFileSystem$DirListingIterator.<init>(DistributedFileSystem.java:1273)
    at org.apache.hadoop.hdfs.DistributedFileSystem$DirListingIterator.<init>(DistributedFileSystem.java:1256)
    at org.apache.hadoop.hdfs.DistributedFileSystem$25.doCall(DistributedFileSystem.java:1201)
    at org.apache.hadoop.hdfs.DistributedFileSystem$25.doCall(DistributedFileSystem.java:1197)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.listLocatedStatus(DistributedFileSystem.java:1215)
    at org.apache.hadoop.fs.FileSystem.listLocatedStatus(FileSystem.java:2230)
    at org.example.DeltaKerberosTest.querySimpleAuth(DeltaKerberosTest.java:60)
    at org.example.DeltaKerberosTest.main(DeltaKerberosTest.java:51)
Caused by: org.apache.hadoop.security.AccessControlException: Server asks us to fall back to SIMPLE auth, but this client is configured to only allow secure connections.
    at org.apache.hadoop.ipc.Client$Connection.setFallBackToSimpleAuth(Client.java:883)
    at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:825)
    at org.apache.hadoop.ipc.Client$Connection.access$3800(Client.java:363)
    at org.apache.hadoop.ipc.Client.getConnection(Client.java:1649)
    at org.apache.hadoop.ipc.Client.call(Client.java:1474)
    ... 25 more

Observed results

After a call to the Delta API that queries the secured HDFS, subsequent calls to the non-secured HDFS fail with the following error: Exception in thread "main" org.apache.hadoop.security.AccessControlException: Call From <host>/<ip> to <target_hdfs>:<target_port> failed: Server asks us to fall back to SIMPLE auth, but this client is configured to only allow secure connections

The issue is not present with standard Hadoop API calls such as FileSystem.listStatus(), nor when accessing AWS S3 or Azure Data Lake Storage Gen2.
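
For illustration, the unaffected plain-Hadoop path looks roughly like this (a sketch reusing simpleAuthUri and conf from querySimpleAuth above, and assuming org.apache.hadoop.fs.FileStatus is imported; untested here):

// Plain Hadoop API listing; per the report, this path keeps working
// after the Kerberos login, unlike the DeltaLog path.
try (FileSystem fs = FileSystem.get(new URI(simpleAuthUri), conf)) {
    for (FileStatus status : fs.listStatus(new Path(simpleAuthUri))) {
        System.out.println(status.getPath());
    }
}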

Expected results

The client should authenticate through SIMPLE authentication (or fall back to SIMPLE auth if so configured).
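
Until then, one possible caller-side workaround (a hypothetical sketch, not verified in this report) is to reset Hadoop's process-wide security configuration before contacting the non-secured cluster again, using the same UserGroupInformation API the test already calls:

// Hypothetical workaround: restore the default (simple-auth) security
// settings before the second listing; UGI login state is process-wide.
UserGroupInformation.setConfiguration(new Configuration());
querySimpleAuth();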

Further details

Environment information

Willingness to contribute

The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?

vkorukanti commented 7 months ago

Don't have a Kerberos setup to test it, but shouldn't the Configuration object used in queryKerberosDeltaTable also set ipc.client.fallback-to-simple-auth-allowed=true? I think what is happening is that the second call to querySimpleAuth is using the UserGroupInformation set by queryKerberosDeltaTable in thread-local state, which only allows querying a secure cluster.
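
That is, something like this in queryKerberosDeltaTable (untested sketch):

// Mirror the fallback setting that querySimpleAuth() already uses.
conf.set("ipc.client.fallback-to-simple-auth-allowed", "true");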

Let me know if that doesn't work.