celery / kombu

Messaging library for Python.
http://kombu.readthedocs.org/
BSD 3-Clause "New" or "Revised" License
2.82k stars 920 forks source link

Heartbeats, correct implementation? #621

Open agalera opened 7 years ago

agalera commented 7 years ago

I'm trying to send the heartbeat because sometimes I close the connection, I've been doing some tests and does not seem to work. what am I doing wrong?

Thx!

root@sb1:~/work/RocketTM/examples# python3 bla2.py

DEBUG:amqp:Start from server, version: 0.9, properties: {'cluster_name': 'rabbit@sb1', 'information': 'Licensed under the MPL.  See http://www.rabbitmq.com/', 'copyright': 'Copyright (C) 2007-2014 GoPivotal, Inc.', 'capabilities': {'publisher_confirms': True, 'consumer_cancel_notify': True, 'connection.blocked': True, 'consumer_priorities': True, 'authentication_failure_close': True, 'per_consumer_qos': True, 'basic.nack': True, 'exchange_exchange_bindings': True}, 'version': '3.3.5', 'product': 'RabbitMQ', 'platform': 'Erlang/OTP'}, mechanisms: ['AMQPLAIN', 'PLAIN'], locales: ['en_US']
DEBUG:amqp:heartbeat_tick : for connection d2661778529a41bea4d377d61a7daab6
DEBUG:amqp:using channel_id: 1
DEBUG:amqp:heartbeat_tick : Prev sent/recv: None/None, now - 3/3, monotonic - 6827158.980075557, last_heartbeat_sent - 6827158.980069186, heartbeat int. - 10.0 for connection d2661778529a41bea4d377d61a7daab6
HEARTBEAT OK
DEBUG:amqp:Channel open
INFO:root:create queue: rocket1 durable: True
DEBUG:amqp:using channel_id: 2
DEBUG:amqp:Channel open
INFO:root:<Consumer: [<Queue rocket1 -> <Exchange rocket1(direct) bound to chan:2> -> rocket1 bound to chan:2>]>
INFO:root:<Consumer: [<Queue rocket1 -> <Exchange rocket1(direct) bound to chan:2> -> rocket1 bound to chan:2>]>
INFO:root:<Consumer: [<Queue rocket1 -> <Exchange rocket1(direct) bound to chan:2> -> rocket1 bound to chan:2>]>
0
1
2
3
4
DEBUG:amqp:heartbeat_tick : for connection d2661778529a41bea4d377d61a7daab6
DEBUG:amqp:heartbeat_tick : Prev sent/recv: 3/3, now - 14/16, monotonic - 6827163.986512617, last_heartbeat_sent - 6827163.986500512, heartbeat int. - 10.0 for connection d2661778529a41bea4d377d61a7daab6
HEARTBEAT OK
5
6
7
8
9
DEBUG:amqp:heartbeat_tick : for connection d2661778529a41bea4d377d61a7daab6
DEBUG:amqp:heartbeat_tick : Prev sent/recv: 14/16, now - 14/16, monotonic - 6827168.992872438, last_heartbeat_sent - 6827163.986500512, heartbeat int. - 10.0 for connection d2661778529a41bea4d377d61a7daab6
HEARTBEAT OK
10
11
12
13
14
DEBUG:amqp:heartbeat_tick : for connection d2661778529a41bea4d377d61a7daab6
DEBUG:amqp:heartbeat_tick : Prev sent/recv: 14/16, now - 14/16, monotonic - 6827173.997195548, last_heartbeat_sent - 6827163.986500512, heartbeat int. - 10.0 for connection d2661778529a41bea4d377d61a7daab6
DEBUG:amqp:heartbeat_tick: sending heartbeat for connection d2661778529a41bea4d377d61a7daab6
HEARTBEAT OK
15
16
17
18
19
DEBUG:amqp:heartbeat_tick : for connection d2661778529a41bea4d377d61a7daab6
DEBUG:amqp:heartbeat_tick : Prev sent/recv: 14/16, now - 15/16, monotonic - 6827179.001209586, last_heartbeat_sent - 6827179.001199761, heartbeat int. - 10.0 for connection d2661778529a41bea4d377d61a7daab6
HEARTBEAT OK
20
21
22
23
24
DEBUG:amqp:heartbeat_tick : for connection d2661778529a41bea4d377d61a7daab6
DEBUG:amqp:heartbeat_tick : Prev sent/recv: 15/16, now - 15/16, monotonic - 6827184.007938261, last_heartbeat_sent - 6827179.001199761, heartbeat int. - 10.0 for connection d2661778529a41bea4d377d61a7daab6
HEARTBEAT FAILED
25
DEBUG:amqp:heartbeat_tick : for connection d2661778529a41bea4d377d61a7daab6
DEBUG:amqp:heartbeat_tick : Prev sent/recv: 15/16, now - 15/16, monotonic - 6827185.010983692, last_heartbeat_sent - 6827179.001199761, heartbeat int. - 10.0 for connection d2661778529a41bea4d377d61a7daab6
HEARTBEAT FAILED
26
DEBUG:amqp:heartbeat_tick : for connection d2661778529a41bea4d377d61a7daab6
DEBUG:amqp:heartbeat_tick : Prev sent/recv: 15/16, now - 15/16, monotonic - 6827186.013820113, last_heartbeat_sent - 6827179.001199761, heartbeat int. - 10.0 for connection d2661778529a41bea4d377d61a7daab6
HEARTBEAT FAILED
27
DEBUG:amqp:heartbeat_tick : for connection d2661778529a41bea4d377d61a7daab6
DEBUG:amqp:heartbeat_tick : Prev sent/recv: 15/16, now - 15/16, monotonic - 6827187.016858898, last_heartbeat_sent - 6827179.001199761, heartbeat int. - 10.0 for connection d2661778529a41bea4d377d61a7daab6
HEARTBEAT FAILED
28
DEBUG:amqp:heartbeat_tick : for connection d2661778529a41bea4d377d61a7daab6
DEBUG:amqp:heartbeat_tick : Prev sent/recv: 15/16, now - 15/16, monotonic - 6827188.019653773, last_heartbeat_sent - 6827179.001199761, heartbeat int. - 10.0 for connection d2661778529a41bea4d377d61a7daab6
HEARTBEAT FAILED
29
DEBUG:amqp:heartbeat_tick : for connection d2661778529a41bea4d377d61a7daab6
DEBUG:amqp:heartbeat_tick : Prev sent/recv: 15/16, now - 15/16, monotonic - 6827189.022403865, last_heartbeat_sent - 6827179.001199761, heartbeat int. - 10.0 for connection d2661778529a41bea4d377d61a7daab6
DEBUG:amqp:heartbeat_tick: sending heartbeat for connection d2661778529a41bea4d377d61a7daab6
HEARTBEAT FAILED
30
DEBUG:amqp:heartbeat_tick : for connection d2661778529a41bea4d377d61a7daab6
DEBUG:amqp:heartbeat_tick : Prev sent/recv: 15/16, now - 16/16, monotonic - 6827190.024884635, last_heartbeat_sent - 6827190.024876723, heartbeat int. - 10.0 for connection d2661778529a41bea4d377d61a7daab6
HEARTBEAT FAILED
31
DEBUG:amqp:heartbeat_tick : for connection d2661778529a41bea4d377d61a7daab6
DEBUG:amqp:heartbeat_tick : Prev sent/recv: 16/16, now - 16/16, monotonic - 6827191.027322521, last_heartbeat_sent - 6827190.024876723, heartbeat int. - 10.0 for connection d2661778529a41bea4d377d61a7daab6
HEARTBEAT FAILED
32
DEBUG:amqp:heartbeat_tick : for connection d2661778529a41bea4d377d61a7daab6
DEBUG:amqp:heartbeat_tick : Prev sent/recv: 16/16, now - 16/16, monotonic - 6827192.029842284, last_heartbeat_sent - 6827190.024876723, heartbeat int. - 10.0 for connection d2661778529a41bea4d377d61a7daab6
HEARTBEAT FAILED
33
DEBUG:amqp:heartbeat_tick : for connection d2661778529a41bea4d377d61a7daab6
DEBUG:amqp:heartbeat_tick : Prev sent/recv: 16/16, now - 16/16, monotonic - 6827193.032489108, last_heartbeat_sent - 6827190.024876723, heartbeat int. - 10.0 for connection d2661778529a41bea4d377d61a7daab6
HEARTBEAT FAILED
34
DEBUG:amqp:heartbeat_tick : for connection d2661778529a41bea4d377d61a7daab6
DEBUG:amqp:heartbeat_tick : Prev sent/recv: 16/16, now - 16/16, monotonic - 6827194.035081419, last_heartbeat_sent - 6827190.024876723, heartbeat int. - 10.0 for connection d2661778529a41bea4d377d61a7daab6
HEARTBEAT FAILED
35
DEBUG:amqp:heartbeat_tick : for connection d2661778529a41bea4d377d61a7daab6
DEBUG:amqp:heartbeat_tick : Prev sent/recv: 16/16, now - 16/16, monotonic - 6827195.036884144, last_heartbeat_sent - 6827190.024876723, heartbeat int. - 10.0 for connection d2661778529a41bea4d377d61a7daab6
HEARTBEAT FAILED
36
DEBUG:amqp:heartbeat_tick : for connection d2661778529a41bea4d377d61a7daab6
DEBUG:amqp:heartbeat_tick : Prev sent/recv: 16/16, now - 16/16, monotonic - 6827196.039481093, last_heartbeat_sent - 6827190.024876723, heartbeat int. - 10.0 for connection d2661778529a41bea4d377d61a7daab6
HEARTBEAT FAILED
37
DEBUG:amqp:heartbeat_tick : for connection d2661778529a41bea4d377d61a7daab6
DEBUG:amqp:heartbeat_tick : Prev sent/recv: 16/16, now - 16/16, monotonic - 6827197.042221926, last_heartbeat_sent - 6827190.024876723, heartbeat int. - 10.0 for connection d2661778529a41bea4d377d61a7daab6
HEARTBEAT FAILED
38
DEBUG:amqp:heartbeat_tick : for connection d2661778529a41bea4d377d61a7daab6
DEBUG:amqp:heartbeat_tick : Prev sent/recv: 16/16, now - 16/16, monotonic - 6827198.04424532, last_heartbeat_sent - 6827190.024876723, heartbeat int. - 10.0 for connection d2661778529a41bea4d377d61a7daab6
HEARTBEAT FAILED
39
DEBUG:amqp:heartbeat_tick : for connection d2661778529a41bea4d377d61a7daab6
DEBUG:amqp:heartbeat_tick : Prev sent/recv: 16/16, now - 16/16, monotonic - 6827199.047431084, last_heartbeat_sent - 6827190.024876723, heartbeat int. - 10.0 for connection d2661778529a41bea4d377d61a7daab6
HEARTBEAT FAILED
40
DEBUG:amqp:heartbeat_tick : for connection d2661778529a41bea4d377d61a7daab6
DEBUG:amqp:heartbeat_tick : Prev sent/recv: 16/16, now - 16/16, monotonic - 6827200.050093538, last_heartbeat_sent - 6827190.024876723, heartbeat int. - 10.0 for connection d2661778529a41bea4d377d61a7daab6
DEBUG:amqp:heartbeat_tick: sending heartbeat for connection d2661778529a41bea4d377d61a7daab6
HEARTBEAT FAILED
41
DEBUG:amqp:heartbeat_tick : for connection d2661778529a41bea4d377d61a7daab6
DEBUG:amqp:heartbeat_tick : Prev sent/recv: 16/16, now - 17/16, monotonic - 6827201.05386609, last_heartbeat_sent - 6827201.053860754, heartbeat int. - 10.0 for connection d2661778529a41bea4d377d61a7daab6
HEARTBEAT FAILED
42
DEBUG:amqp:heartbeat_tick : for connection d2661778529a41bea4d377d61a7daab6
DEBUG:amqp:heartbeat_tick : Prev sent/recv: 17/16, now - 17/16, monotonic - 6827202.056305858, last_heartbeat_sent - 6827201.053860754, heartbeat int. - 10.0 for connection d2661778529a41bea4d377d61a7daab6
HEARTBEAT FAILED
43
DEBUG:amqp:heartbeat_tick : for connection d2661778529a41bea4d377d61a7daab6
DEBUG:amqp:heartbeat_tick : Prev sent/recv: 17/16, now - 17/16, monotonic - 6827203.05815703, last_heartbeat_sent - 6827201.053860754, heartbeat int. - 10.0 for connection d2661778529a41bea4d377d61a7daab6
HEARTBEAT FAILED
44
DEBUG:amqp:heartbeat_tick : for connection d2661778529a41bea4d377d61a7daab6
DEBUG:amqp:heartbeat_tick : Prev sent/recv: 17/16, now - 17/16, monotonic - 6827204.060817594, last_heartbeat_sent - 6827201.053860754, heartbeat int. - 10.0 for connection d2661778529a41bea4d377d61a7daab6
HEARTBEAT FAILED
45
DEBUG:amqp:heartbeat_tick : for connection d2661778529a41bea4d377d61a7daab6
DEBUG:amqp:heartbeat_tick : Prev sent/recv: 17/16, now - 17/16, monotonic - 6827205.063507986, last_heartbeat_sent - 6827201.053860754, heartbeat int. - 10.0 for connection d2661778529a41bea4d377d61a7daab6
HEARTBEAT FAILED
46
DEBUG:amqp:heartbeat_tick : for connection d2661778529a41bea4d377d61a7daab6
DEBUG:amqp:heartbeat_tick : Prev sent/recv: 17/16, now - 17/16, monotonic - 6827206.066147127, last_heartbeat_sent - 6827201.053860754, heartbeat int. - 10.0 for connection d2661778529a41bea4d377d61a7daab6
HEARTBEAT FAILED
47
DEBUG:amqp:heartbeat_tick : for connection d2661778529a41bea4d377d61a7daab6
48
DEBUG:amqp:heartbeat_tick : Prev sent/recv: 17/16, now - 17/16, monotonic - 6827207.069477861, last_heartbeat_sent - 6827201.053860754, heartbeat int. - 10.0 for connection d2661778529a41bea4d377d61a7daab6
HEARTBEAT FAILED
49
DEBUG:amqp:heartbeat_tick : for connection d2661778529a41bea4d377d61a7daab6
DEBUG:amqp:heartbeat_tick : Prev sent/recv: 17/16, now - 17/16, monotonic - 6827208.072213321, last_heartbeat_sent - 6827201.053860754, heartbeat int. - 10.0 for connection d2661778529a41bea4d377d61a7daab6
HEARTBEAT FAILED
run task
INFO:root:<Consumer: [<Queue rocket1 -> <Exchange rocket1(direct) bound to chan:2> -> rocket1 bound to chan:2>]>
INFO:root:<Consumer: [<Queue rocket1 -> <Exchange rocket1(direct) bound to chan:2> -> rocket1 bound to chan:2>]>
DEBUG:amqp:heartbeat_tick : for connection d2661778529a41bea4d377d61a7daab6
INFO:root:<Consumer: [<Queue rocket1 -> <Exchange rocket1(direct) bound to chan:2> -> rocket1 bound to chan:2>]>
DEBUG:amqp:heartbeat_tick : Prev sent/recv: 17/16, now - 17/18, monotonic - 6827209.075941383, last_heartbeat_sent - 6827201.053860754, heartbeat int. - 10.0 for connection d2661778529a41bea4d377d61a7daab6
HEARTBEAT OK
INFO:root:<Consumer: [<Queue rocket1 -> <Exchange rocket1(direct) bound to chan:2> -> rocket1 bound to chan:2>]>
INFO:root:<Consumer: [<Queue rocket1 -> <Exchange rocket1(direct) bound to chan:2> -> rocket1 bound to chan:2>]>
INFO:root:<Consumer: [<Queue rocket1 -> <Exchange rocket1(direct) bound to chan:2> -> rocket1 bound to chan:2>]>
DEBUG:amqp:heartbeat_tick : for connection d2661778529a41bea4d377d61a7daab6
DEBUG:amqp:heartbeat_tick : Prev sent/recv: 17/18, now - 17/21, monotonic - 6827214.084745324, last_heartbeat_sent - 6827201.053860754, heartbeat int. - 10.0 for connection d2661778529a41bea4d377d61a7daab6
DEBUG:amqp:heartbeat_tick: sending heartbeat for connection d2661778529a41bea4d377d61a7daab6
HEARTBEAT OK
INFO:root:<Consumer: [<Queue rocket1 -> <Exchange rocket1(direct) bound to chan:2> -> rocket1 bound to chan:2>]>
DEBUG:amqp:heartbeat_tick : for connection d2661778529a41bea4d377d61a7daab6
DEBUG:amqp:heartbeat_tick : Prev sent/recv: 17/21, now - 18/22, monotonic - 6827219.088833282, last_heartbeat_sent - 6827219.088823367, heartbeat int. - 10.0 for connection d2661778529a41bea4d377d61a7daab6
HEARTBEAT OK
^CWARNING:root:server stop!
DEBUG:amqp:Closed channel #1
DEBUG:amqp:Closed channel #2

