itas109 / CSerialPort

CSerialPort - lightweight cross-platform serial port library for C++/C/C#/Java/Python/Node.js/Electron
https://blog.csdn.net/itas109/article/details/84282860
Other
710 stars 337 forks source link

发送Hex读取命令,接收不到数据 #72

Closed zhangningboo closed 1 year ago

zhangningboo commented 1 year ago

使用pyserial方式发送查询命令并接收到数据:

    def test_query_tof(self):
        """Poll the device over pyserial and print every parsed 16-byte reply.

        Each query frame is written in turn, the method waits a fixed delay,
        and the pending input is consumed only when its size is a non-zero
        multiple of 16; the newest 16 bytes are then handed to
        ``parse_tofsense_data`` as a list of two-digit hex strings.  The loop
        runs for as long as the port reports itself open, and the port is
        always closed on exit.
        """
        import serial

        # Query frames as hex text.  Other variants that were tried:
        #   '57 10 FF FF 00 FF FF 63', '57 10 FF FF 02 FF FF 65',
        #   '57 10 FF FF 04 FF FF 67', '57 10 FF FF 05 FF FF 68'
        commands = [
            '57 10 FF FF 01 FF FF 64',
            '57 10 FF FF 03 FF FF 66',
        ]
        frames = [bytes.fromhex(text) for text in commands]

        # Alternative ports used during bring-up:
        #   /dev/ttyTHS0 @ 115200 (40-pin header, PIN8-TX / PIN10-RX)
        #   /dev/ttyTHS1 @ 921600, /dev/ttyTHS4 @ 921600
        ser = serial.Serial(port="/dev/ttyUSB0", baudrate=921600, timeout=0.5)
        query_no = 0
        try:
            while ser.isOpen():
                for frame in frames:
                    ser.write(frame)
                    time.sleep(0.066)  # required delay before polling the input buffer
                    pending = ser.inWaiting()
                    if pending and pending % 16 == 0:
                        # Drain everything that is waiting, keep only the newest frame.
                        newest = ser.read(pending)[-16:]
                        hex_bytes = ["%02x" % value for value in newest]
                        print(query_no, len(hex_bytes), parse_tofsense_data(hex_bytes))
                    query_no += 1
        except Exception as e:
            print(e)
        finally:
            ser.close()
            print('closed serial')

使用cserialport发送,但接收不到数据,请问是哪里出问题了呢?

# ***************************************************************************
# @file main.py
# @author itas109 (itas109@qq.com) \n\n
# Blog : https://blog.csdn.net/itas109 \n
# Github : https://github.com/itas109 \n
# Gitee : https://gitee.com/itas109 \n
# QQ Group : 129518033
# @brief Python CSerialPort Example  Python的CSerialPort示例程序
############################################################################
import cserialport

