redpanda-data / connect

Fancy stream processing made operationally mundane
https://docs.redpanda.com/redpanda-connect/about/
8.14k stars 840 forks source link

JSON datatype not supported for Clickhouse in sql_insert #2903

Open faisal00813 opened 1 month ago

faisal00813 commented 1 month ago

rpk connect version:

Version: 4.34.0
Date: 2024-09-04T20:52:28Z

Clickhouse version: ClickHouse server version 24.9.1 revision 54471

Table DDL:

CREATE TABLE default.test
(

    `date` DateTime,

    `userId` Nullable(String),

    `userEmail` Nullable(String),

    `userType` Nullable(String),

    `userLocation` JSON,

    `productName` Nullable(String),

    `ownerType` Nullable(String),

    `project` Nullable(String),

    `version` Nullable(String),

    `actionType` Nullable(String),

    `payload` JSON,

    `environment` Nullable(String),

    `osVersion` Nullable(String),

    `osName` Nullable(String),

    `osBuild` Nullable(String)
)
ENGINE = MergeTree
PRIMARY KEY (userEmail,
 date,
 actionType)
ORDER BY (userEmail,
 date,
 actionType)
TTL date + toIntervalDay(90)
SETTINGS allow_nullable_key = 1,
 index_granularity = 8192;

YAML:

input:
  broker:
    inputs:
      - generate:
          count: 1
          mapping: |
            root = {
                  "date": "2024-09-27T15:02:08",
                  "timestamp_epoch": 1727449129561,
                  "userId": "e25b407c-f6aa-4e73-88f5-d967d49477dc",
                  "userEmail": "abc@test.com",
                  "userType": "Owner",
                  "userLocation": {},
                  "productName": "mpulse",
                  "ownerType": "commercial",
                  "project": "FFF",
                  "version": "3.1.0",
                  "actionType": "App_Status",
                  "payload": {
                    "Appstate": "background"
                  },
                  "environment": "PROD",
                  "osVersion": "13",
                  "osBuild": "5936d14c3db14578",
                  "osName": "android"
                }
      - generate:
          count: 1
          mapping: |
            root = {
                "date": "2024-09-27T15:02:09",
                "timestamp_epoch": 1727449329561,
                "userId": "e25b407b-f6aa-4e73-88f5-d967d49477dc",
                "userEmail": "abd@test.com",
                "userType": "Owner",
                "userLocation": {},
                "productName": "mpulse",
                "ownerType": "commercial",
                "project": "AAA",
                "version": "3.1.0",
                "actionType": "App_Status",
                "payload": {
                  "Appstate": "background"
                },
                "environment": "PROD",
                "osVersion": "13",
                "osBuild": "5936d14c3db14578",
                "osName": "android"
              }
    batching:
      count: 1
      period: "100ms"
output:
  label: "clickhouse_insert"
  sql_insert:
    driver: clickhouse
    dsn: "clickhouse://default:password@localhost:9000/default?debug=true"
    table: test
    suffix: "FORMAT JSONEachRow"
    columns:
      - date
      - userId
      - userEmail
      - userType
      - userLocation
      - productName
      - ownerType
      - project
      - version
      - actionType
      - payload
      - environment
      - osVersion
      - osName
      - osBuild
    args_mapping: |
      root = [
      (this.timestamp_epoch/1000).ts_format("2006-01-02 15:04:05", "UTC"),
      this.userId,
      this.userEmail,
      this.userType,
      this.userLocation.string(),
      this.productName,
      this.ownerType,
      this.project,
      this.version,
      this.actionType,
      this.payload.string(),
      this.environment,
      this.osVersion,
      this.osName,
      this.osBuild
      ]
    batching:
      count: 1
      period: 1s

error:

INFO Received SIGINT, the service is closing       @service=redpanda-connect
[clickhouse][conn=7][52.14.155.152:9000][table columns]
[clickhouse][conn=7][52.14.155.152:9000][read data] decode error: clickhouse: unsupported column type "JSON"
[clickhouse-std][conn=0][localhost:9000] PrepareContext error: clickhouse: unsupported column type "JSON"
ERRO Failed to send message to sql_insert: clickhouse: unsupported column type "JSON"  @service=redpanda-connect label=clickhouse_insert path=root.output
[clickhouse][conn=8][52.14.155.152:9000][handshake] <- ClickHouse server version 24.9.1 revision 54471 (timezone Etc/UTC)
[clickhouse][conn=8][52.14.155.152:9000][send query] compression="none" INSERT INTO test (date,userId,userEmail,userType,userLocation,productName,ownerType,project,version,actionType,payload,environment,osVersion,osName,osBuild) FORMAT Native
[clickhouse][conn=8][52.14.155.152:9000][send data] compression="none"
[clickhouse][conn=8][52.14.155.152:9000][table columns]
[clickhouse][conn=8][52.14.155.152:9000][read data] decode error: clickhouse: unsupported column type "JSON"
[clickhouse-std][conn=0][localhost:9000] PrepareContext error: clickhouse: unsupported column type "JSON"
ERRO Failed to send message to sql_insert: clickhouse: unsupported column type "JSON"  @service=redpanda-connect label=clickhouse_insert path=root.output
faisal00813 commented 1 month ago

Side note:

  1. Removing .string() in args_mapping does not help
  2. Suffix seems to be not working
faisal00813 commented 1 month ago

sql_insert works fine when changing the column to object('json') from JSON. object('json') has been deprecated in Clickhouse

DDL


SET allow_experimental_json_type = 1;
CREATE TABLE default.horizon_events_test
(

    `date` DateTime,
    `timestamp_epoch` DateTime DEFAULT now(),

    `userId` Nullable(String),

    `userEmail` Nullable(String),

    `userType` Nullable(String),

    `userLocation` object('json'),

    `productName` Nullable(String),

    `ownerType` Nullable(String),

    `project` Nullable(String),

    `version` Nullable(String),

    `actionType` Nullable(String),

    `payload` object('json'),

    `environment` Nullable(String),

    `osVersion` Nullable(String),

    `osName` Nullable(String),

    `osBuild` Nullable(String)
)
ENGINE = MergeTree
PRIMARY KEY (userEmail,
 date,
 actionType)
ORDER BY (userEmail,
 date,
 actionType)
TTL date + toIntervalDay(90)
SETTINGS allow_nullable_key = 1,
 index_granularity = 8192;