exception:

ERROR:root:Traceback (most recent call last):
  File "bla2.py", line 28, in monitor_heartbeat
    conn.heartbeat_check(rate)
  File "/usr/local/lib/python3.4/dist-packages/kombu-4.0.0rc3-py3.4.egg/kombu/connection.py", line 284, in heartbeat_check
    return self.transport.heartbeat_check(self.connection, rate=rate)
  File "/usr/local/lib/python3.4/dist-packages/kombu-4.0.0rc3-py3.4.egg/kombu/transport/pyamqp.py", line 144, in heartbeat_check
    return connection.heartbeat_tick(rate=rate)
  File "/usr/local/lib/python3.4/dist-packages/amqp/connection.py", line 663, in heartbeat_tick
    raise ConnectionForced('Too many heartbeats missed')
amqp.exceptions.ConnectionForced: Too many heartbeats missed

source:

from kombu import Connection, Exchange, Queue
import logging
import time
from threading import Thread
import sys
import traceback

logging.basicConfig(level=10, stream=sys.stdout)

durable = True
name = 'rocket1'

def callback(body, message):
    message.ack()
    for x in range(50):
        print(x)
        time.sleep(1)
    print("run task")
    return True

def monitor_heartbeat(conn):
    rate = conn.heartbeat
    while True:
        try:
            #conn.connection.send_heartbeat()
            conn.heartbeat_check(rate)
            print("HEARTBEAT OK")
            time.sleep(rate/2.0)
        except (KeyboardInterrupt, SystemExit):
            logging.warning("monitor heartbeat stop")
            break
        except:
            print("HEARTBEAT FAILED")
            #logging.error(traceback.format_exc())
            time.sleep(1)

