mars-project / mars

Mars is a tensor-based unified framework for large-scale data computation which scales numpy, pandas, scikit-learn and Python functions.
https://mars-project.readthedocs.io
Apache License 2.0
2.7k stars 326 forks

[BUG] Print in remote functions leads to gevent error: cannot switch to a different thread #2021

Closed qinxuye closed 3 years ago

qinxuye commented 3 years ago

Describe the bug

Print in remote functions leads to gevent error: cannot switch to a different thread.

To Reproduce

To help us reproduce this bug, please provide the information below:

  1. Your Python version
  2. The version of Mars you use
  3. Versions of crucial packages, such as numpy, scipy and protobuf
  4. Full stack of the error.
  5. Minimized code to reproduce the error.
2021-03-05 15:28:48,690 mars.scheduler.operands.common 335 ERROR    Attempt 1: Unexpected error error occurred in executing operand 7518caeab11c4a18bcce34bc13a3cd0f in 11.28.217.38:28254
Traceback (most recent call last):
  File "/home/admin/work/_public-mars-0.6.4.zip/mars/promise.py", line 100, in _wrapped
    result = func(*args, **kwargs)
  File "/home/admin/work/_public-mars-0.6.4.zip/mars/worker/calc.py", line 301, in <lambda>
    .then(lambda context_dict: _start_calc(context_dict)) \
  File "/home/admin/work/_public-mars-0.6.4.zip/mars/worker/calc.py", line 276, in _start_calc
    return self._calc_results(session_id, graph_key, graph, context_dict, chunk_targets)
  File "/home/admin/work/_public-mars-0.6.4.zip/mars/utils.py", line 383, in _wrapped
    return func(*args, **kwargs)
  File "/home/admin/work/_public-mars-0.6.4.zip/mars/worker/calc.py", line 200, in _calc_results
    chunk_targets, retval=False).result()
  File "src/gevent/event.py", line 383, in gevent._gevent_cevent.AsyncResult.result
  File "src/gevent/event.py", line 305, in gevent._gevent_cevent.AsyncResult.get
  File "src/gevent/event.py", line 335, in gevent._gevent_cevent.AsyncResult.get
  File "src/gevent/event.py", line 323, in gevent._gevent_cevent.AsyncResult.get
  File "src/gevent/event.py", line 303, in gevent._gevent_cevent.AsyncResult._raise_exception
  File "/opt/conda/lib/python3.7/site-packages/gevent/_compat.py", line 65, in reraise
    raise value.with_traceback(tb)
  File "/opt/conda/lib/python3.7/site-packages/gevent/threadpool.py", line 142, in __run_task
    thread_result.set(func(*args, **kwargs))
  File "mars/actors/pool/gevent_pool.pyx", line 201, in mars.actors.pool.gevent_pool.GeventThreadPool._wrap_watch.inner
    result = fn(*args, **kwargs)
  File "/home/admin/work/_public-mars-0.6.4.zip/mars/executor.py", line 698, in execute_graph
    res = graph_execution.execute(retval)
  File "/home/admin/work/_public-mars-0.6.4.zip/mars/executor.py", line 579, in execute
    future.result()
  File "/opt/conda/lib/python3.7/concurrent/futures/_base.py", line 435, in result
    return self.__get_result()
  File "/opt/conda/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
  File "/opt/conda/lib/python3.7/concurrent/futures/thread.py", line 57, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/home/admin/work/_public-mars-0.6.4.zip/mars/utils.py", line 457, in _inner
    return func(*args, **kwargs)
  File "/home/admin/work/_public-mars-0.6.4.zip/mars/executor.py", line 446, in _execute_operand
    self.handle_op(first_op, results, self._mock)
  File "/home/admin/work/_public-mars-0.6.4.zip/mars/executor.py", line 378, in handle_op
    return Executor.handle(*args, **kw)
  File "/home/admin/work/_public-mars-0.6.4.zip/mars/executor.py", line 649, in handle
    return runner(results, op)
  File "/home/admin/work/_public-pyodps-0.10.6.zip/odps/mars_extension/core.py", line 465, in wrapper
    f(ctx, op)
  File "/home/admin/work/_public-mars-0.6.4.zip/mars/custom_log.py", line 128, in wrap
    return func(cls, ctx, op)
  File "/home/admin/work/_public-mars-0.6.4.zip/mars/utils.py", line 1146, in wrapped
    result = func(cls, ctx, op)
  File "/home/admin/work/_public-mars-0.6.4.zip/mars/remote/core.py", line 218, in execute
    result = function(*function_args, **function_kwargs)
  File "<ipython-input-323-6fb7c5eb488f>", line 5, in make_feature_new
  File "/opt/conda/lib/python3.7/site-packages/autoforecast/modules/feature_engineer/feature_engine.py", line 182, in get_feature_df
    sub_feature_df = self.feature_map[i](feature_all[i])
  File "/opt/conda/lib/python3.7/site-packages/autoforecast/modules/feature_engineer/feature_engine.py", line 74, in <lambda>
    lambda x: VolumeFeatureWeibull(o, loader_table, config, x).volume_weibull()
  File "/opt/conda/lib/python3.7/site-packages/autoforecast/modules/feature_engineer/traffic_weibull.py", line 189, in volume_weibull
    hl_res = self.volume_hl()
  File "/opt/conda/lib/python3.7/site-packages/autoforecast/modules/feature_engineer/traffic_weibull.py", line 151, in volume_hl
    df = self.get_volume()
  File "/opt/conda/lib/python3.7/site-packages/autoforecast/modules/feature_engineer/traffic_weibull.py", line 115, in get_volume
    fur_vol, _ = VolCalc(self.o, self.loader_table, self.config, self.config_feature).fur_volume()
  File "/opt/conda/lib/python3.7/site-packages/autoforecast/modules/feature_engineer/input_media.py", line 321, in fur_volume
    fur_spending = self.get_fur_spending()
  File "/opt/conda/lib/python3.7/site-packages/autoforecast/modules/feature_engineer/input_media.py", line 273, in get_fur_spending
    print('--- Get future spending ---')
  File "/home/admin/work/_public-mars-0.6.4.zip/mars/custom_log.py", line 97, in write
    self.stdout.write(data)
  File "/home/admin/work/_public-mars-0.6.4.zip/mars/custom_log.py", line 90, in write
    self._register_log_path()
  File "/home/admin/work/_public-mars-0.6.4.zip/mars/custom_log.py", line 85, in _register_log_path
    worker_addr, log_path)
  File "mars/actors/core.pyx", line 65, in mars.actors.core.ActorRef.__getattr__._mt_call
    return self.send((item,) + args + (kwargs,), wait=wait)
  File "mars/actors/core.pyx", line 37, in mars.actors.core.ActorRef.send
    return self._ctx.send(self, message, wait=wait, callback=callback)
  File "mars/actors/pool/gevent_pool.pyx", line 683, in mars.actors.pool.gevent_pool.ActorRemoteHelper.send
    cpdef send(self, ActorRef actor_ref, object message, bint wait=True, object callback=None):
  File "mars/actors/pool/gevent_pool.pyx", line 684, in mars.actors.pool.gevent_pool.ActorRemoteHelper.send
    return self._send(actor_ref, message, wait_response=True, wait=wait, callback=callback)
  File "mars/actors/pool/gevent_pool.pyx", line 678, in mars.actors.pool.gevent_pool.ActorRemoteHelper._send
    return self._pool.apply(self._send_remote, (actor_ref.address, binaries))
  File "/opt/conda/lib/python3.7/site-packages/gevent/pool.py", line 161, in apply
    return self.spawn(func, *args, **kwds).get()
  File "src/gevent/greenlet.py", line 795, in gevent._gevent_cgreenlet.Greenlet.get
  File "src/gevent/greenlet.py", line 364, in gevent._gevent_cgreenlet.Greenlet._raise_exception
  File "/opt/conda/lib/python3.7/site-packages/gevent/_compat.py", line 65, in reraise
    raise value.with_traceback(tb)
  File "src/gevent/greenlet.py", line 854, in gevent._gevent_cgreenlet.Greenlet.run
  File "mars/actors/pool/gevent_pool.pyx", line 548, in mars.actors.pool.gevent_pool.ActorRemoteHelper._send_remote
    cpdef object _send_remote(self, str address, object binary):
  File "mars/actors/pool/gevent_pool.pyx", line 552, in mars.actors.pool.gevent_pool.ActorRemoteHelper._send_remote
    with self._new_connection(address) as sock:
  File "mars/actors/pool/gevent_pool.pyx", line 555, in mars.actors.pool.gevent_pool.ActorRemoteHelper._send_remote
    res_binary = read_remote_message(sock.recv)
  File "mars/actors/pool/messages.pyx", line 847, in mars.actors.pool.messages.read_remote_message
    read_bytes = _wrap_read_func(read_func, 8)
  File "mars/actors/pool/messages.pyx", line 830, in mars.actors.pool.messages._wrap_read_func
    read_bytes = read_func(size)
  File "/opt/conda/lib/python3.7/site-packages/gevent/_socket3.py", line 454, in recv
    self._wait(self._read_event)
  File "src/gevent/_hub_primitives.py", line 317, in gevent._gevent_c_hub_primitives.wait_on_socket
  File "src/gevent/_hub_primitives.py", line 322, in gevent._gevent_c_hub_primitives.wait_on_socket
  File "src/gevent/_hub_primitives.py", line 304, in gevent._gevent_c_hub_primitives._primitive_wait
  File "src/gevent/_hub_primitives.py", line 46, in gevent._gevent_c_hub_primitives.WaitOperationsGreenlet.wait
  File "src/gevent/_hub_primitives.py", line 46, in gevent._gevent_c_hub_primitives.WaitOperationsGreenlet.wait
  File "src/gevent/_hub_primitives.py", line 55, in gevent._gevent_c_hub_primitives.WaitOperationsGreenlet.wait
  File "src/gevent/_waiter.py", line 151, in gevent._gevent_c_waiter.Waiter.get
  File "src/gevent/_greenlet_primitives.py", line 61, in gevent._gevent_c_greenlet_primitives.SwitchOutGreenletWithLoop.switch
  File "src/gevent/_greenlet_primitives.py", line 61, in gevent._gevent_c_greenlet_primitives.SwitchOutGreenletWithLoop.switch
  File "src/gevent/_greenlet_primitives.py", line 65, in gevent._gevent_c_greenlet_primitives.SwitchOutGreenletWithLoop.switch
  File "src/gevent/_gevent_c_greenlet_primitives.pxd", line 35, in gevent._gevent_c_greenlet_primitives._greenlet_switch
greenlet.error: cannot switch to a different thread
wjsi commented 3 years ago

The cause is that when another tileable is executed inside a remote call, the calc process yields to that execution while sys.stdout remains replaced by the wrapper. When another remote call then runs and calls print, two wrapped stdout objects created in different threads end up being invoked from a single thread, which triggers the error.
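
One way to avoid having a stdout wrapper created in one thread invoked from another is to install a single proxy as sys.stdout and let it dispatch each write to a per-thread target. The following is a minimal sketch of that idea using only the standard library. It is not the fix that was applied in Mars, and the names `ThreadLocalStdout`, `redirect` and `run_demo` are hypothetical, introduced here for illustration only.

```python
import io
import sys
import threading
from contextlib import contextmanager


class ThreadLocalStdout:
    """Hypothetical proxy: installed once as sys.stdout, it routes each
    write to a target that belongs to the calling thread."""

    def __init__(self, default):
        self._default = default          # used by threads with no redirect
        self._local = threading.local()  # per-thread storage for the target

    def _target(self):
        return getattr(self._local, "target", self._default)

    def write(self, data):
        return self._target().write(data)

    def flush(self):
        return self._target().flush()

    @contextmanager
    def redirect(self, target):
        # Only the calling thread's target changes; sys.stdout itself is
        # never reassigned here, so threads cannot clobber each other.
        previous = getattr(self._local, "target", None)
        self._local.target = target
        try:
            yield target
        finally:
            if previous is None:
                del self._local.target
            else:
                self._local.target = previous


def run_demo():
    """Run three threads that print concurrently; return what each captured."""
    proxy = ThreadLocalStdout(sys.stdout)
    original, sys.stdout = sys.stdout, proxy
    captured = {}

    def task(name):
        buf = io.StringIO()
        with proxy.redirect(buf):
            print(f"hello from {name}")
        captured[name] = buf.getvalue()

    try:
        threads = [threading.Thread(target=task, args=(f"t{i}",)) for i in range(3)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
    finally:
        sys.stdout = original  # always restore the real stream
    return captured
```

Calling `run_demo()` returns a dict mapping each thread name to the text that thread printed. Because the global sys.stdout is swapped once rather than per call, a nested execution that yields control does not leave a wrapper owned by a different thread in place.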