maansthoernvik / rabbitmq_client

Client for a RabbitMQ server.
MIT License
9 stars 5 forks source link

How to make the consumer keep listening on a specific queue #29

Open dahaier opened 7 months ago

dahaier commented 7 months ago

Hi,

I'd like to use rabbitmq_client to create a consumer that keeps its connection open and listens on a queue, so that it can keep processing messages whenever someone sends one to the queue.

Previously I used pure pika to do this, as shown below. I'd like to know how to do the same thing using RMQConsumer.

def callback(ch, method, properties, body):
    """Process one delivered message and acknowledge it.

    The body is decoded as JSON and ``msg['source']['session_uuid']`` is
    passed to ``execute_main``. The delivery is acknowledged whether or not
    ``execute_main`` succeeds; a failure there is only logged.

    A body that cannot be decoded, is not valid JSON, or lacks the expected
    keys is logged and acknowledged as well, instead of raising out of the
    callback before the ack is sent.

    Args:
        ch: Channel the message arrived on; used to send the ack.
        method: Delivery metadata; ``method.delivery_tag`` identifies the
            message being acknowledged.
        properties: Message properties (unused).
        body: Raw message payload; must support ``.decode()``.
    """
    start = time.time()
    logger.info("Start processing converter task at {}".format(start))
    try:
        msg = json.loads(body.decode())
        session_uuid = msg['source']['session_uuid']
    except (ValueError, KeyError, TypeError) as e:
        # ValueError covers both UnicodeDecodeError and json.JSONDecodeError.
        # A malformed message is acked like a failed task so it does not
        # escape the callback unacknowledged.
        logger.error(f"error {e} when parsing message {body!r}")
        ch.basic_ack(delivery_tag=method.delivery_tag)
        return
    logger.info(f"msg: {msg}")
    try:
        execute_main(session_uuid)
    except Exception as e:
        # Best effort: a failed task is logged and still acknowledged below.
        logger.error(f"error {e} when processing {session_uuid}")
    ch.basic_ack(delivery_tag=method.delivery_tag)
    logger.info("gogetasil run end")

# Listener for processing MQ messages

def mq_listenner():
    """Connect to the broker and consume messages from ``Q_NAME``.

    Opens a blocking pika connection, registers ``callback`` as the consumer
    with manual acknowledgements (``auto_ack=False``) and a prefetch count of
    1, then blocks in ``start_consuming()``. Any ``Exception`` raised along
    the way is logged and swallowed. The connection is closed on the way out
    if it is still open.
    """
    credentials = pika.PlainCredentials(MQ_USER, MQ_PASS)
    connection = None
    try:
        logger.info("Build connection with MQ")
        connection = pika.BlockingConnection(pika.ConnectionParameters(host=config.AMQP_host,
                                                                       virtual_host='zf',
                                                                       credentials=credentials,
                                                                       heartbeat=0))
        logger.info("Build channel")
        channel = connection.channel()
        channel.basic_qos(prefetch_count=1)
        logger.info("Listen AUTOSAR SIL channel")
        channel.basic_consume(Q_NAME, callback, auto_ack=False)
        logger.info("Start consuming message...")
        channel.start_consuming()
    except Exception as e:
        logger.error("MQ connection error!!")
        logger.error("Reason: %s" % e)
        logger.error("Stop AUTOSAR SIL Operator :(")
    finally:
        # Release the connection however the loop ended (error or interrupt).
        if connection is not None and connection.is_open:
            try:
                connection.close()
            except Exception as e:
                logger.error("Failed to close MQ connection: %s" % e)
dahaier commented 7 months ago

Since I need to parse the message from its body, I'd like to know how I can access the message body with the RMQConsumer object. Using your library would make the code cleaner. I tried the example code from the README. I found that the script runs to completion, and even when there is no message in the queue the callback still receives messages such as ConsumeOK, which confuses me.

from pika import ConnectionParameters, PlainCredentials
from rabbitmq_client import RMQConsumer, ConsumeParams, QueueParams
import os
from conf.log_config import logger

def handle_msg(msg):
    """Print ``msg`` and report whether handling it failed.

    Args:
        msg: The received message object; it is only printed.

    Returns:
        ``False`` always: this placeholder never reports an error.
    """
    print(msg)
    return False

