apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

[Bug] [Kafka] class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer #8090

Open aiyi926 opened 1 day ago

aiyi926 commented 1 day ago

What happened

When I read data from Doris or ClickHouse and sink it to Kafka with the Zeta engine, the job fails with a serialization error.

SeaTunnel Version

2.3.8

SeaTunnel Config

env {
  parallelism = 1
  job.mode = "BATCH"
  checkpoint.interval = 60000
}

source {
  Jdbc {
    url = "jdbc:mysql://xx.xx.xx.xx:9030?useSSL=false&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&allowPublicKeyRetrieval=true"
    driver = "com.mysql.cj.jdbc.Driver"
    connection_check_timeout_sec = 10000
    user = "root"
    password = "xxxx"
    query = "select * from test.user_doris limit 20"
    result_table_name = "doris_user"
  }
}

sink {
  kafka {
    topic = "user-kafka"
    bootstrap.servers = "xx.xx.xx.xx:19093"
    format = json
    kafka.request.timeout.ms = 60000
    semantics = EXACTLY_ONCE
    kafka.config = {
      acks = "-1"
      request.timeout.ms = 60000
      buffer.memory = 33554432
    }
  }
}

Running Command

sh seatunnel.sh -c /opt/apache-seatunnel-2.3.8/ngfaConfig/doris_to_kafka.config -m cluster

Error Exception

2024-11-20 19:54:24,525 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - Exception StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
        at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:213)
        at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
        at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: org.apache.seatunnel.engine.server.checkpoint.CheckpointException: CheckpointCoordinator inside have error.
        at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.handleCoordinatorError(CheckpointCoordinator.java:281)
        at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.handleCoordinatorError(CheckpointCoordinator.java:277)
        at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.reportCheckpointErrorFromTask(CheckpointCoordinator.java:390)
        at org.apache.seatunnel.engine.server.checkpoint.CheckpointManager.reportCheckpointErrorFromTask(CheckpointManager.java:184)
        at org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointErrorReportOperation.runInternal(CheckpointErrorReportOperation.java:48)
        at org.apache.seatunnel.engine.server.task.operation.TracingOperation.run(TracingOperation.java:44)
        at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189)
        at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273)
        at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248)
        at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213)
        at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:175)
        at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:139)
        at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123)
        at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102)
Caused by: org.apache.seatunnel.common.utils.SeaTunnelException: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:439)
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:289)
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:316)
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:301)
        at org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaInternalProducer.<init>(KafkaInternalProducer.java:46)
        at org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaTransactionSender.getTransactionProducer(KafkaTransactionSender.java:128)
        at org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaTransactionSender.beginTransaction(KafkaTransactionSender.java:62)
        at org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaSinkWriter.<init>(KafkaSinkWriter.java:103)
        at org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaSink.createWriter(KafkaSink.java:56)
        at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.restoreState(SinkFlowLifeCycle.java:293)
        at org.apache.seatunnel.engine.server.task.SeaTunnelTask.lambda$restoreState$16(SeaTunnelTask.java:426)
        at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
        at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
        at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
        at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
        at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
        at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
        at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
        at org.apache.seatunnel.engine.server.task.SeaTunnelTask.restoreState(SeaTunnelTask.java:423)
        at org.apache.seatunnel.engine.server.checkpoint.operation.NotifyTaskRestoreOperation.lambda$runInternal$0(NotifyTaskRestoreOperation.java:107)
        at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1640)
        at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:39)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
        at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:403)
        at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:434)
        at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:419)
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:365)
        ... 27 more

        ... 12 more

        at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:205)
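
For context on the innermost cause: Kafka reports "class X is not an instance of Y" when the configured class fails an instance check against the expected interface. Since ByteArraySerializer does implement Serializer, this message usually means the two types were loaded by different class loaders. Whether that is what happens in this job is an assumption, not something the report confirms. The sketch below reproduces the effect in plain Java 8 without Kafka; the nested Serializer and ByteArraySerializer types and the IsolatingLoader class are hypothetical stand-ins, not Kafka or SeaTunnel code.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

class ClassLoaderConflictDemo {

    // Hypothetical stand-ins for Kafka's Serializer and ByteArraySerializer.
    interface Serializer {}
    static class ByteArraySerializer implements Serializer {}

    /** Defines the two demo types itself instead of delegating to its parent. */
    static class IsolatingLoader extends ClassLoader {
        IsolatingLoader(ClassLoader parent) {
            super(parent);
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            boolean demoType = name.equals(Serializer.class.getName())
                    || name.equals(ByteArraySerializer.class.getName());
            if (!demoType) {
                // Everything else (java.lang.Object, ...) follows normal parent delegation.
                return super.loadClass(name, resolve);
            }
            synchronized (getClassLoadingLock(name)) {
                Class<?> c = findLoadedClass(name);
                if (c == null) {
                    byte[] bytes = readClassBytes(name);
                    c = defineClass(name, bytes, 0, bytes.length);
                }
                if (resolve) {
                    resolveClass(c);
                }
                return c;
            }
        }

        // Reads the compiled .class file of the given type from the parent's classpath.
        private byte[] readClassBytes(String name) throws ClassNotFoundException {
            String resource = name.replace('.', '/') + ".class";
            try (InputStream in = getParent().getResourceAsStream(resource)) {
                if (in == null) {
                    throw new ClassNotFoundException(name);
                }
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                byte[] buf = new byte[4096];
                int n;
                while ((n = in.read(buf)) != -1) {
                    out.write(buf, 0, n);
                }
                return out.toByteArray();
            } catch (IOException e) {
                throw new ClassNotFoundException(name, e);
            }
        }
    }

    /** Checks the serializer class loaded by 'loader' against the Serializer interface this class sees. */
    static boolean isSerializerInstance(ClassLoader loader) throws ClassNotFoundException {
        Class<?> impl = Class.forName(ByteArraySerializer.class.getName(), false, loader);
        return Serializer.class.isAssignableFrom(impl);
    }

    public static void main(String[] args) throws Exception {
        ClassLoader app = ClassLoaderConflictDemo.class.getClassLoader();
        System.out.println("same loader:     " + isSerializerInstance(app));
        System.out.println("isolated loader: " + isSerializerInstance(new IsolatingLoader(app)));
    }
}
```

The first check should pass and the second should fail, even though both classes come from the same bytecode. If the same mechanism applies here, a likely place to look is whether kafka-clients classes are visible from more than one class loader on the Zeta server, for example from duplicate jars.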

Zeta or Flink or Spark Version

No response

Java or Scala Version

1.8

Screenshots

No response
