omelia-iliffe / fdcanusb-rs

Apache License 2.0
0 stars 0 forks source link

Issue with reading for longer periods. #1

Open perara opened 1 month ago

perara commented 1 month ago

First of all, I'd like to congratulate on a exceptional effort on porting this to rust!

I've done some testing with your library and it seems to work for a while, before I'm receive TimeOut errors. I've done quite alot of debugging, but It seems as if the issues stems for issues with reading OK, combined with the next rcv.

In particular, the error points out that it was expecting "rcv" but it read "OK\r\nrcv ....".

I've ported my exact code to C++ and it works good there.

omelia-iliffe commented 1 month ago

Hello! Thanks for opening an issue and sorry its causing problems.

I have been using this library a lot of the last few months and haven't had any TimeOut errors or any Lost Sync errors. There is a hardcoded 100ms timeout on the SerialPort, could that be timing out and then getting out of sync? It would be unexpected as it would mean more than 100ms between the can send command being written and the OK\r\n reply being received. Could you share some logs of the errors?

omelia-iliffe commented 1 month ago

I've published v0.5 which added proper error types and fixes an issue with padding length of can frames. I don't think these changes will resolve your issue but it may make it easier to debug. I'll publish a new version of the moteus crate shortly, I assume you are using that?

omelia-iliffe commented 3 weeks ago

Another question: What platform are you working on? I had mainly done testing with LInux/Ubuntu but now I'm testing with windows and having some issues.

perara commented 3 weeks ago

Thanks. I will give this update a go. I ran my test on a rpi 4.

Will also post better logs if the problem presiss :)

omelia-iliffe commented 3 weeks ago

I think I have found the problem. The read_ok method will read all data waiting and will read the recv line if its available.

Ok("OK\r\nrcv 100 210001230D181A242F010000000000000000000000002C06380000C07F0000C07F0000000000000000000000")

This causes no response as its already been read and thrown away. I think switching to a global buffer and scanning for the \n at the end of each line is whats required. I will have a look at the today or next week.

perara commented 3 weeks ago

This sounds like the problem I was experiencing. Looking forward to testing a fix!

omelia-iliffe commented 3 weeks ago

Hello, I've released 0.6.0 of fdcanusb and 0.2.1 of the moteus crate. Give them a try and let me know how it goes.

perara commented 1 week ago

Dear Omelia, Sorry for the delay, been busy with a few other components but I finally got to test our rust version of communicating with the servo.

So far it works perfect, I've yet to see any instabilities. Thanks a lot for your efforts, really great implementation!

I shamelessly post my code here, with the reservation that I have no idea what I'm doing :) This class uses the library, along with moteus-rs to connect the motor to a pub-sub redis instance, for which you can issue rotations on demand.



pub struct ServoController {
    controller: Arc<Mutex<Controller<SerialPort>>>,
    servo_increment: f32,
    idle_timeout: Duration,
    query: Query,
    last_signal_time: Arc<Mutex<Instant>>,
    should_fire: Arc<Mutex<bool>>,
    position: Arc<Mutex<f32>>,
}

impl ServoController {
    pub async fn new(servo_increment: f32, idle_timeout: u64) -> Result<Self, ServoError> {
        info!("Initializing ServoController with increment: {}, timeout: {}s", servo_increment, idle_timeout);

        let qr = Query::new_with_extra([
            ControlPosition::read().into(),
            ControlVelocity::read().into(),
            ControlTorque::read().into(),
            ControlPositionError::read().into(),
            ControlVelocityError::read().into(),
            ControlTorqueError::read().into(),
            Fault::read().into(),
        ]);

        let transport = FdCanUSB::open("/dev/fdcanusb", serial2::KeepSettings)?;
        debug!("FdCanUSB opened successfully");

        let controller = Arc::new(Mutex::new(Controller::new(transport, false)));
        debug!("Controller initialized");

        let servo_controller = Self {
            controller,
            servo_increment,
            idle_timeout: Duration::from_secs(idle_timeout),
            query: qr,
            last_signal_time: Arc::new(Mutex::new(Instant::now())),
            should_fire: Arc::new(Mutex::new(false)),
            position: Arc::new(Mutex::new(0f32)),
        };

        info!("ServoController initialized successfully");
        Ok(servo_controller)
    }

    pub async fn start(self: &Arc<Self>) {
        let control_loop = self.clone();
        let idle_check = self.clone();

        tokio::spawn(async move {
            control_loop.control_loop().await;
        });

        tokio::spawn(async move {
            idle_check.idle_check().await;
        });
    }

    async fn control_loop(&self) {
        if let Err(e) = self.query_and_set_initial_position().await {
            error!("Failed to query initial servo state: {:?}", e);
        }

        loop {
            if *self.should_fire.lock().await {
                match self.fire_internal().await {
                    Ok(_) => {
                        info!("Fired successfully");
                        *self.should_fire.lock().await = false;
                        *self.last_signal_time.lock().await = Instant::now();
                    },
                    Err(e) => error!("Failed to fire: {:?}", e),
                }
            }

            tokio::time::sleep(Duration::from_millis(20)).await;
        }
    }

