celery / py-amqp


Getting OSError: Socket closed against new RabbitMQ server 3.6.2 (on 2.0.3) #101

Open jakeczyz opened 8 years ago

jakeczyz commented 8 years ago

About 60 seconds after starting a channel.basic_consume followed by a channel.wait(), the connection closes with an OSError: Socket closed, as shown here:

Traceback (most recent call last):
  File "./rmqtools.py", line 437, in run_self_test_loop
    qh.channel[handle].wait(None)
  File "/usr/local/lib/python3.4/site-packages/amqp/abstract_channel.py", line 91, in wait
    self.connection.drain_events(timeout=timeout)
  File "/usr/local/lib/python3.4/site-packages/amqp/connection.py", line 436, in drain_events
    return self.blocking_read(timeout)
  File "/usr/local/lib/python3.4/site-packages/amqp/connection.py", line 440, in blocking_read
    frame = self.transport.read_frame()
  File "/usr/local/lib/python3.4/site-packages/amqp/transport.py", line 221, in read_frame
    frame_header = read(7, True)
  File "/usr/local/lib/python3.4/site-packages/amqp/transport.py", line 369, in _read
    raise IOError('Socket closed')
OSError: Socket closed

On the server, the message logged is:
=INFO REPORT==== 12-Jul-2016::19:09:02 === accepting AMQP connection <0.9396.19> (172....1:19638 -> 172....5:5672)

=WARNING REPORT==== 12-Jul-2016::19:10:01 === closing AMQP connection <0.9396.19> (172...1:19638 -> 172....5:5672): client unexpectedly closed TCP connection

Here's something that may be relevant: https://www.rabbitmq.com/heartbeats.html. However, the connection shows a negotiated heartbeat (Connection.heartbeat) of 0, and when I change it to, say, 5 during connection creation, the same error happens after 3x the chosen heartbeat interval (e.g. 15 seconds).
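
For reference, setting the heartbeat at connection creation looks roughly like this with a py-amqp 2.x client (a minimal sketch; the host is a placeholder):

import amqp

# Hypothetical host; heartbeat=5 requests a 5-second heartbeat from the
# broker, and conn.heartbeat holds the negotiated value after connecting.
conn = amqp.Connection('rabbitmq.example.com', heartbeat=5)
conn.connect()
print('negotiated heartbeat:', conn.heartbeat)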

Relatedly, the channel.wait() method is no longer documented in the web docs. Is there a different method that should be called after registering a callback with basic_consume?

A colleague reports this on the 1.4.6 client too.

Thanks!

ask commented 8 years ago

I'll be looking into this, but it's now better to use connection.drain_events(timeout) instead of channel.wait(). This allows you to consume events from multiple channels at a time, something that was missing from the original amqplib.
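
For reference, a minimal consume loop built on connection.drain_events might look like this (a sketch against py-amqp 2.x; broker address and queue name are placeholders):

import amqp

conn = amqp.Connection('localhost')  # placeholder broker address
conn.connect()
channel = conn.channel()

def on_message(message):
    # The callback registered with basic_consume receives a Message object.
    print('received:', message.body)
    channel.basic_ack(message.delivery_tag)

channel.basic_consume(queue='test', callback=on_message)
try:
    while True:
        # Waits for and dispatches frames on any channel of this connection.
        conn.drain_events(timeout=None)
finally:
    conn.close()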

ask commented 8 years ago

Btw, I'm not seeing this here, even with channel.wait(None). It would be very helpful if you could write a short snippet that reproduces the issue! I guess you should try using connection.drain_events first though, as I don't think there is much use in having a separate channel.wait().

jakeczyz commented 8 years ago

Thanks. Sure, here's a snippet of how to reproduce the error.

https://gist.github.com/jakeczyz/78ba1d5021b688a544b7cc8a22fefb65

It happens with both method calls. Note that it does not happen with an older RabbitMQ server. One other minor note: the new server is actually a cluster, but regular gets and publishes (and other cluster behavior) work fine.

Any advice or pointers would be highly appreciated.

jakeczyz commented 8 years ago

After more investigation, I believe this problem was due to a client-side TCP idle timeout in the load balancer used for my RabbitMQ cluster. I'll post more details once I can confirm my fix, but wanted to let you know right away so as not to waste your time.

jakeczyz commented 8 years ago

Okay, I have more information on this in case it helps someone in the future. This is knowledge gained (and likely bugs discovered) after about 15 man-hours of beating my head against this. The problem seems to be (partly) caused by the Amazon EC2 ELB idle timeout: the load balancer drops any connection (including resetting TCP ones) after too long without any traffic. On AWS/EC2 ELBs the value can be set between 1 second and 1 hour. This is a low limit if the code that uses RabbitMQ registers a callback and then wants to wait()/drain_events() in perpetuity (my use case).

From digging around, it seems that at least two things should each solve this: (1) TCP keepalives, or (2) the RabbitMQ heartbeat feature.
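
For reference, (1) means OS-level socket keepalive options along these lines (a generic socket-level sketch, not tied to py-amqp's API; the TCP_KEEP* names are Linux-specific and the values are only illustrative):

import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
# Start probing after 30 s idle, probe every 10 s, give up after 3 misses.
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 30)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 10)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 3)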

It seems to me there are two bugs conspiring against these solutions. First, unfortunately, even after enabling TCP keepalives on both sides (the client and each instance behind the LB) and observing them in action via netstat and tcpdump, the Amazon ELB always drops the connection after the timeout passes (yielding the OSError: Socket closed on the client, as in the original post). I should probably reach out to AWS about this.

Second, although it's annoying, from my understanding the only way to use the heartbeat feature with the basic_consume --> drain_events (blocking indefinitely) approach is to have a second thread/process send the heartbeats (if there's a better approach, please let me know). I tried this and hit what seems like another bug. When using the connection.heartbeat_tick() method and printing the connection.last_heartbeat_received and last_heartbeat_sent times, the server only sends one heartbeat and the connection eventually gets closed due to missed heartbeats. Here's an example with heartbeat=5, where I call heartbeat_tick() every 2 seconds and print conn.last_heartbeat_{received,sent}:

[2016-07-15T01:58:00.097830] DEBUG: negotiated conn.heartbeat: 5
[2016-07-15T01:58:02.100436] DEBUG: heartbeat_tick 1; last rec:41866; last sent:41866
[2016-07-15T01:58:04.102891] DEBUG: heartbeat_tick 2; last rec:41866; last sent:41866
[2016-07-15T01:58:06.105276] DEBUG: heartbeat_tick 3; last rec:41866; last sent:41866
[2016-07-15T01:58:08.107885] DEBUG: heartbeat_tick 4; last rec:41866; last sent:41872
[2016-07-15T01:58:10.110449] DEBUG: heartbeat_tick 5; last rec:41866; last sent:41874
Process Process-13:
Traceback (most recent call last):
  ...
  File "/usr/local/lib/python3.4/site-packages/amqp/connection.py", line 663, in heartbeat_tick
    raise ConnectionForced('Too many heartbeats missed')
amqp.exceptions.ConnectionForced: Too many heartbeats missed

As you can see, the server only sends one heartbeat, at time 41866, and never again. I tried several different timings with higher heartbeat values, with similar results.

The final thing that worked for me was to use connection.send_heartbeat() instead of heartbeat_tick(). If I call that method (in a subprocess) every couple of seconds (with or without heartbeats actually turned on for the connection!), the frames from the client are enough to keep the connection alive from the AMQP heartbeat perspective and also keep the ELB client idle timeout at bay. Thanks for reading this far. :)
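
Roughly, the workaround looks like this (a sketch with illustrative names; in my case the timer runs in a subprocess rather than a thread, and conn is an established amqp connection):

import threading
import time

def keep_connection_alive(conn, interval=2):
    # Push a heartbeat frame every couple of seconds so that neither the
    # broker nor an idle-timeout load balancer sees the connection as silent.
    while True:
        time.sleep(interval)
        try:
            conn.send_heartbeat()
        except OSError:
            break  # connection is gone; let the consuming loop handle it

# threading.Thread(target=keep_connection_alive, args=(conn,), daemon=True).start()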

odedfos commented 6 years ago

I've encountered the same problem when the celery (3.1.25) broker connection has heartbeats enabled and the worker is idle (empty queue). RabbitMQ eventually closes the connection because it is idle and heartbeats aren't being sent. Is there any plan to fix this in celery/kombu/py-amqp?
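
For context, a rough sketch of the celery 3.1-era settings involved (uppercase names were the 3.x convention; the values here are only illustrative):

# celeryconfig.py (sketch)
BROKER_HEARTBEAT = 10             # seconds between AMQP heartbeats; 0 disables them
BROKER_HEARTBEAT_CHECKRATE = 2.0  # how often missed heartbeats are checked, per interval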

KiranKVasu commented 5 years ago

Python: 3.7, OS: Windows 10, service: RabbitMQ (version 3.7.14)

I'm facing this error:

OSError: Server unexpectedly closed connection
Removing descriptor: 920

Also, this queue is no longer available after the error:

celery@HOSTNAME.celery.pidbox 0

Could anyone suggest a fix or workaround to get rid of this error?

Greenev commented 5 years ago

I had these warnings when using celery 4.3.0 with --pool=eventlet and RabbitMQ 3.6.5. Adding a connection.heartbeat_check() call in celery/celery/worker/pidbox.py resolved the issue for me (see my Stack Overflow post). Please check whether this is a correct solution to the problem; a minimal sketch of the pattern follows the log below.

2019-08-12 14:22:48,564: INFO worker_dummy@dummy-celery ready.
2019-08-12 14:22:50,077: INFO Events of group {task} enabled by remote.
2019-08-12 14:25:48,572: WARNING Traceback (most recent call last):
2019-08-12 14:25:48,572: WARNING File "/home/celery/venv/lib/python3.6/site-packages/eventlet/hubs/poll.py", line 111, in wait
    listener.cb(fileno)
2019-08-12 14:25:48,572: WARNING File "/home/celery/venv/lib/python3.6/site-packages/celery/worker/pidbox.py", line 120, in loop
    connection.drain_events(timeout=1.0)
2019-08-12 14:25:48,572: WARNING File "/home/celery/venv/lib/python3.6/site-packages/kombu/connection.py", line 315, in drain_events
    return self.transport.drain_events(self.connection, **kwargs)
2019-08-12 14:25:48,573: WARNING File "/home/celery/venv/lib/python3.6/site-packages/kombu/transport/pyamqp.py", line 103, in drain_events
    return connection.drain_events(**kwargs)
2019-08-12 14:25:48,573: WARNING File "/home/celery/venv/lib/python3.6/site-packages/amqp/connection.py", line 500, in drain_events
    while not self.blocking_read(timeout):
2019-08-12 14:25:48,573: WARNING File "/home/celery/venv/lib/python3.6/site-packages/amqp/connection.py", line 505, in blocking_read
    frame = self.transport.read_frame()
2019-08-12 14:25:48,573: WARNING File "/home/celery/venv/lib/python3.6/site-packages/amqp/transport.py", line 252, in read_frame
    frame_header = read(7, True)
2019-08-12 14:25:48,574: WARNING File "/home/celery/venv/lib/python3.6/site-packages/amqp/transport.py", line 444, in _read
    raise IOError('Server unexpectedly closed connection')
2019-08-12 14:25:48,574: WARNING OSError: Server unexpectedly closed connection
2019-08-12 14:25:48,574: WARNING Removing descriptor: 9
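
The pattern I mean, reduced to a sketch (illustrative only, not the actual pidbox.py code; connection is a kombu Connection):

import socket

def drain_with_heartbeats(connection):
    # Instead of blocking forever, drain with a short timeout and use the
    # idle periods to service heartbeats.
    while True:
        try:
            connection.drain_events(timeout=1.0)
        except socket.timeout:
            connection.heartbeat_check()
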
tvallois commented 3 years ago

(Quoting jakeczyz's earlier comment above about the AWS ELB idle timeout and the connection.send_heartbeat() workaround.)

Hello,

I don't know if we should keep posting in this issue, but our team is in almost the same situation as the one described above.

import logging
from socket import timeout
from time import sleep
from kombu import Consumer, Queue, Connection

def callback(body, message):
    logging.debug("before")
    sleep(300)  # simulate a task that outlasts the load balancer's idle timeout
    logging.debug("after")
    message.ack()

if __name__ == "__main__":
    is_running = True
    queue = Queue("my_queue", routing_key="my_routing_key", no_declare=True)
    with Connection("amqps://blabla", heartbeat=10) as conn:
        with conn.channel() as channel:
            consumer = Consumer(channel, queue)
            consumer.register_callback(callback)
            with consumer:
                while is_running:
                    try:
                        conn.drain_events(timeout=1)
                    except timeout:
                        conn.heartbeat_check()

I use this code to reproduce the error in an AWS environment (with an AWS Classic Load Balancer). In this case I'm unable to ack the message in the callback method because the ELB has dropped the socket connection to RabbitMQ.

I tried using another process to manage the heartbeat, like this:

import logging
from multiprocessing import Process
from time import sleep
from kombu import Consumer, Queue, Connection

logging.basicConfig(level=logging.DEBUG)

def callback(body, message):
    logging.debug("before")
    sleep(300)
    logging.debug("after")
    message.ack()

def manage_heartbeat(conn: Connection):
    while True:
        sleep(1)
        conn.heartbeat_check()

if __name__ == "__main__":
    is_running = True
    queue = Queue("my_queue_name", routing_key="my_routing_key", no_declare=True)
    with Connection("blabla", heartbeat=10) as conn:
        try:
            heartbeat_process = Process(target=manage_heartbeat, args=(conn,))
            heartbeat_process.start()
            with conn.channel() as channel:
                consumer = Consumer(channel, queue)
                consumer.register_callback(callback)
                with consumer:
                    while is_running:
                        conn.drain_events()
        finally:
            heartbeat_process.close()

With this approach, an exception is raised:

amqp.exceptions.ConnectionForced: Too many heartbeats missed

Is there a way to send heartbeats in a different process with kombu/py-amqp?

auvipy commented 3 years ago

As celery uses billiard, you should not use multiprocessing, I think. Also, what is your full setup, and are you using the newest versions?

tvallois commented 3 years ago

I'm not using celery, only Kombu (kombu 5.0.2, amqp 5.0.2). With a solution like this, it works (the AWS ELB no longer considers the connection idle), but it would be nice to have this integrated directly into kombu or amqp:

import logging
import threading
from socket import timeout
from time import sleep
from kombu import Consumer, Queue, Connection

logging.basicConfig(level=logging.DEBUG)

def callback(body, message):
    logging.debug("before")
    sleep(60)
    logging.debug("after")
    message.ack()

def handle_heartbeat(conn, event):
    # Send a raw heartbeat frame every second from a separate thread so the
    # connection never looks idle, even while the callback is busy.
    while not event.is_set():
        print("ping")
        sleep(1)
        if conn.connection:
            conn.connection.send_heartbeat()

if __name__ == "__main__":
    event = threading.Event()
    queue = Queue("my_queue_name", routing_key="my_routing_key", no_declare=True)
    with Connection("amqps://blabla", heartbeat=10) as conn:
        try:
            t = threading.Thread(target=handle_heartbeat, args=(conn, event))
            t.start()
            with conn.channel() as channel:
                consumer = Consumer(channel, queue)
                consumer.register_callback(callback)
                with consumer:
                    while not event.is_set():
                        try:
                            conn.drain_events(timeout=1)
                        except timeout:
                            conn.heartbeat_check()
        except KeyboardInterrupt:
            event.set()
        finally:
            event.set()  # make sure the heartbeat thread exits before joining
            t.join()

tvallois commented 3 years ago

Nope, it does not work. I get the same stack trace as this Celery issue (https://github.com/celery/celery/issues/3773).

auvipy commented 3 years ago

Recent related discussion: https://github.com/celery/celery/pull/6528#issuecomment-744096067