trinodb / trino-python-client

Python client for Trino
Apache License 2.0

KeyError raised on prepared statement deallocate #248

Closed mdesmet closed 1 year ago

mdesmet commented 1 year ago

Expected behavior

When executing a prepared statement, the statement shall be executed successfully.

Actual behavior

Multiple responses from Trino contain the same deallocate header, but the client code assumes the deallocation is processed only once.

See https://github.com/trinodb/trino-python-client/blob/f97aea6afb2663f0a29beb6477c18c9d9a88832e/trino/client.py#L610

Steps To Reproduce

I wasn't able to reproduce this locally, only in CI. So I assume some type of race condition is happening.

Log output

140581443462912: before: {}
140581443462912: after: {"st_ccb622831a2d43dfb03a186180695c1e": "INSERT INTO delta.test16650588382631922575_test_incremental_delete_insert.seed (\"state\", \"county\", \"city\", \"last_visit_date\")\n VALUES \n  ROW (?, ?, ?, ?)\n, ROW (?, ?, ?, ?)\n, ROW (?, ?, ?, ?)\n, ROW (?, ?, ?, ?)\n, ROW (?, ?, ?, ?)\n, ROW (?, ?, ?, ?)\n"}
140581443462912: Deallocating st_ccb622831a2d43dfb03a186180695c1e
140581443462912: before: {"st_ccb622831a2d43dfb03a186180695c1e": "INSERT INTO delta.test16650588382631922575_test_incremental_delete_insert.seed (\"state\", \"county\", \"city\", \"last_visit_date\")\n VALUES \n  ROW (?, ?, ?, ?)\n, ROW (?, ?, ?, ?)\n, ROW (?, ?, ?, ?)\n, ROW (?, ?, ?, ?)\n, ROW (?, ?, ?, ?)\n, ROW (?, ?, ?, ?)\n"}
140581443462912http://localhost:8080/v1/statement/executing/20221006_122055_01035_2z8ej/ye8e084beb54f7dd32c5b3d682239264dbe6a6262/0 : <trino.client.TrinoRequest object at 0x7fdbd40a4b50> : st_ccb622831a2d43dfb03a186180695c1e
140581443462912http://localhost:8080/v1/statement/executing/20221006_122055_01035_2z8ej/y382a838b78d9dcd5df455e0fd1335d282cd7f1a2/1 : <trino.client.TrinoRequest object at 0x7fdbd40a4b50> : st_ccb622831a2d43dfb03a186180695c1e
Error while running:
insert into delta.test16650588382631922575_test_incremental_delete_insert.seed ("state", "county", "city", "last_visit_date") values
          (?,?,?,?),(?,?,?,?),(?,?,?,?),(?,?,?,?),(?,?,?,?),(?,?,?,?)
'st_ccb622831a2d43dfb03a186180695c1e'

Operating System

Ubuntu

Trino Python client version

0.317.0

Trino Server version

398

Python version

3.10.5

Are you willing to submit PR?

findepi commented 1 year ago

Steps To Reproduce

I wasn't able to reproduce this locally, only in CI. So I assume some type of race condition is happening.

What steps does CI perform when this reproduces?

mdesmet commented 1 year ago

It's the integration tests of dbt-trino on Trino 398 with the 0.317.0 Python client.

The dbt project executes prepared statements using 4 threads. However, the extract here shows the output of only one thread.

Within a single thread DDL queries are executed sequentially using the typical pattern. (No asyncio or additional threading is used)

conn = connect()
cursor = conn.cursor()
cursor.execute(query, params)
results = cursor.fetchall()

This sometimes fails with the following stack trace while processing the HTTP response from the Trino server (self._request.process(response)):

  File "/home/runner/work/dbt-trino/dbt-trino/dbt/adapters/trino/connections.py", line 329, in exception_handler
    yield
  File "/home/runner/work/dbt-trino/dbt-trino/.tox/integration/lib/python3.9/site-packages/dbt/adapters/sql/connections.py", line 70, in add_query
    cursor.execute(sql, bindings)
  File "/home/runner/work/dbt-trino/dbt-trino/.tox/integration/lib/python3.9/site-packages/trino/dbapi.py", line 430, in execute
    self._deallocate_prepared_statement(statement_name)
  File "/home/runner/work/dbt-trino/dbt-trino/dbt/adapters/trino/connections.py", line 57, in _deallocate_prepared_statement
    old_deallocate_prepared_statement(self, statement_name)
  File "/home/runner/work/dbt-trino/dbt-trino/.tox/integration/lib/python3.9/site-packages/trino/dbapi.py", line 403, in _deallocate_prepared_statement
    query.execute()
  File "/home/runner/work/dbt-trino/dbt-trino/.tox/integration/lib/python3.9/site-packages/trino/client.py", line 758, in execute
    self._result.rows += self.fetch()
  File "/home/runner/work/dbt-trino/dbt-trino/.tox/integration/lib/python3.9/site-packages/trino/client.py", line 773, in fetch
    status = self._request.process(response)
  File "/home/runner/work/dbt-trino/dbt-trino/dbt/adapters/trino/connections.py", line 80, in process
    return old_process(self, http_response)
  File "/home/runner/work/dbt-trino/dbt-trino/.tox/integration/lib/python3.9/site-packages/trino/client.py", line 610, in process
    self._client_session.prepared_statements.pop(name)
KeyError: 'st_ccb622831a2d43dfb03a186180695c1e'

I have added logging to allocation, deallocation, and processing of HTTP responses to better understand the behavior, as I'm not able to reproduce this locally. The logs show that multiple URLs return the deallocate header.

19:16:08  1 of 1 START seed file test16650837400962147469_test_incremental_merge.seed .... [RUN]
140454490273536: Executing insert into iceberg.test16650837400962147469_test_incremental_merge.seed ("state", "county", "city", "last_visit_date") values
          (?,?,?,?),(?,?,?,?),(?,?,?,?),(?,?,?,?),(?,?,?,?),(?,?,?,?) with st_9e801a0e14114c8cbab672447af4be3c
140454490273536 : before: {}
140454490273536 : after: {"st_9e801a0e14114c8cbab672447af4be3c": "INSERT INTO iceberg.test16650837400962147469_test_incremental_merge.seed (\"state\", \"county\", \"city\", \"last_visit_date\")\n VALUES \n  ROW (?, ?, ?, ?)\n, ROW (?, ?, ?, ?)\n, ROW (?, ?, ?, ?)\n, ROW (?, ?, ?, ?)\n, ROW (?, ?, ?, ?)\n, ROW (?, ?, ?, ?)\n"}
140454490273536 : Deallocating st_9e801a0e14114c8cbab672447af4be3c
140454490273536 : before: {"st_9e801a0e14114c8cbab672447af4be3c": "INSERT INTO iceberg.test16650837400962147469_test_incremental_merge.seed (\"state\", \"county\", \"city\", \"last_visit_date\")\n VALUES \n  ROW (?, ?, ?, ?)\n, ROW (?, ?, ?, ?)\n, ROW (?, ?, ?, ?)\n, ROW (?, ?, ?, ?)\n, ROW (?, ?, ?, ?)\n, ROW (?, ?, ?, ?)\n"}
140454490273536 : http://localhost:8080/v1/statement/executing/20221006_191609_01373_ecvz6/yc0339e0b529652fd5df69d08b69e713d067399cc/0 : GET : <trino.client.TrinoRequest object at 0x7fbe481bb3a0> : st_9e801a0e14114c8cbab672447af4be3c
140454490273536 : http://localhost:8080/v1/statement/executing/20221006_191609_01373_ecvz6/ya2582c58bdae318c54edc3fedcaddc1fccfa2b25/1 : GET : <trino.client.TrinoRequest object at 0x7fbe481bb3a0> : st_9e801a0e14114c8cbab672447af4be3c
Error while running:
insert into iceberg.test16650837400962147469_test_incremental_merge.seed ("state", "county", "city", "last_visit_date") values
          (?,?,?,?),(?,?,?,?),(?,?,?,?),(?,?,?,?),(?,?,?,?),(?,?,?,?)
'st_9e801a0e14114c8cbab672447af4be3c'

My conclusion is that during dbapi.execute() of the DEALLOCATE query, multiple responses contain the deallocate header. Note that this is not usually the case (I can't reproduce it locally using the same code that runs in CI).

https://github.com/trinodb/trino-python-client/blob/39546f79ddc1284a7d0aece9fd5c38f1ba61f055/trino/client.py#L761-L764
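For illustration, the failure can be sketched without a Trino server. This is a minimal stand-in, not the client's actual code: the header name is assumed to be X-Trino-Deallocated-Prepare, and process_strict and replay are hypothetical helpers that only mimic the pop call shown in the stack trace above.

```python
# Assumed header name; the helper names below are hypothetical, not the client's API.
DEALLOCATED_HEADER = "X-Trino-Deallocated-Prepare"


def process_strict(prepared_statements, headers):
    """Remove the deallocated statement, failing if it is already gone."""
    name = headers.get(DEALLOCATED_HEADER)
    if name is not None:
        prepared_statements.pop(name)  # raises KeyError on a repeated header


def replay(prepared_statements, responses):
    """Process each response's headers in order, as a fetch loop would."""
    for headers in responses:
        process_strict(prepared_statements, headers)


if __name__ == "__main__":
    session = {"st_example": "INSERT INTO t VALUES (?)"}
    # Two consecutive responses (.../0 and .../1) carrying the same header.
    responses = [
        {DEALLOCATED_HEADER: "st_example"},
        {DEALLOCATED_HEADER: "st_example"},
    ]
    try:
        replay(session, responses)
    except KeyError as exc:
        print("KeyError:", exc)
```

The first response removes the entry; the second finds nothing to remove, which matches the two URLs logged just before the error.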

mdesmet commented 1 year ago

@findepi: Also, the Trino Java client effectively ignores this:

Note the declaration Set<String> deallocatedPreparedStatements = Sets.newConcurrentHashSet(). Adding a name that is already in the set does not fail:

boolean add(E e);
Returns:
true if this set did not already contain the specified element

https://github.com/trinodb/trino/blob/639889b4216bc2b88a2797845320a00f287ab324/client/trino-client/src/main/java/io/trino/client/StatementClientV1.java#L427-L429

I think we should merge this, as it can fail a client for a reason that it shouldn't.
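In Python terms, the same tolerance might look roughly like the sketch below. The helper name remove_prepared_statement is hypothetical and this is not the actual patch; it only shows a removal that reports whether anything changed instead of raising.

```python
_MISSING = object()  # sentinel so that any stored value counts as "present"


def remove_prepared_statement(prepared_statements, name):
    """Drop `name` if present; return True only when an entry was removed."""
    return prepared_statements.pop(name, _MISSING) is not _MISSING


if __name__ == "__main__":
    session = {"st_example": "INSERT INTO t VALUES (?)"}
    print(remove_prepared_statement(session, "st_example"))  # first header
    print(remove_prepared_statement(session, "st_example"))  # repeated header
```

A repeated deallocate header then becomes a no-op, much like a second add on a Java Set.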

findepi commented 1 year ago

I have added logging to allocation, deallocation, and processing of HTTP responses to better understand the behavior, as I'm not able to reproduce this locally. The logs show that multiple URLs return the deallocate header.

@electrum ptal

mdesmet commented 1 year ago

@hashhar, @findepi, @electrum:

I checked in Trino. It seems that deallocated prepared statements are always added onto a query but never removed. In case the query is not finished (ie a nextUri is available), the next uri will also contain the same deallocated prepared statement headers.

Adding of deallocated headers

https://github.com/trinodb/trino/blob/4d235b0143051b04adda0e9d988a3521e1d5bedd/core/trino-main/src/main/java/io/trino/execution/QueryStateMachine.java#L844-L851

hashhar commented 1 year ago

Ah, so the JDBC client maintains two sets, one for added and another for deallocated statements, and merges them when making requests. But here in the Python client we merge them while processing the headers themselves (on each next-uri).

So for now we can "fix" the Python client by ignoring the deallocation if the statement is already removed, and follow up to see whether Trino should do things differently. If not, having two separate sets makes some sense, as it's easier to reason about (and inspect).
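A rough sketch of what the two-set idea could look like on the Python side. The class and method names are hypothetical, and this is not how either client is actually written; it only shows headers being recorded idempotently and merged later, when the next request is built.

```python
class PreparedStatementTracker:
    """Hypothetical tracker: record header effects now, merge them later."""

    def __init__(self, initial=None):
        self._session = dict(initial or {})  # statements known before the query
        self._added = {}                     # name -> SQL seen in "added" headers
        self._deallocated = set()            # names seen in "deallocated" headers

    def record_added(self, name, sql):
        self._added[name] = sql

    def record_deallocated(self, name):
        # Set insertion is idempotent, so a header repeated on every
        # next-uri response is harmless.
        self._deallocated.add(name)

    def merged(self):
        """Return the statements to send with the next request."""
        result = {**self._session, **self._added}
        for name in self._deallocated:
            result.pop(name, None)
        return result


if __name__ == "__main__":
    tracker = PreparedStatementTracker({"st_a": "SELECT 1"})
    tracker.record_deallocated("st_a")
    tracker.record_deallocated("st_a")  # same header on the next response
    tracker.record_added("st_b", "SELECT 2")
    print(tracker.merged())
```

Keeping the two collections separate also leaves a record of what the server reported, which can be inspected when debugging.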

findepi commented 1 year ago

In case the query is not finished (ie a nextUri is available), the next uri will also contain the same deallocated prepared statement headers.

@electrum is it intentional?

hashhar commented 1 year ago

Reopening this to continue discussion.