risingwavelabs / risingwave

Best-in-class stream processing, analytics, and management. Perform continuous analytics, or build event-driven applications, real-time ETL pipelines, and feature stores in minutes. Unified streaming and batch. PostgreSQL compatible.

Why does source throughput drop to zero after 5 minutes? Is this a bug or a performance tuning problem? #17471

Open · bymzy opened this issue 3 months ago

bymzy commented 3 months ago

Describe the bug

[screenshot: source throughput dropping to zero after about 5 minutes]

Error message/log

No response

To Reproduce

CREATE MATERIALIZED VIEW  cleaned_meter_data AS
   select marked_data.*, 
           CASE
            WHEN marked_data.anomaly = 1 THEN 
                CASE
                    WHEN marked_data.prev_power IS NOT NULL AND marked_data.prev_power_2 IS NOT NULL AND marked_data.prev_power_3 IS NOT NULL THEN 
                        CASE 
                            WHEN marked_data.prev_power <= marked_data.run_cap * 5 AND marked_data.prev_power <= marked_data.rated_cap * 2 THEN marked_data.prev_power
                            WHEN marked_data.prev_power_2 <= marked_data.run_cap * 5 AND marked_data.prev_power_2 <= marked_data.rated_cap * 2 THEN marked_data.prev_power_2
                            WHEN marked_data.prev_power_3 <= marked_data.run_cap * 5 AND marked_data.prev_power_3 <= marked_data.rated_cap * 2 THEN marked_data.prev_power_3
                            ELSE NULL
                        END
                    ELSE NULL
                END
            WHEN marked_data.anomaly = 2 THEN 0
            WHEN marked_data.anomaly = 3 THEN 
                CASE
                    WHEN marked_data.prev_power IS NOT NULL THEN marked_data.prev_power
                    ELSE NULL
                END
            ELSE marked_data.power
         END AS fitted_power
         FROM
   (SELECT 
        da.cons_id,
        da.meter_id,
        md.datatime,
        md.data as power,
        da.run_cap,
        da.rated_cap,
        LAG(md.data) OVER (PARTITION BY md.meterId ORDER BY md.datatime) AS prev_power,
        LAG(md.data, 2) OVER (PARTITION BY md.meterId ORDER BY md.datatime) AS prev_power_2,
        LAG(md.data, 3) OVER (PARTITION BY md.meterId ORDER BY md.datatime) AS prev_power_3,
        CASE
            WHEN md.data IS NULL THEN 3 
            WHEN md.data < 0 THEN 2 
            WHEN md.data > da.run_cap  * 5 OR md.data > da.rated_cap  * 2 THEN 1 
            ELSE 0 
        END AS anomaly
    FROM device_accounts da 
    JOIN meter_data md ON md.meterId = da.meter_id
    where md.itemId = '2004_5'
   ) marked_data;

Expected behavior

No response

How did you deploy RisingWave?

No response

The version of RisingWave

No response

Additional context

No response

bymzy commented 3 months ago

I used a window function over PARTITION BY in the MV above.

Any suggestions?

lmatz commented 3 months ago

It could be bottlenecked on CPU or memory resources.

Is the CPU fully utilized? What's the memory usage?

Are there many cache misses? What about the remote I/O rate and its bandwidth?

bymzy commented 3 months ago

[screenshots: CPU and memory usage panels]

The CPU and memory are not fully utilized. Should I adjust parallelism or something? @lmatz

lmatz commented 3 months ago

Is it possible to generate a snapshot of all the dashboards on Grafana?

By default, streaming_parallelism is set to the total number of CPUs. I wonder if you changed this session variable before.
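
For reference, a minimal sketch of checking and adjusting it per session (assuming RW's standard SHOW/SET session-variable interface; the value 8 is only an example, not a recommendation):

    -- show the current value for this session
    SHOW streaming_parallelism;
    -- affects streaming jobs created after this point in the session
    SET streaming_parallelism = 8;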

bymzy commented 3 months ago
> LAG(md.data) OVER (PARTITION BY md.meterId ORDER BY md.datatime) AS prev_power,
> LAG(md.data, 2) OVER (PARTITION BY md.meterId ORDER BY md.datatime) AS prev_power_2,
> LAG(md.data, 3) OVER (PARTITION BY md.meterId ORDER BY md.datatime) AS prev_power_3,

Well, I found something about the window. Does this mean that the window size is unbounded? @lmatz

If it is, then that may be the key point.

lmatz commented 3 months ago

I believe that, by default:

> The default framing option is RANGE UNBOUNDED PRECEDING, which is the same as RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW. With ORDER BY, this sets the frame to be all rows from the partition start up through the current row's last ORDER BY peer.

https://www.postgresql.org/docs/current/sql-expressions.html#SYNTAX-WINDOW-FUNCTIONS
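
Concretely (an illustrative example under PostgreSQL semantics, reusing the column names from the query above), with ORDER BY and no explicit frame clause, the two aggregates below compute the same running sum:

    SELECT
        sum(md.data) OVER (PARTITION BY md.meterId ORDER BY md.datatime) AS s_default,
        sum(md.data) OVER (PARTITION BY md.meterId ORDER BY md.datatime
                           RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS s_explicit
    FROM meter_data md;  -- s_default and s_explicit are identical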

cc @stdrc, wondering if RW's semantics of over window functions follow Postgres completely.

stdrc commented 3 months ago
> Does this mean that the window size is unbounded?

No. LAG with a constant offset is not "unbounded".
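
For intuition, a minimal sketch under PostgreSQL frame semantics (illustrative only, not necessarily how RW executes it): LAG with a constant offset N only ever looks back N rows within the partition, the same bounded lookback as a single-row ROWS frame:

    -- same bounded lookback as LAG(md.data, 3): exactly the row 3 positions back,
    -- or NULL when fewer than 3 preceding rows exist in the partition
    SELECT first_value(md.data) OVER (
        PARTITION BY md.meterId ORDER BY md.datatime
        ROWS BETWEEN 3 PRECEDING AND 3 PRECEDING
    ) AS prev_power_3
    FROM meter_data md;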

However, by default we do cache all entries of a partition once the partition key is cached. So there are basically two levels of cache here: first, partition key -> partition (which contains the partition entries cache); second, inside the partition entries cache, order key + pk -> row. By default the second level contains all entries of the cached partitions.

Two things worth trying here:

  1. Change the over window cache policy:

         set rw_streaming_over_window_cache_policy = recent; -- or recent_first_n or recent_last_n, depending on your workload pattern

  2. Create a base MV with ORDER BY <partition_key>, <order_key>, and create the desired MV on top of that, so that the rows fed into the OverWindow operator are already ordered and can achieve better performance (see the sketch below).
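
A minimal sketch of option 2 for this workload (reusing names from the repro query above; illustrative only):

    -- base MV: materialize the join, ordered by partition key then order key
    CREATE MATERIALIZED VIEW meter_data_ordered AS
    SELECT da.cons_id, da.meter_id, md.datatime, md.data AS power,
           da.run_cap, da.rated_cap
    FROM device_accounts da
    JOIN meter_data md ON md.meterId = da.meter_id
    WHERE md.itemId = '2004_5'
    ORDER BY da.meter_id, md.datatime;

    -- then rebuild the LAG/CASE logic of cleaned_meter_data on top, e.g.
    -- LAG(power) OVER (PARTITION BY meter_id ORDER BY datatime)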

Hopefully these methods can help reduce the jaggies.

bymzy commented 3 months ago
    rw-main risingwave_storage::hummock::event_handler::hummock_event_handler: cannot acquire lock for all read version pending_count=1 total_count=192

I found many logs like this. What does this mean? I also found that MinIO gets quite busy, but RW does not. Is this an I/O problem? [screenshot: MinIO utilization]

@stdrc

stdrc commented 3 months ago
> I found many logs like this. What does this mean?

I'm not quite familiar with the storage components. @Li0k @hzxa21, does this imply something?

hzxa21 commented 3 months ago
> @Li0k @hzxa21, does this imply something?

No. This log is expected and is not relevant to the zero source throughput issue. I guess this log was added because we want to measure how frequently the contention happens in our implementation. Should we make this a debug log? cc @wenym1

Can you also share the "Barrier Sync Latency" and "Barrier Inflight Latency" panels? Also, can you follow the instructions here and share the await tree dump?

I suspect the source throughput drops to 0 because there is backpressure somewhere. When there is severe backpressure in RisingWave, the source will stop pulling data from upstream.

bymzy commented 3 months ago
> Can you also share the "Barrier Sync Latency" and "Barrier Inflight Latency" panels?

Sure. Backpressure is quite high, I guess. @hzxa21

[screenshots: Barrier Sync Latency, Barrier Inflight Latency, and backpressure panels]

bymzy commented 3 months ago

[attachment: await tree dump.txt]

@hzxa21 Here is the await tree dump.

bymzy commented 3 months ago

And fragment 22 looks like this: [screenshot: fragment 22]

bymzy commented 3 months ago

More information: I found that the executor cache memory goes up and down repeatedly, and every time it goes down, it seems the executor writes data to MinIO.

Does this mean that the executor is waiting for something, or maybe some timer? In my case, is the executor (OverWindow) waiting for input data? But I have so many unconsumed messages in Kafka, which is weird. @hzxa21

[screenshot: executor cache memory over time]

stdrc commented 2 months ago

> Does this mean that the executor is waiting for something, or maybe some timer?

Have you tried to set rw_streaming_over_window_cache_policy = recent_last_n;?

github-actions[bot] commented 1 day ago

This issue has been open for 60 days with no activity.

If you think it is still relevant today, and needs to be done in the near future, you can comment to update the status, or just manually remove the no-issue-activity label.

You can also confidently close this issue as not planned to keep our backlog clean. Don't worry if you think the issue is still valuable to continue in the future. It's searchable and can be reopened when it's time. 😄