confluentinc / kafka-connect-jdbc

Kafka Connect connector for JDBC-compatible databases

Getting NullPointerException when trying to write to TDengine via its JDBC driver #1355

Closed jseparovic closed 1 year ago

jseparovic commented 1 year ago

Version: 10.6.1-SNAPSHOT

I get the following error when trying to write a row to TDengine using their JDBC driver. I'm wondering whether this is an issue with the database metadata API or a configuration problem on my side:

2023-07-17 06:03:01,753 ERROR || o.a.k.c.r.WorkerSinkTask: WorkerSinkTask{id=destination_sink_db_0501392c63e3417fb780918aad531577_2c07979cc2b946069569b24a1bbb6343_1689573758799-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: null
java.lang.NullPointerException
    at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.primaryKeyColumns(GenericDatabaseDialect.java:828)
    at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.describeColumns(GenericDatabaseDialect.java:663)
    at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.describeTable(GenericDatabaseDialect.java:862)
    at io.confluent.connect.jdbc.util.TableDefinitions.get(TableDefinitions.java:62)
    at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:64)
    at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:130)
    at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:84)
    at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:85)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
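
My guess (I haven't traced the connector source line by line, so treat this as a hypothesis) is that the failure is in the primary-key metadata lookup. The JDBC spec says DatabaseMetaData.getPrimaryKeys() returns a (possibly empty) ResultSet, never null; if TDengine's driver returns null there, code shaped like the sketch below would throw exactly this NPE. The class and method names here are illustrative, not the connector's actual code:

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashSet;
import java.util.Set;

// Illustrative sketch only: roughly the primary-key lookup a
// metadata-driven dialect performs when describing a table.
public class PrimaryKeySketch {
    static Set<String> primaryKeyColumns(Connection conn, String catalog,
                                         String schema, String table) throws SQLException {
        Set<String> pk = new HashSet<>();
        try (ResultSet rs = conn.getMetaData().getPrimaryKeys(catalog, schema, table)) {
            // Per the JDBC spec this ResultSet may be empty but never null.
            // If a non-compliant driver returns null anyway, rs.next()
            // below throws a NullPointerException.
            while (rs.next()) {
                pk.add(rs.getString("COLUMN_NAME"));
            }
        }
        return pk;
    }
}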

Here is my connector config (it's built in JavaScript, so some values are variables and template literals rather than plain JSON strings):

{
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "{{ connectionUrl }}",
    "connection.user": "{{ username }}",
    "connection.password": "{{ password }}",
    "insert.mode": "insert",
    "delete.enabled": "false",
    "pk.fields": "startDateTime",
    "pk.mode": "record_value",
    "table.name.format": "_{{ destinationPolicy.id | replace('-', '') }}",

    "transforms": "unwrap,filter,rename,extractFieldData,fromJson,changeTopicName",

    "name": connectorName,
    "tasks.max": "1",

    "topics.regex": topicRegex,

    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.replication.factor": this.PUBSUB_REPLICATION_FACTOR,
    "errors.log.enable": "true",

    "topic.creation.default.replication.factor": this.PUBSUB_REPLICATION_FACTOR,
    "topic.creation.default.partitions": this.PUBSUB_NUM_PARTITIONS,
    "topic.creation.default.cleanup.policy": "delete",
    "topic.creation.default.retention.ms": "86400000", // 1 day retention only

    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": true,
    "transforms.unwrap.add.headers": "op,table,source.ts_ms",

    "transforms.filter.type": "io.confluent.connect.transforms.Filter$Value",
    "transforms.filter.filter.condition": `$[?(@.${policyField} == '${policyId}')]`,
    "transforms.filter.filter.type": "include",

    "transforms.rename.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.rename.renames": `${dataField}:data`,

    "transforms.extractFieldData.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
    "transforms.extractFieldData.field": "data",

    "transforms.fromJson.type" : "com.github.jcustenborder.kafka.connect.json.FromJson$Value",
    "transforms.fromJson.json.schema.location" : "Inline",
    "transforms.fromJson.json.schema.inline" : JSONSchema,

    "transforms.changeTopicName.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.changeTopicName.regex": `^.+\\.{{ deploymentId }}\\.core\\.([^.]+)?(\\..+)?$`,
    "transforms.changeTopicName.replacement": "$1",
}
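
To rule out the connector config, one thing I can try is calling the same metadata API directly against the driver, outside of Connect. A minimal probe, with placeholder URL, credentials, and table name (the jdbc:TAOS:// scheme is my assumption from TDengine's docs; substitute your own values):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;

public class TdengineMetadataProbe {
    public static void main(String[] args) throws Exception {
        // Placeholder connection details; substitute your own.
        String url = "jdbc:TAOS://localhost:6030/mydb";
        try (Connection conn = DriverManager.getConnection(url, "root", "taosdata")) {
            // "my_table" is a hypothetical table name for illustration.
            ResultSet rs = conn.getMetaData().getPrimaryKeys(null, null, "my_table");
            if (rs == null) {
                // A spec-compliant driver never returns null here; if this
                // prints, the NPE in the connector is a driver bug.
                System.out.println("getPrimaryKeys returned null");
                return;
            }
            try (rs) {
                while (rs.next()) {
                    System.out.println("PK column: " + rs.getString("COLUMN_NAME"));
                }
            }
        }
    }
}

If that probe returns null, the fix belongs on the driver side (or in a TDengine-specific dialect), not in this config.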
jseparovic commented 1 year ago

Didn't realize TDengine already has a connector of its own. I should probably use that instead: https://github.com/taosdata/kafka-connect-tdengine