Open honglei opened 1 year ago
by https://github.com/altdesktop/python-dbus-next/issues/61, and changed bus_type to BusType.SESSION, but canot recv any message
bus_type
BusType.SESSION
import asyncio import signal import os from dbus_next.aio import MessageBus from dbus_next.message import Message from dbus_next.constants import BusType, MessageType from dbus_next.signature import Variant def dbus_path_to_name(path): name = os.path.basename(path) name = name.replace('_40', '@') name = name.replace('_2e', '.') name = name.replace('_5f', '_') name = name.replace('_2d', '-') name = name.replace('_5c', '\\') return name def message_handler(msg: Message ): name: str = dbus_path_to_name(msg.path) properties: dict[str, Variant] = msg.body[1] if 'ActiveState' not in properties: return False print(f"'unit: {name}, " f"ActiveState: {properties['ActiveState'].value}, " f"SubState:{properties['SubState'].value} ") return True async def main(): stop_event = asyncio.Event() loop = asyncio.get_event_loop() loop.add_signal_handler(signal.SIGINT, stop_event.set) loop.add_signal_handler(signal.SIGTERM, stop_event.set) bus = await MessageBus(bus_type=BusType.SESSION).connect() reply = await bus.call(Message( destination='org.freedesktop.DBus', path='/org/freedesktop/DBus', interface='org.freedesktop.DBus', member='AddMatch', signature='s', body=["path_namespace='/org/freedesktop/systemd1/unit',type='signal',interface='org.freedesktop.DBus.Properties'"], serial=bus.next_serial() )) assert reply.message_type == MessageType.METHOD_RETURN bus.add_message_handler(message_handler) await stop_event.wait() asyncio.run(main())
by https://github.com/altdesktop/python-dbus-next/issues/61, and changed
bus_type
toBusType.SESSION
, but canot recv any message