apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0
7.79k stars 1.74k forks source link

[Bug] [CDC] mysql cdc to paimon #7208

Closed wuchang945 closed 1 week ago

wuchang945 commented 1 month ago

Search before asking

What happened

mysql to paimon error cannot assign instance of org.apache.paimon.table.PrimaryKeyFileStoreTable to field org.apache.seatunnel.connectors.seatunnel.paimon.sink.PaimonSink.table of type org.apache.paimon.table.Table in instance of org.apache.seatunnel.connectors.seatunnel.paimon.sink.PaimonSink

SeaTunnel Version

2.3.5

SeaTunnel Config

env {
  execution.parallelism = 1
  job.mode = "STREAMING"
}

source {
 MySQL-CDC {
    base-url = "jdbc:mysql://192.168.1.35:3306/seatunnel?serverTimezone=UTC"
    username = "root"
    password = "root"
    startup.mode="initial"
    stop.mode="never"
    snapshot.split.size=1
    exactly_once=true
    table-names = ["seatunnel.role"]
  }
}

sink {
  Paimon {
   warehouse = "hdfs://192.168.1.130:9000/paimon"
   database = "default"
   table = "ods_role"
  }
}

Running Command

./bin/seatunnel.sh --config ./config/mysqltopaimon.config.template -e local

