AbsaOSS / spline-spark-agent

Spline agent for Apache Spark
https://absaoss.github.io/spline/
Apache License 2.0

Unable to run Spline in yarn cluster mode unable to access producer url #102

Closed ayanbizz closed 3 years ago

ayanbizz commented 4 years ago

I am getting the following error while trying to initialize Spline in YARN cluster mode. The error is related to producer URL access. I am able to run Spline with deploy-mode = client; the issue occurs only in cluster mode. This is how I ran spark-submit:

spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --queue sparkjobs \
  --jars spark-2.3-spline-agent-bundle_2.11-0.5.3.jar \
  --conf "spark.sql.queryExecutionListeners=za.co.absa.spline.harvester.listener.SplineQueryExecutionListener" \
  --conf "spark.spline.producer.url=http://someurl/producer"
20/06/25 20:14:04 ERROR QueryExecutionEventHandlerFactory: Spline initialization failed! Spark Lineage tracking is DISABLED.
za.co.absa.spline.harvester.exception.SplineInitializationException: Spark Agent was not able to establish connection to Spline Gateway
    at za.co.absa.spline.harvester.dispatcher.HttpLineageDispatcher$$anonfun$1.applyOrElse(HttpLineageDispatcher.scala:120)
    at za.co.absa.spline.harvester.dispatcher.HttpLineageDispatcher$$anonfun$1.applyOrElse(HttpLineageDispatcher.scala:118)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:216)
    at scala.util.Try$.apply(Try.scala:192)
    at scala.util.Failure.recover(Try.scala:216)
    at za.co.absa.spline.harvester.dispatcher.HttpLineageDispatcher.<init>(HttpLineageDispatcher.scala:118)
    at za.co.absa.spline.harvester.dispatcher.HttpLineageDispatcher.<init>(HttpLineageDispatcher.scala:88)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at za.co.absa.spline.harvester.conf.DefaultSplineConfigurer.instantiate(DefaultSplineConfigurer.scala:114)
    at za.co.absa.spline.harvester.conf.DefaultSplineConfigurer.lineageDispatcher(DefaultSplineConfigurer.scala:94)
    at za.co.absa.spline.harvester.conf.DefaultSplineConfigurer.queryExecutionEventHandler(DefaultSplineConfigurer.scala:92)
    at za.co.absa.spline.harvester.QueryExecutionEventHandlerFactory.createEventHandler(QueryExecutionEventHandlerFactory.scala:38)
    at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener$.za$co$absa$spline$harvester$listener$SplineQueryExecutionListener$$constructEventHandler(SplineQueryExecutionListener.scala:68)
    at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener$$anonfun$$lessinit$greater$1.apply(SplineQueryExecutionListener.scala:39)
    at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener$$anonfun$$lessinit$greater$1.apply(SplineQueryExecutionListener.scala:39)
    at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.za$co$absa$spline$harvester$listener$SplineQueryExecutionListener$$maybeEventHandler$lzycompute(SplineQueryExecutionListener.scala:33)
    at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.za$co$absa$spline$harvester$listener$SplineQueryExecutionListener$$maybeEventHandler(SplineQueryExecutionListener.scala:33)
    at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener$$anonfun$onSuccess$1.apply$mcV$sp(SplineQueryExecutionListener.scala:42)
    at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.withErrorHandling(SplineQueryExecutionListener.scala:51)
    at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.onSuccess(SplineQueryExecutionListener.scala:41)
    at org.apache.spark.sql.util.ExecutionListenerManager$$anonfun$onSuccess$1$$anonfun$apply$mcV$sp$1.apply(QueryExecutionListener.scala:124)
    at org.apache.spark.sql.util.ExecutionListenerManager$$anonfun$onSuccess$1$$anonfun$apply$mcV$sp$1.apply(QueryExecutionListener.scala:123)
    at org.apache.spark.sql.util.ExecutionListenerManager$$anonfun$org$apache$spark$sql$util$ExecutionListenerManager$$withErrorHandling$1.apply(QueryExecutionListener.scala:145)
    at org.apache.spark.sql.util.ExecutionListenerManager$$anonfun$org$apache$spark$sql$util$ExecutionListenerManager$$withErrorHandling$1.apply(QueryExecutionListener.scala:143)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
    at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:45)
    at org.apache.spark.sql.util.ExecutionListenerManager.org$apache$spark$sql$util$ExecutionListenerManager$$withErrorHandling(QueryExecutionListener.scala:143)
    at org.apache.spark.sql.util.ExecutionListenerManager$$anonfun$onSuccess$1.apply$mcV$sp(QueryExecutionListener.scala:123)
    at org.apache.spark.sql.util.ExecutionListenerManager$$anonfun$onSuccess$1.apply(QueryExecutionListener.scala:123)
    at org.apache.spark.sql.util.ExecutionListenerManager$$anonfun$onSuccess$1.apply(QueryExecutionListener.scala:123)
    at org.apache.spark.sql.util.ExecutionListenerManager.readLock(QueryExecutionListener.scala:156)
    at org.apache.spark.sql.util.ExecutionListenerManager.onSuccess(QueryExecutionListener.scala:122)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3257)
    at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3192)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: scala.MatchError: org.apache.hadoop.fs.FsUrlConnection:http://someurl/producer/status (of class org.apache.hadoop.fs.FsUrlConnection)
    at scalaj.http.HttpRequest.scalaj$http$HttpRequest$$doConnection(Http.scala:351)
    at scalaj.http.HttpRequest.exec(Http.scala:343)
    at scalaj.http.HttpRequest.asString(Http.scala:491)
    at za.co.absa.spline.harvester.dispatcher.HttpLineageDispatcher$$anonfun$2.apply(HttpLineageDispatcher.scala:109)
cerveada commented 4 years ago

This error means the Agent is not able to reach the Gateway over the network. If Spline works in local mode, the connection from the cluster to the gateway is probably blocked somewhere.

The agent is essentially a listener running in the Spark driver. In cluster mode the driver runs inside the cluster, so you have to make sure the Spline Gateway is accessible from the cluster nodes and that spark.spline.producer.url resolves and connects when accessed from the cluster.
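
One way to verify reachability is a small probe run from a job submitted with the same cluster-mode settings, so it executes on a cluster-side driver. This sketch is not part of the agent (the function name is illustrative); it hits the producer's /status endpoint, the same URL the dispatcher checks at startup:

```scala
import java.net.{HttpURLConnection, URL}

/** Illustrative probe: returns the HTTP status code of GET <baseUrl>/status.
  * Run it inside a cluster-mode driver to exercise the same network path
  * the Spline agent uses. */
def probeProducer(baseUrl: String): Int = {
  val conn = new URL(s"$baseUrl/status").openConnection() match {
    case http: HttpURLConnection => http
    case other => sys.error(s"Expected HttpURLConnection, got ${other.getClass}")
  }
  conn.setConnectTimeout(5000)
  conn.setReadTimeout(5000)
  try conn.getResponseCode finally conn.disconnect()
}

// e.g. probeProducer("http://someurl/producer")
```

If this times out, the gateway is unreachable from the cluster; if it falls into the error branch instead, you are hitting the FsUrlConnection problem diagnosed later in this thread rather than a network issue.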

wajda commented 4 years ago

It looks more like a Scalaj-Http or Hadoop issue, or a mix of the two. Look at this line:

Caused by: scala.MatchError: org.apache.hadoop.fs.FsUrlConnection:http://someurl/producer/status (of class org.apache.hadoop.fs.FsUrlConnection)

An FsUrlConnection is being used where an HttpURLConnection is expected.
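
The mechanism, sketched under the assumption that hadoop-common is on the classpath: Hadoop's FsUrlStreamHandlerFactory gets installed as the JVM-wide URL stream handler factory, and on affected Hadoop versions it also claims the http scheme, so java.net.URL.openConnection() returns an FsUrlConnection. scalaj-http then pattern-matches the connection against HttpURLConnection and fails with the MatchError above:

```scala
// Sketch of the failure mode. Requires hadoop-common; do NOT run this in a
// JVM you care about -- setURLStreamHandlerFactory can be called only once.
import java.net.URL
import org.apache.hadoop.fs.FsUrlStreamHandlerFactory

URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory())

// On affected Hadoop versions the factory also claims "http", so this
// returns an org.apache.hadoop.fs.FsUrlConnection...
val conn = new URL("http://someurl/producer/status").openConnection()

// ...and scalaj-http's internal match on HttpURLConnection has no case
// for FsUrlConnection, hence the scala.MatchError.
println(conn.getClass.getName)
```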

wajda commented 4 years ago

A similar issue was reported in Spark - https://issues.apache.org/jira/browse/SPARK-25694

wajda commented 4 years ago

The issue should be fixed in Spark 3.0.0, and it was supposedly also backported to Spark 2.4 (see https://github.com/apache/spark/pull/26530#issuecomment-630493684).

@ayanbizz Can you try it with Spark 2.4?

DaimonPl commented 4 years ago

I can confirm that with Spark 2.4.5 we did not have such problems :)

matt12eagles commented 2 years ago

Hello, does anyone have a workaround for this besides updating Spark? @wajda, trying to see if there is a workaround on the Spline agent side so we don't have to upgrade all apps to Spark 2.4.5+. Thanks!

wajda commented 2 years ago

I would try replacing scalaj with another HTTP client in the HttpLineageDispatcher class.
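
As one hedged sketch of that idea: any client that opens its own socket instead of going through java.net.URL (Apache HttpClient, OkHttp, or even a hand-rolled request as below) sidesteps the hijacked stream handler. This is illustrative only, not the agent's actual code; a real fix would be a custom dispatcher implementation, and the helper below handles only a bare-bones POST:

```scala
import java.io.{BufferedReader, InputStreamReader, OutputStreamWriter}
import java.net.Socket
import java.nio.charset.StandardCharsets.UTF_8

/** Minimal HTTP/1.1 POST over a raw socket, bypassing java.net.URL and
  * therefore any JVM-wide URLStreamHandlerFactory. Illustrative sketch only:
  * no TLS, no redirects, no chunked encoding. Returns the status line. */
def rawPost(host: String, port: Int, path: String, body: String): String = {
  val socket = new Socket(host, port)
  try {
    val out = new OutputStreamWriter(socket.getOutputStream, UTF_8)
    out.write(
      s"POST $path HTTP/1.1\r\n" +
      s"Host: $host\r\n" +
      "Content-Type: application/json\r\n" +
      s"Content-Length: ${body.getBytes(UTF_8).length}\r\n" +
      "Connection: close\r\n\r\n" +
      body)
    out.flush()
    val in = new BufferedReader(new InputStreamReader(socket.getInputStream, UTF_8))
    in.readLine() // status line, e.g. "HTTP/1.1 200 OK"
  } finally socket.close()
}
```

Swapping in a full-featured client that does its own socket handling (rather than this toy) would be the more maintainable route, but either way the key point is to avoid URL.openConnection().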

matt12eagles commented 2 years ago

Thank you @wajda, we will give that a try! We'll look into other options for HTTP clients and hope to get it working!