apache / seatunnel

SeaTunnel is a next-generation, super-high-performance, distributed tool for massive data integration.
https://seatunnel.apache.org/
Apache License 2.0

[Bug] [Spark] Hive to ClickHouse: only one degree of parallelism when synchronizing data #5042

Closed: Bingz2 closed this issue 11 months ago

Bingz2 commented 1 year ago

What happened

There is only one degree of parallelism when synchronizing data from Hive to ClickHouse using Spark 2.

SeaTunnel Version

2.3.2

SeaTunnel Config

env {
  # You can set SeaTunnel environment configuration here
  job.mode = "BATCH"
  job.name = "seatunnel"
  checkpoint.interval = 10000
  spark.executor.instances = 2
  spark.executor.cores = 10
  spark.executor.memory = "20g"
  spark.driver.memory = "2g"
  spark.dynamicAllocation.enabled = false
}

source {

  Hive {
    table_name = "ads.ads_dvblive_user_personas_vertical_stat_dd"
    metastore_uri = "thrift://slave6.test.we:9083"
    result_table_name = "test"
  }

}
transform {
  Sql {
    source_table_name = "test"
    query = "select user_id,label_id,label_code,label_name,label_value,day from test where day='2023-06-11'"
    result_table_name = "fake1"
  }
}
sink {
  Clickhouse {
    host = "10.5.13.23:8123"
    database = "ads"
    table = "ads_dvblive_user_personas_vertical_stat_dd"
    username = "test"
    password = "123456"
    bulk_size = 50000
    clickhouse.config = {"socket_timeout": "50000"}
  }
}
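
For reference, SeaTunnel 2.3.x documents a common "parallelism" option in the env block that sources and sinks without an explicit parallelism setting are supposed to inherit. Whether the Spark 2 translation layer honors it for the Hive source in 2.3.2 is exactly what this issue is probing, so the following env block is only a hypothetical variant of the config above, not a confirmed fix:

env {
  job.mode = "BATCH"
  job.name = "seatunnel"
  checkpoint.interval = 10000
  # Hypothetical addition: SeaTunnel's documented common env option for
  # default source/sink parallelism; untested here with the Spark 2 engine.
  parallelism = 10
  spark.executor.instances = 2
  spark.executor.cores = 10
  spark.executor.memory = "20g"
  spark.driver.memory = "2g"
  spark.dynamicAllocation.enabled = false
}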

Running Command

sh bin/start-seatunnel-spark-2-connector-v2.sh -m yarn -e client -c config/hive2ck.conf

Error Exception

23/07/07 17:50:34 INFO v2.DataSourceV2Strategy: 
Pushing operators to class org.apache.seatunnel.translation.spark.source.SeaTunnelSourceSupport
Pushed Filters: 
Post-Scan Filters: 
Output: user_id#0, mac#1, reserve_column#2, label_id#3, label_code#4, label_name#5, label_value#6, raw_partner_code#7, day#8, partner_code#9, product_line#10

23/07/07 17:50:34 INFO codegen.CodeGenerator: Code generated in 334.942834 ms
23/07/07 17:50:34 INFO codegen.CodeGenerator: Code generated in 24.618955 ms
23/07/07 17:50:35 INFO v2.WriteToDataSourceV2Exec: Start processing data source writer: org.apache.seatunnel.translation.spark.sink.writer.SparkDataSourceWriter@318353. The input RDD has 1 partitions.
23/07/07 17:50:35 INFO spark.SparkContext: Starting job: save at SinkExecuteProcessor.java:123
23/07/07 17:50:35 INFO scheduler.DAGScheduler: Got job 0 (save at SinkExecuteProcessor.java:123) with 1 output partitions
23/07/07 17:50:35 INFO scheduler.DAGScheduler: Final stage: ResultStage 0 (save at SinkExecuteProcessor.java:123)
23/07/07 17:50:35 INFO scheduler.DAGScheduler: Parents of final stage: List()
23/07/07 17:50:35 INFO scheduler.DAGScheduler: Missing parents: List()
23/07/07 17:50:35 INFO scheduler.DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[4] at save at SinkExecuteProcessor.java:123), which has no missing parents
23/07/07 17:50:35 INFO memory.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 55.2 KB, free 4.1 GB)
23/07/07 17:50:35 INFO memory.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 18.9 KB, free 4.1 GB)
23/07/07 17:50:35 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on slave5.test.gitv.we:20280 (size: 18.9 KB, free: 4.1 GB)
23/07/07 17:50:35 INFO spark.SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1161
23/07/07 17:50:35 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[4] at save at SinkExecuteProcessor.java:123) (first 15 tasks are for partitions Vector(0))
23/07/07 17:50:35 INFO cluster.YarnScheduler: Adding task set 0.0 with 1 tasks
23/07/07 17:50:35 INFO yarn.SparkRackResolver: Got an error when resolving hostNames. Falling back to /default-rack for all
23/07/07 17:50:35 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, slave6.test.gitv.we, executor 2, partition 0, PROCESS_LOCAL, 15260 bytes)
23/07/07 17:50:36 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on slave6.test.gitv.we:36895 (size: 18.9 KB, free: 10.5 GB)
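
The telling line in this log is "The input RDD has 1 partitions." In Spark 2.4's DataSource V2 API, the partition count of that RDD is exactly the number of InputPartition objects the reader returns from planInputPartitions(), so a one-element list produces a one-task job no matter how many executors are available. Below is a minimal sketch of that contract; it is a hypothetical reader illustrating the API, not SeaTunnel's actual SeaTunnelSourceSupport code:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
import org.apache.spark.sql.types.StructType;

// Hypothetical DataSource V2 reader: Spark launches one task per element of
// the list returned by planInputPartitions(), so returning a single split
// reproduces the "input RDD has 1 partitions" behaviour seen above.
public class SketchReader implements DataSourceReader {
    private final StructType schema;
    private final int parallelism;

    public SketchReader(StructType schema, int parallelism) {
        this.schema = schema;
        this.parallelism = parallelism;
    }

    @Override
    public StructType readSchema() {
        return schema;
    }

    @Override
    public List<InputPartition<InternalRow>> planInputPartitions() {
        // The size of this list is the read parallelism of the whole job.
        List<InputPartition<InternalRow>> splits = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            splits.add(new EmptySplit());
        }
        return splits;
    }

    // A do-nothing split, only here to keep the sketch self-contained.
    private static class EmptySplit implements InputPartition<InternalRow> {
        @Override
        public InputPartitionReader<InternalRow> createPartitionReader() {
            return new InputPartitionReader<InternalRow>() {
                @Override public boolean next() throws IOException { return false; }
                @Override public InternalRow get() { return null; }
                @Override public void close() throws IOException { }
            };
        }
    }
}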

Flink or Spark Version

Spark Version: 2.4.0

Java or Scala Version

1.8

Screenshots

(two screenshots attached)

wuxizhi777 commented 1 year ago

The parallelism of Spark also depends on your data's partitions.

Bingz2 commented 1 year ago

The parallelism of Spark also depends on your data's partitions.

This is the partitioning of the table (screenshot attached).
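
As a point of comparison, the partition count Spark's own Hive reader would pick for this table can be checked with a plain Spark 2.4 job. This hypothetical snippet bypasses SeaTunnel entirely; the SeaTunnel Hive connector plans its own splits through the translation layer, so the two numbers need not agree:

import org.apache.spark.sql.SparkSession;

// Hypothetical standalone check, independent of SeaTunnel: read the same
// Hive table with Spark's built-in reader and print its partition count.
public class PartitionCheck {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("partition-check")
                .enableHiveSupport()
                .getOrCreate();

        int numPartitions = spark
                .table("ads.ads_dvblive_user_personas_vertical_stat_dd")
                .rdd()
                .getNumPartitions();
        System.out.println("read partitions: " + numPartitions);

        spark.stop();
    }
}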

github-actions[bot] commented 1 year ago

This issue has been automatically marked as stale because it has not had recent activity for 30 days. It will be closed in the next 7 days if no further activity occurs.

github-actions[bot] commented 11 months ago

This issue has been closed because it has not received a response for too long. You can reopen it if you encounter similar problems in the future.