bluez / bluer

BlueR — Official BlueZ Bindings for Rust
https://crates.io/crates/bluer

Events of MonitorHandle must be consumed #145

Closed PhilippMeyerWeidmueller closed 5 months ago

PhilippMeyerWeidmueller commented 5 months ago

Hi,

I am currently encountering a strange memory leak that seems to be related to blueR, and I want to share my first findings. Unfortunately, I have not been able to track down the issue any further; maybe someone can help out here.

At first I thought it was something in my larger codebase, but then I was able to reproduce the problem with a minimal test program:

  1. Create a new blueR session and set up the adapter
  2. Start an advertisement monitor (basically a passive scan)
  3. Add a filter to the advertisement monitor. The filter itself doesn't matter; it just needs to match the test device
  4. Subscribe to the event stream of a test device that matches the advertisement monitor filter

Code:

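use std::str::FromStr;

use bluer::{
    monitor::{data_type, Monitor, Pattern, Type},
    Address,
};
use futures::StreamExt;

#[tokio::main(flavor = "current_thread")]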
async fn main() {
    let session = bluer::Session::new().await.unwrap();
    let adapter = session.default_adapter().await.unwrap();
    adapter.set_powered(true).await.unwrap();

    //start passive scan
    let monitor_manager = adapter.monitor().await.unwrap();

    let test_monitor = Monitor {
        monitor_type: Type::OrPatterns,
        patterns: Some(vec![Pattern {
            start_position: 0,
            data_type: data_type::COMPLETE_LOCAL_NAME,
            content: b"<your device name>".to_vec(),
        }]),
        ..Default::default()
    };

    // Keep the handle alive: dropping it unregisters the monitor and stops the passive scan.
    let _monitor_handle = monitor_manager.register(test_monitor).await.unwrap();

    //Listen to device events of our test device
    let test_device = adapter
        .device(Address::from_str("<your device MAC>").unwrap())
        .unwrap();
    let mut prop_changed_stream = test_device.events().await.unwrap();

    while let Some(event) = prop_changed_stream.next().await {
        let bluer::DeviceEvent::PropertyChanged(prop) = event;
        println!("Property changed: {prop:?}");
    }
}

For all tests I used blueR 0.17.1, Rust 1.79, and a Debian-based x86 machine.

When executing the program, everything seems to be working as expected. In the console, I get RSSI changed and manufacturer data changed events as new advertisements are picked up by the passive scanner:

Property changed: ManufacturerData({65535: [1, 1, 1, 28]})
Property changed: Rssi(80)
Property changed: Rssi(-50)
Property changed: ManufacturerData({65535: [1, 1, 1, 29]})
Property changed: Rssi(80)
Property changed: Rssi(-50)
Property changed: Rssi(80)
Property changed: Rssi(-50)

Now, looking at the memory consumption: just after starting the program, it looks like this (RSS = 5520 KB):

wm01822@DE13926:~/Repos/usp-tests/ble_scan_tester$ date && ps -aux | grep ble_scan_tester
Thu 20 Jun 2024 02:00:51 PM CEST
wm01822   240769  0.0  0.0   2480   508 pts/10   Ss+  12:50   0:00 /bin/sh -c cd '/home/wm01822/Repos/usp-tests/ble_scan_tester' && /bin/sh
wm01822   310275  0.0  0.0  79112  5520 pts/11   S+   13:58   0:00 target/debug/ble_scan_tester
wm01822   313050  0.0  0.0   6372   640 pts/12   S+   14:00   0:00 grep ble_scan_tester

About 30 minutes later (14:00:51 to 14:32:41), RSS has almost doubled to 10344 KB!

wm01822@DE13926:~/Repos/usp-tests/ble_scan_tester$ date && ps -aux | grep ble_scan_tester
Thu 20 Jun 2024 02:32:41 PM CEST
wm01822   240769  0.0  0.0   2480   508 pts/10   Ss+  12:50   0:00 /bin/sh -c cd '/home/wm01822/Repos/usp-tests/ble_scan_tester' && /bin/sh
wm01822   310275  0.0  0.0  82012 10344 pts/11   S+   13:58   0:00 target/debug/ble_scan_tester
wm01822   344694  0.0  0.0   6372   708 pts/12   S+   14:32   0:00 grep ble_scan_tester

I profiled the program for memory leaks using heaptrack and heaptrack_gui. I will attach the recording to this post so you can check it out yourself: heaptrack_blueR_leak.gz

First, let's look at the total heap consumption over time:

[Image: heaptrack_consumption, total heap consumption over time]

Quite strange: at first, nothing happens. Then, after about 18 min, the heap suddenly starts growing quickly. The growth continues indefinitely and eventually eats up many megabytes. Running ps repeatedly and checking RSS confirms this. The test device advertises at the same rate the whole time.

The heaptrack call stack is very hard to follow because of Tokio's deeply nested calls, but the leak seems to be related to dbus_crossroads and "handle_message" somehow:

[Image: heaptrack_leak_call, heaptrack call stack of the leak]

handle_message_inner() seems to call through function pointers, so it's hard for me to follow the trail. I am not sure whether this is a dbus issue or related to the blueR callbacks that dbus triggers.

Does anyone have a hint on how to investigate further, or even a fix? On small devices, even a rather slow leak like this will quickly exhaust all available resources.

Kind regards, Philipp

PhilippMeyerWeidmueller commented 5 months ago

OK, after a few more hours of investigation, I think I have understood what is going on.

First, I re-profiled the program with valgrind. The results are similar to before, but give a bit more insight:

==15652== 1,406,328 bytes in 761 blocks are still reachable in loss record 262 of 262
==15652==    at 0x483877F: malloc (vg_replace_malloc.c:307)
==15652==    by 0x2585FA: alloc::alloc::alloc (alloc.rs:100)
==15652==    by 0x25875C: alloc::alloc::Global::alloc_impl (alloc.rs:183)
==15652==    by 0x25852B: allocate (alloc.rs:243)
==15652==    by 0x25852B: alloc::alloc::exchange_malloc (alloc.rs:332)
==15652==    by 0x2728EB: new<dbus_crossroads::crossroads::{impl#8}::run_async_method::{async_block_env#0}<dbus_crossroads::ifacedesc::{impl#11}::method_with_cr_async::{closure#0}::{closure_env#0}<alloc::sync::Arc<bluer::monitor::RegisteredMonitor, alloc::alloc::Global>, (dbus::strings::Path), (), &str, bluer::method_call::{async_block_env#0}<bluer::monitor::RegisteredMonitor, (), bluer::monitor::{impl#3}::register_interface::{closure#0}::{closure#2}::{closure#0}::{async_block_env#0}, bluer::monitor::{impl#3}::register_interface::{closure#0}::{closure#2}::{closure_env#0}>, bluer::monitor::{impl#3}::register_interface::{closure#0}::{closure_env#2}>, dbus_crossroads::ifacedesc::{impl#11}::method_with_cr_async::{closure#0}::{closure#0}::{async_block_env#0}<alloc::sync::Arc<bluer::monitor::RegisteredMonitor, alloc::alloc::Global>, (dbus::strings::Path), (), &str, bluer::method_call::{async_block_env#0}<bluer::monitor::RegisteredMonitor, (), bluer::monitor::{impl#3}::register_interface::{closure#0}::{closure#2}::{closure#0}::{async_block_env#0}, bluer::monitor::{impl#3}::register_interface::{closure#0}::{closure#2}::{closure_env#0}>, bluer::monitor::{impl#3}::register_interface::{closure#0}::{closure_env#2}>>> (boxed.rs:219)
==15652==    by 0x2728EB: pin<dbus_crossroads::crossroads::{impl#8}::run_async_method::{async_block_env#0}<dbus_crossroads::ifacedesc::{impl#11}::method_with_cr_async::{closure#0}::{closure_env#0}<alloc::sync::Arc<bluer::monitor::RegisteredMonitor, alloc::alloc::Global>, (dbus::strings::Path), (), &str, bluer::method_call::{async_block_env#0}<bluer::monitor::RegisteredMonitor, (), bluer::monitor::{impl#3}::register_interface::{closure#0}::{closure#2}::{closure#0}::{async_block_env#0}, bluer::monitor::{impl#3}::register_interface::{closure#0}::{closure#2}::{closure_env#0}>, bluer::monitor::{impl#3}::register_interface::{closure#0}::{closure_env#2}>, dbus_crossroads::ifacedesc::{impl#11}::method_with_cr_async::{closure#0}::{closure#0}::{async_block_env#0}<alloc::sync::Arc<bluer::monitor::RegisteredMonitor, alloc::alloc::Global>, (dbus::strings::Path), (), &str, bluer::method_call::{async_block_env#0}<bluer::monitor::RegisteredMonitor, (), bluer::monitor::{impl#3}::register_interface::{closure#0}::{closure#2}::{closure#0}::{async_block_env#0}, bluer::monitor::{impl#3}::register_interface::{closure#0}::{closure#2}::{closure_env#0}>, bluer::monitor::{impl#3}::register_interface::{closure#0}::{closure_env#2}>>> (boxed.rs:286)
==15652==    by 0x2728EB: dbus_crossroads::crossroads::Crossroads::run_async_method (crossroads.rs:255)
==15652==    by 0x2CDE20: dbus_crossroads::ifacedesc::IfaceBuilder<T>::method_with_cr_async::{{closure}} (ifacedesc.rs:575)
==15652==    by 0x2DAA65: dbus_crossroads::ifacedesc::IfaceBuilder<T>::method_with_cr_custom::{{closure}} (ifacedesc.rs:557)
==15652==    by 0x3B4D8D: <alloc::boxed::Box<F,A> as core::ops::function::FnMut<Args>>::call_mut (boxed.rs:2027)
==15652==    by 0x39C8DD: dbus_crossroads::crossroads::Crossroads::handle_message_inner (crossroads.rs:270)
==15652==    by 0x162C01: dbus_crossroads::crossroads::Crossroads::handle_message (crossroads.rs:280)
==15652==    by 0x1B9E2A: bluer::session::Session::new::{{closure}}::{{closure}} (session.rs:267)
==15652==    by 0x181AB8: tokio::runtime::task::core::Core<T,S>::poll::{{closure}} (core.rs:328)
==15652== 
==15652== LEAK SUMMARY:
==15652==    definitely lost: 0 bytes in 0 blocks
==15652==    indirectly lost: 0 bytes in 0 blocks
==15652==      possibly lost: 30,138 bytes in 149 blocks
==15652==    still reachable: 2,146,543 bytes in 8,839 blocks
==15652==         suppressed: 0 bytes in 0 blocks
==15652== 
==15652== ERROR SUMMARY: 100 errors from 100 contexts (suppressed: 0 from 0)

This output showed me that the leak comes from a callback that blueR registers with D-Bus (the DeviceFound method of the advertisement monitor interface):

ib.method_with_cr_async(
    "DeviceFound",
    ("device",),
    (),
    |ctx, cr, (addr,): (dbus::Path<'static>,)| {
        method_call(ctx, cr, |reg: Arc<RegisteredMonitor>| async move {
            let (adapter, device) = Self::parse_device_path(&addr)?;
            if let Some(event_tx) = reg.event_tx.lock().await.as_ref() {
                let _ = event_tx.send(MonitorEvent::DeviceFound(DeviceId { adapter, device })).await;
            }
            Ok(())
        })
    },
);

The issue here comes from this line:

let _ = event_tx.send(MonitorEvent::DeviceFound(DeviceId { adapter, device })).await;

This sends a new event into event_tx, which is a bounded mpsc channel with a capacity of 1024:

let (event_tx, event_rx) = mpsc::channel(1024);

Now here is the issue: if the user application registers the monitor but does not consume its events (as in my example), the channel fills up until it reaches its capacity. After that, send().await never completes. Every further monitor callback from BlueZ (such as DeviceFound) starts a new async method handler in dbus_crossroads that stays pending forever, so these boxed futures accumulate. That is the memory leak.
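The failure mode does not depend on BlueZ at all. The following is a minimal, std-only sketch that uses `std::sync::mpsc::sync_channel` as a stand-in for Tokio's bounded `mpsc::channel(1024)`: once the buffer holds 1024 unread events, no further send can complete. It uses `try_send` so the demo reports the condition instead of hanging; the function name `events_accepted_before_full` is hypothetical.

```rust
use std::sync::mpsc::{sync_channel, Receiver, TrySendError};

/// Offers events to a bounded channel whose receiver is never read, like a
/// MonitorHandle that is held but never polled. Returns how many events were
/// accepted before the channel reported that it is full, plus the receiver.
fn events_accepted_before_full(capacity: usize, offered: usize) -> (usize, Receiver<u32>) {
    let (tx, rx) = sync_channel::<u32>(capacity);
    let mut accepted = 0;
    for event in 0..offered as u32 {
        match tx.try_send(event) {
            Ok(()) => accepted += 1,
            // A blocking `send` would park here forever; in async code,
            // `send().await` would leave its future pending forever instead.
            Err(TrySendError::Full(_)) => break,
            Err(TrySendError::Disconnected(_)) => unreachable!("receiver is still alive"),
        }
    }
    // Returning the receiver keeps it alive, like an unread but held handle.
    (accepted, rx)
}

fn main() {
    let (accepted, _rx) = events_accepted_before_full(1024, 2000);
    println!("accepted {accepted} events, then the channel was full");
}
```

In the async case nothing panics or errors: every later callback simply parks another future, which matches the "still reachable" allocations under `run_async_method` in the valgrind output.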

I confirmed this by simply adding a loop that consumes the events from the registered monitor in my program, like so:

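    // scan_events is the MonitorHandle returned by monitor_manager.register() (bound with
    // `let mut`); it is a Stream of MonitorEvent, so next() needs futures::StreamExt.
    // Moving it into the task also keeps the monitor registered while the task runs.
    // Consume the monitor events so that the mpsc channel never fills up.
    tokio::spawn(async move {
        while let Some(scan_event) = scan_events.next().await {
            log::info!("Consuming scan event: {scan_event:?}");
        }
    });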

However, IMO this is not a good solution and feels rather hacky. Also, other apps may run into the same problem.

The catch is that I cannot simply remove monitor_manager.register(test_monitor).await.unwrap(); from the application: without a held handle the monitor is unregistered immediately, which stops passive scanning, and device.events() no longer yields new events.

One solution I can think of would be to make the event listener an opt-in feature, so that the user explicitly has to subscribe to events after registering the monitor. Another fix could be to rework the API slightly, so that passive scanning can be used without a registered monitor.
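A rough sketch of what the opt-in variant could look like, under the assumption that the monitor keeps an optional sender (the `event_tx` name mirrors the bluer snippet above; `RegisteredMonitor`, `subscribe` and `deliver` here are hypothetical and not bluer's API). No channel exists until someone subscribes, so an unsubscribed monitor can never accumulate pending sends. `std::sync::mpsc` stands in for Tokio's channel so the sketch runs on its own.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};

/// Hypothetical stand-in for bluer's MonitorEvent.
#[derive(Debug, Clone, PartialEq)]
enum MonitorEvent {
    DeviceFound(String),
    DeviceLost(String),
}

/// Hypothetical monitor state: the sender only exists after opting in.
#[derive(Default)]
struct RegisteredMonitor {
    event_tx: Option<SyncSender<MonitorEvent>>,
}

impl RegisteredMonitor {
    /// Opt in to monitor events; only now is a bounded channel created.
    fn subscribe(&mut self, capacity: usize) -> Receiver<MonitorEvent> {
        let (tx, rx) = sync_channel(capacity);
        self.event_tx = Some(tx);
        rx
    }

    /// Called from the D-Bus handler. Without a subscriber the event is
    /// discarded at once. With one, a full buffer blocks again, so opting in
    /// means committing to drain the receiver. Returns whether it was queued.
    fn deliver(&self, event: MonitorEvent) -> bool {
        match &self.event_tx {
            Some(tx) => tx.send(event).is_ok(),
            None => false,
        }
    }
}

fn main() {
    let mut monitor = RegisteredMonitor::default();
    // Nobody has subscribed yet, so this event is discarded instead of queued.
    let queued = monitor.deliver(MonitorEvent::DeviceFound("AA:BB:CC:DD:EE:FF".into()));
    println!("queued before subscribing: {queued}");

    // After opting in, events are queued and the caller drains `rx`.
    let rx = monitor.subscribe(1024);
    monitor.deliver(MonitorEvent::DeviceLost("AA:BB:CC:DD:EE:FF".into()));
    println!("received: {:?}", rx.recv().unwrap());
}
```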

surban commented 5 months ago

Another solution would be to simply drop events when the channel is full. However, under certain conditions (a busy system, or a process blocked by IO) this might lead to unexpected loss of events. We could avoid this by using a big enough buffer, but that may waste 1 MB when unused.
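For comparison, the "drop when full" option can be sketched with a non-blocking send plus a counter, so that loss is at least observable. This is a hypothetical wrapper (`LossySender` is not part of bluer), again using `std::sync::mpsc` so it runs standalone; with Tokio the same idea would use `Sender::try_send`, whose error distinguishes a full channel from a closed one.

```rust
use std::sync::mpsc::{sync_channel, SyncSender, TrySendError};

/// Hypothetical sender that never blocks: when the bounded buffer is full
/// (or the receiver is gone), the event is dropped and counted.
struct LossySender<T> {
    tx: SyncSender<T>,
    dropped: u64,
}

impl<T> LossySender<T> {
    fn new(tx: SyncSender<T>) -> Self {
        Self { tx, dropped: 0 }
    }

    /// Returns true if the event was queued, false if it was dropped.
    fn send_or_drop(&mut self, event: T) -> bool {
        match self.tx.try_send(event) {
            Ok(()) => true,
            Err(TrySendError::Full(_)) | Err(TrySendError::Disconnected(_)) => {
                self.dropped += 1;
                false
            }
        }
    }
}

fn main() {
    let (tx, rx) = sync_channel::<u32>(4);
    let mut sender = LossySender::new(tx);
    for event in 0..10 {
        sender.send_or_drop(event);
    }
    // Only the first 4 events fit; the remaining 6 were dropped without blocking.
    let received: Vec<u32> = rx.try_iter().collect();
    println!("received {:?}, dropped {}", received, sender.dropped);
}
```

The handler can no longer hang, but as noted above, a consumer that is merely slow (not absent) also loses events once the buffer is full.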

Also, making monitor events opt-in would require a major release, because the change is backwards incompatible.

I tend to just change the documentation and state that monitor events must be consumed while a MonitorHandle is held. After all, how does the program know that the device of interest is present, if not by listening to monitor events? In your example you just call adapter.device(Address::from_str("<your device MAC>").unwrap()), but this won't work in the real world, since the device might not yet be present at program start.
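The pattern described here, consuming every monitor event and reacting when the device shows up, can be sketched as a small state machine. The enum below is a local stand-in mirroring BlueZ's DeviceFound/DeviceLost monitor callbacks (addresses are plain strings; `handle_events` is hypothetical). In a real bluer program the loop would read `MonitorEvent::DeviceFound(DeviceId { adapter, device })` from the MonitorHandle stream and start `events()` on that device; that wiring is omitted so the sketch runs without BlueZ.

```rust
use std::collections::HashSet;

/// Local stand-in for the two monitor callbacks (DeviceFound / DeviceLost).
#[derive(Debug, Clone)]
enum MonitorEvent {
    DeviceFound(String),
    DeviceLost(String),
}

/// Consumes every monitor event, tracks which devices are currently present,
/// and returns the addresses for which a device-event subscription should be
/// started (once per appearance) instead of assuming presence at startup.
fn handle_events(
    events: impl IntoIterator<Item = MonitorEvent>,
    present: &mut HashSet<String>,
) -> Vec<String> {
    let mut subscribe_to = Vec::new();
    for event in events {
        match event {
            MonitorEvent::DeviceFound(addr) => {
                // insert() returns false if the device was already present.
                if present.insert(addr.clone()) {
                    subscribe_to.push(addr);
                }
            }
            MonitorEvent::DeviceLost(addr) => {
                present.remove(&addr);
            }
        }
    }
    subscribe_to
}

fn main() {
    let mut present = HashSet::new();
    let events = vec![
        MonitorEvent::DeviceFound("AA:BB:CC:DD:EE:FF".to_string()),
        MonitorEvent::DeviceFound("AA:BB:CC:DD:EE:FF".to_string()),
        MonitorEvent::DeviceLost("AA:BB:CC:DD:EE:FF".to_string()),
        MonitorEvent::DeviceFound("AA:BB:CC:DD:EE:FF".to_string()),
    ];
    for addr in handle_events(events, &mut present) {
        println!("device {addr} appeared: start listening to its events here");
    }
}
```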

So in the end, I believe that changing the behavior would not be worth the effort. If you really want to ignore the monitor events, consuming them as shown in your code snippet is the right thing to do.

surban commented 5 months ago

Feel free to reopen, if there is need for further discussion.

PhilippMeyerWeidmueller commented 5 months ago

Hi @surban, thanks for the feedback!

I agree that this is not really a bug, but rather a misleading API in this one special case. I think the documentation fix is fine.