eclipse / paho.mqtt.python

mqtt not open thread 340 #662

Closed jiajintao2021 closed 9 months ago

jiajintao2021 commented 2 years ago

It has been a long time since the previous question. I read through the related issues and the proposed solutions, but found none that works, and it looks as though the library does not intend to fix this. A single process can open at most 340 connections when each client uses loop_start(). If you manage the threads yourself and call loop_forever() in each one, you can open about 1000 connections per process, and you can scale further by adding processes: roughly 1000 * CpuCount.
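One possible explanation for these two numbers, offered as a rough sketch rather than a confirmed diagnosis: select() on Linux typically rejects file descriptors numbered 1024 or higher, and a client started with loop_start() is assumed here to hold three descriptors (the broker socket plus an internal socket pair), while a client driven by loop_forever() is assumed to hold one. The limit, the reserved count, and the per-client counts below are all assumptions, and `max_clients` is a hypothetical helper.

```python
# Back-of-the-envelope estimate; every constant here is an assumption.
FD_SETSIZE = 1024   # assumed select() descriptor limit (typical on Linux)
RESERVED_FDS = 3    # assumed: stdin, stdout, stderr are already open


def max_clients(fds_per_client: int,
                fd_limit: int = FD_SETSIZE,
                reserved: int = RESERVED_FDS) -> int:
    """Estimate how many clients fit below the select() descriptor limit."""
    if fds_per_client < 1:
        raise ValueError("fds_per_client must be at least 1")
    return (fd_limit - reserved) // fds_per_client


# Assumed: broker socket + socket pair = 3 descriptors per loop_start() client.
with_loop_start = max_clients(3)
# Assumed: broker socket only = 1 descriptor per loop_forever() client.
with_loop_forever = max_clients(1)

print(with_loop_start)    # 340
print(with_loop_forever)  # 1021
```

Under these assumptions the estimate matches the reported 340 and "about 1000", which suggests the ceiling comes from select() rather than from the number of threads.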

jiajintao2021 commented 2 years ago

https://github.com/concurrencylabs/mqtt-locust/issues/3 https://github.com/eclipse/paho.mqtt.python/issues/238

jiajintao2021 commented 2 years ago

#183

#238

j04n-f commented 1 year ago

It may be too late, but replacing the select(...) call used in the _loop(...) method worked for us. You can override the method to use Python's selectors module instead of select. selectors picks the most efficient implementation available on your OS.

import selectors
from contextlib import suppress

from paho.mqtt.client import MQTT_ERR_CONN_LOST, MQTT_ERR_UNKNOWN, Client


class MQTTClient(Client):
    # [...]
    def _loop(self, timeout: float = 1.0) -> int:
        if timeout < 0.0:
            raise ValueError("Invalid timeout.")

        eventmask = selectors.EVENT_READ

        # Peek at the outgoing queue: watch for writability only if a packet is waiting.
        with suppress(IndexError):
            packet = self._out_packet.popleft()
            self._out_packet.appendleft(packet)
            eventmask |= selectors.EVENT_WRITE

        # Bytes already buffered inside an SSL socket do not wake the selector,
        # so do not block when some are pending.
        pending_bytes = 0
        if hasattr(self._sock, "pending"):
            pending_bytes = self._sock.pending()

        if pending_bytes > 0:
            timeout = 0.0

        # The context manager closes the selector on every path, including the
        # early returns, so no selector descriptor is leaked.
        with selectors.DefaultSelector() as sel:
            try:
                sel.register(self._sock, eventmask)
                if self._sockpairR is not None:
                    sel.register(self._sockpairR, selectors.EVENT_READ)
                events = sel.select(timeout)
            except (TypeError, ValueError):
                # The socket was closed or set to None in the meantime.
                return int(MQTT_ERR_CONN_LOST)
            except Exception:
                return int(MQTT_ERR_UNKNOWN)

        readable = []
        writable = []
        for key, mask in events:
            # mask holds the events that are actually ready;
            # key.events is only what was registered.
            if mask & selectors.EVENT_READ:
                readable.append(key.fileobj)
            if mask & selectors.EVENT_WRITE:
                writable.append(key.fileobj)

        if self._sock in readable or pending_bytes > 0:
            rc = self.loop_read()
            if rc or self._sock is None:
                return int(rc)

        if self._sockpairR and self._sockpairR in readable:
            # Drain whatever was written to the internal socket pair.
            with suppress(BlockingIOError):
                self._sockpairR.recv(10000)

        if self._sock in writable:
            rc = self.loop_write()
            if rc or self._sock is None:
                return int(rc)

        return int(self.loop_misc())

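
For anyone adapting this, here is a small standard-library sketch of how selectors reports readiness, independent of paho. It shows that `key.events` is the mask that was registered, while the second element of each tuple returned by `select()` is the mask that is actually ready. `ready_masks` is a hypothetical helper written for this illustration, and the local socket pair stands in for a broker connection.

```python
import selectors
import socket


def ready_masks(sock, timeout=0.0):
    """Return (registered events, ready mask) for sock; ready mask is 0 if idle."""
    registered = selectors.EVENT_READ | selectors.EVENT_WRITE
    # The context manager closes the selector even if select() raises.
    with selectors.DefaultSelector() as sel:
        sel.register(sock, registered)
        for key, mask in sel.select(timeout):
            return key.events, mask
    return registered, 0


a, b = socket.socketpair()
try:
    # Nothing has been sent yet: `a` is writable but has nothing to read.
    registered, ready = ready_masks(a)
    idle_readable = bool(ready & selectors.EVENT_READ)
    idle_registered_read = bool(registered & selectors.EVENT_READ)

    # After the peer sends a byte, `a` becomes readable as well.
    b.send(b"x")
    _, ready = ready_masks(a)
    data_readable = bool(ready & selectors.EVENT_READ)
finally:
    a.close()
    b.close()

print(type(selectors.DefaultSelector()).__name__)  # backend chosen for this OS
print(idle_registered_read, idle_readable, data_readable)
```

Testing the ready mask rather than `key.events` avoids treating a socket as readable merely because it was registered for reads.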
MattBrittan commented 9 months ago

I'm going to close this as a duplicate (#183 & #238 and a few others). This is part of a general project to clean up issues (which should make it simpler to identify priorities going forward).