with Connection('amqp://guest:guest@%s//?heartbeat=10' % 'localhost') as conn:
    conn.ensure_connection()
    t = Thread(target=monitor_heartbeat, args=(conn,))
    t.setDaemon(True)
    t.start()
    exchange = Exchange(name, 'direct', durable=durable)
    queue = Queue(name=name,
                  exchange=exchange,
                  durable=durable, routing_key=name)
    queue(conn).declare()
    logging.info("create queue: %s durable: %s" % (name, durable))
    channel = conn.channel()
    channel.basic_qos(prefetch_size=0, prefetch_count=1, a_global=False)
    with conn.Consumer(queue, callbacks=[callback], channel=channel) as consumer:
        while True:
            try:
                logging.info(consumer)
                conn.drain_events()

            except (KeyboardInterrupt, SystemExit):
                logging.warning("server stop!")
                break

            except:
                # logging.error(traceback.format_exc())
                logging.error("connection loss, try reconnect")
                time.sleep(5)
ask commented 7 years ago

I doubt you'd be able to use a thread for that, as it wouldn't be safe to read from/write to the amqp socket from multiple threads. You'd need to use a mutex for every socket read/write, which would slow things way down. You'd really have to use something like eventlet, gevent, asyncio or similar for this to work optimally.

agalera commented 7 years ago

