luxonis / depthai-core

DepthAI C++ Library
MIT License

Script MQTT subscription - extracting payload for further processing #892

Closed mallyagirish closed 1 year ago

mallyagirish commented 1 year ago

Creating an issue based on the response in the discussion forum.

I have the OAK-D Pro PoE and need to use MQTT-based communication. I am on Windows 11, using the Python API of depthai (v2.22.0.0). I can successfully publish from the camera based on this example.

However, I'm facing a strange issue with communication in the other direction, i.e., getting messages to the camera via MQTT. I can subscribe to a topic in a script and see messages arriving correctly inside the on_message callback, but I can't get the payload value out to the surrounding context. I tried using a global variable for this. A minimal reproducible example is below. The global keyword doesn't seem to do anything in on_message: the value of msg inside the script's while loop is always the initial blank value.

import depthai as dai
import time

pipeline = dai.Pipeline()

script = pipeline.createScript()
script.setProcessor(dai.ProcessorType.LEON_CSS)

# Change the IP to your MQTT broker!
MQTT_BROKER = "192.168.0.38"
MQTT_BROKER_PORT = 1883
MQTT_TOPIC = "oak_d/detection/m"
script_text = f"""
    import time

    msg = ''

    def on_message(client, userdata, message, tmp=None):
        global msg
        msg = message.payload.decode('utf8')
        node.warn(' In on_message, message: ' + msg)

    mqttc = Client()
    node.warn('Connecting to MQTT broker...')
    mqttc.connect("{MQTT_BROKER}", {MQTT_BROKER_PORT}, 120)
    node.warn('Successfully connected to MQTT broker!')
    mqttc.subscribe("{MQTT_TOPIC}", 2);
    mqttc.on_message = on_message
    mqttc.loop_start()

    while True:
        time.sleep(1)
        node.warn(' In main loop, message: ' + msg)
"""

with open("paho-mqtt.py", "r") as f:
    paho_script = f.read()
    script.setScript(f"{paho_script}\n{script_text}")

with dai.Device(pipeline) as device:
    while True:
        try:
            time.sleep(1)
        except KeyboardInterrupt:
            break

Output of running this code:

[image: screenshot of the script's log output]

moratom commented 1 year ago

Hi @mallyagirish, thanks for reporting the issue and the MRE.

I can reproduce it and will investigate why the global variables don't work as expected in this case; they do work in simple examples, which is strange.

For your specific use case, though, there is a simple workaround:

import depthai as dai
import time

pipeline = dai.Pipeline()

script = pipeline.createScript()
script.setProcessor(dai.ProcessorType.LEON_CSS)

# Change the IP to your MQTT broker!
MQTT_BROKER = "192.168.108.112"
MQTT_BROKER_PORT = 12345
MQTT_TOPIC = "oak_d/detection/m"
script_text = f"""
    import time
    msg = ''
    class Test:
        def __init__(self):
            self.msg = ''
        def on_message(self, client, userdata, message, tmp=None):
            self.msg = message.payload.decode('utf8')
            node.warn("In on_message, message: " + str(self.msg))

    test = Test()
    # Create a lambda that will call the test.on_message method
    on_message = lambda client, userdata, message, test=test: test.on_message(client, userdata, message)

    mqttc = Client()
    node.warn('Connecting to MQTT broker...')
    mqttc.connect("{MQTT_BROKER}", {MQTT_BROKER_PORT}, 120)
    node.warn('Successfully connected to MQTT broker!')
    mqttc.subscribe("{MQTT_TOPIC}", 2);
    mqttc.on_message = on_message
    mqttc.loop_start()
    while True:
        time.sleep(1)
        msg = test.msg
        node.warn(' In main loop, message: ' + msg)
"""

with open("paho-mqtt.py", "r") as f:
    paho_script = f.read()
    script.setScript(f"{paho_script}\n{script_text}")

with dai.Device(pipeline) as device:
    while True:
        try:
            time.sleep(1)
        except KeyboardInterrupt:
            break

I tested it and it works as expected, but don't hesitate to ask if you have any issues with it.
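
As a possible variation on the workaround above, a bound method can serve as the callback directly, so the lambda is not strictly needed. The sketch below runs on a plain Python interpreter and uses a hypothetical `FakeClient` stand-in (not part of depthai or paho-mqtt) that only mimics how a client would invoke `on_message`; the class name `MessageHolder` is also made up for illustration.

```python
from types import SimpleNamespace


class MessageHolder:
    """Keeps the most recent payload on an instance attribute."""

    def __init__(self):
        self.msg = ''

    def on_message(self, client, userdata, message, tmp=None):
        # Mutating the instance avoids any reliance on global names.
        self.msg = message.payload.decode('utf8')


class FakeClient:
    """Hypothetical stand-in for the MQTT client (assumption, for illustration only)."""

    def __init__(self):
        self.on_message = None

    def deliver(self, payload):
        # Mimic a client calling the callback with (client, userdata, message).
        message = SimpleNamespace(payload=payload)
        self.on_message(self, None, message)


holder = MessageHolder()
client = FakeClient()
client.on_message = holder.on_message  # bound method used as the callback
client.deliver(b'hello from broker')
print(holder.msg)  # → hello from broker
```

Whether the client bundled in paho-mqtt.py passes exactly these arguments is an assumption here; the `tmp=None` parameter is kept from the original callback signature for that reason.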

moratom commented 1 year ago

Ha, I found the issue!

The global variables were not really global, because of indentation... the whole script became a part of the last class in paho-mqtt.py.

So the assembled script looked like this:

.
.
.
class WebsocketWrapper(object):
    OPCODE_CONTINUATION = 0x0
    OPCODE_TEXT = 0x1
    OPCODE_BINARY = 0x2
    OPCODE_CONNCLOSE = 0x8
    OPCODE_PING = 0x9
    OPCODE_PONG = 0xA

    def __init__(self, socket, host, port, is_ssl, path, extra_headers):

        self.connected = False

        self._ssl = is_ssl
        self._host = host
        self._port = port
        self._socket = socket
        self._path = path

        self._sendbuffer = bytearray()
        self._readbuffer = bytearray()

        self._requested_size = 0
        self._payload_head = 0
        self._readbuffer_head = 0

        self._do_handshake(extra_headers)

    def __del__(self):

        self._sendbuffer = None
        self._readbuffer = None
        .
        .
        .
        mqttc = Client()
        node.warn('Connecting to MQTT broker...')
        mqttc.connect("{MQTT_BROKER}", {MQTT_BROKER_PORT}, 120)
        node.warn('Successfully connected to MQTT broker!')
        mqttc.subscribe("{MQTT_TOPIC}", 2);
        mqttc.on_message = on_message
        mqttc.loop_start()
        .
        .
        .
        .
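
To see why `global` appeared to do nothing, here is a minimal sketch without depthai or MQTT. It assumes the indented script ended up in the body of the last class, as described above; the class name `Wrapper` and the direct call to `on_message` are illustrative stand-ins for the real library class and for a message arriving.

```python
class Wrapper:
    # Inside a class body, this assignment creates a class attribute,
    # not a module-level global.
    msg = ''

    def on_message(payload):
        # 'global' refers to the module-level name, which is a different
        # variable from the class attribute above.
        global msg
        msg = payload

    on_message('hello')  # simulate a message arriving

    # Name lookup in a class body checks the class namespace first,
    # so this still finds the initial blank value.
    seen_in_class_body = msg


print(repr(Wrapper.seen_in_class_body))  # → ''
print(repr(msg))                         # → 'hello'
```

The callback does update a variable, just not the one the loop reads, which matches the behaviour in the original report.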

Removing the indentation in your original script works as expected:

script_text = f"""
import time

msg = 'original'

def on_message(client, userdata, message, tmp=None):
    global msg
    node.warn('Msg before is ' + msg)
    msg = message.payload.decode('utf8')
    node.warn(' In on_message, message: ' + msg)

def test_global():
    global msg
    msg = 'test_global'

mqttc = Client()
node.warn('Connecting to MQTT broker...')
mqttc.connect("{MQTT_BROKER}", {MQTT_BROKER_PORT}, 120)
node.warn('Successfully connected to MQTT broker!')
mqttc.subscribe("{MQTT_TOPIC}", 2)
mqttc.on_message = on_message
mqttc.loop_start()

while True:
    time.sleep(1)
    node.warn(' In main loop, message: ' + msg)
"""
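
If keeping the script indented in the host file is preferable for readability, one option is to strip the common leading whitespace with the standard library's `textwrap.dedent` before concatenating. The sketch below is host-side only and uses short stand-in strings (`LIBRARY_TAIL`, `INDENTED_SCRIPT`) instead of the real paho-mqtt.py and script; the helper names are hypothetical.

```python
import textwrap

# Stand-in for the end of paho-mqtt.py: the file ends inside a class.
LIBRARY_TAIL = """\
class WebsocketWrapper(object):
    def __del__(self):
        self._sendbuffer = None
"""

# User script indented by four spaces, as in the original report.
INDENTED_SCRIPT = """
    msg = ''

    def on_message(payload):
        global msg
        msg = payload
"""


def assemble(library_text, script_text, dedent):
    """Concatenate library and script, optionally dedenting the script first."""
    body = textwrap.dedent(script_text) if dedent else script_text
    return f"{library_text}\n{body}"


def module_level_names(source):
    """Execute the source and return the names it defines at module level."""
    namespace = {}
    exec(source, namespace)
    return {name for name in namespace if not name.startswith("__")}


names_indented = module_level_names(assemble(LIBRARY_TAIL, INDENTED_SCRIPT, dedent=False))
names_dedented = module_level_names(assemble(LIBRARY_TAIL, INDENTED_SCRIPT, dedent=True))

print(sorted(names_indented))  # → ['WebsocketWrapper']
print(sorted(names_dedented))  # → ['WebsocketWrapper', 'msg', 'on_message']
```

In the pipeline code this would presumably correspond to passing `textwrap.dedent(script_text)` into the string given to `script.setScript(...)`; that step is an assumption and was not tested on a device here.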

mallyagirish commented 1 year ago

Oh gosh! Such a silly but subtle one. When first learning Python many years ago, I knew that its indentation-based scoping/grouping would get me someday. It finally happened!

Thanks so much for your help!