astrorafael / twisted-mqtt

MQTT Client protocol for Twisted.
MIT License
30 stars 11 forks source link

Unable to publish to topic #17

Closed guy-affable closed 2 years ago

guy-affable commented 2 years ago

I'm unable to publish to a particular topic. I have 2 topics, publish on 1 is successful whereas on the other it's not. Publish: d1 = self.protocol.publish(topic=final_topic, qos=publish_qos, message=simplejson.dumps(report_message)) No error is being raised, but when I check the object, I get a weird dict response

logger.info(d1.__dict__)
{'_chainedTo': None, 'msgId': 3, '_runningCallbacks': False, '_canceller': None, 'callbacks': [], 'result': None, 'called': True}

Not sure if it's a network issue or lib issue or how to debug further.

astrorafael commented 2 years ago

Thanks for reporting. I'll have a look as soon as I can.

astrorafael commented 2 years ago

What is probably happening to you is that you are issuing 2 self.protocol.connect(() with the same connection Id. The Mosquitto Broker will close one of the connections.

astrorafael commented 2 years ago

I have written a sample publisher script with three publisher services, each one performing a different connection. It seems to work fine:


import sys
import random

from twisted.internet import reactor, task
from twisted.application import service
from twisted.internet.defer       import inlineCallbacks, DeferredList
from twisted.application.internet import ClientService, backoffPolicy
from twisted.internet.endpoints   import clientFromString

from twisted.logger   import (
    Logger, LogLevel, globalLogBeginner, textFileLogObserver, 
    FilteringLogObserver, LogLevelFilterPredicate)

from mqtt.client.factory import MQTTFactory

# Global object to control globally namespace logging
logLevelFilterPredicate = LogLevelFilterPredicate(defaultLogLevel=LogLevel.info)

def startLogging(console=True, filepath=None):
    global logLevelFilterPredicate
    observers = []
    if console:
        observers.append( FilteringLogObserver(observer=textFileLogObserver(sys.stdout),  
            predicates=[logLevelFilterPredicate,] ))

    if filepath is not None and filepath != "":
        observers.append( FilteringLogObserver(observer=textFileLogObserver(open(filepath,'a')), 
            predicates=[logLevelFilterPredicate,] ))
    globalLogBeginner.beginLoggingTo(observers)

def setLogLevel(namespace=None, levelStr='info'):
    global logLevelFilterPredicate
    level = LogLevel.levelWithName(levelStr)
    logLevelFilterPredicate.setLogLevelForNamespace(namespace=namespace, level=level)

startLogging(console  = True, filepath = None)
log = Logger(namespace="XPUB")
setLogLevel('mqtt','debug')

BROKER = "tcp:test.mosquitto.org:1883"

class MQTTService(ClientService):

    def __init__(self, endpoint, factory, topic, message, qos, period):
        super().__init__(endpoint, factory, retryPolicy=backoffPolicy())
        self.message = message
        self.topic   = topic
        self.period  = period
        self.qos     = qos

    def startService(self):
        log.info("starting MQTT Client Publisher Service for topic {topic}, period {period} (QoS = {qos})", 
            topic=self.topic, period=self.period, qos=self.qos)
        # invoke whenConnected() inherited method
        self.whenConnected().addCallback(self.connectToBroker)
        super().startService()

    @inlineCallbacks
    def connectToBroker(self, protocol):
        '''
        Connect to MQTT broker
        '''
        log.info("Connecting to {broker}", broker=BROKER)
        self.protocol                 = protocol
        self.protocol.onDisconnection = self.onDisconnection
        # We are issuing 3 publish in a row
        # if order matters, then set window size to 1
        # Publish requests beyond window size are enqueued
        self.protocol.setWindowSize(3) 
        self.task = task.LoopingCall(self.publish)
        self.task.start(self.period, False)
        conn_id = random.randint(0,65535)
        try:
            yield self.protocol.connect("TwistedMQTT-pub " + str(conn_id), keepalive=60)
        except Exception as e:
            log.error("Connecting to {broker} raised {excp!s}", 
               broker=BROKER, excp=e)
        else:
            log.info("Connected to {broker}", broker=BROKER)

    def onDisconnection(self, reason):
        log.debug(" >< Connection was lost ! ><, reason={r}", r=reason)
        self.whenConnected().addCallback(self.connectToBroker)

    @inlineCallbacks
    def publish(self):
        try:
            log.info("Publishing {msg} to topic {topic} with QoS {qos}",
                msg=self.message, topic=self.topic, qos=self.qos)
            yield self.protocol.publish(
                topic   = self.topic, 
                qos     = self.qos, 
                message = self.message,
            )
        except Exception as e:
            log.failure(e)

# --------------------
# Application assembly
# --------------------

application = service.Application("myapp")
factory    = MQTTFactory(profile=MQTTFactory.PUBLISHER)
endpoint = clientFromString(reactor, BROKER)

publisher1 = MQTTService(endpoint, factory, "foo/bar1", "hello 1", 1, 3)
publisher1.setName("PUBLISHER 1")
publisher1.setServiceParent(application)

publisher2 = MQTTService(endpoint, factory, "foo/bar2", "hello 2", 2, 5)
publisher2.setName("PUBLISHER 2")
publisher2.setServiceParent(application)

publisher3 = MQTTService(endpoint, factory, "foo/bar3", "hello 3", 0, 7)
publisher3.setName("PUBLISHER 3")
publisher3.setServiceParent(application)

# Start the ball rolling
service.IService(application).startService()
reactor.run()

Listen with mosquitto_sub -h test.mosquitto.org -p 1883 -t foo/+

astrorafael commented 2 years ago

Since I have not received any feedback from the original poster, I'm closing this issue.