apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

[Bug] [Module Name] class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer #4023

Closed chenwei182729 closed 1 year ago

chenwei182729 commented 1 year ago

### Search before asking

### What happened

When using SeaTunnel's own engine (Zeta) and sinking to Kafka, the job fails with the exception below (full stack trace in the Error Exception section). The likely cause is a class-loader mismatch: org.apache.kafka.common.serialization.ByteArraySerializer is loaded by the SeatunnelChildFirstClassLoader, while org.apache.kafka.common.serialization.Serializer is loaded by the AppClassLoader.
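
A minimal diagnostic sketch (hypothetical code, not part of SeaTunnel or Kafka) illustrating why the check fails: Kafka resolves the configured serializer class through the thread context class loader and then tests it with `Serializer.class.isInstance(...)`; when the interface and the implementation come from different loaders, that test is false even though the class names match.

```java
import org.apache.kafka.common.serialization.Serializer;

// Hypothetical diagnostic, not SeaTunnel code: prints which class loader provides
// each side of the failing check in AbstractConfig.getConfiguredInstance().
public class ClassLoaderMismatchCheck {
    public static void main(String[] args) throws Exception {
        // The Serializer interface as seen by the class that runs this code.
        ClassLoader interfaceLoader = Serializer.class.getClassLoader();

        // Kafka loads the implementation through the thread context class loader,
        // which inside the Zeta engine is the SeatunnelChildFirstClassLoader.
        ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
        Class<?> impl = Class.forName(
                "org.apache.kafka.common.serialization.ByteArraySerializer", true, contextLoader);

        System.out.println("Serializer interface loaded by: " + interfaceLoader);
        System.out.println("ByteArraySerializer loaded by:  " + impl.getClassLoader());
        // false when the loaders differ -> the "is not an instance of" KafkaException
        System.out.println("assignable: " + Serializer.class.isAssignableFrom(impl));
    }
}
```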


### SeaTunnel Version

2.3.1

### SeaTunnel Config

```hocon
env {
  # You can set flink configuration here
  execution.parallelism = 1
  job.mode = "BATCH"
  execution.checkpoint.interval = 5000
  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}

source {
  # This is a example source plugin **only for test and demonstrate the feature source plugin**
  FakeSource {
    result_table_name = "fake"
    schema = {
      fields {
        name = "string"
        age = "int"
      }
    }
    parallelism = 3
  }
}

transform {
}

sink {
  kafka {
    topic = "quickstart-seatunnel"
    bootstrap.servers="127.0.0.1:9092"
    partition=1
    format=json
    kafka.request.timeout.ms=60000
    kafka.key.serializer=org.apache.kafka.common.serialization.StringSerializer
    kafka.value.serializer=org.apache.kafka.common.serialization.StringSerializer
  }
}
```
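
The exception names ByteArraySerializer even though the config asks for StringSerializer, which suggests the sink sets the producer serializers itself and only forwards the `kafka.`-prefixed options. A simplified sketch of that behaviour (an assumption for illustration, not the actual connector source):

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;

// Simplified sketch (assumed behaviour, not the real connector code): the
// "kafka."-prefixed options are forwarded with the prefix stripped, while the
// serializers are fixed to ByteArraySerializer because the connector already
// turns each row into byte[] before handing it to the producer.
public class KafkaSinkPropertiesSketch {
    public static Properties buildProducerProps() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        // "kafka.request.timeout.ms" -> "request.timeout.ms"
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000");
        // Hard-coded regardless of kafka.key.serializer / kafka.value.serializer:
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        return props;
    }
}
```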

### Running Command

The test below was added to the org.apache.seatunnel.engine.client.SeaTunnelClientTest class and run as a unit test:

```java
@Test
public void testExecuteJob_kafka() {
    Common.setDeployMode(DeployMode.CLIENT);
    String filePath = TestUtils.getResource("/batch_fakesource_to_kafka.conf");
    JobConfig jobConfig = new JobConfig();
    jobConfig.setName("fake_to_kafka");

    JobExecutionEnvironment jobExecutionEnv = CLIENT.createExecutionContext(filePath, jobConfig);

    try {
        final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
        CompletableFuture<JobStatus> objectCompletableFuture =
                CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);

        await().atMost(180000, TimeUnit.MILLISECONDS)
                .untilAsserted(() -> Assertions.assertTrue(
                        objectCompletableFuture.isDone()
                                && JobStatus.FINISHED.equals(objectCompletableFuture.get())));
    } catch (ExecutionException | InterruptedException e) {
        throw new RuntimeException(e);
    }
}
```

### Error Exception

```log
Caused by: org.apache.kafka.common.KafkaException: class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:403) ~[connector-kafka-2.3.1-SNAPSHOT.jar:2.3.1-SNAPSHOT]
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:434) ~[connector-kafka-2.3.1-SNAPSHOT.jar:2.3.1-SNAPSHOT]
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:419) ~[connector-kafka-2.3.1-SNAPSHOT.jar:2.3.1-SNAPSHOT]
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:365) ~[connector-kafka-2.3.1-SNAPSHOT.jar:2.3.1-SNAPSHOT]
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:289) ~[connector-kafka-2.3.1-SNAPSHOT.jar:2.3.1-SNAPSHOT]
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:316) ~[connector-kafka-2.3.1-SNAPSHOT.jar:2.3.1-SNAPSHOT]
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:301) ~[connector-kafka-2.3.1-SNAPSHOT.jar:2.3.1-SNAPSHOT]
    at org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaNoTransactionSender.<init>(KafkaNoTransactionSender.java:42) ~[connector-kafka-2.3.1-SNAPSHOT.jar:2.3.1-SNAPSHOT]
    at org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaSinkWriter.<init>(KafkaSinkWriter.java:99) ~[connector-kafka-2.3.1-SNAPSHOT.jar:2.3.1-SNAPSHOT]
    at org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaSink.createWriter(KafkaSink.java:82) ~[connector-kafka-2.3.1-SNAPSHOT.jar:2.3.1-SNAPSHOT]
    at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.restoreState(SinkFlowLifeCycle.java:196) ~[classes/:?]
    at org.apache.seatunnel.engine.server.task.SeaTunnelTask.lambda$restoreState$14(SeaTunnelTask.java:323) ~[classes/:?]
    at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184) ~[?:1.8.0_202]
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) ~[?:1.8.0_202]
    at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175) ~[?:1.8.0_202]
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382) ~[?:1.8.0_202]
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) ~[?:1.8.0_202]
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) ~[?:1.8.0_202]
    at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151) ~[?:1.8.0_202]
    at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174) ~[?:1.8.0_202]
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:1.8.0_202]
    at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418) ~[?:1.8.0_202]
    at org.apache.seatunnel.engine.server.task.SeaTunnelTask.restoreState(SeaTunnelTask.java:321) ~[classes/:?]
    at org.apache.seatunnel.engine.server.checkpoint.operation.NotifyTaskRestoreOperation.lambda$run$0(NotifyTaskRestoreOperation.java:87) ~[classes/:?]
    at org.apache.seatunnel.common.utils.RetryUtils.retryWithException(RetryUtils.java:47) ~[classes/:?]
    at org.apache.seatunnel.engine.server.checkpoint.operation.NotifyTaskRestoreOperation.run(NotifyTaskRestoreOperation.java:81) ~[classes/:?]
    at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189) ~[hazelcast-5.1.jar:5.1]
    at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273) ~[hazelcast-5.1.jar:5.1]
    at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248) ~[hazelcast-5.1.jar:5.1]
    at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213) ~[hazelcast-5.1.jar:5.1]
    at com.hazelcast.spi.impl.operationexecutor.impl.OperationExecutorImpl.run(OperationExecutorImpl.java:411) ~[hazelcast-5.1.jar:5.1]
    at com.hazelcast.spi.impl.operationexecutor.impl.OperationExecutorImpl.runOrExecute(OperationExecutorImpl.java:438) ~[hazelcast-5.1.jar:5.1]
    at com.hazelcast.spi.impl.operationservice.impl.Invocation.doInvokeLocal(Invocation.java:601) ~[hazelcast-5.1.jar:5.1]
    at com.hazelcast.spi.impl.operationservice.impl.Invocation.doInvoke(Invocation.java:580) ~[hazelcast-5.1.jar:5.1]
    at com.hazelcast.spi.impl.operationservice.impl.Invocation.invoke0(Invocation.java:541) ~[hazelcast-5.1.jar:5.1]
    at com.hazelcast.spi.impl.operationservice.impl.Invocation.invoke(Invocation.java:241) ~[hazelcast-5.1.jar:5.1]
    at com.hazelcast.spi.impl.operationservice.impl.InvocationBuilderImpl.invoke(InvocationBuilderImpl.java:61) ~[hazelcast-5.1.jar:5.1]
    at org.apache.seatunnel.engine.server.utils.NodeEngineUtil.sendOperationToMemberNode(NodeEngineUtil.java:40) ~[classes/:?]
    at org.apache.seatunnel.engine.server.checkpoint.CheckpointManager.sendOperationToMemberNode(CheckpointManager.java:230) ~[classes/:?]
    ... 10 more
```


### Flink or Spark Version

_No response_

### Java or Scala Version

1.8.0 (1.8.0_202, per the stack trace)

### Screenshots

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
hailin0 commented 1 year ago

@Hisoka-X #3817

Hisoka-X commented 1 year ago

Can you test again with the dev branch?

chenwei182729 commented 1 year ago

Bug is fixed

hailin0 commented 6 months ago

link https://github.com/apache/seatunnel/pull/6355

LLyKy commented 3 weeks ago

I got the same error with Flink:

```log
Caused by: org.apache.kafka.common.KafkaException: class org.apache.kafka.common.serialization.ByteArrayDeserializer is not an instance of org.apache.kafka.common.serialization.Deserializer
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:399)
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:430)
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:415)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:709)
    ... 18 more
```

```log
org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 1
    at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:140)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:851)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:245)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1095)
    at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$9(CliFrontend.java:1189)
    at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
    at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1189)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1157)
Caused by: java.lang.RuntimeException: Python process exits with code: 1
    at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:130)
```

I need help.
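
For reference, a guard that I understand is commonly used to avoid this kind of class-loader mismatch when constructing Kafka clients looks roughly like the sketch below (a hypothetical illustration, not the actual SeaTunnel or Flink connector fix): the client is created with the thread context class loader pointed at the loader that owns the kafka-clients classes, so ByteArrayDeserializer and Deserializer resolve from the same loader.

```java
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;

// Hypothetical workaround sketch, not the actual connector fix: temporarily point
// the thread context class loader at the loader that loaded kafka-clients, so the
// configured (de)serializer class and its interface come from the same loader.
public final class KafkaConsumerFactorySketch {
    public static KafkaConsumer<byte[], byte[]> create(Properties props) {
        ClassLoader original = Thread.currentThread().getContextClassLoader();
        try {
            Thread.currentThread().setContextClassLoader(KafkaConsumer.class.getClassLoader());
            return new KafkaConsumer<>(props);
        } finally {
            Thread.currentThread().setContextClassLoader(original);
        }
    }
}
```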