apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

[Bug] [MySQL-CDC] Flink ParallelSource don't support sending SourceEvent #4511

Open jonsey888 opened 1 year ago

jonsey888 commented 1 year ago

What happened

When I run a job with a MySQL-CDC source and a Console sink, both the Spark and Flink engines report errors.

SeaTunnel Version

2.3.1

SeaTunnel Config

{
    "env" : {
        "spark.executor.instances" : 1,
        "spark.executor.cores" : 1,
        "spark.executor.memory" : "1g",
        "spark.master" : "local"
    },
    "source" : [
        {
            "base-url" : "jdbc:mysql://xxxx:3306/test",
            "password" : "****",
            "parallelism" : 1,
            "table-names" : [
                "xxx.xxx"
            ],
            "result_table_name" : "mytest",
            "plugin_name" : "MySQL-CDC",
            "server-id" : 5656,
            "username" : "xxx"
        }
    ],
    "transform" : [],
    "sink" : [
        {
            "plugin_name" : "Console"
        }
    ]
}

Running Command

bin/start-seatunnel-spark-3-connector-v2.sh --config config/mysqlcdc-console.conf

Error Exception

23/04/07 09:24:57 ERROR ParallelBatchPartitionReader: BatchPartitionReader execute failed.
java.lang.UnsupportedOperationException: Flink ParallelSource don't support sending SourceEvent. Please implement the `SupportCoordinate` marker interface on the SeaTunnel source.
        at org.apache.seatunnel.translation.source.ParallelReaderContext.sendSourceEventToEnumerator(ParallelReaderContext.java:61)
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.reportFinishedSnapshotSplitsIfNeed(IncrementalSourceReader.java:148)
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.onSplitFinished(IncrementalSourceReader.java:133)
        at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.finishCurrentFetch(SourceReaderBase.java:204)
        at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.moveToNextSplit(SourceReaderBase.java:180)
        at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:161)
        at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:92)
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:92)
        at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:112)
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:92)
        at org.apache.seatunnel.translation.source.ParallelSource.run(ParallelSource.java:136)
        at org.apache.seatunnel.translation.spark.source.partition.batch.ParallelBatchPartitionReader.lambda$prepare$0(ParallelBatchPartitionReader.java:104)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
23/04/07 09:24:57 ERROR Utils: Aborting task
java.lang.RuntimeException: java.lang.UnsupportedOperationException: Flink ParallelSource don't support sending SourceEvent. Please implement the `SupportCoordinate` marker interface on the SeaTunnel source.
        at org.apache.seatunnel.translation.spark.source.partition.batch.ParallelBatchPartitionReader.get(ParallelBatchPartitionReader.java:122)
        at org.apache.seatunnel.translation.spark.source.partition.batch.SeaTunnelBatchPartitionReader.get(SeaTunnelBatchPartitionReader.java:40)
        at org.apache.seatunnel.translation.spark.source.partition.batch.SeaTunnelBatchPartitionReader.get(SeaTunnelBatchPartitionReader.java:25)
        at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.next(DataSourceRDD.scala:133)
        at org.apache.spark.sql.execution.datasources.v2.MetricsRowIterator.next(DataSourceRDD.scala:170)
        at org.apache.spark.sql.execution.datasources.v2.MetricsRowIterator.next(DataSourceRDD.scala:167)
        at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.next(DataSourceRDD.scala:67)
        at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:40)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$1(WriteToDataSourceV2Exec.scala:435)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1538)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:480)
        at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:381)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:136)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.UnsupportedOperationException: Flink ParallelSource don't support sending SourceEvent. Please implement the `SupportCoordinate` marker interface on the SeaTunnel source.
        at org.apache.seatunnel.translation.source.ParallelReaderContext.sendSourceEventToEnumerator(ParallelReaderContext.java:61)
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.reportFinishedSnapshotSplitsIfNeed(IncrementalSourceReader.java:148)
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.onSplitFinished(IncrementalSourceReader.java:133)
        at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.finishCurrentFetch(SourceReaderBase.java:204)
        at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.moveToNextSplit(SourceReaderBase.java:180)
        at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:161)
        at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:92)
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:92)
        at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:112)
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:92)
        at org.apache.seatunnel.translation.source.ParallelSource.run(ParallelSource.java:136)
        at org.apache.seatunnel.translation.spark.source.partition.batch.ParallelBatchPartitionReader.lambda$prepare$0(ParallelBatchPartitionReader.java:104)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        ... 3 more
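
Reading the trace: the exception is thrown from `ParallelReaderContext.sendSourceEventToEnumerator`, which the CDC reader calls when it reports finished snapshot splits. The message suggests that a source without the `SupportCoordinate` marker interface is run through a context that cannot deliver events to an enumerator. The following is a minimal sketch of that failure mode. Every type in it is a simplified, hypothetical stand-in that only borrows names from the trace; it is not SeaTunnel's actual code, and the marker-based selection is an assumption drawn from the error message.

```java
// Simplified stand-ins for illustration only; these are NOT SeaTunnel's real classes.
public class SourceEventSketch {

    /** Marker interface: a source that needs reader-to-enumerator coordination. */
    interface SupportCoordinate {}

    /** Hypothetical event a reader sends once its snapshot splits are done. */
    static final class SourceEvent {
        final String description;

        SourceEvent(String description) {
            this.description = description;
        }
    }

    /** What a reader can ask of its runtime environment. */
    interface ReaderContext {
        void sendSourceEventToEnumerator(SourceEvent event);
    }

    /** Context for sources run without a coordinator: events cannot be delivered. */
    static final class ParallelContext implements ReaderContext {
        @Override
        public void sendSourceEventToEnumerator(SourceEvent event) {
            throw new UnsupportedOperationException(
                    "ParallelSource doesn't support sending SourceEvent");
        }
    }

    /** Context for coordinated sources: events are collected for the enumerator. */
    static final class CoordinatedContext implements ReaderContext {
        final java.util.List<SourceEvent> delivered = new java.util.ArrayList<>();

        @Override
        public void sendSourceEventToEnumerator(SourceEvent event) {
            delivered.add(event);
        }
    }

    /** Assumption: the context is chosen by checking for the marker interface. */
    static ReaderContext contextFor(Object source) {
        return (source instanceof SupportCoordinate)
                ? new CoordinatedContext()
                : new ParallelContext();
    }

    /** Returns true if the event was delivered, false if the context rejected it. */
    static boolean reportFinishedSplits(ReaderContext context) {
        try {
            context.sendSourceEventToEnumerator(new SourceEvent("snapshot splits finished"));
            return true;
        } catch (UnsupportedOperationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Object plainSource = new Object();
        Object coordinatedSource = new SupportCoordinate() {};
        System.out.println(reportFinishedSplits(contextFor(plainSource)));       // prints "false"
        System.out.println(reportFinishedSplits(contextFor(coordinatedSource))); // prints "true"
    }
}
```

In this sketch, a reader that must report back to an enumerator fails as soon as it runs in the non-coordinated context, which matches the point in the trace where the job aborts.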

Flink or Spark Version

Spark 2.4.0 and Spark 3.3.0

Java or Scala Version

java version "1.8.0_301"

Screenshots

No response

zhilinli123 commented 1 year ago

Currently, CDC should only be supported on SeaTunnel's Zeta engine. CC @ashulin

chaosQiX commented 1 year ago

> Currently, CDC should only be supported on SeaTunnel's Zeta engine. CC @ashulin

The docs are really misleading, because nothing in them mentions that CDC only supports Zeta.

huang714669 commented 1 year ago

I have the same problem with Flink 1.16. Can you give me more details about Zeta? I can't find anything about it.

github-actions[bot] commented 1 year ago

This issue has been automatically marked as stale because it has not had recent activity for 30 days. It will be closed in next 7 days if no further activity occurs.

fengcheche commented 11 months ago

> I have the same problem with Flink 1.16. Can you give me more details about Zeta? I can't find anything about it.

@huang714669 Have you solved this problem? I ran into the same problem on Ubuntu 20, JDK 1.8, and SeaTunnel 2.3.3 with Flink 1.15 and Flink 1.16.