awslabs / python-deequ

Python API for Deequ
Apache License 2.0
713 stars 134 forks source link

Spark version detection prevents startup of new spark sessions #111

Closed peter-mcclonski closed 1 year ago

peter-mcclonski commented 1 year ago

Describe the bug With the newest commit giving support to Spark 3.1/3.2, pydeequ uses a new method for detecting the (py)spark version. By spawning a new subprocess within the Spark JVM, and not appropriately cleaning up the created SparkContext, subsequent calls to SparkSession.builder.getOrCreate() fail as it attempts to create a secondary SparkContext. This bug was not present historically, when the Spark version was detected through an environment variable.

Error output:

22/11/18 14:47:33 INFO StandaloneAppClient$ClientEndpoint: Connecting to master spark://spark:7077...
22/11/18 14:47:33 INFO TransportClientFactory: Successfully created connection to spark/[removed]:7077 after 35 ms (0 ms spent in bootstraps)
22/11/18 14:47:34 INFO StandaloneSchedulerBackend: Connected to Spark cluster with app ID app-20221118144734-0000
22/11/18 14:47:34 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 39167.
22/11/18 14:47:34 INFO NettyBlockTransferService: Server created on spark:39167
22/11/18 14:47:34 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
22/11/18 14:47:34 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, spark, 39167, None)
22/11/18 14:47:34 INFO BlockManagerMasterEndpoint: Registering block manager spark:39167 with 434.4 MiB RAM, BlockManagerId(driver, spark, 39167, None)
22/11/18 14:47:34 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, spark, 39167, None)
22/11/18 14:47:34 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, spark, 39167, None)
22/11/18 14:47:34 INFO StandaloneSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
Traceback (most recent call last):
  File "/opt/spark/jobs/pipelines/[removed].py", line 66, in <module>
    [removed]
  File "/usr/local/lib/python3.9/site-packages/[removed].py", line 32, in __init__
    [removed]
  File "/usr/local/lib/python3.9/site-packages/[removed].py", line 53, in __init__
    [removed]
  File "/usr/local/lib/python3.9/site-packages/[removed].py", line 41, in __init__
   [removed]
  File "/usr/local/lib/python3.9/site-packages/[removed].py", line 50, in create_spark_session
    SparkSession.builder.appName(app_name)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 228, in getOrCreate
  File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 392, in getOrCreate
  File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 146, in __init__
  File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 209, in _do_init
  File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 329, in _initialize_context
  File "/opt/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1585, in __call__
  File "/opt/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: org.apache.spark.SparkException: Only one SparkContext should be running in this JVM (see SPARK-2243).The currently running SparkContext was created at:
org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
java.base/java.lang.reflect.Constructor.newInstance(Unknown Source)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
py4j.Gateway.invoke(Gateway.java:238)
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
py4j.ClientServerConnection.run(ClientServerConnection.java:106)
java.base/java.lang.Thread.run(Unknown Source)
    at org.apache.spark.SparkContext$.$anonfun$assertNoOtherContextIsRunning$2(SparkContext.scala:2647)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.SparkContext$.assertNoOtherContextIsRunning(SparkContext.scala:2644)
    at org.apache.spark.SparkContext$.markPartiallyConstructed(SparkContext.scala:2734)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:95)
    at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
    at java.base/java.lang.reflect.Constructor.newInstance(Unknown Source)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:238)
    at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
    at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Unknown Source)

22/11/18 14:47:35 INFO SparkContext: Invoking stop() from shutdown hook

To Reproduce On a Spark 3.2.2 (likely 3.x) installation:

Expected behavior PySpark won't crash when I try to make a spark session.

Screenshots If applicable, add screenshots to help explain your problem.

Desktop (please complete the following information):

peter-mcclonski commented 1 year ago

Specifically tagging @lecardozo for visibility. Issue seems to be here:

https://github.com/awslabs/python-deequ/blob/7ec9f6f72839779f370a4753c0db26d6cf052203/pydeequ/configs.py#L22

My suspicion is that the subprocess doesn't fully close out the spark context, or at least it isn't garbage collected quickly enough, and the SparkSession builder somehow isn't able to find it across process boundaries? Not entirely sure, but replacing this with the following snippet works for me locally:

def _get_spark_version() -> str:
    # Get version from a subprocess so we don't mess up with existing SparkContexts.
    session = SparkSession.builder.getOrCreate()
    version = session.version
    spark_version = version[:-2]
    session.stop()
    return spark_version
lecardozo commented 1 year ago

Thanks for the heads up, @pajablons! 😃

My suspicion is that the subprocess doesn't fully close out the spark context, or at least it isn't garbage collected quickly enough, and the SparkSession builder somehow isn't able to find it across process boundaries?

That might be what's happening. I opted for this "weird" subprocess-based solution because tests were failing with a solution similar to yours. Tests failed because, when the tests tried setting up a new context, actually just recycled the context that was already created inside _get_spark_version. As this was created without the deequ_maven_coord configuration, the dependencies were not downloaded, breaking the tests.

I have a feeling that your solution works in this case because you are just recycling the already created session (i.e. just getting, instead of creating). 🤔 Can you confirm to me if you could properly run tests with your suggested approach?

We might need to get back to the previous, explicit, approach of using the SPARK_VERSION. That way we can make sure that we do not interfere with users SparkContext creation.


On an additional note: I feel this _get_spark_version is a weird way to make the life of the user better, by telling it exactly the Maven coordinate that should be installed at runtime. I personally don't use the deequ_maven_config in production, though I agree it's useful for local testing.

lecardozo commented 1 year ago

Would you mind trying out the revert from this branch?

pip install https://github.com/lecardozo/python-deequ/archive/fix/get-spark-version.zip

Let me know if this is working for you, or if you have any suggestions. 😃

peter-mcclonski commented 1 year ago

That revert works perfectly for me, with one little detail caveat-- It's not completely future proof (which isn't a new thing or a big deal). This line:

https://github.com/lecardozo/python-deequ/blob/198c923891cff89230e12b8b92b28e524560c31f/pydeequ/configs.py#L27

Will obviously break if by some miracle Apache decides to drop spark 3.10.0 on us or something. It's one of those super benign things that is probably never going to be an issue, but it's worth noting I think.

chenliu0831 commented 1 year ago

Closing after the revert. Thanks everyone for contributing and reporting.