noxdafox / pebble

Multi threading and processing eye-candy.
GNU Lesser General Public License v3.0

TypeError: an integer is required (got type NoneType) in pebble/pool/process.py", line 178, in message_manager_loop #58

Closed marxin closed 4 years ago

marxin commented 4 years ago

First, I would like to thank you for the library. I need to create a process pool that can immediately terminate running tasks. I can't understand why the official concurrent.futures lacks this ability.
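For context, the limitation mentioned above can be seen with the standard library alone. The sketch below uses a ThreadPoolExecutor to keep it deterministic (the Future interface is the same for ProcessPoolExecutor); the function and event names are made up for illustration. It shows that Future.cancel() returns False once a task has started running, so the task runs to completion.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def cancel_running_task():
    """Try to cancel a task that is already running; return (cancelled, result)."""
    started = threading.Event()   # set by the task once it is executing
    release = threading.Event()   # set by the caller to let the task finish

    def task():
        started.set()
        release.wait()
        return "finished"

    with ThreadPoolExecutor(max_workers=1) as executor:
        future = executor.submit(task)
        started.wait()               # make sure the task is running
        cancelled = future.cancel()  # has no effect on a running task
        release.set()                # let the task complete normally
        result = future.result()
    return cancelled, result


if __name__ == "__main__":
    print(cancel_running_task())
```

Only tasks that are still queued can be cancelled this way, which is why a pool with an explicit stop operation is attractive for this use case.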

However, I see various exceptions when I do pool.stop(): https://github.com/marxin/creduce/blob/threadpool/creduce/utils/testing.py#L317-L319

00:00:01 INFO ===< ClangBinarySearchPass::replace-function-def-with-decl >===
Exception in thread Thread-18:
Traceback (most recent call last):
  File "/usr/lib64/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/usr/lib64/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.8/site-packages/pebble/pool/process.py", line 178, in message_manager_loop
    pool_manager.process_next_message(SLEEP_UNIT)
  File "/usr/lib/python3.8/site-packages/pebble/pool/process.py", line 210, in process_next_message
    message = self.worker_manager.receive(timeout)
  File "/usr/lib/python3.8/site-packages/pebble/pool/process.py", line 342, in receive
    return self.pool_channel.recv()
  File "/usr/lib/python3.8/site-packages/pebble/pool/channel.py", line 63, in recv
    return self.reader.recv()
  File "/usr/lib64/python3.8/multiprocessing/connection.py", line 250, in recv
    buf = self._recv_bytes()
  File "/usr/lib64/python3.8/multiprocessing/connection.py", line 421, in _recv_bytes
    return self._recv(size)
  File "/usr/lib64/python3.8/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
TypeError: an integer is required (got type NoneType)
...
Traceback (most recent call last):
  File "/tmp/bin/creduce/usr/local/bin/creduce", line 204, in <module>
    reducer.reduce(pass_group, skip_initial=args.skip_initial_passes)
  File "/tmp/bin/creduce/usr/local/bin/../share/creduce/creduce.py", line 118, in reduce
    self._run_additional_passes(pass_group["first"])
  File "/tmp/bin/creduce/usr/local/bin/../share/creduce/creduce.py", line 145, in _run_additional_passes
    self.test_manager.run_pass(p)
  File "/tmp/bin/creduce/usr/local/bin/../share/creduce/utils/testing.py", line 384, in run_pass
    parallel_tests = self.run_parallel_tests()
  File "/tmp/bin/creduce/usr/local/bin/../share/creduce/utils/testing.py", line 319, in run_parallel_tests
    pool.join()
  File "/usr/lib/python3.8/site-packages/pebble/pool/base_pool.py", line 77, in join
    self._stop_pool()
  File "/usr/lib/python3.8/site-packages/pebble/pool/process.py", line 75, in _stop_pool
    loop.join()
  File "/usr/lib64/python3.8/threading.py", line 1011, in join
    self._wait_for_tstate_lock()
  File "/usr/lib64/python3.8/threading.py", line 1027, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
KeyboardInterrupt

Can you please help me figure out what I am doing wrong?

marxin commented 4 years ago

Simple reproducer:

#!/usr/bin/env python3

from pebble import ProcessPool
from concurrent.futures import wait, FIRST_COMPLETED

def run():
    pass

while True:
    with ProcessPool(max_workers=16) as pool:
        print('new ProcessPool')
        futures = []
        for i in range(10):
            futures.append(pool.schedule(run))
        wait(futures, return_when=FIRST_COMPLETED)
        pool.close()
        pool.stop()
        pool.join()

$ ./test.py
new ProcessPool
new ProcessPool
...
new ProcessPool
new ProcessPool
Exception in thread Thread-309:
Traceback (most recent call last):
  File "/usr/lib64/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/usr/lib64/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.8/site-packages/pebble/pool/process.py", line 178, in message_manager_loop
    pool_manager.process_next_message(SLEEP_UNIT)
  File "/usr/lib/python3.8/site-packages/pebble/pool/process.py", line 210, in process_next_message
    message = self.worker_manager.receive(timeout)
  File "/usr/lib/python3.8/site-packages/pebble/pool/process.py", line 342, in receive
    return self.pool_channel.recv()
  File "/usr/lib/python3.8/site-packages/pebble/pool/channel.py", line 63, in recv
    return self.reader.recv()
  File "/usr/lib64/python3.8/multiprocessing/connection.py", line 250, in recv
    buf = self._recv_bytes()
  File "/usr/lib64/python3.8/multiprocessing/connection.py", line 421, in _recv_bytes
    return self._recv(size)
  File "/usr/lib64/python3.8/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
TypeError: an integer is required (got type NoneType)

noxdafox commented 4 years ago

Hello,

pool.close and pool.terminate are supposed to be used in a mutually exclusive way, not together. You use close and join if you want to finish all scheduled jobs before the pool is garbage collected. You use terminate and join if, instead, you want to abruptly stop the ongoing work.

Yet, it should not fail like this. Thanks for providing a minimal reproducible example. I will fix this ASAP.

marxin commented 4 years ago

Thank you for the quick answer.

You probably mean pool.stop instead of pool.terminate, right? I can confirm that stop followed by join also suffers from the problem. I had only added close because I was experimenting. Here is the reproducer with just stop and join:

#!/usr/bin/env python3

from pebble import ProcessPool
from concurrent.futures import wait, FIRST_COMPLETED

import os

def run(foo):
    return foo

while True:
    with ProcessPool(max_workers=16) as pool:
        print('new ProcessPool')
        futures = []
        for i in range(10):
            futures.append(pool.schedule(run))
        wait(futures, return_when=FIRST_COMPLETED)
        pool.stop()
        pool.join()

$ ./pe.py
new ProcessPool
Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib64/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/usr/lib64/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.8/site-packages/pebble/pool/process.py", line 178, in message_manager_loop
    pool_manager.process_next_message(SLEEP_UNIT)
  File "/usr/lib/python3.8/site-packages/pebble/pool/process.py", line 210, in process_next_message
    message = self.worker_manager.receive(timeout)
  File "/usr/lib/python3.8/site-packages/pebble/pool/process.py", line 342, in receive
    return self.pool_channel.recv()
  File "/usr/lib/python3.8/site-packages/pebble/pool/channel.py", line 63, in recv
    return self.reader.recv()
  File "/usr/lib64/python3.8/multiprocessing/connection.py", line 250, in recv
    buf = self._recv_bytes()
  File "/usr/lib64/python3.8/multiprocessing/connection.py", line 421, in _recv_bytes
    return self._recv(size)
  File "/usr/lib64/python3.8/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
TypeError: an integer is required (got type NoneType)
new ProcessPool
new ProcessPool
new ProcessPool
new ProcessPool
new ProcessPool
new ProcessPool
new ProcessPool
new ProcessPool
new ProcessPool
new ProcessPool
...

Thanks.

noxdafox commented 4 years ago

I just tried your examples with Python 3.8 and could not reproduce it.

Also, the Pool stopping routine is tested via Travis. If there were an issue, it would have already shown up. What OS are you using?

marxin commented 4 years ago

I'm using openSUSE Tumbleweed with:

$ python3 --version
Python 3.8.2
$ uname -a
Linux kettlebell 5.6.2-1-default #1 SMP Thu Apr 2 06:31:32 UTC 2020 (c8170d6) x86_64 x86_64 x86_64 GNU/Linux

Note that I've just tested this on 3 different Tumbleweed machines, and I've also tested it in a Fedora container (podman run -it fedora /bin/bash) with the same result.

And I also tested SLES 15:

$ uname -a
Linux kunlun 4.12.14-197.29-default #1 SMP Fri Dec 6 12:08:50 UTC 2019 (ca25711) x86_64 x86_64 x86_64 GNU/Linux
$ cat /etc/SUSE-brand 
SLE
VERSION = 15
$ python3 --version
Python 3.6.10
$ ./pe.py
...
new ProcessPool
Exception in thread Thread-45:
Traceback (most recent call last):
  File "/usr/lib64/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/usr/lib64/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "/home/mliska/.local/lib/python3.6/site-packages/pebble/pool/process.py", line 178, in message_manager_loop
    pool_manager.process_next_message(SLEEP_UNIT)
  File "/home/mliska/.local/lib/python3.6/site-packages/pebble/pool/process.py", line 210, in process_next_message
    message = self.worker_manager.receive(timeout)
  File "/home/mliska/.local/lib/python3.6/site-packages/pebble/pool/process.py", line 342, in receive
    return self.pool_channel.recv()
  File "/home/mliska/.local/lib/python3.6/site-packages/pebble/pool/channel.py", line 63, in recv
    return self.reader.recv()
  File "/usr/lib64/python3.6/multiprocessing/connection.py", line 250, in recv
    buf = self._recv_bytes()
  File "/usr/lib64/python3.6/multiprocessing/connection.py", line 411, in _recv_bytes
    return self._recv(size)
  File "/usr/lib64/python3.6/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
TypeError: an integer is required (got type NoneType)

new ProcessPool
...

marxin commented 4 years ago

As explained in the pull request, one needs the -s option to see the exception; otherwise, pytest does not show it. The key set-up of the test case is to wait on a ProcessFuture before pool.stop is called.
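A note on why the failure is easy to miss: an exception raised in a background thread is reported through threading.excepthook (available since Python 3.8), which by default prints the traceback to stderr and does not propagate to the thread that calls join. Since pytest captures output unless -s is given, the test still passes silently. The sketch below is a standard-library illustration, not code from Pebble or from the pull request; the helper names are hypothetical. It records thread exceptions so that a test could assert on them.

```python
import threading


def run_and_collect_thread_errors(target):
    """Run target in a thread and return the exception types it raised."""
    errors = []
    previous_hook = threading.excepthook

    def recording_hook(args):
        # args carries exc_type, exc_value, exc_traceback and thread
        errors.append(args.exc_type)

    threading.excepthook = recording_hook
    try:
        worker = threading.Thread(target=target)
        worker.start()
        worker.join()   # returns normally even though the thread crashed
    finally:
        threading.excepthook = previous_hook  # always restore the hook
    return errors


def failing_loop():
    # Stand-in for a background loop that crashes during shutdown.
    raise TypeError("an integer is required (got type NoneType)")


if __name__ == "__main__":
    print(run_and_collect_thread_errors(failing_loop))
```

With a hook like this installed around the pool shutdown, a test could fail explicitly when the list is not empty instead of relying on the printed traceback.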

marxin commented 4 years ago

@hroncok Can you please test it on a bare-metal Fedora system?

hroncok commented 4 years ago

Can you please test it on a bare-metal Fedora system?

What test exactly? And with what pebble (from RPM, from PyPI or from git master)?

marxin commented 4 years ago

What test exactly? And with what pebble (from RPM, from PyPI or from git master)?

The test mentioned in https://github.com/noxdafox/pebble/issues/58#issuecomment-613818867. I would test the latest release installed with either pip or RPM (by the way, does Fedora have Pebble packaged?). Thank you.

hroncok commented 4 years ago

does Fedora have Pebble packaged?

I don't know.

[pebtest]$ python3.8 -m venv __venv__
[pebtest]$ . __venv__/bin/activate
(__venv__) [pebtest]$ pip install pebble
Collecting pebble
  Downloading https://files.pythonhosted.org/packages/9c/35/085d244bc261f720e98a62943d639161f2d69aa068168464494ce05a14a4/Pebble-4.5.1-py2.py3-none-any.whl
Installing collected packages: pebble
Successfully installed pebble-4.5.1
WARNING: You are using pip version 19.3.1; however, version 20.0.2 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.
(__venv__) [pebtest]$ cat > pe.py
#!/usr/bin/env python3

from pebble import ProcessPool
from concurrent.futures import wait, FIRST_COMPLETED

import os

def run(foo):
    return foo

while True:
    with ProcessPool(max_workers=16) as pool:
        print('new ProcessPool')
        futures = []
        for i in range(10):
            futures.append(pool.schedule(run))
        wait(futures, return_when=FIRST_COMPLETED)
        pool.stop()
        pool.join()
(__venv__) [pebtest]$ chmod +x pe.py 
(__venv__) [pebtest]$ ./pe.py 
new ProcessPool
new ProcessPool
new ProcessPool
new ProcessPool
new ProcessPool
new ProcessPool
new ProcessPool
new ProcessPool
new ProcessPool
new ProcessPool
new ProcessPool
new ProcessPool
Exception in thread Thread-36:
Traceback (most recent call last):
  File "/usr/lib64/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/usr/lib64/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/home/churchyard/tmp/pebtest/__venv__/lib64/python3.8/site-packages/pebble/pool/process.py", line 178, in message_manager_loop
    pool_manager.process_next_message(SLEEP_UNIT)
  File "/home/churchyard/tmp/pebtest/__venv__/lib64/python3.8/site-packages/pebble/pool/process.py", line 210, in process_next_message
    message = self.worker_manager.receive(timeout)
  File "/home/churchyard/tmp/pebtest/__venv__/lib64/python3.8/site-packages/pebble/pool/process.py", line 342, in receive
    return self.pool_channel.recv()
  File "/home/churchyard/tmp/pebtest/__venv__/lib64/python3.8/site-packages/pebble/pool/channel.py", line 63, in recv
    return self.reader.recv()
  File "/usr/lib64/python3.8/multiprocessing/connection.py", line 250, in recv
    buf = self._recv_bytes()
  File "/usr/lib64/python3.8/multiprocessing/connection.py", line 421, in _recv_bytes
    return self._recv(size)
  File "/usr/lib64/python3.8/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
TypeError: an integer is required (got type NoneType)
new ProcessPool
new ProcessPool
Exception in thread Thread-42:
Traceback (most recent call last):
  File "/usr/lib64/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/usr/lib64/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/home/churchyard/tmp/pebtest/__venv__/lib64/python3.8/site-packages/pebble/pool/process.py", line 178, in message_manager_loop
    pool_manager.process_next_message(SLEEP_UNIT)
  File "/home/churchyard/tmp/pebtest/__venv__/lib64/python3.8/site-packages/pebble/pool/process.py", line 210, in process_next_message
    message = self.worker_manager.receive(timeout)
  File "/home/churchyard/tmp/pebtest/__venv__/lib64/python3.8/site-packages/pebble/pool/process.py", line 342, in receive
    return self.pool_channel.recv()
  File "/home/churchyard/tmp/pebtest/__venv__/lib64/python3.8/site-packages/pebble/pool/channel.py", line 63, in recv
    return self.reader.recv()
  File "/usr/lib64/python3.8/multiprocessing/connection.py", line 250, in recv
    buf = self._recv_bytes()
  File "/usr/lib64/python3.8/multiprocessing/connection.py", line 421, in _recv_bytes
    return self._recv(size)
  File "/usr/lib64/python3.8/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
TypeError: an integer is required (got type NoneType)
new ProcessPool
new ProcessPool
new ProcessPool
new ProcessPool
new ProcessPool
new ProcessPool
new ProcessPool
new ProcessPool
new ProcessPool
^CProcess Process-366:
Exception ignored in: <function _after_fork at 0x7f7d3cf5baf0>
Traceback (most recent call last):
  File "/usr/lib64/python3.8/threading.py", line 1442, in _after_fork
    thread._reset_internal_locks(True)
  File "/usr/lib64/python3.8/threading.py", line 811, in _reset_internal_locks
    self._started._reset_internal_locks()
  File "/usr/lib64/python3.8/threading.py", line 511, in _reset_internal_locks
    self._cond.__init__(Lock())
  File "/usr/lib64/python3.8/threading.py", line 225, in __init__
    self._lock = lock
KeyboardInterrupt: 
Traceback (most recent call last):
  File "/usr/lib64/python3.8/multiprocessing/process.py", line 299, in _bootstrap
    util._close_stdin()
  File "/usr/lib64/python3.8/multiprocessing/util.py", line 399, in _close_stdin
    sys.stdin = open(fd, closefd=False)
  File "/usr/lib64/python3.8/_bootlocale.py", line 33, in getpreferredencoding
    def getpreferredencoding(do_setlocale=True):
KeyboardInterrupt
Traceback (most recent call last):
  File "./pe.py", line 16, in <module>
    futures.append(pool.schedule(run))
  File "/home/churchyard/tmp/pebtest/__venv__/lib64/python3.8/site-packages/pebble/pool/process.py", line 90, in schedule
    self._check_pool_state()
  File "/home/churchyard/tmp/pebtest/__venv__/lib64/python3.8/site-packages/pebble/pool/base_pool.py", line 91, in _check_pool_state
    self._update_pool_state()
  File "/home/churchyard/tmp/pebtest/__venv__/lib64/python3.8/site-packages/pebble/pool/base_pool.py", line 100, in _update_pool_state
    self._start_pool()
  File "/home/churchyard/tmp/pebtest/__venv__/lib64/python3.8/site-packages/pebble/pool/process.py", line 63, in _start_pool
    self._pool_manager.start()
  File "/home/churchyard/tmp/pebtest/__venv__/lib64/python3.8/site-packages/pebble/pool/process.py", line 192, in start
    self.worker_manager.create_workers()
  File "/home/churchyard/tmp/pebtest/__venv__/lib64/python3.8/site-packages/pebble/pool/process.py", line 364, in create_workers
    self.new_worker()
  File "/home/churchyard/tmp/pebtest/__venv__/lib64/python3.8/site-packages/pebble/pool/process.py", line 376, in new_worker
    worker = launch_process(
  File "/home/churchyard/tmp/pebtest/__venv__/lib64/python3.8/site-packages/pebble/common.py", line 141, in launch_process
    process.start()
  File "/usr/lib64/python3.8/multiprocessing/process.py", line 120, in start
    _cleanup()
  File "/usr/lib64/python3.8/multiprocessing/process.py", line 64, in _cleanup
    if p._popen.poll() is not None:
  File "/usr/lib64/python3.8/multiprocessing/popen_fork.py", line 27, in poll
    pid, sts = os.waitpid(self.pid, flag)
KeyboardInterrupt

marxin commented 4 years ago

Thank you @hroncok. You see exactly the same problem as I do.

noxdafox commented 4 years ago

Managed to reproduce it on one of my environments. I will investigate further, thanks for reporting this.

marxin commented 4 years ago

@noxdafox Any update on this please?

swills commented 4 years ago

I'm seeing this on FreeBSD too. Any update?

noxdafox commented 4 years ago

This issue is similar to #10 and #20. It is a race condition at pool shutdown that produces a noisy crash.

This does not affect the Pool functionality itself, so you can use it safely.

I have already submitted a fix and will make a new release over the next weekend.
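To illustrate the class of problem, the tracebacks in this thread are consistent with a channel being closed while a background loop is still reading from it. The sketch below is a standard-library illustration under that assumption and is not the fix that went into Pebble; the names reader_loop, demo_orderly_shutdown and POLL_INTERVAL are made up. It shows one common way to avoid such a race: signal the loop to stop, join it, and only then close the channel.

```python
import threading
import time
from multiprocessing import Pipe

POLL_INTERVAL = 0.05  # seconds between stop checks (illustrative value)


def reader_loop(reader, stop_event, received):
    """Collect messages until asked to stop; never blocks forever in recv()."""
    while not stop_event.is_set():
        if reader.poll(POLL_INTERVAL):
            received.append(reader.recv())


def demo_orderly_shutdown():
    reader, writer = Pipe(duplex=False)
    stop_event = threading.Event()
    received = []
    loop = threading.Thread(
        target=reader_loop, args=(reader, stop_event, received))
    loop.start()

    writer.send("hello")
    deadline = time.monotonic() + 5
    while not received and time.monotonic() < deadline:
        time.sleep(0.01)   # wait until the loop has picked up the message

    stop_event.set()   # 1. ask the loop to stop
    loop.join()        # 2. wait until it is no longer using the channel
    reader.close()     # 3. only now close the channel
    writer.close()
    return received, loop.is_alive()


if __name__ == "__main__":
    print(demo_orderly_shutdown())
```

Closing the channel before the join would leave a window in which the loop could call poll or recv on a connection that is already closed, which is the kind of noisy but harmless crash described above.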

marxin commented 4 years ago

Good job :+1:

marxin commented 4 years ago

I've just rebuilt the openSUSE package and the issue is gone. Thanks.

marxin commented 4 years ago

Can you please merge my PR #61 that tests this issue?

noxdafox commented 4 years ago

Issue fixed in release 4.5.2.