fermi-ad / acsys-python

Python module to access the Fermilab Control System
MIT License

Trying to implement restore settings on Exception #34

Closed kjhazelwood closed 3 years ago

kjhazelwood commented 3 years ago

I'm writing a script that collects readings, makes settings, and upon exiting the script or on an exception attempts to restore the initial settings. I feel like the code below should work, but when the outermost finally block executes and tries to restore, it says that the current DPMContext no longer has an entry for the specified tags [17 -37]. I'm guessing this is because the DPMContext is now dead, since an exception was raised in the async DPM reply loop. I've been able to get the desired functionality by having the finally block create a new DPMContext and then perform the settings, but it is unreliable: a good portion of the time, the attempt to create another DPMContext so soon after the first one died fails with a [1 -34] or [1 -6] timeout or busy error.

Any thoughts how to implement restore on exception reliably?

async def run(connection):
    async with acsys.dpm.DPMContext(connection, dpm_node=None) as dpm:
        try:
            await dpm.enable_settings(role="testing")

            for i, device in enumerate(self.devices):
                await dpm.add_entry(i, "%s.SETTING@%s" % (device["name"], self.iteration_event))

            await dpm.start()

            iteration = 0

            async for reply in dpm:
                if isinstance(reply, ItemData):
                    logger.debug("%s %d %s: %.4f" % (reply.stamp, reply.tag, reply.meta["name"], reply.data))

                    #Calculate new bounded random setting
                    current_device_setting = reply.data
                    new_device_setting = random.random()*(self.devices[reply.tag]["max"]-self.devices[reply.tag]["min"]) + self.devices[reply.tag]["min"]

                    #Fill array of device initial settings for restore, once filled it stays filled
                    if np.isnan(self.devices_settings[reply.tag]["initial"]):
                        self.devices_settings[reply.tag]["initial"] = current_device_setting

                    #Fill array of device current settings for logging current readings, cleared after each bulk setting
                    if np.isnan(self.devices_settings[reply.tag]["current"]):
                        self.devices_settings[reply.tag]["current"] = current_device_setting

                    #Fill array of device new settings for bulk setting of all devices at once, cleared after each bulk setting
                    if np.isnan(self.devices_settings[reply.tag]["new"]):
                        self.devices_settings[reply.tag]["new"] = new_device_setting

                    #Send new setting
                    if not np.isnan(np.asarray([device_settings["new"] for device_settings in self.devices_settings])).any():
                        if self.iteration_count != None and iteration >= self.iteration_count:
                            raise Exception("Desired iteration count met")

                        settings = []

                        for i, device_settings in enumerate(self.devices_settings):
                            settings.append((i, device_settings["new"]))

                        await dpm.apply_settings(settings)

                        logger.info(self.devices_settings)

                        for i, device_settings in enumerate(self.devices_settings):
                            device_settings["current"] = np.nan
                            device_settings["new"] = np.nan

                        iteration += 1

                elif isinstance(reply, ItemStatus):
                    if reply.status.isFatal:
                        logger.critical(reply)
                        raise Exception("Fatal status returned from ACNET")
                    elif reply.status.isWarning:
                        logger.warning(reply)
        except Exception as e:
            if not isinstance(e, KeyboardInterrupt):
                logger.critical(traceback.format_exc())
        finally:
            try:
                if True: #self.restore_devices_on_exit:
                    logger.info("Restoring initial device settings...")

                    settings = []

                    for i, device_settings in enumerate(self.devices_settings):
                        settings.append((i, device_settings["initial"]))

                    await dpm.apply_settings(settings)

                    logger.info("Initial device settings restored.")
            except Exception as e:
                logger.critical("Failed to restore all settings!")
                logger.critical(traceback.format_exc())
            finally:
                logger.info("Device de-tuner stopped.")

acsys.run_client(run)

EDIT: Add "Python" to code block for syntax highlighting - @beauremus

kjhazelwood commented 3 years ago

Thanks for the ```python tip @beauremus

rneswold commented 3 years ago

Seems like you should be able to do something like this:

async def run(connection):
    async with acsys.dpm.DPMContext(connection) as dpm:
        await dpm.enable_settings(role="testing")

        # Add DRF entries.
        await dpm.add_entry(...)

        # start acquisition
        await dpm.start()

        try:
            # Handle incoming data
            async for reply in dpm.replies():
                # Process each packet (saving old state to restore later.)
                update(orig_settings, reply)

                # test for condition that exits the loop
                if condition():
                    break
        finally:
            await dpm.apply_settings(orig_settings)

acsys.run_client(run)

Of course the real world is more complicated; what if the for-loop exited due to a network error? Then the finally clause would fail to restore the settings.

I've opened #35 to fix the library if it can't be used this way. Use that issue to discuss desired features.
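
In the meantime, a possible stop-gap for that failure mode might look something like this (only a sketch: restore_with_retry is a made-up helper, the role and retry numbers are placeholders, and whether dpm.start() is needed before apply_settings() on a fresh context is an open question):

import asyncio
import acsys.dpm

async def restore_with_retry(connection, drf_values, role="testing",
                             attempts=5, delay=2.0):
    # drf_values: list of (setting_drf_string, initial_value) pairs captured
    # before anything was changed.  Retry on a fresh DPMContext with a short
    # backoff, since opening a new context immediately after the old one died
    # was reported above to fail with [1 -34] / [1 -6] errors.
    for _ in range(attempts):
        try:
            async with acsys.dpm.DPMContext(connection) as dpm:
                await dpm.enable_settings(role=role)
                for tag, (drf, _) in enumerate(drf_values):
                    await dpm.add_entry(tag, drf)
                # It's unclear from this thread whether dpm.start() is needed
                # before apply_settings(); add it here if so.
                await dpm.apply_settings(
                    [(tag, value) for tag, (_, value) in enumerate(drf_values)])
                return True
        except Exception:
            await asyncio.sleep(delay)
    return False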

kjhazelwood commented 3 years ago

Yes, the code example above works if one breaks out of the loop. However, on an exception or Ctrl-C (KeyboardInterrupt), the DPMContext is destroyed somehow. asyncio throws a CancelledError when an exception occurs within the async for loop, which could be what destroys the context?
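
For what it's worth, here is a small stand-alone check with no acsys involved (stream() just stands in for the DPM reply stream). It shows the two cases: an ordinary exception raised in the loop body arrives as itself, while cancelling the task while it sits in the async for shows up as a CancelledError at that await point:

import asyncio

async def stream():
    # Stand-in for the DPM reply stream: yields forever.
    n = 0
    while True:
        await asyncio.sleep(0.1)
        yield n
        n += 1

async def consumer():
    try:
        async for n in stream():
            if n == 2:
                raise RuntimeError("boom")   # ordinary exception in the loop body
    except RuntimeError as exc:
        print("caught ordinary exception:", exc)
    except asyncio.CancelledError:
        print("caught CancelledError at the await point")
        raise

async def main():
    # Case 1: the exception raised in the loop body arrives as itself.
    await consumer()

    # Case 2: cancel the task while it is suspended in the async for.
    task = asyncio.create_task(consumer())
    await asyncio.sleep(0.15)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("consumer task ended via cancellation")

asyncio.run(main())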

rneswold commented 3 years ago

Yes, the code example above works if one breaks out of the loop. However, on an exception or Ctrl-C (KeyboardInterrupt), the DPMContext is destroyed somehow. asyncio throws a CancelledError when an exception occurs within the async for loop, which could be what destroys the context?

Yup, it's more complicated in the asyncio world. Your code between await statements runs to completion. Once you block on an await, control moves back to the event loop, which tries to run a task that can proceed. This means your code is mostly blocked waiting for the next event to occur, so keyboard exceptions are handled by the event loop and would rarely occur in your code.

To make sure resources are cleaned up, when acsys.run_client() exits due to an exception in the event loop, it calls .cancel() on your async function so that the CancelledError gets propagated to all the active async tasks. If you get a CancelledError, it means the system is going down.

It shouldn't have killed your DPMContext, though, unless you let the exception rise past the with-statement.
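
Putting those two pieces together, the pattern would look something like this (a sketch only, not the library's prescribed usage: the ItemData import path is assumed, the bookkeeping is elided, and whether a cancelled-but-still-open context will accept that last apply_settings() is exactly what this issue is probing):

import asyncio
import acsys
import acsys.dpm
from acsys.dpm import ItemData   # import path assumed; adjust if needed

async def run(connection):
    initial = {}   # tag -> first setting seen, kept for the restore

    async with acsys.dpm.DPMContext(connection) as dpm:
        await dpm.enable_settings(role="testing")
        # add_entry() calls as in the original script ...
        await dpm.start()

        try:
            async for reply in dpm:
                if isinstance(reply, ItemData):
                    initial.setdefault(reply.tag, reply.data)
                    # ... compute and apply the new settings here ...
        finally:
            # Runs for a normal break, an ordinary exception, and a
            # CancelledError alike -- and still inside the with-statement,
            # so the context has not been torn down yet.  Whether a
            # cancelled-but-still-open context accepts this last
            # apply_settings() is exactly what this issue (and #35) probe.
            if initial:
                await dpm.apply_settings(list(initial.items()))

acsys.run_client(run)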

rneswold commented 3 years ago

This is where exceptions are being caught and where the .cancel() is being issued.

https://github.com/fermi-controls/acsys-python/blob/73ec2a25f96172407919870bf9e38aed059686b4/acsys/__init__.py#L816-L825

kjhazelwood commented 3 years ago

I don't understand how the cancel() is being called if exceptions are being caught inside the run_client() target function, but obviously it's getting there. The code correctly never exits the run_client target function, but somehow the DPMContext has been cleaned up. Is there a function/property for looking at the current entries in the DPMContext so I can verify all entries are now missing?

rneswold commented 3 years ago

I don't understand how the cancel() is being called if exceptions are being caught inside the run_client() target function, but obviously it's getting there.

Keyboard exceptions (i.e. Ctrl-C) are being caught by the event loop, which is running in .run_until_complete(). You're never continuously "running" in your async function. The event loop briefly runs your function. When your function reaches an await statement, the event loop puts your function in a queue and monitors what will wake it up (e.g. a timer expiring or data on a socket). While your function is on the "blocked" queue, the event loop finds another async function that can make progress. So your function is only briefly running between await statements. Most of the time the CPU is in the event loop scheduler, deciding which function can make further progress.

So Ctrl-C doesn't generate an exception in your function. It kills the .run_until_complete() call, and all the async tasks are left idle, sitting in a scheduler's queue.

.run_client() then calls .cancel() on the top-level future, which sends the CancelledError down to the lowest futures in the tree. The CancelledError then percolates up and the code can clean up the resources properly.

So your code won't see a KeyboardInterrupt (or whatever) but it'll see a CancelledError.
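
A stripped-down illustration of that flow with no acsys in it (this approximates what run_client() is described as doing above; it is not the library's actual code):

import asyncio

async def worker():
    try:
        while True:
            # The task spends nearly all of its time suspended here,
            # inside the event loop, so Ctrl-C never lands in this code.
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        print("worker: CancelledError received, cleaning up")
        raise   # let the cancellation finish

def run_client_like(coro_fn):
    loop = asyncio.new_event_loop()
    task = loop.create_task(coro_fn())
    try:
        loop.run_until_complete(task)
    except KeyboardInterrupt:
        # Ctrl-C surfaces here, in the caller of run_until_complete().
        task.cancel()
        try:
            # Run the loop again so the CancelledError can percolate
            # through the task and its cleanup code gets a chance to run.
            loop.run_until_complete(task)
        except asyncio.CancelledError:
            pass
    finally:
        loop.close()

run_client_like(worker)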

The code correctly never exits the run_client target function but somehow the DPMContext has been cleaned up. Is there a function/property for looking at the current entries in the DPMContext so I can verify all entries are now missing?

DPMContext can only be cleaned up if control leaves the async-with statement. DPMContext doesn't have anything interesting. The dpm object it provides, however, has lots of interesting stuff. Since Python doesn't support data hiding, you're free to look at all the internal properties (dpm._dev_list, for instance).

beauremus commented 3 years ago

dir(dpm) will show you all the available properties.
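
For instance, a tiny helper one could call from inside the async-with block (dpm._dev_list is the attribute mentioned above; anything else is just whatever dir() happens to report on your version):

def dump_dpm_state(dpm):
    # Print what the dpm object exposes; call from inside the async-with.
    public = [name for name in dir(dpm) if not name.startswith('_')]
    print("public attributes/methods:", public)
    # Underscore-prefixed internals are visible too, e.g. the entry list
    # mentioned above (subject to change between versions):
    print("entries:", getattr(dpm, '_dev_list', None))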

kjhazelwood commented 3 years ago

Thanks, it's clear to me now why the CancelledError is propagating. I think it's inherent to the async loop that exceptions will propagate to the try/except inside run_client(). Perhaps this is more of an issue for #35, but I think the dpm being killed upon a caught exception in the async for loop is not the desired behavior.