apache / rocketmq-client-python

Apache RocketMQ python client
https://rocketmq.apache.org/
Apache License 2.0
271 stars 95 forks source link

A process is launched within the callback function, and an error occurs when attempting to send an MQ message within that process. #147

Open fanandli opened 5 months ago

fanandli commented 5 months ago

i write a code like that:

from multiprocessing import Process
from rocketmq.client import Producer, Message, PushConsumer, ConsumeStatus
import time
import json

def rmq_consumer():
    consumer = PushConsumer('group_demo')
    consumer.set_name_server_address('10.13.2.12:9876')
    consumer.subscribe('start_or_end_detect', send_message, "*")
    print(' [Consumer] Waiting for messages.')
    consumer.start()
    while True:
        time.sleep(1)

def start_pro():
    producer = Producer('test_producer')
    producer.set_name_server_address('10.13.2.12:9876')
    producer.start()
    try:
        event = {"aa": "vv", "bb": 33}
        msg = Message("fireMsg")
        ss = json.dumps(event).encode('utf-8')
        msg.set_body(ss)
        producer.send_sync(msg)

    finally:
        producer.shutdown()

def send_message():
    i = Process(target=start_pro)
    i.start()

    return ConsumeStatus.CONSUME_SUCCESS

if __name__ == '__main__':
    rmq_con = Process(target=rmq_consumer)

The code snippet "'producer.send_sync(msg)'" throws an error "rocketmq.exceptions.ProducerSendSyncFailed: No route info of this topic: fireMsg,error:-1,in file rocketmq-client-cpp/src/producer/DefaultMQProducerImpl.cpp line:434"

What could be the cause of this? My RocketMQ deployment is functioning normally.

If I don't execute Process(target=start_pro) within the send_message method and instead directly include the contents of start_pro within send_message, the aforementioned error does not occur.

Looking forward to your response. Thank you.