celery / billiard

Multiprocessing Pool Extensions

With billiard==3.5.0.5, shutting down the Celery worker fails with TypeError: can't pickle thread.lock objects #261

Closed: yoncan closed this issue 5 years ago

yoncan commented 5 years ago

In our project, a Celery task module subscribes to the worker_shutting_down signal, so that before the worker shuts down an extra handler, s_worker_shutting_down, runs to deal with the tasks the current worker has not yet completed.

All s_worker_shutting_down does is connect to Redis and write a single key.

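import logging
import time

from celery import signals
from redis import StrictRedis

# Assumed setup: a standard-library logger and a Redis client on the
# default localhost:6379; the original project's `log` and `redis`
# objects are not shown.
log = logging.getLogger(__name__)
redis = StrictRedis()


@signals.worker_shutdown.connect
def s_worker_shutdown(*args, **kwargs):
    """Log once the worker has finished shutting down."""
    log.warning('worker_shutdown')


@signals.worker_shutting_down.connect
def s_worker_shutting_down(*args, **kwargs):
    """Set a shutdown flag in Redis so unacknowledged tasks can be closed,
    then wait briefly to give them time to notice it."""
    log.warning(u'[cd-worker] Receiving signal [worker_shutting_down] '
                u'**** Will actively close all unconfirmed tasks')
    redis.set('__celery_worker_shutdown', 1)
    time.sleep(3)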

Then, whenever there are unfinished tasks at shutdown, stopping the worker raises the following exception (worker log below):

[2018-12-20 10:38:11,344: DEBUG/MainProcess] | Worker: Closing Hub...
[2018-12-20 10:38:11,345: DEBUG/MainProcess] | Worker: Closing Pool...
[2018-12-20 10:38:11,345: DEBUG/MainProcess] | Worker: Closing Consumer...
[2018-12-20 10:38:11,346: DEBUG/MainProcess] | Worker: Stopping Consumer...
[2018-12-20 10:38:11,346: DEBUG/MainProcess] | Consumer: Closing Connection...
[2018-12-20 10:38:11,346: DEBUG/MainProcess] | Consumer: Closing Events...
[2018-12-20 10:38:11,347: DEBUG/MainProcess] | Consumer: Closing Heart...
[2018-12-20 10:38:11,347: DEBUG/MainProcess] | Consumer: Closing Mingle...
[2018-12-20 10:38:11,347: DEBUG/MainProcess] | Consumer: Closing Tasks...
[2018-12-20 10:38:11,348: DEBUG/MainProcess] | Consumer: Closing Control...
[2018-12-20 10:38:11,348: DEBUG/MainProcess] | Consumer: Closing Gossip...
[2018-12-20 10:38:11,348: DEBUG/MainProcess] | Consumer: Closing event loop...
[2018-12-20 10:38:11,349: DEBUG/MainProcess] | Consumer: Stopping event loop...
[2018-12-20 10:38:11,349: DEBUG/MainProcess] | Consumer: Stopping Gossip...
[2018-12-20 10:38:11,352: DEBUG/MainProcess] | Consumer: Stopping Control...
[2018-12-20 10:38:11,354: DEBUG/MainProcess] | Consumer: Stopping Tasks...
[2018-12-20 10:38:11,355: DEBUG/MainProcess] Canceling task consumer...
[2018-12-20 10:38:11,355: DEBUG/MainProcess] | Consumer: Stopping Mingle...
[2018-12-20 10:38:11,355: DEBUG/MainProcess] | Consumer: Stopping Heart...
[2018-12-20 10:38:11,357: DEBUG/MainProcess] | Consumer: Stopping Events...
[2018-12-20 10:38:11,357: DEBUG/MainProcess] | Consumer: Stopping Connection...
[2018-12-20 10:38:11,358: DEBUG/MainProcess] | Worker: Stopping Pool...

> [2018-12-20 10:38:11,359: WARNING/MainProcess] self.cache: {0: <%s: 0 ack:True ready:False>}
> [2018-12-20 10:38:11,359: WARNING/MainProcess] key==0, val==<%s: 0 ack:True ready:False> , type== <class 'billiard.pool.ApplyResult'>

[2018-12-20 10:38:11,363: ERROR/MainProcess] Error on stopping Pool: TypeError("can't pickle thread.lock objects",)
Traceback (most recent call last):
  File "/opt/edsp-assistant/python/lib/python2.7/site-packages/celery/bootsteps.py", line 151, in send_all
    fun(parent, *args)
  File "/opt/edsp-assistant/python/lib/python2.7/site-packages/celery/bootsteps.py", line 373, in stop
    return self.obj.stop()
  File "/opt/edsp-assistant/python/lib/python2.7/site-packages/celery/concurrency/base.py", line 122, in stop
    self.on_stop()
  File "/opt/edsp-assistant/python/lib/python2.7/site-packages/celery/concurrency/prefork.py", line 140, in on_stop
    self._pool.join()
  File "/opt/edsp-assistant/python/lib/python2.7/site-packages/billiard/pool.py", line 1584, in join
    stop_if_not_current(self._result_handler)
  File "/opt/edsp-assistant/python/lib/python2.7/site-packages/billiard/pool.py", line 146, in stop_if_not_current
    thread.stop(timeout)
  File "/opt/edsp-assistant/python/lib/python2.7/site-packages/billiard/pool.py", line 503, in stop
    self.on_stop_not_started()
  File "/opt/edsp-assistant/python/lib/python2.7/site-packages/celery/concurrency/asynpool.py", line 332, in on_stop_not_started
    check_timeouts()
  File "/opt/edsp-assistant/python/lib/python2.7/site-packages/billiard/pool.py", line 750, in handle_event
    next(self._it)
  File "/opt/edsp-assistant/python/lib/python2.7/site-packages/billiard/pool.py", line 703, in handle_timeouts
    cache = copy.deepcopy(self.cache)
  File "/opt/edsp-assistant/python/lib/python2.7/copy.py", line 163, in deepcopy
    y = copier(x, memo)
  File "/opt/edsp-assistant/python/lib/python2.7/copy.py", line 257, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/opt/edsp-assistant/python/lib/python2.7/copy.py", line 190, in deepcopy
    y = _reconstruct(x, rv, 1, memo)
  File "/opt/edsp-assistant/python/lib/python2.7/copy.py", line 334, in _reconstruct
    state = deepcopy(state, memo)
  File "/opt/edsp-assistant/python/lib/python2.7/copy.py", line 163, in deepcopy
    y = copier(x, memo)
  File "/opt/edsp-assistant/python/lib/python2.7/copy.py", line 257, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/opt/edsp-assistant/python/lib/python2.7/copy.py", line 163, in deepcopy
    y = copier(x, memo)
  File "/opt/edsp-assistant/python/lib/python2.7/copy.py", line 264, in _deepcopy_method
    return type(x)(x.im_func, deepcopy(x.im_self, memo), x.im_class)
  File "/opt/edsp-assistant/python/lib/python2.7/copy.py", line 190, in deepcopy
    y = _reconstruct(x, rv, 1, memo)
  File "/opt/edsp-assistant/python/lib/python2.7/copy.py", line 334, in _reconstruct
    state = deepcopy(state, memo)
  File "/opt/edsp-assistant/python/lib/python2.7/copy.py", line 163, in deepcopy
    y = copier(x, memo)
  File "/opt/edsp-assistant/python/lib/python2.7/copy.py", line 237, in _deepcopy_tuple
    y.append(deepcopy(a, memo))
  File "/opt/edsp-assistant/python/lib/python2.7/copy.py", line 163, in deepcopy
    y = copier(x, memo)
  File "/opt/edsp-assistant/python/lib/python2.7/copy.py", line 257, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/opt/edsp-assistant/python/lib/python2.7/copy.py", line 190, in deepcopy
    y = _reconstruct(x, rv, 1, memo)
  File "/opt/edsp-assistant/python/lib/python2.7/copy.py", line 334, in _reconstruct
    state = deepcopy(state, memo)
  File "/opt/edsp-assistant/python/lib/python2.7/copy.py", line 163, in deepcopy
    y = copier(x, memo)
  File "/opt/edsp-assistant/python/lib/python2.7/copy.py", line 237, in _deepcopy_tuple
    y.append(deepcopy(a, memo))
  File "/opt/edsp-assistant/python/lib/python2.7/copy.py", line 163, in deepcopy
    y = copier(x, memo)
  File "/opt/edsp-assistant/python/lib/python2.7/copy.py", line 257, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/opt/edsp-assistant/python/lib/python2.7/copy.py", line 190, in deepcopy
    y = _reconstruct(x, rv, 1, memo)
  File "/opt/edsp-assistant/python/lib/python2.7/copy.py", line 334, in _reconstruct
    state = deepcopy(state, memo)
  File "/opt/edsp-assistant/python/lib/python2.7/copy.py", line 163, in deepcopy
    y = copier(x, memo)
  File "/opt/edsp-assistant/python/lib/python2.7/copy.py", line 257, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/opt/edsp-assistant/python/lib/python2.7/copy.py", line 190, in deepcopy
    y = _reconstruct(x, rv, 1, memo)
  File "/opt/edsp-assistant/python/lib/python2.7/copy.py", line 334, in _reconstruct
    state = deepcopy(state, memo)
  File "/opt/edsp-assistant/python/lib/python2.7/copy.py", line 163, in deepcopy
    y = copier(x, memo)
  File "/opt/edsp-assistant/python/lib/python2.7/copy.py", line 257, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/opt/edsp-assistant/python/lib/python2.7/copy.py", line 182, in deepcopy
    rv = reductor(2)
TypeError: can't pickle thread.lock objects
[2018-12-20 10:38:11,367: DEBUG/MainProcess] | Worker: Stopping Hub...
[2018-12-20 10:38:11,368: DEBUG/MainProcess] | Consumer: Shutdown Gossip...
[2018-12-20 10:38:11,368: DEBUG/MainProcess] | Consumer: Shutdown Control...
[2018-12-20 10:38:11,369: DEBUG/MainProcess] | Consumer: Shutdown Tasks...
[2018-12-20 10:38:11,369: DEBUG/MainProcess] Canceling task consumer...
[2018-12-20 10:38:11,369: DEBUG/MainProcess] Closing consumer channel...
[2018-12-20 10:38:11,370: DEBUG/MainProcess] | Consumer: Shutdown Heart...
[2018-12-20 10:38:11,370: DEBUG/MainProcess] | Consumer: Shutdown Events...
[2018-12-20 10:38:11,371: DEBUG/MainProcess] | Consumer: Shutdown Connection...
--------------------------------------------------------------------------------
WARNING in tasks [/data/cd/edsp_cd/tasks/tasks.py:83]:
worker_shutdown
--------------------------------------------------------------------------------
[2018-12-20 10:38:11,373: DEBUG/MainProcess] removing tasks from inqueue until task handler finished

The problem arises because:

In billiard 3.5.0.4, handle_timeouts works on self.cache directly; it does not make a copy.deepcopy of it:

# billiard/pool.py (3.5.0.4), around line 696
    ...

    def handle_timeouts(self):
        cache = self.cache         # <-- here: the live dict, no copy
        t_hard, t_soft = self.t_hard, self.t_soft
        dirty = set()
        on_soft_timeout = self.on_soft_timeout
        on_hard_timeout = self.on_hard_timeout
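Because `cache = self.cache` only aliases the live dict, iterating it while entries are removed can fail. A plausible reason for later adding a copy (an assumption; this issue does not say why the copy was introduced) is shown in this single-threaded simulation, where deleting inside the loop stands in for another thread finishing a job. The helper `iterate_live` is hypothetical, not billiard code.

```python
def iterate_live(cache):
    """Delete each entry while iterating the same dict.

    Returns the RuntimeError message, or None if iteration completes.
    """
    try:
        for job in cache:
            del cache[job]  # mutates the dict that is being iterated
    except RuntimeError as exc:
        return str(exc)
    return None


live = {0: "job-0", 1: "job-1"}
alias = live                # like `cache = self.cache`: same object, no copy
print(alias is live)        # True
print(iterate_live(alias))  # dictionary changed size during iteration
```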

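In billiard==3.5.0.5, however, handle_timeouts deep-copies the cache with copy.deepcopy. As the traceback shows, deepcopy recurses from the cached ApplyResult into its attributes, including bound methods, until it reaches a thread.lock, which cannot be copied, so it raises the TypeError: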

# billiard/pool.py (3.5.0.5), around line 696
    def handle_timeouts(self):
        cache = copy.deepcopy(self.cache)    # <-- here: deep copy of every cached result
        t_hard, t_soft = self.t_hard, self.t_soft
        dirty = set()
        on_soft_timeout = self.on_soft_timeout
        on_hard_timeout = self.on_hard_timeout
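The failure can be reproduced without Celery. In this minimal sketch, `FakeApplyResult` is a hypothetical stand-in for billiard's ApplyResult; it holds a lock directly for brevity, whereas in the traceback above the lock is reached several levels deeper, through a bound method. It also shows that a shallow `copy.copy` of the dict succeeds, because it duplicates only the key/value mapping and shares the result objects. That is one possible way to take a snapshot without touching the locks; whether billiard adopted it is not stated in this issue.

```python
import copy
import threading


class FakeApplyResult(object):
    """Hypothetical stand-in for a cached job result that owns a lock."""

    def __init__(self, job):
        self._job = job
        self._mutex = threading.Lock()


def try_copy(copier, cache):
    """Return (ok, error_message) after calling copier(cache)."""
    try:
        copier(cache)
        return True, None
    except TypeError as exc:
        return False, str(exc)


cache = {0: FakeApplyResult(0)}

# Deep copy recurses into each value and tries to copy the lock -> TypeError.
# The exact message names the lock type and varies by Python version.
print(try_copy(copy.deepcopy, cache))

# Shallow copy duplicates only the dict; the values (and their locks) are shared.
snapshot = copy.copy(cache)
print(snapshot[0] is cache[0])  # True
```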