hardbyte / python-can

The can package provides controller area network support for Python developers
https://python-can.readthedocs.io
GNU Lesser General Public License v3.0
1.31k stars, 604 forks

Why is the same code not running now? #1862

Open hesenrun opened 2 months ago

hesenrun commented 2 months ago

Describe the bug

I used to be able to obtain data with the following code, but now no data is displayed. The hardware I am using is a CANable.

Additional context

- OS and version: Windows 10 / Ubuntu 24.04
- Python version: Python 3.10 / Python 3.12
- python-can version: 4.2.0 / 4.4.2
- python-can interface/s (if applicable): slcan

Traceback and logs

The environment in which the code once ran successfully:

```text
cantools==39.3.0
mqtt==0.0.1
paho-mqtt==1.6.1
pyserial==3.5
python-can==4.2.0
```

This is my code example. It was once able to run successfully.

```python
# This is a sample Python script.
import asyncio
import time
from typing import List

import can
from can.notifier import MessageRecipient
# import cantools
import serial.tools.list_ports
# import paho.mqtt.client as mqtt
# import json

# client = mqtt.Client()
# db = cantools.database.load_file('.\wuling.dbc')


def on_connect(client, userdata, flags, rc):
    print(f"Connected with result code {rc}")


async def main() -> None:
    port = None
    ports_list = list(serial.tools.list_ports.comports())
    if len(ports_list) <= 0:
        print("No serial devices found, exiting...")
        time.sleep(2)
        exit(0)
    else:
        print("Available serial devices:")
        for comport in ports_list:
            print(comport.name + ", description: " + str(comport.description) + ", device ID: " + str(comport.vid) + ":" + str(
                comport.pid))
        # Pick the first port whose USB VID/PID match the CAN adapter.
        for comport in ports_list:
            if comport.pid == 4478 and comport.vid == 5840:
                print("Selecting first CAN device: " + comport.name + " " + str(comport.description))
                port = comport
                break
        if port is None:
            print("No CAN device found, exiting...")
            time.sleep(2)
            exit(0)
    # client.on_connect = on_connect
    # client.username_pw_set(username="user", password="user")
    # client.connect("127.0.0.1", 1883, 60)
    """The main function that runs in the loop."""
    with can.interface.Bus(bustype='slcan', channel=str(port.name), bitrate=500000, ) as bus:
        reader = can.AsyncBufferedReader()
        listeners: List[MessageRecipient] = [
            reader,
        ]
        loop = asyncio.get_running_loop()
        notifier = can.Notifier(bus, listeners, loop=loop)
        while True:
            # Wait for the next frame delivered by the notifier.
            msg = await reader.get_message()
            try:
                print(msg)
            except Exception as ex:
                print(ex)
                pass
            pass


if __name__ == "__main__":
    asyncio.run(main())
```
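
The port-selection step in the script can be isolated and checked without any hardware attached, which may help narrow down whether the adapter is still being detected in the newer environment. The following is only a minimal sketch: `find_can_port`, `CAN_VID`, `CAN_PID`, and the `fake_ports` list are hypothetical names introduced for illustration, and the only values taken from the script are the decimal IDs 5840 and 4478 (0x16D0 and 0x117E in hex).

```python
from types import SimpleNamespace
from typing import Iterable, Optional

# USB IDs copied from the script (decimal 5840 / 4478); names are illustrative.
CAN_VID = 0x16D0  # 5840
CAN_PID = 0x117E  # 4478


def find_can_port(ports: Iterable, vid: int = CAN_VID, pid: int = CAN_PID) -> Optional[object]:
    """Return the first port object whose vid/pid attributes match, else None."""
    for port in ports:
        if port.vid == vid and port.pid == pid:
            return port
    return None


# Stand-in objects exposing the same attributes the script reads
# (name, vid, pid); real entries would come from pyserial.
fake_ports = [
    SimpleNamespace(name="COM1", vid=None, pid=None),
    SimpleNamespace(name="COM7", vid=5840, pid=4478),
]

match = find_can_port(fake_ports)
print(match.name if match else "no CAN device found")
```

With pyserial installed, the same helper could be called as `find_can_port(serial.tools.list_ports.comports())`, and the returned object inspected on each operating system before it is passed to `can.interface.Bus`.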