Open coretl opened 3 months ago
Relevant to #437
Maybe make TangoDevice
and PviDevice
baseclasses that take a Connector
object
After more thinking and expanding on the Connector
idea, we now have:
from __future__ import annotations
from abc import abstractmethod
from collections.abc import Iterator, Mapping, Sequence
from enum import Enum
from typing import Any, Callable, Generic, Protocol, TypeVar
from bluesky.protocols import (
HasName,
Locatable,
Location,
Movable,
Reading,
Subscribable,
)
from event_model import DataKey
from ophyd_async.core._protocol import AsyncReadable, AsyncStageable
from ophyd_async.core._status import AsyncStatus
from ophyd_async.core._utils import DEFAULT_TIMEOUT
T = TypeVar("T")
Callback = Callable[[T], None]
class DeviceConnector(Protocol):
@abstractmethod
async def connect(
self,
mock: bool = False,
timeout: float = DEFAULT_TIMEOUT,
force_reconnect: bool = False,
) -> Any: ...
class DeviceChildConnector(DeviceConnector):
def __init__(self, children: Mapping[str, Any]): ...
class FastCsDeviceConnector(DeviceChildConnector):
def __init__(self, id: str, children: Mapping[str, Any]): ...
# fills in children with PviSignalConnector or TangoSignalConnector
class TangoDeviceConnector(DeviceChildConnector):
def __init__(self, trl: str, children: Mapping[str, Any]): ...
class DeviceBase(HasName):
_connector: DeviceConnector
def __init__(self, connector: DeviceConnector, name: str = ""): ...
def log(self): ...
def set_name(self, name: str): ...
async def connect(
self,
mock: bool = False,
timeout: float = DEFAULT_TIMEOUT,
force_reconnect: bool = False,
): ...
class Device(DeviceBase[DeviceChildConnector]):
def __init__(self, name: str = ""):
super().__init__(DeviceChildConnector(self.__dict__), name)
class FastCsDevice(DeviceBase):
def __init__(self, id: str, name: str = ""):
if id.startswith("pva://"):
connector = PviSignalConnector
fill_in_signals(self.__annotations__, self.__dict__, connector)
super().__init__(FastCsDeviceConnector(id, self.__dict__), name)
class PandA(FastCsDevice):
thing: SignalRW[int]
class DeviceVector(DeviceBase[DeviceChildConnector], Mapping[str, DeviceBase]):
def __init__(self, children: dict[str, Any], name: str = ""):
self._children = children
super().__init__(DeviceChildConnector(self), name)
def __getitem__(self, item: str) -> DeviceBase: ...
def __iter__(self) -> Iterator[str]: ...
def __len__(self) -> int: ...
SignalTypeUnion = int | str | Sequence[Enum]
SignalType = TypeVar("SignalType", bound=SignalTypeUnion)
class SignalBackend(Generic[SignalType]):
@abstractmethod
async def get_datakey(self, source: str) -> DataKey: ...
@abstractmethod
async def get_reading(self) -> Reading[SignalType]: ...
@abstractmethod
async def get_value(self) -> SignalType: ...
@abstractmethod
async def get_setpoint(self) -> SignalType: ...
@abstractmethod
def set_callback(self, callback: Callback[Reading[SignalType]] | None) -> None: ...
@abstractmethod
async def put(self, value: SignalType | None, wait=True, timeout=None): ...
class SignalConnector(Protocol[SignalType]):
@abstractmethod
async def connect(
self,
mock: bool = False,
timeout: float = DEFAULT_TIMEOUT,
force_reconnect: bool = False,
) -> SignalBackend[SignalType]: ...
@abstractmethod
def source(self, name: str) -> str: ...
class Signal(DeviceBase, Generic[SignalType]]):
backend: SignalBackend[SignalType] = DisconnectedBackend()
def __init__(
self,
connector: SignalConnector[SignalType],
timeout: float | None = DEFAULT_TIMEOUT,
name: str = "",
):
super().__init__(connector)
@property
def source(self) -> str | None:
return self._connector.source(self.name)
class SignalR(Signal[SignalType], AsyncReadable, AsyncStageable, Subscribable):
async def read(
self, cached: bool | None = None
) -> dict[str, Reading[SignalType]]: ...
async def describe(self) -> dict[str, DataKey]: ...
async def get_value(self, cached: bool | None = None) -> SignalType: ...
def subscribe(self, function: Callback[dict[str, Reading[SignalType]]]) -> None: ...
def clear_sub(self, function: Callback[dict[str, Reading[SignalType]]]) -> None: ...
def stage(self) -> AsyncStatus: ...
def unstage(self) -> AsyncStatus: ...
class SignalW(Signal[SignalType], Movable):
def set(self, value: SignalType, wait=True) -> AsyncStatus: ...
class SignalRW(SignalR[SignalType], SignalW[SignalType], Locatable):
async def locate(self) -> Location: ...
class CaConverter(Generic[SignalType]):
def __init__(self, initial_value): ...
def datakey(self, value) -> DataKey: ...
def to_signal(self, value) -> SignalType: ...
def from_signal(self, value) -> Any: ...
class CaSignalBackend(SignalBackend[SignalType]):
def __init__(
self, read_pv: str, write_pv: str, converter: CaConverter[SignalType]
): ...
class EpicsSignalConnector(SignalConnector[SignalType]):
def __init__(self, read_pv: str, write_pv: str): ...
# connect decides on ca/pva, gets initial value, makes converter and backend
class PviSignalConnector(SignalConnector[SignalType]):
def __init__(self, pvi_structure, name: str): ...
# connect grabs pv out of pvi structure, then does the same as EpicsSignalConnector
We would like to add the following features to the pvi connection strategy:
To do this I suggest that the "introspectable device" template should now look like this:
This would require the following changes:
SignalBackend.set_backend
rather than passing it duringconnect
SignalBackend.set_backend
duringfill_pvi_signals
DeviceVector
to take a datatype and the number of mock devices to makeDeviceVector.connect(mock=True)
that makes the correct number of mock devicesDevice.pre_connect
method that can be overridden to callfill_pvi_signals
orfill_tango_signals
when in mock=False mode