Closed aakashbadiani closed 1 year ago
I may be misunderstanding, but the code provided is not complete. For example, set_pin_mode and set_default_pin_value are not shown. Also, calling asyncio.run twice is not correct. Each call creates its own event loop and closes it when the coroutine finishes, so the two calls do not share a loop. According to the Python asyncio documentation,
asyncio.run(coro, *, debug=None)
Execute the [coroutine](https://docs.python.org/3/glossary.html#term-coroutine) coro and return the result.
This function runs the passed coroutine, taking care of managing the asyncio event loop, finalizing asynchronous generators, and closing the threadpool.
This function cannot be called when another asyncio event loop is running in the same thread.
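To make the quoted behavior concrete, here is a minimal sketch using only the standard library (no telemetrix involved). The helper names current_loop and nested_run_fails are made up for this illustration. It shows that two separate asyncio.run calls get two different loops, that each loop is closed afterwards, and that calling asyncio.run from inside a running loop raises RuntimeError.

```python
import asyncio


async def current_loop():
    """Return the event loop this coroutine is running in."""
    return asyncio.get_running_loop()


async def nested_run_fails():
    """Try to call asyncio.run() while a loop is already running."""
    coro = current_loop()
    try:
        asyncio.run(coro)
    except RuntimeError:
        return True
    finally:
        # The coroutine was never started, so close it to avoid a warning.
        coro.close()
    return False


# Each asyncio.run() call creates a fresh loop and closes it on exit.
first = asyncio.run(current_loop())
second = asyncio.run(current_loop())
nested_failed = asyncio.run(nested_run_fails())

print("same loop object:", first is second)
print("first loop closed:", first.is_closed())
print("nested asyncio.run raised RuntimeError:", nested_failed)
```

Anything that was bound to the first loop (for example a task created while it ran) cannot make progress in the second one.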
Here is a simple example of running within a class:
import asyncio
import sys
import time
from telemetrix_aio import telemetrix_aio

"""
Establishing a digital and analog input pin and monitoring their values.
"""


class PinsWithinClass:
    def __init__(self, board, d_pin, a_pin):
        """
        :param board: a telemetrix aio instance
        :param d_pin: digital input pin number
        :param a_pin: analog input pin number
        """
        self.board = board
        self.d_pin = d_pin
        self.a_pin = a_pin

    async def run_it(self):
        """
        Initialize pin modes with the callbacks
        """
        await self.board.set_pin_mode_digital_input(self.d_pin, callback=self.callback)
        await self.board.set_pin_mode_analog_input(self.a_pin, callback=self.callback)

    async def callback(self, data):
        """
        :param data: data[0] = report type 2 = digital input
                               report type 3 = analog input
                     data[1] = pin number
                     data[2] = reported value
        """
        if data[0] == 2:
            print(f'digital input pin {data[1]} reports a value of {data[2]}')
        elif data[0] == 3:
            print(f'analog input pin {data[1]} reports a value of {data[2]}')


# get the event loop
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

# instantiate telemetrix_aio
the_board = telemetrix_aio.TelemetrixAIO()


async def monitor(my_board, digital_pin, analog_pin):
    """
    Set the pin modes for the pins
    :param my_board: telemetrix aio instance
    :param digital_pin: Arduino digital input pin number
    :param analog_pin: Arduino analog input pin number
    """
    pwc = PinsWithinClass(my_board, digital_pin, analog_pin)
    await pwc.run_it()
    # wait forever
    while True:
        try:
            await asyncio.sleep(.01)
        except KeyboardInterrupt:
            await my_board.shutdown()
            sys.exit(0)


try:
    # start the main function
    loop.run_until_complete(monitor(the_board, 12, 2))
except KeyboardInterrupt:
    loop.run_until_complete(the_board.shutdown())
    sys.exit(0)
I am closing this issue. If you have any other questions or comments, please place them here, and I will see them.
Thanks MrYsLab.
I have multiple Arduino boards connected to a single RasPi where I run this Python script. As a result, I currently keep the board connection objects in a class variable and access them depending on the pin reference. Is there a better way to do this with Telemetrix AIO?
I am looking to solve this multiple-board problem within the telemetrix environment, and I am not as fluent in asyncio as I thought I was.
Currently, the call await self.board_object.set_pin_mode_digital_input(self.pin_n, self.callback) leads to a "Task was destroyed but it is pending! task: <Task pending name='Task-2' coro=<TelemetrixAIO._arduino_report_dispatcher() running at C:\Users\Aakas\AppData\Local\Programs\Python\Python39\lib\site-packages\telemetrix_aio\telemetrix_aio.py:2168> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x0000019A6E6CD340>()]>>"
Yes, asyncio can be tricky. Please provide a complete simplified example demonstrating the failure. I can then take a look at what might be going on.
import time
import asyncio
from telemetrix_aio import telemetrix_aio

#Standalone_Test: False when production.
#Standalone_Test: True for Module testing
Standalone_Test = True
if not Standalone_Test:
    import sys
    sys.path.append('C:\\Users\\Aakas\\OneDrive\\Code\\BatataPoha_Automation\\')
    if True:
        from Common_Paths import *
        from Common_Libraries import *
if Standalone_Test:
    THROB_DURATION = 0.2 # seconds
    IO_list = 'PATH/TO/YOUR/CSV/FILE'
    import csv
    from loguru import logger


class IO_Class():
    IO_list_as_a_dict = {}
    instances = []
    board_objects = []

    def __init__(self, signal):
        self.name = IO_Class.IO_list_as_a_dict[signal]['Function']
        self.tag = IO_Class.IO_list_as_a_dict[signal]['Tag']
        """
        Count is the number of pins being used for a signal
        """
        self.count = eval(IO_Class.IO_list_as_a_dict[signal]['Count'])
        """
        pin number is the pin number of the signal. if count is more than 1, pin number will be a list of pin numbers
        """
        self.pin_n = eval(IO_Class.IO_list_as_a_dict[signal]['Pin_number'])
        if self.count > 1:
            if len(self.pin_n) != self.count:
                logger.error(f"Pin count and pin number count do not match for {self.name} {self.tag}")
        """
        Only allowing TB6600 mode in stepper setup.
        if in the future We drive steppers directly with a 4 wire input, None will be changed to 4
        https://github.com/MrYsLab/telemetrix-aio/blob/master/examples/stepper_absolute.py
        """
        self.stpr_mode = 2 if IO_Class.IO_list_as_a_dict[signal]['Stepper_Mode'] == "TB6600" else None
        self.type = IO_Class.IO_list_as_a_dict[signal]['Signal_Type'].upper().strip()[0] # D for Digital, A for Analog
        self.mode = IO_Class.IO_list_as_a_dict[signal]['Input_Output'].upper().strip()[0] # I for Input, O for Output, P for pullup
        self.normal_state = 1 if IO_Class.IO_list_as_a_dict[signal]['Normal_State'] == 'On' else 0 # 1 for On, 0 for Off
        self.anti_normal_state = 0 if IO_Class.IO_list_as_a_dict[signal]['Normal_State'] == 'On' else 1 # 1 for On, 0 for Off
        self.equipment = IO_Class.IO_list_as_a_dict[signal]['Equipment']
        self.board_rate = IO_Class.IO_list_as_a_dict[signal]['Baud_Rate']
        self.board_port = IO_Class.IO_list_as_a_dict[signal]['Com_Port']
        self.board_mode = IO_Class.IO_list_as_a_dict[signal]['Board_Mode']
        self.board_tag = IO_Class.IO_list_as_a_dict[signal]['Board_Tag'].replace(" ", "_").upper()
        self.board_make = IO_Class.IO_list_as_a_dict[signal]['Board_make']
        self.call_method = IO_Class.IO_list_as_a_dict[signal]['Call Method']
        self.read_timeout = 1 if int(IO_Class.IO_list_as_a_dict[signal]['Read_Timeout']) == 0 else int(IO_Class.IO_list_as_a_dict[signal]['Read_Timeout'])
        self.cnction_type = IO_Class.IO_list_as_a_dict[signal]['Connection_Type']
        self.virtual = IO_Class.IO_list_as_a_dict[signal]['Virtual Signal']
        self.pin_reference = (':'.join(map(str, [self.board_port, self.type, self.mode, self.pin_n]))).upper()
        self.board_object = self.set_connection()
        self.last_pin_value = 0
        self.throb_duration = .5 # seconds
        self.pin_mode_set = False
        self.ignore_class_instance = False
        """ sets up pin modes """
        asyncio.run(self.set_pin_mode())
        asyncio.run(self.set_default_pin_value())
        self.append_instances()
        print(f'created({signal})')

    #Instance Methods
    def append_instances(self):
        if not self.ignore_class_instance:
            self.__class__.instances.append(self)
            logger.success(f"APPENDED: {self.pin_reference}")

    # Board methods
    def set_connection(self):
        def create_board_connection_object():
            try:
                self.board_object = telemetrix_aio.TelemetrixAIO(com_port=self.board_port, arduino_instance_id=self.board_tag, arduino_wait=4, sleep_tune=0.001, shutdown_on_exception=False)
                # connection = telemetrix.Telemetrix(com_port=self.board_port, arduino_instance_id=self.board_tag, arduino_wait=4, sleep_tune=0.000001, shutdown_on_exception=True)
                # connection = serial.Serial(self.board_port, self.board_rate, timeout=self.read_timeout)
                # connection.timeout = int(self.read_timeout)
                logger.debug(f"created new connection: {self.board_port}")
                # do not turn this on unless you know why it needs to be turned on
                # self.board_object.rtscts = False
                IO_Class.board_objects.append(self.board_object)
            except Exception as e:
                # keep this exception broad. Want to catch all errors
                self.ignore_class_instance = True
                logger.warning(f"{self.board_port} error: {e}")

        if len(IO_Class.board_objects) == 0:
            create_board_connection_object()
            try:
                while IO_Class.board_objects[0].com_port != self.board_port:
                    time.sleep(1)
                return IO_Class.board_objects[0]
            except IndexError:
                logger.error(f"IndexError: {self.board_port} not found in IO_Class.board_objects")
                return False
        else:
            for connection in IO_Class.board_objects:
                if connection.com_port == self.board_port:
                    return connection
            logger.warning(f"DID NOT FIND: {self.board_port}")
            logger.info(f"CREATING NEW CONNECTWrite_ION: {self.board_port}")
            create_board_connection_object()

    def close_board(self):
        """
        To ensure we are properly closing our connection to the
        Arduino device.
        """
        self.board_object.shutdown()
        logger.success(f'CLOSED:{self.board_port}')

    # Pin methods
    async def set_pin_mode(self):
        if self.pin_mode_set:
            logger.debug(f'pin_mode already set: {self.pin_reference} {self.mode}')
            return True
        if self.mode not in ['I', 'O']:
            logger.warning(f"set_pin_mode: {self.pin_reference} {self.mode} is not a valid MODE")
            return False
        if self.type not in ['A', 'D', 'P', 'S', 'T']:
            logger.warning(f"set_pin_mode: {self.pin_reference} {self.type} is not a valid TYPE")
            return False
        case_ = str(self.type + self.mode).upper()
        # if case_ == 'AI':
        #     await self.board_object.set_pin_mode_analog_input(self.pin_n, 5, self.digital_input_callback)
        # elif case_ == 'DI' or case_ == 'TI': # throb input TI
        #     await self.board_object.set_pin_mode_digital_input(self.pin_n, self.digital_input_callback)
        if case_ == 'AO':
            await self.board_object.set_pin_mode_analog_output(self.pin_n)
        elif case_ == 'DO' or case_ == 'TO': # throb Output TO
            await self.board_object.set_pin_mode_digital_output(self.pin_n)
        elif case_ == 'SI':
            await self.board_object.set_pin_mode_stepper(interface=self.stpr_mode, pin1=self.pin_n[0], pin2=self.pin_n[1])
        elif case_ == 'SO':
            await self.board_object.set_pin_mode_stepper(interface=self.stpr_mode, pin1=self.pin_n[0], pin2=self.pin_n[1])
        self.pin_mode_set = True

    async def set_default_pin_value(self):
        if not self.pin_mode_set:
            logger.warning(f"set_default_pin_value: PIN MODE is {self.pin_mode_set}. Not set for {self.pin_reference}")
            return False
        if self.mode in ['I', 'P']: # P is pullup
            return False
        await self.write(self.normal_state)

    async def write(self, value, throb_duration=0.2):
        if self.mode != 'O':
            logger.warning(f"write_pin: {self.mode} is not a valid MODE for WRITING to {self.pin_reference}")
            return False
        if self.pin_mode_set is False:
            logger.warning(f"write_pin: PIN MODE is {self.pin_mode_set}. Not set for WRITING to {self.pin_reference}")
            return False
        if self.type == 'D':
            if value not in [0, 1]:
                logger.warning(f"write_pin: {value} is not a valid digital VALUE for {self.pin_reference}")
                return False
        if self.type == 'A':
            if value not in range(0, 255):
                logger.warning(f"write_pin: {value} is not a valid analog VALUE for {self.pin_reference}")
                return False
        if self.type == 'T':
            if value not in [0, 1]:
                logger.warning(f"write_pin: {value} is not a valid THROB VALUE for {self.pin_reference}")
                return False
        if self.type == 'S':
            if value not in [0, 1]: # TODO need to figue this our for stepper
                logger.warning(f"write_pin: {value} is not a valid STEPPER VALUE for {self.pin_reference}")
                return False
        # self.check_connected()
        if self.type == 'A':
            await self.board_object.analog_write(self.pin_n, value)
            logger.success(f"Analog write: {self.pin_reference} {value}")
            return True
        if self.type == 'D':
            await self.board_object.digital_write(self.pin_n, value)
            logger.success(f"Digital write: {self.pin_reference} {value}")
            return True
        if self.type == 'S':
            pass
        if self.type == 'T':
            self.throb_duration = throb_duration
            await self.board_object.digital_write(self.pin_n, self.anti_normal_state)
            await asyncio.sleep(self.throb_duration)
            await self.board_object.digital_write(self.pin_n, self.normal_state)
            logger.success(f"Pulse Pin: {self.pin_reference}")
            return True

    def async_read(self):
        try:
            # get the event loop
            loop = asyncio.get_event_loop()
            print("existing loop got")
        except RuntimeError:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
        # run the code until complete
        loop.run_until_complete(self.read())
        # await self.read()

    async def read(self):
        print(self.pin_n, self.pin_mode_set, self.board_object)
        if self.mode != 'I':
            logger.warning(f"read_pin: {self.mode} is not a valid MODE for READING from {self.pin_reference}")
            return False
        if self.pin_mode_set is False:
            logger.warning(f"read_pin: PIN MODE is {self.pin_mode_set}. Not set for READING from {self.pin_reference}")
            return False
        if self.type == 'D':
            await self.board_object.set_pin_mode_digital_input(self.pin_n, self.callback)
        if self.type == 'A':
            await self.board_object.set_pin_mode_analog_input(self.pin_n, 5, self.callback)
        counter = 0
        print("here")
        while counter < 1000:
            try:
                await asyncio.sleep(.001)
                counter += 1
            except:
                pass

    async def callback(self, data):
        """
        :param data: data[0] = report type 2 = digital input
                               report type 3 = analog input
                     data[1] = pin number
                     data[2] = reported value
        """
        print('reached the call back')
        if data[0] == 2:
            print(f'digital input pin {data[1]} reports a value of {data[2]}')
        elif data[0] == 3:
            print(f'analog input pin {data[1]} reports a value of {data[2]}')
        await asyncio.sleep(.001)

    # Interaction methods
    @staticmethod
    def write_to(pin_ref, value):
        obj_ = IO_Class.get_pin_object(pin_ref)
        if not obj_:
            return
        asyncio.run(obj_.write(value))

    @staticmethod
    def throb(pin_ref, throb_duration=THROB_DURATION):
        obj_ = IO_Class.get_pin_object(pin_ref)
        if not obj_:
            return
        asyncio.run(obj_.write(0, throb_duration)) # value does not matter for throb

    # Get methods
    @staticmethod
    def get_pin_object(matching_pin_ref):
        for _IO in IO_Class.instances:
            if str(_IO.pin_reference).upper() == str(matching_pin_ref).upper():
                logger.trace(f'get_pin_object: IO_Class Object match for {_IO.pin_reference}')
                return _IO
        logger.error(f'get_pin_object: |{matching_pin_ref}| does not exist')
        return False

    @staticmethod
    def digital_in(pin_ref):
        obj_ = IO_Class.get_pin_object(pin_ref)
        if not obj_:
            return False
        obj_.async_read()

    # Set methods
    @staticmethod
    def create_IO_Objects():
        IO_Dict = {}
        with open(IO_list, 'r') as f:
            csv_reader = csv.DictReader(f)
            for row in csv_reader:
                IO_Dict[row['Function']] = row
        IO_Class.IO_list_as_a_dict = IO_Dict
        for signal, info in IO_Dict.items():
            IO_Class(signal)


def setup():
    IO_Class.create_IO_Objects()
    time.sleep(2)


def main():
    obj_ = IO_Class.get_pin_object('D:O:13')
    if not obj_:
        print("test 1 successful")
    obj_ = IO_Class.get_pin_object('COM6:T:O:13')
    if obj_:
        print("test 2 successful")
    for i in range(5):
        IO_Class.throb('COM6:T:O:13', throb_duration=1)
    print("test 3 successful")
    IO_Class.digital_in('COM6:D:I:5')
    print("test 4 completed")
    time.sleep(5)


if __name__ == "__main__":
    setup()
    main()
create_IO_Objects() uses the CSV data below. This allows engineers to turn Arduinos into a pseudo-PLC powered by Python:
Function,Signal_Type,Input_Output,Pin_number,Equipment,Tag,Count,Com_Port,Baud_Rate,Board_Mode,Board_Tag,Call Method,Read_Timeout,Connection_Type,Board_make,Virtual Signal,Normal_State,Stepper_Mode
X 0 Limit,Digital,Input,27,H_Frame,Limit-X0,1,COM6,115200,IO,MEGA 1,read,0,Hardwired,MEGA,No,On,NA
X Max Limit,Digital,Input,26,H_Frame,Limit-XMax,1,COM6,115200,IO,MEGA 1,read,0,Hardwired,MEGA,No,On,NA
Y 0 Limit,Digital,Input,22,H_Frame,Limit-Y0,1,COM6,115200,IO,MEGA 1,read,0,Hardwired,MEGA,No,On,NA
Y Max Limit,Digital,Input,23,H_Frame,Limit-YMax,1,COM6,115200,IO,MEGA 1,read,0,Hardwired,MEGA,No,On,NA
Z Top 0 Limit,Digital,Input,25,H_Frame,Limit-ZTop0,1,COM6,115200,IO,MEGA 1,read,0,Hardwired,MEGA,No,On,NA
Z Top Max Limit,Digital,Input,24,H_Frame,Limit-ZTopMax,1,COM6,115200,IO,MEGA 1,read,0,Hardwired,MEGA,No,On,NA
Z Bottom 0 Limit,Digital,Input,28,H_Frame,Limit-ZBot0,1,COM6,115200,IO,MEGA 1,read,0,Hardwired,MEGA,No,On,NA
Z Bottom Max Limit,Digital,Input,29,H_Frame,Limit-ZBotMax,1,COM6,115200,IO,MEGA 1,read,0,Hardwired,MEGA,No,On,NA
Increase Temp,Throb,Output,51,Heater,HTR-001,1,COM6,115200,IO,MEGA 1,write,0,Hardwired,MEGA,No,On,NA
Decrease Temp,Throb,Output,49,Heater,HTR-001,1,COM6,115200,IO,MEGA 1,write,0,Hardwired,MEGA,No,On,NA
Running,Throb,Output,53,Heater,HTR-001,1,COM6,115200,IO,MEGA 1,write,0,Hardwired,MEGA,No,On,NA
Mag Lock,Digital,Output,41,Handler,LA-001,1,COM6,115200,IO,MEGA 1,write,0,Hardwired,MEGA,No,Off,NA
stepper,Stepper,Output,"[4,6]",test_stepper,Step_001,2,COM6,115200,IO,MEGA 1,write,0,Multi_Connection,MEGA,No,Off,TB6600
Test_input,Digital,Input,5,Test,Test-001,1,COM6,115200,IO,MEGA 1,write,0,Hardwired,MEGA,No,Off,NA
Test_Output,Throb,Output,13,Test,Test-001,1,COM6,115200,IO,MEGA 1,write,0,Hardwired,MEGA,No,Off,NA
I see the "Task was destroyed but it is pending!" message when the program exits. I modified main in order to see when the error is generated:
def main():
    obj_ = IO_Class.get_pin_object('D:O:13')
    if not obj_:
        print("test 1 successful")
    obj_ = IO_Class.get_pin_object('/dev/ttyACM0:T:O:13')
    if obj_:
        print("test 2 successful")
    for i in range(5):
        IO_Class.throb('/dev/ttyACM0:T:O:13', throb_duration=1)
    print("test 3 successful")
    IO_Class.digital_in('/dev/ttyACM0:D:I:5')
    print("test 4 completed")
    IO_Class.board_objects[0].the_task.cancel()
    time.sleep(5)
    print('done sleeping')
The output looks like this:
...
test 3 successful
5 True <telemetrix_aio.telemetrix_aio.TelemetrixAIO object at 0x7f48f577de10>
here
test 4 completed
done sleeping
Task was destroyed but it is pending!
task: <Task cancelling name='Task-2' coro=<TelemetrixAIO._arduino_report_dispatcher() running at /home/afy/PycharmProjects/akash/venv/lib/python3.11/site-packages/telemetrix_aio/telemetrix_aio.py:2169> wait_for=<Future cancelled>>
The reason this is happening is that there is a coroutine in telemetrix that is still running since shutdown was never called for the telemetrix instance. Other than being ugly, this error can be safely ignored.
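As a rough illustration of why shutting down first avoids the message, here is a minimal sketch using only asyncio. FakeBoard is a hypothetical stand-in and is not the telemetrix-aio class; it only borrows the the_task attribute name used above. Its shutdown cancels the background task and then awaits it, so nothing is left pending when the loop goes away.

```python
import asyncio


class FakeBoard:
    """Hypothetical stand-in for an object that owns a background task."""

    def __init__(self):
        self.the_task = None

    async def start(self):
        # Launch the long-running dispatcher in the current event loop.
        self.the_task = asyncio.create_task(self._dispatcher())

    async def _dispatcher(self):
        # Runs forever, yielding to the loop on every pass.
        while True:
            await asyncio.sleep(0.01)

    async def shutdown(self):
        # Request cancellation, then wait until the task has really ended.
        self.the_task.cancel()
        try:
            await self.the_task
        except asyncio.CancelledError:
            pass


async def main():
    board = FakeBoard()
    await board.start()
    try:
        await asyncio.sleep(0.05)  # the application's work goes here
    finally:
        await board.shutdown()
    return board.the_task


finished_task = asyncio.run(main())
print("task done:", finished_task.done())
print("task cancelled:", finished_task.cancelled())
```

With the real library, the equivalent step in the example earlier in this thread is await my_board.shutdown() before exiting.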
Were you able to get any output from pin changes on digital pin 5? The callback/digital read producing no output is the main reason I opened this issue on GitHub.
If you look at the example I provided above, you will see that the monitor method has an infinite loop. This is to keep the asyncio event loop open and running.
While running main.py, if I set a breakpoint on the next-to-last line of the telemetrix-aio constructor, just before it returns, and examine the state of the telemetrix event loop, the loop is running and open:
<_UnixSelectorEventLoop running=True closed=False debug=False>
If I now set a breakpoint in main.py at the following line after instantiating telemetrix-aio (the logger.debug statement) and then inspect the state of the event loop of the telemetrix-aio object, the event loop is no longer running.
try:
    self.board_object = telemetrix_aio.TelemetrixAIO(com_port=self.board_port,
                                                     arduino_instance_id=self.board_tag,
                                                     arduino_wait=4,
                                                     sleep_tune=0.001,
                                                     shutdown_on_exception=False,
                                                     )
    # connection = telemetrix.Telemetrix(com_port=self.board_port, arduino_instance_id=self.board_tag, arduino_wait=4, sleep_tune=0.000001, shutdown_on_exception=True)
    # connection = serial.Serial(self.board_port, self.board_rate, timeout=self.read_timeout)
    # connection.timeout = int(self.read_timeout)
    logger.debug(f"created new connection: {self.board_port}")
Inspection of event loop:
<_UnixSelectorEventLoop running=False closed=False debug=False>
The design of telemetrix-aio assumes that its event loop is always running. When idle, the application is expected to await a short asyncio sleep so that other tasks, including telemetrix-aio's own, get a chance to run.
The input data is not recognized because the telemetrix-aio event loop is not running when the input event occurs.
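The effect can be reproduced without any hardware. The sketch below uses only asyncio, and fake_dispatcher is a made-up stand-in for a report dispatcher rather than telemetrix-aio code. The task produces reports while its loop is being run and produces none while the thread is blocked in time.sleep with the loop stopped.

```python
import asyncio
import time

received = []


async def fake_dispatcher():
    """Stand-in for a report dispatcher: records a value every 10 ms."""
    count = 0
    while True:
        received.append(count)
        count += 1
        await asyncio.sleep(0.01)


async def cancel_and_wait(task):
    """Cancel a task and wait for the cancellation to be processed."""
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass


loop = asyncio.new_event_loop()
task = loop.create_task(fake_dispatcher())

# Run the loop briefly: the dispatcher is scheduled and produces reports.
loop.run_until_complete(asyncio.sleep(0.05))
while_running = len(received)

# The loop is stopped now. Blocking the thread does not advance the task.
time.sleep(0.05)
while_stopped = len(received) - while_running

# Clean up so no task is left pending when the loop is closed.
loop.run_until_complete(cancel_and_wait(task))
loop.close()

print(f"reports while loop ran: {while_running}")
print(f"reports while loop was stopped: {while_stopped}")
```

This is why the example near the top of the thread keeps a single coroutine alive with a short asyncio.sleep inside a loop: callbacks can only fire while that loop is running.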
Hi, I am trying to implement a "read for 10 seconds for a pin change" operation within an input/output class like so:
and calling it via a static method of the class like so:
and using it externally to get the reading:
IO.digital_in('COM6:D:I:5')
I am getting no callbacks while the perpetual while loop is running.
Could you please advise on how I can achieve this?
I have tried your example code standalone (it works). When I loaded that example into my class, it gives me the _arduino_report_dispatcher() task pending error along with the "Task was destroyed" error (effectively, the infinite loop is never entered).