stlehmann / Flask-MQTT

Flask Extension for the MQTT protocol
MIT License

Flask MQTT on_connect is never called when used with SocketIO #82

Open ramanraja opened 4 years ago

ramanraja commented 4 years ago

I am trying to implement an MQTT-to-WebSocket bridge along the lines of your helpful example: https://flask-mqtt.readthedocs.io/en/latest/usage.html#interact-with-socketio

In the above example, subscribing to the MQTT topic is triggered by the socket client. But I want my MQTT channel to keep communicating even if there is no socket client.

So I tried to subscribe in the @mqtt.on_connect() event, but that callback is never invoked. Once a subscription is initiated through a socket, however, MQTT messages start flowing in as expected.

The plain Flask-MQTT example works fine. The callback fails only when SocketIO support is added to it. So I am curious to know how async_mode and socketio.run() interact with the MQTT life cycle events, especially since multiple workers are not supported.

I have posted a minimum reproducible sample code here: https://stackoverflow.com/questions/64592277/flask-mqtt-on-connect-is-never-called-when-used-with-socketio

dsmaugy commented 4 years ago

I'm having this same problem. I even modified the source through this function here:

def _handle_connect(self, client, userdata, flags, rc):
    # type: (Client, Any, Dict, int) -> None
    if rc == MQTT_ERR_SUCCESS:
        self.connected = True
        for key, item in self.topics.items():
            self.client.subscribe(topic=item.topic, qos=item.qos)
    if self._connect_handler is not None:
        logger.debug("Internally handling MQTT connection...\nPassing off to user handler: " + str(self._connect_handler))
        self._connect_handler(client, userdata, flags, rc)

This function is Flask-MQTT's internal on_connect() callback, which it registers with Paho MQTT. I added the logger.debug line right before the user callback (the one defined by @mqtt.on_connect()) is supposed to be called. The log output shows that execution reaches this point, but the user callback itself never appears to run.

Here is what my callback looks like:

@mqtt.on_connect()
def mqtt_connect_cb(client, userdata, flags, rc):
    app.logger.debug("MQTT Connected!")
    ...

And here is my output:

7:01:05 PM web.1 | [2020-11-16 19:01:05,712] DEBUG in init: Internally handling MQTT connection...
7:01:05 PM web.1 | Passing off to user handler: <function mqtt_connect_cb at 0x7f4b7690ebf8>

My callback is registered correctly and I would expect to see "MQTT Connected!" in the logs. But when it's time to call it, nothing happens.
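One way to narrow this down is to wrap the handler so that entry and any exception are recorded independently of the logger configuration. This is a generic debugging sketch, not part of Flask-MQTT: `traced` and `events` are hypothetical names, and it only assumes that the handler is an ordinary Python callable.

```python
import functools
import traceback

events = []  # hypothetical in-memory trace, independent of logging config


def traced(handler):
    """Record every call to `handler` and any exception it raises."""
    @functools.wraps(handler)
    def wrapper(*args, **kwargs):
        events.append(("enter", handler.__name__))
        try:
            return handler(*args, **kwargs)
        except Exception:
            # Keep the traceback so a silently swallowed error is visible.
            events.append(("error", traceback.format_exc()))
            raise
    return wrapper


@traced
def mqtt_connect_cb(client, userdata, flags, rc):
    # Stand-in body: fail on purpose to show that the error is captured.
    raise RuntimeError("simulated failure inside the handler")


try:
    mqtt_connect_cb(None, None, {}, 0)
except RuntimeError:
    pass

print(events[0])
```

In a real application the `@traced` line would sit directly beneath `@mqtt.on_connect()`. If `events` stays empty, the handler was never entered; if it contains an "error" entry, the handler ran and raised.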

EDIT: I should also note that this problem happens when I'm creating my application through the Factory method. When I was tinkering with Flask-MQTT + Socketio in a single test file, both libraries were able to work properly.

ramanraja commented 4 years ago

Can you please elaborate on "Flask-MQTT + Socketio in a single test file.. work properly" ? Please see my sample code here: https://stackoverflow.com/questions/64592277/flask-mqtt-on-connect-is-never-called-when-used-with-socketio This is a single file implementation, but it does not work for me. Can you please post the relevant pieces of your initialization code where on_connect() is invoked correctly ?

dsmaugy commented 4 years ago

> Can you please elaborate on "Flask-MQTT + Socketio in a single test file.. work properly" ? Please see my sample code here: https://stackoverflow.com/questions/64592277/flask-mqtt-on-connect-is-never-called-when-used-with-socketio This is a single file implementation, but it does not work for me. Can you please post the relevant pieces of your initialization code where on_connect() is invoked correctly ?

My single-file test looks similar to yours, except that I created the SocketIO object without the arguments async_mode='gevent', cors_allowed_origins="*"

I'm running the server through eventlet

plambrechtsen commented 3 years ago

I'm also having the same issue where on_connect is never called, so I can't automatically subscribe to my default topics.

The main issue seems to be that the socketio threads don't start until a browser establishes the initial connection to Flask. I also tried before_first_request, but that also requires an initial request to start the main threads.

My use case is a flask + mqtt + socketio site, where flask and mqtt should start as background processes, subscribe to the topics I want, and process those events even if a browser has never connected.

Would love to see this bug fixed.

Osmiogrzesznik commented 2 years ago

When I simply added a call to mqtt.subscribe() just before socketio.run(), it worked.

ramanraja commented 2 years ago

@Osmiogrzesznik, mqtt.subscribe() works for me, and I am able to receive messages. But the issue is when to subscribe. I want to do it whenever a connection is established, so that even if the client reconnects, my subscription is renewed automatically. But the life cycle event mqtt.on_connect() is never invoked.
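For context, the _handle_connect source quoted earlier in this thread loops over self.topics and re-subscribes each entry on a successful connect. A rough sketch of that replay pattern follows. `ToyClient`, `remember`, and `simulate_connect` are hypothetical names used only for illustration, and whether a topic subscribed before the first connection actually ends up in Flask-MQTT's topics registry is an assumption that has not been verified here.

```python
class ToyClient:
    """Hypothetical model of a topic registry replayed on every connect."""

    def __init__(self):
        self.topics = {}  # topic -> qos, the remembered subscriptions
        self.sent = []    # SUBSCRIBE requests "sent" to the broker

    def remember(self, topic, qos=0):
        """Store a subscription so it can be re-issued later."""
        self.topics[topic] = qos

    def simulate_connect(self, rc=0):
        # Mirrors the loop in the quoted _handle_connect: only on success
        # (rc == 0) is every remembered subscription re-issued.
        if rc == 0:
            for topic, qos in self.topics.items():
                self.sent.append((topic, qos))


client = ToyClient()
client.remember("sensors/temperature", qos=1)
client.simulate_connect()      # first connection
client.simulate_connect(rc=5)  # refused connection: nothing is sent
client.simulate_connect()      # reconnect after a dropped link
print(client.sent)
```

Under that assumption, the subscription is renewed on each successful connect without the user-level on_connect handler being involved at all.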

urbanskalar commented 1 year ago

I have the same problem.

Sohaib90 commented 1 year ago

Let me look into this. Will get back to you soon :)

Sohaib90 commented 1 year ago

> I have the same problem.

@urbanskalar can you elaborate on your problem?

Sohaib90 commented 1 year ago

I created a gist here: https://gist.github.com/Sohaib90/07a616c0dfe8e8d4eab24d2d60ece2b1, which follows the same pattern as https://stackoverflow.com/questions/64592277/flask-mqtt-on-connect-is-never-called-when-used-with-socketio except that I do not use async_mode='gevent', cors_allowed_origins="*"

In my gist, on line 58, I print "Connected to MQTT broker", then subscribe to a test topic and print that as well, which gives this result: [image: github_issues]

You can see here that the callback is invoked, that I do get to subscribe to test_topic, and that it is populated in the topics attribute of the mqtt object.

Unless I am doing something wrong here, shouldn't this be the expected outcome, meaning that the callback is being invoked?

ramanraja commented 1 year ago

@Sohaib90, I see that you are subscribing to MQTT on socketio.on('subscribe') also. This is probably a client-initiated call. Can you please disable this and try again?

@socketio.on('subscribe')
def handle_subscribe(json_str):
    data = json.loads(json_str)
    mqtt.subscribe(data['topic'], data['qos'])

The only other difference I can think of is the way gevent and eventlet handle background tasks. You are monkey-patching eventlet, so this may have eliminated some underlying problem in async mode compatibility. I am only guessing here. Somebody familiar with Flask-MQTT source code can throw light on this.

urbanskalar commented 1 year ago

> > I have the same problem.
>
> @urbanskalar can you elaborate on your problem?

Hey, sorry for the late reply. I have moved on from this problem, so I might give you some inaccurate info. Basically, when I first tried to implement this, I wasn't aware of the example here. I was only following the documentation. At that time I was still using app.run() instead of socketio.run(). My code also didn't include the lines eventlet.monkey_patch(), socketio = SocketIO(app) and bootstrap = Bootstrap(app). I still don't understand why it didn't work without them. At some point I started thinking that maybe I was doing something wrong.

meatherly commented 1 year ago

Call me crazy but if you init socketio before mqtt it works :shrug:

Works:

app = Flask(__name__)
socketio = SocketIO(app, logger=True, cors_allowed_origins="*")
mqtt = flask_mqtt.Mqtt(app)

Does not work:

app = Flask(__name__)
mqtt = flask_mqtt.Mqtt(app)
socketio = SocketIO(app, logger=True, cors_allowed_origins="*")

Not sure why this is working for me?

roboteer commented 1 year ago

I also ran into this issue. In my case it was not SocketIO but Influx that sat between the call to flask_mqtt.Mqtt(app) and the on_connect callback.

It looks like any delay between the two can mess things up:

import flask
import flask_mqtt
import time

app = flask.Flask(__name__)
app.config['MQTT_BROKER_URL'] = 'localhost'

mqtt = flask_mqtt.Mqtt(app)

time.sleep(1)  # bad, bad, bad!

@mqtt.on_connect()
def on_connect(client, userdata, flags, rc):
    print('connected?') # nope, does not print

if __name__ == '__main__':
    app.run()

My limited understanding tells me that Mqtt.__init__ manages to complete a connection before the on_connect callback gets registered. Perhaps there is an argument somewhere for a delayed connect, so that the connection is made only after Flask is up and running. Perhaps not. All the best.
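The suspected ordering problem can be reproduced without a broker. The following is a minimal sketch, not Flask-MQTT's actual code: `ToyMqtt`, `finish_connect`, and `run` are hypothetical names, and the background thread stands in for the network loop. It assumes, as guessed above, that the connect event fires once and only reaches a handler that is already registered at that moment.

```python
import threading


class ToyMqtt:
    """Hypothetical stand-in for an MQTT wrapper (not Flask-MQTT's real code)."""

    def __init__(self):
        self._connect_handler = None
        self._broker_reply = threading.Event()
        # Assumption: the constructor starts connecting on a background thread.
        self._thread = threading.Thread(target=self._network_loop)
        self._thread.start()

    def _network_loop(self):
        self._broker_reply.wait()  # simulated network round trip
        # The connect event fires once; an unregistered handler is skipped.
        if self._connect_handler is not None:
            self._connect_handler()

    def on_connect(self):
        def decorator(handler):
            self._connect_handler = handler
            return handler
        return decorator

    def finish_connect(self):
        """Simulate the broker's reply arriving, then wait for the loop."""
        self._broker_reply.set()
        self._thread.join()


def run(register_first):
    calls = []
    mqtt = ToyMqtt()
    if not register_first:
        mqtt.finish_connect()  # connection completes before registration

    @mqtt.on_connect()
    def handler():
        calls.append("connected")

    if register_first:
        mqtt.finish_connect()  # connection completes after registration
    return calls


early = run(register_first=True)
late = run(register_first=False)
print(early, late)
```

In this toy model the handler registered before the connection completes is called, while the one registered afterwards is silently skipped, which matches the behaviour seen with time.sleep(1) above. Whether Flask-MQTT behaves this way for the same reason remains a guess.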