fermi-ad / acsys-python

Python module to access the Fermilab Control System
MIT License
8 stars 4 forks source link

V0.x pull request #51

Closed rneswold closed 2 years ago

rneswold commented 2 years ago

Proposed fix for #50.

rneswold commented 2 years ago

Adds an optional parameter, `done_fut`, to `acsys.Connection.request_stream()`. This parameter is an unresolved `Future`. The async generator monitors this future as well as the reply stream; if the future is canceled, the generator exits.

DPM._shutdown() now cancels the future that it passes to request_stream().

beauremus commented 2 years ago

I'm sharing my testing script in case anyone else wants to try it. I'm using Python 3.10, but can try other versions if needed.

#!/usr/bin/env python3

"""An example of using asyncio with ACSys to provide data to PyQt."""

import threading
import asyncio
import time

import acsys.dpm

async def read_many(con, thread_context):
    """Stream DPM replies into the shared thread context.

    Opens a ``DPMContext`` on ``con``, publishes it under
    ``thread_context['daq_task']``, registers every entry of
    ``thread_context['paramlist']`` tagged with its list index, and then
    consumes replies until the DPM iterator finishes.

    Readings are written to ``thread_context['data'][tag]`` while holding
    ``thread_context['lock']``; every other reply is printed.
    """
    async with acsys.dpm.DPMContext(con) as dpm_ctx:
        # Publish the open context so that code outside this coroutine
        # can reach it through the shared dict and shut it down, rather
        # than signalling with a separate flag.
        thread_context['daq_task'] = dpm_ctx

        # Pair each request with its position; the position is the tag.
        tagged_requests = list(enumerate(thread_context['paramlist']))
        await dpm_ctx.add_entries(tagged_requests)

        await dpm_ctx.start()

        async for reply in dpm_ctx:
            if reply.isReading:
                # Hold the lock while writing so that a reader taking the
                # same lock never sees the update in progress.
                with thread_context['lock']:
                    thread_context['data'][reply.tag] = reply.data
                continue

            if reply.isStatus:
                print(f'Status: {reply}')
            else:
                print(f'Unknown response: {reply}')

        print('Ending read_many loop')

class Phasescan:
    """Manage named background threads that acquire ACNet data.

    ``thread_dict`` maps a thread name to a context dict with the keys
    ``'thread'``, ``'lock'``, ``'data'`` and ``'paramlist'`` (created by
    ``start_thread``) plus ``'daq_task'``, which the worker adds once its
    DPM context is open.
    """

    def __init__(self):
        # Thread name -> per-thread context dict (see class docstring).
        self.thread_dict = {}

    def _acnet_daq(self, thread_name):
        """Run the ACNet DAQ client; this is the worker thread's target."""
        # This should be asyncio.run() but it's not supported in Python 3.6.
        event_loop = asyncio.new_event_loop()

        try:
            # Give this thread its own current event loop.
            asyncio.set_event_loop(event_loop)

            # This blocks until the client returns.
            acsys.run_client(read_many, thread_context=self.thread_dict[thread_name])
        finally:
            # Clean up the event loop after the client returns or fails.
            event_loop.close()

    def get_thread_data(self, thread_name):
        """Return a snapshot (shallow copy) of the thread's data list.

        Raises:
            KeyError: if ``thread_name`` was never started.
        """
        context = self.thread_dict[thread_name]
        with context['lock']:
            # We must copy the data, otherwise we hand out a reference to
            # the list that the worker keeps mutating.
            return context['data'].copy()

    def start_thread(self, thread_name, param_list):
        """Create the context for ``thread_name`` and start its worker.

        Args:
            thread_name: Key under which the thread context is stored.
            param_list: Requests to read; a request's list index is used
                as its slot in the ``'data'`` list.
        """
        print('Starting thread', thread_name)
        daq_thread = threading.Thread(
            target=self._acnet_daq,
            args=(thread_name,)
        )

        # The context must be stored before the thread starts because the
        # worker looks it up by name as soon as it runs.
        self.thread_dict[thread_name] = {
            'thread': daq_thread,
            'lock': threading.Lock(),
            # Initialise a None list the length of the param list.
            # The index of the list is the tag of the parameter.
            'data': [None] * len(param_list),
            'paramlist': param_list,
        }

        daq_thread.start()

    async def stop_thread(self, thread_name):
        """Shut down the DPM context of ``thread_name`` and join its thread.

        If the worker exited before it registered a DPM context (for
        example because connecting failed), there is nothing to shut down
        and the thread is simply joined.

        Raises:
            KeyError: if ``thread_name`` was never started.
            RuntimeError: if the worker is still running but has not yet
                registered a DPM context, so it cannot be asked to stop.
        """
        print('Stopping thread', thread_name)

        context = self.thread_dict[thread_name]
        daq_thread = context['thread']

        # 'daq_task' only exists once the worker has opened its DPM
        # context, so it may legitimately be missing here.
        dpm = context.get('daq_task')

        if dpm is not None:
            # Close the DPM context.
            # NOTE(review): this awaits _shutdown() on the caller's event
            # loop, while the context was opened on the worker thread's
            # loop -- confirm acsys supports cross-loop shutdown.
            await dpm._shutdown()
        elif daq_thread.is_alive():
            # Joining now would wait forever on a worker that was never
            # told to stop; fail loudly instead.
            raise RuntimeError(
                f'thread {thread_name!r} is running but has not registered '
                'a DPM context yet; cannot stop it'
            )

        # Clean up the thread. Join in an executor so that the blocking
        # wait does not stall the caller's event loop.
        await asyncio.get_running_loop().run_in_executor(None, daq_thread.join)

