r2dbc / r2dbc-mssql

R2DBC Driver for Microsoft SQL Server using TDS (Tabular Data Stream) Protocol
Apache License 2.0
183 stars 32 forks source link

Statements hang up on reading `nvarchar(max)` columns #216

Closed Azukovskij closed 3 years ago

Azukovskij commented 3 years ago

Select query seems to hang-up when fetching data from nvarchar(max), varchar(max) and text columns, e.g.

CREATE TABLE test.Test (id int PRIMARY KEY, data nvarchar(max));
INSERT INTO test.Test(id, data) VALUES(0,  @data100kb);
INSERT INTO test.Test(id, data) VALUES(1,  @data100kb);
...
INSERT INTO test.Test(id, data) VALUES(N,  @data100kb);

hangs up after reading ~10 records:

        final var counter = new AtomicLong();
        return Flux.from(c.createStatement("select * from test.Test").execute())
            .flatMap(r -> r.map((row, m) -> row.get(0)))
            .doOnNext(v -> System.out.println("~~loaded " + counter.incrementAndGet()));
inserted 29
[DEBUG] (main) Using Verbose Console logging
[DEBUG] (main) Creating MssqlConnectionFactory with configuration [MssqlConnectionConfiguration [applicationName="null", connectionId=e02df2d1-3564-46b7-aff5-89fb0f970784, connectTimeout="PT30S", database="null", host="localhost", hostNameInCertificate="localhost", password="**************************", preferCursoredExecution="io.r2dbc.mssql.MssqlConnectionConfiguration$Builder$$Lambda$195/0x000000080033e440@56d93692", port=32821, sendStringParametersAsUnicode=true, ssl=false, sslContextBuilderCustomizer=java.util.function.Function$$Lambda$196/0x000000080033d840@1686f0b4, sslTunnelSslContextBuilderCustomizer=null, tcpKeepAlive="false", tcpNoDelay="true", trustServerCertificate=false, trustStore="null", trustStorePassword="", trustStoreType="null", username="SA"]] from options [ConnectionFactoryOptions{options={host=localhost, username=SA, driver=sqlserver, password=REDACTED, port=32821, user=SA}}]
[DEBUG] (reactor-tcp-nio-2) CursorId: 180150003
~~loaded 1
~~loaded 2
~~loaded 3
~~loaded 4
~~loaded 5
~~loaded 6
~~loaded 7
~~loaded 8
~~loaded 9
~~loaded 10
... live lock past this point ...

in average query seems to hang-up after fetching ~1MB of data, e.g. first row is not loaded with 1MB data column and hangs up on second row with 512kb column size.

Issue is reproducible with integration test r2dbc-mssql-testcase.zip.

Note: adding sleep on each element seems to help the issue, thus it seems that live-lock is caused by incorrect backpressure handling.

gvisokinskas commented 3 years ago

The issue appears when a single row result is split into multiple objects (connection.inbound().receiveObject() returns multiple buffers in ReactorNettyClient).

Because the StreamDecoder implicitly ignores such messages and does not emit them through the responseProcessor, eventually, no new elements are requested from the subscription. (The downstream only requests new elements from the connection.inbound().receiveObject() when a message is emitted through the responseProcessor).

I have created a fix that always emits an IncompleteMessage to the responseProcessor in case the message can't be parsed to force it to request more elements from the subscription: https://github.com/r2dbc/r2dbc-mssql/pull/217

mp911de commented 3 years ago

Thanks a lot for looking into it and bringing up that topic. The writeup is a bit longer to capture the current state of affairs.

StreamDecoder tries to decode messages from the receive buffer. If message chunking happens, then it aggregates messages until it was able to decode something. responseProcessor subscribes to connection.inbound().receiveObject() and signals demand to the channel. StreamDecoder accepting a wrapped responseProcessor, emits individual messages to the subscriber.

If StreamDecoder isn't able to decode anything, then connection.inbound().receiveObject() tracks that it has emitted a message (n = n-1)but responseProcessor demand is still n.

That raises actually two questions:

  1. What to do when the decoder wasn't able to decode anything (1 inbound, 0 outbound)
  2. What to do when the decoder emits multiple messages out of one buffer (1 inbound, 2 outbound)

Going forward, this means that we need to buffer overflow messages and request more messages if we cannot decode a buffer into a message.

The alternative is to rewrite our decoder into accepting ByteBuf and emitting Collection<Message> with a flatMapIterable operator along with a handle operator that skips emission of Collection<Message> is empty.

The longer I think about it, the more I consider that being the easiest approach. Let me know what you think about it.

gvisokinskas commented 3 years ago

I would personally prefer to have the demand controlled by the native Reactor operators. For example, the ReactorNettyClient could look like this:

connection.inbound().receiveObject()
    .flatMapIterable(object -> decoder.decode(...))
    .onErrorResume(this::resumeError)
    .doOnComplete(this::handleClose)
    .subscribeWith(this.responseProcessor)

I experimented with this approach but decided to do a quick fix instead to reduce the risk of regressions and possible hidden side effects.

Another approach would be to maintain the reference to the current subscription and pass it to the decoder so that it would be able to manage the upstream demand based on the number of messages decoded. I'm not sure whether that would be a good solution as it would put too much responsibility on the decoder.

Additionally, I saw that the decoder behavior was changed to reduce the number of objects created (c3e23a791efdc83d42df562fc0a86213c3faa9d8), so the fix needs to account for possible performance regression.

mp911de commented 3 years ago

That's fixed and backported now.