Altinity / clickhouse-sink-connector

Replicate data from MySQL, Postgres and MongoDB to ClickHouse®
https://www.altinity.com
Apache License 2.0

Kafka Sink Connector auto-create for ClickHouse tables using ReplacingMergeTree engine with _sign or is_deleted parameter #468

Closed: TruongNT95 closed this issue 7 months ago

TruongNT95 commented 8 months ago

Currently, I want to use the _sign or is_deleted column as a parameter for the ReplacingMergeTree table engine in ClickHouse. I expect auto-create to create a table in ClickHouse like the one below:

  CREATE TABLE default.test
  (
      `uid` String,
      `_version` UInt32,
      `is_deleted` UInt8
  )
  ENGINE = ReplacingMergeTree(version, is_deleted)
  ORDER BY uid
  SETTINGS index_granularity = 8192

The engine of the created table is shown below:

  select engine_full from system.tables where name='test';
  SELECT engine_full
  FROM system.tables
  WHERE name = 'test'

  Query id: 0878247e-18de-49b1-a7c9-1349c1fb9553

  ┌─engine_full───────────────────────────────────────────────────────────────────────────────────────────────────────────┐
  │ ReplacingMergeTree(version, is_deleted) ORDER BY uid SETTINGS clean_deleted_rows = 'Always', index_granularity = 8192 │
  └───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘

  1 row in set. Elapsed: 0.003 sec. 

But when I use the Kafka Sink Connector to create the table, the engine shows up as below:

  select engine_full from system.tables where name='dim_user'
  SELECT engine_full
  FROM system.tables
  WHERE name = 'dim_user'

  Query id: a0f5a543-ba8b-45de-9ecb-828b8eb0c533

  ┌─engine_full─────────────────────────────────────────────────────────────────────────────────────┐
  │ ReplacingMergeTree(_version) PRIMARY KEY Email ORDER BY Email SETTINGS index_granularity = 8192 │
  └─────────────────────────────────────────────────────────────────────────────────────────────────┘

  1 row in set. Elapsed: 0.004 sec. 
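
For context, the second engine argument is what makes deletes disappear from query results: the row with the highest version wins, and if that row is flagged is_deleted = 1 it is hidden when querying with FINAL (and cleaned up on merges when clean_deleted_rows = 'Always', as in the engine_full output above). A minimal sketch of the intended behavior, with a hypothetical table and data, assuming a ClickHouse version that supports the two-argument ReplacingMergeTree:

  CREATE TABLE default.demo_is_deleted
  (
      `uid` String,
      `_version` UInt64,
      `is_deleted` UInt8
  )
  ENGINE = ReplacingMergeTree(_version, is_deleted)
  ORDER BY uid;

  -- initial state of the row, then a later 'delete' with a higher version
  INSERT INTO default.demo_is_deleted VALUES ('a', 1, 0);
  INSERT INTO default.demo_is_deleted VALUES ('a', 2, 1);

  -- FINAL keeps only the highest _version per key and hides rows whose latest
  -- version is flagged is_deleted = 1, so 'a' no longer appears here;
  -- with ReplacingMergeTree(_version) alone the delete row would still show up
  SELECT * FROM default.demo_is_deleted FINAL;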

My Kafka Sink Connector config:

   echo "Using confluent schema registry"
    cat <<EOF | curl --request POST --url "${CONNECTORS_MANAGEMENT_URL}" --header 'Content-Type: application/json' --data @-
    {
      "name": "${CONNECTOR_NAME}",
      "config": {
        "connector.class": "com.altinity.clickhouse.sink.connector.ClickHouseSinkConnector",
        "tasks.max": "20",
        "topics": "${TOPICS}",
        "clickhouse.topic2table.map": "${TOPICS_TABLE_MAP}",
        "clickhouse.server.url": "${CLICKHOUSE_HOST}",
        "clickhouse.server.user": "${CLICKHOUSE_USER}",
        "clickhouse.server.password": "${CLICKHOUSE_PASSWORD}",
        "clickhouse.server.database": "${CLICKHOUSE_DATABASE}",
        "clickhouse.server.port": ${CLICKHOUSE_PORT},
        "clickhouse.table.name": "${CLICKHOUSE_TABLE}",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://schemaregistry:8081",
        "value.converter.schema.registry.url":"http://schemaregistry:8081",

        "store.kafka.metadata": true,
        "topic.creation.default.partitions": 6,

        "store.raw.data": false,
        "store.raw.data.column": "raw_data",

        "metrics.enable": false,

        "buffer.flush.time.ms": 500,
        "thread.pool.size": 1,
        "fetch.min.bytes": 52428800,

        "enable.kafka.offset": false,

        "auto.create.tables": true,
        "schema.evolution": false,
        "replacingmergetree.delete.column": "is_deleted"
        }
    }
  EOF

My Debezium Connector config:

    echo "Using confluent schema registry"
      cat <<EOF | curl --request POST --url "${CONNECTORS_MANAGEMENT_URL}" --header 'Content-Type: application/json' --data @-
      {
        "name": "${CONNECTOR_NAME}",
        "config": {
          "connector.class": "${CONNECTOR_CLASS}",
          "tasks.max": "1",
          "snapshot.mode": "${SNAPSHOT_MODE}",
          "snapshot.locking.mode": "none",
          "snapshot.delay.ms": 10000,
          "include.schema.changes":"true",
          "include.schema.comments": "true",
          "database.hostname": "${HOST}",
          "database.port": "${PORT}",
          "database.user": "${USER}",
          "database.password": "${PASSWORD}",
          "database.server.id": "${DATABASE_SERVER_ID}",
          "database.server.name": "${DATABASE_SERVER_NAME}",
           "database.include.list": "${DBS}",
          "database.allowPublicKeyRetrieval":"true",
          "database.history.kafka.bootstrap.servers": "${KAFKA_BOOTSTRAP_SERVERS}",
          "database.history.kafka.topic": "${KAFKA_TOPIC}",
      "topic.prefix": "${DATABASE_SERVER_NAME}",
          "schema.history.internal.kafka.bootstrap.servers": "${KAFKA_BOOTSTRAP_SERVERS}", 
          "schema.history.internal.kafka.topic": "schemahistory.${KAFKA_TOPIC}",      
          "key.converter": "io.confluent.connect.avro.AvroConverter",
          "value.converter": "io.confluent.connect.avro.AvroConverter",
          "key.converter.schema.registry.url": "http://schemaregistry:8081",
          "value.converter.schema.registry.url":"http://schemaregistry:8081",

          "topic.creation.$alias.partitions": 1,
          "topic.creation.default.replication.factor": 1,
          "topic.creation.default.partitions": 1,

          "provide.transaction.metadata": "true",
          "max.batch.size": 128000,
          "max.queue.size": 512000
        }
      }
EOF

My docker-compose.yaml:

  version: "3.4"

  # Ubuntu , set this for redpanda to start
  # https://sort.veritas.com/public/documents/HSO/2.0/linux/productguides/html/hfo_admin_ubuntu/ch04s03.htm

  # Clickhouse Table Schema
  # create table test(id int, message String) ENGINE=MergeTree() PRIMARY KEY id;

  services:
    mysql-master:
      container_name: mysql-master
      image: docker.io/bitnami/mysql:latest
      restart: "no"
      ports:
        - "3306:3306"
      environment:
        - MYSQL_ROOT_PASSWORD=root
        - MYSQL_DATABASE=test
        - MYSQL_REPLICATION_MODE=master
        - MYSQL_REPLICATION_USER=repl_user
        - ALLOW_EMPTY_PASSWORD=yes
      volumes:
       - ./mysqld.cnf:/opt/bitnami/mysql/conf/my_custom.cnf
      # - ../sql/init_mysql.sql:/docker-entrypoint-initdb.d/init_mysql.sql
      healthcheck:
        test: [ 'CMD', '/opt/bitnami/scripts/mysql/healthcheck.sh' ]
        interval: 15s
        timeout: 5s
        retries: 6

    schemaregistry:
      container_name: schemaregistry
      #image: apicurio/apicurio-registry-mem:latest-release
      image: confluentinc/cp-schema-registry:latest
      restart: "no"
      ports:
        - "8081:8081"
      environment:
          - SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=kafka:9092
          - SCHEMA_REGISTRY_HOST_NAME=schemaregistry
          - SCHEMA_REGISTRY_LISTENERS=http://0.0.0.0:8081
          - SCHEMA_REGISTRY_DEBUG=true

      depends_on:
        - kafka

    debezium:
      container_name: debezium
      #image: debezium/connect:1.9.5.Final
      build:
        context: ../../docker/debezium_jmx
        args:
          DEBEZIUM_VERSION: 2.1.0.Alpha1
          JMX_AGENT_VERSION: 0.15.0
      restart: "no"
      ports:
        - "8083:8083"
        - "1976:1976"
      extra_hosts:
        - "host.docker.internal:host-gateway"

      environment:
        # Where to find Kafka broker
        - BOOTSTRAP_SERVERS=kafka:9092
        - GROUP_ID=1
        - CONFIG_STORAGE_TOPIC=config-storage-topic-debezium
        - OFFSET_STORAGE_TOPIC=offset-storage-topic-debezium
        - STATUS_STORAGE_TOPIC=status-storage-topic-debezium
        - LOG_LEVEL=INFO
        - KEY_CONVERTER=io.confluent.connect.avro.AvroConverter
        - VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter
        - KAFKA_OPTS=-javaagent:/kafka/etc/jmx_prometheus_javaagent.jar=8080:/kafka/etc/config.yml
        - JMXHOST=localhost
        - JMXPORT=1976
        - KAFKA_HEAP_OPTS=-Xms1G -Xmx5G
        #- LOG_LEVEL=DEBUG
      depends_on:
        - kafka

    kafka:
      container_name: kafka
      image: vectorized/redpanda:v22.1.3
      restart: "no"
      ports:
        # Expose LOCALHOST_NETWORK listener on localhost
        - "19092:19092"
      command:
        - redpanda
        - start
        - --overprovisioned
        # Setup listeners for docker network and localhost network
        - --kafka-addr
        - DOCKER_NETWORK://0.0.0.0:9092,LOCALHOST_NETWORK://0.0.0.0:19092
        - --advertise-kafka-addr
        - DOCKER_NETWORK://kafka:9092,LOCALHOST_NETWORK://127.0.0.1:19092

    sink:
      container_name: sink
      image: altinity/clickhouse-sink-connector:${SINK_VERSION}
      restart: "no"
      ports:
        - "18083:8083"
        - "5005:5005"
        - "39999:39999"
      environment:
        # Where to find Kafka broker
        - BOOTSTRAP_SERVERS=kafka:9092
        - GROUP_ID=2
        - CONFIG_STORAGE_TOPIC=config-storage-topic-sink
        - OFFSET_STORAGE_TOPIC=offset-storage-topic-sink
        - STATUS_STORAGE_TOPIC=status-storage-topic-sink
        - LOG_LEVEL=INFO
        - JAVA_DEBUG_PORT=*:5005
        - DEFAULT_JAVA_DEBUG_PORT=*:5005
        - KAFKA_DEBUG=true
        - JMX_PORT=39999
        - KAFKA_HEAP_OPTS=-Xms1G -Xmx5G
        #- LOG_LEVEL=DEBUG
      depends_on:
        - kafka

    clickhouse:
      # clickhouse-client --host=127.0.0.1 --port=9000 --user=root --password=root --database=test
      container_name: clickhouse
      image: clickhouse/clickhouse-server:latest
      restart: "no"
      ports:
        - "8123:8123"
        - "9000:9000"
      environment:
        - CLICKHOUSE_USER=root
        - CLICKHOUSE_PASSWORD=root
        - CLICKHOUSE_DB=test
      ulimits:
        nofile:
          soft: "262144"
          hard: "262144"
      volumes:
        - ../sql/init_clickhouse.sql:/docker-entrypoint-initdb.d/init_clickhouse.sql
        -  ./clickhouse/users.xml:/etc/clickhouse-server/users.xml

    #### MONITORING #####
  #  https://stackoverflow.com/questions/55473562/configuring-a-jmx-prometheus-exporter-docker-container-to-read-jmx-local-applica
  #   Bitnami bitnami/jmx-exporter vs sscaling images

    ## KAFKA UI #####
    console:
      image: docker.redpanda.com/vectorized/console:master-217260f
      restart: on-failure
      entrypoint: /bin/sh
      command: -c "echo \"$$CONSOLE_CONFIG_FILE\" > /tmp/config.yml; /app/console"
      environment:
        CONFIG_FILEPATH: /tmp/config.yml
        CONSOLE_CONFIG_FILE: |
          kafka:
            brokers: ["kafka:9092"]
          connect:
            enabled: true
            clusters:
              - name: datagen
                url: http://debezium:8083
      ports:
        - "8089:8080"
      depends_on:
        - kafka

What am I doing wrong?

aadant commented 8 months ago

@subkanthi looks like a config issue?

TruongNT95 commented 8 months ago

@aadant can you help me with the right config, please?

aadant commented 8 months ago

@TruongNT95 in the description of the problem, I see both _version and version in the table. How can this even work in ClickHouse?
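
To illustrate: the engine arguments have to reference columns that actually exist in the table, so with a _version column the intended DDL would look roughly like this (a sketch, not connector output):

  CREATE TABLE default.test
  (
      `uid` String,
      `_version` UInt32,
      `is_deleted` UInt8
  )
  ENGINE = ReplacingMergeTree(_version, is_deleted)  -- references the `_version` column, not `version`
  ORDER BY uid;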

The sink connector should determine the names of the version, sign, and is_deleted columns from the config. The sign column can be empty, but at least one of is_deleted and sign should be set. The version column is mandatory.

You can have both a sign column and is_deleted, although that is redundant information.

Please try to define them.

@subkanthi: do you understand the problem here?

subkanthi commented 8 months ago

> @aadant can you help me with the right config, please?

Hi @TruongNT95, the current version of the Kafka auto-create does not create the is_deleted column. In the meantime, can you use the lightweight version?

We will work on adding a fix for this.

TruongNT95 commented 8 months ago

@subkanthi @aadant may I use the sign column as the is_deleted parameter for the ReplacingMergeTree engine? If so, how can I configure it for the Kafka Sink Connector?

DenisFilippovArammeem commented 8 months ago

Guys, how can I specify the version and is_deleted column names in the config?

subkanthi commented 7 months ago

Fixed in 2.0.2: auto-create should now create the table with the is_deleted column for Kafka.

f1ed5d6333f8 :) show create table temporal_types_YEAR4;

SHOW CREATE TABLE temporal_types_YEAR4

Query id: b49f546f-66b5-45a1-ab30-4da2ec55984e

┌─statement──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ CREATE TABLE datatypes.temporal_types_YEAR4
(
    `Type` String,
    `Minimum_Value` Int32,
    `Mid_Value` Int32,
    `Maximum_Value` Int32,
    `Null_Value` Nullable(Int32),
    `_version` UInt64,
    `is_deleted` UInt8
)
ENGINE = ReplacingMergeTree(_version, is_deleted)
PRIMARY KEY Type
ORDER BY Type
SETTINGS index_granularity = 8192 │
└────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘

1 row in set. Elapsed: 0.002 sec.
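
A quick way to spot-check the new behavior after upgrading (the table name follows the example above; the queries are a hypothetical verification, not part of the fix):

  -- the engine should now list both the version and the is_deleted column
  SELECT engine_full FROM system.tables WHERE name = 'temporal_types_YEAR4';

  -- rows whose latest _version carries is_deleted = 1 should no longer appear
  SELECT * FROM datatypes.temporal_types_YEAR4 FINAL;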