StarRocks / starrocks

The world's fastest open query engine for sub-second analytics both on and off the data lakehouse. With the flexibility to support nearly any scenario, StarRocks provides best-in-class performance for multi-dimensional analytics, real-time analytics, and ad-hoc queries. A Linux Foundation project.
https://starrocks.io
Apache License 2.0

Aggregate model: uneven BE disk utilization #11744

Closed: beanflower-bs closed this issue 2 years ago

beanflower-bs commented 2 years ago

Steps to reproduce the behavior (Required)

  1. Create an Aggregate-model table, partitioned by `stat_date` and distributed by a hash key; all of the aggregate key columns are low-cardinality dimensions with only a handful of candidate values:

    CREATE TABLE `sal_eden_recommend_microvideo_nearby_feed_updown_agg` (
    `stat_date` date NULL COMMENT "yyyy-MM-dd format",
    `begin_time` char(20) NULL COMMENT "",
    `frequence` char(20) NULL COMMENT "",
    `layer_id` char(100) NULL COMMENT "",
    `experiment_id` char(100) NULL COMMENT "",
    `device_os` char(20) NULL COMMENT "",
    `gender` char(20) NULL COMMENT "",
    `video_time` double SUM NULL COMMENT "",
    `play_time` double SUM NULL COMMENT "",
    `play_completion_pv` bigint(20) SUM NULL COMMENT "",
    `click_pv` bigint(20) SUM NULL COMMENT "",
    `play_pv` bigint(20) SUM NULL COMMENT "",
    `follow_pv` bigint(20) SUM NULL COMMENT "",
    `comment_pv` bigint(20) SUM NULL COMMENT "",
    `like_pv` bigint(20) SUM NULL COMMENT "",
    `look_comment_pv` bigint(20) SUM NULL COMMENT "",
    `play_uv` bitmap BITMAP_UNION NULL COMMENT "",
    `follow_uv` bitmap BITMAP_UNION NULL COMMENT "",
    `comment_uv` bitmap BITMAP_UNION NULL COMMENT "",
    `like_uv` bitmap BITMAP_UNION NULL COMMENT "",
    `look_comment_uv` bitmap BITMAP_UNION NULL COMMENT "",
    `play_completion_uv` bitmap BITMAP_UNION NULL COMMENT ""
    ) ENGINE=OLAP
    AGGREGATE KEY(stat_date,begin_time,frequence,layer_id,experiment_id,device_os,gender)
    COMMENT "OLAP"
    PARTITION BY RANGE(`stat_date`)
    (PARTITION p0 VALUES [('0000-01-01'), ('2022-09-13')),
    PARTITION p20220913 VALUES [('2022-09-13'), ('2022-09-14')),
    PARTITION p20220914 VALUES [('2022-09-14'), ('2022-09-15')),
    PARTITION p20220915 VALUES [('2022-09-15'), ('2022-09-16')))
    DISTRIBUTED BY HASH(layer_id,experiment_id,device_os,gender) BUCKETS 32
    PROPERTIES (
    "replication_num" = "3",
    "dynamic_partition.enable" = "true",
    "dynamic_partition.time_unit" = "DAY",
    "dynamic_partition.time_zone" = "Asia/Shanghai",
    "dynamic_partition.start" = "-14",
    "dynamic_partition.end" = "3",
    "dynamic_partition.prefix" = "p",
    "dynamic_partition.buckets" = "8",
    "dynamic_partition.replication_num" = "3",
    "in_memory" = "false",
    "storage_format" = "DEFAULT"
    );
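
The skew reported below is plausibly rooted in the `DISTRIBUTED BY` clause above: `layer_id`, `experiment_id`, `device_os` and `gender` are all low-cardinality dimensions, so only a few of the 32 hash buckets (8 for dynamic partitions) ever receive rows. A diagnostic sketch, using only the table and column names from the DDL above:

```sql
-- Count distinct combinations of the four hash columns. If this number is
-- much smaller than the bucket count, most tablets stay empty and ingest
-- I/O concentrates on the few BEs hosting the populated tablets.
SELECT COUNT(*) AS distinct_hash_keys
FROM (
    SELECT DISTINCT layer_id, experiment_id, device_os, gender
    FROM sal_eden_recommend_microvideo_nearby_feed_updown_agg
) t;
```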
  2. Stream data into StarRocks via flink-doris-connector (Stream Load).

  3. Environment details:

    number of Flink tasks: 7
    QPS across all tasks: 20000
    number of BEs: 7
    memory per BE: 60 GB
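
Whether the tablets (and hence the write I/O) really land on only a couple of machines can be checked from the FE; a sketch assuming the `ADMIN SHOW REPLICA DISTRIBUTION` statement is available in this StarRocks version:

```sql
-- Lists the replica count per backend for this table; a heavy skew here
-- would match the observation below that only two BEs are doing disk I/O.
ADMIN SHOW REPLICA DISTRIBUTION FROM sal_eden_recommend_microvideo_nearby_feed_updown_agg;
```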

Expected behavior (Required)

  1. Disk I/O is evenly distributed across all BEs
  2. The load task runs normally

Real behavior (Required)

  1. Only two machines are doing any disk I/O (screenshot: 2022-09-28, 11:23 AM)

  2. Tasks fail frequently and restart

    
    Caused by: org.apache.flink.util.SerializedThrowable: Writing records to Doris failed.
    at org.apache.flink.connector.doris.sink.DorisSinkFunction.checkFlushException(DorisSinkFunction.java:307) ~[flink-connector-doris_2.11-momo-1.14.3.jar:momo-1.14.3]
    at org.apache.flink.connector.doris.sink.DorisSinkFunction.flush(DorisSinkFunction.java:172) ~[flink-connector-doris_2.11-momo-1.14.3.jar:momo-1.14.3]
    at org.apache.flink.connector.doris.sink.DorisSinkFunction.lambda$open$0(DorisSinkFunction.java:105) ~[flink-connector-doris_2.11-momo-1.14.3.jar:momo-1.14.3]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_121]
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) ~[?:1.8.0_121]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_121]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) ~[?:1.8.0_121]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[?:1.8.0_121]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[?:1.8.0_121]
    ... 1 more

    Caused by: org.apache.flink.util.SerializedThrowable: java.io.IOException: Doris Stream load failed, load result={
      "TxnId": 22245628,
      "Label": "81aef16f-fe94-44ce-9167-2fa477abda17_1664262491507",
      "Status": "Publish Timeout",
      "Message": "Publish timeout. The data will be visible after a while",
      "NumberTotalRows": 31,
      "NumberLoadedRows": 31,
      "NumberFilteredRows": 0,
      "NumberUnselectedRows": 0,
      "LoadBytes": 14156,
      "LoadTimeMs": 7714,
      "BeginTxnTimeMs": 121,
      "StreamLoadPutTimeMs": 35,
      "ReadDataTimeMs": 0,
      "WriteDataTimeMs": 56,
      "CommitAndPublishTimeMs": 0
    }
    at org.apache.flink.connector.doris.sink.DorisSinkFunction.flush(DorisSinkFunction.java:187) ~[flink-connector-doris_2.11-momo-1.14.3.jar:momo-1.14.3]
    at org.apache.flink.connector.doris.sink.DorisSinkFunction.lambda$open$0(DorisSinkFunction.java:105) ~[flink-connector-doris_2.11-momo-1.14.3.jar:momo-1.14.3]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_121]
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) ~[?:1.8.0_121]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_121]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) ~[?:1.8.0_121]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[?:1.8.0_121]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[?:1.8.0_121]
    ... 1 more
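
The "Publish Timeout" status by itself only means the transaction committed but the publish phase was slow, which is consistent with a couple of overloaded BEs absorbing all writes. One possible mitigation (a sketch, not a verified fix) is to rebuild the table distributing on the full aggregate key, since `stat_date` and `begin_time` give the hash far more distinct values than the four dimension columns alone:

```sql
-- Hypothetical replacement for the original DISTRIBUTED BY clause. All of
-- these columns are already part of the AGGREGATE KEY, and together they
-- have much higher cardinality, so rows should spread across all buckets
-- instead of a few.
DISTRIBUTED BY HASH(stat_date, begin_time, frequence, layer_id,
                    experiment_id, device_os, gender) BUCKETS 32
```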



StarRocks version (Required)
 - You can get the StarRocks version by executing SQL `select current_version()`
 - 2.2.0
imay commented 2 years ago

@beanflower-bs you should use starrocks-connector-for-apache-flink

beanflower-bs commented 2 years ago

> @beanflower-bs you should use starrocks-connector-for-apache-flink

I will try it.