
[Bug] [spark.source.FakeStream] runtime error->ConfigObject is immutable, you can't call Map.put #1853

Status: Open · opened by tmljob 2 years ago

tmljob commented 2 years ago

Search before asking

What happened

When running the Spark Streaming template configuration (spark.streaming.conf.template), the job fails with the error shown in the Error Exception section below.

SeaTunnel Version

dev

SeaTunnel Config

env {
  # You can set spark configuration here
  # see available properties defined by spark: https://spark.apache.org/docs/latest/configuration.html#available-properties
  spark.app.name = "SeaTunnel"
  spark.executor.instances = 2
  spark.executor.cores = 1
  spark.executor.memory = "1g"
  spark.streaming.batchDuration = 5
}

source {
  # This is an example input plugin, **only for testing and demonstrating the input plugin feature**
  FakeStream {
    content = ["Hello World, SeaTunnel"]
  }

  # You can also use other input plugins, such as file
  # file {
  #   result_table_name = "accesslog"
  #   path = "hdfs://hadoop-cluster-01/nginx/accesslog"
  #   format = "json"
  # }

  # If you would like to get more information about how to configure SeaTunnel and see the full list of source plugins,
  # please go to https://seatunnel.apache.org/docs/spark/configuration/source-plugins/FakeStream
}

transform {

  split {
    fields = ["msg", "name"]
    delimiter = ","
  }

  # You can also use other transform plugins, such as Sql
  # Sql {
  #   sql = "select * from accesslog where request_time > 1000"
  # }

  # If you would like to get more information about how to configure SeaTunnel and see the full list of transform plugins,
  # please go to https://seatunnel.apache.org/docs/spark/configuration/transform-plugins/Split
}

sink {
  # Choose the Console output plugin to print data to the console
  Console {}

  # You can also use other output plugins, such as hdfs
  # hdfs {
  #   path = "hdfs://hadoop-cluster-01/nginx/accesslog_processed"
  #   save_mode = "append"
  # }

  # If you would like to get more information about how to configure SeaTunnel and see the full list of sink plugins,
  # please go to https://seatunnel.apache.org/docs/spark/configuration/sink-plugins/Console
}

Running Command

./bin/start-seatunnel-spark.sh --master local --deploy-mode client --config ./config/spark.streaming.conf.template

Error Exception

22/05/11 20:22:53 ERROR util.Utils: Exception encountered
com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException: ConfigObject is immutable, you can't call Map.put
Serialization trace:
object (org.apache.seatunnel.shade.com.typesafe.config.impl.SimpleConfig)
config (org.apache.seatunnel.spark.fake.source.FakeReceiver)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:144)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:391)
        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:302)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
        at com.twitter.chill.WrappedArraySerializer.read(WrappedArraySerializer.scala:36)
        at com.twitter.chill.WrappedArraySerializer.read(WrappedArraySerializer.scala:23)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
        at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:279)
        at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1$$anonfun$apply$mcV$sp$2.apply(ParallelCollectionRDD.scala:80)
        at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1$$anonfun$apply$mcV$sp$2.apply(ParallelCollectionRDD.scala:80)
        at org.apache.spark.util.Utils$.deserializeViaNestedStream(Utils.scala:211)
        at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply$mcV$sp(ParallelCollectionRDD.scala:80)
        at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:70)
        at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:70)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1374)
        at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:70)
        at sun.reflect.GeneratedMethodAccessor26.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2178)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:375)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.UnsupportedOperationException: ConfigObject is immutable, you can't call Map.put
        at org.apache.seatunnel.shade.com.typesafe.config.impl.AbstractConfigObject.weAreImmutable(AbstractConfigObject.java:193)
        at org.apache.seatunnel.shade.com.typesafe.config.impl.AbstractConfigObject.put(AbstractConfigObject.java:204)
        at org.apache.seatunnel.shade.com.typesafe.config.impl.AbstractConfigObject.put(AbstractConfigObject.java:20)
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:162)
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:39)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
        ... 38 more
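
Reading the serialization trace, the failure happens when the executor Kryo-deserializes the `config` field of `FakeReceiver`: Kryo's default field and map serializers try to rebuild the shaded Typesafe `SimpleConfig` by calling `Map.put` on its backing `ConfigObject`, which is immutable by design. One possible workaround (a hedged sketch, not an existing SeaTunnel fix) is to register a custom Kryo serializer that round-trips the config through its HOCON string form. `ConfigStringSerializer` and `ConfigKryoRegistrator` below are hypothetical names, and the shaded `ConfigFactory`/`ConfigRenderOptions` imports assume the whole `com.typesafe.config` package was relocated alongside the `SimpleConfig` class named in the trace:

```scala
import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import org.apache.spark.serializer.KryoRegistrator
import org.apache.seatunnel.shade.com.typesafe.config.{Config, ConfigFactory, ConfigRenderOptions}

// Round-trip the immutable Config through its HOCON string form instead of
// letting Kryo's FieldSerializer rebuild the backing map via Map.put.
class ConfigStringSerializer extends Serializer[Config] {
  override def write(kryo: Kryo, output: Output, config: Config): Unit =
    output.writeString(config.root().render(ConfigRenderOptions.concise()))

  override def read(kryo: Kryo, input: Input, `type`: Class[Config]): Config =
    ConfigFactory.parseString(input.readString())
}

// Wired up through spark.kryo.registrator, e.g.
//   spark.kryo.registrator = "my.pkg.ConfigKryoRegistrator"  (hypothetical package)
class ConfigKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    // SimpleConfig is the concrete class named in the serialization trace above.
    kryo.register(
      Class.forName("org.apache.seatunnel.shade.com.typesafe.config.impl.SimpleConfig"),
      new ConfigStringSerializer)
  }
}
```

With the registrator in place, the Config object never goes through Kryo's default field serialization path, so the immutable map is never mutated during read.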

Flink or Spark Version

spark 2.4

Java or Scala Version

java 8

Screenshots

No response

Are you willing to submit PR?

Code of Conduct

EricJoy2048 commented 2 years ago

Is this issue the same as https://github.com/apache/incubator-seatunnel/issues/1512?

tmljob commented 2 years ago

> Is this issue the same as #1512?

These two issues have similar error messages, but they occur on different versions and in different execution scenarios:

#1512: SeaTunnel 1.5.7

#1853 (this issue): dev

einvince commented 2 years ago

Same error on apache-seatunnel-incubating-2.1.1 with spark-2.4.7:

Caused by: java.lang.UnsupportedOperationException: ConfigObject is immutable, you can't call Map.put

liuq2008202 commented 1 year ago

@tmljob I have the same problem. Did you solve it?

pdlovedy commented 1 year ago

> @tmljob I have the same problem. Did you solve it?

Me too, on SeaTunnel 2.1.3 with Spark 2.4.8.
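
Until a fix lands, another possible workaround is to keep the `Config` object off the wire entirely: have the receiver hold only the rendered HOCON string (a plain `String` serializes without trouble) and re-parse it lazily on the executor. A minimal sketch, assuming the shaded `ConfigFactory` is on the classpath; `FakeReceiverSketch` is a hypothetical stand-in, not the actual `FakeReceiver` source:

```scala
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
import org.apache.seatunnel.shade.com.typesafe.config.{Config, ConfigFactory}

// Hypothetical sketch: the receiver keeps only the rendered HOCON string,
// so no immutable Config object is ever run through Kryo.
class FakeReceiverSketch(configString: String)
    extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  // Rebuilt on the executor after deserialization; never serialized itself.
  @transient private lazy val config: Config =
    ConfigFactory.parseString(configString)

  override def onStart(): Unit = {
    // e.g. replay the configured fake rows:
    //   config.getStringList("content").forEach(row => store(row))
  }

  override def onStop(): Unit = ()
}
```

On the driver side, the receiver would be constructed with `config.root().render()`, so deserialization on the executor only ever touches the string field.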