eclipse / paho.mqtt.python


Use Python selectors instead of select #697

Open · j04n-f opened this issue 1 year ago

j04n-f commented 1 year ago

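Using Python's select.select() limits the number of open connections to about 340. select() rejects file descriptors at or above FD_SETSIZE (1024 on Linux), and each client presumably uses three of them (the socket plus the _sockpairR/_sockpairW pair).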

The Python select documentation encourages users to use the higher-level selectors module instead of select.
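As a rough illustration of the suggested change, the sketch below shows how a select.select(rlist, wlist, [], timeout) call maps onto the selectors API. The helper name ready_lists is hypothetical, and a local socketpair stands in for the MQTT socket.

```python
import selectors
import socket


def ready_lists(readers, writers, timeout):
    """Hypothetical helper: emulate select.select(readers, writers, [], timeout).

    Returns (readable, writable), like the first two results of select.select().
    """
    with selectors.DefaultSelector() as sel:
        # A file object can only be registered once, so merge the masks first.
        masks = {}
        for sock in readers:
            masks[sock] = masks.get(sock, 0) | selectors.EVENT_READ
        for sock in writers:
            masks[sock] = masks.get(sock, 0) | selectors.EVENT_WRITE
        for sock, mask in masks.items():
            sel.register(sock, mask)

        readable, writable = [], []
        for key, ready in sel.select(timeout):
            # `ready` is the mask of events that are ready right now.
            if ready & selectors.EVENT_READ:
                readable.append(key.fileobj)
            if ready & selectors.EVENT_WRITE:
                writable.append(key.fileobj)
        return readable, writable


a, b = socket.socketpair()
a.send(b"x")
readable, writable = ready_lists([b], [a], timeout=1.0)
print(b in readable, a in writable)  # True True
a.close()
b.close()
```

Unlike select.select(), the selector-based version is not bound by FD_SETSIZE when DefaultSelector resolves to epoll or kqueue.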

Would it be possible to switch from select to selectors? The following implementation worked for us:

import selectors
from contextlib import suppress

class Client:
    [...]
    def _loop(self, timeout: float = 1.0) -> int:
        if timeout < 0.0:
            raise ValueError("Invalid timeout.")

        sel = selectors.DefaultSelector()

        eventmask = selectors.EVENT_READ

        with suppress(IndexError):
            packet = self._out_packet.popleft()
            self._out_packet.appendleft(packet)
            eventmask = selectors.EVENT_WRITE | eventmask

        if self._sockpairR is None:
            sel.register(self._sock, eventmask)
        else:
            sel.register(self._sock, eventmask)
            sel.register(self._sockpairR, selectors.EVENT_READ)

        pending_bytes = 0
        if hasattr(self._sock, "pending"):
            pending_bytes = self._sock.pending()

        if pending_bytes > 0:
            timeout = 0.0

        try:
            events = sel.select(timeout)
        except TypeError:
            return int(MQTT_ERR_CONN_LOST)
        except ValueError:
            return int(MQTT_ERR_CONN_LOST)
        except Exception:
            return int(MQTT_ERR_UNKNOWN)

        socklist: list[list] = [[], []]

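        for key, ready in events:
            # `ready` is the mask of events that are ready; key.events is
            # only the mask that was registered for this file object.
            if ready & selectors.EVENT_READ:
                socklist[0].append(key.fileobj)

            if ready & selectors.EVENT_WRITE:
                socklist[1].append(key.fileobj)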

        if self._sock in socklist[0] or pending_bytes > 0:
            rc = self.loop_read()
            if rc or self._sock is None:
                return int(rc)

        if self._sockpairR and self._sockpairR in socklist[0]:
            socklist[1].insert(0, self._sock)

            with suppress(BlockingIOError):
                self._sockpairR.recv(10000)

        if self._sock in socklist[1]:
            rc = self.loop_write()
            if rc or self._sock is None:
                return int(rc)

        sel.close()

        return int(self.loop_misc())
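In the (key, mask) pairs returned by BaseSelector.select(), key.events is the mask that was registered, while the second element holds the events that are actually ready. The standalone check below (a local socketpair is an assumed stand-in for the broker connection) illustrates why the read/write lists should be built from the second element: a socket registered for both events but with nothing to read is reported as writable only.

```python
import selectors
import socket

a, b = socket.socketpair()
try:
    with selectors.DefaultSelector() as sel:
        # Ask for read and write readiness, as _loop() does when a packet is
        # queued. Nothing has been sent to `a`, so it is only writable.
        sel.register(a, selectors.EVENT_READ | selectors.EVENT_WRITE)
        [(key, ready)] = sel.select(timeout=1.0)
        registered_read = bool(key.events & selectors.EVENT_READ)
        ready_read = bool(ready & selectors.EVENT_READ)
        ready_write = bool(ready & selectors.EVENT_WRITE)
finally:
    a.close()
    b.close()

print(registered_read, ready_read, ready_write)  # True False True
```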
lincoln310 commented 1 year ago

Shouldn't sel.close() also be called before each early return? Also, I get an error when self._sock is None at:

        if self._sockpairR is None:
            sel.register(self._sock, eventmask)
        else:
            sel.register(self._sock, eventmask)
            sel.register(self._sockpairR, selectors.EVENT_READ)
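One way to make sure the selector is closed on every return path, including the early returns, is to use it as a context manager: BaseSelector defines __enter__/__exit__, and __exit__ calls close(). A minimal sketch; poll_once is a hypothetical stand-in for _loop(), and TrackingSelector exists only to record that close() ran.

```python
import selectors
import socket


class TrackingSelector(selectors.DefaultSelector):
    """DefaultSelector that records whether close() was called (demo only)."""

    closed = False

    def close(self):
        self.closed = True
        super().close()


def poll_once(sock, timeout, created):
    # Hypothetical stand-in for _loop(): returns early when nothing is ready.
    with TrackingSelector() as sel:
        created.append(sel)
        sel.register(sock, selectors.EVENT_READ)
        if not sel.select(timeout):
            return "timeout"  # early return: __exit__ still calls close()
        return "readable"


a, b = socket.socketpair()
created = []
print(poll_once(b, 0.0, created), created[0].closed)  # timeout True
a.close()
b.close()
```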
j04n-f commented 1 year ago

I tested both cases, closing and not closing the selector. Not closing it increases CPU usage; see https://docs.python.org/3/library/selectors.html#selectors.BaseSelector.close

Which error do you get with the None value?
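Creating and closing a selector on every _loop() call adds system calls per iteration (on Linux, DefaultSelector opens a new epoll descriptor each time). An alternative not proposed in this thread would be to keep one selector per client and only update its event mask. The SocketWatcher class below is hypothetical and not part of paho-mqtt; it is a sketch of that idea.

```python
import selectors
import socket


class SocketWatcher:
    """Hypothetical helper: one long-lived selector reused across loop iterations."""

    def __init__(self):
        self._sel = selectors.DefaultSelector()

    def watch(self, sock, events):
        # Register on first use; afterwards only update the mask if it changed.
        try:
            key = self._sel.get_key(sock)
        except KeyError:
            self._sel.register(sock, events)
        else:
            if key.events != events:
                self._sel.modify(sock, events)

    def unwatch(self, sock):
        # Must be called before the socket is closed (e.g. on reconnect).
        self._sel.unregister(sock)

    def poll(self, timeout):
        # Returns (file object, ready mask) pairs.
        return [(key.fileobj, ready) for key, ready in self._sel.select(timeout)]

    def close(self):
        self._sel.close()


a, b = socket.socketpair()
watcher = SocketWatcher()
watcher.watch(b, selectors.EVENT_READ)
print(watcher.poll(0.0))  # [] (nothing sent yet)
a.send(b"x")
watcher.watch(b, selectors.EVENT_READ)  # same mask: no modify() call
print(watcher.poll(1.0) == [(b, selectors.EVENT_READ)])  # True
watcher.unwatch(b)
watcher.close()
a.close()
b.close()
```

With a design like this, the old socket would have to be unregistered before it is closed on reconnect; otherwise the stale registration can clash with a new socket that reuses the same descriptor number.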

j04n-f commented 1 year ago

Yesterday we hit the same problem. The error is raised when the MQTT client attempts to reconnect, while self._sock can be None. The fix is simple: the old code registered and waited on the sockets in a single select.select() call, whereas the selectors version needs two calls, register() and select(). Both calls therefore have to be wrapped in the try/except, as follows:

import selectors
from contextlib import suppress

class Client:
    [...]
    def _loop(self, timeout: float = 1.0) -> int:
        if timeout < 0.0:
            raise ValueError("Invalid timeout.")

        # Using the selector as a context manager closes it on every return
        # path, including the early returns below.
        with selectors.DefaultSelector() as sel:
            eventmask = selectors.EVENT_READ

            with suppress(IndexError):
                packet = self._out_packet.popleft()
                self._out_packet.appendleft(packet)
                eventmask = selectors.EVENT_WRITE | eventmask

            # used to check if there are any bytes left in the (SSL) socket
            pending_bytes = 0
            if hasattr(self._sock, "pending"):
                pending_bytes = self._sock.pending()

            # if bytes are pending do not wait in select
            if pending_bytes > 0:
                timeout = 0.0

            try:
                # register() validates the file object, so it must be inside
                # the try block as well: self._sock can be None while the
                # client is reconnecting.
                sel.register(self._sock, eventmask)
                if self._sockpairR is not None:
                    sel.register(self._sockpairR, selectors.EVENT_READ)

                events = sel.select(timeout)

            except TypeError:
                # Socket isn't correct type, in all likelihood connection is lost
                return int(MQTT_ERR_CONN_LOST)
            except ValueError:
                # register() raises ValueError for an invalid file object,
                # e.g. self._sock is None or a closed socket (fileno() == -1).
                return int(MQTT_ERR_CONN_LOST)
            except Exception:
                # Note that KeyboardInterrupt, etc. can still terminate since they
                # are not derived from Exception
                return int(MQTT_ERR_UNKNOWN)

            socklist: list[list] = [[], []]

            # `ready` holds the events that are ready now; key.events is only
            # the mask that was registered.
            for key, ready in events:
                if ready & selectors.EVENT_READ:
                    socklist[0].append(key.fileobj)

                if ready & selectors.EVENT_WRITE:
                    socklist[1].append(key.fileobj)

            if self._sock in socklist[0] or pending_bytes > 0:
                rc = self.loop_read()
                if rc or self._sock is None:
                    return int(rc)

            if self._sockpairR and self._sockpairR in socklist[0]:
                # Stimulate output write even though we didn't ask for it, because
                # at that point the publish or other command wasn't present.
                socklist[1].insert(0, self._sock)
                # Clear sockpairR - only ever a single byte written.
                with suppress(BlockingIOError):
                    # Read many bytes at once - this allows up to 10000 calls to
                    # publish() in between calls to loop().
                    self._sockpairR.recv(10000)

            if self._sock in socklist[1]:
                rc = self.loop_write()
                if rc or self._sock is None:
                    return int(rc)

        return int(self.loop_misc())
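Why register() has to sit inside the try block can be checked in isolation: the selectors module validates the file object at registration time and raises ValueError for None or for a closed socket (whose fileno() is -1). A minimal sketch; CONN_LOST, UNKNOWN and SUCCESS are hypothetical stand-ins for paho's MQTT_ERR_* constants, and wait_for_socket is a hypothetical helper.

```python
import selectors
import socket

# Hypothetical stand-ins for paho's MQTT_ERR_* constants (values arbitrary).
SUCCESS, CONN_LOST, UNKNOWN = "success", "conn_lost", "unknown"


def wait_for_socket(sock, timeout=0.0):
    """Register and select inside one try block, as in the revised _loop()."""
    with selectors.DefaultSelector() as sel:
        try:
            sel.register(sock, selectors.EVENT_READ)
            sel.select(timeout)
        except (TypeError, ValueError):
            # None or a closed socket (fd -1) is rejected by register().
            return CONN_LOST
        except Exception:
            return UNKNOWN
    return SUCCESS


closed, peer = socket.socketpair()
closed.close()
a, b = socket.socketpair()
print(wait_for_socket(None), wait_for_socket(closed), wait_for_socket(b))
# conn_lost conn_lost success
peer.close()
a.close()
b.close()
```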
JXingK commented 1 year ago

Thank you very much, it solves my problem.

j04n-f commented 1 year ago

Any update? May I open a PR?