apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

[Bug] [MYSQL-CDC] When reading MySQL table data in parallel and syncing it to Doris, the table has a primary key and a unique index, but the data is split on the unique key instead of the primary key (id). #7794

Open andytyi opened 1 week ago

andytyi commented 1 week ago

Search before asking

What happened

Using CDC to sync one MySQL table to Doris. The table has roughly 60 million rows. When splitting the data, the SeaTunnel sync job did not split on the primary key (id) but on the unique index column (ad_type), and the sync is extremely slow. Checking the SQL that MySQL is currently executing with show full processlist shows: SELECT MIN(ad_type), MAX(ad_type) FROM test.abroad_day_recharge, and the sync job stays stuck on this statement. The table structure is as follows:

CREATE TABLE abroad_day_recharge (
  id int unsigned NOT NULL AUTO_INCREMENT,
  union_link_id int unsigned DEFAULT '0',
  book_id bigint unsigned DEFAULT '0',
  system tinyint unsigned DEFAULT '1',
  user_date int unsigned DEFAULT '0',
  order_date int unsigned DEFAULT '0',
  platform int unsigned DEFAULT '0',
  ad_type tinyint DEFAULT '1',
  money int unsigned DEFAULT '0',
  created_at timestamp NULL DEFAULT CURRENT_TIMESTAMP,
  updated_at timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (id),
  UNIQUE KEY uniq_key_word (union_link_id,user_date,order_date,ad_type,system) USING BTREE,
  KEY idx_user_date (user_date),
  KEY idx_order_date (order_date),
  KEY idx_platform (platform) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci
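
For comparison, here is a minimal sketch of the evenly sized snapshot chunks one would expect if the auto-increment primary key id were used as the split column with snapshot.split.size = 100000. The id range below is an assumption for illustration only; it is not taken from the real table.

// Minimal sketch: evenly sized snapshot chunks over the primary key id.
// The id range is an assumption for illustration; only chunkSize comes
// from the job config (snapshot.split.size = 100000).
public class IdChunkSketch {
    public static void main(String[] args) {
        long minId = 1L;                   // assumed smallest id
        long maxId = 60_000_000L;          // assumed largest id (~60M rows)
        long chunkSize = 100_000L;         // snapshot.split.size

        long chunkStart = minId;
        int chunkCount = 0;
        while (chunkStart <= maxId) {
            long chunkEnd = Math.min(chunkStart + chunkSize - 1, maxId);
            // each chunk maps to a bounded range scan on the primary key,
            // e.g. WHERE id >= chunkStart AND id <= chunkEnd
            chunkCount++;
            chunkStart = chunkEnd + 1;
        }
        System.out.println("evenly sized chunks: " + chunkCount); // 600
    }
}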

SeaTunnel Version

2.3.5

SeaTunnel Config

env {
  job.name = "Sync-Data-to-Doris"
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 10000
  checkpoint.timeout = 600000
}

source {
  MySQL-CDC {
    base-url = "jdbc:mysql://192.168.5.18:3306/test"
    username = "xxxxxx"
    password = "xxxxxxxxxxxx"
    table-names = ["test.abroad_day_recharge"]
    startup.mode = "initial"
    snapshot.split.size = 100000
  }

}

sink {
    Doris {
        fenodes = "localhost:8030"
        username = root
        password = "xxxxxxxx"
        #schema_save_mode = "RECREATE_SCHEMA"        
        database = "${database_name}"
        table = "${table_name}"
        sink.enable-2pc = "true"
        sink.enable-delete = "true"
        sink.label-prefix = "sync-records"
        doris.config = {
            format="json"
            read_json_by_line="true"
        }
    }
}

Running Command

../bin/seatunnel.sh -c mysql-to-doris.conf -e local

Error Exception

2024-10-08 10:40:20,475 INFO  [bstractJdbcSourceChunkSplitter] [BlockingWorker-TaskGroupLocation{jobId=895863132344287233, pipelineId=1, taskGroupId=1}] - Splitting table collect_data.abroad_day_recharge into chunks, split column: ad_type, min: 1, max: 2, chunk size: 100000, distribution factor upper: 100.0, distribution factor lower: 0.05, sample sharding threshold: 1000
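
For context, here is a back-of-the-envelope check of the logged split parameters. The formula factor = (max - min + 1) / rowCount mirrors Flink-CDC-style splitters and is an assumption here, as is the ~60 million row count taken from the issue description.

// Back-of-the-envelope check of the logged split parameters. The factor
// formula and the row count are assumptions, not values from the log.
public class DistributionFactorCheck {
    public static void main(String[] args) {
        double min = 1;                    // logged MIN(ad_type)
        double max = 2;                    // logged MAX(ad_type)
        double rowCount = 60_000_000;      // assumed table size (~60M rows)
        double lowerBound = 0.05;          // logged distribution factor lower

        double factor = (max - min + 1) / rowCount;
        System.out.printf("factor = %.2e, above lower bound = %b%n",
                factor, factor >= lowerBound);
        // ad_type has only two distinct values, so the factor (~3.3e-8) falls
        // far below the lower bound and the evenly sized chunk path is ruled out.
    }
}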

Zeta or Flink or Spark Version

No response

Java or Scala Version

No response

Screenshots

No response

Are you willing to submit PR?

Code of Conduct

XenosK commented 5 days ago

https://github.com/apache/seatunnel/blob/dev/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java

The split column selection order is:
1. a primary key column whose type is TINYINT
2. a unique key column whose type is TINYINT
3. if there is no TINYINT key, compare the primary key and unique key column types; the type priority order is SMALLINT, INT, BIGINT, DECIMAL, STRING

This is a design issue, and I think we could add a user-defined split column. If needed, it can be assigned to me.
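
A minimal Java sketch of the selection order described above. This is illustrative only, not the actual AbstractJdbcSourceChunkSplitter code; the class, enum, and method names are assumptions made for this example.

// Illustrative sketch of the selection order described above; not the actual
// AbstractJdbcSourceChunkSplitter code. Names are assumptions for this example.
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class SplitColumnChoiceSketch {

    enum ColumnType { TINYINT, SMALLINT, INT, BIGINT, DECIMAL, STRING }

    record Candidate(String name, ColumnType type) {}

    // Priority used when no TINYINT key exists, as listed in step 3 above.
    private static final List<ColumnType> TYPE_ORDER = List.of(
            ColumnType.SMALLINT, ColumnType.INT, ColumnType.BIGINT,
            ColumnType.DECIMAL, ColumnType.STRING);

    static Candidate chooseSplitColumn(List<Candidate> primaryKeys,
                                       List<Candidate> uniqueKeys) {
        // 1) primary key column of type TINYINT
        for (Candidate c : primaryKeys) {
            if (c.type() == ColumnType.TINYINT) return c;
        }
        // 2) unique key column of type TINYINT
        for (Candidate c : uniqueKeys) {
            if (c.type() == ColumnType.TINYINT) return c;
        }
        // 3) otherwise pick the key column whose type comes first in TYPE_ORDER
        List<Candidate> all = new ArrayList<>(primaryKeys);
        all.addAll(uniqueKeys);
        return all.stream()
                .min(Comparator.comparingInt(
                        (Candidate c) -> TYPE_ORDER.indexOf(c.type())))
                .orElseThrow();
    }

    public static void main(String[] args) {
        // Simplified view of the reported table: id is an INT primary key and
        // ad_type is a TINYINT column inside the composite unique key, so
        // step 2 picks ad_type as the split column.
        Candidate chosen = chooseSplitColumn(
                List.of(new Candidate("id", ColumnType.INT)),
                List.of(new Candidate("union_link_id", ColumnType.INT),
                        new Candidate("ad_type", ColumnType.TINYINT)));
        System.out.println("split column: " + chosen.name()); // ad_type
    }
}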

andytyi commented 5 days ago

Yes, I need that.