SMFSW / Queue

Queue handling library (designed on Arduino)
BSD 3-Clause "New" or "Revised" License
97 stars 20 forks source link

parallel access of getRemainingCount() / getCount() / nbRecs() #20

Open programmeddeath1 opened 1 year ago

programmeddeath1 commented 1 year ago

When trying to access the current remaining count or get count for a queue, the getCount() always gives value as 0 and getRemainingCount() always stays at 50. The queue publish and pop are working because they execute corrdinates on an arm running the motors, but the number of coordinates in the queue currently is not accessible. Since i'm not able to publish the number of coordinates currently in the queue, it is making it difficult to know when to send next list of points.

SMFSW commented 1 year ago

Hello, thanks for your interest and message.

If I get everything well, you're using some RTOS, but I don't really figure out what is your implementation. It seems you push and pull items in one task, and try to get Queue items count in another?

At first, I'd say your implementation always empties the Queue before your other task asks for count, or has a chance to, but I would need more information. Maybe your task pushes one item to queue and pops it right after, before your other task asks for count.

I need to know about tasks and priorities (also other non related tasks which may have a higher priority than those using the queue), I need to know if you need to use an enable/disable interrupts mechanisms when pushing/pulling, if you use mutex for concurrent access, and how they are used.

One other thing you may try is using temporary variable to store counts in your push/pull task (without concurrent access) and print it to serial (if such peripheral is in use), or try putting some breakpoints to see how much the queue is filled. If you fill your queue with a list of items, right after they are pushed, count methods shall state right amounts. The getCount like methods are always accessible, unless a mutex or something prevents execution of those methods.

Please let me know, SMFSW

programmeddeath1 commented 1 year ago

Yes the code is as below

void trajectoryPointCallback(const trajectory_msgs::JointTrajectoryPoint& point_msg) {
  trajPoint input_point;
  if (!points.isFull()) {
    for (int num = 0; num < MOTOR_COUNT; num++) {
      input_point.pos[num] = point_msg.positions[num];
      input_point.velo[num] = point_msg.velocities[num];
    }
    points.push(&input_point);
  }

}

