Open snowuyl opened 2 years ago
contents of main.zip:
import asyncio
import logging
import platform
import sys
import threading
import time
from bleak import BleakClient
from bleak import BleakClient
from bleak import BleakScanner
from bleak import _logger as logger
from bleak.uuids import uuid16_dict
from state import State
from strings import Strings
BATTERY_LEVEL_UUID = "00002a19-0000-1000-8000-00805f9b34fb" # battery level characteristic
UART_TX_UUID = "6e400002-b5a3-f393-e0a9-e50e24dcca9e" # Nordic NUS characteristic for TX
UART_RX_UUID = "6e400003-b5a3-f393-e0a9-e50e24dcca9e" # Nordic NUS characteristic for RX
data_flag = False #global flag to check for new data
quit_flag = False
connect_flag = False
command = ""
state = State.STATE_UNKNOWN
def display_manual():
print("""HRV manual
M. Measure
S. Stop measure
R. Reading result
Q. Quit
""")
def measure(bt_mac_address):
global command
command = "START"
#thread = threading.Thread(target=thread_function, args=(bt_mac_address, ))
#thread.start()
def stop_measure(bt_mac_address):
global command
command = "STOP"
#time.sleep(0.01)
#thread = threading.Thread(target=thread_function, args=(bt_mac_address, ))
#thread.start()
def read_result(bt_mac_address):
global command
command = "READ"
#thread = threading.Thread(target=thread_function, args=(bt_mac_address, ))
#thread.start()
def quit_program():
global quit_flag
quit_flag = True
sys.exit(0)
def notification_handler(sender, data):
"""Simple notification handler which prints the data received."""
#print("{0}: {1}".format(sender, data))
global command
if (command == "READ"):
file = open('ppg_gsensor.dat', 'ab')
file.write(data)
file.close()
global data_flag
data_flag = True
async def run_battery(address, loop):
async with BleakClient(address, loop=loop) as client:
try:
# waiting for BLE client to be connected
#x = await client.is_connected()
x = await client.is_connected()
# print("Connected: {0}".format(x))
# waiting for data to be sent from client
await client.start_notify(BATTERY_LEVEL_UUID, notification_handler)
while not quit_flag :
# giving some time to do other tasks
await asyncio.sleep(0.01)
# checking if we received data
global data_flag
if data_flag:
data_flag = False
# echoing our received data back to the BLE device
data = await client.read_gatt_char(BATTERY_LEVEL_UUID)
except Exception as e:
print(e)
finally:
await client.disconnect()
async def run_uart(address, loop):
global command
global connect_flag
global state
async with BleakClient(address, loop=loop) as client:
try:
# waiting for BLE client to be connected
x = await client.is_connected()
connect_flag = True
# waiting for data to be sent from client
await client.start_notify(UART_RX_UUID, notification_handler)
global quit_flag
while not quit_flag:
# giving some time to do other tasks
await asyncio.sleep(0.01)
if command == "START" and state != State.STATE_MEASURE:
await asyncio.sleep(0.01)
await client.write_gatt_char(UART_TX_UUID, b"\x53")
await asyncio.sleep(0.01)
await client.write_gatt_char(UART_TX_UUID, b"\x54")
await asyncio.sleep(0.01)
await client.write_gatt_char(UART_TX_UUID, b"\x41")
await asyncio.sleep(0.01)
await client.write_gatt_char(UART_TX_UUID, b"\x52")
await asyncio.sleep(0.01)
await client.write_gatt_char(UART_TX_UUID, b"\x54")
state = State.STATE_MEASURE
elif command == "STOP" and state != State.STATE_STOP:
await asyncio.sleep(0.01)
await client.write_gatt_char(UART_TX_UUID, b"\x53")
await asyncio.sleep(0.01)
await client.write_gatt_char(UART_TX_UUID, b"\x54")
await asyncio.sleep(0.01)
await client.write_gatt_char(UART_TX_UUID, b"\x4F")
await asyncio.sleep(0.01)
await client.write_gatt_char(UART_TX_UUID, b"\x50")
state = State.STATE_STOP
elif command == "READ" and state != State.STATE_READ:
await asyncio.sleep(0.01)
await client.write_gatt_char(UART_TX_UUID, b"\x52")
await asyncio.sleep(0.01)
await client.write_gatt_char(UART_TX_UUID, b"\x45")
await asyncio.sleep(0.01)
await client.write_gatt_char(UART_TX_UUID, b"\x41")
await asyncio.sleep(0.01)
await client.write_gatt_char(UART_TX_UUID, b"\x44")
state = State.STATE_READ
# checking if we received data
global data_flag
if data_flag :
data_flag = False
# echoing our received data back to the BLE device
data = await client.read_gatt_char(UART_RX_UUID)
#await client.write_gatt_char(UART_TX_UUID,data)
except Exception as e:
print(e)
finally:
await client.disconnect()
connect_flag = False
async def scan_device_by_name(device_name):
device = await BleakScanner.find_device_by_filter(
lambda d, ad: d.name and d.name.lower() == bt_device_name.lower()
)
strObj = Strings()
device_mac_address = strObj.get_bt_mac_address(str(device))
if device_mac_address is None:
return ""
return device_mac_address
def thread_function(bt_mac_address):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(run_uart(bt_mac_address, loop))
if __name__ == "__main__":
logging.basicConfig(level=logging.WARN)
bt_device_name = "ANCS1"
#bt_device_name = "Ameg09071100"
bt_mac_address = ""
while bt_mac_address == "":
bt_mac_address = asyncio.run(scan_device_by_name(bt_device_name))
time.sleep(0.01)
print(bt_device_name)
print(bt_mac_address)
thread = threading.Thread(target=thread_function, args=(bt_mac_address, ))
thread.start()
while connect_flag == False:
time.sleep(0.01)
while True:
display_manual()
choice = input("Enter an option: ")
choice = choice.upper()
if choice == "M":
measure(bt_mac_address)
elif choice == "S":
stop_measure(bt_mac_address)
elif choice == "R":
read_result(bt_mac_address)
elif choice == "Q":
quit_program()
else:
print("{0} is not a valid choice".format(choice))
#loop = asyncio.new_event_loop()
#asyncio.set_event_loop(loop)
#loop.run_until_complete(run_uart(bt_mac_address, loop))
#thread = threading.Thread(target=thread_function, args=(bt_mac_address,))
#thread.start()
thread.join()
Enter an option: Failed to read characteristic 18: Error Domain=CBATTErrorDomain Code=2 "Reading is not permitted." UserInfo={NSLocalizedDescription=Reading is not permitted.} q
This is an error that comes from the OS. Presumably it is telling the truth and you are trying to read from a characteristic that does not have the read
property flag set. So the solution is probably to just not read this characteristic.
Description
main.zip
% python3 main.py ANCS1 2956B0F4-976D-4E07-CE53-BCA1F475F44A main.py:102: FutureWarning: is_connected has been changed to a property. Calling it as an async method will be removed in a future version x = await client.is_connected() HRV manual M. Measure S. Stop measure R. Reading result Q. Quit
Enter an option: M HRV manual M. Measure S. Stop measure R. Reading result Q. Quit
Enter an option: S HRV manual M. Measure S. Stop measure R. Reading result Q. Quit
Enter an option: R HRV manual M. Measure S. Stop measure R. Reading result Q. Quit
Enter an option: Failed to read characteristic 18: Error Domain=CBATTErrorDomain Code=2 "Reading is not permitted." UserInfo={NSLocalizedDescription=Reading is not permitted.} q