async def main():
    """Start one DAQ thread, print its data three times, then stop it.

    The data is sampled once per second. The thread is stopped in a
    ``finally`` clause so the worker is asked to shut down even if
    sampling raises.
    """
    pscan = Phasescan()

    pscan.start_thread('pscan', ['M:OUTTMP'])

    try:
        for _ in range(3):
            print(pscan.get_thread_data('pscan'))
            # Sleep cooperatively; time.sleep() would block the event loop.
            await asyncio.sleep(1)
    finally:
        await pscan.stop_thread('pscan')

# Only run the demo when executed as a script, not when imported.
if __name__ == '__main__':
    asyncio.run(main())
beauremus commented 2 years ago

Here's my current stack trace.

➜ python pyqt_async.py
Starting thread pscan
[None]
Exception in thread Thread-1 (_acnet_daq):
Traceback (most recent call last):
  File "/Users/beau/.pyenv/versions/3.10.2/lib/python3.10/threading.py", line 1009, in _bootstrap_inner
    self.run()
  File "/Users/beau/.pyenv/versions/3.10.2/lib/python3.10/threading.py", line 946, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/beau/Projects/fermi/controls/applications/acsys-python-examples/pyqt_async.py", line 53, in _acnet_daq
    acsys.run_client(read_many, thread_context=self.thread_dict[thread_name])
  File "/Users/beau/Projects/fermi/controls/applications/acsys-python/acsys/__init__.py", line 819, in run_client
    return loop.run_until_complete(client_fut)
  File "/Users/beau/.pyenv/versions/3.10.2/lib/python3.10/site-packages/nest_asyncio.py", line 81, in run_until_complete
    return f.result()
  File "/Users/beau/.pyenv/versions/3.10.2/lib/python3.10/asyncio/futures.py", line 201, in result
    raise self._exception
  File "/Users/beau/.pyenv/versions/3.10.2/lib/python3.10/asyncio/tasks.py", line 232, in __step
    result = coro.send(None)
  File "/Users/beau/Projects/fermi/controls/applications/acsys-python/acsys/__init__.py", line 795, in __client_main
    result = (await main(con, **kwargs))
  File "/Users/beau/Projects/fermi/controls/applications/acsys-python-examples/pyqt_async.py", line 14, in read_many
    async with acsys.dpm.DPMContext(con) as dpm:
  File "/Users/beau/Projects/fermi/controls/applications/acsys-python/acsys/dpm/__init__.py", line 813, in __aenter__
    await self.dpm._restore_state()
  File "/Users/beau/Projects/fermi/controls/applications/acsys-python/acsys/dpm/__init__.py", line 361, in _restore_state
    await self._connect(lock)
  File "/Users/beau/Projects/fermi/controls/applications/acsys-python/acsys/dpm/__init__.py", line 390, in _connect
    _, msg = await gen.asend(None)
  File "/Users/beau/Projects/fermi/controls/applications/acsys-python/acsys/__init__.py", line 704, in request_stream
    pending.add(asyncio.create_task(done_fut))
  File "/Users/beau/.pyenv/versions/3.10.2/lib/python3.10/asyncio/tasks.py", line 337, in create_task
    task = loop.create_task(coro)
  File "/Users/beau/.pyenv/versions/3.10.2/lib/python3.10/asyncio/base_events.py", line 433, in create_task
    task = tasks.Task(coro, loop=self, name=name)
  File "/Users/beau/.pyenv/versions/3.10.2/lib/python3.10/asyncio/tasks.py", line 100, in __init__
    raise TypeError(f"a coroutine was expected, got {coro!r}")
TypeError: a coroutine was expected, got <Future pending>
[None]
[None]
Stopping thread pscan
Traceback (most recent call last):
  File "/Users/beau/Projects/fermi/controls/applications/acsys-python-examples/pyqt_async.py", line 105, in <module>
    asyncio.run(main())
  File "/Users/beau/.pyenv/versions/3.10.2/lib/python3.10/site-packages/nest_asyncio.py", line 38, in run
    return loop.run_until_complete(task)
  File "/Users/beau/.pyenv/versions/3.10.2/lib/python3.10/site-packages/nest_asyncio.py", line 81, in run_until_complete
    return f.result()
  File "/Users/beau/.pyenv/versions/3.10.2/lib/python3.10/asyncio/futures.py", line 201, in result
    raise self._exception
  File "/Users/beau/.pyenv/versions/3.10.2/lib/python3.10/asyncio/tasks.py", line 232, in __step
    result = coro.send(None)
  File "/Users/beau/Projects/fermi/controls/applications/acsys-python-examples/pyqt_async.py", line 103, in main
    await pscan.stop_thread('pscan')
  File "/Users/beau/Projects/fermi/controls/applications/acsys-python-examples/pyqt_async.py", line 89, in stop_thread
    await self.thread_dict[thread_name]['daq_task']._shutdown()
KeyError: 'daq_task'
Exception ignored in: <function __AcnetdProtocol.__del__ at 0x1063648b0>
Traceback (most recent call last):
  File "/Users/beau/Projects/fermi/controls/applications/acsys-python/acsys/__init__.py", line 164, in __del__
    self.end()
  File "/Users/beau/Projects/fermi/controls/applications/acsys-python/acsys/__init__.py", line 168, in end
    self.transport.close()
  File "/Users/beau/.pyenv/versions/3.10.2/lib/python3.10/asyncio/selector_events.py", line 700, in close
    self._loop.call_soon(self._call_connection_lost, None)
  File "/Users/beau/.pyenv/versions/3.10.2/lib/python3.10/asyncio/base_events.py", line 745, in call_soon
    self._check_closed()
  File "/Users/beau/.pyenv/versions/3.10.2/lib/python3.10/asyncio/base_events.py", line 510, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
beauremus commented 2 years ago

This wasn't complete, so I'm reverting it and restoring the branch.