agoragames / haigha

AMQP Python client
BSD 3-Clause "New" or "Revised" License

Connection blocked by one consumer #65

Open simomo opened 9 years ago

simomo commented 9 years ago

Hi, sorry for the interruption. I found that my server couldn't receive messages from the RabbitMQ server for several minutes, and then it recovered by itself.

When I looked into the log, I found that one consumer was trying to send data to a mobile app through the gevent.socket.sendall function, and this call blocked. Several minutes later, sendall raised this exception:

2014-12-01 22:53:30,350 - root.tcp - ERROR - consumer sendall ERROR! Info from full_stack:

Traceback (most recent call last):
  File "/root/py_envs/staging_tcp_server/local/lib/python2.7/site-packages/gevent/greenlet.py", line 327, in run
    result = self._run(*self.args, **self.kwargs)
  File "/root/python-workspace/p2g_staging/mq-stage2/rmq.py", line 27, in message_pump
    conn.read_frames()
  File "/root/py_envs/staging_tcp_server/local/lib/python2.7/site-packages/haigha/connection.py", line 432, in read_frames
    self._transport.process_channels(p_channels)
  File "/root/py_envs/staging_tcp_server/local/lib/python2.7/site-packages/haigha/transports/transport.py", line 40, in process_channels
    channel.process_frames()
  File "/root/py_envs/staging_tcp_server/local/lib/python2.7/site-packages/haigha/channel.py", line 234, in process_frames
    self.dispatch(frame)
  File "/root/py_envs/staging_tcp_server/local/lib/python2.7/site-packages/haigha/channel.py", line 212, in dispatch
    klass.dispatch(method_frame)
  File "/root/py_envs/staging_tcp_server/local/lib/python2.7/site-packages/haigha/classes/protocol_class.py", line 80, in dispatch
    callback(method_frame)
  File "/root/py_envs/staging_tcp_server/local/lib/python2.7/site-packages/haigha/classes/basic_class.py", line 203, in _recv_deliver
    func(msg)
  File "tcp_server.py", line 79, in rmq_msg_handler
    sock.sendall(raw_msg)  ##########  <---- This line is my code :P
  File "/root/py_envs/staging_tcp_server/local/lib/python2.7/site-packages/gevent/socket.py", line 458, in sendall
    data_sent += self.send(_get_memory(data, data_sent), flags)
  File "/root/py_envs/staging_tcp_server/local/lib/python2.7/site-packages/gevent/socket.py", line 443, in send
    return sock.send(data, flags)
error: [Errno 32] Broken pipe

At the same time, this socket's recv function raised a Connection timed out error:

2014-12-01 22:53:30,349 - root.tcp - DEBUG - [5] recv error: _role_ None - None : Error: [Errno 110] Connection timed out

So I guess the TCP connection was closed before sendall was called, and sendall blocked until the timeout error. While it was blocked, none of the other consumers could receive data from the RabbitMQ server. To fix this bug, I'm planning to set the SO_SNDTIMEO option for the sendall call.

_But on haigha's side, can I do something to prevent one blocked consumer from blocking the entire RabbitMQ connection? How about running every consumer in a new greenlet, or running all consumers on a greenlet pool?_

Any comments would be helpful, many thanks!
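For readers following along, the send-timeout idea mentioned above could be sketched roughly as follows. This is not code from the thread: `send_with_timeout` is a hypothetical helper, and it uses `settimeout` rather than SO_SNDTIMEO, on the assumption that gevent sockets are non-blocking underneath and SO_SNDTIMEO may therefore have no effect. It relies only on the standard socket interface (`settimeout`, `gettimeout`, `sendall`), which gevent.socket mirrors.

```python
import socket


def send_with_timeout(sock, data, timeout):
    """Try to send all of `data`, giving up after `timeout` seconds.

    Hypothetical helper: returns True on success, False if the send
    timed out or the peer is gone (e.g. Errno 32, Broken pipe).
    """
    previous = sock.gettimeout()
    sock.settimeout(timeout)
    try:
        sock.sendall(data)
        return True
    except socket.timeout:
        # The peer stopped reading; the caller should drop this client.
        return False
    except OSError:
        # Covers broken pipe, connection reset and similar errors.
        return False
    finally:
        # Restore whatever timeout the socket had before the call.
        sock.settimeout(previous)
```

A timeout only bounds how long one dead client can stall the handler; the handler still runs inside the message pump for that long, so it complements rather than replaces moving the send into its own greenlet.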

simomo commented 9 years ago

Hi, I just hacked basic_class.py and changed line 203 to

if func:
    gevent.spawn(func, msg)

Hope this can fix my issue :)

awestendorf commented 9 years ago

Yup, any "blocking" (loosely defined) function on a consumer should be in another greenlet. There is a GeventPoolTransport to assist with this, though I have not used that in some time and it may need some work. You're likely better off spawning a greenlet on the consumer in your own code.
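The suggestion to spawn a greenlet in your own code, rather than patching basic_class.py, might look something like the sketch below. The names `make_spawning_consumer` and `_guarded` are made up for illustration. The `spawn` argument is assumed to be `gevent.spawn` or the `spawn` method of a `gevent.pool.Pool`; it is passed in so the wrapper itself does not depend on either choice.

```python
import logging

log = logging.getLogger("consumer")


def make_spawning_consumer(handler, spawn):
    """Wrap `handler` so each message is handed off via `spawn`.

    `spawn` is any callable with the shape spawn(func, *args), for
    example gevent.spawn or gevent.pool.Pool(size).spawn (assumed here,
    not imported, so the wrapper stays library-agnostic).
    """

    def _guarded(msg):
        # Log failures such as a broken pipe instead of losing them
        # inside the spawned greenlet.
        try:
            handler(msg)
        except Exception:
            log.exception("consumer handler failed")

    def consumer(msg):
        # Returns immediately, so the message pump can keep reading frames.
        return spawn(_guarded, msg)

    return consumer
```

The wrapped callable would then be registered wherever `rmq_msg_handler` was previously passed to haigha as the consumer, e.g. `make_spawning_consumer(rmq_msg_handler, gevent.spawn)`. One caveat if a bounded pool is used: `Pool.spawn` blocks once the pool is full, so a pool that is too small could stall the message pump again.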