PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

prefect_sqlalchemy SyncDriver.POSTGRESQL_PSYCOPG2 returns duplicate results on second fetch_many execution #14645

Open markns opened 1 month ago

markns commented 1 month ago

Bug summary

prefect_sqlalchemy 0.4.3 SyncDriver.POSTGRESQL_PSYCOPG2 returns duplicate results on second fetch_many execution

Run the flow below against a Postgres database running in Docker. First, run it with the data population code uncommented so that there is some data in the database. Then run it again with the data population code commented out.

On the second run, the first two fetch_many calls produce the following output. The second call returns the same row as the first:

08:40:24.548 | INFO    | Flow run 'deft-mustang' - Created a new engine.
08:40:24.582 | INFO    | Flow run 'deft-mustang' - Created a new connection.
[('1', 'A')]
08:40:24.598 | INFO    | Flow run 'deft-mustang' - Created a new connection.
08:40:24.606 | INFO    | Flow run 'deft-mustang' - Reset opened connections and their results.
08:40:24.608 | INFO    | Flow run 'deft-mustang' - Disposed the engine.
[('1', 'A')]
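
For comparison, here is a minimal sketch of the chunked-fetch behaviour this report appears to expect, assuming that repeated fetch_many calls with identical inputs are meant to continue from the previous result set rather than run the query again. It uses only the standard-library sqlite3 module with an in-memory database, not prefect_sqlalchemy or Postgres, and the helper name fetch_in_chunks is hypothetical. It illustrates plain DB-API cursor semantics and says nothing about how SqlAlchemyConnector is implemented.

```python
import sqlite3


def fetch_in_chunks(size=2):
    """Run the SELECT once, then read two consecutive chunks from the same cursor."""
    # In-memory SQLite stands in for the Postgres container (assumption for illustration).
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute("CREATE TABLE customers (name varchar, address varchar)")
        conn.executemany(
            "INSERT INTO customers (name, address) VALUES (:name, :address)",
            [
                {"name": "1", "address": "A"},
                {"name": "2", "address": "B"},
                {"name": "3", "address": "C"},
            ],
        )
        # Only one row matches, so the first chunk exhausts the result set.
        cursor = conn.execute(
            "SELECT * FROM customers WHERE name = :name", {"name": "1"}
        )
        first = cursor.fetchmany(size)
        second = cursor.fetchmany(size)
        return first, second
    finally:
        conn.close()


first_chunk, second_chunk = fetch_in_chunks()
print(first_chunk)
print(second_chunk)
```

With a single matching row, the second chunk read from the same cursor is empty. In the logs above, the second fetch_many call instead logs "Created a new connection." and returns the row again.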

Reproduction

from prefect import flow
from prefect_sqlalchemy import SqlAlchemyConnector, ConnectionComponents, SyncDriver
from pydantic.v1 import SecretStr

# docker run --name some-postgres -p 5432:5432 -e POSTGRES_PASSWORD=pwd123 -d postgres
@flow
def run_sqlalchemy_query():
    connection_info = ConnectionComponents(host="localhost", port="5432",
                                           driver=SyncDriver.POSTGRESQL_PSYCOPG2, database="postgres",
                                           username="postgres", password=SecretStr("pwd123"))

    # First run only: uncomment this block to create and populate the table.
    # with SqlAlchemyConnector(connection_info=connection_info) as database:
    #     database.execute("CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);")
    #     database.execute_many(
    #         "INSERT INTO customers (name, address) VALUES (:name, :address);",
    #         seq_of_parameters=[
    #             {"name": "1", "address": "A"},
    #             {"name": "2", "address": "B"},
    #             {"name": "3", "address": "C"},
    #         ],
    #     )

    with SqlAlchemyConnector(connection_info=connection_info) as database:
        results = database.fetch_many("SELECT * FROM customers where name = :name;",
                                      parameters={'name': '1'}, size=2)
        print(results)
        results = database.fetch_many("SELECT * FROM customers where name = :name;",
                                      parameters={'name': '1'}, size=2)
        print(results)
        results = database.fetch_many("SELECT * FROM customers where name = :name;",
                                      parameters={'name': '1'}, size=2)
        print(results)

if __name__ == '__main__':
    run_sqlalchemy_query()

Error

08:40:24.035 | INFO    | prefect.engine - Created flow run 'deft-mustang' for flow 'run-sqlalchemy-query'
08:40:24.044 | INFO    | Flow run 'deft-mustang' - View at https://app.prefect.cloud/account/f11f03de-8f9c-4660-bbba-f00d85087da4/workspace/00985c36-8720-46ae-a667-258da45f0916/flow-runs/flow-run/0e839d07-8d08-45ba-89f1-8e2c1c6c1eb0
08:40:24.548 | INFO    | Flow run 'deft-mustang' - Created a new engine.
08:40:24.582 | INFO    | Flow run 'deft-mustang' - Created a new connection.
[('1', 'A')]
08:40:24.598 | INFO    | Flow run 'deft-mustang' - Created a new connection.
08:40:24.606 | INFO    | Flow run 'deft-mustang' - Reset opened connections and their results.
08:40:24.608 | INFO    | Flow run 'deft-mustang' - Disposed the engine.
[('1', 'A')]
[]
08:40:24.884 | INFO    | Flow run 'deft-mustang' - Finished in state Completed()

Versions (prefect version output)

alakazam-vR_Oc5bv-py3.10 ❯ prefect version
Version:             2.19.7
API version:         0.8.4
Python version:      3.10.3
Git commit:          60f05122
Built:               Fri, Jun 28, 2024 11:27 AM
OS/Arch:             darwin/arm64
Profile:             dev
Server type:         cloud

Additional context

No response

zhen0 commented 1 month ago

Thanks for the issue @markns!