delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive, and with APIs for Scala, Java, and Python
https://delta.io
Apache License 2.0

Initializing Spark session with "configure_spark_with_delta_pip" stuck #1298

Closed: BShraman closed this 1 year ago

BShraman commented 1 year ago

Bug

I am testing new features in Delta Lake 2.0 and am facing the issue below while trying to initialize a Spark session with "configure_spark_with_delta_pip". Any advice on what might be missing would be greatly appreciated. Spark: 3.2. Slack thread: https://delta-users.slack.com/archives/CJ70UCSHM/p1658850509055239.

Steps to reproduce

import pyspark
from delta import *

builder = (
    pyspark.sql.SparkSession.builder.appName("MyApp")
    .config('spark.master','local[*]')
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog","org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.jars","gs://xxxxx/g1/jars/delta-core_2.12-2.0.0.jar")
)

spark = configure_spark_with_delta_pip(builder).getOrCreate()

Observed results

Expected results

Further details

Environment information

Willingness to contribute

The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?

dennyglee commented 1 year ago

Thanks @BShraman - will try to repro this locally. Quick question: for .config("spark.jars","gs://xxxxx/g1/jars/delta-core_2.12-2.0.0.jar"), have you by any chance tested this locally and still gotten the same error? Per the Slack thread, I wanted to reiterate that this is happening with delta-core_2.12-2.0.0.jar but not with delta-core_2.12-1.2.0.jar. Thanks!

BShraman commented 1 year ago

Hi @dennyglee, if I run this locally with .config("spark.jars.packages","io.delta:delta-core_2.12:2.0.0"), it pulls the dependency from the Maven repository and everything runs without any issue. Fetching dependencies directly from Maven is blocked in our organization. Alternate option: I am passing the Delta JAR explicitly after downloading it locally, and this is where I am facing the issue.
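
For contrast, here is a minimal sketch of the Maven-coordinate setup described above as working locally (assumption: the rest of the builder matches the snippet in the issue, and the session is built directly from the builder):

import pyspark
from delta import *

# The Delta core JAR is resolved from the Maven repository via spark.jars.packages,
# instead of being passed as a file on spark.jars.
builder = (
    pyspark.sql.SparkSession.builder.appName("MyApp")
    .config("spark.master", "local[*]")
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.0.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

spark = builder.getOrCreate()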

dennyglee commented 1 year ago

Based on some quick testing, I believe you may have delta-spark==1.2.0 or lower installed in your environment. Could you try upgrading it to delta-spark==2.0.0 and see if that helps? Note, I'm still investigating another issue related to this.
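
A quick way to verify which delta-spark version is active in the current environment (a small sketch using only the Python standard library; importlib.metadata assumes Python 3.8+):

import importlib.metadata

# Print the installed delta-spark package version; it should match the
# delta-core JAR version being attached (2.0.0 here) to avoid a mismatch.
print(importlib.metadata.version("delta-spark"))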

Details

Running the following commands directly from the local python shell.

Local environment

Local installs use sdkman for Spark, Java, and Scala version management, and a virtual environment for Python version management.

With delta-spark==1.2.0 installed

i.e. with pip install delta-spark==1.2.0

Running with Delta 1.2

import pyspark
from delta import *

my_packages = ["io.delta:delta-core_2.12:2.0.0"]

builder = (
    pyspark.sql.SparkSession.builder.appName("MyApp")
    .config('spark.master','local[*]')
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog","org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

spark = configure_spark_with_delta_pip(builder, extra_packages=my_packages).getOrCreate()

Result: Works as expected

Running with Delta 2.0

import pyspark
from delta import *

my_packages = ["io.delta:delta-core_2.12:2.0.0"]

builder = (
    pyspark.sql.SparkSession.builder.appName("MyApp")
    .config('spark.master','local[*]')
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog","org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

spark = configure_spark_with_delta_pip(builder, extra_packages=my_packages).getOrCreate()

Result: Fails with the following error (which is similar to your error)

io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-34819e71-fc19-4aac-8ebd-fab64b77ba27;1.0
    confs: [default]
    found io.delta#delta-core_2.12;2.0.0 in central
    found io.delta#delta-storage;2.0.0 in central
    found org.antlr#antlr4-runtime;4.8 in central
    found org.codehaus.jackson#jackson-core-asl;1.9.13 in spark-list
:: resolution report :: resolve 118ms :: artifacts dl 4ms
    :: modules in use:
    io.delta#delta-core_2.12;2.0.0 from central in [default]
    io.delta#delta-storage;2.0.0 from central in [default]
    org.antlr#antlr4-runtime;4.8 from central in [default]
    org.codehaus.jackson#jackson-core-asl;1.9.13 from spark-list in [default]
    :: evicted modules:
    io.delta#delta-core_2.12;1.2.0 by [io.delta#delta-core_2.12;2.0.0] in [default]
    ---------------------------------------------------------------------
    |                  |            modules            ||   artifacts   |
    |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
    ---------------------------------------------------------------------
    |      default     |   5   |   0   |   0   |   1   ||   3   |   0   |
    ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-34819e71-fc19-4aac-8ebd-fab64b77ba27
    confs: [default]
    0 artifacts copied, 3 already retrieved (0kB/3ms)
22/08/07 18:06:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/08/07 18:06:40 WARN SparkSession: Cannot use io.delta.sql.DeltaSparkSessionExtension to configure session extensions.

With delta-spark==2.0.0 installed

i.e. with pip install delta-spark==2.0.0

import pyspark
from delta import *

my_packages = ["io.delta:delta-core_2.12:2.0.0"]

builder = (
    pyspark.sql.SparkSession.builder.appName("MyApp")
    .config('spark.master','local[*]')
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog","org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

spark = configure_spark_with_delta_pip(builder, extra_packages=my_packages).getOrCreate()

Result: Works as expected

MrPowers commented 1 year ago

@BShraman - I'm happy to help you figure this one out.

Alternate option : i am trying with passing delta jar explicitly after downloading locally and this is where i am facing an issue.

Yea, I don't think you need to pass the Delta JAR in if you already have Delta installed locally.

Here's your code snippet with some comments:

import pyspark
# this next line shows that you already have Delta installed.  If you didn't have Delta installed, this line would error out.
from delta import *

builder = (
    pyspark.sql.SparkSession.builder.appName("MyApp")
    .config('spark.master','local[*]')
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog","org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.jars","gs://xxxxx/g1/jars/delta-core_2.12-2.0.0.jar")  # I don't think you need this because you already have Delta installed
)

spark = configure_spark_with_delta_pip(builder).getOrCreate()

Does this work?

import pyspark
from delta import *

builder = (
    pyspark.sql.SparkSession.builder.appName("MyApp")
    .config('spark.master','local[*]')
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog","org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

spark = configure_spark_with_delta_pip(builder).getOrCreate()

Let me know if this sorts out the issue for you. I am happy to keep helping till you figure this out.

MrPowers commented 1 year ago

This is the draft PR I created to experiment with this, BTW. I was getting the same warnings Denny got when I mixed and matched my locally installed Delta version with a JAR file of a different version (e.g. when I had Delta 1.2 installed locally and attached the Delta 2.0 JAR file).

santosh-d3vpl3x commented 1 year ago

Could we also ship delta-spark in a self-contained fashion, similar to pyspark? Having an extra dependency on Maven is not as seamless in many PySpark-dependent orgs.

MrPowers commented 1 year ago

@santosh-d3vpl3x - It's a great idea, and that's what's been done. delta-spark is on PyPI and doesn't have a Maven dependency. You can just run pip install delta-spark.

santosh-d3vpl3x commented 1 year ago

@MrPowers That's good to know. https://docs.delta.io/latest/quick-start.html#python along with https://github.com/delta-io/delta/blob/v2.1.0rc1/python/delta/pip_utils.py#L26-L28 gave me the impression that delta-spark is just sugar coating over the existing flow.

delta-spark is on PyPI and doesn't have a Maven dependency.

On second thought, do you have some resources handy for this?

MrPowers commented 1 year ago

@santosh-d3vpl3x - Yea, we should make those installation instructions more clear. Here's a blog post I wrote on installing PySpark & Delta Lake with conda. I will try to make more content that'll make it easier to get up-and-running with Delta Lake. Thanks for the comments / feedback.

santosh-d3vpl3x commented 1 year ago

@MrPowers I spent some time going through the code for delta-spark to understand what it is doing at the moment.

  1. It is indeed a thin wrapper and convenience tool, not a self-contained package like pyspark. configure_spark_with_delta_pip adds a Maven coordinate to the Spark session builder (see the sketch after this list). If you have a clean Ivy/Maven directory and are disconnected from the internet, the instructions here don't work.

  2. Also, to check whether the configure_spark_with_delta_pip call is needed, running without it:

     import pyspark
     from delta import *

     spark = (
         pyspark.sql.SparkSession.builder.appName("MyApp")
         .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
         .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
         .getOrCreate()
     )

     results in java.lang.ClassNotFoundException: io.delta.sql.DeltaSparkSessionExtension.
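
For reference, a rough sketch of what configure_spark_with_delta_pip appears to contribute, based on the pip_utils.py lines linked above (assumption: only the Maven-coordinate part is shown; the real helper derives the Scala and Delta versions from the installed delta-spark package rather than hard-coding them):

import pyspark

# Roughly equivalent by hand: put the Delta core Maven coordinate on
# spark.jars.packages so Spark resolves the JAR at session startup. Without this
# (or a JAR supplied some other way), io.delta.sql.DeltaSparkSessionExtension is
# not on the classpath and Spark fails with the ClassNotFoundException above.
delta_coordinate = "io.delta:delta-core_2.12:2.0.0"  # hard-coded here for illustration

spark = (
    pyspark.sql.SparkSession.builder.appName("MyApp")
    .config("spark.jars.packages", delta_coordinate)
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)
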
BShraman commented 1 year ago

@MrPowers

Here is my pip list:

[screenshot of pip list output]

Spark configuration:

#####################
import pyspark
from delta import *

builder = (
    pyspark.sql.SparkSession.builder.appName("MyApp")
    .config('spark.master','local[*]')
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog","org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.jars","gs://xxxxx/g1/jars/delta-core_2.12-2.0.0.jar")
)

spark = configure_spark_with_delta_pip(builder).getOrCreate()

############
import pyspark
from delta import *

my_packages = ["io.delta:delta-core_2.12:2.0.0"]

builder = (
    pyspark.sql.SparkSession.builder.appName("MyApp")
    .config('spark.master','local[*]')
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog","org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

spark = configure_spark_with_delta_pip(builder, extra_packages=my_packages).getOrCreate()

Both are giving me the same error, shown below:

WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/opt/spark/jars/spark-unsafe_2.12-3.2.2.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /home/xxxxx/.ivy2/cache
The jars for the packages stored in: /home/xxxxx/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-9f5c2d62-314f-4857-a764-d72116906ec7;1.0
    confs: [default]

MrPowers commented 1 year ago

Some users can't access JAR files via Maven. It would be cool if we could upload the JAR files to PyPI and give an option to fetch the relevant JAR from PyPI for users that don't have access to Maven.
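
A purely hypothetical sketch of how that could look from the user's side (assumption: the delta_jars package and its path below do not exist today; they only illustrate the idea of shipping the JAR inside a pip package and handing it to Spark via spark.jars, with no Maven resolution involved):

from pathlib import Path
from pyspark.sql import SparkSession

# Hypothetical: a pip-installed package ships delta-core_2.12-2.0.0.jar on disk,
# so the session points spark.jars at the local file instead of a Maven coordinate.
jar_path = Path("site-packages/delta_jars/delta-core_2.12-2.0.0.jar")  # hypothetical location

spark = (
    SparkSession.builder.appName("MyApp")
    .master("local[*]")
    .config("spark.jars", str(jar_path))
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)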

FelixQLe commented 1 year ago

I used the following conf; it solved the problem and works perfectly.

Create a SparkContext with a custom config:

from pyspark import SparkConf, SparkContext

conf = SparkConf()

conf.set('spark.sql.debug.maxToStringFields', 100)
conf.set('spark.default.parallelism', 700)
conf.set('spark.sql.shuffle.partitions', 700)
conf.set('spark.driver.memory', '30g')
conf.set('spark.driver.cores', 8)
conf.set('spark.executor.cores', 8)
conf.set('spark.executor.memory', '30g')
conf.set("spark.jars.packages", "io.delta:delta-core_2.12:2.0.0")
conf.set('spark.sql.extensions', 'io.delta.sql.DeltaSparkSessionExtension')
conf.set('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.delta.catalog.DeltaCatalog')

sc = SparkContext.getOrCreate(conf)

Import the libraries:

import pyspark
from delta import *

We need to configure the SparkSession:

my_packages = ["io.delta:delta-core_2.12:2.0.0"]

builder = (
    pyspark.sql.SparkSession.builder.appName("Myapp")
    .config('spark.master','local[*]')
)

spark = configure_spark_with_delta_pip(builder, extra_packages=my_packages).getOrCreate()

My Spark env: delta-spark 2.0.0, findspark 2.0.1, pyspark 3.2.3, sparksql-magic 0.0.3

trungnghiahoang96 commented 4 months ago

My Spark env: delta-spark 2.0.0, pyspark 3.2.2

For anyone having problems with PySpark Structured Streaming reading from Kafka and writing to Delta, this is my config:

my_packages = ["io.delta:delta-core_2.12:2.0.0", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.2"]

sometimes "my_packages" above is enough but sometimes spark forgot to get dependencies of spark-sql-kafka

so we have another version here

from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

my_packages = [
    "io.delta:delta-core_2.12:2.0.0",
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.2",
    "org.apache.kafka:kafka-clients:2.8.1",
    "org.apache.spark:spark-token-provider-kafka-0-10_2.12:3.2.2",
]

builder = (
    SparkSession.builder.master("local[*]")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
)

spark = configure_spark_with_delta_pip(builder, extra_packages=my_packages).getOrCreate()