fermi-ad / acsys-python

Python module to access the Fermilab Control System
MIT License
8 stars 4 forks source link

V0.x pull request (attempt 2) #53

Closed rneswold closed 1 year ago

rneswold commented 2 years ago

Here's another attempt at #51, which was meant to fix #50.

beauremus commented 2 years ago

Reposting the current state of things.

I'm sharing my testing script in case anyone else wants to try it. I'm using Python 3.10, but can try other versions if needed.

#!/usr/bin/env python3

"""An example of using asyncio with ACSys to provide data to PyQt."""

import threading
import asyncio
import time

import acsys.dpm

async def read_many(con, thread_context):
    """Stream DPM readings for every parameter into the shared thread context.

    Each parameter in ``thread_context['paramlist']`` is registered with a tag
    equal to its index in that list. Readings are written into
    ``thread_context['data'][tag]`` while holding ``thread_context['lock']``.
    Status replies and unrecognised replies are printed.

    The open DPM context is published as ``thread_context['daq_task']`` so
    that code outside this coroutine can shut it down.
    """
    async with acsys.dpm.DPMContext(con) as dpm:
        # Expose the DPM handle; shutting it down ends the loop below, so no
        # separate stop flag is needed.
        thread_context['daq_task'] = dpm

        tagged_params = list(enumerate(thread_context['paramlist']))
        await dpm.add_entries(tagged_params)
        await dpm.start()

        async for reply in dpm:
            if reply.isReading:
                # Writers and readers share this lock so a reader never
                # observes the list mid-update.
                with thread_context['lock']:
                    thread_context['data'][reply.tag] = reply.data
                continue

            if reply.isStatus:
                print(f'Status: {reply}')
            else:
                print(f'Unknown response: {reply}')

        print('Ending read_many loop')

class Phasescan:
    """Run ACNET data acquisition for named parameter sets on background threads.

    Each named DAQ gets its own thread and its own asyncio event loop. The
    per-thread state lives in ``self.thread_dict[thread_name]``:

    - ``'thread'``: the ``threading.Thread`` running ``_acnet_daq``.
    - ``'lock'``: a ``threading.Lock`` guarding ``'data'``.
    - ``'data'``: latest values, indexed by parameter tag (its list index).
    - ``'paramlist'``: the parameter names passed to ``start_thread``.
    - ``'loop'``: the event loop owned by the DAQ thread, set when it starts.
    - ``'daq_task'``: the DPM context, set by the client coroutine once DPM is
      open; it is never set if startup fails.
    """

    def __init__(self):
        self.thread_dict = {}

    def _acnet_daq(self, thread_name):
        """Run the ACNET client for ``thread_name``; blocks until it returns.

        Runs on the DAQ thread. A fresh event loop is created, installed as
        this thread's current loop, and recorded in the thread context so that
        ``stop_thread`` can schedule work on it from another thread.
        """
        context = self.thread_dict[thread_name]

        # This should be asyncio.run() but it's not supported in Python 3.6.
        event_loop = asyncio.new_event_loop()
        # Publish the loop before the client starts so stop_thread() can reach
        # it; coroutines bound to this loop must only be run on it.
        context['loop'] = event_loop

        try:
            asyncio.set_event_loop(event_loop)

            # This blocks until the client returns.
            # NOTE(review): assumes acsys.run_client() drives this thread's
            # current event loop (the one installed above).
            acsys.run_client(read_many, thread_context=context)
        finally:
            # Detach and close the loop after the client returns.
            asyncio.set_event_loop(None)
            event_loop.close()

    def get_thread_data(self, thread_name):
        """Return a snapshot copy of the latest data for ``thread_name``."""
        with self.thread_dict[thread_name]['lock']:
            # Copy under the lock; returning the list itself would let the
            # caller observe later writes from the DAQ thread.
            return self.thread_dict[thread_name]['data'].copy()

    def start_thread(self, thread_name, param_list):
        """Create the DAQ context for ``thread_name`` and start its thread.

        ``param_list`` lists the parameter names to read. Each parameter's
        position in the list is its tag and its slot in ``'data'``.
        """
        print('Starting thread', thread_name)
        daq_thread = threading.Thread(
            target=self._acnet_daq,
            args=(thread_name,)
        )

        # Register the context before start(): the thread reads it at once.
        self.thread_dict[thread_name] = {
            'thread': daq_thread,
            'lock': threading.Lock(),
            # One None slot per parameter; the index is the parameter's tag.
            'data': [None] * len(param_list),
            'paramlist': param_list
        }

        daq_thread.start()

    async def stop_thread(self, thread_name):
        """Shut down the DAQ for ``thread_name`` and wait for its thread to exit.

        The DPM shutdown coroutine is run on the DAQ thread's own event loop,
        because awaiting it from the caller's loop would drive that loop's
        I/O from the wrong thread. If the DAQ never opened its DPM context,
        the shutdown step is skipped. The thread is joined in an executor so
        the caller's event loop is not blocked.

        Raises whatever exception the DPM shutdown coroutine raises.
        """
        print('Stopping thread', thread_name)

        context = self.thread_dict[thread_name]
        dpm = context.get('daq_task')
        daq_loop = context.get('loop')

        joined = asyncio.get_running_loop().run_in_executor(
            None, context['thread'].join)

        if dpm is not None and daq_loop is not None and daq_loop.is_running():
            shutdown = asyncio.wrap_future(
                asyncio.run_coroutine_threadsafe(dpm._shutdown(), daq_loop))
            # If the DAQ loop exits on its own before running the shutdown,
            # the shutdown future never resolves, so stop waiting on it once
            # the thread has finished.
            await asyncio.wait({shutdown, joined},
                               return_when=asyncio.FIRST_COMPLETED)
            if shutdown.done():
                # Propagate any error raised by the DPM shutdown.
                shutdown.result()
            else:
                shutdown.cancel()

        await joined

