housepower / clickhouse_sinker

Easily load data from kafka to ClickHouse
https://housepower.github.io/clickhouse_sinker
Apache License 2.0

When inserting data in CSV format, the array field is empty #151

Closed upupfeng closed 2 years ago

upupfeng commented 2 years ago

My data is in CSV format, delimited by "|".

After the data is written to ClickHouse, fields of other types have values; only the array fields are empty.

ddl:

-- ddl
CREATE TABLE IF NOT EXISTS wan (
    timestampstr                  String           ,
    citycode                      String           ,
    deviceid                      String           ,
    sampletime                    DateTime         ,
    username                      String           ,
    attr Nested(
        wanindex                      Int32 ,
        wanname                       String,
        wanavertxrate                 String,
        wanaverrxrate                 String,
        wanmaxtxrate                  String,
        wanmaxrxrate                  String,
        wanupstatistics               String,
        wandownstatistics             String
    )
) ENGINE = MergeTree();

-- desc 
CREATE TABLE wan
(
    `timestampstr` String  ,
    `citycode` String  ,
    `deviceid` String ,
    `sampletime` DateTime  ,
    `username` String ,
    `attr.wanindex` Array(Int32),
    `attr.wanname` Array(String),
    `attr.wanavertxrate` Array(String),
    `attr.wanaverrxrate` Array(String),
    `attr.wanmaxtxrate` Array(String),
    `attr.wanmaxrxrate` Array(String),
    `attr.wanupstatistics` Array(String),
    `attr.wandownstatistics` Array(String)
)ENGINE = MergeTree();

task conf:

  "task": {
    "name": "go_ck_sinker_wan",
    "kafkaClient": "kafka-go",
    "topic": "wan",
    "consumerGroup": "go_ck_sinker_wan",
    "earliest": true,
    "parser": "csv",
    "csvFormat": [
      "timestampstr",
      "citycode",
      "deviceid",
      "sampletime",
      "username",
      "attr.wanindex",
      "attr.wanname",
      "attr.wanavertxrate",
      "attr.wanaverrxrate",
      "attr.wanmaxtxrate",
      "attr.wanmaxrxrate",
      "attr.wanupstatistics",
      "attr.wandownstatistics"
    ],
    "delimiter": "|",
    "autoSchema": false,
    "tableName": "wan",
    "shardingKey": "deviceid",
    "flushInterval": 5,
    "bufferSize": 50000,
    "dims": [
      {
        "name": "timestampstr",
        "type": "String"
      },
      {
        "name": "citycode",
        "type": "String"
      },
      {
        "name": "deviceid",
        "type": "String"
      },
      {
        "name": "sampletime",
        "type": "DateTime"
      },
      {
        "name": "username",
        "type": "String"
      },
      {
        "name": "attr.wanindex",
        "type": "Array(Int32)"
      },
      {
        "name": "attr.wanname",
        "type": "Array(String)"
      },
      {
        "name": "attr.wanavertxrate",
        "type": "Array(String)"
      },
      {
        "name": "attr.wanaverrxrate",
        "type": "Array(String)"
      },
      {
        "name": "attr.wanmaxtxrate",
        "type": "Array(String)"
      },
      {
        "name": "attr.wanmaxrxrate",
        "type": "Array(String)"
      },
      {
        "name": "attr.wanupstatistics",
        "type": "Array(String)"
      },
      {
        "name": "attr.wandownstatistics",
        "type": "Array(String)"
      }
    ]
  }

data:

2022-01-09 000000|0226|FH1111112|2022-01-08 23:59:57|122222|[1,2,3]|['1','2','3']|['0.0','21.0','0.0']|['0.0','1210.0','10403.0']|['0.0','61.0','0.0']|['0.0','3130.0','20808.0']|['1320','1641843','4180']|['1760','92998913','799015136']

result:

Row 1:
──────
timestampstr:           2022-01-09 000000
citycode:               0226
deviceid:               FH1111112
sampletime:             2022-01-08 23:59:57
username:               122222
attr.wanindex:          []
attr.wanname:           []
attr.wanavertxrate:     []
attr.wanaverrxrate:     []
attr.wanmaxtxrate:      []
attr.wanmaxrxrate:      []
attr.wanupstatistics:   []
attr.wandownstatistics: []

All attr.* columns are empty.

What should I do? Is there a problem with my configuration?

upupfeng commented 2 years ago

I found the cause.

Internally, attr.wanindex is escaped to attr\\.wanindex, and the value is then looked up under attr\\.wanindex. No field with that name exists, so the lookup returns null.

Changing attr.wanindex to attr\\.wanindex in the task configuration is enough to fix it.
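
For anyone who hits the same thing, here is a minimal sketch of the mismatch described above. It is a hypothetical model in Python, not clickhouse_sinker's actual code: the helpers `escape_dots`, `build_index`, and `lookup` are made up for illustration, and the sketch assumes the escaped name has a single backslash before the dot (which a JSON config would write as `\\.`).

```python
# Hypothetical model of the name lookup; not clickhouse_sinker's real API.

def escape_dots(name):
    # Assumed escaping rule: put a backslash before every dot in the column name.
    return name.replace(".", "\\.")

def build_index(csv_format):
    # Map each configured field name to its position in the CSV row.
    return {name: pos for pos, name in enumerate(csv_format)}

def lookup(row, index, column):
    # The column is looked up under its escaped name.
    # None models the null value that ends up as an empty array.
    pos = index.get(escape_dots(column))
    return None if pos is None else row[pos]

# First six fields of the sample row from this issue.
row = "2022-01-09 000000|0226|FH1111112|2022-01-08 23:59:57|122222|[1,2,3]".split("|")

common = ["timestampstr", "citycode", "deviceid", "sampletime", "username"]
plain = build_index(common + ["attr.wanindex"])      # name as originally configured
escaped = build_index(common + ["attr\\.wanindex"])  # name with the dot escaped

print(lookup(row, plain, "deviceid"))         # names without dots are unaffected
print(lookup(row, plain, "attr.wanindex"))    # None: escaped key is not in the index
print(lookup(row, escaped, "attr.wanindex"))  # the array text is found
```

Names without a dot are unchanged by the escaping, which would explain why only the attr.* columns came back empty.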