pgjdbc / r2dbc-postgresql

Postgresql R2DBC Driver
https://r2dbc.io
Apache License 2.0
1.01k stars 177 forks source link

Hanging when execute statement DELETE and RETURNING #650

Open vttranlina opened 7 months ago

vttranlina commented 7 months ago

Bug Report

Versions

Current Behavior

When attempting to execute a DELETE statement with a RETURNING clause using Flux.from(con.createStatement(...)), the application hangs after processing a few elements in the Flux.

My statement

Flux.from(con.createStatement("DELETE FROM message_mailbox WHERE mailbox_id = $1 RETURNING message_id")

Table schema

create table message_mailbox
(
    mailbox_id    uuid                  not null,
    message_uid   bigint                not null,
    message_id    uuid                  not null,
    save_date     timestamp(6),
    primary key (mailbox_id, message_uid)
);

Steps to reproduce

Input Code ```java public Flux deleteByMailboxIdAndReturning(PostgresMailboxId mailboxId) { AtomicInteger counter = new AtomicInteger(0); AtomicInteger doOnNextCounter = new AtomicInteger(0); return postgresExecutor.connection() .flatMapMany(con -> Flux.from(con.createStatement("DELETE FROM message_mailbox WHERE mailbox_id = $1 RETURNING message_id") .bind(0, mailboxId.asUuid()) .execute()) .flatMap(result -> result.map((row, rowMetadata) -> { String msgIdWasDeleted = row.get(0, String.class); System.out.println("msg was deleted(" + counter.incrementAndGet() + "): " + msgIdWasDeleted); return msgIdWasDeleted; }), 5,15)) .doOnNext(e -> { System.out.println("____doOnNext("+doOnNextCounter.incrementAndGet()+"): " + e); }) .map(UUID::fromString); } ``` It hanging The console output: ``` msg was deleted(1): 018ecb78-7627-7c44-88b5-7f4ec96c757f ____doOnNext(1): 018ecb78-7627-7c44-88b5-7f4ec96c757f msg was deleted(2): 018ecb78-76b9-742c-99cd-02a06d448a27 ____doOnNext(2): 018ecb78-76b9-742c-99cd-02a06d448a27 msg was deleted(3): 018ecb78-76d2-76bb-bfdc-55a19366dce6 ____doOnNext(3): 018ecb78-76d2-76bb-bfdc-55a19366dce6 msg was deleted(4): 018ecb78-76e9-76d2-9084-6b569cfca91e ____doOnNext(4): 018ecb78-76e9-76d2-9084-6b569cfca91e msg was deleted(5): 018ecb78-76ff-783d-9be7-2617dfdb4b39 msg was deleted(6): 018ecb78-771e-7c34-a883-c53dbca3cdaf msg was deleted(7): 018ecb78-7735-79f7-9c4a-468a5ba9eff3 msg was deleted(8): 018ecb78-774c-71b9-81cc-80101fc9553a msg was deleted(9): 018ecb78-7768-7a86-9f58-8bf5c598adf8 msg was deleted(10): 018ecb78-777f-794a-8a5c-3f4c7c42055f msg was deleted(11): 018ecb78-7798-7dc5-b44f-7e521679c832 msg was deleted(12): 018ecb78-77b1-72f8-a7ca-79ebccda235f msg was deleted(13): 018ecb78-77c9-7f52-b58a-81397862e0e3 msg was deleted(14): 018ecb78-77de-7f1a-b2eb-0f53f672d913 msg was deleted(15): 018ecb78-77f6-7bcc-8559-1d2d4e9ddbbc ``` The number of "doOnNext" should be equal to "msg was deleted", but it not If I changed the Flux to Mono> and revert to Flux::fromIterable It works The code just add `.collectList().flatMapMany(Flux::fromIterable)` ```java public Flux deleteByMailboxIdAndReturning(PostgresMailboxId mailboxId) { AtomicInteger counter = new AtomicInteger(0); AtomicInteger doOnNextCounter = new AtomicInteger(0); return postgresExecutor.connection() .flatMapMany(con -> Flux.from(con.createStatement("DELETE FROM message_mailbox WHERE mailbox_id = $1 RETURNING message_id") .bind(0, mailboxId.asUuid()) .execute()) .flatMap(result -> result.map((row, rowMetadata) -> { String msgIdWasDeleted = row.get(0, String.class); System.out.println("msg was deleted(" + counter.incrementAndGet() + "): " + msgIdWasDeleted); return msgIdWasDeleted; }), 5,15)).collectList().flatMapMany(Flux::fromIterable) .doOnNext(e -> { System.out.println("____doOnNext("+doOnNextCounter.incrementAndGet()+"): " + e); }) .map(UUID::fromString); } ``` Then the output is ```text msg was deleted(1): 018ecb7b-dad2-79e5-96ba-4269fb5339f6 msg was deleted(2): 018ecb7b-db69-7d8a-a475-278b2df269f2 ... msg was deleted(49): 018ecb7b-df6c-7a0a-9258-55817f6cc934 msg was deleted(50): 018ecb7b-df81-779c-be86-c7445186aaff ____doOnNext(1): 018ecb7b-dad2-79e5-96ba-4269fb5339f6 ____doOnNext(2): 018ecb7b-db69-7d8a-a475-278b2df269f2 ... ____doOnNext(49): 018ecb7b-df6c-7a0a-9258-55817f6cc934 ____doOnNext(50): 018ecb7b-df81-779c-be86-c7445186aaff ```

Expected behavior/code

The Flux.from(connection.createStatement(...)) should execute normally without hanging.

Additional context


updated: The code for reproduce is more complex: https://github.com/pgjdbc/r2dbc-postgresql/issues/650#issuecomment-2054007452

vttranlina commented 7 months ago

I updated the thread dump when the reactor process hanging threads_report1.txt

mp911de commented 7 months ago

There's no blocked thread, the reported issue requires a bit more digging. I suggest enabling debug logging for the driver so that you see at which Postgres frame the process gets locked up.

vttranlina commented 7 months ago

There's no blocked thread, the reported issue requires a bit more digging. I suggest enabling debug logging for the driver so that you see at which Postgres frame the process gets locked up.

I created a simple project to reproduce it: https://github.com/vttranlina/r2dbc-postgresql-test.git

I updated the re-produce code (it is more complex a bit than what I wrote in the description above)

I tried to print all debug logs, but I don't see anything that I can dig more