async def main():
    """Start one DAQ thread, print its data once a second three times, then stop it."""
    pscan = Phasescan()

    pscan.start_thread('pscan', ['M:OUTTMP'])

    try:
        for _ in range(3):
            print(pscan.get_thread_data('pscan'))
            # Yield to the event loop; time.sleep() would block it.
            await asyncio.sleep(1)
    finally:
        # Always stop the DAQ thread, even if polling fails, so it is not
        # left running.
        await pscan.stop_thread('pscan')

# Only start the DAQ when run as a script, not when imported.
if __name__ == '__main__':
    asyncio.run(main())

Here's my current stack trace.

➜ python pyqt_async.py
Starting thread pscan
[None]
Exception in thread Thread-1 (_acnet_daq):
Traceback (most recent call last):
  File "/Users/beau/.pyenv/versions/3.10.2/lib/python3.10/threading.py", line 1009, in _bootstrap_inner
    self.run()
  File "/Users/beau/.pyenv/versions/3.10.2/lib/python3.10/threading.py", line 946, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/beau/Projects/fermi/controls/applications/acsys-python-examples/pyqt_async.py", line 53, in _acnet_daq
    acsys.run_client(read_many, thread_context=self.thread_dict[thread_name])
  File "/Users/beau/Projects/fermi/controls/applications/acsys-python/acsys/__init__.py", line 819, in run_client
    return loop.run_until_complete(client_fut)
  File "/Users/beau/.pyenv/versions/3.10.2/lib/python3.10/site-packages/nest_asyncio.py", line 81, in run_until_complete
    return f.result()
  File "/Users/beau/.pyenv/versions/3.10.2/lib/python3.10/asyncio/futures.py", line 201, in result
    raise self._exception
  File "/Users/beau/.pyenv/versions/3.10.2/lib/python3.10/asyncio/tasks.py", line 232, in __step
    result = coro.send(None)
  File "/Users/beau/Projects/fermi/controls/applications/acsys-python/acsys/__init__.py", line 795, in __client_main
    result = (await main(con, **kwargs))
  File "/Users/beau/Projects/fermi/controls/applications/acsys-python-examples/pyqt_async.py", line 14, in read_many
    async with acsys.dpm.DPMContext(con) as dpm:
  File "/Users/beau/Projects/fermi/controls/applications/acsys-python/acsys/dpm/__init__.py", line 813, in __aenter__
    await self.dpm._restore_state()
  File "/Users/beau/Projects/fermi/controls/applications/acsys-python/acsys/dpm/__init__.py", line 361, in _restore_state
    await self._connect(lock)
  File "/Users/beau/Projects/fermi/controls/applications/acsys-python/acsys/dpm/__init__.py", line 390, in _connect
    _, msg = await gen.asend(None)
  File "/Users/beau/Projects/fermi/controls/applications/acsys-python/acsys/__init__.py", line 704, in request_stream
    pending.add(asyncio.create_task(done_fut))
  File "/Users/beau/.pyenv/versions/3.10.2/lib/python3.10/asyncio/tasks.py", line 337, in create_task
    task = loop.create_task(coro)
  File "/Users/beau/.pyenv/versions/3.10.2/lib/python3.10/asyncio/base_events.py", line 433, in create_task
    task = tasks.Task(coro, loop=self, name=name)
  File "/Users/beau/.pyenv/versions/3.10.2/lib/python3.10/asyncio/tasks.py", line 100, in __init__
    raise TypeError(f"a coroutine was expected, got {coro!r}")
