skysilver-lab / php-miio

Реализация взаимодействия с устройствами из экосистемы Xiaomi по протоколу miIO
MIT License
119 stars 21 forks source link

Получение токена через облако #10

Open alexpts opened 3 years ago

alexpts commented 3 years ago

Предлагаю портировать в проект следующий способ получения токена — через облачный API Xiaomi с использованием учётной записи Xiaomi:

import base64
import hashlib
import hmac
import json
import os
import requests

from Crypto.Hash import MD5, SHA256

class XiaomiCloudConnector:
    """Small client that logs in to a Xiaomi account and lists its devices.

    Usage: construct with credentials, call :meth:`login`, then
    :meth:`get_devices` for each region of interest.
    """

    def __init__(self, username, password):
        """Store the credentials and open an HTTP session.

        Args:
            username: Account e-mail or user ID.
            password: Plain-text account password (hashed before sending).
        """
        self._username = username
        self._password = password
        self._session = requests.session()
        # Filled in by login_step_2() from the login response.
        self._ssecurity = None
        self._userId = None
        self._location = None
        # Filled in by login_step_3() from the response cookies.
        self._serviceToken = None

    def login_step_2(self):
        """Submit the credentials and remember the returned session data.

        Returns:
            True when the server answered 200 with a JSON object containing
            ``ssecurity``, ``userId`` and ``location``; False otherwise
            (including a body that is not valid JSON).
        """
        url = "https://account.xiaomi.com/pass/serviceLoginAuth2"
        headers = {
            "Content-Type": "application/x-www-form-urlencoded"
        }
        fields = {
            "sid": "xiaomiio",
            # The password is sent as an upper-case hex MD5 digest.
            "hash": MD5.new(str.encode(self._password)).hexdigest().upper(),
            "user": self._username,
            "_json": "true"
        }
        # NOTE(review): the fields go into the query string (params=) although
        # the Content-Type announces a form body -- confirm this is intended.
        response = self._session.post(url, headers=headers, params=fields)
        if response.status_code != 200:
            return False

        # Parse once; a non-JSON body (json.JSONDecodeError is a ValueError)
        # is treated as a failed login instead of crashing.
        try:
            json_resp = self.to_json(response.text)
        except ValueError:
            return False

        required = ("ssecurity", "userId", "location")
        if not isinstance(json_resp, dict) or not all(key in json_resp for key in required):
            return False

        self._ssecurity = json_resp["ssecurity"]
        self._userId = json_resp["userId"]
        self._location = json_resp["location"]
        return True

    def login_step_3(self):
        """Follow the login ``location`` URL and capture the service token.

        Returns:
            True when the request returned HTTP 200, False otherwise. The
            ``serviceToken`` cookie of the response (possibly None) is stored
            on success.
        """
        headers = {
            "Content-Type": "application/x-www-form-urlencoded"
        }
        response = self._session.get(self._location, headers=headers)
        if response.status_code == 200:
            self._serviceToken = response.cookies.get("serviceToken")
        return response.status_code == 200

    def login(self):
        """Run both login steps, printing a message when one of them fails.

        Returns:
            True when both steps succeeded, False otherwise.
        """
        if self.login_step_2():
            if self.login_step_3():
                return True
            else:
                print("Unable to get service token.")
        else:
            print("Invalid login or password.")

        return False

    def get_devices(self, country):
        """Request the device list for one region.

        Args:
            country: Region code passed to :meth:`get_api_url`.

        Returns:
            The decoded JSON response, or None when the call failed.
        """
        url = self.get_api_url(country) + "/home/device_list"
        params = {
            "data": '{"getVirtualModel":false,"getHuamiDevices":0}'
        }
        return self.execute_api_call(url, params)

    def execute_api_call(self, url, params):
        """Send a signed POST request to the API.

        Args:
            url: Full endpoint URL, including the ``/app`` prefix.
            params: Mapping with at least a ``"data"`` entry; all entries
                take part in the signature.

        Returns:
            The decoded JSON body on HTTP 200, or None on any other status
            or when the body is not valid JSON.
        """
        headers = {
            "Accept-Encoding": "gzip",
            "Content-Type": "application/x-www-form-urlencoded",
            "x-xiaomi-protocal-flag-cli": "PROTOCAL-HTTP2"
        }
        cookies = {
            "userId": str(self._userId),
            "serviceToken": str(self._serviceToken),
        }

        nonce = self.generate_nonce()
        signed_nonce = self.signed_nonce(nonce)
        # The signature is computed over the path without the "/app" prefix;
        # count=1 removes only that first occurrence, not later "/app..."
        # fragments inside the endpoint path.
        signature = self.generate_signature(url.replace("/app", "", 1), signed_nonce, nonce, params)
        fields = {
            "signature": signature,
            "_nonce": nonce,
            "data": params["data"]
        }
        response = self._session.post(url, headers=headers, cookies=cookies, params=fields)
        if response.status_code != 200:
            return None
        try:
            return response.json()
        except ValueError:
            # Keep the "None on failure" contract for undecodable bodies.
            return None

    def get_api_url(self, country):
        """Return the API base URL; ``"cn"`` uses the host without a prefix."""
        return "https://" + ("" if country == "cn" else (country + ".")) + "api.io.mi.com/app"

    def signed_nonce(self, nonce):
        """Return base64(SHA-256(ssecurity bytes + nonce bytes)).

        Args:
            nonce: Base64-encoded nonce, as produced by :meth:`generate_nonce`.
        """
        hash_object = SHA256.new()
        hash_object.update(base64.b64decode(self._ssecurity) + base64.b64decode(nonce))
        return base64.b64encode(hash_object.digest()).decode('utf-8')

    @staticmethod
    def generate_nonce():
        """Return 12 random bytes encoded as a base64 string."""
        nonce_bytes = os.urandom(12)
        return base64.b64encode(nonce_bytes).decode()

    @staticmethod
    def generate_signature(url, signed_nonce, nonce, params):
        """Return the base64 HMAC-SHA256 signature of a request.

        The signed string is ``path&signed_nonce&nonce&k=v...`` joined with
        ``&``, keyed with the decoded ``signed_nonce``.

        Args:
            url: Request URL; everything after the first ``"com"`` is signed.
            signed_nonce: Base64 string from :meth:`signed_nonce`.
            nonce: Base64 nonce string.
            params: Mapping whose items are appended as ``key=value``.
        """
        # maxsplit=1 keeps the whole remainder, so a path that itself
        # contains "com" (e.g. ".../command") is not truncated.
        signature_params = [url.split("com", 1)[1], signed_nonce, nonce]
        signature_params.extend(f"{k}={v}" for k, v in params.items())
        signature_string = "&".join(signature_params)
        signature = hmac.new(base64.b64decode(signed_nonce), msg=signature_string.encode(), digestmod=hashlib.sha256)
        return base64.b64encode(signature.digest()).decode()

    @staticmethod
    def to_json(response_text):
        """Strip the ``&&&START&&&`` marker and decode the rest as JSON."""
        return json.loads(response_text.replace("&&&START&&&", ""))

