apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

[Feature][Connector-V2-Paimon] Concurrently read data, then write it using a single thread. #7769

Open quicklyfast opened 3 days ago

quicklyfast commented 3 days ago

Description

We want to sync data from MySQL to a Paimon primary-key (PK) table concurrently. However, because of the limitations of Paimon PK tables, we cannot write to the table concurrently, so we have to set the read parallelism to 1, which slows down synchronization to Paimon. We would like to be able to read the source data concurrently, for example with three threads, and then write to the Paimon table with a single thread.

flowchart LR
    Mysql[(Database)] -- split 1 --> Reader
    Mysql[(Database)] -- split 2 --> Reader
    Mysql[(Database)] -- split 3 --> Reader

    Reader --> SinkWriter

    SinkWriter --> Paimon
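
To make the requested behaviour concrete, here is a minimal sketch of the fan-in pattern in the diagram: several reader threads each read one split and push rows onto a shared bounded queue, and a single writer thread drains it. This is only an illustration of the idea, not SeaTunnel's or Paimon's implementation; `read_split`, `write_all`, `sync`, and the in-memory `sink` list (a stand-in for the Paimon table) are all hypothetical names.

```python
import queue
import threading

SENTINEL = object()  # marks the end of one reader's split


def read_split(split_id, rows, out):
    """Reader: push every row of one split onto the shared queue."""
    for row in rows:
        out.put((split_id, row))
    out.put(SENTINEL)


def write_all(inbox, reader_count, sink):
    """Single writer: drain the queue until every reader has finished."""
    finished = 0
    while finished < reader_count:
        item = inbox.get()
        if item is SENTINEL:
            finished += 1
        else:
            sink.append(item)  # only this thread ever touches the sink


def sync(splits):
    """Read all splits concurrently, then write them from one thread."""
    inbox = queue.Queue(maxsize=100)  # bounded, so readers cannot outrun the writer
    sink = []  # hypothetical stand-in for the Paimon table
    writer = threading.Thread(target=write_all, args=(inbox, len(splits), sink))
    readers = [
        threading.Thread(target=read_split, args=(i, rows, inbox))
        for i, rows in enumerate(splits, start=1)
    ]
    writer.start()
    for r in readers:
        r.start()
    for r in readers:
        r.join()
    writer.join()
    return sink


if __name__ == "__main__":
    # Three hypothetical splits of an "id" column.
    written = sync([range(0, 3), range(3, 6), range(6, 9)])
    print(len(written))
```

The bounded queue gives back-pressure: if the single writer is the bottleneck, the readers block instead of buffering the whole table in memory.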

Usage Scenario

No response

Related issues

No response

liunaijie commented 2 days ago

you can

quicklyfast commented 2 days ago

I tested this and reviewed the SeaTunnel 2.3.7 source code, and found that the sink does not support its own parallelism setting: the sink's parallelism always equals the upstream parallelism.

https://github.com/apache/seatunnel/blob/0b735fc6f868ae26704c3779fdb96ed5ca0fbb3b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java#L568-L607

env {
  parallelism = 3
  job.mode = "BATCH"
  checkpoint.interval = 10000
  job.retry.times = 0
}

source {
  Jdbc {
    parallelism = 3
    url = "jdbc:mysql://192.168.1.132:19030/test"
    driver = "com.mysql.jdbc.Driver"
    connection_check_timeout_sec = 100
    user = "root"
    password = "****"
    table_path = "test.ods_ecrp_kd_order"
    partition_column = "id"
    split.size = 1000
    properties {
      useSSL=false
      useCursorFetch=true
      fetchSize=1000
    }
  }
}

sink {
  Paimon {
    parallelism = 1
    catalog_name="seatunnel_test"
    warehouse="file:/home/data/seatunnel_test"
    database="${database_name}"
    paimon.table.primary-keys = "id,create_time"
    paimon.table.write-props = {
      bucket = 4
      bucket-key="create_time"
      snapshot.num-retained.min = 3
      snapshot.num-retained.max = 10
      file.format = "orc"
      deletion-vectors.enabled = "true"
    }
    table="${table_name}"
  }
}

* job info

"createTime": "2024-09-30 09:16:21",
"jobDag": {
  "vertices": [
    {
      "id": 1,
      "name": "Source[0]-Jdbc(id=1)",
      "parallelism": 3
    },
    {
      "id": 3,
      "name": "Sink[0]-Paimon-MultiTableSink(id=3)",
      "parallelism": 3
    }
  ],
  "edges": [
    {
      "inputVertex": "Source[0]-Jdbc",
      "targetVertex": "Sink[0]-Paimon-MultiTableSink"
    }
  ]
}
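
For anyone who wants to reproduce the check, here is a small sketch that compares the configured sink parallelism with the effective value reported in the job info above. It assumes the `jobDag` fragment has been wrapped in braces so that it parses as JSON; the helper names `vertex_parallelism` and `sinks_ignoring_config` are hypothetical and not part of SeaTunnel.

```python
import json

# The jobDag fragment from the job info, wrapped in braces so it parses.
JOB_INFO = """
{
  "jobDag": {
    "vertices": [
      {"id": 1, "name": "Source[0]-Jdbc(id=1)", "parallelism": 3},
      {"id": 3, "name": "Sink[0]-Paimon-MultiTableSink(id=3)", "parallelism": 3}
    ],
    "edges": [
      {"inputVertex": "Source[0]-Jdbc", "targetVertex": "Sink[0]-Paimon-MultiTableSink"}
    ]
  }
}
"""


def vertex_parallelism(job_info):
    """Map each vertex name to the parallelism reported in the job DAG."""
    dag = json.loads(job_info)["jobDag"]
    return {v["name"]: v["parallelism"] for v in dag["vertices"]}


def sinks_ignoring_config(job_info, configured):
    """Return vertices whose reported parallelism differs from the config.

    `configured` maps a vertex-name prefix (e.g. "Sink[0]-Paimon") to the
    parallelism written in the job config file.
    """
    effective = vertex_parallelism(job_info)
    mismatches = {}
    for name, actual in effective.items():
        for prefix, wanted in configured.items():
            if name.startswith(prefix) and actual != wanted:
                mismatches[name] = (wanted, actual)
    return mismatches


if __name__ == "__main__":
    # The sink was configured with parallelism = 1 in the job config.
    print(sinks_ignoring_config(JOB_INFO, {"Sink[0]-Paimon": 1}))
```

With the job info above, the sink vertex is reported as a mismatch (configured 1, effective 3), which matches the observation that the sink inherits the upstream parallelism.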