eclipse-iceoryx / iceoryx2

Eclipse iceoryx2™ - true zero-copy inter-process-communication in pure Rust
https://iceoryx.io
Apache License 2.0

Publisher does not always follow `UnableToDeliverStrategy::Block` strategy #305

Closed appcypher closed 1 month ago

appcypher commented 1 month ago

Required information

Operating system:

Rust version:

rustc 1.79.0 (129f3b996 2024-06-10)

Cargo version:

cargo 1.79.0 (ffa9cf99a 2024-06-03)

iceoryx2 version:

commit 728df35

Observed result or behaviour:

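Even with `UnableToDeliverStrategy::Block` set, `safe_overflow` disabled and a limited buffer size, a publisher does not always block. For example, it does not block in these cases:

- If you start the publisher without a subscriber, the publisher never blocks.
- If you kill the last subscriber before the publisher is done filling the buffer, the publisher never blocks.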

The publisher does block if a subscriber is present when the buffer fills up. It also remains blocked if you remove the last subscriber before that subscriber has consumed its buffer.

BTW, I made sure to set `subscriber_max_buffer_size` on the publisher side as well.

Code

```rust
use std::time::Duration;

use iceoryx2::prelude::*;

fn main() -> anyhow::Result<()> {
    // Read the first argument
    let arg = match std::env::args().nth(1) {
        Some(arg) => arg,
        None => {
            eprintln!("Usage: test_pub_queue ");
            std::process::exit(1);
        }
    };

    if arg == "sub" {
        subscriber()?;
    } else if arg == "pub" {
        publisher()?;
    }

    Ok(())
}

const CYCLE_TIME: Duration = Duration::from_secs(3);
const BUFFER_SIZE: usize = 5;

fn subscriber() -> anyhow::Result<()> {
    println!("Subscriber");

    let node = NodeBuilder::new().create::()?;

    let service = node
        .service_builder(&"test/path".try_into()?)
        .publish_subscribe::()
        .max_publishers(1)
        .max_subscribers(1)
        .subscriber_max_buffer_size(BUFFER_SIZE)
        .enable_safe_overflow(false)
        .open_or_create()?;

    let subscriber = service
        .subscriber_builder()
        .create()?;

    while let NodeEvent::Tick = node.wait(CYCLE_TIME) {
        while let Some(sample) = subscriber.receive()? {
            println!("Received: {:?}", sample.payload());
        }
    }

    Ok(())
}

fn publisher() -> anyhow::Result<()> {
    println!("Publisher");

    let node = NodeBuilder::new().create::()?;

    let service = node
        .service_builder(&"test/path".try_into()?)
        .publish_subscribe::()
        .max_publishers(1)
        .max_subscribers(1)
        .subscriber_max_buffer_size(BUFFER_SIZE)
        .enable_safe_overflow(false)
        .open_or_create()?;

    let publisher = service
        .publisher_builder()
        .unable_to_deliver_strategy(UnableToDeliverStrategy::Block)
        .create()?;

    let mut count = 0;
    while let NodeEvent::Tick = node.wait(CYCLE_TIME / 10) {
        let sample = publisher.loan_uninit()?;
        let sample = sample.write_payload(count);
        sample.send()?;
        println!("Sent: {}", count);
        count += 1;
    }

    Ok(())
}
```

```toml
[dependencies]
iceoryx2 = { git = "https://github.com/eclipse-iceoryx/iceoryx2", rev = "728df35", features = [
    "logger_tracing",
] }
```

Expected result or behaviour:

I expected the publisher to always block when the buffer is full, based on the doc comment found in the code: https://github.com/eclipse-iceoryx/iceoryx2/blob/728df35da2b99bb47cd5d0221d8f7a7385edf075/iceoryx2/src/service/port_factory/publisher.rs#L73-L81

Conditions where it occurred / Performed steps:

elfenpiff commented 1 month ago

You are entirely right with this observation, and I intentionally made it behave like this.

The problem this feature is meant to solve is a subscriber missing messages because the publisher is too fast and the subscriber's buffer is full.

But when there is no subscriber, there is no buffer. Maybe the terminology is a bit unclear here, but the buffer mentioned in the documentation was the subscriber's buffer. Every subscriber has its own buffer.

> It also remains blocked when you remove the last subscriber before it consumes its buffer.

This is a bug and must be fixed.

> If you start the publisher without a subscriber, the publisher never blocks

Intended behavior: without a subscriber there is no need to block, since there is no buffer.

> If you kill the last subscriber before the publisher is done filling the buffer, the publisher never blocks

Intended behavior.
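The semantics described in this comment can be illustrated with a small toy model. This is only a sketch in plain Rust, not iceoryx2's implementation: `ToyPublisher`, `Delivery` and `count_blocked` are hypothetical names, and "blocking" is reduced to reporting `WouldBlock`. It assumes one bounded buffer per connected subscriber, so with no subscriber there is nothing that can fill up.

```rust
use std::collections::VecDeque;

/// Outcome of a single delivery attempt in this toy model.
#[derive(Debug, PartialEq)]
enum Delivery {
    /// Every connected subscriber buffer accepted the sample
    /// (trivially true when nobody is connected).
    Delivered,
    /// At least one subscriber buffer is full; a `Block` strategy would wait here.
    WouldBlock,
}

/// Toy publisher: one bounded buffer per connected subscriber.
struct ToyPublisher {
    buffer_size: usize,
    subscriber_buffers: Vec<VecDeque<u64>>,
}

impl ToyPublisher {
    fn new(buffer_size: usize) -> Self {
        Self { buffer_size, subscriber_buffers: Vec::new() }
    }

    /// Connecting a subscriber creates its (empty) buffer.
    fn connect_subscriber(&mut self) {
        self.subscriber_buffers.push(VecDeque::new());
    }

    /// Disconnecting the most recently connected subscriber removes its buffer.
    fn disconnect_last_subscriber(&mut self) {
        self.subscriber_buffers.pop();
    }

    /// Delivers `value` to all buffers, unless one of them is already full.
    fn try_send(&mut self, value: u64) -> Delivery {
        if self.subscriber_buffers.iter().any(|b| b.len() >= self.buffer_size) {
            return Delivery::WouldBlock;
        }
        for buffer in &mut self.subscriber_buffers {
            buffer.push_back(value);
        }
        Delivery::Delivered
    }
}

/// Sends `attempts` samples and reports how many would have blocked.
fn count_blocked(publisher: &mut ToyPublisher, attempts: u64) -> usize {
    (0..attempts)
        .filter(|&v| publisher.try_send(v) == Delivery::WouldBlock)
        .count()
}

fn main() {
    let mut publisher = ToyPublisher::new(5);
    println!("no subscriber: {} blocked", count_blocked(&mut publisher, 100));

    // The subscriber never reads, so its buffer fills after 5 samples.
    publisher.connect_subscriber();
    println!("idle subscriber: {} blocked", count_blocked(&mut publisher, 8));

    // Once the last subscriber is gone, its buffer is gone too.
    publisher.disconnect_last_subscriber();
    println!("subscriber gone: {} blocked", count_blocked(&mut publisher, 8));
}
```

In this model the publisher is never held back while no subscriber is connected, and it stops being held back as soon as the last subscriber (and its buffer) disappears, which matches the intended behavior described above.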

elBoberido commented 1 month ago

Maybe we could use terminology similar to iceoryx1. There it is called `ConsumerTooSlowPolicy` on the publisher side and `QueueFullPolicy` on the subscriber side.

appcypher commented 1 month ago

> But when there is no subscriber, there is no buffer. Maybe the terminology is a bit unclear here, but the buffer mentioned in the documentation was the subscriber's buffer. Every subscriber has its own buffer.

Alright, that makes sense. Just after typing up the issue, I realised that could be the case. Is there [going to be] a separate strategy for detecting when no subscriber is present, to prevent sending messages into the void?

elfenpiff commented 1 month ago

I have not yet thought of this use case, but we already have an API available to handle this issue:

```rust
let service = node
    .service_builder(&"My/Funk/ServiceName".try_into()?)
    .publish_subscribe::<TransmissionData>()
    .open_or_create()?;

if service.dynamic_config().number_of_subscribers() > 0 {
    // send data
}
```

But I admit it is a bit hidden and not as straightforward as the iceoryx1 API with `Publisher::has_subscribers() -> bool`. The reason is that in iceoryx1 the service was somewhat hidden inside the `Publisher`, whereas in iceoryx2 it is an explicit object. This object can tell you a lot about its state via the `static_config()` and `dynamic_config()` methods.
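If the publisher should wait until somebody is listening rather than just skip a send, the subscriber-count check can be wrapped in a polling loop. The following is a minimal sketch under stated assumptions: `wait_for_subscribers` is a hypothetical helper, not part of iceoryx2, and it takes the count as a closure so the example runs without the library. In real code the closure would presumably wrap `service.dynamic_config().number_of_subscribers()` as shown in the snippet in this comment.

```rust
use std::thread;
use std::time::{Duration, Instant};

/// Polls `subscriber_count` until it reports at least one subscriber or
/// `timeout` elapses. Returns `true` if a subscriber showed up in time.
fn wait_for_subscribers<F>(
    mut subscriber_count: F,
    poll_interval: Duration,
    timeout: Duration,
) -> bool
where
    F: FnMut() -> usize,
{
    let deadline = Instant::now() + timeout;
    loop {
        if subscriber_count() > 0 {
            return true;
        }
        if Instant::now() >= deadline {
            return false;
        }
        thread::sleep(poll_interval);
    }
}

fn main() {
    // Stand-in for `service.dynamic_config().number_of_subscribers()`:
    // pretends that a subscriber connects on the third poll.
    let mut polls = 0;
    let fake_count = || {
        polls += 1;
        if polls >= 3 { 1 } else { 0 }
    };

    let ready = wait_for_subscribers(
        fake_count,
        Duration::from_millis(10),
        Duration::from_secs(1),
    );
    println!("subscriber present: {ready}");
}
```

Polling is simple but trades latency against CPU usage through `poll_interval`, and a subscriber can still disconnect between the check and the send, so this only narrows the window for sending into the void.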

Another approach we want to implement is that a service emits events whenever its state (see: `dynamic_config()`) changes. The sending process could then wait until an event signals that there are subscribers and only then start publishing. You can already achieve this today, but you have to do it explicitly by creating an event service and sending a notification when a process has created a subscriber. Events can have multiple event_ids, so you could define your own event_id for new subscriber, subscriber disconnected, new publisher, and so on.
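The event-driven idea can be sketched without the library by letting a standard-library channel stand in for the event service. Everything here is an assumption made for illustration: the `ServiceEvent` variants play the role of the event_ids mentioned above, and `wait_for_first_subscriber` is a hypothetical helper, not an iceoryx2 API.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Hypothetical event ids, one per service state change.
#[derive(Debug, Clone, Copy, PartialEq)]
enum ServiceEvent {
    SubscriberConnected,
    SubscriberDisconnected,
    PublisherConnected,
}

/// Waits for a `SubscriberConnected` event and ignores all other events.
/// Returns `false` if `timeout` passes without any event arriving, or if
/// every notifier has been dropped.
fn wait_for_first_subscriber(
    events: &mpsc::Receiver<ServiceEvent>,
    timeout: Duration,
) -> bool {
    loop {
        match events.recv_timeout(timeout) {
            Ok(ServiceEvent::SubscriberConnected) => return true,
            Ok(_) => continue,
            Err(_) => return false,
        }
    }
}

fn main() {
    let (notifier, listener) = mpsc::channel();

    // Stand-in for the other processes that notify on state changes.
    let other_side = thread::spawn(move || {
        notifier.send(ServiceEvent::PublisherConnected).unwrap();
        thread::sleep(Duration::from_millis(50));
        notifier.send(ServiceEvent::SubscriberConnected).unwrap();
        // ... the subscriber would receive data here ...
        notifier.send(ServiceEvent::SubscriberDisconnected).unwrap();
    });

    if wait_for_first_subscriber(&listener, Duration::from_secs(1)) {
        println!("subscriber connected, start publishing");
    }

    other_side.join().unwrap();
}
```

Compared with polling the subscriber count, the publisher sleeps until it is notified, at the cost of every subscriber having to send the notification explicitly.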

appcypher commented 1 month ago

> I have not yet thought of this use case, but we already have an API available to handle this issue:

This should work in my case.

> Another approach we want to implement is that a service emits events whenever its state (see: `dynamic_config()`) changes. The sending process could then wait until an event signals that there are subscribers and only then start publishing.

Cool. That will make the API nicer for use cases like this.

I'm closing this issue, since I can make the publisher block when there is no one to deliver to by using `service.dynamic_config().number_of_subscribers()`.