    async fn idle_check(&self) {
        loop {
            tokio::time::sleep(self.idle_timeout).await;
            let current_time = Instant::now();
            let last_time = *self.last_signal_time.lock().await;
            if current_time.duration_since(last_time) > self.idle_timeout {
                warn!("Servo is idle");
                if let Err(e) = self.stop_internal().await {
                    error!("Failed to stop servo during idle check: {:?}", e);
                }
            }
        }
    }

    async fn query_and_set_initial_position(&self) -> Result<(), ServoError> {
        let (current_position, _) = self.query_state().await?;
        debug!("Initial position: {}", current_position);
        *self.position.lock().await = current_position;
        Ok(())
    }

    async fn query_state(&self) -> Result<(f32, Option<Fault>), ServoError> {
        let state = self.controller
            .lock()
            .await
            .send_with_query(1, self.query.clone(), QueryType::Default)?;

        let position = state.get::<Position>()
            .ok_or(ServoError::PositionNotFound)?
            .value()
            .ok_or(ServoError::PositionValueNotAvailable)?;

        let fault = state.get::<Fault>();

        Ok((position, fault))
    }

    async fn fire_internal(&self) -> Result<(), ServoError> {
        let mut pos = self.position.lock().await;
        let old_position = *pos;
        *pos -= self.servo_increment;

        debug!("Firing servo. Current position: {}, New position: {}", old_position, *pos);

        let command = moteus::frame::Position {
            position: Some(CommandPosition::write(*pos)),
            watchdog_timeout: Some(CommandTimeout::write(f32::NAN)),
            ..Default::default()
        };

        let state = self.controller.lock().await.send_with_query(
            1,
            command,
            QueryType::Default
        )?;
        debug!("Fire command result: {:?}", state);

        Ok(())
    }

    async fn stop_internal(&self) -> Result<(), ServoError> {
        self.controller.lock().await.send_no_response(1, moteus::frame::Stop)?;
        Ok(())
    }

    pub async fn trigger_fire(&self) -> Result<(), ServoError> {
        let mut should_fire = self.should_fire.lock().await;
        *should_fire = true;
        info!("Fire triggered");
        Ok(())
    }

    pub async fn stop(&self) -> Result<(), ServoError> {
        info!("Stopping servo");
        self.stop_internal().await?;
        debug!("Stop command sent successfully");
        *self.last_signal_time.lock().await = Instant::now();
        Ok(())
    }

    pub async fn check_state(&self) -> Result<(f32, Option<Fault>), ServoError> {
        let (position, fault) = self.query_state().await?;

        if let Some(fault_value) = &fault {
            if *fault_value != Fault::read() {
                warn!("Fault detected: {:?}", fault_value);
                self.stop().await?;
            }
        }
        Ok((position, fault))
    }

    pub async fn reset_position(&self) -> Result<(), ServoError> {
        info!("Resetting servo position to 0");

        let command = moteus::frame::Position {
            position: Some(CommandPosition::write(0.0)),
            velocity: Some(CommandVelocity::write(1.0)),
            ..Default::default()
        };

        let state = self.controller.lock().await.send_with_query(1, command, QueryType::Default)?;

        debug!("Reset command result: {:?}", state);
        if let Some(position) = state.get::<Position>() {
            info!("New Position after reset: {:?}", position);
        }

        *self.last_signal_time.lock().await = Instant::now();
        Ok(())
    }

    pub async fn get_position(&self) -> Result<f32, ServoError> {
        let position = *self.position.lock().await;
        Ok(position)
    }

    pub async fn update_position(&self, new_position: f32) {
        let mut position = self.position.lock().await;
        *position = new_position;
    }
}

pub async fn redis_subscriber(servo_controller: Arc<ServoController>, redis_url: String, channel: String) -> Result<(), RedisSubscriberError> {
    let client = redis::Client::open(redis_url)?;
    let mut pubsub = client.get_async_pubsub().await?;
    pubsub.subscribe(&channel).await?;
    let mut stream = pubsub.on_message();
    info!("Redis subscriber started, listening on channel: {}", channel);

    while let Some(msg) = stream.next().await {
        let payload: String = msg.get_payload()?;
        let redis_message: RedisMessage = serde_json::from_str(&payload)?;

        match serde_json::from_str::<Command>(&redis_message.data) {
            Ok(Command::Fire) => {
                info!("Received FIRE command from Redis");
                servo_controller.trigger_fire().await?;
            },
            Err(_) => {
                warn!("Unknown command received: {}", redis_message.data);
                return Err(RedisSubscriberError::UnknownCommand(redis_message.data));
            }
        }
    }
    Ok(())
}```