AbsaOSS / spline-spark-agent

Spline agent for Apache Spark
https://absaoss.github.io/spline/
Apache License 2.0

Add Snowflake Plugin #818

Open sethjones348 opened 4 months ago

sethjones348 commented 4 months ago

Overview

This PR adds support for Snowflake target sources when using the spline-spark-agent in conjunction with the Snowflake Spark connector.
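
For reference, here is a minimal, notebook-style sketch of the kind of job this plugin targets: a Spark read and write against Snowflake through the connector, with the Spline agent attached via the codeless-init listener. The connection options and table names are placeholders, not code from this PR.

import org.apache.spark.sql.SparkSession

// Placeholder Snowflake connection options; real values come from your account.
val sfOptions = Map(
  "sfURL"       -> "<account>.snowflakecomputing.com",
  "sfUser"      -> "<user>",
  "sfPassword"  -> "<password>",
  "sfDatabase"  -> "<database>",
  "sfSchema"    -> "<schema>",
  "sfWarehouse" -> "<warehouse>"
)

val spark = SparkSession.builder()
  .appName("snowflake-lineage-example")
  // Codeless init: register the Spline agent as a query execution listener.
  .config("spark.sql.queryExecutionListeners",
          "za.co.absa.spline.harvester.listener.SplineQueryExecutionListener")
  .getOrCreate()

// Read from one Snowflake table and write to another; with this plugin both the
// Snowflake source and the Snowflake target should show up in the captured lineage.
val df = spark.read
  .format("snowflake")
  .options(sfOptions)
  .option("dbtable", "SOURCE_TABLE")
  .load()

df.write
  .format("snowflake")
  .options(sfOptions)
  .option("dbtable", "TARGET_TABLE")
  .mode("append")
  .save()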

sethjones348 commented 4 months ago

@wajda I implemented some of your suggestions. Turns out the read operation was not hard to capture after all! I also added an integration test, but I've marked the PR as a draft as it's not ready to merge yet. I have a couple of questions for you.

1) The Snowflake connector depends on Spark 3.2, 3.3, or 3.4. I have written an integration test, but if I update the value of spark.version to spark-33.version I can't even attempt to run it, as I hit SLF4J binding errors:

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
24/07/17 16:03:48 INFO SparkContext: Running Spark version 3.3.1
24/07/17 16:03:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/07/17 16:03:48 INFO ResourceUtils: ==============================================================
24/07/17 16:03:48 INFO ResourceUtils: No custom resources configured for spark.driver.
24/07/17 16:03:48 INFO ResourceUtils: ==============================================================
24/07/17 16:03:48 INFO SparkContext: Submitted application: b911d908-3b0b-4a3d-9990-532586fee26c
24/07/17 16:03:48 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
24/07/17 16:03:48 INFO ResourceProfile: Limiting resource is cpu
24/07/17 16:03:48 INFO ResourceProfileManager: Added ResourceProfile id: 0
24/07/17 16:03:48 INFO SecurityManager: Changing view acls to: sethjones
24/07/17 16:03:48 INFO SecurityManager: Changing modify acls to: sethjones
24/07/17 16:03:48 INFO SecurityManager: Changing view acls groups to: 
24/07/17 16:03:48 INFO SecurityManager: Changing modify acls groups to: 
24/07/17 16:03:48 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(sethjones); groups with view permissions: Set(); users  with modify permissions: Set(sethjones); groups with modify permissions: Set()

An exception or error caused a run to abort: 'void io.netty.buffer.PooledByteBufAllocator.<init>(boolean, int, int, int, int, int, int, boolean)' 
java.lang.NoSuchMethodError: 'void io.netty.buffer.PooledByteBufAllocator.<init>(boolean, int, int, int, int, int, int, boolean)'
    at org.apache.spark.network.util.NettyUtils.createPooledByteBufAllocator(NettyUtils.java:171)
    at org.apache.spark.network.util.NettyUtils.getSharedPooledByteBufAllocator(NettyUtils.java:142)
    at org.apache.spark.network.client.TransportClientFactory.<init>(TransportClientFactory.java:111)
    at org.apache.spark.network.TransportContext.createClientFactory(TransportContext.java:144)
    at org.apache.spark.rpc.netty.NettyRpcEnv.<init>(NettyRpcEnv.scala:77)
    at org.apache.spark.rpc.netty.NettyRpcEnvFactory.create(NettyRpcEnv.scala:492)
    at org.apache.spark.rpc.RpcEnv$.create(RpcEnv.scala:58)
    at org.apache.spark.SparkEnv$.create(SparkEnv.scala:271)
    at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:194)
    at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:279)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:464)

I thought this was a little strange, since I see other tests tagged to be ignored if the Spark version is < 3.0 or > 3.3, which tells me there is a way to run the integration tests on a Spark 3.3.x version. Any insight into why this is happening, or instructions on how to run an integration test against a specific Spark version other than updating the <spark.version> Maven property? I believe my test will work, but I'm blocked from verifying that until I figure this error out.

2) I am using this Snowflake plugin within Databricks. I noticed that if I build the Spline Spark agent bundle JAR from the latest develop branch, my compute clusters enter a crash loop if I try to do any Spark operations. I saw an error related to potentially conflicting Scala versions. When I build the JAR off release/2.0, I can run the Databricks notebooks just fine and the Spline Spark agent works as expected. Any ideas as to why this is happening?

I'm running the Databricks runtime associated with Scala 2.12 and Spark 3.3.x.

Any insight would be greatly appreciated! Thanks!

wajda commented 4 months ago

1 ... Any insight into why this is happening or just instructions on how to run an integration test at a specific spark version other than updating the <spark.version> mvn variable?

To run integration tests against a specific Spark version, activate the corresponding spark-x.y Maven profile:

$ mvn ... -Pspark-3.3
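
For example, to run the build and integration tests against Spark 3.3 with Scala 2.12 (assuming the Scala profiles follow the same scala-x.y naming pattern as the spark-x.y ones; check the parent pom for the exact profile ids):

$ mvn verify -Pspark-3.3,scala-2.12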

2 ... I noticed that if I build the spark spline agent bundle JAR file on the latest develop branch, my compute clusters enter a crashback loop if I try and do any spark operations. I saw an error related to potentially conflicting scala versions. When I build the JAR off of release/2.0, I can run the databricks notebooks just fine and the spline spark agent works as expected. Any ideas as to why this is happening?

That's interesting, but it's difficult to tell without looking at detailed logs. There have been quite a few changes since the 2.0.x version. The latest release is 2.1.0, which comes with Spark 3.4 support. Did you try that one? The difference between release/2.1 and develop, on the other hand, is very minor. It's just this commit: a6f63b19a8710c0b0a3eeffb3d89b317eee735a5

3 ... Failing auto builds

Did you check them?

The ones for Scala_2.12 fail because they can't connect to the Snowflake instance. Perhaps it's a host-naming or other environment-specific issue. Can you take a look at it, please?

The builds for Scala_2.11 fail because the given version of the Snowflake library doesn't exist for Scala 2.11. You need to move the Snowflake Maven dependency under the spark-x.y profiles and specify a library version that corresponds to the given profile.
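
A rough sketch of what that could look like in the plugin's pom.xml, assuming the existing spark-x.y profile ids and a scala.binary.version property; the connector versions are illustrative placeholders, not values taken from this PR:

<profiles>
  <profile>
    <id>spark-3.3</id>
    <dependencies>
      <dependency>
        <groupId>net.snowflake</groupId>
        <artifactId>spark-snowflake_${scala.binary.version}</artifactId>
        <!-- placeholder: use the connector release built for Spark 3.3 -->
        <version>2.12.0-spark_3.3</version>
      </dependency>
    </dependencies>
  </profile>
  <profile>
    <id>spark-3.4</id>
    <dependencies>
      <dependency>
        <groupId>net.snowflake</groupId>
        <artifactId>spark-snowflake_${scala.binary.version}</artifactId>
        <!-- placeholder: use the connector release built for Spark 3.4 -->
        <version>2.12.0-spark_3.4</version>
      </dependency>
    </dependencies>
  </profile>
</profiles>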

sonarcloud[bot] commented 2 months ago

Quality Gate passed

Issues
0 New issues
0 Accepted issues

Measures
0 Security Hotspots
0.0% Coverage on New Code
0.0% Duplication on New Code

See analysis details on SonarCloud