pydiverse / pydiverse.pipedag

A data pipeline orchestration library for rapid iterative development with automatic cache invalidation, allowing users to focus on writing their tasks in pandas, polars, sqlalchemy, ibis, and the like.
https://pydiversepipedag.readthedocs.io/
BSD 3-Clause "New" or "Revised" License

Support setting nullable / non-nullable constraints in pipedag.Table() #159

Closed: windiana42 closed this issue 3 months ago

windiana42 commented 4 months ago

Since pipedag performs the materialization, we want to add a feature to specify which columns should be non-nullable. Offering both nullable and non_nullable as optional parameters of pipedag.Table() allows specifying either a positive or a negative list. If both lists are given, together they must cover all columns.
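
A minimal sketch of how the two optional lists could be resolved into the set of columns that get a NOT NULL constraint. The helper name resolve_non_nullable is hypothetical (not pipedag API), it assumes the full column list is known after materialization, and it additionally assumes the two lists must be disjoint when both are given.

```python
from __future__ import annotations

from collections.abc import Iterable


def resolve_non_nullable(
    columns: Iterable[str],
    nullable: Iterable[str] | None = None,
    non_nullable: Iterable[str] | None = None,
) -> list[str]:
    """Return the columns that should become NOT NULL (hypothetical helper)."""
    cols = list(columns)
    all_cols = set(cols)
    nullable_set = set(nullable) if nullable is not None else None
    non_nullable_set = set(non_nullable) if non_nullable is not None else None

    # Reject column names that do not exist in the materialized table.
    for given in (nullable_set, non_nullable_set):
        if given is not None and not given <= all_cols:
            raise ValueError(f"unknown columns: {sorted(given - all_cols)}")

    if nullable_set is not None and non_nullable_set is not None:
        # Both lists given: assumed disjoint, and together they must cover all columns.
        if nullable_set & non_nullable_set:
            raise ValueError("a column cannot be both nullable and non-nullable")
        if nullable_set | non_nullable_set != all_cols:
            raise ValueError("nullable and non_nullable must together list all columns")
        target = non_nullable_set
    elif non_nullable_set is not None:
        target = non_nullable_set  # positive list
    elif nullable_set is not None:
        target = all_cols - nullable_set  # negative list
    else:
        target = set()  # no constraints requested

    # Keep the table's column order so the generated DDL is deterministic.
    return [c for c in cols if c in target]
```

For example, resolve_non_nullable(["a", "b", "c"], nullable=["b"]) yields ["a", "c"], the same result as passing non_nullable=["a", "c"].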

windiana42 commented 4 months ago

I looked into running the SELECT statement with LIMIT 0 and retrieving the column types from the result set object. This somewhat works with DB-API for databases other than DuckDB (the example below uses MSSQL's TOP 0 instead of LIMIT 0):

x = cur.execute("SELECT TOP 0 1 as x, 'aba' as y, cast(1.0 as numeric(15,2)) as z")
x.description
(('x', <class 'int'>, None, 10, 10, 0, False), ('y', <class 'str'>, None, 3, 3, 0, False), ('z', <class 'decimal.Decimal'>, None, 15, 15, 2, True))
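
Each entry of cursor.description above is a PEP 249 7-tuple (name, type_code, display_size, internal_size, precision, scale, null_ok); drivers may leave the optional fields as None, so what you get is driver-dependent. The sketch below reads these fields after a zero-row query, using the standard library's sqlite3 driver purely as a stand-in database. Like DuckDB, sqlite3 fills everything except the name with None. The names ColumnInfo and describe_query are hypothetical.

```python
import sqlite3
from typing import NamedTuple, Optional


class ColumnInfo(NamedTuple):
    # Field order of one PEP 249 cursor.description entry.
    name: str
    type_code: object
    display_size: Optional[int]
    internal_size: Optional[int]
    precision: Optional[int]
    scale: Optional[int]
    null_ok: Optional[bool]


def describe_query(conn: sqlite3.Connection, select_sql: str) -> list[ColumnInfo]:
    """Return column metadata of select_sql without fetching rows (hypothetical helper)."""
    # Wrap the query in a subquery so LIMIT 0 applies to the whole statement.
    cur = conn.execute(f"SELECT * FROM ({select_sql}) AS a LIMIT 0")
    try:
        return [ColumnInfo(*entry) for entry in cur.description]
    finally:
        cur.close()


conn = sqlite3.connect(":memory:")
cols = describe_query(conn, "SELECT 1 AS x, 'aba' AS y")
print([(c.name, c.type_code, c.null_ok) for c in cols])
```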

However, I am not comfortable assembling database types from this output to produce a complete CREATE TABLE statement. Thus I suggest submitting the query twice: first, CREATE TABLE x AS SELECT ... LIMIT 0 to create an empty table whose column types the database derives itself; then adjust the nullable/non-nullable columns; finally, run an INSERT INTO x SELECT ... statement to load the data.
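
The three steps can be sketched as a statement generator. This is an illustration only: two_pass_statements is a hypothetical helper, the ALTER syntax is PostgreSQL's, and identifiers are inserted unquoted for brevity.

```python
from __future__ import annotations

from collections.abc import Sequence


def two_pass_statements(table: str, select_sql: str, non_nullable: Sequence[str]) -> list[str]:
    """Create-empty / constrain / load sequence (hypothetical helper, PostgreSQL syntax)."""
    # 1st pass: the database derives the column types; LIMIT 0 keeps the table empty.
    statements = [f"CREATE TABLE {table} AS SELECT * FROM ({select_sql}) AS src LIMIT 0"]
    # Adjust nullability while the table is still empty, so the NOT NULL check is cheap.
    statements += [f"ALTER TABLE {table} ALTER COLUMN {col} SET NOT NULL" for col in non_nullable]
    # 2nd pass: load all rows; the database now rejects NULLs in the constrained columns.
    statements.append(f"INSERT INTO {table} SELECT * FROM ({select_sql}) AS src")
    return statements
```

PostgreSQL also accepts CREATE TABLE ... AS ... WITH NO DATA for the first step. On MSSQL, ALTER COLUMN must restate the column type (ALTER TABLE x ALTER COLUMN c int NOT NULL), so the constraint step is dialect-specific.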

windiana42 commented 4 months ago

DuckDB simply reports None instead of a column type for a query like SELECT 1 as x, so it is not well suited for testing:

cur = connection.connection.cursor()
x = cur.execute("SELECT 1 as x")
x.description
(('x', None, None, None, None, None, None),)

Another attempt with SQLAlchemy also yields only NullType columns:

Table('tmp', MetaData(), Column('x', NullType(), table=<tmp>), Column('y', NullType(), table=<tmp>), schema=None)

windiana42 commented 4 months ago

For appending with INSERT INTO ... SELECT, we might have to do proper table-level locking. Postgres:

ACCESS EXCLUSIVE (AccessExclusiveLock)

Conflicts with locks of all modes (ACCESS SHARE, ROW SHARE, ROW EXCLUSIVE, SHARE UPDATE EXCLUSIVE, SHARE, SHARE ROW EXCLUSIVE, EXCLUSIVE, and ACCESS EXCLUSIVE). This mode guarantees that the holder is the only transaction accessing the table in any way.

MSSQL: INSERT INTO x WITH (TABLOCKX) SELECT ...

DB2: we already perform locking
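
A sketch of the per-dialect locked append, assuming string dialect names "postgresql" and "mssql" (hypothetical, as is locked_append_statements). In Postgres, LOCK TABLE is rejected outside a transaction block and the lock is held until COMMIT; DB2 is left out because locking is already handled there.

```python
def locked_append_statements(dialect: str, table: str, select_sql: str) -> list[str]:
    """Statements for appending under an exclusive table lock (hypothetical helper)."""
    if dialect == "postgresql":
        # LOCK TABLE only works inside a transaction; the lock is released at COMMIT.
        return [
            "BEGIN",
            f"LOCK TABLE {table} IN ACCESS EXCLUSIVE MODE",
            f"INSERT INTO {table} {select_sql}",
            "COMMIT",
        ]
    if dialect == "mssql":
        # The TABLOCKX hint requests an exclusive lock on the whole table.
        return [f"INSERT INTO {table} WITH (TABLOCKX) {select_sql}"]
    raise NotImplementedError(f"no locking strategy for dialect {dialect!r}")
```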

windiana42 commented 4 months ago

Unfortunately, adding LIMIT 0 / TOP 0 is non-trivial for handwritten sa.text() queries. Here is a first attempt:

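import sqlalchemy as sa

sql = sa.text("SELECT 1 as x, 'test' as y, cast('2021-01-01' as date) as z, cast('2021-01-01' as datetime) as w, cast(1.0 as Numeric(15,6)) as v")
if isinstance(sql, sa.sql.expression.TextClause):
    # Strip the leading SELECT keyword and re-wrap the column list in sa.select()
    # so .limit(0) can be applied; breaks if the text does not start with SELECT.
    parts = str(sql).split("SELECT")
    limit_sql = sa.select(sa.text("SELECT".join(parts[1:]).strip())).limit(0)
else:
    limit_sql = sql.limit(0)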

It seems to work for MSSQL and Postgres (the second compile call uses an MSSQL engine):

> str(limit_sql.compile(dialect=postgresql.dialect(), compile_kwargs={"literal_binds": True}))
"SELECT 1 as x, 'test' as y, cast('2021-01-01' as date) as z, cast('2021-01-01' as datetime) as w, cast(1.0 as Numeric(15,6)) as v \n LIMIT 0"
> str(limit_sql.compile(engine, compile_kwargs={"literal_binds": True}))
"SELECT TOP 0 1 as x, 'test' as y, cast('2021-01-01' as date) as z, cast('2021-01-01' as datetime) as w, cast(1.0 as Numeric(15,6)) as v"
windiana42 commented 4 months ago

We can add safety checks, and if something goes wrong, we can still fall back to issuing the non-nullable ALTER statements after loading the full table. This should still be correct, just slower.
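
The fallback can be sketched as plain control flow. run_with_fallback is a hypothetical helper: execute is assumed to run one SQL string in autocommit mode, fast is the create-empty / constrain / load sequence, and fallback must first drop whatever the fast path may have left behind, then load the full table and add the constraints.

```python
from __future__ import annotations

from typing import Callable, Optional, Sequence


def run_with_fallback(
    execute: Callable[[str], None],
    fast: Sequence[str],
    fallback: Sequence[str],
    check: Optional[Callable[[], bool]] = None,
) -> str:
    """Run fast; if a statement fails or check() rejects the result, run fallback."""
    try:
        for stmt in fast:
            execute(stmt)
        # Optional safety check, e.g. comparing the column list with expectations.
        if check is not None and not check():
            raise RuntimeError("safety check failed")
        return "fast"
    except Exception:
        # Slower but still correct: full load first, NOT NULL constraints afterwards.
        for stmt in fallback:
            execute(stmt)
        return "fallback"
```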

windiana42 commented 3 months ago

I found a more reliable way to add the LIMIT clause by wrapping the query in a subquery (performance should not matter for the LIMIT 0 query):

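    # Requires `import sqlalchemy as sa` (and Python >= 3.10, or
    # `from __future__ import annotations`, for the `|` type hint).
    def get_limit_query(
        self,
        query: sa.sql.expression.Selectable | sa.sql.expression.TextClause,
        rows: int,
    ) -> sa.sql.expression.Select:
        # Wrap the original query as an opaque subquery "A", so SQLAlchemy adds
        # LIMIT (or TOP on MSSQL) without having to parse the query text.
        if isinstance(query, sa.sql.expression.TextClause):
            query_str = str(query)
            return (
                sa.select(sa.text("*"))
                .limit(rows)
                .select_from(sa.text("(" + query_str + ") AS A"))
            )
        else:
            return sa.select(sa.text("*")).limit(rows).select_from(query.alias("A"))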
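
To illustrate why the subquery is more robust than splitting on SELECT, here is a plain-string sketch of the TextClause branch, run against sqlite3 as a stand-in database. wrap_with_limit is hypothetical and emits LIMIT syntax only; get_limit_query leaves that choice to SQLAlchemy, which renders TOP for MSSQL. The example query starts with WITH, which the split-based attempt would mangle.

```python
import sqlite3


def wrap_with_limit(query_sql: str, rows: int) -> str:
    """Plain-string analogue of get_limit_query's TextClause branch (hypothetical).

    The handwritten query becomes an opaque derived table "A", so its own
    WITH / ORDER BY / LIMIT clauses never have to be parsed or edited.
    """
    return f"SELECT * FROM ({query_sql}) AS A LIMIT {int(rows)}"


conn = sqlite3.connect(":memory:")
query = "WITH t AS (SELECT 1 AS x, 'aba' AS y) SELECT x, y FROM t"

# Pass 1: let the database derive the columns into an empty table.
conn.execute(f"CREATE TABLE tbl AS {wrap_with_limit(query, 0)}")
# Pass 2: load all rows into the prepared table.
conn.execute(f"INSERT INTO tbl SELECT * FROM ({query}) AS A")
print(conn.execute("SELECT x, y FROM tbl").fetchall())
```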