noxdafox / pebble

Multi threading and processing eye-candy.
GNU Lesser General Public License v3.0

Pebble and pyspark appear to be incompatible #95

Closed daveauerbach closed 2 years ago

daveauerbach commented 2 years ago

Pebble appears to not work correctly when pyspark is imported.

Note that when the hung Python process is interrupted, the following stack trace is observed:

python jyn/test_multi_simple.py 
About to call test function in new process
Test function started, waiting for results
inside test function
^CTraceback (most recent call last):
  File "jyn/test_multi_simple.py", line 16, in <module>
    res = future.result()
  File "/Users/daveauerbach/.pyenv/versions/3.7.10/lib/python3.7/concurrent/futures/_base.py", line 430, in result
    self._condition.wait(timeout)
  File "/Users/daveauerbach/.pyenv/versions/3.7.10/lib/python3.7/threading.py", line 296, in wait
    waiter.acquire()
KeyboardInterrupt

Repro:

1) In a clean environment:

pip install pebble
pip install pyspark

2) Demo code:

import pyspark

from pebble import ProcessPool

EXECUTOR = ProcessPool(max_workers=1)

def temp():
    print("inside test function")

print("About to call test function in new process")
future = EXECUTOR.schedule(temp)
print("Test function started, waiting for results")
res = future.result()
print("Test success")

daveauerbach commented 2 years ago

Interestingly, if the pebble import happens first, the failure doesn't occur. However, this can't always be guaranteed if pebble is being used within library code that the spark code is using.

noxdafox commented 2 years ago

I am afraid pyspark and Python multiprocessing don't get along well.

The reason is that Spark is itself a platform designed to distribute workloads across multiple nodes, and therefore it does not account for the use of native multiprocessing. Moreover, where would you expect your processes to be spawned? On the driver or on the workers themselves? How would you instruct Spark to do so?

To reinforce the point, this is the result with the standard multiprocessing pool.

import pyspark

from multiprocessing import Pool

EXECUTOR = Pool()

def temp():
    print("inside test function")

print("About to call test function in new process")
future = EXECUTOR.apply_async(temp)
print("Test function started, waiting for results")
res = future.get()
print("Test success")

Running it results in a visible crash followed by the process hanging, exactly as with Pebble.

$ python test.py
About to call test function in new process
Test function started, waiting for results
Process ForkPoolWorker-1:
Traceback (most recent call last):
  File "/usr/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/usr/lib/python3.9/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.9/multiprocessing/pool.py", line 114, in worker
    task = get()
  File "/usr/lib/python3.9/multiprocessing/queues.py", line 368, in get
    return _ForkingPickler.loads(res)
AttributeError: Can't get attribute 'temp' on <module '__main__' from '/home/noxdafox/test.py'>
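
A note on the traceback above: multiprocessing.Pool starts its worker processes as soon as it is constructed, and in the script above that happens before `temp` is defined. Under the fork start method the workers are therefore copies of a `__main__` module that has no `temp` yet, which is what the AttributeError reports. The following is a minimal sketch, not a fix for the pyspark interaction, of the usual layout that avoids this particular error: define the function first, then create the pool. The helper name `run_in_pool`, the return value, and the explicit "fork" context are assumptions made for illustration; the fork context is chosen only because the output above shows a ForkPoolWorker, and it is not available on Windows.

```python
import multiprocessing


def temp():
    # Defined at module level before any worker process exists, so
    # forked workers inherit it and can resolve `__main__.temp`.
    return "inside test function"


def run_in_pool():
    # Hypothetical helper. Assumption: the "fork" start method, matching
    # the ForkPoolWorker name in the traceback (not available on Windows).
    context = multiprocessing.get_context("fork")
    # The pool is created only after `temp` has been defined.
    with context.Pool(processes=1) as pool:
        # A timeout keeps the caller from hanging forever if a worker dies.
        return pool.apply_async(temp).get(timeout=30)


if __name__ == "__main__":
    print("About to call test function in new process")
    print(run_in_pool())
    print("Test success")
```

Whether the same layout changes the behaviour when pyspark is imported first is not something this sketch establishes.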

noxdafox commented 2 years ago

Closing this issue; please re-open it if further discussion is needed.