confluentinc / kafka-connect-jdbc

Kafka Connect connector for JDBC-compatible databases
Other
1.01k stars 953 forks source link

It is possible to sink data using JDBC connector without a schema in message? #1347

Open ivanilk opened 1 year ago

ivanilk commented 1 year ago

Hi There, I’m trying to insert my data from a Kafka Topic into clickhouse with theJDBC Sink Connector. This is my connector config

{
  "name": "JdbcSinkConnector",
  "config": {
    "value.converter.schemas.enable": "false",
    "name": "JdbcSinkConnector_my_topic",
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "errors.retry.timeout": "100",
    "errors.retry.delay.max.ms": "60000",
    "topics": "my_topic",
    "connection.url": "jdbc:clickhouse://clickhouse:8123/my_db",
    "connection.user": "default",
    "connection.password": "",
    "insert.mode": "insert",
    "batch.size": "1000",
    "table.name.format": "my_tb",
    "pk.mode": "record_value",
    "auto.create": "true",
    "auto.evolve": "true"
  }
}

and this is my message

{
  "ts": 1682006722000,
  "sessionId": 194860,
  "auth": "Logged In",
  "level": "free",
  "itemInSession": 5,
  "city": "Erie",
}

but got an errorError: Sink connector 'JdbcSinkConnector_my_topic' is configured with 'delete.enabled=false' and 'pk.mode=record_value' and therefore requires records with a non-null Struct value and non-null Struct schema, but found record at (topic='my_topic',partition=0,offset=0,timestamp=1687020175058) with a HashMap value and null value schema. (org.apache.kafka.connect.runtime.WorkerSinkTask) Pls help me, thanks all

OneCricketeer commented 10 months ago

A schema is required. Otherwise, it cannot guarantee that any two events have the exact same fields or types

OneCricketeer commented 10 months ago

Worth pointing out that Clickhouse has its own Kafka ingest mechanisms. https://clickhouse.com/docs/en/integrations/kafka#choosing-an-option