StarRocks / starrocks

StarRocks, a Linux Foundation project, is a next-generation sub-second MPP OLAP database for full analytics scenarios, including multi-dimensional analytics, real-time analytics, and ad-hoc queries.
https://starrocks.io
Apache License 2.0

Empty Kafka message causes the messages following it to be lost in routine load #8534

Closed: rickif closed this issue 2 years ago.

rickif commented 2 years ago

Steps to reproduce the behavior (Required)

  1. Create table
    CREATE TABLE `tbl_simple` ( 
    `key1` int(11) NULL COMMENT "",
    `key2` int(11) NULL COMMENT "" 
    ) ENGINE=OLAP 
    duplicate KEY(`key1`) 
    COMMENT "OLAP" 
    DISTRIBUTED BY HASH(`key1`) BUCKETS 3 
    PROPERTIES (
    "replication_num" = "1", 
    "in_memory" = "false", 
    "storage_format" = "DEFAULT" 
    );
  2. Prepare data. The sample data contains 20 valid JSON messages and one empty message, which comes after the first 10 (shown as the blank line below).
    
    {"key1":1,"key2":2}
    {"key1":1,"key2":2}
    {"key1":1,"key2":2}
    {"key1":1,"key2":2}
    {"key1":1,"key2":2}
    {"key1":1,"key2":2}
    {"key1":1,"key2":2}
    {"key1":1,"key2":2}
    {"key1":1,"key2":2}
    {"key1":1,"key2":2}
    
    {"key1":1,"key2":2}
    {"key1":1,"key2":2}
    {"key1":1,"key2":2}
    {"key1":1,"key2":2}
    {"key1":1,"key2":2}
    {"key1":1,"key2":2}
    {"key1":1,"key2":2}
    {"key1":1,"key2":2}
    {"key1":1,"key2":2}
    {"key1":1,"key2":2}

  3. Create routine load
```SQL
CREATE ROUTINE LOAD rl_simple ON tbl_simple
COLUMNS (key1, key2) 
PROPERTIES ( 
    "format" = "json", 
    "strip_outer_array"="false", 
    "jsonpaths"="[\"$.key1\", \"$.key2\"]" 
) 
FROM KAFKA ( 
    "kafka_broker_list" = "xxx", 
    "kafka_topic" = "topic_simple", 
    "kafka_partitions" = "0", 
    "kafka_offsets" = "OFFSET_BEGINNING" 
);
```
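The test data from step 2 can be generated with a short script. The sketch below only builds the 21 payloads in topic order (10 valid, 1 empty, 10 valid), assuming the "empty message" is a zero-length value rather than a null one; how the reporter actually produced the messages is not stated in the issue. It also totals the payload sizes: 20 × 19 bytes = 380, which is the receivedBytes value that SHOW ROUTINE LOAD reports in both runs of this issue.

```python
import json

# One valid message exactly as in the sample data: compact JSON, 19 bytes.
VALID = json.dumps({"key1": 1, "key2": 2}, separators=(",", ":")).encode("utf-8")


def build_payloads() -> list[bytes]:
    """Return the 21 payloads in topic order: 10 valid, 1 empty, 10 valid.

    Assumption: the empty message is a zero-length value (b""), not a null value.
    """
    return [VALID] * 10 + [b""] + [VALID] * 10


if __name__ == "__main__":
    payloads = build_payloads()
    total_bytes = sum(len(p) for p in payloads)
    print(len(payloads), total_bytes)  # 21 payloads, 380 bytes (the empty one adds 0)
```

Each payload could then be sent in order to partition 0 of `topic_simple` with any Kafka producer client (for example, kafka-python's `KafkaProducer.send`); that step is left out here because the broker setup is specific to the reporter's environment.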

Expected behavior (Required)

All 20 valid messages are loaded into tbl_simple.

```
mysql> show routine load for rl_simple\G;
*************************** 1. row ***************************
                  Id: 74032
                Name: rl_simple
          CreateTime: 2022-07-11 16:29:46
           PauseTime: NULL
             EndTime: NULL
              DbName: default_cluster:db0
           TableName: tbl_simple
               State: RUNNING
      DataSourceType: KAFKA
      CurrentTaskNum: 1
       JobProperties: {"partitions":"*","columnToColumnExpr":"key1,key2","maxBatchIntervalS":"10","whereExpr":"*","dataFormat":"json","timezone":"Asia/Shanghai","format":"json","json_root":"","strict_mode":"false","jsonpaths":"[\"$.key1\", \"$.key2\"]","desireTaskConcurrentNum":"5","maxErrorNum":"0","strip_outer_array":"false","currentTaskConcurrentNum":"1","maxBatchRows":"200000"}
DataSourceProperties: {"topic":"topic_blank","currentKafkaPartitions":"0","brokerList":"172.26.92.155:9092"}
    CustomProperties: {}
           Statistic: {"receivedBytes":380,"errorRows":0,"committedTaskNum":1,"loadedRows":20,"loadRowsRate":0,"abortedTaskNum":0,"totalRows":20,"unselectedRows":0,"receivedBytesRate":0,"taskExecuteTimeMs":3009}
            Progress: {"0":"20"}
ReasonOfStateChanged:
        ErrorLogUrls:
            OtherMsg:
1 row in set (0.00 sec)

ERROR:
No query specified

mysql> select count(*) from tbl_simple;
+----------+
| count(*) |
+----------+
|       20 |
+----------+
1 row in set (0.00 sec)
```

Real behavior (Required)

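Messages after the empty message may be silently skipped. In the run below, Progress reaches the same offset (20) and receivedBytes is the same 380 bytes as in the expected run, yet totalRows and loadedRows are only 10 and errorRows is 0, so the 10 messages after the empty one are dropped without being counted as errors.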

```
                  Id: 74024
                Name: rl_simple
          CreateTime: 2022-07-11 16:21:25
           PauseTime: NULL
             EndTime: NULL
              DbName: default_cluster:db0
           TableName: tbl_simple
               State: RUNNING
      DataSourceType: KAFKA
      CurrentTaskNum: 1
       JobProperties: {"partitions":"*","columnToColumnExpr":"key1,key2","maxBatchIntervalS":"10","whereExpr":"*","dataFormat":"json","timezone":"Asia/Shanghai","format":"json","json_root":"","strict_mode":"false","jsonpaths":"[\"$.key1\", \"$.key2\"]","desireTaskConcurrentNum":"5","maxErrorNum":"0","strip_outer_array":"false","currentTaskConcurrentNum":"1","maxBatchRows":"200000"}
DataSourceProperties: {"topic":"topic_blank","currentKafkaPartitions":"0","brokerList":"172.26.92.155:9092"}
    CustomProperties: {}
           Statistic: {"receivedBytes":380,"errorRows":0,"committedTaskNum":1,"loadedRows":10,"loadRowsRate":0,"abortedTaskNum":0,"totalRows":10,"unselectedRows":0,"receivedBytesRate":0,"taskExecuteTimeMs":3002}
            Progress: {"0":"20"}
ReasonOfStateChanged:
        ErrorLogUrls:
            OtherMsg:
2 rows in set (0.01 sec)

ERROR:
No query specified

mysql> select count(*) from tbl_simple;
+----------+
| count(*) |
+----------+
|       10 |
+----------+
1 row in set (0.01 sec)
```
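One hypothesis consistent with these statistics is that every message was fetched (offsets and bytes advance as in the expected run) but parsing of the batch stopped at the empty payload, so the rows after it were neither loaded nor counted as errors. The toy model below is purely hypothetical and is not StarRocks' routine load code; `load_batch`, `BatchResult`, and the `stop_on_empty` switch are illustrative names. It contrasts a parser that treats an empty payload as end of input with one that simply skips it, and only the first reproduces the reported numbers.

```python
import json
from dataclasses import dataclass

VALID = json.dumps({"key1": 1, "key2": 2}, separators=(",", ":")).encode("utf-8")
# Same topic layout as in step 2: 10 valid messages, one empty message, 10 valid messages.
PAYLOADS = [VALID] * 10 + [b""] + [VALID] * 10


@dataclass
class BatchResult:
    last_offset: int     # last consumed offset (compare with Progress)
    received_bytes: int  # compare with receivedBytes
    loaded_rows: int     # compare with loadedRows
    error_rows: int      # compare with errorRows


def load_batch(payloads: list[bytes], stop_on_empty: bool) -> BatchResult:
    """Hypothetical model of one routine load task over a single partition."""
    received = b"".join(payloads)  # every message is fetched, so the offset advances
    rows, errors, parsing = 0, 0, True
    for payload in payloads:
        if not payload:
            if stop_on_empty:
                parsing = False  # hypothetical buggy variant: empty payload ends parsing
            continue  # expected variant: an empty payload just yields no row
        if not parsing:
            continue  # later payloads are dropped without being counted as errors
        try:
            json.loads(payload)
            rows += 1
        except json.JSONDecodeError:
            errors += 1
    return BatchResult(len(payloads) - 1, len(received), rows, errors)


if __name__ == "__main__":
    print(load_batch(PAYLOADS, stop_on_empty=True))   # mirrors the reported run
    print(load_batch(PAYLOADS, stop_on_empty=False))  # mirrors the expected run
```

With `stop_on_empty=True` the model gives last offset 20, 380 received bytes, 10 loaded rows, and 0 error rows, matching the real behavior above; with `stop_on_empty=False` it loads all 20 rows. This only shows that such a mechanism would fit the numbers; it does not establish the actual cause.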

StarRocks version (Required)

2.0.7

jaogoy commented 2 years ago

@rickif If this issue is resolved, you can close it. Also, does the problem occur in other versions, such as 2.1.x or 2.2.x?