use-py / use-rabbitmq

A rabbitmq connector that never breaks
0 stars 0 forks source link

rabbitmq断联过久导致数据无法正常消费的问题 #3

Open pandatools opened 1 month ago

pandatools commented 1 month ago

import json
import time
from queue import Queue
from threading import Thread

from use_rabbitmq import RabbitMQStore
import logging

# Configure root logging at DEBUG level for this reproduction script.
logging.basicConfig(level=logging.DEBUG)
class RMQQueue:
    """Thin wrapper around ``RabbitMQStore`` used to reproduce the issue.

    Messages delivered to the consumer callback are acknowledged and buffered
    in an in-process ``Queue``; ``receive`` drains that buffer into plain
    dictionaries.

    NOTE(review): ``__init__`` declares the queue ``'job.mytest'`` while
    ``send`` and ``consume`` both use ``'test1'`` -- confirm this is intended.
    """

    def __init__(self):
        """Create the store, cache its channel and declare the durable queue."""
        self.status = True
        # Buffer filled by the consumer callback and drained by receive().
        self.job_queue = Queue()
        self.store = self.connect_channel()
        # NOTE(review): the channel is captured once here; if the store swaps
        # its channel on reconnect, is_open()/ack() would keep using the old
        # one -- confirm against the library.
        self.channel = self.store.channel
        self.store.declare_queue('job.mytest', durable=True)
        self.last_body = ''

    def connect_channel(self):
        """Build and return a ``RabbitMQStore`` from the connection settings."""
        parameters = {
            "confirm_delivery": True,
            "host": "",  # company test address
            "port": 5672,
            "username": "",
            "password": "",
            "ssl": False,
        }
        return RabbitMQStore(**parameters)

    def is_open(self):
        """Return the ``is_open`` flag of the channel cached at construction."""
        return self.channel.is_open

    def send(self, honeys):
        """Publish ``honeys`` to ``'test1'`` with a JSON content type."""
        properties = {
            'content_type': 'application/json',
        }
        self.store.send('test1', honeys, properties)

    def receive(self, num=1):
        """Return up to ``num`` buffered jobs as dictionaries.

        Each job is the JSON-decoded message body with an extra ``'ack_id'``
        key holding the message's delivery tag. Stops early when the buffer
        is empty. Any error is printed and the jobs collected so far are
        returned.
        """
        jobs = []
        try:
            for _ in range(num):
                if self.job_queue.empty():
                    print('job queue is empty')
                    break

                message = self.job_queue.get()

                job_dict = json.loads(message.body)
                job_dict['ack_id'] = message.delivery_tag
                # Previously the decoded job was dropped, so receive() always
                # returned an empty list.
                jobs.append(job_dict)

        except Exception as e:
            print(f'receive error {e}')
        return jobs

    def ack(self, job):
        """Acknowledge ``job`` on the cached channel using its delivery tag."""
        self.channel.basic.ack(delivery_tag=job.delivery_tag)

    def consume(self):
        """Return an unstarted thread that consumes ``'test1'``.

        The thread runs ``self.store.start_consuming('test1', callback, 1)``;
        the callback buffers each message in ``job_queue`` and then acks it.
        """
        def callback_func(message):
            print(f'receive job {message.body},self.last_body={self.last_body}')
            self.job_queue.put(message)
            self.ack(message)
            # Report the ack only after it has actually been sent.
            print('acked message')

        return Thread(target=self.store.start_consuming, args=('test1', callback_func, 1))

if __name__ == '__main__':
    # Connect directly to the company's test RabbitMQ instance to try this out.
    rmq = RMQQueue()

    # Create the consumer threads, start each one, then wait for all of them.
    workers = [rmq.consume() for _ in range(1)]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
pandatools commented 1 month ago

上面是测试代码。复现方式:断开电脑的 Wi-Fi,等待一到两分钟,直到日志中出现 `RabbitmqStore consume connection error<Connection dead, no heartbeat or data received in >= 60s> reconnecting...`,然后重新连接 Wi-Fi,即可复现该问题。

image
pandatools commented 1 month ago

问题现象:重连之后,消费者不再消费新数据。

image
pandatools commented 1 month ago
def start_consuming(
    self, queue_name: str, callback: Callable, prefetch=1, **kwargs
):
    """Consume ``queue_name`` until shutdown, reconnecting after failures.

    Proposed replacement for the original ``start_consuming``; the author
    reports that it resolves the symptom of no messages being consumed after
    a reconnect.

    Args:
        queue_name: Queue to consume from.
        callback: Callable handed to ``channel.basic.consume``.
        prefetch: Value passed as ``prefetch_count`` to ``channel.basic.qos``.
        **kwargs: Extra options forwarded to ``channel.basic.consume``;
            ``no_ack`` (default ``False``) is popped and passed explicitly.
    """
    self.__shutdown = False
    no_ack = kwargs.pop("no_ack", False)
    reconnection_delay = self.RECONNECTION_DELAY
    while not self.__shutdown:
        try:
            # Per the author, isok is initialised to True in __init__ (not
            # shown here), so the first pass uses the store's own channel.
            if self.isok:
                channel = self.channel
                logger.info('use old')
            else:
                # After a failure, consume on a brand-new connection instead
                # of the store's channel.
                # NOTE(review): these are placeholder settings, and the new
                # connection is neither stored on self nor closed -- confirm
                # how it should be owned before merging.
                parameters = {
                    "confirm_delivery": True,
                    "hostname": "xxxx",
                    "port": 5672,
                    "username": "xxx",
                    "password": "xxxx",
                    "ssl": False
                }
                logger.info('use new')
                connection = amqpstorm.Connection(**parameters)
                channel = connection.channel()
                channel.confirm_deliveries()
            channel.basic.qos(prefetch_count=prefetch)
            channel.basic.consume(
                queue=queue_name, callback=callback, no_ack=no_ack, **kwargs
            )
            channel.start_consuming(to_tuple=False)
            self.isok = True
        except AMQPConnectionError as exc:
            logger.warning(
                f"RabbitmqStore consume connection error<{exc}> reconnecting..."
            )
            del self.connection
            self.isok = False
            # Back off before retrying; without this sleep the loop retried
            # immediately while the delay value was doubled but never used.
            time.sleep(reconnection_delay)
            reconnection_delay = min(
                reconnection_delay * 2, self.MAX_CONNECTION_DELAY
            )
        except Exception as e:
            if self.__shutdown:
                break
            logger.exception(f"RabbitmqStore consume error<{e}>, reconnecting...")
            del self.connection
            # The connection was dropped here too, so take the same
            # "new connection" path as the connection-error handler.
            self.isok = False
            time.sleep(reconnection_delay)
            reconnection_delay = min(
                reconnection_delay * 2, self.MAX_CONNECTION_DELAY
            )
        finally:
            # NOTE: a break inside finally also discards any exception still
            # propagating (e.g. KeyboardInterrupt) once shutdown is requested.
            if self.__shutdown:
                break