confluentinc / kafka-connect-jdbc

Kafka Connect connector for JDBC-compatible databases

Implicit Conversion error from topic to Microsoft SQL #1138

Open mikementzmaersk opened 3 years ago

mikementzmaersk commented 3 years ago

Hi, I'm getting this error when trying to sink topic data to Microsoft SQL. Any ideas of things to try to get around this, please?

io.confluent.connect.jdbc.sink.JdbcSinkTask put - Write of 500 records failed, remainingRetries=0
java.sql.BatchUpdateException: Implicit conversion from data type nvarchar to varbinary(max) is not allowed. Use the CONVERT function to run this query.
    at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.executeBatch(SQLServerPreparedStatement.java:2088)
    at io.confluent.connect.jdbc.sink.BufferedRecords.executeUpdates(BufferedRecords.java:221)
    at io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:187)
    at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:80)
    at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:84)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:582)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:330)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:237)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)

mataralhawiti commented 3 years ago

Can you share your sink connector config please?

mikementzmaersk commented 2 years ago

Sure :-)

Pretty basic... I think it could be something around null values...

{ "name": "sqlsink1", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "tasks.max": "1", "topics.regex": "sqlserver1.dbo.", "connection.url": "jdbc:sqlserver://sqlserver1:1433;database=test;user=kafka1;password=redacted;encrypt=true", "transforms": "unwrap,striptopic", "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", "transforms.unwrap.drop.tombstones": "false", "transforms.striptopic.type" : "org.apache.kafka.connect.transforms.RegexRouter", "transforms.striptopic.regex" : "sqlserver1.dbo.(.)", "transforms.striptopic.replacement" : "$1", "auto.create": "true", "auto.evolve": "true", "insert.mode": "upsert", "delete.enabled": "true", "pk.mode": "record_key" } }

mataralhawiti commented 2 years ago

Thanks. From this:

Implicit conversion from data type nvarchar to varbinary(max) is not allowed. Use the CONVERT function to run this query.

it seems you're trying to insert an nvarchar field into a varbinary column in the destination table. Checking your message schema in Kafka would be helpful.

Did you create the destination table upfront, or was it created by the connector?
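To check the destination side, a query along these lines will list the column types SQL Server actually has. This is just a generic lookup against the standard INFORMATION_SCHEMA views; the table name is a placeholder for whatever your topic maps to:

SELECT COLUMN_NAME, DATA_TYPE, CHARACTER_MAXIMUM_LENGTH
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_NAME = 'your_sink_table';  -- placeholder: substitute the sink table's name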

nwt commented 2 years ago

I've run into this problem too. It's caused by a message containing a null value for a nullable bytes field.

Here's a script to reproduce it:

#!/bin/sh -ex

# Fetch the Confluent Platform all-in-one Compose file.
curl -sO https://raw.githubusercontent.com/confluentinc/cp-all-in-one/7.0.1-post/cp-all-in-one/docker-compose.yml

# Append a SQL Server service to the Compose file.
cat <<EOF >>docker-compose.yml
  sqlserver:
    container_name: sqlserver
    image: mcr.microsoft.com/mssql/server:2019-latest
    ports:
     - 1433:1433
    environment:
     - ACCEPT_EULA=Y
     - MSSQL_AGENT_ENABLED=true
     - MSSQL_PID=Standard
     - SA_PASSWORD=Password!
EOF

docker-compose up -d connect sqlserver

# Install the JDBC connector plugin inside the Connect container.
docker exec -i connect sh -e <<EOF
curl -sO https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-jdbc/versions/10.2.6/confluentinc-kafka-connect-jdbc-10.2.6.zip
python -m zipfile -e confluentinc-kafka-connect-jdbc-10.2.6.zip /usr/share/java
EOF

# Wait for Connect.
sh -c 'while ! curl -fs localhost:8083; do sleep 1; done'

# Create the JDBC sink connector with auto.create enabled.
curl -H Content-Type:application/json -d @- \
  http://localhost:8083/connectors <<EOF
{
  "name": "jdbc-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:sqlserver://sqlserver:1433",
    "connection.user": "sa",
    "connection.password": "Password!",
    "auto.create": "true",
    "pk.fields": "id",
    "pk.mode": "record_value",
    "topics": "jdbc-sink"
  }
}
EOF

# In case schema-registry timed out waiting for the broker to start.
docker-compose up -d schema-registry
sh -c 'while ! curl -fs localhost:8081; do sleep 1; done'

echo '{"id": 1, "nullableBytes": null}' | 
  docker exec -i schema-registry kafka-avro-console-producer \
    --bootstrap-server broker:29092 --topic jdbc-sink \
    --property value.schema='
{
  "type": "record",
  "namespace": "test",
  "name": "Value",
  "fields": [
    {
      "name": "id",
      "type": "long"
    },
    {
      "name": "nullableBytes",
      "type": [
        "null",
        "bytes"
      ]
    }
  ]
}
'
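For what it's worth, here's my understanding of the failure at the SQL level: with auto.create, the nullable bytes field becomes a varbinary(max) column, and the driver appears to bind the null value as an nvarchar-typed parameter. A rough T-SQL sketch of that (not the connector's actual statements; table and column names are taken from the script above):

-- Roughly the table that auto.create produces for the schema above.
CREATE TABLE [jdbc-sink] ([id] bigint NOT NULL PRIMARY KEY, [nullableBytes] varbinary(max) NULL);

-- A NULL parameter typed as nvarchar should hit the same implicit-conversion error,
-- since nvarchar -> varbinary is an explicit-only conversion in SQL Server.
DECLARE @p nvarchar(max) = NULL;
INSERT INTO [jdbc-sink] ([id], [nullableBytes]) VALUES (1, @p);

-- The explicit CONVERT the error message suggests should succeed.
INSERT INTO [jdbc-sink] ([id], [nullableBytes]) VALUES (2, CONVERT(varbinary(max), @p));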