jasonacox / tinytuya

Python API for Tuya WiFi smart devices using a direct local area network (LAN) connection or the cloud (TuyaCloud API).
MIT License
1k stars 177 forks source link

Possible to start tinytuya as a service #101

Open Bonze255 opened 2 years ago

Bonze255 commented 2 years ago

it shows me, that i must poll every device to get the state of it? its possible to open one socket and check via callback, which device is send his state?

jasonacox commented 2 years ago

Good question. Unfortunately, it is not as easy as just listening. The Tuya devices do broadcast their signature via UDP packets, but the payload only contains their identifier, not their state. Tuya devices are designed to send their state updates to the cloud. They also listen on the local network for queries and commands which is how TinyTuya works.

To get close to what you are wanting, you can have TinyTuya make a connection to the device and listen for updates:

import tinytuya

d = tinytuya.OutletDevice('DEVICEID', 'DEVICEIP', 'DEVICEKEY')
d.set_version(3.3)
d.set_socketPersistent(True)

print(" > Send Request for Status < ")
payload = d.generate_payload(tinytuya.DP_QUERY)
d.send(payload)

print(" > Begin Monitor Loop <")
while(True):
    # See if any update is available
    data = d.receive()
    print('Received Payload: %r' % data)

    # Send keyalive heartbeat
    print(" > Send Heartbeat Ping < ")
    payload = d.generate_payload(tinytuya.HEART_BEAT)
    d.send(payload)
Bonze255 commented 2 years ago

yes ive read this, .. for 1 device its just easy, but when i have 10 devices its not so easy, also when they are on/off and their is a timeout.. so i think to check over UDP if there are online and then read their Status is even better? send the devices when they are going online a message?

my problem is, i would wrote a plugin for www.smarthomeng.de when a defined deicve is not online the open port is blocket, so i can not read/write on it at this time, when i have more than 1 device defined..?

jasonacox commented 2 years ago

For the timeout, you can watch for that and reconnect in the code. But I understand the concern. I have 30+ devices. A "service" could still be created where it maintains a "memory" of device states and uses TinyTuya to poll the devices on some frequency (15s, 5m, etc), or push changes if requested.

If you come up with a solution, please let us know! Perhaps someone else in the community can post some ideas too.

Alternatively, you could look at using the Tinytuya cloud functions to monitor and control the devices. Still, I don't think that is what you are looking for.

Bonze255 commented 2 years ago

mh my idea fpra quick fix, is to ping all devices, each x s , save them to a dict, so can i check bevor i read something to a non existing device, if this device is powered on ?!

but this is not the ultimative solution .

i think it would be even better, to split of the UDP part to a "Server" function, decode the data and give the data to the specified device..

jasonacox commented 2 years ago

I'm not really following. Devices only send their ID, IP and version via UDP, not state. But yes, you could have a python based server constantly listen for that and build a dictionary of that. Can you hack together an example?

deepcoder commented 2 years ago

Hey @jasonacox thanks very much for your work on this project, it is really been helpful to me to learn a little about how Tuya devices operate. I'm still trying to get my feeble brain around how and where a Tuya device sends updates about changes to its state. Your project appears to be a polling model, others one the interweb seem to claim they have some type of device push model for local updates. It seems to me that a Tuya device has to send push update to the Tuya cloud for their products to work, you make a statement above 'Tuya devices are designed to send their state updates to the cloud.'. So my question is in line with others, is there a way to get local network notifications from Tuya devices? Or is polling the only local solution and are the folks that seem to claim local push, blowing smoke? Thanks again!

jasonacox commented 2 years ago

Hi @deepcoder - thanks for the note! I can be wrong and love learning something new. Please point me to any source that you find that has any information about a different way to get updates from these devices. I've poked around quite a bit to other projects like pytuya, localtuya, tuyapi, tuya-mqtt, and they all do the same thing tinytuya is doing (initiating the TCP connection to the Tuya device).

A few things:

Please let me know if you find any other details. Like I say, I would love to learn something new about these devices.

Bonze255 commented 2 years ago

