ukBaz / python-bluezero

A simple Python interface to Bluez
MIT License
387 stars 112 forks source link

added ability to pass options into peripheral read callbacks #392

Closed joshbaran closed 1 year ago

joshbaran commented 1 year ago

It is useful to have access to the options in a read callback, for example to get the device address.

For backward compatibility, this change first checks the signature of the callback, and only passes the options if the function has a parameter. This will allow existing code without options to continue working.

ukBaz commented 1 year ago

Thanks for the PR @joshbaran

I don't have a lot of time to look at this right now so it would be helpful if you could share a minimal reproducible example I could use to help me with the testing.

joshbaran commented 1 year ago

test2.py

It starts a Peripheral with two read characteristics, one with options passed, one without. The one with options extracts the device address and keeps a counter per-device for the number of times it has read the char, and then returns the current number. The one without options just returns a random number.

Thanks for the library too, it has been a great help with learning BLE.

ukBaz commented 1 year ago

I have spent some time today looking at this and it looks good to me. My only question was around documentation or how we help people understand how to use this.

I think you have put together a really great example so I wondered if that would be enough if we included that in the examples directory?

Thoughts?

I've made a few minor tweaks to your example. Would you be OK to add the following?

import random
import struct

from bluezero import adapter
from bluezero import dbus_tools
from bluezero import device
from bluezero import peripheral

SERVICE_UUID = "00001111-1234-1234-1234-123456789abc"
OPT_CHR_UUID = "00002222-1234-1234-1234-123456789abc"
NON_OPT_UUID = "00003333-1234-1234-1234-123456789abc"

def on_connect(ble_device: device.Device):
    print("Connected to " + str(ble_device.address))

def on_disconnect(adapter_address, device_address):
    print("Disconnected from " + device_address)

def get_address(options):
    dev_addr = dbus_tools.get_adapter_address_from_dbus_path(options.get('device'))
    return dbus_tools.dbus_to_python(dev_addr)

call_count: dict[str, int] = {}

def cb_with_options(options):
    """
    Callback with passing options - uses device address from options to keep a
    running counter of the number of times it was called, and return the count
    as LE 2-byte int
    """
    print(f"\nCalling with options: {options}")
    addr = get_address(options)
    count = call_count.setdefault(addr, 0)
    count += 1
    call_count[addr] = count
    print(f"\tReturning: {count}")
    print(f"\tAll counts are: {call_count}")
    return struct.pack("<h", count)

def cb_no_options():
    """
    Callback without options - just return a random number as an LE 2-byte int
    """
    print("\nCalling without options")
    num = random.randint(0, 1000)
    print(f"\tReturning: {num}")
    return struct.pack("<h", num)

def main(adapter_address):
    ble = peripheral.Peripheral(adapter_address, local_name="Options Callback Test")
    ble.add_service(srv_id=1, uuid=SERVICE_UUID, primary=True)

    ble.add_characteristic(
        srv_id=1,
        chr_id=1,
        uuid=OPT_CHR_UUID,
        value=[],
        notifying=False,
        flags=["read"],
        read_callback=cb_with_options,
    )

    ble.add_characteristic(
        srv_id=1,
        chr_id=2,
        uuid=NON_OPT_UUID,
        value=[],
        notifying=False,
        flags=["read"],
        read_callback=cb_no_options,
    )
    ble.on_connect = on_connect
    ble.on_disconnect = on_disconnect
    ble.publish()

if __name__ == "__main__":
    dongle = list(adapter.Adapter.available())[0].address
    main(dongle)
joshbaran commented 1 year ago

Sounds good to me... added.

ukBaz commented 1 year ago

Thanks. I'll get to this merge in a couple of days. I'll then bump the version and do a release