eclipse-iceoryx / iceoryx

Eclipse iceoryx™ - true zero-copy inter-process-communication
https://iceoryx.io
Apache License 2.0

Possibility to block the publisher when subscriber queue is full #615

Open budrus opened 3 years ago

budrus commented 3 years ago

Brief feature description

Today our default is to use an "overflowing queue" for the subscribers. If the subscriber does not consume fast enough, we start losing samples. An option to block the publisher in this case would be nice, to ensure that no samples are lost.

Detailed information

The overflowing queue starts to drop the oldest sample in case of an overflow, so technically it behaves like a ring buffer. In many use cases this is fine, as we want to have a "provide the last X samples" contract. E.g. if a subscriber is only interested in the latest and greatest data, it can set the queue size to 1 and we don't waste memory chunks on samples that are not interesting for the subscriber. We often also do not want interference from a subscriber back to a publisher. So if the subscriber is not fast enough to consume all samples, possible solutions are:

  1. increase the frequency of the subscribing application if it operates in polling mode
  2. increase the queue size for the subscriber
  3. decrease the runtime of the subscribing application

But there might also be use cases where it is fine to slow down the publisher to ensure that no data is lost in our system. The solution would be to block the publish() call when we detect a queue overflow, until the subscriber has popped samples and there is again a free slot in the queue. Sure, this has an influence on the publishing application and also on other subscribers that are connected to this publisher. This is comparable to the DDS history QoS KeepAll. The normal behavior with our overflowing queue is comparable to the DDS history QoS KeepLastX.
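
For illustration only, a minimal sketch (plain standard library types, not iceoryx code) of the two contracts: the overflowing queue drops the oldest sample like a ring buffer (KeepLastX), while the blocking variant makes the publisher wait until the subscriber has popped a sample (KeepAll):

```cpp
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <iostream>
#include <mutex>
#include <thread>

// Illustration only: a bounded queue with the two delivery contracts discussed
// above; iceoryx uses lock-free queues in shared memory, this sketch just shows
// the semantics with standard library types.
template <typename T>
class BoundedQueue
{
  public:
    explicit BoundedQueue(std::size_t capacity) : m_capacity(capacity) {}

    // "KeepLastX" contract: on overflow the oldest sample is dropped,
    // the publisher is never delayed
    void overflowingPush(const T& value)
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        if (m_queue.size() == m_capacity)
        {
            m_queue.pop_front(); // drop the oldest sample
        }
        m_queue.push_back(value);
        m_condVar.notify_all();
    }

    // "KeepAll" contract: the publisher blocks until a slot is free again
    void blockingPush(const T& value)
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_condVar.wait(lock, [this] { return m_queue.size() < m_capacity; });
        m_queue.push_back(value);
        m_condVar.notify_all();
    }

    // subscriber side: blocks until a sample is available
    T pop()
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_condVar.wait(lock, [this] { return !m_queue.empty(); });
        T value = m_queue.front();
        m_queue.pop_front();
        m_condVar.notify_all(); // wake a publisher waiting in blockingPush
        return value;
    }

  private:
    std::size_t m_capacity;
    std::deque<T> m_queue;
    std::mutex m_mutex;
    std::condition_variable m_condVar;
};

int main()
{
    BoundedQueue<int> queue(2U); // subscriber queue with capacity 2

    std::thread slowSubscriber([&] {
        for (int i = 0; i < 5; ++i)
        {
            std::this_thread::sleep_for(std::chrono::milliseconds(50));
            std::cout << "popped " << queue.pop() << std::endl;
        }
    });

    for (int i = 0; i < 5; ++i)
    {
        // with overflowingPush(i) the publisher would never wait,
        // but the oldest samples would silently be dropped
        queue.blockingPush(i);
    }
    slowSubscriber.join();
    return 0;
}
```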

    ToDo

    Once this is implemented, add the following integration tests:

    • [ ] Modified icedelivery where the subscriber acquires no samples until the publisher blocks. When the publisher blocks, press CTRL-C (for both sides)
    • [ ] Unsubscribe subscriber when publisher is in blocking push.
    • [ ] Destroy subscriber object when publisher is in blocking push.
    • [ ] Subscribe new subscriber when publisher is in blocking push.
    • [ ] Unsubscribe different subscriber when publisher is in blocking push with another subscriber.
    • [ ] Optimization in ChunkDistributor https://github.com/eclipse-iceoryx/iceoryx/pull/663#discussion_r606655415
    • [ ] Fix TriggerQueue https://github.com/eclipse-iceoryx/iceoryx/pull/663#discussion_r606653889
    • [x] Ctrl+C on an application with a publisher blocked by a slow subscriber doesn't shut the application down when a signal handler is installed; this is due to the while (!remainingQueues.empty()) in ChunkDistributor::deliverToAllStoredQueues which is not stopped when SIG_TERM has a custom signal handler
      • a Runtime::unblockShutdown could be implemented
      • this could call stop offer on all the publishers
      • it must be carefully checked what the stop offer call does, since only a limited number of functions are allowed to be called in a signal handler (https://man7.org/linux/man-pages/man7/signal-safety.7.html)
    • [x] An application with a blocked publisher slows down the RouDi shutdown due to the 45s processKillDelay in the RouDi::shutdown method
      • after m_prcMgr->requestShutdownOfAllProcesses(), RouDi has to make all the publishers stop offering so that the discovery loop can remove the subscriber queues from the ChunkDistributor of the publisher
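
For illustration, a minimal sketch of the signal handler side of this, assuming the handler itself only sets a flag (anything not listed as async-signal-safe in signal-safety(7) has to happen outside of it); the publish loop and the proposed Runtime::unblockShutdown are only hinted at in comments:

```cpp
#include <atomic>
#include <chrono>
#include <csignal>
#include <thread>

// the handler itself only sets a flag; anything that is not async-signal-safe
// (see signal-safety(7)) has to happen outside of it
std::atomic<bool> g_shutdownRequested{false};

extern "C" void signalHandler(int)
{
    g_shutdownRequested = true;
}

int main()
{
    std::signal(SIGINT, signalHandler);
    std::signal(SIGTERM, signalHandler);

    while (!g_shutdownRequested)
    {
        // stand-in for the publish loop; a blocking publish() would also have
        // to observe this flag (or be unblocked by something like the proposed
        // Runtime::unblockShutdown), otherwise Ctrl+C cannot terminate the app
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
    // safe context: stop offering, clean up, shut down
    return 0;
}
```
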
budrus commented 3 years ago

Proposed solution

  1. Introduce a publisher option that allows subscribers to block this publisher if they have a queue overflow. The publisher can only be blocked if this is enabled.
  2. Introduce a subscriber option that indicates that this subscriber does not want to lose any samples. This will result in a rejecting and not an overflowing queue for this subscriber.
  3. If a subscriber requests the publisher to block if the queue is full but the publisher does not allow this, we print a warning on connect or call the error handler (tbd).
  4. If the publisher supports blocking, the subscriber requests this, and we have an overflow, the publisher has to wait and try again. The simplest solution would be a busy loop that tries to push until it succeeds; the advanced version would be to use our inter-process condition variable to wake up the publisher from a non-busy wait when there is again space in the queue. A good compromise for now could be something in between, like tiny sleeps and retries (see the sketch after this list).
  5. We extend iceperf to also measure the throughput that could be reached with the blocking behavior as this should guarantee that no samples are lost.
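
To make points 1, 2 and 4 a bit more concrete, a sketch with made-up option names (the real naming is still open, see the "Naming things" hackathon item below) and the "tiny sleeps and retries" compromise:

```cpp
#include <chrono>
#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical option names, only to illustrate points 1 and 2 of the proposal
struct PublisherOptions
{
    bool allowBlockingBySubscriber{false}; // point 1: publisher agrees to be blocked
};

struct SubscriberOptions
{
    bool blockPublisherWhenQueueFull{false}; // point 2: rejecting instead of overflowing queue
};

// stand-in for the subscriber queue: tryPush fails when the queue is full
struct RejectingQueue
{
    std::size_t capacity{10U};
    std::vector<int> data;
    bool tryPush(int value)
    {
        if (data.size() >= capacity) { return false; }
        data.push_back(value);
        return true;
    }
};

// point 4: compromise between busy loop and condition variable,
// retry with tiny sleeps until the push succeeds
void blockingPushWithRetry(RejectingQueue& queue, int sample)
{
    while (!queue.tryPush(sample))
    {
        std::this_thread::sleep_for(std::chrono::microseconds(100));
    }
}
```
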
elBoberido commented 3 years ago

@budrus iceperf already does it more or less this way. One application only sends data after it received data from the other, so we already have a blocking wait for the publisher.

budrus commented 3 years ago

@elBoberido That's somewhat right, but only because we have the ping-pong back channel. For a throughput measurement, I think other middlewares send as fast as possible and check whether everything was received and how much arrived in a fixed time. So I would prefer to also have a setup close to what people do when testing iceoryx: send like crazy, as fast as possible, and see whether any data is lost and what throughput it achieves.
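
Roughly, the publisher side of such a measurement could look like the sketch below; publish() is just a placeholder for whatever the benchmark uses to send a sample, and the subscriber side would count the received samples to detect loss:

```cpp
#include <chrono>
#include <cstdint>
#include <iostream>

// placeholder for the actual send call used by the benchmark
void publish(uint64_t /*sampleId*/) { /* ... */ }

int main()
{
    constexpr uint64_t NUMBER_OF_SAMPLES = 1000000U;

    auto start = std::chrono::steady_clock::now();
    for (uint64_t i = 0U; i < NUMBER_OF_SAMPLES; ++i)
    {
        publish(i); // send as fast as possible, no pacing
    }
    auto stop = std::chrono::steady_clock::now();

    auto seconds = std::chrono::duration<double>(stop - start).count();
    std::cout << NUMBER_OF_SAMPLES / seconds << " samples/s" << std::endl;
    // the subscriber counts the received sample IDs; with the blocking
    // publisher the count must equal NUMBER_OF_SAMPLES, i.e. no loss
    return 0;
}
```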

elBoberido commented 3 years ago

@budrus okay, with this approach there will almost certainly be data loss when the publisher is not blocked, since the queue sizes are way too small to hold enough samples even if the OS stops the subscriber only for a few milliseconds.

elfenpiff commented 3 years ago

I would suggest the following approach.

  1. adjust the trigger queue
    • add a template parameter to select the queue type (SoFi, FiFo etc.)
    • add a template policy to select the waiting strategy (e.g. semaphore, busy wait, condition variable)
    • implement the easiest policy for the trigger queue (more sophisticated waiting policies can come later)
    • integrate the trigger queue into the variant queue
    • adjust the trigger queue interface (add blockingPush, timedPush, tryPush as counterparts to blockingPop, timedPop, tryPop); a rough sketch of such an interface is shown after this list
  2. use the trigger queue flavor in subscriber when option is set to I_WANT_IT_ALL
  3. use blocking push in publisher when option is not DONT_STOP_ME_NOW or is set to THE_SHOW_MUST_GO_ON
  4. Think about the fact that Freddie Mercury seems to know more about lock-free programming than we do?!
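
A rough sketch of what the policy-based interface from point 1 could look like; the names are illustrative, a plain std::queue with a capacity check stands in for the actual queue flavor, and no synchronization is shown (the real queue lives in shared memory):

```cpp
#include <chrono>
#include <cstddef>
#include <optional>
#include <queue>
#include <thread>

// simplest waiting strategy for now; semaphore or condition variable based
// policies could be added later as drop-in replacements
struct BusyWaitPolicy
{
    static void wait() { std::this_thread::yield(); }
};

// the underlying queue flavor (SoFi, FiFo, ...) and the waiting strategy would
// be template parameters; here a std::queue with a capacity check stands in
template <typename T, typename WaitPolicy = BusyWaitPolicy>
class TriggerQueue
{
  public:
    explicit TriggerQueue(std::size_t capacity) : m_capacity(capacity) {}

    // counterpart to tryPop: fails immediately when the queue is full
    bool tryPush(const T& value)
    {
        if (m_queue.size() >= m_capacity) { return false; }
        m_queue.push(value);
        return true;
    }

    // counterpart to blockingPop: retries with the selected waiting strategy
    void blockingPush(const T& value)
    {
        while (!tryPush(value))
        {
            WaitPolicy::wait();
        }
    }

    // counterpart to timedPop: gives up when the deadline is reached
    bool timedPush(const T& value, std::chrono::milliseconds timeout)
    {
        auto deadline = std::chrono::steady_clock::now() + timeout;
        while (!tryPush(value))
        {
            if (std::chrono::steady_clock::now() >= deadline) { return false; }
            WaitPolicy::wait();
        }
        return true;
    }

    std::optional<T> tryPop()
    {
        if (m_queue.empty()) { return std::nullopt; }
        T value = m_queue.front();
        m_queue.pop();
        return value;
    }

  private:
    std::size_t m_capacity;
    std::queue<T> m_queue;
};
```
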
ankitkk commented 3 years ago

This is great. Anything which doesn’t force or imply a threading model on clients is awesome.

budrus commented 3 years ago

Hackathon:

  1. extension of iceperf -> @elBoberido
  2. extension of publisher and subscriber options C++, extension of runtime and RouDi processMessage() -> @FerdinandSpitzschnueffler
  3. extension of publisher and subscriber options C, needs 2. -> @FerdinandSpitzschnueffler
  4. Extension of trigger queue, blocking behavior -> @elfenpiff
  5. Extension of constructSubscriber(), needs 4., 2.
  6. Discovery. Store reliable/best effort in BasePort and extension of PortManager discovery matching -> @mossmaurice
  7. Naming things -> @budrus
orecham commented 3 years ago

@budrus with an impossible task

budrus commented 3 years ago

> @budrus with an impossible task

@ithier at least you realized that I got the hardest job

elBoberido commented 3 years ago

@budrus regarding the issue with stopping an application with a blocked publisher. I think there are two options

What do you think? Do you have another idea?

mossmaurice commented 3 years ago

> @budrus regarding the issue with stopping an application with a blocked publisher. I think there are two options
>
> * option 1
>
>   * use a flag in the runtime set by the signal handler
>   * use that flag in the keep alive thread to send an IPC message to RouDi
>   * RouDi disconnects the publisher ports

I think I prefer option 1, as we should avoid duplicating the publisher list. However, I have mixed feelings about this topic. It feels very hacky in a way. Would it be possible to only do this fix on the release_1.0 branch and solve it on master altogether in #611 with our new concept for object creation in shared memory?
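
For illustration, a sketch of how option 1 could fit together; the sendToRouDi call and the UNBLOCK_SHUTDOWN message are placeholders, not the actual IPC interface:

```cpp
#include <atomic>
#include <chrono>
#include <csignal>
#include <iostream>
#include <string>
#include <thread>

std::atomic<bool> g_shutdownRequested{false};

extern "C" void signalHandler(int) { g_shutdownRequested = true; }

// placeholder for the runtime's IPC channel to RouDi; not the real interface
void sendToRouDi(const std::string& message)
{
    std::cout << "-> RouDi: " << message << std::endl;
}

// simplified keep-alive loop as it could run inside the runtime
void keepAliveLoop()
{
    bool unblockRequested{false};
    while (!unblockRequested)
    {
        sendToRouDi("KEEP_ALIVE");
        if (g_shutdownRequested)
        {
            // RouDi would react by disconnecting this process' publisher ports,
            // which unblocks a publisher waiting on a full subscriber queue
            sendToRouDi("UNBLOCK_SHUTDOWN"); // placeholder message name
            unblockRequested = true;
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
}

int main()
{
    std::signal(SIGINT, signalHandler);
    std::signal(SIGTERM, signalHandler);
    std::thread keepAlive(keepAliveLoop);
    keepAlive.join(); // the real runtime would continue with its publish loop here
    return 0;
}
```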

elBoberido commented 3 years ago

> @budrus regarding the issue with stopping an application with a blocked publisher. I think there are two options
>
> * option 1
>
>   * use a flag in the runtime set by the signal handler
>   * use that flag in the keep alive thread to send an IPC message to RouDi
>   * RouDi disconnects the publisher ports
>
> I think I prefer option 1, as we should avoid duplicating the publisher list. However, I have mixed feelings about this topic. It feels very hacky in a way. Would it be possible to only do this fix on the release_1.0 branch and solve it on master altogether in #611 with our new concept for object creation in shared memory?

Yes, it's kind of hacky, but I wouldn't do it only in the release_1.0 branch, in order to keep the branches in sync for as long as possible and also to not keep this regression in master.

This is the make it work -> make it beautiful -> make it fast cycle ;)

budrus commented 3 years ago

@elBoberido @mossmaurice I would also vote for option 1. I fear that ugly things could happen if we do another bookkeeping. Having a runtime.shutdown() that sends a command over UDS and ends up doing the things on the RouDi side that you made to solve the challenge there is maybe the best for now. So we have a bit of reuse. Setting something in an individual publisher to release it feels even more ugly, and more ideas I do not have.

elBoberido commented 3 years ago

@elBoberido @mossmaurice @MatthiasKillat @elfenpiff I think this runtime.shutdown() could also be used once we extend the WaitSet/Listener to be used without an explicit shutdown trigger. The runtime could then automatically register a signal handler for this, and if the user wants a custom signal handler she must also call runtime.shutdown(). I think this could also be used to simplify our examples even more, like

    int main() {
        auto& runtime = Runtime::init(...);
        Publisher<uint8_t> pub{...};
        while (!runtime.shutdownRequested()) {
            pub.publishCopyOf(42U);
            runtime.sleep(...); // this would be interruptible by runtime.shutdown()
        }
    }

or with the listener

    auto& runtime = Runtime::init(...);
    Listener listener;
    Subscriber<uint8_t> sub{...};
    listener.attachEvent(sub, ...);
    runtime.wait(); // blocking wait which will be unblocked by runtime.shutdown()

What do you think? Shall I create an issue for this?

budrus commented 3 years ago

@elBoberido it's getting closer to while(ros::ok()) ;-) I like the idea. As you wrote, it should be optional whether our runtime does the signal handling. E.g. if iceoryx is used in ROS we would not have the signal handler on our side but do a runtime.shutdown() in a place like rmw_shutdown(). Go for it!

budrus commented 3 years ago

@elBoberido So would you propose to leave it for now for Almond and we create a new issue and close this one?

elBoberido commented 3 years ago

@budrus I would create the runtime.shutdown() method with option 1 for unblocking the application at shutdown and an issue for the other stuff like runtime.registerShutdownSignalHandler(), runtime.ok(), runtime.sleep() and runtime.wait().

I'd like to have the unblocking of the blocked publisher in 1.0 and all the other stuff is nice to have and can be implemented later on. This could also be a good first issue for new contributors.