stackabletech / spark-k8s-operator

Operator for Apache Spark-on-Kubernetes for Stackable Data Platform
https://stackable.tech

Executors don't resolve dependencies #245

Closed razvan closed 12 months ago

razvan commented 1 year ago

Fixes https://github.com/stackabletech/spark-k8s-operator/issues/141

This PR tests whether Spark can load --packages in Kubernetes clusters (drivers and executors). It uses the PostgreSQL JDBC driver and Spark 3.4.0 as an example.

It currently fails to load the JDBC driver in the Spark driver:

2023-05-31T08:44:21,700 INFO [Thread-4] org.sparkproject.jetty.server.handler.ContextHandler - Started o.s.j.s.ServletContextHandler@354a8809{/static/sql,null,AVAILABLE,@Spark}
Traceback (most recent call last):
  File "/stackable/spark/jobs/write-to-postgresql.py", line 11, in <module>
    .jdbc("jdbc:postgresql://spark-postgresql/spark", "sparktest",
     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/stackable/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 1919, in jdbc
  File "/stackable/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
  File "/stackable/spark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py", line 169, in deco
  File "/stackable/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o76.jdbc.
: java.sql.SQLException: No suitable driver
        at java.sql/java.sql.DriverManager.getDriver(DriverManager.java:298)

supsupsap commented 4 months ago

Why is it closed? @razvan

supsupsap commented 4 months ago

I mean, I need the PostgreSQL driver for my scripts and can't provide it to Spark.

razvan commented 4 months ago

> Why is it closed? @razvan

This was a spike to investigate #141

That issue has been fixed by https://github.com/stackabletech/spark-k8s-operator/pull/281

> I mean, I need the PostgreSQL driver for my scripts and can't provide it to Spark.

What have you tried? Do you have more context?

supsupsap commented 4 months ago

@razvan I'm using Stackable Spark 3.5.1 and trying to use the JDBC catalog for Iceberg. I need the PostgreSQL driver for that.

What I did first:

spec:
  deps:
    packages:
    - org.apache.logging.log4j:log4j-slf4j-impl:2.22.0
    - org.apache.hadoop:hadoop-aws:3.3.4
    - org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0
    - org.apache.iceberg:iceberg-aws-bundle:1.5.0
    - org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.79.0
    - org.postgresql:postgresql:42.6.0

After that, in the Environment tab of the Spark UI I can see the PostgreSQL driver: spark://pyspark-pi-testtesttest-10f9ee8ef5283865-driver-svc.spark.svc:7078/files/org.postgresql_postgresql-42.6.0.jar (Added By User)
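To check that every entry under `deps.packages` shows up in the Environment tab, the expected jar names can be derived from the Maven coordinates. This is a minimal sketch that assumes the `group_artifact-version.jar` pattern visible in the path above holds for all plain `group:artifact:version` coordinates. The function name is hypothetical and not part of Spark or the operator.

```python
def expected_jar_name(coordinate: str) -> str:
    """Map a 'group:artifact:version' coordinate to the jar file name
    pattern seen in the Spark UI (assumed: group_artifact-version.jar).

    Raises ValueError if the coordinate does not have exactly three parts.
    """
    group, artifact, version = coordinate.split(":")
    return f"{group}_{artifact}-{version}.jar"


if __name__ == "__main__":
    packages = [
        "org.apache.hadoop:hadoop-aws:3.3.4",
        "org.apache.iceberg:iceberg-aws-bundle:1.5.0",
        "org.postgresql:postgresql:42.6.0",
    ]
    for package in packages:
        print(expected_jar_name(package))
```

Seeing a jar in this list only shows that it was distributed. It does not show which class loader ends up loading it.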

But I got this exception:

File "/tmp/spark-56bc1d7c-393f-45f1-ab6e-32b50f313935/jdbc_1_get_metadata_json_path.py", line 25, in <module>
    spark.sql(f"CREATE DATABASE IF NOT EXISTS {catalog_to_migrate}.{ds}").show()
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/stackable/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 1631, in sql
  File "/stackable/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
  File "/stackable/spark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py", line 179, in deco
  File "/stackable/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o96.sql.
: org.apache.iceberg.jdbc.UncheckedSQLException: Failed to connect: postgresql://rds-stage-jdbc-catalog.bbb.me:5432/catalog
    at org.apache.iceberg.jdbc.JdbcClientPool.newClient(JdbcClientPool.java:57)
    at org.apache.iceberg.jdbc.JdbcClientPool.newClient(JdbcClientPool.java:30)
    at org.apache.iceberg.ClientPoolImpl.get(ClientPoolImpl.java:125)
    at org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:56)
    at org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:51)
    at org.apache.iceberg.jdbc.JdbcCatalog.initializeCatalogTables(JdbcCatalog.java:155)
    at org.apache.iceberg.jdbc.JdbcCatalog.initialize(JdbcCatalog.java:141)
    at org.apache.iceberg.CatalogUtil.loadCatalog(CatalogUtil.java:255)
    at org.apache.iceberg.CatalogUtil.buildIcebergCatalog(CatalogUtil.java:309)
    at org.apache.iceberg.spark.SparkCatalog.buildIcebergCatalog(SparkCatalog.java:154)
    at org.apache.iceberg.spark.SparkCatalog.initialize(SparkCatalog.java:751)
    at org.apache.spark.sql.connector.catalog.Catalogs$.load(Catalogs.scala:65)
    at org.apache.spark.sql.connector.catalog.CatalogManager.$anonfun$catalog$1(CatalogManager.scala:53)
    at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
    at org.apache.spark.sql.connector.catalog.CatalogManager.catalog(CatalogManager.scala:53)
    at org.apache.spark.sql.connector.catalog.LookupCatalog$CatalogAndNamespace$.unapply(LookupCatalog.scala:86)
    at org.apache.spark.sql.catalyst.analysis.ResolveCatalogs$$anonfun$apply$1.applyOrElse(ResolveCatalogs.scala:51)
    at org.apache.spark.sql.catalyst.analysis.ResolveCatalogs$$anonfun$apply$1.applyOrElse(ResolveCatalogs.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$2(AnalysisHelper.scala:170)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$1(AnalysisHelper.scala:170)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning(AnalysisHelper.scala:168)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning$(AnalysisHelper.scala:164)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsDownWithPruning(LogicalPlan.scala:32)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$4(AnalysisHelper.scala:175)
    at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1215)
    at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1214)
    at org.apache.spark.sql.catalyst.plans.logical.CreateNamespace.mapChildren(v2Commands.scala:548)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$1(AnalysisHelper.scala:175)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning(AnalysisHelper.scala:168)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning$(AnalysisHelper.scala:164)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsDownWithPruning(LogicalPlan.scala:32)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsWithPruning(AnalysisHelper.scala:99)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsWithPruning$(AnalysisHelper.scala:96)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsWithPruning(LogicalPlan.scala:32)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators(AnalysisHelper.scala:76)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators$(AnalysisHelper.scala:75)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:32)
    at org.apache.spark.sql.catalyst.analysis.ResolveCatalogs.apply(ResolveCatalogs.scala:30)
    at org.apache.spark.sql.catalyst.analysis.ResolveCatalogs.apply(ResolveCatalogs.scala:27)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:222)
    at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
    at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
    at scala.collection.immutable.List.foldLeft(List.scala:91)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:219)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:211)
    at scala.collection.immutable.List.foreach(List.scala:431)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:211)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:226)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:222)
    at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:173)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:222)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:188)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:182)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:89)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:182)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:209)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:208)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:77)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:138)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:219)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:219)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
    at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:218)
    at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:77)
    at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:74)
    at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:66)
    at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
    at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:638)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:629)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:659)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.sql.SQLException: No suitable driver found for postgresql://rds-stage-jdbc-catalog.bbb.me:5432/catalog
    at java.sql/java.sql.DriverManager.getConnection(DriverManager.java:702)
    at java.sql/java.sql.DriverManager.getConnection(DriverManager.java:189)
    at org.apache.iceberg.jdbc.JdbcClientPool.newClient(JdbcClientPool.java:55)
    ... 89 more

Also, some of my settings:

spark.driver.userClassPathFirst: 'false'
spark.executor.userClassPathFirst: 'false'

Without these, the other packages do not work and trigger strange errors. Providing this jar via a PVC does not help either: I can see it on the classpath in the Spark UI, but I still get this error.
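A side note on the trace above: the `Caused by` line reports the connection string as `postgresql://rds-stage-jdbc-catalog.bbb.me:5432/catalog`, without a `jdbc:` prefix. The PostgreSQL JDBC driver only accepts URLs that start with `jdbc:postgresql:`, so it may be worth ruling this out in the catalog's connection URI, independently of any class loading problem. The sketch below is a plain-Python check. The function names are hypothetical and are not part of Spark, Iceberg, or the PostgreSQL driver.

```python
JDBC_POSTGRES_PREFIX = "jdbc:postgresql:"


def looks_like_postgres_jdbc_url(uri: str) -> bool:
    """Return True if `uri` starts with the prefix the PostgreSQL JDBC driver accepts."""
    return uri.strip().startswith(JDBC_POSTGRES_PREFIX)


def with_jdbc_prefix(uri: str) -> str:
    """Return `uri` as a JDBC URL, prepending `jdbc:` to a bare `postgresql://` URI.

    Raises ValueError for anything that is not a PostgreSQL URI at all.
    """
    cleaned = uri.strip()
    if looks_like_postgres_jdbc_url(cleaned):
        return cleaned
    if cleaned.startswith("postgresql://"):
        return "jdbc:" + cleaned
    raise ValueError(f"not a PostgreSQL URI: {cleaned!r}")


if __name__ == "__main__":
    # URI exactly as it appears in the "No suitable driver found" message.
    reported = "postgresql://rds-stage-jdbc-catalog.bbb.me:5432/catalog"
    print(looks_like_postgres_jdbc_url(reported))
    print(with_jdbc_prefix(reported))
```

If the configured URI already has the `jdbc:` prefix and the error persists, the class loader limitation discussed in this thread remains the more likely cause.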

razvan commented 4 months ago

Yes, this is annoying and something that Spark should really fix upstream.

The only solid workaround is listed in the documentation:

Currently it’s not possible to provision dependencies that are loaded by the JVM’s [system class loader](https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/lang/ClassLoader.html#getSystemClassLoader()). Such dependencies include JDBC drivers. If you need access to JDBC sources from your Spark application, consider building your own custom Spark image.

I created a new issue from your comment to have another look at this.

supsupsap commented 4 months ago

@razvan Can you help me with one more thing? I was searching for the Dockerfile of the Stackable Spark image and, as far as I understand, this is it: https://github.com/stackabletech/docker-images/blob/main/spark-k8s/Dockerfile

What should I provide here?

FROM stackable/image/java-base

ARG PRODUCT
ARG PYTHON
ARG SPARK
ARG HADOOP_SHORT_VERSION
ARG HADOOP_LONG_VERSION
ARG AWS_JAVA_SDK_BUNDLE
ARG AZURE_STORAGE
ARG AZURE_KEYVAULT_CORE
ARG JACKSON_DATAFORMAT_XML
ARG STAX2_API
ARG WOODSTOX_CORE
ARG JMX_EXPORTER
ARG RELEASE
ARG TARGETARCH
ARG TINI

I found all of them in https://github.com/stackabletech/docker-images/blob/main/conf.py except ARG RELEASE and ARG TARGETARCH.

adwk67 commented 4 months ago

@supsupsap It is easier to use an existing image and augment that with additional resources, much like is done here, rather than trying to adapt the original/base dockerfile. But please open an additional issue/discussion if necessary rather than continuing the thread here (or add a comment on https://github.com/stackabletech/spark-k8s-operator/issues/391 if relevant to that issue).