JoelBender / bacpypes

BACpypes provides a BACnet application layer and network layer written in Python for daemons, scripting, and graphical interfaces.
MIT License
299 stars 128 forks source link

Asynchronous iocb #333

Closed BenoistM closed 4 years ago

BenoistM commented 4 years ago

Hi,

I try to read values in bacnet devices and to do this, I created a bacnet service which manage all the queries. I also set a timeout on iocb and here is the problem: When I set the timeout very low (like 0.001s), the iocb has no response and an error is raised. However, sometimes, the response comes for the next iteration and my values are wrong.

Example: QUERY 1: read_data queue task: 192.168.x.x analogInput 9 presentValue RESULT 1: Error: impossible to read value (timeout) QUERY 2: read_data queue task: 192.168.x.x analogInput 10 presentValue answer to client: 28.0 (this is the result of the query 1).

I think that I understand why (the device response is asynchronous) but do you know a solution to this issue? I checked the iocb IDs, i also put an iocb.abort() (which is include in set_timeout) but nothing change...

Here is my code: ` class BACPropertyMaster(): def init(self, ini_obj):

    # make a device object
    this_device = LocalDeviceObject(ini=ini_obj)

    # make a simple application
    self.app = BIPSimpleApplication(this_device, ini_obj.address)

    # launch the core lib
    self.core = Thread(target = run, daemon=True)
    self.core.start()

    # Wait the lib launching
    time.sleep(1)

def do_read(self, ip, obj_type, obj_inst, prop_id):
    if obj_type.isdigit():
        obj_type = int(obj_type)
    elif not get_object_class(obj_type):
        raise ValueError("unknown object type")

    obj_inst = int(obj_inst)

    datatype = get_datatype(obj_type, prop_id)
    if not datatype:
        raise ValueError("invalid property for object type")

    # build a request
    request = ReadPropertyRequest(
        objectIdentifier=(obj_type, obj_inst),
        propertyIdentifier=prop_id,
        )
    request.pduDestination = Address(ip)

    # make an IOCB
    iocb = IOCB(request)
    # Define a timeout (delay in sec)
    iocb.set_timeout(0.1, err=TimeoutError)

    # give it to the application
    self.app.request_io(iocb)

    # wait for it to complete
    iocb.wait()

    # do something for error/reject/abort
    if iocb.ioError:
        raise (iocb.ioError)

    # do something for success
    elif iocb.ioResponse:
        apdu = iocb.ioResponse

        # should be an ack
        if not isinstance(apdu, ReadPropertyACK):
            return

        # find the datatype
        datatype = get_datatype(apdu.objectIdentifier[0], apdu.propertyIdentifier)
        if not datatype:
            raise TypeError("unknown datatype")

        # special case for array parts, others are managed by cast_out
        if issubclass(datatype, Array) and (apdu.propertyArrayIndex is not None):
            if apdu.propertyArrayIndex == 0:
                value = apdu.propertyValue.cast_out(Unsigned)
            else:
                value = apdu.propertyValue.cast_out(datatype.subtype)
        else:
            value = apdu.propertyValue.cast_out(datatype)
        return value

    # do something with nothing?
    else:
        return None

`

Best regards,

Benoist

JoelBender commented 4 years ago

When the request goes out it is given a unique apduInvokeID and the response contains the same value. This is lined up with the iocb in the ApplicationIOController. Please run the application again with --debug bacpypes.app.ApplicationIOController and look for the _app_complete() function call. My guess is that when the reponse comes back in one thread that it doesn't realize that it has been marked as timeout in the other thread, so it doesn't get removed from the queue_by_address.

If that's the case, this is going to be a bit of a mess because there are no locks/semaphores around the IO queues.

BenoistM commented 4 years ago

Hi Joel, Thank you for your quick answer. Sorry for my incompetence but how can I get the apduInvokeID? I never used the debugging tool. What do I need to add in my code to get it? I also don't understand why the response comes back in a different thread? I only use single thread didn't I?

JoelBender commented 4 years ago

This line self.core = Thread(target = run, daemon=True) starts the core.run() in a separate daemon thread. Changing your application to run without threads means attaching call backs to the IOCB's rather than calling wait() which suspends the current thread until the event gets set.

BenoistM commented 4 years ago

My bad, I have 2 thread: the main and the Bacnet Application. I need to keep these two differents thread in my program to manage the bacnet readings/writings. I tried today to change wait() by add_callback() function but I think add_callback create another thread isnt it? I was not really obvious to get the response in the main thread. However, I think I found a solution: The wait() function seems to be the good one (the thread wait until the iocb complete). I only added this after the app.request: iocb_id_send=iocb.ioID % 256 and in the iocb.ioResponse, I checked the apduInvokeID:

