apache / pulsar

Apache Pulsar - distributed pub-sub messaging system
https://pulsar.apache.org/
Apache License 2.0
14.23k stars 3.58k forks source link

JDBC Postgresql Error: transaction is aborted #13942

Open Paninka opened 2 years ago

Paninka commented 2 years ago

Describe the bug I'm using the JDBC driver to sink Postgresql, and when it gets an error from postgres, i'm not able to ingest more data because I get this Error: current transaction is aborted, commands ignored until end of transaction block

To Reproduce Steps to reproduce the behavior:

  1. Set JDBC Postgresql sink. I did this quickstart https://pulsar.apache.org/docs/en/io-quickstart/#connect-pulsar-to-postgresql

  2. I will send two messages to Pulsar topic from a Java Client and It has setted a schema definition

    getProducer(eventTopic).newMessage(Schema.AVRO(Esquema.class))
         .key("test")
         .value(m)
         .send();
  3. The table of Postgres has a primary key. So It will get an error when It gets a duplicate key

CREATE TABLE test (
    id integer PRIMARY KEY,
    name text null
);
  1. I sent two messages with the same primary key and i got this error, but It keeps on "Trasaction aborted" and i can't send new messages until I delete the sink
2022-01-25T13:22:49,236+0000 [pool-6-thread-1] ERROR org.apache.pulsar.io.jdbc.JdbcAbstractSink - Got exception 
org.postgresql.util.PSQLException: ERROR: current transaction is aborted, commands ignored until end of transaction block
        at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2533) ~[postgresql-42.2.12.jar:42.2.12]
        at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2268) ~[postgresql-42.2.12.jar:42.2.12]
        at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:313) ~[postgresql-42.2.12.jar:42.2.12]
        at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:448) ~[postgresql-42.2.12.jar:42.2.12]
        at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:369) ~[postgresql-42.2.12.jar:42.2.12]
        at org.postgresql.jdbc.PgPreparedStatement.executeWithFlags(PgPreparedStatement.java:159) ~[postgresql-42.2.12.jar:42.2.12]
        at org.postgresql.jdbc.PgPreparedStatement.execute(PgPreparedStatement.java:148) ~[postgresql-42.2.12.jar:42.2.12]
        at org.apache.pulsar.io.jdbc.JdbcAbstractSink.flush(JdbcAbstractSink.java:203) ~[pulsar-io-jdbc-core-2.9.1.jar:?]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) [?:?]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) [?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
        at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: org.postgresql.util.PSQLException: ERROR: duplicate key value violates unique constraint "id_pkey"
  Detail: Key (id)=(16377) already exists.
        at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2533) ~[postgresql-42.2.12.jar:42.2.12]
        at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2268) ~[postgresql-42.2.12.jar:42.2.12]
        at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:313) ~[postgresql-42.2.12.jar:42.2.12]
        at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:448) ~[postgresql-42.2.12.jar:42.2.12]
        at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:369) ~[postgresql-42.2.12.jar:42.2.12]
        at org.postgresql.jdbc.PgPreparedStatement.executeWithFlags(PgPreparedStatement.java:159) ~[postgresql-42.2.12.jar:42.2.12]
        at org.postgresql.jdbc.PgPreparedStatement.execute(PgPreparedStatement.java:148) ~[postgresql-42.2.12.jar:42.2.12]
        at org.apache.pulsar.io.jdbc.JdbcAbstractSink.flush(JdbcAbstractSink.java:203) ~[pulsar-io-jdbc-core-2.9.1.jar:?]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
        at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[?:?]
        ... 3 more

All new messages that i send to topic, it will be ignored by this transaction block.

Desktop (please complete the following information):

github-actions[bot] commented 2 years ago

The issue had no activity for 30 days, mark with Stale label.

github-actions[bot] commented 2 years ago

The issue had no activity for 30 days, mark with Stale label.