Error Exception

Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
        at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:202)
        at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
        at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
    Caused by: java.util.concurrent.CompletionException: com.hazelcast.nio.serialization.HazelcastSerializationException: java.lang.ClassCastException: cannot assign instance of org.apache.paimon.table.PrimaryKeyFileStoreTable to field org.apache.seatunnel.connectors.seatunnel.paimon.sink.PaimonSink.table of type org.apache.paimon.table.Table in instance of org.apache.seatunnel.connectors.seatunnel.paimon.sink.PaimonSink
        at com.hazelcast.spi.impl.AbstractInvocationFuture.wrapInCompletionException(AbstractInvocationFuture.java:1347)
        at com.hazelcast.spi.impl.AbstractInvocationFuture.cascadeException(AbstractInvocationFuture.java:1340)
        at com.hazelcast.spi.impl.AbstractInvocationFuture.access$200(AbstractInvocationFuture.java:65)
        at com.hazelcast.spi.impl.AbstractInvocationFuture$ApplyNode.execute(AbstractInvocationFuture.java:1478)
        at com.hazelcast.spi.impl.AbstractInvocationFuture.unblockOtherNode(AbstractInvocationFuture.java:797)
        at com.hazelcast.spi.impl.AbstractInvocationFuture.unblockAll(AbstractInvocationFuture.java:759)
        at com.hazelcast.spi.impl.AbstractInvocationFuture.complete0(AbstractInvocationFuture.java:1235)
        at com.hazelcast.spi.impl.AbstractInvocationFuture.completeExceptionallyInternal(AbstractInvocationFuture.java:1223)
        at com.hazelcast.spi.impl.AbstractInvocationFuture.completeExceptionally(AbstractInvocationFuture.java:709)
        at com.hazelcast.client.impl.spi.impl.ClientInvocation.completeExceptionally(ClientInvocation.java:294)
        at com.hazelcast.client.impl.spi.impl.ClientInvocation.notifyExceptionWithOwnedPermission(ClientInvocation.java:321)
        at com.hazelcast.client.impl.spi.impl.ClientInvocation.notifyException(ClientInvocation.java:304)
        at com.hazelcast.client.impl.spi.impl.ClientResponseHandlerSupplier.handleResponse(ClientResponseHandlerSupplier.java:164)
        at com.hazelcast.client.impl.spi.impl.ClientResponseHandlerSupplier.process(ClientResponseHandlerSupplier.java:141)
        at com.hazelcast.client.impl.spi.impl.ClientResponseHandlerSupplier.access$300(ClientResponseHandlerSupplier.java:60)
        at com.hazelcast.client.impl.spi.impl.ClientResponseHandlerSupplier$DynamicResponseHandler.accept(ClientResponseHandlerSupplier.java:251)
        at com.hazelcast.client.impl.spi.impl.ClientResponseHandlerSupplier$DynamicResponseHandler.accept(ClientResponseHandlerSupplier.java:243)
        at com.hazelcast.client.impl.connection.tcp.TcpClientConnection.handleClientMessage(TcpClientConnection.java:245)
        at com.hazelcast.client.impl.protocol.util.ClientMessageDecoder.handleMessage(ClientMessageDecoder.java:135)
        at com.hazelcast.client.impl.protocol.util.ClientMessageDecoder.onRead(ClientMessageDecoder.java:89)
        at com.hazelcast.internal.networking.nio.NioInboundPipeline.process(NioInboundPipeline.java:136)
        at com.hazelcast.internal.networking.nio.NioThread.processSelectionKey(NioThread.java:383)
        at com.hazelcast.internal.networking.nio.NioThread.processSelectionKeys(NioThread.java:368)
        at com.hazelcast.internal.networking.nio.NioThread.selectLoop(NioThread.java:294)
        at com.hazelcast.internal.networking.nio.NioThread.executeRun(NioThread.java:249)
        at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102)
    Caused by: com.hazelcast.nio.serialization.HazelcastSerializationException: java.lang.ClassCastException: cannot assign instance of org.apache.paimon.table.PrimaryKeyFileStoreTable to field org.apache.seatunnel.connectors.seatunnel.paimon.sink.PaimonSink.table of type org.apache.paimon.table.Table in instance of org.apache.seatunnel.connectors.seatunnel.paimon.sink.PaimonSink
        at com.hazelcast.internal.serialization.impl.SerializationUtil.handleException(SerializationUtil.java:111)
        at com.hazelcast.internal.serialization.impl.AbstractSerializationService.readObject(AbstractSerializationService.java:355)
        at com.hazelcast.internal.serialization.impl.ByteArrayObjectDataInput.readObject(ByteArrayObjectDataInput.java:600)
        at org.apache.seatunnel.engine.core.dag.logical.LogicalVertex.readData(LogicalVertex.java:99)
        at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.readInternal(DataSerializableSerializer.java:160)
        at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.read(DataSerializableSerializer.java:106)
        at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.read(DataSerializableSerializer.java:51)
        at com.hazelcast.internal.serialization.impl.StreamSerializerAdapter.read(StreamSerializerAdapter.java:44)
        at com.hazelcast.internal.serialization.impl.AbstractSerializationService.readObject(AbstractSerializationService.java:349)
        at com.hazelcast.internal.serialization.impl.ByteArrayObjectDataInput.readObject(ByteArrayObjectDataInput.java:600)
        at org.apache.seatunnel.engine.core.dag.logical.LogicalDag.readData(LogicalDag.java:154)
        at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.readInternal(DataSerializableSerializer.java:160)
        at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.read(DataSerializableSerializer.java:106)
        at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.read(DataSerializableSerializer.java:51)
        at com.hazelcast.internal.serialization.impl.StreamSerializerAdapter.read(StreamSerializerAdapter.java:44)
        at com.hazelcast.internal.serialization.impl.AbstractSerializationService.toObject(AbstractSerializationService.java:268)
        at com.hazelcast.jet.impl.execution.init.CustomClassLoadedObject.deserializeWithCustomClassLoader(CustomClassLoadedObject.java:66)
        at org.apache.seatunnel.engine.server.master.JobMaster.init(JobMaster.java:209)
        at org.apache.seatunnel.engine.server.CoordinatorService.lambda$submitJob$3(CoordinatorService.java:475)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.ClassCastException: cannot assign instance of org.apache.paimon.table.PrimaryKeyFileStoreTable to field org.apache.seatunnel.connectors.seatunnel.paimon.sink.PaimonSink.table of type org.apache.paimon.table.Table in instance of org.apache.seatunnel.connectors.seatunnel.paimon.sink.PaimonSink
        at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
        at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2237)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
        at java.util.HashMap.readObject(HashMap.java:1404)
        at sun.reflect.GeneratedMethodAccessor79.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2122)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)

Zeta or Flink or Spark Version

zeta

Java or Scala Version

java 1.8

Screenshots

No response

Are you willing to submit PR?

Code of Conduct

hailin0 commented 1 month ago

cc @Hisoka-X

LoseYSelf commented 1 month ago

In newest seatunnel branch,paimon version is 0.7.0-incubating,but PrimaryKeyFileStoreTable is not a public class

github-actions[bot] commented 2 weeks ago

This issue has been automatically marked as stale because it has not had recent activity for 30 days. It will be closed in next 7 days if no further activity occurs.

github-actions[bot] commented 1 week ago

This issue has been closed because it has not received response for too long time. You could reopen it if you encountered similar problems in the future.