if isinstance(self.router.connection, XenBusConnection):
raise PyXSError("using ``Monitor`` over XenBus is not supported",
UserWarning)
This no longer seems to be a valid check with Xen 4.14.x + Linux 5.4 (Ubuntu kernel). I don't know whether the original problem was fixed in Xen or in Linux, or in which versions it started working. This code works in dom0 and domU with that check commented out:
#!/usr/bin/env python3
import sys
import threading
import time
import pyxs
class Watcher(threading.Thread):
    """Background thread that prints XenStore watch events until stopped.

    The thread starts itself on construction.  It opens a pyxs client on
    ``/dev/xen/xenbus``, watches ``b"/"`` with the token ``b"token"`` and
    prints every event delivered by the monitor.  Call :meth:`stop` and
    then ``join()`` to shut it down.
    """

    def __init__(self):
        super().__init__()
        self._monitor = None
        self._watch = b"/"
        self._token = b"token"
        # Explicit stop flag.  Using ``self._monitor is None`` as the stop
        # signal is racy: ``stop()`` may run before ``run()`` has created
        # the monitor, or between the loop test and the ``wait()`` call.
        self._stop_requested = threading.Event()
        # Start last so every attribute exists before ``run()`` executes.
        self.start()

    def run(self):
        """Print watch events until :meth:`stop` has been called."""
        with pyxs.client.Client(xen_bus_path="/dev/xen/xenbus") as xsc:
            # Work on a local reference; ``stop()`` clears the attribute
            # concurrently.
            monitor = xsc.monitor()
            self._monitor = monitor
            monitor.watch(self._watch, self._token)
            while not self._stop_requested.is_set():
                event = next(monitor.wait(unwatched=True))
                if self._stop_requested.is_set():
                    # Woken up for shutdown: this is most likely the
                    # synthetic event queued by ``stop()``, so it is not
                    # reported as a real XenStore event.
                    break
                print(event)

    def stop(self):
        """Ask the thread to exit and wake it if it is blocked waiting.

        Safe to call before the monitor exists and safe to call twice.
        """
        # Set the flag *before* reading the monitor: if ``run()`` has not
        # published its monitor yet, it will see the flag on its first
        # loop test and exit without ever blocking.
        self._stop_requested.set()
        monitor = self._monitor
        self._monitor = None
        if monitor is not None:
            # poke an event in queue for anything currently .wait()ing
            monitor.events.put(pyxs.client.Event(self._watch, self._token))
def main():
    """Run a watcher for four seconds, then stop it and wait for it to exit."""
    observer = Watcher()
    # Give the background thread some time to receive and print events.
    time.sleep(4)
    observer.stop()
    observer.join()
if __name__ == "__main__":
    # Script entry point: hand main()'s return value to sys.exit().
    sys.exit(main())
It is possible to bypass the check by instantiating the Monitor() class directly with an existing Client().
`client.py` currently contains a check that rejects setting up a watch (a `Monitor`) on xenstore when the connection is over XenBus:
https://github.com/selectel/pyxs/blob/master/pyxs/client.py#L619-L621
This no longer seems to be a valid check with Xen 4.14.x + Linux 5.4 (Ubuntu kernel). I don't know whether the original problem was fixed in Xen or in Linux, or in which versions it started working. This code works in dom0 and domU with that check commented out:
It is possible to bypass the check by instantiating the `Monitor()` class directly with an existing `Client()`.