sbtinstruments / aiomqtt

The idiomatic asyncio MQTT client, wrapped around paho-mqtt
https://sbtinstruments.github.io/aiomqtt
BSD 3-Clause "New" or "Revised" License
392 stars 71 forks source link

Operation timed out #306

Closed lin-goo closed 3 days ago

lin-goo commented 4 weeks ago

Hi, I tested the simplest example in the documentation, connecting to my locally started emqx, and then sending and receiving data, but I get errors on both ends! And with the same configuration, I'm using the paho package and it's connecting and sending/receiving data just fine.

publisher.py

import asyncio

import aiomqtt

config = {
    "hostname": "127.0.0.1",
    "port": 1883,
    "username": "DMSERVER",
    "password": "DMSERVER",
    "qos": 2,
    "topic": "diamond/info",
}

async def publisher(body: dict):
    topic = config.pop('topic', '')
    qos = config.pop('qos')
    async with aiomqtt.Client(
        username=config['username'],
        password=config['password'],
        hostname=config['hostname'],
        port=config['port'],
    ) as client:
        await client.publish(topic=topic, qos=qos, payload=json.dumps(body))

if __name__ == '__main__':
    loop = asyncio.new_event_loop()
    for i in range(10):
        loop.run_until_complete(publisher({'id': i}))

subscriber.py

import json
import asyncio
import aiomqtt

config = {
    "hostname": "localhost",
    "port": 1883,
    "username": "DMSERVER",
    "password": "DMSERVER",
    "qos": 2,
    "topic": "diamond/info",
}

async def subscriber():
    topic = config.pop('topic', '')
    qos = config.pop('qos')
    while True:
        async with aiomqtt.Client(
            username=config['username'],
            password=config['password'],
            hostname=config['hostname'],
            port=config['port'],
        ) as client:
            await client.subscribe(topic=topic, qos=qos)
            async for message in client.messages:
                print(type(message.payload), message.payload)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(subscriber())

error message

Exception in callback AbstractEventLoop.add_reader(732, <function Cli...001F534C15580>)
handle: <Handle AbstractEventLoop.add_reader(732, <function Cli...001F534C15580>)>
Traceback (most recent call last):
  File "C:\Python\Lib\asyncio\events.py", line 84, in _run
    self._context.run(self._callback, *self._args)
  File "C:\Python\Lib\asyncio\events.py", line 534, in add_reader
    raise NotImplementedError
NotImplementedError
Exception in callback AbstractEventLoop.add_writer(<socket.socke..., 1883, 0, 0)>, <function Cli...001F534C16D40>)
handle: <Handle AbstractEventLoop.add_writer(<socket.socke..., 1883, 0, 0)>, <function Cli...001F534C16D40>)>
Traceback (most recent call last):
  File "C:\Python\Lib\asyncio\events.py", line 84, in _run
    self._context.run(self._callback, *self._args)
  File "C:\Python\Lib\asyncio\events.py", line 540, in add_writer
    raise NotImplementedError
NotImplementedError
Traceback (most recent call last):
  File "C:\Projects\kidinsight-demos\emqtt\aio\sub.py", line 51, in <module>
    loop.run_until_complete(subscriber())
  File "C:\Python\Lib\asyncio\base_events.py", line 654, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "C:\Projects\kidinsight-demos\emqtt\aio\sub.py", line 38, in subscriber
    async with aiomqtt.Client(
  File "C:\Virtualenvs\kidinsight-demos\Lib\site-packages\aiomqtt\client.py", line 714, in __aenter__
    await self._wait_for(self._connected, timeout=None)
  File "C:\Virtualenvs\kidinsight-demos\Lib\site-packages\aiomqtt\client.py", line 493, in _wait_for
    raise MqttError(msg) from None
aiomqtt.exceptions.MqttError: Operation timed out

Refs: https://github.com/sbtinstruments/aiomqtt/issues/34

@frederikaalund @empicano

empicano commented 4 weeks ago

Hi there, please double check that you are using an asyncio event loop that supports the add_reader method.

empicano commented 3 days ago

I'll tentatively close this as there have been no further responses. If it is still a problem, please feel free to reopen! 🙂