tcalmant / ipopo

iPOPO: a Service-Oriented Component Model for Python
https://ipopo.readthedocs.io/
Apache License 2.0

Always active Publisher bundle question #106

Closed acutaia closed 5 years ago

acutaia commented 5 years ago

Hi, I'm developing an IoT application and I'm using the EventAdmin service. Inside the Publisher I have a sensor that keeps sending information, so the Publisher is always active. The Handler has to take the data from the Publisher and decide what to do, for example request a remote instance using the RSA. My problem is that, because the Publisher is always active, nothing happens when the Handler tries to import the remote service. I know that bundles have to finish their work quickly; is there a way to work around this problem? Thank you for your attention. Angelo

tcalmant commented 5 years ago

By "the publisher is always active", do you mean that it doesn't return from the start call of the bundle activator (or from the @Validate method of the iPOPO component)? If that is the case, you just have to run the active code in a new thread (using threading.Thread).

acutaia commented 5 years ago

Yes, this is the code:

from gateway import Usb300DB
from EnoceanDevices import EnoceanDevices
from enocean.protocol.packet import Packet
import traceback
import sys

# iPOPO
from pelix.ipopo.decorators import *
import pelix.ipopo.constants as constants

# EventAdmin constants
import pelix.services
@ComponentFactory('publisher-factory')
# Require the EventAdmin service
@Requires('_event', pelix.services.SERVICE_EVENT_ADMIN)
# Inject our component name in a field
#@Property('_enocean', constants.IPOPO_INSTANCE_NAME)
# Auto-instantiation
@Instantiate('publisher')
class Publisher(object):
  """
  A sample publisher
  """
  def __init__(self):
      """
      Set up members, to be OK with PEP-8
      """
      # EventAdmin (injected)
      self._event = None

      # Component name (injected property)
      #self._name = None

  @Validate
  def validate(self, context):

      """
      Component validated
      """
      try:
          import queue
      except ImportError:
          import Queue as queue

      devices = EnoceanDevices()
      gateway = Usb300DB()
      gateway.start_gateway()
      while gateway.is_alive():
          try:
              receivedPacket = gateway.packet()
              if devices.packet(receivedPacket):
                  self._event.post(devices.url,devices.properties)

          except queue.Empty:
              continue
          except KeyboardInterrupt:
              break
          except Exception:
              traceback.print_exc(file=sys.stdout)
              break

      if gateway.is_alive():
          gateway.stop()

What do you mean by using threading.Thread? Can you give me an example? Thanks for your answer.

tcalmant commented 5 years ago

In your code, the @Validate method won't return while the gateway is alive, which will block the whole framework.

Here is a version that should work:

from gateway import Usb300DB
from EnoceanDevices import EnoceanDevices
from enocean.protocol.packet import Packet
import traceback
import sys

# Import threading from the standard library
import threading

# Import queue once and for all
try:
    import queue
except ImportError:
    import Queue as queue

# iPOPO
from pelix.ipopo.decorators import *
import pelix.ipopo.constants as constants

# EventAdmin constants
import pelix.services

@ComponentFactory("publisher-factory")
# Require the EventAdmin service
@Requires("_event", pelix.services.SERVICE_EVENT_ADMIN)
# Inject our component name in a field
# @Property('_enocean', constants.IPOPO_INSTANCE_NAME)
# Auto-instantiation
@Instantiate("publisher")
class Publisher(object):
    """
    A sample publisher
    """

    def __init__(self):
        """
        Set up members, to be OK with PEP-8
        """
        # EventAdmin (injected)
        self._event = None

        # Keep track of the thread
        self._thread = None

        # We need an Event object to be notified of the order to stop
        self._stop_event = threading.Event()

        # Component name (injected property)
        # self._name = None

    def gateway_loop(self):
        """
        This method is executed in a separate thread
        """
        devices = EnoceanDevices()
        gateway = Usb300DB()
        gateway.start_gateway()

        # Here we both check if the gateway is still alive and if the stop
        # event is still off 
        while gateway.is_alive() and not self._stop_event.is_set():
            try:
                receivedPacket = gateway.packet()
                if devices.packet(receivedPacket):
                    self._event.post(devices.url, devices.properties)
            except queue.Empty:
                continue
            except KeyboardInterrupt:
                break
            except Exception:
                traceback.print_exc(file=sys.stdout)
                break

        if gateway.is_alive():
            gateway.stop()

    @Validate
    def validate(self, context):
        """
        Component validated
        """
        # Ensure we don't have the stop event already set
        self._stop_event.clear()

        # Start the gateway handling in a separate thread
        self._thread = threading.Thread(target=self.gateway_loop)
        self._thread.start()

    @Invalidate
    def invalidate(self, context):
        """
        Component invalidated
        """
        # Signal the thread to stop
        self._stop_event.set()

        # Wait for the thread to fully stop
        self._thread.join()

        # Clean up
        self._thread = None

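With this snippet, the @Validate method returns as soon as the thread is started, so the framework is no longer blocked. When the component is invalidated, the @Invalidate method sets the stop event and waits for the thread to finish.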
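
The start/stop pattern used in the code above can be tried without iPOPO or the EnOcean hardware. The following is only a minimal sketch: the `Worker` class, its `feed` method, and the `demo` function are hypothetical stand-ins (a plain `queue.Queue` plays the role of the gateway, and a list plays the role of EventAdmin). It also illustrates one assumption worth checking in the real code: the loop only notices the stop event if the read call returns or raises `queue.Empty` regularly, which is why the sketch reads with a timeout.

```python
import queue
import threading
import time


class Worker:
    """Hypothetical stand-in for the Publisher component (no iPOPO, no gateway)."""

    def __init__(self):
        self._packets = queue.Queue()  # stands in for the gateway's packets
        self._stop_event = threading.Event()
        self._thread = None
        self.handled = []  # stands in for the events posted to EventAdmin

    def _loop(self):
        """Runs in a separate thread until the stop event is set."""
        while not self._stop_event.is_set():
            try:
                # The timeout bounds how long a stop request can go unnoticed
                packet = self._packets.get(timeout=0.1)
            except queue.Empty:
                continue
            self.handled.append(packet)

    def start(self):
        """Plays the role of @Validate: starts the thread and returns at once."""
        self._stop_event.clear()
        self._thread = threading.Thread(target=self._loop, name="worker-loop")
        self._thread.start()

    def stop(self):
        """Plays the role of @Invalidate: signals the loop, then waits for it."""
        self._stop_event.set()
        if self._thread is not None:
            self._thread.join()
            self._thread = None

    def feed(self, packet):
        """Simulates a packet arriving from the sensor."""
        self._packets.put(packet)


def demo():
    worker = Worker()
    worker.start()  # returns immediately, the loop runs in the background
    worker.feed("packet-1")
    worker.feed("packet-2")

    # Give the background thread a bounded amount of time to handle both packets
    deadline = time.monotonic() + 2.0
    while len(worker.handled) < 2 and time.monotonic() < deadline:
        time.sleep(0.01)

    worker.stop()
    return worker.handled


if __name__ == "__main__":
    print(demo())
```

If the real read call can block indefinitely, the join in the invalidation method would wait just as long, so a read with a timeout (or a non-blocking read) is the safer choice where the gateway API offers one.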

acutaia commented 5 years ago

Thank you, it works perfectly