flyte / mqtt-io

Expose GPIO modules (Raspberry Pi, Beaglebone, PCF8754, PiFace2 etc.) and digital sensors (LM75 etc.) to an MQTT server for remote control and monitoring.
MIT License
461 stars 157 forks

Always remove finished transient_tasks. #301

Closed gmsoft-tuxicoman closed 1 year ago

gmsoft-tuxicoman commented 1 year ago

The original code only removes tasks from self.transient_tasks when an exception occurs.

This leads to high CPU usage: the task list keeps growing forever, and every second the for loop checks the done status of every task in it.

This should fix #236.
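To illustrate the growth, here is a minimal sketch, not the mqtt-io code itself. `noop`, `prune_only_failed`, `prune_all_finished` and `simulate` are hypothetical names made up for this example. It compares a cleanup that only drops failed tasks with one that drops every finished task:

```python
import asyncio


async def noop() -> None:
    """Hypothetical stand-in for a short-lived transient task."""


def prune_only_failed(tasks: list) -> None:
    # Behaviour described above: only tasks that raised are removed.
    for task in [t for t in tasks if t.done()]:
        if task.exception() is not None:
            tasks.remove(task)


def prune_all_finished(tasks: list) -> None:
    # Behaviour after this change: every finished task is removed.
    for task in [t for t in tasks if t.done()]:
        tasks.remove(task)


async def simulate(prune, rounds: int = 5, per_round: int = 10) -> int:
    tasks: list = []
    for _ in range(rounds):
        tasks.extend(asyncio.create_task(noop()) for _ in range(per_round))
        await asyncio.sleep(0.01)  # give the tasks time to finish
        prune(tasks)
    return len(tasks)  # number of tasks still tracked at the end


old_size = asyncio.run(simulate(prune_only_failed))
new_size = asyncio.run(simulate(prune_all_finished))
print(old_size, new_size)
```

Since none of the simulated tasks fail, the first variant never shrinks the list, so each pass has more tasks to scan than the one before.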

BenjiU commented 1 year ago

It also works on my side.

I have a question about your change: since you remove all tasks that are "done", the exception logging will no longer work, right?

    async def _remove_finished_transient_tasks(self) -> None:
        while True:
            await asyncio.sleep(1)
            finished_tasks = [x for x in self.transient_tasks if x.done()]
            if not finished_tasks:
                continue
==> Your fix will remove all tasks that are "done", even the ones with exceptions.
            for task in finished_tasks:
                self.transient_tasks.remove(task)
<==
            results = await asyncio.gather(*finished_tasks, return_exceptions=True)
            for i, exception in [
                x for x in enumerate(results) if isinstance(x[1], Exception)
            ]:
                task = finished_tasks[i]
                try:
                    raise exception
                except Exception:  # pylint: disable=broad-except
                    _LOG.exception("Exception in task: %r:", task)
==> you removed
                finally:
                    self.transient_tasks.remove(task)
<== 

So we should first print the exceptions and then remove all tasks?

    async def _remove_finished_transient_tasks(self) -> None:
        while True:
            await asyncio.sleep(1)
            finished_tasks = [x for x in self.transient_tasks if x.done()]
            if not finished_tasks:
                continue
            results = await asyncio.gather(*finished_tasks, return_exceptions=True)
            for i, exception in [
                x for x in enumerate(results) if isinstance(x[1], Exception)
            ]:
                task = finished_tasks[i]
                try:
                    raise exception
                except Exception:  # pylint: disable=broad-except
                    _LOG.exception("Exception in task: %r:", task)
==> Your fix would then remove all tasks after the exceptions are logged
            for task in finished_tasks:
                self.transient_tasks.remove(task)
<==

Am I right?

gmsoft-tuxicoman commented 1 year ago

self.transient_tasks.remove(task) only removes the reference to each task from the transient_tasks list. The finished tasks are still referenced in the finished_tasks list, so the exception part will still work, since it doesn't reference self.transient_tasks.
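A minimal standalone sketch of that point, assuming plain asyncio tasks. The `ok` and `boom` coroutines and the local `transient_tasks` list are hypothetical stand-ins for the mqtt-io internals:

```python
import asyncio


async def ok() -> str:
    return "fine"


async def boom() -> None:
    raise ValueError("simulated failure")


async def main() -> tuple:
    transient_tasks = [asyncio.create_task(ok()), asyncio.create_task(boom())]
    # Wait until both tasks are done; asyncio.wait does not re-raise.
    await asyncio.wait(transient_tasks)

    finished_tasks = [t for t in transient_tasks if t.done()]
    # Remove the finished tasks first, as the change does.
    for task in finished_tasks:
        transient_tasks.remove(task)

    # finished_tasks still holds the task objects, so gather can
    # still collect their results and exceptions.
    results = await asyncio.gather(*finished_tasks, return_exceptions=True)
    failures = [r for r in results if isinstance(r, Exception)]
    return transient_tasks, failures


remaining, failures = asyncio.run(main())
print(remaining, failures)
```

The tracked list ends up empty, while the exception raised by `boom` is still available for logging.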