eclipse / paho.mqtt.python

paho.mqtt.python
Other
2.18k stars 724 forks source link

there are some matter work with celery #654

Closed fuuhoo closed 2 months ago

fuuhoo commented 2 years ago

myMQtt.py

class myMqtt:
    def __init__(self):
        self.client=mqtt.Client()
        self.MQTT_TOPIC=settings.MQTT_TOPIC
        self.MQTT_USER=settings.MQTT_USER
        self.MQTT_PWD=settings.MQTT_PWD
        self.MQTT_SERVER=settings.MQTT_SERVER
        self.MQTT_PORT=int(settings.MQTT_PORT)
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message
        self.client.on_publish = self.on_publish
        self.client._transport="websockets"
        self.client.username_pw_set(self.MQTT_USER, self.MQTT_PWD)
        print("MQTT:",self.MQTT_SERVER,self.MQTT_PORT,self.MQTT_USER,self.MQTT_PWD,self.MQTT_TOPIC)
        cr=self.client.connect(self.MQTT_SERVER, self.MQTT_PORT, 6000) # 600为keepalive的时间间隔

    def on_connect(self,client, userdata, flags, rc):
        # 0: Connection successful
        # 1: Connection refused - incorrect protocol version
        # 2: Connection refused - invalid client identifier
        # 3: Connection refused - server unavailable
        # 4: Connection refused - bad username or password
        # 5: Connection refused - not authorised
        # 6-255: Currently unused.
        print("MQtt onnected with result code: " + str(rc))
    def on_message(self,client, userdata, msg):
        print(msg.topic + " " + str(msg.payload))
    def on_publish(self,client, userdata, mid):
        print("on_publish:::",client, userdata, mid)
    def publish(self,payload):
        try:
            r=self.client.publish(self.MQTT_TOPIC, payload=payload, qos=0)
            print("mqtt publish结果")
        except Exception as e :
            print("MQTT publish ERROR",str(e))
app = Celery()
@app.task
def onlinePersonBroadcast():
    retrunList=[]
    keys=redisHandle.scan_iter("idcard:*")
    for key in keys:
        print(key)
        value=redisHandle.get(key)
        jsonValue=json.loads(value)
        retrunList.append(jsonValue)
    mymqtt.publish(json.dumps(retrunList,cls=DateEncoder))

When I call the client in celery's tasks file and publish the message, there will be no error or wraining output, but I can't actually get data. Onlineperson broadcasts is a scheduled task in django.anyone help?

当我在celery的tasks文件中调用客户端并且发布消息的时候,会没有报错,但是实际上收不到数据。onlinePersonBroadcas是一个定时任务

MattBrittan commented 8 months ago

Could you please attempt this with logging enabled? It's possible the task is being shut down before the message is actually sent (calling publish adds the message to a queue; consider using wait_for_publish). Sorry - I'm not familiar with celery so cannot make any further suggestions (if you can provide more info someone may be able to help further - assuming this is still an issue).

MattBrittan commented 2 months ago

Closing due to inactivity. If this is still an issue with the current release then please feel free to reopen (but more info will be needed to assist us to identify the issue).