apache / rocketmq-client-python

Apache RocketMQ python client
https://rocketmq.apache.org/
Apache License 2.0
271 stars 95 forks source link

why 2? #123

Open zuoyc opened 2 years ago

zuoyc commented 2 years ago

class RocketMQ():
    """Consume tasks from a RocketMQ topic and publish a result message.

    For each received message the three images it references are copied
    into the matching working directory, ``demo.py`` is run there, a result
    message is sent with a ``Producer``, and the copied images are removed.
    """

    def __init__(self):
        """Configure logging and create the push consumer."""
        logging.basicConfig(
            level=logging.CRITICAL,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        self.logger = logging.getLogger(__name__)
        self.consumer = PushConsumer("CID_test")
        self.consumer.set_namesrv_addr("XXXX:XX")
        self.topic_name = "XXXXXX"
        # First space-separated token of every ``demo.py`` output line that
        # contains "Complete registration", most recent occurrence last.
        self.timeHash = []

        # Reduce log output.
        # NOTE(review): ``dll`` and ``namesrv_addr`` are not defined anywhere
        # in the visible code; confirm where they come from.
        dll.SetPushConsumerLogLevel(namesrv_addr.encode('utf-8'), 1)

    def callback(self, msg):
        """Decode one received message as JSON and hand it to ``my_func``.

        Args:
            msg: Either the JSON payload itself (``str``/``bytes``) or an
                object exposing the payload as a ``body`` attribute.
        """
        # A message object is not valid input for json.loads; use its body
        # when there is one and fall back to the raw value otherwise.
        payload = getattr(msg, "body", msg)
        test_body = json.loads(payload)
        self.my_func(test_body)
        # NOTE(review): the RocketMQ Python client appears to expect the
        # callback to return a ConsumeStatus value; that name is not in
        # scope in the visible code, so the return value is left unchanged.

    def onMessage(self):
        """Subscribe to the topic and block, keeping the consumer running.

        The consumer is shut down when the wait loop is interrupted (for
        example by ``KeyboardInterrupt``); the exception still propagates.
        """
        self.consumer.subscribe(self.topic_name, self.callback)
        self.consumer.start()
        try:
            while True:
                time.sleep(30)
        finally:
            self.consumer.shutdown()

    def my_func(self, data):
        """Process one decoded task and publish its result.

        Args:
            data: Decoded message; ``data["id"]`` and ``data["deviceId"]``
                are read here, the image keys are read by ``swlTask``.
        """
        print(data)
        result = self.swlTask(data)

        tem = {}
        tem["id"] = data["id"]
        tem["cameraId"] = data["deviceId"]
        tem["makedPic"] = os.path.join(
            time.strftime('%Y%m%d', time.localtime()), result)
        # NOTE(review): the original appended to an undefined ``sum``; a
        # fresh per-message list is assumed here - confirm that results are
        # not meant to accumulate across messages.
        results = [tem]

        # Start a child process to run the model and read its output.
        # ``&&`` keeps demo.py from running in the wrong directory when the
        # ``cd`` fails.
        cmd = "cd /home/algo/matching && python demo.py"
        with os.popen(cmd) as p:
            for line in p:
                if "Complete registration" in line:
                    stamp = line.split(" ")[0]
                    # Move an already-known stamp to the end of the list.
                    if stamp in self.timeHash:
                        self.timeHash.remove(stamp)
                    self.timeHash.append(stamp)
                    print("aaa:", self.timeHash)

        self.product(results)

        # Remove the copied images. ``&&`` prevents ``rm -rf *`` from running
        # in the current directory if the ``cd`` fails, and leaving the
        # ``with`` block waits for the shell so the deletion cannot overlap
        # with the next message's copies.
        for subdir in ("real", "vir", "sem"):
            cleanup = "cd /home/algo/matching/Images/%s && rm -rf *" % subdir
            with os.popen(cleanup) as proc:
                proc.read()

    def swlTask(self, msg, savedir="/home/algo/matching/Images"):
        """Copy the images referenced by a task into the working directory.

        Algorithm team: fetch the images to be processed from MQ.

        Args:
            msg: Decoded message with the source paths under ``"truePic"``
                (real), ``"modelPic"`` (virtual) and ``"transPic"``
                (semantic).
            savedir: Directory containing the ``real``, ``vir`` and ``sem``
                sub-directories the images are copied into.

        Returns:
            The file name (last ``/``-separated component) of the real
            image; all three copies are stored under this name.

        Raises:
            TypeError: If ``msg["truePic"]`` is not a string.
        """
        image1 = msg["truePic"]  # real
        image2 = msg["modelPic"]  # virtual
        image3 = msg["transPic"]  # semantic
        print(image1, image2, image3)

        if not isinstance(image1, str):
            # Fail at the cause instead of returning None and failing later
            # in the caller's os.path.join.
            raise TypeError(
                "truePic must be a str path, got %r" % (image1,))

        file_name = image1.split("/")[-1]
        shutil.copyfile(image1, os.path.join(savedir, "real", file_name))
        shutil.copyfile(image2, os.path.join(savedir, "vir", file_name))
        shutil.copyfile(image3, os.path.join(savedir, "sem", file_name))
        print("what", file_name)
        return file_name

    def product(self, sum):
        """Send the results as one JSON message.

        Algorithm team: push the image synthesis information.

        Args:
            sum: JSON-serialisable results, sent as ``{"result": sum}``.

        Returns:
            Whatever ``producer.send_sync`` returns.
        """
        print("1")
        producer = Producer('test')
        # RocketMQ name server address (server ip:port).
        producer.set_namesrv_addr('')

        producer.start()
        try:
            msg_body = {
                "result": sum
            }
            ss = json.dumps(msg_body).encode('utf-8')
            msg = Message("xxx")  # topic name
            msg.set_tags('xxx')
            msg.set_body(ss)
            return producer.send_sync(msg)
        finally:
            # Release the producer even when building or sending fails.
            producer.shutdown()

When I print id(self.timeHash), I see two different addresses. Why?