apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0
8.06k stars 1.83k forks source link

[Bug] [seatunnel-translation] java.lang.NullPointerException at org.apache.seatunnel.translation.flink.source.BaseSeatunnelSourceFunction.lambda$initializeState$0(BaseSeatunnelSourceFunction.java 155) #4568

Open wanwei120 opened 1 year ago

wanwei120 commented 1 year ago

Search before asking

What happened

The function implemented by the seatunnel task is to synchronize data from Kafka to ES. When the Seatunnel program restarts abnormally, it cannot restart the task from the checkpoint of Flink. The seatunnel task will continue to restart until the taskmanager crashes. This issue occurs when deploying Flink in both k8s and non k8s scenarios.

SeaTunnel Version

SeaTunnel 2.3.0 kafka 2.12-2.2.0 es 8.5

SeaTunnel Config

env
{
  job.name="xxx"
  job.mode="STREAMING"
}
source
{
   parallelism=3
   topic = "xxx"
   bootstrap.servers ="xxxx:9092"
   consumer.group = "seatunnel2"
   kafka.auto.offset.reset ="earliest"
   kafka.enable.auto.commit="true"
   start_mode="earliest"
   result_table_name="xxx"
   schema ={fields{xxxx=xxxx}}
}
sink
{
    Elasticesarch
   {
    parallelism=10
    es.cluster.name = "xxx"
    hosts=[xxx]
    username = "xx"
    password = "xx"
    index = "xx"
    max_batch_size =10000
   }
}

Running Command

flink run-application --target kubernetes-application -Dkuburnetes.namespace=flinkapp -Dkubernetes.jobmanager.service-account = flinkapp -Dkubernetes.rest-service.exposed.type=NodePort -Dkubernetes.container.image=xxx -Dkubernetes.pod-template-file= xxx -Dkubernetes.cluster-id =xx -c org.apache.seatunnel.core.starter.flink.SeatunnelFlink local:///opt/seatunnel/starter/seatunnel-flink-starter.jar --config xxx

Error Exception

java.lang.NullPointerException at org.apache.seatunnel.translation.flink.source.BaseSeatunnelSourceFunction.lambda$initializeState$0(BaseSeatunnelSourceFunction.java 155)
at java.util.ArrayList.foreach(ArrayList.java:1259)
...

Flink or Spark Version

flink:1.13.6

Java or Scala Version

java:1.8 scala:2.11

Screenshots

No response

Are you willing to submit PR?

Code of Conduct

github-actions[bot] commented 1 year ago

This issue has been automatically marked as stale because it has not had recent activity for 30 days. It will be closed in next 7 days if no further activity occurs.