zigpy / zigpy-znp

TI CC2531, CC13x2, CC26x2 radio support for Zigpy and ZHA
GNU General Public License v3.0
144 stars 40 forks source link

Request failed after 5 attempts: <Status.NWK_NO_ROUTE: 205> #180

Closed naive-HA closed 1 year ago

naive-HA commented 1 year ago

I am playing with a SONOFF ZIGBEE 3.0 USB Dongle Plus and a temperature sensor by SONOFF I am using the below code to query the temperature several times in a row: 19 times every 20 seconds. All works fine for just 3 times and then I get an error message: Request failed after 5 attempts: <Status.NWK_NO_ROUTE: 205> Can someone share some advice? What am I doing wrong? Do I query too often? Should I do something in between? If I press the pairing button while querying, I get an event notification saying the device left: I understand the device can still communicate with the coordinator, it just stops responding to repeated queries... Any advice is much appreciated

` import asyncio import zhaquirks #how to use quirks? import zigpy.profiles from zigpy_znp.zigbee.application import ControllerApplication

class MainListener: def init(self, application): self.application = application self.ieee = list()

def device_joined(self, device):
    print(f"Device joined: {device}")
    self.ieee.append(device.ieee)

def device_left(self, device):
    print(f"Device left: {device}")
    self.ieee.remove(device.ieee)

def device_removed(self, device):
    print(f"Device removed: {device}")
    self.ieee.remove(device.ieee)

def device_initialized(self, device, *, new=True):
    for endpoint_id, endpoint in device.endpoints.items():
        if endpoint_id == 0:
            continue  #skip zdo
        if getattr(endpoint, 'device_type', None) == zigpy.profiles.zha.DeviceType.IAS_CONTROL:
            continue #skip coordinator
        if getattr(endpoint, 'profile_id', None) == 49246:
            continue #skip coordinator

        # You need to attach a listener to every cluster to receive events
        for cluster in endpoint.in_clusters.values():
            # The context listener passes its own object as the first argument
            # to the callback
            cluster.add_context_listener(self)

        for cluster in endpoint.out_clusters.values():
            # The context listener passes its own object as the first argument
            # to the callback
            cluster.add_context_listener(self)

        print(30*"X")
        print(getattr(endpoint, 'device_type', None))
        print("output_clusters:", [cluster.cluster_id for cluster in endpoint.out_clusters.values()])
        for cluster_id in [cluster.cluster_id for cluster in endpoint.out_clusters.values()]:
            for attribute in list(device.endpoints[endpoint_id].out_clusters[cluster_id].attributes_by_name):
                print("cluster: ", cluster_id, ": ", device.endpoints[endpoint_id].out_clusters[cluster_id].ep_attribute, \
                    " :: attribute: ", attribute)
            for command_id in list(device.endpoints[endpoint_id].out_clusters[cluster_id].server_commands):
                print("cluster: ", cluster_id, ": ", device.endpoints[endpoint_id].out_clusters[cluster_id].ep_attribute, \
                    " :: command: ", device.endpoints[endpoint_id].out_clusters[cluster_id].server_commands[command_id].name)
        print(30*"X")
        print("input_clusters:", [cluster.cluster_id for cluster in endpoint.in_clusters.values()])
        for cluster_id in [cluster.cluster_id for cluster in endpoint.in_clusters.values()]:
            for attribute in list(device.endpoints[endpoint_id].in_clusters[cluster_id].attributes_by_name):
                print("cluster: ", cluster_id, ": ", device.endpoints[endpoint_id].in_clusters[cluster_id].ep_attribute, \
                    " :: attribute: ", attribute)
            for command_id in list(device.endpoints[endpoint_id].in_clusters[cluster_id].server_commands):
                print("cluster: ", cluster_id, ": ", device.endpoints[endpoint_id].in_clusters[cluster_id].ep_attribute, \
                    " :: command: ", device.endpoints[endpoint_id].in_clusters[cluster_id].server_commands[command_id].name)
        print(30*"X")

def attribute_updated(self, cluster, attribute_id, value):
    print(f"Received an attribute update "
          f"{cluster._endpoint._device.endpoints[cluster._endpoint._endpoint_id].in_clusters[cluster.cluster_id].find_attribute(attribute_id).name}={value}"
          f" on cluster {cluster.ep_attribute} from device {cluster._endpoint._device._ieee}")

def cluster_command(self, hdrtsn, hdrcommand_id, args):
    print(f"command ID: {hdrcommand_id}")

async def main():

ls -l /dev/serial/by-id

app = ControllerApplication(\
    ControllerApplication.SCHEMA({"database_path": "/home/user/Documents/zigpy/zigbee.db",
                                  "device": {"path": "/dev/serial/by-id/usb-ITead_Sonoff_Zigbee_3.0_USB_Dongle_Plus_b47ef8a90460ec119f99345f25bfaa52-if00-port0"}}))

listener = MainListener(app)
app.add_listener(listener)
await app.startup(auto_form=True)
# Have every device in the database fire the same event so you can attach listeners
for device in app.devices.values():
    listener.device_initialized(device, new=False)

permit_duration = 20
await app.permit(permit_duration)
print("join now")
await asyncio.sleep(permit_duration)
print("joining ends")

for device in app.devices.values():
    if str(device.ieee) == "00:12:4b:00:25:15:98:f3":
        for i in range (0, 20):
            try:
                await device.endpoints[1].in_clusters[1026].read_attributes(["measured_value"])
                await device.endpoints[1].in_clusters[1029].read_attributes(["measured_value"])
                time.sleep(20)
            except Exception as e:
                print(e)
                time.sleep(10)

await asyncio.get_running_loop().create_future()

if name == "main": import time asyncio.run(main()) `

naive-HA commented 1 year ago

*I have no idea why the above code snippet fails to display properly: I am using the right indentation...

puddly commented 1 year ago

I am using the below code to query the temperature several times in a row:

Any battery-operated Zigbee device will be sleeping 99% of the time and will probably not be awake to receive any command you send it. You will need to set up attribute reporting and ask the device to send you attribute updates when it joins.

naive-HA commented 1 year ago

Makes sense. I saw Home Assistant has a refresh button reading the values off the device, although I have not waited long enough to see whether the device goes to sleep and that refresh fails also under Home Assistant I guess I need to understand better the philosophy of zigbee