apache / seatunnel

[Bug] [Transform-JsonPath] The data is empty by using jsonpath #7084

Open Xzioc opened 3 months ago

Xzioc commented 3 months ago

Search before asking

What happened

Following the example on the official website, when using JsonPath to parse multi-level JSON, the running job cannot retrieve any data.

Data format

{
    "address": {
        "streetAddress": "naist street",
        "city": "Nara",
        "postalCode": "630-0192"
    }
}

The number of records read is 0 (screenshot).

SeaTunnel Version

2.3.5

SeaTunnel Config

{
    "env" : {
        "job.mode" : "BATCH",
        "parallelism" : "1",
        "job.retry.times" : "0",
        "checkpoint.interval" : "180000"
    },
    "source" : [
        {
            "schema" : {
                "fields" : {
                    "address" : "string"
                }
            },
            "format" : "json",
            "topic" : "test2",
            "bootstrap.servers" : "",
            "plugin_name" : "Kafka"
        }
    ],
    "transform" : [
        {
            "columns" : [
                {
                    "src_field" : "address",
                    "path" : "$.address.streetAddress",
                    "dest_field" : "streetAddress"
                },
                {
                    "src_field" : "address",
                    "path" : "$.address.city",
                    "dest_field" : "city"
                },
                {
                    "src_field" : "address",
                    "path" : "$.address.postalCode",
                    "dest_field" : "postalCode"
                }
            ],
            "plugin_name" : "JsonPath"
        }
    ],
    "sink" : [
        {
            "database" : "seatunnel-test",
            "password" : "123456",
            "driver" : "com.mysql.cj.jdbc.Driver",
            "generate_sink_sql" : "true",
            "plugin_name" : "JDBC",
            "user" : "root",
            "url" : "",
            "table" : "kafka-json-test"
        }
    ]
}

Running Command

./bin/seatunnel.sh --config ./script/test-trans.txt -e local

FuYouJ commented 3 months ago

I will try to reproduce this test case

liunaijie commented 3 months ago

hi @swpuEzio, from your information the Total Read Count is 0, so MySQL won't have any data. I am not sure whether this issue means the target DB has no data at all, or that the data columns are empty. If it has no data, that would be expected, because the source read count is 0.

Xzioc commented 3 months ago

> from your information the Total Read Count is 0, so MySQL won't have any data ...

No, my source has data, but it is not being read.

I also found an issue with the SQL generated after using JsonPath with src_field. The generated INSERT statement includes the src_field column (screenshot), even though my script does not map that field (screenshot). The source field should not be added to the SQL; an extra SQL mapping must be added to remove the unnecessary field before the job executes successfully (screenshot).

Xzioc commented 3 months ago

> I will try to reproduce this test case

I found that the cause of this issue is the missing source_table_name and result_table_name options, but there is another problem: the generated INSERT SQL also includes the src_field column, which corrupts the migrated data, as described in my other reply.
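
For reference, a minimal sketch of the table-name wiring being described, assuming the standard result_table_name / source_table_name options (the table names here are hypothetical, and the broker address is a placeholder):

env {
  parallelism = 1
  job.mode    = "BATCH"
}

source {
  Kafka {
    result_table_name = "kafka_source"  # name the stream so the transform can reference it
    schema = {
      fields {
        address = "string"
      }
    }
    format            = json
    topic             = "test2"
    bootstrap.servers = "localhost:9092"
  }
}

transform {
  JsonPath {
    source_table_name = "kafka_source"  # consume the stream declared above
    result_table_name = "parsed"        # expose the parsed rows to the sink
    columns = [
      {
        src_field  = "address"
        path       = "$.streetAddress"
        dest_field = "streetAddress"
      }
      # ... city and postalCode columns as in the config above
    ]
  }
}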

liunaijie commented 3 months ago

> No, my source has data, but it is not being read. [...] The source field should not be added to the SQL; an extra SQL mapping must be added to remove the unnecessary field.

Here is some advice:

  1. Try the Console sink to verify the transform result, and check that the schema and data are what you expect (see the sketch below).
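
A minimal sketch of that check: keep the source and transform as they are and swap only the sink (Console takes no required options):

sink {
  # print every row and its resolved schema to the job log
  # instead of writing to MySQL
  Console {}
}
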
FuYouJ commented 3 months ago

I spent about two days setting up Kafka to reproduce your issue, because I wanted to understand what the Kafka messages look like by the time they reach the transform.

First, you configured the Kafka source with JSON format serialization and declared a single column named address, so the Kafka reader extracts the address object from each JSON message. Downstream, the data is therefore:

{
    "streetAddress": "naist street",
    "city": "Nara",
    "postalCode": "630-0192"
}

Knowing this, the JsonPath configuration needs to be modified accordingly. My changes are as follows:

env {
  parallelism = 2
  job.mode    = "BATCH"
}

source {
  Kafka {
    schema = {
      fields {
        address = "string"
      }
    }
    format           = json
    topic            = "test2"
    bootstrap.servers = "localhost:9092"
    kafka.config = {
      max.poll.records   = 500
      auto.offset.reset  = "earliest"
    }
  }
}

transform {
  JsonPath {
    columns = [
      {
        src_field  = "address"
        path       = "$.streetAddress"
        dest_field = "streetAddress"
      },
      {
        src_field  = "address"
        path       = "$.city"
        dest_field = "city"
      },
      {
        src_field  = "address"
        path       = "$.postalCode"
        dest_field = "postalCode"
      }
    ]
  }
}

sink {
  Console {}
}

After configuring it this way, the program runs correctly, as shown in the screenshot.

Xzioc commented 2 months ago

> After configuring it this way, the program runs correctly.

Written this way, the values can indeed be retrieved. But once the fields can be extracted, there is another problem: when synchronizing to JDBC, the generated SQL also writes the src_field column into the statement, so you must add another layer of SQL transformation yourself. For the specific screenshots, see my other reply.
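
For what it's worth, a minimal sketch of that extra layer using the built-in Sql transform (the table names are hypothetical, and the FROM name must match the JsonPath result_table_name):

transform {
  Sql {
    source_table_name = "parsed"    # output of the JsonPath transform
    result_table_name = "for_jdbc"  # what the JDBC sink should read
    # select only the parsed columns so the original address field
    # is not written into the generated INSERT statement
    query = "SELECT streetAddress, city, postalCode FROM parsed"
  }
}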

FuYouJ commented 2 months ago

> When synchronizing to JDBC, the generated SQL also writes the src_field column into the statement, so you must add another layer of SQL transformation yourself.

I deliberately designed it this way, following other transforms: the original column is kept, and the parsed content creates new columns.
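
So with the config above, a row coming out of the JsonPath transform would have roughly this shape (a sketch of the expected output, assuming address stays serialized as a string; not actual job output):

{
    "address": "{\"streetAddress\": \"naist street\", \"city\": \"Nara\", \"postalCode\": \"630-0192\"}",
    "streetAddress": "naist street",
    "city": "Nara",
    "postalCode": "630-0192"
}

This is why the JDBC sink's generated INSERT includes address unless a Sql transform drops it first.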