apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

[Bug] [kafka source] Cannot set the max.poll.records parameter #5006

Open ocean-zhc opened 1 year ago

ocean-zhc commented 1 year ago

What happened

Setting max.poll.records in kafka.config of the Kafka source fails with the exception below. Removing the setting makes the job run normally.

SeaTunnel Version

2.3.2 2.3.2-SNAPSHOT

SeaTunnel Config

env {
  execution.parallelism = 1
  job.mode = "STREAMING"
  execution.planner = blink
  job.name = "kafka_console_test"
  execution.checkpoint.interval = 60000
}

source {

  Kafka {
    result_table_name = "kafka_table"
    schema = {
      fields {
        name = "string"
        age = "int"
      }
    }
    topic = "topic1"
    bootstrap.servers = "local:19092"
    kafka.config = {
      client.id = client_1
      max.poll.records = 10000
      auto.offset.reset = "earliest"
      enable.auto.commit = "false"
    }
  }

}

transform {

}

sink {
  Hive {
    table_name = "default.user1"
    metastore_uri = "thrift://centos4:9083"
  }
}

Running Command

./bin/start-seatunnel-flink-15-connector-v2.sh --config ./config/kafka2hive.conf

Error Exception

java.lang.NullPointerException: null value in entry: max.poll.records=null
    at com.google.common.collect.CollectPreconditions.checkEntryNotNull(CollectPreconditions.java:32) ~[?:?]
    at com.google.common.collect.ImmutableMap.entryOf(ImmutableMap.java:176) ~[guava-11.0.2.jar:?]
    at com.google.common.collect.ImmutableMap$Builder.put(ImmutableMap.java:285) ~[guava-11.0.2.jar:?]
    at com.google.common.collect.Maps.fromProperties(Maps.java:1362) ~[guava-11.0.2.jar:?]
    at org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceReader.lambda$null$3(KafkaSourceReader.java:134) ~[?:?]
    at org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaConsumerThread.run(KafkaConsumerThread.java:58) ~[?:?]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_172]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_172]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_172]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_172]
    at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_172]
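
A note on the trace above: `Maps.fromProperties` in Guava reads each value through `Properties.getProperty`, and `getProperty` returns `null` for any value that is not a `String`. The sketch below reproduces that behavior with the JDK alone. It assumes that the unquoted `max.poll.records = 10000` ends up in the `Properties` object as an `Integer`; the thread does not confirm this, and the class and method names are hypothetical.

```java
import java.util.Properties;

class NullPropertyDemo {
    // Stores an Integer value (assumption: what an unquoted config number becomes)
    // and returns what getProperty reports for it.
    static String lookupAfterIntegerPut() {
        Properties props = new Properties();
        props.put("max.poll.records", 10000);
        return props.getProperty("max.poll.records");
    }

    // Stores the same setting as a String and returns what getProperty reports.
    static String lookupAfterStringPut() {
        Properties props = new Properties();
        props.put("max.poll.records", "10000");
        return props.getProperty("max.poll.records");
    }

    public static void main(String[] args) {
        System.out.println(lookupAfterIntegerPut()); // null
        System.out.println(lookupAfterStringPut());  // 10000
    }
}
```

A `null` value handed to `ImmutableMap.Builder.put` would then produce the `null value in entry: max.poll.records=null` message seen in the trace.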

Flink or Spark Version

1.16.2

Java or Scala Version

1.8.0_172

liugddx commented 1 year ago

I tried it and could not reproduce the problem. Can you try it with jdk1.8.351 or later?

ocean-zhc commented 1 year ago

I tried both jdk1.8.351 and jdk1.8.361, and it still does not seem to work.

ocean-zhc commented 1 year ago

Quoting the value as a string solves it: max.poll.records = "10000"
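
For anyone building a `Properties` object from parsed config values in their own code, one defensive option is to convert every value to a `String` first, so that numeric and string forms behave the same. This is only a sketch under that assumption: `StringifyProperties` and `stringify` are hypothetical names, not part of SeaTunnel, and this is not the project's actual fix.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

class StringifyProperties {
    // Copies config entries into Properties, converting each non-null value to a String.
    // Entries with null values are skipped, since Properties cannot hold nulls.
    static Properties stringify(Map<String, Object> raw) {
        Properties props = new Properties();
        for (Map.Entry<String, Object> entry : raw.entrySet()) {
            if (entry.getValue() != null) {
                props.setProperty(entry.getKey(), String.valueOf(entry.getValue()));
            }
        }
        return props;
    }

    public static void main(String[] args) {
        Map<String, Object> raw = new LinkedHashMap<>();
        raw.put("max.poll.records", 10000);       // Integer, as an unquoted value might be parsed
        raw.put("auto.offset.reset", "earliest"); // already a String
        Properties props = stringify(raw);
        System.out.println(props.getProperty("max.poll.records")); // 10000
    }
}
```

After this conversion `getProperty` returns a non-null `String` for every stored key, whether or not the value was quoted in the config.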

github-actions[bot] commented 11 months ago

This issue has been automatically marked as stale because it has not had recent activity for 30 days. It will be closed in the next 7 days if no further activity occurs.