apache / rocketmq-client-python

Apache RocketMQ python client
https://rocketmq.apache.org/
Apache License 2.0

multiprocessing pool hangs on join #80

Closed: 2h4dl closed this issue 4 years ago

2h4dl commented 4 years ago

I'm trying to run some Python code in parallel. Once I receive a message, I use multiprocessing.Pool and apply_async to process it. After a while, the multiprocessing pool hangs on join and no more messages are consumed. Running ps --ppid ${pid}, I found this:

  PID TTY          TIME CMD
  312 pts/42   00:00:05 ConsumeTP
  313 pts/42   00:00:05 ConsumeTP
  314 pts/42   00:00:07 ConsumeTP
  315 pts/42   00:00:00 ConsumeTP
  317 pts/42   00:00:05 ConsumeTP
44417 pts/42   00:00:01 ConsumeTP
44418 pts/42   00:00:02 ConsumeTP
44419 pts/42   00:00:00 ConsumeTP
44425 pts/42   00:00:01 ConsumeTP
44426 pts/42   00:00:00 ConsumeTP

These processes just sit there without running. I don't know whether this is why the process hangs and can't consume messages.

messense commented 4 years ago

Please provide a code snippet, it's hard to guess what the problem is without actual code.

2h4dl commented 4 years ago

Here is an example:

import multiprocessing as mp

from rocketmq.client import ConsumeStatus
from rocketmq.client import Producer
from rocketmq.client import PushConsumer
from rocketmq.client import Message

def message_parse(message_body):
    '''
    Deserialize message content and return a list from this message
    '''
    a_list = ...  # deserialization logic elided in the original snippet
    return a_list

def worker(task):
    """
    Every task is CPU-bound.
    """
    process_result = None
    try:
        ...  # process the task and assign process_result (elided in the original)
    except Exception:
        pass

    return process_result

def process(message_body):
    task_list = message_parse(message_body)
    running_pool = []
    pool = mp.Pool(processes=5)
    for task in task_list:
        # args must be a tuple: (task,) rather than (task)
        _result = pool.apply_async(worker, args=(task,))
        running_pool.append(_result)
    pool.close()
    pool.join()

    result_list = []
    for p in running_pool:
        try:
            result = p.get(timeout=5)
            result_list.append(result)
        except Exception:
            pass

    return str(result_list)

def callback(rocketmq_msg):
    # `producer` is a started Producer created elsewhere in the program
    msg_body = process(rocketmq_msg.body)
    msg = Message(rocketmq_msg.topic)
    msg.set_keys(rocketmq_msg.keys)
    msg.set_body(msg_body)
    producer.send_sync(msg)
    # report the message as consumed to the RocketMQ client
    return ConsumeStatus.CONSUME_SUCCESS

Maybe this can help.
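One detail in this snippet is worth noting: `pool.join()` runs before `p.get(timeout=5)`, so the 5-second timeout can never take effect, because `join()` first waits for every task to finish. Below is a sketch of the same per-message pattern with the order reversed, assuming a hung worker should simply be killed and its result dropped (the original `except` already drops failures). `worker` and `process_with_deadline` are hypothetical names, not part of the original code or of rocketmq-client-python.

```python
import multiprocessing as mp


def worker(task):
    """Hypothetical stand-in for the CPU-bound worker."""
    return task * task


def process_with_deadline(tasks, timeout=5, processes=5, context=None):
    """Collect results with a per-task timeout *before* shutting the pool down.

    Tasks that raise or miss the deadline are skipped, as in the original
    snippet. The pool is then terminated rather than joined first, so a hung
    worker cannot block the caller indefinitely.
    """
    ctx = context or mp.get_context()
    pool = ctx.Pool(processes=processes)
    try:
        pending = [pool.apply_async(worker, (task,)) for task in tasks]
        results = []
        for async_result in pending:
            try:
                results.append(async_result.get(timeout=timeout))
            except Exception:  # includes multiprocessing.TimeoutError
                pass
        return results
    finally:
        pool.terminate()  # stop all workers, including any that are stuck
        pool.join()       # returns promptly once the workers have been killed
```

Each `get()` waits up to `timeout` seconds, so the worst case for one batch is roughly `len(tasks) * timeout`. This only bounds the wait; it does not by itself explain why a worker would hang.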

messense commented 4 years ago

Once I receive a message, I use multiprocessing.Pool and apply_async to process it. After a while, the multiprocessing pool hangs on join and no more messages are consumed.

Are any messages received at all? I'd add some logging or print statements to find out after which line it hangs.
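A minimal way to follow that suggestion, assuming the standard `logging` module is acceptable: the hypothetical helper `checkpoint()` logs the PID, the thread name and the PIDs of live child processes, so calling it before `mp.Pool(...)`, after `pool.close()` and after `pool.join()` shows which step never completes.

```python
import logging
import multiprocessing as mp

logging.basicConfig(
    level=logging.INFO,
    # PID and thread name let you tell apart lines from the consumer
    # threads, the pool's helper threads and the worker processes.
    format="%(asctime)s pid=%(process)d thread=%(threadName)s %(message)s",
)
log = logging.getLogger("consumer")


def checkpoint(label):
    """Log a progress marker plus the PIDs of live child processes.

    mp.active_children() lists children started through multiprocessing
    by this process (pool workers included) and, as a side effect, joins
    any that have already exited.
    """
    pids = [child.pid for child in mp.active_children()]
    log.info("%s children=%s", label, pids)
    return pids
```

If the last line logged for a message is the one after `pool.close()`, the hang is inside `join()`, and the listed PIDs can be compared with the `ps` output.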

messense commented 4 years ago

You can also try py-spy to dump the stacktrace when it hangs.

py-spy dump -p <PID>
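If `py-spy` cannot be installed on the host, the standard library's `faulthandler` module offers a rough equivalent. This is a minimal sketch, assuming a Unix system where `SIGUSR1` is not already used by the application; `enable_stack_dumps` is a hypothetical helper name.

```python
import faulthandler
import signal
import sys


def enable_stack_dumps(signum=signal.SIGUSR1):
    """Print every thread's Python stack to stderr when `signum` arrives.

    Unix only. With the process running, trigger a dump from a shell:
        kill -USR1 <PID>
    """
    faulthandler.register(signum, file=sys.stderr, all_threads=True)


if __name__ == "__main__":
    enable_stack_dumps()
    # ... then start the producer, consumer and web server as usual ...
```

After `kill -USR1 <PID>`, the process prints the current Python frames of all threads and keeps running.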

2h4dl commented 4 years ago

Hi @messense, here is the stack trace of the main process:

Process 31197: python3 json_edit_server.py
Python v3.6.8 (/usr/local/python3.6.8/bin/python3.6)

Thread 31197 (idle): "MainThread"
    select (selectors.py:376)
    serve_forever (socketserver.py:236)
    serve_forever (werkzeug/serving.py:734)
    inner (werkzeug/serving.py:966)
    run_simple (werkzeug/serving.py:1009)
    run (flask/app.py:990)
    <module> (json_edit_server.py:261)
Thread 31344 (idle): "Dummy-2"
    _wait_for_tstate_lock (threading.py:1072)
    join (threading.py:1056)
    join (multiprocessing/pool.py:546)
    pic_edit (json_edit_server.py:122)
    process (json_edit_server.py:192)
    _on_message (rocketmq/client.py:385)
Thread 3585 (idle): "Thread-30"
    _handle_workers (multiprocessing/pool.py:406)
    run (threading.py:864)
    _bootstrap_inner (threading.py:916)
    _bootstrap (threading.py:884)
Thread 3586 (idle): "Thread-31"
    wait (threading.py:295)
    get (queue.py:164)
    _handle_tasks (multiprocessing/pool.py:415)
    run (threading.py:864)
    _bootstrap_inner (threading.py:916)
    _bootstrap (threading.py:884)
Thread 3587 (idle): "Thread-32"
    _recv (multiprocessing/connection.py:379)
    _recv_bytes (multiprocessing/connection.py:407)
    recv (multiprocessing/connection.py:250)
    _handle_results (multiprocessing/pool.py:463)
    run (threading.py:864)
    _bootstrap_inner (threading.py:916)
    _bootstrap (threading.py:884)
Thread 31345 (idle): "Dummy-1"
    _wait_for_tstate_lock (threading.py:1072)
    join (threading.py:1056)
    join (multiprocessing/pool.py:546)
    pic_edit (json_edit_server.py:122)
    process (json_edit_server.py:192)
    _on_message (rocketmq/client.py:385)
Thread 7602 (idle): "Thread-33"
    _handle_workers (multiprocessing/pool.py:406)
    run (threading.py:864)
    _bootstrap_inner (threading.py:916)
    _bootstrap (threading.py:884)
Thread 7603 (idle): "Thread-34"
    wait (threading.py:295)
    get (queue.py:164)
    _handle_tasks (multiprocessing/pool.py:415)
    run (threading.py:864)
    _bootstrap_inner (threading.py:916)
    _bootstrap (threading.py:884)
Thread 7604 (idle): "Thread-35"
    _recv (multiprocessing/connection.py:379)
    _recv_bytes (multiprocessing/connection.py:407)
    recv (multiprocessing/connection.py:250)
    _handle_results (multiprocessing/pool.py:463)
    run (threading.py:864)
    _bootstrap_inner (threading.py:916)
    _bootstrap (threading.py:884)

I don't know how to read it. Could you help me?
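One possible reading of this dump, offered as an interpretation rather than a confirmed diagnosis: `Dummy-1` and `Dummy-2` are threads that Python did not start itself (here, callback threads of the native RocketMQ client, entering through `_on_message`). Each appears to have created its own pool (`Thread-30` to `Thread-32` and `Thread-33` to `Thread-35` look like the helper threads of two separate `Pool` objects) and is blocked in `Pool.join()`, while each pool's `_handle_results` thread waits in `recv` for results that never arrive. That suggests the worker processes, presumably the idle `ConsumeTP` children from the `ps` output, are stuck. With the `fork` start method, creating a pool inside a consumer thread forks a multi-threaded process, which the Python `multiprocessing` documentation warns is problematic. The author did not say what their fix was; the sketch below shows one common alternative: create a single pool up front in the main thread and reuse it from the callback. `worker`, `init_pool` and `process_tasks` are hypothetical names.

```python
import multiprocessing as mp

_POOL = None  # single pool shared by every message callback


def worker(task):
    """Hypothetical CPU-bound task; replace with the real processing."""
    return task * task


def init_pool(processes=5, context=None):
    """Create the shared pool once, from the main thread.

    Call this before starting the RocketMQ producer/consumer (and the web
    server), so that with the 'fork' start method the workers are forked
    before the client's native threads exist, instead of from a consumer
    thread on every message.
    """
    global _POOL
    ctx = context or mp.get_context()
    _POOL = ctx.Pool(processes=processes)
    return _POOL


def process_tasks(tasks, timeout=60):
    """Run a batch on the shared pool; meant to be called from the callback.

    Raises multiprocessing.TimeoutError if the batch takes longer than
    `timeout` seconds, instead of blocking forever.
    """
    if _POOL is None:
        raise RuntimeError("call init_pool() before starting the consumer")
    return _POOL.map_async(worker, tasks).get(timeout=timeout)
```

In the server's startup code this would mean calling `init_pool()` before `consumer.start()` and `app.run()`, and having the callback call `process_tasks(message_parse(rocketmq_msg.body))` instead of building a new pool per message.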

2h4dl commented 4 years ago

I solved it; this issue has nothing to do with RocketMQ. Closing it.

Sultan91 commented 4 years ago

Would you mind sharing the secret?