Open MiguelGarciaUVa opened 4 months ago
Hello @MiguelGarciaUVa ,
There is no issue using RefreshFeedback in High level, as long as the BaseCyclic client is connected via a UDP transport to port 10001.
We are not aware of any known issue which would cause this. Can you launch your code with the Monitoring page of the WebApp opened in the Detailed tab and verify if the data is still updating there? The WebApp also uses RefreshFeedback() to fetch this data, so this would confirm whether your issue lies in the robot or in your code.
If the WebApp also freezes, I suggest reinstalling the firmware via the Upgrade page (find the latest firmware on our Website). If not, we'll need to get more detail about your implementation to help you diagnose the issue.
Cheers
Hi @martinleroux, Thanks for your reply. I have tried it and the WebApp does not freeze, so it is probably a bug in my code. I think it freezes 10 secs because I am using:
base_feedback = self.SendCallWithRetry(self.base_cyclic.RefreshFeedback, 3)
@staticmethod
def SendCallWithRetry(call, retry, *args):
i = 0
arg_out = []
while i < retry:
try:
arg_out = call(*args)
break
except:
i = i + 1
continue
if i == retry:
print("Failed to communicate")
return arg_out
So maybe it takes 10 seconds to try to communicate, then try again. When it retries, it can usually do it. But now, I don't know why the freeze is systematic. Every 90secs it can't communicate so it freezes 10 secs to do so. My loop is as follows:
def RunCyclic(self, t_sample, print_stats):
self.cyclic_running = True
print("Run Cyclic")
sys.stdout.flush()
cyclic_count = 0 # Counts refresh
stats_count = 0 # Counts stats prints
failed_cyclic_count = 0 # Count communication timeouts
t_now = time.time()
t_cyclic = t_now # cyclic time
t_stats = t_now # print time
t_init = t_now # init time
print("Running torque control example for {} seconds".format(self.cyclic_t_end))
while not self.kill_the_thread:
t_now = time.time()
self.base_feedback = self.SendCallWithRetry(self.base_cyclic.RefreshFeedback, 3)
# Cyclic Refresh
if (t_now - t_cyclic) >= t_sample:
t_cyclic = t_now
# Position command to first actuator is set to measured one to avoid following error to trigger
# Bonus: When doing this instead of disabling the following error, if communication is lost and first
# actuator continue to move under torque command, resulting position error with command will
# trigger a following error and switch back the actuator in position command to hold its position
actuador1 = self.base_feedback.actuators[0].position
actuador2 = self.base_feedback.actuators[1].position
actuador3 = self.base_feedback.actuators[2].position
actuador4 = self.base_feedback.actuators[3].position
actuador5 = self.base_feedback.actuators[4].position
actuador6 = self.base_feedback.actuators[5].position
actuador7 = self.base_feedback.actuators[6].position
cyclic_count = cyclic_count + 1
To init the cyclic:
def InitCyclic(self, sampling_time_cyclic, t_end, print_stats):
if self.cyclic_running:
return True
print("Init Cyclic")
sys.stdout.flush()
base_feedback = self.SendCallWithRetry(self.base_cyclic.RefreshFeedback, 3)
if base_feedback:
self.base_feedback = base_feedback
# Init command frame
for x in range(self.actuator_count):
self.base_command.actuators[x].flags = 1 # servoing
self.base_command.actuators[x].position = self.base_feedback.actuators[x].position
print("Actuador ", x, ": ", self.base_command.actuators[x].position)
# Init cyclic thread
self.cyclic_t_end = t_end
self.cyclic_thread = threading.Thread(target=self.RunCyclic, args=(sampling_time_cyclic, print_stats))
self.cyclic_thread.daemon = True
self.cyclic_thread.start()
return True
else:
print("InitCyclic: failed to communicate")
return False
Sampling time is 0.002 and execution time is infinite (t_end = 0) The basis of this programme is the torque control example programme.
Are you obtaining the same behaviour by running the defaut torque control example?
No, because they don't use RefreshFeedback() in the loop. I tried to use RefreshFeedback() in that example and i obtain the same behaviour, yes.
We will try to replicate the issue and get back to you.
I modified the Torque example above to replace all torque commands with just RefreshFeedback. I ran the example for 30 minutes and could not replicate your issue. I did notice however that the delay would vary a lot depending on what else I would be doing with my computer at the same time. I am currently running the example using Python on a Windows 10 PC with a bunch of browser tabs open, but the worse delay I have seen is ~35ms.
Here is the full code if you want to try it:
#! /usr/bin/env python3
###
# KINOVA (R) KORTEX (TM)
###
import sys
import os
from kortex_api.autogen.client_stubs.ActuatorConfigClientRpc import ActuatorConfigClient
from kortex_api.autogen.client_stubs.ActuatorCyclicClientRpc import ActuatorCyclicClient
from kortex_api.autogen.client_stubs.BaseClientRpc import BaseClient
from kortex_api.autogen.client_stubs.BaseCyclicClientRpc import BaseCyclicClient
from kortex_api.autogen.client_stubs.DeviceConfigClientRpc import DeviceConfigClient
from kortex_api.autogen.client_stubs.DeviceManagerClientRpc import DeviceManagerClient
from kortex_api.autogen.messages import Session_pb2, ActuatorConfig_pb2, Base_pb2, BaseCyclic_pb2, Common_pb2
from kortex_api.RouterClient import RouterClientSendOptions
import time
import sys
import threading
class TorqueExample:
def __init__(self, router, router_real_time):
# Maximum allowed waiting time during actions (in seconds)
self.ACTION_TIMEOUT_DURATION = 20
self.torque_amplification = 2.0 # Torque measure on last actuator is sent as a command to first actuator
# Create required services
device_manager = DeviceManagerClient(router)
self.actuator_config = ActuatorConfigClient(router)
self.base = BaseClient(router)
self.base_cyclic = BaseCyclicClient(router_real_time)
self.base_command = BaseCyclic_pb2.Command()
self.base_feedback = BaseCyclic_pb2.Feedback()
self.base_custom_data = BaseCyclic_pb2.CustomData()
# Detect all devices
device_handles = device_manager.ReadAllDevices()
self.actuator_count = self.base.GetActuatorCount().count
# Only actuators are relevant for this example
for handle in device_handles.device_handle:
if handle.device_type == Common_pb2.BIG_ACTUATOR or handle.device_type == Common_pb2.SMALL_ACTUATOR:
self.base_command.actuators.add()
self.base_feedback.actuators.add()
# Change send option to reduce max timeout at 3ms
self.sendOption = RouterClientSendOptions()
self.sendOption.andForget = False
self.sendOption.delay_ms = 0
self.sendOption.timeout_ms = 3
self.cyclic_t_end = 300 #Total duration of the thread in seconds. 0 means infinite.
self.cyclic_thread = {}
self.kill_the_thread = False
self.already_stopped = False
self.cyclic_running = False
# Create closure to set an event after an END or an ABORT
def check_for_end_or_abort(self, e):
"""Return a closure checking for END or ABORT notifications
Arguments:
e -- event to signal when the action is completed
(will be set when an END or ABORT occurs)
"""
def check(notification, e = e):
print("EVENT : " + \
Base_pb2.ActionEvent.Name(notification.action_event))
if notification.action_event == Base_pb2.ACTION_END \
or notification.action_event == Base_pb2.ACTION_ABORT:
e.set()
return check
def MoveToHomePosition(self):
# Make sure the arm is in Single Level Servoing mode
base_servo_mode = Base_pb2.ServoingModeInformation()
base_servo_mode.servoing_mode = Base_pb2.SINGLE_LEVEL_SERVOING
self.base.SetServoingMode(base_servo_mode)
# Move arm to ready position
print("Moving the arm to a safe position")
action_type = Base_pb2.RequestedActionType()
action_type.action_type = Base_pb2.REACH_JOINT_ANGLES
action_list = self.base.ReadAllActions(action_type)
action_handle = None
for action in action_list.action_list:
if action.name == "Home":
action_handle = action.handle
if action_handle == None:
print("Can't reach safe position. Exiting")
return False
e = threading.Event()
notification_handle = self.base.OnNotificationActionTopic(
self.check_for_end_or_abort(e),
Base_pb2.NotificationOptions()
)
self.base.ExecuteActionFromReference(action_handle)
print("Waiting for movement to finish ...")
finished = e.wait(self.ACTION_TIMEOUT_DURATION)
self.base.Unsubscribe(notification_handle)
if finished:
print("Cartesian movement completed")
else:
print("Timeout on action notification wait")
return finished
return True
def InitCyclic(self, sampling_time_cyclic, t_end, print_stats):
if self.cyclic_running:
return True
# Move to Home position first
if not self.MoveToHomePosition():
return False
print("Init Cyclic")
sys.stdout.flush()
base_feedback = self.SendCallWithRetry(self.base_cyclic.RefreshFeedback, 3)
if base_feedback:
self.base_feedback = base_feedback
# Init command frame
for x in range(self.actuator_count):
self.base_command.actuators[x].flags = 1 # servoing
self.base_command.actuators[x].position = self.base_feedback.actuators[x].position
# First actuator is going to be controlled in torque
# To ensure continuity, torque command is set to measure
self.base_command.actuators[0].torque_joint = self.base_feedback.actuators[0].torque
# Set arm in LOW_LEVEL_SERVOING
base_servo_mode = Base_pb2.ServoingModeInformation()
base_servo_mode.servoing_mode = Base_pb2.LOW_LEVEL_SERVOING
self.base.SetServoingMode(base_servo_mode)
# Send first frame
#self.base_feedback = self.base_cyclic.Refresh(self.base_command, 0, self.sendOption)
self.base_feedback = self.base_cyclic.RefreshFeedback()
# Set first actuator in torque mode now that the command is equal to measure
#control_mode_message = ActuatorConfig_pb2.ControlModeInformation()
#control_mode_message.control_mode = ActuatorConfig_pb2.ControlMode.Value('TORQUE')
#device_id = 1 # first actuator as id = 1
#self.SendCallWithRetry(self.actuator_config.SetControlMode, 3, control_mode_message, device_id)
# Init cyclic thread
self.cyclic_t_end = t_end
self.cyclic_thread = threading.Thread(target=self.RunCyclic, args=(sampling_time_cyclic, print_stats))
self.cyclic_thread.daemon = True
self.cyclic_thread.start()
return True
else:
print("InitCyclic: failed to communicate")
return False
def RunCyclic(self, t_sample, print_stats):
self.cyclic_running = True
print("Run Cyclic")
sys.stdout.flush()
cyclic_count = 0 # Counts refresh
stats_count = 0 # Counts stats prints
failed_cyclic_count = 0 # Count communication timeouts
# Initial delta between first and last actuator
init_delta_position = self.base_feedback.actuators[0].position - self.base_feedback.actuators[self.actuator_count - 1].position
# Initial first and last actuator torques; avoids unexpected movement due to torque offsets
init_last_torque = self.base_feedback.actuators[self.actuator_count - 1].torque
init_first_torque = -self.base_feedback.actuators[0].torque # Torque measure is reversed compared to actuator direction
t_now = time.time()
t_cyclic = t_now # cyclic time
t_stats = t_now # print time
t_init = t_now # init time
loop_counter = 0
print("Running torque control example for {} seconds".format(self.cyclic_t_end))
while not self.kill_the_thread:
t_now = time.time()
dt = t_now - t_cyclic
# Cyclic Refresh
if (dt) >= t_sample:
if (dt) >= 0.05:
print(loop_counter, " ", dt)
t_cyclic = t_now
# Position command to first actuator is set to measured one to avoid following error to trigger
# Bonus: When doing this instead of disabling the following error, if communication is lost and first
# actuator continue to move under torque command, resulting position error with command will
# trigger a following error and switch back the actuator in position command to hold its position
self.base_command.actuators[0].position = self.base_feedback.actuators[0].position
# First actuator torque command is set to last actuator torque measure times an amplification
self.base_command.actuators[0].torque_joint = init_first_torque + \
self.torque_amplification * (self.base_feedback.actuators[self.actuator_count - 1].torque - init_last_torque)
# First actuator position is sent as a command to last actuator
self.base_command.actuators[self.actuator_count - 1].position = self.base_feedback.actuators[0].position - init_delta_position
# Incrementing identifier ensure actuators can reject out of time frames
self.base_command.frame_id += 1
if self.base_command.frame_id > 65535:
self.base_command.frame_id = 0
for i in range(self.actuator_count):
self.base_command.actuators[i].command_id = self.base_command.frame_id
# Frame is sent
try:
self.base_feedback = self.base_cyclic.RefreshFeedback()
loop_counter = loop_counter + 1
except:
failed_cyclic_count = failed_cyclic_count + 1
cyclic_count = cyclic_count + 1
# Stats Print
if print_stats and ((t_now - t_stats) > 1):
t_stats = t_now
stats_count = stats_count + 1
cyclic_count = 0
failed_cyclic_count = 0
sys.stdout.flush()
if self.cyclic_t_end != 0 and (t_now - t_init > self.cyclic_t_end):
print("Cyclic Finished")
sys.stdout.flush()
break
self.cyclic_running = False
return True
def StopCyclic(self):
print ("Stopping the cyclic and putting the arm back in position mode...")
if self.already_stopped:
return
# Kill the thread first
if self.cyclic_running:
self.kill_the_thread = True
self.cyclic_thread.join()
# Set first actuator back in position mode
control_mode_message = ActuatorConfig_pb2.ControlModeInformation()
control_mode_message.control_mode = ActuatorConfig_pb2.ControlMode.Value('POSITION')
device_id = 1 # first actuator has id = 1
self.SendCallWithRetry(self.actuator_config.SetControlMode, 3, control_mode_message, device_id)
base_servo_mode = Base_pb2.ServoingModeInformation()
base_servo_mode.servoing_mode = Base_pb2.SINGLE_LEVEL_SERVOING
self.base.SetServoingMode(base_servo_mode)
self.cyclic_t_end = 0.1
self.already_stopped = True
print('Clean Exit')
@staticmethod
def SendCallWithRetry(call, retry, *args):
i = 0
arg_out = []
while i < retry:
try:
arg_out = call(*args)
break
except:
i = i + 1
continue
if i == retry:
print("Failed to communicate")
return arg_out
def main():
# Import the utilities helper module
import argparse
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
import utilities
# Parse arguments
parser = argparse.ArgumentParser()
parser.add_argument("--cyclic_time", type=float, help="delay, in seconds, between cylic control call", default=0.001)
parser.add_argument("--duration", type=int, help="example duration, in seconds (0 means infinite)", default=660)
parser.add_argument("--print_stats", default=True, help="print stats in command line or not (0 to disable)", type=lambda x: (str(x).lower() not in ['false', '0', 'no']))
args = utilities.parseConnectionArguments(parser)
# Create connection to the device and get the router
with utilities.DeviceConnection.createTcpConnection(args) as router:
with utilities.DeviceConnection.createUdpConnection(args) as router_real_time:
example = TorqueExample(router, router_real_time)
success = example.InitCyclic(args.cyclic_time, args.duration, args.print_stats)
if success:
while example.cyclic_running:
try:
time.sleep(0.5)
except KeyboardInterrupt:
break
example.StopCyclic()
return 0 if success else 1
if __name__ == "__main__":
exit(main())
Thank you very much for your response and your time.
I ran it and the issue persists. This is the output console:
I am running it on Spyder on a Windows 11 PC. Did you change any parameters in utilities? Maybe there is a parameter causing the program to stop 10 seconds. Today, for some reason, instead of freezing every 90 seconds, it runs for 30 seconds, stops, runs again for 60 seconds, stops, runs for 30 seconds, stops, and so on. I noticed that it happens when it can't call the function RefreshFeedback() in SendCallWithRetry(). It's like it tries for 10 seconds to call it, until it realises it can't, and then it tries again, and succeeds. However, it has to wait 10 seconds before deciding that it couldn't and stops trying. It also happens without using SendCallWithRetry(), so maybe there is a parameter that sets this 10-second connection attempt. My API version is 2.6.0. My program is this one, in case you want to try (I know there might be a lot of things I don't need, but it doesn't matter):
#! /usr/bin/env python3
##################################################################
# Programa encargado de leer las posiciones de las articulaciones
# y mandárselas a Unity
##################################################################
import sys
import os
import collections
try:
from collections import abc
collections.MutableMapping = abc.MutableMapping
collections.MutableSequence = abc.MutableSequence
except:
pass
from kortex_api.autogen.client_stubs.ActuatorConfigClientRpc import ActuatorConfigClient
from kortex_api.autogen.client_stubs.ActuatorCyclicClientRpc import ActuatorCyclicClient
from kortex_api.autogen.client_stubs.BaseClientRpc import BaseClient
from kortex_api.autogen.client_stubs.BaseCyclicClientRpc import BaseCyclicClient
from kortex_api.autogen.client_stubs.DeviceConfigClientRpc import DeviceConfigClient
from kortex_api.autogen.client_stubs.DeviceManagerClientRpc import DeviceManagerClient
from kortex_api.autogen.messages import Session_pb2, ActuatorConfig_pb2, Base_pb2, BaseCyclic_pb2, Common_pb2
from kortex_api.RouterClient import RouterClientSendOptions
import time
import sys
import threading
import socket
import struct
class ReadJoints:
def __init__(self, router, router_real_time):
# Create required services
device_manager = DeviceManagerClient(router)
self.actuator_config = ActuatorConfigClient(router)
self.base = BaseClient(router)
self.base_cyclic = BaseCyclicClient(router_real_time)
self.base_command = BaseCyclic_pb2.Command()
self.base_feedback = BaseCyclic_pb2.Feedback()
self.base_custom_data = BaseCyclic_pb2.CustomData()
# Detect all devices
device_handles = device_manager.ReadAllDevices()
self.actuator_count = self.base.GetActuatorCount().count
# Only actuators are relevant
for handle in device_handles.device_handle:
if handle.device_type == Common_pb2.BIG_ACTUATOR or handle.device_type == Common_pb2.SMALL_ACTUATOR:
self.base_command.actuators.add()
self.base_feedback.actuators.add()
# Change send option to reduce max timeout at 3ms
self.sendOption = RouterClientSendOptions()
self.sendOption.andForget = False
self.sendOption.delay_ms = 0
self.sendOption.timeout_ms = 3
self.cyclic_t_end = 0 #Total duration of the thread in seconds. 0 means infinite.
self.cyclic_thread = {}
self.kill_the_thread = False
self.already_stopped = False
self.cyclic_running = False
self.angulo_anterior = [0, 0, 0, 0, 0, 0, 0]
def InitCyclic(self, sampling_time_cyclic, t_end, print_stats):
if self.cyclic_running:
return True
print("Init Cyclic")
sys.stdout.flush()
base_feedback = self.SendCallWithRetry(self.base_cyclic.RefreshFeedback, 3)
if base_feedback:
self.base_feedback = base_feedback
# Init command frame
for x in range(self.actuator_count):
self.base_command.actuators[x].flags = 1 # servoing
self.base_command.actuators[x].position = self.base_feedback.actuators[x].position
print("Actuador ", x, ": ", self.base_command.actuators[x].position)
# Init cyclic thread
self.cyclic_t_end = t_end
self.cyclic_thread = threading.Thread(target=self.RunCyclic, args=(sampling_time_cyclic, print_stats))
self.cyclic_thread.daemon = True
self.cyclic_thread.start()
return True
else:
print("InitCyclic: failed to communicate")
return False
def RunCyclic(self, t_sample, print_stats):
self.cyclic_running = True
print("Run Cyclic")
sys.stdout.flush()
cyclic_count = 0 # Counts refresh
failed_cyclic_count = 0 # Count communication timeouts
t_now = time.time()
t_cyclic = t_now # cyclic time
t_freeze = t_now
print("Running ReadJointAnglesCyclic for {} seconds".format(self.cyclic_t_end))
while not self.kill_the_thread:
t_now = time.time()
if(t_now - t_cyclic) >= 10:
print("ME HE CONGELADO 10 SEGUNDOS")
print("Tiempo entre congelaciones: ", t_now - t_freeze)
print("Tiempo congelado: ", t_now - t_cyclic)
print("Contador de ciclo:", cyclic_count)
t_freeze = time.time()
self.base_feedback = self.SendCallWithRetry(self.base_cyclic.RefreshFeedback, 3)
# Cyclic Refresh
if (t_now - t_cyclic) >= t_sample:
t_cyclic = t_now
actuador1 = self.base_feedback.actuators[0].position
actuador2 = self.base_feedback.actuators[1].position
actuador3 = self.base_feedback.actuators[2].position
actuador4 = self.base_feedback.actuators[3].position
actuador5 = self.base_feedback.actuators[4].position
actuador6 = self.base_feedback.actuators[5].position
actuador7 = self.base_feedback.actuators[6].position
#if actuador4>210:
# actuador4=actuador4-360
actuadores = [actuador1, actuador2, actuador3, actuador4, actuador5, actuador6, actuador7]
#self.convertir_angulos(actuadores)
#print(actuadores)
#time.sleep(0.1)
#self.send_data(actuadores)
cyclic_count = cyclic_count + 1
self.cyclic_running = False
return True
def StopCyclic(self):
print ("Stopping the cyclic and putting the arm back in position mode...")
if self.already_stopped:
return
# Kill the thread first
if self.cyclic_running:
self.kill_the_thread = True
self.cyclic_thread.join()
base_servo_mode = Base_pb2.ServoingModeInformation()
base_servo_mode.servoing_mode = Base_pb2.SINGLE_LEVEL_SERVOING
self.base.SetServoingMode(base_servo_mode)
self.cyclic_t_end = 0.1
self.already_stopped = True
print('Clean Exit')
##################################
# Detecto los saltos de 360º a 0º debido a que el Unity necesita que, si se pasa de 360º a 10º, reciba 370º en vez de los 10º.
# Si se pasa de 10º a 360º, el Unity necesita que le lleguen 0º, para hacer bien la trayectoria del robot.
##################################
def convertir_angulos(self, angulos):
for indice, angulo in enumerate(angulos):
if self.angulo_anterior != [0, 0, 0, 0, 0, 0, 0]:
# Verificar la transición de 350º a 10º y viceversa
if angulo - self.angulo_anterior[indice] > 300:
angulo -= 360
elif angulo - self.angulo_anterior[indice] < -300:
angulo += 360
angulos[indice] = angulo
# Actualizar el ángulo anterior
self.angulo_anterior = angulos
return angulos
#Envia el valor de las articulaciones a Unity
def send_data(self, posArticulaciones):
# Crear una conexión TCP/IP
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect(('localhost', 5555)) # IP y puerto de tu elección
# Empaquetar la fila como una cadena de bytes
fila_bytes = struct.pack('7f', *posArticulaciones)
# Enviar la fila al programa C#
s.sendall(fila_bytes)
@staticmethod
def SendCallWithRetry(call, retry, *args):
i = 0
arg_out = []
while i < retry:
try:
arg_out = call(*args)
break
except:
i = i + 1
print("Reconectando:", i)
continue
if i == retry:
print("Failed to communicate")
return arg_out
def main():
# Import the utilities helper module
import argparse
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
import utilities
# Parse arguments
parser = argparse.ArgumentParser()
parser.add_argument("--cyclic_time", type=float, help="delay, in seconds, between cylic control call", default=0.002)
parser.add_argument("--duration", type=int, help="example duration, in seconds (0 means infinite)", default=0)
parser.add_argument("--print_stats", default=True, help="print stats in command line or not (0 to disable)", type=lambda x: (str(x).lower() not in ['false', '0', 'no']))
args = utilities.parseConnectionArguments(parser)
# Create connection to the device and get the router
with utilities.DeviceConnection.createTcpConnection(args) as router:
with utilities.DeviceConnection.createUdpConnection(args) as router_real_time:
example = ReadJoints(router, router_real_time)
success = example.InitCyclic(args.cyclic_time, args.duration, args.print_stats)
if success:
while example.cyclic_running:
try:
time.sleep(0.5)
except KeyboardInterrupt:
break
example.StopCyclic()
return 0 if success else 1
if __name__ == "__main__":
sys.exit(main())
And this is the output:
In utilities I have this:
class DeviceConnection:
TCP_PORT = 10000
UDP_PORT = 10001
@staticmethod
def createTcpConnection(args):
"""
returns RouterClient required to create services and send requests to device or sub-devices,
"""
return DeviceConnection(args.ip, port=DeviceConnection.TCP_PORT, credentials=(args.username, args.password))
@staticmethod
def createUdpConnection(args):
"""
returns RouterClient that allows to create services and send requests to a device or its sub-devices @ 1khz.
"""
return DeviceConnection(args.ip, port=DeviceConnection.UDP_PORT, credentials=(args.username, args.password))
def __init__(self, ipAddress, port=TCP_PORT, credentials = ("","")):
self.ipAddress = ipAddress
self.port = port
self.credentials = credentials
self.sessionManager = None
# Setup API
self.transport = TCPTransport() if port == DeviceConnection.TCP_PORT else UDPTransport()
self.router = RouterClient(self.transport, RouterClient.basicErrorCallback)
# Called when entering 'with' statement
def __enter__(self):
self.transport.connect(self.ipAddress, self.port)
if (self.credentials[0] != ""):
session_info = Session_pb2.CreateSessionInfo()
session_info.username = self.credentials[0]
session_info.password = self.credentials[1]
session_info.session_inactivity_timeout = 10000 # (milliseconds)
session_info.connection_inactivity_timeout = 2000 # (milliseconds)
self.sessionManager = SessionManager(self.router)
print("Logging as", self.credentials[0], "on device", self.ipAddress)
self.sessionManager.CreateSession(session_info)
return self.router
# Called when exiting 'with' statement
def __exit__(self, exc_type, exc_value, traceback):
if self.sessionManager != None:
router_options = RouterClientSendOptions()
router_options.timeout_ms = 1000
self.sessionManager.CloseSession(router_options)
self.transport.disconnect()
I have not changed anything in the Utilities file. Have you tried reinstalling the firmware on your robot and performing a factory reset? Have you tried running the same code from another computer? I am tempted to blame either Windows 11 or your firewall configuration. I have seen somewhat similar issues in the past where the cause was that the network card of the PC could not handle all of the communication it had to (code + browser + other stuff running on the PC at the same time), which would cause some calls to just drop.
You can try to change your session_inactivity_timeout in Utilities to less than 10 seconds. I expect it will only reduce your systematic time to recall to whatever value you select without solving the root of the issue, but it may help you make progress.
Hello @MiguelGarciaUVa , Any update on this topic? Can I close this issue?
Hello, I have a code where I am getting the position of the joints in a loop. I am receiving data from the robot through the Refresh Feedback function, but suddenly it stops receiving that data for exactly 10 seconds and then continues as usual. This happens randomly (I think). We would like to know the cause of this or if you are aware that this is a bug and how to fix it. The same applies to the torques. The servoing mode is set to single level servoing, instead of low level, because I want to move the robot in high level mode at the same time. I don't know if the problem can come from that. If someone can help me, I would appreciate it. Thanks in advance.