celery / billiard

Multiprocessing Pool Extensions

fix for #249 introduces a crash in warm shutdown #260

Closed: daleevans closed this issue 5 years ago

daleevans commented 5 years ago

Commit 309f9663663c6dad6d40bf017514695a7c154fd appears to have introduced an issue with warm restarts. Now when I send a SIGTERM, I get a crash at line 697 of billiard/pool.py that kills all the workers, including those with jobs in progress. Rolling back to billiard 3.5.0.4 resolves this for me.

worker: Warm shutdown (MainProcess)
[2018-12-05 12:23:44,846: ERROR/MainProcess] Error on stopping Pool: TypeError("can't pickle _thread.lock objects",)
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/celery/worker/worker.py", line 205, in start
    self.blueprint.start(self)
  File "/usr/local/lib/python3.6/site-packages/celery/bootsteps.py", line 119, in start
    step.start(parent)
  File "/usr/local/lib/python3.6/site-packages/celery/bootsteps.py", line 369, in start
    return self.obj.start()
  File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 317, in start
    blueprint.start(self)
  File "/usr/local/lib/python3.6/site-packages/celery/bootsteps.py", line 119, in start
    step.start(parent)
  File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 593, in start
    c.loop(*c.loop_args())
  File "/usr/local/lib/python3.6/site-packages/celery/worker/loops.py", line 91, in asynloop
    next(loop)
  File "/usr/local/lib/python3.6/site-packages/kombu/asynchronous/hub.py", line 298, in create_loop
    events = poll(poll_timeout)
  File "/usr/local/lib/python3.6/site-packages/kombu/utils/eventio.py", line 84, in poll
    return self._epoll.poll(timeout if timeout is not None else -1)
  File "/usr/local/lib/python3.6/site-packages/celery/apps/worker.py", line 284, in _handle_request
    raise exc(exitcode)
celery.exceptions.WorkerShutdown: 0

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/celery/bootsteps.py", line 151, in send_all
    fun(parent, *args)
  File "/usr/local/lib/python3.6/site-packages/celery/bootsteps.py", line 373, in stop
    return self.obj.stop()
  File "/usr/local/lib/python3.6/site-packages/celery/concurrency/base.py", line 122, in stop
    self.on_stop()
  File "/usr/local/lib/python3.6/site-packages/celery/concurrency/prefork.py", line 140, in on_stop
    self._pool.join()
  File "/usr/local/lib/python3.6/site-packages/billiard/pool.py", line 1578, in join
    stop_if_not_current(self._result_handler)
  File "/usr/local/lib/python3.6/site-packages/billiard/pool.py", line 143, in stop_if_not_current
    thread.stop(timeout)
  File "/usr/local/lib/python3.6/site-packages/billiard/pool.py", line 500, in stop
    self.on_stop_not_started()
  File "/usr/local/lib/python3.6/site-packages/celery/concurrency/asynpool.py", line 332, in on_stop_not_started
    check_timeouts()
  File "/usr/local/lib/python3.6/site-packages/billiard/pool.py", line 744, in handle_event
    next(self._it)
  File "/usr/local/lib/python3.6/site-packages/billiard/pool.py", line 697, in handle_timeouts
    cache = copy.deepcopy(self.cache)
  File "/usr/local/lib/python3.6/copy.py", line 150, in deepcopy
    y = copier(x, memo)
  File "/usr/local/lib/python3.6/copy.py", line 240, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/usr/local/lib/python3.6/copy.py", line 180, in deepcopy
    y = _reconstruct(x, memo, *rv)
  File "/usr/local/lib/python3.6/copy.py", line 280, in _reconstruct
    state = deepcopy(state, memo)
  File "/usr/local/lib/python3.6/copy.py", line 150, in deepcopy
    y = copier(x, memo)
  File "/usr/local/lib/python3.6/copy.py", line 240, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/usr/local/lib/python3.6/copy.py", line 180, in deepcopy
    y = _reconstruct(x, memo, *rv)
  File "/usr/local/lib/python3.6/copy.py", line 280, in _reconstruct
    state = deepcopy(state, memo)
  File "/usr/local/lib/python3.6/copy.py", line 150, in deepcopy
    y = copier(x, memo)
  File "/usr/local/lib/python3.6/copy.py", line 240, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/usr/local/lib/python3.6/copy.py", line 180, in deepcopy
    y = _reconstruct(x, memo, *rv)
  File "/usr/local/lib/python3.6/copy.py", line 280, in _reconstruct
    state = deepcopy(state, memo)
  File "/usr/local/lib/python3.6/copy.py", line 150, in deepcopy
    y = copier(x, memo)
  File "/usr/local/lib/python3.6/copy.py", line 240, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/usr/local/lib/python3.6/copy.py", line 169, in deepcopy
    rv = reductor(4)
TypeError: can't pickle _thread.lock objects
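
The failing line is handle_timeouts doing cache = copy.deepcopy(self.cache). The pool cache maps job IDs to ApplyResult objects, and as the recursion in the traceback suggests, an unresolved ApplyResult holds internal synchronization state (an event, and therefore a lock) that deepcopy cannot handle. A minimal sketch of the underlying failure, independent of celery (the Event below is just a stand-in for a cached ApplyResult):

    import copy
    import threading

    # Stand-in for Pool.cache: job id -> result object that owns a lock.
    cache = {0: threading.Event()}

    try:
        # deepcopy recurses into the Event's internal Condition and its lock,
        # then fails at the lock.
        copy.deepcopy(cache)
    except TypeError as exc:
        print(exc)  # can't pickle _thread.lock objects (Python 3.6 wording)
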
darklow commented 5 years ago

I can confirm I get the same error every time I do a warm shutdown with SIGTERM.

@daleevans Thank you for spotting this. I spent many hours debugging my celery setup and trying to understand what I had done wrong, since my dev environment was on the previous billiard version while production picked up the new one; it was driving me nuts trying to figure out why it failed. Pinning billiard==3.5.0.4 in my requirements fixed the error.
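
For anyone else hitting this, a pin in requirements.txt is enough to work around it for now; 3.5.0.4 still satisfies celery's billiard<3.6.0,>=3.5.0.2 constraint:

    # requirements.txt
    billiard==3.5.0.4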

patrys commented 5 years ago

It also introduces a slightly different crash if any of the ApplyResults has a schedule set:

RuntimeError: _SimpleQueue objects should only be shared between processes through inheritance
Traceback (most recent call last):
  (...)
  File "/usr/lib/python3.5/copy.py", line 223, in <listcomp>
    y = [deepcopy(a, memo) for a in x]
  File "/usr/lib/python3.5/copy.py", line 174, in deepcopy
    rv = reductor(4)
  File "/usr/local/lib/python3.5/dist-packages/billiard/queues.py", line 348, in __getstate__
    context.assert_spawning(self)
  File "/usr/local/lib/python3.5/dist-packages/billiard/context.py", line 421, in assert_spawning
    ' through inheritance' % type(obj).__name__
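
The same guard can be reproduced outside billiard with the stdlib equivalent (multiprocessing.SimpleQueue standing in here for billiard's _SimpleQueue):

    import copy
    from multiprocessing import SimpleQueue

    q = SimpleQueue()

    try:
        # SimpleQueue.__getstate__ calls assert_spawning(), which refuses
        # any pickling that is not part of spawning a child process.
        copy.deepcopy(q)
    except RuntimeError as exc:
        # SimpleQueue objects should only be shared between processes
        # through inheritance
        print(exc)
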
patrys commented 5 years ago

@thedrow Does the cache need to be deep-copied? It seems that copying unresolved ApplyResults would give them new locks and new job IDs, which would likely result in things falling out of sync. If the goal is just to make iteration over the cache dict thread-safe, maybe it would be enough to do cache = self.cache.copy()?
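
For illustration, a minimal sketch of what the shallow copy buys (the Event is again a hypothetical stand-in for a cached ApplyResult):

    import threading

    cache = {7: threading.Event()}   # stand-in for job id -> ApplyResult

    snapshot = cache.copy()          # shallow copy: new dict, same values
    assert snapshot[7] is cache[7]   # result objects are shared, not cloned

    # Iterating over `snapshot` is safe against concurrent insertions into
    # `cache`, without ever trying to copy the locks inside the values.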

patrys commented 5 years ago

That change works for me, but I don't know how to reproduce #248, and #249 did not provide any test cases.

patrys commented 5 years ago

@adamdelman Maybe you could help as you were the original reporter of #248?

neg3ntropy commented 5 years ago

Celery still depends on the buggy version range as of today (celery 4.2.1 -> billiard<3.6.0,>=3.5.0.2).

WRinnovation commented 5 years ago

Is the issue resolved now?

hheimbuerger commented 5 years ago

While waiting for the final release of Celery 4.3, can anyone name a good reason not to run Celery 4.2.2 with Billiard 3.6.0.0?