Open vulpes2 opened 1 month ago
I have already planned most of those because I have already done a lot with MagicPods for Windows.
My idea is to make the MagicPodsCore a background service with a bidirectional WebSocket and provide universal control of Bluetooth headphones. The WebSocket will send and receive data using JSON format, as I already used in MagicPodsCore C++. However, that was the first iteration, so I need to come up with a better API.
It will allow for easy writing of the UI part. Currently, I have ready-to-work Sony, FakeAirPods, and FastPair SDKs, and I am reverse engineering Huawei headphones.
@andreylitvintsev and I have figured out how to get all necessary fields of Bluetooth headphones, such as Product ID and Vendor ID, manage the connection, and grab the battery status from the system to avoid using the Hands-Free profile.
Now we are planning how to implement stream reading and writing for both Bluetooth and WebSocket. This is the most difficult part; writing an SDK is quite a routine operation because it mainly involves passing commands.
GFPS is a really useful backend, there are quite a lot of stuff that supports it. It's unfortunate that it doesn't allow more device controls like ANC, but at least we can get battery info. I have gotten a basic asyncio demo working with an L2CAP socket, but haven't done much on the packet SerDes side yet.
Did you mean GFPS - Google Fast Pair Service? It's Fast Pair as I said above, the main problem I was not able to get the key to sign commands, so I can only receive battery, anc and etc, notifications.
You can take a look at main.py; I created a simple service to read from and write to a Bluetooth socket without blocking. I checked the asyncio package yesterday, but it does not support Bluetooth sockets, as I understand.
Yes, that's what I meant. The problem with GFPS is that it's meant to be locked down, so there might not be a lot of stuff we can do with it beyond the basics.
As for asyncio on Bluetooth sockets, here's a basic example. You need to use AF_BLUETOOTH
provided by python stdlib when establishing the connection, not from the unmaintained pybluez
package.
import asyncio
import socket
cmd_connect = b"\x00\x00\x04\x00\x01\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00"
cmd_set_notification_filter = b"\x04\x00\x04\x00\x0f\x00\xff\xff\xff\xff"
address = "aa:bb:cc:dd:ee:ff"
psm = 0x1001
async def receive_handler(reader):
while True:
response = await reader.read(4096)
if response:
print(f"Incoming AACP Message: {response.hex()}")
async def send_handler(writer, command_queue):
# Call await command_queue.put(command) in a separate handler to queue commands
while True:
command = await command_queue.get()
print(f"Sending AACP Message in queue: {command.hex()}")
writer.write(command)
await writer.drain()
print("Sent")
async def main():
sock = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM, socket.BTPROTO_L2CAP)
sock.setblocking(False)
await asyncio.get_event_loop().sock_connect(sock, (address, psm))
reader, writer = await asyncio.open_connection(sock=sock)
print("Sending Connection Packet")
writer.write(cmd_connect)
await writer.drain()
response = await reader.read(1024)
print(f"Response: {response.hex()}")
print("Setting Notification Filter")
writer.write(cmd_set_notification_filter)
await writer.drain()
response = await reader.read(1024)
print(f"Response: {response.hex()}")
command_queue = asyncio.Queue()
receiver_task = asyncio.create_task(receive_handler(reader))
sender_task = asyncio.create_task(send_handler(writer, command_queue))
try:
await asyncio.gather(receiver_task, sender_task)
except ConnectionResetError:
print("Connection lost")
finally:
writer.close()
await writer.wait_closed()
asyncio.run(main())
Awesome, then we can reduce dependencies and remove PyBluez.
Unfortunately you can't scan for devices or get device info with stdlib (socket only). Maybe it's a good idea to just talk to BlueZ over dbus directly, since all we need is to list devices, retrieve vid/pid and check for available services.
made a little something :) -- https://github.com/kavishdevar/aln (i've got no future plans for this, at least now.. it's just for myself, and probably will be only for linux)
my main focus - create a unix socket. keep a single ear detection program running in the background, and have command line tools for other things like set anc/configurations. and/or a single standalone gui app that does ear detection and configuration (using the unix socket)
@vulpes2
I have tested your asynchronous code, but I could not figure out the reason why command = await self.command_queue.get() is stuck until the read loop receives a message.
The second issue is that socket.write will skip writing if I do not introduce a small pause between calls.
await writer.drain()
to make sure it blocks until the buffer has been flushed.
I've had some ideas on building a similar project. Now this project is licensed as GPLv3, it might be a good idea for us to combine our efforts to make things easier. Here's a summary of what I had in mind, please let me know what you think of it.
AF_BLUETOOTH
works out of the box for BR/EDR, Bleak can be used for LE GATT)