identical exception with eventlet, any ideas?

amqp.exceptions.ConnectionForced: Too many heartbeats missed

it seems I'm not reading the answer RabbitMQ not?

"DEBUG:amqp:heartbeat_tick: sending heartbeat for connection 9ba9740dd54142399895c4d8f792b90b"

Thanks for your help

source:

from kombu import Connection, Exchange, Queue
from kombu.common import eventloop
import logging
import time
import sys
import weakref
import eventlet
from eventlet import spawn_after

eventlet.monkey_patch()

logging.basicConfig(level=10, stream=sys.stdout)

durable = True
name = 'rocket1'

def callback(body, message):
    for x in range(22):
        print(x)
        time.sleep(1)
    print("finish task")
    message.ack()
    return True

def monitor_heartbeats(connection, rate=2):
    """Function to send heartbeat checks to RabbitMQ. This keeps the
       connection alive over long-running processes."""
    interval = connection.heartbeat / 2.0
    cref = weakref.ref(connection)
    logging.info("Starting heartbeat monitor.")

    def heartbeat_check():
        conn = cref()
        if conn is not None and conn.connected:
            try:
                conn.heartbeat_check(rate=rate)
                logging.info("Heartbeat ok")
            except:
                logging.error("Heartbeat failed")
            spawn_after(interval, heartbeat_check)
    return spawn_after(interval, heartbeat_check)