def main():
    """Interactive CSerialPort example: choose a port, open it, send one hex query.

    Lists the available ports, asks the user for a 1-based index, opens the
    chosen port at 921600 8N1 without flow control, registers a ``MyListener``
    for read events, writes a single 8-byte query frame and then idles so the
    listener can report incoming data.  Returns early when no port exists or
    the port cannot be opened; otherwise it does not return.
    """
    import time  # local import: only needed for the idle loop at the end

    sp = cserialport.CSerialPort()
    print("Version: %s\n" % (sp.getVersion()))

    listener = MyListener(sp).__disown__()

    spInfoVec = cserialport.CSerialPortInfo.availablePortInfos()
    print("Available Friendly Ports:")
    for index, spInfo in enumerate(spInfoVec, start=1):
        print("%d - %s %s" % (index, spInfo.portName, spInfo.description))

    if len(spInfoVec) == 0:
        print("No Valid Port")
        return

    while True:
        print("Please Input The Index Of Port(1 - %d)" % (len(spInfoVec)))
        try:
            myInput = int(input())
        except ValueError:
            # Non-numeric input: ask again instead of crashing.
            continue
        if 1 <= myInput <= len(spInfoVec):
            break
    portName = spInfoVec[myInput - 1].portName
    print("Port Name: %s" % (portName))

    sp.init(portName,  # windows:COM1 Linux:/dev/ttyS0
            921600,  # baudrate
            cserialport.ParityNone,  # parity
            cserialport.DataBits8,  # data bit
            cserialport.StopOne,  # stop bit
            cserialport.FlowNone,  # flow
            4096  # read buffer size
            )
    sp.setReadIntervalTimeout(0)  # read interval timeout
    sp.open()
    print("Open %s %s" % (portName, "Success" if sp.isOpen() else "Failed"))
    if not sp.isOpen():
        # Nothing can be sent or received on a port that failed to open.
        return

    # connect for read
    sp.connectReadEvent(listener)

    # write hex data
    payload = bytes.fromhex('57 10 FF FF 03 FF FF 66')
    print(payload)
    tx_buf = cserialport.malloc_void(len(payload))
    try:
        # Copy the raw payload bytes.  Passing str(payload) here would copy
        # the repr text "b'W\x10\xff..." instead: wrong bytes on the wire, and
        # more characters than the len(payload)-byte buffer can hold.
        # NOTE(review): this needs a binding whose memmove accepts bytes
        # objects; a str-only SWIG build may reject it with TypeError --
        # confirm against the installed cserialport module.
        cserialport.memmove(tx_buf, payload)
        sp.writeData(tx_buf, len(payload))
    finally:
        cserialport.free_void(tx_buf)

    # # write str data
    # data = cserialport.malloc_void(7)
    # cserialport.memmove(data, "itas109")
    # sp.writeData(data, 7)
    # cserialport.free_void(data)

    # Keep the process alive for the read listener; sleep instead of spinning
    # so the idle loop does not burn a CPU core.
    while True:
        time.sleep(1)

def byte2hexStr(data):
    """Return *data* rendered as space-separated ``0xNN`` tokens.

    Args:
        data: A ``str`` (each character is rendered by its code point) or a
            ``bytes``/``bytearray``/iterable of ints (each value is rendered
            directly).

    Returns:
        The tokens joined by single spaces; an empty string for empty input.
    """
    return " ".join(
        "0x%02X" % (item if isinstance(item, int) else ord(item))
        for item in data
    )

class MyListener(cserialport.CSerialPortListener):
    """Read-event listener that prints every received chunk and echoes it back.

    The serial port and the running read counter are stored on the class
    (``MyListener.sp`` / ``MyListener.countRead``), so they are shared by all
    instances and reset whenever a new listener is constructed.
    """

    def __init__(self, sp):
        """Remember *sp* as the port to read from and reset the read counter."""
        MyListener.countRead = 0
        MyListener.sp = sp
        cserialport.CSerialPortListener.__init__(self)

    def onReadEvent(self, portName, readBufferLen):
        """Read the pending data, print it as text and hex, then echo it.

        Args:
            portName: Name of the port that raised the event (used for output).
            readBufferLen: Number of bytes to request from the port.
        """
        data = cserialport.malloc_void(readBufferLen)
        try:
            recLen = MyListener.sp.readData(data, readBufferLen)
            # NOTE(review): assumes readData returns the number of bytes
            # actually read and a non-positive value when nothing was read
            # -- confirm against the binding.
            if recLen <= 0:
                return
            # Decode and echo only the bytes that were really received; using
            # readBufferLen here would expose uninitialised buffer contents
            # whenever fewer bytes arrive than were requested.
            received = cserialport.cdata(data, recLen)
            MyListener.countRead += 1
            print("%s - Count: %d, Length: %d, Str: %s, Hex: %s" % (
                portName, MyListener.countRead, recLen, received,
                byte2hexStr(received)))
            MyListener.sp.writeData(data, recLen)
        finally:
            # Release the native buffer even if printing or writing fails.
            cserialport.free_void(data)

# Run the interactive example only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()

改用cserialport的原因,是因为使用pyserial时,发送查询命令后必须先执行以下代码,才能读取到数据:

time.sleep(0.066)  # 必要的延迟

导致数据获取时存在延迟,需要换一种不需要等待的串口库(sensor的厂家说在单片机上是不需要等待,即发送查询命令后可以直接获取数据)

itas109 commented 1 year ago

先用虚拟串口测试一下吧,bindings/python的demo经测试可以正常收发。另外,有些下位机可能需要在命令末尾加上\n等结束符才会应答。