pgjdbc / r2dbc-postgresql

PostgreSQL R2DBC Driver

ReactorNettyClient gets stuck on a cancelled Conversation if that Conversation has more than 256 rows (the size of reactor.bufferSize.small) #661

alexeykurshakov opened 1 month ago

alexeykurshakov commented 1 month ago

Bug Report

Versions

Current Behavior

When a query is zipped in parallel with some other function that fails, and that query returns more than 256 rows, you can end up with no real consumer: the chain has been cancelled, but data is still arriving from the database and ReactorNettyClient starts saving it to its internal buffer. When this happens, any further attempt to get data from the database fails, because ReactorNettyClient.BackendMessageSubscriber.tryDrainLoop never calls drainLoop: the stuck conversation has no demand.

private void tryDrainLoop() {
    // drainLoop only runs while there are buffered items AND downstream demand;
    // a cancelled conversation never signals demand, so its buffered messages are never drained
    while (hasBufferedItems() && hasDownstreamDemand()) {
        if (!drainLoop()) {
            return;
        }
    }
}

This can be reproduced using https://github.com/agorbachenko/r2dbc-connection-leak-demo. If you increase the system property "reactor.bufferSize.small" to 350, the attached example starts working.
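
For concreteness, here is a minimal sketch of the pattern (my own illustration, not the demo project's code; connectionFactory is an assumed, already configured io.r2dbc.spi.ConnectionFactory, big_table is a hypothetical table with more than 256 rows, and connection cleanup is omitted):

import io.r2dbc.spi.ConnectionFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Query that returns more than 256 rows (reactor.bufferSize.small).
Mono<Long> bigQuery = Mono.from(connectionFactory.create())
    .flatMap(connection -> Flux.from(connection.createStatement("SELECT * FROM big_table").execute())
        .flatMap(result -> result.map((row, metadata) -> row.get(0)))
        .count());

// The second source fails immediately, so zip cancels bigQuery mid-stream. The rows the
// server has already sent keep arriving and are stored in ReactorNettyClient's buffer,
// but the cancelled conversation never signals demand, so they are never drained.
Mono.zip(bigQuery, Mono.error(new IllegalStateException("boom")))
    .onErrorResume(error -> Mono.empty())
    .block();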

mp911de commented 1 month ago

Thanks a lot for chasing this issue down. Since you invested about 80% of the effort that is required to fix the issue, do you want to submit a pull request to clear out the cancelled conversations?

alexeykurshakov commented 1 month ago

I've never worked with the Reactor library (Mono, Flux) before, but I found that it's not easy to track down the source of a cancellation: an error in a parallel zip function, an ordinary cancel, or the cancellation from Mono.from(fluxPublisher). For example,

Mono.from(Flux.just(1, 2, 3).doOnCancel(() -> {
    System.out.println("fire");
})).subscribe();

fires the println on the first emission, while

Flux.just(1, 2, 3).doOnCancel(() -> {
    System.out.println("fire");
}).subscribe();

never prints "fire". If you can help me track down the type of cancellation, sure, I can make a pull request.
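
For illustration, a small standalone sketch (not from the demo project) of how the cancellation coming from a failed zip partner also shows up as a plain cancel signal, indistinguishable from the one Mono.from(...) issues after the first element:

import reactor.core.publisher.Flux;

// Flux.range is used so the source cannot complete before the error arrives.
// When one zip source errors, zip cancels its remaining sources, so doOnCancel fires.
Flux.range(1, 1000)
    .doOnCancel(() -> System.out.println("cancelled because the zipped source failed"))
    .zipWith(Flux.error(new IllegalStateException("boom")))
    .onErrorResume(error -> Flux.empty())
    .subscribe();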

chemicL commented 1 month ago

@alexeykurshakov these cancellations have reasonable explanations. A couple of examples:

For inspiration regarding test cases, perhaps you can use my examples with mocks. They were part of the investigation into whether r2dbc-pool is responsible for the connection leaks in https://github.com/r2dbc/r2dbc-pool/issues/198#issuecomment-1980367615.

alexeykurshakov commented 1 month ago

@mp911de https://github.com/pgjdbc/r2dbc-postgresql/blob/a13c02c09f83d44feaaea6d3416e3d6d5e0a5ad6/src/main/java/io/r2dbc/postgresql/PostgresqlStatement.java#L257 In SimpleQueryMessageFlow.exchange the original cancellation is just ignored. I don't understand the intended behaviour: why do you discard the cancellation with Operators.discardOnCancel, and what is .doOnDiscard(ReferenceCounted.class, ReferenceCountUtil::release) supposed to do?

mp911de commented 1 month ago

Operators.discardOnCancel is to drain protocol frames off the transport so that we can finalize the conversation with the server. If we just cancelled the consumption, then response frames from an earlier conversation would remain on the transport and feed into the next conversation.
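
For illustration, a standalone Reactor/Netty sketch (my own, not the driver's internal code) of what the doOnDiscard(ReferenceCounted.class, ReferenceCountUtil::release) hook contributes: elements dropped by upstream operators are routed to the hook so their Netty buffers are released instead of leaking.

import io.netty.buffer.Unpooled;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import reactor.core.publisher.Flux;

// filter() rejects every buffer; each rejected ByteBuf is routed to the discard hook
// and released there. In the driver, Operators.discardOnCancel keeps draining frames
// after a cancel so that they reach this hook instead of remaining on the transport.
Flux.range(0, 4)
    .map(i -> Unpooled.copiedBuffer(new byte[]{i.byteValue()}))
    .filter(buffer -> false)
    .doOnDiscard(ReferenceCounted.class, ReferenceCountUtil::release)
    .subscribe();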

alexeykurshakov commented 1 month ago

Sounds like it should work, but it doesn't 🤣. In the issue example, badThread never consumes the data and sends the cancel signal after the real data has already been fed to ReactorNettyClient, which leads to those messages being saved in its internal buffer. So in that example the discard happens too late.

alexeykurshakov commented 1 month ago

I can provide a timeline of what happened. And then we'll figure out how to fix it.

travispeloton commented 2 weeks ago

Hello! We've been hit by a similar issue this past week during some load testing. I have attached a stack trace. We also saw a few Netty LEAK errors; stacktrace attached.