
Data lost in kafka routine load internal error #46960

Open sanikoyes opened 4 months ago

sanikoyes commented 4 months ago

When a table is dynamically partitioned with the time_slice function and data is imported with Kafka Routine Load, an internal partition error may occur. The batch of data being imported is then lost, and the system does not retry importing that batch.

This bug appears randomly.

Steps to reproduce the behavior (Required)

1. Table schema
```sql
CREATE TABLE `ta_event_119` (
  `$part_event` varchar(128) NOT NULL COMMENT "",
  `$part_date` varchar(32) NOT NULL COMMENT "",
  `#event_id` varchar(128) NOT NULL COMMENT "",
  `#user_id` bigint(20) NULL COMMENT "",
  `#event_name` varchar(128) NULL COMMENT "",
  `#event_time` datetime NULL COMMENT "",
  ...
  `evaluation` varchar(2056) NULL COMMENT ""
) ENGINE=OLAP
DUPLICATE KEY(`$part_event`, `$part_date`)
PARTITION BY time_slice(`#event_time`, 7, 'day', 'floor')
DISTRIBUTED BY HASH(`$part_event`, `$part_date`) BUCKETS 9
PROPERTIES (
  "replication_num" = "3",
  "in_memory" = "false",
  "enable_persistent_index" = "true",
  "replicated_storage" = "true",
  "storage_medium" = "SSD",
  "storage_cooldown_ttl" = "6 months",
  "compression" = "LZ4"
);
```
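For reference, the `PARTITION BY` clause above buckets rows into 7-day ranges. A rough illustration (not StarRocks code) of what `time_slice(..., 7, 'day', 'floor')` computes, assuming slices align to the origin `0001-01-01 00:00:00` as described in the StarRocks docs:

```python
from datetime import datetime, timedelta

def time_slice_floor(ts: datetime, interval_days: int) -> datetime:
    # Illustration only: floor a timestamp to the start of its
    # interval_days-wide slice, with slices aligned to 0001-01-01 00:00:00.
    origin = datetime(1, 1, 1)
    elapsed_days = (ts - origin).days
    bucket_start_days = elapsed_days - (elapsed_days % interval_days)
    return origin + timedelta(days=bucket_start_days)

# The row from the error log below falls into the 7-day partition
# containing 2024-06-06 13:22:26.
time_slice_floor(datetime(2024, 6, 6, 13, 22, 26), 7)
```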

2. Routine load information
```sql
StarRocks > show routine load for data_etl_ta_event_119\G
*************************** 1. row ***************************
                  Id: 60890
                Name: data_etl_ta_event_119
          CreateTime: 2024-06-13 15:00:47
           PauseTime: NULL
             EndTime: NULL
              DbName: ta
           TableName: ta_event_119
               State: RUNNING
      DataSourceType: KAFKA
      CurrentTaskNum: 3
       JobProperties: {"partitions":"*","partial_update":"false","columnToColumnExpr":"*","maxBatchIntervalS":"10","partial_update_mode":"null","whereExpr":"*","dataFormat":"json","timezone":"Asia/Shanghai","format":"json","log_rejected_record_num":"0","taskTimeoutSecond":"60","json_root":"","maxFilterRatio":"1.0","strict_mode":"false","jsonpaths":"","taskConsumeSecond":"15","desireTaskConcurrentNum":"3","maxErrorNum":"0","strip_outer_array":"false","currentTaskConcurrentNum":"3","maxBatchRows":"200000"}
DataSourceProperties: {"topic":"data_etl_ta_event_119","currentKafkaPartitions":"0,1,2,3,4,5,6,7,8","brokerList":"kafka1:9092,kafka2:9092,kafka3:9092"}
    CustomProperties: {"kafka_default_offsets":"OFFSET_BEGINNING","group.id":"data_etl_ta_event_119_12d5d11f-ac73-4c16-9225-2cabd5ad1d51"}
           Statistic: {"receivedBytes":931802259,"errorRows":1,"committedTaskNum":330,"loadedRows":693084,"loadRowsRate":2000,"abortedTaskNum":1,"totalRows":693085,"unselectedRows":0,"receivedBytesRate":3762000,"taskExecuteTimeMs":247638}
            Progress: {"0":"77271","1":"76938","2":"76942","3":"77207","4":"76625","5":"76969","6":"77322","7":"76834","8":"76968"}
   TimestampProgress: {"0":"1718264141856","1":"1718264141287","2":"1718264141289","3":"1718264141856","4":"1718264140748","5":"1718264141287","6":"1718264141908","7":"1718264141248","8":"1718264141287"}
ReasonOfStateChanged: 
        ErrorLogUrls: http://be2.starrocks.com:8040/api/_load_error_log?file=error_log_ffcf4b3ddb25451e_86e1b23f7f1f6e03
         TrackingSQL: select tracking_log from information_schema.load_tracking_logs where job_id=60890
            OtherMsg: [2024-06-13 15:34:47] [task id: 9e32add0-a904-411f-90c8-d6daea659495] [txn id: -1] there is no new data in kafka, wait for 10 seconds to schedule again
LatestSourcePosition: {"0":"76980","1":"81222","2":"81243","3":"76915","4":"81269","5":"81250","6":"76804","7":"81103","8":"81269"}
```

3. Error log:

    StarRocks > select tracking_log from information_schema.load_tracking_logs where job_id=60890\G
    *************************** 1. row ***************************
    tracking_log: Error: The row create partition failed since Internal error: number of partition's index is not equal with schema's, num_part_indexes=1, num_schema_indexes=2
    be/src/exec/tablet_sink.cpp:419 _vectorized_partition->add_partitions(result.partitions). Row: ['Fishing', '2024-06-06', '7870108592636076', 7869934612971692, 'Fishing', 2024-06-06 13:22:26.646000, '6a951866a9724d338dcb9a76566101d0', '88991128', '5199976291-1708836573213', 2024-06-06 13:22:27.418000, NULL, '74e54458-07a9-4532-90a0-223eb6cb43ff@TE', 2024-06-13 15:02:30.730000, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, 69, 'high', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, 'Mini_Game', 79, 'Android', '2.2.0', 'wechat', NULL, 'ANA-AN00', NULL, NULL, NULL, NULL, '西门吹牛', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, 0, NULL, 28, NULL, NULL, NULL, '5199976291-1708836573213', NULL, NULL, 'null', NULL, 'wifi', NULL, NULL, 410375, 76, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, '{}', 1104, '逃跑', NULL, NULL, NULL, '222.179.154.186', NULL, '8.5.1', 5435, 8, NULL, '中国', 360, NULL, NULL, 'null', NULL, NULL, 8.05306e+08, NULL, NULL, '重庆', 'null', NULL, NULL, NULL, NULL, NULL, 'HUAWEI', 'MG', NULL, NULL, NULL, NULL, 1.94847e+08, NULL, NULL, NULL, NULL, 1.94847e+08, 'WX', NULL, NULL, 780, 249, NULL, NULL, '0', '重庆', NULL, 267914, NULL, NULL, 62258, 'CN', '12', NULL, '首次绑定', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL]
4. Data lost:

    StarRocks > select * from ta_event_119 where `#user_id` = 7869934612971692 and `#uuid` = '74e54458-07a9-4532-90a0-223eb6cb43ff@TE'\G
    Empty set (0.08 sec)
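For context, the BE error in the log above comes from a consistency check when adding the automatically created partition: during a schema change the table schema temporarily carries an extra shadow index, so the new partition's index count (1) no longer matches the schema's (2). A hypothetical model of that check (names invented; not the actual `tablet_sink.cpp` code):

```python
class InternalError(Exception):
    """Stand-in for the BE's internal-error status."""

def add_partition(partition_indexes, schema_indexes):
    # Hypothetical model: a freshly created partition carries one
    # materialized index, but during a schema change the table schema
    # temporarily holds a second (shadow) index. The counts diverge,
    # the check fails, and the load batch is dropped.
    if len(partition_indexes) != len(schema_indexes):
        raise InternalError(
            "number of partition's index is not equal with schema's, "
            f"num_part_indexes={len(partition_indexes)}, "
            f"num_schema_indexes={len(schema_indexes)}")
```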

Expected behavior (Required)

Retry the import when partition creation fails on the BE, so that the data is eventually loaded into table ta_event_119.
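The expectation amounts to treating the failed partition creation as a transient, retryable error rather than discarding the batch. A hypothetical sketch of such handling (`load_fn` and the error type are invented for illustration; this is not StarRocks code):

```python
import time

def load_batch_with_retry(load_fn, batch, max_retries=3, backoff_s=0.01):
    # Retry the whole batch on a transient failure (e.g. automatic
    # partition creation racing a schema change) instead of dropping it.
    last_err = None
    for attempt in range(max_retries):
        try:
            return load_fn(batch)
        except RuntimeError as e:
            last_err = e
            time.sleep(backoff_s * (2 ** attempt))  # exponential backoff
    raise last_err
```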

Real behavior (Required)

Data is lost and the batch is not retried.

StarRocks version (Required)

xiangguangyxg commented 3 months ago

Is there any schema change job running during the Kafka routine load?

sanikoyes commented 3 months ago

> Is there any schema change job running during the Kafka routine load?

Yes.

xiangguangyxg commented 3 months ago

Currently, automatic partition creation fails while a schema change job is running.
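Until that is fixed, one way to check whether a schema change is in flight before relying on automatic partition creation (standard `SHOW ALTER TABLE` syntax; the database name `ta` is taken from the job output above):

```sql
-- A PENDING or RUNNING job here means automatic partition creation
-- for this table may fail until the schema change finishes.
SHOW ALTER TABLE COLUMN FROM ta WHERE TableName = "ta_event_119";
```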