Apache Iceberg
https://iceberg.apache.org/
Apache License 2.0

Iceberg defaulting to URLConnectionHttpClient instead of Apache HTTP Client #11116

Open anthonysgro opened 2 months ago

anthonysgro commented 2 months ago

Apache Iceberg version

1.5.0

Query engine

Spark

Please describe the bug 🐞

Receiving this stack trace when reading from a cross-account Iceberg Glue table:

 diagnostics: User class threw exception: java.lang.NoClassDefFoundError: software/amazon/awssdk/http/urlconnection/UrlConnectionHttpClient
    at org.apache.iceberg.aws.AwsClientFactories.configureHttpClientBuilder(AwsClientFactories.java:160)
    at org.apache.iceberg.aws.AwsClientFactories$DefaultAwsClientFactory.glue(AwsClientFactories.java:111)
    at org.apache.iceberg.aws.glue.GlueCatalog.initialize(GlueCatalog.java:141)
    at org.apache.iceberg.CatalogUtil.loadCatalog(CatalogUtil.java:200)
    at org.apache.iceberg.CatalogUtil.buildIcebergCatalog(CatalogUtil.java:237)
    at org.apache.iceberg.spark.SparkCatalog.buildIcebergCatalog(SparkCatalog.java:119)
    at org.apache.iceberg.spark.SparkCatalog.initialize(SparkCatalog.java:411)
    at org.apache.spark.sql.connector.catalog.Catalogs$.load(Catalogs.scala:65)
    at org.apache.spark.sql.connector.catalog.CatalogManager.$anonfun$catalog$1(CatalogManager.scala:53)
    at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
    at org.apache.spark.sql.connector.catalog.CatalogManager.catalog(CatalogManager.scala:53)
    at org.apache.spark.sql.connector.catalog.LookupCatalog$CatalogAndIdentifier$.unapply(LookupCatalog.scala:122)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.$anonfun$resolveRelation$1(Analyzer.scala:1314)
    at scala.Option.orElse(Option.scala:447)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveRelations$$resolveRelation(Analyzer.scala:1313)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$14.applyOrElse(Analyzer.scala:1163)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$14.applyOrElse(Analyzer.scala:1124)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$3(AnalysisHelper.scala:138)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$1(AnalysisHelper.scala:138)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning(AnalysisHelper.scala:134)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning$(AnalysisHelper.scala:130)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUpWithPruning(LogicalPlan.scala:33)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:1124)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:1083)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:239)
    at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
    at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
    at scala.collection.immutable.List.foldLeft(List.scala:91)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeBatch$1(RuleExecutor.scala:236)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$6(RuleExecutor.scala:319)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$RuleExecutionContext$.withContext(RuleExecutor.scala:368)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$5(RuleExecutor.scala:319)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$5$adapted(RuleExecutor.scala:309)
    at scala.collection.immutable.List.foreach(List.scala:431)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:309)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:195)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:191)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:260)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:256)
    at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:190)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:256)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:219)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:182)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:108)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:182)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:243)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:242)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:80)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:219)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:256)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:625)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:256)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
    at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:255)
    at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:80)
    at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:77)
    at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:69)
    at org.apache.spark.sql.Dataset$.$anonfun$ofRows$1(Dataset.scala:93)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:91)
    at org.apache.spark.sql.DataFrameReader.table(DataFrameReader.scala:608)
    at com.amazon.myaggregation.spark.SparkIcebergTableDao.readAsDataFrame(SparkIcebergTableDao.scala:12)
    at com.amazon.myaggregation.spark.Iceberg.RJIcebergDao.readDataFrame(RJcebergDao.scala:18)
    at com.amazon.myaggregation.spark.readers.RJReader.read(RJReader.scala:24)
    at com.amazon.myaggregation.spark.readers.RJReader.readJournals(RJReader.scala:57)
    at com.amazon.myaggregation.reports.DataGenerator$.generateEvents$1(DataGenerator.scala:78)
    at com.amazon.myaggregation.reports.DataGenerator$.generateData(DataGenerator.scala:60)
    at com.amazon.myaggregation.reports.DataGenerator$.main(DataGenerator.scala:41)
    at com.amazon.myaggregation.reports.DataGenerator.main(DataGenerator.scala)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:569)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:741)
Caused by: java.lang.ClassNotFoundException: software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient
    at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:592)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
    ... 78 more

     ApplicationMaster host: ip-10-0-1-95.ec2.internal
     ApplicationMaster RPC port: 40951
     queue: default
     start time: 1726089925586
     final status: FAILED
     tracking URL: http://ip-10-0-1-254.ec2.internal:20888/proxy/application_1726089801574_0001/
     user: hadoop

My SparkSession configuration:

  def crossAccountAyclIcebergSession(appName: String, domain: String): SparkSession = {
    SparkSession
      .builder
      .config(new SparkConf()
        .setAppName(appName)
        .setMaster("yarn")
        .set("spark.driver.extraJavaOptions", "-XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35")
        .set("spark.executor.extraJavaOptions", "-XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35")
        .set("spark.sql.hive.metastore.jars", "maven")
        .set("hive.metastore.client.factory.class", "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory")
      )
      .config(s"spark.sql.catalog.${CATALOG}.http-client.type", "apache")
      .config(s"spark.sql.catalog.${CATALOG}", "org.apache.iceberg.spark.SparkCatalog")
      .config(s"spark.sql.catalog.${CATALOG}.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
      .config(s"spark.sql.catalog.${CATALOG}.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
      .config(s"spark.sql.catalog.${CATALOG}.warehouse", "my-warehouse")
      .config(s"spark.sql.catalog.${CATALOG}.glue.id", "my-account")
      .config(s"spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
      .config(s"spark.sql.iceberg.vectorization.enabled", "true")
      .enableHiveSupport()
      .getOrCreate()
  }

This happens when I run a simple read:

sparkSession.read.table("my_catalog.my_db.my_table")

I'm really not sure why this is happening. The only thing I can think of is that I am using two SparkSessions (though I am using the one shown above specifically to read from the cross-account table). This works from another entry point in my Spark project, so I can only assume something odd is going on with Iceberg.

I saw at https://github.com/apache/iceberg/blob/main/aws/src/main/java/org/apache/iceberg/aws/AwsClientFactories.java#L166 that the only way URLConnectionHttpClient should be requested is if clientType equals HttpClientProperties.CLIENT_TYPE_URLCONNECTION, but I have explicitly set it to "apache" in my config.
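
A quick sanity check is to dump what the running session actually holds for the catalog, in case the setting never reached it. A minimal diagnostic sketch, assuming the same CATALOG constant and sparkSession value as in the builder above:

// Print every catalog-scoped setting the session reports, to confirm whether
// http-client.type = "apache" made it into the effective configuration.
val catalogPrefix = s"spark.sql.catalog.${CATALOG}"
sparkSession.conf.getAll
  .filter { case (key, _) => key.startsWith(catalogPrefix) }
  .foreach { case (key, value) => println(s"$key = $value") }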

Additionally, I tried adding the UrlConnectionHttpClient dependency and just living with it, but that leads to other NoSuchMethodError issues, so I am back to square one.
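
For reference, the missing class from the stack trace ships in the AWS SDK v2 url-connection-client module, so the fallback path only works with that artifact on the classpath. A hedged build.sbt sketch of what that would look like (the version is illustrative and would need to match the AWS SDK version already on the cluster, otherwise the kind of no-such-method errors described above are likely):

// Illustrative only: provides software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient.
// The version must line up with the rest of the AWS SDK v2 on the classpath.
libraryDependencies += "software.amazon.awssdk" % "url-connection-client" % "2.20.0"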

Willingness to contribute

anthonysgro commented 2 months ago

Ended up fixing this by just resetting the config right before reading from the table:

sparkSession.sparkContext.setConf(s"spark.sql.catalog.${CATALOG}.http-client.type", "apache")

Still weird; not sure why it would do this. Might be a Spark issue.
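
If setConf isn't exposed on your SparkContext, the same workaround can be expressed through the session's runtime config. A minimal sketch, assuming the CATALOG constant from the builder above:

// Re-apply the HTTP client type on the live session right before the read,
// so the Iceberg Glue catalog initializes with the Apache HTTP client.
sparkSession.conf.set(s"spark.sql.catalog.${CATALOG}.http-client.type", "apache")
val df = sparkSession.read.table("my_catalog.my_db.my_table")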

nastra commented 2 months ago

@anthonysgro which Iceberg / AWS dependencies are you using? For 1.5.0 you should only need:

spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.5.0,org.apache.iceberg:iceberg-aws-bundle:1.5.0
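
For a packaged Spark application rather than spark-sql, the equivalent dependencies would look roughly like this in build.sbt (a sketch; the Spark 3.4 / Scala 2.12 suffix is taken from the runtime artifact name above and should match your actual Spark and Scala versions):

// Sketch: Iceberg Spark runtime plus the AWS bundle, using the coordinates suggested above.
// Mark them Provided instead if the cluster already supplies them via --packages.
libraryDependencies ++= Seq(
  "org.apache.iceberg" % "iceberg-spark-runtime-3.4_2.12" % "1.5.0",
  "org.apache.iceberg" % "iceberg-aws-bundle" % "1.5.0"
)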