with Connection('amqp://guest:guest@%s//?heartbeat=10' % 'localhost', heartbeat=10) as conn:
    conn.ensure_connection()
    monitor_heartbeats(conn)
    exchange = Exchange(name, 'direct', durable=durable)
    queue = Queue(name=name,
                  exchange=exchange,
                  durable=durable, routing_key=name)
    queue(conn).declare()
    logging.info("create queue: %s durable: %s" % (name, durable))

    with conn.Consumer(queue, callbacks=[callback]) as consumer:
        consumer.qos(prefetch_count=1)
        for _ in eventloop(conn, timeout=1, ignore_timeouts=True):
            pass
DEBUG:amqp:Start from server, version: 0.9, properties: {'cluster_name': 'rabbit@sb1', 'platform': 'Erlang/OTP', 'information': 'Licensed under the MPL.  See http://www.rabbitmq.com/', 'product': 'RabbitMQ', 'capabilities': {'publisher_confirms': True, 'consumer_priorities': True, 'connection.blocked': True, 'consumer_cancel_notify': True, 'per_consumer_qos': True, 'authentication_failure_close': True, 'exchange_exchange_bindings': True, 'basic.nack': True}, 'copyright': 'Copyright (C) 2007-2014 GoPivotal, Inc.', 'version': '3.3.5'}, mechanisms: ['AMQPLAIN', 'PLAIN'], locales: ['en_US']
INFO:root:Starting heartbeat monitor.
DEBUG:amqp:using channel_id: 1
DEBUG:amqp:Channel open
INFO:root:create queue: rocket1 durable: True
DEBUG:amqp:heartbeat_tick : for connection 568036154faf4da9a7108bd36df760e3
DEBUG:amqp:heartbeat_tick : Prev sent/recv: None/None, now - 12/12, monotonic - 6904829.985433596, last_heartbeat_sent - 6904829.985424093, heartbeat int. - 10.0 for connection 568036154faf4da9a7108bd36df760e3
INFO:root:Heartbeat ok
DEBUG:amqp:heartbeat_tick : for connection 568036154faf4da9a7108bd36df760e3
DEBUG:amqp:heartbeat_tick : Prev sent/recv: 12/12, now - 12/12, monotonic - 6904832.487990192, last_heartbeat_sent - 6904829.985424093, heartbeat int. - 10.0 for connection 568036154faf4da9a7108bd36df760e3
INFO:root:Heartbeat ok
0
1
DEBUG:amqp:heartbeat_tick : for connection 568036154faf4da9a7108bd36df760e3
DEBUG:amqp:heartbeat_tick : Prev sent/recv: 12/12, now - 12/15, monotonic - 6904834.990522305, last_heartbeat_sent - 6904829.985424093, heartbeat int. - 10.0 for connection 568036154faf4da9a7108bd36df760e3
INFO:root:Heartbeat ok
2
3
4
DEBUG:amqp:heartbeat_tick : for connection 568036154faf4da9a7108bd36df760e3
DEBUG:amqp:heartbeat_tick : Prev sent/recv: 12/15, now - 12/15, monotonic - 6904837.493593533, last_heartbeat_sent - 6904829.985424093, heartbeat int. - 10.0 for connection 568036154faf4da9a7108bd36df760e3
INFO:root:Heartbeat ok
5
6
DEBUG:amqp:heartbeat_tick : for connection 568036154faf4da9a7108bd36df760e3
DEBUG:amqp:heartbeat_tick : Prev sent/recv: 12/15, now - 12/15, monotonic - 6904839.997124459, last_heartbeat_sent - 6904829.985424093, heartbeat int. - 10.0 for connection 568036154faf4da9a7108bd36df760e3
DEBUG:amqp:heartbeat_tick: sending heartbeat for connection 568036154faf4da9a7108bd36df760e3
INFO:root:Heartbeat ok
7
8
9
DEBUG:amqp:heartbeat_tick : for connection 568036154faf4da9a7108bd36df760e3
DEBUG:amqp:heartbeat_tick : Prev sent/recv: 12/15, now - 13/15, monotonic - 6904842.500985768, last_heartbeat_sent - 6904842.500978223, heartbeat int. - 10.0 for connection 568036154faf4da9a7108bd36df760e3
INFO:root:Heartbeat ok
10
11
DEBUG:amqp:heartbeat_tick : for connection 568036154faf4da9a7108bd36df760e3
DEBUG:amqp:heartbeat_tick : Prev sent/recv: 13/15, now - 13/15, monotonic - 6904845.0045556, last_heartbeat_sent - 6904842.500978223, heartbeat int. - 10.0 for connection 568036154faf4da9a7108bd36df760e3
INFO:root:Heartbeat ok
12
13
14
DEBUG:amqp:heartbeat_tick : for connection 568036154faf4da9a7108bd36df760e3
DEBUG:amqp:heartbeat_tick : Prev sent/recv: 13/15, now - 13/15, monotonic - 6904847.506937645, last_heartbeat_sent - 6904842.500978223, heartbeat int. - 10.0 for connection 568036154faf4da9a7108bd36df760e3
INFO:root:Heartbeat ok
15
16
DEBUG:amqp:heartbeat_tick : for connection 568036154faf4da9a7108bd36df760e3
DEBUG:amqp:heartbeat_tick : Prev sent/recv: 13/15, now - 13/15, monotonic - 6904850.009044969, last_heartbeat_sent - 6904842.500978223, heartbeat int. - 10.0 for connection 568036154faf4da9a7108bd36df760e3
INFO:root:Heartbeat ok
17
18
19
DEBUG:amqp:heartbeat_tick : for connection 568036154faf4da9a7108bd36df760e3
DEBUG:amqp:heartbeat_tick : Prev sent/recv: 13/15, now - 13/15, monotonic - 6904852.512543026, last_heartbeat_sent - 6904842.500978223, heartbeat int. - 10.0 for connection 568036154faf4da9a7108bd36df760e3
DEBUG:amqp:heartbeat_tick: sending heartbeat for connection 568036154faf4da9a7108bd36df760e3
INFO:root:Heartbeat ok
20
21
DEBUG:amqp:heartbeat_tick : for connection 568036154faf4da9a7108bd36df760e3
DEBUG:amqp:heartbeat_tick : Prev sent/recv: 13/15, now - 14/15, monotonic - 6904855.017000154, last_heartbeat_sent - 6904855.016993846, heartbeat int. - 10.0 for connection 568036154faf4da9a7108bd36df760e3
INFO:root:Heartbeat failed
22
finish task
DEBUG:amqp:heartbeat_tick : for connection 568036154faf4da9a7108bd36df760e3
DEBUG:amqp:heartbeat_tick : Prev sent/recv: 16/15, now - 17/20, monotonic - 6904885.06410167, last_heartbeat_sent - 6904885.064089782, heartbeat int. - 10.0 for connection 568036154faf4da9a7108bd36df760e3
INFO:root:Heartbeat ok
ask commented 7 years ago

