bluesky / ophyd-async

Hardware abstraction for bluesky written using asyncio
https://blueskyproject.io/ophyd-async
BSD 3-Clause "New" or "Revised" License
7 stars 21 forks source link

Fix handling of multiple event loops #135

Open keithralphs opened 4 months ago

keithralphs commented 4 months ago

When using the python asyncio teminal (python -m asyncio), with the Run Engine, it's possible for timeout errors to occur when running abluesky plan because the ents for the plan get sent to the event loop for the asyncio environment rather than the Run Engine Causing a Tiemout exception and lockup(TomC tells me).

To reproduce this situation using my Ophyd-async Synchrotron device implementation (See i22-source branch, devices folder) :

python3 -m asyncio

>>> import asyncio
>>> import synch
>>> from synch import OASynchrotron
>>> from bluesky import RunEngine
>>> from bluesky.plans import count
>>>
>>> thing = OASynchrotron()
>>> await thing.connect()
>>> await thing.connect()
{'synchrotron-ring_current': {'value': 0.024646897287500004, 'timestamp': 1709729428.649475, 'alarm_severity': 0}, 'synchrotron-machine_status-beam_energy': {'value': 3.0008823737792967, 'timestamp': 1709729422.042399, 'alarm_severity': 0}}
>>> re = RunEngine()
>>> re(count([thing]))
Run aborted
Traceback (most recent call last):
  File "/dls_sw/apps/python/miniforge/4.10.0-0/envs/python3.11/lib/python3.11/asyncio/tasks.py", line 490, in wait_for
    return fut.result()
           ^^^^^^^^^^^^
  File "/scratch/fri44821/lib/python3.11/site-packages/ophyd_async/core/signal.py", line 166, in read
    return {self.name: await self._backend_or_cache(cached).get_reading()}
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/scratch/fri44821/lib/python3.11/site-packages/ophyd_async/core/signal.py", line 100, in get_reading
    await self._valid.wait()
  File "/dls_sw/apps/python/miniforge/4.10.0-0/envs/python3.11/lib/python3.11/asyncio/locks.py", line 213, in wait
    await fut
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/scratch/fri44821/lib/python3.11/site-packages/bluesky/run_engine.py", line 1532, in _run
    msg = self._plan_stack[-1].send(resp)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/scratch/fri44821/lib/python3.11/site-packages/bluesky/plans.py", line 82, in count
    return (yield from inner_count())
            ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/scratch/fri44821/lib/python3.11/site-packages/bluesky/utils/__init__.py", line 1203, in dec_inner
    return (yield from plan)
            ^^^^^^^^^^^^^^^
  File "/scratch/fri44821/lib/python3.11/site-packages/bluesky/preprocessors.py", line 985, in stage_wrapper
    return (yield from finalize_wrapper(inner(), unstage_devices()))
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/scratch/fri44821/lib/python3.11/site-packages/bluesky/preprocessors.py", line 535, in finalize_wrapper
    ret = yield from plan
          ^^^^^^^^^^^^^^^
  File "/scratch/fri44821/lib/python3.11/site-packages/bluesky/preprocessors.py", line 983, in inner
    return (yield from plan)
            ^^^^^^^^^^^^^^^
  File "/scratch/fri44821/lib/python3.11/site-packages/bluesky/utils/__init__.py", line 1203, in dec_inner
    return (yield from plan)
            ^^^^^^^^^^^^^^^
  File "/scratch/fri44821/lib/python3.11/site-packages/bluesky/preprocessors.py", line 351, in run_wrapper
    yield from contingency_wrapper(plan,
  File "/scratch/fri44821/lib/python3.11/site-packages/bluesky/preprocessors.py", line 606, in contingency_wrapper
    ret = yield from plan
          ^^^^^^^^^^^^^^^
  File "/scratch/fri44821/lib/python3.11/site-packages/bluesky/plans.py", line 79, in inner_count
    return (yield from bps.repeat(partial(per_shot, detectors),
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/scratch/fri44821/lib/python3.11/site-packages/bluesky/plan_stubs.py", line 1345, in repeat
    return (yield from repeated_plan())
            ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/scratch/fri44821/lib/python3.11/site-packages/bluesky/plan_stubs.py", line 1328, in repeated_plan
    yield from ensure_generator(plan())
  File "/scratch/fri44821/lib/python3.11/site-packages/bluesky/plan_stubs.py", line 1185, in one_shot
    yield from take_reading(list(detectors))
  File "/scratch/fri44821/lib/python3.11/site-packages/bluesky/plan_stubs.py", line 1069, in trigger_and_read
    return (yield from rewindable_wrapper(inner_trigger_and_read(),
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/scratch/fri44821/lib/python3.11/site-packages/bluesky/preprocessors.py", line 729, in rewindable_wrapper
    return (yield from plan)
            ^^^^^^^^^^^^^^^
  File "/scratch/fri44821/lib/python3.11/site-packages/bluesky/plan_stubs.py", line 1061, in inner_trigger_and_read
    ret = yield from contingency_wrapper(
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/scratch/fri44821/lib/python3.11/site-packages/bluesky/preprocessors.py", line 616, in contingency_wrapper
    ret = yield from except_plan(e)
          ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/scratch/fri44821/lib/python3.11/site-packages/bluesky/plan_stubs.py", line 1059, in exception_path
    raise exp
  File "/scratch/fri44821/lib/python3.11/site-packages/bluesky/preprocessors.py", line 606, in contingency_wrapper
    ret = yield from plan
          ^^^^^^^^^^^^^^^
  File "/scratch/fri44821/lib/python3.11/site-packages/bluesky/plan_stubs.py", line 1049, in read_plan
    reading = (yield from read(obj))
               ^^^^^^^^^^^^^^^^^^^^
  File "/scratch/fri44821/lib/python3.11/site-packages/bluesky/plan_stubs.py", line 120, in read
    return (yield Msg('read', obj))
            ^^^^^^^^^^^^^^^^^^^^^^
  File "/scratch/fri44821/lib/python3.11/site-packages/bluesky/run_engine.py", line 1592, in _run
    new_response = await coro(msg)
                   ^^^^^^^^^^^^^^^
  File "/scratch/fri44821/lib/python3.11/site-packages/bluesky/run_engine.py", line 1873, in _read
    ret = await maybe_await(obj.read(*msg.args, **msg.kwargs))
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/scratch/fri44821/lib/python3.11/site-packages/bluesky/utils/__init__.py", line 1845, in maybe_await
    return await ret
           ^^^^^^^^^
  File "/scratch/fri44821/lib/python3.11/site-packages/ophyd_async/core/standard_readable.py", line 69, in read
    return await merge_gathered_dicts(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/scratch/fri44821/lib/python3.11/site-packages/ophyd_async/core/utils.py", line 98, in merge_gathered_dicts
    for result in await asyncio.gather(*coros):
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/scratch/fri44821/lib/python3.11/site-packages/ophyd_async/core/signal.py", line 30, in wrapper
    return await asyncio.wait_for(func(self, *args, **kwargs), self._timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/dls_sw/apps/python/miniforge/4.10.0-0/envs/python3.11/lib/python3.11/asyncio/tasks.py", line 492, in wait_for
    raise exceptions.TimeoutError() from exc
TimeoutError
Traceback (most recent call last):
  File "/dls_sw/apps/python/miniforge/4.10.0-0/envs/python3.11/lib/python3.11/concurrent/futures/_base.py", line 456, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/dls_sw/apps/python/miniforge/4.10.0-0/envs/python3.11/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/dls_sw/apps/python/miniforge/4.10.0-0/envs/python3.11/lib/python3.11/asyncio/__main__.py", line 34, in callback
    coro = func()
           ^^^^^^
  File "<console>", line 1, in <module>
  File "/scratch/fri44821/lib/python3.11/site-packages/bluesky/run_engine.py", line 903, in __call__
    plan_return = self._resume_task(init_func=_build_task)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/scratch/fri44821/lib/python3.11/site-packages/bluesky/run_engine.py", line 1042, in _resume_task
    raise exc
TimeoutError

This has apparently bee addressed in iPython environments using


In [1]: from bluesky.run_engine import call_in_bluesky_event_loop
In [2]:  %autoawait call_in_bluesky_event_loop
In [3]: from ophyd_async.core import DeviceCollector
In [4]: with DeviceCollector():
    ...:        oas = OASynchrotron()
    ...:
gilesknap commented 2 months ago

Sorry, I did not get to this one in the hackathon.

I'm interested in looking into it but have lot's to do on J20 too. @coretl let me know if you want me to spend some time on this this week.

coretl commented 2 months ago

I'd prefer you look at J20, I'll see if I can get round to it.

When I work out what's wrong I also need to pull in the docs from https://github.com/bluesky/ophyd/pull/1136 and amend

coretl commented 1 month ago

An attempt at fixing it here: https://github.com/dls-controls/aioca/tree/contexts

Using the first seen context in all event loops, but keeping the channel caches separate