TypeError: a coroutine was expected, got <Future pending>
[None]
[None]
Stopping thread pscan
Traceback (most recent call last):
  File "/Users/beau/Projects/fermi/controls/applications/acsys-python-examples/pyqt_async.py", line 105, in <module>
    asyncio.run(main())
  File "/Users/beau/.pyenv/versions/3.10.2/lib/python3.10/site-packages/nest_asyncio.py", line 38, in run
    return loop.run_until_complete(task)
  File "/Users/beau/.pyenv/versions/3.10.2/lib/python3.10/site-packages/nest_asyncio.py", line 81, in run_until_complete
    return f.result()
  File "/Users/beau/.pyenv/versions/3.10.2/lib/python3.10/asyncio/futures.py", line 201, in result
    raise self._exception
  File "/Users/beau/.pyenv/versions/3.10.2/lib/python3.10/asyncio/tasks.py", line 232, in __step
    result = coro.send(None)
  File "/Users/beau/Projects/fermi/controls/applications/acsys-python-examples/pyqt_async.py", line 103, in main
    await pscan.stop_thread('pscan')
  File "/Users/beau/Projects/fermi/controls/applications/acsys-python-examples/pyqt_async.py", line 89, in stop_thread
    await self.thread_dict[thread_name]['daq_task']._shutdown()
KeyError: 'daq_task'
Exception ignored in: <function __AcnetdProtocol.__del__ at 0x1063648b0>
Traceback (most recent call last):
  File "/Users/beau/Projects/fermi/controls/applications/acsys-python/acsys/__init__.py", line 164, in __del__
    self.end()
  File "/Users/beau/Projects/fermi/controls/applications/acsys-python/acsys/__init__.py", line 168, in end
    self.transport.close()
  File "/Users/beau/.pyenv/versions/3.10.2/lib/python3.10/asyncio/selector_events.py", line 700, in close
    self._loop.call_soon(self._call_connection_lost, None)
  File "/Users/beau/.pyenv/versions/3.10.2/lib/python3.10/asyncio/base_events.py", line 745, in call_soon
    self._check_closed()
  File "/Users/beau/.pyenv/versions/3.10.2/lib/python3.10/asyncio/base_events.py", line 510, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
beauremus commented 2 years ago

I'm confused... It appears the done_fut is returning immediately. The stack trace shows (what I'm guessing is the done_fut) that a reply from the queue doesn't have the .result() method.