How long does it take before you get the error?

I'm trying here with kombu 4.0 (dev) and it's been working fine so far with heartbeat=10 and heartbeat=1

ask commented 7 years ago

Your code has been running for 2 hours with heartbeat=10 here now.

agalera commented 7 years ago

I used the "4.0-devel" branch and it seems the error persists.

When I launch a task from the callback if the work lasts longer than 20 seconds, the heartbeat begins to fail.

So that the code above fails, you must add a message to the queue named "rocket1"

DEBUG:amqp:heartbeat_tick : for connection 5aae7adcedf9417faf304f82d5cfafbd
DEBUG:amqp:heartbeat_tick : Prev sent/recv: 15/16, now - 15/16, monotonic - 9156374.278685136, last_heartbeat_sent - 9156369.272508236, heartbeat int. - 10.0 for connection 5aae7adcedf9417faf304f82d5cfafbd
HEARTBEAT FAILED
Exception in thread Thread-1:
Traceback (most recent call last):
  File "/usr/lib/python3.4/threading.py", line 920, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.4/threading.py", line 868, in run
    self._target(*self._args, **self._kwargs)
  File "server.py", line 28, in monitor_heartbeat
    conn.heartbeat_check(rate)
  File "/root/test123/test_kombu/kombu/kombu/connection.py", line 268, in heartbeat_check
    return self.transport.heartbeat_check(self.connection, rate=rate)
  File "/root/test123/test_kombu/kombu/kombu/transport/pyamqp.py", line 130, in heartbeat_check
    return connection.heartbeat_tick(rate=rate)
  File "/usr/local/lib/python3.4/dist-packages/amqp/connection.py", line 662, in heartbeat_tick
    raise ConnectionForced('Too many heartbeats missed')
