bohdan-s / SungrowModbusWebClient

Access Modbus RTU via API call to Sungrow WiNet-S
GNU General Public License v3.0

Write Holding Registers #1

Closed jsindorff closed 2 years ago

jsindorff commented 2 years ago

This is a great webclient. Many thanks.

Am I right that it doesn't currently support writing holding registers to the inverter? Presumably writes can't use the getParam API endpoint and need a different one.

Is there any info on the WiNet web API you could share? Happy to contribute to your project if you like.

bohdan-s commented 2 years ago

@jsindorff I couldn’t find any setting I would want to write, but if I had something in mind I could implement it. As for the API, that’s a good idea. I’ll publish what I found for others to use.

jsindorff commented 2 years ago

@bohdan-s it would be great if you could share that, or how you did the API discovery. I have an SH10RT and I need to write to it. Like others, I found that the WiNet firmware broke Modbus.

jsindorff commented 2 years ago

FYI: I checked the JavaScript and figured it out. Here's the updated code. I'm still getting a ModBusIO error "Unable to decode" for the returned response, but it still seems to work OK.

` def _send(self, request):

    if not self.ws_token:
        self.connect()

    self.header = request

    """ param_type should be:
        0 = read-only (input) registers, Modbus function code 4
        1 = holding registers, Modbus function code 3
    """
    if str(request[7]) == '4':
        param_type=0
    elif str(request[7]) == '3':
        param_type=1

    address = 256 * request[8] + request[9]
    count = 256 * request[10] + request[11]
    dev_id = str(request[6])
    param_size = 1  # 1=unsigned 16, 0=signed16, 2=unsigned32, 3=signed32
    self.payload_modbus = ""
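    # Function code 6 (write single register) is sent to setParam, where
    # bytes 10-11 of the request (held in `count`) are the value to write;
    # any other function code is treated as a read and sent to getParam.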
    try:
        if str(request[7]) == '6':
            url = f'http://{str(self.dev_host)}/device/setParam?token={self.ws_token}&lang=en_us&time123456={str(int(time.time()))}'
            data = {"dev_id": int(dev_id),
                    "dev_type": self.dev_type,
                    "dev_code": self.dev_code,
                    "param_size": param_size,
                    "param_addr": address,
                    "param_value": count}
            logging.debug("Calling URL:%s " % url)
            logging.debug("POST Data: %s" % str(data) )

            r = requests.post(url, data=data, timeout=3)
        else:
            # Reverse lookup for getParam: find the model_codes key whose value matches self.dev_code
            dev_code = next(s for s in self.model_codes if self.model_codes[s] == self.dev_code)
            logging.debug("param_type: " + str(param_type) + ", start_address: " + str(address) + ", count: " + str(
                count) + ", dev_id: " + str(dev_id))
            logging.debug(
                f'Calling: http://{str(self.dev_host)}/device/getParam?dev_id={dev_id}&dev_type={str(self.dev_type)}&dev_code={str(dev_code)}&type=3&param_addr={address}&param_num={count}&param_type={str(param_type)}&token={self.ws_token}&lang=en_us&time123456={str(int(time.time()))}')

            r = requests.get(
                f'http://{str(self.dev_host)}/device/getParam?dev_id={dev_id}&dev_type={str(self.dev_type)}&dev_code={str(dev_code)}&type=3&param_addr={address}&param_num={count}&param_type={str(param_type)}&token={self.ws_token}&lang=en_us&time123456={str(int(time.time()))}',
                timeout=3)
    except Exception as err:
        raise ConnectionException(f"HTTP Request failed: {str(err)}")
    logging.debug("HTTP Status code " + str(r.status_code))
    if str(r.status_code) == '200':
        self.payload_dict = json.loads(str(r.text))
        logging.debug("Payload Status code " + str(self.payload_dict.get('result_code', "N/A")))
        logging.debug("Payload Dict: " + str(self.payload_dict))
        if self.payload_dict.get('result_code',0) == 1:
            if str(request[7]) == '6':
                data_len = 0
                self.payload_modbus = ['00', format(request[1], '02x'), '00', '00', '00',
                                       format((data_len + 3), '02x'), format(request[6], '02x'),
                                       format(request[7], '02x'), format(data_len, '02x')]
            else:
                modbus_data = self.payload_dict['result_data']['param_value'].split(' ')
                modbus_data.pop()  # drop the empty last element left by split(' ')
                data_len = int(len(modbus_data)) # length of data
                logging.debug("Data length: " + str(data_len))
                # Build the header. It is consumed by pymodbus and not actually sent to the device.
                # [aaaa][bbbb][cccc][dd][ee][ff]: a = Transaction ID, b = Protocol ID, c = Message Length (Data + 3 header bytes), d = Device ID, e = Function Code, f = Data Length
                self.payload_modbus = ['00', format(request[1], '02x'), '00', '00', '00', format((data_len+3), '02x'), format(request[6], '02x'), format(request[7], '02x'), format(data_len, '02x')]
                # Attach the data we received from the HTTP request to the header we created to make a modbus RTU message
                self.payload_modbus.extend(modbus_data)
            return self.payload_modbus`
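
A possible cause of the "Unable to decode" error: the Modbus specification defines the response to function code 6 (Write Single Register) as an echo of the request's register address and value, while the code above answers a write with a read-style frame whose byte count is zero. The sketch below is untested against a WiNet-S and only illustrates the echo. It assumes `request` is the MBAP-framed byte string that `_send` already indexes (unit id at byte 6, function code at byte 7, address at bytes 8-9, value at bytes 10-11) and that the reply stays a list of two-digit hex strings like `payload_modbus`. The helper name `build_write_single_response` is hypothetical and not part of the project.

```python
from typing import List


def build_write_single_response(request: bytes) -> List[str]:
    """Build a function code 6 reply as a list of two-digit hex strings.

    Assumed request layout (Modbus TCP / MBAP framing):
      bytes 0-1  transaction id
      bytes 2-3  protocol id
      bytes 4-5  length (unit id + function code + data)
      byte  6    unit id
      byte  7    function code
      bytes 8-9  register address
      bytes 10-11 register value
    """
    if len(request) < 12 or request[7] != 0x06:
        raise ValueError("expected a 12-byte write-single-register request")
    # A successful Write Single Register response repeats the address and
    # value from the request. The length field (6) is also unchanged, so the
    # first 12 bytes of the request can be echoed back as-is.
    return [format(byte, "02x") for byte in request[:12]]


# Example: write value 0x00AA to register 0x332D on unit 1, transaction id 1.
example_request = bytes(
    [0x00, 0x01, 0x00, 0x00, 0x00, 0x06, 0x01, 0x06, 0x33, 0x2D, 0x00, 0xAA]
)
example_response = build_write_single_response(example_request)
print(example_response)
```

If this assumption about the framing holds, the function code 6 branch could assign the helper's result to `self.payload_modbus` in place of the zero-length frame, after `result_code` has confirmed that the write succeeded.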