OpenLineage / OpenLineage

An Open Standard for lineage metadata collection
http://openlineage.io
Apache License 2.0

SparkR support #1605

Open perttus opened 1 year ago

perttus commented 1 year ago

Is SparkR officially supported? If so, why am I getting this error?

library(sparklyr)

# Point the OpenLineage listener at the Marquez backend and pull the
# integration jar from Maven via spark.jars.packages
conf <- sparklyr::spark_config()
conf[["spark.openlineage.host"]] <- "https://marquez-backend/"
conf[["spark.jars.packages"]] <- "io.openlineage:openlineage-spark:0.20.4"
conf[["spark.extraListeners"]] <- "io.openlineage.spark.agent.OpenLineageSparkListener"

spark_con <- spark_connect(master = "local", config = conf)

Error in force(code) : Failed during initialize_connection:
org.apache.spark.SparkException: Exception when registering SparkListener
  at org.apache.spark.SparkContext.setupAndStartListenerBus(SparkContext.scala:2563)
  at org.apache.spark.SparkContext.<init>(SparkContext.scala:643)
  at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2704)
  at org.apache.spark.sql.SparkSession$Builder.$anonfun$getOrCreate$2(SparkSession.scala:953)
  at scala.Option.getOrElse(Option.scala:189)
  at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:947)
  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.base/java.lang.reflect.Method.invoke(Method.java:566)
  at sparklyr.Invoke.invoke(invoke.scala:161)
  at sparklyr.StreamHandler.handleMethodCall(stream.scala:141)
  at sparklyr.StreamHandler.read(stream.scala:62)
  at sparklyr.BackendHandler.$anonfun$channelRead0$1(handler.scala:60)
  at scala.util.control.Breaks.breakable(Breaks.scala:42)
  at sparklyr.BackendHandler.channelRead0(handler.scala:41)
  at sparklyr.BackendHandler.channelRead0(handler.scala:14)
  at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
  at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
  at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
  at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
  at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
  at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299)
  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
  at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
  at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
  at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
  at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
  at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
  at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
  at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
  at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
  at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
  at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
  at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
  at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException: io.openlineage.spark.agent.OpenLineageSparkListener
  at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
  at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
  at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
  at java.base/java.lang.Class.forName0(Native Method)
  at java.base/java.lang.Class.forName(Class.java:398)
  at org.apache.spark.util.Utils$.classForName(Utils.scala:218)
  at org.apache.spark.util.Utils$.$anonfun$loadExtensions$1(Utils.scala:2921)
  at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:293)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  at scala.collection.TraversableLike.flatMap(TraversableLike.scala:293)
  at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:290)
  at scala.collection.AbstractTraversable.flatMap(Traversable.scala:108)
  at org.apache.spark.util.Utils$.loadExtensions(Utils.scala:2919)
  at org.apache.spark.SparkContext.$anonfun$setupAndStartListenerBus$1(SparkContext.scala:2552)
  at org.apache.spark.SparkContext.$anonfun$setupAndStartListenerBus$1$adapted(SparkContext.scala:2551)
  at scala.Option.foreach(Option.scala:407)
  at org.apache.spark.SparkContext.setupAndStartListenerBus(SparkContext.scala:2551)
  ... 42 more

Log: /tmp/RtmpqHOAFb/file1e5b8e1f4be59a_spark.log

---- Output Log ----
23/02/09 16:15:33 INFO SparkContext: SparkContext already stopped.
23/02/09 16:15:33 ERRO

mobuchowski commented 1 year ago

@perttus you might be the first person to try this and tell us about it 😄

Caused by: java.lang.ClassNotFoundException: io.openlineage.spark.agent.OpenLineageSparkListener

This definitely looks like

conf[["spark.jars.packages"]] <- "io.openlineage:openlineage-spark:0.20.4"

is not taking effect. Could you, for example, try a local jar with spark.jars, or put the config in spark.conf?
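
For example, something like this sketch (the jar path is a placeholder for wherever you put a downloaded openlineage-spark jar):

# Sketch: load a locally downloaded OpenLineage jar via spark.jars instead of
# resolving it from Maven at startup; the path below is a placeholder.
conf[["spark.jars"]] <- "/opt/jars/openlineage-spark-0.20.4.jar"
conf[["spark.extraListeners"]] <- "io.openlineage.spark.agent.OpenLineageSparkListener"
spark_con <- spark_connect(master = "local", config = conf)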

perttus commented 1 year ago

Yes, I just realised the same. It works, at least, by setting:

conf[["sparklyr.defaultPackages"]] <- c("io.openlineage:openlineage-spark:0.20.4")
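
For completeness, a minimal sketch of the setup that works for me, using the same host and version as my original snippet (as far as I understand, sparklyr passes sparklyr.defaultPackages to spark-submit as --packages, so the jar is on the classpath before listeners are registered):

library(sparklyr)

conf <- sparklyr::spark_config()
# Resolve the OpenLineage jar through sparklyr's own package mechanism
# rather than spark.jars.packages
conf[["sparklyr.defaultPackages"]] <- c("io.openlineage:openlineage-spark:0.20.4")
conf[["spark.extraListeners"]] <- "io.openlineage.spark.agent.OpenLineageSparkListener"
conf[["spark.openlineage.host"]] <- "https://marquez-backend/"

spark_con <- spark_connect(master = "local", config = conf)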

perttus commented 1 year ago

However, I still get some errors; I'll have to debug later which line of code causes them.

[2023-02-09, 16:53:26 UTC] {docker.py:340} INFO - 23/02/09 16:53:26 ERROR EventEmitter: Could not emit lineage w/ exception
io.openlineage.client.OpenLineageClientException: code: 400, response:
  at io.openlineage.client.transports.HttpTransport.throwOnHttpError(HttpTransport.java:131)
  at io.openlineage.client.transports.HttpTransport.emit(HttpTransport.java:116)
  at io.openlineage.client.OpenLineageClient.emit(OpenLineageClient.java:42)
  at io.openlineage.spark.agent.EventEmitter.emit(EventEmitter.java:47)
  at io.openlineage.spark.agent.lifecycle.SparkSQLExecutionContext.start(SparkSQLExecutionContext.java:174)
  at io.openlineage.spark.agent.OpenLineageSparkListener.lambda$onJobStart$9(OpenLineageSparkListener.java:149)
  at java.base/java.util.Optional.ifPresent(Optional.java:183)
  at io.openlineage.spark.agent.OpenLineageSparkListener.onJobStart(OpenLineageSparkListener.java:145)
  at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:37)
  at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
  at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
  at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
  at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117)
  at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101)
  at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:105)
  at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:105)
  at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
  at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100)
  at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:96)
  at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1446)
  at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:96)

perttus commented 1 year ago

I think this was the actual root-cause error behind the previous EventEmitter errors.

I'm using the Airflow DockerOperator to run the R script. OpenLineage fails to create any datasets written to S3 for the SparkR jobs, but I can try again with a simpler example.

ERROR AsyncEventQueue: Listener OpenLineageSparkListener threw an exception
java.lang.ClassCastException: class org.apache.spark.scheduler.ShuffleMapStage cannot be cast to class java.lang.Boolean (org.apache.spark.scheduler.ShuffleMapStage is in unnamed module of loader 'app'; java.lang.Boolean is in module java.base of loader 'bootstrap')
  at scala.runtime.BoxesRunTime.unboxToBoolean(BoxesRunTime.java:87)
  at scala.collection.LinearSeqOptimized.forall(LinearSeqOptimized.scala:85)
  at scala.collection.LinearSeqOptimized.forall$(LinearSeqOptimized.scala:82)
  at scala.collection.immutable.List.forall(List.scala:91)
  at io.openlineage.spark.agent.lifecycle.OpenLineageRunEventBuilder.registerJob(OpenLineageRunEventBuilder.java:181)
  at io.openlineage.spark.agent.lifecycle.SparkSQLExecutionContext.setActiveJob(SparkSQLExecutionContext.java:152)
  at java.base/java.util.Optional.ifPresent(Optional.java:183)
  at io.openlineage.spark.agent.OpenLineageSparkListener.lambda$onJobStart$9(OpenLineageSparkListener.java:148)
  at java.base/java.util.Optional.ifPresent(Optional.java:183)
  at io.openlineage.spark.agent.OpenLineageSparkListener.onJobStart(OpenLineageSparkListener.java:145)
  at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:37)
  at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
  at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
  at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
  at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117)
  at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101)
  at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:105)
  at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:105)
  at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
  at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100)
  at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:96)
  at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1446)
  at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:96)

mobuchowski commented 1 year ago

java.lang.ClassCastException: class org.apache.spark.scheduler.ShuffleMapStage cannot be cast to class java.lang.Boolean (org.apache.spark.scheduler.ShuffleMapStage is in unnamed module of loader 'app'; java.lang.Boolean is in module java.base of loader 'bootstrap')

Interesting. To be honest, solving it now would probably require quite a bit more time than I currently have (especially since I know nothing about SparkR), but I'd be glad to help review a solution if you figure this out on your own 👍

perttus commented 1 year ago

Thanks for the help! πŸ‘

The original test was quite complex: it read several Vertica tables, joined them in Spark with some transformations, and finally wrote the result to S3, and it failed to emit the lineage.

I tried again with a simple S3 read/write operation, and it succeeded without any errors. However, the columnLineage facet was empty.

df_out <- spark_read_parquet(
  spark_con,
  name = "df_out",
  path = input_path
)

spark_write_parquet(
  df_out,
  path = out_path,
  mode = "overwrite"
)

Testing again with a simple Vertica read and S3 write failed to emit the lineage, so the problem seems to be in the Vertica read, which I suppose is not supported? We are using vertica-jdbc-9.3.1-0.jar.

df_out <- spark_read_vertica(spark_con, "df_out", query)

spark_write_parquet(
  df_out,
  path = out_path,
  mode = "overwrite"
)

[2023-02-10, 15:22:21 UTC] {docker.py:340} INFO - 23/02/10 15:22:21 ERROR ColumnLevelLineageUtils: Error when invoking static method 'buildColumnLineageDatasetFacet' for Spark3
java.lang.reflect.InvocationTargetException
  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.base/java.lang.reflect.Method.invoke(Method.java:566)
  at io.openlineage.spark.agent.lifecycle.plan.column.ColumnLevelLineageUtils.buildColumnLineageDatasetFacet(ColumnLevelLineageUtils.java:35)
  at io.openlineage.spark.agent.lifecycle.OpenLineageRunEventBuilder.lambda$buildOutputDatasets$21(OpenLineageRunEventBuilder.java:424)
  at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
  at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
  at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
  at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
  at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
  at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
  at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
  at io.openlineage.spark.agent.lifecycle.OpenLineageRunEventBuilder.buildOutputDatasets(OpenLineageRunEventBuilder.java:437)
  at io.openlineage.spark.agent.lifecycle.OpenLineageRunEventBuilder.populateRun(OpenLineageRunEventBuilder.java:296)
  at io.openlineage.spark.agent.lifecycle.OpenLineageRunEventBuilder.buildRun(OpenLineageRunEventBuilder.java:279)
  at io.openlineage.spark.agent.lifecycle.OpenLineageRunEventBuilder.buildRun(OpenLineageRunEventBuilder.java:222)
  at io.openlineage.spark.agent.lifecycle.SparkSQLExecutionContext.start(SparkSQLExecutionContext.java:70)
  at io.openlineage.spark.agent.OpenLineageSparkListener.lambda$sparkSQLExecStart$0(OpenLineageSparkListener.java:91)
  at java.base/java.util.Optional.ifPresent(Optional.java:183)
  at io.openlineage.spark.agent.OpenLineageSparkListener.sparkSQLExecStart(OpenLineageSparkListener.java:91)
  at io.openlineage.spark.agent.OpenLineageSparkListener.onOtherEvent(OpenLineageSparkListener.java:82)
  at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:100)
  at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
  at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
  at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
  at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117)
  at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101)
  at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:105)
  at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:105)
  at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
  at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100)
  at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:96)
  at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1446)
  at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:96)
Caused by: java.lang.NoClassDefFoundError: com/google/cloud/spark/bigquery/BigQueryRelation
  at io.openlineage.spark3.agent.lifecycle.plan.column.InputFieldsCollector.extractDatasetIdentifier(InputFieldsCollector.java:88)
  at io.openlineage.spark3.agent.lifecycle.plan.column.InputFieldsCollector.discoverInputsFromNode(InputFieldsCollector.java:63)
  at io.openlineage.spark3.agent.lifecycle.plan.column.InputFieldsCollector.collect(InputFieldsCollector.java:47)
  at io.openlineage.spark3.agent.lifecycle.plan.column.InputFieldsCollector.lambda$collect$0(InputFieldsCollector.java:57)
  at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
  at scala.collection.convert.Wrappers$IteratorWrapper.forEachRemaining(Wrappers.scala:31)
  at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
  at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:658)
  at io.openlineage.spark3.agent.lifecycle.plan.column.InputFieldsCollector.collect(InputFieldsCollector.java:57)
  at io.openlineage.spark3.agent.lifecycle.plan.column.ColumnLevelLineageUtils.buildColumnLineageDatasetFacet(ColumnLevelLineageUtils.java:34)
  ... 36 more
Caused by: java.lang.ClassNotFoundException: com.google.cloud.spark.bigquery.BigQueryRelation
  at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
  at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
  at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
  ... 46 more

[2023-02-10, 15:22:21 UTC] {docker.py:340} INFO - 23/02/10 15:22:21 ERROR EventEmitter: Could not emit lineage w/ exception
io.openlineage.client.OpenLineageClientException: code: 400, response:
  at io.openlineage.client.transports.HttpTransport.throwOnHttpError(HttpTransport.java:131)
  at io.openlineage.client.transports.HttpTransport.emit(HttpTransport.java:116)
  at io.openlineage.client.OpenLineageClient.emit(OpenLineageClient.java:42)
  at io.openlineage.spark.agent.EventEmitter.emit(EventEmitter.java:47)
  at io.openlineage.spark.agent.lifecycle.SparkSQLExecutionContext.start(SparkSQLExecutionContext.java:77)
  at io.openlineage.spark.agent.OpenLineageSparkListener.lambda$sparkSQLExecStart$0(OpenLineageSparkListener.java:91)
  at java.base/java.util.Optional.ifPresent(Optional.java:183)
  at io.openlineage.spark.agent.OpenLineageSparkListener.sparkSQLExecStart(OpenLineageSparkListener.java:91)
  at io.openlineage.spark.agent.OpenLineageSparkListener.onOtherEvent(OpenLineageSparkListener.java:82)
  at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:100)
  at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
  at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
  at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
  at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117)
  at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101)
  at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:105)
  at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:105)
  at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
  at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100)
  at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:96)
  at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1446)
  at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:96)

mobuchowski commented 1 year ago

@perttus sorry for the late reply. I was sure I had replied last week, but I must have closed the browser tab before actually sending it.

I think it would help to post the optimized logical plan of that job. What the OpenLineage integration extracts depends on the structure of that plan. If Vertica has its own implementation of logical plan nodes that isn't based on modern Spark primitives like DataSourceV2Relation, then we don't support it at the moment; however, we would certainly accept a PR that implements it as a DatasetBuilder, and we'd help with any problems along the way.
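
In sparklyr, something like this sketch should print it (untested on my side; it assumes df_out is the Spark DataFrame from your snippet above):

library(sparklyr)

# Sketch: unwrap the Java Dataset behind the sparklyr tbl and walk its
# queryExecution to print the optimized logical plan.
plan <- df_out %>%
  spark_dataframe() %>%
  invoke("queryExecution") %>%
  invoke("optimizedPlan") %>%
  invoke("toString")
cat(plan)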

Example of DatasetBuilder: https://github.com/OpenLineage/OpenLineage/blob/65a5f021a1ba3035d5198e759587737a05b242e1/integration/spark/spark3/src/main/java/io/openlineage/spark3/agent/lifecycle/plan/DataSourceV2RelationInputDatasetBuilder.java#L28

On the other hand, I see some exceptions that should not be related to Vertica:

 Caused by: java.lang.ClassNotFoundException: com.google.cloud.spark.bigquery.BigQueryRelation
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)

Are you also using BigQuery? I think we've solved some of the BigQuery problems in the latest release, so I'm surprised to see that; it would be great to know more details.

 [2023-02-10, 15:22:21 UTC] {docker.py:340} INFO - 23/02/10 15:22:21 ERROR EventEmitter: Could not emit lineage w/ exception
io.openlineage.client.OpenLineageClientException: code: 400, response:
at io.openlineage.client.transports.HttpTransport.throwOnHttpError(HttpTransport.java:131)

Also, it would be great to see what this event is; it should be logged at DEBUG level. My suspicion is that something in the LogicalPlan we log as part of the event does not serialize cleanly. You could try again with the Spark conf spark.openlineage.facets.disabled set to [spark_unknown;spark.logicalPlan].
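
In the sparklyr config from earlier, that would be a one-line sketch:

# Disable the spark_unknown and spark.logicalPlan facets to rule out
# LogicalPlan serialization problems in the emitted event
conf[["spark.openlineage.facets.disabled"]] <- "[spark_unknown;spark.logicalPlan]"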

perttus commented 1 year ago

@mobuchowski thanks again for the detailed information. My guess is that the problem is related to JDBC reads not being fully supported, because the error message is similar to the one in #1584. Once that is resolved, SparkR-specific errors can be separated out more easily.

We are not using BigQuery. That BigQueryRelation error is raised every time any JDBC connector is used in Spark.

I could try the plain spark_read_jdbc function with Postgres instead of the Vertica-specific spark_read_vertica to see if I can emit the lineage at all. I can also try to get the logical plan out of it.
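
Something along these lines is what I have in mind (connection details are placeholders, and the Postgres JDBC driver jar would also need to be on the classpath):

# Sketch: read via generic JDBC (Postgres) instead of spark_read_vertica;
# url, table, and credentials below are placeholders.
df_out <- spark_read_jdbc(
  spark_con,
  name = "df_out",
  options = list(
    url = "jdbc:postgresql://db-host:5432/mydb",
    dbtable = "public.my_table",
    user = "user",
    password = "password",
    driver = "org.postgresql.Driver"
  )
)

spark_write_parquet(df_out, path = out_path, mode = "overwrite")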

mobuchowski commented 1 year ago

@perttus: @tnazarew is currently working on better JDBC support in the Spark integration, so your problems might soon be solved.

BQ errors should only be emitted if you're using BQ and something goes wrong, so that's definitely some kind of bug.

perttus commented 1 year ago

@mobuchowski It seems the JDBC-related issues have been resolved with the latest release, and Vertica reads now work even for SparkR, thanks to @tnazarew. However, I still get the ShuffleMapStage error for more complex queries, and I'm not sure whether that is related to SparkR itself or to JDBC at all.

I also tried with the Spark conf spark.openlineage.facets.disabled=[spark_unknown;spark.logicalPlan] as you suggested, but that did not have any effect, except that the disabled facets are no longer available in Marquez.

Should I close this issue and make another one for the ShuffleMapStage issue?

mobuchowski commented 1 year ago

However, I still get the ShuffleMapStage error for more complex queries, and I'm not sure whether that is related to SparkR itself or to JDBC at all.

@perttus It would be great if you could post a minimal reproduction here. Ideally in Python or SQL, because then we could use it in our tests.

perttus commented 1 year ago

However, I still get the ShuffleMapStage error for more complex queries, and I'm not sure whether that is related to SparkR itself or to JDBC at all.

@perttus It would be great if you could post a minimal reproduction here. Ideally in Python or SQL, because then we could use it in our tests.

I will have to get back to this later, as it's not trivial to create such a generic test case.