amqp.exceptions.ConnectionForced: Too many heartbeats missed
ask commented 7 years ago

If the connection is closed when the callback spends more than 20 seconds, then it must mean that the callback is blocking the event loop from running!

You probably want to spawn that task into a separate greenlet thread.

But note also that when using eventlet/gevent any blocking call will block the eventloop, and that includes calculations like:

for i in long_list:
   for j in another_long_list:
      i ** j

gevent and eventlet uses cooperative scheduling, which means the threads need to voluntarily switch back into the event loop, such as by performing I/O or calling time.sleep:

for i in long_list:
   for j in another_long_list:
      i ** j
   time.sleep(0)

Then there's also the problem with greenlet-incompatible code, such as C extensions that are not affected by the monkey patches.

dhananjaysathe commented 7 years ago

What would be the right way to do with when using connection pools?

agalera commented 6 years ago

I found this error again and I found a solution (code below)

When I did this integration for the first time, I wanted to use the call that kombu does to my code, and this is a bad idea because we block the process with which rabbitmq communicates.

At first I thought that if I kept everything in an internal queue I would consume everything that was in rabbit, but it is not like that. If you set a prefetch of 10 (for example) it will only let you have 10 messages without you having returned the ack.

I leave the code here, I tried to keep it simple for the example.

