cenkalti / kuyruk

⚙️ Simple task queue for Python
https://kuyruk.readthedocs.org/
MIT License

If ConnectionClosed, we die #36

Closed ybrs closed 9 years ago

ybrs commented 9 years ago

I am opening this pull request to discuss the following case. I'm not sure this is the best approach to recover, whether it fits with the plans for removing the worker, or whether there is a better way to do this, so please advise.

If the connection is somehow interrupted, the workers are broken, kaput. Here's the traceback:

(env)$ STORM_SETTINGS_MODULE="chroma.settings" PYTHONPATH=`pwd` kuyruk -m kuyruk_config worker
I 92720 kuyruk.consumer.next:106 - Waiting for new message...
E 92720 pika.adapters.base_connection._handle_read:347 - Read empty data, calling disconnect
W 92720 pika.adapters.base_connection._check_state_on_disconnect:160 - Socket closed when connection was open
Exception in thread Thread-4:
Traceback (most recent call last):
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 810, in __bootstrap_inner
    self.run()
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 763, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/Users/aybarsbadur/projects/hipo/chroma/api/env/lib/python2.7/site-packages/kuyruk/consumer.py", line 93, in _process_data_events
    self.queue.channel.connection.process_data_events()
  File "/Users/aybarsbadur/projects/hipo/chroma/api/env/lib/python2.7/site-packages/kuyruk/connection.py", line 32, in process_data_events
    return super(Connection, self).process_data_events()
  File "/Users/aybarsbadur/projects/hipo/chroma/api/env/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 240, in process_data_events
    if self._handle_read():
  File "/Users/aybarsbadur/projects/hipo/chroma/api/env/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 348, in _handle_read
    super(BlockingConnection, self)._handle_read()
  File "/Users/aybarsbadur/projects/hipo/chroma/api/env/lib/python2.7/site-packages/pika/adapters/base_connection.py", line 348, in _handle_read
    return self._handle_disconnect()
  File "/Users/aybarsbadur/projects/hipo/chroma/api/env/lib/python2.7/site-packages/pika/adapters/base_connection.py", line 248, in _handle_disconnect
    self._adapter_disconnect()
  File "/Users/aybarsbadur/projects/hipo/chroma/api/env/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 318, in _adapter_disconnect
    self._check_state_on_disconnect()
  File "/Users/aybarsbadur/projects/hipo/chroma/api/env/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 371, in _check_state_on_disconnect
    raise exceptions.ConnectionClosed()
ConnectionClosed

When the connection gets lost, the worker doesn't exit and doesn't consume, so manual intervention is needed: someone has to restart the workers. Simply restarting RabbitMQ causes this; the worker just stays there frozen.

We run workers under supervisor, so the simplest solution I could find was shooting the process with SIGTERM. With autorestart=true in the supervisor config, supervisor restarts it a few times and everything gets back to normal.
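For reference, the relevant supervisor stanza looks roughly like this (just a sketch; the program name, command, and paths are placeholders for whatever your deployment uses, and autorestart=true is the part that matters here):

```ini
[program:kuyruk-worker]
; placeholder command and settings module; adjust for your project
command=kuyruk -m kuyruk_config worker
autostart=true
autorestart=true   ; bring the worker back up after it is killed with SIGTERM
stopsignal=TERM
```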

A better approach would be restarting the thread, but I couldn't find a good place to add a Queue or similar message passing to the worker, and if the workers are going to be replaced/removed anyway, this is the easier patch.
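To make the idea concrete, this is roughly what the self-terminate approach looks like (a minimal sketch, not the actual patch; the function name is made up, and it assumes pika's ConnectionClosed is what escapes the consumer loop, as in the traceback above):

```python
import os
import signal

from pika.exceptions import ConnectionClosed


def process_data_events_or_die(connection):
    """Pump the pika connection; if the broker connection is gone,
    take the whole process down so the process manager can restart it."""
    try:
        connection.process_data_events()
    except ConnectionClosed:
        # The consumer thread cannot bring the connection back by itself,
        # so signal our own process; with autorestart=true, supervisord
        # starts a fresh worker with a fresh connection.
        os.kill(os.getpid(), signal.SIGTERM)
```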

What do you guys think?

coveralls commented 9 years ago


Coverage decreased (-0.18%) when pulling 02cf50b1b72cbc4ae2a129089ac3212989d5f650 on ybrs:master into 18a6646935cb47187e3afa7206f65af60de7ec7f on cenkalti:master.

cenkalti commented 9 years ago

@ybrs This is on purpose. If the connection is broken, the worker exits. If you run workers under a process manager, it should be fine.

By the way, I am working on version 2. I don't want to spend too much effort on this because the new design does not have a Consumer class or separate threads.

ybrs commented 9 years ago

@cenkalti Unfortunately it doesn't behave like that: the process doesn't exit. The consumer thread dies, but the main thread lives on, so the process stays up. You can reproduce the issue simply by restarting RabbitMQ while a worker is running. We are using my fork because of this issue (I don't know why, but on EC2 we see disconnections very often), so this fixes it for us.

Do you recommend switching to version 2? Is there any help you need on it?

cenkalti commented 9 years ago

I'm sorry, you are right. Your patch is necessary in this case.

Version 2 is not complete yet. I need a week to finish, maybe two. Thanks for helping.