apache / pulsar

Apache Pulsar - distributed pub-sub messaging system
https://pulsar.apache.org/
Apache License 2.0

JDBC Sinks Stop Working as Intended after a Database Error #9464

Open alexanderursu99 opened 3 years ago

alexanderursu99 commented 3 years ago

Describe the bug

When running a ClickHouse JDBC sink and encountering an error from the database (e.g. a timeout), the sink seems to continue consuming, but it no longer inserts or acks any further messages.

To Reproduce

Steps to reproduce the behavior:

  1. Run a ClickHouse JDBC sink with batch size 100000 and timeout 60000ms, using the Kubernetes runtime on Pulsar 2.6.1
  2. Restart the ClickHouse instance to produce an error for the JDBC driver client in the sink
  3. Inspect the sink's logs and its behaviour in the metrics

Expected behavior

The sink should recover and continue inserting and acking messages.
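As a rough illustration of the expected behavior, a sink's write path could retry a failed statement with a bounded exponential backoff while the database restarts, and only give up (failing the batch so the messages can be redelivered) once the retries are exhausted. This is a minimal, JDK-only sketch under that assumption; `RetryingExecutor`, `SqlAction`, and the retry parameters are hypothetical names, not part of the Pulsar JDBC sink.

```java
import java.sql.SQLException;

/** Hypothetical helper: retries a JDBC action with exponential backoff. */
final class RetryingExecutor {
    /** A unit of JDBC work that may throw SQLException (hypothetical interface). */
    @FunctionalInterface
    interface SqlAction<T> {
        T run() throws SQLException;
    }

    private final int maxAttempts;
    private final long initialBackoffMs;

    RetryingExecutor(int maxAttempts, long initialBackoffMs) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        this.maxAttempts = maxAttempts;
        this.initialBackoffMs = initialBackoffMs;
    }

    /** Runs the action, retrying on SQLException; rethrows the last failure. */
    <T> T execute(SqlAction<T> action) throws SQLException, InterruptedException {
        long backoffMs = initialBackoffMs;
        SQLException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.run();
            } catch (SQLException e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(backoffMs); // wait before the next attempt
                    backoffMs *= 2;          // exponential backoff
                }
            }
        }
        // Retries exhausted: the caller should fail the batch so messages are redelivered.
        throw last;
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        RetryingExecutor retry = new RetryingExecutor(5, 10);
        // Simulated insert that fails twice (e.g. "Connection refused") and then succeeds.
        Integer updated = retry.execute(() -> {
            calls[0]++;
            if (calls[0] < 3) {
                throw new SQLException("Connection refused");
            }
            return 1;
        });
        System.out.println("updated=" + updated + " after " + calls[0] + " attempts");
    }
}
```

A bounded retry keeps a short database restart from surfacing as a sink error at all, while a longer outage still ends in an explicit failure rather than a silent stall.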

Logs

23:10:12.763 [pulsar-timer-4-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [market_data/deribit/skew_iv_v2] [clickhouse-sink] [7e6d8] Prefetched messages: 0 --- Consume throughput received: 0.50 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
23:12:12.766 [pulsar-timer-4-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [market_data/deribit/skew_iv_v2] [clickhouse-sink] [7e6d8] Prefetched messages: 0 --- Consume throughput received: 0.50 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
23:13:10.794 [pool-5-thread-1] INFO ru.yandex.clickhouse.ClickHouseStatementImpl - Error during connection to ru.yandex.clickhouse.settings.ClickHouseProperties@3eb4c763, reporting failure to data source, message: Connect to 192.168.1.135:8123 [/192.168.1.135] failed: Connection refused (Connection refused)
23:13:10.796 [pool-5-thread-1] INFO ru.yandex.clickhouse.ClickHouseStatementImpl - Error sql: INSERT INTO skew_iv(currency,timestamp,spot_price,expiration_timestamp,ttm,ttm_fractional,rfr,alpha,beta,nu,rho,atm_iv,smile,skew,is_interpolated) VALUES('ETH',1612393920000,1641.27,1612425600000,31680000,0.0010038787,1.4379762,1.4130342,1.0,17.782494,0.21298867,1.4470383,-0.026410576,-0.0877126,0)
23:13:10.807 [pool-5-thread-1] ERROR org.apache.pulsar.io.jdbc.JdbcAbstractSink - Got exception
ru.yandex.clickhouse.except.ClickHouseException: ClickHouse exception, code: 210, host: 192.168.1.135, port: 8123; Connect to 192.168.1.135:8123 [/192.168.1.135] failed: Connection refused (Connection refused)
at ru.yandex.clickhouse.except.ClickHouseExceptionSpecifier.getException(ClickHouseExceptionSpecifier.java:89) ~[clickhouse-jdbc-0.2.4.jar:?]
at ru.yandex.clickhouse.except.ClickHouseExceptionSpecifier.specify(ClickHouseExceptionSpecifier.java:55) ~[clickhouse-jdbc-0.2.4.jar:?]
at ru.yandex.clickhouse.except.ClickHouseExceptionSpecifier.specify(ClickHouseExceptionSpecifier.java:24) ~[clickhouse-jdbc-0.2.4.jar:?]
at ru.yandex.clickhouse.ClickHouseStatementImpl.getInputStream(ClickHouseStatementImpl.java:633) ~[clickhouse-jdbc-0.2.4.jar:?]
at ru.yandex.clickhouse.ClickHouseStatementImpl.executeQuery(ClickHouseStatementImpl.java:117) ~[clickhouse-jdbc-0.2.4.jar:?]
at ru.yandex.clickhouse.ClickHouseStatementImpl.executeQuery(ClickHouseStatementImpl.java:100) ~[clickhouse-jdbc-0.2.4.jar:?]
at ru.yandex.clickhouse.ClickHouseStatementImpl.executeQuery(ClickHouseStatementImpl.java:95) ~[clickhouse-jdbc-0.2.4.jar:?]
at ru.yandex.clickhouse.ClickHouseStatementImpl.executeQuery(ClickHouseStatementImpl.java:90) ~[clickhouse-jdbc-0.2.4.jar:?]
at ru.yandex.clickhouse.ClickHouseStatementImpl.execute(ClickHouseStatementImpl.java:226) ~[clickhouse-jdbc-0.2.4.jar:?]
at ru.yandex.clickhouse.ClickHousePreparedStatementImpl.execute(ClickHousePreparedStatementImpl.java:105) ~[clickhouse-jdbc-0.2.4.jar:?]
at org.apache.pulsar.io.jdbc.JdbcAbstractSink.flush(JdbcAbstractSink.java:203) ~[pulsar-io-jdbc-core-2.6.1.jar:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_252]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [?:1.8.0_252]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_252]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [?:1.8.0_252]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_252]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_252]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]
Caused by: org.apache.http.conn.HttpHostConnectException: Connect to 192.168.1.135:8123 [/192.168.1.135] failed: Connection refused (Connection refused)
at org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:159) ~[httpclient-4.5.5.jar:4.5.5]
at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:373) ~[httpclient-4.5.5.jar:4.5.5]
at org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:381) ~[httpclient-4.5.5.jar:4.5.5]
at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:237) ~[httpclient-4.5.5.jar:4.5.5]
at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:185) ~[httpclient-4.5.5.jar:4.5.5]
at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:89) ~[httpclient-4.5.5.jar:4.5.5]
at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185) ~[httpclient-4.5.5.jar:4.5.5]
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83) ~[httpclient-4.5.5.jar:4.5.5]
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:108) ~[httpclient-4.5.5.jar:4.5.5]
at ru.yandex.clickhouse.ClickHouseStatementImpl.getInputStream(ClickHouseStatementImpl.java:614) ~[clickhouse-jdbc-0.2.4.jar:?]
... 14 more
Caused by: java.net.ConnectException: Connection refused (Connection refused)
at java.net.PlainSocketImpl.socketConnect(Native Method) ~[?:1.8.0_252]
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350) ~[?:1.8.0_252]
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206) ~[?:1.8.0_252]
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188) ~[?:1.8.0_252]
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) ~[?:1.8.0_252]
at java.net.Socket.connect(Socket.java:607) ~[?:1.8.0_252]
at org.apache.http.conn.socket.PlainConnectionSocketFactory.connectSocket(PlainConnectionSocketFactory.java:75) ~[httpclient-4.5.5.jar:4.5.5]
at org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:142) ~[httpclient-4.5.5.jar:4.5.5]
at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:373) ~[httpclient-4.5.5.jar:4.5.5]
at org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:381) ~[httpclient-4.5.5.jar:4.5.5]
at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:237) ~[httpclient-4.5.5.jar:4.5.5]
at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:185) ~[httpclient-4.5.5.jar:4.5.5]
at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:89) ~[httpclient-4.5.5.jar:4.5.5]
at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185) ~[httpclient-4.5.5.jar:4.5.5]
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83) ~[httpclient-4.5.5.jar:4.5.5]
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:108) ~[httpclient-4.5.5.jar:4.5.5]
at ru.yandex.clickhouse.ClickHouseStatementImpl.getInputStream(ClickHouseStatementImpl.java:614) ~[clickhouse-jdbc-0.2.4.jar:?]
... 14 more
23:13:12.768 [pulsar-timer-4-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [market_data/deribit/skew_iv_v2] [clickhouse-sink] [7e6d8] Prefetched messages: 0 --- Consume throughput received: 0.50 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
23:14:12.772 [pulsar-timer-4-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [market_data/deribit/skew_iv_v2] [clickhouse-sink] [7e6d8] Prefetched messages: 0 --- Consume throughput received: 0.50 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
23:15:12.776 [pulsar-timer-4-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [market_data/deribit/skew_iv_v2] [clickhouse-sink] [7e6d8] Prefetched messages: 0 --- Consume throughput received: 0.50 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0

The logs show the sink's regular stats update, then the connection-refused error from ClickHouse (for now this happens whenever ClickHouse restarts), and then the regular stats updates being logged again, just as they were before the error.
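One mechanism that would produce this picture (stats keep being logged, but nothing is flushed again) is a periodic flush task that dies. The stack trace shows `JdbcAbstractSink.flush` running inside `FutureTask.runAndReset` on a `ScheduledThreadPoolExecutor`, i.e. as a periodic scheduled task, and the JDK Javadoc for `ScheduledExecutorService.scheduleAtFixedRate` states that if any execution of the task encounters an exception, subsequent executions are suppressed, while tasks on other timers (such as the consumer stats recorder) carry on. Whether an exception actually escapes the JDBC sink's flush this way is an assumption; the JDK-only demo below (`FlushTimerDemo` is a hypothetical name) only shows the executor behavior.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class FlushTimerDemo {
    /**
     * Schedules a "flush" every periodMs that throws on its failOnRun-th execution,
     * waits observeMs, and returns how many times the flush actually ran.
     */
    static int countRuns(int failOnRun, long periodMs, long observeMs) throws InterruptedException {
        ScheduledExecutorService flushExecutor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        try {
            flushExecutor.scheduleAtFixedRate(() -> {
                int n = runs.incrementAndGet();
                if (n == failOnRun) {
                    // Simulates an unchecked exception escaping flush() after a DB error.
                    throw new IllegalStateException("simulated failure in flush #" + n);
                }
            }, 0, periodMs, TimeUnit.MILLISECONDS);
            Thread.sleep(observeMs);
            // After the failing run, the executor suppresses all further executions.
            return runs.get();
        } finally {
            flushExecutor.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // A healthy task would run roughly 50 times in 500 ms at a 10 ms period.
        System.out.println("healthy task ran " + countRuns(Integer.MAX_VALUE, 10, 500) + " times");
        // The failing task stops at its second run and never runs again.
        System.out.println("failing task ran " + countRuns(2, 10, 500) + " times");
    }
}
```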

Screenshots

[Screenshot: sink backlog over time]

The screenshot shows a period where the backlog was accumulating; this was one occurrence of this error affecting the sink. The backlog then comes back down after I manually restarted the sink from the CLI, which got it running properly again. Later, the error occurred again and the backlog began to accumulate once more.

Additional context

As mentioned in the steps to reproduce:

  1. Using Pulsar version 2.6.1
  2. Running function workers with the Kubernetes runtime
  3. Sink batch size set to 100000
  4. Sink timeout set to 60000ms (1 min)
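With these settings, the batch-size trigger is effectively never reached at the input rate seen in the logs, so flushes are driven by the timeout. Assuming the sink flushes when either the pending count reaches `batchSize` or the `timeoutMs` timer fires (the usual size-or-time batching rule; `BatchPolicy` and its methods are hypothetical names), the expected batch per timer flush at 0.50 msgs/s is 0.5 × 60 = 30 messages, and filling a full batch of 100000 would take 200000 s, roughly 55.6 hours.

```java
/** Hypothetical size-or-time batching policy, used only to reason about the settings above. */
final class BatchPolicy {
    private final int batchSize;
    private final long timeoutMs;

    BatchPolicy(int batchSize, long timeoutMs) {
        this.batchSize = batchSize;
        this.timeoutMs = timeoutMs;
    }

    /** A flush is due when the batch is full or the timer interval has elapsed. */
    boolean shouldFlush(int pending, long msSinceLastFlush) {
        return pending >= batchSize || msSinceLastFlush >= timeoutMs;
    }

    /** Messages expected per timer-driven flush at a steady input rate, capped at batchSize. */
    long expectedBatch(double msgsPerSecond) {
        return Math.min(batchSize, Math.round(msgsPerSecond * timeoutMs / 1000.0));
    }

    /** Seconds needed to fill a whole batch at a steady input rate. */
    double secondsToFillBatch(double msgsPerSecond) {
        return batchSize / msgsPerSecond;
    }

    public static void main(String[] args) {
        BatchPolicy policy = new BatchPolicy(100_000, 60_000);
        System.out.println(policy.expectedBatch(0.5));              // 30
        System.out.println(policy.secondsToFillBatch(0.5) / 3600);  // about 55.6 hours
        System.out.println(policy.shouldFlush(30, 60_000));         // true: timer-driven flush
    }
}
```

In other words, under this assumption every successful write in this setup comes from the 60-second timer, so if that timer stops firing, nothing is ever written again.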

Ideas

My working theory is one of two things. Either there is a logic error in the JDBC sinks that leaves them in a broken state after they encounter an error from the database, or there is something wrong specifically with the ClickHouse JDBC driver being used, which doesn't handle errors correctly.
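To make the first theory (a logic error in the JDBC sinks) concrete: batching sinks often guard `flush()` with a compare-and-set "is flushing" flag so that only one flush runs at a time. If an exception escapes between setting the flag and clearing it, every later flush sees the flag still set and silently returns, which would match a sink that keeps consuming but never inserts or acks again. The sketch below is hypothetical (`GuardedFlusher` is not the Pulsar class, and whether the JDBC sink is structured like this is an assumption); it contrasts clearing the flag inline with clearing it in a `finally` block.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical flusher illustrating how a guard flag can get stuck after an exception. */
final class GuardedFlusher {
    private final AtomicBoolean isFlushing = new AtomicBoolean(false);
    private final boolean resetInFinally;

    GuardedFlusher(boolean resetInFinally) {
        this.resetInFinally = resetInFinally;
    }

    /** Returns true if this call actually attempted a flush. */
    boolean flush(Runnable writeBatch) {
        if (!isFlushing.compareAndSet(false, true)) {
            return false; // a flush is considered "in progress": skip silently
        }
        if (resetInFinally) {
            try {
                writeBatch.run();
            } finally {
                isFlushing.set(false); // always cleared, even if writeBatch throws
            }
        } else {
            writeBatch.run();      // an exception here skips the reset below...
            isFlushing.set(false); // ...leaving the flag stuck at true forever
        }
        return true;
    }

    /** Runs one failing flush, then one healthy flush; returns whether the second one ran. */
    static boolean recoversAfterFailure(boolean resetInFinally) {
        GuardedFlusher flusher = new GuardedFlusher(resetInFinally);
        try {
            flusher.flush(() -> { throw new RuntimeException("simulated database error"); });
        } catch (RuntimeException expected) {
            // In a real sink this might be logged, or lost on an executor thread.
        }
        return flusher.flush(() -> { /* healthy write */ });
    }

    public static void main(String[] args) {
        System.out.println("inline reset recovers: " + recoversAfterFailure(false));  // false
        System.out.println("finally reset recovers: " + recoversAfterFailure(true));  // true
    }
}
```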

I have not tested this with any other databases, but a quick test with either PostgreSQL or MySQL should reveal whether this is a general issue with the JDBC sinks.

alexanderursu99 commented 3 years ago

Updated the title: I tested this with a PostgreSQL sink connector and got the same result, so I now believe this is a general issue with JDBC sinks.

alexanderursu99 commented 3 years ago

Logs from the PostgreSQL sink connector, configured with the same settings as the ClickHouse sink connector and running on the same Kubernetes runtime.

20:30:59.006 [pulsar-timer-4-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [market_data/deribit/skew_iv_v2] [postgres-sink] [45408] Prefetched messages: 0 --- Consume throughput received: 2.50 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
20:31:59.008 [pulsar-timer-4-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [market_data/deribit/skew_iv_v2] [postgres-sink] [45408] Prefetched messages: 0 --- Consume throughput received: 0.50 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
20:32:59.010 [pulsar-timer-4-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [market_data/deribit/skew_iv_v2] [postgres-sink] [45408] Prefetched messages: 0 --- Consume throughput received: 0.50 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
20:33:59.011 [pulsar-timer-4-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [market_data/deribit/skew_iv_v2] [postgres-sink] [45408] Prefetched messages: 0 --- Consume throughput received: 0.50 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
20:34:59.014 [pulsar-timer-4-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [market_data/deribit/skew_iv_v2] [postgres-sink] [45408] Prefetched messages: 0 --- Consume throughput received: 0.50 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
20:35:59.015 [pulsar-timer-4-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [market_data/deribit/skew_iv_v2] [postgres-sink] [45408] Prefetched messages: 0 --- Consume throughput received: 0.50 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
20:36:59.016 [pulsar-timer-4-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [market_data/deribit/skew_iv_v2] [postgres-sink] [45408] Prefetched messages: 0 --- Consume throughput received: 0.50 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
20:37:59.017 [pulsar-timer-4-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [market_data/deribit/skew_iv_v2] [postgres-sink] [45408] Prefetched messages: 0 --- Consume throughput received: 0.50 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
20:38:59.019 [pulsar-timer-4-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [market_data/deribit/skew_iv_v2] [postgres-sink] [45408] Prefetched messages: 0 --- Consume throughput received: 0.50 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
20:39:56.968 [pool-5-thread-1] ERROR org.apache.pulsar.io.jdbc.JdbcAbstractSink - Got exception
org.postgresql.util.PSQLException: An I/O error occurred while sending to the backend.
at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:340) ~[postgresql-42.2.12.jar:42.2.12]
at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:448) ~[postgresql-42.2.12.jar:42.2.12]
at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:369) ~[postgresql-42.2.12.jar:42.2.12]
at org.postgresql.jdbc.PgPreparedStatement.executeWithFlags(PgPreparedStatement.java:159) ~[postgresql-42.2.12.jar:42.2.12]
at org.postgresql.jdbc.PgPreparedStatement.execute(PgPreparedStatement.java:148) ~[postgresql-42.2.12.jar:42.2.12]
at org.apache.pulsar.io.jdbc.JdbcAbstractSink.flush(JdbcAbstractSink.java:203) ~[pulsar-io-jdbc-core-2.6.1.jar:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_252]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [?:1.8.0_252]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_252]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [?:1.8.0_252]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_252]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_252]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]
Caused by: java.io.EOFException
at org.postgresql.core.PGStream.receiveChar(PGStream.java:372) ~[postgresql-42.2.12.jar:42.2.12]
at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2044) ~[postgresql-42.2.12.jar:42.2.12]
at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:313) ~[postgresql-42.2.12.jar:42.2.12]
... 12 more
20:39:59.020 [pulsar-timer-4-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [market_data/deribit/skew_iv_v2] [postgres-sink] [45408] Prefetched messages: 0 --- Consume throughput received: 0.50 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
20:40:59.022 [pulsar-timer-4-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [market_data/deribit/skew_iv_v2] [postgres-sink] [45408] Prefetched messages: 0 --- Consume throughput received: 0.50 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
20:41:59.023 [pulsar-timer-4-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [market_data/deribit/skew_iv_v2] [postgres-sink] [45408] Prefetched messages: 0 --- Consume throughput received: 0.50 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
20:42:59.024 [pulsar-timer-4-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [market_data/deribit/skew_iv_v2] [postgres-sink] [45408] Prefetched messages: 0 --- Consume throughput received: 0.50 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
20:43:59.025 [pulsar-timer-4-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [market_data/deribit/skew_iv_v2] [postgres-sink] [45408] Prefetched messages: 0 --- Consume throughput received: 0.50 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
20:44:59.027 [pulsar-timer-4-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [market_data/deribit/skew_iv_v2] [postgres-sink] [45408] Prefetched messages: 0 --- Consume throughput received: 0.50 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
20:45:59.029 [pulsar-timer-4-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [market_data/deribit/skew_iv_v2] [postgres-sink] [45408] Prefetched messages: 0 --- Consume throughput received: 0.50 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
20:46:59.030 [pulsar-timer-4-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [market_data/deribit/skew_iv_v2] [postgres-sink] [45408] Prefetched messages: 0 --- Consume throughput received: 0.50 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0

alexanderursu99 commented 3 years ago

I now believe this issue is related to using the EFFECTIVELY_ONCE processing guarantee for the sink. The issue doesn't seem to happen when using ATLEAST_ONCE.
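One hypothesis that would explain why only EFFECTIVELY_ONCE is affected: if failing a record behaves differently per processing guarantee, for example negatively acknowledging it under ATLEAST_ONCE but throwing under EFFECTIVELY_ONCE, then a sink error handler that fails every record in the batch would itself throw only in the EFFECTIVELY_ONCE case, and that exception could escape the flush. This per-guarantee behavior is an assumption for illustration, not a confirmed description of the Pulsar Functions runtime; `FailCallbackDemo`, `Guarantee`, `FakeRecord`, and `handleFlushError` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

final class FailCallbackDemo {
    /** Hypothetical stand-in for the two processing guarantees discussed above. */
    enum Guarantee { ATLEAST_ONCE, EFFECTIVELY_ONCE }

    /** Hypothetical record whose fail() behavior depends on the processing guarantee. */
    static final class FakeRecord {
        final int id;
        final Guarantee guarantee;
        boolean negativelyAcked = false;

        FakeRecord(int id, Guarantee guarantee) {
            this.id = id;
            this.guarantee = guarantee;
        }

        void fail() {
            if (guarantee == Guarantee.EFFECTIVELY_ONCE) {
                // Assumed behavior: failing is not tolerated, so it throws.
                throw new RuntimeException("Failed to process message " + id);
            }
            negativelyAcked = true; // assumed ATLEAST_ONCE behavior: schedule redelivery
        }
    }

    /** Mimics a catch block that fails every record in the batch after a DB error. */
    static void handleFlushError(List<FakeRecord> batch) {
        for (FakeRecord r : batch) {
            r.fail();
        }
    }

    static List<FakeRecord> batch(Guarantee g, int n) {
        List<FakeRecord> records = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            records.add(new FakeRecord(i, g));
        }
        return records;
    }

    /** Returns true if handling a flush error lets an exception escape. */
    static boolean errorHandlerThrows(Guarantee g) {
        try {
            handleFlushError(batch(g, 3));
            return false;
        } catch (RuntimeException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        for (Guarantee g : Guarantee.values()) {
            System.out.println(g + ": error handler throws = " + errorHandlerThrows(g));
        }
    }
}
```

Under this hypothesis, the ATLEAST_ONCE path leaves the sink's own state intact and the records are redelivered, while the EFFECTIVELY_ONCE path interrupts the error handler itself, so any cleanup after it would never run.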

codelipenghui commented 3 years ago

Thanks @Alxander64, and sorry for the late response. Have you tried the newer Pulsar versions 2.7.0 or 2.6.3? If the problem is still there, we need to fix it ASAP.

alexanderursu99 commented 3 years ago

I recently updated to 2.6.3, but since then I've only been running sinks against a more stable database. I first noticed this issue when sinking to ClickHouse, for which I didn't have a robust production setup.

alexanderursu99 commented 3 years ago

For a simple test, I ran my Pulsar cluster in k8s and brought up a single Postgres replica with a Helm chart. I had a sink running, configured as described above, and then I deleted the pod running Postgres and waited for it to respawn. If new rows don't eventually appear in the sink's target table, the problem persists.
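The pass/fail criterion of this test (new rows eventually appear after the Postgres pod respawns) can be checked by polling the table's row count until it grows or a deadline passes. The sketch below is a hypothetical helper: the JDBC URL, credentials, table name, and timeouts are placeholders, and `countRows` needs the PostgreSQL JDBC driver on the classpath; the polling logic itself is JDK-only.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.function.LongSupplier;

final class SinkRecoveryCheck {
    /** Counts rows over plain JDBC; returns -1 if the database is unreachable. */
    static long countRows(String jdbcUrl, String user, String password, String table) {
        // The table name is concatenated into the SQL, so it must be trusted input.
        try (Connection conn = DriverManager.getConnection(jdbcUrl, user, password);
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT count(*) FROM " + table)) {
            rs.next();
            return rs.getLong(1);
        } catch (SQLException e) {
            return -1; // e.g. the Postgres pod is still restarting
        }
    }

    /** Polls rowCount until it exceeds baseline or timeoutMs elapses; true means the sink recovered. */
    static boolean waitForNewRows(LongSupplier rowCount, long baseline,
                                  long timeoutMs, long pollMs) throws InterruptedException {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (System.nanoTime() < deadline) {
            if (rowCount.getAsLong() > baseline) {
                return true;
            }
            Thread.sleep(pollMs);
        }
        return rowCount.getAsLong() > baseline; // one last check at the deadline
    }

    public static void main(String[] args) throws InterruptedException {
        // Hypothetical connection settings and table name; replace with your own.
        String url = "jdbc:postgresql://localhost:5432/postgres";
        String table = "skew_iv";
        long before = countRows(url, "postgres", "postgres", table);
        if (before < 0) {
            System.out.println("database unreachable; cannot take a baseline");
            return;
        }
        // Delete the Postgres pod now and let it respawn, then poll for up to 5 minutes.
        boolean recovered = waitForNewRows(
                () -> countRows(url, "postgres", "postgres", table),
                before, 5 * 60_000L, 10_000L);
        System.out.println(recovered ? "sink recovered" : "problem persists");
    }
}
```

Taking the baseline before deleting the pod matters: a count of -1 while the database is down would otherwise make any later successful count look like new rows.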