Perhaps a similar example would be useful within the repository.

import logging
import sys
from queue import Queue as qq
from threading import Thread

from kombu import Connection, Exchange, Queue
from kombu.mixins import ConsumerMixin

logging.basicConfig(level=10, stream=sys.stdout)
durable = True
name = 'rocket1'
rabbit_url = "amqp://guest:guest@localhost:5672/"

class Worker(ConsumerMixin):
    def __init__(self, connection, queues):
        self.connection = connection
        self.queues = queues
        self.q = qq()
        Thread(target=self.run_tasks).start()

    def get_consumers(self, Consumer, channel):
        return [Consumer(queues=self.queues,
                         callbacks=[self.on_message],
                         prefetch_count=1)]

    def on_message(self, body, message):
        print("new message to internal queue")
        self.q.put((body, message))

    def run_tasks(self):
        while True:
            try:
                self.on_task(*self.q.get())
            except Exception as ex:
                logging.error(ex)

            except KeyboardInterrupt:
                break

    def on_task(self, body, message):
        print("run task")
        import time
        for x in range(50):
            time.sleep(1)
            print(x)
        message.ack()

exchange = Exchange(name, type="direct", durable=durable)
queues = [Queue(name, exchange, routing_key=name)]
with Connection(rabbit_url, heartbeat=4) as conn:
        worker = Worker(conn, queues)
        worker.run()

@ask What do you think about this? Is there another more elegant way?

petroslamb commented 8 months ago

Hello, also facing this problem several years later, not sure what is the proper way to handle my rpc using kombu and RMQ, for long running tasks.

I cannot make the heartbeat too long, as the RMQ cluster is shared and disabling it does little favour to my application.

Trying to make use of the existing Connection interface and using py-amqp, i also centered around looping the heartbeat_check method in the main thread and launching a second thread with the actual task, (but this thread must join to return a result).

I seems as if the heartbeat_check method has no effect. I tried coupling it with drain_events on a short timeout, but still I was not able to find a solution.

@ask It would be great to know what is the official way to deal with this. Should I not mess with the heartbeat, is it for internal use only? Is this issue planned to be solved in the future, or is it a non-issue/wont-fix? Am I missing the solution in one of the other bug reports?

The code, which did not work:

from kombu import Connection, Exchange, Queue
from kombu.mixins import ConsumerMixin
from threading import Thread, Event
import time
import logging

rabbit_url = "amqp://guest:guest@localhost:5672//"
exchange = Exchange("example-exchange", type="direct")
queue = Queue(name="example-queue", exchange=exchange, routing_key="BOB")
heartbeat_rate = 120

class ThreadReturningValue(Thread):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.result = None

    def run(self):
        self.result = self._target(*self._args, **self._kwargs)

    def join(self, *args, **kwargs):
        super().join(*args, **kwargs)
        return self.result

def process_task(message, done_event):
    print('Processing task that takes more than 2 * heartbeat seconds...')
    time.sleep(300)
    done_event.set()
    print('event set')
    return 'worker done'

class Worker(ConsumerMixin):
    def __init__(self, connection, queues):
        self.connection = connection
        self.queues = queues
        self.thread_done = Event()
        self.worker_thread = None

    def get_consumers(self, Consumer, channel):
        return [Consumer(queues=self.queues, callbacks=[self.on_message])]

    def on_message(self, body, message):
        print('Received message: {0}'.format(body))
        self.worker_thread = ThreadReturningValue(target=process_task, args=(message, self.thread_done))
        self.worker_thread.start()
        while not self.thread_done.is_set():
            try:
                self.connection.drain_events(timeout=0.1)
            except Exception as e:
                print(e)
            self.connection.heartbeat_check()
            print('heartbeat check')
            time.sleep(1)
        print('Task completed.')
        self.worker_thread.join()
        self.thread_done.clear()
        message.ack()
        print('Message acknowledged.')
        reply = self.worker_thread.result
        print('Reply: {0}'.format(reply)) 

with Connection(rabbit_url, heartbeat=heartbeat_rate) as conn:
    conn.ensure_connection()
    print('Connection supports heartbeats: {0}'.format(conn.supports_heartbeats))

    worker = Worker(conn, [queue])
    worker.run()

In the above snippet, for heartbeats * 2 < task-processing-time, RMQ reschedules the message multiple times.

Versions: