ionelmc / python-tblib

Serialization library for Exceptions and Tracebacks.
BSD 2-Clause "Simplified" License

incompatibility with concurrent.futures #62

Open nmaxwell opened 4 years ago

nmaxwell commented 4 years ago

Calling tblib.pickling_support.install() causes an exception at interpreter exit when using concurrent.futures.ProcessPoolExecutor. Here is a minimal example:

from concurrent.futures import ProcessPoolExecutor

import tblib.pickling_support

tblib.pickling_support.install()

def foo(x):
    if x > 2:
        raise RuntimeError('bar')

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=2) as executor:

        try:
            list(
                executor.map(foo, range(50))
            )
        except Exception as ex:
            print('exception: ', ex)

    print(' --- done --- ')

I get an exception when the atexit functions run; this is the output:

exception:  bar
 --- done --- 
Error in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "/home/nick/lego-root/lib/python3.7/concurrent/futures/process.py", line 102, in _python_exit
    thread_wakeup.wakeup()
  File "/home/nick/lego-root/lib/python3.7/concurrent/futures/process.py", line 90, in wakeup
    self._writer.send_bytes(b"")
  File "/home/nick/lego-root/lib/python3.7/multiprocessing/connection.py", line 183, in send_bytes
    self._check_closed()
  File "/home/nick/lego-root/lib/python3.7/multiprocessing/connection.py", line 136, in _check_closed
    raise OSError("handle is closed")
OSError: handle is closed

Process finished with exit code 0

Commenting out tblib.pickling_support.install() solves the problem; I then get the expected output with no exceptions:

exception:  bar
 --- done --- 

Process finished with exit code 0
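For context on what install() changes: out of the box, pickle keeps an exception's type and args but silently drops its __traceback__, and it refuses to pickle traceback objects at all. That is the gap tblib fills (to my understanding by registering reducers through copyreg), which is why installing it changes what crosses the process boundary. A stdlib-only sketch of the default behaviour, with no tblib involved; make_error is a hypothetical helper:

```python
import pickle


def make_error():
    """Hypothetical helper: raise and catch an exception so it carries a traceback."""
    try:
        raise RuntimeError("bar")
    except RuntimeError as exc:
        return exc  # __traceback__ stays attached after the except block


err = make_error()

# Pickling the exception itself works, but the traceback is not part of the payload.
restored = pickle.loads(pickle.dumps(err))
print(repr(restored), restored.__traceback__)  # RuntimeError('bar') None

# Pickling the traceback object directly is rejected by the standard library.
try:
    pickle.dumps(err.__traceback__)
    tb_error = None
except TypeError as exc:
    tb_error = exc
print("pickling the traceback raised:", type(tb_error).__name__)  # TypeError
```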
ionelmc commented 4 years ago

This doesn't reproduce on my Python 3.7.8 (Fedora). I suspect it's a Python bug, and that you could reproduce it without tblib given enough runs.
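One way to check the "enough runs" hypothesis is to run the reproducer many times in fresh interpreters and count how often the atexit error appears on stderr. A minimal sketch, assuming the script is invoked as a subprocess; count_atexit_failures and ATEXIT_MARKER are hypothetical names:

```python
import subprocess
import sys

# Header CPython writes to stderr when an atexit callback raises,
# as seen in the output earlier in this thread.
ATEXIT_MARKER = "Error in atexit._run_exitfuncs"


def count_atexit_failures(cmd, runs=20):
    """Run `cmd` `runs` times; count runs whose stderr contains ATEXIT_MARKER.

    Hypothetical helper: `cmd` is an argv list, e.g. [sys.executable, "test.py"].
    """
    failures = 0
    for _ in range(runs):
        # Each run is a separate interpreter, so atexit handlers fire every time.
        proc = subprocess.run(cmd, capture_output=True, text=True)
        if ATEXIT_MARKER in proc.stderr:
            failures += 1
    return failures
```

Calling it once with [sys.executable, "test.py"] and once with "bug" appended (the test.py used later in this thread) would show whether the error ever shows up without tblib.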

nmaxwell commented 4 years ago

Hey thanks for the follow up.

So it reproduces on Python 3.8.2 (Ubuntu 20.04). I see you mentioned running on Fedora; I wonder if that could influence it. Below is a Docker example that reproduces on Ubuntu. I'll try Fedora also.

See this Dockerfile: Dockerfile.txt (you'll have to rename it from 'Dockerfile.txt' to 'Dockerfile'; GitHub won't let me upload it otherwise).

To build it: docker build -t map_bug .

To run it without installing pickling support, showing the expected output: docker run -it map_bug python3 test.py

Output:

exception:  bar
 --- done --- 

To run it with pickling support, showing the error: docker run -it map_bug python3 test.py bug

Output:

install pickling support
exception:  bar
 --- done --- 
Error in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "/usr/lib/python3.8/concurrent/futures/process.py", line 102, in _python_exit
    thread_wakeup.wakeup()
  File "/usr/lib/python3.8/concurrent/futures/process.py", line 90, in wakeup
    self._writer.send_bytes(b"")
  File "/usr/lib/python3.8/multiprocessing/connection.py", line 183, in send_bytes
    self._check_closed()
  File "/usr/lib/python3.8/multiprocessing/connection.py", line 136, in _check_closed
    raise OSError("handle is closed")
OSError: handle is closed
ionelmc commented 4 years ago

Well, I've looked at it: it does reproduce, and I have no clue what that _python_exit code is for.

Possibly related issues:

Either way, this seems like a benign problem. Contrast that with what happens if you register signal handlers (it deadlocks - https://bugs.python.org/issue29759).

nkoep commented 2 years ago

I did a little bit of digging after noticing that this issue disappears on Python 3.9, so I compared concurrent.futures.process in 3.7 and 3.9. It turns out that backporting the _ThreadWakeup class from Python 3.9 was enough to fix the issue for me:

import multiprocessing as mp
import concurrent.futures.process as pe
from concurrent.futures import ProcessPoolExecutor

import tblib.pickling_support

class _ThreadWakeup:
    def __init__(self):
        self._closed = False
        self._reader, self._writer = mp.Pipe(duplex=False)

    def close(self):
        if not self._closed:
            self._closed = True
            self._writer.close()
            self._reader.close()

    def wakeup(self):
        if not self._closed:
            self._writer.send_bytes(b"")

    def clear(self):
        if not self._closed:
            while self._reader.poll():
                self._reader.recv_bytes()

pe._ThreadWakeup = _ThreadWakeup

tblib.pickling_support.install()

def foo(x):
    if x > 2:
        raise RuntimeError("bar")

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=2) as executor:
        try:
            list(executor.map(foo, range(50)))
        except Exception as ex:
            print("exception: ", ex)

    print(" --- done --- ")

Here's the CPython issue whose pull request fixed this: https://github.com/python/cpython/issues/83285. Hope this helps anyone facing the same problem.
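Since _ThreadWakeup is a private name, a script that must also run on newer interpreters might apply the monkeypatch only where it is needed. A minimal sketch, assuming (per the comment above) that only interpreters older than 3.9 need it and that the patch runs before the first executor is created; needs_thread_wakeup_backport and apply_backport are hypothetical helpers:

```python
import sys
import concurrent.futures.process as pe


def needs_thread_wakeup_backport(version=sys.version_info):
    """Assumption: 3.9+ already ships a _ThreadWakeup whose wakeup() is guarded."""
    return tuple(version[:2]) < (3, 9)


def apply_backport(replacement_cls, version=sys.version_info):
    """Swap in replacement_cls for pe._ThreadWakeup on older interpreters.

    Returns True if the module attribute was patched, False otherwise.
    """
    if needs_thread_wakeup_backport(version):
        pe._ThreadWakeup = replacement_cls
        return True
    return False
```

With the backported class from the previous comment, this would be called as apply_backport(_ThreadWakeup) near the top of the script, before tblib.pickling_support.install() and before any ProcessPoolExecutor is constructed.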