eulerto / wal2json

JSON output plugin for changeset extraction
BSD 3-Clause "New" or "Revised" License

Null values #140

Closed winarama closed 4 years ago

winarama commented 4 years ago

Howdy folks,

Getting an issue when using wal2json_streaming.

I get a null pointer exception when a null value is encountered.

Can wal2json_streaming handle null values?

eulerto commented 4 years ago

The name is wal2json. Could you share a test case?
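
For reference, a minimal test case exercising NULL values could look as follows (a sketch; the table, column, and slot names are illustrative). wal2json emits SQL NULLs as JSON null inside "columnvalues", as the log excerpt further down in this thread shows:

    -- create a table with nullable columns, similar to the one in the log below
    CREATE TABLE nulltest (id integer PRIMARY KEY, txtvalue varchar(2000), datevalue timestamp);

    -- create a throwaway wal2json slot, make a change containing NULLs, and read it back
    SELECT 'init' FROM pg_create_logical_replication_slot('test_slot', 'wal2json');
    INSERT INTO nulltest (id, txtvalue, datevalue) VALUES (1, NULL, NULL);
    SELECT data FROM pg_logical_slot_get_changes('test_slot', NULL, NULL, 'pretty-print', '1');

    -- clean up the slot so it does not retain WAL
    SELECT pg_drop_replication_slot('test_slot');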

mickog commented 4 years ago

Hey, I was working with Winarama on this. We don't actually have access to the data. We are using the streaming plugin, so we have a Kafka connector that uses the plugin to pull changes from a Postgres DB and write them into a topic, but we are getting this error.

org.apache.kafka.connect.errors.ConnectException: An exception ocurred in the change event producer. This connector will be stopped.
at io.debezium.connector.base.ChangeEventQueue.throwProducerFailureIfPresent(ChangeEventQueue.java:170)
at io.debezium.connector.base.ChangeEventQueue.poll(ChangeEventQueue.java:151)
at io.debezium.connector.postgresql.PostgresConnectorTask.poll(PostgresConnectorTask.java:161)
at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:244)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:220)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
at io.debezium.time.Conversions.toEpochMicros(Conversions.java:239)
at io.debezium.connector.postgresql.SourceInfo.update(SourceInfo.java:199)
at io.debezium.connector.postgresql.RecordsStreamProducer.process(RecordsStreamProducer.java:252)
at io.debezium.connector.postgresql.RecordsStreamProducer.lambda$streamChanges$1(RecordsStreamProducer.java:134)
at io.debezium.connector.postgresql.connection.wal2json.StreamingWal2JsonMessageDecoder.doProcessMessage(StreamingWal2JsonMessageDecoder.java:242)
at io.debezium.connector.postgresql.connection.wal2json.StreamingWal2JsonMessageDecoder.nonInitialChunk(StreamingWal2JsonMessageDecoder.java:178)
at io.debezium.connector.postgresql.connection.wal2json.StreamingWal2JsonMessageDecoder.processMessage(StreamingWal2JsonMessageDecoder.java:159)
at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.deserializeMessages(PostgresReplicationConnection.java:271)
at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.read(PostgresReplicationConnection.java:256)
at io.debezium.connector.postgresql.RecordsStreamProducer.streamChanges(RecordsStreamProducer.java:134)
at io.debezium.connector.postgresql.RecordsStreamProducer.lambda$start$0(RecordsStreamProducer.java:120)
... 5 more
mickog commented 4 years ago

We think it might be because wal2json won't accept null values, but we're a bit lost at this stage. Can you provide any insight? Thanks

eulerto commented 4 years ago

@mickog This does not seem to be a wal2json bug. Open an issue in the Debezium project. If you provide enough info (Debezium version, wal2json version, complete log output), maybe @gunnarmorling or @jpechane could help you.

mickog commented 4 years ago

Thanks @gunnarmorling and @jpechane, I will create an issue there as well, but any direction here would be great. The plugin we are using is called wal2json_streaming; we are on debezium-connector-postgres-1.0.0 and are not sure of the wal2json_streaming version. Since we installed the latest version of Debezium and specified plugin.name as wal2json_streaming, I expect it uses the latest plugin version.

Here are the surrounding logs:

[2019-12-19 16:38:50,790] INFO Creating thread debezium-postgresconnector-wem-records-stream-producer (io.debezium.util.Threads)

[2019-12-19 16:38:50,791] INFO WorkerSourceTask{id=wem-postgres-source-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2019-12-19 16:38:50,833] WARN Got out of order chunk       ,
{
    "kind": "insert",
    "schema": "public",
    "table": "asynctaskproperty",
    "columnnames": [
        "id",
        "asynctask_id",
        "name",
        "txtvalue",
        "numericvalue",
        "datevalue"
    ],
    "columntypes": [
        "numeric(19,0)",
        "numeric(19,0)",
        "character varying(255)",
        "character varying(2000)",
        "numeric(19,2)",
        "timestamp without time zone"
    ],
    "columnoptionals": [
        false,
        false,
        false,
        true,
        true,
        true
    ],
    "columnvalues": [
        525633171000,
        525633056051,
        "periodType",
        "MONTHLY",
        null,
        null
    ]
}
  , recording artifical TX (io.debezium.connector.postgresql.connection.wal2json.StreamingWal2JsonMessageDecoder)

[2019-12-19 16:38:50,834] ERROR unexpected exception while streaming logical changes (io.debezium.connector.postgresql.RecordsStreamProducer)
java.lang.NullPointerException
    at io.debezium.time.Conversions.toEpochMicros(Conversions.java:239)
    at io.debezium.connector.postgresql.SourceInfo.update(SourceInfo.java:199)
    at io.debezium.connector.postgresql.RecordsStreamProducer.process(RecordsStreamProducer.java:252)
    at io.debezium.connector.postgresql.RecordsStreamProducer.lambda$streamChanges$1(RecordsStreamProducer.java:134)
    at io.debezium.connector.postgresql.connection.wal2json.StreamingWal2JsonMessageDecoder.doProcessMessage(StreamingWal2JsonMessageDecoder.java:242)
    at io.debezium.connector.postgresql.connection.wal2json.StreamingWal2JsonMessageDecoder.nonInitialChunk(StreamingWal2JsonMessageDecoder.java:178)
    at io.debezium.connector.postgresql.connection.wal2json.StreamingWal2JsonMessageDecoder.processMessage(StreamingWal2JsonMessageDecoder.java:159)
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.deserializeMessages(PostgresReplicationConnection.java:271)
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.read(PostgresReplicationConnection.java:256)
    at io.debezium.connector.postgresql.RecordsStreamProducer.streamChanges(RecordsStreamProducer.java:134)
    at io.debezium.connector.postgresql.RecordsStreamProducer.lambda$start$0(RecordsStreamProducer.java:120)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

[2019-12-19 16:38:50,885] INFO WorkerSourceTask{id=wem-postgres-source-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2019-12-19 16:38:50,885] INFO WorkerSourceTask{id=wem-postgres-source-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2019-12-19 16:38:50,885] ERROR WorkerSourceTask{id=wem-postgres-source-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: An exception ocurred in the change event producer. This connector will be stopped.
    at io.debezium.connector.base.ChangeEventQueue.throwProducerFailureIfPresent(ChangeEventQueue.java:170)
    at io.debezium.connector.base.ChangeEventQueue.poll(ChangeEventQueue.java:151)
    at io.debezium.connector.postgresql.PostgresConnectorTask.poll(PostgresConnectorTask.java:161)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:244)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:220)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
    at io.debezium.time.Conversions.toEpochMicros(Conversions.java:239)
    at io.debezium.connector.postgresql.SourceInfo.update(SourceInfo.java:199)
    at io.debezium.connector.postgresql.RecordsStreamProducer.process(RecordsStreamProducer.java:252)
    at io.debezium.connector.postgresql.RecordsStreamProducer.lambda$streamChanges$1(RecordsStreamProducer.java:134)
    at io.debezium.connector.postgresql.connection.wal2json.StreamingWal2JsonMessageDecoder.doProcessMessage(StreamingWal2JsonMessageDecoder.java:242)
    at io.debezium.connector.postgresql.connection.wal2json.StreamingWal2JsonMessageDecoder.nonInitialChunk(StreamingWal2JsonMessageDecoder.java:178)
    at io.debezium.connector.postgresql.connection.wal2json.StreamingWal2JsonMessageDecoder.processMessage(StreamingWal2JsonMessageDecoder.java:159)
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.deserializeMessages(PostgresReplicationConnection.java:271)
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.read(PostgresReplicationConnection.java:256)
    at io.debezium.connector.postgresql.RecordsStreamProducer.streamChanges(RecordsStreamProducer.java:134)
    at io.debezium.connector.postgresql.RecordsStreamProducer.lambda$start$0(RecordsStreamProducer.java:120)
    ... 5 more

[2019-12-19 16:38:50,885] ERROR WorkerSourceTask{id=wem-postgres-source-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
[2019-12-19 16:38:50,886] INFO [Producer clientId=producer-6] Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer)

Thanks

gunnarmorling commented 4 years ago

Yes, that NPE is definitely something we should fix in Debezium. The question, though, is why the commit timestamp seems to be null in this case (this is not about any specific column value). The line numbers from the stacktrace don't match 1.0.0, btw. If you file an issue in our JIRA tracker with minimal steps for reproducing, we can take a look.
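
If it helps to narrow that down: the commit timestamp that wal2json emits when its include-timestamp option is enabled can be inspected on the database side with a query along these lines (a sketch; the slot name is illustrative, and the slot must not be in use by the connector while you peek at it):

    -- peek at pending changes without consuming them; with include-timestamp
    -- enabled, each changeset should carry a "timestamp" field
    SELECT data
      FROM pg_logical_slot_peek_changes('debezium', NULL, NULL,
                                        'include-timestamp', '1',
                                        'pretty-print', '1');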

mickog commented 4 years ago

Thanks for the reply. The problem is that when we get the null pointer exception everything stops, our replication slot's active flag turns to false, and it starts backing up a large amount of data. Is there a way to skip past the null value or anything like that?

It's hard for us to reproduce this because we are only setting up the connector and have limited knowledge of the data that is coming through. Do you think this looks like it is due to a null value coming through in our data, or is it related to the timestamp being null?

That is interesting about the version; I am going to double check that now!

Thanks
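
As an aside, how far an inactive slot has fallen behind, and how much WAL it is forcing the server to retain, can be watched with a query like the following (a sketch assuming PostgreSQL 10 or later; on 9.x the equivalents are pg_current_xlog_location() and pg_xlog_location_diff()):

    -- list slots, whether a consumer is attached, and the WAL retained per slot
    SELECT slot_name,
           active,
           pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) AS retained_wal_bytes
      FROM pg_replication_slots;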

mickog commented 4 years ago

@gunnarmorling we checked and the jar is 1.0.0.Final; do the logs suggest otherwise?

gunnarmorling commented 4 years ago

we checked and the jar is 1.0.0.Final; do the logs suggest otherwise?

Something doesn't add up. Here's SourceInfo from 1.0.0: https://github.com/debezium/debezium/blob/v1.0.0.Final/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/SourceInfo.java#L106-L120

It's not at line 199 as in your stacktrace, and it doesn't even invoke toEpochMicros() in that version. It seems you're pulling in an older version from somewhere.

mickog commented 4 years ago

We had been using an older version and updated, so this is strange. Going to look into it. Thanks for the quick responses; we are a bit lost on this one.

mickog commented 4 years ago

Yeah, you were right: we were looking at the version in the wrong place. We were on version 9.4, updated to the latest version, and everything started moving again! Appreciate the help; we definitely would have missed that had you not spotted the different version in the logs.