apache / amoro

Apache Amoro (incubating) is a Lakehouse management system built on open data lake formats.
https://amoro.apache.org/
Apache License 2.0

[Bug]: Spark query does not use catalog's built-in ugi #2484

Open ShawHee opened 6 months ago

ShawHee commented 6 months ago

What happened?

While using Amoro, I found that RPC response times were unstable when going through the HDFS router, which made queries against Amoro tables slow. To speed up queries, I configured Amoro to connect to the NameNode directly. During testing, Spark queries then failed with an exception. The reason: when Spark creates a task, the client first registers a delegation token with the router, and when YARN later starts the container, it uses that token to authenticate with HDFS. But because the token was registered with the router, while Amoro's file access goes directly to the NameNode once the task starts executing, the token cannot be found on the NameNode and authentication fails. In theory, Amoro should read data using the keytab and principal configured in the Amoro catalog, and should not rely on the YARN token at all.
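
To illustrate the expected behavior, here is a minimal sketch of opening an HDFS file under the catalog's keytab/principal login via UserGroupInformation.doAs. The class and method below are hypothetical and only demonstrate the Hadoop security API involved, not Amoro's actual code path:

    import java.io.IOException;
    import java.security.PrivilegedExceptionAction;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.security.UserGroupInformation;

    // Hypothetical example: open an HDFS file as the catalog principal instead of
    // relying on the YARN delegation token issued against the router.
    public class CatalogUgiExample {

      public static FSDataInputStream openWithCatalogUgi(
          Configuration conf, String principal, String keytabPath, Path file)
          throws IOException, InterruptedException {
        // Log in with the catalog's keytab/principal.
        UserGroupInformation ugi =
            UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytabPath);
        // All HDFS RPCs issued inside doAs authenticate as the catalog principal,
        // so the NameNode never needs the router-issued token.
        return ugi.doAs(
            (PrivilegedExceptionAction<FSDataInputStream>)
                () -> FileSystem.get(file.toUri(), conf).open(file));
      }
    }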

Affects Versions

master

What engines are you seeing the problem on?

No response

How to reproduce

No response

Relevant log output

2023-12-29 13:01:41 CST TaskSetManager WARN - Lost task 0.0 in stage 0.0 (TID 0) (10.55.156.39 executor 1): com.netease.arctic.shade.org.apache.iceberg.exceptions.RuntimeIOException: Failed to open Parquet file: hdfs://eadhadoop/user/kada/hive_db/kada_ext.db/dim_user_compatible_id/hive/1703800975179_44501/6-B-44501-00152-157-662839286894636421-00001.parquet
    at com.netease.arctic.shade.org.apache.iceberg.parquet.AdaptHiveReadConf.newReader(AdaptHiveReadConf.java:222)
    at com.netease.arctic.shade.org.apache.iceberg.parquet.AdaptHiveReadConf.<init>(AdaptHiveReadConf.java:74)
    at com.netease.arctic.shade.org.apache.iceberg.parquet.AdaptHiveParquetReader.init(AdaptHiveParquetReader.java:71)
    at com.netease.arctic.shade.org.apache.iceberg.parquet.AdaptHiveParquetReader.iterator(AdaptHiveParquetReader.java:83)
    at com.netease.arctic.io.CloseableIterableWrapper.iterator(CloseableIterableWrapper.java:39)
    at com.netease.arctic.shade.org.apache.iceberg.io.CloseableIterable$ConcatCloseableIterable$ConcatCloseableIterator.<init>(CloseableIterable.java:152)
    at com.netease.arctic.shade.org.apache.iceberg.io.CloseableIterable$ConcatCloseableIterable$ConcatCloseableIterator.<init>(CloseableIterable.java:143)
    at com.netease.arctic.shade.org.apache.iceberg.io.CloseableIterable$ConcatCloseableIterable.iterator(CloseableIterable.java:138)
    at com.netease.arctic.io.reader.AbstractArcticDataReader.readData(AbstractArcticDataReader.java:125)
    at com.netease.arctic.spark.reader.KeyedSparkBatchScan$RowReader.next(KeyedSparkBatchScan.java:246)
    at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:79)
    at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:112)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:346)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1469)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (token for bdms_hechao: HDFS_DELEGATION_TOKEN owner=bdms_hechao/dev@YOUDAO.163.COM, renewer=yarn, realUser=, issueDate=1703826072124, maxDate=1704430872124, sequenceNumber=942934, masterKeyId=20) can't be found in cache
    at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1507)
    at org.apache.hadoop.ipc.Client.call(Client.java:1453)
    at org.apache.hadoop.ipc.Client.call(Client.java:1363)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
    at com.sun.proxy.$Proxy20.getBlockLocations(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:259)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
    at com.sun.proxy.$Proxy21.getBlockLocations(Unknown Source)
    at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:845)
    at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:834)
    at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:823)
    at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1062)
    at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:303)
    at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:299)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:311)
    at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:914)
    at com.netease.arctic.shade.org.apache.parquet.hadoop.util.HadoopInputFile.newStream(HadoopInputFile.java:69)
    at com.netease.arctic.shade.org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:773)
    at com.netease.arctic.shade.org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:657)
    at com.netease.arctic.shade.org.apache.iceberg.parquet.AdaptHiveReadConf.newReader(AdaptHiveReadConf.java:220)
    ... 30 more

Anything else

No response

Are you willing to submit a PR?


zhoujinsong commented 3 weeks ago

Any update on this issue? Could you help check whether this bug still exists on the master branch? @ShawHee

ShawHee commented 3 weeks ago

@zhoujinsong Yes, it still exists on the master branch. In my environment I added a doAs wrapper in SparkBatchScan, and that fixed it.
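
For context, a rough sketch of what wrapping the reader's iteration in doAs could look like; the class and field names below are illustrative assumptions, not the actual KeyedSparkBatchScan change:

    import java.security.PrivilegedExceptionAction;
    import java.util.Iterator;
    import org.apache.hadoop.security.UserGroupInformation;

    // Illustrative only: advance the underlying iterator inside doAs so that any
    // HDFS open/getBlockLocations calls authenticate as the catalog UGI rather
    // than with the executor's YARN delegation token.
    public class DoAsRowReader<T> {

      private final UserGroupInformation catalogUgi;
      private final Iterator<T> delegate;
      private T current;

      public DoAsRowReader(UserGroupInformation catalogUgi, Iterator<T> delegate) {
        this.catalogUgi = catalogUgi;
        this.delegate = delegate;
      }

      public boolean next() throws Exception {
        return catalogUgi.doAs((PrivilegedExceptionAction<Boolean>) () -> {
          if (delegate.hasNext()) {
            current = delegate.next();
            return true;
          }
          return false;
        });
      }

      public T get() {
        return current;
      }
    }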