r2dbc / r2dbc-spi

Service Provider Interface for R2DBC Implementations
Apache License 2.0

Allow consuming arbitrary result segments (rows, out parameters, update count, messages) #27

Closed · lukaseder closed this 3 years ago

lukaseder commented 5 years ago

I'm assuming that if the statement's returning stream is finished, the Publisher returned by Statement#execute() will simply stop producing results.

But if there is a result, in order to distinguish whether it is an update count or a result set, should we simply try calling Result#getRowsUpdated(), and check if the resulting Publisher is empty (which would mean that there must be a result set)?

Maybe it would be a bit nicer if there were a third method, Result#getResultType(), returning an enum that describes the result. Or, at least, specify the behaviour in the Javadoc.
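
For illustration, such a method might look like this (a purely hypothetical sketch; ResultType and getResultType() are illustrative names, not part of the SPI):

public interface Result {

    // Hypothetical: describes what this Result carries
    enum ResultType {
        UPDATE_COUNT,
        ROWS
    }

    // Hypothetical: discover the kind of result before consuming it
    ResultType getResultType();
}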

mp911de commented 5 years ago

Publishers are one-time use only (similar to a Java 8 Stream). By calling a method that returns a Publisher, you indicate that you want to materialize a query using a specific projection. Subscribing to that Publisher will start executing whatever is required to get you that result.
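
As a point of comparison, a Java 8 Stream fails the same way on reuse:

import java.util.stream.Stream;

public class StreamReuse {
    public static void main(String[] args) {
        Stream<String> names = Stream.of("a", "b");
        names.count();                      // terminal operation consumes the stream
        names.forEach(System.out::println); // throws IllegalStateException: stream has already been operated upon or closed
    }
}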

That said, calling Statement#execute prepares SQL execution. Calling a method on Result starts consuming the server response in a specific way and as soon as the response is consumed, the Publisher signals completion. If you'd call an additional method on Result, there's no server response left to be processed.

Right now, Result#getRowsUpdated() is implemented to consider server frames reporting a count. If the server does not send a count, then we don't have a value to emit. I think it makes sense to define a standard behavior and make sure a count value is emitted in any case.

I also think we should incorporate bits of this answer into the Javadoc/spec doc.

lukaseder commented 5 years ago

So, we cannot probe Result#getRowsUpdated() to see which kind of result we currently have? Then we definitely need a Result#getResultType() method, or some other mechanism.

mp911de commented 5 years ago

Exactly, if you consume rows, you can't get the update count and vice versa. I'm not sure whether a result type would work, since reactive is all about deferring and streaming. We would need to consume the response to inspect it, but at the same time we do not want to buffer responses, so the response would already be consumed by peeking at the stream.

Can you shed a bit of light on the use case? In Spring Data R2DBC we basically have a similar situation. We have two cases where affected row count vs. rows plays a role:

  1. Usage of the client API executing arbitrary SQL: Client code decides whether it is interested in rows or the count.
  2. Repository usage: In the repository support we consume affected row count on DML queries and rows on SELECT queries.
lukaseder commented 5 years ago

Well, the use case is simple; see also this discussion: https://groups.google.com/forum/#!topic/r2dbc/QZpTpQtj1HA

Some databases (e.g. SQL Server) allow for arbitrary statement batches, e.g.

DECLARE @t TABLE(i INT);
INSERT INTO @t VALUES (1),(2),(3);
RAISERROR('message 1', 16, 2, 3);
RAISERROR('message 2', 16, 2, 3);
SELECT * FROM @t;
RAISERROR('message 3', 16, 2, 3);

It should produce a result like this:

Update Count: 3
Exception   : message 1
Exception   : message 2
Result      :
  i: 1
  i: 2
  i: 3
Exception   : message 3

Now, the behaviour of these batches is not necessarily constant / known in advance. For instance, you could wrap some SELECT in an IF, and if that IF gets skipped, then all the results will shift by one (or more). These batches can also be contained in any stored procedure, or even in a trigger (!).

There definitely needs to be some way to discover, without knowing in advance, what kind of result we're getting at any given position. I know this is an edge case, even for SQL Server users. But I do think it's worth thinking about this before releasing 1.0.0.GA.

This is an SPI to be consumed by tools that are not necessarily in control of the SQL being executed, so those tools need to be able to discover the results entirely dynamically.

mp911de commented 5 years ago

I think we should be able to allow result consumption and then asking for affected row counts. The other way round would not work. So you could call Result#getRowsUpdated() after Result#map, but not Result#map after Result#getRowsUpdated().
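
In code, the allowed ordering might look like this (a hypothetical sketch of the proposed behavior, using Reactor; not what the SPI guarantees today):

// given an io.r2dbc.spi.Result "result"
Flux.from(result.map((row, metadata) -> row.get(0)))
    .doOnNext(value -> System.out.println("Row value: " + value)) // 1. consume rows first
    .thenMany(result.getRowsUpdated())                            // 2. then ask for the count
    .subscribe(count -> System.out.println("Rows updated: " + count));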

lukaseder commented 5 years ago

... in fact, if you do not want to peek at the stream, then the only solution I can see is to replace the current two methods on Result with a single one that does everything:

public interface Result {
    <T> Publisher<T> map(
        Function<Integer, T> rowsUpdated,
        BiFunction<Row, RowMetadata, ? extends T> rowsFetched
    );
}
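
For illustration, a caller could then consume any result uniformly (hypothetical usage of the sketch above):

Publisher<String> output = result.map(
    count -> "Update count: " + count,        // invoked for update counts
    (row, metadata) -> "Row: " + row.get(0)   // invoked for each fetched row
);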

This will make it a bit harder to stay forward compatible, e.g. if stored procedure OUT parameter support is added only in a later release. In that case, maybe a wrapper type for these functions might be useful:

public interface Result {
    interface Mapping<T> {
        Function<Integer, T> rowsUpdated();
        BiFunction<Row, RowMetadata, ? extends T> rowsFetched();
        Function<Object, T> outParameterFetched();
    }

    <T> Publisher<T> map(Mapping<T> mapping);
}
lukaseder commented 5 years ago

I think we should be able to allow result consumption and then asking for affected row counts

It would be possible, but complicated to get right. This is what makes JDBC so messy: it is such a stateful implementation of a stream. In order to get things right, the correct order of method calls has to be observed, and that's quite hard.

Even if you say that this SPI shouldn't be used by clients directly, it will be, and there will be tons of questions from beginners. :)

nebhale commented 5 years ago

Exactly, if you consume rows, you can't get the update count and vice versa.

Due to the poor documentation it's not clear, but this is not true. It may be PostgreSQL-specific (again, a case for better specification), but you absolutely can peek, and we do maintain a small amount of state to facilitate this. We .cache() the first three frames, which includes the updated row count if it exists. Using filters on all the pertinent outputs, we can replay those first three frames, and any that follow, to generate RowMetadatas, Rows, and update counts in parallel with each other. This would lead to behaviors like (and I’m freehanding, so forgive the likely non-compilation, you’ll get the idea):

// doSomethingWithCount / doSomethingWithRow are placeholders
Flux.from(result.getRowsUpdated())
    .flatMap(count -> doSomethingWithCount(count))
    .switchIfEmpty(Flux.from(result.map((row, metadata) -> row))
        .flatMap(row -> doSomethingWithRow(row)));
mp911de commented 5 years ago

For SQL Server, we receive the count along with the Done message, which comes last (either with a direct response or with the consumption of the entire cursor), so for SQL Server we can't easily cache and replay the entire stream.

elefeint commented 5 years ago

Have you considered having different APIs for SELECT statements vs. DML statements? A Result containing two almost mutually exclusive types of data (number of rows affected vs. rows returned) seems to point at separate concerns being handled in one interface.

Having separate methods on Connection for createStatement() and createDmlStatement() would let the client application express intent, and then to receive a specific, expected kind of result back.

Also*, if R2DBC someday begins supporting streaming data into the database, it will need a separate method on Connection anyway, something like

StreamingStatement createStreamingStatement(Publisher<Query> queries);

*This is a complete hypothetical right now, as we have no way of supporting streaming into the database with Cloud Spanner.

lukaseder commented 5 years ago

Having separate methods on Connection for createStatement() and createDmlStatement() would let the client application express intent, and then to receive a specific, expected kind of result back.

That may sound reasonable for most queries (on an 80/20 basis), but not for edge cases, which an SPI like R2DBC should support IMO. Have you seen my comment here: https://github.com/r2dbc/r2dbc-spi/issues/27#issuecomment-442007684

RDBMS like SQL Server support mixed result type statement batches, where a mix of result sets, update counts, and exceptions/messages/signals can result from a query execution.

On an SPI level, I'm not really a fan of distinguishing between "types" of statements. At a higher API level, such convenience API for the 80/20 case might definitely make sense.

elefeint commented 5 years ago

@lukaseder I was kind of inspired by your comment :) Looking at the stored-proc-like syntax got me to wonder: what would/could a client do with such mixed output, other than print it? This makes a separate result type that knows what it is more useful. Instead of getting back a dozen Results, each of which would have to be checked for the presence of getRowsUpdated(), perhaps, similarly to your original request for Result#getResultType(), there could be different result classes in the first place.

createStatement() and createDmlStatement() sound like they would preclude such mixed SQL, but since stored procedure support is being designed somewhere, I would assume the execution pathway for multiple mixed statements would be similar.

lukaseder commented 5 years ago

what would/could a client do with such mixed output, other than print it

The main point of batching is always to reduce network traffic between client and server. If it is possible to send more "messages" in a single "envelope", we can improve throughput. Printing is a trivial use case, of course, but composing messages for use cases other than printing them is surely desirable in general.

This was exceptionally difficult to get right with JDBC. I'm here to make sure it is less difficult with R2DBC - of course, without compromising the much more popular use case of receiving only a single result set or a single update count.

createStatement() and createDmlStatement()

I understand the wish for distinction (it would have to focus on the nature of the outcome, not the statement type, as DML statements, assuming you mean insert/update/delete/merge, can also produce result sets via returning/output/data change delta table syntax, or triggers), but I still doubt the SPI needs to do it. An API building on top of it: Definitely. An example is this one: https://github.com/r2dbc/r2dbc-client

Perhaps what you really wish for is the ability to provide a filter / predicate already when executing a statement.

This has been discussed elsewhere, among other places (I forgot?), here: https://groups.google.com/d/msg/r2dbc/12nq6d1l62Q/rPjStz5xBwAJ

That discussion on the group also revolves around OUT parameters (which can in turn be result sets, again). They're another way for a database to interleave a new type of result with the other ones.

mp911de commented 5 years ago

As Lukas mentioned, this SPI is intended for driver/client library communication. Neither of these components is aware of which type of statement is going to be executed when passing statements through. Postgres and SQL Server also report the number of affected rows on SELECT statements. I'm not sure this assumption holds for other SQL databases; however, we see it as a by-product of queries.

Having this kind of split requires the client side to be aware of what type of SQL is going to be executed.

An INSERT statement can return tabular data when e.g. the database returns generated keys. Not sure if INSERT triggers could also return tabular data.

Regarding streaming into the database:

We are entirely missing the necessary infrastructure because most database protocols are command-oriented. If you had a stream source today, you would do the following:

Connection c = …;
Flux<String> stream = …;

stream.flatMap(it -> c.createStatement("INSERT INTO person VALUES($1)")
        .bind("$1", it)
        .execute())
    .flatMap(Result::getRowsUpdated);
lukaseder commented 5 years ago

Not sure if INSERT triggers could also return tabular data.

In SQL Server, and since 12c in Oracle, yes, they can.

mp911de commented 5 years ago

Thanks for confirming. This is yet another reason to keep the class structure as-is.

elefeint commented 5 years ago

Thanks! Having the client-friendly layer above the driver makes sense.

For Cloud Spanner, we'll have to do query type detection to handle DML, since there is no API that supports every type of query we'd have to handle.

I'll stop hijacking this thread for now.

lukaseder commented 3 years ago

I'm currently verifying what can be done with current versions of R2DBC. Looks like this is still an open issue?

  1. It seems the API is not yet ready to handle such cases.
  2. The interleaving of exceptions / warnings / signals between update counts and result sets hasn't been addressed at all yet.
  3. The SQL Server implementation doesn't even seem to get this case right:
System.out.println(
    Flux.from(connectionFactory.create())
        .flatMap(c -> c
            .createStatement("DECLARE @t TABLE(i INT);\n"
                + "INSERT INTO @t VALUES (1),(2),(3);\n"
                + "SELECT * FROM @t;\n")
            .execute())
        .flatMap(it -> {
            System.out.println("IT: " + it);
            return it.getRowsUpdated();
        })
        .collectList()
        .block()
);

The output is only a single update count:

IT: io.r2dbc.mssql.MssqlResult@3df5a869
[6]

I've created a new issue for this: https://github.com/r2dbc/r2dbc-mssql/issues/196

mp911de commented 3 years ago

This is still an open issue.

A challenge, from what we've seen in drivers, is to actually know what type of result something is. To know whether it's an update count only, drivers need to consume the entire response for Postgres and SQL Server (making sure there are no data rows). To determine whether something is an error/warning result, drivers need to consume the entire result to ensure there are no update counts or rows associated with the response.

In several cases, such an indicator would remove the streaming nature, and that isn't something we're looking for. Probably the best way out would be something along the lines of your earlier proposal https://github.com/r2dbc/r2dbc-spi/issues/27#issuecomment-442009634 to flatMap(…) incoming data segments (rows, output parameters, update counts, error signals), ideally in combination with some possibility to apply a filter to skip unwanted elements.
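
A rough sketch of what such a segment-based flatMap could look like (all segment type names here are illustrative, nothing that exists in the SPI yet):

// Hypothetical: each segment is an update count, a row, or a message
Flux.from(result.flatMap(segment -> {
    if (segment instanceof UpdateCount) {
        return Mono.just("Update count: " + ((UpdateCount) segment).value());
    }
    if (segment instanceof RowSegment) {
        return Mono.just("Row: " + ((RowSegment) segment).row().get(0));
    }
    if (segment instanceof Message) {
        return Mono.just("Message: " + ((Message) segment).message());
    }
    return Mono.empty(); // skip unwanted segments
})).subscribe(System.out::println);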

lukaseder commented 3 years ago

For context, I'm looking at the current implementation of MssqlResult. Simplified:

    public Mono<Integer> getRowsUpdated() {
        return this.messages
            .<Long>handle((message, sink) -> {
                if (message instanceof AbstractDoneToken) { ... }
                if (message instanceof ErrorToken) { ... }
    ...
    public <T> Flux<T> map(BiFunction<Row, RowMetadata, ? extends T> f) {
        Assert.requireNonNull(f, "Mapping function must not be null");
        return this.messages
            .<T>handle((message, sink) -> {
                if (message.getClass() == ColumnMetadataToken.class) { ... }
                if (message.getClass() == RowToken.class || message.getClass() == NbcRowToken.class) { ... }
                if (message instanceof ErrorToken) { ... }
    ...

This looks exactly like what I'm thinking of here, except that these instanceof calls or Class comparisons are distributed across two methods. Yes, that earlier proposal (https://github.com/r2dbc/r2dbc-spi/issues/27#issuecomment-442009634) still sounds like the way to go here.

It's not much more user-friendly than the JDBC version, but then again, this isn't something people do every day. In fact, it's hardly done outside of SQL Server, Sybase, and very rarely MySQL, from what I've seen (I haven't noticed that Oracle's approach, using DBMS_SQL.RETURN_RESULT, has been widely used since its introduction).

In the cases where jOOQ needs to distinguish between update counts and results, we already know the order of the generated statements.

mp911de commented 3 years ago

Do you want to come up with a pull request so we can discuss your proposal for R2DBC SPI 0.9?

lukaseder commented 3 years ago

Sure, why not. Any guidelines on what a PR should include? Or is this for high-level discussion?

mp911de commented 3 years ago

The interface extensions and signal types would be a good starting point. I think we agree that there's a need for such an extension, and design-wise we have a rough idea as well.

lukaseder commented 3 years ago

I've created a PR without signal types yet (assuming this means warnings/exceptions/etc.): https://github.com/r2dbc/r2dbc-spi/pull/207.

Looking forward to the discussion.

lukaseder commented 3 years ago

While implementing a suggestion for #27 in https://github.com/r2dbc/r2dbc-spi/pull/207, I couldn't help but wonder whether the existing Result is well designed for the most general purposes. Since R2DBC is still at an early stage, with a 0.x release number, I wonder if breaking changes are still possible, knowing that for some changes, the ship might have already sailed.

Let's look at the inconsistency of this design:

public interface Result {
    Publisher<Integer> getRowsUpdated();
    <T> Publisher<T> map(BiFunction<Row, RowMetadata, ? extends T> mappingFunction);
}

Observations:

  1. Result#getRowsUpdated() hands out the update count directly as a Publisher<Integer>, whereas rows are never exposed as a Publisher<Row>; a mapping function must be supplied instead.
  2. Why not allow for discarding rows right here, before we even fetch their contents?

Regarding the continued evolution, I don't have a clear picture of OUT parameters. They're not really a uniform stream. But a stream is required if collection OUT parameters (e.g. cursors, arrays, etc.) are to be supported, because those are streams nested in the OUT parameter stream. But then again, Row.get() doesn't offer streaming either, so I don't know how to bring consistency to this topic. (I remember having discussed this on the mailing list, and on a phone call years ago, but I forgot what the conclusion was.)

mp911de commented 3 years ago

I wonder if breaking changes are still possible, knowing that for some changes, the ship might have already sailed.

The chance to break the API was Ever Given since we introduced the 0.x versioning scheme. While we don't want to fully redesign R2DBC, we are free to change things.

The difference in design between both methods is based on how they are supposed to work. We didn't want to impose more implementation/consumption complexity than necessary. The row update count is a pure value that is emitted eventually. There's no streaming of multiple values.

Row consumption is subject to streaming, and typically each row is associated with some resources (at least buffers). Therefore, handing these out as a Publisher<Row> would raise the question of how to release resources after consuming them.

From what we've seen in drivers, consuming the updated row count is the most lightweight consumption of results. Row consumption (or avoiding consumption of rows) doesn't add significant overhead, as drivers need to decode metadata/row frames anyway to be able to parse the wire protocol. Additional overhead only comes into play with Row.get(…), where raw data is fed into data decoders.

Why not allow for discarding rows right here, before we even fetch their contents

flatMap operators typically come with additional overhead.

The idea of Result filter(Predicate<IsItAUpdateCountRowOutParamOrError>) has already been around for a while, and something like that could serve the purpose of early filtering. Another approach stems from a handler method that accepts a BiConsumer<Signal, Sink<T>>. Handlers are still lightweight and give more flexibility than map by emitting 0 or 1 elements, raising an error, or completing the stream. Emitting N elements isn't possible with a handler, though.
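
Such early filtering might, hypothetically, look like this (the Segment and Message names are illustrative):

// Hypothetical: drop warning/message segments before any row decoding happens
result.filter(segment -> !(segment instanceof Message))
      .map((row, metadata) -> row.get(0));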

Regarding the continued evolution, I don't have a clear picture of OUT parameters. They're not really a uniform stream. But a stream is required if collection OUT parameters (e.g. cursors, arrays, etc.) are to be supported

Having a mapping/flat-mapping operator that can handle OUT parameters seems to be a way out. Getting more insight into how one would like to consume OUT params would indeed be beneficial to come up with a proper design. In any case, collection OUT parameters such as arrays should work well with a map/flatMap design, as the mapping function gets invoked only when the OUT parameter is fully materialized. I can only speak for Postgres regarding arrays: in Postgres, the entire row/parameter must be received and decoded by a client prior to handing the value to the consumer. Not sure whether this applies to Oracle or other database systems. In any case, aggregation can be handled by the driver without blocking.

Regarding cursors, it seems that databases return a reference to a cursor that then needs to be fetched (see RefCursor). Cursors tend to remain valid until transaction cleanup. One should be able to escape the cursor object beyond the mapping function and fetch the result thereafter.
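
In code, escaping the cursor might look like this (a sketch assuming a RefCursor type whose fetch() returns a Publisher of Results; names and signatures are assumptions):

// Hypothetical: extract the cursor reference from the row, fetch it afterwards
Flux.from(result.map((row, metadata) -> row.get(0, RefCursor.class)))
    .flatMap(cursor -> Flux.from(cursor.fetch())
        .flatMap(r -> r.map((row, metadata) -> row.get(0))))
    .subscribe(System.out::println);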

lukaseder commented 3 years ago

Alright, thanks for the explanations. I will try to think about this stuff again next week to see what a client like jOOQ would expect from the SPI to be able to read all sorts of OUT parameters. For most cases, consuming them all in one go doesn't seem unreasonable, so a dedicated API like Row could do the trick. In a way, OUT parameters do form a "Row", and PostgreSQL even pretends they do:

create or replace function f (a out int, b out int)
as 
$$
begin
  a := 1;
  b := 2;
end;
$$
language plpgsql;

select * from f()

Resulting in:

|a  |b  |
|---|---|
|1  |2  |

It's both weird and elegant 😀.

Other RDBMS are not like that, but they could be. So, perhaps, the existing map(BiFunction<Row, RowMetadata, T>) API could also be used to map OUT parameters to some suitable container? I have mixed feelings (because they're similar but not exactly the same thing), but I think it could work for most RDBMS.
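
For example, if a driver surfaced OUT parameters as a single pseudo-row, as PostgreSQL does above, the existing API could already read them (a sketch under that assumption):

// Hypothetical: OUT parameters a and b exposed as one pseudo-row
Flux.from(result.map((row, metadata) ->
        "a=" + row.get("a", Integer.class) + ", b=" + row.get("b", Integer.class)))
    .subscribe(System.out::println);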

A REF CURSOR isn't so much different whether it is returned as an OUT parameter or as a nested collection in an ordinary result set.