jhelovuo / RustDDS

Rust implementation of Data Distribution Service
Apache License 2.0

Why does my subscriber miss the first 2 published samples? #301

Closed: CraigBuilds closed this issue 10 months ago

CraigBuilds commented 10 months ago

Here is my program:

use rustdds::{
    no_key::DataReader, no_key::DataWriter, policy::Durability, policy::History,
    policy::Reliability, DomainParticipant, Duration, QosPolicies, QosPolicyBuilder, TopicKind,
};
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug, Default, Clone)]
struct ExampleMessage {
    count: u32,
}

static DOMAIN_ID: u16 = 0;

/// Default QoS settings, shared by every topic, publisher, subscriber, writer and reader.
fn qos() -> QosPolicies {
    QosPolicyBuilder::new()
        .durability(Durability::TransientLocal)
        .history(History::KeepLast { depth: 10 })
        .reliability(Reliability::Reliable {
            max_blocking_time: Duration::DURATION_INFINITE,
        })
        .build()
}

/// Create a publisher and return its writer. In this app each publisher has
/// a single writer, and topics do not have keys.
fn create_publisher<TopicType: Serialize>(
    ctx: &DomainParticipant,
    topic_name: &str,
) -> DataWriter<TopicType> {
    let topic_type = std::any::type_name::<TopicType>();
    let topic = ctx
        .create_topic(
            topic_name.to_string(),
            topic_type.to_string(),
            &qos(),
            TopicKind::NoKey,
        )
        .unwrap();
    let publisher = ctx.create_publisher(&qos()).unwrap();
    publisher
        .create_datawriter_no_key_cdr::<TopicType>(&topic, Some(qos()))
        .unwrap()
}

/// Publish a sample using the given publisher (writer); errors are printed, not returned.
fn publish<TopicType: Serialize + std::fmt::Debug>(
    publisher: &DataWriter<TopicType>,
    sample: TopicType,
) {
    let result = publisher.write(sample, None);
    result
        .map_err(|e| println!("Failed to publish: {:?}", e))
        .ok();
}

/// Create a subscriber and return its reader. In this app each subscriber has
/// a single reader, and topics do not have keys.
/// Received samples are not passed to any callback until the returned reader
/// is polled with `process_samples`.
fn create_subscriber<TopicType>(ctx: &DomainParticipant, topic_name: &str) -> DataReader<TopicType>
where
    TopicType: for<'de> Deserialize<'de> + std::fmt::Debug + 'static,
{
    let topic_type = std::any::type_name::<TopicType>();
    let topic = ctx
        .create_topic(
            topic_name.to_string(),
            topic_type.to_string(),
            &qos(),
            TopicKind::NoKey,
        )
        .unwrap();
    let subscriber = ctx.create_subscriber(&qos()).unwrap();
    subscriber
        .create_datareader_no_key_cdr::<TopicType>(&topic, Some(qos()))
        .unwrap()
}

/// Call the given callback for each sample the given subscriber (reader) currently holds.
/// Stops at the first read that returns no sample or an error.
fn process_samples<TopicType: for<'de> Deserialize<'de> + std::fmt::Debug + 'static>(
    subscriber: &mut DataReader<TopicType>,
    callback: impl Fn(&TopicType),
) {
    while let Some(sample) = subscriber.take_next_sample().ok().flatten() {
        let sample_value = sample.into_value();
        callback(&sample_value);
    }
}

/// Create a domain participant, a publisher, and a subscriber.
/// Publish a sample every second (in a separate thread), and poll the
/// subscriber in the main thread, printing every received sample.
fn main() {
    println!("Creating DomainParticipant");
    let ctx = DomainParticipant::new(DOMAIN_ID).unwrap();

    println!("Creating subscriber");
    let mut subscriber = create_subscriber::<ExampleMessage>(&ctx, "hello/world");

    println!("Creating publisher");
    let publisher = create_publisher::<ExampleMessage>(&ctx, "hello/world");

    println!("Starting Publisher thread");
    std::thread::spawn(move || {
        let mut sample = ExampleMessage::default();
        loop {
            println!("Publishing: {}", sample.count);
            publish(&publisher, sample.clone());
            sample.count += 1;
            std::thread::sleep(std::time::Duration::from_millis(1000));
        }
    });

    println!("Starting Subscriber thread");
    loop {
        process_samples(&mut subscriber, |sample| {
            println!("Received: {}", sample.count)
        });
    }
}

This outputs:

Creating DomainParticipant
Creating subscriber
Creating publisher
Starting Publisher thread
Starting Subscriber thread
Publishing: 0
Publishing: 1
Publishing: 2
Received: 2
Publishing: 3
Received: 3
Publishing: 4
Received: 4
Publishing: 5
Received: 5
Publishing: 6
Received: 6
Publishing: 7
Received: 7
Publishing: 8
Received: 8
Publishing: 9
Received: 9

Here you can see that it misses the first two samples, even though I am using TransientLocal durability (so it should receive samples published in the past), KeepLast history with depth 10 (so the writer should retain recent samples), and Reliable reliability (so, like TCP, it should not lose samples).

What am I doing wrong?
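To make the expected behaviour concrete, here is a toy model of a writer history cache configured with `History::KeepLast { depth: 10 }` and `Durability::TransientLocal`: the writer keeps its last `depth` samples, and a reader that is matched late should still be handed all of them. `WriterHistory` and its methods are hypothetical names for illustration, not RustDDS internals; sequence numbers start at 1, as in RTPS.

```rust
use std::collections::VecDeque;

/// Hypothetical toy model (not RustDDS internals) of a writer history cache
/// configured with KeepLast { depth } history and TransientLocal durability.
struct WriterHistory<T> {
    depth: usize,
    next_sn: i64,                // RTPS sequence numbers start at 1
    samples: VecDeque<(i64, T)>, // (sequence number, sample), oldest first
}

impl<T: Clone> WriterHistory<T> {
    fn new(depth: usize) -> Self {
        WriterHistory { depth, next_sn: 1, samples: VecDeque::new() }
    }

    /// Store a sample and return its sequence number;
    /// once more than `depth` samples are held, drop the oldest.
    fn write(&mut self, value: T) -> i64 {
        let sn = self.next_sn;
        self.next_sn += 1;
        self.samples.push_back((sn, value));
        if self.samples.len() > self.depth {
            self.samples.pop_front();
        }
        sn
    }

    /// Samples a late-joining TransientLocal reader should be sent when it matches.
    fn late_joiner_backlog(&self) -> Vec<(i64, T)> {
        self.samples.iter().cloned().collect()
    }
}

fn main() {
    let mut history = WriterHistory::new(10);
    // Three samples written before the reader is matched, like counts 0, 1, 2 above.
    for count in 0..3u32 {
        history.write(count);
    }
    let backlog = history.late_joiner_backlog();
    println!("{:?}", backlog); // [(1, 0), (2, 1), (3, 2)]
}
```

With depth 10, nothing written so far has been evicted, so the expectation in the question (counts 0 and 1 should eventually arrive) is consistent with the QoS chosen.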

SelimV commented 10 months ago

I consistently get the correct output with your program:

Creating DomainParticipant
Creating subscriber
Creating publisher
Starting Publisher thread
Starting Subscriber thread
Publishing: 0
Publishing: 1
Publishing: 2
Received: 0
Received: 1
Received: 2
Publishing: 3
Received: 3
Publishing: 4
Received: 4

Have you tried analysing the RTPS traffic with Wireshark? There should be an ACKNACK submessage requesting the missing samples:

Frame 1974: 110 bytes on wire (880 bits), 110 bytes captured (880 bits) on interface lo, id 1
Ethernet II, Src: 00:00:00_00:00:00 (00:00:00:00:00:00), Dst: 00:00:00_00:00:00 (00:00:00:00:00:00)
Internet Protocol Version 4, Src: 172.23.17.124, Dst: 172.23.17.124
User Datagram Protocol, Src Port: 34433, Dst Port: 7411
Real-Time Publish-Subscribe Wire Protocol
    Magic: RTPS
    Protocol version: 2.4
    vendorId: 01.18 (Unknown)
    guidPrefix: 01120f1a28a4ce91d5d70bfc
    Default port mapping: domainId=0, participantIdx=0, nature=UNICAST_USERTRAFFIC
    submessageId: INFO_DST (0x0e)
    submessageId: ACKNACK (0x06)
        Flags: 0x03, Final flag, Endianness bit
        octetsToNextHeader: 28
        readerEntityId: 0x00000004 (Application-defined reader (no key): 0x000000)
        writerEntityId: 0x00000103 (Application-defined writer (no key): 0x000001)
        readerSNState
            bitmapBase: 1
            numBits: 2
            [Acknack Analysis: Lost samples 1, 2 in range [1,2]]
            bitmap: 11
        Count: 0
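
The `readerSNState` field above is an RTPS SequenceNumberSet: a base sequence number plus a bitmap in which, as I read the spec, bit *i* (counted from the most significant bit of the first 32-bit word) being set means that sequence number `bitmapBase + i` is missing. A minimal decoding sketch follows; `missing_sequence_numbers` is a hypothetical helper, and it assumes the bitmap holds at least `ceil(numBits / 32)` words. With the captured values it yields sequence numbers 1 and 2, which would presumably be the samples with `count` 0 and 1, since RTPS numbers a writer's first sample 1.

```rust
/// Hypothetical helper: decode an RTPS SequenceNumberSet (bitmapBase, numBits,
/// bitmap words) into the sequence numbers it marks as missing.
/// Assumes `bitmap` has at least ceil(num_bits / 32) words (otherwise it panics).
fn missing_sequence_numbers(bitmap_base: i64, num_bits: u32, bitmap: &[u32]) -> Vec<i64> {
    (0..num_bits)
        .filter(|&i| {
            // Bit i lives in word i / 32; within a word the first bit is the MSB.
            let word = bitmap[(i / 32) as usize];
            (word & (1u32 << (31 - i % 32))) != 0
        })
        .map(|i| bitmap_base + i64::from(i))
        .collect()
}

fn main() {
    // Values from the capture: bitmapBase 1, numBits 2, bitmap "11",
    // i.e. the two leading bits of the first word are set (0xC000_0000).
    let missing = missing_sequence_numbers(1, 2, &[0xC000_0000]);
    println!("{:?}", missing); // [1, 2]
}
```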

tot0k commented 10 months ago

Hi,

I have the same problem and was able to reproduce it with the minimal reproducible example given by @CraigPersonal100.

Creating DomainParticipant
Creating subscriber
Creating publisher
Starting Publisher thread
Starting Subscriber thread
Publishing: 0
Publishing: 1
Publishing: 2
Received: 2
Publishing: 3
Received: 3
Publishing: 4
Received: 4
Publishing: 5
Received: 5
Publishing: 6
Received: 6

In the packet capture, I cannot see any ACKNACK message.

Tested with the following Cargo.toml:

[package]
name = "rdds-301"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
rustdds = "0.8.5"
serde = "1.0.190"

Note that the problem seems to be solved on the latest RustDDS master, commit d38a1bce77f009d6c61ad63923e1b58f0d9bb002 (which still carries the version number "0.8.5"):

Creating DomainParticipant
Creating subscriber
Creating publisher
Starting Publisher thread
Starting Subscriber thread
Publishing: 0
Publishing: 1
Publishing: 2
Publishing: 3
Received: 0
Received: 1
Received: 2
Received: 3
Publishing: 4
Received: 4
Publishing: 5
Received: 5

Packet capture of this run: rdds-301-master.zip

The ACKNACK message does not appear in this capture either, but the samples are received!

SelimV commented 10 months ago

So, to clarify: release 0.8.5 does not work, but the up-to-date master does? In that case we suspect that commits https://github.com/jhelovuo/RustDDS/commit/9f6f2883fc835fdf63180f37125ef35420e9a275 and https://github.com/jhelovuo/RustDDS/commit/737bb695e4a981f966854f617e29f6b9026a9779 might fix it, since they rework how missing data is sent.

Because in this case the ACKNACK messages are unicast from the participant to itself, you probably have to capture on the loopback interface to see them; if you are using WSL, install and run Wireshark inside WSL. [Screenshot 2023-10-26 150704]
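For context on what "how missing data is sent" involves: in RTPS, a reliable writer answers an ACKNACK by resending the requested samples it still holds and by sending a GAP for sequence numbers it can no longer provide, so the reader stops waiting for them. The sketch below is a hypothetical toy model of that exchange; the `Repair` enum and `answer_acknack` function are illustrative names, not the code from the linked commits.

```rust
use std::collections::BTreeMap;

/// What a reliable writer sends back for one sequence number a reader NACKed.
#[derive(Debug, PartialEq)]
enum Repair<T> {
    Data(i64, T), // still in the history cache: resend the sample
    Gap(i64),     // no longer available: tell the reader to stop waiting for it
}

/// Hypothetical toy model (not the code from the linked commits): answer the
/// missing sequence numbers of an ACKNACK from the writer's history cache.
fn answer_acknack<T: Clone>(history: &BTreeMap<i64, T>, missing: &[i64]) -> Vec<Repair<T>> {
    missing
        .iter()
        .map(|&sn| match history.get(&sn) {
            Some(sample) => Repair::Data(sn, sample.clone()),
            None => Repair::Gap(sn),
        })
        .collect()
}

fn main() {
    // Writer cache holding sequence numbers 1..=3 with sample counts 0..=2.
    let history: BTreeMap<i64, u32> = (1..=3).map(|sn: i64| (sn, (sn - 1) as u32)).collect();
    // The ACKNACK from the capture asks for sequence numbers 1 and 2.
    let replies = answer_acknack(&history, &[1, 2]);
    println!("{:?}", replies); // [Data(1, 0), Data(2, 1)]
}
```

In this model, as long as the history depth covers the NACKed range, every request is answered with data, which matches the behaviour seen on master where counts 0 and 1 eventually arrive.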

tot0k commented 10 months ago

Yes, I ran @CraigPersonal100's example on latest commit on master, and it works.

Thanks for the loopback tips. Here are the reproduced packet captures on 0.8.4 and master, with loopback capture enabled, in case anyone wants to investigate further.

rdds-301_lo.zip

I eventually noticed that the problem described in this issue is not exactly the one I have in my project. I might open a separate issue later (more closely related to https://github.com/jhelovuo/RustDDS/issues/255) if I can come up with a reproducible example before discovering that my own bad code is the cause.

EDIT: Created issue #302

jhelovuo commented 10 months ago

The fixes in master discussed above are now released as 0.8.6.