# Interactive entry point: ask for the credentials and a region, log in, and
# print the devices returned for each selected region.
print("Username (email or user ID):")
username = input()

print("Password:")
password = input()

print("Country (one of: ru, us, tw, sg, cn, de) Leave empty to check all available:")
country = input()

while country not in ["", "ru", "us", "tw", "sg", "cn", "de"]:
    print("Invalid country provided. Valid values: ru, us, tw, sg, cn, de")
    print("Country:")
    country = input()

print()
# An empty answer means "query every known region".
countries = ["cn", "de", "us", "ru", "tw", "sg"]
if not country == "":
    countries = [country]

# (response key, printed label) pairs, in output order.
device_fields = (
    ("name", "   NAME:  "),
    ("did", "   ID:    "),
    ("localip", "   IP:    "),
    ("token", "   TOKEN: "),
    ("model", "   MODEL: "),
)

connector = XiaomiCloudConnector(username, password)
print("Logging in...")
logged = connector.login()
if logged:
    print("Logged in.")
    print()
    for current_country in countries:
        devices = connector.get_devices(current_country)
        # A failed call yields None, and a response without a usable
        # result/list is handled the same way instead of raising.
        result = devices.get("result") if isinstance(devices, dict) else None
        device_list = result.get("list") if isinstance(result, dict) else None
        if device_list is None:
            print("Unable to get devices.")
            continue
        if len(device_list) == 0:
            print(f"No devices found for country \"{current_country}\".")
            continue
        print(f"Devices found for country \"{current_country}\":")
        for device in device_list:
            print("   ---------")
            for key, label in device_fields:
                if key in device:
                    # f-string formatting also copes with non-string values.
                    print(f"{label}{device[key]}")
        print("   ---------")
        print()
else:
    print("Unable to log in.")

Исходная, более полная (и более многословная) версия: https://github.com/PiotrMachowski/Xiaomi-cloud-tokens-extractor/blob/master/token_extractor.py