altdesktop / python-dbus-next

🚌 The next great DBus library for Python with asyncio support
https://python-dbus-next.readthedocs.io/en/latest/
MIT License

Monitoring all systemd units status change #61

Closed rjarry closed 3 years ago

rjarry commented 3 years ago

Hi,

When running the following commands, I can see PropertiesChanged signals that are printed when systemd units are restarted:

~# dbus-monitor --system --monitor &
[1] 29559
~# systemctl restart sshd.service
...
signal time=1600355643.644640 sender=:1.0 -> destination=(null destination) serial=49752 path=/org/freedesktop/systemd1/unit/ssh_2eservice; interface=org.freedesktop.DBus.Properties; member=PropertiesChanged
   string "org.freedesktop.systemd1.Unit"
   array [
      dict entry(
         string "ActiveState"
         variant             string "active"
      )
...

I have written a simple program that attempts to do the same thing:

import asyncio
import signal

from dbus_next.constants import BusType
from dbus_next.aio import MessageBus

loop = asyncio.get_event_loop()
stop_event = asyncio.Event()

def properties_changed_cb(iface, changed_props, inval_props):
    print('properties_changed_cb(%r, %r, %r)' % (iface, changed_props, inval_props))

async def main():
    bus = await MessageBus(bus_type=BusType.SYSTEM).connect()
    api = await bus.introspect('org.freedesktop.systemd1', '/org/freedesktop/systemd1/unit')
    proxy = bus.get_proxy_object('org.freedesktop.systemd1', '/org/freedesktop/systemd1/unit', api)
    iface = proxy.get_interface('org.freedesktop.DBus.Properties')
    iface.on_properties_changed(properties_changed_cb)
    await stop_event.wait()

loop.add_signal_handler(signal.SIGINT, stop_event.set)
loop.add_signal_handler(signal.SIGTERM, stop_event.set)
loop.run_until_complete(main())

However, the callback is never called when I restart services. Is there something I missed?

I have very little knowledge of DBus. Is it even possible to monitor all units with a single signal subscription? Searching the net did not help much, and neither did reading the official freedesktop documentation.

Any help will be appreciated, thanks!

acrisci commented 3 years ago

The issue is that the path of the properties interface that emitted the signal is /org/freedesktop/systemd1/unit/ssh_2eservice and you have the interface at /org/freedesktop/systemd1/unit. You need to have the interface of the former to receive the signals in that way.
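To make the path difference concrete: the object path in the dbus-monitor output above ends in ssh_2eservice, which is the unit name ssh.service with the dot written as _2e. Below is a minimal sketch of that mapping. It assumes systemd's convention of escaping every byte other than an ASCII letter (or a non-leading ASCII digit) as an underscore followed by two lowercase hex digits. The helper names are hypothetical and not part of dbus-next; in practice the Manager interface's GetUnit or LoadUnit method can return the path for you.

```python
import string

UNIT_PATH_PREFIX = '/org/freedesktop/systemd1/unit'


def escape_unit_label(unit_name):
    """Escape a unit name for use as the last element of an object path.

    Assumption: anything that is not an ASCII letter, or an ASCII digit
    in a non-leading position, is written as _xx (lowercase hex).
    """
    if not unit_name:
        return '_'
    out = []
    for index, byte in enumerate(unit_name.encode('utf-8')):
        ch = chr(byte)
        is_letter = ch in string.ascii_letters
        is_inner_digit = ch in string.digits and index > 0
        if is_letter or is_inner_digit:
            out.append(ch)
        else:
            out.append('_%02x' % byte)
    return ''.join(out)


def unit_object_path(unit_name):
    """Build the object path under which a unit's signals are emitted."""
    return UNIT_PATH_PREFIX + '/' + escape_unit_label(unit_name)


print(unit_object_path('ssh.service'))
```

A proxy object created for that full path (rather than for /org/freedesktop/systemd1/unit) is what the high-level on_properties_changed callback would need.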

rjarry commented 3 years ago

Right, that's what I figured, but it means I have to create a new proxy object for every systemd unit and register a PropertiesChanged signal callback on each of them.

Isn't there a way to do a "wildcard" subscription to signals? Like dbus-monitor does.

acrisci commented 3 years ago

I think what you're doing is beyond the scope of the high-level client. Look at the low-level interface, which can be used to install a single callback for multiple paths.

You'll need to add a match rule with the path_namespace arg for the root path.
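As a rough illustration of what such a rule looks like, here is a small sketch that assembles a match rule string from key/value pairs. The function name build_match_rule is made up for this example and is not a dbus-next API. It assumes values contain no apostrophes, which need extra escaping in match rules, so it rejects them rather than guess.

```python
def build_match_rule(**fields):
    """Join key/value pairs into a D-Bus match rule string."""
    parts = []
    for key, value in fields.items():
        if "'" in value:
            # Apostrophes need special escaping; keep the sketch simple.
            raise ValueError('unsupported apostrophe in %r' % value)
        parts.append("%s='%s'" % (key, value))
    return ','.join(parts)


rule = build_match_rule(
    type='signal',
    interface='org.freedesktop.DBus.Properties',
    member='PropertiesChanged',
    path_namespace='/org/freedesktop/systemd1/unit',
)
print(rule)
```

The resulting string is the kind of value that is passed as the single string argument of the bus daemon's AddMatch method.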

The docs and test cases provide plenty of examples on how to do that.

If you need more support, you can ask questions here or go to my discord channel.

rjarry commented 3 years ago

Hi @acrisci,

Thanks for your advice. Here is a working script that monitors the PropertiesChanged signals of all systemd units:

import asyncio
import os
import signal

from dbus_next.message import Message
from dbus_next.constants import BusType
from dbus_next.aio import MessageBus

def dbus_path_to_name(path):
    name = os.path.basename(path)
    name = name.replace('_40', '@')
    name = name.replace('_2e', '.')
    name = name.replace('_5f', '_')
    name = name.replace('_2d', '-')
    name = name.replace('_5c', '\\')
    return name

def message_handler(msg):
    name = dbus_path_to_name(msg.path)
    properties = msg.body[1]
    if 'ActiveState' not in properties:
        return False
    print('unit: %s, ActiveState: %s' % (name, properties['ActiveState'].value))
    return True

loop = asyncio.get_event_loop()
stop_event = asyncio.Event()

async def main():
    bus = await MessageBus(bus_type=BusType.SYSTEM).connect()
    rules = [
        {
            'type': 'signal',
            'interface': 'org.freedesktop.DBus.Properties',
            'path_namespace': '/org/freedesktop/systemd1/unit',
            'member': 'PropertiesChanged',
        },
    ]
    match = [','.join('%s=%r' % i for i in r.items()) for r in rules]
    msg = Message(
        destination='org.freedesktop.DBus',
        path='/org/freedesktop/DBus',
        interface='org.freedesktop.DBus.Monitoring',
        member='BecomeMonitor',
        signature='asu',
        body=[match, 0],
        serial=bus.next_serial(),
    )
    await bus.call(msg)
    bus.add_message_handler(message_handler)
    await stop_event.wait()

loop.add_signal_handler(signal.SIGINT, stop_event.set)
loop.add_signal_handler(signal.SIGTERM, stop_event.set)
loop.run_until_complete(main())

This is not a very pretty API, but I can live with it. It would be awesome if this could be integrated into the "high-level API" somehow.

Cheers :+1:
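One caveat about dbus_path_to_name in the script above: chained str.replace calls can decode the same text twice. For instance, a label containing _5f2d first becomes _2d and is then turned into a dash. A minimal sketch of a single-pass decoder follows. It assumes every escape is an underscore followed by two hex digits and that unit names are ASCII; the name unescape_unit_label is hypothetical.

```python
import re

# One escape sequence: an underscore followed by exactly two hex digits.
_ESCAPE = re.compile(r'_([0-9a-fA-F]{2})')


def unescape_unit_label(path):
    """Return the unit name encoded in the last element of an object path."""
    label = path.rsplit('/', 1)[-1]
    # Each escape is decoded exactly once, left to right.
    return _ESCAPE.sub(lambda m: chr(int(m.group(1), 16)), label)


print(unescape_unit_label('/org/freedesktop/systemd1/unit/ssh_2eservice'))
```

Because the substitution is done in one pass, characters produced by decoding are never re-interpreted as the start of another escape.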

acrisci commented 3 years ago

I would advise against using BecomeMonitor if you can accomplish basically the same thing with AddMatch.

If you do use the monitor API, please report any bugs you run into, because I don't have any tests for it right now.

rjarry commented 3 years ago

Could you provide a concrete code example for the use of AddMatch instead of BecomeMonitor to achieve the same thing?

Thanks.

udf commented 2 years ago

Unlike someone else, I decided to spend some time figuring this out

@rjarry here's how your example would look using AddMatch:

import asyncio
import signal
import os

from dbus_next.aio import MessageBus
from dbus_next.message import Message
from dbus_next.constants import BusType, MessageType

def dbus_path_to_name(path):
    name = os.path.basename(path)
    name = name.replace('_40', '@')
    name = name.replace('_2e', '.')
    name = name.replace('_5f', '_')
    name = name.replace('_2d', '-')
    name = name.replace('_5c', '\\')
    return name

def message_handler(msg):
    name = dbus_path_to_name(msg.path)
    properties = msg.body[1]
    if 'ActiveState' not in properties:
        return False
    print('unit: %s, ActiveState: %s' % (name, properties['ActiveState'].value))
    return True

async def main():
    stop_event = asyncio.Event()
    # Inside a coroutine, get_running_loop() returns the loop driving it.
    loop = asyncio.get_running_loop()
    loop.add_signal_handler(signal.SIGINT, stop_event.set)
    loop.add_signal_handler(signal.SIGTERM, stop_event.set)

    bus = await MessageBus(bus_type=BusType.SYSTEM).connect()

    # Ask the bus daemon to forward every Properties signal emitted below
    # the systemd unit namespace to this connection.
    reply = await bus.call(Message(
        destination='org.freedesktop.DBus',
        path='/org/freedesktop/DBus',
        interface='org.freedesktop.DBus',
        member='AddMatch',
        signature='s',
        body=["path_namespace='/org/freedesktop/systemd1/unit',type='signal',interface='org.freedesktop.DBus.Properties'"],
        serial=bus.next_serial()
    ))
    assert reply.message_type == MessageType.METHOD_RETURN
    bus.add_message_handler(message_handler)
    await stop_event.wait()

asyncio.run(main())
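Note that the match rule above does not restrict member, and message_handler indexes msg.body[1] without first checking what kind of message arrived. A hedged sketch of a more defensive extraction step is shown below. It takes plain values rather than a dbus-next Message so it can be tried without a bus connection; the function name and the getattr fallback for variant-like values are assumptions made for this example.

```python
UNIT_NAMESPACE = '/org/freedesktop/systemd1/unit/'


def extract_active_state(path, interface, member, body):
    """Return the new ActiveState value, or None if the message is unrelated."""
    if interface != 'org.freedesktop.DBus.Properties':
        return None
    if member != 'PropertiesChanged':
        return None
    if not path or not path.startswith(UNIT_NAMESPACE):
        return None
    # PropertiesChanged carries (interface_name, changed, invalidated).
    if len(body) < 2 or 'ActiveState' not in body[1]:
        return None
    value = body[1]['ActiveState']
    # Variant-like objects expose .value; plain values are returned as-is.
    return getattr(value, 'value', value)


sample_body = ['org.freedesktop.systemd1.Unit', {'ActiveState': 'active'}, []]
print(extract_active_state(
    '/org/freedesktop/systemd1/unit/ssh_2eservice',
    'org.freedesktop.DBus.Properties',
    'PropertiesChanged',
    sample_body,
))
```

Inside a real handler the arguments would come from the message itself, for example msg.path, msg.interface, msg.member and msg.body.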

However, instead of watching the unit namespace, you can match the signals of systemd's own Manager interface directly:

  reply = await bus.call(Message(
    # ...
    body=["interface='org.freedesktop.systemd1.Manager'"],
    # ...
  ))

The message handler could look like:

def message_handler(msg):
  body = msg.body
  if msg.member == 'JobNew':
    print(f'New Job: {body[2]}')
  if msg.member == 'JobRemoved':
    print(f'Job Finished: {body[2]}, result: {body[3]}')

See the systemd dbus docs for the possible job results and more: https://www.freedesktop.org/software/systemd/man/org.freedesktop.systemd1.html#Signals
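For illustration, the handler logic above can be pulled into a small pure function that is easy to try without a bus. This is only a sketch: it assumes the signal bodies are laid out as (id, job path, unit) for JobNew and (id, job path, unit, result) for JobRemoved, which is what the body[2] and body[3] indices above rely on, and that a result of "done" means success. The function name describe_job_signal is hypothetical.

```python
def describe_job_signal(member, body):
    """Format a JobNew/JobRemoved body; return None for other members."""
    if member == 'JobNew' and len(body) >= 3:
        job_id, _job_path, unit = body[:3]
        return 'job %d started for %s' % (job_id, unit)
    if member == 'JobRemoved' and len(body) >= 4:
        job_id, _job_path, unit, result = body[:4]
        # Assumption: "done" is the only result that indicates success.
        status = 'ok' if result == 'done' else 'not ok'
        return 'job %d finished for %s: %s (%s)' % (job_id, unit, result, status)
    return None


print(describe_job_signal(
    'JobRemoved',
    [42, '/org/freedesktop/systemd1/job/42', 'ssh.service', 'done'],
))
```

Returning None for anything else lets the caller ignore unrelated Manager signals without raising on short bodies.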

Another option for monitoring systemd units would be to parse the journal, either with journalctl -f -o json or through the Python API. An advantage of that approach is that user units can be monitored too, but it is out of the scope of this issue.