awslabs / python-deequ

Python API for Deequ
Apache License 2.0

Running multiple pydeequ checks at the same time in emr cluster OSError: [Errno 98] Address already in use :(127.0.0.1:25334) #173

Open Ashokgoa opened 10 months ago

Ashokgoa commented 10 months ago

Describe the bug: How do you handle this issue when you run many Spark jobs that execute PyDeequ checks at the same time?

In my case only one job succeeds; the rest fail with this error while the other jobs are running in the EMR cluster.

Traceback (most recent call last): File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 2207, in start OSError: [Errno 98] Address already in use

During handling of the above exception, another exception occurred: Any solutions or suggestions

Traceback (most recent call last): File "/opt/ammar/pydeequ_poc_pyspark.py", line 26, in _check = _check_func(*_args) File "/usr/local/lib/python3.7/site-packages/pydeequ/checks.py", line 134, in hasSize assertion_func = ScalaFunction1(self._spark_session.sparkContext._gateway, assertion) File "/usr/local/lib/python3.7/site-packages/pydeequ/scala_utils.py", line 32, in init super().init(gateway) File "/usr/local/lib/python3.7/site-packages/pydeequ/scala_utils.py", line 16, in init self.gateway.start_callback_server() File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1894, in start_callback_server File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 2216, in start py4j.protocol.Py4JNetworkError: An error occurred while trying to start the callback server (127.0.0.1:25334) 21/11/19 13:44:06 INFO SparkContext: Invoking stop() from shutdown hookcise description of what the bug is.

To Reproduce: run multiple PyDeequ checks at the same time in an EMR cluster.

chenliu0831 commented 10 months ago

Please provide a minimal code snippet if possible. Thank you

Ashokgoa commented 10 months ago

Hi Chenliu,

Currently I have multiple Airflow jobs that are triggered at the same time, each submitting a different Spark job to the EMR cluster.

In each of these Airflow DAGs we added a step for data quality checks, and in some cases, when these data quality checks run at the same time on the cluster, the error occurs in this piece of code:

```python
check = Check(spark, CheckLevel.Error, "Integrity checks")

checkResult = VerificationSuite(spark) \
    .onData(df) \
    .addCheck(
        check.isComplete("SubID")
        .hasCompleteness("ErrorCode_", lambda x: x < 0.2)
        .hasDataType("JobID", ConstrainableDataTypes.Integral)
        .hasMaxLength("JobID", lambda x: x == 8)
        .hasDataType("ListID", ConstrainableDataTypes.Integral)
        .containsEmail("Emailaddr")
        .hasMaxLength("ListID", lambda x: x == 5)) \
    .useRepository(repository) \
    .saveOrAppendResult(resultKey) \
    .run()
```

Most of the jobs fail with this error:

```
Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 2207, in start
OSError: [Errno 98] Address already in use
```

I understand that PyDeequ always starts its Py4J callback server on port 25334, which causes the other jobs to fail when they run at the same time on the same node.
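For context on where 25334 comes from: the traceback above shows that PyDeequ's `scala_utils` calls `start_callback_server()` with no arguments, so py4j falls back to its default `CallbackServerParameters`, whose port is `DEFAULT_PYTHON_PROXY_PORT` (25334). A quick standalone check of those defaults (plain py4j, no Spark involved):

```python
# Small standalone check of py4j's defaults; nothing PyDeequ-specific here.
from py4j.java_gateway import CallbackServerParameters, DEFAULT_PYTHON_PROXY_PORT

print(DEFAULT_PYTHON_PROXY_PORT)        # 25334
print(CallbackServerParameters().port)  # 25334 -- the port every driver on the node tries to bind
```

So two drivers that happen to run on the same EMR node cannot both hold the default callback port at the same time.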

Please refer to https://github.com/awslabs/python-deequ/issues/19.

chenliu0831 commented 10 months ago

Thanks for providing more details and the related issue; I will research this a bit.

Ashokgoa commented 10 months ago

Can we implement this so the data quality checks use the next available port instead of the fixed 25334? https://www.py4j.org/advanced_topics.html#using-py4j-without-pre-determined-ports-dynamic-port-number
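For what it's worth, here is a rough sketch of what that could look like inside a job, adapted from the linked py4j page and from the way `pyspark.streaming` pre-starts its own callback server. It assumes `spark` is the job's existing SparkSession and is untested with PyDeequ, so treat it as a starting point rather than a confirmed fix:

```python
# Sketch: pre-start the Py4J callback server on an ephemeral port before any
# PyDeequ Check is built, so nothing in this driver tries to bind 25334.
from py4j.java_gateway import CallbackServerParameters

gateway = spark.sparkContext._gateway

# Reuse the gateway's own CallbackServerParameters (as pyspark.streaming does),
# but ask for an ephemeral port (port=0) instead of the default 25334.
params = gateway.callback_server_parameters
params.eager_load = True
params.daemonize = True
params.daemonize_connections = True
params.port = 0
gateway.start_callback_server(params)

# Find the port that was actually bound and point the JVM's callback client at
# it, so Deequ can still call back into the Python assertion lambdas.
python_port = gateway.get_callback_server().get_listening_port()
gateway.java_gateway_server.resetCallbackClient(
    gateway.java_gateway_server.getCallbackClient().getAddress(),
    python_port)
```

Since py4j's `start_callback_server()` returns without doing anything when a callback server is already running, PyDeequ's later call in `scala_utils.py` should then leave the pre-started server (and its ephemeral port) alone instead of trying to bind 25334.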