eclipse-zenoh / zenoh

zenoh unifies data in motion, data in-use, data at rest and computations. It carefully blends traditional pub/sub with geo-distributed storages, queries and computations, while retaining a level of time and space efficiency that is well beyond any of the mainstream stacks.
https://zenoh.io

Failover brokering issue #498

Open YuanYuYuan opened 1 year ago

YuanYuYuan commented 1 year ago

Describe the bug

Even when the failover_brokering option is enabled, transmission can still fail when two peers that are both connected to a router lose their direct connection to each other.

To reproduce

Topology

flowchart TD
    A(zenohd \n router mode \n tcp/127.0.0.1:7450)
    B(z_pub_modified \n peer mode \n tcp/127.0.0.1:7452)
    C(z_sub \n peer mode \n tcp/127.0.0.1:7451)
    B -- connect --> A
    C -- connect --> A

// Imports assumed for zenoh 0.7.x; `zenoh::runtime` and `zenoh::init` are
// internal APIs and may require the `unstable` feature.
use std::time::Duration;
use zenoh::config::Config;
use zenoh::prelude::r#async::*;
use zenoh::runtime::Runtime;
use zenoh::scouting::WhatAmI;
use zenoh::Result;

#[async_std::main]
async fn main() -> Result<()> {
    let connect_endpoint = vec!["tcp/127.0.0.1:7450".parse()?];
    let listen_endpoint = vec!["tcp/127.0.0.1:7452".parse()?];

    // Peer that connects to the router, with multicast scouting disabled.
    let peer_config = {
        let mut config = Config::default();
        config.set_mode(Some(WhatAmI::Peer)).unwrap();
        config.scouting.multicast.set_enabled(Some(false)).unwrap();
        config.connect.set_endpoints(connect_endpoint).unwrap();
        config.listen.set_endpoints(listen_endpoint).unwrap();
        config
    };
    let runtime = Runtime::new(peer_config).await.unwrap();
    let _manager = runtime.manager();
    let session = zenoh::init(runtime.clone()).res_async().await?;
    let _receiver = zenoh::scout(WhatAmI::Peer, Config::default())
        .res()
        .await
        .unwrap();

    // Give the session time to establish its transports, then list them.
    async_std::task::sleep(Duration::from_secs(1)).await;
    dbg!(session
        .info()
        .routers_zid()
        .res_async()
        .await
        .collect::<Vec<ZenohId>>());
    dbg!(session
        .info()
        .peers_zid()
        .res_async()
        .await
        .collect::<Vec<ZenohId>>());

    for cnt in 0..30 {
        dbg!(runtime.manager().get_transports());
        session.put("demo/example/put", "test").res_async().await?;
        // During iterations 5 to 10, close every transport to a peer to
        // simulate losing the direct peer-to-peer connection.
        for trans in runtime.manager().get_transports() {
            if (5..=10).contains(&cnt) && trans.get_whatami()? == WhatAmI::Peer {
                trans.close().await?;
            }
        }
        async_std::task::sleep(Duration::from_secs(1)).await;
    }
    Ok(())
}



System info

- OS: Arch Linux x86_64
- Kernel: 6.3.1-arch1-1
- CPU: Intel i5-10300H (8) @ 4.500GHz

OlivierHecart commented 1 year ago

Failover brokering was designed to forward data between peers that cannot connect to each other directly. At this time, however, it cannot forward data between peers that can connect to each other but then lose connectivity.
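
To make the distinction concrete, here is a toy model of the behavior described in this comment. It is only a sketch and not zenoh's routing code: the `PeerPair` struct, the `Route` enum, and the `route` function are hypothetical names invented for illustration, and the two boolean flags are a simplification of whatever state the router actually tracks.

```rust
/// Toy description of two peers attached to the same router.
/// Hypothetical type for illustration only; not part of zenoh.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PeerPair {
    /// The peers are able to reach each other directly.
    pub can_connect: bool,
    /// The direct peer-to-peer link is currently up.
    pub link_up: bool,
}

/// How a publication from one peer reaches the other in this toy model.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Route {
    /// Delivered over the direct peer-to-peer link.
    Direct,
    /// Forwarded by the router on behalf of the peers.
    ViaRouter,
    /// Not delivered.
    Dropped,
}

/// Models the behavior described in the comment above:
/// the router brokers only for peers that cannot connect directly.
pub fn route(pair: PeerPair, failover_brokering: bool) -> Route {
    if pair.can_connect && pair.link_up {
        Route::Direct
    } else if !pair.can_connect && failover_brokering {
        Route::ViaRouter
    } else {
        // Covers the case from this issue: the peers can connect,
        // but the link has been lost, so nobody forwards the data.
        Route::Dropped
    }
}
```

In this model, the reproduction above corresponds to `PeerPair { can_connect: true, link_up: false }`: because the peers are able to connect, the router does not broker for them, and while the link is closed the publication is dropped even with failover brokering enabled.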