jhelovuo / RustDDS

Rust implementation of Data Distribution Service
Apache License 2.0

Durable writer only keeps/sends samples written after some time #295

Closed (samcarey closed 11 months ago)

samcarey commented 11 months ago

I would expect a durable writer with history set to "KeepAll" to preserve every sample written to it and deliver them all to late joiners. Instead, samples written shortly after the writer is created are silently dropped; a sample only gets through if one waits some time after initialization before writing.
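To make the expectation concrete, here is a toy, std-only model of the behaviour described above. It is not RustDDS code: `ToyDurableWriter` and `match_reader` are hypothetical names invented for illustration, and a channel stands in for the network. The point it sketches is that samples written before a reader is matched should be replayed once the match happens, no matter how soon after writer creation they were written.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical stand-in for a TransientLocal + KeepAll writer (not the RustDDS API).
struct ToyDurableWriter<T: Clone> {
    history: Vec<T>,         // KeepAll: every written sample is retained
    matched: Vec<Sender<T>>, // one channel per matched reader
}

impl<T: Clone> ToyDurableWriter<T> {
    fn new() -> Self {
        Self { history: Vec::new(), matched: Vec::new() }
    }

    /// Store the sample, then forward it to every reader that is already matched.
    fn write(&mut self, sample: T) {
        self.history.push(sample.clone());
        // Forget readers whose receiving end has been dropped.
        self.matched.retain(|tx| tx.send(sample.clone()).is_ok());
    }

    /// A reader matched late first gets the whole history, then live samples.
    fn match_reader(&mut self) -> Receiver<T> {
        let (tx, rx) = channel();
        for sample in &self.history {
            let _ = tx.send(sample.clone());
        }
        self.matched.push(tx);
        rx
    }
}

fn main() {
    let mut writer = ToyDurableWriter::new();
    writer.write(1u32); // written before any reader is matched
    writer.write(2);
    let late_reader = writer.match_reader();
    writer.write(3); // written after the match

    let received: Vec<u32> = late_reader.try_iter().collect();
    assert_eq!(received, vec![1, 2, 3]);
}
```

In this model nothing depends on how long the writer has existed before the first write, which is the property I expected from the real writer.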

Minimal reproducible example:

use anyhow::Result;
use futures::StreamExt;
use rustdds::{
    policy::{Durability, History, Reliability},
    DomainParticipant, QosPolicyBuilder, TopicKind,
};
use serde::{Deserialize, Serialize};
use smol::Timer;
use std::time::Duration;

#[derive(Debug, Serialize, Deserialize)]
struct TestStruct;

#[test]
fn simple_test() -> Result<()> {
    smol::block_on(async {
        let participant = DomainParticipant::new(0)?;
        let qos = QosPolicyBuilder::new()
            .reliability(Reliability::Reliable {
                max_blocking_time: rustdds::Duration::DURATION_ZERO,
            })
            .durability(Durability::TransientLocal)
            .history(History::KeepAll)
            .build();
        let topic = participant.create_topic(
            "test".to_string(),
            "TestStruct".to_string(),
            &qos,
            TopicKind::NoKey,
        )?;
        let subscriber = participant.create_subscriber(&qos)?;
        let publisher = participant.create_publisher(&qos)?;
        let reader = subscriber.create_datareader_no_key_cdr::<TestStruct>(&topic, None)?;
        let mut datareader_stream = reader.async_sample_stream();

        for millis in (0..2500).step_by(200).collect::<Vec<_>>().iter().rev() {
            println!("Creating writer and waiting {millis} milliseconds before writing");
            let writer = publisher.create_datawriter_no_key_cdr(&topic, None)?;
            Timer::after(Duration::from_millis(*millis)).await;
            writer.async_write(TestStruct, None).await?;
            datareader_stream.select_next_some().await?;
        }
        Ok(())
    })
}

This produces the following output:

Creating writer and waiting 2400 milliseconds before writing
Creating writer and waiting 2200 milliseconds before writing
Creating writer and waiting 2000 milliseconds before writing
Creating writer and waiting 1800 milliseconds before writing
Creating writer and waiting 1600 milliseconds before writing
Creating writer and waiting 1400 milliseconds before writing

...and then it hangs forever.

So the reader never receives the sample when it is written only 1400 milliseconds after the writer was created, while waits of 1600 milliseconds or more get through. For reference, I'm on Windows 11, using rustdds="0.8.5".
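Because the symptom is a hang, anyone rerunning the reproduction may prefer a failure over a test that never returns. Below is a minimal std-only sketch of a timeout guard; `run_with_timeout` is a hypothetical helper, not something from RustDDS or smol. In the async test above, the analogous approach would presumably be to race the stream's next item against a timer, which is not shown here.

```rust
use std::sync::mpsc::channel;
use std::thread;
use std::time::Duration;

/// Runs `work` on a helper thread. Returns `Some(result)` if it finishes
/// within `limit`, or `None` if it is still running (or panicked) by then.
fn run_with_timeout<T, F>(limit: Duration, work: F) -> Option<T>
where
    T: Send + 'static,
    F: FnOnce() -> T + Send + 'static,
{
    let (tx, rx) = channel();
    thread::spawn(move || {
        // The receiver may already be gone if the deadline passed; ignore that.
        let _ = tx.send(work());
    });
    rx.recv_timeout(limit).ok()
}

fn main() {
    // Work that completes promptly is returned as-is.
    let fast = run_with_timeout(Duration::from_secs(5), || 42);
    assert_eq!(fast, Some(42));

    // Work that outlives the deadline is reported as `None` instead of blocking.
    let stuck = run_with_timeout(Duration::from_millis(50), || {
        thread::sleep(Duration::from_secs(2));
        0
    });
    assert_eq!(stuck, None);
}
```

Note that the helper thread is detached rather than cancelled, so this only turns the hang into a visible failure; it does not clean up the stuck work.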

jhelovuo commented 11 months ago

Thanks for the minimal reproducible example code. It was a great help.

This appears to be a genuine bug, an accidental side effect of some unrelated feature development we did some months ago. We'll see what we can do about it.

jhelovuo commented 11 months ago

The latest master commit should fix this issue. Please test and share your findings here.

samcarey commented 11 months ago

The code completes successfully now! Thanks!!