risingwavelabs / risingwave


Discussion: allow users to control backfilling throughput of different streams involved in one query #14559

Open lmatz opened 8 months ago

lmatz commented 8 months ago

The Issue

Before memtable spilling and other backpressure-based mechanisms were implemented, OOM sometimes occurred in join queries during the backfilling stage, while it rarely occurred after backfilling.

It is suspected that the data ingestion patterns during and after backfilling make such a difference.

For example, during normal processing, it is almost always the case that the dimension table does not change too often while the fact table changes constantly.

Case (1): a row from the fact table can produce at most one row in the join, so we don't expect a large amount of intermediate results to be generated in the join operator.

Case (2): a row from the dimension table can potentially produce a huge number of intermediate results. Even worse, these results may be further amplified in the following joins if it is a multi-join query.

However, during backfilling, we ingest both fact tables and dimension tables as quickly as possible, so we can expect the total bandwidth for source ingestion to be split evenly among all the tables. Sometimes there are multiple dimension tables, so the source throughput of the fact table is only 1/N. Therefore, we expect case (2) to happen a lot in this phase.

After backfilling, we expect the fact table to ingest the majority of the data. Therefore, we expect case (1) to occur much more often than case (2).
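
As a minimal illustration of the asymmetry between the two cases (the schema and values here are hypothetical, used only to show the shape of the problem):

-- Hypothetical fact table (orders) and dimension table (customers), for illustration only.
create table customers (c_id int primary key, c_region varchar);
create table orders (o_id int primary key, c_id int, amount int);

create materialized view order_regions as
select o.o_id, o.amount, c.c_region
from orders o join customers c on o.c_id = c.c_id;

-- Case (1): one new fact row matches at most one dimension row,
-- so the join emits at most one output row.
insert into orders values (1001, 42, 10);

-- Case (2): one dimension-side change touches every existing order of customer 42,
-- so the join may emit many intermediate rows, which would be amplified further
-- by any downstream joins in a multi-join query.
update customers set c_region = 'EU' where c_id = 42;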

Why It's Worth Addressing

Although memtable spilling together with other backpressure-based mechanisms should have alleviated the problem a lot, it is still beneficial to avoid pushing the system into a stressful state and relying on mechanisms that act only as a last resort against OOM. Besides, these mechanisms may cost a lot of performance if they are triggered often.

There is some interesting discussion of this topic under the name metastable failure:

Bronson et al. define the metastable failure state as the state of a permanent overload with an ultra-low goodput (throughput of useful work). They also define the stable state as the state when a system experiences a low enough load that it can successfully recover from temporary overloads, and the vulnerable state as the state when a system experiences a high load, but it can successfully handle that load in the absence of temporary overloads. A system experiences a metastable failure when it is in a vulnerable state and a trigger causes a temporary overload that sets off a sustaining effect (a work amplification due to a common-case optimization) that tips the system into a metastable failure state. The distinguishing characteristic of a metastable failure is that the sustaining effect keeps the system in the metastable failure state even after the trigger is removed.

reference: https://www.usenix.org/conference/osdi22/presentation/huang-lexiang

My take from this is that when the system is stuck in an extremely stressful state, mechanisms that used to work as expected under normal conditions may fail or even backfire.

Although there is no direct evidence that RW would trigger this kind of failure, it is better if we can keep RW out of a stressful state and prevent such failures with small effort.

Potential Solutions

Suppose we have a multi-join query that involves all the tables below (see the attached table dependency graph, SCR-20240113-1al_70):

We tend to ingest data first from the tables that have the smallest in-degrees, e.g. region and part in this case. After they have been ingested, we remove them from the graph and repeat, looking for the tables that now have the smallest in-degrees.

Of course, real-world queries and the dependencies between tables may not be this clear to RW, and RW, as a system, is in no position to automatically learn much of this kind of information from users' queries.

Therefore, it would be better to give users control over the ordering.

Solution 1: Manual Flow Control of Individual Source

It only requires a primitive that RW already has: flow control.

RW already supports SET RW_STREAMING_RATE_LIMIT TO XXX. Setting it to 0 makes the source throughput of the next created materialized view 0.

Then, users manually turn on the flow of each source, one at a time. We need a way to control the throughput of each individual source operator involved in a query.

Users monitor how many rows have been ingested by querying RW. Either an RW catalog or a state table of the sources can serve this purpose.

Once one source is done, e.g. the number of ingested rows has passed a certain threshold (entirely up to the user), the user can move on to the next source.

Since this is advanced functionality that we don't expect to be used frequently, ease of use can be compromised.
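
A rough sketch of the intended workflow, assuming the upstreams are tables backed by external sources (so the rows ingested so far can be counted directly) and that per-source flow control is available; the table names and the threshold are hypothetical:

-- Keep the new materialized view's sources paused while it is created.
SET RW_STREAMING_RATE_LIMIT TO 0;

create materialized view m1 as
select * from dim_table d join fact_table f on d.id = f.dim_id;

-- Open up ingestion for the dimension side first (the per-source control needed
-- here is exactly what this solution asks for), then monitor progress:
select count(*) from dim_table;

-- Once the count passes the user-chosen threshold (say, 1,000,000 rows),
-- move on and enable the fact table's source in the same way.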

Solution 2: Custom Backfilling Orders

Add a with option to create materialized view, e.g.

create materialized view X as 
select * from 
t1 join t2 join t3 join t4 
with ( backfilling_orders="t4:10000;t1:25000" )

It means that RW must ingest 10,000 rows from t4 before it can start ingesting from t1. t2 and t3 do not appear, which means their order does not matter: they will be ingested together after RW has ingested 25,000 rows from t1.

Comments

  1. In both solutions, the decision to move on to the next table is based on row-count thresholds supplied by users. This is because RW may not be able to know the size of the upstream, e.g. when it is a source or a table that is still backfilling.

  2. Both of them are essentially an enhanced version of SET RW_STREAMING_RATE_LIMIT TO XXX, as they incorporate domain knowledge from users. The benefit is more stability without compromising performance (see the sketch below).
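
For instance, the proposed option could be applied to a TPC-H-style join like the ones benchmarked later in this thread. This is only a sketch of the proposed syntax, not an existing feature; the thresholds are hypothetical and would come from the user's domain knowledge:

create materialized view revenue_by_nation as
select n.n_name, sum(l.l_extendedprice * (1 - l.l_discount)) as revenue
from lineitem l
join orders o on l.l_orderkey = o.o_orderkey
join customer c on o.o_custkey = c.c_custkey
join nation n on c.c_nationkey = n.n_nationkey
group by n.n_name
with ( backfilling_orders="nation:25;customer:1000000" )
-- nation backfills first (25 rows), then customer (1,000,000 rows);
-- the unlisted tables, including the fact table lineitem, follow afterwards.

Leaving the fact table unlisted makes it backfill last, which matches the "fact table last" heuristic discussed above.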

kwannoel commented 8 months ago

Solution 3: I think we also have upstream Hummock KV count estimates; we can use those to estimate the cardinality of upstream tables and determine the backfill ordering from there. Seems like a more general solution.

This interface can also be used to plan join ordering https://github.com/risingwavelabs/risingwave/issues/14140

It will be slightly more involved, so solution 2 can work in the meantime. We can support solution 2 first (solution 1 is already sort of supported, with a min throughput of 1).

lmatz commented 8 months ago

solution 1 is already sort of supported

You mean that right now RW can already report how many rows a table has ingested and flow-control the source operator of each upstream independently?

kwannoel commented 8 months ago

solution 1 is already sort of supported

You mean that right now RW can already report how many rows a table has ingested and flow-control the source operator of each upstream independently?

For sources, yes, we currently support rate limits.

CREATE SOURCE s (...)
CREATE SOURCE s2 (...)
create materialized view m1 as select * from s join s2 on ...;

Then we can alter the source rate limits for s and s2 with risectl. We also want to support this via ALTER (https://github.com/risingwavelabs/risingwave/issues/13803), but that's WIP.

lmatz commented 8 months ago

Oh I see, does it work for tables or materialized views if they are the upstream of create materialized view m1?

kwannoel commented 8 months ago

Oh I see, does it work for tables or materialized views if they are the upstream of create materialized view m1?

Yeah, those have associated rate limits as well, which should currently be alterable via risectl.

lmatz commented 8 months ago

Great! I will write a doc about how it can mitigate OOM and boost performance.

lmatz commented 8 months ago

Solution 3: I think we also have upstream hummock kv estimates, we can use that to estimate the cardinality of upstream tables, and determine backfill ordering from there. Seems like a more general solution.

The backfilling order fundamentally depends on the functional dependencies among the tables involved in the join, but cardinality can be a good enough proxy to infer which tables are dimension tables and which is the fact table, and thus to automatically determine the backfilling order.

kwannoel commented 8 months ago

Discussed offline with @lmatz: rate limiting can't handle the case where there's a lot of historical data, since an upstream rate limit does not control the downstream's table scan throughput; it only affects the upstream update side.

We need to support modifying the rate limit of each individual table scan to support Solution 1.

For now, the only option is to rate limit the entire stream job's backfill, rather than individual table scans.

In terms of implementation, there are a few parts:

  1. Syntax:
     i. Can use martin's suggested syntax:

        create materialized view X as
        select * from
        t1 join t2 join t3 join t4
        with ( backfilling_orders="t4:10000;t1:25000" )

  2. Semantics: the user specifies a fragment to throttle (directly, or indirectly via an id or table name). The stream scan for that fragment will then be rate limited accordingly.
  3. Path to create an mview with a rate limit set for each fragment:
     i. Parse the syntax into a map between relation names and their rate limits.
     ii. When creating the stream table scan plan, check the map and update the rate limit accordingly.

lmatz commented 8 months ago

"Good" run: https://buildkite.com/risingwave-test/tpch-benchmark/builds/973
Event order: nation,region,part,partsupp,supplier,customer,orders,lineitem
Grafana: https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?orgId=1&from=1705918992930&to=1705926378139&var-datasource=P2453400D1763B4D9&var-namespace=tpch-1cn-affinity-lmatz-good&var-instance=benchmark-risingwave&var-pod=All&var-component=All&var-table=All

"Bad" run: https://buildkite.com/risingwave-test/tpch-benchmark/builds/972#018d30b2-b3a7-4f3c-9af3-34a1cc8d61d1
Event order: lineitem,orders,customer,supplier,partsupp,part,region,nation
Grafana: https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?orgId=1&from=1705918992930&to=1705926378139&var-datasource=P2453400D1763B4D9&var-namespace=tpch-1cn-affinity-lmatz-bad&var-instance=benchmark-risingwave&var-pod=All&var-component=All&var-table=All

Executed TPC-H Q5, Q8, and Q9 because they involve the largest number of tables among the 8 tables in the TPC-H benchmark.

Surprisingly, the idea that ingesting the fact table last should always be better does not hold for Q8.

Investigating...

This experiment had a query runtime limit of 30 minutes, so some queries could not run to completion and are not good comparison points; e.g., some queries did not finish consuming all of the dimension tables, so the most computation-intensive part was never triggered.

lmatz commented 8 months ago

v1.6.0, scale factor 50, runtime limit 60 minutes

Fact Table Last ("Good"): https://buildkite.com/risingwave-test/tpch-benchmark/builds/980#018d35bc-85d8-4e1a-898b-39aa62d88960
Fact Table First ("Bad"): https://buildkite.com/risingwave-test/tpch-benchmark/builds/981

Query Good Order Throughput (K rows/s) Bad Order Throughput (K rows/s)
Q1 675 672
Q2 567 522
Q3 368 218
Q4 355 323
Q5 259 164
Q6 672 679
Q7 240 118
Q8 270 217
Q9 218 120
Q10 203 124
Q11 611 625
Q12 486 570
Q13 484 525
Q14 529 419
Q15 290 425
Q16 528 565
Q17 43 194
Q18 171 251
Q19 684 627
Q20 44 147
Q21 76 122
Q22 517
kwannoel commented 8 months ago

What do the numbers mean? Throughput? Or duration?

lmatz commented 8 months ago

Throughput; I modified the original table.

The observation is that fact table last is not necessarily the best in terms of throughput. Memory usage is another story worth investigating, since the purpose here is to mitigate the OOM issue.

So fact table last can still be a good heuristic, but I think we'd better still give users control over the ordering in the near term.

github-actions[bot] commented 4 months ago

This issue has been open for 60 days with no activity. Could you please update the status? Feel free to continue discussion or close as not planned.