You CAN get Tuya devices to send status update to an endpoint. APIs like localtuya and tuya-mqtt appear to be doing this and tinytuya can do the same. To do it, you do what I have above using the d.receive() function in a loop to check for any new updates the device may send to you. You would want to occasionally d.send() a heartbeat payload to the device to make sure it still knows you are wanting updates. If it errors out during hearbeat, you would want to re-initialize the connection. You could create a "server" that basically opens up a handle to receive updates from all your devices (localtuya in home assistant does this).

that means, a server function : -to poll all devices for an update with d.send(HEARTBEAT) for each device?

or how must bee the strcuture for more than 1 device?

Bonze255 commented 2 years ago
        while True:
            for device in self.devicelist:
                print(" > Send Request for Status < ")
                payload = self.devicelist[device].generate_payload(tinytuya.DP_QUERY)
                self.devicelist[device].send(payload)
                # See if any data is available
                data = self.devicelist[device].receive()
                if data != None:
                    print('Received Payload: %r' % data)

                # Send keyalive heartbeat
                print(" > Send Heartbeat Ping < ")
                payload = self.devicelist[device].generate_payload(tinytuya.HEART_BEAT)
                self.devicelist[device].send(payload)
                time.sleep(2)

? when i restart the device , i got a this state

Received Payload: {'Error': 'Device22 Detected: Retry Command', 'Err': '907', 'Payload': None}

deepcoder commented 2 years ago

Hi @jasonacox, thank you for the info and education! It was the 'local tuya' github home assistant project that I was specifically referring to. In their docs, they say 'This custom integration updates device status via push updates instead of polling, so status updates are fast (even when manually operated)'. In your reply, one of the ideas you discuss it to have 'd.receive() function in a loop'. IMHO, that not a push model, that's pull/poll...

I created a simple python script using your library to do as you recommended with d.receive() in a loop. I am wondering if you have build your apps round your library using a threading library for python? If so which one and could you share an example. I am trying to use 'asyncio', and while it is kind of working, my weak python and specifically 'asyncio' skills are limiting my success so far.

when my asyncio function calls 'd.receive()', with your default parameters set, my python app seems to get locked for 5 seconds, not just the asyncio function that calls 'd.receive()'. I seem to be able to 'tune' the parameters to shorten this period, but the fundamental problem is that while 'd.receive()' is executing the whole python app is no longer asynchronous, but frozen.

Any ideas you might have for a code model would facilitate asynchronous MQTT and your library would greatly be appreciated!

Bonze255 commented 2 years ago

You can use the threading module , this works fine for me ..

deepcoder commented 2 years ago

Thank you @Bonze255 for directing me to python thread library, my code performs better with this multitasking library vs. asyncio. Much to learn for me in Python. I'm still learning how these Tuya based devices operate. Clearly, as @jasonacox say, the best way is to flash firmware like Tasmota or espHome that is 'local focused'. However, for more and more of these 'interesting' home automation devices, this is not an option as the vendors have either leapfrogged the firmware replacement hacks for the esp based devices or replaced the esp devices with other hardware.

As a suboptimal path for these Tuya devices, I am hoping there is a way to a least create some type of network call back intercept of their 'hey I have just state changed something' broadcasts to the cloud and/or local network and then execute a query of the device attribute states....

jasonacox commented 2 years ago

If you are willing, please share any of the code you develop to do the multi-threaded handling. I would love to add that to our example collection. Better yet, submit a pull request so you get contributor credit.

@deepcoder Thanks for the note about local tuya for home-assistant (hass)! The note you mention references the fact that they rebuilt the pytuya code to "push" the Tuya device DPS updates to hass instead of having hass poll pytuya for the device state.

I had a conversation with rospogrigio and postlund about joining efforts when clash04 shut down development on pytuya. Both tinytuya and localtuya are based on pytuya and clash04's great work. We talked about joining efforts but rospogrigio and team were focused on creating a library for home-assistant instead of a Tuya API module. The work that postlund did in the resulting python code using asyncio is simply incredible, and honestly it is beyond my ability. It is designed for a server operation similar to what you are looking for but tightly coupled with hass. From what I can tell from their repo, they connect to all devices and have asyncio handlers to react to packets received, similar to what you doing with tinytuya's send() and receive(). They also have a heartbeat loop running. Here is the section in the code that seem to be most relevant (scroll up to see the rest of the functions that are similar to tinytuya):