➜ python pyqt_async.py
Starting thread pscan
[None]
Exception in thread Thread-1 (_acnet_daq):
Traceback (most recent call last):
  File "/Users/beau/.pyenv/versions/3.10.2/lib/python3.10/threading.py", line 1009, in _bootstrap_inner
    self.run()
  File "/Users/beau/.pyenv/versions/3.10.2/lib/python3.10/threading.py", line 946, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/beau/Projects/fermi/controls/applications/acsys-python-examples/pyqt_async.py", line 53, in _acnet_daq
    acsys.run_client(read_many, thread_context=self.thread_dict[thread_name])
  File "/Users/beau/Projects/fermi/controls/applications/acsys-python/acsys/__init__.py", line 819, in run_client
    return loop.run_until_complete(client_fut)
  File "/Users/beau/.pyenv/versions/3.10.2/lib/python3.10/site-packages/nest_asyncio.py", line 81, in run_until_complete
    return f.result()
  File "/Users/beau/.pyenv/versions/3.10.2/lib/python3.10/asyncio/futures.py", line 201, in result
    raise self._exception
  File "/Users/beau/.pyenv/versions/3.10.2/lib/python3.10/asyncio/tasks.py", line 232, in __step
    result = coro.send(None)
  File "/Users/beau/Projects/fermi/controls/applications/acsys-python/acsys/__init__.py", line 795, in __client_main
    result = (await main(con, **kwargs))
  File "/Users/beau/Projects/fermi/controls/applications/acsys-python-examples/pyqt_async.py", line 14, in read_many
    async with acsys.dpm.DPMContext(con) as dpm:
  File "/Users/beau/Projects/fermi/controls/applications/acsys-python/acsys/dpm/__init__.py", line 812, in __aenter__
    await self.dpm._restore_state()
  File "/Users/beau/Projects/fermi/controls/applications/acsys-python/acsys/dpm/__init__.py", line 360, in _restore_state
    await self._connect(lock)
  File "/Users/beau/Projects/fermi/controls/applications/acsys-python/acsys/dpm/__init__.py", line 389, in _connect
    _, msg = await gen.asend(None)
  File "/Users/beau/Projects/fermi/controls/applications/acsys-python/acsys/__init__.py", line 745, in request_stream
    (snd, sts, msg, done) = next_reply.result()
AttributeError: 'coroutine' object has no attribute 'result'
[None]
[None]
Stopping thread pscan
Traceback (most recent call last):
  File "/Users/beau/Projects/fermi/controls/applications/acsys-python-examples/pyqt_async.py", line 105, in <module>
    asyncio.run(main())
  File "/Users/beau/.pyenv/versions/3.10.2/lib/python3.10/site-packages/nest_asyncio.py", line 38, in run
    return loop.run_until_complete(task)
  File "/Users/beau/.pyenv/versions/3.10.2/lib/python3.10/site-packages/nest_asyncio.py", line 81, in run_until_complete
    return f.result()
  File "/Users/beau/.pyenv/versions/3.10.2/lib/python3.10/asyncio/futures.py", line 201, in result
    raise self._exception
  File "/Users/beau/.pyenv/versions/3.10.2/lib/python3.10/asyncio/tasks.py", line 232, in __step
    result = coro.send(None)
  File "/Users/beau/Projects/fermi/controls/applications/acsys-python-examples/pyqt_async.py", line 103, in main
    await pscan.stop_thread('pscan')
  File "/Users/beau/Projects/fermi/controls/applications/acsys-python-examples/pyqt_async.py", line 89, in stop_thread
    await self.thread_dict[thread_name]['daq_task']._shutdown()
KeyError: 'daq_task'
Exception ignored in: <function __AcnetdProtocol.__del__ at 0x105080790>
Traceback (most recent call last):
  File "/Users/beau/Projects/fermi/controls/applications/acsys-python/acsys/__init__.py", line 164, in __del__
    self.end()
  File "/Users/beau/Projects/fermi/controls/applications/acsys-python/acsys/__init__.py", line 168, in end
    self.transport.close()
  File "/Users/beau/.pyenv/versions/3.10.2/lib/python3.10/asyncio/selector_events.py", line 700, in close
    self._loop.call_soon(self._call_connection_lost, None)
  File "/Users/beau/.pyenv/versions/3.10.2/lib/python3.10/asyncio/base_events.py", line 745, in call_soon
    self._check_closed()
  File "/Users/beau/.pyenv/versions/3.10.2/lib/python3.10/asyncio/base_events.py", line 510, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
beauremus commented 2 years ago

Note that the

[None]
[None]

in the middle are values read from the thread. None is the initial value stored for each parameter.

rneswold commented 2 years ago

