apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

Flink engine reports an error when the source is MySQL-CDC #4466

Open fs3085 opened 1 year ago

fs3085 commented 1 year ago

Search before asking

What happened

I use the MySQL-CDC source to read data from MySQL and the Flink engine to synchronize it to StarRocks, and the job fails with an error. Judging from the error log, the problem seems to be that the source class does not implement a required interface.

SeaTunnel Version

2.3.1

SeaTunnel Config

env {
  execution.parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 2000
  execution.planner = blink
}
source {
  MySQL-CDC {
    debezium {
      snapshot.mode = "never"
      decimal.handling.mode = "double"
    }
    startup.mode = "initial"
    result_table_name = "mysqlcdc"
    server-id = 5656
    username = "BigData_canal"
    password = "TXojS3RaL8pwQqNs"
    table-names = ["finenter.writeoff_source"]
    base-url = "jdbc:mysql://xxx.xxx.xxx.xxx:8096/finenter"
  }
}
sink {
  StarRocks {
    nodeUrls = ["xxx.xxx.xxx.xxx:8030"]
    base-url = "jdbc:mysql://xxx.xxx.xxx.xxx:16033"
    username = ads_finenter_rd
    password = "d3673hl1"
    database = "ads_finenter"
    table = "writeoff_source_seatunnel"
    batch_max_rows = 10
    starrocks.config = {
      format = "JSON"
      strip_outer_array = true
    }
  }
}

Running Command

./bin/start-seatunnel-flink-15-connector-v2.sh --config config/example02.conf --deploy-mode run --target yarn-per-job

Error Exception

(72ca47b062541de96307f1515ce18081) switched from RUNNING to FAILED with failure cause: java.lang.UnsupportedOperationException: Flink ParallelSource don't support sending SourceEvent. Please implement the `SupportCoordinate` marker interface on the SeaTunnel source.
    at org.apache.seatunnel.translation.source.ParallelReaderContext.sendSourceEventToEnumerator(ParallelReaderContext.java:61)
    at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.reportFinishedSnapshotSplitsIfNeed(IncrementalSourceReader.java:148)
    at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.onSplitFinished(IncrementalSourceReader.java:133)
    at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.finishCurrentFetch(SourceReaderBase.java:204)
    at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.moveToNextSplit(SourceReaderBase.java:180)
    at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:161)
    at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:92)
    at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:92)
    at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:112)
    at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:92)
    at org.apache.seatunnel.translation.source.ParallelSource.run(ParallelSource.java:136)
    at org.apache.seatunnel.translation.flink.source.BaseSeaTunnelSourceFunction.run(BaseSeaTunnelSourceFunction.java:86)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:332)
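The stack trace shows the failure path: `IncrementalSourceReader.reportFinishedSnapshotSplitsIfNeed` calls `sendSourceEventToEnumerator` on its reader context, and the `ParallelReaderContext` used by the Flink translation layer rejects the call. The Java sketch below reproduces that shape with hypothetical, simplified types (`ReaderContext`, `ParallelContext`, `CoordinatedContext` and `SourceEvent` here are illustrative stand-ins, not SeaTunnel's actual classes or signatures): a parallel context has no channel to an enumerator and throws, while a coordinated context delivers the event.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified stand-ins for SeaTunnel's reader-side types (not the real API).
public class ReaderContextSketch {

    /** An event a reader sends to the split enumerator (illustrative only). */
    public static final class SourceEvent {
        final List<String> finishedSplitIds;

        SourceEvent(List<String> finishedSplitIds) {
            this.finishedSplitIds = finishedSplitIds;
        }
    }

    /** What a reader can ask of its runtime context. */
    public interface ReaderContext {
        void sendSourceEventToEnumerator(SourceEvent event);
    }

    /** Parallel mode in this sketch: no channel from the reader to an enumerator, so it throws. */
    public static final class ParallelContext implements ReaderContext {
        @Override
        public void sendSourceEventToEnumerator(SourceEvent event) {
            // Message copied from the error log in this issue.
            throw new UnsupportedOperationException(
                    "Flink ParallelSource don't support sending SourceEvent. "
                            + "Please implement the `SupportCoordinate` marker interface on the SeaTunnel source.");
        }
    }

    /** Coordinated mode in this sketch: events reach the enumerator (modelled as a list). */
    public static final class CoordinatedContext implements ReaderContext {
        final List<SourceEvent> enumeratorInbox = new ArrayList<>();

        @Override
        public void sendSourceEventToEnumerator(SourceEvent event) {
            enumeratorInbox.add(event);
        }
    }

    /** Loosely mirrors reportFinishedSnapshotSplitsIfNeed: report finished snapshot splits, if any. */
    static void reportFinishedSnapshotSplits(ReaderContext context, List<String> finishedSplitIds) {
        if (!finishedSplitIds.isEmpty()) {
            context.sendSourceEventToEnumerator(new SourceEvent(finishedSplitIds));
        }
    }

    public static void main(String[] args) {
        List<String> finished = Collections.singletonList("writeoff_source:0");

        CoordinatedContext coordinated = new CoordinatedContext();
        reportFinishedSnapshotSplits(coordinated, finished);
        System.out.println("coordinated inbox size: " + coordinated.enumeratorInbox.size()); // 1

        try {
            reportFinishedSnapshotSplits(new ParallelContext(), finished);
        } catch (UnsupportedOperationException e) {
            System.out.println("parallel: " + e.getClass().getSimpleName()); // UnsupportedOperationException
        }
    }
}
```

In this model the reader code is identical in both modes; only the context decides whether the report can be delivered, which matches where the exception is thrown in the trace above.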

Flink or Spark Version

flink 1.15.2

Java or Scala Version

1.8

Screenshots

No response

Are you willing to submit PR?

Code of Conduct

zorrofox commented 1 year ago

As far as I know, the MySQL CDC source only supports the SeaTunnel native engine.

EricJoy2048 commented 1 year ago

Yes, you are right.

fs3085 commented 1 year ago

@zorrofox @EricJoy2048 Thank you for your answer. The error log says "Please implement the SupportCoordinate marker interface on the SeaTunnel source", so I downloaded the source code, followed SeaTunnelFlink in the seatunnel-flink-15-starter module to the SeaTunnelSource interface, made it implement the SupportCoordinate interface as the log suggests (screenshot), then compiled and packaged the project and ran the job with the rebuilt Flink engine. The job now runs successfully, but I have a question. There is a condition check here (screenshot): before, since the interface was not implemented, every source took the else branch; now that it is implemented, every source takes the first branch (screenshots). Is there any difference between the two branches? Or has the project simply not yet made a finer-grained distinction here?
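The check described above is a marker-interface dispatch: the starter tests the source with `instanceof SupportCoordinate` and picks one of two runtime wrappers. The Java sketch below is hypothetical (`Source`, `PlainSource`, `CdcLikeSource` and `wrap` are simplified stand-ins, not the real SeaTunnel classes); it only illustrates why making the shared `SeaTunnelSource` interface implement the marker sends every source down the first branch, instead of only the sources that need an enumerator.

```java
// Hypothetical sketch of marker-interface dispatch; names are illustrative, not SeaTunnel's API.
public class MarkerDispatchSketch {

    /** Base source type (stand-in for a SeaTunnel source). */
    interface Source {
        String name();
    }

    /** Marker interface: declares no methods, it only tags sources that need coordination. */
    interface SupportCoordinate {
    }

    /** A source without the marker: its readers never send events to an enumerator. */
    static final class PlainSource implements Source {
        public String name() {
            return "plain";
        }
    }

    /** A CDC-like source whose readers report progress to the enumerator, so it carries the marker. */
    static final class CdcLikeSource implements Source, SupportCoordinate {
        public String name() {
            return "cdc";
        }
    }

    /** Picks the runtime wrapper from the marker, like the if/else in the starter. */
    static String wrap(Source source) {
        if (source instanceof SupportCoordinate) {
            // A single enumerator serves all readers, so reader-to-enumerator events can be delivered.
            return "coordinated:" + source.name();
        } else {
            // Each parallel subtask works on its own; reader-to-enumerator events are unsupported.
            return "parallel:" + source.name();
        }
    }

    public static void main(String[] args) {
        System.out.println(wrap(new PlainSource()));   // parallel:plain
        System.out.println(wrap(new CdcLikeSource())); // coordinated:cdc
    }
}
```

A marker interface keeps the decision per source: only classes that declare `SupportCoordinate` are routed to the coordinated wrapper. Making the base interface implement the marker removes that distinction, which matches the observation that every source now takes the first branch.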

ghost commented 1 year ago

Your method does work, but it is not a complete fix: it only reads the data that is already in MySQL, and subsequent changes are not synchronized.
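For context on why the reader reports finished snapshot splits at all: incremental-snapshot CDC sources typically read in two phases, first snapshot splits covering the existing rows, then a single incremental (binlog) split, and the enumerator hands out the incremental split only after every snapshot split has been reported finished. This thread does not confirm that this is why changes were not synchronized; the Java sketch below is a simplified, hypothetical model of that handshake (`HybridAssignerSketch`, `reportFinished` and `nextSplit` are illustrative names, not SeaTunnel's API) that shows what stalls if finished-split reports never reach the enumerator.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of a two-phase (snapshot, then incremental) split assigner.
public class HybridAssignerSketch {

    private final Deque<String> pendingSnapshotSplits = new ArrayDeque<>();
    private final Set<String> unfinishedSnapshotSplits = new HashSet<>();
    private boolean incrementalAssigned = false;

    HybridAssignerSketch(String... snapshotSplitIds) {
        for (String id : snapshotSplitIds) {
            pendingSnapshotSplits.add(id);
            unfinishedSnapshotSplits.add(id);
        }
    }

    /** Called when a reader's "snapshot split finished" report reaches the enumerator. */
    void reportFinished(String splitId) {
        unfinishedSnapshotSplits.remove(splitId);
    }

    /** Returns the next split for a reader, or null if nothing can be assigned right now. */
    String nextSplit() {
        if (!pendingSnapshotSplits.isEmpty()) {
            return pendingSnapshotSplits.poll(); // phase 1: read existing rows
        }
        if (unfinishedSnapshotSplits.isEmpty() && !incrementalAssigned) {
            incrementalAssigned = true;
            return "incremental"; // phase 2: read the change stream
        }
        // Either still waiting for finished-split reports, or the incremental split is already out.
        return null;
    }

    public static void main(String[] args) {
        HybridAssignerSketch assigner = new HybridAssignerSketch("s0", "s1");
        System.out.println(assigner.nextSplit()); // s0
        System.out.println(assigner.nextSplit()); // s1
        System.out.println(assigner.nextSplit()); // null: no reports received yet
        assigner.reportFinished("s0");
        assigner.reportFinished("s1");
        System.out.println(assigner.nextSplit()); // incremental
    }
}
```

In this model, if the reports in phase 1 are never delivered, `nextSplit()` keeps returning `null` after the snapshot, so the existing rows are read but no change data follows.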

fengcheche commented 8 months ago

@fs3085 Were you able to solve this problem? I encountered the same problem on Ubuntu 20 with JDK 1.8 and SeaTunnel 2.3.3, with both Flink 1.16 and Flink 1.15.

zhilinli123 commented 8 months ago

Hi, as far as I know, SeaTunnel 2.3.3 already supports running MySQL CDC on the Flink engine.

lxp4352 commented 7 months ago

Hi @zhilinli123, I have tested it, but this problem still exists in SeaTunnel 2.3.3. Could you share a working example?