confluentinc / kafka-connect-jdbc

Kafka Connect connector for JDBC-compatible databases

Write message to PostgreSQL #1313

Closed: tb-tatewilks closed this issue 1 year ago

tb-tatewilks commented 1 year ago

I've been going in circles with this for a few days now. I'm sending data to Kafka using kafkajs. Each time I produce a message, I assign a UUID to the message.key value, and the message.value is set to an event like the following, which is then stringified:

// the producer is written in typescript
const event = {
    eventtype: "event1",
    eventversion: "1.0.1",
    sourceurl: "https://some-url.com/source"
};
// stringified because the kafkajs producer only accepts `string` or `Buffer`
const stringifiedEvent = JSON.stringify(event);
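
For reference, a minimal sketch of how the whole produce call might look (the topic name, broker address, and the use of Node's crypto.randomUUID for the key are assumptions, not taken verbatim from my setup):

// Sketch of the producer described above, assuming kafkajs and Node's built-in crypto module
import { Kafka } from "kafkajs";
import { randomUUID } from "crypto";

const kafka = new Kafka({ clientId: "event-producer", brokers: ["localhost:9092"] });
const producer = kafka.producer();

async function produce(): Promise<void> {
    await producer.connect();
    await producer.send({
        topic: "topic1", // assumed; matches the topics setting in the sink config below
        messages: [{ key: randomUUID(), value: stringifiedEvent }] // UUID key, stringified JSON value from above
    });
    await producer.disconnect();
}

produce().catch(console.error);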

I start my connect-standalone JDBC Sink Connector with the following configurations:

# connect-standalone.properties
name=local-jdbc-sink-connector
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
dialect.name=PostgreSqlDatabaseDialect
connection.url=jdbc:postgresql://postgres:5432/eventservice
connection.password=postgres
connection.user=postgres

auto.create=true
auto.evolve=true
topics=topic1
tasks.max=1
insert.mode=upsert
pk.mode=record_key
pk.fields=id

# worker.properties
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
value.converter.schema.registry.url=http://schema-registry:8081 

key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false

bootstrap.servers=localhost:9092
group.id=jdbc-sink-connector-worker
worker.id=jdbc-sink-worker-1

offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1

When I start the connector with connect-standalone worker.properties connect-standalone.properties, it spins up and connects to PostgreSQL with no issue. However, when I produce an event, it fails with this error message:

WorkerSinkTask{id=local-jdbc-sink-connector-0} Task threw an uncaught and unrecoverable exception.
Task is being killed and will not recover until manually restarted.
Error: Sink connector 'local-jdbc-sink-connector' is configured with 'delete.enabled=false' and 'pk.mode=record_key'
and therefore requires records with a non-null Struct value and non-null Struct schema, but found record at
(topic='topic1',partition=0,offset=0,timestamp=1676309784254) with a HashMap value and null value schema.
(org.apache.kafka.connect.runtime.WorkerSinkTask:609)

With this stack trace:

org.apache.kafka.connect.errors.ConnectException: Sink connector 'local-jdbc-sink-connector' is configured with 
'delete.enabled=false' and 'pk.mode=record_key' and therefore requires records with a non-null Struct value and 
non-null Struct schema, but found record at (topic='txningestion2',partition=0,offset=0,timestamp=1676309784254) 
with a HashMap value and null value schema.
    at io.confluent.connect.jdbc.sink.RecordValidator.lambda$requiresValue$2(RecordValidator.java:86)
    at io.confluent.connect.jdbc.sink.RecordValidator.lambda$and$1(RecordValidator.java:41)
    at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:81)
    at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:74)
    at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:85)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)

I've been going back and forth trying to get the connector to accept my messages, but I'm not sure what is going wrong. Every fix leads to a different error, and fixing that one brings back the previous error. What is the correct configuration? How do I resolve this?

tb-tatewilks commented 1 year ago

Through a lot of trial and error, I came up with these configuration files:

# connect-standalone.properties
name=local-jdbc-sink-connector
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
dialect.name=PostgreSqlDatabaseDialect
connection.url=jdbc:postgresql://postgres:5432/db
connection.password=postgres
connection.user=postgres
auto.create=true
auto.evolve=true
topics=<topics>
tasks.max=1
insert.mode=insert
delete.enabled=false
pk.mode=none
consumer.override.auto.offset.reset=latest

# worker.properties
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true

key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false

bootstrap.servers=localhost:9092
group.id=jdbc-sink-connector-worker
worker.id=jdbc-sink-worker-1
consumer.override.auto.offset.reset=latest
connector.client.config.override.policy=All

offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1

There were a few key things that had to change:

- insert.mode went from upsert to insert, delete.enabled=false was set explicitly, and pk.mode went from record_key to none (with pk.fields removed), so the connector no longer tries to derive a primary key from the record key.
- value.converter.schemas.enable went from false to true, and the unused value.converter.schema.registry.url entry was removed, so the JsonConverter now expects a schema embedded in each message.
- consumer.override.auto.offset.reset=latest was added, along with connector.client.config.override.policy=All in the worker config so that connector-level consumer overrides are permitted.

Additionally, my JSON messages to Kafka needed to have a format similar to the following:

{
    "schema": { "..." },
    "payload": { "..." }
}

If the schema is not included with the payload, the JDBC Sink Connector cannot parse the JSON values and write them to the correct columns in the database.
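
As a concrete illustration, here is a sketch (not the exact message from my producer) of how the event from the original question could be wrapped in the schema/payload envelope that JsonConverter expects when value.converter.schemas.enable=true; the struct name is an arbitrary placeholder:

// TypeScript sketch; field names come from the original event, the struct name "event" is a placeholder
const envelope = {
    schema: {
        type: "struct",
        name: "event",
        optional: false,
        fields: [
            { field: "eventtype", type: "string", optional: false },
            { field: "eventversion", type: "string", optional: false },
            { field: "sourceurl", type: "string", optional: false }
        ]
    },
    payload: {
        eventtype: "event1",
        eventversion: "1.0.1",
        sourceurl: "https://some-url.com/source"
    }
};

// Stringified and sent as the message value, exactly like the original event
const stringifiedEnvelope = JSON.stringify(envelope);

With auto.create=true, the connector should then be able to derive the table columns from the embedded schema instead of failing on a schemaless HashMap value.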