I'm confused... It appears the done_fut is returning immediately. The stack trace shows (what I'm guessing is the done_fut) that a reply from the queue doesn't have the .result() method.

done_fut is the future passed in the function call which the caller will use to end the request early (by canceling the future.)

The .result() is being called on next_reply, which is the future that reads from the reply queue. I thought we needed to call .result() to get the future's result, but rpy_q.get() might be a coroutine. Sheesh, can it get any more complicated, Python?

rneswold commented 1 year ago

I rebased the pull request and added my proposed fix. That's why the old commits are no longer valid.

beauremus commented 1 year ago

When the connection is closed, we try to reconnect. That normally makes sense, but not in this case, since the connection was closed intentionally.

$ python pyqt_async.py
Starting thread pscan
[None]
[None]
[34.8012849788675]
Stopping thread pscan
DPM(id: 4943) connection closed ... retrying
rneswold commented 1 year ago

Right. We should only retry when it closes due to an error.

rneswold commented 1 year ago

Ugh. Python's specifications for (async) generators are so messy. We're doing a

    except GeneratorExit:
        pass

so it falls through to the code that retries the connection. If we get this exception, the user is done. Maybe that `except` clause should just be removed so the exception propagates up?

beauremus commented 1 year ago

I'm getting a stack trace when the GeneratorExit is raised.

Exception in thread Thread-1 (_acnet_daq):
Traceback (most recent call last):
  File "/Users/beau/.pyenv/versions/3.10.2/lib/python3.10/threading.py", line 1009, in _bootstrap_inner
    self.run()
  File "/Users/beau/.pyenv/versions/3.10.2/lib/python3.10/threading.py", line 946, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/beau/Projects/fermi/controls/applications/acsys-python-examples/pyqt_async.py", line 53, in _acnet_daq
    acsys.run_client(read_many, thread_context=self.thread_dict[thread_name])
  File "/Users/beau/Projects/fermi/controls/applications/acsys-python/acsys/__init__.py", line 819, in run_client
    return loop.run_until_complete(client_fut)
  File "/Users/beau/.pyenv/versions/3.10.2/lib/python3.10/site-packages/nest_asyncio.py", line 81, in run_until_complete
    return f.result()
  File "/Users/beau/.pyenv/versions/3.10.2/lib/python3.10/asyncio/futures.py", line 201, in result
    raise self._exception
  File "/Users/beau/.pyenv/versions/3.10.2/lib/python3.10/asyncio/tasks.py", line 232, in __step
    result = coro.send(None)
  File "/Users/beau/Projects/fermi/controls/applications/acsys-python/acsys/__init__.py", line 795, in __client_main
    result = (await main(con, **kwargs))
  File "/Users/beau/Projects/fermi/controls/applications/acsys-python-examples/pyqt_async.py", line 24, in read_many
    async for evt_res in dpm:
  File "/Users/beau/Projects/fermi/controls/applications/acsys-python/acsys/dpm/__init__.py", line 315, in __anext__
    _, msg = await self.gen.__anext__()
  File "/Users/beau/Projects/fermi/controls/applications/acsys-python/acsys/__init__.py", line 743, in request_stream
    raise GeneratorExit
GeneratorExit
Exception ignored in: <function __AcnetdProtocol.__del__ at 0x1016a8940>
Traceback (most recent call last):
  File "/Users/beau/Projects/fermi/controls/applications/acsys-python/acsys/__init__.py", line 164, in __del__
    self.end()
  File "/Users/beau/Projects/fermi/controls/applications/acsys-python/acsys/__init__.py", line 168, in end
    self.transport.close()
  File "/Users/beau/.pyenv/versions/3.10.2/lib/python3.10/asyncio/selector_events.py", line 700, in close
    self._loop.call_soon(self._call_connection_lost, None)
  File "/Users/beau/.pyenv/versions/3.10.2/lib/python3.10/asyncio/base_events.py", line 745, in call_soon
    self._check_closed()
  File "/Users/beau/.pyenv/versions/3.10.2/lib/python3.10/asyncio/base_events.py", line 510, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed

This isn't great for users since this is an intentional exit.