Closed giadefa closed 1 year ago
We have a Python (not MicroPython) library called pybricksdev that can be used for this.
It can be used as a command line tool to download and run programs (as an alternative to our online IDE), or as a Python library to communicate with hubs (your use case).
We used to have a few Python examples of this, but they are a bit outdated. This issue is a good action point for us to update them again, so thanks for opening it. We might be able to do this some time this week.
In the meantime, perhaps you can already look at installing it and try the command line use case to make sure everything is working.
I think that I had already tried with pybrickdev but got problems sending multiple floats or something like that. I don't actually remember but I dropped it. If you can send an example, it would be greatly appreciated.
This is an example that just shows communication from PC to the Hub (Tested on a Lego City Train with a very recent beta firmware). It uses the character "$" as a message termination character to send a string to the hub.
PC program:
import asyncio

from pybricksdev.connections import PybricksHub
from pybricksdev.ble import find_device


async def main():
    hub = PybricksHub()
    device = await find_device()
    await hub.connect(device)
    await hub.run("client.py", wait=False)
    await asyncio.sleep(1)
    for _ in range(4):
        await hub.write(b"start$")
        await asyncio.sleep(1)
        await hub.write(b"stop$")
        await asyncio.sleep(1)
    await hub.write(b"exit$")
    await hub.user_program_stopped.wait()
    await asyncio.sleep(0.3)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Program running on the hub (save as client.py in the same directory as the PC program):
from uselect import poll
from usys import stdin

from pybricks.hubs import CityHub
from pybricks.pupdevices import DCMotor
from pybricks.parameters import Port
from pybricks.tools import wait

input_buffer = ""
loop_poll = poll()
loop_poll.register(stdin)

hub = CityHub()
motor = DCMotor(Port.A)
running = True


def start():
    motor.dc(50)


def stop():
    motor.dc(0)


def exit():
    global running
    wait(1000)
    running = False


def input_handler(msg):
    print("got message:", msg)
    if msg == "start":
        start()
    if msg == "stop":
        stop()
    if msg == "exit":
        exit()


def update_input(char):
    global input_buffer
    if char == "$":
        input_handler(input_buffer)
        input_buffer = ""
    else:
        input_buffer += char


def main_loop():
    while running:
        if loop_poll.poll(100):  # times out after 100 ms
            char = stdin.read(1)
            if char is not None:
                update_input(char)
        # update other stuff here


main_loop()
Thanks @Novakasa!
And to send data from the hub to the computer, just print() anything in client.py. In the PC program, you can then access it as hub.output, which is a list containing all printed messages.
I extended the previous example to include the Hub-to-PC communication. This now uses an additional asyncio task to monitor the output from the hub, which is created using asyncio.create_task(). In a full application one would then use an asyncio.Queue to synchronize data between the concurrently running tasks.
PC-side:
import asyncio

from pybricksdev.connections import PybricksHub
from pybricksdev.ble import find_device


async def main():
    hub = PybricksHub()
    device = await find_device()
    await hub.connect(device)

    async def hub_run():
        await hub.run("client2.py", wait=False)
        while not hub.program_running:
            await asyncio.sleep(0.05)
        while hub.program_running:
            while hub.output:  # message printed in hub code
                line = hub.output.pop(0).decode()
                print("got message from hub:", line)
            await asyncio.sleep(0.05)

    run_task = asyncio.create_task(hub_run())
    while not hub.program_running:
        await asyncio.sleep(0.05)
    await asyncio.sleep(1)
    for _ in range(4):
        await hub.write(b"hello$")
        await asyncio.sleep(1)
    await hub.write(b"exit$")
    await hub.user_program_stopped.wait()
    await asyncio.sleep(0.3)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Hub-side:
from uselect import poll
from usys import stdin

from pybricks.hubs import CityHub
from pybricks.pupdevices import DCMotor
from pybricks.parameters import Port
from pybricks.tools import wait

input_buffer = ""
loop_poll = poll()
loop_poll.register(stdin)

hub = CityHub()
motor = DCMotor(Port.A)
running = True


def hello():
    print("hello")


def start():
    motor.dc(50)


def stop():
    motor.dc(0)


def exit():
    global running
    wait(1000)
    running = False


def input_handler(msg):
    # print("got message from PC:", msg)
    if msg == "start":
        start()
    if msg == "stop":
        stop()
    if msg == "exit":
        exit()
    if msg == "hello":
        hello()


def update_input(char):
    global input_buffer
    if char == "$":
        input_handler(input_buffer)
        input_buffer = ""
    else:
        input_buffer += char


def main_loop():
    while running:
        if loop_poll.poll(100):  # times out after 100 ms
            char = stdin.read(1)
            if char is not None:
                update_input(char)
        # update other stuff here


main_loop()
If you want to send more complex data between hubs, I am currently heavily using eval and repr. For example, a dictionary can be converted to a string using str = repr(dict), and the string can be converted back to the object using dict = eval(str). This also works for nested dictionaries, lists, etc. Note, though, that using eval() on untrusted sources is discouraged, since it allows arbitrary code execution.
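A minimal sketch of the repr round trip described above, as it might look on the PC side. It assumes the "$" terminator from the earlier examples and swaps eval for ast.literal_eval from the standard library, which only accepts Python literals. The helper names encode_message and decode_message are made up for illustration; the hub side would likely still need eval, since MicroPython builds generally do not ship the ast module.

```python
import ast

TERMINATOR = "$"  # same end-of-message marker as in the examples above


def encode_message(obj):
    """Turn a Python object into terminated bytes suitable for hub.write()."""
    text = repr(obj)
    if TERMINATOR in text:
        raise ValueError("payload must not contain the terminator character")
    return (text + TERMINATOR).encode()


def decode_message(text):
    """Rebuild the object from its repr() without executing arbitrary code."""
    return ast.literal_eval(text)


command = {"motors": [0.2, 0.4], "mode": "step"}
payload = encode_message(command)

# Strip the terminator again before parsing, as the receiving side would.
restored = decode_message(payload.decode().rstrip(TERMINATOR))
print(restored == command)
```

Rejecting payloads that contain the terminator keeps the framing unambiguous; a string value containing "$" would otherwise be cut in two by the receiver.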
For my (specific) purposes, I created a class BLEHub which abstracts away a lot of this stuff. It can be seen in my repo: https://github.com/Novakasa/brickrail/blob/88ace294e8443b071945abffdb733ac26e7098a4/ble_hub.py
An example of a corresponding hub script can be seen here:
https://github.com/Novakasa/brickrail/blob/88ace294e8443b071945abffdb733ac26e7098a4/auto_train_updated.py
My project is very messy right now and still very much in development, so keep that in mind.
We've also recently enabled the ustruct module, which lets you pack and unpack things like floating point values into bytes.
If the amount of data you're sending is a concern, this is probably a bit more compact than sending a string representation of a floating point number.
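To illustrate the size difference, here is a small sketch using the standard struct module on the PC (ustruct would be the hub-side counterpart). The format string "<4f" (little-endian, four 32-bit floats) and the sample values are assumptions chosen for the example; both ends of the link would have to agree on the layout.

```python
import struct

# "<4f" = little-endian, four 32-bit floats. This layout is an assumption
# that the PC program and the hub program would both have to use.
FORMAT = "<4f"

angles = [0.25, -1.5, 90.0, 180.0]

packed = struct.pack(FORMAT, *angles)           # 4 bytes per float
unpacked = list(struct.unpack(FORMAT, packed))  # back to Python floats

print(len(packed), "bytes packed")
print(len(repr(angles)), "characters as a string")
print(unpacked == angles)
```

Two things to keep in mind: 32-bit floats only round-trip exactly for values they can represent (the ones above can), and packed data may contain the byte for "$", so the terminator-based framing from the earlier examples would have to be replaced by fixed-size messages.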
Last time I installed pybrickdev using poetry, but I would like to use pip in a conda environment if possible. Yet I get this error. Is pip an option?
(lego) gianni:~$ pip install pybrickdev --pre
ERROR: Could not find a version that satisfies the requirement pybrickdev
ERROR: No matching distribution found for pybrickdev
(lego) gianni:~$ pip install pybrickdev --pre
It is missing the "s" in pybricksdev
Oops. I copy and pasted from the readme in the pybricksdev repo. Fix it there as well.
G
I get a
RuntimeError: Unsupported Pybricks protocol version: 1.0.0
error while connecting via pybricksdev to a Technic Hub with the latest firmware installed.
Looking inside the pybricksdev code, it seems to require 1.1.0 protocol version while the hub is version 1.0.0.
Any idea?
Try installing the firmware on the Technic Hub with the latest Pybricks Code beta.
I have tested the code above and it works fine, but I am having a hard time with the PC side, probably due to my inexperience with asyncio.
I would like to send and receive data from the hub outside of an event loop, as I need to do other calculations between each communication. Any suggestions?
Edit: I forgot about having to run the sync code from a different thread, this means that the queue must be threadsafe. See https://github.com/pybricks/support/issues/470#issuecomment-945567590 below
You probably want to look into an asyncio Queue for this: https://docs.python.org/3.9/library/asyncio-queue.html
For example, you might have a send_queue and a receive_queue.
In my send and receive example above, you could use await receive_queue.put(line) after the line print("got message from hub:", line). Outside the event loop you could then poll the queue in a non-blocking way via receive_queue.get_nowait() (and catch the exception if there's nothing to get).
For sending, you might want to start a sending task that manages the send_queue in a similar way. In a loop, it waits for data via something like this:
async def send_loop():
    # Forward each queued message to the hub as soon as it arrives.
    while True:
        data = await send_queue.get()
        await hub.write(data)

send_task = asyncio.create_task(send_loop())
Outside your event loop, you can queue something for sending using send_queue.put_nowait()
I haven't tested this, so some adaptation is probably necessary, but hopefully this conveys the idea.
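The idea can be tried without hardware using a stand-in for the hub. In the sketch below, FakeHub is a hypothetical placeholder for PybricksHub that only records what was written; the queue names follow the post above. It also shows the exception raised by get_nowait() on an empty queue, which is asyncio.QueueEmpty.

```python
import asyncio


class FakeHub:
    """Hypothetical stand-in for PybricksHub so the sketch runs without hardware."""

    def __init__(self):
        self.written = []

    async def write(self, data):
        self.written.append(data)


async def send_loop(hub, send_queue):
    # Forward queued messages to the hub, one at a time, until cancelled.
    while True:
        data = await send_queue.get()
        await hub.write(data)
        send_queue.task_done()


async def main():
    hub = FakeHub()
    send_queue = asyncio.Queue()
    receive_queue = asyncio.Queue()
    send_task = asyncio.create_task(send_loop(hub, send_queue))

    # Queue two messages without waiting, then wait until both are written.
    send_queue.put_nowait(b"start$")
    send_queue.put_nowait(b"stop$")
    await send_queue.join()

    # Non-blocking read: an empty queue raises asyncio.QueueEmpty.
    try:
        line = receive_queue.get_nowait()
    except asyncio.QueueEmpty:
        line = None

    send_task.cancel()
    return hub.written, line


written, line = asyncio.run(main())
print(written, line)
```

Here put_nowait() and get_nowait() are called from code running inside the event loop. asyncio.Queue is not thread-safe, so calling them from another thread needs extra care.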
You could also manage the send queue in the same task as the receive queue (i.e. in the hub_run coroutine). For that you need to wait each iteration for whichever of the send_queue or the hub.output is handled first.
You could use something like this in the hub_run coroutine:
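while hub.program_running:
    # asyncio.wait() expects tasks; bare coroutines are rejected on Python 3.11+
    tasks = [asyncio.create_task(send_handler()), asyncio.create_task(output_handler())]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()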
The two coroutines should look something like this:
async def send_handler():
    data = await send_queue.get()
    await hub.write(data)  # hub.write is a coroutine, so it must be awaited

async def output_handler():
    while not hub.output:
        await asyncio.sleep(0.05)
    line = hub.output.pop(0).decode()
    await receive_queue.put(line)
Outside the event loop, you can then use the put_nowait and get_nowait functions for the send/receive queues as I described in the previous post.
Here's an example where something like this is used in a different context: https://websockets.readthedocs.io/en/stable/intro/index.html#both-sides
Thanks, but I am still quite confused about how to get out of the blocking asyncio.run(something).
I need to do something like this:
# I am PC
class Robot:
    def __init__(self):
        # start hub program and return
        pass

    def step(self, list_of_angular_positions):
        # send to hub list_of_angular_positions to move to
        return latest_angular_positions_motors_achieved

    def reset(self):
        # get latest positions
        return latest_angular_positions_motors


robot = Robot()
positions = robot.reset()
while True:
    a = give_me_new_positions(positions)  # e.g. [0.2, 0.4, 0.4, 0.2]
    positions = robot.step(a)
It would be great to have some help, it's a quite standard protocol to interact with a robot.
I forgot about that. For that you need to run your sync code in a different thread, or execute your sync code from within the event loop. If your sync code blocks in a non-async way, it will also block your other async tasks from running concurrently, so it is better to run the event loop and your sync code in different threads. You can also use asyncio's executors to start threads or processes to run your blocking code. If your sync code is mainly I/O-bound anyway, you might want to move it to async code, since waiting for I/O will not block your other asyncio tasks from running.
For communication between sync and async code, you need to make the queue access thread-safe, which is missing in my answers above. One way to remedy that is to use loop.call_soon_threadsafe to put data into, or get data from, the queues in your sync code. Here is a Stack Overflow answer that encapsulates this in a new thread-safe queue class: https://stackoverflow.com/questions/59650243/communication-between-async-tasks-and-synchronous-threads-in-python
You could also use the UniversalQueue from curio, which should have methods for threads as well as asyncio or curio event loops.
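As one possible shape for the synchronous Robot class asked about above, the following sketch runs the event loop in a background thread and calls into it with asyncio.run_coroutine_threadsafe, a standard-library alternative to the thread-safe queue approaches mentioned here. FakeHub is a hypothetical stand-in that acknowledges every write, so the example runs without a hub; with real hardware the connect and run calls would have to be scheduled on the same loop.

```python
import asyncio
import threading


class FakeHub:
    """Hypothetical stand-in for PybricksHub; answers each write with a reply."""

    def __init__(self):
        self.output = []

    async def write(self, data):
        await asyncio.sleep(0.01)  # pretend the BLE transfer takes a moment
        self.output.append(b"ack:" + data)


class Robot:
    """Synchronous facade over an event loop running in a background thread."""

    def __init__(self):
        self.hub = FakeHub()
        self.loop = asyncio.new_event_loop()
        self.thread = threading.Thread(target=self.loop.run_forever, daemon=True)
        self.thread.start()

    async def _exchange(self, data):
        # Runs on the loop thread: send one message, wait for one reply.
        await self.hub.write(data)
        while not self.hub.output:
            await asyncio.sleep(0.01)
        return self.hub.output.pop(0)

    def step(self, data):
        # Schedule the coroutine on the loop thread and block until it is done.
        future = asyncio.run_coroutine_threadsafe(self._exchange(data), self.loop)
        return future.result(timeout=5)

    def close(self):
        self.loop.call_soon_threadsafe(self.loop.stop)
        self.thread.join()
        self.loop.close()


robot = Robot()
reply = robot.step(b"[0.2, 0.4]$")
robot.close()
print(reply)
```

The calling thread is free to do heavy computation between step() calls, because the event loop keeps running in its own thread.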
Interesting issue that I came across. I hope I don't derail it with my question. Help is appreciated :)
Context: I've been working on a Spike Prime project in which I try to drive an omnidirectional vehicle (based on mecanum wheels) with a PS4 controller. I came across this issue because I want my computer to run the plain Python code and send an integer to my hub via BLE. The hub then runs the final MicroPython code.
More context (for people with the same problem): Spike Prime uses MicroPython with additional libraries from LEGO, so you can't add outside libraries such as pygame for joystick motion. Pygame would have been useful for tracking the PS4 controller's buttons and joysticks. You can't connect the PS4 controller to the hub directly anyway, since the hub requires BLE and the PS4 controller uses Bluetooth Classic. An Xbox One controller seems to have BLE, but I don't own one :( The joystick is key to this project.
Question: I downloaded the pybricksdev firmware and started a simple copy-paste program based on @Novakasa's first submission, with a few slight changes (if the program is connected properly to my PC, it should only light up colors to confirm the connection). I also selected "include current program" so that my hub downloads and saves the file, which by default seems to be named main.py. I ran it once and then started writing the other half on my PC in VS Code, very similar to @Novakasa's. Running both at the same time caused a "RuntimeError: OSError: 2" on my PC. I assume it couldn't find main.py, or it may not even have connected to the hub, so I need help clearing this up.
PC
import asyncio

from pybricksdev.connections import PybricksHub
from pybricksdev.ble import find_device


async def main():
    hub = PybricksHub()
    device = await find_device()
    await hub.connect(device)
    await hub.run("main.py", wait=False)
    await asyncio.sleep(1)
    for _ in range(4):
        await hub.write(b"red$")
        await asyncio.sleep(1)
        await hub.write(b"green$")
        await asyncio.sleep(1)
        await hub.write(b"red$")
        await asyncio.sleep(1)
    await hub.write(b"exit$")
    await hub.user_program_stopped.wait()
    await asyncio.sleep(0.3)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Hub
from uselect import poll
from usys import stdin

from pybricks.hubs import PrimeHub
from pybricks.pupdevices import Motor
from pybricks.parameters import Port, Color
from pybricks.tools import wait

input_buffer = ""
loop_poll = poll()
loop_poll.register(stdin)

hub = PrimeHub()
running = True


def exit():
    global running
    wait(1000)
    running = False


def do_something(msg):
    if msg == "red":
        hub.light.on(Color.RED)
    if msg == "green":
        hub.light.on(Color.GREEN)
    if msg == "exit":
        exit()


def update_input(char):
    global input_buffer
    if char == "$":
        do_something(input_buffer)
        input_buffer = ""
    else:
        input_buffer = input_buffer + char


def main_loop():
    while running:
        if loop_poll.poll(100):
            char = stdin.read(1)
            if char is not None:
                update_input(char)


main_loop()
Hi @all,
@AGarves During my investigations into this topic, I found that there is an OSError 2 (file not found) if a relative path is specified. As soon as the full path to main.py is specified (as a raw string, with the "r" prefix), uploading main.py to the hub, and thus running the program on the hub, works.
Example:
await hub.run(r"C:\Mindstorms\pybricksdev_scripts\main.py", wait=False)
HTH....
Best, Daniel
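If the relative path is indeed the cause (relative paths are resolved against the current working directory, not the folder of the PC program), one way to avoid hard-coding a full path is to build it with pathlib. This is only a sketch: resolve_script is a made-up helper name, and pc_program.py is a made-up file name used so the example also runs interactively.

```python
from pathlib import Path


def resolve_script(name, pc_program_file):
    """Absolute path of `name`, located in the same folder as the PC program."""
    return str(Path(pc_program_file).resolve().parent / name)


# Inside the PC program you would normally pass __file__ here; a made-up
# location is used so the sketch also runs in an interactive session.
hub_script = resolve_script("main.py", Path.cwd() / "pc_program.py")
print(hub_script)

# Then, inside main():
#     await hub.run(hub_script, wait=False)
```

This keeps the PC program working regardless of the directory it is started from, as long as main.py stays next to it.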
- First, is it necessary to upload the hub file via VS Code and a specific extension, or is uploading via beta.pybricks.com enough?
- Second, do I need to establish the connection between the hub and the PC myself before running the programs? If so, how would I handle that?
However, when calling via the PC, I always get the OSError: 2 (with "await hub.run("main.py", wait=False)")
Can you share the full stack trace?
There is now a tutorial here: https://pybricks.com/projects/tutorials/wireless/hub-to-device/pc-communication/
So we can close this issue.
Question: I would like to remotely control a hub from Python code running on a PC. I need to communicate motor angular positions back and forth: the PC will send angular positions to control the motors and read their actual positions to decide on new actions. The Python code on the PC will use a standard conda environment and very large machine learning modules, so it cannot be MicroPython.
Context: We are playing with reinforcement learning on a LEGO hub.