apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

[Bug] [Kafka-Connector] The DATE and DATETIME types of common json data in KAFKA are reported as null pointer exceptions #7079

Open LeonYoah opened 4 months ago

LeonYoah commented 4 months ago

What happened

This is my JSON message; the c_date field is passed as an empty string:

{"id":123456789012345,"c_map":{"key1":1,"key2":2,"key3":3},"c_array":[1,2,3,4,5],"c_string":"","c_boolean":true,"c_tinyint":127,"c_smallint":32767,"c_int":2147483647,"c_bigint":9223372036854775807,"c_float":3.14,"c_double":1.7976931348623157E308,"c_decimal":12345.67,"c_bytes":"","c_date":"","c_timestamp":""}

I found that when an empty string is passed for a DATE or DATETIME field, a NullPointerException is thrown.

I think we should intercept the empty string at this point and treat it as null:

jsonNode == null || jsonNode.isNull() || jsonNode.isMissingNode() || StringUtils.isBlank(jsonNode.asText())
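A minimal sketch of the proposed behaviour, assuming the fix is simply "blank text becomes null before the date parser runs". The class `BlankSafeDateParser` and its methods are hypothetical and not part of SeaTunnel; the ISO formats are also an assumption, since the formats the real `JsonToRowConverters` accepts may differ.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper (not SeaTunnel code): maps blank date text to null. */
final class BlankSafeDateParser {

    private BlankSafeDateParser() {}

    /** Returns null for null or blank input; otherwise parses an ISO date (assumed format). */
    static LocalDate toLocalDate(String text) {
        if (text == null || text.trim().isEmpty()) {
            return null;
        }
        return LocalDate.parse(text.trim(), DateTimeFormatter.ISO_LOCAL_DATE);
    }

    /** Returns null for null or blank input; otherwise parses an ISO date-time (assumed format). */
    static LocalDateTime toLocalDateTime(String text) {
        if (text == null || text.trim().isEmpty()) {
            return null;
        }
        return LocalDateTime.parse(text.trim(), DateTimeFormatter.ISO_LOCAL_DATE_TIME);
    }

    public static void main(String[] args) {
        // "c_date":"" and "c_timestamp":"" from the sample message both become null.
        System.out.println(toLocalDate(""));
        System.out.println(toLocalDateTime(""));
        // A well-formed value is still parsed.
        System.out.println(toLocalDate("2024-06-28"));
    }
}
```

The guard sits in front of the parser only, so other field types are not affected by it.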

SeaTunnel Version

dev

SeaTunnel Config

env {
    job.mode = "batch"
    parallelism = "1"
    job.retry.times = "0"
    job.name = "aace8bb9f8864562b0264ea75e3991f5"
    checkpoint.interval = "30000"
}

source {
    Kafka {
        schema = {
          fields {
            "university": "string"
            #"AppearTime": "timestamp",
            #"Calling": "int",
            #"DeviceID": "string",
            #"Direction": "string",
            #"DisappearTime": "timestamp",
          }
        }
        format = "json"
        bootstrap.servers = "10.28.xxxx:9092"
        format_error_handle_way = "skip"
        topic = "student2"
        consumer.group = "1111"
        semantics = EXACTLY_ONCE
        start_mode = "earliest"
        result_table_name = "hive1"
    }
}

sink {
  # choose stdout output plugin to output data to console

  Hive {
    source_table_name = "hive1"
    table_name = "test.university2"
    metastore_uri = "thrift://xxxx:9083"
    hdfs_site_path = "D:/安装包/kerberos/hive认证/hdfs-site.xml"
    hive_site_path = "D:/安装包/kerberos/hive认证/hive-site.xml"
    kerberos_principal = "hive/xxxx@HADOOP.COM"
    krb5_path = "D:/安装包/kerberos/hive认证/krb5.conf"
    kerberos_keytab_path = "D:/安装包/kerberos/hive认证/hive.service.keytab"
  }
}

Running Command

-e local

Error Exception

... 10 more
Caused by: java.lang.NullPointerException
    at org.apache.seatunnel.format.json.JsonToRowConverters.convertToLocalDate(JsonToRowConverters.java:260)
    at org.apache.seatunnel.format.json.JsonToRowConverters.access$300(JsonToRowConverters.java:61)
    at org.apache.seatunnel.format.json.JsonToRowConverters$7.convert(JsonToRowConverters.java:142)
    at org.apache.seatunnel.format.json.JsonToRowConverters$20.convert(JsonToRowConverters.java:431)
    at org.apache.seatunnel.format.json.JsonToRowConverters.convertField(JsonToRowConverters.java:419)
    at org.apache.seatunnel.format.json.JsonToRowConverters.access$1100(JsonToRowConverters.java:61)
    at org.apache.seatunnel.format.json.JsonToRowConverters$17.convert(JsonToRowConverters.java:348)

Zeta or Flink or Spark Version

dev

Java or Scala Version

1.8

Screenshots

No response

LeonYoah commented 4 months ago

In addition, if the value of the partition field is null, Hive also throws a NullPointerException when concatenating the partition path, and c_date=null is concatenated. In theory, a default partition value should be used instead, such as c_date=__DEFAULT_PARTITION__.
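The idea of a default partition value can be sketched as follows. This is an illustration only: `PartitionPathBuilder` is a hypothetical class, not the Hive sink's actual code, and the placeholder name is copied from the comment above rather than taken from any Hive configuration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper (not SeaTunnel code): builds key=value partition paths. */
final class PartitionPathBuilder {

    /** Placeholder taken from the comment above; an assumption, not a verified Hive setting. */
    static final String DEFAULT_PARTITION = "__DEFAULT_PARTITION__";

    private PartitionPathBuilder() {}

    /** Joins partition columns as key=value segments, replacing null or blank values. */
    static String build(Map<String, Object> partitionValues) {
        StringJoiner path = new StringJoiner("/");
        for (Map.Entry<String, Object> entry : partitionValues.entrySet()) {
            Object value = entry.getValue();
            String text = value == null ? "" : value.toString().trim();
            path.add(entry.getKey() + "=" + (text.isEmpty() ? DEFAULT_PARTITION : text));
        }
        return path.toString();
    }

    public static void main(String[] args) {
        Map<String, Object> partitions = new LinkedHashMap<>();
        partitions.put("c_date", null);
        // Prints c_date=__DEFAULT_PARTITION__ instead of failing on the null value.
        System.out.println(build(partitions));
    }
}
```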

LeonYoah commented 4 months ago

I withdraw the suggested check [StringUtils.isBlank(jsonNode.asText())]: it causes map and array types to be returned as null as well.
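The reason is that Jackson's `asText()` returns an empty string for object and array nodes, so a blank check on it also matches every map and array. A possible alternative is to apply the blank check to textual values only (with Jackson, something along the lines of `jsonNode.isTextual() && StringUtils.isBlank(jsonNode.asText())`). The sketch below models this with plain Java objects to stay dependency-free; `BlankTextGuard` and its methods are hypothetical, and its `asText` is only a stand-in for the Jackson method.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/** Hypothetical model (not SeaTunnel code) of the naive and the guarded blank checks. */
final class BlankTextGuard {

    private BlankTextGuard() {}

    /** Stand-in for JsonNode.asText(): containers yield "" as Jackson container nodes do. */
    static String asText(Object value) {
        if (value instanceof Map || value instanceof List) {
            return "";
        }
        return String.valueOf(value);
    }

    /** The withdrawn check: also treats every map and array as null. */
    static boolean naiveIsNull(Object value) {
        return value == null || asText(value).trim().isEmpty();
    }

    /** Guarded check: only blank text counts as null, containers are left alone. */
    static boolean guardedIsNull(Object value) {
        return value == null
                || (value instanceof CharSequence && value.toString().trim().isEmpty());
    }

    public static void main(String[] args) {
        List<Integer> cArray = Arrays.asList(1, 2, 3, 4, 5);
        Map<String, Integer> cMap = Collections.singletonMap("key1", 1);
        System.out.println(naiveIsNull(cArray));   // true: the array would be dropped
        System.out.println(guardedIsNull(cArray)); // false
        System.out.println(guardedIsNull(cMap));   // false
        System.out.println(guardedIsNull(""));     // true: blank c_date still becomes null
    }
}
```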