apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

[Bug] [all-output-plugins] When the source is a Kafka stream and the sink is ClickHouse, a single bad record prevents data from being written to ClickHouse and causes the whole program to exit #754

Closed chenhu closed 1 week ago

chenhu commented 2 years ago

Search before asking

What happened

When the source is a Kafka stream and the sink is ClickHouse, a single bad record prevents data from being written to ClickHouse and causes the whole program to exit.

SeaTunnel Version

1.5.3

SeaTunnel Config

spark {
  spark.streaming.batchDuration = 5
  spark.app.name = "Waterdrop"
  spark.executor.instances = 2
  spark.executor.cores = 1
  spark.executor.memory = "1g"
}

input {
  kafkaStream {
    topics = "VIO"
    consumer.bootstrap.servers = "hdp1:6667,hdp4:6667,hdp5:6667"
    consumer.group.id = "waterdrop_group"
  }
}

filter {
   remove {
    source_field = ["topic"]
   }
   json {
    source_field = "raw_message"
    result_table_name = "test"
    #target_field = "message"
    #schema_file = "test.json"
   }
}

output {
  #stdout {}
  clickhouse {
    host = "ch1:8123"
    clickhouse.socket_timeout = 50000
    database = "tutorial"
    table = "wd_kafka_ck"
    #fields = ["name","age"]
    username = "default"
    password = ""
    bulk_size = 20000
  }
}
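Since the `json` filter here has `#schema_file` commented out, the field types are inferred from the data, and integer-looking JSON values are inferred as longs. One possible workaround (assuming the Waterdrop 1.x `convert` filter is available in this build; the field name `age` is only an example) is to cast the offending field explicitly before the sink:

```hocon
filter {
  convert {
    source_field = "age"   # hypothetical field; use the column that maps to an Int32 in ClickHouse
    new_type = "integer"
  }
}
```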

Running Command

./bin/start-waterdrop.sh ....

Error Exception

Exception in thread "main" java.lang.Exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 4 times, most recent failure: Lost task 0.3 in stage 4.0 (TID 7, hdp1.les.com, executor 2): java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.Integer
 at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:101)
 at io.github.interestinglab.waterdrop.output.batch.Clickhouse.renderBaseTypeStatement(Clickhouse.scala:353)
 at io.github.interestinglab.waterdrop.output.batch.Clickhouse.io$github$interestinglab$waterdrop$output$batch$Clickhouse$$renderStatementEntry(Clickhouse.scala:375)
 at io.github.interestinglab.waterdrop.output.batch.Clickhouse$$anonfun$io$github$interestinglab$waterdrop$output$batch$Clickhouse$$renderStatement$1.apply$mcVI$sp(Clickhouse.scala:403)
 at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
 at io.github.interestinglab.waterdrop.output.batch.Clickhouse.io$github$interestinglab$waterdrop$output$batch$Clickhouse$$renderStatement(Clickhouse.scala:391)
 at io.github.interestinglab.waterdrop.output.batch.Clickhouse$$anonfun$process$2.apply(Clickhouse.scala:187)
 at io.github.interestinglab.waterdrop.output.batch.Clickhouse$$anonfun$process$2.apply(Clickhouse.scala:162)
 at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:929)
 at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:929)
 at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
 at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
 at org.apache.spark.scheduler.Task.run(Task.scala:109)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)
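The stack trace points at `renderBaseTypeStatement` calling `BoxesRunTime.unboxToInt`: Spark's JSON schema inference types integer-looking values as `LongType`, so the row holds a `java.lang.Long`, while the ClickHouse writer tries to unbox it as an `Integer`. A minimal sketch of the failure and a tolerant conversion (the `Number.intValue()` approach is an assumption for illustration, not the plugin's actual fix):

```java
public class LongCastDemo {
    public static void main(String[] args) {
        // Spark JSON schema inference yields LongType -> the boxed value is a java.lang.Long
        Object value = 42L;

        // This is effectively what the failing code does:
        try {
            int bad = (Integer) value; // throws ClassCastException: Long cannot be cast to Integer
            System.out.println(bad);
        } catch (ClassCastException e) {
            System.out.println("caught: " + e.getMessage());
        }

        // A tolerant conversion works for any boxed numeric type:
        if (value instanceof Number) {
            int ok = ((Number) value).intValue();
            System.out.println("converted: " + ok); // prints "converted: 42"
        }
    }
}
```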

Flink or Spark Version

Spark 2.3

Java or Scala Version

Java 8

Screenshots

No response

Are you willing to submit PR?

Code of Conduct

chenhu commented 2 years ago

An exception on a bad record is acceptable, but it must not crash the whole program.
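A sink-side guard along these lines would meet that expectation: render each row inside a try/catch, log and skip (or divert) the bad record, and keep processing the batch. This is a hypothetical sketch of the pattern, not SeaTunnel's actual code; `writeRow` stands in for the real per-row statement rendering:

```java
import java.util.Arrays;
import java.util.List;

public class TolerantSink {
    // Hypothetical per-row writer: throws on a malformed record,
    // standing in for renderStatementEntry in the real plugin.
    static void writeRow(Object row) {
        if (!(row instanceof Number)) {
            throw new ClassCastException("not a numeric row: " + row);
        }
    }

    // Skip bad rows instead of letting one failure abort the whole Spark job.
    static int writeBatch(List<Object> rows) {
        int written = 0;
        for (Object row : rows) {
            try {
                writeRow(row);
                written++;
            } catch (RuntimeException e) {
                // Log and continue; optionally divert the row to a dead-letter topic.
                System.err.println("skipping bad row: " + e.getMessage());
            }
        }
        return written;
    }

    public static void main(String[] args) {
        List<Object> rows = Arrays.asList(1L, "oops", 3L);
        System.out.println("written: " + writeBatch(rows)); // prints "written: 2"
    }
}
```

The trade-off is that silently skipped rows are lost unless they are logged or diverted, so a counter or dead-letter output is usually paired with this pattern.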