risingwavelabs / risingwave


Discussion: backpressure barrier injection #10755

hzxa21 commented 1 year ago

Currently our system relies on the following invariant to hold in order to run steadily:

avg barrier latency < barrier injection interval

To ensure the above invariant, we currently have a backpressure mechanism that stops polling the source and prefers polling barriers with no data between them (the so-called empty barriers) when there are barriers piled up in the source barrier channel. This mechanism works only if the barrier latency of an empty barrier < the barrier injection interval.
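For illustration only, here is a minimal sketch of what "prefer polling barriers over source data" can look like with a biased `tokio::select!`. This is not the actual RisingWave source executor; the channel and message types are made up for this example:

```rust
use tokio::sync::mpsc;

// Hypothetical message types for this sketch only.
enum Message {
    Barrier(u64),       // carries the epoch
    Chunk(Vec<String>), // stand-in for a data chunk from the connector
}

async fn poll_next(
    barrier_rx: &mut mpsc::UnboundedReceiver<u64>,
    source_rx: &mut mpsc::UnboundedReceiver<Vec<String>>,
) -> Option<Message> {
    tokio::select! {
        // `biased` polls the branches in order, so any pending barrier is taken
        // before source data: the source is effectively back-pressured whenever
        // barriers are queued up.
        biased;
        Some(epoch) = barrier_rx.recv() => Some(Message::Barrier(epoch)),
        Some(chunk) = source_rx.recv() => Some(Message::Chunk(chunk)),
        else => None,
    }
}
```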

However, there still exist cases where the barrier latency of an empty barrier can be large:

Apart from continuing to optimize our executors and storage to lower the barrier latency of empty barriers, I think we also need a safeguard to backpressure barrier injection if the barrier latency of an empty barrier > the barrier injection interval. Some ideas (a rough sketch of the first one follows the list):

  1. Merge multiple empty barriers into one if there are too many barriers in the barrier channel
  2. Adjust the barrier injection interval on the meta side if there are too many in-flight barriers
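A rough sketch of idea 1, under the assumption that "empty" barriers carry no mutations and can therefore be collapsed; the `Barrier` struct and channel type here are hypothetical, not RisingWave internals:

```rust
use tokio::sync::mpsc;

// Hypothetical barrier type for this sketch; the real one carries mutations etc.
#[derive(Debug, Clone)]
struct Barrier {
    epoch: u64,
}

/// Drain whatever barriers are already queued and collapse them into a single
/// barrier carrying the latest epoch, so downstream only processes one.
fn merge_pending_barriers(rx: &mut mpsc::UnboundedReceiver<Barrier>, first: Barrier) -> Barrier {
    let mut merged = first;
    // try_recv() never blocks: it only consumes barriers that have already piled up.
    while let Ok(next) = rx.try_recv() {
        // In practice only barriers without mutations could be merged safely;
        // that check is omitted in this sketch.
        merged.epoch = next.epoch;
    }
    merged
}
```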
chenzl25 commented 1 year ago

Dynamic filter will do storage I/O on seeing a barrier even though it is an empty barrier

Dynamic filters with empty barriers won't perform storage I/O, but temporal filters will, since the NOW() executor continuously produces data, which causes the dynamic filter to keep performing a search on each barrier.

BugenZhao commented 1 year ago

IMO, the empty epoch appears while there is data available in the connector only if the interval at which the source executor gets scheduled by the runtime is greater than the barrier injection interval. This happens if...

The source executor being back-pressured means that there are `depth * per-exchange-barrier-limit` barriers in flight, or a barrier latency of `depth * per-exchange-barrier-limit * barrier-interval`. If the next barrier with an empty epoch still encounters a latency larger than this value, we'll get a livelock on consuming the external data.

Based on this, it appears that a latency greater than the barrier injection interval does not always cause a problem since different actors can be pipelined. Instead, we should also take the graph depth into consideration.
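To make this concrete, a toy calculation with made-up numbers (these are not RisingWave defaults):

```rust
// Illustrative arithmetic only; every constant here is hypothetical.
fn main() {
    let depth = 4_u32;                       // exchange hops from source to sink
    let per_exchange_barrier_limit = 10_u32; // concurrent-barrier permits per exchange
    let barrier_interval_secs = 1.0_f64;     // barrier injection interval

    // The source is back-pressured only once roughly this many barriers are in
    // flight, i.e. after this much accumulated barrier latency.
    let max_in_flight = depth * per_exchange_barrier_limit;
    let latency_headroom = f64::from(max_in_flight) * barrier_interval_secs;
    println!("{max_in_flight} barriers in flight ≈ {latency_headroom:.0}s of latency headroom");
}
```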

fuyufjh commented 1 year ago

avg barrier latency < barrier injection interval

The root cause of this invariant is feat(streaming): limit concurrent barriers in exchange based on permits #9427. I don't think it is 100% reasonable and I am considering a better approach.


Update (July 7): Hmm... I am now becoming suspicious of the "root cause" I mentioned. 😆

Thinking from another perspective: consider the barrier as a kind of event. What happened here is quite straightforward: the consumption speed of events is slower than the generation speed, so events pile up. What a familiar story!

Therefore, what we can do is optimize the speed of barrier processing on each operator, as mentioned in the PR description.

hzxa21 commented 1 year ago

Based on this, it appears that a latency greater than the barrier injection interval does not always cause a problem since different actors can be pipelined. Instead, we should also take the graph depth into consideration.

Hmm... I think barrier latency of empty barrier > barrier injection interval will always cause a problem even though different actors can be pipelined, because this is the lower bound of the barrier latency we will see in the streaming graph. If this lower-bound latency is still larger than the barrier injection interval, barriers will pile up in the source for sure.
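A toy calculation with made-up numbers may help: if even an empty barrier takes 2s end-to-end while a new barrier is injected every 1s, the in-flight count grows by roughly one barrier every two seconds and never drains:

```rust
// Hypothetical numbers; this only illustrates the unbounded pile-up.
fn main() {
    let inject_interval_secs = 1.0_f64;        // one barrier injected per second
    let empty_barrier_latency_secs = 2.0_f64;  // best-case (empty) barrier latency

    let mut in_flight = 0.0_f64;
    for second in 1..=10 {
        in_flight += 1.0 / inject_interval_secs;       // injected this second
        in_flight -= 1.0 / empty_barrier_latency_secs; // collected this second
        println!("t = {second}s, in-flight barriers ≈ {in_flight:.1}");
    }
}
```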

github-actions[bot] commented 1 year ago

This issue has been open for 60 days with no activity. Could you please update the status? Feel free to continue discussion or close as not planned.