https://github.com/rospogrigio/localtuya/blob/2688a170c8b22dc448e845911dd63f3bba378580/custom_components/localtuya/pytuya/__init__.py#L657

Here is my thought: I would love to add even more support within tinytuya to make this type of server function easier and your interest helps me think how we can do this. I have an idea of building a GUI or shell based interactive interface built on tinytuya that would definitely use a more multithreaded async approach. At the same time, there are hundreds of projects that are using tinytuya as a pytuya replacement so I want to keep the same API. I also want to keep the API and code generally approachable to any python coder.

Anything the two of you can develop and contribute would be very much appreciated! I would love to add several multithreaded example scripts.

jasonacox commented 2 years ago
        while True:
            for device in self.devicelist:
                print(" > Send Request for Status < ")
                payload = self.devicelist[device].generate_payload(tinytuya.DP_QUERY)
                self.devicelist[device].send(payload)
                # See if any data is available
                data = self.devicelist[device].receive()
                if data != None:
                    print('Received Payload: %r' % data)

                # Send keyalive heartbeat
                print(" > Send Heartbeat Ping < ")
                payload = self.devicelist[device].generate_payload(tinytuya.HEART_BEAT)
                self.devicelist[device].send(payload)
                time.sleep(2)

? when i restart the device , i got a this state

Received Payload: {'Error': 'Device22 Detected: Retry Command', 'Err': '907', 'Payload': None}

@Bonze255 You will probably want to add a condition handler to close and reconnect when it detects an error like that. Maybe something like:

if 'Err' in data:
    d.close()
    d = tinytuya.OutletDevice('DEVICEID', 'DEVICEIP', 'DEVICEKEY')
    d.set_version(3.3)
    d.set_socketPersistent(True)
Bonze255 commented 2 years ago

ok ive played a bit around..

jasonacox commented 2 years ago
  • in the main thread i change the colour of 1 device, every 2s this worfks fine, but, iam think about asyncio , maybe it would better for this ? when i have 30 devices, i think its not so good, to use 30 threads?

Nice job! Yes, I think asyncio would be a good option. It is designed for asynchronous IO handling which is what you are wanting. The number of threads is only limited by the amount of RAM in your computer. Each thread is likely 1M or less so a 8G machine could easily handle 8000 threads. 30 should not be a problem.

Please do share your code if you get something you like. :)

Bonze255 commented 2 years ago

i get now the state of the device ..

{'dps': {'20': True, '21': 'colour', '24': '00f003e803e8', '26': 0}}

but i cant use the function get_colour() or something.. while colour_rgb() it use def colour_rgb(self): """Return colour as RGB value""" hexvalue = self.status()[self.DPS][self.DPS_INDEX_COLOUR[self.bulb_type]] return BulbDevice._hexvalue_to_rgb(hexvalue, self.bulb_type) the best option should, that recieve() == self.status , or do you have an idea? I would not impllement all device status functions myself :D

jasonacox commented 2 years ago

I don't think I fully understand what you are trying to accomplish. If you want to parse the results from the state, for smart bulb devices, you can parse the 24 DPS key. It is a hex value representing the colour so you can use this:

# Data received
data = {'dps': {'20': True, '21': 'colour', '24': '00f003e803e8', '26': 0}}
# Parse
(r, g, b) = tinytuya.BulbDevice._hexvalue_to_rgb(data['dps']['24'])
print("R=%d G=%d B=%d" % (r,g,b))
R=0 G=240 B=3
Bonze255 commented 2 years ago

the first code is now ready,

but there must be an configuration failure.. when i power on the device, i cant set /read the colour value, .- it shows an "error device22" detected...

i must set the value with the tuya app, then i can read/write the colour value with the code..

i create the device with this code

self.devicelist[aDevice['name']] = tinytuya.BulbDevice(aDevice['id'], aDevice['ip'], aDevice['local_key'])
self.devicelist[aDevice['name']].set_version(3.3)
self.devicelist[aDevice['name']].set_dpsUsed({"20": None,"21": None, "22": None, "23": None, "24": None, "25": None, "26": None })
self.devicelist[aDevice['name']].set_socketPersistent(True)
jasonacox commented 2 years ago

