jhelovuo / RustDDS

Rust implementation of Data Distribution Service
Apache License 2.0

Reliable reader delayed reception of FastDDS shapes #300

Closed samcarey closed 10 months ago

samcarey commented 11 months ago

I've noticed that when I try to receive shapes from FastDDS using a reliable reader, samples can arrive very delayed. It seems to work fine with a RustDDS writer using the "same" settings. First, let me show you code that works as expected. Then I'll tell you what breaks it.

use anyhow::Result;
use futures::{select, StreamExt};
use rustdds::{
    policy::{Deadline, Durability, History, Lifespan, Liveliness, Ownership, Reliability},
    DomainParticipant, Keyed, QosPolicyBuilder, TopicKind,
};
use smol::Timer;
use std::time::Duration;

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)]
struct ShapeType {
    color: String,
    x: i32,
    y: i32,
    shape_size: i32,
}

impl Keyed for ShapeType {
    type K = String;
    fn key(&self) -> Self::K {
        self.color.clone()
    }
}

#[test]
fn delayed_samples() -> Result<()> {
    smol::block_on(async {
        let participant = DomainParticipant::new(0)?;
        let writer_qos = QosPolicyBuilder::new()
            .history(History::KeepLast { depth: 1 })
            .reliability(Reliability::Reliable {
                max_blocking_time: Duration::ZERO.into(),
            })
            .durability(Durability::Volatile)
            .liveliness(Liveliness::Automatic {
                lease_duration: rustdds::Duration::DURATION_INFINITE,
            })
            .ownership(Ownership::Shared)
            .deadline(Deadline(rustdds::Duration::DURATION_INFINITE))
            .lifespan(Lifespan {
                duration: rustdds::Duration::DURATION_INFINITE,
            })
            .build();
        let writer = participant
            .create_publisher(&writer_qos)?
            .create_datawriter_cdr::<ShapeType>(
                &participant.create_topic(
                    "Square".to_string(),
                    "ShapeType".to_string(),
                    &writer_qos,
                    TopicKind::WithKey,
                )?,
                None,
            )?;

        let qos1 = QosPolicyBuilder::new()
            .reliability(rustdds::policy::Reliability::Reliable {
                max_blocking_time: rustdds::Duration::DURATION_ZERO,
            })
            .build();
        let reader1 = participant
            .create_subscriber(&qos1)?
            .create_datareader_cdr::<ShapeType>(
                &participant.create_topic(
                    "Square".to_string(),
                    "ShapeType".to_string(),
                    &qos1,
                    TopicKind::WithKey,
                )?,
                None,
            )?;
        let mut samples1 = reader1.async_sample_stream();
        let mut reader_events1 = samples1.async_event_stream();

        let qos2 = QosPolicyBuilder::new().build();
        let reader2 = participant
            .create_subscriber(&qos2)?
            .create_datareader_cdr::<ShapeType>(
                &participant.create_topic(
                    "Square".to_string(),
                    "ShapeType".to_string(),
                    &qos2,
                    TopicKind::WithKey,
                )?,
                None,
            )?;
        let mut samples2 = reader2.async_sample_stream();
        let mut reader_events2 = samples2.async_event_stream();
        let mut count = 0;

        let mut write_timer =
            futures::StreamExt::fuse(Timer::interval(Duration::from_secs(7).into()));
        loop {
            select! {
                _ = write_timer.select_next_some() => {
                    writer.write(ShapeType {color: "RED".to_string(), shape_size: count, x: count, y: count}, None)?;
                    count += 1;
                }
                result = samples1.select_next_some() => {
                    println!("Reliable:   {:?}", result?.into_value())
                }
                result = samples2.select_next_some() => {
                    println!("BestEffort: {:?}", result?.into_value())
                }
                result = reader_events1.select_next_some() => {
                    println!("Reader 1 Event: {:?}", result?);
                }
                result = reader_events2.select_next_some() => {
                    println!("Reader 2 Event: {:?}", result?);
                }
            }
        }
    })
}

This produces the following output as expected:

Reader 1 Event: SubscriptionMatched { total: CountWithChange { count: 1, count_change: 1 }, current: CountWithChange { count: 1, count_change: 1 } }
Reader 2 Event: SubscriptionMatched { total: CountWithChange { count: 1, count_change: 1 }, current: CountWithChange { count: 1, count_change: 1 } }
BestEffort: Value(ShapeType { color: "RED", x: 0, y: 0, shapesize: 0 })
Reliable:   Value(ShapeType { color: "RED", x: 0, y: 0, shapesize: 0 })
BestEffort: Value(ShapeType { color: "RED", x: 1, y: 1, shapesize: 1 })
Reliable:   Value(ShapeType { color: "RED", x: 1, y: 1, shapesize: 1 })
...

Now, I do the following and things get weird:

  1. Download the FastDDS Shapes Demo app.
    • This requires filling out a form, but is pretty easy.
  2. Run the contained bin/ShapesDemo.exe.
  3. (Optional) Click Options > Preferences and increase the update interval to something like 7500 milliseconds so we have plenty of time between messages.
  4. In the GUI, click "Publish" and accept the default settings to create a red square publisher with (I think) the same settings as the writer above.
  5. Comment out the let writer... and this line above: writer.write(...)?;
  6. Run the test again.

I get something like the following:

Reader 1 Event: SubscriptionMatched { total: CountWithChange { count: 1, count_change: 1 }, current: CountWithChange { count: 1, count_change: 1 } }
Reader 2 Event: SubscriptionMatched { total: CountWithChange { count: 1, count_change: 1 }, current: CountWithChange { count: 1, count_change: 1 } }
BestEffort: Value(ShapeType { color: "RED", x: 118, y: 239, shapesize: 30 })
BestEffort: Value(ShapeType { color: "RED", x: 112, y: 243, shapesize: 30 })
BestEffort: Value(ShapeType { color: "RED", x: 106, y: 247, shapesize: 30 })
BestEffort: Value(ShapeType { color: "RED", x: 99, y: 249, shapesize: 30 })
BestEffort: Value(ShapeType { color: "RED", x: 97, y: 242, shapesize: 30 })
BestEffort: Value(ShapeType { color: "RED", x: 95, y: 235, shapesize: 30 })
BestEffort: Value(ShapeType { color: "RED", x: 93, y: 228, shapesize: 30 })
BestEffort: Value(ShapeType { color: "RED", x: 91, y: 221, shapesize: 30 })
Reliable:   Value(ShapeType { color: "RED", x: 118, y: 239, shapesize: 30 })
BestEffort: Value(ShapeType { color: "RED", x: 89, y: 214, shapesize: 30 })
BestEffort: Value(ShapeType { color: "RED", x: 87, y: 207, shapesize: 30 })
BestEffort: Value(ShapeType { color: "RED", x: 85, y: 200, shapesize: 30 })
Reliable:   Value(ShapeType { color: "RED", x: 112, y: 243, shapesize: 30 })
BestEffort: Value(ShapeType { color: "RED", x: 83, y: 193, shapesize: 30 })
Reliable:   Value(ShapeType { color: "RED", x: 99, y: 249, shapesize: 30 })
BestEffort: Value(ShapeType { color: "RED", x: 81, y: 186, shapesize: 30 })
Reliable:   Value(ShapeType { color: "RED", x: 93, y: 228, shapesize: 30 })
BestEffort: Value(ShapeType { color: "RED", x: 79, y: 179, shapesize: 30 })

So you can see the reliable reader eventually gets the messages, but with a variable delay. The results can be very different between runs. Sometimes the first sample from the reliable reader shows up right away, sometimes it takes over a minute. Please let me know if you have any ideas or are able to reproduce this.

jhelovuo commented 11 months ago

This sounds strange indeed. To summarize, what you are seeing here is functionally correct behaviour, but with a very large delay, in the case of Reliability = Reliable and FastDDS writing to RustDDS?

It would also seem that you are getting the same data samples over both Reliable and BestEffort, but again with significant delay.

Some instructions to debug this:

Expected behaviour on the wire:

BestEffort minimally requires just DATA submessages. Each DATA contains a SequenceNumber to distinguish duplicates (e.g. retransmits) and avoid confusing message order. The BestEffort Reader will deliver messages to the application as soon as it receives them. If it receives something out-of-order, it must discard the data. E.g. when receiving SequenceNumbers 1,3,2,4, it must deliver exactly 1,3,4 to the application and discard 2, as it was received out of order.
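To illustrate the rule (a minimal sketch with made-up names, not RustDDS internals), a BestEffort reader only needs to remember the highest SequenceNumber it has delivered and drop anything at or below it:

// Minimal sketch of BestEffort delivery; hypothetical type, not RustDDS code.
// A sample is delivered only if its SequenceNumber is higher than anything
// delivered so far; out-of-order arrivals are discarded.
struct BestEffortDelivery {
    highest_delivered: i64,
}

impl BestEffortDelivery {
    fn on_data(&mut self, seq: i64, payload: &str) {
        if seq > self.highest_delivered {
            self.highest_delivered = seq;
            println!("deliver {seq}: {payload}");
        } else {
            println!("discard {seq} (received out of order)");
        }
    }
}

fn main() {
    let mut reader = BestEffortDelivery { highest_delivered: 0 };
    // Receiving SequenceNumbers 1, 3, 2, 4 delivers exactly 1, 3, 4 and discards 2.
    for seq in [1, 3, 2, 4] {
        reader.on_data(seq, "shape sample");
    }
}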

Reliable communication also uses HEARTBEAT, GAP, and ACKNACK messages. The Writer will announce available SequenceNumbers with a HEARTBEAT, possibly also sending them with DATA. In case the Reader thinks it is missing some SequenceNumbers, it will request those with ACKNACK. The Writer must respond to ACKNACK with either DATA (send the missing data) or GAP (to indicate that the requested data is no longer available). The Reliable Reader must not give data to the application until it is in sync with the Writer, i.e. has received all the available messages up to some point either as DATA or as a GAP. If the Reader knows about some messages (via a HEARTBEAT) that it is missing, it must stop giving data to the application and wait until the Writer has either sent DATA, GAP, or indicated with a HEARTBEAT that the missing SequenceNumbers are no longer available.
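The corresponding hold-back rule can be sketched like this (again hypothetical names, not RustDDS internals): a sample is buffered until every SequenceNumber below it is accounted for, either by received DATA or by a GAP.

// Minimal sketch of Reliable in-order delivery; hypothetical types, not RustDDS code.
// Data is held back until every earlier SequenceNumber is accounted for,
// either as received DATA or as a GAP ("that number will never arrive").
use std::collections::BTreeMap;

struct ReliableDelivery {
    next_expected: i64,
    // None marks a GAP, Some(payload) a received DATA submessage.
    pending: BTreeMap<i64, Option<String>>,
}

impl ReliableDelivery {
    fn on_data(&mut self, seq: i64, payload: String) {
        self.pending.insert(seq, Some(payload));
        self.flush();
    }

    fn on_gap(&mut self, seq: i64) {
        self.pending.insert(seq, None);
        self.flush();
    }

    // Deliver in SequenceNumber order; stop at the first number that has
    // neither DATA nor GAP, which is exactly what an ACKNACK would request.
    fn flush(&mut self) {
        while let Some(entry) = self.pending.remove(&self.next_expected) {
            if let Some(payload) = entry {
                println!("deliver {}: {}", self.next_expected, payload);
            }
            self.next_expected += 1;
        }
    }
}

fn main() {
    let mut reader = ReliableDelivery { next_expected: 1, pending: BTreeMap::new() };
    reader.on_data(1, "first".to_string());  // delivered immediately
    reader.on_data(3, "third".to_string());  // held back, 2 is still missing
    reader.on_gap(2);                        // 2 will never arrive, so 3 can now be delivered
}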

If this reliable transmission protocol is not able to get in sync quickly enough, that may cause the messages to be delayed initially. However, that does not explain why the Reliable messages do not catch up.

SelimV commented 11 months ago

I get the same problem even without Fast DDS when I change the write interval from 7 seconds to 1. In the Wireshark capture it appears that the DATA and ACKNACK submessages get sent normally. I will look into it tomorrow.

SelimV commented 11 months ago

It seems that the issue here is that the Square topic is created multiple times. As far as I can tell, participant.create_topic just creates the Topic object and does not cause problems by itself, but create_datareader_cdr tries to add the topic to the DDS cache and ends up updating the QoS of the existing Square topic, since the cache is indexed by the topic name. This can be observed by switching the order of the create_datareader_cdr calls: if reader 2 is created first, QoS 1 overrides QoS 2 and remains in the DDS cache during the data exchange, which seems to fix the problem, whereas if reader 1 is created first, QoS 1 is overridden by the weaker QoS 2, which causes problems for the reliable reader.

In conclusion, this seems to be a combination of user error and unclear handling of duplicate topics. However, I haven't figured out why reader 1 was still delivering samples, only with a delay, nor why the problem did not appear with a longer write interval, but I'll let @jhelovuo decide how we will prioritize this.

@samcarey please let us know whether the changes in the following code fix your initial problem.

use std::time::Duration;

use anyhow::Result;
use futures::{select, StreamExt};
use rustdds::{
  policy::{Deadline, Durability, History, Lifespan, Liveliness, Ownership, Reliability},
  DomainParticipant, Keyed, QosPolicyBuilder, TopicKind,
};
use smol::Timer;

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)]
struct ShapeType {
  color: String,
  x: i32,
  y: i32,
  shape_size: i32,
}

impl Keyed for ShapeType {
  type K = String;
  fn key(&self) -> Self::K {
    self.color.clone()
  }
}

fn main() -> Result<()> {
  smol::block_on(async {
    let participant = DomainParticipant::new(0)?;
    let square_qos = QosPolicyBuilder::new()
      .history(History::KeepLast { depth: 1 })
      .reliability(Reliability::Reliable {
        max_blocking_time: Duration::ZERO.into(),
      })
      .durability(Durability::Volatile)
      .liveliness(Liveliness::Automatic {
        lease_duration: rustdds::Duration::DURATION_INFINITE,
      })
      .ownership(Ownership::Shared)
      .deadline(Deadline(rustdds::Duration::DURATION_INFINITE))
      .lifespan(Lifespan {
        duration: rustdds::Duration::DURATION_INFINITE,
      })
      .build();

    let square_topic = participant.create_topic(
      "Square".to_string(),
      "ShapeType".to_string(),
      &square_qos,
      TopicKind::WithKey,
    )?;

   /*  let writer = participant
      .create_publisher(&square_qos)?
      .create_datawriter_cdr::<ShapeType>(&square_topic, None)?; */

    let reliable_qos = QosPolicyBuilder::new()
      .reliability(rustdds::policy::Reliability::Reliable {
        max_blocking_time: rustdds::Duration::DURATION_ZERO,
      })
      .build();

    let best_effort_qos = QosPolicyBuilder::new().build();

    let reliable_reader = participant
      .create_subscriber(&reliable_qos)?
      .create_datareader_cdr::<ShapeType>(&square_topic, None)?;

    let best_effort_reader = participant
      .create_subscriber(&best_effort_qos)?
      .create_datareader_cdr::<ShapeType>(&square_topic, None)?;

    let mut reliable_samples = reliable_reader.async_sample_stream();
    let mut best_effort_samples = best_effort_reader.async_sample_stream();

    let mut reliable_reader_events = reliable_samples.async_event_stream();
    let mut best_effort_reader_events = best_effort_samples.async_event_stream();

    // let mut count = 0;

    let mut write_timer = futures::StreamExt::fuse(Timer::interval(Duration::from_secs(1).into()));
    loop {
      select! {
        _ = write_timer.select_next_some() => {
           /*  writer.write(ShapeType {color: "RED".to_string(), shape_size: count, x: count, y: count}, None)?;
            count += 1; */
        }
        result = reliable_samples.select_next_some() => {
            println!("Reliable: {:?}", result?.value());
        }
        result = best_effort_samples.select_next_some() => {
            println!("BestEffort: {:?}", result?.value());
        }
        result = reliable_reader_events.select_next_some() => {
            println!("Reliable Reader Event: {:?}", result?);
        }
        result = best_effort_reader_events.select_next_some() => {
            println!("Best Effort Reader Event: {:?}", result?);
        }
      }
    }
  })
}

jhelovuo commented 10 months ago

This seems to be a combination of Topic inconsistency, i.e. creating the same Topic multiple times with different QoS settings, and possibly a duplicate of #302. Closing as investigation continues in issue #302.