void executeArm() {
  prevTime = startTime;

  int curr_buffer;  
  // Execution
  if (!points.isEmpty()) {
    trajPoint exec_point;
    points.pop(&exec_point);
    curr_buffer_empty = false;

    for (int num = 0; num < MOTOR_COUNT; num++) {
      if (exec_point.pos[num] < 0)
        continue;
      set_point(num, exec_point.pos[num], exec_point.velo[num], BYPASS);
    }

  }
  else
      curr_buffer_empty = true;
    return;
}
void publishBuffer() {
  //int curr_buffer = points.getCount();
  bool curr_buffer_empty = points.isEmpty();

  if (curr_buffer_empty != prev_buffer_empty) {
    lastDebounceTime = millis();
  }

  if ((millis() - lastDebounceTime) > debounceDelay) {
    if (curr_buffer_empty != buffer_empty) {
      buffer_empty = curr_buffer_empty;
      if (buffer_empty == true) {
        buffer_.data = 1;
        buffer_pub.publish(&buffer_);        
        //counter++;
        //Serial.println(counter);
      }
      else{
        buffer_.data = 0;
        buffer_pub.publish(&buffer_);         
      }
    }
  }

void loop()
{
  //  startTime = millis();
  //  if ((startTime - prevTime) > dt)
  publishBuffer();
  executeArm();
  nh.spinOnce();
  // delay(1);
}

These are the three main functions. The ros package sends the points on the node which is then pushed to the points queue one by one. This queue is popped and executed one at a time by the executeArm function. I am reading the queue getCount() on a publishBuffer function which is sending the feedback to the higher level whether to send next points or not. You are right the queue is being popped an executed faster than the getCount() is being called. Its always empty by the time the data needs to be published. So at the Higher level the queue always shows to be empty and thus i get coordinates being sent one after the other.

The queue is cleared immediately but the execution on the stepper motor takes some time. There are no points in the queue,but the motors are still moving, since its in rtos, they move independently but the queue is cleared. So i dnt really have any feedback to know if the motors are currently free or moving. If i try to publish data while the motors are moving, the execution by the esp32 seems to get hampered. it starts jerking.

SMFSW commented 1 year ago

Alright then, you're not under RTOS, using sequential access.

It seems a portion of code is missing at the beginning of your post. Still, with this, you're just putting the last point_msg to input_msg in queue. I'm not sure it's the behavior you want. If you want to fill the entire queue, you should try something like that:

for (int num = 0 ; num < MOTOR_COUNT ; num++) {
    if (points.isFull()) {
        break; }

    trajPoint input_point;
    input_point.pos = point_msg.positions[num];
    input_point.velo = point_msg.velocities[num];
    points.push(&input_point);
}

This will break for loop if the queue is full, otherwise fill the buffers in the queue. Still, you may need to keep track of you num count when the loop breaks, so you can start from last record next time you need to feed the queue, like this for example:

int save_num = 0;
for (int num = save_num ; num < MOTOR_COUNT ; num++) {
    if (points.isFull()) {
        save_num = num;
        break; }

    trajPoint input_point;
    input_point.pos = point_msg.positions[num];
    input_point.velo = point_msg.velocities[num];
    points.push(&input_point);
}

I see you're using a lot of [num] (array access) where you declared just a single structure item. Declaring just an item is fine when you want to push or pull from the queue, but you shall remove those [num] when structures are declared single. You're writing outside variables bounds with these. Don't you get any warning from the compiler doing so? If not, go to the preferences and enable warnings.

Just to be clear, when you're using push and pull methods, this does not copy the whole queue, only one item by call.

I'm still not sure what your project should do, but I guess you fill coordinates in queue, want to execute actuator movement step by step (I mean start next step when previous one arrived at target position). If I'm right, I don't see in your code anything waiting for the actuator to finish it's previous move before launching the next one.

for (int num = 0; num < MOTOR_COUNT; num++) {
  if (exec_point.pos[num] < 0)
    continue;
  set_point(num, exec_point.pos[num], exec_point.velo[num], BYPASS);
}

There, you're passing all the points at once. Unless set_point acts as a queue itself, last call to set_point will be the target, no sequence in between.

Some kind of proper sequence would be:

void loop() {
    fill_buffers(); // This function would be something like the first 2 excerpts of code in the post

    if (actuator_in_position() && (!q.isEmpty())) {
        trajPoint exec_point;
        q.pull(&exec_point);
        set_point(&exec_point...);
    }
}
programmeddeath1 commented 1 year ago

Thank you so much for taking time with your inputs here! Really appreciate it! I'm sorry I shared only the parts of the code that seemed relevant to the question so as not to spam the post. If you don't mind ill share the entire code here will help you understand better.

#include <ros.h>
#include <trajectory_msgs/JointTrajectoryPoint.h>
#include <std_msgs/UInt8.h>
#include <std_msgs/String.h>
#include <ESP_FlexyStepper.h>
#include <cppQueue.h>
#define BYPASS 0

// Stack Implementation
#define IMPLEMENTATION  FIFO
#define dt 2
#define BUFFER_SIZE 50

typedef struct trajPoint {
  float pos[3];
  float velo[3];
} single_point;

cppQueue points(sizeof(single_point), BUFFER_SIZE, IMPLEMENTATION);
int prev_buffer = 0;
//## Is empty debounce  variables
bool buffer_empty = false;
bool prev_buffer_empty = false;
unsigned long lastDebounceTime = 0;
unsigned long debounceDelay = 100;
// Send updates at interval for motion running
unsigned long lastSendTime = 0;
unsigned long sendDelay = 1000;

// Watch timer
unsigned long startTime, prevTime;

// Pre Defined Values
#define MOTOR_COUNT 3
#define STEP_PER_REV 1600
#define MM_PER_REV 70

// ROS Serial
ros::NodeHandle nh;
std_msgs::String str_msg;
std_msgs::UInt8 buffer_;
ros::Publisher status_pub("/delta_arm/status", &str_msg);
ros::Publisher buffer_pub("/delta_arm/armBuffer", &buffer_);

// Assign Pins
int MOTOR_PUL_PINS[] = {27, 32, 26};
int MOTOR_DIR_PINS[] = {14, 33, 25};
int MOTOR_LIMIT_PINS[] = {19, 18, 5};

const float STEPS_PER_MM = STEP_PER_REV / MM_PER_REV;
const float SPEED_IN_MM_S = 300.0;
const float ACCEL_IN_MM_S = 100000.0;
const float HOMING_VEL_IN_MM_S = 200.0;

// create the stepper motor object
ESP_FlexyStepper MOTOR[MOTOR_COUNT];

void trajectoryPointCallback(const trajectory_msgs::JointTrajectoryPoint& point_msg) {
  trajPoint input_point;
  if (!points.isFull()) {
    for (int num = 0; num < MOTOR_COUNT; num++) {
      input_point.pos[num] = point_msg.positions[num];
      input_point.velo[num] = point_msg.velocities[num];
    }
    points.push(&input_point);
  }

}

// Subscriber after the callback function
ros::Subscriber<trajectory_msgs::JointTrajectoryPoint> traj_sub("/delta_arm/armCmd", trajectoryPointCallback);

void set_point(int num, float pos, float velo, float accel) {
  // setVeloAndAccel(num, abs(velo), abs(accel));
  MOTOR[num].setTargetPositionInMillimeters(pos);
  MOTOR[num].setSpeedInMillimetersPerSecond(velo);
}

// Function to Set Velo and Accel
void setVeloAndAccel(int num, float velo_in_mm_s, float accel_in_mm_s) {
  MOTOR[num].setSpeedInMillimetersPerSecond(velo_in_mm_s);
  MOTOR[num].setAccelerationInMillimetersPerSecondPerSecond(accel_in_mm_s);
  MOTOR[num].setDecelerationInMillimetersPerSecondPerSecond(accel_in_mm_s);
}

void motorHoming() {
  delay(1000);

  // Homing Functions

  for (int num = 0; num < MOTOR_COUNT ; num++) {
    // Set max bounds and speed to retract
    MOTOR[num].setTargetPositionRelativeInMillimeters(-500.0);
    setVeloAndAccel(num, HOMING_VEL_IN_MM_S, ACCEL_IN_MM_S);
  }

  // Loop to execute each step and check for completion
  do {
    if (digitalRead(MOTOR_LIMIT_PINS[0]) == LOW) {
      MOTOR[0].setCurrentPositionAsHomeAndStop();
      if (!MOTOR[0].isStartedAsService())
        MOTOR[0].startAsService();
    }
    if (digitalRead(MOTOR_LIMIT_PINS[1]) == LOW) {
      MOTOR[1].setCurrentPositionAsHomeAndStop();
      if (!MOTOR[1].isStartedAsService())
        MOTOR[1].startAsService();
    }
    if (digitalRead(MOTOR_LIMIT_PINS[2]) == LOW) {
      MOTOR[2].setCurrentPositionAsHomeAndStop();
      if (!MOTOR[2].isStartedAsService())
        MOTOR[2].startAsService();
    }

    // Step Executor
    MOTOR[0].processMovement();
    MOTOR[1].processMovement();
    MOTOR[2].processMovement();
    nh.spinOnce();
  } while (digitalRead(MOTOR_LIMIT_PINS[0]) == HIGH || digitalRead(MOTOR_LIMIT_PINS[1])  == HIGH || digitalRead(MOTOR_LIMIT_PINS[2]) == HIGH );

  // Confirm all the points are homed and started as a service to run in background
  MOTOR[0].setCurrentPositionAsHomeAndStop();
  MOTOR[1].setCurrentPositionAsHomeAndStop();
  MOTOR[2].setCurrentPositionAsHomeAndStop();

  if (!MOTOR[0].isStartedAsService())
    MOTOR[0].startAsService();
  if (!MOTOR[1].isStartedAsService())
    MOTOR[1].startAsService();
  if (!MOTOR[2].isStartedAsService())
    MOTOR[2].startAsService();

  delay(100);
}

void setup()
{
  nh.initNode();
  while (!nh.connected())
  {
    nh.spinOnce();
    delay(100);
  }
  nh.advertise(status_pub);
  nh.advertise(buffer_pub);
  nh.subscribe(traj_sub);

  // connect and configure the stepper motor to its IO pins
  for (int num = 0; num < MOTOR_COUNT ; num++) {

    // Input for Limit Pins
    pinMode(MOTOR_LIMIT_PINS[num], INPUT_PULLUP);

    // Motor Setup
    MOTOR[num].connectToPins(MOTOR_PUL_PINS[num], MOTOR_DIR_PINS[num]);
    MOTOR[num].setStepsPerMillimeter(STEPS_PER_MM);
  }

  // Home and be ready for input

  delay(3500);
  nh.loginfo("Homing.......");
  str_msg.data = "Homing...";
  status_pub.publish( &str_msg );

  motorHoming();

  delay(1000);
  nh.loginfo("Homing Done !!");
  str_msg.data = "Homing Done!";
  status_pub.publish( &str_msg );

  for (int num = 0; num < MOTOR_COUNT; num++) {
    set_point(num, 147.042, SPEED_IN_MM_S, ACCEL_IN_MM_S);
  }

  delay(2000);
  nh.loginfo("Ready for Trjectory now!!");
  str_msg.data = "Ready for Trjectory now!!";
  status_pub.publish( &str_msg );

  lastSendTime = millis()
}

void executeArm() {
  prevTime = startTime;

  int curr_buffer;  
  // Execution
  if (!points.isEmpty()) {
    trajPoint exec_point;
    points.pop(&exec_point);
    if ((millis() - lastSendTime) > sendDelay){
      buffer_.data = 0;
      buffer_pub.publish(&buffer_); 
      lastSendTime = millis();
    }
    for (int num = 0; num < MOTOR_COUNT; num++) {
      if (exec_point.pos[num] < 0)
        continue;
      set_point(num, exec_point.pos[num], exec_point.velo[num], BYPASS);
    }

  }
  else
    return;
}

void publishBuffer() {
  //int curr_buffer = points.getCount();
  bool curr_buffer_empty = points.isEmpty();

  if (curr_buffer_empty != prev_buffer_empty) {
    lastDebounceTime = millis();
  }

  if ((millis() - lastDebounceTime) > debounceDelay) {
    if (curr_buffer_empty != buffer_empty) {
      buffer_empty = curr_buffer_empty;
      if (buffer_empty == true) {
        buffer_.data = 1;
        buffer_pub.publish(&buffer_);        
        //counter++;
        //Serial.println(counter);
      }
      else{
        buffer_.data = 0;
        buffer_pub.publish(&buffer_);         
      }
    }
  }
//  else {
//        buffer_.data = 0;
//        buffer_pub.publish(&buffer_); 
//  }
  prev_buffer_empty = curr_buffer_empty;

}

void loop()
{
  //  startTime = millis();
  //  if ((startTime - prevTime) > dt)
  publishBuffer();
  executeArm();
  nh.spinOnce();
  // delay(1);
}

So this currently runs by declaring a struct array for the StepperMotor library where the num points to each motor to be run. In my case 3 motors are being run for the delta robot. So the ros callback continuously sends points which is pushed to the queue, then in the executeArm fn it pops from the queue and executes (Sending to set_point). The MOTOR[0].isStartedAsService() is basically the RTOS implementation(ESP-FlexyStepper library), where the service handles the actuator movement once the target point is set for the particular motor. Once the queue point is popped and send to set_point, the service handles the movement. Which is where i'm having trouble trying to get feedback of the current movement of the arm. The last point in queue is popped and the queue becomes empty slightly before the actuator movement completes. My challenge is when the motors are currently running, i cannot send any feedback, because the serial feedback communication seems to interrupt the motor execution making it jerky. So I only have the state change from start of execution and end, to figure out how to send back current actuator status. Since im sending upto 200+ points one at a time sequentially, the queue fills and empties continuously in milliseconds.