trinodb / trino-python-client

Python client for Trino
Apache License 2.0

Acknowledge reception of data in `TrinoResult` #220

Closed mdesmet closed 1 year ago

mdesmet commented 1 year ago

Description

Fixes #232, #95

Ensures the received data is properly acknowledged by calling the next_uri. This avoids queries showing up as failed in the query log when executing scalar queries, as in the following example.

cur.execute("SELECT VERSION()")
cur.fetchone()
cur.cancel()

Release notes

( ) This is not user-visible and no release notes are required.
( ) Release notes are required, please propose a release note for me.
(x) Release notes are required, with the following suggested text:

## Breaking Changes

* Make the `execute` method of the cursor block until at least one row is received.
  This means users no longer need to call `fetchone` or `fetchall` to make sure the query
  actually starts executing on the Trino server. Note that results still need to be consumed
  by calling `fetchone` or `fetchall` so that the query isn't considered idle and terminated
  on the server. ([#232](https://github.com/trinodb/trino-python-client/issues/232))
* Properly propagate query failures to the client when using `fetchone`.
  ([#95](https://github.com/trinodb/trino-python-client/issues/95))
* Fix queries returning a single row from sometimes appearing as failed on the server.
  ([#220](https://github.com/trinodb/trino-python-client/issues/220))
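
For illustration, user code under this new behaviour would look roughly like the following sketch (connection details are illustrative, and the exact exception raised on failure depends on the query):

```python
from trino.dbapi import connect

# Illustrative connection details.
conn = connect(host="localhost", port=8080, user="example")
cur = conn.cursor()

# execute() now blocks until at least one row (or the query outcome) is
# available, so query failures surface here instead of on fetch*().
cur.execute("SELECT version()")

# Results still need to be consumed, otherwise the server may consider the
# query idle and eventually terminate it.
rows = cur.fetchall()
```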
mdesmet commented 1 year ago

The CI issues can be simulated with the following code. It seems that, although the result has already been returned, the Trino API still provides one or more next_uri links to fetch in a minority of cases.

def test_query_cancellation_not_triggered(trino_connection):
    count_not_finished = 0
    for _ in range(0, 1000):
        cur = trino_connection.cursor()
        cur.execute("SELECT VERSION()")
        cur.fetchone()
        if not cur._query.finished:
            count_not_finished += 1

    print(str(count_not_finished) + " unfinished queries")

Some questions:

* Why do we mark a cancelled query as failed if it's a valid use case to only retrieve a number of results and bail out?
* Can we handle this on the server side so that this type of query doesn't fail if all results have been consumed?

hashhar commented 1 year ago

> Why do we mark a cancelled query as failed if it's a valid use case to only retrieve a number of results and bail out?

It's not valid. It's a hack that BI tools and clients use instead of limiting their query to pull only what is needed. For example, as a cluster admin it's very useful to see which clients run a query that returns billions of rows but take only 100 rows and leave the query hanging (instead of either cancelling it or consuming the results), which means that the output buffer on the server (and other processes) keeps occupying memory until the query times out.

> Can we handle this on the server side so that this type of query doesn't fail if all results have been consumed?

You know that version() would return one row, but the server does not, since results are streamed back to the coordinator from the workers and the coordinator can't know that there isn't more data coming until it asks the workers.

Change your experiment to a query which returns an arbitrary number of rows and then you can't know anymore whether the query is finished or not. Special-casing the client protocol for queries which return a single row doesn't seem useful.
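
For illustration, the experiment above could be changed along these lines (the table is an arbitrary TPCH example and assumes the TPCH connector is available):

```python
def test_arbitrary_row_count(trino_connection):
    # Unlike SELECT version(), neither the client nor the coordinator knows
    # up front how many rows this query produces, so "finished after the
    # first fetch" is never a safe assumption here.
    cur = trino_connection.cursor()
    cur.execute("SELECT * FROM tpch.tiny.orders")
    cur.fetchone()
    # cur._query.finished will usually still be False at this point.
```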

hashhar commented 1 year ago

The alternative is to have a client protocol which is based on persistent TCP connections instead of HTTP long polling, which brings its own set of problems.

hashhar commented 1 year ago

Also, specifically on why you cannot treat an empty batch of rows returned from the API as proof that the query has finished: the reason is the pipelined execution model. Trino can perform table scans and output results at the same time. For example, if you have a long chain of UNION ALL statements, Trino can start returning results as soon as the first branch of the UNION ALL is done, while other parts of the query are still executing.

This means that the client might observe periods of time where no data is returned but a nextUri is still included. If the client were to assume that no data means the query is finished, it would drop any upcoming rows that are still to be produced.
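
In other words, a client consuming the REST protocol has to keep following nextUri even when a response carries no rows; only the absence of nextUri means the query is done. A simplified sketch of that loop, reduced to the data/nextUri fields and without the error handling, headers and retries the real client needs:

```python
import requests

def collect_all_rows(first_payload, session=None):
    """Poll nextUri until the server stops returning one.

    An empty "data" field only means no rows were ready in that batch; with
    pipelined execution more rows may still arrive in later batches.
    """
    session = session or requests.Session()
    rows = []
    payload = first_payload
    while True:
        rows.extend(payload.get("data") or [])  # may legitimately be empty
        next_uri = payload.get("nextUri")
        if next_uri is None:
            return rows  # no nextUri: the server will not produce more rows
        payload = session.get(next_uri).json()
```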

nineinchnick commented 1 year ago

@hashhar would you agree that to address the original issue we should add a fetch call after scalar() to drain the cursor? We might also want to document this somewhere. I don't think we should try to make the driver too smart to work around these protocol limitations.

hashhar commented 1 year ago

@nineinchnick I don't agree 100%. It's still useful to make the client as smart as the JDBC driver, where the implementation details of the Trino REST protocol aren't visible to users.

But yes, since this might take more time than the quickfix I think we should go with the quickfix for now and then think about how to stop the protocol from leaking into user code.

mdesmet commented 1 year ago

> @hashhar would you agree that to address the original issue we should add a fetch call after scalar() to drain the cursor? We might also want to document this somewhere. I don't think we should try to make the driver too smart to work around these protocol limitations.

This is exactly what sqlalchemy does when you call scalar_one or scalar_one_or_none. We already fetch the next row in the client module's fetch, but even after fetching another record we have no guarantee that the query is finished (nextUri set to null), as @hashhar said.

https://github.com/sqlalchemy/sqlalchemy/blob/f8c4dba4e9f130c18ce00597c036bc26ae7abf90/lib/sqlalchemy/engine/result.py#L745-L748
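
At the DB-API level the quickfix amounts to something like the sketch below: fetch the single value, then keep fetching so the cursor is drained and the server sees the results as consumed (the helper name is made up for illustration):

```python
def scalar(cursor, operation):
    """Hypothetical helper: run a query, return the first column of the first
    row, and drain any remaining rows so the query completes on the server."""
    cursor.execute(operation)
    row = cursor.fetchone()
    cursor.fetchall()  # drain whatever is left so the query doesn't hang
    return row[0] if row else None
```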

hovaesco commented 1 year ago

Actually it will also resolve https://github.com/trinodb/trino-python-client/issues/95, which is pretty painful since a user needs to run fetchone or fetchall to get an error message from DDL, UPDATE and INSERT queries. Basically, it will be a quick win which will greatly benefit users.

hashhar commented 1 year ago

@findepi could you help with a release from current master (and also see my comment above)?

(I don't consider this issue a release blocker since it's a change in server behaviour only, and as long as you call fetchall you're guaranteed not to run into this issue.)

findepi commented 1 year ago

> could you help with a release from current master

sure, will coordinate over DM

mdesmet commented 1 year ago

> However this is also a breaking change (even though it improves the experience), so I'm approving it but not merging until we release a version from current master, so that people can upgrade to that version and then decide whether they want to stay there for some time before migrating to the new blocking API.

Can you explain why you think this is a breaking change? IMHO the only way to break existing usage is if users didn't catch exceptions on execute(), which is a bug already, as query submission can throw the same exceptions as fetch*(). The fetch*() operations continue to work as before, as proven by the integration tests.

Note that sqlalchemy doesn't call fetch*() on DML operations. Bringing this in would fix that.

> And now that we are taking this direction it might be wise to decouple the Trino API handling from the db-api client and instead make the fetched rows available to the db-api cursor via a queue (list) instead of it directly fetching things from the API. That gives future flexibility to introduce performance enhancements as well, like the double-buffering that the JDBC driver does for example, and also makes it possible to provide different cursor implementations.

IMHO this is already decoupled. TrinoQuery lives in the client module and exposes a lazy collection (an Iterator powered by a generator). I think this is the correct abstraction to use. I don't see leakage of API details being introduced in this PR; actually the opposite is true. I would argue that in the current code execute() is only query submission while fetch*() is query execution plus result set scrolling, whereas this PR makes execute() query submission and execution and fetch*() result set scrolling, which seems semantically more correct and in line with other dbapi implementations.

The double buffering, as in the Java Trino client, is implemented in this PR. Note that the Java client also doesn't use threading at this moment. I think it is a good idea to investigate, but that can be done independently of this PR.

I don't see why cursor implementations are impacted; cursors would take the Iterable and convert each row, for example into a dict instead of a tuple (many dbapi implementations have a DictCursor).
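
As an illustration of that point, a DictCursor-style wrapper only needs the rows and the DB-API `description`; it doesn't have to know anything about the REST protocol. A hypothetical sketch, not part of this PR:

```python
class DictCursorWrapper:
    """Hypothetical wrapper that yields dicts instead of tuples."""

    def __init__(self, cursor):
        self._cursor = cursor

    def execute(self, operation, params=None):
        if params is None:
            self._cursor.execute(operation)
        else:
            self._cursor.execute(operation, params)
        return self

    def fetchone(self):
        row = self._cursor.fetchone()
        if row is None:
            return None
        # description is standard DB-API: (name, type_code, ...) per column.
        columns = [desc[0] for desc in self._cursor.description]
        return dict(zip(columns, row))
```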

hashhar commented 1 year ago

> Can you explain why you think this is a breaking change?

Because the API is now blocking.

> I don't see leakage of API details being introduced in this PR; actually the opposite is true.

I don't mean that this PR leaks the API details. I meant the opposite. Now we are one step closer to hiding the API details within TrinoQuery and TrinoResult. An example of what this PR allows (though it probably doesn't make sense) is a different implementation of TrinoQuery that uses a different, hypothetical transport mechanism to talk to Trino (instead of the REST API).

> The double buffering, as in the Java Trino client, is implemented in this PR.

True

> Note that the Java client also doesn't use threading at this moment. I think it is a good idea to investigate, but that can be done independently of this PR.

Exactly what I said above.

> I don't see why cursor implementations are impacted; cursors would take the Iterable and convert each row, for example into a dict instead of a tuple (many dbapi implementations have a DictCursor).

Again, exactly what I said your PR allows us to do in the future.

hashhar commented 1 year ago

Newer Trino versions will include https://github.com/trinodb/trino/pull/14122, which means CI can be green even without this change.

I think we should add one more entry to the matrix with 395 as the version being tested, in the meantime.

mdesmet commented 1 year ago

> Newer Trino versions will include trinodb/trino#14122, which means CI can be green even without this change.
>
> I think we should add one more entry to the matrix with 395 as the version being tested, in the meantime.

I added the entry.

mdesmet commented 1 year ago

@hashhar: Squashed those commits. Are we good to go?

hashhar commented 1 year ago

Thanks. Good to go.