apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

[Bug] [Postgres-CDC] With the Flink engine, a CDC job resubmitted after cancellation re-runs the full snapshot #7904

Open cobolbaby opened 1 month ago

cobolbaby commented 1 month ago

Search before asking

What happened

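With the Flink engine, when I cancel the job and then resubmit it, the CDC job re-runs the full snapshot from scratch. Is it possible to resume from a checkpoint instead? Is this capability only supported by the Zeta engine?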

SeaTunnel Version

2.3.8

SeaTunnel Config

env {
  # You can set engine configuration here
  execution.parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 60000
  read_limit.bytes_per_second = 50000000
  read_limit.rows_per_second = 10000
}

source {
  Postgres-CDC {
    # Source database JDBC URL
    base-url = "jdbc:postgresql://***/bdc"
    username = "***"
    password = "***"
    database-names = ["bdc"]
    schema-names = ["dw"]
    table-names = ["bdc.dw.fact_cpu_sn", "bdc.dw.dim_cpu_dn"]
    result_table_name = "SQT_PG_BDC_CDC_dw_cpu"
  }
}

transform {

}

sink {
  jdbc {
    # https://seatunnel.apache.org/docs/2.3.8/connector-v2/sink/Jdbc
    source_table_name = "SQT_PG_BDC_CDC_dw_cpu"
    url = "jdbc:postgresql://***/bdc_test"
    driver = "org.postgresql.Driver"
    user = "***"
    password = "***"

    # You need to configure both database and table
    database = "bdc_test"
    table = "dw_sqt.${table_name}"
    primary_keys = ["${primary_key}"]
    schema_save_mode = "IGNORE"
    generate_sink_sql = "true"
  }
}

Running Command

./bin/start-seatunnel-flink-15-connector-v2.sh --config ./config/pgcdc2pg_bdc_dw_cpu.conf

Error Exception

-

Zeta or Flink or Spark Version

Flink: 1.18.1

Java or Scala Version

No response

Screenshots

No response

Are you willing to submit a PR?

Code of Conduct

github-actions[bot] commented 4 days ago

This issue has been automatically marked as stale because it has not had recent activity for 30 days. It will be closed in the next 7 days if no further activity occurs.