gmr / rabbitpy

A pure python, thread-safe, minimalistic and pythonic RabbitMQ client library
http://rabbitpy.readthedocs.org
BSD 3-Clause "New" or "Revised" License
242 stars 58 forks source link

how can the client catch the exception and re-establish the connection? #137

Open phker opened 2 years ago

phker commented 2 years ago

If an exception occurs on the server side, or the connection port or server restarts, how can the client catch the exception and re-establish the connection? my code `

from Model.RecordModel import RecordModel from Model.ImageVars import ImageVars import json

import socket

import threading import time import traceback import logging import rabbitpy

import DAO.Record

from BLL.MemcacheHelper import cache_get, cache_set

from BLL.RedisHelper import redis_client, acquire_lock, q_lock, release_lock

from BLL.Task2.Step1_YuChuLiThread import Step1_YuChuLi_AppendTask

from BLL.Task2.Upload2OSS_Thread import 上传到OSSV3

from DAO.SuanLiFenPei import GetSuanLiFenPeiConfig

from Config import Seting

from timeline.TaskTimeLine_MySql import InitTimeLine, SaveRecord, TEnd, TStart

from BLL.RabbitMQConsumer import Consumer

from BLL.RabbitMQHelper import MQConnection, SendMQ, getMQConnection,clearMQConnection

def InitGetJobFromRebbitMQ(processMsgFN): ''' 通过RabbitMQ取任务 '''
queuename = 'Q.AIDetect.AIDetect.on.WebAPI.PhotoIsTaken'

    logging.info(Seting.JiQiHao + "正在初始化MQ队列" + queuename) 
    #queuename ='log.E.OtherSys.CMDTakeImage'
    # MQURL = 'amqp://'+str(Seting.RabbitMQ_UserName)+':'+ str(Seting.RabbitMQ_Password)+'@'+str(Seting.RabbitMQ_IP)+':'+str(Seting.RabbitMQ_Port)+'/%2f?heartbeat=50'
    # bh_group = str(bh_group)
    # priority = priority
    # MQConnection = rabbitpy.Connection(MQURL)
    # channel = MQConnection.channel()

    while True:
        try: 
            MQConnection = getMQConnection()

            # with rabbitpy.Connection(MQURL) as conn: 
            with MQConnection.channel() as MQchannel: 
                MQchannel.prefetch_count(1, False) # 每次只收一条消息
                exchange = rabbitpy.Exchange(MQchannel,  Seting.RabbitMQ_ExchangeName, exchange_type='topic', durable=True)
                exchange.declare()
                # 绑定exchange和队列  exchange 使我们能够确切地指定消息应该到哪个队列去  
                mqqueue = rabbitpy.Queue(MQchannel, queuename)
                mqqueue.durable = True
                mqqueue.declare()

                # 一个队列绑定多个Routingkey
                # mqqueue.bind(exchange, "E.WebAPI.PhotoIsTaken."+ bh_group , args)
                # configs = GetSuanLiFenPeiConfig(Seting.JiQiHao) 
                # for cfg in configs:
                #     bh_group = str(cfg["bh_group"])
                #     priority = int(cfg["priority"])
                #     logging.info(Seting.JiQiHao + "正在初始化MQ线程" +  bh_group) 
                #     args = {"x-priority":priority}
                #     mqqueue.bind(exchange, "CMD.AILoop.AIScore." + bh_group , args)

                # 优先处理本机器的图片数据
                args = {"x-priority":999}
                mqqueue.bind(exchange, "CMD.AILoop.AIScore." +  Seting.JiQiHao ,args)
                mqqueue.bind(exchange, "E.WebAPI.PhotoIsTaken."+  Seting.JiQiHao,args)

                # 也可以处理其它本机器的图片数据, 因为图片已经上传到OSS, 打分的时候会自动下载
                args = {"x-priority":1}
                mqqueue.bind(exchange, "CMD.AILoop.AIScore.#",args)
                mqqueue.bind(exchange, "E.WebAPI.PhotoIsTaken.#" ,args)
                LoopMessage(mqqueue,processMsgFN)

        except Exception as exx: 
            # MQConnection.closed = 1 # 防止MQchannel.close() 陷入死循环 
            # MQchannel = None
            # MQConnection = None
            # del MQConnection
            clearMQConnection()
            logging.error(traceback.format_exc())  

def LoopMessage(mqqueue:rabbitpy.Queue, processMsgFN):

# Exit on CTRL-C
try: 
    logging.info(mqqueue.name + '与 RabbitMQ 连接成功,等待消息中。。。') 
    for message in mqqueue:  
        try:
            strmsg = str(message.body,encoding='UTF-8')
            logging.debug("MQ收到消息"+ strmsg)
        except Exception as ex:
            logging.error("MQ收到消息字符串编码异常"+ message.body)
            strmsg = str(message.body,encoding='GBK') 

        try:
            res =  processMsgFN(strmsg)
            if(res == "reject"):
                logging.error("MQ reject"+strmsg)
                message.reject()
                # message.ack()
            else: 
                message.ack()
        except Exception as ex: 
            logging.error("MQ reject"+strmsg)
            logging.error(traceback.format_exc())
            # message.reject()
            message.ack()

except KeyboardInterrupt:
    logging.info('Exited consumer')
    return
except Exception as mqexxx:
    logging.error('MQ ERROR')
    logging.error(traceback.format_exc())
    time.sleep(1)  
    return # 跳出for 循环 message.

`

BLL/RabbitMQHelper.py ` import time import traceback from Config import Seting import logging

import rabbitpy

URL = 'amqp://'+str(Seting.RabbitMQ_UserName)+':'+ str(Seting.RabbitMQ_Password)+'@'+str(Seting.RabbitMQ_IP)+':'+str(Seting.RabbitMQ_Port)+'/%2f?heartbeat=60' MQConnection = None MQChannel = None

def clearMQConnection(): global MQConnection,MQChannel

# if(not MQConnection  is None):
#    MQConnection.close()

MQConnection = None
MQChannel = None

def getMQConnection(): global MQConnection if(MQConnection is None or MQConnection.closed): MQConnection = rabbitpy.Connection(URL)

return MQConnection

`

I found two bugs,

One is that when the server is restarted,

for message in mqqueue:

 ...

  Break # will fall into an endless loop waiting for the server to return the close response message

The other is that when the server has just started successfully,

with MQConnection. channel() as MQchannel:

     ....

    When the channel is opened, it will fall into infinite waiting