eclipse-zenoh / zenoh

zenoh unifies data in motion, data in-use, data at rest and computations. It carefully blends traditional pub/sub with geo-distributed storages, queries and computations, while retaining a level of time and space efficiency that is well beyond any of the mainstream stacks.
https://zenoh.io
Other
1.41k stars 146 forks source link

NBFT reliability #669

Open milyin opened 7 months ago

milyin commented 7 months ago

Describe the feature

NBFT means Non-Blocking Fault-Tolerance

Based on PublicationCache and QueryingSubscriber. Newer implementation is in branch https://github.com/eclipse-zenoh/zenoh/pull/401 (@OlivierHecart, is it correct one?)

Goals:

Subscriber when started pulls history of packets from publisher

Subscriber when detects missing packet makes query to publisher to restore them (by sequence number)

Subscriber keeps order of packets when giving them to user (holds onguing packets until query is done)

Subscriber detects new publishers and pulls history from them (through liveliness)

Subscriber makes sure that packets are not repeated (existing impl. have this problem: same packet from query and sub)

Pulling history, missing packets detection, late-comers publisher detection are orthogonal and can be enabled/disabled when constructing

Additional requirements:

milyin commented 7 months ago

New requirement added: allow to use unbounded channel in PublicationCache

lomac — Yesterday at 8:57 AM Hello, I am getting some deadlock on publication. Is this an expected behavior? Here is my setup: 1°) one zenoh session in one process (so, local only) 2°) declaring some publishers and subscribers 3°) async I have some tokio tasks and they are doing some subscriber.recv_async() and some session.put(xxx).await. If I burst the system too much, I am deadlock: tokio console show some tasks always busy (should not be the case since everything is async). I don't personnaly use any Mutex or locking mechanism. If I stop the debugger and go on the threads to find the root cause, I see this: -flume channel is probably full and blocked -I can never recover because all my thread are doing this so executor has no more thread anymore to empty the flume channels

Basically, I am using a session.put(xxx) in an async way but it is blocking nevertheless (because of local subscriber and zenoh try to put the sample in its local flume channel). It seems declaring my subscribers with an unbounded flume channel solve the problem (I cannot reproduce the deadlock, but I am not 100% sure this is the cause). Does my analysis seems right? What would be the right way to solve the deadlock? Use flume unbounded channels? Other?

Pierre 'Fox' Avital — Yesterday at 9:30 AM Hi there,

Your analysis is correct. Specifically, the core of the issue is that Zenoh doesn't have a way to run asynchronous callbacks, so pushing onto the channel can only be done synchronously.

With a bounded channel, if the channel is full, the synchronous implementation has no choice but to block the thread (either parking it, or looping on some condition), since it can't return before the operation succeeds the receiver dies out, needing another task to be performed before it can return in both cases (reading from the channel or destroying its receiver end). With an unbounded one, the deadlock is prevented because the sender task can resolve the issue all on its own: just grow the channel, push the message, and return.

While we are working at supporting asynchronous callbacks, I think using unbounded channels is the correct approach here regardless: since we run callbacks on the read task to minimize latency, the trade-off is that a callback (even asynchronous) not returning will block the reading task, preventing other messages on the same link from being read until the callback returns. Channels let you do the inverse trade-off of allowing parallel processing of messages at the cost of a bit of latency, so we do expect them to be often used by default.

Conclusion: yes, unbounded channels will solve your issue, and if you expect bursty traffic, are probably the most well-suited channels :)

My personal opinion on bounded channels is that they tend to be overused. Mostly because rather than to apply backpressure (the reason why they're Zenoh's default, and the only valid use for them IMO), they're often used as a premature optimization or chasing the delusion that some people have that this will let them control how much memory their programs use more finely (usually while pushing into an ever growing hashmap somewhere else to use as cache :p) TL;DR: your analysis is right and unbounded channels are the solution to everything :)

lomac — Today at 10:09 AM hi @Pierre 'Fox' Avital new day, new problem 😉 For my subscribers declaration, I now use flume::unbounded, no problem here. But, I am also using PublicationCache: zenoh_ext::ArcSessionExt::declare_publication_cache(&session, &key_expr) .history(1) .res() .await .unwrap();

It seems I have no control on the internal flume channel and the publication cache use the bounded version from the DefaultHandler. Is there a possibility for the Zenog Team to add a way to use unbounded channels also there?

OlivierHecart commented 7 months ago

Regarding last comment, I think it would be even better to implement the publication cache with a callback subscriber rather than a flume subscriber. It would imply some mutex but could be better overall.