Open kaushik8785 opened 2 years ago
unzipped example
import asyncio
import time
from logging import exception
from os import read
from types import CellType
from typing import final
from bleak import BleakScanner, BleakError, BleakClient, cli, exc
import sys
BLE_NAME = "ABCD 1001"
BLE_ADDRESS='D8:A9:8B:A9:86:00'
from time import sleep
client=None
ADDRESS = None
def notification_handle(sender,data):
print(f"{sender}: {data}")
print("this is an notifiacation handler....")
return
def detection_callback(device, advertisement_data):
# print(device.name)
global ADDRESS
if BLE_NAME == advertisement_data.local_name:
print(advertisement_data.local_name)
ADDRESS = device.address
print(device.address, "\nRSSI:", device.rssi, advertisement_data)
print("found the ABCD 1001 device ")
print("---------------------------------------------------")
async def services_of_device(client):
print("client object in services:{0}".format(client))
print("print client services...")
svcs = await client.get_services()
print("Services:")
for service in svcs:
print(service)
await char_of_device(service,client)
async def char_of_device(service,client):
print("client object in char:{0}".format(client))
for char in service.characteristics:
if "read" in char.properties:
print("Characteristic wirth Read properties:")
try:
value=bytes(await client.read_gatt_char(char.uuid))
print(
f"\t[Characteristic] {char} ({','.join(char.properties)}), Value: {value}"
)
except Exception as e:
print(
f"Exception:\t[Characteristic] {char} ({','.join(char.properties)}), Value: {e}"
)
elif "write" in char.properties:
print("Characteristic wirth Write properties:")
try:
print(
f"\t[Characteristic]: {char}"
)
except Exception as e:
print(
f"Exception:\t[Characteristic] {char} : {e}"
)
async def pair_device(client):
print("client object in pair:{0}".format(client))
for i in range (1,10):
try:
pair_status=False
print("trying to pair with device {0} time : Pair status:{1}".format(i,pair_status))
pair_status=await client.pair(protection_level=2)
if pair_status==True:
print("pair_status:",pair_status)
break
except Exception as e:
print("Exception in pair:",e)
async def write_char(client,id,input_data):
print(client)
# change data in byte
# byte_value=input_data.to_bytes(16,'little')
byte_value=bytearray([input_data])
for i in range(1,20):
isConnected = client.is_connected
print("is connected:",isConnected)
if isConnected:
try:
print("try to write on char : {0} data:{1}".format(id,input_data))
write_v=await client.write_gatt_char(char_specifier=id,
data=byte_value,
response=True)
await asyncio.sleep(1.0)
break
except Exception as e:
print("Error in write:",e)
else:
try:
await client.connect()
print(client.is_connected)
except Exception as e:
print("Error in connection while writing:",e)
async def read_char(client,id):
print("try to read char:{0} ".format(id))
read_v=await client.read_gatt_char(id)
async def disconnect_device(client):
disconnect_status=await client.disconnect()
print("disconnect:",disconnect_status)
async def run(ble_address: str):
""""""
ble_address=BLE_ADDRESS
print("Ble address:",ble_address)
device = await BleakScanner.find_device_by_address(ble_address, timeout=30.0)
if not device:
raise BleakError(f"A device with address {ble_address} could not be found.")
client = BleakClient(device)
print(dir(client))
for i in range(1,20):
try:
connection_status=False
print("trying to connect...{0} time".format(i))
connection_status= await client.connect()
print(client.is_connected)
if connection_status==True:
print(dir(client))
break
except Exception as e:
print("Excp:",e)
if connection_status==False:
await client.disconnect()
print("not able to connect with device with test script again...")
return
print("client object in pair:{0}".format(client))
for i in range (1,20):
try:
pair_status=False
print("trying to pair with device {0} time : Pair status:{1}".format(i,pair_status))
pair_status=await client.pair(protection_level=2)
print("pair_status:",pair_status)
if pair_status==True:
print(dir(client))
break
except Exception as e:
print("Exception in pair:",e)
await services_of_device(client)
try:
# to write on char.
await write_char(client=client,id='8b777661-9d6c-4945-b17f-b1fd22c27b8b',input_data=0x04)
except Exception as e:
print("error:",e)
#to start notify service
await client.start_notify(char_specifier='8b777661-9d6c-4945-b17f-b1fd22c27b8b',
callback=notification_handle)
await asyncio.sleep(5)
#to stop notify service
await client.stop_notify(char_specifier='8b777661-9d6c-4945-b17f-b1fd22c27b8b')
# to read char value
await read_char(client=client,id='8b777661-9d6c-4945-b17f-b1fd22c27b8b')
# to disconnect the Ble device
disconnect_device(client)
# await client.disconnect()
# print("disconnect")
# to scan the nearby bluetooth devices
async def scanner():
scanner = BleakScanner()
scanner.register_detection_callback(detection_callback)
await scanner.start()
await asyncio.sleep(15.0)
await scanner.stop()
print("==================Bluetooth test script ======================")
loop = asyncio.get_event_loop()
loop.run_until_complete(scanner())
print(ADDRESS)
# scan loop complete here
loop.run_until_complete(run(ADDRESS))
print("================end of script==================")
loop = asyncio.get_event_loop() loop.run_until_complete(scanner()) print(ADDRESS) # scan loop complete here loop.run_until_complete(run(ADDRESS))
There should only be one event loop that runs for the entire duration of the program:
async def main():
await scanner()
print(ADDRESS)
await run(ADDRESS)
asyncio.run(main())
As per your suggestion i have updated the code and tested it, but still facing the same issue. In addition to that I am facing one more issue which is : When BLE device is already paired with PC and when it is trying to connect with the showed BLE device it displays the below exception: "[WinError -2147418113] Catastrophic failure"
This error comes from the WIndows OS. I have found that logging Bluetooth packets with Wireshark (see troubleshooting page of Bleak docs) is the best way to debug these sorts of problems.
Does #1062 fix the WinError -2147483629
error?
bluetoothctl -v
) in case of Linux:What I Did
I am trying to read and write properties using bleak library but not able to do. The ble peripheral required pairing which required passkey.
Attached my code which is used for Bluetooth communication. EXAMPLE.zip
output of attached code:
We are getting every time "[WinError -2147483629] The object has been closed" this type of exception.
Please let us know if any details required.