risingwavelabs / risingwave


jdbc sink stuck on jvm side #18372

Open wenym1 opened 1 week ago

wenym1 commented 1 week ago

Describe the bug

On a PostgreSQL JDBC sink, the log store read epoch does not increase for a long time.

await-tree dump

>> Actor 63591
Actor 63591: `xxx` [4584.598s]
  Epoch 7084692759511040 [380.010ms]
    Sink F86700000002 [380.010ms]
      consume_log (sink_id 17099) [!!! 4584.598s]
        log_sinker_send_chunk (chunk 21) [!!! 4579.358s]
        log_sinker_wait_next_response [!!! 4579.358s]
      Merge F86700000001 [380.010ms]
        LocalInput (actor 63596) [380.010ms]
        LocalInput (actor 63595) [380.010ms]
        LocalInput (actor 63594) [380.010ms]
        LocalInput (actor 63593) [380.010ms]

jstack

"Thread-6964" #7058 prio=5 os_prio=0 cpu=329.34ms elapsed=5299.86s tid=0x0000fffe4786c800 nid=0x14d1 runnable  [0x0000fffbbbab8000]
   java.lang.Thread.State: RUNNABLE
    at sun.nio.ch.Net.poll(java.base@17.0.12/Native Method)
    at sun.nio.ch.NioSocketImpl.park(java.base@17.0.12/NioSocketImpl.java:186)
    at sun.nio.ch.NioSocketImpl.park(java.base@17.0.12/NioSocketImpl.java:195)
    at sun.nio.ch.NioSocketImpl.implWrite(java.base@17.0.12/NioSocketImpl.java:420)
    at sun.nio.ch.NioSocketImpl.write(java.base@17.0.12/NioSocketImpl.java:445)
    at sun.nio.ch.NioSocketImpl$2.write(java.base@17.0.12/NioSocketImpl.java:831)
    at java.net.Socket$SocketOutputStream.write(java.base@17.0.12/Socket.java:1035)
    at sun.security.ssl.SSLSocketOutputRecord.deliver(java.base@17.0.12/SSLSocketOutputRecord.java:345)
    at sun.security.ssl.SSLSocketImpl$AppOutputStream.write(java.base@17.0.12/SSLSocketImpl.java:1308)
    at java.io.BufferedOutputStream.flushBuffer(java.base@17.0.12/BufferedOutputStream.java:81)
    at java.io.BufferedOutputStream.write(java.base@17.0.12/BufferedOutputStream.java:127)
    - locked <0x00000000ba305df8> (a java.io.BufferedOutputStream)
    at java.io.FilterOutputStream.write(java.base@17.0.12/FilterOutputStream.java:108)
    at org.postgresql.core.PGStream.sendInteger2(PGStream.java:375)
    at org.postgresql.core.v3.QueryExecutorImpl.sendBind(QueryExecutorImpl.java:1707)
    at org.postgresql.core.v3.QueryExecutorImpl.sendOneQuery(QueryExecutorImpl.java:1968)
    at org.postgresql.core.v3.QueryExecutorImpl.sendQuery(QueryExecutorImpl.java:1488)
    at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:546)
    - locked <0x00000000bb87e4e8> (a org.postgresql.core.v3.QueryExecutorImpl)
    at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:893)
    at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:916)
    at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1684)
    at com.risingwave.connector.JDBCSink$JdbcStatements.executeStatement(JDBCSink.java:343)
    at com.risingwave.connector.JDBCSink$JdbcStatements.execute(JDBCSink.java:324)
    at com.risingwave.connector.JDBCSink.write(JDBCSink.java:153)
    at com.risingwave.connector.SinkWriterStreamObserver.onNext(SinkWriterStreamObserver.java:132)
    at com.risingwave.connector.JniSinkWriterHandler.runJniSinkWriterThread(JniSinkWriterHandler.java:40)

The code that gets stuck (from sun.nio.ch.NioSocketImpl):

    private void park(FileDescriptor fd, int event, long nanos) throws IOException {
        Thread t = Thread.currentThread();
        if (t.isVirtual()) {
            ...
        } else {
            long millis;
            if (nanos == 0) {
                // nanos == 0 means "no timeout": poll with -1, i.e. block indefinitely
                millis = -1;
            } else {
                ...
            }
            Net.poll(fd, event, millis);
        }
    }

    private void park(FileDescriptor fd, int event) throws IOException {
        // the write path in the stack trace uses this overload, so nanos is always 0
        park(fd, event, 0);
    }

The 0 (i.e. infinite) timeout is hard-coded in this write path, so it is unlikely that a timeout can be added through configuration alone.
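To illustrate why configuration does not help here, below is a hedged sketch of the obvious candidate: pgjdbc's documented socketTimeout connection property, which to my understanding maps to SO_TIMEOUT and therefore only bounds blocking reads. The class name, DSN, and credentials are placeholders, not from the issue.

// Hypothetical configuration sketch, not from the issue. Even with both timeout
// properties set, a write parked in NioSocketImpl.implWrite -> Net.poll(fd, event, -1)
// is not bounded, which matches the hard-coded 0 noted above.
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.Properties;

public class PgTimeoutConfigSketch {
    public static Connection open() throws Exception {
        Properties props = new Properties();
        props.setProperty("user", "postgres");        // placeholder credentials
        props.setProperty("password", "postgres");
        props.setProperty("socketTimeout", "60");     // seconds; bounds blocking reads only
        props.setProperty("connectTimeout", "10");    // seconds; bounds connection setup only
        return DriverManager.getConnection("jdbc:postgresql://localhost:5432/postgres", props);
    }
}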

Error message/log

No response

To Reproduce

No response

Expected behavior

No response

How did you deploy RisingWave?

No response

The version of RisingWave

No response

Additional context

IMAGE: v1.10.1-patch-us-west-2-35-fix-serving-mapping

wenym1 commented 1 week ago

The stuck state cannot be resolved automatically.

The workaround is to first run select * from pg_stat_activity; to inspect the status of all connections. From the result, filter the rows whose application_name is PostgreSQL JDBC Driver. Among the remaining rows, those with wait_event ClientWrite, state active, and a prepared DML statement as query are likely the problematic connections. Take the pid of each such connection and run SELECT pg_terminate_backend(<pid>) to kill it, so that the JDBC sink becomes unstuck and triggers a retry.
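For anyone scripting this workaround, here is a minimal JDBC sketch of the same inspect-and-terminate steps. The class name, DSN, and credentials are placeholders, and it must run as a role allowed to call pg_terminate_backend.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;

public class TerminateStuckJdbcSinks {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:postgresql://localhost:5432/postgres", "postgres", "postgres")) {
            // Same filter as described above: JDBC driver connections that are
            // 'active' but waiting on ClientWrite.
            String find = "SELECT pid, query FROM pg_stat_activity "
                    + "WHERE application_name = 'PostgreSQL JDBC Driver' "
                    + "AND wait_event = 'ClientWrite' AND state = 'active'";
            try (Statement st = conn.createStatement();
                 ResultSet rs = st.executeQuery(find)) {
                while (rs.next()) {
                    int pid = rs.getInt("pid");
                    System.out.println("terminating pid " + pid + ": " + rs.getString("query"));
                    try (PreparedStatement kill =
                            conn.prepareStatement("SELECT pg_terminate_backend(?)")) {
                        kill.setInt(1, pid);
                        kill.execute();
                    }
                }
            }
        }
    }
}

Inspect the printed query text before terminating in production; the filter above is a heuristic, not a guarantee that the connection is stuck.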

Nevertheless, we still need to figure out why the connection is stuck.

wenym1 commented 1 week ago

cc @StrikeW @hzxa21

hzxa21 commented 1 week ago

Maybe related: https://github.com/pgjdbc/pgjdbc/issues/194

StrikeW commented 1 week ago

> Maybe related: pgjdbc/pgjdbc#194

I think so. The stack trace is similar to https://github.com/pgjdbc/pgjdbc/issues/194#issuecomment-2163403910. I am not very sure whether a rewrite in Rust could improve the stability (https://github.com/risingwavelabs/risingwave/issues/16745). cc @fuyufjh to take a look at the priority.

hzxa21 commented 1 week ago

> Maybe related: pgjdbc/pgjdbc#194
>
> I think so. The stack trace is similar to pgjdbc/pgjdbc#194 (comment). I am not very sure whether a rewrite in Rust could improve the stability (#16745). cc @fuyufjh to take a look at the priority.

I don't see users reporting issues related to stuck queries in https://github.com/sfackler/rust-postgres/issues, so I am optimistic that using Rust to sink to PG can be more stable.

The issue with Java/JDBC is that it is difficult (perhaps even impossible) to time out a query: stmt.executeBatch() is not async, and we cannot implement our own timeout mechanism as a safeguard to cancel the query and retry.
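To make that limitation concrete, here is a hedged sketch of the kind of safeguard that falls short: wrapping stmt.executeBatch() in a bounded wait on another thread can detect the stall, but it cannot unblock the socket write, so recovery still requires terminating the connection or backend as described earlier. Class and method names are illustrative only.

import java.sql.PreparedStatement;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class DetectOnlyBatchTimeout {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    // Detection-only sketch: surfaces a stall after timeoutSeconds, but the worker
    // thread stays parked in NioSocketImpl.park until the connection or backend dies.
    int[] executeBatchOrFlag(PreparedStatement stmt, long timeoutSeconds) throws Exception {
        Future<int[]> result = executor.submit(stmt::executeBatch);
        try {
            return result.get(timeoutSeconds, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
            // cancel(true) only interrupts the thread; a blocking socket write is not
            // interruptible, so this does not unblock it, which is the point above.
            result.cancel(true);
            throw e;
        }
    }
}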