laughingman7743 / PyAthena

PyAthena is a Python DB API 2.0 (PEP 249) client for Amazon Athena.
MIT License
456 stars 102 forks source link

Support for Iceberg FOR SYSTEM_VERSION AS OF #517

Closed jasonamyers closed 3 months ago

jasonamyers commented 3 months ago

When used with Iceberg tables, Athena can restrict the table by version ID and timestamp. I'd love to be able to leverage this in my SQLAlchemy queries. More details can be found at https://docs.aws.amazon.com/athena/latest/ug/querying-iceberg-table-data.html.

laughingman7743 commented 3 months ago

What use cases do you have in mind? Do you want to construct the time travel specification functionally when assembling the query? Or do you want to be able to specify timestamps and versions in the model definition?

I am not that familiar with extending SQLAlchemy. If you have any SQLAlchemy implementations in other databases that might be helpful, please let me know.

jasonamyers commented 3 months ago

I'd love to either do it during the table's metadata build-up, specify a version, and have it reflected in the resulting default_from clauses or make a manual call to set the version for each table if desired. A real-world use case would be Data QA. Selecting from a table up to the latest version that has been approved from a QA perspective and then as the data past that version is approved, advancing your queries to use the newer data.

jasonamyers commented 3 months ago

I'm asking in SQLAlchemy also if they have thoughts on this https://github.com/sqlalchemy/sqlalchemy/discussions/11161.

jasonamyers commented 3 months ago

I added a super naive function to test. This leverages the SQLAlchemy with_hints functionality.

Compiler Support:

class AthenaStatementCompiler(SQLCompiler):

    def get_from_hint_text(self, table, text):
        return text

Usage Example:

engine = get_engine('athena')
table = build_table(engine, 'my_table', 'my_schema')
ta = table.alias()
version = 8860342768238431084
query = select(func.count(ta.c.record_id)).with_hint(ta, f'FOR VERSION AS OF {version}')
result = str(query.compile(compile_kwargs={"literal_binds": True}, dialect=AthenaDialect()))

Output:

SELECT count(my_table.record_id) AS count_1 
FROM my_schema.my_table FOR VERSION AS OF 8860342768238431084 AS my_table_1

Would you accept a PR to support hints properly?

It would include:

jasonamyers commented 3 months ago

@laughingman7743 any chance we can get a release with this is in? I wanna move our usages to it.

laughingman7743 commented 3 months ago

I just released v3.5.0. 🎉 https://github.com/laughingman7743/PyAthena/releases/tag/v3.5.0 https://pypi.org/project/PyAthena/3.5.0/

jasonamyers commented 3 months ago

You rock thank you