confluentinc / kafka-connect-jdbc

Kafka Connect connector for JDBC-compatible databases

NPE at io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:174) #789

Closed: bcwoauthtest closed this issue 3 years ago

bcwoauthtest commented 4 years ago

Thank you for providing this great kafka-connect-jdbc plugin. When I tried to sink to DB2 with Confluent 5.4, an NPE was thrown, the sink task was killed, and I can't find the reason. Is there possibly a bug here? Could you please have a look at the following stack trace?

[2020-02-21 16:00:20,480] INFO Attempting to open connection #1 to Db2 (io.confluent.connect.jdbc.util.CachedConnectionProvider:87)
[2020-02-21 16:00:20,689] INFO JdbcDbWriter Connected (io.confluent.connect.jdbc.sink.JdbcDbWriter:49)
[2020-02-21 16:00:20,701] ERROR WorkerSinkTask{id=test-sink-to-db2-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask:559)
java.lang.NullPointerException
    at io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:174)
    at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:159)
    at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:66)
    at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
[2020-02-21 16:00:20,703] ERROR WorkerSinkTask{id=test-sink-to-db2-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:561)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
    at io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:174)
    at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:159)
    at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:66)
    at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
    ... 10 more
[2020-02-21 16:00:20,704] ERROR WorkerSinkTask{id=test-sink-to-db2-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:180)

PrettyJerry commented 4 years ago

Unfortunately, we encountered the same exception when writing data to a DB2 database through the JDBC connector with Confluent 5.4 and db2jcc4.jar on CentOS. I believe the NPE is a bug. Please help, thanks.

gharris1727 commented 4 years ago

It appears that line 174 is updateStatementBinder.bindRecord(record); and the only possible source of an NPE there is updateStatementBinder being null.

For this to happen, flush() must be executed before add() has set up the updateStatementBinder, and only after there is at least one record in the buffer to get past the guard in flush(). I'm not sure how this is possible just by looking at the code, so I think it would be helpful to have some more detailed logs.

@PrettyJerry Can you re-run with TRACE level logging enabled, with and without deletes enabled? Do you see this same behavior with other topics, or with one topic in particular?

PrettyJerry commented 4 years ago

@gharris1727 Thank you very much for your reply. I did as you suggested; after adding "delete.enabled=true", the exception is as shown below:

[2020-02-22 14:45:30,970] INFO JdbcSinkConfig values:
    auto.create = true
    auto.evolve = false
    batch.size = 10
    connection.password = [hidden]
    connection.url = jdbc:db2://10.70.150.33:50033/dbtest99
    connection.user = dbtest99
    db.timezone = UTC
    delete.enabled = true
    dialect.name =
    fields.whitelist = []
    insert.mode = insert
    max.retries = 10
    pk.fields = []
    pk.mode = none
    quote.sql.identifiers = ALWAYS
    retry.backoff.ms = 3000
    table.name.format = ${topic}
 (io.confluent.connect.jdbc.sink.JdbcSinkConfig:347)
[2020-02-22 14:45:30,971] ERROR WorkerSinkTask{id=test-sink-to-db2-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
org.apache.kafka.common.config.ConfigException: Primary key mode must be 'record_key' when delete support is enabled
    at io.confluent.connect.jdbc.sink.JdbcSinkConfig.<init>(JdbcSinkConfig.java:458)
    at io.confluent.connect.jdbc.sink.JdbcSinkTask.start(JdbcSinkTask.java:45)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:301)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:189)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
[2020-02-22 14:45:30,972] ERROR WorkerSinkTask{id=test-sink-to-db2-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:180)
[2020-02-22 14:45:30,973] INFO Stopping task (io.confluent.connect.jdbc.sink.JdbcSinkTask:105)
[2020-02-22 14:45:30,973] WARN Could not stop task (org.apache.kafka.connect.runtime.WorkerSinkTask:160)
java.lang.NullPointerException
    at io.confluent.connect.jdbc.sink.JdbcSinkTask.stop(JdbcSinkTask.java:107)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.close(WorkerSinkTask.java:158)
    at org.apache.kafka.connect.runtime.WorkerTask.doClose(WorkerTask.java:156)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:183)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
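
In other words, the exception asks for delete support to be paired with a record-key primary key. A minimal sketch of that combination (property names from JdbcSinkConfig; values illustrative, not my exact config) would be:

delete.enabled=true
pk.mode=record_key

With pk.mode=record_key, pk.fields can then list the key field(s) to use as primary key columns.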

PrettyJerry commented 4 years ago

Following the exception message above, I added "pk.mode=record_key"; the exception is then as shown below:

[2020-02-22 14:50:47,426] DEBUG Received 500 records. First record kafka coordinates:(my-aggregate-bean-test-0-0). Writing them to the database... (io.confluent.connect.jdbc.sink.JdbcSinkTask:68)
[2020-02-22 14:50:47,426] INFO Attempting to open connection #1 to Db2 (io.confluent.connect.jdbc.util.CachedConnectionProvider:87)
[2020-02-22 14:50:47,635] INFO JdbcDbWriter Connected (io.confluent.connect.jdbc.sink.JdbcDbWriter:49)
[2020-02-22 14:50:47,645] DEBUG Flushing 10 buffered records (io.confluent.connect.jdbc.sink.BufferedRecords:169)
[2020-02-22 14:50:47,645] ERROR WorkerSinkTask{id=test-sink-to-db2-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask:559)
java.lang.NullPointerException
    at io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:174)
    at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:159)
    at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:66)
    at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
[2020-02-22 14:50:47,647] DEBUG WorkerSinkTask{id=test-sink-to-db2-0} Skipping offset commit, no change since last commit (org.apache.kafka.connect.runtime.WorkerSinkTask:427)
[2020-02-22 14:50:47,648] DEBUG WorkerSinkTask{id=test-sink-to-db2-0} Finished offset commit successfully in 0 ms for sequence number 1: null (org.apache.kafka.connect.runtime.WorkerSinkTask:264)
[2020-02-22 14:50:47,648] ERROR WorkerSinkTask{id=test-sink-to-db2-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:561)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
    at io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:174)
    at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:159)
    at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:66)
    at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
    ... 10 more

PrettyJerry commented 4 years ago

And the data content in my Kafka topic is: {"_id":"2020-02-20 03:55:55.000", "myKey":"", "myCount":5}

bcwoauthtest commented 4 years ago

@gharris1727 Thanks a lot for your reply. I have found that the problem is related to my configuration: I don't use a schema. This is the converter config:

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

With this configuration, the variable schemaChanged (line 82 in BufferedRecords.java) is always false, so the field updateStatementBinder (line 61 in BufferedRecords.java) is never initialized. When the add method is invoked, the NPE is thrown at line 174 in BufferedRecords.java (inside flush). I hope this helps you diagnose and resolve the problem. Thank you very much.

daehokimm commented 4 years ago

Hi All 👋, IMHO..

When BufferedRecords is created, the member variables keySchema and valueSchema are not initialized, so they are null (ref code). And when schemas.enable is set to false, the record's key and value schemas are null too (ref code, kafka.connect). As a result, the conditions below are always skipped, which causes the NPE related to updateStatementBinder.

    boolean schemaChanged = false;
    if (!Objects.equals(keySchema, record.keySchema())) {
      keySchema = record.keySchema();
      schemaChanged = true;
    }
...
    else {
      // value schema is not null and has changed. This is a real schema change.
      valueSchema = record.valueSchema();
      schemaChanged = true;
    }
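
To make that concrete, here is a small self-contained toy that reproduces the failure mode. This is not the connector's code; the class and names are invented purely for illustration, with "binder" playing the role of updateStatementBinder:

import java.util.ArrayList;
import java.util.List;

// Toy reproduction of the failure mode described above -- NOT the connector's
// actual code. The binder is only assigned when a schema change is detected,
// so flushing before any schema change dereferences a null field.
public class NullBinderDemo {

  interface Binder {
    void bindRecord(String record);
  }

  static class Buffer {
    private Binder binder; // stays null until a "schema change" is seen
    private final List<String> records = new ArrayList<>();

    void add(String record, boolean schemaChanged) {
      if (schemaChanged) {
        binder = r -> System.out.println("bound " + r); // only initialized here
      }
      records.add(record);
      flush(); // analogous to add() calling flush() once the batch is full
    }

    void flush() {
      for (String r : records) {
        binder.bindRecord(r); // NPE here if binder was never assigned
      }
      records.clear();
    }
  }

  public static void main(String[] args) {
    Buffer buffer = new Buffer();
    // With schemaless JSON the record schemas are always null, so the connector
    // never detects a schema change; that corresponds to schemaChanged=false:
    buffer.add("{\"myCount\": 5}", false); // throws NullPointerException in flush()
  }
}

Once the records carry real key/value schemas (schemas.enable=true), the first add() sees a schema change, the binder is initialized, and flush() succeeds.
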
jain-lehar commented 4 years ago

Is there a solution to this issue?

sandycomp commented 4 years ago

I'm facing the same issue, this time on line 179; there is no additional information. Any solution to this?

[2020-04-13 12:05:37,117] INFO JdbcDbWriter Connected (io.confluent.connect.jdbc.sink.JdbcDbWriter:49)
[2020-04-13 12:05:37,126] ERROR WorkerSinkTask{id=mariadb-source-kafka-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask:559)
java.lang.NullPointerException
    at io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:179)
    at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:72)
    at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
[2020-04-13 12:05:37,129] ERROR WorkerSinkTask{id=mariadb-source-kafka-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:561)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
    at io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:179)
    at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:72)
    at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
    ... 10 more
[2020-04-13 12:05:37,129] ERROR WorkerSinkTask{id=mariadb-source-kafka-sink-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:180)
[2020-04-13 12:05:37,129] INFO Stopping task (io.confluent.connect.jdbc.sink.JdbcSinkTask:105)

naartjie commented 4 years ago

The NPE is not a nice error message, and perhaps the purpose of this issue should be for the connector to provide a better, more helpful error message by failing early. AFAIK the JDBC connector doesn't support schemaless messages, so this isn't a legal configuration:

key.converter.schemas.enable=false
value.converter.schemas.enable=false

In my experience, the (only) way to make it work with JSON is to enable schemas:

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

And your key and value need to embed the schema via {"schema":..., "payload":...}
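
For example, a value for the sample record shown earlier in this thread would look roughly like this with an embedded schema (the field names come from that sample; the struct name and types are illustrative):

{
  "schema": {
    "type": "struct",
    "fields": [
      { "field": "_id", "type": "string" },
      { "field": "myKey", "type": "string" },
      { "field": "myCount", "type": "int64" }
    ],
    "optional": false,
    "name": "myrecord"
  },
  "payload": {
    "_id": "2020-02-20 03:55:55.000",
    "myKey": "",
    "myCount": 5
  }
}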

RichyP7 commented 4 years ago

AFAIK jdbc connector doesn't support schemaless messages, and this isn't a legal configuration:

@naartjie thank you for your suggestions.

The problem is that when I enable schemas, I get an error like:

'org.apache.kafka.connect.errors.ConnectException: VRZ.dbo.MyTable.Value (STRUCT) type doesn't have a mapping to the SQL database column type

I disabled it because of the suggestions in this blog

Is this a different error?

daehokimm commented 4 years ago

Hi @RichyP7, IMHO the error occurs when the message struct generated by the source connector is not acceptable to the JDBC sink connector. Check the message structs on both sides. :)

Previously, when I set up a Connect pipeline with debezium-mysql-connector (source) and jdbc-sink-connector, I had to configure the unwrap transform on the messages generated by the Debezium side to change the message struct ("transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope").
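
In sink connector config terms, that transform is roughly the following fragment added to the connector's "config" (a minimal sketch; note that UnwrapFromEnvelope is the older name of the Debezium transform, which the config later in this thread uses under its newer name, ExtractNewRecordState):

"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope"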

sandycomp commented 4 years ago

(Quoting @naartjie's suggestion above to enable schemas on the JsonConverter and embed them via {"schema":..., "payload":...}.)

Thanks, this resolved my issue.

harjis commented 4 years ago

I had the same problem and enabling schemas fixed it.

My source connector config looks like this:

{
  "name": "folders-source",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgres-cluster-ip-service",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "my_pgpassword",
    "database.dbname": "graphs-folders-microservice/folders",
    "database.server.name": "folders",
    "database.whitelist": "folders",
    "database.history.kafka.bootstrap.servers": "my-kafka-cp-kafka-headless:9092",
    "database.history.kafka.topic": "schema-changes.customers",
    "transforms": "route",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "$3"
  }
}

My sink connector config looks like this:

{
  "name": "sink-folders-graphs",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "folders",
    "connection.url": "jdbc:postgresql://postgres-cluster-ip-service:5432/graphs-folders-microservice/graphs",
    "connection.user": "postgres",
    "connection.password": "my_pgpassword",
    "dialect.name": "PostgreSqlDatabaseDialect",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "auto.create": "true",
    "insert.mode": "upsert",
    "delete.enabled": "true",
    "pk.fields": "id",
    "pk.mode": "record_key"
  }
}

I installed Kafka with Helm: helm install my-kafka confluent/cp-helm-charts -f k8s-kafka/values.yaml

The fix is here

echang0929 commented 3 years ago

(Quoting @naartjie's suggestion above to enable schemas on the JsonConverter and embed them via {"schema":..., "payload":...}.)

Yes, I got the same issue when using JsonConverter. Thank you.