Try to specify device22 as the type:

self.devicelist[aDevice['name']] = tinytuya.BulbDevice(aDevice['id'], aDevice['ip'], aDevice['local_key'], 'device22')
self.devicelist[aDevice['name']].set_version(3.3)
self.devicelist[aDevice['name']].set_dpsUsed({"20": None,"21": None, "22": None, "23": None, "24": None, "25": None, "26": None })
self.devicelist[aDevice['name']].set_socketPersistent(True)
Bonze255 commented 2 years ago

when i set "device22" as val, i couldnt set anything to the device...

here is my sample code..

#!/usr/env python
import json 
import  os 
import tinytuya
import time
from threading import Thread

class Tuya():

    def __init__(self):
        localkey ="cjghjjhj"
        self.devicelist = {} #holds all devices-objects
        _data = [] #liste der devices und deren keywords
        self.watchedItems = {}
        self.watchedItems['INNOVATE - RGBW Light'] = [['itempath','colour'], ['itempath','mode']]
        self.create_devices()
        self.checker()
        #heartbeater = Thread(name = 'heartbeat_threat', target = self.heartbeat, args = ())
        #heartbeater.start()

        #time.sleep(2)
        print(self.devicelist['INNOVATE - RGBW Light'].set_mode('colour'))
        while True: 

            self.devicelist['INNOVATE - RGBW Light'].set_colour(255,0,0)
            time.sleep(2)
            self.devicelist['INNOVATE - RGBW Light'].set_colour(0,255,0)
            time.sleep(2)
            self.devicelist['INNOVATE - RGBW Light'].set_colour(0,0,255)
            time.sleep(5)   
        #print("devicelist", self.devicelist)
      # ----------------------------------------------------------------------------------------------
    # Daten Lesen, über scheduler
    # ----------------------------------------------------------------------------------------------
    def checker(self):
        for device in self.devicelist:
            """
            Create thread instances for alle devices to control them 
            """
            read_data= Thread(name = "Read_"+device, target = self.read_data_thread, args = (device, ))
            read_data.start()

            """
            Create thread instances for alle devices ping them 
            """

            heartbeat= Thread(name = "Heartbeat_"+device, target = self.heartbeat, args = (device, ))
            heartbeat.start()    
    def update_item(self, device=None, data =None):
        """
        callback, when state of device is updated
        self._watchedItems[name] = [item, function]
        """
        try:
            if device != None:
                for items in self.watchedItems[device]:
                    #item, readfunction = self.watchedItems[device][items]        
                    item, readfunction = items   
                    # item = devicevals[0]
                    # function = devicevals[1]
                    value = None
                    self.devicelist[device].status = data
                    if readfunction == 'power':
                        if  self.devicelist[device].state()['is_on'] == True:
                            value = 'true'
                        else:
                            value = 'false'
                    elif readfunction == 'colourtemp':
                        hex = data[self.devicelist[device].DPS][self.devicelist[device].DPS_INDEX_COLOURTEMP[self.devicelist[device].bulb_type]] 
                        value = hex
                    elif readfunction == 'colour':   
                        hex = data[self.devicelist[device].DPS][self.devicelist[device].DPS_INDEX_COLOUR[self.devicelist[device].bulb_type]]      
                        value = self.devicelist[device]._hexvalue_to_rgb(hex, self.devicelist[device].bulb_type)

                    elif readfunction == 'brightness':
                        hex = data[self.devicelist[device].DPS][self.devicelist[device].DPS_INDEX_BRIGHTNESS[self.devicelist[device].bulb_type]] 
                        value = hex
                    elif readfunction == 'mode':
                        hex = data[self.devicelist[device].DPS][self.devicelist[device].DPS_INDEX_MODE[self.devicelist[device].bulb_type]] 
                        value = hex

                    print("read Value from Device",device, readfunction, value)
        except Exception as e:
            print("ERROR: UPDATE ITEM, ", e,device, value)
            value = None

    def read_data_thread(self, device):
        """
        starts for every Device a thread to poll the data .. 
        """

        while True:
            try: 
                print(" > Send Request for Status < ")
                payload = self.devicelist[device].generate_payload(tinytuya.DP_QUERY)
                self.devicelist[device].send(payload)

            except Exception as e:
                print(" > ERROR in THREAD SEND REQUEST < ",device,  e)
            finally:
                try:     
                    # See if any data is available
                    data = self.devicelist[device].receive()
                    if not data == None and not 'Err' in data:
                        #call function to update the specific items!
                        print(device, ' Received Payload: %r' % data)
                        self.update_item(device, data)
                    elif not data == None and 'Err' in data:
                        if data['Err'] == 900:#Unreachable'
                            self.devicelist.pop(device)
                            time.sleep(5)
                            deviceVals = self.getDevice(device)
                            deviceid = deviceVals['id']
                            deviceip = deviceVals['ip']
                            devicekey = deviceVals['local_key']

                            self.devicelist[device] = tinytuya.BulbDevice(deviceid, deviceip, devicekey)
                            self.devicelist[device].set_dpsUsed({"20": None,"21": None, "22": None, "23": None, "24": None, "25": None, "26": None })
                            self.devicelist[device].set_version(3.3)
                            self.devicelist[device].set_socketPersistent(True)
                            self.devicelist[device].set_mode('colour')

                    else:
                        print("Answer ", data)
                except Exception as e:
                    print(" > ERROR in THREAD Read from < ",device,  e)   
            time.sleep(2)

    def heartbeat(self, device):
        while True: 
            try:       
                # Send keyalive heartbeat
                print(device," > Send Heartbeat Ping < ")
                payload = self.devicelist[device].generate_payload(tinytuya.HEART_BEAT)
                self.devicelist[device].send(payload)

            except Exception as e:
                print(" > ERROR in THREAD Send Heartbeat from < ", device ,  e)    
            time.sleep(3)

    def create_devices(self):
        """
        Create Tuya Devices from snapshot.json and save them to de devicelist
        """

        try: 
            with open(os.path.join(os.path.dirname(__file__), "tuya-raw.json")) as json_file:
                self.devicefile = json.load(json_file)
        except (FileExistsError, FileNotFoundError) as e:
            print("Snapshot File not Found!")

        for aDevice in self.devicefile['result']:
            #if "Light" in aDevice['name']:
            self.devicelist[aDevice['name']] = tinytuya.BulbDevice(aDevice['id'], aDevice['ip'], aDevice['local_key'])
            self.devicelist[aDevice['name']].set_version(3.3)
            self.devicelist[aDevice['name']].set_dpsUsed({"20": None,"21": None, "22": None, "23": None, "24": None, "25": None, "26": None })
            self.devicelist[aDevice['name']].set_socketPersistent(True)
            time.sleep(1)
            data = self.devicelist[aDevice['name']].state()
            print(aDevice['name'] , data)

        print(self.devicelist)

        pass
    def getDevice(self, device=None):
        """
        get vals from device from devicefile
        """
        for aDevice in self.devicefile['result']:
            if aDevice['name'] == device:
                return aDevice
            break

i read the devices from the tuya-raw file start for every device a heartbeat thread and a read thread, because update_item() is a method used by smarthomeNG ...

Bonze255 commented 2 years ago

i think it works good now, but my device would not send the state of dpt 24, after start up. I must set the colour via the tuya app, once. then it works without problems. It is adevice problem? I dont know..

jasonacox commented 2 years ago

I haven't seen that with any of the RGB smartbulbs that I have. Instead of using the Tuya app, can you send the command to the bulb to switch to colour mode to see if that helps?

d.set_mode('colour')

Thanks for the updates. This does seem more like a problematic bulb than a code issue with TinyTuya but I do like to explore these just in case.

Bonze255 commented 2 years ago

ihave played a lot around, .. my probllem is that the Bulbs are sometimes not on energy.. so i think the tinytuya .recieve() method blocks with an timeout my code ..

send for every device a message to get the status payload = self.devicelist[device].generate_payload(tinytuya.DP_QUERY) self.devicelist[device].send(payload)

then i start a new thread for every device to get the status data = self.devicelist[device].receive() but this block the code, because the recieve method wait for the timeout.. when one device is not powered on..

Do you have an idea to evade this?

EDIT: to make it a little bit more faster i had set the .socketRetryLimit = 1