if iocb.ioError:
            raise (iocb.ioError)
        # do something for success
        elif iocb.ioResponse:
            apdu = iocb.ioResponse
            iocb_id_response=apdu.apduInvokeID
            if (iocb_id_send!=iocb_id_response):
                time.sleep(0.5)
                raise Exception ('Wrong response ID')

I wait 0.5s in order to avoid the shift of all the next requests/answers. I don't think it's the best way to solve my issue but it seems to works. Is it possible however to check the apduInvokeID instead of ioID? Maybe the correspondance between the two ids (%256) is just a coincidence...

Regards,

ChristianTremblay commented 4 years ago

Working with thread, I always deffer all traffic to bacpypes thread.

This prevent all collisions and weird issues.

Le ven. 10 avr. 2020 à 12:02, BenoistM notifications@github.com a écrit :

My bad, I have 2 thread: the main and the Bacnet Application. I need to keep these two differents thread in my program to manage the bacnet readings/writings. I tried today to change wait() by add_callback() function but I think add_callback create another thread isnt it? I was not really obvious to get the response in the main thread. However, I think I found a solution: The wait() function seems to be the good one (the thread wait until the iocb complete). I only added this after the app.request: iocb_id_send=iocb.ioID % 256 and in the iocb.ioResponse, I checked the apduInvokeID:

` if iocb.ioError: raise (iocb.ioError)

do something for success

elif iocb.ioResponse: apdu = iocb.ioResponse iocb_id_response=apdu.apduInvokeID

    if (iocb_id_send!=iocb_id_response):
        time.sleep(0.5)
        raise Exception ('Wrong response ID')

` I wait 0.5s in order to avoid the shift of all the next requests/answers. I don't think it's the best way to solve my issue but it seems to works. Is it possible however to check the apduInvokeID instead of ioID? Maybe the correspondance between the two ids (%256) is just a coincidence...

Regards,

— You are receiving this because you are subscribed to this thread. Reply to this email directly, view it on GitHub https://github.com/JoelBender/bacpypes/issues/333#issuecomment-612096015, or unsubscribe https://github.com/notifications/unsubscribe-auth/ABQUXB26O45MARKKP3DU3ATRL47HVANCNFSM4MECPAPQ .

BenoistM commented 4 years ago

Working with thread, I always deffer all traffic to bacpypes thread. This prevent all collisions and weird issues. Le ven. 10 avr. 2020 à 12:02, BenoistM notifications@github.com a écrit :

How do you do this? When I run the app core inside the main thread, my program get stuck at the run() function.

JoelBender commented 4 years ago

Keep most of the application the way it is, just change this:

    # give it to the application
    self.app.request_io(iocb)

to this:

    # give it to the application
    deferred(self.app.request, iocb)

So what's happening? The "deferred" function call adds a entry to the list of functions that are called in the run() loop, so all of the operations that it does, like putting it into queues and such, are all done in the BACpypes thread. You are turning the entire iocb over and saying "I'm not touching this until you think you're done with it, and I'll wait for the event."

You'll need to add from bacpypes.core import deferred where you imported run().

BenoistM commented 4 years ago

Yes, I think that is what I need. Do I have to keep the run() in a new thread in the init? (self.core = Thread(target = run, daemon=True)) I tried without and I have an error: No task manager If I keep the run() in the thread, i can't have an answer from the iocb (the iocb.ioError timeout is raised each time). Maybe the response is not iocb.ioResponse anymore?

JoelBender commented 4 years ago

I usually have the BACpypes thread as the main thread and the others as workers. I put together a gist of a bunch of different ways of handling main/other threads. The really short timeouts may be interfering with the expectation that the ApplicationIOController "owns" the iocb for its lifetime and short circuiting the APDU timeouts is causing headaches.

ChristianTremblay commented 4 years ago

Consider this fixed. If help still needed, please reopen

MitchCayCRC commented 3 years ago

Keep most of the application the way it is, just change this:

    # give it to the application
    self.app.request_io(iocb)

to this:

    # give it to the application
    deferred(self.app.request, iocb)

So what's happening? The "deferred" function call adds a entry to the list of functions that are called in the run() loop, so all of the operations that it does, like putting it into queues and such, are all done in the BACpypes thread. You are turning the entire iocb over and saying "I'm not touching this until you think you're done with it, and I'll wait for the event."

You'll need to add from bacpypes.core import deferred where you imported run().

I believe I am seeing a similar issue, I am wondering if deferred(self.app.request, iocb) was intended to be deferred(self.app.request_io, iocb)?