awslabs / python-deequ

Python API for Deequ
Apache License 2.0

Check with assertion as parameter throws "OSError: [Errno 98] Address already in use" #86

Open ammar-nizami opened 2 years ago

ammar-nizami commented 2 years ago

Describe the bug When creating a check that accepts an assertion as a parameter, I get the error "OSError: [Errno 98] Address already in use".

To Reproduce

import sys
from pydeequ.analyzers import *
from pydeequ.checks import *
from pydeequ.verification import *
from pyspark.sql import SparkSession
from pyspark.sql.types import *

# data points to s3
# data = "s3://*****"

spark = SparkSession.builder.appName("pydeequ_poc").getOrCreate()
print("===== get spark context complete =====")
df = spark.read.parquet(data)
print("===== count: " + str(df.count()) + " =====")
df.printSchema()
verification_run_builder = VerificationSuite(spark).onData(df)
_check_name = "hasSize"
_assertion = "lambda x: x >= 3"
_args = [eval(_assertion)]
print("===== _args["+str(type(_args))+"]: " + str(_args) + " =====")
_check = Check(spark, CheckLevel.Warning, _check_name)
_check_func = getattr(_check, _check_name)
print("===== _check_func["+str(type(_check_func))+"]: " + str(_check_func) + " =====")
_check = _check_func(*_args)
print("===== _check["+str(type(_check))+"]: " + str(_check) + " =====")
verification_run_builder = verification_run_builder.addCheck(_check)
verification_result = verification_run_builder.run()
verification_result_df = verification_result.checkResultsAsDataFrame(spark, verification_result, pandas=False)
verification_result_df.show()

Expected behavior Check is created, and results are generated.

Log

===== count: 2619203 =====
root
 |-- marketplace: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: string (nullable = true)
 |-- product_title: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: date (nullable = true)
 |-- year: integer (nullable = true)
 |-- product_category: string (nullable = true)

===== _args[<class 'list'>]: [<function <lambda> at 0xffffa15a2b90>] =====
===== _check_func[<class 'method'>]: <bound method Check.hasSize of <pydeequ.checks.Check object at 0xffffa15b0210>> =====
Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 2207, in start
OSError: [Errno 98] Address already in use

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/ammar/pydeequ_poc_pyspark.py", line 26, in <module>
    _check = _check_func(*_args)
  File "/usr/local/lib/python3.7/site-packages/pydeequ/checks.py", line 134, in hasSize
    assertion_func = ScalaFunction1(self._spark_session.sparkContext._gateway, assertion)
  File "/usr/local/lib/python3.7/site-packages/pydeequ/scala_utils.py", line 32, in __init__
    super().__init__(gateway)
  File "/usr/local/lib/python3.7/site-packages/pydeequ/scala_utils.py", line 16, in __init__
    self.gateway.start_callback_server()
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1894, in start_callback_server
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 2216, in start
py4j.protocol.Py4JNetworkError: An error occurred while trying to start the callback server (127.0.0.1:25334)
21/11/19 13:44:06 INFO SparkContext: Invoking stop() from shutdown hook

Additional context I am submitting the Spark job via Livy from an Airflow task. Spark is running with default configs.
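For context on the error itself: pydeequ starts a py4j callback server on a fixed address (127.0.0.1:25334 by default, per the traceback) the first time a Python assertion is registered, so a second Spark app on the same host cannot bind the same port. The collision can be sketched with plain sockets, no Spark needed:

```python
import errno
import socket

# First "job" grabs a port (stand-in for py4j's fixed callback port 25334).
first = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
first.bind(("127.0.0.1", 0))   # port 0 here only so the demo finds a free port
port = first.getsockname()[1]
first.listen(1)

# Second "job" on the same host tries the same fixed port and fails.
second = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
    second.bind(("127.0.0.1", port))
    collided = False
except OSError as e:
    collided = e.errno == errno.EADDRINUSE  # Errno 98 on Linux
finally:
    second.close()
    first.close()

print(collided)  # True
```

This is why the error shows up only when a second pydeequ-using app runs on the same host (e.g. two Livy batches landing on one driver node).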

ammar-nizami commented 2 years ago

Same as issue #64

oscarcampos-c commented 2 years ago

I was having the same issue. Make sure you shut down the Spark app before spawning another one; this solved it for me:

spark.sparkContext._gateway.shutdown_callback_server()

Then

spark.stop()
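Those two calls can be wrapped in a small helper (hypothetical name `shutdown_pydeequ`; assumes `spark` is a live SparkSession) so the callback server is always torn down before the session:

```python
def shutdown_pydeequ(spark):
    # Shut down py4j's callback server (started lazily by pydeequ when a
    # Python assertion is registered) before stopping the SparkSession,
    # so the next app on this host can bind the callback port again.
    gateway = spark.sparkContext._gateway
    if gateway is not None:
        gateway.shutdown_callback_server()
    spark.stop()
```

Calling this in a `finally` block (or the Airflow task's cleanup) frees the port even when the verification run raises.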

Ashokgoa commented 10 months ago

@chenliu0831 do you know what the solution for this would be?

Ashokgoa commented 10 months ago

Hi @chenliu0831 @ammar-nizami @oscarcampos-c, how did you handle this when running many Spark jobs with pydeequ checks at the same time?

In my case only one job runs; the rest fail with the same error:

Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 2207, in start
OSError: [Errno 98] Address already in use

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/ammar/pydeequ_poc_pyspark.py", line 26, in <module>
    _check = _check_func(*_args)
  File "/usr/local/lib/python3.7/site-packages/pydeequ/checks.py", line 134, in hasSize
    assertion_func = ScalaFunction1(self._spark_session.sparkContext._gateway, assertion)
  File "/usr/local/lib/python3.7/site-packages/pydeequ/scala_utils.py", line 32, in __init__
    super().__init__(gateway)
  File "/usr/local/lib/python3.7/site-packages/pydeequ/scala_utils.py", line 16, in __init__
    self.gateway.start_callback_server()
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1894, in start_callback_server
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 2216, in start
py4j.protocol.Py4JNetworkError: An error occurred while trying to start the callback server (127.0.0.1:25334)
21/11/19 13:44:06 INFO SparkContext: Invoking stop() from shutdown hook

Any solutions or suggestions?
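When several jobs genuinely must run at once on one host, shutting down cleanly between runs is not enough, because a fixed callback port can only be held by one process. One hedged option, mirroring the pattern PySpark Streaming used for the same problem, is to start the callback server yourself on an OS-assigned port (port 0) before pydeequ does, then point the JVM-side CallbackClient at the real port; py4j's `start_callback_server` returns `False` when a server is already running, so pydeequ's later call should become a no-op. The helper name and the way the JVM GatewayServer handle is passed in are assumptions, not pydeequ API:

```python
def start_ephemeral_callback_server(gateway, jvm_gateway_server):
    """Start py4j's callback server on an OS-chosen port.

    gateway            -- py4j JavaGateway, i.e. spark.sparkContext._gateway
    jvm_gateway_server -- JVM-side GatewayServer handle; with py4j this can
                          be obtained as
                          JavaObject("GATEWAY_SERVER", gateway._gateway_client)
    """
    params = gateway.callback_server_parameters
    params.daemonize = True
    params.daemonize_connections = True
    params.eager_load = True
    params.port = 0  # 0 = let the OS pick a free port (no collision)
    gateway.start_callback_server(params)
    real_port = gateway.get_callback_server().get_listening_port()
    # Tell the JVM's CallbackClient which port we actually got.
    jvm_gateway_server.resetCallbackClient(
        jvm_gateway_server.getCallbackClient().getAddress(), real_port
    )
    return real_port
```

Run it once right after building the SparkSession, before the first `Check` with a Python assertion is created. This is a sketch of the PySpark Streaming workaround adapted to pydeequ, not something the project documents.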