wialon / gmqtt

Python MQTT v5.0 async client
MIT License

on_message callback is triggered when sending messages. #106

Open algsupport opened 4 years ago

algsupport commented 4 years ago

Hello, thank you for this great project. It's very nice. I noticed an issue. Please let me know if this is intentional or if there is a bug somewhere.

Consider this code:

import asyncio
import ssl

from gmqtt import Client as MQTTClient

STOP = asyncio.Event()

def on_connect(client, flags, rc, properties):
    print('Connected')
    client.subscribe('TEST/#', qos=0)

def on_message(client, topic, payload, qos, properties):
    print('[RECV MSG {}] TOPIC: {} PAYLOAD: {} QOS: {} PROPERTIES: {}'.format(client._client_id, topic, payload, qos, properties))

def on_disconnect(client, packet, exc=None):
    print('Disconnected')

def on_subscribe(client, mid, qos, properties):
    print('SUBSCRIBED')

async def main(broker_host):
    client = MQTTClient("code_client-id")

    client.on_connect = on_connect
    client.on_message = on_message
    client.on_disconnect = on_disconnect
    client.on_subscribe = on_subscribe

    ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
    ssl_context.verify_mode = ssl.CERT_NONE

    client.set_auth_credentials("username", "password")
    await client.connect(broker_host, 8883, ssl_context)

    client.publish('TEST/TIME', "message from python code", qos=1)

    await STOP.wait()
    await client.disconnect()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()

    host = 'mymqttbroker.com'
    loop.run_until_complete(main(host))

The above code produces the following output:

Connected
SUBSCRIBED
[RECV MSG code_client-id] TOPIC: TEST/TIME PAYLOAD: b'message from python code' QOS: 0 PROPERTIES: {'dup': 0, 'retain': 0}
[RECV MSG code_client-id] TOPIC: TEST/TIME PAYLOAD: b'message from other client' QOS: 0 PROPERTIES: {'dup': 0, 'retain': 0}

As you can see, the on_message callback was triggered both when sending and when receiving a message, which makes it impossible to distinguish who sent the message (server or client?).

Thank you for your time.

dexif commented 4 years ago

As I can see, you subscribed to the "TEST/#" wildcard topic. You then publish a message from this client, and the broker delivers it back to the same client because it matches the subscription. After that, you publish another message from a different MQTT client, and this client receives that one too. It works correctly.

If you do not want to receive messages published by this same client, you can set the no_local=True flag when subscribing to a topic. You will then receive only messages published by other clients. Note that this works only with MQTT 5, and it also depends on the broker supporting it.
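To illustrate what the flag changes, here is a minimal sketch that does not use gmqtt or a real broker. `ToyBroker`, `matches`, `run`, and the `other-client` id are hypothetical names made up for this example, and the topic matching is deliberately simplified; the only thing it models is that a subscription marked No Local does not get the subscriber's own publishes echoed back.

```python
from dataclasses import dataclass, field


def matches(topic_filter, topic):
    # Simplified matching: exact topics plus a trailing '/#' wildcard only.
    if topic_filter.endswith('/#'):
        return topic.startswith(topic_filter[:-1])
    return topic == topic_filter


@dataclass
class ToyBroker:
    # Hypothetical in-memory stand-in for a broker; it models only No Local.
    subs: list = field(default_factory=list)       # (client_id, topic_filter, no_local)
    delivered: list = field(default_factory=list)  # (receiver_id, topic, payload)

    def subscribe(self, client_id, topic_filter, no_local=False):
        self.subs.append((client_id, topic_filter, no_local))

    def publish(self, sender_id, topic, payload):
        for client_id, topic_filter, no_local in self.subs:
            if not matches(topic_filter, topic):
                continue
            if no_local and client_id == sender_id:
                continue  # No Local: never echo a message back to its publisher
            self.delivered.append((client_id, topic, payload))


def run(no_local):
    broker = ToyBroker()
    broker.subscribe('code_client-id', 'TEST/#', no_local=no_local)
    broker.publish('code_client-id', 'TEST/TIME', 'message from python code')
    broker.publish('other-client', 'TEST/TIME', 'message from other client')
    return [payload for _, _, payload in broker.delivered]


echoed = run(no_local=False)    # both messages arrive, as in the output above
filtered = run(no_local=True)   # only the other client's message arrives
print(echoed)
print(filtered)
```

With the real client, the equivalent change in the original snippet would be passing no_local=True in the client.subscribe call inside on_connect, assuming the broker honours the option.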

algsupport commented 4 years ago

Thank you, this helped a lot.

algsupport commented 4 years ago

One note on this matter: I noticed that when I create subscriptions with the Subscription class, if I don't set no_local=True when creating the object and instead pass it when I actually call subscribe, the no_local=True is ignored.

Lenka42 commented 4 years ago

@teucrus could you please provide a code example showing how you are doing the subscription?

algsupport commented 4 years ago

import ssl
import asyncio

from gmqtt import Client as MQTTClient
from gmqtt import Subscription

STOP = asyncio.Event()

def on_connect(client, flags, rc, properties):
    print('Connected')
    # This is OK
    client.subscribe('test/a', qos=0, no_local=True)
    # This is also OK
    b = [Subscription("test/b", qos=0, no_local=True)]
    client.subscribe(b)
    # This will not disable local messages
    c = [Subscription("test/c", qos=0)]
    client.subscribe(c, no_local=True)

def on_message(client, topic, payload, qos, properties):
    print('[RECV MSG {}] TOPIC: {} PAYLOAD: {} QOS: {} PROPERTIES: {}'.format(client._client_id, topic, payload, qos, properties))

def on_disconnect(client, packet, exc=None):
    print('Disconnected')

def on_subscribe(client, mid, qos, properties):
    print('SUBSCRIBED')

async def main(broker_host):
    client = MQTTClient("code_client-id")

    client.on_connect = on_connect
    client.on_message = on_message
    client.on_disconnect = on_disconnect
    client.on_subscribe = on_subscribe

    ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
    ssl_context.verify_mode = ssl.CERT_NONE

    client.set_auth_credentials("username", "password")
    await client.connect(broker_host, 8883, ssl_context)

    client.publish('test/a', "testA", qos=0)
    client.publish('test/b', "testB", qos=0)
    client.publish('test/c', "testC", qos=0)

    await STOP.wait()
    await client.disconnect()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    host = 'mqtt.myhost.com'
    loop.run_until_complete(main(host))

There you go.

Lenka42 commented 4 years ago

@teucrus yeah, my idea was that if you use a Subscription object, you should pass all the arguments to that object, and all the arguments passed to the subscribe method are ignored. I guess I should make this 'ignoring' more explicit, or update the subscriptions with these arguments. I'll think about it, thanks for noticing :+1:
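To make the rule in this comment concrete, here is a small model that does not import gmqtt. The `Subscription` dataclass and `normalize_subscriptions` function are hypothetical stand-ins written for illustration, not the library's code. They only mirror the behavior described here: keyword arguments apply when a topic string is passed, and are ignored when Subscription objects are passed.

```python
from dataclasses import dataclass


@dataclass
class Subscription:
    # Hypothetical stand-in for gmqtt's Subscription; only fields used in this thread.
    topic: str
    qos: int = 0
    no_local: bool = False


def normalize_subscriptions(subscription_or_topic, qos=0, no_local=False):
    """Return a list of Subscription objects, following the rule described above."""
    if isinstance(subscription_or_topic, str):
        # Topic string: the keyword arguments are used to build the subscription.
        return [Subscription(subscription_or_topic, qos=qos, no_local=no_local)]
    if isinstance(subscription_or_topic, Subscription):
        # Single object: taken as-is, keyword arguments are ignored.
        return [subscription_or_topic]
    # List of objects: taken as-is, keyword arguments are ignored.
    return list(subscription_or_topic)


a = normalize_subscriptions('test/a', qos=0, no_local=True)
b = normalize_subscriptions([Subscription('test/b', qos=0, no_local=True)])
c = normalize_subscriptions([Subscription('test/c', qos=0)], no_local=True)

print(a[0].no_local, b[0].no_local, c[0].no_local)  # True True False
```

The practical takeaway from the thread is the test/b pattern: when building Subscription objects yourself, set no_local on the object rather than on the subscribe call.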