def _acquire_data(self, name: str, key: str, mandatory: bool = True) -> _Any | None:
try:
return getattr(self.data(), name)
except AttributeError:
if mandatory:
self.suspend(SuspensionEvent(self, key, f"No data for `{name}`"))
def _do_plugin_callback(self, method: _Literal["pre_push", "post_push", "pre_update", "post_update"]) -> None:
    """Call the hook named ``method`` on every enabled plugin.

    For each enabled entry of ``self._plugins`` the required devices are
    checked first; every device that is absent or not ok results in a
    ``self.suspend(SuspensionEvent(...))`` call keyed by the plugin's key.
    The hook is then invoked as ``hook(self, data)`` where ``data`` maps
    each name from ``plugin.required_data()`` to the value returned by
    ``self._acquire_data(name, key)``.

    Args:
        method: Name of the plugin hook to invoke.
    """
    for key, plugin in self._plugins.items():
        if not plugin.enabled():
            continue

        for tag in plugin.required_devices():
            device_usable = has_device(tag) and SFT.device_ok(tag)
            if not device_usable:
                self.suspend(SuspensionEvent(self, key, f"Device {tag} not ok"))

        # NOTE(review): if ``self.suspend`` returns normally, the hook below
        # still runs even after a device check failed -- confirm intended.
        # Resolve the hook before gathering data so a missing hook fails
        # before any data is acquired.
        hook = getattr(plugin, method)
        payload = {}
        for data_name in plugin.required_data():
            payload[data_name] = self._acquire_data(data_name, key)
        hook(self, payload)
# NOTE: `self.suspend()` does not quit the process.