def on_message(msg, ack=None):
    """Consumer callback: handle ``msg`` and acknowledge it on success.

    Args:
        msg: The delivered object; passed straight to ``handle_msg``.
        ack: Optional zero-argument callable that acknowledges the message.
            May be ``None`` (presumably for deliveries that need no
            acknowledgement, such as consume confirmations; confirm against
            rabbitmq_client).
    """
    error = handle_msg(msg)

    # ack defaults to None, so only call it when one was actually supplied;
    # calling None would raise TypeError.
    if not error and ack is not None:
        ack()

# Connection settings are read from the environment, falling back to the
# literal defaults below when a variable is not set.
connection_params = ConnectionParameters(
    host=os.getenv('MQ_HOST', 'sh2v00112'),
    virtual_host=os.getenv('MQ_VHOST', 'zf'),
    credentials=PlainCredentials(
        os.getenv('MQ_USER', 'zfmq'),
        os.getenv('MQ_PWD', '123456'),
    ),
    heartbeat=0,
)
consumer = RMQConsumer(connection_params)
consumer.start()

# Register on_message for the queue named by Q_NAME; the queue parameters
# are created with durable=True.
consume_params = ConsumeParams(on_message)
queue_params = QueueParams(os.getenv('Q_NAME', 'Gogeta_Vinfast'), durable=True)
consumer.consume(consume_params, queue_params=queue_params)

Could you tell me how to use the pattern above to reorganize the code below? There, channel.start_consuming() keeps the connection to the server open, and callback(ch, method, properties, body) parses each message and processes the data continuously.

def callback(ch, method, properties, body):
    """Process one delivered message and acknowledge it.

    The body is decoded as JSON and ``msg['source']['session_uuid']`` is
    passed to ``execute_main``. The delivery is acknowledged whether or not
    ``execute_main`` succeeds; a failure there is only logged.

    A body that cannot be decoded, is not valid JSON, or lacks the expected
    keys is logged and acknowledged as well, instead of raising out of the
    callback before the ack is sent.

    Args:
        ch: Channel the message arrived on; used to send the ack.
        method: Delivery metadata; ``method.delivery_tag`` identifies the
            message being acknowledged.
        properties: Message properties (unused).
        body: Raw message payload; must support ``.decode()``.
    """
    start = time.time()
    logger.info("Start processing converter task at {}".format(start))
    try:
        msg = json.loads(body.decode())
        session_uuid = msg['source']['session_uuid']
    except (ValueError, KeyError, TypeError) as e:
        # ValueError covers both UnicodeDecodeError and json.JSONDecodeError.
        # A malformed message is acked like a failed task so it does not
        # escape the callback unacknowledged.
        logger.error(f"error {e} when parsing message {body!r}")
        ch.basic_ack(delivery_tag=method.delivery_tag)
        return
    logger.info(f"msg: {msg}")
    try:
        execute_main(session_uuid)
    except Exception as e:
        # Best effort: a failed task is logged and still acknowledged below.
        logger.error(f"error {e} when processing {session_uuid}")
    ch.basic_ack(delivery_tag=method.delivery_tag)
    logger.info("gogetasil run end")

# Listener for processing MQ messages
def mq_listenner():
    """Connect to the broker and consume messages from ``Q_NAME``.

    Opens a blocking pika connection, registers ``callback`` as the consumer
    with manual acknowledgements (``auto_ack=False``) and a prefetch count of
    1, then blocks in ``start_consuming()``. Any ``Exception`` raised along
    the way is logged and swallowed. The connection is closed on the way out
    if it is still open.
    """
    credentials = pika.PlainCredentials(MQ_USER, MQ_PASS)
    connection = None
    try:
        logger.info("Build connection with MQ")
        connection = pika.BlockingConnection(pika.ConnectionParameters(host=config.AMQP_host,
                                                                       virtual_host='zf',
                                                                       credentials=credentials,
                                                                       heartbeat=0))
        logger.info("Build channel")
        channel = connection.channel()
        channel.basic_qos(prefetch_count=1)
        logger.info("Listen AUTOSAR SIL channel")
        channel.basic_consume(Q_NAME, callback, auto_ack=False)
        logger.info("Start consuming message...")
        channel.start_consuming()
    except Exception as e:
        logger.error("MQ connection error!!")
        logger.error("Reason: %s" % e)
        logger.error("Stop AUTOSAR SIL Operator :(")
    finally:
        # Release the connection however the loop ended (error or interrupt).
        if connection is not None and connection.is_open:
            try:
                connection.close()
            except Exception as e:
                logger.error("Failed to close MQ connection: %s" % e)