confluentinc / kafka-connect-jdbc

Kafka Connect connector for JDBC-compatible databases
Other
21 stars 960 forks source link

Redshift connector - Error: null (INT32) type doesn't have a mapping to the SQL database column type #1140

Open tooptoop4 opened 2 years ago

tooptoop4 commented 2 years ago

config is:

bootstrap.servers=redact
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
schema.registry.url=http://localhost:8081
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
offset.flush.timeout.ms=60000
plugin.path=/home/user/opt/kafka_2.12-2.8.1/connectors/confluentinc-kafka-connect-jdbc-10.2.5/lib
consumer.group.id=my-grouprs3
name=p-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
flush.size=30
topics=redact
tasks.max=1
consumer.group.id=sink3
auto.create=true
connection.url=jdbc:redact
transforms=flatten
transforms.flatten.type=org.apache.kafka.connect.transforms.Flatten$Value

below error:

[2021-11-16 13:40:14,551] ERROR WorkerSinkTask{id=p-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: null (INT32) type doesn't have a mapping to the SQL database column type (org.apache.kafka.connect.runtime.WorkerSinkTask:608)
org.apache.kafka.connect.errors.ConnectException: null (INT32) type doesn't have a mapping to the SQL database column type
        at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.getSqlType(GenericDatabaseDialect.java:1911)
        at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.writeColumnSpec(GenericDatabaseDialect.java:1827)
        at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.lambda$writeColumnsSpec$39(GenericDatabaseDialect.java:1816)
        at io.confluent.connect.jdbc.util.ExpressionBuilder.append(ExpressionBuilder.java:560)
        at io.confluent.connect.jdbc.util.ExpressionBuilder$BasicListBuilder.of(ExpressionBuilder.java:599)
        at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.writeColumnsSpec(GenericDatabaseDialect.java:1818)
        at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.buildCreateTableStatement(GenericDatabaseDialect.java:1735)
        at io.confluent.connect.jdbc.sink.DbStructure.create(DbStructure.java:121)
        at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:67)
        at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:123)
        at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:74)
        at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:84)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:582)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:330)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:237)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
[2021-11-16 13:40:14,553] ERROR WorkerSinkTask{id=p-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:190)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:610)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:330)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:237)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: null (INT32) type doesn't have a mapping to the SQL database column type
        at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.getSqlType(GenericDatabaseDialect.java:1911)
        at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.writeColumnSpec(GenericDatabaseDialect.java:1827)
        at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.lambda$writeColumnsSpec$39(GenericDatabaseDialect.java:1816)
        at io.confluent.connect.jdbc.util.ExpressionBuilder.append(ExpressionBuilder.java:560)
        at io.confluent.connect.jdbc.util.ExpressionBuilder$BasicListBuilder.of(ExpressionBuilder.java:599)
        at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.writeColumnsSpec(GenericDatabaseDialect.java:1818)
        at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.buildCreateTableStatement(GenericDatabaseDialect.java:1735)
        at io.confluent.connect.jdbc.sink.DbStructure.create(DbStructure.java:121)
        at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:67)
        at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:123)
        at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:74)
        at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:84)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:582)
        ... 10 more
[2021-11-16 13:40:14,553] INFO Stopping task (io.confluent.connect.jdbc.sink.JdbcSinkTask:161)

if i don't have below 2 configs then error is about STRUCT doesn't have a mapping

transforms=flatten
transforms.flatten.type=org.apache.kafka.connect.transforms.Flatten$Value
gwasky commented 2 years ago

insert.mode property should be insert

sfc-gh-adlee commented 2 years ago

was this resolved?

tooptoop4 commented 2 years ago

u gotta make ur own class, but even then perf is very slow, less than 100 rows per min from memory