Xinne / osc4py3

31 stars 6 forks source link

Please fix `as_comthreads` #13

Open vivi90 opened 2 years ago

vivi90 commented 2 years ago

as_comthreads stopped working since v1.0.4.

Downgrade to v1.0.3 works:

But then i have a lot of other issues like incomplete system shutdown (makes my unittests unusable):

So please fix it in the next PyPi release 🙂

vivi90 commented 2 years ago

For everyone, who need's to work with the v1.0.3 because of as_comthreads.. ..here is an very hacky workaround:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# SPDX-License-Identifier: AGPL-3.0-or-later

"""Shutdown workaround for ``osc4py3``."""

from types import ModuleType
import osc4py3.as_comthreads as sys
import osc4py3.oscdispatching as disp

def wait_for_shutdown(backend: ModuleType):
    """Wait until ``osc4py3`` is shut down completely.

    Args:
        backend (ModuleType):
            Backend module.

    """
    def is_terminated(backend: ModuleType) -> bool:
        """Return whether ``osc4py3`` is shutting down.

        Args:
            backend (ModuleType):
                Backend module.

        Returns:
            bool:
                ``True`` if is shutting down, otherwise ``False``.

        """
        terminated = False
        try:
            backend.globdisp()
        except RuntimeError:
            terminated = True
        finally:
            return terminated  # pylint: disable=lost-exception
    while not is_terminated(backend):
        pass
    while len(backend.all_dispatchers) > 0:
        for dispacher in tuple(backend.all_dispatchers.values()):
            dispacher.terminate()

def test() -> None:
    """Test."""
    sys.osc_startup()
    sys.osc_terminate()

test()
wait_for_shutdown(disp)  # Waits here for complete shutdown, before continue
test()