Altinity / clickhouse-sink-connector

Replicate data from MySQL, Postgres and MongoDB to ClickHouse®
https://www.altinity.com
Apache License 2.0

Tables do not get created in ClickHouse #919


rezashabrang commented 1 week ago

This is my Debezium config:

{
  "name": "${CONNECTOR_NAME}",
  "config": {
    "connector.class": "${CONNECTOR_CLASS}",
    "tasks.max": "1",
    "snapshot.mode": "schema_only",
    "snapshot.locking.mode": "none",
    "snapshot.delay.ms": 10000,
    "include.schema.changes": "true",
    "include.schema.comments": "true",
    "database.hostname": "${HOST}",
    "database.port": "${PORT}",
    "database.user": "${USER}",
    "database.password": "${PASSWORD}",
    "database.server.id": "${DATABASE_SERVER_ID}",
    "database.server.name": "${DATABASE_SERVER_NAME}",
    "database.whitelist": "${DBS}",
    "database.allowPublicKeyRetrieval": "true",
    "database.history.kafka.bootstrap.servers": "${KAFKA_BOOTSTRAP_SERVERS}",
    "database.history.kafka.topic": "${KAFKA_TOPIC}",
    "database.ssl.mode": "required",

    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schemaregistry:8081",
    "value.converter.schema.registry.url": "http://schemaregistry:8081",

    "topic.creation.$alias.partitions": 2,
    "topic.creation.default.replication.factor": 1,
    "topic.creation.default.partitions": 2,

    "provide.transaction.metadata": "true",
    "max.batch.size": 20000,
    "max.queue.size": 100000,
    "max.queue.size.in.bytes": 1000000000,

    "topic.prefix": "${DATABASE_SERVER_NAME}",
    "schema.history.internal.kafka.topic": "${KAFKA_TOPIC}",
    "schema.history.internal.kafka.bootstrap.servers": "${KAFKA_BOOTSTRAP_SERVERS}",
    "tombstones.on.delete": "true"
  }
}
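A side note on the Debezium properties: database.whitelist, database.server.name, and database.history.kafka.* are legacy Debezium 1.x names that 2.x no longer recognizes. The config already sets the 2.x replacements topic.prefix and schema.history.internal.kafka.*, so on Debezium 2.x only database.include.list (the replacement for database.whitelist) would be missing. Independent of that, it is worth confirming that both connectors registered and that their tasks are RUNNING rather than FAILED. A minimal sketch against the Kafka Connect REST API; the connect:8083 address and both connector names are placeholders:

# Minimal status check via the Kafka Connect REST API.
# "connect:8083" and both connector names are placeholders.
import requests

for name in ("mysql-source", "clickhouse-sink"):
    status = requests.get(f"http://connect:8083/connectors/{name}/status").json()
    print(name,
          status["connector"]["state"],                 # e.g. RUNNING or FAILED
          [task["state"] for task in status["tasks"]])  # one state per task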

And this is my sink config:

{
  "name": "${CONNECTOR_NAME}",
  "config": {
    "connector.class": "com.altinity.clickhouse.sink.connector.ClickHouseSinkConnector",
    "tasks.max": "3",
    "topics.regex": "SERVER5432.**",
    "clickhouse.server.url": "${CLICKHOUSE_HOST}",
    "clickhouse.server.user": "${CLICKHOUSE_USER}",
    "clickhouse.server.password": "${CLICKHOUSE_PASSWORD}",
    "clickhouse.server.port": ${CLICKHOUSE_PORT},
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schemaregistry:8081",
    "value.converter.schema.registry.url":"http://schemaregistry:8081",

    "enable.snapshot.ddl": true,

    "batch.max.records": 1000,
    "sink.connector.max.queue.size": 1000,

    "single.threaded": false,

    "topic.creation.default.partitions": 3,
    "snapshot.mode": "initial",

    "max.batch.size": 1000,
    "batch.size": 1000,

    "metrics.enable": true,
    "metrics.port": 8084,
    "buffer.flush.time.ms": 1000,
    "buffer.max.records": 10000,
    "thread.pool.size": 3,
    "fetch.min.bytes": 5242880,

    "enable.kafka.offset": false,
    "store.kafka.metadata": false,

    "auto.create.tables": true,
    "auto.create.tables.replicated": true,
    "schema.evolution": true,

    "replacingmergetree.delete.column": "_sign",
    "treat.tombstone.as.delete": true,
    "delete.enabled": true
  }
}
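Two things are worth double-checking here. First, the topics.regex value "SERVER5432.**" doubles the * quantifier, which Java's regex engine rejects as a dangling metacharacter; "SERVER5432.*" is presumably what was intended. Second, whether auto.create.tables produced anything can be checked directly in ClickHouse. A minimal sketch using the clickhouse-driver package; the host, credentials, and database name are placeholders:

# List whatever tables the sink has auto-created so far.
# Host, credentials, and the database name are placeholders.
from clickhouse_driver import Client

client = Client(host="clickhouse", user="default", password="")
rows = client.execute(
    "SELECT name, engine FROM system.tables WHERE database = %(db)s",
    {"db": "test"},  # placeholder: the database the sink writes to
)
for name, engine in rows:
    print(name, engine)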

The container logs show that it detects the type as ARRAY and converts it, and I can also see preCommit in the logs, but the tables are never created in ClickHouse. My goal is simply to create the tables from the source (MySQL) in ClickHouse. I am using the altinity/clickhouse-sink-connector:2.4.0-kafka image.
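When the sink is running but never creates tables, another quick check is whether Debezium is actually producing change topics that the sink's topics.regex would match. A minimal sketch using the kafka-python package; the kafka:9092 bootstrap address is a placeholder:

# Print all topics carrying the expected server prefix.
# "kafka:9092" is a placeholder bootstrap address.
from kafka import KafkaConsumer

consumer = KafkaConsumer(bootstrap_servers="kafka:9092")
for topic in sorted(consumer.topics()):
    if topic.startswith("SERVER5432"):
        print(topic)
consumer.close()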

Am I missing something?

aadant commented 1 week ago

I would recommend that you use the lightweight sink connector; see for example https://github.com/Altinity/clickhouse-sink-connector/discussions/810. It works very well with MySQL, including DDL replication.