
Apache Iceberg
https://iceberg.apache.org/
Apache License 2.0

Proxy support unavailable for iceberg spark client #9174

Open sravanscb opened 8 months ago

sravanscb commented 8 months ago

Apache Iceberg version

1.3.1

Query engine

Spark

Please describe the bug 🐞

I am trying to use the Iceberg 1.3 jars in Spark 3.2 (iceberg-spark-runtime-3.2_2.12-1.3.1.jar) to connect to Azure Databricks UniForm tables. I have followed the documentation and am running the spark-shell command below:

./spark-shell \
  --conf "spark.ui.port=0" \
  --conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \
  --conf "spark.sql.catalog.unity=org.apache.iceberg.spark.SparkSessionCatalog" \
  --conf "spark.sql.catalog.unity.catalog-impl=org.apache.iceberg.rest.RESTCatalog" \
  --conf "spark.sql.catalog.unity.uri=https://adb-xxxxx.azuredatabricks.net/api/2.1/unity-catalog/iceberg" \
  --conf "spark.sql.catalog.unity.token=dapixxxx" \
  --conf "spark.sql.catalog.unity.io-impl=org.apache.iceberg.aws.s3.S3FileIO" \
  --conf "spark.driver.extraJavaOptions=-Dhttp.proxyHost=10.xx.xx.xx -Dhttp.proxyPort=443 -Dhttps.proxyHost=10.xx.xx.xx -Dhttps.proxyPort=443" \
  --conf "spark.executor.extraJavaOptions=-Dhttp.proxyHost=10.xx.xx.xx -Dhttp.proxyPort=443 -Dhttps.proxyHost=10.xx.xx.xx -Dhttps.proxyPort=443"

We can only reach the ADB URL via a proxy, and despite passing the proxy options through the Spark driver and executor extraJavaOptions, the REST client does not appear to honor these proxy settings and fails to connect to the ADB URL.

Error: org.apache.iceberg.exceptions.RESTException: Error occurred while processing GET request
  at org.apache.iceberg.rest.HTTPClient.execute(HTTPClient.java:304)
  at org.apache.iceberg.rest.HTTPClient.execute(HTTPClient.java:219)
  at org.apache.iceberg.rest.HTTPClient.get(HTTPClient.java:320)
  at org.apache.iceberg.rest.RESTSessionCatalog.fetchConfig(RESTSessionCatalog.java:823)
  at org.apache.iceberg.rest.RESTSessionCatalog.initialize(RESTSessionCatalog.java:167)
  at org.apache.iceberg.rest.RESTCatalog.initialize(RESTCatalog.java:68)
  at org.apache.iceberg.CatalogUtil.loadCatalog(CatalogUtil.java:239)
  at org.apache.iceberg.CatalogUtil.buildIcebergCatalog(CatalogUtil.java:284)
  at org.apache.iceberg.spark.SparkCatalog.buildIcebergCatalog(SparkCatalog.java:130)
  at org.apache.iceberg.spark.SparkCatalog.initialize(SparkCatalog.java:479)
  at org.apache.iceberg.spark.SparkSessionCatalog.buildSparkCatalog(SparkSessionCatalog.java:79)
  at org.apache.iceberg.spark.SparkSessionCatalog.initialize(SparkSessionCatalog.java:285)
  at org.apache.spark.sql.connector.catalog.Catalogs$.load(Catalogs.scala:60)
  at org.apache.spark.sql.connector.catalog.CatalogManager.$anonfun$catalog$1(CatalogManager.scala:52)
  at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
  at org.apache.spark.sql.connector.catalog.CatalogManager.catalog(CatalogManager.scala:52)
  at org.apache.spark.sql.connector.catalog.LookupCatalog$CatalogAndIdentifier$.unapply(LookupCatalog.scala:123)
  at org.apache.spark.sql.connector.catalog.LookupCatalog$SessionCatalogAndIdentifier$.unapply(LookupCatalog.scala:62)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveRelations$$lookupRelation(Analyzer.scala:1249)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$15.applyOrElse(Analyzer.scala:1204)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$15.applyOrElse(Analyzer.scala:1167)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$3(AnalysisHelper.scala:138)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$1(AnalysisHelper.scala:138)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning(AnalysisHelper.scala:134)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning$(AnalysisHelper.scala:130)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUpWithPruning(LogicalPlan.scala:30)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$2(AnalysisHelper.scala:135)
  at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1128)
  at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1127)
  at org.apache.spark.sql.catalyst.plans.logical.OrderPreservingUnaryNode.mapChildren(LogicalPlan.scala:206)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$1(AnalysisHelper.scala:135)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning(AnalysisHelper.scala:134)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning$(AnalysisHelper.scala:130)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUpWithPruning(LogicalPlan.scala:30)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:1167)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:1133)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:211)
  at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
  at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
  at scala.collection.immutable.List.foldLeft(List.scala:91)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:208)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:200)
  at scala.collection.immutable.List.foreach(List.scala:431)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:200)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:215)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:209)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:172)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:179)
  at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:179)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:193)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:192)
  at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:88)
  at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
  at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:196)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
  at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:196)
  at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:88)
  at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:86)
  at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:78)
  at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:98)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
  at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:618)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:613)
  ... 47 elided
Caused by: java.net.UnknownHostException: adb-xxxxxxxxxxxx.azuredatabricks.net: Name or service not known
  at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)
  at java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:929)
  at java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1324)
  at java.net.InetAddress.getAllByName0(InetAddress.java:1277)
  at java.net.InetAddress.getAllByName(InetAddress.java:1193)
  at java.net.InetAddress.getAllByName(InetAddress.java:1127)
  at org.apache.hc.client5.http.SystemDefaultDnsResolver.resolve(SystemDefaultDnsResolver.java:43)
  at org.apache.hc.client5.http.impl.io.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:141)
  at org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:447)
  at org.apache.hc.client5.http.impl.classic.InternalExecRuntime.connectEndpoint(InternalExecRuntime.java:162)
  at org.apache.hc.client5.http.impl.classic.InternalExecRuntime.connectEndpoint(InternalExecRuntime.java:172)
  at org.apache.hc.client5.http.impl.classic.ConnectExec.execute(ConnectExec.java:142)
  at org.apache.hc.client5.http.impl.classic.ExecChainElement.execute(ExecChainElement.java:51)
  at org.apache.hc.client5.http.impl.classic.ProtocolExec.execute(ProtocolExec.java:192)
  at org.apache.hc.client5.http.impl.classic.ExecChainElement.execute(ExecChainElement.java:51)
  at org.apache.hc.client5.http.impl.classic.HttpRequestRetryExec.execute(HttpRequestRetryExec.java:96)
  at org.apache.hc.client5.http.impl.classic.ExecChainElement.execute(ExecChainElement.java:51)
  at org.apache.hc.client5.http.impl.classic.ContentCompressionExec.execute(ContentCompressionExec.java:152)
  at org.apache.hc.client5.http.impl.classic.ExecChainElement.execute(ExecChainElement.java:51)
  at org.apache.hc.client5.http.impl.classic.RedirectExec.execute(RedirectExec.java:115)
  at org.apache.hc.client5.http.impl.classic.ExecChainElement.execute(ExecChainElement.java:51)
  at org.apache.hc.client5.http.impl.classic.InternalHttpClient.doExecute(InternalHttpClient.java:170)
  at org.apache.hc.client5.http.impl.classic.CloseableHttpClient.execute(CloseableHttpClient.java:123)
  at org.apache.iceberg.rest.HTTPClient.execute(HTTPClient.java:267)
  ... 116 more
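
For reference, this is consistent with how Apache HttpClient 5 (which the REST client uses, per the stack trace) treats proxies: the JVM properties passed via extraJavaOptions are only consulted when the client is built with system-properties support. The sketch below is plain Apache HttpClient usage, not Iceberg's actual HTTPClient code, which may differ:

import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.HttpClientBuilder;

public class ProxyBehaviorSketch {
  public static void main(String[] args) {
    // Honors -Dhttp.proxyHost / -Dhttps.proxyHost (and http.nonProxyHosts).
    CloseableHttpClient systemAware =
        HttpClientBuilder.create().useSystemProperties().build();

    // Ignores those JVM properties and connects directly; the REST endpoint is
    // then resolved locally, matching the UnknownHostException above.
    CloseableHttpClient direct = HttpClientBuilder.create().build();
  }
}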

sungwy commented 4 months ago

Maybe having specific proxy configurations for the REST Catalog would be better than relying on the JVM system properties?

It looks like the connection manager takes its configuration from the system here, but what I've observed is that setting the proxy through the system environment can have unintended consequences for the Spark cluster or Kubernetes node unless we are able to identify all the connections that should bypass the proxy and set no_proxy correctly.
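
As a rough illustration of the catalog-scoped idea, the sketch below applies hypothetical catalog properties (the keys rest.proxy-host and rest.proxy-port are invented for this example and do not exist in Iceberg) when building the Apache HttpClient, so the proxy stays scoped to the REST catalog client rather than the whole JVM:

import java.util.Map;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.HttpClientBuilder;

class RestProxySketch {
  // Hypothetical catalog properties; Iceberg 1.3.1 does not define these keys.
  static CloseableHttpClient buildClient(Map<String, String> catalogProps) {
    HttpClientBuilder builder = HttpClientBuilder.create();
    String proxyHost = catalogProps.get("rest.proxy-host");
    String proxyPort = catalogProps.get("rest.proxy-port");
    if (proxyHost != null && proxyPort != null) {
      // Route every request from this client through the configured proxy,
      // leaving the rest of the JVM untouched.
      builder.setProxy(new HttpHost(proxyHost, Integer.parseInt(proxyPort)));
    }
    return builder.build();
  }
}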

I think setting the proxy configuration in a targeted manner on the HttpUriRequestBase could support this use case better. What are your thoughts on this @sravanscb @danielcweeks?
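
A per-request variant could look roughly like the following, using Apache HttpClient 5's RequestConfig. This is only a sketch of the idea, not Iceberg code, and RequestConfig.Builder#setProxy is deprecated in newer HttpClient 5 releases in favor of route planners:

import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.client5.http.classic.methods.HttpGet;
import org.apache.hc.client5.http.config.RequestConfig;

class PerRequestProxySketch {
  static HttpGet withProxy(String uri, HttpHost proxy) {
    HttpGet request = new HttpGet(uri);
    // Attach the proxy to this single request only, so other traffic from the
    // same client (and the rest of the JVM) is unaffected.
    request.setConfig(RequestConfig.custom().setProxy(proxy).build());
    return request;
  }
}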

sravanscb commented 4 months ago

Hi @syun64 Yes, there should be a way to pass the proxy configuration to the Iceberg client properly and have the code honor it. I tried several approaches, at the system level and at the Spark conf level, but the underlying client itself doesn't have the capability.