reactor / reactor-core

Non-Blocking Reactive Foundation for the JVM
http://projectreactor.io
Apache License 2.0

Strictly define operator Fusion contract for Reactor subscribers (internal and user-made) #2636

Open OlegDokuka opened 3 years ago

OlegDokuka commented 3 years ago

At the moment, operator fusion behaves inconsistently across operators, in particular in how the downstream (a.k.a. Queue Drainer) notifies the upstream (a.k.a. Queue Source) that it has finished working with the queue. Consolidating this is even more important given that all elements remaining in the queue MUST be discarded.

This issue proposes a clear contract between upstream and downstream with respect to fusion:

If fusion is established:

  1. The Queue source must never call poll or clear on its queue until it is notified that downstream is no longer working with that queue.
  2. The Queue source, upon receiving the clear call, must discard all the elements from the queue if the queue is known to be finite.
  3. The Queue drainer must notify the Queue source via the queueSubscription.clear method that it is done with all operations on that queue and will never call poll again.
  4. The Queue drainer must call queueSubscription.clear only once and ensure it is never called concurrently.
  5. The Queue drainer must call queueSubscription.clear after receiving any of the terminal signals (ON_ERROR, ON_COMPLETE, or CANCEL).
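
The five rules above can be sketched in plain Java without depending on reactor-core. This is only an illustration under stated assumptions: `FusedQueue`, `FiniteSource`, and `Drainer` are hypothetical names standing in for the queue subscription, the Queue source, and the Queue drainer, and are not Reactor types. The `AtomicBoolean` only guards the "clear once" rule; it does not by itself prevent a poll racing with clear in a truly concurrent drainer.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;

public class FusionContractSketch {

    /** Hypothetical stand-in for the fused view of the source's queue. */
    interface FusedQueue<T> {
        T poll();      // called by the drainer only; null means "empty right now"
        void clear();  // called by the drainer to say "I am done with this queue"
    }

    /** Queue source: never polls or clears the queue on its own while fused (rule 1). */
    static final class FiniteSource<T> implements FusedQueue<T> {
        private final Queue<T> queue = new ArrayDeque<>();
        final List<T> discarded = new ArrayList<>();

        FiniteSource(List<T> items) { queue.addAll(items); }

        @Override public T poll() { return queue.poll(); }

        @Override public void clear() {
            // Rule 2: this queue is known to be finite, so discard what is left.
            T t;
            while ((t = queue.poll()) != null) { discarded.add(t); }
        }
    }

    /** Queue drainer: polls, then releases the queue exactly once (rules 3 to 5). */
    static final class Drainer<T> {
        private final FusedQueue<T> source;
        private final AtomicBoolean cleared = new AtomicBoolean();
        final List<T> received = new ArrayList<>();

        Drainer(FusedQueue<T> source) { this.source = source; }

        void drain(int n) {
            if (cleared.get()) return;          // rule 3: no poll after clear
            for (int i = 0; i < n; i++) {
                T t = source.poll();
                if (t == null) return;
                received.add(t);
            }
        }

        /** Invoked for ON_ERROR, ON_COMPLETE, or CANCEL (rule 5). */
        void onTerminal() {
            if (cleared.compareAndSet(false, true)) {  // rule 4: only once
                source.clear();
            }
        }
    }

    public static String demo() {
        FiniteSource<Integer> source = new FiniteSource<>(List.of(1, 2, 3, 4));
        Drainer<Integer> drainer = new Drainer<>(source);
        drainer.drain(2);
        drainer.onTerminal();
        drainer.onTerminal();  // second terminal signal must not clear again
        return "received=" + drainer.received + " discarded=" + source.discarded;
    }

    public static void main(String[] args) {
        System.out.println(demo());  // received=[1, 2] discarded=[3, 4]
    }
}
```

The point of the design is that the decision to discard lives in the source's clear, which is the only place that knows whether the queue is finite.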

If fusion is not established:

  1. The Queue source must manage the queue on its own. It is responsible for terminating all interaction with that queue upon receiving any of the terminal signals (ON_ERROR, ON_COMPLETE, or CANCEL) and for discarding all the elements from it if the queue is known to be finite.
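
For the non-fused case, a minimal sketch of a source that owns its queue might look like the following. `SelfManagedSource` and the `Terminal` enum are hypothetical names used only for illustration, and the queue here is assumed to be finite.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;

public class NonFusedSourceSketch {

    enum Terminal { ON_ERROR, ON_COMPLETE, CANCEL }

    /** Hypothetical source that manages its own (finite) queue when no fusion happened. */
    static final class SelfManagedSource<T> {
        private final Queue<T> queue = new ArrayDeque<>();
        private final AtomicBoolean terminated = new AtomicBoolean();
        final List<T> discarded = new ArrayList<>();

        /** Enqueues an element, or discards it if the source already terminated. */
        boolean offer(T t) {
            if (terminated.get()) {
                discarded.add(t);
                return false;
            }
            return queue.offer(t);
        }

        /** On the first terminal signal, stop using the queue and discard what is left. */
        void onTerminal(Terminal signal) {
            if (terminated.compareAndSet(false, true)) {
                T t;
                while ((t = queue.poll()) != null) { discarded.add(t); }
            }
        }
    }

    public static String demo() {
        SelfManagedSource<String> source = new SelfManagedSource<>();
        source.offer("a");
        source.offer("b");
        source.onTerminal(Terminal.CANCEL);
        source.onTerminal(Terminal.ON_COMPLETE);  // later signals are ignored
        boolean accepted = source.offer("c");     // late element is discarded, not queued
        return "discarded=" + source.discarded + " accepted=" + accepted;
    }

    public static void main(String[] args) {
        System.out.println(demo());  // discarded=[a, b, c] accepted=false
    }
}
```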

By consolidating these behaviors, we will be able to resolve a number of issues caused by the downstream blindly trying to discard elements from the upstream's queue without full context:

  1. FluxIterable fused with any downstream loops infinitely if the source iterable is infinite (https://github.com/rsocket/rsocket-java/issues/992).
  2. FluxStream fused with any downstream loops infinitely if the source is an infinite stream.
  3. FluxGenerate fused with any downstream loops infinitely if the generator is infinite.
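
The infinite-loop problem in these three cases can be reproduced with a small stand-alone sketch. `IterableSource` is a hypothetical class loosely modeled on an iterator-backed source and is not the actual FluxIterable code; the `safetyCap` parameter exists only so that the demonstration terminates.

```java
import java.util.Iterator;
import java.util.stream.Stream;

public class InfiniteDiscardSketch {

    /** Hypothetical iterator-backed source that knows whether it is finite. */
    static final class IterableSource<T> {
        private final Iterator<T> iterator;
        private final boolean knownFinite;
        private boolean done;

        IterableSource(Iterator<T> iterator, boolean knownFinite) {
            this.iterator = iterator;
            this.knownFinite = knownFinite;
        }

        T poll() {
            return (!done && iterator.hasNext()) ? iterator.next() : null;
        }

        /** Contract-aware clear: walks the remainder only when known to be finite. */
        int clear() {
            int discarded = 0;
            if (knownFinite) {
                while (iterator.hasNext()) {
                    iterator.next();
                    discarded++;
                }
            }
            done = true;
            return discarded;
        }
    }

    /** What a downstream does when it discards blindly: poll until null. */
    static int blindDiscard(IterableSource<Integer> source, int safetyCap) {
        int polled = 0;
        // Without safetyCap this loop never ends on an infinite source.
        while (polled < safetyCap && source.poll() != null) {
            polled++;
        }
        return polled;
    }

    public static String demo() {
        IterableSource<Integer> blind =
                new IterableSource<>(Stream.iterate(0, i -> i + 1).iterator(), false);
        int polled = blindDiscard(blind, 1000);

        IterableSource<Integer> aware =
                new IterableSource<>(Stream.iterate(0, i -> i + 1).iterator(), false);
        int discarded = aware.clear();  // returns immediately, nothing is walked

        return "blind=" + polled + " aware=" + discarded + " afterClear=" + aware.poll();
    }

    public static void main(String[] args) {
        System.out.println(demo());  // blind=1000 aware=0 afterClear=null
    }
}
```

The blind variant only stops because of the cap, whereas the source-side clear can skip the walk because it knows the iterator may be infinite.
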

simonbasle commented 3 years ago

This needs an experiment. Can we relax the safeguards in operators, and instead safe-wrap downstream Subscribers if they're not from the vanilla operators CoreSubscriber?

If yes, can we extend that to non-vanilla CoreSubscribers?

chemicL commented 10 months ago

Related effort to strictly define fusion for processors: #2626